/*
 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package java.net.http;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpConnection.Mode;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Formatter;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import sun.net.httpclient.hpack.Decoder;
import sun.net.httpclient.hpack.DecodingCallback;
import sun.net.httpclient.hpack.Encoder;
import static java.net.http.SettingsFrame.*;
import static java.net.http.Utils.BUFSIZE;

/**
 * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
 * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
 *
 * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
 * to a HttpClientImpl.
 *
 * Creation cases:
 * 1) upgraded HTTP/1.1 plain tcp connection
 * 2) prior knowledge directly created plain tcp connection
 * 3) directly created HTTP/2 SSL connection which uses ALPN.
 *
 * Sending is done by writing directly to underlying HttpConnection object which
 * is operating in async mode. No flow control applies on output at this level
 * and all writes are just executed as puts to an output Q belonging to HttpConnection
 * Flow control is implemented by HTTP/2 protocol itself.
 *
 * Hpack header compression
 * and outgoing stream creation is also done here, because these operations
 * must be synchronized at the socket level. Stream objects send frames simply
 * by placing them on the connection's output Queue. sendFrame() is called
 * from a higher level (Stream) thread.
72 * 73 * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles 74 * incoming Http2Frames, and directs them to the appropriate Stream.incoming() 75 * or handles them directly itself. This thread performs hpack decompression 76 * and incoming stream creation (Server push). Incoming frames destined for a 77 * stream are provided by calling Stream.incoming(). 78 */ 79 class Http2Connection implements BufferHandler { 80 81 final Queue<Http2Frame> outputQ; 82 volatile boolean closed; 83 84 //------------------------------------- 85 final HttpConnection connection; 86 HttpClientImpl client; 87 final Http2ClientImpl client2; 88 Map<Integer,Stream> streams; 89 int nextstreamid = 3; // stream 1 is registered separately 90 int nextPushStream = 2; 91 Encoder hpackOut; 92 Decoder hpackIn; 93 SettingsFrame clientSettings, serverSettings; 94 ByteBufferConsumer bbc; 95 final LinkedList<ByteBuffer> freeList; 96 final String key; // for HttpClientImpl.connections map 97 FrameReader reader; 98 99 // Connection level flow control windows 100 int sendWindow = INITIAL_WINDOW_SIZE; 101 102 final static int DEFAULT_FRAME_SIZE = 16 * 1024; 103 private static ByteBuffer[] empty = Utils.EMPTY_BB_ARRAY; 104 105 final ExecutorWrapper executor; 106 107 /** 108 * This is established by the protocol spec and the peer will update it with 109 * WINDOW_UPDATEs, which affects the sendWindow. 110 */ 111 final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1; 112 113 // TODO: need list of control frames from other threads 114 // that need to be sent 115 116 /** 117 * Case 1) Create from upgraded HTTP/1.1 connection. 118 * Is ready to use. Will not be SSL. exchange is the Exchange 119 * that initiated the connection, whose response will be delivered 120 * on a Stream. 
121 */ 122 Http2Connection(HttpConnection connection, Http2ClientImpl client2, 123 Exchange exchange) throws IOException, InterruptedException { 124 this.outputQ = new Queue<>(); 125 String msg = "Connection send window size " + Integer.toString(sendWindow); 126 Log.logTrace(msg); 127 128 //this.initialExchange = exchange; 129 assert !(connection instanceof SSLConnection); 130 this.connection = connection; 131 this.client = client2.client(); 132 this.client2 = client2; 133 this.executor = client.executorWrapper(); 134 this.freeList = new LinkedList<>(); 135 this.key = keyFor(connection); 136 streams = Collections.synchronizedMap(new HashMap<>()); 137 initCommon(); 138 //sendConnectionPreface(); 139 Stream initialStream = createStream(exchange); 140 initialStream.registerStream(1); 141 initialStream.requestSent(); 142 sendConnectionPreface(); 143 connection.configureMode(Mode.ASYNC); 144 // start reading and writing 145 // start reading 146 AsyncConnection asyncConn = (AsyncConnection)connection; 147 asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown); 148 asyncReceive(connection.getRemaining()); 149 asyncConn.startReading(); 150 } 151 152 // async style but completes immediately 153 static CompletableFuture<Http2Connection> createAsync(HttpConnection connection, 154 Http2ClientImpl client2, Exchange exchange) { 155 CompletableFuture<Http2Connection> cf = new CompletableFuture<>(); 156 try { 157 Http2Connection c = new Http2Connection(connection, client2, exchange); 158 cf.complete(c); 159 } catch (IOException | InterruptedException e) { 160 cf.completeExceptionally(e); 161 } 162 return cf; 163 } 164 165 /** 166 * Cases 2) 3) 167 * 168 * request is request to be sent. 
169 */ 170 Http2Connection(HttpRequestImpl request) throws IOException, InterruptedException { 171 InetSocketAddress proxy = request.proxy(); 172 URI uri = request.uri(); 173 InetSocketAddress addr = Utils.getAddress(request); 174 String msg = "Connection send window size " + Integer.toString(sendWindow); 175 Log.logTrace(msg); 176 this.key = keyFor(uri, proxy); 177 this.connection = HttpConnection.getConnection(addr, request, this); 178 streams = Collections.synchronizedMap(new HashMap<>()); 179 this.client = request.client(); 180 this.client2 = client.client2(); 181 this.executor = client.executorWrapper(); 182 this.freeList = new LinkedList<>(); 183 this.outputQ = new Queue<>(); 184 nextstreamid = 1; 185 initCommon(); 186 connection.connect(); 187 connection.configureMode(Mode.ASYNC); 188 // start reading 189 AsyncConnection asyncConn = (AsyncConnection)connection; 190 asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown); 191 sendConnectionPreface(); 192 asyncConn.startReading(); 193 } 194 195 // NEW 196 synchronized void obtainSendWindow(int amount) throws InterruptedException { 197 while (amount > 0) { 198 int n = Math.min(amount, sendWindow); 199 sendWindow -= n; 200 amount -= n; 201 if (amount > 0) 202 wait(); 203 } 204 } 205 206 synchronized void updateSendWindow(int amount) { 207 if (sendWindow == 0) { 208 sendWindow += amount; 209 notifyAll(); 210 } else 211 sendWindow += amount; 212 } 213 214 synchronized int sendWindow() { 215 return sendWindow; 216 } 217 218 static String keyFor(HttpConnection connection) { 219 boolean isProxy = connection.isProxied(); 220 boolean isSecure = connection.isSecure(); 221 InetSocketAddress addr = connection.address(); 222 223 return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort()); 224 } 225 226 static String keyFor(URI uri, InetSocketAddress proxy) { 227 boolean isSecure = uri.getScheme().equalsIgnoreCase("https"); 228 boolean isProxy = proxy != null; 229 230 String host; 231 int port; 232 233 
if (isProxy) { 234 host = proxy.getHostString(); 235 port = proxy.getPort(); 236 } else { 237 host = uri.getHost(); 238 port = uri.getPort(); 239 } 240 return keyString(isSecure, isProxy, host, port); 241 } 242 243 // {C,S}:{H:P}:host:port 244 // C indicates clear text connection "http" 245 // S indicates secure "https" 246 // H indicates host (direct) connection 247 // P indicates proxy 248 // Eg: "S:H:foo.com:80" 249 static String keyString(boolean secure, boolean proxy, String host, int port) { 250 char c1 = secure ? 'S' : 'C'; 251 char c2 = proxy ? 'P' : 'H'; 252 253 StringBuilder sb = new StringBuilder(128); 254 sb.append(c1).append(':').append(c2).append(':') 255 .append(host).append(':').append(port); 256 return sb.toString(); 257 } 258 259 String key() { 260 return this.key; 261 } 262 263 void putConnection() { 264 client2.putConnection(this); 265 } 266 267 private static String toHexdump1(ByteBuffer bb) { 268 bb.mark(); 269 StringBuilder sb = new StringBuilder(512); 270 Formatter f = new Formatter(sb); 271 272 while (bb.hasRemaining()) { 273 int i = Byte.toUnsignedInt(bb.get()); 274 f.format("%02x:", i); 275 } 276 sb.deleteCharAt(sb.length()-1); 277 bb.reset(); 278 return sb.toString(); 279 } 280 281 private static String toHexdump(ByteBuffer bb) { 282 List<String> words = new ArrayList<>(); 283 int i = 0; 284 bb.mark(); 285 while (bb.hasRemaining()) { 286 if (i % 2 == 0) { 287 words.add(""); 288 } 289 byte b = bb.get(); 290 String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1); 291 words.set(i / 2, words.get(i / 2) + hex); 292 i++; 293 } 294 bb.reset(); 295 return words.stream().collect(Collectors.joining(" ")); 296 } 297 298 private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) { 299 boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS); 300 301 ByteBuffer[] buffers = frame.getHeaderBlock(); 302 for (int i = 0; i < buffers.length; i++) { 303 hpackIn.decode(buffers[i], endOfHeaders && (i == buffers.length - 
1), decoder); 304 } 305 } 306 307 int getInitialSendWindowSize() { 308 return serverSettings.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE); 309 } 310 311 void close() { 312 GoAwayFrame f = new GoAwayFrame(); 313 f.setDebugData("Requested by user".getBytes()); 314 // TODO: set last stream. For now zero ok. 315 sendFrame(f); 316 } 317 318 // BufferHandler methods 319 320 @Override 321 public ByteBuffer getBuffer(int n) { 322 return client.getBuffer(n); 323 } 324 325 @Override 326 public void returnBuffer(ByteBuffer buf) { 327 client.returnBuffer(buf); 328 } 329 330 @Override 331 public void setMinBufferSize(int n) { 332 client.setMinBufferSize(n); 333 } 334 335 private final Object readlock = new Object(); 336 337 void asyncReceive(ByteBuffer buffer) { 338 synchronized (readlock) { 339 try { 340 if (reader == null) { 341 reader = new FrameReader(buffer); 342 } else { 343 reader.input(buffer); 344 } 345 while (true) { 346 if (reader.haveFrame()) { 347 List<ByteBuffer> buffers = reader.frame(); 348 349 ByteBufferConsumer bbc = new ByteBufferConsumer(buffers, this::getBuffer); 350 processFrame(bbc); 351 if (bbc.consumed()) { 352 reader = new FrameReader(); 353 return; 354 } else { 355 reader = new FrameReader(reader); 356 } 357 } else 358 return; 359 } 360 } catch (Throwable e) { 361 String msg = Utils.stackTrace(e); 362 Log.logTrace(msg); 363 shutdown(e); 364 } 365 } 366 } 367 368 void shutdown(Throwable t) { 369 Log.logError(t); 370 closed = true; 371 client2.deleteConnection(this); 372 List<Stream> c = new LinkedList<>(streams.values()); 373 for (Stream s : c) { 374 s.cancelImpl(t); 375 } 376 connection.close(); 377 } 378 379 /** 380 * Handles stream 0 (common) frames that apply to whole connection and passes 381 * other stream specific frames to that Stream object. 382 * 383 * Invokes Stream.incoming() which is expected to process frame without 384 * blocking. 
385 */ 386 void processFrame(ByteBufferConsumer bbc) throws IOException, InterruptedException { 387 Http2Frame frame = Http2Frame.readIncoming(bbc); 388 Log.logFrames(frame, "IN"); 389 int streamid = frame.streamid(); 390 if (streamid == 0) { 391 handleCommonFrame(frame); 392 } else { 393 Stream stream = getStream(streamid); 394 if (stream == null) { 395 // should never receive a frame with unknown stream id 396 resetStream(streamid, ResetFrame.PROTOCOL_ERROR); 397 } 398 if (frame instanceof PushPromiseFrame) { 399 PushPromiseFrame pp = (PushPromiseFrame)frame; 400 handlePushPromise(stream, pp); 401 } else if (frame instanceof HeaderFrame) { 402 // decode headers (or continuation) 403 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer()); 404 stream.incoming(frame); 405 } else 406 stream.incoming(frame); 407 } 408 } 409 410 private void handlePushPromise(Stream parent, PushPromiseFrame pp) 411 throws IOException, InterruptedException { 412 413 HttpRequestImpl parentReq = parent.request; 414 int promisedStreamid = pp.getPromisedStream(); 415 if (promisedStreamid != nextPushStream) { 416 resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR); 417 return; 418 } else { 419 nextPushStream += 2; 420 } 421 HeaderDecoder decoder = new HeaderDecoder(); 422 decodeHeaders(pp, decoder); 423 HttpHeadersImpl headers = decoder.headers(); 424 HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers); 425 426 Stream.PushedStream pushStream = createPushStream(parent, pushReq); 427 pushStream.registerStream(promisedStreamid); 428 parent.incoming_pushPromise(pushReq, pushStream); 429 } 430 431 private void handleCommonFrame(Http2Frame frame) 432 throws IOException, InterruptedException { 433 434 switch (frame.type()) { 435 case SettingsFrame.TYPE: 436 { SettingsFrame f = (SettingsFrame)frame; 437 handleSettings(f);} 438 break; 439 case PingFrame.TYPE: 440 { PingFrame f = (PingFrame)frame; 441 handlePing(f);} 442 break; 443 case GoAwayFrame.TYPE: 444 { 
GoAwayFrame f = (GoAwayFrame)frame; 445 handleGoAway(f);} 446 break; 447 case WindowUpdateFrame.TYPE: 448 { WindowUpdateFrame f = (WindowUpdateFrame)frame; 449 handleWindowUpdate(f);} 450 break; 451 default: 452 protocolError(ErrorFrame.PROTOCOL_ERROR); 453 } 454 } 455 456 void resetStream(int streamid, int code) throws IOException, InterruptedException { 457 Log.logError( 458 "Resetting stream {0,number,integer} with error code {1,number,integer}", 459 streamid, code); 460 ResetFrame frame = new ResetFrame(); 461 frame.streamid(streamid); 462 frame.setErrorCode(code); 463 sendFrame(frame); 464 streams.remove(streamid); 465 } 466 467 private void handleWindowUpdate(WindowUpdateFrame f) 468 throws IOException, InterruptedException { 469 updateSendWindow(f.getUpdate()); 470 } 471 472 private void protocolError(int errorCode) 473 throws IOException, InterruptedException { 474 GoAwayFrame frame = new GoAwayFrame(); 475 frame.setErrorCode(errorCode); 476 sendFrame(frame); 477 String msg = "Error code: " + errorCode; 478 shutdown(new IOException("protocol error")); 479 } 480 481 private void handleSettings(SettingsFrame frame) 482 throws IOException, InterruptedException { 483 if (frame.getFlag(SettingsFrame.ACK)) { 484 // ignore ack frames for now. 
485 return; 486 } 487 serverSettings = frame; 488 SettingsFrame ack = getAckFrame(frame.streamid()); 489 sendFrame(ack); 490 } 491 492 private void handlePing(PingFrame frame) 493 throws IOException, InterruptedException { 494 frame.setFlag(PingFrame.ACK); 495 sendFrame(frame); 496 } 497 498 private void handleGoAway(GoAwayFrame frame) 499 throws IOException, InterruptedException { 500 //System.err.printf("GoAWAY: %s\n", ErrorFrame.stringForCode(frame.getErrorCode())); 501 shutdown(new IOException("GOAWAY received")); 502 } 503 504 private void initCommon() { 505 clientSettings = client2.getClientSettings(); 506 507 // serverSettings will be updated by server 508 serverSettings = SettingsFrame.getDefaultSettings(); 509 hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); 510 hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); 511 } 512 513 /** 514 * Max frame size we are allowed to send 515 */ 516 public int getMaxSendFrameSize() { 517 int param = serverSettings.getParameter(MAX_FRAME_SIZE); 518 if (param == -1) { 519 param = DEFAULT_FRAME_SIZE; 520 } 521 return param; 522 } 523 524 /** 525 * Max frame size we will receive 526 */ 527 public int getMaxReceiveFrameSize() { 528 return clientSettings.getParameter(MAX_FRAME_SIZE); 529 } 530 531 // Not sure how useful this is. 
532 public int getMaxHeadersSize() { 533 return serverSettings.getParameter(MAX_HEADER_LIST_SIZE); 534 } 535 536 private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; 537 538 private static final byte[] PREFACE_BYTES = 539 CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1); 540 541 /** 542 * Sends Connection preface and Settings frame with current preferred 543 * values 544 */ 545 private void sendConnectionPreface() throws IOException { 546 ByteBufferGenerator bg = new ByteBufferGenerator(this); 547 bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES); 548 ByteBuffer[] ba = bg.getBufferArray(); 549 connection.write(ba, 0, ba.length); 550 551 bg = new ByteBufferGenerator(this); 552 SettingsFrame sf = client2.getClientSettings(); 553 Log.logFrames(sf, "OUT"); 554 sf.writeOutgoing(bg); 555 WindowUpdateFrame wup = new WindowUpdateFrame(); 556 wup.streamid(0); 557 // send a Window update for the receive buffer we are using 558 // minus the initial 64 K specified in protocol 559 wup.setUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1)); 560 wup.computeLength(); 561 wup.writeOutgoing(bg); 562 Log.logFrames(wup, "OUT"); 563 ba = bg.getBufferArray(); 564 connection.write(ba, 0, ba.length); 565 } 566 567 /** 568 * Returns an existing Stream with given id, or null if doesn't exist 569 */ 570 Stream getStream(int streamid) { 571 return streams.get(streamid); 572 } 573 574 /** 575 * Creates Stream with given id. 
576 */ 577 Stream createStream(Exchange exchange) { 578 Stream stream = new Stream(client, this, exchange); 579 return stream; 580 } 581 582 Stream.PushedStream createPushStream(Stream parent, HttpRequestImpl pushReq) { 583 Stream.PushGroup<?> pg = parent.request.pushGroup(); 584 return new Stream.PushedStream(pg, client, this, parent, pushReq); 585 } 586 587 void putStream(Stream stream, int streamid) { 588 streams.put(streamid, stream); 589 } 590 591 void deleteStream(Stream stream) { 592 streams.remove(stream.streamid); 593 } 594 595 static final int MAX_STREAM = Integer.MAX_VALUE - 2; 596 597 // Number of header bytes in a Headers Frame 598 final static int HEADERS_HEADER_SIZE = 15; 599 600 // Number of header bytes in a Continuation frame 601 final static int CONTIN_HEADER_SIZE = 9; 602 603 /** 604 * Encode the headers into a List<ByteBuffer> and then create HEADERS 605 * and CONTINUATION frames from the list and return the List<Http2Frame>. 606 * 607 * @param frame 608 * @return 609 */ 610 private LinkedList<Http2Frame> encodeHeaders(OutgoingHeaders frame) { 611 LinkedList<ByteBuffer> buffers = new LinkedList<>(); 612 ByteBuffer buf = getBuffer(); 613 buffers.add(buf); 614 encodeHeadersImpl(frame.stream.getRequestPseudoHeaders(), buffers); 615 encodeHeadersImpl(frame.getUserHeaders(), buffers); 616 encodeHeadersImpl(frame.getSystemHeaders(), buffers); 617 618 for (ByteBuffer b : buffers) { 619 b.flip(); 620 } 621 622 LinkedList<Http2Frame> frames = new LinkedList<>(); 623 int maxframesize = getMaxSendFrameSize(); 624 625 HeadersFrame oframe = new HeadersFrame(); 626 oframe.setFlags(frame.getFlags()); 627 oframe.streamid(frame.streamid()); 628 629 oframe.setHeaderBlock(getBufferArray(buffers, maxframesize)); 630 frames.add(oframe); 631 // Any buffers left? 
632 boolean done = buffers.isEmpty(); 633 if (done) { 634 oframe.setFlag(HeaderFrame.END_HEADERS); 635 } else { 636 ContinuationFrame cf = null; 637 while (!done) { 638 cf = new ContinuationFrame(); 639 cf.streamid(frame.streamid()); 640 cf.setHeaderBlock(getBufferArray(buffers, maxframesize)); 641 frames.add(cf); 642 done = buffers.isEmpty(); 643 } 644 cf.setFlag(HeaderFrame.END_HEADERS); 645 } 646 return frames; 647 } 648 649 // should always return at least one buffer 650 private static ByteBuffer[] getBufferArray(LinkedList<ByteBuffer> list, int maxsize) { 651 assert maxsize >= BUFSIZE; 652 LinkedList<ByteBuffer> newlist = new LinkedList<>(); 653 int size = list.size(); 654 int nbytes = 0; 655 for (int i=0; i<size; i++) { 656 ByteBuffer buf = list.getFirst(); 657 if (nbytes + buf.remaining() <= maxsize) { 658 nbytes += buf.remaining(); 659 newlist.add(buf); 660 list.remove(); 661 } else { 662 break; 663 } 664 } 665 return newlist.toArray(empty); 666 } 667 668 /** 669 * Encode all the headers from the given HttpHeadersImpl into the given List. 
670 */ 671 private void encodeHeadersImpl(HttpHeaders hdrs, LinkedList<ByteBuffer> buffers) { 672 ByteBuffer buffer; 673 if (!(buffer = buffers.getLast()).hasRemaining()) { 674 buffer = getBuffer(); 675 buffers.add(buffer); 676 } 677 for (Map.Entry<String, List<String>> e : hdrs.map().entrySet()) { 678 String key = e.getKey(); 679 String lkey = key.toLowerCase(); 680 List<String> values = e.getValue(); 681 for (String value : values) { 682 hpackOut.header(lkey, value); 683 boolean encoded = false; 684 do { 685 encoded = hpackOut.encode(buffer); 686 if (!encoded) { 687 buffer = getBuffer(); 688 buffers.add(buffer); 689 } 690 } while (!encoded); 691 } 692 } 693 } 694 695 public void sendFrames(List<Http2Frame> frames) throws IOException, InterruptedException { 696 for (Http2Frame frame : frames) { 697 sendFrame(frame); 698 } 699 } 700 701 static Throwable getExceptionFrom(CompletableFuture<?> cf) { 702 try { 703 cf.get(); 704 return null; 705 } catch (Throwable e) { 706 if (e.getCause() != null) 707 return e.getCause(); 708 else 709 return e; 710 } 711 } 712 713 714 void execute(Runnable r) { 715 executor.execute(r, null); 716 } 717 718 private final Object sendlock = new Object(); 719 720 /** 721 * 722 */ 723 void sendFrame(Http2Frame frame) { 724 synchronized (sendlock) { 725 try { 726 if (frame instanceof OutgoingHeaders) { 727 OutgoingHeaders oh = (OutgoingHeaders) frame; 728 Stream stream = oh.getStream(); 729 stream.registerStream(nextstreamid); 730 oh.streamid(nextstreamid); 731 nextstreamid += 2; 732 // set outgoing window here. This allows thread sending 733 // body to proceed. 734 stream.updateOutgoingWindow(getInitialSendWindowSize()); 735 LinkedList<Http2Frame> frames = encodeHeaders(oh); 736 for (Http2Frame f : frames) { 737 sendOneFrame(f); 738 } 739 } else { 740 sendOneFrame(frame); 741 } 742 743 } catch (IOException e) { 744 if (!closed) { 745 Log.logError(e); 746 shutdown(e); 747 } 748 } 749 } 750 } 751 752 /** 753 * Send a frame. 
754 * 755 * @param frame 756 * @throws IOException 757 */ 758 private void sendOneFrame(Http2Frame frame) throws IOException { 759 ByteBufferGenerator bbg = new ByteBufferGenerator(this); 760 frame.computeLength(); 761 Log.logFrames(frame, "OUT"); 762 frame.writeOutgoing(bbg); 763 ByteBuffer[] currentBufs = bbg.getBufferArray(); 764 connection.write(currentBufs, 0, currentBufs.length); 765 } 766 767 768 private SettingsFrame getAckFrame(int streamid) { 769 SettingsFrame frame = new SettingsFrame(); 770 frame.setFlag(SettingsFrame.ACK); 771 frame.streamid(streamid); 772 return frame; 773 } 774 775 static class HeaderDecoder implements DecodingCallback { 776 HttpHeadersImpl headers; 777 778 HeaderDecoder() { 779 this.headers = new HttpHeadersImpl(); 780 } 781 782 @Override 783 public void onDecoded(CharSequence name, CharSequence value) { 784 headers.addHeader(name.toString(), value.toString()); 785 } 786 787 HttpHeadersImpl headers() { 788 return headers; 789 } 790 } 791 }