< prev index next >

src/java.httpclient/share/classes/java/net/http/Http2Connection.java

Print this page
rev 15333 : JDK-8162497 fix obtainSendWindow deadlock
rev 15334 : JDK-8161004 bulk sendWindowUpdate
rev 15335 : Async Queues

@@ -29,11 +29,10 @@
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.http.HttpConnection.Mode;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;

@@ -76,39 +75,39 @@
  * and incoming stream creation (Server push). Incoming frames destined for a
  * stream are provided by calling Stream.incoming().
  */
 class Http2Connection implements BufferHandler {
 
-    final Queue<Http2Frame> outputQ;
     volatile boolean closed;
 
     //-------------------------------------
     final HttpConnection connection;
+    final AsyncConnection connectionAsync;
     HttpClientImpl client;
     final Http2ClientImpl client2;
     Map<Integer,Stream> streams;
     int nextstreamid = 3; // stream 1 is registered separately
     int nextPushStream = 2;
     Encoder hpackOut;
     Decoder hpackIn;
     SettingsFrame clientSettings, serverSettings;
-    ByteBufferConsumer bbc;
     final LinkedList<ByteBuffer> freeList;
     final String key; // for HttpClientImpl.connections map
     FrameReader reader;
 
     // Connection level flow control windows
-    int sendWindow = INITIAL_WINDOW_SIZE;
+    final WindowControl connectionSendWindow = new WindowControl(INITIAL_WINDOW_SIZE);
 
     final static int DEFAULT_FRAME_SIZE = 16 * 1024;
     private static ByteBuffer[] empty = Utils.EMPTY_BB_ARRAY;
 
     final ExecutorWrapper executor;
 
+    WindowUpdateSender windowUpdater;
     /**
      * This is established by the protocol spec and the peer will update it with
-     * WINDOW_UPDATEs, which affects the sendWindow.
+     * WINDOW_UPDATEs, which affect the connectionSendWindow.
      */
     final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1;
 
     // TODO: need list of control frames from other threads
     // that need to be sent

@@ -119,17 +118,17 @@
      * that initiated the connection, whose response will be delivered
      * on a Stream.
      */
     Http2Connection(HttpConnection connection, Http2ClientImpl client2,
             Exchange exchange) throws IOException, InterruptedException {
-        this.outputQ = new Queue<>();
-        String msg = "Connection send window size " + Integer.toString(sendWindow);
+        String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available());
         Log.logTrace(msg);
 
         //this.initialExchange = exchange;
         assert !(connection instanceof SSLConnection);
         this.connection = connection;
+        this.connectionAsync = (AsyncConnection)connection;
         this.client = client2.client();
         this.client2 = client2;
         this.executor = client.executorWrapper();
         this.freeList = new LinkedList<>();
         this.key = keyFor(connection);

@@ -138,17 +137,16 @@
         //sendConnectionPreface();
         Stream initialStream = createStream(exchange);
         initialStream.registerStream(1);
         initialStream.requestSent();
         sendConnectionPreface();
-        connection.configureMode(Mode.ASYNC);
         // start reading and writing
         // start reading
-        AsyncConnection asyncConn = (AsyncConnection)connection;
-        asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown);
+        connectionAsync.setAsyncCallbacks(this::asyncReceive, this::shutdown);
+        connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
         asyncReceive(connection.getRemaining());
-        asyncConn.startReading();
+        connectionAsync.startReading();
     }
 
     // async style but completes immediately
     static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
             Http2ClientImpl client2, Exchange exchange) {

@@ -169,52 +167,29 @@
      */
     Http2Connection(HttpRequestImpl request) throws IOException, InterruptedException {
         InetSocketAddress proxy = request.proxy();
         URI uri = request.uri();
         InetSocketAddress addr = Utils.getAddress(request);
-        String msg = "Connection send window size " + Integer.toString(sendWindow);
+        String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available());
         Log.logTrace(msg);
         this.key = keyFor(uri, proxy);
         this.connection = HttpConnection.getConnection(addr, request, this);
+        this.connectionAsync = (AsyncConnection)connection;
         streams = Collections.synchronizedMap(new HashMap<>());
         this.client = request.client();
         this.client2 = client.client2();
         this.executor = client.executorWrapper();
         this.freeList = new LinkedList<>();
-        this.outputQ = new Queue<>();
         nextstreamid = 1;
         initCommon();
         connection.connect();
-        connection.configureMode(Mode.ASYNC);
-        // start reading
-        AsyncConnection asyncConn = (AsyncConnection)connection;
-        asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown);
         sendConnectionPreface();
-        asyncConn.startReading();
-    }
-
-    // NEW
-    synchronized void obtainSendWindow(int amount) throws InterruptedException {
-        while (amount > 0) {
-            int n = Math.min(amount, sendWindow);
-            sendWindow -= n;
-            amount -= n;
-            if (amount > 0)
-                wait();
-        }
-    }
-
-    synchronized void updateSendWindow(int amount) {
-        if (sendWindow == 0) {
-            sendWindow += amount;
-            notifyAll();
-        } else
-            sendWindow += amount;
-    }
+        // start reading
+        connectionAsync.setAsyncCallbacks(this::asyncReceive, this::shutdown);
+        connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
 
-    synchronized int sendWindow() {
-        return sendWindow;
+        connectionAsync.startReading();
     }
 
     static String keyFor(HttpConnection connection) {
         boolean isProxy = connection.isProxied();
         boolean isSecure = connection.isSecure();

@@ -464,11 +439,11 @@
         streams.remove(streamid);
     }
 
     private void handleWindowUpdate(WindowUpdateFrame f)
             throws IOException, InterruptedException {
-        updateSendWindow(f.getUpdate());
+        connectionSendWindow.update(f.getUpdate());
     }
 
     private void protocolError(int errorCode)
             throws IOException, InterruptedException {
         GoAwayFrame frame = new GoAwayFrame();

@@ -490,11 +465,11 @@
     }
 
     private void handlePing(PingFrame frame)
             throws IOException, InterruptedException {
         frame.setFlag(PingFrame.ACK);
-        sendFrame(frame);
+        sendUnorderedFrame(frame);
     }
 
     private void handleGoAway(GoAwayFrame frame)
             throws IOException, InterruptedException {
         //System.err.printf("GoAWAY: %s\n", ErrorFrame.stringForCode(frame.getErrorCode()));

@@ -506,10 +481,17 @@
 
         // serverSettings will be updated by server
         serverSettings = SettingsFrame.getDefaultSettings();
         hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
         hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
+
+        windowUpdater = new WindowUpdateSender(this, client2.client().getReceiveBufferSize()) {
+            @Override
+            int getStreamId() {
+                return 0;
+            }
+        };
     }
 
     /**
      * Max frame size we are allowed to send
      */

@@ -543,27 +525,18 @@
      * values
      */
     private void sendConnectionPreface() throws IOException {
         ByteBufferGenerator bg = new ByteBufferGenerator(this);
         bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES);
-        ByteBuffer[] ba = bg.getBufferArray();
-        connection.write(ba, 0, ba.length);
-
-        bg = new ByteBufferGenerator(this);
         SettingsFrame sf = client2.getClientSettings();
         Log.logFrames(sf, "OUT");
         sf.writeOutgoing(bg);
-        WindowUpdateFrame wup = new WindowUpdateFrame();
-        wup.streamid(0);
+        ByteBuffer[] ba = bg.getBufferArray();
+        connection.write(ba, 0, ba.length); // write is performed before switch to async mode
         // send a Window update for the receive buffer we are using
         // minus the initial 64 K specified in protocol
-        wup.setUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1));
-        wup.computeLength();
-        wup.writeOutgoing(bg);
-        Log.logFrames(wup, "OUT");
-        ba = bg.getBufferArray();
-        connection.write(ba, 0, ba.length);
+        windowUpdater.sendWindowUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1));
     }
 
     /**
      * Returns an existing Stream with given id, or null if doesn't exist
      */

@@ -690,16 +663,10 @@
                 } while (!encoded);
             }
         }
     }
 
-    public void sendFrames(List<Http2Frame> frames) throws IOException, InterruptedException {
-        for (Http2Frame frame : frames) {
-            sendFrame(frame);
-        }
-    }
-
     static Throwable getExceptionFrom(CompletableFuture<?> cf) {
         try {
             cf.get();
             return null;
         } catch (Throwable e) {

@@ -721,51 +688,85 @@
      *
      */
     void sendFrame(Http2Frame frame) {
         synchronized (sendlock) {
             try {
+                ByteBuffer[] bufs;
                 if (frame instanceof OutgoingHeaders) {
                     OutgoingHeaders oh = (OutgoingHeaders) frame;
                     Stream stream = oh.getStream();
                     stream.registerStream(nextstreamid);
                     oh.streamid(nextstreamid);
                     nextstreamid += 2;
                     // set outgoing window here. This allows thread sending
                     // body to proceed.
                     stream.updateOutgoingWindow(getInitialSendWindowSize());
-                    LinkedList<Http2Frame> frames = encodeHeaders(oh);
-                    for (Http2Frame f : frames) {
-                        sendOneFrame(f);
-                    }
+                    bufs = encodeFrames(encodeHeaders(oh));
                 } else {
-                    sendOneFrame(frame);
+                    bufs = encodeFrame(frame);
                 }
+                connectionAsync.writeAsync(bufs);
 
             } catch (IOException e) {
                 if (!closed) {
                     Log.logError(e);
                     shutdown(e);
                 }
+                return;
             }
         }
+        connectionAsync.flushAsync();
     }
 
-    /**
-     * Send a frame.
-     *
-     * @param frame
-     * @throws IOException
-     */
-    private void sendOneFrame(Http2Frame frame) throws IOException {
+    private ByteBuffer[] encodeFrame(Http2Frame frame) throws IOException {
         ByteBufferGenerator bbg = new ByteBufferGenerator(this);
         frame.computeLength();
         Log.logFrames(frame, "OUT");
         frame.writeOutgoing(bbg);
-        ByteBuffer[] currentBufs = bbg.getBufferArray();
-        connection.write(currentBufs, 0, currentBufs.length);
+        return bbg.getBufferList().toArray(new ByteBuffer[0]);
     }
 
+    private ByteBuffer[] encodeFrames(List<Http2Frame> frames) throws IOException {
+        List<ByteBuffer> bufs = new ArrayList<>();
+        for(Http2Frame frame : frames) {
+            ByteBufferGenerator bbg = new ByteBufferGenerator(this);
+            frame.computeLength();
+            Log.logFrames(frame, "OUT");
+            frame.writeOutgoing(bbg);
+            bufs.addAll(bbg.getBufferList());
+        }
+        return bufs.toArray(new ByteBuffer[0]);
+    }
+
+    void sendDataFrame(DataFrame frame) {
+        try {
+            connectionAsync.writeAsync(encodeFrame(frame));
+            connectionAsync.flushAsync();
+        } catch (IOException e) {
+            if (!closed) {
+                Log.logError(e);
+                shutdown(e);
+            }
+        }
+    }
+
+    /*
+     * Calling this method directly bypasses synchronization on "sendlock". It is
+     * allowed only for control frames (WindowUpdateFrame, PingFrame, etc.) and is
+     * prohibited for frames such as DataFrame, HeadersFrame, ContinuationFrame.
+     */
+    void sendUnorderedFrame(Http2Frame frame){
+        try {
+            connectionAsync.writeAsyncUnordered(encodeFrame(frame));
+            connectionAsync.flushAsync();
+        } catch (IOException e) {
+            if (!closed) {
+                Log.logError(e);
+                shutdown(e);
+            }
+        }
+    }
 
     private SettingsFrame getAckFrame(int streamid) {
         SettingsFrame frame = new SettingsFrame();
         frame.setFlag(SettingsFrame.ACK);
         frame.streamid(streamid);
< prev index next >