1 /* 2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.EOFException; 29 import java.io.IOException; 30 import java.io.UncheckedIOException; 31 import java.net.InetSocketAddress; 32 import java.net.URI; 33 import java.nio.ByteBuffer; 34 import java.nio.charset.StandardCharsets; 35 import java.util.Iterator; 36 import java.util.LinkedList; 37 import java.util.List; 38 import java.util.Locale; 39 import java.util.Map; 40 import java.util.Set; 41 import java.util.concurrent.CompletableFuture; 42 import java.util.ArrayList; 43 import java.util.Objects; 44 import java.util.concurrent.ConcurrentHashMap; 45 import java.util.concurrent.ConcurrentLinkedQueue; 46 import java.util.concurrent.Flow; 47 import java.util.function.Function; 48 import java.util.function.Supplier; 49 import javax.net.ssl.SSLEngine; 50 import javax.net.ssl.SSLException; 51 import java.net.http.HttpClient; 52 import java.net.http.HttpHeaders; 53 import jdk.internal.net.http.HttpConnection.HttpPublisher; 54 import jdk.internal.net.http.common.FlowTube; 55 import jdk.internal.net.http.common.FlowTube.TubeSubscriber; 56 import jdk.internal.net.http.common.HttpHeadersBuilder; 57 import jdk.internal.net.http.common.Log; 58 import jdk.internal.net.http.common.Logger; 59 import jdk.internal.net.http.common.MinimalFuture; 60 import jdk.internal.net.http.common.SequentialScheduler; 61 import jdk.internal.net.http.common.Utils; 62 import jdk.internal.net.http.frame.ContinuationFrame; 63 import jdk.internal.net.http.frame.DataFrame; 64 import jdk.internal.net.http.frame.ErrorFrame; 65 import jdk.internal.net.http.frame.FramesDecoder; 66 import jdk.internal.net.http.frame.FramesEncoder; 67 import jdk.internal.net.http.frame.GoAwayFrame; 68 import jdk.internal.net.http.frame.HeaderFrame; 69 import jdk.internal.net.http.frame.HeadersFrame; 70 import jdk.internal.net.http.frame.Http2Frame; 71 import jdk.internal.net.http.frame.MalformedFrame; 72 import jdk.internal.net.http.frame.OutgoingHeaders; 73 import jdk.internal.net.http.frame.PingFrame; 74 import jdk.internal.net.http.frame.PushPromiseFrame; 75 import jdk.internal.net.http.frame.ResetFrame; 76 import jdk.internal.net.http.frame.SettingsFrame; 77 import jdk.internal.net.http.frame.WindowUpdateFrame; 78 import jdk.internal.net.http.hpack.Encoder; 79 import jdk.internal.net.http.hpack.Decoder; 80 import jdk.internal.net.http.hpack.DecodingCallback; 81 import static java.nio.charset.StandardCharsets.UTF_8; 82 import static jdk.internal.net.http.frame.SettingsFrame.*; 83 84 /** 85 * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used 86 * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff. 87 * 88 * Http2Connections belong to a Http2ClientImpl, (one of) which belongs 89 * to a HttpClientImpl. 90 * 91 * Creation cases: 92 * 1) upgraded HTTP/1.1 plain tcp connection 93 * 2) prior knowledge directly created plain tcp connection 94 * 3) directly created HTTP/2 SSL connection which uses ALPN. 95 * 96 * Sending is done by writing directly to underlying HttpConnection object which 97 * is operating in async mode. No flow control applies on output at this level 98 * and all writes are just executed as puts to an output Q belonging to HttpConnection 99 * Flow control is implemented by HTTP/2 protocol itself. 100 * 101 * Hpack header compression 102 * and outgoing stream creation is also done here, because these operations 103 * must be synchronized at the socket level. Stream objects send frames simply 104 * by placing them on the connection's output Queue. sendFrame() is called 105 * from a higher level (Stream) thread. 106 * 107 * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles 108 * incoming Http2Frames, and directs them to the appropriate Stream.incoming() 109 * or handles them directly itself. This thread performs hpack decompression 110 * and incoming stream creation (Server push). Incoming frames destined for a 111 * stream are provided by calling Stream.incoming(). 112 */ 113 class Http2Connection { 114 115 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 116 final static Logger DEBUG_LOGGER = 117 Utils.getDebugLogger("Http2Connection"::toString, Utils.DEBUG); 118 private final Logger debugHpack = 119 Utils.getHpackLogger(this::dbgString, Utils.DEBUG_HPACK); 120 static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0); 121 122 static private final int MAX_CLIENT_STREAM_ID = Integer.MAX_VALUE; // 2147483647 123 static private final int MAX_SERVER_STREAM_ID = Integer.MAX_VALUE - 1; // 2147483646 124 125 /** 126 * Flag set when no more streams to be opened on this connection. 127 * Two cases where it is used. 128 * 129 * 1. Two connections to the same server were opened concurrently, in which 130 * case one of them will be put in the cache, and the second will expire 131 * when all its opened streams (which usually should be a single client 132 * stream + possibly some additional push-promise server streams) complete. 133 * 2. A cached connection reaches its maximum number of streams (~ 2^31-1) 134 * either server / or client allocated, in which case it will be taken 135 * out of the cache - allowing a new connection to replace it. It will 136 * expire when all its still open streams (which could be many) eventually 137 * complete. 138 */ 139 private boolean finalStream; 140 141 /* 142 * ByteBuffer pooling strategy for HTTP/2 protocol. 143 * 144 * In general there are 4 points where ByteBuffers are used: 145 * - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing 146 * encrypted data in case of SSL connection. 147 * 148 * 1. Outgoing frames encoded to ByteBuffers. 149 * 150 * Outgoing ByteBuffers are created with required size and frequently 151 * small (except DataFrames, etc). At this place no pools at all. All 152 * outgoing buffers should eventually be collected by GC. 153 * 154 * 2. Incoming ByteBuffers (decoded to frames). 155 * 156 * Here, total elimination of BB pool is not a good idea. 157 * We don't know how many bytes we will receive through network. 158 * 159 * A possible future improvement ( currently not implemented ): 160 * Allocate buffers of reasonable size. The following life of the BB: 161 * - If all frames decoded from the BB are other than DataFrame and 162 * HeaderFrame (and HeaderFrame subclasses) BB is returned to pool, 163 * - If a DataFrame is decoded from the BB. In that case DataFrame refers 164 * to sub-buffer obtained by slice(). Such a BB is never returned to the 165 * pool and will eventually be GC'ed. 166 * - If a HeadersFrame is decoded from the BB. Then header decoding is 167 * performed inside processFrame method and the buffer could be release 168 * back to pool. 169 * 170 * 3. SSL encrypted buffers ( received ). 171 * 172 * The current implementation recycles encrypted buffers read from the 173 * channel. The pool of buffers has a maximum size of 3, SocketTube.MAX_BUFFERS, 174 * direct buffers which are shared by all connections on a given client. 175 * The pool is used by all SSL connections - whether HTTP/1.1 or HTTP/2, 176 * but only for SSL encrypted buffers that circulate between the SocketTube 177 * Publisher and the SSLFlowDelegate Reader. Limiting the pool to this 178 * particular segment allows the use of direct buffers, thus avoiding any 179 * additional copy in the NIO socket channel implementation. See 180 * HttpClientImpl.SSLDirectBufferSupplier, SocketTube.SSLDirectBufferSource, 181 * and SSLTube.recycler. 182 */ 183 184 185 // A small class that allows to control frames with respect to the state of 186 // the connection preface. Any data received before the connection 187 // preface is sent will be buffered. 188 private final class FramesController { 189 volatile boolean prefaceSent; 190 volatile List<ByteBuffer> pending; 191 192 boolean processReceivedData(FramesDecoder decoder, ByteBuffer buf) 193 throws IOException 194 { 195 // if preface is not sent, buffers data in the pending list 196 if (!prefaceSent) { 197 if (debug.on()) 198 debug.log("Preface not sent: buffering %d", buf.remaining()); 199 synchronized (this) { 200 if (!prefaceSent) { 201 if (pending == null) pending = new ArrayList<>(); 202 pending.add(buf); 203 if (debug.on()) 204 debug.log("there are now %d bytes buffered waiting for preface to be sent" 205 + Utils.remaining(pending) 206 ); 207 return false; 208 } 209 } 210 } 211 212 // Preface is sent. Checks for pending data and flush it. 213 // We rely on this method being called from within the Http2TubeSubscriber 214 // scheduler, so we know that no other thread could execute this method 215 // concurrently while we're here. 216 // This ensures that later incoming buffers will not 217 // be processed before we have flushed the pending queue. 218 // No additional synchronization is therefore necessary here. 219 List<ByteBuffer> pending = this.pending; 220 this.pending = null; 221 if (pending != null) { 222 // flush pending data 223 if (debug.on()) debug.log(() -> "Processing buffered data: " 224 + Utils.remaining(pending)); 225 for (ByteBuffer b : pending) { 226 decoder.decode(b); 227 } 228 } 229 // push the received buffer to the frames decoder. 230 if (buf != EMPTY_TRIGGER) { 231 if (debug.on()) debug.log("Processing %d", buf.remaining()); 232 decoder.decode(buf); 233 } 234 return true; 235 } 236 237 // Mark that the connection preface is sent 238 void markPrefaceSent() { 239 assert !prefaceSent; 240 synchronized (this) { 241 prefaceSent = true; 242 } 243 } 244 } 245 246 volatile boolean closed; 247 248 //------------------------------------- 249 final HttpConnection connection; 250 private final Http2ClientImpl client2; 251 private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>(); 252 private int nextstreamid; 253 private int nextPushStream = 2; 254 // actual stream ids are not allocated until the Headers frame is ready 255 // to be sent. The following two fields are updated as soon as a stream 256 // is created and assigned to a connection. They are checked before 257 // assigning a stream to a connection. 258 private int lastReservedClientStreamid = 1; 259 private int lastReservedServerStreamid = 0; 260 private int numReservedClientStreams = 0; // count of current streams 261 private int numReservedServerStreams = 0; // count of current streams 262 private final Encoder hpackOut; 263 private final Decoder hpackIn; 264 final SettingsFrame clientSettings; 265 private volatile SettingsFrame serverSettings; 266 private final String key; // for HttpClientImpl.connections map 267 private final FramesDecoder framesDecoder; 268 private final FramesEncoder framesEncoder = new FramesEncoder(); 269 270 /** 271 * Send Window controller for both connection and stream windows. 272 * Each of this connection's Streams MUST use this controller. 273 */ 274 private final WindowController windowController = new WindowController(); 275 private final FramesController framesController = new FramesController(); 276 private final Http2TubeSubscriber subscriber; 277 final ConnectionWindowUpdateSender windowUpdater; 278 private volatile Throwable cause; 279 private volatile Supplier<ByteBuffer> initial; 280 281 static final int DEFAULT_FRAME_SIZE = 16 * 1024; 282 283 284 // TODO: need list of control frames from other threads 285 // that need to be sent 286 287 private Http2Connection(HttpConnection connection, 288 Http2ClientImpl client2, 289 int nextstreamid, 290 String key) { 291 this.connection = connection; 292 this.client2 = client2; 293 this.subscriber = new Http2TubeSubscriber(client2.client()); 294 this.nextstreamid = nextstreamid; 295 this.key = key; 296 this.clientSettings = this.client2.getClientSettings(); 297 this.framesDecoder = new FramesDecoder(this::processFrame, 298 clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE)); 299 // serverSettings will be updated by server 300 this.serverSettings = SettingsFrame.defaultRFCSettings(); 301 this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); 302 this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); 303 if (debugHpack.on()) { 304 debugHpack.log("For the record:" + super.toString()); 305 debugHpack.log("Decoder created: %s", hpackIn); 306 debugHpack.log("Encoder created: %s", hpackOut); 307 } 308 this.windowUpdater = new ConnectionWindowUpdateSender(this, 309 client2.getConnectionWindowSize(clientSettings)); 310 } 311 312 /** 313 * Case 1) Create from upgraded HTTP/1.1 connection. 314 * Is ready to use. Can't be SSL. exchange is the Exchange 315 * that initiated the connection, whose response will be delivered 316 * on a Stream. 317 */ 318 private Http2Connection(HttpConnection connection, 319 Http2ClientImpl client2, 320 Exchange<?> exchange, 321 Supplier<ByteBuffer> initial) 322 throws IOException, InterruptedException 323 { 324 this(connection, 325 client2, 326 3, // stream 1 is registered during the upgrade 327 keyFor(connection)); 328 reserveStream(true); 329 Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); 330 331 Stream<?> initialStream = createStream(exchange); 332 initialStream.registerStream(1); 333 windowController.registerStream(1, getInitialSendWindowSize()); 334 initialStream.requestSent(); 335 // Upgrading: 336 // set callbacks before sending preface - makes sure anything that 337 // might be sent by the server will come our way. 338 this.initial = initial; 339 connectFlows(connection); 340 sendConnectionPreface(); 341 } 342 343 // Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving 344 // agreement from the server. Async style but completes immediately, because 345 // the connection is already connected. 346 static CompletableFuture<Http2Connection> createAsync(HttpConnection connection, 347 Http2ClientImpl client2, 348 Exchange<?> exchange, 349 Supplier<ByteBuffer> initial) 350 { 351 return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial)); 352 } 353 354 // Requires TLS handshake. So, is really async 355 static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request, 356 Http2ClientImpl h2client) { 357 assert request.secure(); 358 AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection) 359 HttpConnection.getConnection(request.getAddress(), 360 h2client.client(), 361 request, 362 HttpClient.Version.HTTP_2); 363 364 return connection.connectAsync() 365 .thenCompose(unused -> checkSSLConfig(connection)) 366 .thenCompose(notused-> { 367 CompletableFuture<Http2Connection> cf = new MinimalFuture<>(); 368 try { 369 Http2Connection hc = new Http2Connection(request, h2client, connection); 370 cf.complete(hc); 371 } catch (IOException e) { 372 cf.completeExceptionally(e); 373 } 374 return cf; } ); 375 } 376 377 /** 378 * Cases 2) 3) 379 * 380 * request is request to be sent. 381 */ 382 private Http2Connection(HttpRequestImpl request, 383 Http2ClientImpl h2client, 384 HttpConnection connection) 385 throws IOException 386 { 387 this(connection, 388 h2client, 389 1, 390 keyFor(request.uri(), request.proxy())); 391 392 Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); 393 394 // safe to resume async reading now. 395 connectFlows(connection); 396 sendConnectionPreface(); 397 } 398 399 private void connectFlows(HttpConnection connection) { 400 FlowTube tube = connection.getConnectionFlow(); 401 // Connect the flow to our Http2TubeSubscriber: 402 tube.connectFlows(connection.publisher(), subscriber); 403 } 404 405 final HttpClientImpl client() { 406 return client2.client(); 407 } 408 409 // call these before assigning a request/stream to a connection 410 // if false returned then a new Http2Connection is required 411 // if true, the the stream may be assigned to this connection 412 // for server push, if false returned, then the stream should be cancelled 413 synchronized boolean reserveStream(boolean clientInitiated) throws IOException { 414 if (finalStream) { 415 return false; 416 } 417 if (clientInitiated && (lastReservedClientStreamid + 2) >= MAX_CLIENT_STREAM_ID) { 418 setFinalStream(); 419 client2.deleteConnection(this); 420 return false; 421 } else if (!clientInitiated && (lastReservedServerStreamid + 2) >= MAX_SERVER_STREAM_ID) { 422 setFinalStream(); 423 client2.deleteConnection(this); 424 return false; 425 } 426 if (clientInitiated) 427 lastReservedClientStreamid+=2; 428 else 429 lastReservedServerStreamid+=2; 430 431 assert numReservedClientStreams >= 0; 432 assert numReservedServerStreams >= 0; 433 if (clientInitiated &&numReservedClientStreams >= maxConcurrentClientInitiatedStreams()) { 434 throw new IOException("too many concurrent streams"); 435 } else if (clientInitiated) { 436 numReservedClientStreams++; 437 } 438 if (!clientInitiated && numReservedServerStreams >= maxConcurrentServerInitiatedStreams()) { 439 return false; 440 } else if (!clientInitiated) { 441 numReservedServerStreams++; 442 } 443 return true; 444 } 445 446 /** 447 * Throws an IOException if h2 was not negotiated 448 */ 449 private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) { 450 assert aconn.isSecure(); 451 452 Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> { 453 CompletableFuture<Void> cf = new MinimalFuture<>(); 454 SSLEngine engine = aconn.getEngine(); 455 assert Objects.equals(alpn, engine.getApplicationProtocol()); 456 457 DEBUG_LOGGER.log("checkSSLConfig: alpn: %s", alpn ); 458 459 if (alpn == null || !alpn.equals("h2")) { 460 String msg; 461 if (alpn == null) { 462 Log.logSSL("ALPN not supported"); 463 msg = "ALPN not supported"; 464 } else { 465 switch (alpn) { 466 case "": 467 Log.logSSL(msg = "No ALPN negotiated"); 468 break; 469 case "http/1.1": 470 Log.logSSL( msg = "HTTP/1.1 ALPN returned"); 471 break; 472 default: 473 Log.logSSL(msg = "Unexpected ALPN: " + alpn); 474 cf.completeExceptionally(new IOException(msg)); 475 } 476 } 477 cf.completeExceptionally(new ALPNException(msg, aconn)); 478 return cf; 479 } 480 cf.complete(null); 481 return cf; 482 }; 483 484 return aconn.getALPN() 485 .whenComplete((r,t) -> { 486 if (t != null && t instanceof SSLException) { 487 // something went wrong during the initial handshake 488 // close the connection 489 aconn.close(); 490 } 491 }) 492 .thenCompose(checkAlpnCF); 493 } 494 495 synchronized boolean finalStream() { 496 return finalStream; 497 } 498 499 /** 500 * Mark this connection so no more streams created on it and it will close when 501 * all are complete. 502 */ 503 synchronized void setFinalStream() { 504 finalStream = true; 505 } 506 507 static String keyFor(HttpConnection connection) { 508 boolean isProxy = connection.isProxied(); // tunnel or plain clear connection through proxy 509 boolean isSecure = connection.isSecure(); 510 InetSocketAddress addr = connection.address(); 511 512 return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort()); 513 } 514 515 static String keyFor(URI uri, InetSocketAddress proxy) { 516 boolean isSecure = uri.getScheme().equalsIgnoreCase("https"); 517 boolean isProxy = proxy != null; 518 519 String host; 520 int port; 521 522 if (proxy != null && !isSecure) { 523 // clear connection through proxy: use 524 // proxy host / proxy port 525 host = proxy.getHostString(); 526 port = proxy.getPort(); 527 } else { 528 // either secure tunnel connection through proxy 529 // or direct connection to host, but in either 530 // case only that host can be reached through 531 // the connection: use target host / target port 532 host = uri.getHost(); 533 port = uri.getPort(); 534 } 535 return keyString(isSecure, isProxy, host, port); 536 } 537 538 // {C,S}:{H:P}:host:port 539 // C indicates clear text connection "http" 540 // S indicates secure "https" 541 // H indicates host (direct) connection 542 // P indicates proxy 543 // Eg: "S:H:foo.com:80" 544 static String keyString(boolean secure, boolean proxy, String host, int port) { 545 if (secure && port == -1) 546 port = 443; 547 else if (!secure && port == -1) 548 port = 80; 549 return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port; 550 } 551 552 String key() { 553 return this.key; 554 } 555 556 boolean offerConnection() { 557 return client2.offerConnection(this); 558 } 559 560 private HttpPublisher publisher() { 561 return connection.publisher(); 562 } 563 564 private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) 565 throws IOException 566 { 567 if (debugHpack.on()) debugHpack.log("decodeHeaders(%s)", decoder); 568 569 boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS); 570 571 List<ByteBuffer> buffers = frame.getHeaderBlock(); 572 int len = buffers.size(); 573 for (int i = 0; i < len; i++) { 574 ByteBuffer b = buffers.get(i); 575 hpackIn.decode(b, endOfHeaders && (i == len - 1), decoder); 576 } 577 } 578 579 final int getInitialSendWindowSize() { 580 return serverSettings.getParameter(INITIAL_WINDOW_SIZE); 581 } 582 583 final int maxConcurrentClientInitiatedStreams() { 584 return serverSettings.getParameter(MAX_CONCURRENT_STREAMS); 585 } 586 587 final int maxConcurrentServerInitiatedStreams() { 588 return clientSettings.getParameter(MAX_CONCURRENT_STREAMS); 589 } 590 591 void close() { 592 Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address()); 593 GoAwayFrame f = new GoAwayFrame(0, 594 ErrorFrame.NO_ERROR, 595 "Requested by user".getBytes(UTF_8)); 596 // TODO: set last stream. For now zero ok. 597 sendFrame(f); 598 } 599 600 long count; 601 final void asyncReceive(ByteBuffer buffer) { 602 // We don't need to read anything and 603 // we don't want to send anything back to the server 604 // until the connection preface has been sent. 605 // Therefore we're going to wait if needed before reading 606 // (and thus replying) to anything. 607 // Starting to reply to something (e.g send an ACK to a 608 // SettingsFrame sent by the server) before the connection 609 // preface is fully sent might result in the server 610 // sending a GOAWAY frame with 'invalid_preface'. 611 // 612 // Note: asyncReceive is only called from the Http2TubeSubscriber 613 // sequential scheduler. 614 try { 615 Supplier<ByteBuffer> bs = initial; 616 // ensure that we always handle the initial buffer first, 617 // if any. 618 if (bs != null) { 619 initial = null; 620 ByteBuffer b = bs.get(); 621 if (b.hasRemaining()) { 622 long c = ++count; 623 if (debug.on()) 624 debug.log(() -> "H2 Receiving Initial(" + c +"): " + b.remaining()); 625 framesController.processReceivedData(framesDecoder, b); 626 } 627 } 628 ByteBuffer b = buffer; 629 // the Http2TubeSubscriber scheduler ensures that the order of incoming 630 // buffers is preserved. 631 if (b == EMPTY_TRIGGER) { 632 if (debug.on()) debug.log("H2 Received EMPTY_TRIGGER"); 633 boolean prefaceSent = framesController.prefaceSent; 634 assert prefaceSent; 635 // call framesController.processReceivedData to potentially 636 // trigger the processing of all the data buffered there. 637 framesController.processReceivedData(framesDecoder, buffer); 638 if (debug.on()) debug.log("H2 processed buffered data"); 639 } else { 640 long c = ++count; 641 if (debug.on()) 642 debug.log("H2 Receiving(%d): %d", c, b.remaining()); 643 framesController.processReceivedData(framesDecoder, buffer); 644 if (debug.on()) debug.log("H2 processed(%d)", c); 645 } 646 } catch (Throwable e) { 647 String msg = Utils.stackTrace(e); 648 Log.logTrace(msg); 649 shutdown(e); 650 } 651 } 652 653 Throwable getRecordedCause() { 654 return cause; 655 } 656 657 void shutdown(Throwable t) { 658 if (debug.on()) debug.log(() -> "Shutting down h2c (closed="+closed+"): " + t); 659 if (closed == true) return; 660 synchronized (this) { 661 if (closed == true) return; 662 closed = true; 663 } 664 if (Log.errors()) { 665 if (!(t instanceof EOFException) || isActive()) { 666 Log.logError(t); 667 } else if (t != null) { 668 Log.logError("Shutting down connection: {0}", t.getMessage()); 669 } 670 } 671 Throwable initialCause = this.cause; 672 if (initialCause == null) this.cause = t; 673 client2.deleteConnection(this); 674 List<Stream<?>> c = new LinkedList<>(streams.values()); 675 for (Stream<?> s : c) { 676 try { 677 s.connectionClosing(t); 678 } catch (Throwable e) { 679 Log.logError("Failed to close stream {0}: {1}", s.streamid, e); 680 } 681 } 682 connection.close(); 683 } 684 685 /** 686 * Streams initiated by a client MUST use odd-numbered stream 687 * identifiers; those initiated by the server MUST use even-numbered 688 * stream identifiers. 689 */ 690 private static final boolean isServerInitiatedStream(int streamid) { 691 return (streamid & 0x1) == 0; 692 } 693 694 /** 695 * Handles stream 0 (common) frames that apply to whole connection and passes 696 * other stream specific frames to that Stream object. 697 * 698 * Invokes Stream.incoming() which is expected to process frame without 699 * blocking. 700 */ 701 void processFrame(Http2Frame frame) throws IOException { 702 Log.logFrames(frame, "IN"); 703 int streamid = frame.streamid(); 704 if (frame instanceof MalformedFrame) { 705 Log.logError(((MalformedFrame) frame).getMessage()); 706 if (streamid == 0) { 707 framesDecoder.close("Malformed frame on stream 0"); 708 protocolError(((MalformedFrame) frame).getErrorCode(), 709 ((MalformedFrame) frame).getMessage()); 710 } else { 711 if (debug.on()) 712 debug.log(() -> "Reset stream: " + ((MalformedFrame) frame).getMessage()); 713 resetStream(streamid, ((MalformedFrame) frame).getErrorCode()); 714 } 715 return; 716 } 717 if (streamid == 0) { 718 handleConnectionFrame(frame); 719 } else { 720 if (frame instanceof SettingsFrame) { 721 // The stream identifier for a SETTINGS frame MUST be zero 722 framesDecoder.close( 723 "The stream identifier for a SETTINGS frame MUST be zero"); 724 protocolError(GoAwayFrame.PROTOCOL_ERROR); 725 return; 726 } 727 728 Stream<?> stream = getStream(streamid); 729 if (stream == null) { 730 // Should never receive a frame with unknown stream id 731 732 if (frame instanceof HeaderFrame) { 733 // always decode the headers as they may affect 734 // connection-level HPACK decoding state 735 DecodingCallback decoder = new ValidatingHeadersConsumer(); 736 try { 737 decodeHeaders((HeaderFrame) frame, decoder); 738 } catch (UncheckedIOException e) { 739 protocolError(ResetFrame.PROTOCOL_ERROR, e.getMessage()); 740 return; 741 } 742 } 743 744 if (!(frame instanceof ResetFrame)) { 745 if (frame instanceof DataFrame) { 746 dropDataFrame((DataFrame)frame); 747 } 748 if (isServerInitiatedStream(streamid)) { 749 if (streamid < nextPushStream) { 750 // trailing data on a cancelled push promise stream, 751 // reset will already have been sent, ignore 752 Log.logTrace("Ignoring cancelled push promise frame " + frame); 753 } else { 754 resetStream(streamid, ResetFrame.PROTOCOL_ERROR); 755 } 756 } else if (streamid >= nextstreamid) { 757 // otherwise the stream has already been reset/closed 758 resetStream(streamid, ResetFrame.PROTOCOL_ERROR); 759 } 760 } 761 return; 762 } 763 if (frame instanceof PushPromiseFrame) { 764 PushPromiseFrame pp = (PushPromiseFrame)frame; 765 try { 766 handlePushPromise(stream, pp); 767 } catch (UncheckedIOException e) { 768 protocolError(ResetFrame.PROTOCOL_ERROR, e.getMessage()); 769 return; 770 } 771 } else if (frame instanceof HeaderFrame) { 772 // decode headers (or continuation) 773 try { 774 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer()); 775 } catch (UncheckedIOException e) { 776 protocolError(ResetFrame.PROTOCOL_ERROR, e.getMessage()); 777 return; 778 } 779 stream.incoming(frame); 780 } else { 781 stream.incoming(frame); 782 } 783 } 784 } 785 786 final void dropDataFrame(DataFrame df) { 787 if (closed) return; 788 if (debug.on()) { 789 debug.log("Dropping data frame for stream %d (%d payload bytes)", 790 df.streamid(), df.payloadLength()); 791 } 792 ensureWindowUpdated(df); 793 } 794 795 final void ensureWindowUpdated(DataFrame df) { 796 try { 797 if (closed) return; 798 int length = df.payloadLength(); 799 if (length > 0) { 800 windowUpdater.update(length); 801 } 802 } catch(Throwable t) { 803 Log.logError("Unexpected exception while updating window: {0}", (Object)t); 804 } 805 } 806 807 private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp) 808 throws IOException 809 { 810 // always decode the headers as they may affect connection-level HPACK 811 // decoding state 812 HeaderDecoder decoder = new HeaderDecoder(); 813 decodeHeaders(pp, decoder); 814 815 HttpRequestImpl parentReq = parent.request; 816 int promisedStreamid = pp.getPromisedStream(); 817 if (promisedStreamid != nextPushStream) { 818 resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR); 819 return; 820 } else if (!reserveStream(false)) { 821 resetStream(promisedStreamid, ResetFrame.REFUSED_STREAM); 822 return; 823 } else { 824 nextPushStream += 2; 825 } 826 827 HttpHeaders headers = decoder.headers(); 828 HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers); 829 Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi); 830 Stream.PushedStream<T> pushStream = createPushStream(parent, pushExch); 831 pushExch.exchImpl = pushStream; 832 pushStream.registerStream(promisedStreamid); 833 parent.incoming_pushPromise(pushReq, pushStream); 834 } 835 836 private void handleConnectionFrame(Http2Frame frame) 837 throws IOException 838 { 839 switch (frame.type()) { 840 case SettingsFrame.TYPE: 841 handleSettings((SettingsFrame)frame); 842 break; 843 case PingFrame.TYPE: 844 handlePing((PingFrame)frame); 845 break; 846 case GoAwayFrame.TYPE: 847 handleGoAway((GoAwayFrame)frame); 848 break; 849 case WindowUpdateFrame.TYPE: 850 handleWindowUpdate((WindowUpdateFrame)frame); 851 break; 852 default: 853 protocolError(ErrorFrame.PROTOCOL_ERROR); 854 } 855 } 856 857 void resetStream(int streamid, int code) throws IOException { 858 try { 859 if (connection.channel().isOpen()) { 860 // no need to try & send a reset frame if the 861 // connection channel is already closed. 862 Log.logError( 863 "Resetting stream {0,number,integer} with error code {1,number,integer}", 864 streamid, code); 865 ResetFrame frame = new ResetFrame(streamid, code); 866 sendFrame(frame); 867 } else if (debug.on()) { 868 debug.log("Channel already closed, no need to reset stream %d", 869 streamid); 870 } 871 } finally { 872 decrementStreamsCount(streamid); 873 closeStream(streamid); 874 } 875 } 876 877 // reduce count of streams by 1 if stream still exists 878 synchronized void decrementStreamsCount(int streamid) { 879 Stream<?> s = streams.get(streamid); 880 if (s == null || !s.deRegister()) 881 return; 882 if (streamid % 2 == 1) { 883 numReservedClientStreams--; 884 assert numReservedClientStreams >= 0 : 885 "negative client stream count for stream=" + streamid; 886 } else { 887 numReservedServerStreams--; 888 assert numReservedServerStreams >= 0 : 889 "negative server stream count for stream=" + streamid; 890 } 891 } 892 893 void closeStream(int streamid) { 894 if (debug.on()) debug.log("Closed stream %d", streamid); 895 boolean isClient = (streamid % 2) == 1; 896 Stream<?> s = streams.remove(streamid); 897 if (s != null) { 898 // decrement the reference count on the HttpClientImpl 899 // to allow the SelectorManager thread to exit if no 900 // other operation is pending and the facade is no 901 // longer referenced. 902 client().streamUnreference(); 903 } 904 // ## Remove s != null. It is a hack for delayed cancellation,reset 905 if (s != null && !(s instanceof Stream.PushedStream)) { 906 // Since PushStreams have no request body, then they have no 907 // corresponding entry in the window controller. 908 windowController.removeStream(streamid); 909 } 910 if (finalStream() && streams.isEmpty()) { 911 // should be only 1 stream, but there might be more if server push 912 close(); 913 } 914 } 915 916 /** 917 * Increments this connection's send Window by the amount in the given frame. 918 */ 919 private void handleWindowUpdate(WindowUpdateFrame f) 920 throws IOException 921 { 922 int amount = f.getUpdate(); 923 if (amount <= 0) { 924 // ## temporarily disable to workaround a bug in Jetty where it 925 // ## sends Window updates with a 0 update value. 926 //protocolError(ErrorFrame.PROTOCOL_ERROR); 927 } else { 928 boolean success = windowController.increaseConnectionWindow(amount); 929 if (!success) { 930 protocolError(ErrorFrame.FLOW_CONTROL_ERROR); // overflow 931 } 932 } 933 } 934 935 private void protocolError(int errorCode) 936 throws IOException 937 { 938 protocolError(errorCode, null); 939 } 940 941 private void protocolError(int errorCode, String msg) 942 throws IOException 943 { 944 GoAwayFrame frame = new GoAwayFrame(0, errorCode); 945 sendFrame(frame); 946 shutdown(new IOException("protocol error" + (msg == null?"":(": " + msg)))); 947 } 948 949 private void handleSettings(SettingsFrame frame) 950 throws IOException 951 { 952 assert frame.streamid() == 0; 953 if (!frame.getFlag(SettingsFrame.ACK)) { 954 int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE); 955 if (newWindowSize != -1) { 956 int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE); 957 int diff = newWindowSize - oldWindowSize; 958 if (diff != 0) { 959 windowController.adjustActiveStreams(diff); 960 } 961 } 962 963 serverSettings.update(frame); 964 sendFrame(new SettingsFrame(SettingsFrame.ACK)); 965 } 966 } 967 968 private void handlePing(PingFrame frame) 969 throws IOException 970 { 971 frame.setFlag(PingFrame.ACK); 972 sendUnorderedFrame(frame); 973 } 974 975 private void handleGoAway(GoAwayFrame frame) 976 throws IOException 977 { 978 shutdown(new IOException( 979 String.valueOf(connection.channel().getLocalAddress()) 980 +": GOAWAY received")); 981 } 982 983 /** 984 * Max frame size we are allowed to send 985 */ 986 public int getMaxSendFrameSize() { 987 int param = serverSettings.getParameter(MAX_FRAME_SIZE); 988 if (param == -1) { 989 param = DEFAULT_FRAME_SIZE; 990 } 991 return param; 992 } 993 994 /** 995 * Max frame size we will receive 996 */ 997 public int getMaxReceiveFrameSize() { 998 return clientSettings.getParameter(MAX_FRAME_SIZE); 999 } 1000 1001 private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; 1002 1003 private static final byte[] PREFACE_BYTES = 1004 CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1); 1005 1006 /** 1007 * Sends Connection preface and Settings frame with current preferred 1008 * values 1009 */ 1010 private void sendConnectionPreface() throws IOException { 1011 Log.logTrace("{0}: start sending connection preface to {1}", 1012 connection.channel().getLocalAddress(), 1013 connection.address()); 1014 SettingsFrame sf = new SettingsFrame(clientSettings); 1015 ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf); 1016 Log.logFrames(sf, "OUT"); 1017 // send preface bytes and SettingsFrame together 1018 HttpPublisher publisher = publisher(); 1019 publisher.enqueueUnordered(List.of(buf)); 1020 publisher.signalEnqueued(); 1021 // mark preface sent. 1022 framesController.markPrefaceSent(); 1023 Log.logTrace("PREFACE_BYTES sent"); 1024 Log.logTrace("Settings Frame sent"); 1025 1026 // send a Window update for the receive buffer we are using 1027 // minus the initial 64 K -1 specified in protocol: 1028 // RFC 7540, Section 6.9.2: 1029 // "[...] the connection flow-control window is set to the default 1030 // initial window size until a WINDOW_UPDATE frame is received." 1031 // 1032 // Note that the default initial window size, not to be confused 1033 // with the initial window size, is defined by RFC 7540 as 1034 // 64K -1. 1035 final int len = windowUpdater.initialWindowSize - DEFAULT_INITIAL_WINDOW_SIZE; 1036 if (len != 0) { 1037 if (Log.channel()) { 1038 Log.logChannel("Sending initial connection window update frame: {0} ({1} - {2})", 1039 len, windowUpdater.initialWindowSize, DEFAULT_INITIAL_WINDOW_SIZE); 1040 } 1041 windowUpdater.sendWindowUpdate(len); 1042 } 1043 // there will be an ACK to the windows update - which should 1044 // cause any pending data stored before the preface was sent to be 1045 // flushed (see PrefaceController). 1046 Log.logTrace("finished sending connection preface"); 1047 if (debug.on()) 1048 debug.log("Triggering processing of buffered data" 1049 + " after sending connection preface"); 1050 subscriber.onNext(List.of(EMPTY_TRIGGER)); 1051 } 1052 1053 /** 1054 * Returns an existing Stream with given id, or null if doesn't exist 1055 */ 1056 @SuppressWarnings("unchecked") 1057 <T> Stream<T> getStream(int streamid) { 1058 return (Stream<T>)streams.get(streamid); 1059 } 1060 1061 /** 1062 * Creates Stream with given id. 1063 */ 1064 final <T> Stream<T> createStream(Exchange<T> exchange) { 1065 Stream<T> stream = new Stream<>(this, exchange, windowController); 1066 return stream; 1067 } 1068 1069 <T> Stream.PushedStream<T> createPushStream(Stream<T> parent, Exchange<T> pushEx) { 1070 PushGroup<T> pg = parent.exchange.getPushGroup(); 1071 return new Stream.PushedStream<>(pg, this, pushEx); 1072 } 1073 1074 <T> void putStream(Stream<T> stream, int streamid) { 1075 // increment the reference count on the HttpClientImpl 1076 // to prevent the SelectorManager thread from exiting until 1077 // the stream is closed. 1078 client().streamReference(); 1079 streams.put(streamid, stream); 1080 } 1081 1082 /** 1083 * Encode the headers into a List<ByteBuffer> and then create HEADERS 1084 * and CONTINUATION frames from the list and return the List<Http2Frame>. 1085 */ 1086 private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) { 1087 List<ByteBuffer> buffers = encodeHeadersImpl( 1088 getMaxSendFrameSize(), 1089 frame.getAttachment().getRequestPseudoHeaders(), 1090 frame.getUserHeaders(), 1091 frame.getSystemHeaders()); 1092 1093 List<HeaderFrame> frames = new ArrayList<>(buffers.size()); 1094 Iterator<ByteBuffer> bufIterator = buffers.iterator(); 1095 HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next()); 1096 frames.add(oframe); 1097 while(bufIterator.hasNext()) { 1098 oframe = new ContinuationFrame(frame.streamid(), bufIterator.next()); 1099 frames.add(oframe); 1100 } 1101 oframe.setFlag(HeaderFrame.END_HEADERS); 1102 return frames; 1103 } 1104 1105 // Dedicated cache for headers encoding ByteBuffer. 1106 // There can be no concurrent access to this buffer as all access to this buffer 1107 // and its content happen within a single critical code block section protected 1108 // by the sendLock. / (see sendFrame()) 1109 // private final ByteBufferPool headerEncodingPool = new ByteBufferPool(); 1110 1111 private ByteBuffer getHeaderBuffer(int maxFrameSize) { 1112 ByteBuffer buf = ByteBuffer.allocate(maxFrameSize); 1113 buf.limit(maxFrameSize); 1114 return buf; 1115 } 1116 1117 /* 1118 * Encodes all the headers from the given HttpHeaders into the given List 1119 * of buffers. 1120 * 1121 * From https://tools.ietf.org/html/rfc7540#section-8.1.2 : 1122 * 1123 * ...Just as in HTTP/1.x, header field names are strings of ASCII 1124 * characters that are compared in a case-insensitive fashion. However, 1125 * header field names MUST be converted to lowercase prior to their 1126 * encoding in HTTP/2... 1127 */ 1128 private List<ByteBuffer> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) { 1129 ByteBuffer buffer = getHeaderBuffer(maxFrameSize); 1130 List<ByteBuffer> buffers = new ArrayList<>(); 1131 for(HttpHeaders header : headers) { 1132 for (Map.Entry<String, List<String>> e : header.map().entrySet()) { 1133 String lKey = e.getKey().toLowerCase(Locale.US); 1134 List<String> values = e.getValue(); 1135 for (String value : values) { 1136 hpackOut.header(lKey, value); 1137 while (!hpackOut.encode(buffer)) { 1138 buffer.flip(); 1139 buffers.add(buffer); 1140 buffer = getHeaderBuffer(maxFrameSize); 1141 } 1142 } 1143 } 1144 } 1145 buffer.flip(); 1146 buffers.add(buffer); 1147 return buffers; 1148 } 1149 1150 private List<ByteBuffer> encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) { 1151 oh.streamid(stream.streamid); 1152 if (Log.headers()) { 1153 StringBuilder sb = new StringBuilder("HEADERS FRAME (stream="); 1154 sb.append(stream.streamid).append(")\n"); 1155 Log.dumpHeaders(sb, " ", oh.getAttachment().getRequestPseudoHeaders()); 1156 Log.dumpHeaders(sb, " ", oh.getSystemHeaders()); 1157 Log.dumpHeaders(sb, " ", oh.getUserHeaders()); 1158 Log.logHeaders(sb.toString()); 1159 } 1160 List<HeaderFrame> frames = encodeHeaders(oh); 1161 return encodeFrames(frames); 1162 } 1163 1164 private List<ByteBuffer> encodeFrames(List<HeaderFrame> frames) { 1165 if (Log.frames()) { 1166 frames.forEach(f -> Log.logFrames(f, "OUT")); 1167 } 1168 return framesEncoder.encodeFrames(frames); 1169 } 1170 1171 private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) { 1172 Stream<?> stream = oh.getAttachment(); 1173 assert stream.streamid == 0; 1174 int streamid = nextstreamid; 1175 nextstreamid += 2; 1176 stream.registerStream(streamid); 1177 // set outgoing window here. This allows thread sending 1178 // body to proceed. 1179 windowController.registerStream(streamid, getInitialSendWindowSize()); 1180 return stream; 1181 } 1182 1183 private final Object sendlock = new Object(); 1184 1185 void sendFrame(Http2Frame frame) { 1186 try { 1187 HttpPublisher publisher = publisher(); 1188 synchronized (sendlock) { 1189 if (frame instanceof OutgoingHeaders) { 1190 @SuppressWarnings("unchecked") 1191 OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame; 1192 Stream<?> stream = registerNewStream(oh); 1193 // provide protection from inserting unordered frames between Headers and Continuation 1194 publisher.enqueue(encodeHeaders(oh, stream)); 1195 } else { 1196 publisher.enqueue(encodeFrame(frame)); 1197 } 1198 } 1199 publisher.signalEnqueued(); 1200 } catch (IOException e) { 1201 if (!closed) { 1202 Log.logError(e); 1203 shutdown(e); 1204 } 1205 } 1206 } 1207 1208 private List<ByteBuffer> encodeFrame(Http2Frame frame) { 1209 Log.logFrames(frame, "OUT"); 1210 return framesEncoder.encodeFrame(frame); 1211 } 1212 1213 void sendDataFrame(DataFrame frame) { 1214 try { 1215 HttpPublisher publisher = publisher(); 1216 publisher.enqueue(encodeFrame(frame)); 1217 publisher.signalEnqueued(); 1218 } catch (IOException e) { 1219 if (!closed) { 1220 Log.logError(e); 1221 shutdown(e); 1222 } 1223 } 1224 } 1225 1226 /* 1227 * Direct call of the method bypasses synchronization on "sendlock" and 1228 * allowed only of control frames: WindowUpdateFrame, PingFrame and etc. 1229 * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame. 1230 */ 1231 void sendUnorderedFrame(Http2Frame frame) { 1232 try { 1233 HttpPublisher publisher = publisher(); 1234 publisher.enqueueUnordered(encodeFrame(frame)); 1235 publisher.signalEnqueued(); 1236 } catch (IOException e) { 1237 if (!closed) { 1238 Log.logError(e); 1239 shutdown(e); 1240 } 1241 } 1242 } 1243 1244 /** 1245 * A simple tube subscriber for reading from the connection flow. 1246 */ 1247 final class Http2TubeSubscriber implements TubeSubscriber { 1248 private volatile Flow.Subscription subscription; 1249 private volatile boolean completed; 1250 private volatile boolean dropped; 1251 private volatile Throwable error; 1252 private final ConcurrentLinkedQueue<ByteBuffer> queue 1253 = new ConcurrentLinkedQueue<>(); 1254 private final SequentialScheduler scheduler = 1255 SequentialScheduler.synchronizedScheduler(this::processQueue); 1256 private final HttpClientImpl client; 1257 1258 Http2TubeSubscriber(HttpClientImpl client) { 1259 this.client = Objects.requireNonNull(client); 1260 } 1261 1262 final void processQueue() { 1263 try { 1264 while (!queue.isEmpty() && !scheduler.isStopped()) { 1265 ByteBuffer buffer = queue.poll(); 1266 if (debug.on()) 1267 debug.log("sending %d to Http2Connection.asyncReceive", 1268 buffer.remaining()); 1269 asyncReceive(buffer); 1270 } 1271 } catch (Throwable t) { 1272 Throwable x = error; 1273 if (x == null) error = t; 1274 } finally { 1275 Throwable x = error; 1276 if (x != null) { 1277 if (debug.on()) debug.log("Stopping scheduler", x); 1278 scheduler.stop(); 1279 Http2Connection.this.shutdown(x); 1280 } 1281 } 1282 } 1283 1284 private final void runOrSchedule() { 1285 if (client.isSelectorThread()) { 1286 scheduler.runOrSchedule(client.theExecutor()); 1287 } else scheduler.runOrSchedule(); 1288 } 1289 1290 @Override 1291 public void onSubscribe(Flow.Subscription subscription) { 1292 // supports being called multiple time. 1293 // doesn't cancel the previous subscription, since that is 1294 // most probably the same as the new subscription. 1295 assert this.subscription == null || dropped == false; 1296 this.subscription = subscription; 1297 dropped = false; 1298 // TODO FIXME: request(1) should be done by the delegate. 1299 if (!completed) { 1300 if (debug.on()) 1301 debug.log("onSubscribe: requesting Long.MAX_VALUE for reading"); 1302 subscription.request(Long.MAX_VALUE); 1303 } else { 1304 if (debug.on()) debug.log("onSubscribe: already completed"); 1305 } 1306 } 1307 1308 @Override 1309 public void onNext(List<ByteBuffer> item) { 1310 if (debug.on()) debug.log(() -> "onNext: got " + Utils.remaining(item) 1311 + " bytes in " + item.size() + " buffers"); 1312 queue.addAll(item); 1313 runOrSchedule(); 1314 } 1315 1316 @Override 1317 public void onError(Throwable throwable) { 1318 if (debug.on()) debug.log(() -> "onError: " + throwable); 1319 error = throwable; 1320 completed = true; 1321 runOrSchedule(); 1322 } 1323 1324 @Override 1325 public void onComplete() { 1326 String msg = isActive() 1327 ? "EOF reached while reading" 1328 : "Idle connection closed by HTTP/2 peer"; 1329 if (debug.on()) debug.log(msg); 1330 error = new EOFException(msg); 1331 completed = true; 1332 runOrSchedule(); 1333 } 1334 1335 @Override 1336 public void dropSubscription() { 1337 if (debug.on()) debug.log("dropSubscription"); 1338 // we could probably set subscription to null here... 1339 // then we might not need the 'dropped' boolean? 1340 dropped = true; 1341 } 1342 } 1343 1344 synchronized boolean isActive() { 1345 return numReservedClientStreams > 0 || numReservedServerStreams > 0; 1346 } 1347 1348 @Override 1349 public final String toString() { 1350 return dbgString(); 1351 } 1352 1353 final String dbgString() { 1354 return "Http2Connection(" 1355 + connection.getConnectionFlow() + ")"; 1356 } 1357 1358 static class HeaderDecoder extends ValidatingHeadersConsumer { 1359 1360 HttpHeadersBuilder headersBuilder; 1361 1362 HeaderDecoder() { 1363 this.headersBuilder = new HttpHeadersBuilder(); 1364 } 1365 1366 @Override 1367 public void onDecoded(CharSequence name, CharSequence value) { 1368 String n = name.toString(); 1369 String v = value.toString(); 1370 super.onDecoded(n, v); 1371 headersBuilder.addHeader(n, v); 1372 } 1373 1374 HttpHeaders headers() { 1375 return headersBuilder.build(); 1376 } 1377 } 1378 1379 /* 1380 * Checks RFC 7540 rules (relaxed) compliance regarding pseudo-headers. 1381 */ 1382 static class ValidatingHeadersConsumer implements DecodingCallback { 1383 1384 private static final Set<String> PSEUDO_HEADERS = 1385 Set.of(":authority", ":method", ":path", ":scheme", ":status"); 1386 1387 /** Used to check that if there are pseudo-headers, they go first */ 1388 private boolean pseudoHeadersEnded; 1389 1390 /** 1391 * Called when END_HEADERS was received. This consumer may be invoked 1392 * again after reset() is called, but for a whole new set of headers. 1393 */ 1394 void reset() { 1395 pseudoHeadersEnded = false; 1396 } 1397 1398 @Override 1399 public void onDecoded(CharSequence name, CharSequence value) 1400 throws UncheckedIOException 1401 { 1402 String n = name.toString(); 1403 if (n.startsWith(":")) { 1404 if (pseudoHeadersEnded) { 1405 throw newException("Unexpected pseudo-header '%s'", n); 1406 } else if (!PSEUDO_HEADERS.contains(n)) { 1407 throw newException("Unknown pseudo-header '%s'", n); 1408 } 1409 } else { 1410 pseudoHeadersEnded = true; 1411 if (!Utils.isValidName(n)) { 1412 throw newException("Bad header name '%s'", n); 1413 } 1414 } 1415 String v = value.toString(); 1416 if (!Utils.isValidValue(v)) { 1417 throw newException("Bad header value '%s'", v); 1418 } 1419 } 1420 1421 private UncheckedIOException newException(String message, String header) 1422 { 1423 return new UncheckedIOException( 1424 new IOException(String.format(message, header))); 1425 } 1426 } 1427 1428 static final class ConnectionWindowUpdateSender extends WindowUpdateSender { 1429 1430 final int initialWindowSize; 1431 public ConnectionWindowUpdateSender(Http2Connection connection, 1432 int initialWindowSize) { 1433 super(connection, initialWindowSize); 1434 this.initialWindowSize = initialWindowSize; 1435 } 1436 1437 @Override 1438 int getStreamId() { 1439 return 0; 1440 } 1441 } 1442 1443 /** 1444 * Thrown when https handshake negotiates http/1.1 alpn instead of h2 1445 */ 1446 static final class ALPNException extends IOException { 1447 private static final long serialVersionUID = 0L; 1448 final transient AbstractAsyncSSLConnection connection; 1449 1450 ALPNException(String msg, AbstractAsyncSSLConnection connection) { 1451 super(msg); 1452 this.connection = connection; 1453 } 1454 1455 AbstractAsyncSSLConnection getConnection() { 1456 return connection; 1457 } 1458 } 1459 }