1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.net.InetSocketAddress; 30 import java.net.URI; 31 import jdk.incubator.http.HttpConnection.Mode; 32 import java.nio.ByteBuffer; 33 import java.nio.charset.StandardCharsets; 34 import java.util.HashMap; 35 import java.util.Iterator; 36 import java.util.LinkedList; 37 import java.util.List; 38 import java.util.Map; 39 import java.util.concurrent.CompletableFuture; 40 import java.util.ArrayList; 41 import java.util.Collections; 42 import java.util.Formatter; 43 import java.util.concurrent.ConcurrentHashMap; 44 import java.util.concurrent.CountDownLatch; 45 import java.util.stream.Collectors; 46 import javax.net.ssl.SSLEngine; 47 import jdk.incubator.http.internal.common.*; 48 import jdk.incubator.http.internal.frame.*; 49 import jdk.incubator.http.internal.hpack.Encoder; 50 import jdk.incubator.http.internal.hpack.Decoder; 51 import jdk.incubator.http.internal.hpack.DecodingCallback; 52 53 import static jdk.incubator.http.internal.frame.SettingsFrame.*; 54 55 56 /** 57 * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used 58 * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff. 59 * 60 * Http2Connections belong to a Http2ClientImpl, (one of) which belongs 61 * to a HttpClientImpl. 62 * 63 * Creation cases: 64 * 1) upgraded HTTP/1.1 plain tcp connection 65 * 2) prior knowledge directly created plain tcp connection 66 * 3) directly created HTTP/2 SSL connection which uses ALPN. 67 * 68 * Sending is done by writing directly to underlying HttpConnection object which 69 * is operating in async mode. No flow control applies on output at this level 70 * and all writes are just executed as puts to an output Q belonging to HttpConnection 71 * Flow control is implemented by HTTP/2 protocol itself. 72 * 73 * Hpack header compression 74 * and outgoing stream creation is also done here, because these operations 75 * must be synchronized at the socket level. Stream objects send frames simply 76 * by placing them on the connection's output Queue. sendFrame() is called 77 * from a higher level (Stream) thread. 78 * 79 * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles 80 * incoming Http2Frames, and directs them to the appropriate Stream.incoming() 81 * or handles them directly itself. This thread performs hpack decompression 82 * and incoming stream creation (Server push). Incoming frames destined for a 83 * stream are provided by calling Stream.incoming(). 84 */ 85 class Http2Connection { 86 /* 87 * ByteBuffer pooling strategy for HTTP/2 protocol: 88 * 89 * In general there are 4 points where ByteBuffers are used: 90 * - incoming/outgoing frames from/to ByteBufers plus incoming/outgoing encrypted data 91 * in case of SSL connection. 92 * 93 * 1. Outgoing frames encoded to ByteBuffers. 94 * Outgoing ByteBuffers are created with requited size and frequently small (except DataFrames, etc) 95 * At this place no pools at all. All outgoing buffers should be collected by GC. 96 * 97 * 2. Incoming ByteBuffers (decoded to frames). 98 * Here, total elimination of BB pool is not a good idea. 99 * We don't know how many bytes we will receive through network. 100 * So here we allocate buffer of reasonable size. The following life of the BB: 101 * - If all frames decoded from the BB are other than DataFrame and HeaderFrame (and HeaderFrame subclasses) 102 * BB is returned to pool, 103 * - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer obtained by slice() method. 104 * Such BB is never returned to pool and will be GCed. 105 * - If we decoded HeadersFrame from the BB. Then header decoding is performed inside processFrame method and 106 * the buffer could be release to pool. 107 * 108 * 3. SLL encrypted buffers. Here another pool was introduced and all net buffers are to/from the pool, 109 * because of we can't predict size encrypted packets. 110 * 111 */ 112 113 114 // A small class that allows to control frames with respect to the state of 115 // the connection preface. Any data received before the connection 116 // preface is sent will be buffered. 117 private final class FramesController { 118 volatile boolean prefaceSent; 119 volatile List<ByteBufferReference> pending; 120 121 boolean processReceivedData(FramesDecoder decoder, ByteBufferReference buf) 122 throws IOException 123 { 124 // if preface is not sent, buffers data in the pending list 125 if (!prefaceSent) { 126 synchronized (this) { 127 if (!prefaceSent) { 128 if (pending == null) pending = new ArrayList<>(); 129 pending.add(buf); 130 return false; 131 } 132 } 133 } 134 135 // Preface is sent. Checks for pending data and flush it. 136 // We rely on this method being called from within the readlock, 137 // so we know that no other thread could execute this method 138 // concurrently while we're here. 139 // This ensures that later incoming buffers will not 140 // be processed before we have flushed the pending queue. 141 // No additional synchronization is therefore necessary here. 142 List<ByteBufferReference> pending = this.pending; 143 this.pending = null; 144 if (pending != null) { 145 // flush pending data 146 for (ByteBufferReference b : pending) { 147 decoder.decode(b); 148 } 149 } 150 151 // push the received buffer to the frames decoder. 152 decoder.decode(buf); 153 return true; 154 } 155 156 // Mark that the connection preface is sent 157 void markPrefaceSent() { 158 assert !prefaceSent; 159 synchronized (this) { 160 prefaceSent = true; 161 } 162 } 163 164 } 165 166 volatile boolean closed; 167 168 //------------------------------------- 169 final HttpConnection connection; 170 private final HttpClientImpl client; 171 private final Http2ClientImpl client2; 172 private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>(); 173 private int nextstreamid; 174 private int nextPushStream = 2; 175 private final Encoder hpackOut; 176 private final Decoder hpackIn; 177 final SettingsFrame clientSettings; 178 private volatile SettingsFrame serverSettings; 179 private final String key; // for HttpClientImpl.connections map 180 private final FramesDecoder framesDecoder; 181 private final FramesEncoder framesEncoder = new FramesEncoder(); 182 183 /** 184 * Send Window controller for both connection and stream windows. 185 * Each of this connection's Streams MUST use this controller. 186 */ 187 private final WindowController windowController = new WindowController(); 188 private final FramesController framesController = new FramesController(); 189 final WindowUpdateSender windowUpdater; 190 191 static final int DEFAULT_FRAME_SIZE = 16 * 1024; 192 193 194 // TODO: need list of control frames from other threads 195 // that need to be sent 196 197 private Http2Connection(HttpConnection connection, 198 Http2ClientImpl client2, 199 int nextstreamid, 200 String key) { 201 this.connection = connection; 202 this.client = client2.client(); 203 this.client2 = client2; 204 this.nextstreamid = nextstreamid; 205 this.key = key; 206 this.clientSettings = this.client2.getClientSettings(); 207 this.framesDecoder = new FramesDecoder(this::processFrame, clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE)); 208 // serverSettings will be updated by server 209 this.serverSettings = SettingsFrame.getDefaultSettings(); 210 this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); 211 this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); 212 this.windowUpdater = new ConnectionWindowUpdateSender(this, client.getReceiveBufferSize()); 213 } 214 215 /** 216 * Case 1) Create from upgraded HTTP/1.1 connection. 217 * Is ready to use. Will not be SSL. exchange is the Exchange 218 * that initiated the connection, whose response will be delivered 219 * on a Stream. 220 */ 221 Http2Connection(HttpConnection connection, 222 Http2ClientImpl client2, 223 Exchange<?> exchange, 224 ByteBuffer initial) 225 throws IOException, InterruptedException 226 { 227 this(connection, 228 client2, 229 3, // stream 1 is registered during the upgrade 230 keyFor(connection)); 231 assert !(connection instanceof SSLConnection); 232 Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); 233 234 Stream<?> initialStream = createStream(exchange); 235 initialStream.registerStream(1); 236 windowController.registerStream(1, getInitialSendWindowSize()); 237 initialStream.requestSent(); 238 sendConnectionPreface(); 239 // start reading and writing 240 // start reading 241 AsyncConnection asyncConn = (AsyncConnection)connection; 242 asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer); 243 connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility. 244 asyncReceive(ByteBufferReference.of(initial)); 245 asyncConn.startReading(); 246 } 247 248 // async style but completes immediately 249 static CompletableFuture<Http2Connection> createAsync(HttpConnection connection, 250 Http2ClientImpl client2, 251 Exchange<?> exchange, 252 ByteBuffer initial) { 253 return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial)); 254 } 255 256 /** 257 * Cases 2) 3) 258 * 259 * request is request to be sent. 260 */ 261 Http2Connection(HttpRequestImpl request, Http2ClientImpl h2client) 262 throws IOException, InterruptedException 263 { 264 this(HttpConnection.getConnection(request.getAddress(h2client.client()), h2client.client(), request, true), 265 h2client, 266 1, 267 keyFor(request.uri(), request.proxy(h2client.client()))); 268 Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); 269 270 // start reading 271 AsyncConnection asyncConn = (AsyncConnection)connection; 272 asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer); 273 connection.connect(); 274 checkSSLConfig(); 275 // safe to resume async reading now. 276 asyncConn.enableCallback(); 277 sendConnectionPreface(); 278 } 279 280 /** 281 * Throws an IOException if h2 was not negotiated 282 */ 283 private void checkSSLConfig() throws IOException { 284 AbstractAsyncSSLConnection aconn = (AbstractAsyncSSLConnection)connection; 285 SSLEngine engine = aconn.getEngine(); 286 String alpn = engine.getApplicationProtocol(); 287 if (alpn == null || !alpn.equals("h2")) { 288 String msg; 289 if (alpn == null) { 290 Log.logSSL("ALPN not supported"); 291 msg = "ALPN not supported"; 292 } else switch (alpn) { 293 case "": 294 Log.logSSL("No ALPN returned"); 295 msg = "No ALPN negotiated"; 296 break; 297 case "http/1.1": 298 Log.logSSL("HTTP/1.1 ALPN returned"); 299 msg = "HTTP/1.1 ALPN returned"; 300 break; 301 default: 302 Log.logSSL("unknown ALPN returned"); 303 msg = "Unexpected ALPN: " + alpn; 304 throw new IOException(msg); 305 } 306 throw new ALPNException(msg, aconn); 307 } 308 } 309 310 static String keyFor(HttpConnection connection) { 311 boolean isProxy = connection.isProxied(); 312 boolean isSecure = connection.isSecure(); 313 InetSocketAddress addr = connection.address(); 314 315 return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort()); 316 } 317 318 static String keyFor(URI uri, InetSocketAddress proxy) { 319 boolean isSecure = uri.getScheme().equalsIgnoreCase("https"); 320 boolean isProxy = proxy != null; 321 322 String host; 323 int port; 324 325 if (isProxy) { 326 host = proxy.getHostString(); 327 port = proxy.getPort(); 328 } else { 329 host = uri.getHost(); 330 port = uri.getPort(); 331 } 332 return keyString(isSecure, isProxy, host, port); 333 } 334 335 // {C,S}:{H:P}:host:port 336 // C indicates clear text connection "http" 337 // S indicates secure "https" 338 // H indicates host (direct) connection 339 // P indicates proxy 340 // Eg: "S:H:foo.com:80" 341 static String keyString(boolean secure, boolean proxy, String host, int port) { 342 return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port; 343 } 344 345 String key() { 346 return this.key; 347 } 348 349 void putConnection() { 350 client2.putConnection(this); 351 } 352 353 private static String toHexdump1(ByteBuffer bb) { 354 bb.mark(); 355 StringBuilder sb = new StringBuilder(512); 356 Formatter f = new Formatter(sb); 357 358 while (bb.hasRemaining()) { 359 int i = Byte.toUnsignedInt(bb.get()); 360 f.format("%02x:", i); 361 } 362 sb.deleteCharAt(sb.length()-1); 363 bb.reset(); 364 return sb.toString(); 365 } 366 367 private static String toHexdump(ByteBuffer bb) { 368 List<String> words = new ArrayList<>(); 369 int i = 0; 370 bb.mark(); 371 while (bb.hasRemaining()) { 372 if (i % 2 == 0) { 373 words.add(""); 374 } 375 byte b = bb.get(); 376 String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1); 377 words.set(i / 2, words.get(i / 2) + hex); 378 i++; 379 } 380 bb.reset(); 381 return words.stream().collect(Collectors.joining(" ")); 382 } 383 384 private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) { 385 boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS); 386 387 ByteBufferReference[] buffers = frame.getHeaderBlock(); 388 for (int i = 0; i < buffers.length; i++) { 389 hpackIn.decode(buffers[i].get(), endOfHeaders && (i == buffers.length - 1), decoder); 390 } 391 } 392 393 int getInitialSendWindowSize() { 394 return serverSettings.getParameter(INITIAL_WINDOW_SIZE); 395 } 396 397 void close() { 398 GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes()); 399 // TODO: set last stream. For now zero ok. 400 sendFrame(f); 401 } 402 403 private ByteBufferPool readBufferPool = new ByteBufferPool(); 404 405 // provides buffer to read data (default size) 406 public ByteBufferReference getReadBuffer() { 407 return readBufferPool.get(getMaxReceiveFrameSize() + Http2Frame.FRAME_HEADER_SIZE); 408 } 409 410 private final Object readlock = new Object(); 411 412 public void asyncReceive(ByteBufferReference buffer) { 413 // We don't need to read anything and 414 // we don't want to send anything back to the server 415 // until the connection preface has been sent. 416 // Therefore we're going to wait if needed before reading 417 // (and thus replying) to anything. 418 // Starting to reply to something (e.g send an ACK to a 419 // SettingsFrame sent by the server) before the connection 420 // preface is fully sent might result in the server 421 // sending a GOAWAY frame with 'invalid_preface'. 422 synchronized (readlock) { 423 try { 424 // the readlock ensures that the order of incoming buffers 425 // is preserved. 426 framesController.processReceivedData(framesDecoder, buffer); 427 } catch (Throwable e) { 428 String msg = Utils.stackTrace(e); 429 Log.logTrace(msg); 430 shutdown(e); 431 } 432 } 433 } 434 435 436 void shutdown(Throwable t) { 437 Log.logError(t); 438 closed = true; 439 client2.deleteConnection(this); 440 List<Stream<?>> c = new LinkedList<>(streams.values()); 441 for (Stream<?> s : c) { 442 s.cancelImpl(t); 443 } 444 connection.close(); 445 } 446 447 /** 448 * Handles stream 0 (common) frames that apply to whole connection and passes 449 * other stream specific frames to that Stream object. 450 * 451 * Invokes Stream.incoming() which is expected to process frame without 452 * blocking. 453 */ 454 void processFrame(Http2Frame frame) throws IOException { 455 Log.logFrames(frame, "IN"); 456 int streamid = frame.streamid(); 457 if (frame instanceof MalformedFrame) { 458 Log.logError(((MalformedFrame) frame).getMessage()); 459 if (streamid == 0) { 460 protocolError(((MalformedFrame) frame).getErrorCode()); 461 } else { 462 resetStream(streamid, ((MalformedFrame) frame).getErrorCode()); 463 } 464 return; 465 } 466 if (streamid == 0) { 467 handleConnectionFrame(frame); 468 } else { 469 if (frame instanceof SettingsFrame) { 470 // The stream identifier for a SETTINGS frame MUST be zero 471 protocolError(GoAwayFrame.PROTOCOL_ERROR); 472 return; 473 } 474 475 Stream<?> stream = getStream(streamid); 476 if (stream == null) { 477 // Should never receive a frame with unknown stream id 478 479 // To avoid looping, an endpoint MUST NOT send a RST_STREAM in 480 // response to a RST_STREAM frame. 481 if (!(frame instanceof ResetFrame)) { 482 resetStream(streamid, ResetFrame.PROTOCOL_ERROR); 483 } 484 return; 485 } 486 if (frame instanceof PushPromiseFrame) { 487 PushPromiseFrame pp = (PushPromiseFrame)frame; 488 handlePushPromise(stream, pp); 489 } else if (frame instanceof HeaderFrame) { 490 // decode headers (or continuation) 491 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer()); 492 stream.incoming(frame); 493 } else { 494 stream.incoming(frame); 495 } 496 } 497 } 498 499 private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp) 500 throws IOException 501 { 502 HttpRequestImpl parentReq = parent.request; 503 int promisedStreamid = pp.getPromisedStream(); 504 if (promisedStreamid != nextPushStream) { 505 resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR); 506 return; 507 } else { 508 nextPushStream += 2; 509 } 510 HeaderDecoder decoder = new HeaderDecoder(); 511 decodeHeaders(pp, decoder); 512 HttpHeadersImpl headers = decoder.headers(); 513 HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers); 514 Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi); 515 Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch); 516 pushExch.exchImpl = pushStream; 517 pushStream.registerStream(promisedStreamid); 518 parent.incoming_pushPromise(pushReq, pushStream); 519 } 520 521 private void handleConnectionFrame(Http2Frame frame) 522 throws IOException 523 { 524 switch (frame.type()) { 525 case SettingsFrame.TYPE: 526 handleSettings((SettingsFrame)frame); 527 break; 528 case PingFrame.TYPE: 529 handlePing((PingFrame)frame); 530 break; 531 case GoAwayFrame.TYPE: 532 handleGoAway((GoAwayFrame)frame); 533 break; 534 case WindowUpdateFrame.TYPE: 535 handleWindowUpdate((WindowUpdateFrame)frame); 536 break; 537 default: 538 protocolError(ErrorFrame.PROTOCOL_ERROR); 539 } 540 } 541 542 void resetStream(int streamid, int code) throws IOException { 543 Log.logError( 544 "Resetting stream {0,number,integer} with error code {1,number,integer}", 545 streamid, code); 546 ResetFrame frame = new ResetFrame(streamid, code); 547 sendFrame(frame); 548 closeStream(streamid); 549 } 550 551 void closeStream(int streamid) { 552 Stream<?> s = streams.remove(streamid); 553 // ## Remove s != null. It is a hack for delayed cancellation,reset 554 if (s != null && !(s instanceof Stream.PushedStream)) { 555 // Since PushStreams have no request body, then they have no 556 // corresponding entry in the window controller. 557 windowController.removeStream(streamid); 558 } 559 } 560 /** 561 * Increments this connection's send Window by the amount in the given frame. 562 */ 563 private void handleWindowUpdate(WindowUpdateFrame f) 564 throws IOException 565 { 566 int amount = f.getUpdate(); 567 if (amount <= 0) { 568 // ## temporarily disable to workaround a bug in Jetty where it 569 // ## sends Window updates with a 0 update value. 570 //protocolError(ErrorFrame.PROTOCOL_ERROR); 571 } else { 572 boolean success = windowController.increaseConnectionWindow(amount); 573 if (!success) { 574 protocolError(ErrorFrame.FLOW_CONTROL_ERROR); // overflow 575 } 576 } 577 } 578 579 private void protocolError(int errorCode) 580 throws IOException 581 { 582 GoAwayFrame frame = new GoAwayFrame(0, errorCode); 583 sendFrame(frame); 584 shutdown(new IOException("protocol error")); 585 } 586 587 private void handleSettings(SettingsFrame frame) 588 throws IOException 589 { 590 assert frame.streamid() == 0; 591 if (!frame.getFlag(SettingsFrame.ACK)) { 592 int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE); 593 int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE); 594 int diff = newWindowSize - oldWindowSize; 595 if (diff != 0) { 596 windowController.adjustActiveStreams(diff); 597 } 598 serverSettings = frame; 599 sendFrame(new SettingsFrame(SettingsFrame.ACK)); 600 } 601 } 602 603 private void handlePing(PingFrame frame) 604 throws IOException 605 { 606 frame.setFlag(PingFrame.ACK); 607 sendUnorderedFrame(frame); 608 } 609 610 private void handleGoAway(GoAwayFrame frame) 611 throws IOException 612 { 613 shutdown(new IOException( 614 String.valueOf(connection.channel().getLocalAddress()) 615 +": GOAWAY received")); 616 } 617 618 /** 619 * Max frame size we are allowed to send 620 */ 621 public int getMaxSendFrameSize() { 622 int param = serverSettings.getParameter(MAX_FRAME_SIZE); 623 if (param == -1) { 624 param = DEFAULT_FRAME_SIZE; 625 } 626 return param; 627 } 628 629 /** 630 * Max frame size we will receive 631 */ 632 public int getMaxReceiveFrameSize() { 633 return clientSettings.getParameter(MAX_FRAME_SIZE); 634 } 635 636 // Not sure how useful this is. 637 public int getMaxHeadersSize() { 638 return serverSettings.getParameter(MAX_HEADER_LIST_SIZE); 639 } 640 641 private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; 642 643 private static final byte[] PREFACE_BYTES = 644 CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1); 645 646 /** 647 * Sends Connection preface and Settings frame with current preferred 648 * values 649 */ 650 private void sendConnectionPreface() throws IOException { 651 Log.logTrace("{0}: start sending connection preface to {1}", 652 connection.channel().getLocalAddress(), 653 connection.address()); 654 SettingsFrame sf = client2.getClientSettings(); 655 ByteBufferReference ref = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf); 656 Log.logFrames(sf, "OUT"); 657 // send preface bytes and SettingsFrame together 658 connection.write(ref.get()); 659 // mark preface sent. 660 framesController.markPrefaceSent(); 661 Log.logTrace("PREFACE_BYTES sent"); 662 Log.logTrace("Settings Frame sent"); 663 664 // send a Window update for the receive buffer we are using 665 // minus the initial 64 K specified in protocol 666 final int len = client2.client().getReceiveBufferSize() - (64 * 1024 - 1); 667 windowUpdater.sendWindowUpdate(len); 668 // there will be an ACK to the windows update - which should 669 // cause any pending data stored before the preface was sent to be 670 // flushed (see PrefaceController). 671 Log.logTrace("finished sending connection preface"); 672 } 673 674 /** 675 * Returns an existing Stream with given id, or null if doesn't exist 676 */ 677 @SuppressWarnings("unchecked") 678 <T> Stream<T> getStream(int streamid) { 679 return (Stream<T>)streams.get(streamid); 680 } 681 682 /** 683 * Creates Stream with given id. 684 */ 685 <T> Stream<T> createStream(Exchange<T> exchange) { 686 Stream<T> stream = new Stream<>(client, this, exchange, windowController); 687 return stream; 688 } 689 690 <T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) { 691 PushGroup<?,T> pg = parent.exchange.getPushGroup(); 692 return new Stream.PushedStream<>(pg, client, this, parent, pushEx); 693 } 694 695 <T> void putStream(Stream<T> stream, int streamid) { 696 streams.put(streamid, stream); 697 } 698 699 void deleteStream(int streamid) { 700 streams.remove(streamid); 701 windowController.removeStream(streamid); 702 } 703 704 /** 705 * Encode the headers into a List<ByteBuffer> and then create HEADERS 706 * and CONTINUATION frames from the list and return the List<Http2Frame>. 707 */ 708 private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) { 709 List<ByteBufferReference> buffers = encodeHeadersImpl( 710 getMaxSendFrameSize(), 711 frame.getAttachment().getRequestPseudoHeaders(), 712 frame.getUserHeaders(), 713 frame.getSystemHeaders()); 714 715 List<HeaderFrame> frames = new ArrayList<>(buffers.size()); 716 Iterator<ByteBufferReference> bufIterator = buffers.iterator(); 717 HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next()); 718 frames.add(oframe); 719 while(bufIterator.hasNext()) { 720 oframe = new ContinuationFrame(frame.streamid(), bufIterator.next()); 721 frames.add(oframe); 722 } 723 oframe.setFlag(HeaderFrame.END_HEADERS); 724 return frames; 725 } 726 727 // Dedicated cache for headers encoding ByteBuffer. 728 // There can be no concurrent access to this buffer as all access to this buffer 729 // and its content happen within a single critical code block section protected 730 // by the sendLock. / (see sendFrame()) 731 private ByteBufferPool headerEncodingPool = new ByteBufferPool(); 732 733 private ByteBufferReference getHeaderBuffer(int maxFrameSize) { 734 ByteBufferReference ref = headerEncodingPool.get(maxFrameSize); 735 ref.get().limit(maxFrameSize); 736 return ref; 737 } 738 739 /* 740 * Encodes all the headers from the given HttpHeaders into the given List 741 * of buffers. 742 * 743 * From https://tools.ietf.org/html/rfc7540#section-8.1.2 : 744 * 745 * ...Just as in HTTP/1.x, header field names are strings of ASCII 746 * characters that are compared in a case-insensitive fashion. However, 747 * header field names MUST be converted to lowercase prior to their 748 * encoding in HTTP/2... 749 */ 750 private List<ByteBufferReference> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) { 751 ByteBufferReference buffer = getHeaderBuffer(maxFrameSize); 752 List<ByteBufferReference> buffers = new ArrayList<>(); 753 for(HttpHeaders header : headers) { 754 for (Map.Entry<String, List<String>> e : header.map().entrySet()) { 755 String lKey = e.getKey().toLowerCase(); 756 List<String> values = e.getValue(); 757 for (String value : values) { 758 hpackOut.header(lKey, value); 759 while (!hpackOut.encode(buffer.get())) { 760 buffer.get().flip(); 761 buffers.add(buffer); 762 buffer = getHeaderBuffer(maxFrameSize); 763 } 764 } 765 } 766 } 767 buffer.get().flip(); 768 buffers.add(buffer); 769 return buffers; 770 } 771 772 private ByteBufferReference[] encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) { 773 oh.streamid(stream.streamid); 774 if (Log.headers()) { 775 StringBuilder sb = new StringBuilder("HEADERS FRAME (stream="); 776 sb.append(stream.streamid).append(")\n"); 777 Log.dumpHeaders(sb, " ", oh.getAttachment().getRequestPseudoHeaders()); 778 Log.dumpHeaders(sb, " ", oh.getSystemHeaders()); 779 Log.dumpHeaders(sb, " ", oh.getUserHeaders()); 780 Log.logHeaders(sb.toString()); 781 } 782 List<HeaderFrame> frames = encodeHeaders(oh); 783 return encodeFrames(frames); 784 } 785 786 private ByteBufferReference[] encodeFrames(List<HeaderFrame> frames) { 787 if (Log.frames()) { 788 frames.forEach(f -> Log.logFrames(f, "OUT")); 789 } 790 return framesEncoder.encodeFrames(frames); 791 } 792 793 static Throwable getExceptionFrom(CompletableFuture<?> cf) { 794 try { 795 cf.get(); 796 return null; 797 } catch (Throwable e) { 798 if (e.getCause() != null) { 799 return e.getCause(); 800 } else { 801 return e; 802 } 803 } 804 } 805 806 private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) { 807 Stream<?> stream = oh.getAttachment(); 808 int streamid = nextstreamid; 809 nextstreamid += 2; 810 stream.registerStream(streamid); 811 // set outgoing window here. This allows thread sending 812 // body to proceed. 813 windowController.registerStream(streamid, getInitialSendWindowSize()); 814 return stream; 815 } 816 817 private final Object sendlock = new Object(); 818 819 void sendFrame(Http2Frame frame) { 820 try { 821 synchronized (sendlock) { 822 if (frame instanceof OutgoingHeaders) { 823 @SuppressWarnings("unchecked") 824 OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame; 825 Stream<?> stream = registerNewStream(oh); 826 // provide protection from inserting unordered frames between Headers and Continuation 827 connection.writeAsync(encodeHeaders(oh, stream)); 828 } else { 829 connection.writeAsync(encodeFrame(frame)); 830 } 831 } 832 connection.flushAsync(); 833 } catch (IOException e) { 834 if (!closed) { 835 Log.logError(e); 836 shutdown(e); 837 } 838 } 839 } 840 841 private ByteBufferReference[] encodeFrame(Http2Frame frame) { 842 Log.logFrames(frame, "OUT"); 843 return framesEncoder.encodeFrame(frame); 844 } 845 846 void sendDataFrame(DataFrame frame) { 847 try { 848 connection.writeAsync(encodeFrame(frame)); 849 connection.flushAsync(); 850 } catch (IOException e) { 851 if (!closed) { 852 Log.logError(e); 853 shutdown(e); 854 } 855 } 856 } 857 858 /* 859 * Direct call of the method bypasses synchronization on "sendlock" and 860 * allowed only of control frames: WindowUpdateFrame, PingFrame and etc. 861 * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame. 862 */ 863 void sendUnorderedFrame(Http2Frame frame) { 864 try { 865 connection.writeAsyncUnordered(encodeFrame(frame)); 866 connection.flushAsync(); 867 } catch (IOException e) { 868 if (!closed) { 869 Log.logError(e); 870 shutdown(e); 871 } 872 } 873 } 874 875 static class HeaderDecoder implements DecodingCallback { 876 HttpHeadersImpl headers; 877 878 HeaderDecoder() { 879 this.headers = new HttpHeadersImpl(); 880 } 881 882 @Override 883 public void onDecoded(CharSequence name, CharSequence value) { 884 headers.addHeader(name.toString(), value.toString()); 885 } 886 887 HttpHeadersImpl headers() { 888 return headers; 889 } 890 } 891 892 static final class ConnectionWindowUpdateSender extends WindowUpdateSender { 893 894 public ConnectionWindowUpdateSender(Http2Connection connection, 895 int initialWindowSize) { 896 super(connection, initialWindowSize); 897 } 898 899 @Override 900 int getStreamId() { 901 return 0; 902 } 903 } 904 905 /** 906 * Thrown when https handshake negotiates http/1.1 alpn instead of h2 907 */ 908 static final class ALPNException extends IOException { 909 private static final long serialVersionUID = 23138275393635783L; 910 final AbstractAsyncSSLConnection connection; 911 912 ALPNException(String msg, AbstractAsyncSSLConnection connection) { 913 super(msg); 914 this.connection = connection; 915 } 916 917 AbstractAsyncSSLConnection getConnection() { 918 return connection; 919 } 920 } 921 }