< prev index next >

src/java.httpclient/share/classes/java/net/http/PlainHttpConnection.java

Print this page
rev 15335 : Async Queues

@@ -48,23 +48,30 @@
 class PlainHttpConnection extends HttpConnection implements AsyncConnection {
 
     protected SocketChannel chan;
     private volatile boolean connected;
     private boolean closed;
-    Consumer<ByteBuffer> asyncReceiver;
-    Consumer<Throwable> errorReceiver;
-    Queue<ByteBuffer> asyncOutputQ;
+
+    // should be volatile to provide proper synchronization action
+    volatile Consumer<ByteBuffer> asyncReceiver;
+
+    // should be volatile to provide proper synchronization action
+    volatile Consumer<Throwable> errorReceiver;
+
+    // asyncOutputQ may be non volatile if and only if the following conditions are satisfied:
+    // 1. Writing to the field is performed BEFORE setting field HttpConnection.mode (which is volatile) to ASYNC.
+    //    That provides proper "release" operation.
+    // 2. Reading from the field is performed AFTER checking that HttpConnection.mode set to ASYNC.
+    //    That provides proper "acquire" action.
+    AsyncWriteQueue<ByteBuffer[]> asyncOutputQ;
+
     final Object reading = new Object();
-    final Object writing = new Object();
+
 
     @Override
     public void startReading() {
-        try {
             client.registerEvent(new ReadEvent());
-        } catch (IOException e) {
-            shutdown();
-        }
     }
 
     class ConnectEvent extends AsyncEvent {
         CompletableFuture<Void> cf;
 

@@ -139,64 +146,75 @@
     @Override
     long write(ByteBuffer[] buffers, int start, int number) throws IOException {
         if (mode != Mode.ASYNC)
             return chan.write(buffers, start, number);
         // async
-        synchronized(writing) {
-            int qlen = asyncOutputQ.size();
-            ByteBuffer[] bufs = Utils.reduce(buffers, start, number);
-            long n = Utils.remaining(bufs);
-            asyncOutputQ.putAll(bufs);
-            if (qlen == 0)
-                asyncOutput();
+        buffers = Utils.reduce(buffers, start, number);
+        long n = Utils.remaining(buffers);
+        asyncOutputQ.put(buffers);
+        flushAsync();
             return n;
         }
+
+    @Override
+    long write(ByteBuffer buffer) throws IOException {
+        if (mode != Mode.ASYNC)
+            return chan.write(buffer);
+        // async
+        long n = buffer.remaining();
+        asyncOutputQ.put(new ByteBuffer[]{buffer});
+        flushAsync();
+        return n;
     }
 
-    ByteBuffer asyncBuffer = null;
+    // handle registered WriteEvent; invoked from SelectorManager thread
+    void flushRegistered() {
+        if (mode == Mode.ASYNC) {
+            asyncOutputQ.flushDelayed();
+        }
+    }
 
-    void asyncOutput() {
-        synchronized (writing) {
-            try {
-                while (true) {
-                    if (asyncBuffer == null) {
-                        asyncBuffer = asyncOutputQ.poll();
-                        if (asyncBuffer == null) {
-                            return;
+    @Override
+    public void writeAsync(ByteBuffer[] buffers) throws IOException {
+        if (mode != Mode.ASYNC) {
+            write(buffers, 0, buffers.length);
+        } else {
+            asyncOutputQ.put(buffers);
                         }
                     }
-                    if (!asyncBuffer.hasRemaining()) {
-                        asyncBuffer = null;
-                        continue;
+
+    @Override
+    public void writeAsyncUnordered(ByteBuffer[] buffers) throws IOException {
+        if (mode != Mode.ASYNC) {
+            write(buffers, 0, buffers.length);
+        } else {
+            // Unordered frames are sent before existing frames.
+            asyncOutputQ.putFirst(buffers);
+        }
+    }
+
+    @Override
+    public void flushAsync() {
+        if (mode == Mode.ASYNC) {
+            asyncOutputQ.flush();
                     }
-                    int n = chan.write(asyncBuffer);
-                    //System.err.printf("Written %d bytes to chan\n", n);
+    }
+
+    void asyncOutput(ByteBuffer[] bufs, Consumer<ByteBuffer[]> setDelayCallback) {
+        try {
+            while (Utils.remaining(bufs) > 0) {
+                long n = chan.write(bufs);
                     if (n == 0) {
+                    setDelayCallback.accept(bufs);
                         client.registerEvent(new WriteEvent());
                         return;
                     }
                 }
             } catch (IOException e) {
                 shutdown();
             }
         }
-    }
-
-    @Override
-    long write(ByteBuffer buffer) throws IOException {
-        if (mode != Mode.ASYNC)
-            return chan.write(buffer);
-        // async
-        synchronized(writing) {
-            int qlen = asyncOutputQ.size();
-            long n = buffer.remaining();
-            asyncOutputQ.put(buffer);
-            if (qlen == 0)
-                asyncOutput();
-            return n;
-        }
-    }
 
     @Override
     public String toString() {
         return "PlainHttpConnection: " + super.toString();
     }

@@ -303,11 +321,11 @@
             return SelectionKey.OP_WRITE;
         }
 
         @Override
         public void handle() {
-            asyncOutput();
+            flushRegistered();
         }
 
         @Override
         public void abort() {
             shutdown();

@@ -384,20 +402,15 @@
     @Override
     public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver,
             Consumer<Throwable> errorReceiver) {
         this.asyncReceiver = asyncReceiver;
         this.errorReceiver = errorReceiver;
-        asyncOutputQ = new Queue<>();
-        asyncOutputQ.registerPutCallback(this::asyncOutput);
+        this.asyncOutputQ = new AsyncWriteQueue<>(this::asyncOutput);
     }
 
     @Override
     CompletableFuture<Void> whenReceivingResponse() {
         CompletableFuture<Void> cf = new CompletableFuture<>();
-        try {
             client.registerEvent(new ReceiveResponseEvent(cf));
-        } catch (IOException e) {
-            cf.completeExceptionally(e);
-        }
         return cf;
     }
 }
< prev index next >