1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package java.net.http; 27 28 import java.io.IOException; 29 import java.net.InetSocketAddress; 30 import java.net.URI; 31 import java.net.http.HttpConnection.Mode; 32 import java.nio.ByteBuffer; 33 import java.nio.charset.StandardCharsets; 34 import java.util.HashMap; 35 import java.util.LinkedList; 36 import java.util.List; 37 import java.util.Map; 38 import java.util.concurrent.CompletableFuture; 39 import sun.net.httpclient.hpack.Encoder; 40 import sun.net.httpclient.hpack.Decoder; 41 import static java.net.http.SettingsFrame.*; 42 import static java.net.http.Utils.BUFSIZE; 43 import java.util.ArrayList; 44 import java.util.Collections; 45 import java.util.Formatter; 46 import java.util.stream.Collectors; 47 import sun.net.httpclient.hpack.DecodingCallback; 48 49 /** 50 * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used 51 * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff. 52 * 53 * Http2Connections belong to a Http2ClientImpl, (one of) which belongs 54 * to a HttpClientImpl. 55 * 56 * Creation cases: 57 * 1) upgraded HTTP/1.1 plain tcp connection 58 * 2) prior knowledge directly created plain tcp connection 59 * 3) directly created HTTP/2 SSL connection which uses ALPN. 60 * 61 * Sending is done by writing directly to underlying HttpConnection object which 62 * is operating in async mode. No flow control applies on output at this level 63 * and all writes are just executed as puts to an output Q belonging to HttpConnection 64 * Flow control is implemented by HTTP/2 protocol itself. 65 * 66 * Hpack header compression 67 * and outgoing stream creation is also done here, because these operations 68 * must be synchronized at the socket level. Stream objects send frames simply 69 * by placing them on the connection's output Queue. sendFrame() is called 70 * from a higher level (Stream) thread. 71 * 72 * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles 73 * incoming Http2Frames, and directs them to the appropriate Stream.incoming() 74 * or handles them directly itself. This thread performs hpack decompression 75 * and incoming stream creation (Server push). Incoming frames destined for a 76 * stream are provided by calling Stream.incoming(). 77 */ 78 class Http2Connection implements BufferHandler { 79 80 volatile boolean closed; 81 82 //------------------------------------- 83 final HttpConnection connection; 84 final AsyncConnection connectionAsync; 85 HttpClientImpl client; 86 final Http2ClientImpl client2; 87 Map<Integer,Stream> streams; 88 int nextstreamid = 3; // stream 1 is registered separately 89 int nextPushStream = 2; 90 Encoder hpackOut; 91 Decoder hpackIn; 92 SettingsFrame clientSettings, serverSettings; 93 final LinkedList<ByteBuffer> freeList; 94 final String key; // for HttpClientImpl.connections map 95 FrameReader reader; 96 97 // Connection level flow control windows 98 final WindowControl connectionSendWindow = new WindowControl(INITIAL_WINDOW_SIZE); 99 100 final static int DEFAULT_FRAME_SIZE = 16 * 1024; 101 private static ByteBuffer[] empty = Utils.EMPTY_BB_ARRAY; 102 103 final ExecutorWrapper executor; 104 105 WindowUpdateSender windowUpdater; 106 /** 107 * This is established by the protocol spec and the peer will update it with 108 * WINDOW_UPDATEs, which affects the connectionSendWindow. 109 */ 110 final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1; 111 112 // TODO: need list of control frames from other threads 113 // that need to be sent 114 115 /** 116 * Case 1) Create from upgraded HTTP/1.1 connection. 117 * Is ready to use. Will not be SSL. exchange is the Exchange 118 * that initiated the connection, whose response will be delivered 119 * on a Stream. 120 */ 121 Http2Connection(HttpConnection connection, Http2ClientImpl client2, 122 Exchange exchange) throws IOException, InterruptedException { 123 String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available()); 124 Log.logTrace(msg); 125 126 //this.initialExchange = exchange; 127 assert !(connection instanceof SSLConnection); 128 this.connection = connection; 129 this.connectionAsync = (AsyncConnection)connection; 130 this.client = client2.client(); 131 this.client2 = client2; 132 this.executor = client.executorWrapper(); 133 this.freeList = new LinkedList<>(); 134 this.key = keyFor(connection); 135 streams = Collections.synchronizedMap(new HashMap<>()); 136 initCommon(); 137 //sendConnectionPreface(); 138 Stream initialStream = createStream(exchange); 139 initialStream.registerStream(1); 140 initialStream.requestSent(); 141 sendConnectionPreface(); 142 // start reading and writing 143 // start reading 144 connectionAsync.setAsyncCallbacks(this::asyncReceive, this::shutdown); 145 connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility. 146 asyncReceive(connection.getRemaining()); 147 connectionAsync.startReading(); 148 } 149 150 // async style but completes immediately 151 static CompletableFuture<Http2Connection> createAsync(HttpConnection connection, 152 Http2ClientImpl client2, Exchange exchange) { 153 CompletableFuture<Http2Connection> cf = new CompletableFuture<>(); 154 try { 155 Http2Connection c = new Http2Connection(connection, client2, exchange); 156 cf.complete(c); 157 } catch (IOException | InterruptedException e) { 158 cf.completeExceptionally(e); 159 } 160 return cf; 161 } 162 163 /** 164 * Cases 2) 3) 165 * 166 * request is request to be sent. 167 */ 168 Http2Connection(HttpRequestImpl request) throws IOException, InterruptedException { 169 InetSocketAddress proxy = request.proxy(); 170 URI uri = request.uri(); 171 InetSocketAddress addr = Utils.getAddress(request); 172 String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available()); 173 Log.logTrace(msg); 174 this.key = keyFor(uri, proxy); 175 this.connection = HttpConnection.getConnection(addr, request, this); 176 this.connectionAsync = (AsyncConnection)connection; 177 streams = Collections.synchronizedMap(new HashMap<>()); 178 this.client = request.client(); 179 this.client2 = client.client2(); 180 this.executor = client.executorWrapper(); 181 this.freeList = new LinkedList<>(); 182 nextstreamid = 1; 183 initCommon(); 184 connection.connect(); 185 sendConnectionPreface(); 186 // start reading 187 connectionAsync.setAsyncCallbacks(this::asyncReceive, this::shutdown); 188 connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility. 189 190 connectionAsync.startReading(); 191 } 192 193 static String keyFor(HttpConnection connection) { 194 boolean isProxy = connection.isProxied(); 195 boolean isSecure = connection.isSecure(); 196 InetSocketAddress addr = connection.address(); 197 198 return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort()); 199 } 200 201 static String keyFor(URI uri, InetSocketAddress proxy) { 202 boolean isSecure = uri.getScheme().equalsIgnoreCase("https"); 203 boolean isProxy = proxy != null; 204 205 String host; 206 int port; 207 208 if (isProxy) { 209 host = proxy.getHostString(); 210 port = proxy.getPort(); 211 } else { 212 host = uri.getHost(); 213 port = uri.getPort(); 214 } 215 return keyString(isSecure, isProxy, host, port); 216 } 217 218 // {C,S}:{H:P}:host:port 219 // C indicates clear text connection "http" 220 // S indicates secure "https" 221 // H indicates host (direct) connection 222 // P indicates proxy 223 // Eg: "S:H:foo.com:80" 224 static String keyString(boolean secure, boolean proxy, String host, int port) { 225 char c1 = secure ? 'S' : 'C'; 226 char c2 = proxy ? 'P' : 'H'; 227 228 StringBuilder sb = new StringBuilder(128); 229 sb.append(c1).append(':').append(c2).append(':') 230 .append(host).append(':').append(port); 231 return sb.toString(); 232 } 233 234 String key() { 235 return this.key; 236 } 237 238 void putConnection() { 239 client2.putConnection(this); 240 } 241 242 private static String toHexdump1(ByteBuffer bb) { 243 bb.mark(); 244 StringBuilder sb = new StringBuilder(512); 245 Formatter f = new Formatter(sb); 246 247 while (bb.hasRemaining()) { 248 int i = Byte.toUnsignedInt(bb.get()); 249 f.format("%02x:", i); 250 } 251 sb.deleteCharAt(sb.length()-1); 252 bb.reset(); 253 return sb.toString(); 254 } 255 256 private static String toHexdump(ByteBuffer bb) { 257 List<String> words = new ArrayList<>(); 258 int i = 0; 259 bb.mark(); 260 while (bb.hasRemaining()) { 261 if (i % 2 == 0) { 262 words.add(""); 263 } 264 byte b = bb.get(); 265 String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1); 266 words.set(i / 2, words.get(i / 2) + hex); 267 i++; 268 } 269 bb.reset(); 270 return words.stream().collect(Collectors.joining(" ")); 271 } 272 273 private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) { 274 boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS); 275 276 ByteBuffer[] buffers = frame.getHeaderBlock(); 277 for (int i = 0; i < buffers.length; i++) { 278 hpackIn.decode(buffers[i], endOfHeaders && (i == buffers.length - 1), decoder); 279 } 280 } 281 282 int getInitialSendWindowSize() { 283 return serverSettings.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE); 284 } 285 286 void close() { 287 GoAwayFrame f = new GoAwayFrame(); 288 f.setDebugData("Requested by user".getBytes()); 289 // TODO: set last stream. For now zero ok. 290 sendFrame(f); 291 } 292 293 // BufferHandler methods 294 295 @Override 296 public ByteBuffer getBuffer(int n) { 297 return client.getBuffer(n); 298 } 299 300 @Override 301 public void returnBuffer(ByteBuffer buf) { 302 client.returnBuffer(buf); 303 } 304 305 @Override 306 public void setMinBufferSize(int n) { 307 client.setMinBufferSize(n); 308 } 309 310 private final Object readlock = new Object(); 311 312 void asyncReceive(ByteBuffer buffer) { 313 synchronized (readlock) { 314 try { 315 if (reader == null) { 316 reader = new FrameReader(buffer); 317 } else { 318 reader.input(buffer); 319 } 320 while (true) { 321 if (reader.haveFrame()) { 322 List<ByteBuffer> buffers = reader.frame(); 323 324 ByteBufferConsumer bbc = new ByteBufferConsumer(buffers, this::getBuffer); 325 processFrame(bbc); 326 if (bbc.consumed()) { 327 reader = new FrameReader(); 328 return; 329 } else { 330 reader = new FrameReader(reader); 331 } 332 } else 333 return; 334 } 335 } catch (Throwable e) { 336 String msg = Utils.stackTrace(e); 337 Log.logTrace(msg); 338 shutdown(e); 339 } 340 } 341 } 342 343 void shutdown(Throwable t) { 344 Log.logError(t); 345 closed = true; 346 client2.deleteConnection(this); 347 List<Stream> c = new LinkedList<>(streams.values()); 348 for (Stream s : c) { 349 s.cancelImpl(t); 350 } 351 connection.close(); 352 } 353 354 /** 355 * Handles stream 0 (common) frames that apply to whole connection and passes 356 * other stream specific frames to that Stream object. 357 * 358 * Invokes Stream.incoming() which is expected to process frame without 359 * blocking. 360 */ 361 void processFrame(ByteBufferConsumer bbc) throws IOException, InterruptedException { 362 Http2Frame frame = Http2Frame.readIncoming(bbc); 363 Log.logFrames(frame, "IN"); 364 int streamid = frame.streamid(); 365 if (streamid == 0) { 366 handleCommonFrame(frame); 367 } else { 368 Stream stream = getStream(streamid); 369 if (stream == null) { 370 // should never receive a frame with unknown stream id 371 resetStream(streamid, ResetFrame.PROTOCOL_ERROR); 372 } 373 if (frame instanceof PushPromiseFrame) { 374 PushPromiseFrame pp = (PushPromiseFrame)frame; 375 handlePushPromise(stream, pp); 376 } else if (frame instanceof HeaderFrame) { 377 // decode headers (or continuation) 378 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer()); 379 stream.incoming(frame); 380 } else 381 stream.incoming(frame); 382 } 383 } 384 385 private void handlePushPromise(Stream parent, PushPromiseFrame pp) 386 throws IOException, InterruptedException { 387 388 HttpRequestImpl parentReq = parent.request; 389 int promisedStreamid = pp.getPromisedStream(); 390 if (promisedStreamid != nextPushStream) { 391 resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR); 392 return; 393 } else { 394 nextPushStream += 2; 395 } 396 HeaderDecoder decoder = new HeaderDecoder(); 397 decodeHeaders(pp, decoder); 398 HttpHeadersImpl headers = decoder.headers(); 399 HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers); 400 401 Stream.PushedStream pushStream = createPushStream(parent, pushReq); 402 pushStream.registerStream(promisedStreamid); 403 parent.incoming_pushPromise(pushReq, pushStream); 404 } 405 406 private void handleCommonFrame(Http2Frame frame) 407 throws IOException, InterruptedException { 408 409 switch (frame.type()) { 410 case SettingsFrame.TYPE: 411 { SettingsFrame f = (SettingsFrame)frame; 412 handleSettings(f);} 413 break; 414 case PingFrame.TYPE: 415 { PingFrame f = (PingFrame)frame; 416 handlePing(f);} 417 break; 418 case GoAwayFrame.TYPE: 419 { GoAwayFrame f = (GoAwayFrame)frame; 420 handleGoAway(f);} 421 break; 422 case WindowUpdateFrame.TYPE: 423 { WindowUpdateFrame f = (WindowUpdateFrame)frame; 424 handleWindowUpdate(f);} 425 break; 426 default: 427 protocolError(ErrorFrame.PROTOCOL_ERROR); 428 } 429 } 430 431 void resetStream(int streamid, int code) throws IOException, InterruptedException { 432 Log.logError( 433 "Resetting stream {0,number,integer} with error code {1,number,integer}", 434 streamid, code); 435 ResetFrame frame = new ResetFrame(); 436 frame.streamid(streamid); 437 frame.setErrorCode(code); 438 sendFrame(frame); 439 streams.remove(streamid); 440 } 441 442 private void handleWindowUpdate(WindowUpdateFrame f) 443 throws IOException, InterruptedException { 444 connectionSendWindow.update(f.getUpdate()); 445 } 446 447 private void protocolError(int errorCode) 448 throws IOException, InterruptedException { 449 GoAwayFrame frame = new GoAwayFrame(); 450 frame.setErrorCode(errorCode); 451 sendFrame(frame); 452 String msg = "Error code: " + errorCode; 453 shutdown(new IOException("protocol error")); 454 } 455 456 private void handleSettings(SettingsFrame frame) 457 throws IOException, InterruptedException { 458 if (frame.getFlag(SettingsFrame.ACK)) { 459 // ignore ack frames for now. 460 return; 461 } 462 serverSettings = frame; 463 SettingsFrame ack = getAckFrame(frame.streamid()); 464 sendFrame(ack); 465 } 466 467 private void handlePing(PingFrame frame) 468 throws IOException, InterruptedException { 469 frame.setFlag(PingFrame.ACK); 470 sendUnorderedFrame(frame); 471 } 472 473 private void handleGoAway(GoAwayFrame frame) 474 throws IOException, InterruptedException { 475 //System.err.printf("GoAWAY: %s\n", ErrorFrame.stringForCode(frame.getErrorCode())); 476 shutdown(new IOException("GOAWAY received")); 477 } 478 479 private void initCommon() { 480 clientSettings = client2.getClientSettings(); 481 482 // serverSettings will be updated by server 483 serverSettings = SettingsFrame.getDefaultSettings(); 484 hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); 485 hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); 486 487 windowUpdater = new WindowUpdateSender(this, client2.client().getReceiveBufferSize()) { 488 @Override 489 int getStreamId() { 490 return 0; 491 } 492 }; 493 } 494 495 /** 496 * Max frame size we are allowed to send 497 */ 498 public int getMaxSendFrameSize() { 499 int param = serverSettings.getParameter(MAX_FRAME_SIZE); 500 if (param == -1) { 501 param = DEFAULT_FRAME_SIZE; 502 } 503 return param; 504 } 505 506 /** 507 * Max frame size we will receive 508 */ 509 public int getMaxReceiveFrameSize() { 510 return clientSettings.getParameter(MAX_FRAME_SIZE); 511 } 512 513 // Not sure how useful this is. 514 public int getMaxHeadersSize() { 515 return serverSettings.getParameter(MAX_HEADER_LIST_SIZE); 516 } 517 518 private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; 519 520 private static final byte[] PREFACE_BYTES = 521 CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1); 522 523 /** 524 * Sends Connection preface and Settings frame with current preferred 525 * values 526 */ 527 private void sendConnectionPreface() throws IOException { 528 ByteBufferGenerator bg = new ByteBufferGenerator(this); 529 bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES); 530 SettingsFrame sf = client2.getClientSettings(); 531 Log.logFrames(sf, "OUT"); 532 sf.writeOutgoing(bg); 533 ByteBuffer[] ba = bg.getBufferArray(); 534 connection.write(ba, 0, ba.length); // write is performed before switch to async mode 535 // send a Window update for the receive buffer we are using 536 // minus the initial 64 K specified in protocol 537 windowUpdater.sendWindowUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1)); 538 } 539 540 /** 541 * Returns an existing Stream with given id, or null if doesn't exist 542 */ 543 Stream getStream(int streamid) { 544 return streams.get(streamid); 545 } 546 547 /** 548 * Creates Stream with given id. 549 */ 550 Stream createStream(Exchange exchange) { 551 Stream stream = new Stream(client, this, exchange); 552 return stream; 553 } 554 555 Stream.PushedStream createPushStream(Stream parent, HttpRequestImpl pushReq) { 556 Stream.PushGroup<?> pg = parent.request.pushGroup(); 557 return new Stream.PushedStream(pg, client, this, parent, pushReq); 558 } 559 560 void putStream(Stream stream, int streamid) { 561 streams.put(streamid, stream); 562 } 563 564 void deleteStream(Stream stream) { 565 streams.remove(stream.streamid); 566 } 567 568 static final int MAX_STREAM = Integer.MAX_VALUE - 2; 569 570 // Number of header bytes in a Headers Frame 571 final static int HEADERS_HEADER_SIZE = 15; 572 573 // Number of header bytes in a Continuation frame 574 final static int CONTIN_HEADER_SIZE = 9; 575 576 /** 577 * Encode the headers into a List<ByteBuffer> and then create HEADERS 578 * and CONTINUATION frames from the list and return the List<Http2Frame>. 579 * 580 * @param frame 581 * @return 582 */ 583 private LinkedList<Http2Frame> encodeHeaders(OutgoingHeaders frame) { 584 LinkedList<ByteBuffer> buffers = new LinkedList<>(); 585 ByteBuffer buf = getBuffer(); 586 buffers.add(buf); 587 encodeHeadersImpl(frame.stream.getRequestPseudoHeaders(), buffers); 588 encodeHeadersImpl(frame.getUserHeaders(), buffers); 589 encodeHeadersImpl(frame.getSystemHeaders(), buffers); 590 591 for (ByteBuffer b : buffers) { 592 b.flip(); 593 } 594 595 LinkedList<Http2Frame> frames = new LinkedList<>(); 596 int maxframesize = getMaxSendFrameSize(); 597 598 HeadersFrame oframe = new HeadersFrame(); 599 oframe.setFlags(frame.getFlags()); 600 oframe.streamid(frame.streamid()); 601 602 oframe.setHeaderBlock(getBufferArray(buffers, maxframesize)); 603 frames.add(oframe); 604 // Any buffers left? 605 boolean done = buffers.isEmpty(); 606 if (done) { 607 oframe.setFlag(HeaderFrame.END_HEADERS); 608 } else { 609 ContinuationFrame cf = null; 610 while (!done) { 611 cf = new ContinuationFrame(); 612 cf.streamid(frame.streamid()); 613 cf.setHeaderBlock(getBufferArray(buffers, maxframesize)); 614 frames.add(cf); 615 done = buffers.isEmpty(); 616 } 617 cf.setFlag(HeaderFrame.END_HEADERS); 618 } 619 return frames; 620 } 621 622 // should always return at least one buffer 623 private static ByteBuffer[] getBufferArray(LinkedList<ByteBuffer> list, int maxsize) { 624 assert maxsize >= BUFSIZE; 625 LinkedList<ByteBuffer> newlist = new LinkedList<>(); 626 int size = list.size(); 627 int nbytes = 0; 628 for (int i=0; i<size; i++) { 629 ByteBuffer buf = list.getFirst(); 630 if (nbytes + buf.remaining() <= maxsize) { 631 nbytes += buf.remaining(); 632 newlist.add(buf); 633 list.remove(); 634 } else { 635 break; 636 } 637 } 638 return newlist.toArray(empty); 639 } 640 641 /** 642 * Encode all the headers from the given HttpHeadersImpl into the given List. 643 */ 644 private void encodeHeadersImpl(HttpHeaders hdrs, LinkedList<ByteBuffer> buffers) { 645 ByteBuffer buffer; 646 if (!(buffer = buffers.getLast()).hasRemaining()) { 647 buffer = getBuffer(); 648 buffers.add(buffer); 649 } 650 for (Map.Entry<String, List<String>> e : hdrs.map().entrySet()) { 651 String key = e.getKey(); 652 String lkey = key.toLowerCase(); 653 List<String> values = e.getValue(); 654 for (String value : values) { 655 hpackOut.header(lkey, value); 656 boolean encoded = false; 657 do { 658 encoded = hpackOut.encode(buffer); 659 if (!encoded) { 660 buffer = getBuffer(); 661 buffers.add(buffer); 662 } 663 } while (!encoded); 664 } 665 } 666 } 667 668 static Throwable getExceptionFrom(CompletableFuture<?> cf) { 669 try { 670 cf.get(); 671 return null; 672 } catch (Throwable e) { 673 if (e.getCause() != null) 674 return e.getCause(); 675 else 676 return e; 677 } 678 } 679 680 681 void execute(Runnable r) { 682 executor.execute(r, null); 683 } 684 685 private final Object sendlock = new Object(); 686 687 /** 688 * 689 */ 690 void sendFrame(Http2Frame frame) { 691 synchronized (sendlock) { 692 try { 693 ByteBuffer[] bufs; 694 if (frame instanceof OutgoingHeaders) { 695 OutgoingHeaders oh = (OutgoingHeaders) frame; 696 Stream stream = oh.getStream(); 697 stream.registerStream(nextstreamid); 698 oh.streamid(nextstreamid); 699 nextstreamid += 2; 700 // set outgoing window here. This allows thread sending 701 // body to proceed. 702 stream.updateOutgoingWindow(getInitialSendWindowSize()); 703 bufs = encodeFrames(encodeHeaders(oh)); 704 } else { 705 bufs = encodeFrame(frame); 706 } 707 connectionAsync.writeAsync(bufs); 708 709 } catch (IOException e) { 710 if (!closed) { 711 Log.logError(e); 712 shutdown(e); 713 } 714 return; 715 } 716 } 717 connectionAsync.flushAsync(); 718 } 719 720 private ByteBuffer[] encodeFrame(Http2Frame frame) throws IOException { 721 ByteBufferGenerator bbg = new ByteBufferGenerator(this); 722 frame.computeLength(); 723 Log.logFrames(frame, "OUT"); 724 frame.writeOutgoing(bbg); 725 return bbg.getBufferList().toArray(new ByteBuffer[0]); 726 } 727 728 private ByteBuffer[] encodeFrames(List<Http2Frame> frames) throws IOException { 729 List<ByteBuffer> bufs = new ArrayList<>(); 730 for(Http2Frame frame : frames) { 731 ByteBufferGenerator bbg = new ByteBufferGenerator(this); 732 frame.computeLength(); 733 Log.logFrames(frame, "OUT"); 734 frame.writeOutgoing(bbg); 735 bufs.addAll(bbg.getBufferList()); 736 } 737 return bufs.toArray(new ByteBuffer[0]); 738 } 739 740 void sendDataFrame(DataFrame frame) { 741 try { 742 connectionAsync.writeAsync(encodeFrame(frame)); 743 connectionAsync.flushAsync(); 744 } catch (IOException e) { 745 if (!closed) { 746 Log.logError(e); 747 shutdown(e); 748 } 749 } 750 } 751 752 /* 753 * Direct call of the method bypasses synchronization on "sendlock" and 754 * allowed only of control frames: WindowUpdateFrame, PingFrame and etc. 755 * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame. 756 */ 757 void sendUnorderedFrame(Http2Frame frame){ 758 try { 759 connectionAsync.writeAsyncUnordered(encodeFrame(frame)); 760 connectionAsync.flushAsync(); 761 } catch (IOException e) { 762 if (!closed) { 763 Log.logError(e); 764 shutdown(e); 765 } 766 } 767 } 768 769 private SettingsFrame getAckFrame(int streamid) { 770 SettingsFrame frame = new SettingsFrame(); 771 frame.setFlag(SettingsFrame.ACK); 772 frame.streamid(streamid); 773 return frame; 774 } 775 776 static class HeaderDecoder implements DecodingCallback { 777 HttpHeadersImpl headers; 778 779 HeaderDecoder() { 780 this.headers = new HttpHeadersImpl(); 781 } 782 783 @Override 784 public void onDecoded(CharSequence name, CharSequence value) { 785 headers.addHeader(name.toString(), value.toString()); 786 } 787 788 HttpHeadersImpl headers() { 789 return headers; 790 } 791 } 792 }