< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java

Print this page




   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 

  28 import java.io.IOException;

  29 import java.net.InetSocketAddress;
  30 import java.net.URI;
  31 import jdk.incubator.http.HttpConnection.Mode;
  32 import java.nio.ByteBuffer;
  33 import java.nio.charset.StandardCharsets;
  34 import java.util.HashMap;
  35 import java.util.Iterator;
  36 import java.util.LinkedList;
  37 import java.util.List;
  38 import java.util.Map;
  39 import java.util.concurrent.CompletableFuture;
  40 import java.util.ArrayList;
  41 import java.util.Collections;
  42 import java.util.Formatter;
  43 import java.util.concurrent.ConcurrentHashMap;
  44 import java.util.concurrent.CountDownLatch;
  45 import java.util.stream.Collectors;


  46 import javax.net.ssl.SSLEngine;
  47 import jdk.incubator.http.internal.common.*;
  48 import jdk.incubator.http.internal.frame.*;






















  49 import jdk.incubator.http.internal.hpack.Encoder;
  50 import jdk.incubator.http.internal.hpack.Decoder;
  51 import jdk.incubator.http.internal.hpack.DecodingCallback;
  52 
  53 import static jdk.incubator.http.internal.frame.SettingsFrame.*;
  54 
  55 
  56 /**
  57  * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
  58  * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
  59  *
  60  * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
  61  * to a HttpClientImpl.
  62  *
  63  * Creation cases:
  64  * 1) upgraded HTTP/1.1 plain tcp connection
  65  * 2) prior knowledge directly created plain tcp connection
  66  * 3) directly created HTTP/2 SSL connection which uses ALPN.
  67  *
  68  * Sending is done by writing directly to underlying HttpConnection object which
  69  * is operating in async mode. No flow control applies on output at this level
  70  * and all writes are just executed as puts to an output Q belonging to HttpConnection
  71  * Flow control is implemented by HTTP/2 protocol itself.
  72  *
  73  * Hpack header compression
  74  * and outgoing stream creation is also done here, because these operations
  75  * must be synchronized at the socket level. Stream objects send frames simply
  76  * by placing them on the connection's output Queue. sendFrame() is called
  77  * from a higher level (Stream) thread.
  78  *
  79  * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
  80  * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
  81  * or handles them directly itself. This thread performs hpack decompression
  82  * and incoming stream creation (Server push). Incoming frames destined for a
  83  * stream are provided by calling Stream.incoming().
  84  */
  85 class Http2Connection  {










    /*
     *  ByteBuffer pooling strategy for HTTP/2 protocol:
     *
     * In general there are 4 points where ByteBuffers are used:
     *  - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing encrypted data
     *    in case of SSL connection.
     *
     * 1. Outgoing frames encoded to ByteBuffers.
     *    Outgoing ByteBuffers are created with the required size and are frequently small
     *    (except DataFrames, etc). No pool is used here: all outgoing buffers are left
     *    to be collected by the GC.
     *
     * 2. Incoming ByteBuffers (decoded to frames).
     *    Here, total elimination of the BB pool is not a good idea:
     *    we don't know how many bytes we will receive from the network,
     *    so we allocate a buffer of reasonable size. The subsequent life of the BB:
     * - If all frames decoded from the BB are other than DataFrame and HeaderFrame (and HeaderFrame subclasses),
     *     the BB is returned to the pool.
     * - If we decoded a DataFrame from the BB, the DataFrame refers to a sub-buffer obtained by the slice() method.
     *     Such a BB is never returned to the pool and will be GCed.
     * - If we decoded a HeadersFrame from the BB, header decoding is performed inside the processFrame method and
     *     the buffer could be released to the pool.
     *
     * 3. SSL encrypted buffers. Here another pool was introduced and all net buffers go to/from that pool,
     *    because we can't predict the size of encrypted packets.
     *
     */
 112 
 113 
 114     // A small class that allows to control frames with respect to the state of
 115     // the connection preface. Any data received before the connection
 116     // preface is sent will be buffered.
 117     private final class FramesController {
 118         volatile boolean prefaceSent;
 119         volatile List<ByteBufferReference> pending;
 120 
 121         boolean processReceivedData(FramesDecoder decoder, ByteBufferReference buf)
 122                 throws IOException
 123         {
 124             // if preface is not sent, buffers data in the pending list
 125             if (!prefaceSent) {


 126                 synchronized (this) {
 127                     if (!prefaceSent) {
 128                         if (pending == null) pending = new ArrayList<>();
 129                         pending.add(buf);



 130                         return false;
 131                     }
 132                 }
 133             }
 134 
 135             // Preface is sent. Checks for pending data and flush it.
 136             // We rely on this method being called from within the readlock,
 137             // so we know that no other thread could execute this method
 138             // concurrently while we're here.
 139             // This ensures that later incoming buffers will not
 140             // be processed before we have flushed the pending queue.
 141             // No additional synchronization is therefore necessary here.
 142             List<ByteBufferReference> pending = this.pending;
 143             this.pending = null;
 144             if (pending != null) {
 145                 // flush pending data
 146                 for (ByteBufferReference b : pending) {


 147                     decoder.decode(b);
 148                 }
 149             }
 150 
 151             // push the received buffer to the frames decoder.


 152             decoder.decode(buf);

 153             return true;
 154         }
 155 
 156         // Mark that the connection preface is sent
 157         void markPrefaceSent() {
 158             assert !prefaceSent;
 159             synchronized (this) {
 160                 prefaceSent = true;
 161             }
 162         }
 163 
 164     }
 165 
    // Set to true by shutdown(); never reset in the code visible here.
    volatile boolean closed;

    //-------------------------------------
    // Underlying transport; written to when sending the preface and closed by shutdown().
    final HttpConnection connection;
    // Obtained from client2.client() in the private constructor.
    private final HttpClientImpl client;
    private final Http2ClientImpl client2;
    // Live streams keyed by stream id (see putStream / closeStream / deleteStream).
    private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
    // Seeded by the constructors: 3 for an upgraded connection, 1 otherwise.
    private int nextstreamid;
    // Id that the next PUSH_PROMISE must carry; advanced by 2 in handlePushPromise.
    private int nextPushStream = 2;
    // HPACK encoder for outgoing headers, sized from the server's HEADER_TABLE_SIZE.
    private final Encoder hpackOut;
    // HPACK decoder for incoming headers, sized from our own HEADER_TABLE_SIZE.
    private final Decoder hpackIn;
    // Settings this client advertises; taken from client2.getClientSettings().
    final SettingsFrame clientSettings;
    // Starts as SettingsFrame.getDefaultSettings(); replaced by handleSettings().
    private volatile SettingsFrame serverSettings;
    private final String key; // for HttpClientImpl.connections map
    // Reassembles inbound bytes into frames and passes each one to processFrame().
    private final FramesDecoder framesDecoder;
    private final FramesEncoder framesEncoder = new FramesEncoder();

    /**
     * Send Window controller for both connection and stream windows.
     * Each of this connection's Streams MUST use this controller.
     */
    private final WindowController windowController = new WindowController();
    // Holds back inbound data until the connection preface has been sent.
    private final FramesController framesController = new FramesController();
    // Used by sendConnectionPreface() to emit the initial connection WINDOW_UPDATE.
    final WindowUpdateSender windowUpdater;

    // Fallback used by getMaxSendFrameSize() when the server settings carry no MAX_FRAME_SIZE.
    static final int DEFAULT_FRAME_SIZE = 16 * 1024;
 192 
 193 
 194     // TODO: need list of control frames from other threads
 195     // that need to be sent
 196 
 197     private Http2Connection(HttpConnection connection,
 198                             Http2ClientImpl client2,
 199                             int nextstreamid,
 200                             String key) {
 201         this.connection = connection;
 202         this.client = client2.client();
 203         this.client2 = client2;
 204         this.nextstreamid = nextstreamid;
 205         this.key = key;
 206         this.clientSettings = this.client2.getClientSettings();
 207         this.framesDecoder = new FramesDecoder(this::processFrame, clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE));
 208         // serverSettings will be updated by server
 209         this.serverSettings = SettingsFrame.getDefaultSettings();
 210         this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
 211         this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
 212         this.windowUpdater = new ConnectionWindowUpdateSender(this, client.getReceiveBufferSize());



 213     }
 214 
    /**
     * Case 1) Create from upgraded HTTP/1.1 connection.
     * Is ready to use. Will not be SSL. exchange is the Exchange
     * that initiated the connection, whose response will be delivered
     * on a Stream.
     *
     * @param connection the already established plain-text connection
     * @param client2    the owning HTTP/2 client
     * @param exchange   the exchange that triggered the upgrade; bound to stream 1
     * @param initial    bytes handed to asyncReceive before asynchronous reading starts
     * @throws IOException          if sending the connection preface fails
     * @throws InterruptedException declared by the signature; no interruptible
     *                              call is visible in this body
     */
    Http2Connection(HttpConnection connection,
                    Http2ClientImpl client2,
                    Exchange<?> exchange,
                    ByteBuffer initial)
        throws IOException, InterruptedException
    {
        this(connection,
                client2,
                3, // stream 1 is registered during the upgrade
                keyFor(connection));
        assert !(connection instanceof SSLConnection);
        Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());

        // Bind the upgrading exchange to stream 1 and mark its request as
        // already sent before anything is written on this connection.
        Stream<?> initialStream = createStream(exchange);
        initialStream.registerStream(1);
        windowController.registerStream(1, getInitialSendWindowSize());
        initialStream.requestSent();
        sendConnectionPreface();
        // Install the callbacks, switch to async mode, feed the initial
        // bytes through the normal receive path, then start reading.
        AsyncConnection asyncConn = (AsyncConnection)connection;
        asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
        connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
        asyncReceive(ByteBufferReference.of(initial));
        asyncConn.startReading();
    }
 247 
    /**
     * Future-returning factory for the upgrade case (case 1).
     * Wraps the four-argument constructor in {@code MinimalFuture.supply}.
     * Per the original note this is "async style but completes immediately";
     * presumably the supplier runs inline on the calling thread -- confirm
     * against MinimalFuture.
     */
    static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
                                                          Http2ClientImpl client2,
                                                          Exchange<?> exchange,
                                                          ByteBuffer initial) {
        return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
    }
 255 























    /**
     * Cases 2) 3)
     *
     * Opens a new connection for the given request, checks the negotiated
     * ALPN protocol and then sends the connection preface.
     *
     * @param request  the request to be sent; supplies the address, URI and proxy
     * @param h2client the owning HTTP/2 client
     * @throws IOException          if connecting fails, "h2" was not negotiated,
     *                              or the preface cannot be written
     * @throws InterruptedException declared by the signature; presumably raised
     *                              by connect() -- confirm
     */
    Http2Connection(HttpRequestImpl request, Http2ClientImpl h2client)
        throws IOException, InterruptedException
    {
        this(HttpConnection.getConnection(request.getAddress(h2client.client()), h2client.client(), request, true),
                h2client,
                1,
                keyFor(request.uri(), request.proxy(h2client.client())));
        Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());

        // Callbacks are installed before connecting.
        AsyncConnection asyncConn = (AsyncConnection)connection;
        asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
        connection.connect();
        // NOTE(review): checkSSLConfig() casts the connection to
        // AbstractAsyncSSLConnection unconditionally, so this path appears to
        // assume a TLS connection -- confirm that callers never reach it with
        // a plain-text connection.
        checkSSLConfig();
        // safe to resume async reading now.
        asyncConn.enableCallback();
        sendConnectionPreface();
    }
 279 










 280     /**
 281      * Throws an IOException if h2 was not negotiated
 282      */
 283     private void checkSSLConfig() throws IOException {
 284         AbstractAsyncSSLConnection aconn = (AbstractAsyncSSLConnection)connection;



 285         SSLEngine engine = aconn.getEngine();
 286         String alpn = engine.getApplicationProtocol();



 287         if (alpn == null || !alpn.equals("h2")) {
 288             String msg;
 289             if (alpn == null) {
 290                 Log.logSSL("ALPN not supported");
 291                 msg = "ALPN not supported";
 292             } else switch (alpn) {

 293               case "":
 294                 Log.logSSL("No ALPN returned");
 295                 msg = "No ALPN negotiated";
 296                 break;
 297               case "http/1.1":
 298                 Log.logSSL("HTTP/1.1 ALPN returned");
 299                 msg = "HTTP/1.1 ALPN returned";
 300                 break;
 301               default:
 302                 Log.logSSL("unknown ALPN returned");
 303                 msg = "Unexpected ALPN: " + alpn;
 304                 throw new IOException(msg);
 305             }
 306             throw new ALPNException(msg, aconn);
 307         }








 308     }
 309 
 310     static String keyFor(HttpConnection connection) {
 311         boolean isProxy = connection.isProxied();
 312         boolean isSecure = connection.isSecure();
 313         InetSocketAddress addr = connection.address();
 314 
 315         return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
 316     }
 317 
 318     static String keyFor(URI uri, InetSocketAddress proxy) {
 319         boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
 320         boolean isProxy = proxy != null;
 321 
 322         String host;
 323         int port;
 324 
 325         if (isProxy) {
 326             host = proxy.getHostString();
 327             port = proxy.getPort();
 328         } else {
 329             host = uri.getHost();
 330             port = uri.getPort();
 331         }
 332         return keyString(isSecure, isProxy, host, port);
 333     }
 334 
 335     // {C,S}:{H:P}:host:port
 336     // C indicates clear text connection "http"
 337     // S indicates secure "https"
 338     // H indicates host (direct) connection
 339     // P indicates proxy
 340     // Eg: "S:H:foo.com:80"
 341     static String keyString(boolean secure, boolean proxy, String host, int port) {
 342         return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port;
 343     }
 344 
 345     String key() {
 346         return this.key;
 347     }
 348 
 349     void putConnection() {
 350         client2.putConnection(this);
 351     }
 352 
 353     private static String toHexdump1(ByteBuffer bb) {
 354         bb.mark();
 355         StringBuilder sb = new StringBuilder(512);
 356         Formatter f = new Formatter(sb);
 357 
 358         while (bb.hasRemaining()) {
 359             int i =  Byte.toUnsignedInt(bb.get());
 360             f.format("%02x:", i);
 361         }
 362         sb.deleteCharAt(sb.length()-1);
 363         bb.reset();
 364         return sb.toString();
 365     }
 366 
 367     private static String toHexdump(ByteBuffer bb) {
 368         List<String> words = new ArrayList<>();
 369         int i = 0;
 370         bb.mark();
 371         while (bb.hasRemaining()) {
 372             if (i % 2 == 0) {
 373                 words.add("");
 374             }
 375             byte b = bb.get();
 376             String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1);
 377             words.set(i / 2, words.get(i / 2) + hex);
 378             i++;
 379         }
 380         bb.reset();
 381         return words.stream().collect(Collectors.joining(" "));
 382     }
 383 
 384     private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) {




 385         boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
 386 
 387         ByteBufferReference[] buffers = frame.getHeaderBlock();
 388         for (int i = 0; i < buffers.length; i++) {
 389             hpackIn.decode(buffers[i].get(), endOfHeaders && (i == buffers.length - 1), decoder);


 390         }
 391     }
 392 
 393     int getInitialSendWindowSize() {
 394         return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
 395     }
 396 
 397     void close() {
 398         GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes());
 399         // TODO: set last stream. For now zero ok.
 400         sendFrame(f);
 401     }
 402 
 403     private ByteBufferPool readBufferPool = new ByteBufferPool();
 404 
 405     // provides buffer to read data (default size)
 406     public ByteBufferReference getReadBuffer() {
 407         return readBufferPool.get(getMaxReceiveFrameSize() + Http2Frame.FRAME_HEADER_SIZE);
 408     }
 409 
 410     private final Object readlock = new Object();
 411 
 412     public void asyncReceive(ByteBufferReference buffer) {
 413         // We don't need to read anything and
 414         // we don't want to send anything back to the server
 415         // until the connection preface has been sent.
 416         // Therefore we're going to wait if needed before reading
 417         // (and thus replying) to anything.
 418         // Starting to reply to something (e.g send an ACK to a
 419         // SettingsFrame sent by the server) before the connection
 420         // preface is fully sent might result in the server
 421         // sending a GOAWAY frame with 'invalid_preface'.
 422         synchronized (readlock) {


 423             try {
 424                 // the readlock ensures that the order of incoming buffers
 425                 // is preserved.

























 426                 framesController.processReceivedData(framesDecoder, buffer);


 427             } catch (Throwable e) {
 428                 String msg = Utils.stackTrace(e);
 429                 Log.logTrace(msg);
 430                 shutdown(e);
 431             }
 432         }
 433     }
 434 



 435 
 436     void shutdown(Throwable t) {
 437         Log.logError(t);



 438         closed = true;




 439         client2.deleteConnection(this);
 440         List<Stream<?>> c = new LinkedList<>(streams.values());
 441         for (Stream<?> s : c) {
 442             s.cancelImpl(t);
 443         }
 444         connection.close();
 445     }
 446 
 447     /**
 448      * Handles stream 0 (common) frames that apply to whole connection and passes
 449      * other stream specific frames to that Stream object.
 450      *
 451      * Invokes Stream.incoming() which is expected to process frame without
 452      * blocking.
 453      */
 454     void processFrame(Http2Frame frame) throws IOException {
 455         Log.logFrames(frame, "IN");
 456         int streamid = frame.streamid();
 457         if (frame instanceof MalformedFrame) {
 458             Log.logError(((MalformedFrame) frame).getMessage());
 459             if (streamid == 0) {
 460                 protocolError(((MalformedFrame) frame).getErrorCode());

 461             } else {


 462                 resetStream(streamid, ((MalformedFrame) frame).getErrorCode());
 463             }
 464             return;
 465         }
 466         if (streamid == 0) {
 467             handleConnectionFrame(frame);
 468         } else {
 469             if (frame instanceof SettingsFrame) {
 470                 // The stream identifier for a SETTINGS frame MUST be zero
 471                 protocolError(GoAwayFrame.PROTOCOL_ERROR);
 472                 return;
 473             }
 474 
 475             Stream<?> stream = getStream(streamid);
 476             if (stream == null) {
 477                 // Should never receive a frame with unknown stream id
 478 
 479                 // To avoid looping, an endpoint MUST NOT send a RST_STREAM in
 480                 // response to a RST_STREAM frame.
 481                 if (!(frame instanceof ResetFrame)) {







 482                     resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
 483                 }
 484                 return;
 485             }
 486             if (frame instanceof PushPromiseFrame) {
 487                 PushPromiseFrame pp = (PushPromiseFrame)frame;
 488                 handlePushPromise(stream, pp);
 489             } else if (frame instanceof HeaderFrame) {
 490                 // decode headers (or continuation)
 491                 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
 492                 stream.incoming(frame);
 493             } else {
 494                 stream.incoming(frame);
 495             }
 496         }
 497     }
 498 
 499     private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
 500         throws IOException
 501     {





 502         HttpRequestImpl parentReq = parent.request;
 503         int promisedStreamid = pp.getPromisedStream();
 504         if (promisedStreamid != nextPushStream) {
 505             resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
 506             return;
 507         } else {
 508             nextPushStream += 2;
 509         }
 510         HeaderDecoder decoder = new HeaderDecoder();
 511         decodeHeaders(pp, decoder);
 512         HttpHeadersImpl headers = decoder.headers();
 513         HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
 514         Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
 515         Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch);
 516         pushExch.exchImpl = pushStream;
 517         pushStream.registerStream(promisedStreamid);
 518         parent.incoming_pushPromise(pushReq, pushStream);
 519     }
 520 
 521     private void handleConnectionFrame(Http2Frame frame)
 522         throws IOException
 523     {
 524         switch (frame.type()) {
 525           case SettingsFrame.TYPE:
 526               handleSettings((SettingsFrame)frame);
 527               break;
 528           case PingFrame.TYPE:
 529               handlePing((PingFrame)frame);
 530               break;
 531           case GoAwayFrame.TYPE:
 532               handleGoAway((GoAwayFrame)frame);
 533               break;
 534           case WindowUpdateFrame.TYPE:
 535               handleWindowUpdate((WindowUpdateFrame)frame);
 536               break;
 537           default:
 538             protocolError(ErrorFrame.PROTOCOL_ERROR);
 539         }
 540     }
 541 
 542     void resetStream(int streamid, int code) throws IOException {
 543         Log.logError(
 544             "Resetting stream {0,number,integer} with error code {1,number,integer}",
 545             streamid, code);
 546         ResetFrame frame = new ResetFrame(streamid, code);
 547         sendFrame(frame);
 548         closeStream(streamid);
 549     }
 550 
 551     void closeStream(int streamid) {

 552         Stream<?> s = streams.remove(streamid);







 553         // ## Remove s != null. It is a hack for delayed cancellation,reset
 554         if (s != null && !(s instanceof Stream.PushedStream)) {
 555             // Since PushStreams have no request body, then they have no
 556             // corresponding entry in the window controller.
 557             windowController.removeStream(streamid);
 558         }
 559     }
 560     /**
 561      * Increments this connection's send Window by the amount in the given frame.
 562      */
 563     private void handleWindowUpdate(WindowUpdateFrame f)
 564         throws IOException
 565     {
 566         int amount = f.getUpdate();
 567         if (amount <= 0) {
 568             // ## temporarily disable to workaround a bug in Jetty where it
 569             // ## sends Window updates with a 0 update value.
 570             //protocolError(ErrorFrame.PROTOCOL_ERROR);
 571         } else {
 572             boolean success = windowController.increaseConnectionWindow(amount);
 573             if (!success) {
 574                 protocolError(ErrorFrame.FLOW_CONTROL_ERROR);  // overflow
 575             }
 576         }
 577     }
 578 
 579     private void protocolError(int errorCode)
 580         throws IOException
 581     {






 582         GoAwayFrame frame = new GoAwayFrame(0, errorCode);
 583         sendFrame(frame);
 584         shutdown(new IOException("protocol error"));
 585     }
 586 
 587     private void handleSettings(SettingsFrame frame)
 588         throws IOException
 589     {
 590         assert frame.streamid() == 0;
 591         if (!frame.getFlag(SettingsFrame.ACK)) {
 592             int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE);
 593             int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE);
 594             int diff = newWindowSize - oldWindowSize;
 595             if (diff != 0) {
 596                 windowController.adjustActiveStreams(diff);
 597             }
 598             serverSettings = frame;
 599             sendFrame(new SettingsFrame(SettingsFrame.ACK));
 600         }
 601     }
 602 
 603     private void handlePing(PingFrame frame)
 604         throws IOException


 616     }
 617 
 618     /**
 619      * Max frame size we are allowed to send
 620      */
 621     public int getMaxSendFrameSize() {
 622         int param = serverSettings.getParameter(MAX_FRAME_SIZE);
 623         if (param == -1) {
 624             param = DEFAULT_FRAME_SIZE;
 625         }
 626         return param;
 627     }
 628 
 629     /**
 630      * Max frame size we will receive
 631      */
 632     public int getMaxReceiveFrameSize() {
 633         return clientSettings.getParameter(MAX_FRAME_SIZE);
 634     }
 635 
 636     // Not sure how useful this is.
 637     public int getMaxHeadersSize() {
 638         return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
 639     }
 640 
 641     private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
 642 
 643     private static final byte[] PREFACE_BYTES =
 644         CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
 645 
    /**
     * Sends Connection preface and Settings frame with current preferred
     * values, marks the preface as sent, and then sends a connection-level
     * window update.
     *
     * @throws IOException if writing to the connection fails
     */
    private void sendConnectionPreface() throws IOException {
        Log.logTrace("{0}: start sending connection preface to {1}",
                     connection.channel().getLocalAddress(),
                     connection.address());
        SettingsFrame sf = client2.getClientSettings();
        ByteBufferReference ref = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
        Log.logFrames(sf, "OUT");
        // send preface bytes and SettingsFrame together
        connection.write(ref.get());
        // mark preface sent: from now on received data is no longer held back
        framesController.markPrefaceSent();
        Log.logTrace("PREFACE_BYTES sent");
        Log.logTrace("Settings Frame sent");

        // send a Window update for the receive buffer we are using
        // minus the initial 64 K specified in protocol
        final int len = client2.client().getReceiveBufferSize() - (64 * 1024 - 1);
        windowUpdater.sendWindowUpdate(len);
        // Any data that was buffered before the preface was sent is flushed
        // by FramesController the next time asyncReceive() delivers a buffer.
        Log.logTrace("finished sending connection preface");
    }
 673 
 674     /**
 675      * Returns an existing Stream with given id, or null if doesn't exist
 676      */
 677     @SuppressWarnings("unchecked")
 678     <T> Stream<T> getStream(int streamid) {
 679         return (Stream<T>)streams.get(streamid);
 680     }
 681 
 682     /**
 683      * Creates Stream with given id.
 684      */
 685     <T> Stream<T> createStream(Exchange<T> exchange) {
 686         Stream<T> stream = new Stream<>(client, this, exchange, windowController);
 687         return stream;
 688     }
 689 
 690     <T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
 691         PushGroup<?,T> pg = parent.exchange.getPushGroup();
 692         return new Stream.PushedStream<>(pg, client, this, parent, pushEx);
 693     }
 694 
 695     <T> void putStream(Stream<T> stream, int streamid) {




 696         streams.put(streamid, stream);
 697     }
 698 
 699     void deleteStream(int streamid) {
 700         streams.remove(streamid);
 701         windowController.removeStream(streamid);
 702     }
 703 
 704     /**
 705      * Encode the headers into a List<ByteBuffer> and then create HEADERS
 706      * and CONTINUATION frames from the list and return the List<Http2Frame>.
 707      */
 708     private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) {
 709         List<ByteBufferReference> buffers = encodeHeadersImpl(
 710                 getMaxSendFrameSize(),
 711                 frame.getAttachment().getRequestPseudoHeaders(),
 712                 frame.getUserHeaders(),
 713                 frame.getSystemHeaders());
 714 
 715         List<HeaderFrame> frames = new ArrayList<>(buffers.size());
 716         Iterator<ByteBufferReference> bufIterator = buffers.iterator();
 717         HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next());
 718         frames.add(oframe);
 719         while(bufIterator.hasNext()) {
 720             oframe = new ContinuationFrame(frame.streamid(), bufIterator.next());
 721             frames.add(oframe);
 722         }
 723         oframe.setFlag(HeaderFrame.END_HEADERS);
 724         return frames;
 725     }
 726 
 727     // Dedicated cache for headers encoding ByteBuffer.
 728     // There can be no concurrent access to this  buffer as all access to this buffer
 729     // and its content happen within a single critical code block section protected
 730     // by the sendLock. / (see sendFrame())
 731     private ByteBufferPool headerEncodingPool = new ByteBufferPool();
 732 
 733     private ByteBufferReference getHeaderBuffer(int maxFrameSize) {
 734         ByteBufferReference ref = headerEncodingPool.get(maxFrameSize);
 735         ref.get().limit(maxFrameSize);
 736         return ref;
 737     }
 738 
 739     /*
 740      * Encodes all the headers from the given HttpHeaders into the given List
 741      * of buffers.
 742      *
 743      * From https://tools.ietf.org/html/rfc7540#section-8.1.2 :
 744      *
 745      *     ...Just as in HTTP/1.x, header field names are strings of ASCII
 746      *     characters that are compared in a case-insensitive fashion.  However,
 747      *     header field names MUST be converted to lowercase prior to their
 748      *     encoding in HTTP/2...
 749      */
 750     private List<ByteBufferReference> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) {
 751         ByteBufferReference buffer = getHeaderBuffer(maxFrameSize);
 752         List<ByteBufferReference> buffers = new ArrayList<>();
 753         for(HttpHeaders header : headers) {
 754             for (Map.Entry<String, List<String>> e : header.map().entrySet()) {
 755                 String lKey = e.getKey().toLowerCase();
 756                 List<String> values = e.getValue();
 757                 for (String value : values) {
 758                     hpackOut.header(lKey, value);
 759                     while (!hpackOut.encode(buffer.get())) {
 760                         buffer.get().flip();
 761                         buffers.add(buffer);
 762                         buffer =  getHeaderBuffer(maxFrameSize);
 763                     }
 764                 }
 765             }
 766         }
 767         buffer.get().flip();
 768         buffers.add(buffer);
 769         return buffers;
 770     }
 771 
 772     private ByteBufferReference[] encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) {
 773         oh.streamid(stream.streamid);
 774         if (Log.headers()) {
 775             StringBuilder sb = new StringBuilder("HEADERS FRAME (stream=");
 776             sb.append(stream.streamid).append(")\n");
 777             Log.dumpHeaders(sb, "    ", oh.getAttachment().getRequestPseudoHeaders());
 778             Log.dumpHeaders(sb, "    ", oh.getSystemHeaders());
 779             Log.dumpHeaders(sb, "    ", oh.getUserHeaders());
 780             Log.logHeaders(sb.toString());
 781         }
 782         List<HeaderFrame> frames = encodeHeaders(oh);
 783         return encodeFrames(frames);
 784     }
 785 
 786     private ByteBufferReference[] encodeFrames(List<HeaderFrame> frames) {
 787         if (Log.frames()) {
 788             frames.forEach(f -> Log.logFrames(f, "OUT"));
 789         }
 790         return framesEncoder.encodeFrames(frames);
 791     }
 792 
 793     static Throwable getExceptionFrom(CompletableFuture<?> cf) {
 794         try {
 795             cf.get();
 796             return null;
 797         } catch (Throwable e) {
 798             if (e.getCause() != null) {
 799                 return e.getCause();
 800             } else {
 801                 return e;
 802             }
 803         }
 804     }
 805 
 806     private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
 807         Stream<?> stream = oh.getAttachment();
 808         int streamid = nextstreamid;
 809         nextstreamid += 2;
 810         stream.registerStream(streamid);
 811         // set outgoing window here. This allows thread sending
 812         // body to proceed.
 813         windowController.registerStream(streamid, getInitialSendWindowSize());
 814         return stream;
 815     }
 816 
 817     private final Object sendlock = new Object();
 818 
 819     void sendFrame(Http2Frame frame) {
 820         try {

 821             synchronized (sendlock) {
 822                 if (frame instanceof OutgoingHeaders) {
 823                     @SuppressWarnings("unchecked")
 824                     OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
 825                     Stream<?> stream = registerNewStream(oh);
 826                     // provide protection from inserting unordered frames between Headers and Continuation
 827                     connection.writeAsync(encodeHeaders(oh, stream));
 828                 } else {
 829                     connection.writeAsync(encodeFrame(frame));
 830                 }
 831             }
 832             connection.flushAsync();
 833         } catch (IOException e) {
 834             if (!closed) {
 835                 Log.logError(e);
 836                 shutdown(e);
 837             }
 838         }
 839     }
 840 
 841     private ByteBufferReference[] encodeFrame(Http2Frame frame) {
 842         Log.logFrames(frame, "OUT");
 843         return framesEncoder.encodeFrame(frame);
 844     }
 845 
 846     void sendDataFrame(DataFrame frame) {
 847         try {
 848             connection.writeAsync(encodeFrame(frame));
 849             connection.flushAsync();

 850         } catch (IOException e) {
 851             if (!closed) {
 852                 Log.logError(e);
 853                 shutdown(e);
 854             }
 855         }
 856     }
 857 
 858     /*
 859      * Calling this method directly bypasses synchronization on "sendlock".
 860      * It is allowed only for control frames (WindowUpdateFrame, PingFrame, etc.)
 861      * and is prohibited for frames such as DataFrame, HeadersFrame and ContinuationFrame.
 862      */
 863     void sendUnorderedFrame(Http2Frame frame) {
 864         try {
 865             connection.writeAsyncUnordered(encodeFrame(frame));
 866             connection.flushAsync();

 867         } catch (IOException e) {
 868             if (!closed) {
 869                 Log.logError(e);
 870                 shutdown(e);
 871             }


































































































































































































 872         }
 873     }
 874 
 875     static class HeaderDecoder implements DecodingCallback {
 876         HttpHeadersImpl headers;
 877 
 878         HeaderDecoder() {
 879             this.headers = new HttpHeadersImpl();
 880         }
 881 
 882         @Override
 883         public void onDecoded(CharSequence name, CharSequence value) {
 884             headers.addHeader(name.toString(), value.toString());
 885         }
 886 
 887         HttpHeadersImpl headers() {
 888             return headers;
 889         }
 890     }
 891 




   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.EOFException;
  29 import java.io.IOException;
  30 import java.lang.System.Logger.Level;
  31 import java.net.InetSocketAddress;
  32 import java.net.URI;

  33 import java.nio.ByteBuffer;
  34 import java.nio.charset.StandardCharsets;

  35 import java.util.Iterator;
  36 import java.util.LinkedList;
  37 import java.util.List;
  38 import java.util.Map;
  39 import java.util.concurrent.CompletableFuture;
  40 import java.util.ArrayList;
  41 import java.util.Objects;

  42 import java.util.concurrent.ConcurrentHashMap;
  43 import java.util.concurrent.ConcurrentLinkedQueue;
  44 import java.util.concurrent.Flow;
  45 import java.util.function.Function;
  46 import java.util.function.Supplier;
  47 import javax.net.ssl.SSLEngine;
  48 import jdk.incubator.http.HttpConnection.HttpPublisher;
  49 import jdk.incubator.http.internal.common.FlowTube;
  50 import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
  51 import jdk.incubator.http.internal.common.HttpHeadersImpl;
  52 import jdk.incubator.http.internal.common.Log;
  53 import jdk.incubator.http.internal.common.MinimalFuture;
  54 import jdk.incubator.http.internal.common.SequentialScheduler;
  55 import jdk.incubator.http.internal.common.Utils;
  56 import jdk.incubator.http.internal.frame.ContinuationFrame;
  57 import jdk.incubator.http.internal.frame.DataFrame;
  58 import jdk.incubator.http.internal.frame.ErrorFrame;
  59 import jdk.incubator.http.internal.frame.FramesDecoder;
  60 import jdk.incubator.http.internal.frame.FramesEncoder;
  61 import jdk.incubator.http.internal.frame.GoAwayFrame;
  62 import jdk.incubator.http.internal.frame.HeaderFrame;
  63 import jdk.incubator.http.internal.frame.HeadersFrame;
  64 import jdk.incubator.http.internal.frame.Http2Frame;
  65 import jdk.incubator.http.internal.frame.MalformedFrame;
  66 import jdk.incubator.http.internal.frame.OutgoingHeaders;
  67 import jdk.incubator.http.internal.frame.PingFrame;
  68 import jdk.incubator.http.internal.frame.PushPromiseFrame;
  69 import jdk.incubator.http.internal.frame.ResetFrame;
  70 import jdk.incubator.http.internal.frame.SettingsFrame;
  71 import jdk.incubator.http.internal.frame.WindowUpdateFrame;
  72 import jdk.incubator.http.internal.hpack.Encoder;
  73 import jdk.incubator.http.internal.hpack.Decoder;
  74 import jdk.incubator.http.internal.hpack.DecodingCallback;
  75 
  76 import static jdk.incubator.http.internal.frame.SettingsFrame.*;
  77 
  78 
  79 /**
  80  * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
  81  * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
  82  *
  83  * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
  84  * to a HttpClientImpl.
  85  *
  86  * Creation cases:
  87  * 1) upgraded HTTP/1.1 plain tcp connection
  88  * 2) prior knowledge directly created plain tcp connection
  89  * 3) directly created HTTP/2 SSL connection which uses ALPN.
  90  *
  91  * Sending is done by writing directly to underlying HttpConnection object which
  92  * is operating in async mode. No flow control applies on output at this level
  93  * and all writes are just executed as puts to an output Q belonging to HttpConnection
  94  * Flow control is implemented by HTTP/2 protocol itself.
  95  *
  96  * Hpack header compression
  97  * and outgoing stream creation is also done here, because these operations
  98  * must be synchronized at the socket level. Stream objects send frames simply
  99  * by placing them on the connection's output Queue. sendFrame() is called
 100  * from a higher level (Stream) thread.
 101  *
 102  * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
 103  * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
 104  * or handles them directly itself. This thread performs hpack decompression
 105  * and incoming stream creation (Server push). Incoming frames destined for a
 106  * stream are provided by calling Stream.incoming().
 107  */
 108 class Http2Connection  {
 109 
 110     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
 111     static final boolean DEBUG_HPACK = Utils.DEBUG_HPACK; // Revisit: temporary dev flag.
 112     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
 113     final static System.Logger  DEBUG_LOGGER =
 114             Utils.getDebugLogger("Http2Connection"::toString, DEBUG);
 115     private final System.Logger debugHpack =
 116                   Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
 117     static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
 118 
 119     /*
 120      *  ByteBuffer pooling strategy for HTTP/2 protocol:
 121      *
 122      * In general there are 4 points where ByteBuffers are used:
 123      *  - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing encrypted data
 124      *    in case of SSL connection.
 125      *
 126      * 1. Outgoing frames encoded to ByteBuffers.
 127      *    Outgoing ByteBuffers are created with the required size and are frequently small (except DataFrames, etc)
 128      *    At this place no pools at all. All outgoing buffers should be collected by GC.
 129      *
 130      * 2. Incoming ByteBuffers (decoded to frames).
 131      *    Here, total elimination of BB pool is not a good idea.
 132      *    We don't know how many bytes we will receive through network.
 133      * So here we allocate buffer of reasonable size. The following life of the BB:
 134      * - If all frames decoded from the BB are other than DataFrame and HeaderFrame (and HeaderFrame subclasses)
 135      *     BB is returned to pool,
 136      * - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer obtained by slice() method.
 137      *     Such BB is never returned to pool and will be GCed.
 138      * - If we decoded HeadersFrame from the BB. Then header decoding is performed inside processFrame method and
 139      *     the buffer could be release to pool.
 140      *
 141      * 3. SSL encrypted buffers. Here another pool was introduced and all net buffers are taken from/returned to that pool,
 142      *    because we can't predict the size of encrypted packets.
 143      *
 144      */
 145 
 146 
 147     // A small class that allows to control frames with respect to the state of
 148     // the connection preface. Any data received before the connection
 149     // preface is sent will be buffered.
 150     private final class FramesController {
 151         volatile boolean prefaceSent;
 152         volatile List<ByteBuffer> pending;
 153 
 154         boolean processReceivedData(FramesDecoder decoder, ByteBuffer buf)
 155                 throws IOException
 156         {
 157             // if preface is not sent, buffers data in the pending list
 158             if (!prefaceSent) {
 159                 debug.log(Level.DEBUG, "Preface is not sent: buffering %d",
 160                           buf.remaining());
 161                 synchronized (this) {
 162                     if (!prefaceSent) {
 163                         if (pending == null) pending = new ArrayList<>();
 164                         pending.add(buf);
 165                         debug.log(Level.DEBUG, () -> "there are now "
 166                               + Utils.remaining(pending)
 167                               + " bytes buffered waiting for preface to be sent");
 168                         return false;
 169                     }
 170                 }
 171             }
 172 
 173             // Preface is sent. Checks for pending data and flush it.
 174             // We rely on this method being called from within the Http2TubeSubscriber
 175             // scheduler, so we know that no other thread could execute this method
 176             // concurrently while we're here.
 177             // This ensures that later incoming buffers will not
 178             // be processed before we have flushed the pending queue.
 179             // No additional synchronization is therefore necessary here.
 180             List<ByteBuffer> pending = this.pending;
 181             this.pending = null;
 182             if (pending != null) {
 183                 // flush pending data
 184                 debug.log(Level.DEBUG, () -> "Processing buffered data: "
 185                       + Utils.remaining(pending));
 186                 for (ByteBuffer b : pending) {
 187                     decoder.decode(b);
 188                 }
 189             }

 190             // push the received buffer to the frames decoder.
 191             if (buf != EMPTY_TRIGGER) {
 192                 debug.log(Level.DEBUG, "Processing %d", buf.remaining());
 193                 decoder.decode(buf);
 194             }
 195             return true;
 196         }
 197 
 198         // Mark that the connection preface is sent
 199         void markPrefaceSent() {
 200             assert !prefaceSent;
 201             synchronized (this) {
 202                 prefaceSent = true;
 203             }
 204         }
 205 
 206     }
 207 
 208     volatile boolean closed;
 209 
 210     //-------------------------------------
 211     final HttpConnection connection;

 212     private final Http2ClientImpl client2;
 213     private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
 214     private int nextstreamid;
 215     private int nextPushStream = 2;
 216     private final Encoder hpackOut;
 217     private final Decoder hpackIn;
 218     final SettingsFrame clientSettings;
 219     private volatile SettingsFrame serverSettings;
 220     private final String key; // for HttpClientImpl.connections map
 221     private final FramesDecoder framesDecoder;
 222     private final FramesEncoder framesEncoder = new FramesEncoder();
 223 
 224     /**
 225      * Send Window controller for both connection and stream windows.
 226      * Each of this connection's Streams MUST use this controller.
 227      */
 228     private final WindowController windowController = new WindowController();
 229     private final FramesController framesController = new FramesController();
 230     private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber();
 231     final WindowUpdateSender windowUpdater;
 232     private volatile Throwable cause;
 233     private volatile Supplier<ByteBuffer> initial;
 234 
 235     static final int DEFAULT_FRAME_SIZE = 16 * 1024;
 236 
 237 
 238     // TODO: need list of control frames from other threads
 239     // that need to be sent
 240 
 241     private Http2Connection(HttpConnection connection,
 242                             Http2ClientImpl client2,
 243                             int nextstreamid,
 244                             String key) {
 245         this.connection = connection;

 246         this.client2 = client2;
 247         this.nextstreamid = nextstreamid;
 248         this.key = key;
 249         this.clientSettings = this.client2.getClientSettings();
 250         this.framesDecoder = new FramesDecoder(this::processFrame, clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE));
 251         // serverSettings will be updated by server
 252         this.serverSettings = SettingsFrame.getDefaultSettings();
 253         this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
 254         this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
 255         debugHpack.log(Level.DEBUG, () -> "For the record:" + super.toString());
 256         debugHpack.log(Level.DEBUG, "Decoder created: %s", hpackIn);
 257         debugHpack.log(Level.DEBUG, "Encoder created: %s", hpackOut);
 258         this.windowUpdater = new ConnectionWindowUpdateSender(this, client().getReceiveBufferSize());
 259     }
 260 
 261     /**
 262      * Case 1) Create from upgraded HTTP/1.1 connection.
 263      * Is ready to use. Can be SSL. exchange is the Exchange
 264      * that initiated the connection, whose response will be delivered
 265      * on a Stream.
 266      */
 267     private Http2Connection(HttpConnection connection,
 268                     Http2ClientImpl client2,
 269                     Exchange<?> exchange,
 270                     Supplier<ByteBuffer> initial)
 271         throws IOException, InterruptedException
 272     {
 273         this(connection,
 274                 client2,
 275                 3, // stream 1 is registered during the upgrade
 276                 keyFor(connection));

 277         Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
 278 
 279         Stream<?> initialStream = createStream(exchange);
 280         initialStream.registerStream(1);
 281         windowController.registerStream(1, getInitialSendWindowSize());
 282         initialStream.requestSent();
 283         // Upgrading:
 284         //    set callbacks before sending preface - makes sure anything that
 285         //    might be sent by the server will come our way.
 286         this.initial = initial;
 287         connectFlows(connection);
 288         sendConnectionPreface();







 289     }
 290 
 291     // Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving
 292     // agreement from the server. Async style but completes immediately, because
 293     // the connection is already connected.
 294     static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
 295                                                           Http2ClientImpl client2,
 296                                                           Exchange<?> exchange,
 297                                                           Supplier<ByteBuffer> initial)
 298     {
 299         return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
 300     }
 301 
 302     // Requires TLS handshake. So, is really async
 303     static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request,
 304                                                           Http2ClientImpl h2client) {
 305         assert request.secure();
 306         AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection)
 307         HttpConnection.getConnection(request.getAddress(),
 308                                      h2client.client(),
 309                                      request,
 310                                      HttpClient.Version.HTTP_2);
 311 
 312         return connection.connectAsync()
 313                   .thenCompose(unused -> checkSSLConfig(connection))
 314                   .thenCompose(notused-> {
 315                       CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
 316                       try {
 317                           Http2Connection hc = new Http2Connection(request, h2client, connection);
 318                           cf.complete(hc);
 319                       } catch (IOException e) {
 320                           cf.completeExceptionally(e);
 321                       }
 322                       return cf; } );
 323     }
 324 
 325     /**
 326      * Cases 2) 3)
 327      *
 328      * request is request to be sent.
 329      */
 330     private Http2Connection(HttpRequestImpl request,
 331                             Http2ClientImpl h2client,
 332                             HttpConnection connection)
 333         throws IOException
 334     {
 335         this(connection,
 336              h2client,
 337              1,
 338              keyFor(request.uri(), request.proxy()));
 339 
 340         Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
 341 





 342         // safe to resume async reading now.
 343         connectFlows(connection);
 344         sendConnectionPreface();
 345     }
 346 
 347     private void connectFlows(HttpConnection connection) {
 348         FlowTube tube =  connection.getConnectionFlow();
 349         // Connect the flow to our Http2TubeSubscriber:
 350         tube.connectFlows(connection.publisher(), subscriber);
 351     }
 352 
 353     final HttpClientImpl client() {
 354         return client2.client();
 355     }
 356 
 357     /**
 358      * Throws an IOException if h2 was not negotiated
 359      */
 360     private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) {
 361         assert aconn.isSecure();
 362 
 363         Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> {
 364             CompletableFuture<Void> cf = new MinimalFuture<>();
 365             SSLEngine engine = aconn.getEngine();
 366             assert Objects.equals(alpn, engine.getApplicationProtocol());
 367 
 368             DEBUG_LOGGER.log(Level.DEBUG, "checkSSLConfig: alpn: %s", alpn );
 369 
 370             if (alpn == null || !alpn.equals("h2")) {
 371                 String msg;
 372                 if (alpn == null) {
 373                     Log.logSSL("ALPN not supported");
 374                     msg = "ALPN not supported";
 375                 } else {
 376                     switch (alpn) {
 377                         case "":
 378                             Log.logSSL(msg = "No ALPN negotiated");

 379                             break;
 380                         case "http/1.1":
 381                             Log.logSSL( msg = "HTTP/1.1 ALPN returned");

 382                             break;
 383                         default:
 384                             Log.logSSL(msg = "Unexpected ALPN: " + alpn);
 385                             cf.completeExceptionally(new IOException(msg));

 386                     }

 387                 }
 388                 cf.completeExceptionally(new ALPNException(msg, aconn));
 389                 return cf;
 390             }
 391             cf.complete(null);
 392             return cf;
 393         };
 394 
 395         return aconn.getALPN().thenCompose(checkAlpnCF);
 396     }
 397 
 398     static String keyFor(HttpConnection connection) {
 399         boolean isProxy = connection.isProxied();
 400         boolean isSecure = connection.isSecure();
 401         InetSocketAddress addr = connection.address();
 402 
 403         return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
 404     }
 405 
 406     static String keyFor(URI uri, InetSocketAddress proxy) {
 407         boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
 408         boolean isProxy = proxy != null;
 409 
 410         String host;
 411         int port;
 412 
 413         if (proxy != null) {
 414             host = proxy.getHostString();
 415             port = proxy.getPort();
 416         } else {
 417             host = uri.getHost();
 418             port = uri.getPort();
 419         }
 420         return keyString(isSecure, isProxy, host, port);
 421     }
 422 
 423     // {C,S}:{H:P}:host:port
 424     // C indicates clear text connection "http"
 425     // S indicates secure "https"
 426     // H indicates host (direct) connection
 427     // P indicates proxy
 428     // Eg: "S:H:foo.com:80"
 429     static String keyString(boolean secure, boolean proxy, String host, int port) {
 430         return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port;
 431     }
 432 
 433     String key() {
 434         return this.key;
 435     }
 436 
 437     void putConnection() {
 438         client2.putConnection(this);
 439     }
 440 
 441     private HttpPublisher publisher() {
 442         return connection.publisher();



























 443     }
 444 
 445     private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder)
 446             throws IOException
 447     {
 448         debugHpack.log(Level.DEBUG, "decodeHeaders(%s)", decoder);
 449 
 450         boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
 451 
 452         List<ByteBuffer> buffers = frame.getHeaderBlock();
 453         int len = buffers.size();
 454         for (int i = 0; i < len; i++) {
 455             ByteBuffer b = buffers.get(i);
 456             hpackIn.decode(b, endOfHeaders && (i == len - 1), decoder);
 457         }
 458     }
 459 
 460     final int getInitialSendWindowSize() {
 461         return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
 462     }
 463 
 464     void close() {
 465         GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes());
 466         // TODO: set last stream. For now zero ok.
 467         sendFrame(f);
 468     }
 469 
 470     long count;
 471     final void asyncReceive(ByteBuffer buffer) {








 472         // We don't need to read anything and
 473         // we don't want to send anything back to the server
 474         // until the connection preface has been sent.
 475         // Therefore we're going to wait if needed before reading
 476         // (and thus replying) to anything.
 477         // Starting to reply to something (e.g send an ACK to a
 478         // SettingsFrame sent by the server) before the connection
 479         // preface is fully sent might result in the server
 480         // sending a GOAWAY frame with 'invalid_preface'.
 481         //
 482         // Note: asyncReceive is only called from the Http2TubeSubscriber
 483         //       sequential scheduler.
 484         try {
 485             Supplier<ByteBuffer> bs = initial;
 486             // ensure that we always handle the initial buffer first,
 487             // if any.
 488             if (bs != null) {
 489                 initial = null;
 490                 ByteBuffer b = bs.get();
 491                 if (b.hasRemaining()) {
 492                     long c = ++count;
 493                     debug.log(Level.DEBUG, () -> "H2 Receiving Initial("
 494                         + c +"): " + b.remaining());
 495                     framesController.processReceivedData(framesDecoder, b);
 496                 }
 497             }
 498             ByteBuffer b = buffer;
 499             // the Http2TubeSubscriber scheduler ensures that the order of incoming
 500             // buffers is preserved.
 501             if (b == EMPTY_TRIGGER) {
 502                 debug.log(Level.DEBUG, "H2 Received EMPTY_TRIGGER");
 503                 boolean prefaceSent = framesController.prefaceSent;
 504                 assert prefaceSent;
 505                 // call framesController.processReceivedData to potentially
 506                 // trigger the processing of all the data buffered there.
 507                 framesController.processReceivedData(framesDecoder, buffer);
 508                 debug.log(Level.DEBUG, "H2 processed buffered data");
 509             } else {
 510                 long c = ++count;
 511                 debug.log(Level.DEBUG, "H2 Receiving(%d): %d", c, b.remaining());
 512                 framesController.processReceivedData(framesDecoder, buffer);
 513                 debug.log(Level.DEBUG, "H2 processed(%d)", c);
 514             }
 515         } catch (Throwable e) {
 516             String msg = Utils.stackTrace(e);
 517             Log.logTrace(msg);
 518             shutdown(e);
 519         }
 520     }

 521 
 522     Throwable getRecordedCause() {
 523         return cause;
 524     }
 525 
 526     void shutdown(Throwable t) {
 527         debug.log(Level.DEBUG, () -> "Shutting down h2c (closed="+closed+"): " + t);
 528         if (closed == true) return;
 529         synchronized (this) {
 530             if (closed == true) return;
 531             closed = true;
 532         }
 533         Log.logError(t);
 534         Throwable initialCause = this.cause;
 535         if (initialCause == null) this.cause = t;
 536         client2.deleteConnection(this);
 537         List<Stream<?>> c = new LinkedList<>(streams.values());
 538         for (Stream<?> s : c) {
 539             s.cancelImpl(t);
 540         }
 541         connection.close();
 542     }
 543 
 544     /**
 545      * Handles stream 0 (common) frames that apply to whole connection and passes
 546      * other stream specific frames to that Stream object.
 547      *
 548      * Invokes Stream.incoming() which is expected to process frame without
 549      * blocking.
 550      */
 551     void processFrame(Http2Frame frame) throws IOException {
 552         Log.logFrames(frame, "IN");
 553         int streamid = frame.streamid();
 554         if (frame instanceof MalformedFrame) {
 555             Log.logError(((MalformedFrame) frame).getMessage());
 556             if (streamid == 0) {
 557                 protocolError(((MalformedFrame) frame).getErrorCode(),
 558                         ((MalformedFrame) frame).getMessage());
 559             } else {
 560                 debug.log(Level.DEBUG, () -> "Reset stream: "
 561                           + ((MalformedFrame) frame).getMessage());
 562                 resetStream(streamid, ((MalformedFrame) frame).getErrorCode());
 563             }
 564             return;
 565         }
 566         if (streamid == 0) {
 567             handleConnectionFrame(frame);
 568         } else {
 569             if (frame instanceof SettingsFrame) {
 570                 // The stream identifier for a SETTINGS frame MUST be zero
 571                 protocolError(GoAwayFrame.PROTOCOL_ERROR);
 572                 return;
 573             }
 574 
 575             Stream<?> stream = getStream(streamid);
 576             if (stream == null) {
 577                 // Should never receive a frame with unknown stream id
 578 
 579                 if (frame instanceof HeaderFrame) {
 580                     // always decode the headers as they may affect
 581                     // connection-level HPACK decoding state
 582                     HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder());
 583                     decodeHeaders((HeaderFrame) frame, decoder);
 584                 }
 585 
 586                 int sid = frame.streamid();
 587                 if (sid >= nextstreamid && !(frame instanceof ResetFrame)) {
 588                     // otherwise the stream has already been reset/closed
 589                     resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
 590                 }
 591                 return;
 592             }
 593             if (frame instanceof PushPromiseFrame) {
 594                 PushPromiseFrame pp = (PushPromiseFrame)frame;
 595                 handlePushPromise(stream, pp);
 596             } else if (frame instanceof HeaderFrame) {
 597                 // decode headers (or continuation)
 598                 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
 599                 stream.incoming(frame);
 600             } else {
 601                 stream.incoming(frame);
 602             }
 603         }
 604     }
 605 
 606     private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
 607         throws IOException
 608     {
 609         // always decode the headers as they may affect connection-level HPACK
 610         // decoding state
 611         HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder());
 612         decodeHeaders(pp, decoder);
 613 
 614         HttpRequestImpl parentReq = parent.request;
 615         int promisedStreamid = pp.getPromisedStream();
 616         if (promisedStreamid != nextPushStream) {
 617             resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
 618             return;
 619         } else {
 620             nextPushStream += 2;
 621         }
 622 

 623         HttpHeadersImpl headers = decoder.headers();
 624         HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
 625         Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
 626         Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch);
 627         pushExch.exchImpl = pushStream;
 628         pushStream.registerStream(promisedStreamid);
 629         parent.incoming_pushPromise(pushReq, pushStream);
 630     }
 631 
 632     private void handleConnectionFrame(Http2Frame frame)
 633         throws IOException
 634     {
 635         switch (frame.type()) {
 636           case SettingsFrame.TYPE:
 637               handleSettings((SettingsFrame)frame);
 638               break;
 639           case PingFrame.TYPE:
 640               handlePing((PingFrame)frame);
 641               break;
 642           case GoAwayFrame.TYPE:
 643               handleGoAway((GoAwayFrame)frame);
 644               break;
 645           case WindowUpdateFrame.TYPE:
 646               handleWindowUpdate((WindowUpdateFrame)frame);
 647               break;
 648           default:
 649             protocolError(ErrorFrame.PROTOCOL_ERROR);
 650         }
 651     }
 652 
 653     void resetStream(int streamid, int code) throws IOException {
 654         Log.logError(
 655             "Resetting stream {0,number,integer} with error code {1,number,integer}",
 656             streamid, code);
 657         ResetFrame frame = new ResetFrame(streamid, code);
 658         sendFrame(frame);
 659         closeStream(streamid);
 660     }
 661 
 662     void closeStream(int streamid) {
 663         debug.log(Level.DEBUG, "Closed stream %d", streamid);
 664         Stream<?> s = streams.remove(streamid);
 665         if (s != null) {
 666             // decrement the reference count on the HttpClientImpl
 667             // to allow the SelectorManager thread to exit if no
 668             // other operation is pending and the facade is no
 669             // longer referenced.
 670             client().unreference();
 671         }
 672         // ## Remove s != null. It is a hack for delayed cancellation,reset
 673         if (s != null && !(s instanceof Stream.PushedStream)) {
 674             // Since PushStreams have no request body, then they have no
 675             // corresponding entry in the window controller.
 676             windowController.removeStream(streamid);
 677         }
 678     }
 679     /**
 680      * Increments this connection's send Window by the amount in the given frame.
 681      */
 682     private void handleWindowUpdate(WindowUpdateFrame f)
 683         throws IOException
 684     {
 685         int amount = f.getUpdate();
 686         if (amount <= 0) {
 687             // ## temporarily disable to workaround a bug in Jetty where it
 688             // ## sends Window updates with a 0 update value.
 689             //protocolError(ErrorFrame.PROTOCOL_ERROR);
 690         } else {
 691             boolean success = windowController.increaseConnectionWindow(amount);
 692             if (!success) {
 693                 protocolError(ErrorFrame.FLOW_CONTROL_ERROR);  // overflow
 694             }
 695         }
 696     }
 697 
 698     private void protocolError(int errorCode)
 699         throws IOException
 700     {
 701         protocolError(errorCode, null);
 702     }
 703 
 704     private void protocolError(int errorCode, String msg)
 705         throws IOException
 706     {
 707         GoAwayFrame frame = new GoAwayFrame(0, errorCode);
 708         sendFrame(frame);
 709         shutdown(new IOException("protocol error" + (msg == null?"":(": " + msg))));
 710     }
 711 
 712     private void handleSettings(SettingsFrame frame)
 713         throws IOException
 714     {
 715         assert frame.streamid() == 0;
 716         if (!frame.getFlag(SettingsFrame.ACK)) {
 717             int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE);
 718             int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE);
 719             int diff = newWindowSize - oldWindowSize;
 720             if (diff != 0) {
 721                 windowController.adjustActiveStreams(diff);
 722             }
 723             serverSettings = frame;
 724             sendFrame(new SettingsFrame(SettingsFrame.ACK));
 725         }
 726     }
 727 
 728     private void handlePing(PingFrame frame)
 729         throws IOException


 741     }
 742 
 743     /**
 744      * Max frame size we are allowed to send
 745      */
 746     public int getMaxSendFrameSize() {
 747         int param = serverSettings.getParameter(MAX_FRAME_SIZE);
 748         if (param == -1) {
 749             param = DEFAULT_FRAME_SIZE;
 750         }
 751         return param;
 752     }
 753 
 754     /**
 755      * Max frame size we will receive
 756      */
 757     public int getMaxReceiveFrameSize() {
 758         return clientSettings.getParameter(MAX_FRAME_SIZE);
 759     }
 760 





 761     private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
 762 
 763     private static final byte[] PREFACE_BYTES =
 764         CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
 765 
 766     /**
 767      * Sends Connection preface and Settings frame with current preferred
 768      * values
 769      */
 770     private void sendConnectionPreface() throws IOException {
 771         Log.logTrace("{0}: start sending connection preface to {1}",
 772                      connection.channel().getLocalAddress(),
 773                      connection.address());
 774         SettingsFrame sf = client2.getClientSettings();
 775         ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
 776         Log.logFrames(sf, "OUT");
 777         // send preface bytes and SettingsFrame together
 778         HttpPublisher publisher = publisher();
 779         publisher.enqueue(List.of(buf));
 780         publisher.signalEnqueued();
 781         // mark preface sent.
 782         framesController.markPrefaceSent();
 783         Log.logTrace("PREFACE_BYTES sent");
 784         Log.logTrace("Settings Frame sent");
 785 
 786         // send a Window update for the receive buffer we are using
 787         // minus the initial 64 K specified in protocol
 788         final int len = client2.client().getReceiveBufferSize() - (64 * 1024 - 1);
 789         windowUpdater.sendWindowUpdate(len);
 790         // there will be an ACK to the windows update - which should
 791         // cause any pending data stored before the preface was sent to be
 792         // flushed (see PrefaceController).
 793         Log.logTrace("finished sending connection preface");
 794         debug.log(Level.DEBUG, "Triggering processing of buffered data"
 795                   + " after sending connection preface");
 796         subscriber.onNext(List.of(EMPTY_TRIGGER));
 797     }
 798 
 799     /**
 800      * Returns an existing Stream with given id, or null if doesn't exist
 801      */
 802     @SuppressWarnings("unchecked")
 803     <T> Stream<T> getStream(int streamid) {
 804         return (Stream<T>)streams.get(streamid);
 805     }
 806 
 807     /**
 808      * Creates Stream with given id.
 809      */
 810     final <T> Stream<T> createStream(Exchange<T> exchange) {
 811         Stream<T> stream = new Stream<>(this, exchange, windowController);
 812         return stream;
 813     }
 814 
 815     <T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
 816         PushGroup<?,T> pg = parent.exchange.getPushGroup();
 817         return new Stream.PushedStream<>(pg, this, pushEx);
 818     }
 819 
 820     <T> void putStream(Stream<T> stream, int streamid) {
 821         // increment the reference count on the HttpClientImpl
 822         // to prevent the SelectorManager thread from exiting until
 823         // the stream is closed.
 824         client().reference();
 825         streams.put(streamid, stream);
 826     }
 827 





 828     /**
 829      * Encode the headers into a List<ByteBuffer> and then create HEADERS
 830      * and CONTINUATION frames from the list and return the List<Http2Frame>.
 831      */
 832     private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) {
 833         List<ByteBuffer> buffers = encodeHeadersImpl(
 834                 getMaxSendFrameSize(),
 835                 frame.getAttachment().getRequestPseudoHeaders(),
 836                 frame.getUserHeaders(),
 837                 frame.getSystemHeaders());
 838 
 839         List<HeaderFrame> frames = new ArrayList<>(buffers.size());
 840         Iterator<ByteBuffer> bufIterator = buffers.iterator();
 841         HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next());
 842         frames.add(oframe);
 843         while(bufIterator.hasNext()) {
 844             oframe = new ContinuationFrame(frame.streamid(), bufIterator.next());
 845             frames.add(oframe);
 846         }
 847         oframe.setFlag(HeaderFrame.END_HEADERS);
 848         return frames;
 849     }
 850 
 851     // Dedicated cache for headers encoding ByteBuffer.
 852     // There can be no concurrent access to this  buffer as all access to this buffer
 853     // and its content happen within a single critical code block section protected
 854     // by the sendLock. / (see sendFrame())
 855     // private final ByteBufferPool headerEncodingPool = new ByteBufferPool();
 856 
 857     private ByteBuffer getHeaderBuffer(int maxFrameSize) {
 858         ByteBuffer buf = ByteBuffer.allocate(maxFrameSize);
 859         buf.limit(maxFrameSize);
 860         return buf;
 861     }
 862 
 863     /*
 864      * Encodes all the headers from the given HttpHeaders into the given List
 865      * of buffers.
 866      *
 867      * From https://tools.ietf.org/html/rfc7540#section-8.1.2 :
 868      *
 869      *     ...Just as in HTTP/1.x, header field names are strings of ASCII
 870      *     characters that are compared in a case-insensitive fashion.  However,
 871      *     header field names MUST be converted to lowercase prior to their
 872      *     encoding in HTTP/2...
 873      */
 874     private List<ByteBuffer> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) {
 875         ByteBuffer buffer = getHeaderBuffer(maxFrameSize);
 876         List<ByteBuffer> buffers = new ArrayList<>();
 877         for(HttpHeaders header : headers) {
 878             for (Map.Entry<String, List<String>> e : header.map().entrySet()) {
 879                 String lKey = e.getKey().toLowerCase();
 880                 List<String> values = e.getValue();
 881                 for (String value : values) {
 882                     hpackOut.header(lKey, value);
 883                     while (!hpackOut.encode(buffer)) {
 884                         buffer.flip();
 885                         buffers.add(buffer);
 886                         buffer =  getHeaderBuffer(maxFrameSize);
 887                     }
 888                 }
 889             }
 890         }
 891         buffer.flip();
 892         buffers.add(buffer);
 893         return buffers;
 894     }
 895 
 896     private List<ByteBuffer> encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) {
 897         oh.streamid(stream.streamid);
 898         if (Log.headers()) {
 899             StringBuilder sb = new StringBuilder("HEADERS FRAME (stream=");
 900             sb.append(stream.streamid).append(")\n");
 901             Log.dumpHeaders(sb, "    ", oh.getAttachment().getRequestPseudoHeaders());
 902             Log.dumpHeaders(sb, "    ", oh.getSystemHeaders());
 903             Log.dumpHeaders(sb, "    ", oh.getUserHeaders());
 904             Log.logHeaders(sb.toString());
 905         }
 906         List<HeaderFrame> frames = encodeHeaders(oh);
 907         return encodeFrames(frames);
 908     }
 909 
 910     private List<ByteBuffer> encodeFrames(List<HeaderFrame> frames) {
 911         if (Log.frames()) {
 912             frames.forEach(f -> Log.logFrames(f, "OUT"));
 913         }
 914         return framesEncoder.encodeFrames(frames);
 915     }
 916 













 917     private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
 918         Stream<?> stream = oh.getAttachment();
 919         int streamid = nextstreamid;
 920         nextstreamid += 2;
 921         stream.registerStream(streamid);
 922         // set outgoing window here. This allows thread sending
 923         // body to proceed.
 924         windowController.registerStream(streamid, getInitialSendWindowSize());
 925         return stream;
 926     }
 927 
 928     private final Object sendlock = new Object();
 929 
 930     void sendFrame(Http2Frame frame) {
 931         try {
 932             HttpPublisher publisher = publisher();
 933             synchronized (sendlock) {
 934                 if (frame instanceof OutgoingHeaders) {
 935                     @SuppressWarnings("unchecked")
 936                     OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
 937                     Stream<?> stream = registerNewStream(oh);
 938                     // provide protection from inserting unordered frames between Headers and Continuation
 939                     publisher.enqueue(encodeHeaders(oh, stream));
 940                 } else {
 941                     publisher.enqueue(encodeFrame(frame));
 942                 }
 943             }
 944             publisher.signalEnqueued();
 945         } catch (IOException e) {
 946             if (!closed) {
 947                 Log.logError(e);
 948                 shutdown(e);
 949             }
 950         }
 951     }
 952 
 953     private List<ByteBuffer> encodeFrame(Http2Frame frame) {
 954         Log.logFrames(frame, "OUT");
 955         return framesEncoder.encodeFrame(frame);
 956     }
 957 
 958     void sendDataFrame(DataFrame frame) {
 959         try {
 960             HttpPublisher publisher = publisher();
 961             publisher.enqueue(encodeFrame(frame));
 962             publisher.signalEnqueued();
 963         } catch (IOException e) {
 964             if (!closed) {
 965                 Log.logError(e);
 966                 shutdown(e);
 967             }
 968         }
 969     }
 970 
 971     /*
 972      * Direct call of the method bypasses synchronization on "sendlock" and
 973      * allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
 974      * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
 975      */
 976     void sendUnorderedFrame(Http2Frame frame) {
 977         try {
 978             HttpPublisher publisher = publisher();
 979             publisher.enqueueUnordered(encodeFrame(frame));
 980             publisher.signalEnqueued();
 981         } catch (IOException e) {
 982             if (!closed) {
 983                 Log.logError(e);
 984                 shutdown(e);
 985             }
 986         }
 987     }
 988 
 989     /**
 990      * A simple tube subscriber for reading from the connection flow.
 991      */
 992     final class Http2TubeSubscriber implements TubeSubscriber {
 993         volatile Flow.Subscription subscription;
 994         volatile boolean completed;
 995         volatile boolean dropped;
 996         volatile Throwable error;
 997         final ConcurrentLinkedQueue<ByteBuffer> queue
 998                 = new ConcurrentLinkedQueue<>();
 999         final SequentialScheduler scheduler =
1000                 SequentialScheduler.synchronizedScheduler(this::processQueue);
1001 
1002         final void processQueue() {
1003             try {
1004                 while (!queue.isEmpty() && !scheduler.isStopped()) {
1005                     ByteBuffer buffer = queue.poll();
1006                     debug.log(Level.DEBUG,
1007                               "sending %d to Http2Connection.asyncReceive",
1008                               buffer.remaining());
1009                     asyncReceive(buffer);
1010                 }
1011             } catch (Throwable t) {
1012                 Throwable x = error;
1013                 if (x == null) error = t;
1014             } finally {
1015                 Throwable x = error;
1016                 if (x != null) {
1017                     debug.log(Level.DEBUG, "Stopping scheduler", x);
1018                     scheduler.stop();
1019                     Http2Connection.this.shutdown(x);
1020                 }
1021             }
1022         }
1023 
1024 
1025         public void onSubscribe(Flow.Subscription subscription) {
1026             // supports being called multiple time.
1027             // doesn't cancel the previous subscription, since that is
1028             // most probably the same as the new subscription.
1029             assert this.subscription == null || dropped == false;
1030             this.subscription = subscription;
1031             dropped = false;
1032             // TODO FIXME: request(1) should be done by the delegate.
1033             if (!completed) {
1034                 debug.log(Level.DEBUG, "onSubscribe: requesting Long.MAX_VALUE for reading");
1035                 subscription.request(Long.MAX_VALUE);
1036             } else {
1037                 debug.log(Level.DEBUG, "onSubscribe: already completed");
1038             }
1039         }
1040 
1041         @Override
1042         public void onNext(List<ByteBuffer> item) {
1043             debug.log(Level.DEBUG, () -> "onNext: got " + Utils.remaining(item)
1044                     + " bytes in " + item.size() + " buffers");
1045             queue.addAll(item);
1046             scheduler.deferOrSchedule(client().theExecutor());
1047         }
1048 
1049         @Override
1050         public void onError(Throwable throwable) {
1051             debug.log(Level.DEBUG, () -> "onError: " + throwable);
1052             error = throwable;
1053             completed = true;
1054             scheduler.deferOrSchedule(client().theExecutor());
1055         }
1056 
1057         @Override
1058         public void onComplete() {
1059             debug.log(Level.DEBUG, "EOF");
1060             error = new EOFException("EOF reached while reading");
1061             completed = true;
1062             scheduler.deferOrSchedule(client().theExecutor());
1063         }
1064 
1065         public void dropSubscription() {
1066             debug.log(Level.DEBUG, "dropSubscription");
1067             // we could probably set subscription to null here...
1068             // then we might not need the 'dropped' boolean?
1069             dropped = true;
1070         }
1071     }
1072 
1073     @Override
1074     public final String toString() {
1075         return dbgString();
1076     }
1077 
1078     final String dbgString() {
1079         return "Http2Connection("
1080                     + connection.getConnectionFlow() + ")";
1081     }
1082 
1083     final class LoggingHeaderDecoder extends HeaderDecoder {
1084 
1085         private final HeaderDecoder delegate;
1086         private final System.Logger debugHpack =
1087                 Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
1088 
1089         LoggingHeaderDecoder(HeaderDecoder delegate) {
1090             this.delegate = delegate;
1091         }
1092 
1093         String dbgString() {
1094             return Http2Connection.this.dbgString() + "/LoggingHeaderDecoder";
1095         }
1096 
1097         @Override
1098         public void onDecoded(CharSequence name, CharSequence value) {
1099             delegate.onDecoded(name, value);
1100         }
1101 
1102         @Override
1103         public void onIndexed(int index,
1104                               CharSequence name,
1105                               CharSequence value) {
1106             debugHpack.log(Level.DEBUG, "onIndexed(%s, %s, %s)%n",
1107                            index, name, value);
1108             delegate.onIndexed(index, name, value);
1109         }
1110 
1111         @Override
1112         public void onLiteral(int index,
1113                               CharSequence name,
1114                               CharSequence value,
1115                               boolean valueHuffman) {
1116             debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n",
1117                               index, name, value, valueHuffman);
1118             delegate.onLiteral(index, name, value, valueHuffman);
1119         }
1120 
1121         @Override
1122         public void onLiteral(CharSequence name,
1123                               boolean nameHuffman,
1124                               CharSequence value,
1125                               boolean valueHuffman) {
1126             debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n",
1127                            name, nameHuffman, value, valueHuffman);
1128             delegate.onLiteral(name, nameHuffman, value, valueHuffman);
1129         }
1130 
1131         @Override
1132         public void onLiteralNeverIndexed(int index,
1133                                           CharSequence name,
1134                                           CharSequence value,
1135                                           boolean valueHuffman) {
1136             debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n",
1137                            index, name, value, valueHuffman);
1138             delegate.onLiteralNeverIndexed(index, name, value, valueHuffman);
1139         }
1140 
1141         @Override
1142         public void onLiteralNeverIndexed(CharSequence name,
1143                                           boolean nameHuffman,
1144                                           CharSequence value,
1145                                           boolean valueHuffman) {
1146             debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n",
1147                            name, nameHuffman, value, valueHuffman);
1148             delegate.onLiteralNeverIndexed(name, nameHuffman, value, valueHuffman);
1149         }
1150 
1151         @Override
1152         public void onLiteralWithIndexing(int index,
1153                                           CharSequence name,
1154                                           CharSequence value,
1155                                           boolean valueHuffman) {
1156             debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n",
1157                            index, name, value, valueHuffman);
1158             delegate.onLiteralWithIndexing(index, name, value, valueHuffman);
1159         }
1160 
1161         @Override
1162         public void onLiteralWithIndexing(CharSequence name,
1163                                           boolean nameHuffman,
1164                                           CharSequence value,
1165                                           boolean valueHuffman) {
1166             debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n",
1167                               name, nameHuffman, value, valueHuffman);
1168             delegate.onLiteralWithIndexing(name, nameHuffman, value, valueHuffman);
1169         }
1170 
1171         @Override
1172         public void onSizeUpdate(int capacity) {
1173             debugHpack.log(Level.DEBUG, "onSizeUpdate(%s)%n", capacity);
1174             delegate.onSizeUpdate(capacity);
1175         }
1176 
1177         @Override
1178         HttpHeadersImpl headers() {
1179             return delegate.headers();
1180         }
1181     }
1182 
1183     static class HeaderDecoder implements DecodingCallback {
1184         HttpHeadersImpl headers;
1185 
1186         HeaderDecoder() {
1187             this.headers = new HttpHeadersImpl();
1188         }
1189 
1190         @Override
1191         public void onDecoded(CharSequence name, CharSequence value) {
1192             headers.addHeader(name.toString(), value.toString());
1193         }
1194 
1195         HttpHeadersImpl headers() {
1196             return headers;
1197         }
1198     }
1199 


< prev index next >