< prev index next >

src/java.httpclient/share/classes/java/net/http/PlainHttpConnection.java

Print this page

        

@@ -29,24 +29,47 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 /**
- * Plain raw TCP connection direct to destination
+ * Plain raw TCP connection direct to destination. 2 modes
+ * 1) Blocking used by http/1. In this case the connect is actually non
+ *    blocking but the request is sent blocking. The first byte of a response
+ *    is received non-blocking and the remainder of the response is received
+ *    blocking
+ * 2) Non-blocking. In this case (for http/2) the connection is actually opened
+ *    blocking but all reads and writes are done non-blocking under the
+ *    control of a Http2Connection object.
  */
-class PlainHttpConnection extends HttpConnection {
+class PlainHttpConnection extends HttpConnection implements AsyncConnection {
 
     protected SocketChannel chan;
     private volatile boolean connected;
     private boolean closed;
+    Consumer<ByteBuffer> asyncReceiver;
+    Consumer<Throwable> errorReceiver;
+    Queue<ByteBuffer> asyncOutputQ;
+    final Object reading = new Object();
+    final Object writing = new Object();
 
-    class ConnectEvent extends AsyncEvent implements AsyncEvent.Blocking {
+    @Override
+    public void startReading() {
+        try {
+            client.registerEvent(new ReadEvent());
+        } catch (IOException e) {
+            shutdown();
+        }
+    }
+
+    class ConnectEvent extends AsyncEvent {
         CompletableFuture<Void> cf;
 
         ConnectEvent(CompletableFuture<Void> cf) {
+            super(AsyncEvent.BLOCKING);
             this.cf = cf;
         }
 
         @Override
         public SelectableChannel channel() {

@@ -110,18 +133,66 @@
         }
     }
 
     @Override
     long write(ByteBuffer[] buffers, int start, int number) throws IOException {
-        //debugPrint("Send", buffers, start, number);
+        if (mode != Mode.ASYNC)
         return chan.write(buffers, start, number);
+        // async
+        synchronized(writing) {
+            int qlen = asyncOutputQ.size();
+            ByteBuffer[] bufs = Utils.reduce(buffers, start, number);
+            long n = Utils.remaining(bufs);
+            asyncOutputQ.putAll(bufs);
+            if (qlen == 0)
+                asyncOutput();
+            return n;
+        }
+    }
+
+    ByteBuffer asyncBuffer = null;
+
+    void asyncOutput() {
+        synchronized (writing) {
+            try {
+                while (true) {
+                    if (asyncBuffer == null) {
+                        asyncBuffer = asyncOutputQ.poll();
+                        if (asyncBuffer == null) {
+                            return;
+                        }
+                    }
+                    if (!asyncBuffer.hasRemaining()) {
+                        asyncBuffer = null;
+                        continue;
+                    }
+                    int n = chan.write(asyncBuffer);
+                    //System.err.printf("Written %d bytes to chan\n", n);
+                    if (n == 0) {
+                        client.registerEvent(new WriteEvent());
+                        return;
+                    }
+                }
+            } catch (IOException e) {
+                shutdown();
+            }
+        }
     }
 
     @Override
     long write(ByteBuffer buffer) throws IOException {
-        //debugPrint("Send", buffer);
+        if (mode != Mode.ASYNC)
         return chan.write(buffer);
+        // async
+        synchronized(writing) {
+            int qlen = asyncOutputQ.size();
+            long n = buffer.remaining();
+            asyncOutputQ.put(buffer);
+            if (qlen == 0)
+                asyncOutput();
+            return n;
+        }
     }
 
     @Override
     public String toString() {
         return "PlainHttpConnection: " + super.toString();

@@ -129,11 +200,11 @@
 
     /**
      * Close this connection
      */
     @Override
-    synchronized void close() {
+    public synchronized void close() {
         if (closed)
             return;
         closed = true;
         try {
             Log.logError("Closing: " + toString());

@@ -153,18 +224,53 @@
         String s = "Receive (" + n + " bytes) ";
         //debugPrint(s, buf);
         return buf;
     }
 
+    void shutdown() {
+        close();
+        errorReceiver.accept(new IOException("Connection aborted"));
+    }
+
+    void asyncRead() {
+        synchronized (reading) {
+            try {
+                while (true) {
+                    ByteBuffer buf = getBuffer();
+                    int n = chan.read(buf);
+                    //System.err.printf("Read %d bytes from chan\n", n);
+                    if (n == -1) {
+                        throw new IOException();
+                    }
+                    if (n == 0) {
+                        returnBuffer(buf);
+                        return;
+                    }
+                    buf.flip();
+                    asyncReceiver.accept(buf);
+                }
+            } catch (IOException e) {
+                shutdown();
+            }
+        }
+    }
+
     @Override
     protected int readImpl(ByteBuffer buf) throws IOException {
         int mark = buf.position();
-        int n = chan.read(buf);
+        int n;
+        // FIXME: this hack works in conjunction with the corresponding change
+        // in java.net.http.RawChannel.registerEvent
+        if ((n = buffer.remaining()) != 0) {
+            buf.put(buffer);
+        } else {
+            n = chan.read(buf);
+        }
         if (n == -1) {
             return -1;
         }
-        Utils.flipToMark(buffer, mark);
+        Utils.flipToMark(buf, mark);
         String s = "Receive (" + n + " bytes) ";
         //debugPrint(s, buf);
         return n;
     }
 

@@ -176,14 +282,71 @@
     @Override
     synchronized boolean connected() {
         return connected;
     }
 
-    class ReceiveResponseEvent extends AsyncEvent implements AsyncEvent.Blocking {
+    // used for all output in HTTP/2
+    class WriteEvent extends AsyncEvent {
+        WriteEvent() {
+            super(0);
+        }
+
+        @Override
+        public SelectableChannel channel() {
+            return chan;
+        }
+
+        @Override
+        public int interestOps() {
+            return SelectionKey.OP_WRITE;
+        }
+
+        @Override
+        public void handle() {
+            asyncOutput();
+        }
+
+        @Override
+        public void abort() {
+            shutdown();
+        }
+    }
+
+    // used for all input in HTTP/2
+    class ReadEvent extends AsyncEvent {
+        ReadEvent() {
+            super(AsyncEvent.REPEATING); // && !BLOCKING
+        }
+
+        @Override
+        public SelectableChannel channel() {
+            return chan;
+        }
+
+        @Override
+        public int interestOps() {
+            return SelectionKey.OP_READ;
+        }
+
+        @Override
+        public void handle() {
+            asyncRead();
+        }
+
+        @Override
+        public void abort() {
+            shutdown();
+        }
+
+    }
+
+    // used in blocking channels only
+    class ReceiveResponseEvent extends AsyncEvent {
         CompletableFuture<Void> cf;
 
         ReceiveResponseEvent(CompletableFuture<Void> cf) {
+            super(AsyncEvent.BLOCKING);
             this.cf = cf;
         }
         @Override
         public SelectableChannel channel() {
             return chan;

@@ -214,10 +377,19 @@
     boolean isProxied() {
         return false;
     }
 
     @Override
+    public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver,
+            Consumer<Throwable> errorReceiver) {
+        this.asyncReceiver = asyncReceiver;
+        this.errorReceiver = errorReceiver;
+        asyncOutputQ = new Queue<>();
+        asyncOutputQ.registerPutCallback(this::asyncOutput);
+    }
+
+    @Override
     CompletableFuture<Void> whenReceivingResponse() {
         CompletableFuture<Void> cf = new CompletableFuture<>();
         try {
             client.registerEvent(new ReceiveResponseEvent(cf));
         } catch (IOException e) {
< prev index next >