< prev index next >

src/java.httpclient/share/classes/java/net/http/Http2Connection.java

Print this page
rev 15333 : JDK-8162497 fix obtainSendWindow deadlock
rev 15334 : JDK-8161004 bulk sendWindowUpdate
rev 15335 : Async Queues


  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package java.net.http;
  27 
  28 import java.io.IOException;
  29 import java.net.InetSocketAddress;
  30 import java.net.URI;
  31 import java.net.http.HttpConnection.Mode;
  32 import java.nio.ByteBuffer;
  33 import java.nio.charset.StandardCharsets;
  34 import java.util.Collection;
  35 import java.util.HashMap;
  36 import java.util.LinkedList;
  37 import java.util.List;
  38 import java.util.Map;
  39 import java.util.concurrent.CompletableFuture;
  40 import sun.net.httpclient.hpack.Encoder;
  41 import sun.net.httpclient.hpack.Decoder;
  42 import static java.net.http.SettingsFrame.*;
  43 import static java.net.http.Utils.BUFSIZE;
  44 import java.util.ArrayList;
  45 import java.util.Collections;
  46 import java.util.Formatter;
  47 import java.util.stream.Collectors;
  48 import sun.net.httpclient.hpack.DecodingCallback;
  49 
  50 /**
  51  * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
  52  * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
  53  *
  54  * Http2Connections belong to a Http2ClientImpl, (one of) which belongs


  61  *
  62  * Sending is done by writing directly to underlying HttpConnection object which
  63  * is operating in async mode. No flow control applies on output at this level
  64  * and all writes are just executed as puts to an output Q belonging to HttpConnection
  65  * Flow control is implemented by HTTP/2 protocol itself.
  66  *
  67  * Hpack header compression
  68  * and outgoing stream creation is also done here, because these operations
  69  * must be synchronized at the socket level. Stream objects send frames simply
  70  * by placing them on the connection's output Queue. sendFrame() is called
  71  * from a higher level (Stream) thread.
  72  *
  73  * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
  74  * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
  75  * or handles them directly itself. This thread performs hpack decompression
  76  * and incoming stream creation (Server push). Incoming frames destined for a
  77  * stream are provided by calling Stream.incoming().
  78  */
  79 class Http2Connection implements BufferHandler {
  80 
  81     final Queue<Http2Frame> outputQ;
  82     volatile boolean closed;
  83 
  84     //-------------------------------------
  85     final HttpConnection connection;

  86     HttpClientImpl client;
  87     final Http2ClientImpl client2;
  88     Map<Integer,Stream> streams;
  89     int nextstreamid = 3; // stream 1 is registered separately
  90     int nextPushStream = 2;
  91     Encoder hpackOut;
  92     Decoder hpackIn;
  93     SettingsFrame clientSettings, serverSettings;
  94     ByteBufferConsumer bbc;
  95     final LinkedList<ByteBuffer> freeList;
  96     final String key; // for HttpClientImpl.connections map
  97     FrameReader reader;
  98 
  99     // Connection level flow control windows
 100     int sendWindow = INITIAL_WINDOW_SIZE;
 101 
 102     final static int DEFAULT_FRAME_SIZE = 16 * 1024;
 103     private static ByteBuffer[] empty = Utils.EMPTY_BB_ARRAY;
 104 
 105     final ExecutorWrapper executor;
 106 

 107     /**
 108      * This is established by the protocol spec and the peer will update it with
 109      * WINDOW_UPDATEs, which affects the sendWindow.
 110      */
 111     final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1;
 112 
 113     // TODO: need list of control frames from other threads
 114     // that need to be sent
 115 
 116     /**
 117      * Case 1) Create from upgraded HTTP/1.1 connection.
 118      * Is ready to use. Will not be SSL. exchange is the Exchange
 119      * that initiated the connection, whose response will be delivered
 120      * on a Stream.
 121      */
 122     Http2Connection(HttpConnection connection, Http2ClientImpl client2,
 123             Exchange exchange) throws IOException, InterruptedException {
 124         this.outputQ = new Queue<>();
 125         String msg = "Connection send window size " + Integer.toString(sendWindow);
 126         Log.logTrace(msg);
 127 
 128         //this.initialExchange = exchange;
 129         assert !(connection instanceof SSLConnection);
 130         this.connection = connection;

 131         this.client = client2.client();
 132         this.client2 = client2;
 133         this.executor = client.executorWrapper();
 134         this.freeList = new LinkedList<>();
 135         this.key = keyFor(connection);
 136         streams = Collections.synchronizedMap(new HashMap<>());
 137         initCommon();
 138         //sendConnectionPreface();
 139         Stream initialStream = createStream(exchange);
 140         initialStream.registerStream(1);
 141         initialStream.requestSent();
 142         sendConnectionPreface();
 143         connection.configureMode(Mode.ASYNC);
 144         // start reading and writing
 145         // start reading
 146         AsyncConnection asyncConn = (AsyncConnection)connection;
 147         asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown);
 148         asyncReceive(connection.getRemaining());
 149         asyncConn.startReading();
 150     }
 151 
 152     // async style but completes immediately
 153     static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
 154             Http2ClientImpl client2, Exchange exchange) {
 155         CompletableFuture<Http2Connection> cf = new CompletableFuture<>();
 156         try {
 157             Http2Connection c = new Http2Connection(connection, client2, exchange);
 158             cf.complete(c);
 159         } catch (IOException | InterruptedException e) {
 160             cf.completeExceptionally(e);
 161         }
 162         return cf;
 163     }
 164 
 165     /**
 166      * Cases 2) 3)
 167      *
 168      * request is request to be sent.
 169      */
 170     Http2Connection(HttpRequestImpl request) throws IOException, InterruptedException {
 171         InetSocketAddress proxy = request.proxy();
 172         URI uri = request.uri();
 173         InetSocketAddress addr = Utils.getAddress(request);
 174         String msg = "Connection send window size " + Integer.toString(sendWindow);
 175         Log.logTrace(msg);
 176         this.key = keyFor(uri, proxy);
 177         this.connection = HttpConnection.getConnection(addr, request, this);

 178         streams = Collections.synchronizedMap(new HashMap<>());
 179         this.client = request.client();
 180         this.client2 = client.client2();
 181         this.executor = client.executorWrapper();
 182         this.freeList = new LinkedList<>();
 183         this.outputQ = new Queue<>();
 184         nextstreamid = 1;
 185         initCommon();
 186         connection.connect();
 187         connection.configureMode(Mode.ASYNC);
 188         // start reading
 189         AsyncConnection asyncConn = (AsyncConnection)connection;
 190         asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown);
 191         sendConnectionPreface();
 192         asyncConn.startReading();
 193     }
 194 
 195     // NEW
 196     synchronized void obtainSendWindow(int amount) throws InterruptedException {
 197         while (amount > 0) {
 198             int n = Math.min(amount, sendWindow);
 199             sendWindow -= n;
 200             amount -= n;
 201             if (amount > 0)
 202                 wait();
 203         }
 204     }
 205 
 206     synchronized void updateSendWindow(int amount) {
 207         if (sendWindow == 0) {
 208             sendWindow += amount;
 209             notifyAll();
 210         } else
 211             sendWindow += amount;
 212     }
 213 
 214     synchronized int sendWindow() {
 215         return sendWindow;
 216     }
 217 
 218     static String keyFor(HttpConnection connection) {
 219         boolean isProxy = connection.isProxied();
 220         boolean isSecure = connection.isSecure();
 221         InetSocketAddress addr = connection.address();
 222 
 223         return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
 224     }
 225 
 226     static String keyFor(URI uri, InetSocketAddress proxy) {
 227         boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
 228         boolean isProxy = proxy != null;
 229 
 230         String host;
 231         int port;
 232 
 233         if (isProxy) {
 234             host = proxy.getHostString();
 235             port = proxy.getPort();


 449             handleWindowUpdate(f);}
 450             break;
 451           default:
 452             protocolError(ErrorFrame.PROTOCOL_ERROR);
 453         }
 454     }
 455 
 456     void resetStream(int streamid, int code) throws IOException, InterruptedException {
 457         Log.logError(
 458             "Resetting stream {0,number,integer} with error code {1,number,integer}",
 459             streamid, code);
 460         ResetFrame frame = new ResetFrame();
 461         frame.streamid(streamid);
 462         frame.setErrorCode(code);
 463         sendFrame(frame);
 464         streams.remove(streamid);
 465     }
 466 
 467     private void handleWindowUpdate(WindowUpdateFrame f)
 468             throws IOException, InterruptedException {
 469         updateSendWindow(f.getUpdate());
 470     }
 471 
 472     private void protocolError(int errorCode)
 473             throws IOException, InterruptedException {
 474         GoAwayFrame frame = new GoAwayFrame();
 475         frame.setErrorCode(errorCode);
 476         sendFrame(frame);
 477         String msg = "Error code: " + errorCode;
 478         shutdown(new IOException("protocol error"));
 479     }
 480 
 481     private void handleSettings(SettingsFrame frame)
 482             throws IOException, InterruptedException {
 483         if (frame.getFlag(SettingsFrame.ACK)) {
 484             // ignore ack frames for now.
 485             return;
 486         }
 487         serverSettings = frame;
 488         SettingsFrame ack = getAckFrame(frame.streamid());
 489         sendFrame(ack);
 490     }
 491 
 492     private void handlePing(PingFrame frame)
 493             throws IOException, InterruptedException {
 494         frame.setFlag(PingFrame.ACK);
 495         sendFrame(frame);
 496     }
 497 
 498     private void handleGoAway(GoAwayFrame frame)
 499             throws IOException, InterruptedException {
 500         //System.err.printf("GoAWAY: %s\n", ErrorFrame.stringForCode(frame.getErrorCode()));
 501         shutdown(new IOException("GOAWAY received"));
 502     }
 503 
 504     private void initCommon() {
 505         clientSettings = client2.getClientSettings();
 506 
 507         // serverSettings will be updated by server
 508         serverSettings = SettingsFrame.getDefaultSettings();
 509         hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
 510         hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));







 511     }
 512 
 513     /**
 514      * Max frame size we are allowed to send
 515      */
 516     public int getMaxSendFrameSize() {
 517         int param = serverSettings.getParameter(MAX_FRAME_SIZE);
 518         if (param == -1) {
 519             param = DEFAULT_FRAME_SIZE;
 520         }
 521         return param;
 522     }
 523 
 524     /**
 525      * Max frame size we will receive
 526      */
 527     public int getMaxReceiveFrameSize() {
 528         return clientSettings.getParameter(MAX_FRAME_SIZE);
 529     }
 530 
 531     // Not sure how useful this is.
 532     public int getMaxHeadersSize() {
 533         return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
 534     }
 535 
 536     private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
 537 
 538     private static final byte[] PREFACE_BYTES =
 539         CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
 540 
 541     /**
 542      * Sends Connection preface and Settings frame with current preferred
 543      * values
 544      */
 545     private void sendConnectionPreface() throws IOException {
 546         ByteBufferGenerator bg = new ByteBufferGenerator(this);
 547         bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES);
 548         ByteBuffer[] ba = bg.getBufferArray();
 549         connection.write(ba, 0, ba.length);
 550 
 551         bg = new ByteBufferGenerator(this);
 552         SettingsFrame sf = client2.getClientSettings();
 553         Log.logFrames(sf, "OUT");
 554         sf.writeOutgoing(bg);
 555         WindowUpdateFrame wup = new WindowUpdateFrame();
 556         wup.streamid(0);
 557         // send a Window update for the receive buffer we are using
 558         // minus the initial 64 K specified in protocol
 559         wup.setUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1));
 560         wup.computeLength();
 561         wup.writeOutgoing(bg);
 562         Log.logFrames(wup, "OUT");
 563         ba = bg.getBufferArray();
 564         connection.write(ba, 0, ba.length);
 565     }
 566 
 567     /**
 568      * Returns an existing Stream with given id, or null if doesn't exist
 569      */
 570     Stream getStream(int streamid) {
 571         return streams.get(streamid);
 572     }
 573 
 574     /**
 575      * Creates Stream with given id.
 576      */
 577     Stream createStream(Exchange exchange) {
 578         Stream stream = new Stream(client, this, exchange);
 579         return stream;
 580     }
 581 
 582     Stream.PushedStream createPushStream(Stream parent, HttpRequestImpl pushReq) {
 583         Stream.PushGroup<?> pg = parent.request.pushGroup();
 584         return new Stream.PushedStream(pg, client, this, parent, pushReq);


 675             buffers.add(buffer);
 676         }
 677         for (Map.Entry<String, List<String>> e : hdrs.map().entrySet()) {
 678             String key = e.getKey();
 679             String lkey = key.toLowerCase();
 680             List<String> values = e.getValue();
 681             for (String value : values) {
 682                 hpackOut.header(lkey, value);
 683                 boolean encoded = false;
 684                 do {
 685                     encoded = hpackOut.encode(buffer);
 686                     if (!encoded) {
 687                         buffer = getBuffer();
 688                         buffers.add(buffer);
 689                     }
 690                 } while (!encoded);
 691             }
 692         }
 693     }
 694 
 695     public void sendFrames(List<Http2Frame> frames) throws IOException, InterruptedException {
 696         for (Http2Frame frame : frames) {
 697             sendFrame(frame);
 698         }
 699     }
 700 
 701     static Throwable getExceptionFrom(CompletableFuture<?> cf) {
 702         try {
 703             cf.get();
 704             return null;
 705         } catch (Throwable e) {
 706             if (e.getCause() != null)
 707                 return e.getCause();
 708             else
 709                 return e;
 710         }
 711     }
 712 
 713 
 714     void execute(Runnable r) {
 715         executor.execute(r, null);
 716     }
 717 
 718     private final Object sendlock = new Object();
 719 
 720     /**
 721      *
 722      */
 723     void sendFrame(Http2Frame frame) {
 724         synchronized (sendlock) {
 725             try {

 726                 if (frame instanceof OutgoingHeaders) {
 727                     OutgoingHeaders oh = (OutgoingHeaders) frame;
 728                     Stream stream = oh.getStream();
 729                     stream.registerStream(nextstreamid);
 730                     oh.streamid(nextstreamid);
 731                     nextstreamid += 2;
 732                     // set outgoing window here. This allows thread sending
 733                     // body to proceed.
 734                     stream.updateOutgoingWindow(getInitialSendWindowSize());
 735                     LinkedList<Http2Frame> frames = encodeHeaders(oh);
 736                     for (Http2Frame f : frames) {
 737                         sendOneFrame(f);
 738                     }
 739                 } else {
 740                     sendOneFrame(frame);
 741                 }

 742 
 743             } catch (IOException e) {
 744                 if (!closed) {
 745                     Log.logError(e);
 746                     shutdown(e);
 747                 }

 748             }
 749         }

 750     }
 751 
 752     /**
 753      * Send a frame.
 754      *
 755      * @param frame
 756      * @throws IOException
 757      */
 758     private void sendOneFrame(Http2Frame frame) throws IOException {
 759         ByteBufferGenerator bbg = new ByteBufferGenerator(this);
 760         frame.computeLength();
 761         Log.logFrames(frame, "OUT");
 762         frame.writeOutgoing(bbg);
 763         ByteBuffer[] currentBufs = bbg.getBufferArray();
 764         connection.write(currentBufs, 0, currentBufs.length);
 765     }
 766 








































 767 
 768     private SettingsFrame getAckFrame(int streamid) {
 769         SettingsFrame frame = new SettingsFrame();
 770         frame.setFlag(SettingsFrame.ACK);
 771         frame.streamid(streamid);
 772         return frame;
 773     }
 774 
 775     static class HeaderDecoder implements DecodingCallback {
 776         HttpHeadersImpl headers;
 777 
 778         HeaderDecoder() {
 779             this.headers = new HttpHeadersImpl();
 780         }
 781 
 782         @Override
 783         public void onDecoded(CharSequence name, CharSequence value) {
 784             headers.addHeader(name.toString(), value.toString());
 785         }
 786 


  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package java.net.http;
  27 
  28 import java.io.IOException;
  29 import java.net.InetSocketAddress;
  30 import java.net.URI;
  31 import java.net.http.HttpConnection.Mode;
  32 import java.nio.ByteBuffer;
  33 import java.nio.charset.StandardCharsets;

  34 import java.util.HashMap;
  35 import java.util.LinkedList;
  36 import java.util.List;
  37 import java.util.Map;
  38 import java.util.concurrent.CompletableFuture;
  39 import sun.net.httpclient.hpack.Encoder;
  40 import sun.net.httpclient.hpack.Decoder;
  41 import static java.net.http.SettingsFrame.*;
  42 import static java.net.http.Utils.BUFSIZE;
  43 import java.util.ArrayList;
  44 import java.util.Collections;
  45 import java.util.Formatter;
  46 import java.util.stream.Collectors;
  47 import sun.net.httpclient.hpack.DecodingCallback;
  48 
  49 /**
  50  * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
  51  * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
  52  *
  53  * Http2Connections belong to a Http2ClientImpl, (one of) which belongs


  60  *
  61  * Sending is done by writing directly to underlying HttpConnection object which
  62  * is operating in async mode. No flow control applies on output at this level
  63  * and all writes are just executed as puts to an output Q belonging to HttpConnection
  64  * Flow control is implemented by HTTP/2 protocol itself.
  65  *
  66  * Hpack header compression
  67  * and outgoing stream creation is also done here, because these operations
  68  * must be synchronized at the socket level. Stream objects send frames simply
  69  * by placing them on the connection's output Queue. sendFrame() is called
  70  * from a higher level (Stream) thread.
  71  *
  72  * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
  73  * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
  74  * or handles them directly itself. This thread performs hpack decompression
  75  * and incoming stream creation (Server push). Incoming frames destined for a
  76  * stream are provided by calling Stream.incoming().
  77  */
  78 class Http2Connection implements BufferHandler {
  79 

  80     volatile boolean closed;
  81 
  82     //-------------------------------------
  83     final HttpConnection connection;
  84     final AsyncConnection connectionAsync;
  85     HttpClientImpl client;
  86     final Http2ClientImpl client2;
  87     Map<Integer,Stream> streams;
  88     int nextstreamid = 3; // stream 1 is registered separately
  89     int nextPushStream = 2;
  90     Encoder hpackOut;
  91     Decoder hpackIn;
  92     SettingsFrame clientSettings, serverSettings;

  93     final LinkedList<ByteBuffer> freeList;
  94     final String key; // for HttpClientImpl.connections map
  95     FrameReader reader;
  96 
  97     // Connection level flow control windows
  98     final WindowControl connectionSendWindow = new WindowControl(INITIAL_WINDOW_SIZE);
  99 
 100     final static int DEFAULT_FRAME_SIZE = 16 * 1024;
 101     private static ByteBuffer[] empty = Utils.EMPTY_BB_ARRAY;
 102 
 103     final ExecutorWrapper executor;
 104 
 105     WindowUpdateSender windowUpdater;
 106     /**
 107      * This is established by the protocol spec and the peer will update it with
 108      * WINDOW_UPDATEs, which affects the connectionSendWindow.
 109      */
 110     final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1;
 111 
 112     // TODO: need list of control frames from other threads
 113     // that need to be sent
 114 
 115     /**
 116      * Case 1) Create from upgraded HTTP/1.1 connection.
 117      * Is ready to use. Will not be SSL. exchange is the Exchange
 118      * that initiated the connection, whose response will be delivered
 119      * on a Stream.
 120      */
 121     Http2Connection(HttpConnection connection, Http2ClientImpl client2,
 122             Exchange exchange) throws IOException, InterruptedException {
 123         String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available());

 124         Log.logTrace(msg);
 125 
 126         //this.initialExchange = exchange;
 127         assert !(connection instanceof SSLConnection);
 128         this.connection = connection;
 129         this.connectionAsync = (AsyncConnection)connection;
 130         this.client = client2.client();
 131         this.client2 = client2;
 132         this.executor = client.executorWrapper();
 133         this.freeList = new LinkedList<>();
 134         this.key = keyFor(connection);
 135         streams = Collections.synchronizedMap(new HashMap<>());
 136         initCommon();
 137         //sendConnectionPreface();
 138         Stream initialStream = createStream(exchange);
 139         initialStream.registerStream(1);
 140         initialStream.requestSent();
 141         sendConnectionPreface();

 142         // start reading and writing
 143         // start reading
 144         connectionAsync.setAsyncCallbacks(this::asyncReceive, this::shutdown);
 145         connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
 146         asyncReceive(connection.getRemaining());
 147         connectionAsync.startReading();
 148     }
 149 
 150     // async style but completes immediately
 151     static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
 152             Http2ClientImpl client2, Exchange exchange) {
 153         CompletableFuture<Http2Connection> cf = new CompletableFuture<>();
 154         try {
 155             Http2Connection c = new Http2Connection(connection, client2, exchange);
 156             cf.complete(c);
 157         } catch (IOException | InterruptedException e) {
 158             cf.completeExceptionally(e);
 159         }
 160         return cf;
 161     }
 162 
 163     /**
 164      * Cases 2) 3)
 165      *
 166      * request is request to be sent.
 167      */
 168     Http2Connection(HttpRequestImpl request) throws IOException, InterruptedException {
 169         InetSocketAddress proxy = request.proxy();
 170         URI uri = request.uri();
 171         InetSocketAddress addr = Utils.getAddress(request);
 172         String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available());
 173         Log.logTrace(msg);
 174         this.key = keyFor(uri, proxy);
 175         this.connection = HttpConnection.getConnection(addr, request, this);
 176         this.connectionAsync = (AsyncConnection)connection;
 177         streams = Collections.synchronizedMap(new HashMap<>());
 178         this.client = request.client();
 179         this.client2 = client.client2();
 180         this.executor = client.executorWrapper();
 181         this.freeList = new LinkedList<>();

 182         nextstreamid = 1;
 183         initCommon();
 184         connection.connect();




 185         sendConnectionPreface();
 186         // start reading
 187         connectionAsync.setAsyncCallbacks(this::asyncReceive, this::shutdown);
 188         connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.


















 189 
 190         connectionAsync.startReading();

 191     }
 192 
 193     static String keyFor(HttpConnection connection) {
 194         boolean isProxy = connection.isProxied();
 195         boolean isSecure = connection.isSecure();
 196         InetSocketAddress addr = connection.address();
 197 
 198         return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
 199     }
 200 
 201     static String keyFor(URI uri, InetSocketAddress proxy) {
 202         boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
 203         boolean isProxy = proxy != null;
 204 
 205         String host;
 206         int port;
 207 
 208         if (isProxy) {
 209             host = proxy.getHostString();
 210             port = proxy.getPort();


 424             handleWindowUpdate(f);}
 425             break;
 426           default:
 427             protocolError(ErrorFrame.PROTOCOL_ERROR);
 428         }
 429     }
 430 
 431     void resetStream(int streamid, int code) throws IOException, InterruptedException {
 432         Log.logError(
 433             "Resetting stream {0,number,integer} with error code {1,number,integer}",
 434             streamid, code);
 435         ResetFrame frame = new ResetFrame();
 436         frame.streamid(streamid);
 437         frame.setErrorCode(code);
 438         sendFrame(frame);
 439         streams.remove(streamid);
 440     }
 441 
 442     private void handleWindowUpdate(WindowUpdateFrame f)
 443             throws IOException, InterruptedException {
 444         connectionSendWindow.update(f.getUpdate());
 445     }
 446 
 447     private void protocolError(int errorCode)
 448             throws IOException, InterruptedException {
 449         GoAwayFrame frame = new GoAwayFrame();
 450         frame.setErrorCode(errorCode);
 451         sendFrame(frame);
 452         String msg = "Error code: " + errorCode;
 453         shutdown(new IOException("protocol error"));
 454     }
 455 
 456     private void handleSettings(SettingsFrame frame)
 457             throws IOException, InterruptedException {
 458         if (frame.getFlag(SettingsFrame.ACK)) {
 459             // ignore ack frames for now.
 460             return;
 461         }
 462         serverSettings = frame;
 463         SettingsFrame ack = getAckFrame(frame.streamid());
 464         sendFrame(ack);
 465     }
 466 
 467     private void handlePing(PingFrame frame)
 468             throws IOException, InterruptedException {
 469         frame.setFlag(PingFrame.ACK);
 470         sendUnorderedFrame(frame);
 471     }
 472 
 473     private void handleGoAway(GoAwayFrame frame)
 474             throws IOException, InterruptedException {
 475         //System.err.printf("GoAWAY: %s\n", ErrorFrame.stringForCode(frame.getErrorCode()));
 476         shutdown(new IOException("GOAWAY received"));
 477     }
 478 
 479     private void initCommon() {
 480         clientSettings = client2.getClientSettings();
 481 
 482         // serverSettings will be updated by server
 483         serverSettings = SettingsFrame.getDefaultSettings();
 484         hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
 485         hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
 486 
 487         windowUpdater = new WindowUpdateSender(this, client2.client().getReceiveBufferSize()) {
 488             @Override
 489             int getStreamId() {
 490                 return 0;
 491             }
 492         };
 493     }
 494 
 495     /**
 496      * Max frame size we are allowed to send
 497      */
 498     public int getMaxSendFrameSize() {
 499         int param = serverSettings.getParameter(MAX_FRAME_SIZE);
 500         if (param == -1) {
 501             param = DEFAULT_FRAME_SIZE;
 502         }
 503         return param;
 504     }
 505 
 506     /**
 507      * Max frame size we will receive
 508      */
 509     public int getMaxReceiveFrameSize() {
 510         return clientSettings.getParameter(MAX_FRAME_SIZE);
 511     }
 512 
 513     // Not sure how useful this is.
 514     public int getMaxHeadersSize() {
 515         return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
 516     }
 517 
 518     private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
 519 
 520     private static final byte[] PREFACE_BYTES =
 521         CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
 522 
 523     /**
 524      * Sends Connection preface and Settings frame with current preferred
 525      * values
 526      */
 527     private void sendConnectionPreface() throws IOException {
 528         ByteBufferGenerator bg = new ByteBufferGenerator(this);
 529         bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES);




 530         SettingsFrame sf = client2.getClientSettings();
 531         Log.logFrames(sf, "OUT");
 532         sf.writeOutgoing(bg);
 533         ByteBuffer[] ba = bg.getBufferArray();
 534         connection.write(ba, 0, ba.length); // write is performed before switch to async mode
 535         // send a Window update for the receive buffer we are using
 536         // minus the initial 64 K specified in protocol
 537         windowUpdater.sendWindowUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1));





 538     }
 539 
 540     /**
 541      * Returns an existing Stream with given id, or null if doesn't exist
 542      */
 543     Stream getStream(int streamid) {
 544         return streams.get(streamid);
 545     }
 546 
 547     /**
 548      * Creates Stream with given id.
 549      */
 550     Stream createStream(Exchange exchange) {
 551         Stream stream = new Stream(client, this, exchange);
 552         return stream;
 553     }
 554 
 555     Stream.PushedStream createPushStream(Stream parent, HttpRequestImpl pushReq) {
 556         Stream.PushGroup<?> pg = parent.request.pushGroup();
 557         return new Stream.PushedStream(pg, client, this, parent, pushReq);


 648             buffers.add(buffer);
 649         }
 650         for (Map.Entry<String, List<String>> e : hdrs.map().entrySet()) {
 651             String key = e.getKey();
 652             String lkey = key.toLowerCase();
 653             List<String> values = e.getValue();
 654             for (String value : values) {
 655                 hpackOut.header(lkey, value);
 656                 boolean encoded = false;
 657                 do {
 658                     encoded = hpackOut.encode(buffer);
 659                     if (!encoded) {
 660                         buffer = getBuffer();
 661                         buffers.add(buffer);
 662                     }
 663                 } while (!encoded);
 664             }
 665         }
 666     }
 667 






 668     static Throwable getExceptionFrom(CompletableFuture<?> cf) {
 669         try {
 670             cf.get();
 671             return null;
 672         } catch (Throwable e) {
 673             if (e.getCause() != null)
 674                 return e.getCause();
 675             else
 676                 return e;
 677         }
 678     }
 679 
 680 
 681     void execute(Runnable r) {
 682         executor.execute(r, null);
 683     }
 684 
 685     private final Object sendlock = new Object();
 686 
 687     /**
 688      *
 689      */
 690     void sendFrame(Http2Frame frame) {
 691         synchronized (sendlock) {
 692             try {
 693                 ByteBuffer[] bufs;
 694                 if (frame instanceof OutgoingHeaders) {
 695                     OutgoingHeaders oh = (OutgoingHeaders) frame;
 696                     Stream stream = oh.getStream();
 697                     stream.registerStream(nextstreamid);
 698                     oh.streamid(nextstreamid);
 699                     nextstreamid += 2;
 700                     // set outgoing window here. This allows thread sending
 701                     // body to proceed.
 702                     stream.updateOutgoingWindow(getInitialSendWindowSize());
 703                     bufs = encodeFrames(encodeHeaders(oh));



 704                 } else {
 705                     bufs = encodeFrame(frame);
 706                 }
 707                 connectionAsync.writeAsync(bufs);
 708 
 709             } catch (IOException e) {
 710                 if (!closed) {
 711                     Log.logError(e);
 712                     shutdown(e);
 713                 }
 714                 return;
 715             }
 716         }
 717         connectionAsync.flushAsync();
 718     }
 719 
 720     private ByteBuffer[] encodeFrame(Http2Frame frame) throws IOException {






 721         ByteBufferGenerator bbg = new ByteBufferGenerator(this);
 722         frame.computeLength();
 723         Log.logFrames(frame, "OUT");
 724         frame.writeOutgoing(bbg);
 725         return bbg.getBufferList().toArray(new ByteBuffer[0]);

 726     }
 727 
 728     private ByteBuffer[] encodeFrames(List<Http2Frame> frames) throws IOException {
 729         List<ByteBuffer> bufs = new ArrayList<>();
 730         for(Http2Frame frame : frames) {
 731             ByteBufferGenerator bbg = new ByteBufferGenerator(this);
 732             frame.computeLength();
 733             Log.logFrames(frame, "OUT");
 734             frame.writeOutgoing(bbg);
 735             bufs.addAll(bbg.getBufferList());
 736         }
 737         return bufs.toArray(new ByteBuffer[0]);
 738     }
 739 
 740     void sendDataFrame(DataFrame frame) {
 741         try {
 742             connectionAsync.writeAsync(encodeFrame(frame));
 743             connectionAsync.flushAsync();
 744         } catch (IOException e) {
 745             if (!closed) {
 746                 Log.logError(e);
 747                 shutdown(e);
 748             }
 749         }
 750     }
 751 
 752     /*
 753      * Direct call of the method bypasses synchronization on "sendlock" and
 754      * allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
 755      * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
 756      */
 757     void sendUnorderedFrame(Http2Frame frame){
 758         try {
 759             connectionAsync.writeAsyncUnordered(encodeFrame(frame));
 760             connectionAsync.flushAsync();
 761         } catch (IOException e) {
 762             if (!closed) {
 763                 Log.logError(e);
 764                 shutdown(e);
 765             }
 766         }
 767     }
 768 
 769     private SettingsFrame getAckFrame(int streamid) {
 770         SettingsFrame frame = new SettingsFrame();
 771         frame.setFlag(SettingsFrame.ACK);
 772         frame.streamid(streamid);
 773         return frame;
 774     }
 775 
 776     static class HeaderDecoder implements DecodingCallback {
 777         HttpHeadersImpl headers;
 778 
 779         HeaderDecoder() {
 780             this.headers = new HttpHeadersImpl();
 781         }
 782 
 783         @Override
 784         public void onDecoded(CharSequence name, CharSequence value) {
 785             headers.addHeader(name.toString(), value.toString());
 786         }
 787 
< prev index next >