1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.EOFException; 29 import java.io.IOException; 30 import java.lang.System.Logger.Level; 31 import java.net.InetSocketAddress; 32 import java.net.URI; 33 import java.nio.ByteBuffer; 34 import java.nio.charset.StandardCharsets; 35 import java.util.Iterator; 36 import java.util.LinkedList; 37 import java.util.List; 38 import java.util.Map; 39 import java.util.concurrent.CompletableFuture; 40 import java.util.ArrayList; 41 import java.util.Objects; 42 import java.util.concurrent.ConcurrentHashMap; 43 import java.util.concurrent.ConcurrentLinkedQueue; 44 import java.util.concurrent.Flow; 45 import java.util.function.Function; 46 import java.util.function.Supplier; 47 import javax.net.ssl.SSLEngine; 48 import jdk.incubator.http.HttpConnection.HttpPublisher; 49 import jdk.incubator.http.internal.common.FlowTube; 50 import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber; 51 import jdk.incubator.http.internal.common.HttpHeadersImpl; 52 import jdk.incubator.http.internal.common.Log; 53 import jdk.incubator.http.internal.common.MinimalFuture; 54 import jdk.incubator.http.internal.common.SequentialScheduler; 55 import jdk.incubator.http.internal.common.Utils; 56 import jdk.incubator.http.internal.frame.ContinuationFrame; 57 import jdk.incubator.http.internal.frame.DataFrame; 58 import jdk.incubator.http.internal.frame.ErrorFrame; 59 import jdk.incubator.http.internal.frame.FramesDecoder; 60 import jdk.incubator.http.internal.frame.FramesEncoder; 61 import jdk.incubator.http.internal.frame.GoAwayFrame; 62 import jdk.incubator.http.internal.frame.HeaderFrame; 63 import jdk.incubator.http.internal.frame.HeadersFrame; 64 import jdk.incubator.http.internal.frame.Http2Frame; 65 import jdk.incubator.http.internal.frame.MalformedFrame; 66 import jdk.incubator.http.internal.frame.OutgoingHeaders; 67 import jdk.incubator.http.internal.frame.PingFrame; 68 import jdk.incubator.http.internal.frame.PushPromiseFrame; 69 import jdk.incubator.http.internal.frame.ResetFrame; 70 import jdk.incubator.http.internal.frame.SettingsFrame; 71 import jdk.incubator.http.internal.frame.WindowUpdateFrame; 72 import jdk.incubator.http.internal.hpack.Encoder; 73 import jdk.incubator.http.internal.hpack.Decoder; 74 import jdk.incubator.http.internal.hpack.DecodingCallback; 75 76 import static jdk.incubator.http.internal.frame.SettingsFrame.*; 77 78 79 /** 80 * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used 81 * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff. 82 * 83 * Http2Connections belong to a Http2ClientImpl, (one of) which belongs 84 * to a HttpClientImpl. 85 * 86 * Creation cases: 87 * 1) upgraded HTTP/1.1 plain tcp connection 88 * 2) prior knowledge directly created plain tcp connection 89 * 3) directly created HTTP/2 SSL connection which uses ALPN. 90 * 91 * Sending is done by writing directly to underlying HttpConnection object which 92 * is operating in async mode. No flow control applies on output at this level 93 * and all writes are just executed as puts to an output Q belonging to HttpConnection 94 * Flow control is implemented by HTTP/2 protocol itself. 95 * 96 * Hpack header compression 97 * and outgoing stream creation is also done here, because these operations 98 * must be synchronized at the socket level. Stream objects send frames simply 99 * by placing them on the connection's output Queue. sendFrame() is called 100 * from a higher level (Stream) thread. 101 * 102 * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles 103 * incoming Http2Frames, and directs them to the appropriate Stream.incoming() 104 * or handles them directly itself. This thread performs hpack decompression 105 * and incoming stream creation (Server push). Incoming frames destined for a 106 * stream are provided by calling Stream.incoming(). 107 */ 108 class Http2Connection { 109 110 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. 111 static final boolean DEBUG_HPACK = Utils.DEBUG_HPACK; // Revisit: temporary dev flag. 112 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); 113 final static System.Logger DEBUG_LOGGER = 114 Utils.getDebugLogger("Http2Connection"::toString, DEBUG); 115 private final System.Logger debugHpack = 116 Utils.getHpackLogger(this::dbgString, DEBUG_HPACK); 117 static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0); 118 119 private boolean singleStream; // used only for stream 1, then closed 120 121 /* 122 * ByteBuffer pooling strategy for HTTP/2 protocol: 123 * 124 * In general there are 4 points where ByteBuffers are used: 125 * - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing encrypted data 126 * in case of SSL connection. 127 * 128 * 1. Outgoing frames encoded to ByteBuffers. 129 * Outgoing ByteBuffers are created with requited size and frequently small (except DataFrames, etc) 130 * At this place no pools at all. All outgoing buffers should be collected by GC. 131 * 132 * 2. Incoming ByteBuffers (decoded to frames). 133 * Here, total elimination of BB pool is not a good idea. 134 * We don't know how many bytes we will receive through network. 135 * So here we allocate buffer of reasonable size. The following life of the BB: 136 * - If all frames decoded from the BB are other than DataFrame and HeaderFrame (and HeaderFrame subclasses) 137 * BB is returned to pool, 138 * - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer obtained by slice() method. 139 * Such BB is never returned to pool and will be GCed. 140 * - If we decoded HeadersFrame from the BB. Then header decoding is performed inside processFrame method and 141 * the buffer could be release to pool. 142 * 143 * 3. SLL encrypted buffers. Here another pool was introduced and all net buffers are to/from the pool, 144 * because of we can't predict size encrypted packets. 145 * 146 */ 147 148 149 // A small class that allows to control frames with respect to the state of 150 // the connection preface. Any data received before the connection 151 // preface is sent will be buffered. 152 private final class FramesController { 153 volatile boolean prefaceSent; 154 volatile List<ByteBuffer> pending; 155 156 boolean processReceivedData(FramesDecoder decoder, ByteBuffer buf) 157 throws IOException 158 { 159 // if preface is not sent, buffers data in the pending list 160 if (!prefaceSent) { 161 debug.log(Level.DEBUG, "Preface is not sent: buffering %d", 162 buf.remaining()); 163 synchronized (this) { 164 if (!prefaceSent) { 165 if (pending == null) pending = new ArrayList<>(); 166 pending.add(buf); 167 debug.log(Level.DEBUG, () -> "there are now " 168 + Utils.remaining(pending) 169 + " bytes buffered waiting for preface to be sent"); 170 return false; 171 } 172 } 173 } 174 175 // Preface is sent. Checks for pending data and flush it. 176 // We rely on this method being called from within the Http2TubeSubscriber 177 // scheduler, so we know that no other thread could execute this method 178 // concurrently while we're here. 179 // This ensures that later incoming buffers will not 180 // be processed before we have flushed the pending queue. 181 // No additional synchronization is therefore necessary here. 182 List<ByteBuffer> pending = this.pending; 183 this.pending = null; 184 if (pending != null) { 185 // flush pending data 186 debug.log(Level.DEBUG, () -> "Processing buffered data: " 187 + Utils.remaining(pending)); 188 for (ByteBuffer b : pending) { 189 decoder.decode(b); 190 } 191 } 192 // push the received buffer to the frames decoder. 193 if (buf != EMPTY_TRIGGER) { 194 debug.log(Level.DEBUG, "Processing %d", buf.remaining()); 195 decoder.decode(buf); 196 } 197 return true; 198 } 199 200 // Mark that the connection preface is sent 201 void markPrefaceSent() { 202 assert !prefaceSent; 203 synchronized (this) { 204 prefaceSent = true; 205 } 206 } 207 } 208 209 volatile boolean closed; 210 211 //------------------------------------- 212 final HttpConnection connection; 213 private final Http2ClientImpl client2; 214 private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>(); 215 private int nextstreamid; 216 private int nextPushStream = 2; 217 private final Encoder hpackOut; 218 private final Decoder hpackIn; 219 final SettingsFrame clientSettings; 220 private volatile SettingsFrame serverSettings; 221 private final String key; // for HttpClientImpl.connections map 222 private final FramesDecoder framesDecoder; 223 private final FramesEncoder framesEncoder = new FramesEncoder(); 224 225 /** 226 * Send Window controller for both connection and stream windows. 227 * Each of this connection's Streams MUST use this controller. 228 */ 229 private final WindowController windowController = new WindowController(); 230 private final FramesController framesController = new FramesController(); 231 private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber(); 232 final ConnectionWindowUpdateSender windowUpdater; 233 private volatile Throwable cause; 234 private volatile Supplier<ByteBuffer> initial; 235 236 static final int DEFAULT_FRAME_SIZE = 16 * 1024; 237 238 239 // TODO: need list of control frames from other threads 240 // that need to be sent 241 242 private Http2Connection(HttpConnection connection, 243 Http2ClientImpl client2, 244 int nextstreamid, 245 String key) { 246 this.connection = connection; 247 this.client2 = client2; 248 this.nextstreamid = nextstreamid; 249 this.key = key; 250 this.clientSettings = this.client2.getClientSettings(); 251 this.framesDecoder = new FramesDecoder(this::processFrame, 252 clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE)); 253 // serverSettings will be updated by server 254 this.serverSettings = SettingsFrame.getDefaultSettings(); 255 this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); 256 this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); 257 debugHpack.log(Level.DEBUG, () -> "For the record:" + super.toString()); 258 debugHpack.log(Level.DEBUG, "Decoder created: %s", hpackIn); 259 debugHpack.log(Level.DEBUG, "Encoder created: %s", hpackOut); 260 this.windowUpdater = new ConnectionWindowUpdateSender(this, 261 client2.getConnectionWindowSize(clientSettings)); 262 } 263 264 /** 265 * Case 1) Create from upgraded HTTP/1.1 connection. 266 * Is ready to use. Can be SSL. exchange is the Exchange 267 * that initiated the connection, whose response will be delivered 268 * on a Stream. 269 */ 270 private Http2Connection(HttpConnection connection, 271 Http2ClientImpl client2, 272 Exchange<?> exchange, 273 Supplier<ByteBuffer> initial) 274 throws IOException, InterruptedException 275 { 276 this(connection, 277 client2, 278 3, // stream 1 is registered during the upgrade 279 keyFor(connection)); 280 Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); 281 282 Stream<?> initialStream = createStream(exchange); 283 initialStream.registerStream(1); 284 windowController.registerStream(1, getInitialSendWindowSize()); 285 initialStream.requestSent(); 286 // Upgrading: 287 // set callbacks before sending preface - makes sure anything that 288 // might be sent by the server will come our way. 289 this.initial = initial; 290 connectFlows(connection); 291 sendConnectionPreface(); 292 } 293 294 // Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving 295 // agreement from the server. Async style but completes immediately, because 296 // the connection is already connected. 297 static CompletableFuture<Http2Connection> createAsync(HttpConnection connection, 298 Http2ClientImpl client2, 299 Exchange<?> exchange, 300 Supplier<ByteBuffer> initial) 301 { 302 return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial)); 303 } 304 305 // Requires TLS handshake. So, is really async 306 static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request, 307 Http2ClientImpl h2client) { 308 assert request.secure(); 309 AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection) 310 HttpConnection.getConnection(request.getAddress(), 311 h2client.client(), 312 request, 313 HttpClient.Version.HTTP_2); 314 315 return connection.connectAsync() 316 .thenCompose(unused -> checkSSLConfig(connection)) 317 .thenCompose(notused-> { 318 CompletableFuture<Http2Connection> cf = new MinimalFuture<>(); 319 try { 320 Http2Connection hc = new Http2Connection(request, h2client, connection); 321 cf.complete(hc); 322 } catch (IOException e) { 323 cf.completeExceptionally(e); 324 } 325 return cf; } ); 326 } 327 328 /** 329 * Cases 2) 3) 330 * 331 * request is request to be sent. 332 */ 333 private Http2Connection(HttpRequestImpl request, 334 Http2ClientImpl h2client, 335 HttpConnection connection) 336 throws IOException 337 { 338 this(connection, 339 h2client, 340 1, 341 keyFor(request.uri(), request.proxy())); 342 343 Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); 344 345 // safe to resume async reading now. 346 connectFlows(connection); 347 sendConnectionPreface(); 348 } 349 350 private void connectFlows(HttpConnection connection) { 351 FlowTube tube = connection.getConnectionFlow(); 352 // Connect the flow to our Http2TubeSubscriber: 353 tube.connectFlows(connection.publisher(), subscriber); 354 } 355 356 final HttpClientImpl client() { 357 return client2.client(); 358 } 359 360 /** 361 * Throws an IOException if h2 was not negotiated 362 */ 363 private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) { 364 assert aconn.isSecure(); 365 366 Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> { 367 CompletableFuture<Void> cf = new MinimalFuture<>(); 368 SSLEngine engine = aconn.getEngine(); 369 assert Objects.equals(alpn, engine.getApplicationProtocol()); 370 371 DEBUG_LOGGER.log(Level.DEBUG, "checkSSLConfig: alpn: %s", alpn ); 372 373 if (alpn == null || !alpn.equals("h2")) { 374 String msg; 375 if (alpn == null) { 376 Log.logSSL("ALPN not supported"); 377 msg = "ALPN not supported"; 378 } else { 379 switch (alpn) { 380 case "": 381 Log.logSSL(msg = "No ALPN negotiated"); 382 break; 383 case "http/1.1": 384 Log.logSSL( msg = "HTTP/1.1 ALPN returned"); 385 break; 386 default: 387 Log.logSSL(msg = "Unexpected ALPN: " + alpn); 388 cf.completeExceptionally(new IOException(msg)); 389 } 390 } 391 cf.completeExceptionally(new ALPNException(msg, aconn)); 392 return cf; 393 } 394 cf.complete(null); 395 return cf; 396 }; 397 398 return aconn.getALPN().thenCompose(checkAlpnCF); 399 } 400 401 synchronized boolean singleStream() { 402 return singleStream; 403 } 404 405 synchronized void setSingleStream(boolean use) { 406 singleStream = use; 407 } 408 409 static String keyFor(HttpConnection connection) { 410 boolean isProxy = connection.isProxied(); 411 boolean isSecure = connection.isSecure(); 412 InetSocketAddress addr = connection.address(); 413 414 return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort()); 415 } 416 417 static String keyFor(URI uri, InetSocketAddress proxy) { 418 boolean isSecure = uri.getScheme().equalsIgnoreCase("https"); 419 boolean isProxy = proxy != null; 420 421 String host; 422 int port; 423 424 if (proxy != null) { 425 host = proxy.getHostString(); 426 port = proxy.getPort(); 427 } else { 428 host = uri.getHost(); 429 port = uri.getPort(); 430 } 431 return keyString(isSecure, isProxy, host, port); 432 } 433 434 // {C,S}:{H:P}:host:port 435 // C indicates clear text connection "http" 436 // S indicates secure "https" 437 // H indicates host (direct) connection 438 // P indicates proxy 439 // Eg: "S:H:foo.com:80" 440 static String keyString(boolean secure, boolean proxy, String host, int port) { 441 if (secure && port == -1) 442 port = 443; 443 else if (!secure && port == -1) 444 port = 80; 445 return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port; 446 } 447 448 String key() { 449 return this.key; 450 } 451 452 boolean offerConnection() { 453 return client2.offerConnection(this); 454 } 455 456 private HttpPublisher publisher() { 457 return connection.publisher(); 458 } 459 460 private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) 461 throws IOException 462 { 463 debugHpack.log(Level.DEBUG, "decodeHeaders(%s)", decoder); 464 465 boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS); 466 467 List<ByteBuffer> buffers = frame.getHeaderBlock(); 468 int len = buffers.size(); 469 for (int i = 0; i < len; i++) { 470 ByteBuffer b = buffers.get(i); 471 hpackIn.decode(b, endOfHeaders && (i == len - 1), decoder); 472 } 473 } 474 475 final int getInitialSendWindowSize() { 476 return serverSettings.getParameter(INITIAL_WINDOW_SIZE); 477 } 478 479 void close() { 480 Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address()); 481 GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes()); 482 // TODO: set last stream. For now zero ok. 483 sendFrame(f); 484 } 485 486 long count; 487 final void asyncReceive(ByteBuffer buffer) { 488 // We don't need to read anything and 489 // we don't want to send anything back to the server 490 // until the connection preface has been sent. 491 // Therefore we're going to wait if needed before reading 492 // (and thus replying) to anything. 493 // Starting to reply to something (e.g send an ACK to a 494 // SettingsFrame sent by the server) before the connection 495 // preface is fully sent might result in the server 496 // sending a GOAWAY frame with 'invalid_preface'. 497 // 498 // Note: asyncReceive is only called from the Http2TubeSubscriber 499 // sequential scheduler. 500 try { 501 Supplier<ByteBuffer> bs = initial; 502 // ensure that we always handle the initial buffer first, 503 // if any. 504 if (bs != null) { 505 initial = null; 506 ByteBuffer b = bs.get(); 507 if (b.hasRemaining()) { 508 long c = ++count; 509 debug.log(Level.DEBUG, () -> "H2 Receiving Initial(" 510 + c +"): " + b.remaining()); 511 framesController.processReceivedData(framesDecoder, b); 512 } 513 } 514 ByteBuffer b = buffer; 515 // the Http2TubeSubscriber scheduler ensures that the order of incoming 516 // buffers is preserved. 517 if (b == EMPTY_TRIGGER) { 518 debug.log(Level.DEBUG, "H2 Received EMPTY_TRIGGER"); 519 boolean prefaceSent = framesController.prefaceSent; 520 assert prefaceSent; 521 // call framesController.processReceivedData to potentially 522 // trigger the processing of all the data buffered there. 523 framesController.processReceivedData(framesDecoder, buffer); 524 debug.log(Level.DEBUG, "H2 processed buffered data"); 525 } else { 526 long c = ++count; 527 debug.log(Level.DEBUG, "H2 Receiving(%d): %d", c, b.remaining()); 528 framesController.processReceivedData(framesDecoder, buffer); 529 debug.log(Level.DEBUG, "H2 processed(%d)", c); 530 } 531 } catch (Throwable e) { 532 String msg = Utils.stackTrace(e); 533 Log.logTrace(msg); 534 shutdown(e); 535 } 536 } 537 538 Throwable getRecordedCause() { 539 return cause; 540 } 541 542 void shutdown(Throwable t) { 543 debug.log(Level.DEBUG, () -> "Shutting down h2c (closed="+closed+"): " + t); 544 if (closed == true) return; 545 synchronized (this) { 546 if (closed == true) return; 547 closed = true; 548 } 549 Log.logError(t); 550 Throwable initialCause = this.cause; 551 if (initialCause == null) this.cause = t; 552 client2.deleteConnection(this); 553 List<Stream<?>> c = new LinkedList<>(streams.values()); 554 for (Stream<?> s : c) { 555 s.cancelImpl(t); 556 } 557 connection.close(); 558 } 559 560 /** 561 * Handles stream 0 (common) frames that apply to whole connection and passes 562 * other stream specific frames to that Stream object. 563 * 564 * Invokes Stream.incoming() which is expected to process frame without 565 * blocking. 566 */ 567 void processFrame(Http2Frame frame) throws IOException { 568 Log.logFrames(frame, "IN"); 569 int streamid = frame.streamid(); 570 if (frame instanceof MalformedFrame) { 571 Log.logError(((MalformedFrame) frame).getMessage()); 572 if (streamid == 0) { 573 framesDecoder.close("Malformed frame on stream 0"); 574 protocolError(((MalformedFrame) frame).getErrorCode(), 575 ((MalformedFrame) frame).getMessage()); 576 } else { 577 debug.log(Level.DEBUG, () -> "Reset stream: " 578 + ((MalformedFrame) frame).getMessage()); 579 resetStream(streamid, ((MalformedFrame) frame).getErrorCode()); 580 } 581 return; 582 } 583 if (streamid == 0) { 584 handleConnectionFrame(frame); 585 } else { 586 if (frame instanceof SettingsFrame) { 587 // The stream identifier for a SETTINGS frame MUST be zero 588 framesDecoder.close( 589 "The stream identifier for a SETTINGS frame MUST be zero"); 590 protocolError(GoAwayFrame.PROTOCOL_ERROR); 591 return; 592 } 593 594 Stream<?> stream = getStream(streamid); 595 if (stream == null) { 596 // Should never receive a frame with unknown stream id 597 598 if (frame instanceof HeaderFrame) { 599 // always decode the headers as they may affect 600 // connection-level HPACK decoding state 601 HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder()); 602 decodeHeaders((HeaderFrame) frame, decoder); 603 } 604 605 int sid = frame.streamid(); 606 if (sid >= nextstreamid && !(frame instanceof ResetFrame)) { 607 // otherwise the stream has already been reset/closed 608 resetStream(streamid, ResetFrame.PROTOCOL_ERROR); 609 } 610 return; 611 } 612 if (frame instanceof PushPromiseFrame) { 613 PushPromiseFrame pp = (PushPromiseFrame)frame; 614 handlePushPromise(stream, pp); 615 } else if (frame instanceof HeaderFrame) { 616 // decode headers (or continuation) 617 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer()); 618 stream.incoming(frame); 619 } else { 620 stream.incoming(frame); 621 } 622 } 623 } 624 625 private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp) 626 throws IOException 627 { 628 // always decode the headers as they may affect connection-level HPACK 629 // decoding state 630 HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder()); 631 decodeHeaders(pp, decoder); 632 633 HttpRequestImpl parentReq = parent.request; 634 int promisedStreamid = pp.getPromisedStream(); 635 if (promisedStreamid != nextPushStream) { 636 resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR); 637 return; 638 } else { 639 nextPushStream += 2; 640 } 641 642 HttpHeadersImpl headers = decoder.headers(); 643 HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers); 644 Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi); 645 Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch); 646 pushExch.exchImpl = pushStream; 647 pushStream.registerStream(promisedStreamid); 648 parent.incoming_pushPromise(pushReq, pushStream); 649 } 650 651 private void handleConnectionFrame(Http2Frame frame) 652 throws IOException 653 { 654 switch (frame.type()) { 655 case SettingsFrame.TYPE: 656 handleSettings((SettingsFrame)frame); 657 break; 658 case PingFrame.TYPE: 659 handlePing((PingFrame)frame); 660 break; 661 case GoAwayFrame.TYPE: 662 handleGoAway((GoAwayFrame)frame); 663 break; 664 case WindowUpdateFrame.TYPE: 665 handleWindowUpdate((WindowUpdateFrame)frame); 666 break; 667 default: 668 protocolError(ErrorFrame.PROTOCOL_ERROR); 669 } 670 } 671 672 void resetStream(int streamid, int code) throws IOException { 673 Log.logError( 674 "Resetting stream {0,number,integer} with error code {1,number,integer}", 675 streamid, code); 676 ResetFrame frame = new ResetFrame(streamid, code); 677 sendFrame(frame); 678 closeStream(streamid); 679 } 680 681 void closeStream(int streamid) { 682 debug.log(Level.DEBUG, "Closed stream %d", streamid); 683 Stream<?> s = streams.remove(streamid); 684 if (s != null) { 685 // decrement the reference count on the HttpClientImpl 686 // to allow the SelectorManager thread to exit if no 687 // other operation is pending and the facade is no 688 // longer referenced. 689 client().unreference(); 690 } 691 // ## Remove s != null. It is a hack for delayed cancellation,reset 692 if (s != null && !(s instanceof Stream.PushedStream)) { 693 // Since PushStreams have no request body, then they have no 694 // corresponding entry in the window controller. 695 windowController.removeStream(streamid); 696 } 697 if (singleStream() && streams.isEmpty()) { 698 // should be only 1 stream, but there might be more if server push 699 close(); 700 } 701 } 702 703 /** 704 * Increments this connection's send Window by the amount in the given frame. 705 */ 706 private void handleWindowUpdate(WindowUpdateFrame f) 707 throws IOException 708 { 709 int amount = f.getUpdate(); 710 if (amount <= 0) { 711 // ## temporarily disable to workaround a bug in Jetty where it 712 // ## sends Window updates with a 0 update value. 713 //protocolError(ErrorFrame.PROTOCOL_ERROR); 714 } else { 715 boolean success = windowController.increaseConnectionWindow(amount); 716 if (!success) { 717 protocolError(ErrorFrame.FLOW_CONTROL_ERROR); // overflow 718 } 719 } 720 } 721 722 private void protocolError(int errorCode) 723 throws IOException 724 { 725 protocolError(errorCode, null); 726 } 727 728 private void protocolError(int errorCode, String msg) 729 throws IOException 730 { 731 GoAwayFrame frame = new GoAwayFrame(0, errorCode); 732 sendFrame(frame); 733 shutdown(new IOException("protocol error" + (msg == null?"":(": " + msg)))); 734 } 735 736 private void handleSettings(SettingsFrame frame) 737 throws IOException 738 { 739 assert frame.streamid() == 0; 740 if (!frame.getFlag(SettingsFrame.ACK)) { 741 int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE); 742 int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE); 743 int diff = newWindowSize - oldWindowSize; 744 if (diff != 0) { 745 windowController.adjustActiveStreams(diff); 746 } 747 serverSettings = frame; 748 sendFrame(new SettingsFrame(SettingsFrame.ACK)); 749 } 750 } 751 752 private void handlePing(PingFrame frame) 753 throws IOException 754 { 755 frame.setFlag(PingFrame.ACK); 756 sendUnorderedFrame(frame); 757 } 758 759 private void handleGoAway(GoAwayFrame frame) 760 throws IOException 761 { 762 shutdown(new IOException( 763 String.valueOf(connection.channel().getLocalAddress()) 764 +": GOAWAY received")); 765 } 766 767 /** 768 * Max frame size we are allowed to send 769 */ 770 public int getMaxSendFrameSize() { 771 int param = serverSettings.getParameter(MAX_FRAME_SIZE); 772 if (param == -1) { 773 param = DEFAULT_FRAME_SIZE; 774 } 775 return param; 776 } 777 778 /** 779 * Max frame size we will receive 780 */ 781 public int getMaxReceiveFrameSize() { 782 return clientSettings.getParameter(MAX_FRAME_SIZE); 783 } 784 785 private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; 786 787 private static final byte[] PREFACE_BYTES = 788 CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1); 789 790 /** 791 * Sends Connection preface and Settings frame with current preferred 792 * values 793 */ 794 private void sendConnectionPreface() throws IOException { 795 Log.logTrace("{0}: start sending connection preface to {1}", 796 connection.channel().getLocalAddress(), 797 connection.address()); 798 SettingsFrame sf = new SettingsFrame(clientSettings); 799 int initialWindowSize = sf.getParameter(INITIAL_WINDOW_SIZE); 800 ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf); 801 Log.logFrames(sf, "OUT"); 802 // send preface bytes and SettingsFrame together 803 HttpPublisher publisher = publisher(); 804 publisher.enqueue(List.of(buf)); 805 publisher.signalEnqueued(); 806 // mark preface sent. 807 framesController.markPrefaceSent(); 808 Log.logTrace("PREFACE_BYTES sent"); 809 Log.logTrace("Settings Frame sent"); 810 811 // send a Window update for the receive buffer we are using 812 // minus the initial 64 K specified in protocol 813 final int len = windowUpdater.initialWindowSize - initialWindowSize; 814 if (len > 0) { 815 windowUpdater.sendWindowUpdate(len); 816 } 817 // there will be an ACK to the windows update - which should 818 // cause any pending data stored before the preface was sent to be 819 // flushed (see PrefaceController). 820 Log.logTrace("finished sending connection preface"); 821 debug.log(Level.DEBUG, "Triggering processing of buffered data" 822 + " after sending connection preface"); 823 subscriber.onNext(List.of(EMPTY_TRIGGER)); 824 } 825 826 /** 827 * Returns an existing Stream with given id, or null if doesn't exist 828 */ 829 @SuppressWarnings("unchecked") 830 <T> Stream<T> getStream(int streamid) { 831 return (Stream<T>)streams.get(streamid); 832 } 833 834 /** 835 * Creates Stream with given id. 836 */ 837 final <T> Stream<T> createStream(Exchange<T> exchange) { 838 Stream<T> stream = new Stream<>(this, exchange, windowController); 839 return stream; 840 } 841 842 <T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) { 843 PushGroup<?,T> pg = parent.exchange.getPushGroup(); 844 return new Stream.PushedStream<>(pg, this, pushEx); 845 } 846 847 <T> void putStream(Stream<T> stream, int streamid) { 848 // increment the reference count on the HttpClientImpl 849 // to prevent the SelectorManager thread from exiting until 850 // the stream is closed. 851 client().reference(); 852 streams.put(streamid, stream); 853 } 854 855 /** 856 * Encode the headers into a List<ByteBuffer> and then create HEADERS 857 * and CONTINUATION frames from the list and return the List<Http2Frame>. 858 */ 859 private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) { 860 List<ByteBuffer> buffers = encodeHeadersImpl( 861 getMaxSendFrameSize(), 862 frame.getAttachment().getRequestPseudoHeaders(), 863 frame.getUserHeaders(), 864 frame.getSystemHeaders()); 865 866 List<HeaderFrame> frames = new ArrayList<>(buffers.size()); 867 Iterator<ByteBuffer> bufIterator = buffers.iterator(); 868 HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next()); 869 frames.add(oframe); 870 while(bufIterator.hasNext()) { 871 oframe = new ContinuationFrame(frame.streamid(), bufIterator.next()); 872 frames.add(oframe); 873 } 874 oframe.setFlag(HeaderFrame.END_HEADERS); 875 return frames; 876 } 877 878 // Dedicated cache for headers encoding ByteBuffer. 879 // There can be no concurrent access to this buffer as all access to this buffer 880 // and its content happen within a single critical code block section protected 881 // by the sendLock. / (see sendFrame()) 882 // private final ByteBufferPool headerEncodingPool = new ByteBufferPool(); 883 884 private ByteBuffer getHeaderBuffer(int maxFrameSize) { 885 ByteBuffer buf = ByteBuffer.allocate(maxFrameSize); 886 buf.limit(maxFrameSize); 887 return buf; 888 } 889 890 /* 891 * Encodes all the headers from the given HttpHeaders into the given List 892 * of buffers. 893 * 894 * From https://tools.ietf.org/html/rfc7540#section-8.1.2 : 895 * 896 * ...Just as in HTTP/1.x, header field names are strings of ASCII 897 * characters that are compared in a case-insensitive fashion. However, 898 * header field names MUST be converted to lowercase prior to their 899 * encoding in HTTP/2... 900 */ 901 private List<ByteBuffer> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) { 902 ByteBuffer buffer = getHeaderBuffer(maxFrameSize); 903 List<ByteBuffer> buffers = new ArrayList<>(); 904 for(HttpHeaders header : headers) { 905 for (Map.Entry<String, List<String>> e : header.map().entrySet()) { 906 String lKey = e.getKey().toLowerCase(); 907 List<String> values = e.getValue(); 908 for (String value : values) { 909 hpackOut.header(lKey, value); 910 while (!hpackOut.encode(buffer)) { 911 buffer.flip(); 912 buffers.add(buffer); 913 buffer = getHeaderBuffer(maxFrameSize); 914 } 915 } 916 } 917 } 918 buffer.flip(); 919 buffers.add(buffer); 920 return buffers; 921 } 922 923 private List<ByteBuffer> encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) { 924 oh.streamid(stream.streamid); 925 if (Log.headers()) { 926 StringBuilder sb = new StringBuilder("HEADERS FRAME (stream="); 927 sb.append(stream.streamid).append(")\n"); 928 Log.dumpHeaders(sb, " ", oh.getAttachment().getRequestPseudoHeaders()); 929 Log.dumpHeaders(sb, " ", oh.getSystemHeaders()); 930 Log.dumpHeaders(sb, " ", oh.getUserHeaders()); 931 Log.logHeaders(sb.toString()); 932 } 933 List<HeaderFrame> frames = encodeHeaders(oh); 934 return encodeFrames(frames); 935 } 936 937 private List<ByteBuffer> encodeFrames(List<HeaderFrame> frames) { 938 if (Log.frames()) { 939 frames.forEach(f -> Log.logFrames(f, "OUT")); 940 } 941 return framesEncoder.encodeFrames(frames); 942 } 943 944 private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) { 945 Stream<?> stream = oh.getAttachment(); 946 int streamid = nextstreamid; 947 nextstreamid += 2; 948 stream.registerStream(streamid); 949 // set outgoing window here. This allows thread sending 950 // body to proceed. 951 windowController.registerStream(streamid, getInitialSendWindowSize()); 952 return stream; 953 } 954 955 private final Object sendlock = new Object(); 956 957 void sendFrame(Http2Frame frame) { 958 try { 959 HttpPublisher publisher = publisher(); 960 synchronized (sendlock) { 961 if (frame instanceof OutgoingHeaders) { 962 @SuppressWarnings("unchecked") 963 OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame; 964 Stream<?> stream = registerNewStream(oh); 965 // provide protection from inserting unordered frames between Headers and Continuation 966 publisher.enqueue(encodeHeaders(oh, stream)); 967 } else { 968 publisher.enqueue(encodeFrame(frame)); 969 } 970 } 971 publisher.signalEnqueued(); 972 } catch (IOException e) { 973 if (!closed) { 974 Log.logError(e); 975 shutdown(e); 976 } 977 } 978 } 979 980 private List<ByteBuffer> encodeFrame(Http2Frame frame) { 981 Log.logFrames(frame, "OUT"); 982 return framesEncoder.encodeFrame(frame); 983 } 984 985 void sendDataFrame(DataFrame frame) { 986 try { 987 HttpPublisher publisher = publisher(); 988 publisher.enqueue(encodeFrame(frame)); 989 publisher.signalEnqueued(); 990 } catch (IOException e) { 991 if (!closed) { 992 Log.logError(e); 993 shutdown(e); 994 } 995 } 996 } 997 998 /* 999 * Direct call of the method bypasses synchronization on "sendlock" and 1000 * allowed only of control frames: WindowUpdateFrame, PingFrame and etc. 1001 * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame. 1002 */ 1003 void sendUnorderedFrame(Http2Frame frame) { 1004 try { 1005 HttpPublisher publisher = publisher(); 1006 publisher.enqueueUnordered(encodeFrame(frame)); 1007 publisher.signalEnqueued(); 1008 } catch (IOException e) { 1009 if (!closed) { 1010 Log.logError(e); 1011 shutdown(e); 1012 } 1013 } 1014 } 1015 1016 /** 1017 * A simple tube subscriber for reading from the connection flow. 1018 */ 1019 final class Http2TubeSubscriber implements TubeSubscriber { 1020 volatile Flow.Subscription subscription; 1021 volatile boolean completed; 1022 volatile boolean dropped; 1023 volatile Throwable error; 1024 final ConcurrentLinkedQueue<ByteBuffer> queue 1025 = new ConcurrentLinkedQueue<>(); 1026 final SequentialScheduler scheduler = 1027 SequentialScheduler.synchronizedScheduler(this::processQueue); 1028 1029 final void processQueue() { 1030 try { 1031 while (!queue.isEmpty() && !scheduler.isStopped()) { 1032 ByteBuffer buffer = queue.poll(); 1033 debug.log(Level.DEBUG, 1034 "sending %d to Http2Connection.asyncReceive", 1035 buffer.remaining()); 1036 asyncReceive(buffer); 1037 } 1038 } catch (Throwable t) { 1039 Throwable x = error; 1040 if (x == null) error = t; 1041 } finally { 1042 Throwable x = error; 1043 if (x != null) { 1044 debug.log(Level.DEBUG, "Stopping scheduler", x); 1045 scheduler.stop(); 1046 Http2Connection.this.shutdown(x); 1047 } 1048 } 1049 } 1050 1051 1052 public void onSubscribe(Flow.Subscription subscription) { 1053 // supports being called multiple time. 1054 // doesn't cancel the previous subscription, since that is 1055 // most probably the same as the new subscription. 1056 assert this.subscription == null || dropped == false; 1057 this.subscription = subscription; 1058 dropped = false; 1059 // TODO FIXME: request(1) should be done by the delegate. 1060 if (!completed) { 1061 debug.log(Level.DEBUG, "onSubscribe: requesting Long.MAX_VALUE for reading"); 1062 subscription.request(Long.MAX_VALUE); 1063 } else { 1064 debug.log(Level.DEBUG, "onSubscribe: already completed"); 1065 } 1066 } 1067 1068 @Override 1069 public void onNext(List<ByteBuffer> item) { 1070 debug.log(Level.DEBUG, () -> "onNext: got " + Utils.remaining(item) 1071 + " bytes in " + item.size() + " buffers"); 1072 queue.addAll(item); 1073 scheduler.deferOrSchedule(client().theExecutor()); 1074 } 1075 1076 @Override 1077 public void onError(Throwable throwable) { 1078 debug.log(Level.DEBUG, () -> "onError: " + throwable); 1079 error = throwable; 1080 completed = true; 1081 scheduler.deferOrSchedule(client().theExecutor()); 1082 } 1083 1084 @Override 1085 public void onComplete() { 1086 debug.log(Level.DEBUG, "EOF"); 1087 error = new EOFException("EOF reached while reading"); 1088 completed = true; 1089 scheduler.deferOrSchedule(client().theExecutor()); 1090 } 1091 1092 public void dropSubscription() { 1093 debug.log(Level.DEBUG, "dropSubscription"); 1094 // we could probably set subscription to null here... 1095 // then we might not need the 'dropped' boolean? 1096 dropped = true; 1097 } 1098 } 1099 1100 @Override 1101 public final String toString() { 1102 return dbgString(); 1103 } 1104 1105 final String dbgString() { 1106 return "Http2Connection(" 1107 + connection.getConnectionFlow() + ")"; 1108 } 1109 1110 final class LoggingHeaderDecoder extends HeaderDecoder { 1111 1112 private final HeaderDecoder delegate; 1113 private final System.Logger debugHpack = 1114 Utils.getHpackLogger(this::dbgString, DEBUG_HPACK); 1115 1116 LoggingHeaderDecoder(HeaderDecoder delegate) { 1117 this.delegate = delegate; 1118 } 1119 1120 String dbgString() { 1121 return Http2Connection.this.dbgString() + "/LoggingHeaderDecoder"; 1122 } 1123 1124 @Override 1125 public void onDecoded(CharSequence name, CharSequence value) { 1126 delegate.onDecoded(name, value); 1127 } 1128 1129 @Override 1130 public void onIndexed(int index, 1131 CharSequence name, 1132 CharSequence value) { 1133 debugHpack.log(Level.DEBUG, "onIndexed(%s, %s, %s)%n", 1134 index, name, value); 1135 delegate.onIndexed(index, name, value); 1136 } 1137 1138 @Override 1139 public void onLiteral(int index, 1140 CharSequence name, 1141 CharSequence value, 1142 boolean valueHuffman) { 1143 debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n", 1144 index, name, value, valueHuffman); 1145 delegate.onLiteral(index, name, value, valueHuffman); 1146 } 1147 1148 @Override 1149 public void onLiteral(CharSequence name, 1150 boolean nameHuffman, 1151 CharSequence value, 1152 boolean valueHuffman) { 1153 debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n", 1154 name, nameHuffman, value, valueHuffman); 1155 delegate.onLiteral(name, nameHuffman, value, valueHuffman); 1156 } 1157 1158 @Override 1159 public void onLiteralNeverIndexed(int index, 1160 CharSequence name, 1161 CharSequence value, 1162 boolean valueHuffman) { 1163 debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n", 1164 index, name, value, valueHuffman); 1165 delegate.onLiteralNeverIndexed(index, name, value, valueHuffman); 1166 } 1167 1168 @Override 1169 public void onLiteralNeverIndexed(CharSequence name, 1170 boolean nameHuffman, 1171 CharSequence value, 1172 boolean valueHuffman) { 1173 debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n", 1174 name, nameHuffman, value, valueHuffman); 1175 delegate.onLiteralNeverIndexed(name, nameHuffman, value, valueHuffman); 1176 } 1177 1178 @Override 1179 public void onLiteralWithIndexing(int index, 1180 CharSequence name, 1181 CharSequence value, 1182 boolean valueHuffman) { 1183 debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n", 1184 index, name, value, valueHuffman); 1185 delegate.onLiteralWithIndexing(index, name, value, valueHuffman); 1186 } 1187 1188 @Override 1189 public void onLiteralWithIndexing(CharSequence name, 1190 boolean nameHuffman, 1191 CharSequence value, 1192 boolean valueHuffman) { 1193 debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n", 1194 name, nameHuffman, value, valueHuffman); 1195 delegate.onLiteralWithIndexing(name, nameHuffman, value, valueHuffman); 1196 } 1197 1198 @Override 1199 public void onSizeUpdate(int capacity) { 1200 debugHpack.log(Level.DEBUG, "onSizeUpdate(%s)%n", capacity); 1201 delegate.onSizeUpdate(capacity); 1202 } 1203 1204 @Override 1205 HttpHeadersImpl headers() { 1206 return delegate.headers(); 1207 } 1208 } 1209 1210 static class HeaderDecoder implements DecodingCallback { 1211 HttpHeadersImpl headers; 1212 1213 HeaderDecoder() { 1214 this.headers = new HttpHeadersImpl(); 1215 } 1216 1217 @Override 1218 public void onDecoded(CharSequence name, CharSequence value) { 1219 headers.addHeader(name.toString(), value.toString()); 1220 } 1221 1222 HttpHeadersImpl headers() { 1223 return headers; 1224 } 1225 } 1226 1227 static final class ConnectionWindowUpdateSender extends WindowUpdateSender { 1228 1229 final int initialWindowSize; 1230 public ConnectionWindowUpdateSender(Http2Connection connection, 1231 int initialWindowSize) { 1232 super(connection, initialWindowSize); 1233 this.initialWindowSize = initialWindowSize; 1234 } 1235 1236 @Override 1237 int getStreamId() { 1238 return 0; 1239 } 1240 } 1241 1242 /** 1243 * Thrown when https handshake negotiates http/1.1 alpn instead of h2 1244 */ 1245 static final class ALPNException extends IOException { 1246 private static final long serialVersionUID = 23138275393635783L; 1247 final AbstractAsyncSSLConnection connection; 1248 1249 ALPNException(String msg, AbstractAsyncSSLConnection connection) { 1250 super(msg); 1251 this.connection = connection; 1252 } 1253 1254 AbstractAsyncSSLConnection getConnection() { 1255 return connection; 1256 } 1257 } 1258 }