1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package java.net.http;
  27 
  28 import java.io.IOException;
  29 import java.net.InetSocketAddress;
  30 import java.net.URI;
  31 import java.net.http.HttpConnection.Mode;
  32 import java.nio.ByteBuffer;
  33 import java.nio.charset.StandardCharsets;
  34 import java.util.HashMap;
  35 import java.util.LinkedList;
  36 import java.util.List;
  37 import java.util.Map;
  38 import java.util.concurrent.CompletableFuture;
  39 import sun.net.httpclient.hpack.Encoder;
  40 import sun.net.httpclient.hpack.Decoder;
  41 import static java.net.http.SettingsFrame.*;
  42 import static java.net.http.Utils.BUFSIZE;
  43 import java.util.ArrayList;
  44 import java.util.Collections;
  45 import java.util.Formatter;
  46 import java.util.stream.Collectors;
  47 import sun.net.httpclient.hpack.DecodingCallback;
  48 
  49 /**
  50  * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
  51  * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
  52  *
  53  * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
  54  * to a HttpClientImpl.
  55  *
  56  * Creation cases:
  57  * 1) upgraded HTTP/1.1 plain tcp connection
  58  * 2) prior knowledge directly created plain tcp connection
  59  * 3) directly created HTTP/2 SSL connection which uses ALPN.
  60  *
  61  * Sending is done by writing directly to underlying HttpConnection object which
  62  * is operating in async mode. No flow control applies on output at this level
  63  * and all writes are just executed as puts to an output Q belonging to HttpConnection
  64  * Flow control is implemented by HTTP/2 protocol itself.
  65  *
  66  * Hpack header compression
  67  * and outgoing stream creation is also done here, because these operations
  68  * must be synchronized at the socket level. Stream objects send frames simply
  69  * by placing them on the connection's output Queue. sendFrame() is called
  70  * from a higher level (Stream) thread.
  71  *
  72  * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
  73  * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
  74  * or handles them directly itself. This thread performs hpack decompression
  75  * and incoming stream creation (Server push). Incoming frames destined for a
  76  * stream are provided by calling Stream.incoming().
  77  */
  78 class Http2Connection implements BufferHandler {
  79 
  80     volatile boolean closed;
  81 
  82     //-------------------------------------
  83     final HttpConnection connection;
  84     final AsyncConnection connectionAsync;
  85     HttpClientImpl client;
  86     final Http2ClientImpl client2;
  87     Map<Integer,Stream> streams;
  88     int nextstreamid = 3; // stream 1 is registered separately
  89     int nextPushStream = 2;
  90     Encoder hpackOut;
  91     Decoder hpackIn;
  92     SettingsFrame clientSettings, serverSettings;
  93     final LinkedList<ByteBuffer> freeList;
  94     final String key; // for HttpClientImpl.connections map
  95     FrameReader reader;
  96 
  97     // Connection level flow control windows
  98     final WindowControl connectionSendWindow = new WindowControl(INITIAL_WINDOW_SIZE);
  99 
 100     final static int DEFAULT_FRAME_SIZE = 16 * 1024;
 101     private static ByteBuffer[] empty = Utils.EMPTY_BB_ARRAY;
 102 
 103     final ExecutorWrapper executor;
 104 
 105     WindowUpdateSender windowUpdater;
 106     /**
 107      * This is established by the protocol spec and the peer will update it with
 108      * WINDOW_UPDATEs, which affects the connectionSendWindow.
 109      */
 110     final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1;
 111 
 112     // TODO: need list of control frames from other threads
 113     // that need to be sent
 114 
 115     /**
 116      * Case 1) Create from upgraded HTTP/1.1 connection.
 117      * Is ready to use. Will not be SSL. exchange is the Exchange
 118      * that initiated the connection, whose response will be delivered
 119      * on a Stream.
 120      */
 121     Http2Connection(HttpConnection connection, Http2ClientImpl client2,
 122             Exchange exchange) throws IOException, InterruptedException {
 123         String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available());
 124         Log.logTrace(msg);
 125 
 126         //this.initialExchange = exchange;
 127         assert !(connection instanceof SSLConnection);
 128         this.connection = connection;
 129         this.connectionAsync = (AsyncConnection)connection;
 130         this.client = client2.client();
 131         this.client2 = client2;
 132         this.executor = client.executorWrapper();
 133         this.freeList = new LinkedList<>();
 134         this.key = keyFor(connection);
 135         streams = Collections.synchronizedMap(new HashMap<>());
 136         initCommon();
 137         //sendConnectionPreface();
 138         Stream initialStream = createStream(exchange);
 139         initialStream.registerStream(1);
 140         initialStream.requestSent();
 141         sendConnectionPreface();
 142         // start reading and writing
 143         // start reading
 144         connectionAsync.setAsyncCallbacks(this::asyncReceive, this::shutdown);
 145         connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
 146         asyncReceive(connection.getRemaining());
 147         connectionAsync.startReading();
 148     }
 149 
 150     // async style but completes immediately
 151     static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
 152             Http2ClientImpl client2, Exchange exchange) {
 153         CompletableFuture<Http2Connection> cf = new CompletableFuture<>();
 154         try {
 155             Http2Connection c = new Http2Connection(connection, client2, exchange);
 156             cf.complete(c);
 157         } catch (IOException | InterruptedException e) {
 158             cf.completeExceptionally(e);
 159         }
 160         return cf;
 161     }
 162 
 163     /**
 164      * Cases 2) 3)
 165      *
 166      * request is request to be sent.
 167      */
 168     Http2Connection(HttpRequestImpl request) throws IOException, InterruptedException {
 169         InetSocketAddress proxy = request.proxy();
 170         URI uri = request.uri();
 171         InetSocketAddress addr = Utils.getAddress(request);
 172         String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available());
 173         Log.logTrace(msg);
 174         this.key = keyFor(uri, proxy);
 175         this.connection = HttpConnection.getConnection(addr, request, this);
 176         this.connectionAsync = (AsyncConnection)connection;
 177         streams = Collections.synchronizedMap(new HashMap<>());
 178         this.client = request.client();
 179         this.client2 = client.client2();
 180         this.executor = client.executorWrapper();
 181         this.freeList = new LinkedList<>();
 182         nextstreamid = 1;
 183         initCommon();
 184         connection.connect();
 185         sendConnectionPreface();
 186         // start reading
 187         connectionAsync.setAsyncCallbacks(this::asyncReceive, this::shutdown);
 188         connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
 189 
 190         connectionAsync.startReading();
 191     }
 192 
 193     static String keyFor(HttpConnection connection) {
 194         boolean isProxy = connection.isProxied();
 195         boolean isSecure = connection.isSecure();
 196         InetSocketAddress addr = connection.address();
 197 
 198         return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
 199     }
 200 
 201     static String keyFor(URI uri, InetSocketAddress proxy) {
 202         boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
 203         boolean isProxy = proxy != null;
 204 
 205         String host;
 206         int port;
 207 
 208         if (isProxy) {
 209             host = proxy.getHostString();
 210             port = proxy.getPort();
 211         } else {
 212             host = uri.getHost();
 213             port = uri.getPort();
 214         }
 215         return keyString(isSecure, isProxy, host, port);
 216     }
 217 
 218     // {C,S}:{H:P}:host:port
 219     // C indicates clear text connection "http"
 220     // S indicates secure "https"
 221     // H indicates host (direct) connection
 222     // P indicates proxy
 223     // Eg: "S:H:foo.com:80"
 224     static String keyString(boolean secure, boolean proxy, String host, int port) {
 225         char c1 = secure ? 'S' : 'C';
 226         char c2 = proxy ? 'P' : 'H';
 227 
 228         StringBuilder sb = new StringBuilder(128);
 229         sb.append(c1).append(':').append(c2).append(':')
 230                 .append(host).append(':').append(port);
 231         return sb.toString();
 232     }
 233 
 234     String key() {
 235         return this.key;
 236     }
 237 
 238     void putConnection() {
 239         client2.putConnection(this);
 240     }
 241 
 242     private static String toHexdump1(ByteBuffer bb) {
 243         bb.mark();
 244         StringBuilder sb = new StringBuilder(512);
 245         Formatter f = new Formatter(sb);
 246 
 247         while (bb.hasRemaining()) {
 248             int i =  Byte.toUnsignedInt(bb.get());
 249             f.format("%02x:", i);
 250         }
 251         sb.deleteCharAt(sb.length()-1);
 252         bb.reset();
 253         return sb.toString();
 254     }
 255 
 256     private static String toHexdump(ByteBuffer bb) {
 257         List<String> words = new ArrayList<>();
 258         int i = 0;
 259         bb.mark();
 260         while (bb.hasRemaining()) {
 261             if (i % 2 == 0) {
 262                 words.add("");
 263             }
 264             byte b = bb.get();
 265             String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1);
 266             words.set(i / 2, words.get(i / 2) + hex);
 267             i++;
 268         }
 269         bb.reset();
 270         return words.stream().collect(Collectors.joining(" "));
 271     }
 272 
 273     private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) {
 274         boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
 275 
 276         ByteBuffer[] buffers = frame.getHeaderBlock();
 277         for (int i = 0; i < buffers.length; i++) {
 278             hpackIn.decode(buffers[i], endOfHeaders && (i == buffers.length - 1), decoder);
 279         }
 280     }
 281 
 282     int getInitialSendWindowSize() {
 283         return serverSettings.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE);
 284     }
 285 
 286     void close() {
 287         GoAwayFrame f = new GoAwayFrame();
 288         f.setDebugData("Requested by user".getBytes());
 289         // TODO: set last stream. For now zero ok.
 290         sendFrame(f);
 291     }
 292 
 293     // BufferHandler methods
 294 
 295     @Override
 296     public ByteBuffer getBuffer(int n) {
 297         return client.getBuffer(n);
 298     }
 299 
 300     @Override
 301     public void returnBuffer(ByteBuffer buf) {
 302         client.returnBuffer(buf);
 303     }
 304 
 305     @Override
 306     public void setMinBufferSize(int n) {
 307         client.setMinBufferSize(n);
 308     }
 309 
 310     private final Object readlock = new Object();
 311 
 312     void asyncReceive(ByteBuffer buffer) {
 313         synchronized (readlock) {
 314             try {
 315                 if (reader == null) {
 316                     reader = new FrameReader(buffer);
 317                 } else {
 318                     reader.input(buffer);
 319                 }
 320                 while (true) {
 321                     if (reader.haveFrame()) {
 322                         List<ByteBuffer> buffers = reader.frame();
 323 
 324                         ByteBufferConsumer bbc = new ByteBufferConsumer(buffers, this::getBuffer);
 325                         processFrame(bbc);
 326                         if (bbc.consumed()) {
 327                             reader = new FrameReader();
 328                             return;
 329                         } else {
 330                             reader = new FrameReader(reader);
 331                         }
 332                     } else
 333                         return;
 334                 }
 335             } catch (Throwable e) {
 336                 String msg = Utils.stackTrace(e);
 337                 Log.logTrace(msg);
 338                 shutdown(e);
 339             }
 340         }
 341     }
 342 
 343     void shutdown(Throwable t) {
 344         Log.logError(t);
 345         closed = true;
 346         client2.deleteConnection(this);
 347         List<Stream> c = new LinkedList<>(streams.values());
 348         for (Stream s : c) {
 349             s.cancelImpl(t);
 350         }
 351         connection.close();
 352     }
 353 
 354     /**
 355      * Handles stream 0 (common) frames that apply to whole connection and passes
 356      * other stream specific frames to that Stream object.
 357      *
 358      * Invokes Stream.incoming() which is expected to process frame without
 359      * blocking.
 360      */
 361     void processFrame(ByteBufferConsumer bbc) throws IOException, InterruptedException {
 362         Http2Frame frame = Http2Frame.readIncoming(bbc);
 363         Log.logFrames(frame, "IN");
 364         int streamid = frame.streamid();
 365         if (streamid == 0) {
 366             handleCommonFrame(frame);
 367         } else {
 368             Stream stream = getStream(streamid);
 369             if (stream == null) {
 370                 // should never receive a frame with unknown stream id
 371                 resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
 372             }
 373             if (frame instanceof PushPromiseFrame) {
 374                 PushPromiseFrame pp = (PushPromiseFrame)frame;
 375                 handlePushPromise(stream, pp);
 376             } else if (frame instanceof HeaderFrame) {
 377                 // decode headers (or continuation)
 378                 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
 379                 stream.incoming(frame);
 380             } else
 381                 stream.incoming(frame);
 382         }
 383     }
 384 
 385     private void handlePushPromise(Stream parent, PushPromiseFrame pp)
 386             throws IOException, InterruptedException {
 387 
 388         HttpRequestImpl parentReq = parent.request;
 389         int promisedStreamid = pp.getPromisedStream();
 390         if (promisedStreamid != nextPushStream) {
 391             resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
 392             return;
 393         } else {
 394             nextPushStream += 2;
 395         }
 396         HeaderDecoder decoder = new HeaderDecoder();
 397         decodeHeaders(pp, decoder);
 398         HttpHeadersImpl headers = decoder.headers();
 399         HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
 400 
 401         Stream.PushedStream pushStream = createPushStream(parent, pushReq);
 402         pushStream.registerStream(promisedStreamid);
 403         parent.incoming_pushPromise(pushReq, pushStream);
 404     }
 405 
 406     private void handleCommonFrame(Http2Frame frame)
 407             throws IOException, InterruptedException {
 408 
 409         switch (frame.type()) {
 410           case SettingsFrame.TYPE:
 411           { SettingsFrame f = (SettingsFrame)frame;
 412             handleSettings(f);}
 413             break;
 414           case PingFrame.TYPE:
 415           { PingFrame f = (PingFrame)frame;
 416             handlePing(f);}
 417             break;
 418           case GoAwayFrame.TYPE:
 419           { GoAwayFrame f = (GoAwayFrame)frame;
 420             handleGoAway(f);}
 421             break;
 422           case WindowUpdateFrame.TYPE:
 423           { WindowUpdateFrame f = (WindowUpdateFrame)frame;
 424             handleWindowUpdate(f);}
 425             break;
 426           default:
 427             protocolError(ErrorFrame.PROTOCOL_ERROR);
 428         }
 429     }
 430 
 431     void resetStream(int streamid, int code) throws IOException, InterruptedException {
 432         Log.logError(
 433             "Resetting stream {0,number,integer} with error code {1,number,integer}",
 434             streamid, code);
 435         ResetFrame frame = new ResetFrame();
 436         frame.streamid(streamid);
 437         frame.setErrorCode(code);
 438         sendFrame(frame);
 439         streams.remove(streamid);
 440     }
 441 
 442     private void handleWindowUpdate(WindowUpdateFrame f)
 443             throws IOException, InterruptedException {
 444         connectionSendWindow.update(f.getUpdate());
 445     }
 446 
 447     private void protocolError(int errorCode)
 448             throws IOException, InterruptedException {
 449         GoAwayFrame frame = new GoAwayFrame();
 450         frame.setErrorCode(errorCode);
 451         sendFrame(frame);
 452         String msg = "Error code: " + errorCode;
 453         shutdown(new IOException("protocol error"));
 454     }
 455 
 456     private void handleSettings(SettingsFrame frame)
 457             throws IOException, InterruptedException {
 458         if (frame.getFlag(SettingsFrame.ACK)) {
 459             // ignore ack frames for now.
 460             return;
 461         }
 462         serverSettings = frame;
 463         SettingsFrame ack = getAckFrame(frame.streamid());
 464         sendFrame(ack);
 465     }
 466 
 467     private void handlePing(PingFrame frame)
 468             throws IOException, InterruptedException {
 469         frame.setFlag(PingFrame.ACK);
 470         sendUnorderedFrame(frame);
 471     }
 472 
 473     private void handleGoAway(GoAwayFrame frame)
 474             throws IOException, InterruptedException {
 475         //System.err.printf("GoAWAY: %s\n", ErrorFrame.stringForCode(frame.getErrorCode()));
 476         shutdown(new IOException("GOAWAY received"));
 477     }
 478 
 479     private void initCommon() {
 480         clientSettings = client2.getClientSettings();
 481 
 482         // serverSettings will be updated by server
 483         serverSettings = SettingsFrame.getDefaultSettings();
 484         hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
 485         hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
 486 
 487         windowUpdater = new WindowUpdateSender(this, client2.client().getReceiveBufferSize()) {
 488             @Override
 489             int getStreamId() {
 490                 return 0;
 491             }
 492         };
 493     }
 494 
 495     /**
 496      * Max frame size we are allowed to send
 497      */
 498     public int getMaxSendFrameSize() {
 499         int param = serverSettings.getParameter(MAX_FRAME_SIZE);
 500         if (param == -1) {
 501             param = DEFAULT_FRAME_SIZE;
 502         }
 503         return param;
 504     }
 505 
 506     /**
 507      * Max frame size we will receive
 508      */
 509     public int getMaxReceiveFrameSize() {
 510         return clientSettings.getParameter(MAX_FRAME_SIZE);
 511     }
 512 
 513     // Not sure how useful this is.
 514     public int getMaxHeadersSize() {
 515         return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
 516     }
 517 
 518     private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
 519 
 520     private static final byte[] PREFACE_BYTES =
 521         CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
 522 
 523     /**
 524      * Sends Connection preface and Settings frame with current preferred
 525      * values
 526      */
 527     private void sendConnectionPreface() throws IOException {
 528         ByteBufferGenerator bg = new ByteBufferGenerator(this);
 529         bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES);
 530         SettingsFrame sf = client2.getClientSettings();
 531         Log.logFrames(sf, "OUT");
 532         sf.writeOutgoing(bg);
 533         ByteBuffer[] ba = bg.getBufferArray();
 534         connection.write(ba, 0, ba.length); // write is performed before switch to async mode
 535         // send a Window update for the receive buffer we are using
 536         // minus the initial 64 K specified in protocol
 537         windowUpdater.sendWindowUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1));
 538     }
 539 
 540     /**
 541      * Returns an existing Stream with given id, or null if doesn't exist
 542      */
 543     Stream getStream(int streamid) {
 544         return streams.get(streamid);
 545     }
 546 
 547     /**
 548      * Creates Stream with given id.
 549      */
 550     Stream createStream(Exchange exchange) {
 551         Stream stream = new Stream(client, this, exchange);
 552         return stream;
 553     }
 554 
 555     Stream.PushedStream createPushStream(Stream parent, HttpRequestImpl pushReq) {
 556         Stream.PushGroup<?> pg = parent.request.pushGroup();
 557         return new Stream.PushedStream(pg, client, this, parent, pushReq);
 558     }
 559 
 560     void putStream(Stream stream, int streamid) {
 561         streams.put(streamid, stream);
 562     }
 563 
 564     void deleteStream(Stream stream) {
 565         streams.remove(stream.streamid);
 566     }
 567 
 568     static final int MAX_STREAM = Integer.MAX_VALUE - 2;
 569 
 570     // Number of header bytes in a Headers Frame
 571     final static int HEADERS_HEADER_SIZE = 15;
 572 
 573     // Number of header bytes in a Continuation frame
 574     final static int CONTIN_HEADER_SIZE = 9;
 575 
 576     /**
 577      * Encode the headers into a List<ByteBuffer> and then create HEADERS
 578      * and CONTINUATION frames from the list and return the List<Http2Frame>.
 579      *
 580      * @param frame
 581      * @return
 582      */
 583     private LinkedList<Http2Frame> encodeHeaders(OutgoingHeaders frame) {
 584         LinkedList<ByteBuffer> buffers = new LinkedList<>();
 585         ByteBuffer buf = getBuffer();
 586         buffers.add(buf);
 587         encodeHeadersImpl(frame.stream.getRequestPseudoHeaders(), buffers);
 588         encodeHeadersImpl(frame.getUserHeaders(), buffers);
 589         encodeHeadersImpl(frame.getSystemHeaders(), buffers);
 590 
 591         for (ByteBuffer b : buffers) {
 592             b.flip();
 593         }
 594 
 595         LinkedList<Http2Frame> frames = new LinkedList<>();
 596         int maxframesize = getMaxSendFrameSize();
 597 
 598         HeadersFrame oframe = new HeadersFrame();
 599         oframe.setFlags(frame.getFlags());
 600         oframe.streamid(frame.streamid());
 601 
 602         oframe.setHeaderBlock(getBufferArray(buffers, maxframesize));
 603         frames.add(oframe);
 604         // Any buffers left?
 605         boolean done = buffers.isEmpty();
 606         if (done) {
 607             oframe.setFlag(HeaderFrame.END_HEADERS);
 608         } else {
 609             ContinuationFrame cf = null;
 610             while (!done) {
 611                 cf = new ContinuationFrame();
 612                 cf.streamid(frame.streamid());
 613                 cf.setHeaderBlock(getBufferArray(buffers, maxframesize));
 614                 frames.add(cf);
 615                 done = buffers.isEmpty();
 616             }
 617             cf.setFlag(HeaderFrame.END_HEADERS);
 618         }
 619         return frames;
 620     }
 621 
 622     // should always return at least one buffer
 623     private static ByteBuffer[] getBufferArray(LinkedList<ByteBuffer> list, int maxsize) {
 624         assert maxsize >= BUFSIZE;
 625         LinkedList<ByteBuffer> newlist = new LinkedList<>();
 626         int size = list.size();
 627         int nbytes = 0;
 628         for (int i=0; i<size; i++) {
 629             ByteBuffer buf = list.getFirst();
 630             if (nbytes + buf.remaining() <= maxsize) {
 631                 nbytes += buf.remaining();
 632                 newlist.add(buf);
 633                 list.remove();
 634             } else {
 635                 break;
 636             }
 637         }
 638         return newlist.toArray(empty);
 639     }
 640 
 641     /**
 642      * Encode all the headers from the given HttpHeadersImpl into the given List.
 643      */
 644     private void encodeHeadersImpl(HttpHeaders hdrs, LinkedList<ByteBuffer> buffers) {
 645         ByteBuffer buffer;
 646         if (!(buffer = buffers.getLast()).hasRemaining()) {
 647             buffer = getBuffer();
 648             buffers.add(buffer);
 649         }
 650         for (Map.Entry<String, List<String>> e : hdrs.map().entrySet()) {
 651             String key = e.getKey();
 652             String lkey = key.toLowerCase();
 653             List<String> values = e.getValue();
 654             for (String value : values) {
 655                 hpackOut.header(lkey, value);
 656                 boolean encoded = false;
 657                 do {
 658                     encoded = hpackOut.encode(buffer);
 659                     if (!encoded) {
 660                         buffer = getBuffer();
 661                         buffers.add(buffer);
 662                     }
 663                 } while (!encoded);
 664             }
 665         }
 666     }
 667 
 668     static Throwable getExceptionFrom(CompletableFuture<?> cf) {
 669         try {
 670             cf.get();
 671             return null;
 672         } catch (Throwable e) {
 673             if (e.getCause() != null)
 674                 return e.getCause();
 675             else
 676                 return e;
 677         }
 678     }
 679 
 680 
 681     void execute(Runnable r) {
 682         executor.execute(r, null);
 683     }
 684 
 685     private final Object sendlock = new Object();
 686 
 687     /**
 688      *
 689      */
 690     void sendFrame(Http2Frame frame) {
 691         synchronized (sendlock) {
 692             try {
 693                 ByteBuffer[] bufs;
 694                 if (frame instanceof OutgoingHeaders) {
 695                     OutgoingHeaders oh = (OutgoingHeaders) frame;
 696                     Stream stream = oh.getStream();
 697                     stream.registerStream(nextstreamid);
 698                     oh.streamid(nextstreamid);
 699                     nextstreamid += 2;
 700                     // set outgoing window here. This allows thread sending
 701                     // body to proceed.
 702                     stream.updateOutgoingWindow(getInitialSendWindowSize());
 703                     bufs = encodeFrames(encodeHeaders(oh));
 704                 } else {
 705                     bufs = encodeFrame(frame);
 706                 }
 707                 connectionAsync.writeAsync(bufs);
 708 
 709             } catch (IOException e) {
 710                 if (!closed) {
 711                     Log.logError(e);
 712                     shutdown(e);
 713                 }
 714                 return;
 715             }
 716         }
 717         connectionAsync.flushAsync();
 718     }
 719 
 720     private ByteBuffer[] encodeFrame(Http2Frame frame) throws IOException {
 721         ByteBufferGenerator bbg = new ByteBufferGenerator(this);
 722         frame.computeLength();
 723         Log.logFrames(frame, "OUT");
 724         frame.writeOutgoing(bbg);
 725         return bbg.getBufferList().toArray(new ByteBuffer[0]);
 726     }
 727 
 728     private ByteBuffer[] encodeFrames(List<Http2Frame> frames) throws IOException {
 729         List<ByteBuffer> bufs = new ArrayList<>();
 730         for(Http2Frame frame : frames) {
 731             ByteBufferGenerator bbg = new ByteBufferGenerator(this);
 732             frame.computeLength();
 733             Log.logFrames(frame, "OUT");
 734             frame.writeOutgoing(bbg);
 735             bufs.addAll(bbg.getBufferList());
 736         }
 737         return bufs.toArray(new ByteBuffer[0]);
 738     }
 739 
 740     void sendDataFrame(DataFrame frame) {
 741         try {
 742             connectionAsync.writeAsync(encodeFrame(frame));
 743             connectionAsync.flushAsync();
 744         } catch (IOException e) {
 745             if (!closed) {
 746                 Log.logError(e);
 747                 shutdown(e);
 748             }
 749         }
 750     }
 751 
 752     /*
 753      * Direct call of the method bypasses synchronization on "sendlock" and
 754      * allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
 755      * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
 756      */
 757     void sendUnorderedFrame(Http2Frame frame){
 758         try {
 759             connectionAsync.writeAsyncUnordered(encodeFrame(frame));
 760             connectionAsync.flushAsync();
 761         } catch (IOException e) {
 762             if (!closed) {
 763                 Log.logError(e);
 764                 shutdown(e);
 765             }
 766         }
 767     }
 768 
 769     private SettingsFrame getAckFrame(int streamid) {
 770         SettingsFrame frame = new SettingsFrame();
 771         frame.setFlag(SettingsFrame.ACK);
 772         frame.streamid(streamid);
 773         return frame;
 774     }
 775 
 776     static class HeaderDecoder implements DecodingCallback {
 777         HttpHeadersImpl headers;
 778 
 779         HeaderDecoder() {
 780             this.headers = new HttpHeadersImpl();
 781         }
 782 
 783         @Override
 784         public void onDecoded(CharSequence name, CharSequence value) {
 785             headers.addHeader(name.toString(), value.toString());
 786         }
 787 
 788         HttpHeadersImpl headers() {
 789             return headers;
 790         }
 791     }
 792 }