8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.net.InetSocketAddress; 30 import java.net.URI; 31 import jdk.incubator.http.HttpConnection.Mode; 32 import java.nio.ByteBuffer; 33 import java.nio.charset.StandardCharsets; 34 import java.util.HashMap; 35 import java.util.Iterator; 36 import java.util.LinkedList; 37 import java.util.List; 38 import java.util.Map; 39 import java.util.concurrent.CompletableFuture; 40 import java.util.ArrayList; 41 import java.util.Collections; 42 import java.util.Formatter; 43 import java.util.concurrent.ConcurrentHashMap; 44 import java.util.concurrent.CountDownLatch; 45 import java.util.stream.Collectors; 46 import javax.net.ssl.SSLEngine; 47 import jdk.incubator.http.internal.common.*; 48 import jdk.incubator.http.internal.frame.*; 49 import jdk.incubator.http.internal.hpack.Encoder; 50 import jdk.incubator.http.internal.hpack.Decoder; 51 import jdk.incubator.http.internal.hpack.DecodingCallback; 52 53 import static jdk.incubator.http.internal.frame.SettingsFrame.*; 54 55 56 /** 57 * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used 58 * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff. 59 * 60 * Http2Connections belong to a Http2ClientImpl, (one of) which belongs 61 * to a HttpClientImpl. 62 * 63 * Creation cases: 64 * 1) upgraded HTTP/1.1 plain tcp connection 65 * 2) prior knowledge directly created plain tcp connection 66 * 3) directly created HTTP/2 SSL connection which uses ALPN. 67 * 68 * Sending is done by writing directly to underlying HttpConnection object which 69 * is operating in async mode. No flow control applies on output at this level 70 * and all writes are just executed as puts to an output Q belonging to HttpConnection 71 * Flow control is implemented by HTTP/2 protocol itself. 72 * 73 * Hpack header compression 74 * and outgoing stream creation is also done here, because these operations 75 * must be synchronized at the socket level. Stream objects send frames simply 76 * by placing them on the connection's output Queue. sendFrame() is called 77 * from a higher level (Stream) thread. 78 * 79 * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles 80 * incoming Http2Frames, and directs them to the appropriate Stream.incoming() 81 * or handles them directly itself. This thread performs hpack decompression 82 * and incoming stream creation (Server push). Incoming frames destined for a 83 * stream are provided by calling Stream.incoming(). 84 */ 85 class Http2Connection { 86 /* 87 * ByteBuffer pooling strategy for HTTP/2 protocol: 88 * 89 * In general there are 4 points where ByteBuffers are used: 90 * - incoming/outgoing frames from/to ByteBufers plus incoming/outgoing encrypted data 91 * in case of SSL connection. 92 * 93 * 1. Outgoing frames encoded to ByteBuffers. 94 * Outgoing ByteBuffers are created with requited size and frequently small (except DataFrames, etc) 95 * At this place no pools at all. All outgoing buffers should be collected by GC. 96 * 97 * 2. Incoming ByteBuffers (decoded to frames). 98 * Here, total elimination of BB pool is not a good idea. 99 * We don't know how many bytes we will receive through network. 100 * So here we allocate buffer of reasonable size. The following life of the BB: 101 * - If all frames decoded from the BB are other than DataFrame and HeaderFrame (and HeaderFrame subclasses) 102 * BB is returned to pool, 103 * - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer obtained by slice() method. 104 * Such BB is never returned to pool and will be GCed. 105 * - If we decoded HeadersFrame from the BB. Then header decoding is performed inside processFrame method and 106 * the buffer could be release to pool. 107 * 108 * 3. SLL encrypted buffers. Here another pool was introduced and all net buffers are to/from the pool, 109 * because of we can't predict size encrypted packets. 110 * 111 */ 112 113 114 // A small class that allows to control frames with respect to the state of 115 // the connection preface. Any data received before the connection 116 // preface is sent will be buffered. 117 private final class FramesController { 118 volatile boolean prefaceSent; 119 volatile List<ByteBufferReference> pending; 120 121 boolean processReceivedData(FramesDecoder decoder, ByteBufferReference buf) 122 throws IOException 123 { 124 // if preface is not sent, buffers data in the pending list 125 if (!prefaceSent) { 126 synchronized (this) { 127 if (!prefaceSent) { 128 if (pending == null) pending = new ArrayList<>(); 129 pending.add(buf); 130 return false; 131 } 132 } 133 } 134 135 // Preface is sent. Checks for pending data and flush it. 136 // We rely on this method being called from within the readlock, 137 // so we know that no other thread could execute this method 138 // concurrently while we're here. 139 // This ensures that later incoming buffers will not 140 // be processed before we have flushed the pending queue. 141 // No additional synchronization is therefore necessary here. 142 List<ByteBufferReference> pending = this.pending; 143 this.pending = null; 144 if (pending != null) { 145 // flush pending data 146 for (ByteBufferReference b : pending) { 147 decoder.decode(b); 148 } 149 } 150 151 // push the received buffer to the frames decoder. 152 decoder.decode(buf); 153 return true; 154 } 155 156 // Mark that the connection preface is sent 157 void markPrefaceSent() { 158 assert !prefaceSent; 159 synchronized (this) { 160 prefaceSent = true; 161 } 162 } 163 164 } 165 166 volatile boolean closed; 167 168 //------------------------------------- 169 final HttpConnection connection; 170 private final HttpClientImpl client; 171 private final Http2ClientImpl client2; 172 private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>(); 173 private int nextstreamid; 174 private int nextPushStream = 2; 175 private final Encoder hpackOut; 176 private final Decoder hpackIn; 177 final SettingsFrame clientSettings; 178 private volatile SettingsFrame serverSettings; 179 private final String key; // for HttpClientImpl.connections map 180 private final FramesDecoder framesDecoder; 181 private final FramesEncoder framesEncoder = new FramesEncoder(); 182 183 /** 184 * Send Window controller for both connection and stream windows. 185 * Each of this connection's Streams MUST use this controller. 186 */ 187 private final WindowController windowController = new WindowController(); 188 private final FramesController framesController = new FramesController(); 189 final WindowUpdateSender windowUpdater; 190 191 static final int DEFAULT_FRAME_SIZE = 16 * 1024; 192 193 194 // TODO: need list of control frames from other threads 195 // that need to be sent 196 197 private Http2Connection(HttpConnection connection, 198 Http2ClientImpl client2, 199 int nextstreamid, 200 String key) { 201 this.connection = connection; 202 this.client = client2.client(); 203 this.client2 = client2; 204 this.nextstreamid = nextstreamid; 205 this.key = key; 206 this.clientSettings = this.client2.getClientSettings(); 207 this.framesDecoder = new FramesDecoder(this::processFrame, clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE)); 208 // serverSettings will be updated by server 209 this.serverSettings = SettingsFrame.getDefaultSettings(); 210 this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); 211 this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); 212 this.windowUpdater = new ConnectionWindowUpdateSender(this, client.getReceiveBufferSize()); 213 } 214 215 /** 216 * Case 1) Create from upgraded HTTP/1.1 connection. 217 * Is ready to use. Will not be SSL. exchange is the Exchange 218 * that initiated the connection, whose response will be delivered 219 * on a Stream. 220 */ 221 Http2Connection(HttpConnection connection, 222 Http2ClientImpl client2, 223 Exchange<?> exchange, 224 ByteBuffer initial) 225 throws IOException, InterruptedException 226 { 227 this(connection, 228 client2, 229 3, // stream 1 is registered during the upgrade 230 keyFor(connection)); 231 assert !(connection instanceof SSLConnection); 232 Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); 233 234 Stream<?> initialStream = createStream(exchange); 235 initialStream.registerStream(1); 236 windowController.registerStream(1, getInitialSendWindowSize()); 237 initialStream.requestSent(); 238 sendConnectionPreface(); 239 // start reading and writing 240 // start reading 241 AsyncConnection asyncConn = (AsyncConnection)connection; 242 asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer); 243 connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility. 244 asyncReceive(ByteBufferReference.of(initial)); 245 asyncConn.startReading(); 246 } 247 248 // async style but completes immediately 249 static CompletableFuture<Http2Connection> createAsync(HttpConnection connection, 250 Http2ClientImpl client2, 251 Exchange<?> exchange, 252 ByteBuffer initial) { 253 return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial)); 254 } 255 256 /** 257 * Cases 2) 3) 258 * 259 * request is request to be sent. 260 */ 261 Http2Connection(HttpRequestImpl request, Http2ClientImpl h2client) 262 throws IOException, InterruptedException 263 { 264 this(HttpConnection.getConnection(request.getAddress(h2client.client()), h2client.client(), request, true), 265 h2client, 266 1, 267 keyFor(request.uri(), request.proxy(h2client.client()))); 268 Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); 269 270 // start reading 271 AsyncConnection asyncConn = (AsyncConnection)connection; 272 asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer); 273 connection.connect(); 274 checkSSLConfig(); 275 // safe to resume async reading now. 276 asyncConn.enableCallback(); 277 sendConnectionPreface(); 278 } 279 280 /** 281 * Throws an IOException if h2 was not negotiated 282 */ 283 private void checkSSLConfig() throws IOException { 284 AbstractAsyncSSLConnection aconn = (AbstractAsyncSSLConnection)connection; 285 SSLEngine engine = aconn.getEngine(); 286 String alpn = engine.getApplicationProtocol(); 287 if (alpn == null || !alpn.equals("h2")) { 288 String msg; 289 if (alpn == null) { 290 Log.logSSL("ALPN not supported"); 291 msg = "ALPN not supported"; 292 } else switch (alpn) { 293 case "": 294 Log.logSSL("No ALPN returned"); 295 msg = "No ALPN negotiated"; 296 break; 297 case "http/1.1": 298 Log.logSSL("HTTP/1.1 ALPN returned"); 299 msg = "HTTP/1.1 ALPN returned"; 300 break; 301 default: 302 Log.logSSL("unknown ALPN returned"); 303 msg = "Unexpected ALPN: " + alpn; 304 throw new IOException(msg); 305 } 306 throw new ALPNException(msg, aconn); 307 } 308 } 309 310 static String keyFor(HttpConnection connection) { 311 boolean isProxy = connection.isProxied(); 312 boolean isSecure = connection.isSecure(); 313 InetSocketAddress addr = connection.address(); 314 315 return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort()); 316 } 317 318 static String keyFor(URI uri, InetSocketAddress proxy) { 319 boolean isSecure = uri.getScheme().equalsIgnoreCase("https"); 320 boolean isProxy = proxy != null; 321 322 String host; 323 int port; 324 325 if (isProxy) { 326 host = proxy.getHostString(); 327 port = proxy.getPort(); 328 } else { 329 host = uri.getHost(); 330 port = uri.getPort(); 331 } 332 return keyString(isSecure, isProxy, host, port); 333 } 334 335 // {C,S}:{H:P}:host:port 336 // C indicates clear text connection "http" 337 // S indicates secure "https" 338 // H indicates host (direct) connection 339 // P indicates proxy 340 // Eg: "S:H:foo.com:80" 341 static String keyString(boolean secure, boolean proxy, String host, int port) { 342 return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port; 343 } 344 345 String key() { 346 return this.key; 347 } 348 349 void putConnection() { 350 client2.putConnection(this); 351 } 352 353 private static String toHexdump1(ByteBuffer bb) { 354 bb.mark(); 355 StringBuilder sb = new StringBuilder(512); 356 Formatter f = new Formatter(sb); 357 358 while (bb.hasRemaining()) { 359 int i = Byte.toUnsignedInt(bb.get()); 360 f.format("%02x:", i); 361 } 362 sb.deleteCharAt(sb.length()-1); 363 bb.reset(); 364 return sb.toString(); 365 } 366 367 private static String toHexdump(ByteBuffer bb) { 368 List<String> words = new ArrayList<>(); 369 int i = 0; 370 bb.mark(); 371 while (bb.hasRemaining()) { 372 if (i % 2 == 0) { 373 words.add(""); 374 } 375 byte b = bb.get(); 376 String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1); 377 words.set(i / 2, words.get(i / 2) + hex); 378 i++; 379 } 380 bb.reset(); 381 return words.stream().collect(Collectors.joining(" ")); 382 } 383 384 private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) { 385 boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS); 386 387 ByteBufferReference[] buffers = frame.getHeaderBlock(); 388 for (int i = 0; i < buffers.length; i++) { 389 hpackIn.decode(buffers[i].get(), endOfHeaders && (i == buffers.length - 1), decoder); 390 } 391 } 392 393 int getInitialSendWindowSize() { 394 return serverSettings.getParameter(INITIAL_WINDOW_SIZE); 395 } 396 397 void close() { 398 GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes()); 399 // TODO: set last stream. For now zero ok. 400 sendFrame(f); 401 } 402 403 private ByteBufferPool readBufferPool = new ByteBufferPool(); 404 405 // provides buffer to read data (default size) 406 public ByteBufferReference getReadBuffer() { 407 return readBufferPool.get(getMaxReceiveFrameSize() + Http2Frame.FRAME_HEADER_SIZE); 408 } 409 410 private final Object readlock = new Object(); 411 412 public void asyncReceive(ByteBufferReference buffer) { 413 // We don't need to read anything and 414 // we don't want to send anything back to the server 415 // until the connection preface has been sent. 416 // Therefore we're going to wait if needed before reading 417 // (and thus replying) to anything. 418 // Starting to reply to something (e.g send an ACK to a 419 // SettingsFrame sent by the server) before the connection 420 // preface is fully sent might result in the server 421 // sending a GOAWAY frame with 'invalid_preface'. 422 synchronized (readlock) { 423 try { 424 // the readlock ensures that the order of incoming buffers 425 // is preserved. 426 framesController.processReceivedData(framesDecoder, buffer); 427 } catch (Throwable e) { 428 String msg = Utils.stackTrace(e); 429 Log.logTrace(msg); 430 shutdown(e); 431 } 432 } 433 } 434 435 436 void shutdown(Throwable t) { 437 Log.logError(t); 438 closed = true; 439 client2.deleteConnection(this); 440 List<Stream<?>> c = new LinkedList<>(streams.values()); 441 for (Stream<?> s : c) { 442 s.cancelImpl(t); 443 } 444 connection.close(); 445 } 446 447 /** 448 * Handles stream 0 (common) frames that apply to whole connection and passes 449 * other stream specific frames to that Stream object. 450 * 451 * Invokes Stream.incoming() which is expected to process frame without 452 * blocking. 453 */ 454 void processFrame(Http2Frame frame) throws IOException { 455 Log.logFrames(frame, "IN"); 456 int streamid = frame.streamid(); 457 if (frame instanceof MalformedFrame) { 458 Log.logError(((MalformedFrame) frame).getMessage()); 459 if (streamid == 0) { 460 protocolError(((MalformedFrame) frame).getErrorCode()); 461 } else { 462 resetStream(streamid, ((MalformedFrame) frame).getErrorCode()); 463 } 464 return; 465 } 466 if (streamid == 0) { 467 handleConnectionFrame(frame); 468 } else { 469 if (frame instanceof SettingsFrame) { 470 // The stream identifier for a SETTINGS frame MUST be zero 471 protocolError(GoAwayFrame.PROTOCOL_ERROR); 472 return; 473 } 474 475 Stream<?> stream = getStream(streamid); 476 if (stream == null) { 477 // Should never receive a frame with unknown stream id 478 479 // To avoid looping, an endpoint MUST NOT send a RST_STREAM in 480 // response to a RST_STREAM frame. 481 if (!(frame instanceof ResetFrame)) { 482 resetStream(streamid, ResetFrame.PROTOCOL_ERROR); 483 } 484 return; 485 } 486 if (frame instanceof PushPromiseFrame) { 487 PushPromiseFrame pp = (PushPromiseFrame)frame; 488 handlePushPromise(stream, pp); 489 } else if (frame instanceof HeaderFrame) { 490 // decode headers (or continuation) 491 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer()); 492 stream.incoming(frame); 493 } else { 494 stream.incoming(frame); 495 } 496 } 497 } 498 499 private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp) 500 throws IOException 501 { 502 HttpRequestImpl parentReq = parent.request; 503 int promisedStreamid = pp.getPromisedStream(); 504 if (promisedStreamid != nextPushStream) { 505 resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR); 506 return; 507 } else { 508 nextPushStream += 2; 509 } 510 HeaderDecoder decoder = new HeaderDecoder(); 511 decodeHeaders(pp, decoder); 512 HttpHeadersImpl headers = decoder.headers(); 513 HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers); 514 Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi); 515 Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch); 516 pushExch.exchImpl = pushStream; 517 pushStream.registerStream(promisedStreamid); 518 parent.incoming_pushPromise(pushReq, pushStream); 519 } 520 521 private void handleConnectionFrame(Http2Frame frame) 522 throws IOException 523 { 524 switch (frame.type()) { 525 case SettingsFrame.TYPE: 526 handleSettings((SettingsFrame)frame); 527 break; 528 case PingFrame.TYPE: 529 handlePing((PingFrame)frame); 530 break; 531 case GoAwayFrame.TYPE: 532 handleGoAway((GoAwayFrame)frame); 533 break; 534 case WindowUpdateFrame.TYPE: 535 handleWindowUpdate((WindowUpdateFrame)frame); 536 break; 537 default: 538 protocolError(ErrorFrame.PROTOCOL_ERROR); 539 } 540 } 541 542 void resetStream(int streamid, int code) throws IOException { 543 Log.logError( 544 "Resetting stream {0,number,integer} with error code {1,number,integer}", 545 streamid, code); 546 ResetFrame frame = new ResetFrame(streamid, code); 547 sendFrame(frame); 548 closeStream(streamid); 549 } 550 551 void closeStream(int streamid) { 552 Stream<?> s = streams.remove(streamid); 553 // ## Remove s != null. It is a hack for delayed cancellation,reset 554 if (s != null && !(s instanceof Stream.PushedStream)) { 555 // Since PushStreams have no request body, then they have no 556 // corresponding entry in the window controller. 557 windowController.removeStream(streamid); 558 } 559 } 560 /** 561 * Increments this connection's send Window by the amount in the given frame. 562 */ 563 private void handleWindowUpdate(WindowUpdateFrame f) 564 throws IOException 565 { 566 int amount = f.getUpdate(); 567 if (amount <= 0) { 568 // ## temporarily disable to workaround a bug in Jetty where it 569 // ## sends Window updates with a 0 update value. 570 //protocolError(ErrorFrame.PROTOCOL_ERROR); 571 } else { 572 boolean success = windowController.increaseConnectionWindow(amount); 573 if (!success) { 574 protocolError(ErrorFrame.FLOW_CONTROL_ERROR); // overflow 575 } 576 } 577 } 578 579 private void protocolError(int errorCode) 580 throws IOException 581 { 582 GoAwayFrame frame = new GoAwayFrame(0, errorCode); 583 sendFrame(frame); 584 shutdown(new IOException("protocol error")); 585 } 586 587 private void handleSettings(SettingsFrame frame) 588 throws IOException 589 { 590 assert frame.streamid() == 0; 591 if (!frame.getFlag(SettingsFrame.ACK)) { 592 int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE); 593 int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE); 594 int diff = newWindowSize - oldWindowSize; 595 if (diff != 0) { 596 windowController.adjustActiveStreams(diff); 597 } 598 serverSettings = frame; 599 sendFrame(new SettingsFrame(SettingsFrame.ACK)); 600 } 601 } 602 603 private void handlePing(PingFrame frame) 604 throws IOException 616 } 617 618 /** 619 * Max frame size we are allowed to send 620 */ 621 public int getMaxSendFrameSize() { 622 int param = serverSettings.getParameter(MAX_FRAME_SIZE); 623 if (param == -1) { 624 param = DEFAULT_FRAME_SIZE; 625 } 626 return param; 627 } 628 629 /** 630 * Max frame size we will receive 631 */ 632 public int getMaxReceiveFrameSize() { 633 return clientSettings.getParameter(MAX_FRAME_SIZE); 634 } 635 636 // Not sure how useful this is. 637 public int getMaxHeadersSize() { 638 return serverSettings.getParameter(MAX_HEADER_LIST_SIZE); 639 } 640 641 private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; 642 643 private static final byte[] PREFACE_BYTES = 644 CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1); 645 646 /** 647 * Sends Connection preface and Settings frame with current preferred 648 * values 649 */ 650 private void sendConnectionPreface() throws IOException { 651 Log.logTrace("{0}: start sending connection preface to {1}", 652 connection.channel().getLocalAddress(), 653 connection.address()); 654 SettingsFrame sf = client2.getClientSettings(); 655 ByteBufferReference ref = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf); 656 Log.logFrames(sf, "OUT"); 657 // send preface bytes and SettingsFrame together 658 connection.write(ref.get()); 659 // mark preface sent. 660 framesController.markPrefaceSent(); 661 Log.logTrace("PREFACE_BYTES sent"); 662 Log.logTrace("Settings Frame sent"); 663 664 // send a Window update for the receive buffer we are using 665 // minus the initial 64 K specified in protocol 666 final int len = client2.client().getReceiveBufferSize() - (64 * 1024 - 1); 667 windowUpdater.sendWindowUpdate(len); 668 // there will be an ACK to the windows update - which should 669 // cause any pending data stored before the preface was sent to be 670 // flushed (see PrefaceController). 671 Log.logTrace("finished sending connection preface"); 672 } 673 674 /** 675 * Returns an existing Stream with given id, or null if doesn't exist 676 */ 677 @SuppressWarnings("unchecked") 678 <T> Stream<T> getStream(int streamid) { 679 return (Stream<T>)streams.get(streamid); 680 } 681 682 /** 683 * Creates Stream with given id. 684 */ 685 <T> Stream<T> createStream(Exchange<T> exchange) { 686 Stream<T> stream = new Stream<>(client, this, exchange, windowController); 687 return stream; 688 } 689 690 <T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) { 691 PushGroup<?,T> pg = parent.exchange.getPushGroup(); 692 return new Stream.PushedStream<>(pg, client, this, parent, pushEx); 693 } 694 695 <T> void putStream(Stream<T> stream, int streamid) { 696 streams.put(streamid, stream); 697 } 698 699 void deleteStream(int streamid) { 700 streams.remove(streamid); 701 windowController.removeStream(streamid); 702 } 703 704 /** 705 * Encode the headers into a List<ByteBuffer> and then create HEADERS 706 * and CONTINUATION frames from the list and return the List<Http2Frame>. 707 */ 708 private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) { 709 List<ByteBufferReference> buffers = encodeHeadersImpl( 710 getMaxSendFrameSize(), 711 frame.getAttachment().getRequestPseudoHeaders(), 712 frame.getUserHeaders(), 713 frame.getSystemHeaders()); 714 715 List<HeaderFrame> frames = new ArrayList<>(buffers.size()); 716 Iterator<ByteBufferReference> bufIterator = buffers.iterator(); 717 HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next()); 718 frames.add(oframe); 719 while(bufIterator.hasNext()) { 720 oframe = new ContinuationFrame(frame.streamid(), bufIterator.next()); 721 frames.add(oframe); 722 } 723 oframe.setFlag(HeaderFrame.END_HEADERS); 724 return frames; 725 } 726 727 // Dedicated cache for headers encoding ByteBuffer. 728 // There can be no concurrent access to this buffer as all access to this buffer 729 // and its content happen within a single critical code block section protected 730 // by the sendLock. / (see sendFrame()) 731 private ByteBufferPool headerEncodingPool = new ByteBufferPool(); 732 733 private ByteBufferReference getHeaderBuffer(int maxFrameSize) { 734 ByteBufferReference ref = headerEncodingPool.get(maxFrameSize); 735 ref.get().limit(maxFrameSize); 736 return ref; 737 } 738 739 /* 740 * Encodes all the headers from the given HttpHeaders into the given List 741 * of buffers. 742 * 743 * From https://tools.ietf.org/html/rfc7540#section-8.1.2 : 744 * 745 * ...Just as in HTTP/1.x, header field names are strings of ASCII 746 * characters that are compared in a case-insensitive fashion. However, 747 * header field names MUST be converted to lowercase prior to their 748 * encoding in HTTP/2... 749 */ 750 private List<ByteBufferReference> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) { 751 ByteBufferReference buffer = getHeaderBuffer(maxFrameSize); 752 List<ByteBufferReference> buffers = new ArrayList<>(); 753 for(HttpHeaders header : headers) { 754 for (Map.Entry<String, List<String>> e : header.map().entrySet()) { 755 String lKey = e.getKey().toLowerCase(); 756 List<String> values = e.getValue(); 757 for (String value : values) { 758 hpackOut.header(lKey, value); 759 while (!hpackOut.encode(buffer.get())) { 760 buffer.get().flip(); 761 buffers.add(buffer); 762 buffer = getHeaderBuffer(maxFrameSize); 763 } 764 } 765 } 766 } 767 buffer.get().flip(); 768 buffers.add(buffer); 769 return buffers; 770 } 771 772 private ByteBufferReference[] encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) { 773 oh.streamid(stream.streamid); 774 if (Log.headers()) { 775 StringBuilder sb = new StringBuilder("HEADERS FRAME (stream="); 776 sb.append(stream.streamid).append(")\n"); 777 Log.dumpHeaders(sb, " ", oh.getAttachment().getRequestPseudoHeaders()); 778 Log.dumpHeaders(sb, " ", oh.getSystemHeaders()); 779 Log.dumpHeaders(sb, " ", oh.getUserHeaders()); 780 Log.logHeaders(sb.toString()); 781 } 782 List<HeaderFrame> frames = encodeHeaders(oh); 783 return encodeFrames(frames); 784 } 785 786 private ByteBufferReference[] encodeFrames(List<HeaderFrame> frames) { 787 if (Log.frames()) { 788 frames.forEach(f -> Log.logFrames(f, "OUT")); 789 } 790 return framesEncoder.encodeFrames(frames); 791 } 792 793 static Throwable getExceptionFrom(CompletableFuture<?> cf) { 794 try { 795 cf.get(); 796 return null; 797 } catch (Throwable e) { 798 if (e.getCause() != null) { 799 return e.getCause(); 800 } else { 801 return e; 802 } 803 } 804 } 805 806 private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) { 807 Stream<?> stream = oh.getAttachment(); 808 int streamid = nextstreamid; 809 nextstreamid += 2; 810 stream.registerStream(streamid); 811 // set outgoing window here. This allows thread sending 812 // body to proceed. 813 windowController.registerStream(streamid, getInitialSendWindowSize()); 814 return stream; 815 } 816 817 private final Object sendlock = new Object(); 818 819 void sendFrame(Http2Frame frame) { 820 try { 821 synchronized (sendlock) { 822 if (frame instanceof OutgoingHeaders) { 823 @SuppressWarnings("unchecked") 824 OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame; 825 Stream<?> stream = registerNewStream(oh); 826 // provide protection from inserting unordered frames between Headers and Continuation 827 connection.writeAsync(encodeHeaders(oh, stream)); 828 } else { 829 connection.writeAsync(encodeFrame(frame)); 830 } 831 } 832 connection.flushAsync(); 833 } catch (IOException e) { 834 if (!closed) { 835 Log.logError(e); 836 shutdown(e); 837 } 838 } 839 } 840 841 private ByteBufferReference[] encodeFrame(Http2Frame frame) { 842 Log.logFrames(frame, "OUT"); 843 return framesEncoder.encodeFrame(frame); 844 } 845 846 void sendDataFrame(DataFrame frame) { 847 try { 848 connection.writeAsync(encodeFrame(frame)); 849 connection.flushAsync(); 850 } catch (IOException e) { 851 if (!closed) { 852 Log.logError(e); 853 shutdown(e); 854 } 855 } 856 } 857 858 /* 859 * Direct call of the method bypasses synchronization on "sendlock" and 860 * allowed only of control frames: WindowUpdateFrame, PingFrame and etc. 861 * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame. 862 */ 863 void sendUnorderedFrame(Http2Frame frame) { 864 try { 865 connection.writeAsyncUnordered(encodeFrame(frame)); 866 connection.flushAsync(); 867 } catch (IOException e) { 868 if (!closed) { 869 Log.logError(e); 870 shutdown(e); 871 } 872 } 873 } 874 875 static class HeaderDecoder implements DecodingCallback { 876 HttpHeadersImpl headers; 877 878 HeaderDecoder() { 879 this.headers = new HttpHeadersImpl(); 880 } 881 882 @Override 883 public void onDecoded(CharSequence name, CharSequence value) { 884 headers.addHeader(name.toString(), value.toString()); 885 } 886 887 HttpHeadersImpl headers() { 888 return headers; 889 } 890 } 891 | 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.EOFException; 29 import java.io.IOException; 30 import java.lang.System.Logger.Level; 31 import java.net.InetSocketAddress; 32 import java.net.URI; 33 import java.nio.ByteBuffer; 34 import java.nio.charset.StandardCharsets; 35 import java.util.Iterator; 36 import java.util.LinkedList; 37 import java.util.List; 38 import java.util.Map; 39 import java.util.concurrent.CompletableFuture; 40 import java.util.ArrayList; 41 import java.util.Objects; 42 import java.util.concurrent.ConcurrentHashMap; 43 import java.util.concurrent.ConcurrentLinkedQueue; 44 import java.util.concurrent.Flow; 45 import java.util.function.Function; 46 import java.util.function.Supplier; 47 import javax.net.ssl.SSLEngine; 48 import jdk.incubator.http.HttpConnection.HttpPublisher; 49 import jdk.incubator.http.internal.common.FlowTube; 50 import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber; 51 import jdk.incubator.http.internal.common.HttpHeadersImpl; 52 import jdk.incubator.http.internal.common.Log; 53 import jdk.incubator.http.internal.common.MinimalFuture; 54 import jdk.incubator.http.internal.common.SequentialScheduler; 55 import jdk.incubator.http.internal.common.Utils; 56 import jdk.incubator.http.internal.frame.ContinuationFrame; 57 import jdk.incubator.http.internal.frame.DataFrame; 58 import jdk.incubator.http.internal.frame.ErrorFrame; 59 import jdk.incubator.http.internal.frame.FramesDecoder; 60 import jdk.incubator.http.internal.frame.FramesEncoder; 61 import jdk.incubator.http.internal.frame.GoAwayFrame; 62 import jdk.incubator.http.internal.frame.HeaderFrame; 63 import jdk.incubator.http.internal.frame.HeadersFrame; 64 import jdk.incubator.http.internal.frame.Http2Frame; 65 import jdk.incubator.http.internal.frame.MalformedFrame; 66 import jdk.incubator.http.internal.frame.OutgoingHeaders; 67 import jdk.incubator.http.internal.frame.PingFrame; 68 import jdk.incubator.http.internal.frame.PushPromiseFrame; 69 import jdk.incubator.http.internal.frame.ResetFrame; 70 import jdk.incubator.http.internal.frame.SettingsFrame; 71 import jdk.incubator.http.internal.frame.WindowUpdateFrame; 72 import jdk.incubator.http.internal.hpack.Encoder; 73 import jdk.incubator.http.internal.hpack.Decoder; 74 import jdk.incubator.http.internal.hpack.DecodingCallback; 75 76 import static jdk.incubator.http.internal.frame.SettingsFrame.*; 77 78 79 /** 80 * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used 81 * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff. 82 * 83 * Http2Connections belong to a Http2ClientImpl, (one of) which belongs 84 * to a HttpClientImpl. 85 * 86 * Creation cases: 87 * 1) upgraded HTTP/1.1 plain tcp connection 88 * 2) prior knowledge directly created plain tcp connection 89 * 3) directly created HTTP/2 SSL connection which uses ALPN. 90 * 91 * Sending is done by writing directly to underlying HttpConnection object which 92 * is operating in async mode. No flow control applies on output at this level 93 * and all writes are just executed as puts to an output Q belonging to HttpConnection 94 * Flow control is implemented by HTTP/2 protocol itself. 95 * 96 * Hpack header compression 97 * and outgoing stream creation is also done here, because these operations 98 * must be synchronized at the socket level. Stream objects send frames simply 99 * by placing them on the connection's output Queue. sendFrame() is called 100 * from a higher level (Stream) thread. 101 * 102 * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles 103 * incoming Http2Frames, and directs them to the appropriate Stream.incoming() 104 * or handles them directly itself. This thread performs hpack decompression 105 * and incoming stream creation (Server push). Incoming frames destined for a 106 * stream are provided by calling Stream.incoming(). 107 */ 108 class Http2Connection { 109 110 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. 111 static final boolean DEBUG_HPACK = Utils.DEBUG_HPACK; // Revisit: temporary dev flag. 112 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); 113 final static System.Logger DEBUG_LOGGER = 114 Utils.getDebugLogger("Http2Connection"::toString, DEBUG); 115 private final System.Logger debugHpack = 116 Utils.getHpackLogger(this::dbgString, DEBUG_HPACK); 117 static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0); 118 119 /* 120 * ByteBuffer pooling strategy for HTTP/2 protocol: 121 * 122 * In general there are 4 points where ByteBuffers are used: 123 * - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing encrypted data 124 * in case of SSL connection. 125 * 126 * 1. Outgoing frames encoded to ByteBuffers. 127 * Outgoing ByteBuffers are created with requited size and frequently small (except DataFrames, etc) 128 * At this place no pools at all. All outgoing buffers should be collected by GC. 129 * 130 * 2. Incoming ByteBuffers (decoded to frames). 131 * Here, total elimination of BB pool is not a good idea. 132 * We don't know how many bytes we will receive through network. 133 * So here we allocate buffer of reasonable size. The following life of the BB: 134 * - If all frames decoded from the BB are other than DataFrame and HeaderFrame (and HeaderFrame subclasses) 135 * BB is returned to pool, 136 * - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer obtained by slice() method. 137 * Such BB is never returned to pool and will be GCed. 138 * - If we decoded HeadersFrame from the BB. Then header decoding is performed inside processFrame method and 139 * the buffer could be release to pool. 140 * 141 * 3. SLL encrypted buffers. Here another pool was introduced and all net buffers are to/from the pool, 142 * because of we can't predict size encrypted packets. 143 * 144 */ 145 146 147 // A small class that allows to control frames with respect to the state of 148 // the connection preface. Any data received before the connection 149 // preface is sent will be buffered. 150 private final class FramesController { 151 volatile boolean prefaceSent; 152 volatile List<ByteBuffer> pending; 153 154 boolean processReceivedData(FramesDecoder decoder, ByteBuffer buf) 155 throws IOException 156 { 157 // if preface is not sent, buffers data in the pending list 158 if (!prefaceSent) { 159 debug.log(Level.DEBUG, "Preface is not sent: buffering %d", 160 buf.remaining()); 161 synchronized (this) { 162 if (!prefaceSent) { 163 if (pending == null) pending = new ArrayList<>(); 164 pending.add(buf); 165 debug.log(Level.DEBUG, () -> "there are now " 166 + Utils.remaining(pending) 167 + " bytes buffered waiting for preface to be sent"); 168 return false; 169 } 170 } 171 } 172 173 // Preface is sent. Checks for pending data and flush it. 174 // We rely on this method being called from within the Http2TubeSubscriber 175 // scheduler, so we know that no other thread could execute this method 176 // concurrently while we're here. 177 // This ensures that later incoming buffers will not 178 // be processed before we have flushed the pending queue. 179 // No additional synchronization is therefore necessary here. 180 List<ByteBuffer> pending = this.pending; 181 this.pending = null; 182 if (pending != null) { 183 // flush pending data 184 debug.log(Level.DEBUG, () -> "Processing buffered data: " 185 + Utils.remaining(pending)); 186 for (ByteBuffer b : pending) { 187 decoder.decode(b); 188 } 189 } 190 // push the received buffer to the frames decoder. 191 if (buf != EMPTY_TRIGGER) { 192 debug.log(Level.DEBUG, "Processing %d", buf.remaining()); 193 decoder.decode(buf); 194 } 195 return true; 196 } 197 198 // Mark that the connection preface is sent 199 void markPrefaceSent() { 200 assert !prefaceSent; 201 synchronized (this) { 202 prefaceSent = true; 203 } 204 } 205 206 } 207 208 volatile boolean closed; 209 210 //------------------------------------- 211 final HttpConnection connection; 212 private final Http2ClientImpl client2; 213 private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>(); 214 private int nextstreamid; 215 private int nextPushStream = 2; 216 private final Encoder hpackOut; 217 private final Decoder hpackIn; 218 final SettingsFrame clientSettings; 219 private volatile SettingsFrame serverSettings; 220 private final String key; // for HttpClientImpl.connections map 221 private final FramesDecoder framesDecoder; 222 private final FramesEncoder framesEncoder = new FramesEncoder(); 223 224 /** 225 * Send Window controller for both connection and stream windows. 226 * Each of this connection's Streams MUST use this controller. 227 */ 228 private final WindowController windowController = new WindowController(); 229 private final FramesController framesController = new FramesController(); 230 private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber(); 231 final WindowUpdateSender windowUpdater; 232 private volatile Throwable cause; 233 private volatile Supplier<ByteBuffer> initial; 234 235 static final int DEFAULT_FRAME_SIZE = 16 * 1024; 236 237 238 // TODO: need list of control frames from other threads 239 // that need to be sent 240 241 private Http2Connection(HttpConnection connection, 242 Http2ClientImpl client2, 243 int nextstreamid, 244 String key) { 245 this.connection = connection; 246 this.client2 = client2; 247 this.nextstreamid = nextstreamid; 248 this.key = key; 249 this.clientSettings = this.client2.getClientSettings(); 250 this.framesDecoder = new FramesDecoder(this::processFrame, clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE)); 251 // serverSettings will be updated by server 252 this.serverSettings = SettingsFrame.getDefaultSettings(); 253 this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); 254 this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); 255 debugHpack.log(Level.DEBUG, () -> "For the record:" + super.toString()); 256 debugHpack.log(Level.DEBUG, "Decoder created: %s", hpackIn); 257 debugHpack.log(Level.DEBUG, "Encoder created: %s", hpackOut); 258 this.windowUpdater = new ConnectionWindowUpdateSender(this, client().getReceiveBufferSize()); 259 } 260 261 /** 262 * Case 1) Create from upgraded HTTP/1.1 connection. 263 * Is ready to use. Can be SSL. exchange is the Exchange 264 * that initiated the connection, whose response will be delivered 265 * on a Stream. 266 */ 267 private Http2Connection(HttpConnection connection, 268 Http2ClientImpl client2, 269 Exchange<?> exchange, 270 Supplier<ByteBuffer> initial) 271 throws IOException, InterruptedException 272 { 273 this(connection, 274 client2, 275 3, // stream 1 is registered during the upgrade 276 keyFor(connection)); 277 Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); 278 279 Stream<?> initialStream = createStream(exchange); 280 initialStream.registerStream(1); 281 windowController.registerStream(1, getInitialSendWindowSize()); 282 initialStream.requestSent(); 283 // Upgrading: 284 // set callbacks before sending preface - makes sure anything that 285 // might be sent by the server will come our way. 286 this.initial = initial; 287 connectFlows(connection); 288 sendConnectionPreface(); 289 } 290 291 // Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving 292 // agreement from the server. Async style but completes immediately, because 293 // the connection is already connected. 294 static CompletableFuture<Http2Connection> createAsync(HttpConnection connection, 295 Http2ClientImpl client2, 296 Exchange<?> exchange, 297 Supplier<ByteBuffer> initial) 298 { 299 return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial)); 300 } 301 302 // Requires TLS handshake. So, is really async 303 static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request, 304 Http2ClientImpl h2client) { 305 assert request.secure(); 306 AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection) 307 HttpConnection.getConnection(request.getAddress(), 308 h2client.client(), 309 request, 310 HttpClient.Version.HTTP_2); 311 312 return connection.connectAsync() 313 .thenCompose(unused -> checkSSLConfig(connection)) 314 .thenCompose(notused-> { 315 CompletableFuture<Http2Connection> cf = new MinimalFuture<>(); 316 try { 317 Http2Connection hc = new Http2Connection(request, h2client, connection); 318 cf.complete(hc); 319 } catch (IOException e) { 320 cf.completeExceptionally(e); 321 } 322 return cf; } ); 323 } 324 325 /** 326 * Cases 2) 3) 327 * 328 * request is request to be sent. 329 */ 330 private Http2Connection(HttpRequestImpl request, 331 Http2ClientImpl h2client, 332 HttpConnection connection) 333 throws IOException 334 { 335 this(connection, 336 h2client, 337 1, 338 keyFor(request.uri(), request.proxy())); 339 340 Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); 341 342 // safe to resume async reading now. 343 connectFlows(connection); 344 sendConnectionPreface(); 345 } 346 347 private void connectFlows(HttpConnection connection) { 348 FlowTube tube = connection.getConnectionFlow(); 349 // Connect the flow to our Http2TubeSubscriber: 350 tube.connectFlows(connection.publisher(), subscriber); 351 } 352 353 final HttpClientImpl client() { 354 return client2.client(); 355 } 356 357 /** 358 * Throws an IOException if h2 was not negotiated 359 */ 360 private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) { 361 assert aconn.isSecure(); 362 363 Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> { 364 CompletableFuture<Void> cf = new MinimalFuture<>(); 365 SSLEngine engine = aconn.getEngine(); 366 assert Objects.equals(alpn, engine.getApplicationProtocol()); 367 368 DEBUG_LOGGER.log(Level.DEBUG, "checkSSLConfig: alpn: %s", alpn ); 369 370 if (alpn == null || !alpn.equals("h2")) { 371 String msg; 372 if (alpn == null) { 373 Log.logSSL("ALPN not supported"); 374 msg = "ALPN not supported"; 375 } else { 376 switch (alpn) { 377 case "": 378 Log.logSSL(msg = "No ALPN negotiated"); 379 break; 380 case "http/1.1": 381 Log.logSSL( msg = "HTTP/1.1 ALPN returned"); 382 break; 383 default: 384 Log.logSSL(msg = "Unexpected ALPN: " + alpn); 385 cf.completeExceptionally(new IOException(msg)); 386 } 387 } 388 cf.completeExceptionally(new ALPNException(msg, aconn)); 389 return cf; 390 } 391 cf.complete(null); 392 return cf; 393 }; 394 395 return aconn.getALPN().thenCompose(checkAlpnCF); 396 } 397 398 static String keyFor(HttpConnection connection) { 399 boolean isProxy = connection.isProxied(); 400 boolean isSecure = connection.isSecure(); 401 InetSocketAddress addr = connection.address(); 402 403 return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort()); 404 } 405 406 static String keyFor(URI uri, InetSocketAddress proxy) { 407 boolean isSecure = uri.getScheme().equalsIgnoreCase("https"); 408 boolean isProxy = proxy != null; 409 410 String host; 411 int port; 412 413 if (proxy != null) { 414 host = proxy.getHostString(); 415 port = proxy.getPort(); 416 } else { 417 host = uri.getHost(); 418 port = uri.getPort(); 419 } 420 return keyString(isSecure, isProxy, host, port); 421 } 422 423 // {C,S}:{H:P}:host:port 424 // C indicates clear text connection "http" 425 // S indicates secure "https" 426 // H indicates host (direct) connection 427 // P indicates proxy 428 // Eg: "S:H:foo.com:80" 429 static String keyString(boolean secure, boolean proxy, String host, int port) { 430 return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port; 431 } 432 433 String key() { 434 return this.key; 435 } 436 437 void putConnection() { 438 client2.putConnection(this); 439 } 440 441 private HttpPublisher publisher() { 442 return connection.publisher(); 443 } 444 445 private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) 446 throws IOException 447 { 448 debugHpack.log(Level.DEBUG, "decodeHeaders(%s)", decoder); 449 450 boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS); 451 452 List<ByteBuffer> buffers = frame.getHeaderBlock(); 453 int len = buffers.size(); 454 for (int i = 0; i < len; i++) { 455 ByteBuffer b = buffers.get(i); 456 hpackIn.decode(b, endOfHeaders && (i == len - 1), decoder); 457 } 458 } 459 460 final int getInitialSendWindowSize() { 461 return serverSettings.getParameter(INITIAL_WINDOW_SIZE); 462 } 463 464 void close() { 465 GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes()); 466 // TODO: set last stream. For now zero ok. 467 sendFrame(f); 468 } 469 470 long count; 471 final void asyncReceive(ByteBuffer buffer) { 472 // We don't need to read anything and 473 // we don't want to send anything back to the server 474 // until the connection preface has been sent. 475 // Therefore we're going to wait if needed before reading 476 // (and thus replying) to anything. 477 // Starting to reply to something (e.g send an ACK to a 478 // SettingsFrame sent by the server) before the connection 479 // preface is fully sent might result in the server 480 // sending a GOAWAY frame with 'invalid_preface'. 481 // 482 // Note: asyncReceive is only called from the Http2TubeSubscriber 483 // sequential scheduler. 484 try { 485 Supplier<ByteBuffer> bs = initial; 486 // ensure that we always handle the initial buffer first, 487 // if any. 488 if (bs != null) { 489 initial = null; 490 ByteBuffer b = bs.get(); 491 if (b.hasRemaining()) { 492 long c = ++count; 493 debug.log(Level.DEBUG, () -> "H2 Receiving Initial(" 494 + c +"): " + b.remaining()); 495 framesController.processReceivedData(framesDecoder, b); 496 } 497 } 498 ByteBuffer b = buffer; 499 // the Http2TubeSubscriber scheduler ensures that the order of incoming 500 // buffers is preserved. 501 if (b == EMPTY_TRIGGER) { 502 debug.log(Level.DEBUG, "H2 Received EMPTY_TRIGGER"); 503 boolean prefaceSent = framesController.prefaceSent; 504 assert prefaceSent; 505 // call framesController.processReceivedData to potentially 506 // trigger the processing of all the data buffered there. 507 framesController.processReceivedData(framesDecoder, buffer); 508 debug.log(Level.DEBUG, "H2 processed buffered data"); 509 } else { 510 long c = ++count; 511 debug.log(Level.DEBUG, "H2 Receiving(%d): %d", c, b.remaining()); 512 framesController.processReceivedData(framesDecoder, buffer); 513 debug.log(Level.DEBUG, "H2 processed(%d)", c); 514 } 515 } catch (Throwable e) { 516 String msg = Utils.stackTrace(e); 517 Log.logTrace(msg); 518 shutdown(e); 519 } 520 } 521 522 Throwable getRecordedCause() { 523 return cause; 524 } 525 526 void shutdown(Throwable t) { 527 debug.log(Level.DEBUG, () -> "Shutting down h2c (closed="+closed+"): " + t); 528 if (closed == true) return; 529 synchronized (this) { 530 if (closed == true) return; 531 closed = true; 532 } 533 Log.logError(t); 534 Throwable initialCause = this.cause; 535 if (initialCause == null) this.cause = t; 536 client2.deleteConnection(this); 537 List<Stream<?>> c = new LinkedList<>(streams.values()); 538 for (Stream<?> s : c) { 539 s.cancelImpl(t); 540 } 541 connection.close(); 542 } 543 544 /** 545 * Handles stream 0 (common) frames that apply to whole connection and passes 546 * other stream specific frames to that Stream object. 547 * 548 * Invokes Stream.incoming() which is expected to process frame without 549 * blocking. 550 */ 551 void processFrame(Http2Frame frame) throws IOException { 552 Log.logFrames(frame, "IN"); 553 int streamid = frame.streamid(); 554 if (frame instanceof MalformedFrame) { 555 Log.logError(((MalformedFrame) frame).getMessage()); 556 if (streamid == 0) { 557 protocolError(((MalformedFrame) frame).getErrorCode(), 558 ((MalformedFrame) frame).getMessage()); 559 } else { 560 debug.log(Level.DEBUG, () -> "Reset stream: " 561 + ((MalformedFrame) frame).getMessage()); 562 resetStream(streamid, ((MalformedFrame) frame).getErrorCode()); 563 } 564 return; 565 } 566 if (streamid == 0) { 567 handleConnectionFrame(frame); 568 } else { 569 if (frame instanceof SettingsFrame) { 570 // The stream identifier for a SETTINGS frame MUST be zero 571 protocolError(GoAwayFrame.PROTOCOL_ERROR); 572 return; 573 } 574 575 Stream<?> stream = getStream(streamid); 576 if (stream == null) { 577 // Should never receive a frame with unknown stream id 578 579 if (frame instanceof HeaderFrame) { 580 // always decode the headers as they may affect 581 // connection-level HPACK decoding state 582 HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder()); 583 decodeHeaders((HeaderFrame) frame, decoder); 584 } 585 586 int sid = frame.streamid(); 587 if (sid >= nextstreamid && !(frame instanceof ResetFrame)) { 588 // otherwise the stream has already been reset/closed 589 resetStream(streamid, ResetFrame.PROTOCOL_ERROR); 590 } 591 return; 592 } 593 if (frame instanceof PushPromiseFrame) { 594 PushPromiseFrame pp = (PushPromiseFrame)frame; 595 handlePushPromise(stream, pp); 596 } else if (frame instanceof HeaderFrame) { 597 // decode headers (or continuation) 598 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer()); 599 stream.incoming(frame); 600 } else { 601 stream.incoming(frame); 602 } 603 } 604 } 605 606 private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp) 607 throws IOException 608 { 609 // always decode the headers as they may affect connection-level HPACK 610 // decoding state 611 HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder()); 612 decodeHeaders(pp, decoder); 613 614 HttpRequestImpl parentReq = parent.request; 615 int promisedStreamid = pp.getPromisedStream(); 616 if (promisedStreamid != nextPushStream) { 617 resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR); 618 return; 619 } else { 620 nextPushStream += 2; 621 } 622 623 HttpHeadersImpl headers = decoder.headers(); 624 HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers); 625 Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi); 626 Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch); 627 pushExch.exchImpl = pushStream; 628 pushStream.registerStream(promisedStreamid); 629 parent.incoming_pushPromise(pushReq, pushStream); 630 } 631 632 private void handleConnectionFrame(Http2Frame frame) 633 throws IOException 634 { 635 switch (frame.type()) { 636 case SettingsFrame.TYPE: 637 handleSettings((SettingsFrame)frame); 638 break; 639 case PingFrame.TYPE: 640 handlePing((PingFrame)frame); 641 break; 642 case GoAwayFrame.TYPE: 643 handleGoAway((GoAwayFrame)frame); 644 break; 645 case WindowUpdateFrame.TYPE: 646 handleWindowUpdate((WindowUpdateFrame)frame); 647 break; 648 default: 649 protocolError(ErrorFrame.PROTOCOL_ERROR); 650 } 651 } 652 653 void resetStream(int streamid, int code) throws IOException { 654 Log.logError( 655 "Resetting stream {0,number,integer} with error code {1,number,integer}", 656 streamid, code); 657 ResetFrame frame = new ResetFrame(streamid, code); 658 sendFrame(frame); 659 closeStream(streamid); 660 } 661 662 void closeStream(int streamid) { 663 debug.log(Level.DEBUG, "Closed stream %d", streamid); 664 Stream<?> s = streams.remove(streamid); 665 if (s != null) { 666 // decrement the reference count on the HttpClientImpl 667 // to allow the SelectorManager thread to exit if no 668 // other operation is pending and the facade is no 669 // longer referenced. 670 client().unreference(); 671 } 672 // ## Remove s != null. It is a hack for delayed cancellation,reset 673 if (s != null && !(s instanceof Stream.PushedStream)) { 674 // Since PushStreams have no request body, then they have no 675 // corresponding entry in the window controller. 676 windowController.removeStream(streamid); 677 } 678 } 679 /** 680 * Increments this connection's send Window by the amount in the given frame. 681 */ 682 private void handleWindowUpdate(WindowUpdateFrame f) 683 throws IOException 684 { 685 int amount = f.getUpdate(); 686 if (amount <= 0) { 687 // ## temporarily disable to workaround a bug in Jetty where it 688 // ## sends Window updates with a 0 update value. 689 //protocolError(ErrorFrame.PROTOCOL_ERROR); 690 } else { 691 boolean success = windowController.increaseConnectionWindow(amount); 692 if (!success) { 693 protocolError(ErrorFrame.FLOW_CONTROL_ERROR); // overflow 694 } 695 } 696 } 697 698 private void protocolError(int errorCode) 699 throws IOException 700 { 701 protocolError(errorCode, null); 702 } 703 704 private void protocolError(int errorCode, String msg) 705 throws IOException 706 { 707 GoAwayFrame frame = new GoAwayFrame(0, errorCode); 708 sendFrame(frame); 709 shutdown(new IOException("protocol error" + (msg == null?"":(": " + msg)))); 710 } 711 712 private void handleSettings(SettingsFrame frame) 713 throws IOException 714 { 715 assert frame.streamid() == 0; 716 if (!frame.getFlag(SettingsFrame.ACK)) { 717 int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE); 718 int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE); 719 int diff = newWindowSize - oldWindowSize; 720 if (diff != 0) { 721 windowController.adjustActiveStreams(diff); 722 } 723 serverSettings = frame; 724 sendFrame(new SettingsFrame(SettingsFrame.ACK)); 725 } 726 } 727 728 private void handlePing(PingFrame frame) 729 throws IOException 741 } 742 743 /** 744 * Max frame size we are allowed to send 745 */ 746 public int getMaxSendFrameSize() { 747 int param = serverSettings.getParameter(MAX_FRAME_SIZE); 748 if (param == -1) { 749 param = DEFAULT_FRAME_SIZE; 750 } 751 return param; 752 } 753 754 /** 755 * Max frame size we will receive 756 */ 757 public int getMaxReceiveFrameSize() { 758 return clientSettings.getParameter(MAX_FRAME_SIZE); 759 } 760 761 private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; 762 763 private static final byte[] PREFACE_BYTES = 764 CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1); 765 766 /** 767 * Sends Connection preface and Settings frame with current preferred 768 * values 769 */ 770 private void sendConnectionPreface() throws IOException { 771 Log.logTrace("{0}: start sending connection preface to {1}", 772 connection.channel().getLocalAddress(), 773 connection.address()); 774 SettingsFrame sf = client2.getClientSettings(); 775 ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf); 776 Log.logFrames(sf, "OUT"); 777 // send preface bytes and SettingsFrame together 778 HttpPublisher publisher = publisher(); 779 publisher.enqueue(List.of(buf)); 780 publisher.signalEnqueued(); 781 // mark preface sent. 782 framesController.markPrefaceSent(); 783 Log.logTrace("PREFACE_BYTES sent"); 784 Log.logTrace("Settings Frame sent"); 785 786 // send a Window update for the receive buffer we are using 787 // minus the initial 64 K specified in protocol 788 final int len = client2.client().getReceiveBufferSize() - (64 * 1024 - 1); 789 windowUpdater.sendWindowUpdate(len); 790 // there will be an ACK to the windows update - which should 791 // cause any pending data stored before the preface was sent to be 792 // flushed (see PrefaceController). 793 Log.logTrace("finished sending connection preface"); 794 debug.log(Level.DEBUG, "Triggering processing of buffered data" 795 + " after sending connection preface"); 796 subscriber.onNext(List.of(EMPTY_TRIGGER)); 797 } 798 799 /** 800 * Returns an existing Stream with given id, or null if doesn't exist 801 */ 802 @SuppressWarnings("unchecked") 803 <T> Stream<T> getStream(int streamid) { 804 return (Stream<T>)streams.get(streamid); 805 } 806 807 /** 808 * Creates Stream with given id. 809 */ 810 final <T> Stream<T> createStream(Exchange<T> exchange) { 811 Stream<T> stream = new Stream<>(this, exchange, windowController); 812 return stream; 813 } 814 815 <T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) { 816 PushGroup<?,T> pg = parent.exchange.getPushGroup(); 817 return new Stream.PushedStream<>(pg, this, pushEx); 818 } 819 820 <T> void putStream(Stream<T> stream, int streamid) { 821 // increment the reference count on the HttpClientImpl 822 // to prevent the SelectorManager thread from exiting until 823 // the stream is closed. 824 client().reference(); 825 streams.put(streamid, stream); 826 } 827 828 /** 829 * Encode the headers into a List<ByteBuffer> and then create HEADERS 830 * and CONTINUATION frames from the list and return the List<Http2Frame>. 831 */ 832 private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) { 833 List<ByteBuffer> buffers = encodeHeadersImpl( 834 getMaxSendFrameSize(), 835 frame.getAttachment().getRequestPseudoHeaders(), 836 frame.getUserHeaders(), 837 frame.getSystemHeaders()); 838 839 List<HeaderFrame> frames = new ArrayList<>(buffers.size()); 840 Iterator<ByteBuffer> bufIterator = buffers.iterator(); 841 HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next()); 842 frames.add(oframe); 843 while(bufIterator.hasNext()) { 844 oframe = new ContinuationFrame(frame.streamid(), bufIterator.next()); 845 frames.add(oframe); 846 } 847 oframe.setFlag(HeaderFrame.END_HEADERS); 848 return frames; 849 } 850 851 // Dedicated cache for headers encoding ByteBuffer. 852 // There can be no concurrent access to this buffer as all access to this buffer 853 // and its content happen within a single critical code block section protected 854 // by the sendLock. / (see sendFrame()) 855 // private final ByteBufferPool headerEncodingPool = new ByteBufferPool(); 856 857 private ByteBuffer getHeaderBuffer(int maxFrameSize) { 858 ByteBuffer buf = ByteBuffer.allocate(maxFrameSize); 859 buf.limit(maxFrameSize); 860 return buf; 861 } 862 863 /* 864 * Encodes all the headers from the given HttpHeaders into the given List 865 * of buffers. 866 * 867 * From https://tools.ietf.org/html/rfc7540#section-8.1.2 : 868 * 869 * ...Just as in HTTP/1.x, header field names are strings of ASCII 870 * characters that are compared in a case-insensitive fashion. However, 871 * header field names MUST be converted to lowercase prior to their 872 * encoding in HTTP/2... 873 */ 874 private List<ByteBuffer> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) { 875 ByteBuffer buffer = getHeaderBuffer(maxFrameSize); 876 List<ByteBuffer> buffers = new ArrayList<>(); 877 for(HttpHeaders header : headers) { 878 for (Map.Entry<String, List<String>> e : header.map().entrySet()) { 879 String lKey = e.getKey().toLowerCase(); 880 List<String> values = e.getValue(); 881 for (String value : values) { 882 hpackOut.header(lKey, value); 883 while (!hpackOut.encode(buffer)) { 884 buffer.flip(); 885 buffers.add(buffer); 886 buffer = getHeaderBuffer(maxFrameSize); 887 } 888 } 889 } 890 } 891 buffer.flip(); 892 buffers.add(buffer); 893 return buffers; 894 } 895 896 private List<ByteBuffer> encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) { 897 oh.streamid(stream.streamid); 898 if (Log.headers()) { 899 StringBuilder sb = new StringBuilder("HEADERS FRAME (stream="); 900 sb.append(stream.streamid).append(")\n"); 901 Log.dumpHeaders(sb, " ", oh.getAttachment().getRequestPseudoHeaders()); 902 Log.dumpHeaders(sb, " ", oh.getSystemHeaders()); 903 Log.dumpHeaders(sb, " ", oh.getUserHeaders()); 904 Log.logHeaders(sb.toString()); 905 } 906 List<HeaderFrame> frames = encodeHeaders(oh); 907 return encodeFrames(frames); 908 } 909 910 private List<ByteBuffer> encodeFrames(List<HeaderFrame> frames) { 911 if (Log.frames()) { 912 frames.forEach(f -> Log.logFrames(f, "OUT")); 913 } 914 return framesEncoder.encodeFrames(frames); 915 } 916 917 private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) { 918 Stream<?> stream = oh.getAttachment(); 919 int streamid = nextstreamid; 920 nextstreamid += 2; 921 stream.registerStream(streamid); 922 // set outgoing window here. This allows thread sending 923 // body to proceed. 924 windowController.registerStream(streamid, getInitialSendWindowSize()); 925 return stream; 926 } 927 928 private final Object sendlock = new Object(); 929 930 void sendFrame(Http2Frame frame) { 931 try { 932 HttpPublisher publisher = publisher(); 933 synchronized (sendlock) { 934 if (frame instanceof OutgoingHeaders) { 935 @SuppressWarnings("unchecked") 936 OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame; 937 Stream<?> stream = registerNewStream(oh); 938 // provide protection from inserting unordered frames between Headers and Continuation 939 publisher.enqueue(encodeHeaders(oh, stream)); 940 } else { 941 publisher.enqueue(encodeFrame(frame)); 942 } 943 } 944 publisher.signalEnqueued(); 945 } catch (IOException e) { 946 if (!closed) { 947 Log.logError(e); 948 shutdown(e); 949 } 950 } 951 } 952 953 private List<ByteBuffer> encodeFrame(Http2Frame frame) { 954 Log.logFrames(frame, "OUT"); 955 return framesEncoder.encodeFrame(frame); 956 } 957 958 void sendDataFrame(DataFrame frame) { 959 try { 960 HttpPublisher publisher = publisher(); 961 publisher.enqueue(encodeFrame(frame)); 962 publisher.signalEnqueued(); 963 } catch (IOException e) { 964 if (!closed) { 965 Log.logError(e); 966 shutdown(e); 967 } 968 } 969 } 970 971 /* 972 * Direct call of the method bypasses synchronization on "sendlock" and 973 * allowed only of control frames: WindowUpdateFrame, PingFrame and etc. 974 * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame. 975 */ 976 void sendUnorderedFrame(Http2Frame frame) { 977 try { 978 HttpPublisher publisher = publisher(); 979 publisher.enqueueUnordered(encodeFrame(frame)); 980 publisher.signalEnqueued(); 981 } catch (IOException e) { 982 if (!closed) { 983 Log.logError(e); 984 shutdown(e); 985 } 986 } 987 } 988 989 /** 990 * A simple tube subscriber for reading from the connection flow. 991 */ 992 final class Http2TubeSubscriber implements TubeSubscriber { 993 volatile Flow.Subscription subscription; 994 volatile boolean completed; 995 volatile boolean dropped; 996 volatile Throwable error; 997 final ConcurrentLinkedQueue<ByteBuffer> queue 998 = new ConcurrentLinkedQueue<>(); 999 final SequentialScheduler scheduler = 1000 SequentialScheduler.synchronizedScheduler(this::processQueue); 1001 1002 final void processQueue() { 1003 try { 1004 while (!queue.isEmpty() && !scheduler.isStopped()) { 1005 ByteBuffer buffer = queue.poll(); 1006 debug.log(Level.DEBUG, 1007 "sending %d to Http2Connection.asyncReceive", 1008 buffer.remaining()); 1009 asyncReceive(buffer); 1010 } 1011 } catch (Throwable t) { 1012 Throwable x = error; 1013 if (x == null) error = t; 1014 } finally { 1015 Throwable x = error; 1016 if (x != null) { 1017 debug.log(Level.DEBUG, "Stopping scheduler", x); 1018 scheduler.stop(); 1019 Http2Connection.this.shutdown(x); 1020 } 1021 } 1022 } 1023 1024 1025 public void onSubscribe(Flow.Subscription subscription) { 1026 // supports being called multiple time. 1027 // doesn't cancel the previous subscription, since that is 1028 // most probably the same as the new subscription. 1029 assert this.subscription == null || dropped == false; 1030 this.subscription = subscription; 1031 dropped = false; 1032 // TODO FIXME: request(1) should be done by the delegate. 1033 if (!completed) { 1034 debug.log(Level.DEBUG, "onSubscribe: requesting Long.MAX_VALUE for reading"); 1035 subscription.request(Long.MAX_VALUE); 1036 } else { 1037 debug.log(Level.DEBUG, "onSubscribe: already completed"); 1038 } 1039 } 1040 1041 @Override 1042 public void onNext(List<ByteBuffer> item) { 1043 debug.log(Level.DEBUG, () -> "onNext: got " + Utils.remaining(item) 1044 + " bytes in " + item.size() + " buffers"); 1045 queue.addAll(item); 1046 scheduler.deferOrSchedule(client().theExecutor()); 1047 } 1048 1049 @Override 1050 public void onError(Throwable throwable) { 1051 debug.log(Level.DEBUG, () -> "onError: " + throwable); 1052 error = throwable; 1053 completed = true; 1054 scheduler.deferOrSchedule(client().theExecutor()); 1055 } 1056 1057 @Override 1058 public void onComplete() { 1059 debug.log(Level.DEBUG, "EOF"); 1060 error = new EOFException("EOF reached while reading"); 1061 completed = true; 1062 scheduler.deferOrSchedule(client().theExecutor()); 1063 } 1064 1065 public void dropSubscription() { 1066 debug.log(Level.DEBUG, "dropSubscription"); 1067 // we could probably set subscription to null here... 1068 // then we might not need the 'dropped' boolean? 1069 dropped = true; 1070 } 1071 } 1072 1073 @Override 1074 public final String toString() { 1075 return dbgString(); 1076 } 1077 1078 final String dbgString() { 1079 return "Http2Connection(" 1080 + connection.getConnectionFlow() + ")"; 1081 } 1082 1083 final class LoggingHeaderDecoder extends HeaderDecoder { 1084 1085 private final HeaderDecoder delegate; 1086 private final System.Logger debugHpack = 1087 Utils.getHpackLogger(this::dbgString, DEBUG_HPACK); 1088 1089 LoggingHeaderDecoder(HeaderDecoder delegate) { 1090 this.delegate = delegate; 1091 } 1092 1093 String dbgString() { 1094 return Http2Connection.this.dbgString() + "/LoggingHeaderDecoder"; 1095 } 1096 1097 @Override 1098 public void onDecoded(CharSequence name, CharSequence value) { 1099 delegate.onDecoded(name, value); 1100 } 1101 1102 @Override 1103 public void onIndexed(int index, 1104 CharSequence name, 1105 CharSequence value) { 1106 debugHpack.log(Level.DEBUG, "onIndexed(%s, %s, %s)%n", 1107 index, name, value); 1108 delegate.onIndexed(index, name, value); 1109 } 1110 1111 @Override 1112 public void onLiteral(int index, 1113 CharSequence name, 1114 CharSequence value, 1115 boolean valueHuffman) { 1116 debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n", 1117 index, name, value, valueHuffman); 1118 delegate.onLiteral(index, name, value, valueHuffman); 1119 } 1120 1121 @Override 1122 public void onLiteral(CharSequence name, 1123 boolean nameHuffman, 1124 CharSequence value, 1125 boolean valueHuffman) { 1126 debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n", 1127 name, nameHuffman, value, valueHuffman); 1128 delegate.onLiteral(name, nameHuffman, value, valueHuffman); 1129 } 1130 1131 @Override 1132 public void onLiteralNeverIndexed(int index, 1133 CharSequence name, 1134 CharSequence value, 1135 boolean valueHuffman) { 1136 debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n", 1137 index, name, value, valueHuffman); 1138 delegate.onLiteralNeverIndexed(index, name, value, valueHuffman); 1139 } 1140 1141 @Override 1142 public void onLiteralNeverIndexed(CharSequence name, 1143 boolean nameHuffman, 1144 CharSequence value, 1145 boolean valueHuffman) { 1146 debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n", 1147 name, nameHuffman, value, valueHuffman); 1148 delegate.onLiteralNeverIndexed(name, nameHuffman, value, valueHuffman); 1149 } 1150 1151 @Override 1152 public void onLiteralWithIndexing(int index, 1153 CharSequence name, 1154 CharSequence value, 1155 boolean valueHuffman) { 1156 debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n", 1157 index, name, value, valueHuffman); 1158 delegate.onLiteralWithIndexing(index, name, value, valueHuffman); 1159 } 1160 1161 @Override 1162 public void onLiteralWithIndexing(CharSequence name, 1163 boolean nameHuffman, 1164 CharSequence value, 1165 boolean valueHuffman) { 1166 debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n", 1167 name, nameHuffman, value, valueHuffman); 1168 delegate.onLiteralWithIndexing(name, nameHuffman, value, valueHuffman); 1169 } 1170 1171 @Override 1172 public void onSizeUpdate(int capacity) { 1173 debugHpack.log(Level.DEBUG, "onSizeUpdate(%s)%n", capacity); 1174 delegate.onSizeUpdate(capacity); 1175 } 1176 1177 @Override 1178 HttpHeadersImpl headers() { 1179 return delegate.headers(); 1180 } 1181 } 1182 1183 static class HeaderDecoder implements DecodingCallback { 1184 HttpHeadersImpl headers; 1185 1186 HeaderDecoder() { 1187 this.headers = new HttpHeadersImpl(); 1188 } 1189 1190 @Override 1191 public void onDecoded(CharSequence name, CharSequence value) { 1192 headers.addHeader(name.toString(), value.toString()); 1193 } 1194 1195 HttpHeadersImpl headers() { 1196 return headers; 1197 } 1198 } 1199 |