< prev index next >
   1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  */
  24 package java.net.http;
  25 
  26 import java.io.IOException;
  27 import java.net.InetSocketAddress;
  28 import java.net.URI;
  29 import java.nio.ByteBuffer;
  30 import java.nio.charset.StandardCharsets;
  31 import java.util.Collection;
  32 import java.util.HashMap;
  33 import java.util.LinkedList;
  34 import java.util.List;
  35 import java.util.Map;
  36 import java.util.Set;
  37 import java.util.concurrent.CompletableFuture;
  38 import sun.net.httpclient.hpack.Encoder;
  39 import sun.net.httpclient.hpack.Decoder;
  40 import static java.net.http.SettingsFrame.*;
  41 import static java.net.http.Utils.BUFSIZE;
  42 import java.util.ArrayList;
  43 import java.util.Collections;
  44 import java.util.Formatter;
  45 import java.util.stream.Collectors;
  46 import sun.net.httpclient.hpack.DecodingCallback;
  47 
  48 /**
  49  * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
  50  * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
  51  *
  52  * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
  53  * to a HttpClientImpl.
  54  *
  55  * Creation cases:
  56  * 1) upgraded HTTP/1.1 plain tcp connection
  57  * 2) prior knowledge directly created plain tcp connection
  58  * 3) directly created HTTP/2 SSL connection which uses ALPN.
  59  *
  60  * Each Http2Connection uses a thread for reading the socket and another
  61  * for writing to the socket.
  62  * 
  63  * Sending is done in the sendLoop() method where it reads frames off a blocking
  64  * queue (Queue class) and writes them to the socket. Hpack header compression
  65  * and outgoing stream creation is also done here, because these operations
  66  * must be synchronized at the socket level. Stream objects send frames simply
  67  * by placing them on the connection's output Queue.
  68  * 
  69  * Another thread runs readLoop() which assembles incoming Http2Frames, and 
  70  * directs them to the appropriate Stream.incoming() or handles them directly
  71  * itself. This thread performs hpack decompression and incoming stream creation
  72  * (Server push). Incoming frames destined for a stream are provided by calling
  73  * Stream.incoming().
  74  */
  75 class Http2Connection implements BufferHandler {
  76 
  77     final Queue outputQ;
  78     volatile boolean closed;
  79     
  80     //-------------------------------------
  81     final HttpConnection connection;
  82     HttpClientImpl client;
  83     final Http2ClientImpl client2;
  84     Map<Integer,Stream> streams;
  85     int nextstreamid = 3; // stream 1 is registered separately
  86     int nextPushStream = 2;
  87     Encoder hpackOut;
  88     Decoder hpackIn;
  89     SettingsFrame clientSettings, serverSettings;
  90     ByteBufferConsumer bbc;
  91     final LinkedList<ByteBuffer> freeList;
  92     final String key; // for HttpClientImpl.connections map
  93     FrameReader reader;
  94     
  95     // Connection level flow control windows
  96     int sendWindow = INITIAL_WINDOW_SIZE;
  97     
  98     final static int DEFAULT_FRAME_SIZE = 16 * 1024;
  99     private static ByteBuffer[] empty = new ByteBuffer[0];
 100     
 101     final ExecutorWrapper executor;
 102 
 103     /**
 104      * This is established by the protocol spec and the peer will update it with
 105      * WINDOW_UPDATEs, which affects the sendWindow.
 106      */
 107     final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1;
 108 
 109     // TODO: need list of control frames from other threads
 110     // that need to be sent
 111 
 112     /**
 113      * Case 1) Create from upgraded HTTP/1.1 connection.
 114      * Is ready to use. Will not be SSL. exchange is the Exchange
 115      * that initiated the connection, whose response will be delivered
 116      * on a Stream.
 117      */
 118     Http2Connection(HttpConnection connection, Http2ClientImpl client2, 
 119             Exchange exchange) throws IOException, InterruptedException {
 120         this.outputQ = new Queue();
 121         
 122         //this.initialExchange = exchange;
 123         assert !(connection instanceof SSLConnection);
 124         this.connection = connection;
 125         this.client = client2.client;
 126         this.client2 = client2;
 127         this.executor = client.executorWrapper();
 128         this.freeList = new LinkedList<>();
 129         this.key = keyFor(connection);
 130         streams = Collections.synchronizedMap(new HashMap<>());
 131         initCommon();
 132         sendConnectionPreface();
 133         Stream initialStream = createStream(exchange);
 134         initialStream.registerStream(1);
 135         initialStream.requestSent();
 136         connection.configureBlocking(true);
 137         // start reader and writer
 138         executor.execute(() -> {
 139             readLoop(connection.getRemaining());
 140         }, null);
 141 
 142         executor.execute(() -> {
 143             sendLoop();
 144         }, null);
 145     }
 146 
 147     // async style but completes immediately
 148     static CompletableFuture<Http2Connection> createAsync(HttpConnection connection, 
 149             Http2ClientImpl client2, Exchange exchange) {
 150         CompletableFuture<Http2Connection> cf = new CompletableFuture<>();
 151         try {
 152             Http2Connection c = new Http2Connection(connection, client2, exchange);
 153             cf.complete(c);
 154         } catch (IOException | InterruptedException e) {
 155             cf.completeExceptionally(e);
 156         }
 157         return cf;
 158     }
 159     
 160     /**
 161      * Cases 2) 3)
 162      * 
 163      * request is request to be sent.
 164      */
 165     Http2Connection(HttpRequestImpl request) throws IOException, InterruptedException {
 166         InetSocketAddress proxy = request.proxy();
 167         URI uri = request.uri();
 168         String host = uri.getHost();
 169         int port = uri.getPort();
 170         InetSocketAddress addr = (proxy == null)
 171             ? new InetSocketAddress(host, port)
 172             : proxy;
 173 
 174         this.key = keyFor(uri, proxy);
 175         this.connection = HttpConnection.getConnection(addr, request);
 176 
 177         streams = Collections.synchronizedMap(new HashMap<>());
 178         this.client = request.client();
 179         this.client2 = client.client2();
 180         this.executor = client.executorWrapper();
 181         this.freeList = new LinkedList<>();
 182         this.outputQ = new Queue();
 183         nextstreamid = 1;
 184         initCommon();
 185         connection.connect();
 186         connection.configureBlocking(true);
 187         sendConnectionPreface();
 188         // start reader and writer
 189         executor.execute(() -> {
 190             readLoop(connection.getRemaining());
 191         }, null);
 192         
 193         executor.execute(() -> {
 194             sendLoop();
 195         }, null);
 196     }
 197         
 198     // NEW
 199     synchronized void obtainSendWindow(int amount) throws InterruptedException {
 200         while (amount > 0) {
 201             int n = Math.min(amount, sendWindow);
 202             sendWindow -= n;
 203             amount -= n;
 204             if (amount > 0)
 205                 wait();
 206         }
 207     }
 208     
 209     synchronized void updateSendWindow(int amount) {
 210         if (sendWindow == 0) {
 211             sendWindow += amount;
 212             notify();
 213         } else
 214             sendWindow += amount;
 215     }
 216     
 217     synchronized int sendWindow() {
 218         return sendWindow;
 219     }
 220     
 221     static String keyFor(HttpConnection connection) {
 222         boolean isProxy = connection.isProxied();
 223         boolean isSecure = connection.isSecure();
 224         InetSocketAddress addr = connection.address();
 225         
 226         return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
 227     }
 228     
 229     static String keyFor(URI uri, InetSocketAddress proxy) {
 230         boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
 231         boolean isProxy = proxy != null;
 232         
 233         String host;
 234         int port;
 235         
 236         if (isProxy) {
 237             host = proxy.getHostString();
 238             port = proxy.getPort();
 239         } else {
 240             host = uri.getHost();
 241             port = uri.getPort();
 242         }
 243         return keyString(isSecure, isProxy, host, port);
 244     }
 245     
 246     // {C,S}:{H:P}:host:port
 247     // C indicates clear text connection "http"
 248     // S indicates secure "https"
 249     // H indicates host (direct) connection
 250     // P indicates proxy
 251     // Eg: "S:H:foo.com:80"
 252     static String keyString(boolean secure, boolean proxy, String host, int port) {
 253         char c1 = secure ? 'S' : 'C';
 254         char c2 = proxy ? 'P' : 'H';
 255                
 256         StringBuilder sb = new StringBuilder(128);
 257         sb.append(c1).append(':').append(c2).append(':')
 258                 .append(host).append(':').append(port);
 259         return sb.toString();
 260     }
 261     
 262     String key() {
 263         return this.key;
 264     }
 265     
 266     void putConnection() {
 267         client2.putConnection(this);
 268     }
 269     
 270     private static String toHexdump1(ByteBuffer bb) {
 271         bb.mark();
 272         StringBuilder sb = new StringBuilder(512);
 273         Formatter f = new Formatter(sb);
 274         
 275         while (bb.hasRemaining()) {
 276             int i =  Byte.toUnsignedInt(bb.get());
 277             f.format("%02x:", i);
 278         }
 279         sb.deleteCharAt(sb.length()-1);
 280         bb.reset();
 281         return sb.toString();
 282     }
 283     
 284     private static String toHexdump(ByteBuffer bb) {
 285         List<String> words = new ArrayList<>();
 286         int i = 0;
 287         bb.mark();
 288         while (bb.hasRemaining()) {
 289             if (i % 2 == 0) {
 290                 words.add("");
 291             }
 292             byte b = bb.get();
 293             String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1);
 294             words.set(i / 2, words.get(i / 2) + hex);
 295             i++;
 296         }
 297         bb.reset();
 298         return words.stream().collect(Collectors.joining(" "));
 299     }
 300 
 301     private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) {
 302         boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
 303         
 304         ByteBuffer[] buffers = frame.getHeaderBlock();
 305         for (int i = 0; i < buffers.length; i++) {
 306             hpackIn.decode(buffers[i], decoder, endOfHeaders);
 307         }
 308     }
 309 
 310     int getInitialSendWindowSize() {
 311         return serverSettings.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE);
 312     }
 313 
 314 
 315     @Override
 316     public void returnBuffer(ByteBuffer buf) {
 317         client.returnBuffer(buf);
 318     }
 319     
 320     void close() {
 321         GoAwayFrame f = new GoAwayFrame();
 322         f.setDebugData("Requested by user".getBytes());
 323         // TODO: set last stream. For now zero ok.
 324         try {
 325             sendFrame(f);
 326         } catch (IOException | InterruptedException e) {} 
 327         // Shouldn't throw InterruptedException as it can only happen
 328         // if the socket send blocks which it shouldn't.
 329         // connection.close(); <-- need to wait for ack and confirmation
 330         // that requests completed.
 331     }
 332     
 333     @Override
 334     public ByteBuffer getBuffer() {
 335         return client.getBuffer();
 336     }
 337     
 338     /**
 339      * Runs in one thread continuously reading off connection in blocking mode.
 340      * 
 341      * @param initial 
 342      */
 343     private void readLoop(ByteBuffer initial) {
 344         try {
 345             int n;
 346             // initialize frame reader if necessary
 347             if (reader == null) {
 348                 reader = new FrameReader(initial);
 349             } else if (initial != null) {
 350                 reader.input(initial);
 351             }
 352 
 353             while (!closed) {
 354                 while (!reader.haveFrame()) {
 355                     ByteBuffer b = getBuffer();
 356                     n = connection.read(b);
 357                     if (n == -1) {
 358                         throw new IOException("Connection closed");
 359                     }
 360                     reader.input(b);
 361                 }
 362                 List<ByteBuffer> buffers = reader.frame();
 363 
 364                 ByteBufferConsumer bbc = new ByteBufferConsumer(buffers, this::getBuffer);
 365                 processFrame(bbc);
 366                 if (bbc.consumed()) {
 367                     reader = new FrameReader();
 368                 } else {
 369                     // buffers will only contain the left-over
 370                     reader = new FrameReader(buffers);
 371                 }
 372             }
 373         } catch (IOException e) {
 374             String msg = Utils.stackTrace(e);
 375             Log.logTrace(msg);
 376             shutdown(e);
 377         } catch (Throwable t) {
 378             String msg = Utils.stackTrace(t);
 379             Log.logTrace(msg);
 380             shutdown(t);
 381         }
 382     }
 383 
 384     void shutdown(Throwable t) {
 385         System.err.println("Shutdown: " + t);
 386         closed = true;
 387         client2.deleteConnection(this);
 388         Collection<Stream> c = streams.values();
 389         for (Stream s : c) {
 390             s.cancelImpl(t);
 391         }
 392         connection.close();
 393     }
 394     
 395     /**
 396      * Handles stream 0 (common) frames that apply to whole connection and passes
 397      * other stream specific frames to that Stream object.
 398      * 
 399      * Invokes Stream.incoming() which is expected to process frame without
 400      * blocking.
 401      * 
 402      * @param bbc
 403      * @throws IOException
 404      * @throws InterruptedException 
 405      */
 406     void processFrame(ByteBufferConsumer bbc) throws IOException, InterruptedException {
 407         Http2Frame frame = Http2Frame.readIncoming(bbc);
 408         Log.logFrames(frame, "IN");
 409         int streamid = frame.streamid();
 410         if (streamid == 0) {
 411             handleCommonFrame(frame);
 412         } else {
 413             Stream stream = getStream(streamid);
 414             if (stream == null) {
 415                 // should never receive a frame with unknown stream id
 416                 resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
 417             }
 418             if (frame instanceof PushPromiseFrame) {
 419                 PushPromiseFrame pp = (PushPromiseFrame)frame;
 420                 handlePushPromise(stream, pp);
 421             } else if (frame instanceof HeaderFrame) {
 422                 // decode headers (or continuation)
 423                 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
 424                 stream.incoming(frame);
 425             } else
 426                 stream.incoming(frame);
 427         }
 428     }
 429             
 430     private void handlePushPromise(Stream parent, PushPromiseFrame pp) throws IOException, InterruptedException {
 431         HttpRequestImpl parentReq = parent.request;
 432         int promisedStreamid = pp.getPromisedStream();
 433         if (promisedStreamid != nextPushStream) {
 434             resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
 435             return;
 436         } else {
 437             nextPushStream += 2;
 438         }
 439         HeaderDecoder decoder = new HeaderDecoder();
 440         decodeHeaders(pp, decoder);
 441         HttpHeadersImpl headers = decoder.headers();
 442         HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
 443         
 444         Stream.PushedStream pushStream = createPushStream(parent, pushReq);
 445         pushStream.registerStream(promisedStreamid);
 446         parent.incoming_pushPromise(pushReq, pushStream);
 447     }
 448     
 449     private void handleCommonFrame(Http2Frame frame) throws IOException, InterruptedException {
 450         switch (frame.type()) {
 451           case SettingsFrame.TYPE:
 452           { SettingsFrame f = (SettingsFrame)frame;
 453             handleSettings(f);}
 454             break;
 455           case PingFrame.TYPE:
 456           { PingFrame f = (PingFrame)frame;
 457             handlePing(f);}
 458             break;
 459           case GoAwayFrame.TYPE:
 460           { GoAwayFrame f = (GoAwayFrame)frame;
 461             handleGoAway(f);}
 462             break;
 463           case WindowUpdateFrame.TYPE:
 464           { WindowUpdateFrame f = (WindowUpdateFrame)frame;
 465             handleWindowUpdate(f);}
 466             break;
 467           default:
 468             protocolError(ErrorFrame.PROTOCOL_ERROR);
 469         }
 470     }
 471     
 472     void resetStream(int streamid, int code) throws IOException, InterruptedException {
 473         Log.logError("Resetting stream %d with error code %d", streamid, code);
 474         ResetFrame frame = new ResetFrame();
 475         frame.streamid(streamid);
 476         frame.setErrorCode(code);
 477         sendFrame(frame);
 478         streams.remove(streamid);
 479     }
 480 
 481     private void handleWindowUpdate(WindowUpdateFrame f) throws IOException, InterruptedException {
 482         updateSendWindow(f.getUpdate());
 483     }
 484 
 485     private void protocolError(int errorCode) throws IOException, InterruptedException {
 486         GoAwayFrame frame = new GoAwayFrame();
 487         frame.setErrorCode(errorCode);
 488         sendFrame(frame);
 489         String msg = "Error code: " + errorCode;
 490         shutdown(new IOException("protocol error"));
 491     }
 492 
 493     private void handleSettings(SettingsFrame frame) throws IOException, InterruptedException {
 494         if (frame.getFlag(SettingsFrame.ACK)) {
 495             // ignore ack frames for now.
 496             return;
 497         }
 498         serverSettings = frame;
 499         SettingsFrame ack = getAckFrame(frame.streamid());
 500         sendFrame(ack);
 501     }
 502 
 503     private void handlePing(PingFrame frame) throws IOException, InterruptedException {
 504         frame.setFlag(PingFrame.ACK);
 505         sendFrame(frame);
 506     }
 507 
 508     private void handleGoAway(GoAwayFrame frame) throws IOException, InterruptedException {
 509         System.err.printf("GoAWAY: %s\n", ErrorFrame.stringForCode(frame.getErrorCode()));
 510         shutdown(new IOException("GOAWAY received"));
 511     }
 512 
 513     private void initCommon() {
 514         clientSettings = client2.getClientSettings();
 515 
 516         // serverSettings will be updated by server
 517         serverSettings = SettingsFrame.getDefaultSettings();
 518         hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
 519         hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
 520     }
 521 
 522     /**
 523      * Max frame size we are allowed to send
 524      */
 525     public int getMaxSendFrameSize() {
 526         int param = serverSettings.getParameter(MAX_FRAME_SIZE);
 527         if (param == -1) {
 528             param = DEFAULT_FRAME_SIZE;
 529         }
 530         return param;
 531     }
 532 
 533     /**
 534      * Max frame size we will receive
 535      */
 536     public int getMaxReceiveFrameSize() {
 537         return clientSettings.getParameter(MAX_FRAME_SIZE);
 538     }
 539 
 540     // Not sure how useful this is.
 541     public int getMaxHeadersSize() {
 542         return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
 543     }
 544 
 545     private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
 546 
 547     private static final byte[] PREFACE_BYTES = 
 548         CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
 549 
 550     /**
 551      * Sends Connection preface and Settings frame with current preferred
 552      * values
 553      */
 554     private void sendConnectionPreface() throws IOException {
 555         SettingsFrame sf = client2.getClientSettings();
 556         ByteBufferGenerator bg = new ByteBufferGenerator(this);
 557         bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES);
 558         sf.writeOutgoing(bg);
 559         Log.logFrames(sf, "OUT");
 560         WindowUpdateFrame wup = new WindowUpdateFrame();
 561         wup.streamid(0);
 562         // send a Window update for the receive buffer we are using
 563         // minus the initial 64 K specified in protocol
 564         wup.setUpdate(client2.client.getReceiveBufferSize() - (64 * 1024 - 1));
 565         wup.setLength();
 566         wup.writeOutgoing(bg);
 567         Log.logFrames(wup, "OUT");
 568         ByteBuffer[] ba = bg.getBufferArray();
 569         connection.write(ba, 0, ba.length);
 570     }
 571 
 572     /**
 573      * Returns an existing Stream with given id, or null if doesn't exist
 574      */
 575     Stream getStream(int streamid) {
 576         return streams.get(streamid);
 577     }
 578 
 579     /**
 580      * Creates Stream with given id.
 581      */
 582     Stream createStream(Exchange exchange) {
 583         Stream stream = new Stream(client, this, exchange);
 584         return stream;
 585     }
 586 
 587     Stream.PushedStream createPushStream(Stream parent, HttpRequestImpl pushReq) {
 588         Stream.PushGroup<?> pg = parent.request.pushGroup();
 589         return new Stream.PushedStream(pg, client, this, parent, pushReq);
 590     }
 591     
 592     void putStream(Stream stream, int streamid) {
 593         streams.put(streamid, stream);        
 594     }
 595     
 596     void deleteStream(Stream stream) {
 597         streams.remove(stream.streamid);
 598     }
 599     
 600     static final int MAX_STREAM = Integer.MAX_VALUE - 2;
 601 
 602     // Number of header bytes in a Headers Frame
 603     final static int HEADERS_HEADER_SIZE = 15;
 604 
 605     // Number of header bytes in a Continuation frame
 606     final static int CONTIN_HEADER_SIZE = 9;
 607 
 608     /**
 609      * Encode the headers into a List<ByteBuffer> and then create HEADERS
 610      * and CONTINUATION frames from the list and return the List<Http2Frame>.
 611      * 
 612      * @param frame
 613      * @return 
 614      */
 615     private LinkedList<Http2Frame> encodeHeaders(OutgoingHeaders frame) {
 616         LinkedList<ByteBuffer> buffers = new LinkedList<>();
 617         ByteBuffer buf = getBuffer();
 618         buffers.add(buf);
 619         encodeHeadersImpl(frame.stream.getRequestPseudoHeaders(), buffers);
 620         encodeHeadersImpl(frame.getUserHeaders(), buffers);
 621         encodeHeadersImpl(frame.getSystemHeaders(), buffers);
 622         
 623         for (ByteBuffer b : buffers) {
 624             b.flip();
 625         }
 626 
 627         LinkedList<Http2Frame> frames = new LinkedList<>();
 628         int maxframesize = getMaxSendFrameSize();
 629 
 630         HeadersFrame oframe = new HeadersFrame();
 631         oframe.setFlags(frame.getFlags());
 632         oframe.streamid(frame.streamid());
 633 
 634         oframe.setHeaderBlock(getBufferArray(buffers, maxframesize));
 635         frames.add(oframe);  
 636         // Any buffers left?
 637         boolean done = buffers.isEmpty();
 638         if (done) {
 639             oframe.setFlag(HeaderFrame.END_HEADERS);
 640         } else {
 641             ContinuationFrame cf = null;
 642             while (!done) {
 643                 cf = new ContinuationFrame();
 644                 cf.streamid(frame.streamid());
 645                 cf.setHeaderBlock(getBufferArray(buffers, maxframesize));
 646                 frames.add(cf);
 647                 done = buffers.isEmpty();
 648             }
 649             cf.setFlag(HeaderFrame.END_HEADERS);
 650         }
 651         return frames;
 652     }
 653 
 654     // should always return at least one buffer
 655     private static ByteBuffer[] getBufferArray(LinkedList<ByteBuffer> list, int maxsize) {
 656         assert maxsize >= BUFSIZE;
 657         LinkedList<ByteBuffer> newlist = new LinkedList<>();
 658         int size = list.size();
 659         int nbytes = 0;
 660         for (int i=0; i<size; i++) {
 661             ByteBuffer buf = list.getFirst();
 662             if (nbytes + buf.remaining() <= maxsize) {
 663                 nbytes += buf.remaining();
 664                 newlist.add(buf);
 665                 list.remove();
 666             } else {
 667                 break;
 668             }
 669         }
 670         return newlist.toArray(empty);
 671     }
 672 
 673     /**
 674      * Encode all the headers from the given HttpHeadersImpl into the given List.
 675      */
 676     private void encodeHeadersImpl(HttpHeaders hdrs, LinkedList<ByteBuffer> buffers) {
 677         ByteBuffer buffer;
 678         if (!(buffer=buffers.getLast()).hasRemaining()) {
 679             buffer = getBuffer();
 680             buffers.add(buffer);
 681         }
 682         Map<String,List<String>> map = hdrs.map();
 683         Set<String> keys = map.keySet();
 684         for (String key : keys) {
 685             String lkey = key.toLowerCase();
 686             List<String> values = map.get(key);
 687             for (String value : values) {
 688                 boolean encoded = false;
 689                 do {
 690                     encoded = hpackOut.header(lkey, value).encode(buffer);
 691                     if (!encoded) {
 692                         buffer = getBuffer();
 693                         buffers.add(buffer);
 694                     }
 695                 } while(!encoded);
 696             }
 697         }
 698     } 
 699 
 700     public void sendFrame(Http2Frame frame) throws IOException, InterruptedException {
 701         outputQ.put(frame);
 702     }
 703         
 704     public void sendFrames(List<Http2Frame> frames) throws IOException, InterruptedException {
 705         synchronized (outputQ) {
 706             for (Http2Frame frame : frames)
 707                 outputQ.put(frame);
 708         }
 709     }
 710     
 711     static Throwable getExceptionFrom(CompletableFuture<?> cf) {
 712         try {
 713             cf.get();
 714             return null;
 715         } catch (Throwable e) {
 716             if (e.getCause() != null)
 717                 return e.getCause();
 718             else
 719                 return e;
 720         }
 721     }
 722 
 723 
 724     void execute(Runnable r) {
 725         executor.execute(r, null);
 726     }
 727     
 728     
 729     /**
 730      * Take frames off outputQ and send them. Runs 
 731      */
 732     private void sendLoop() {
 733         try {
 734             while (!closed) {
 735                 Http2Frame frame = outputQ.take();
 736                 if (frame instanceof OutgoingHeaders) {
 737                     OutgoingHeaders oh = (OutgoingHeaders)frame;
 738                     Stream stream = oh.getStream();
 739                     stream.registerStream(nextstreamid);
 740                     oh.streamid(nextstreamid);
 741                     nextstreamid += 2;
 742                     // set outgoing window here. This allows thread sending
 743                     // body to proceed.
 744                     stream.updateOutgoingWindow(getInitialSendWindowSize());
 745                     LinkedList<Http2Frame> frames = encodeHeaders(oh);
 746                     for (Http2Frame f : frames) {
 747                         sendOneFrame(f);
 748                     }
 749                 } else {
 750                     sendOneFrame(frame);
 751                 }
 752             }
 753         } catch (IOException e) {
 754             if (!closed) {
 755                 Log.logError(e);
 756                 shutdown(e);
 757             }
 758         }
 759     }
 760 
 761     private void sendOneFrame(Http2Frame frame) throws IOException {
 762         ByteBufferGenerator bbg = new ByteBufferGenerator(this);
 763         frame.setLength();
 764         Log.logFrames(frame, "OUT");
 765         frame.writeOutgoing(bbg);
 766         ByteBuffer[] bufs = bbg.getBufferArray();
 767         connection.write(bufs, 0, bufs.length);
 768     }
 769 
 770     private SettingsFrame getAckFrame(int streamid) {
 771         SettingsFrame frame = new SettingsFrame();
 772         frame.setFlag(SettingsFrame.ACK);
 773         frame.streamid(streamid);
 774         return frame;
 775     }
 776     
 777     static class HeaderDecoder implements DecodingCallback {
 778         HttpHeadersImpl headers;
 779         
 780         HeaderDecoder() {
 781             this.headers = new HttpHeadersImpl();
 782         }
 783         
 784         @Override
 785         public void onDecoded(CharSequence name, CharSequence value) {
 786             headers.addHeader(name.toString(), value.toString());
 787         }
 788         
 789         HttpHeadersImpl headers() {
 790             return headers;
 791         }
 792     }
 793 }
< prev index next >