< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java

Print this page

        

@@ -24,77 +24,46 @@
  */
 
 package jdk.incubator.http;
 
 import java.io.IOException;
+import java.lang.System.Logger.Level;
 import java.net.InetSocketAddress;
 import java.net.StandardSocketOptions;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-import jdk.incubator.http.internal.common.AsyncWriteQueue;
-import jdk.incubator.http.internal.common.ByteBufferReference;
+import jdk.incubator.http.internal.common.FlowTube;
 import jdk.incubator.http.internal.common.Log;
 import jdk.incubator.http.internal.common.MinimalFuture;
 import jdk.incubator.http.internal.common.Utils;
 
 /**
- * Plain raw TCP connection direct to destination. 2 modes
- * 1) Blocking used by http/1. In this case the connect is actually non
- *    blocking but the request is sent blocking. The first byte of a response
- *    is received non-blocking and the remainder of the response is received
- *    blocking
- * 2) Non-blocking. In this case (for http/2) the connection is actually opened
- *    blocking but all reads and writes are done non-blocking under the
- *    control of a Http2Connection object.
+ * Plain raw TCP connection direct to destination.
+ * The connection operates in asynchronous non-blocking mode.
+ * All reads and writes are done non-blocking.
  */
-class PlainHttpConnection extends HttpConnection implements AsyncConnection {
+class PlainHttpConnection extends HttpConnection {
 
+    private final Object reading = new Object();
     protected final SocketChannel chan;
+    private final FlowTube tube;
+    private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
     private volatile boolean connected;
     private boolean closed;
 
     // should be volatile to provide proper synchronization(visibility) action
-    private volatile Consumer<ByteBufferReference> asyncReceiver;
-    private volatile Consumer<Throwable> errorReceiver;
-    private volatile Supplier<ByteBufferReference> readBufferSupplier;
-    private boolean asyncReading;
-
-    private final AsyncWriteQueue asyncOutputQ = new AsyncWriteQueue(this::asyncOutput);
 
-    private final Object reading = new Object();
-
-    @Override
-    public void startReading() {
-        try {
-            synchronized(reading) {
-                asyncReading = true;
-            }
-            client.registerEvent(new ReadEvent());
-        } catch (IOException e) {
-            shutdown();
-        }
-    }
-
-    @Override
-    public void stopAsyncReading() {
-        synchronized(reading) {
-            asyncReading = false;
-        }
-        client.cancelRegistration(chan);
-    }
-
-    class ConnectEvent extends AsyncEvent {
-        CompletableFuture<Void> cf;
+    final class ConnectEvent extends AsyncEvent {
+        private final CompletableFuture<Void> cf;
 
         ConnectEvent(CompletableFuture<Void> cf) {
-            super(AsyncEvent.BLOCKING);
             this.cf = cf;
         }
 
         @Override
         public SelectableChannel channel() {

@@ -106,369 +75,225 @@
             return SelectionKey.OP_CONNECT;
         }
 
         @Override
         public void handle() {
+            assert !connected : "Already connected";
+            assert !chan.isBlocking() : "Unexpected blocking channel";
             try {
-                chan.finishConnect();
+                debug.log(Level.DEBUG, "ConnectEvent: finishing connect");
+                boolean finished = chan.finishConnect();
+                assert finished : "Expected channel to be connected";
+                debug.log(Level.DEBUG,
+                          "ConnectEvent: connect finished: %s", finished);
+                connected = true;
+                // complete async since the event runs on the SelectorManager thread
+                cf.completeAsync(() -> null, client().theExecutor());
             } catch (IOException e) {
-                cf.completeExceptionally(e);
-                return;
+                client().theExecutor().execute( () -> cf.completeExceptionally(e));
             }
-            connected = true;
-            cf.complete(null);
         }
 
         @Override
-        public void abort() {
+        public void abort(IOException ioe) {
             close();
+            client().theExecutor().execute( () -> cf.completeExceptionally(ioe));
         }
     }
 
     @Override
     public CompletableFuture<Void> connectAsync() {
-        CompletableFuture<Void> plainFuture = new MinimalFuture<>();
+        assert !connected : "Already connected";
+        assert !chan.isBlocking() : "Unexpected blocking channel";
+        CompletableFuture<Void> cf = new MinimalFuture<>();
         try {
-            chan.configureBlocking(false);
-            chan.connect(address);
-            client.registerEvent(new ConnectEvent(plainFuture));
-        } catch (IOException e) {
-            plainFuture.completeExceptionally(e);
-        }
-        return plainFuture;
+            boolean finished = false;
+            PrivilegedExceptionAction<Boolean> pa = () -> chan.connect(address);
+            try {
+                 finished = AccessController.doPrivileged(pa);
+            } catch (PrivilegedActionException e) {
+                throw e.getCause(); // fail cf via the outer catch; do not fall through and register a ConnectEvent
     }
-
-    @Override
-    public void connect() throws IOException {
-        chan.connect(address);
+            if (finished) {
+                debug.log(Level.DEBUG, "connect finished without blocking");
         connected = true;
+                cf.complete(null);
+            } else {
+                debug.log(Level.DEBUG, "registering connect event");
+                client().registerEvent(new ConnectEvent(cf));
+            }
+        } catch (Throwable throwable) {
+            cf.completeExceptionally(throwable);
+        }
+        return cf;
     }
 
     @Override
     SocketChannel channel() {
         return chan;
     }
 
+    @Override
+    final FlowTube getConnectionFlow() {
+        return tube;
+    }
+
     PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
         super(addr, client);
         try {
             this.chan = SocketChannel.open();
+            chan.configureBlocking(false);
             int bufsize = client.getReceiveBufferSize();
             chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
             chan.setOption(StandardSocketOptions.TCP_NODELAY, true);
+            // wrap the connected channel in a Tube for async reading and writing
+            tube = new SocketTube(client(), chan, Utils::getBuffer);
         } catch (IOException e) {
             throw new InternalError(e);
         }
     }
 
     @Override
-    long write(ByteBuffer[] buffers, int start, int number) throws IOException {
-        if (getMode() != Mode.ASYNC) {
-            return chan.write(buffers, start, number);
-        }
-        // async
-        buffers = Utils.reduce(buffers, start, number);
-        long n = Utils.remaining(buffers);
-        asyncOutputQ.put(ByteBufferReference.toReferences(buffers));
-        flushAsync();
-        return n;
-    }
+    HttpPublisher publisher() { return writePublisher; }
 
-    @Override
-    long write(ByteBuffer buffer) throws IOException {
-        if (getMode() != Mode.ASYNC) {
-            return chan.write(buffer);
-        }
-        // async
-        long n = buffer.remaining();
-        asyncOutputQ.put(ByteBufferReference.toReferences(buffer));
-        flushAsync();
-        return n;
-    }
-
-    // handle registered WriteEvent; invoked from SelectorManager thread
-    void flushRegistered() {
-        if (getMode() == Mode.ASYNC) {
-            try {
-                asyncOutputQ.flushDelayed();
-            } catch (IOException e) {
-                // Only IOException caused by closed Queue is expected here
-                shutdown();
-            }
-        }
-    }
-
-    @Override
-    public void writeAsync(ByteBufferReference[] buffers) throws IOException {
-        if (getMode() != Mode.ASYNC) {
-            chan.write(ByteBufferReference.toBuffers(buffers));
-            ByteBufferReference.clear(buffers);
-        } else {
-            asyncOutputQ.put(buffers);
-        }
-    }
-
-    @Override
-    public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
-        if (getMode() != Mode.ASYNC) {
-            chan.write(ByteBufferReference.toBuffers(buffers));
-            ByteBufferReference.clear(buffers);
-        } else {
-            // Unordered frames are sent before existing frames.
-            asyncOutputQ.putFirst(buffers);
-        }
-    }
-
-    @Override
-    public void flushAsync() throws IOException {
-        if (getMode() == Mode.ASYNC) {
-            asyncOutputQ.flush();
-        }
-    }
-
-    @Override
-    public void enableCallback() {
-        // not used
-        assert false;
-    }
-
-    boolean asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
-        try {
-            ByteBuffer[] bufs = ByteBufferReference.toBuffers(refs);
-            while (Utils.remaining(bufs) > 0) {
-                long n = chan.write(bufs);
-                if (n == 0) {
-                    delayCallback.setDelayed(refs);
-                    client.registerEvent(new WriteEvent());
-                    return false;
-                }
-            }
-            ByteBufferReference.clear(refs);
-        } catch (IOException e) {
-            shutdown();
-        }
-        return true;
-    }
 
     @Override
     public String toString() {
         return "PlainHttpConnection: " + super.toString();
     }
 
     /**
-     * Close this connection
+     * Closes this connection
      */
     @Override
     public synchronized void close() {
         if (closed) {
             return;
         }
         closed = true;
         try {
-            Log.logError("Closing: " + toString());
+            Log.logTrace("Closing: " + toString());
             chan.close();
         } catch (IOException e) {}
     }
 
     @Override
     void shutdownInput() throws IOException {
+        debug.log(Level.DEBUG, "Shutting down input");
         chan.shutdownInput();
     }
 
     @Override
     void shutdownOutput() throws IOException {
+        debug.log(Level.DEBUG, "Shutting down output");
         chan.shutdownOutput();
     }
 
-    void shutdown() {
-        close();
-        errorReceiver.accept(new IOException("Connection aborted"));
-    }
-
-    void asyncRead() {
-        synchronized (reading) {
-            try {
-                while (asyncReading) {
-                    ByteBufferReference buf = readBufferSupplier.get();
-                    int n = chan.read(buf.get());
-                    if (n == -1) {
-                        throw new IOException();
-                    }
-                    if (n == 0) {
-                        buf.clear();
-                        return;
-                    }
-                    buf.get().flip();
-                    asyncReceiver.accept(buf);
-                }
-            } catch (IOException e) {
-                shutdown();
-            }
-        }
-    }
-
-    @Override
-    protected ByteBuffer readImpl() throws IOException {
-        ByteBuffer dst = ByteBuffer.allocate(8192);
-        int n = readImpl(dst);
-        if (n > 0) {
-            return dst;
-        } else if (n == 0) {
-            return Utils.EMPTY_BYTEBUFFER;
-        } else {
-            return null;
-        }
-    }
-
-    private int readImpl(ByteBuffer buf) throws IOException {
-        int mark = buf.position();
-        int n;
-        // FIXME: this hack works in conjunction with the corresponding change
-        // in jdk.incubator.http.RawChannel.registerEvent
-        //if ((n = buffer.remaining()) != 0) {
-            //buf.put(buffer);
-        //} else {
-            n = chan.read(buf);
-        //}
-        if (n == -1) {
-            return -1;
-        }
-        Utils.flipToMark(buf, mark);
-        // String s = "Receive (" + n + " bytes) ";
-        //debugPrint(s, buf);
-        return n;
-    }
-
     @Override
     ConnectionPool.CacheKey cacheKey() {
         return new ConnectionPool.CacheKey(address, null);
     }
 
     @Override
     synchronized boolean connected() {
         return connected;
     }
 
-    // used for all output in HTTP/2
-    class WriteEvent extends AsyncEvent {
-        WriteEvent() {
-            super(0);
-        }
-
-        @Override
-        public SelectableChannel channel() {
-            return chan;
-        }
-
-        @Override
-        public int interestOps() {
-            return SelectionKey.OP_WRITE;
-        }
 
         @Override
-        public void handle() {
-            flushRegistered();
+    boolean isSecure() {
+        return false;
         }
 
         @Override
-        public void abort() {
-            shutdown();
-        }
+    boolean isProxied() {
+        return false;
     }
 
-    // used for all input in HTTP/2
-    class ReadEvent extends AsyncEvent {
-        ReadEvent() {
-            super(AsyncEvent.REPEATING); // && !BLOCKING
+    // Support for WebSocket/RawChannelImpl which unfortunately
+    // still depends on synchronous read/writes.
+    // It should be removed when RawChannelImpl moves to using asynchronous APIs.
+    private static final class PlainDetachedChannel
+            extends DetachedConnectionChannel {
+        final PlainHttpConnection plainConnection;
+        boolean closed;
+        PlainDetachedChannel(PlainHttpConnection conn) {
+            // We're handing the connection channel over to a web socket.
+            // We need the selector manager's thread to stay alive until
+            // the WebSocket is closed.
+            conn.client().webSocketOpen();
+            this.plainConnection = conn;
         }
 
         @Override
-        public SelectableChannel channel() {
-            return chan;
-        }
-
-        @Override
-        public int interestOps() {
-            return SelectionKey.OP_READ;
+        SocketChannel channel() {
+            return plainConnection.channel();
         }
 
         @Override
-        public void handle() {
-            asyncRead();
+        ByteBuffer read() throws IOException {
+            ByteBuffer dst = ByteBuffer.allocate(8192);
+            int n = readImpl(dst);
+            if (n > 0) {
+                return dst;
+            } else if (n == 0) {
+                return Utils.EMPTY_BYTEBUFFER;
+            } else {
+                return null;
         }
-
-        @Override
-        public void abort() {
-            shutdown();
         }
 
         @Override
-        public String toString() {
-            return super.toString() + "/" + chan;
+        public void close() {
+            HttpClientImpl client = plainConnection.client();
+            try {
+                plainConnection.close();
+            } finally {
+                // notify the HttpClientImpl that the websocket is
+                // no longer operating.
+                synchronized(this) {
+                    if (closed == true) return;
+                    closed = true;
         }
+                client.webSocketClose();
     }
-
-    // used in blocking channels only
-    class ReceiveResponseEvent extends AsyncEvent {
-        CompletableFuture<Void> cf;
-
-        ReceiveResponseEvent(CompletableFuture<Void> cf) {
-            super(AsyncEvent.BLOCKING);
-            this.cf = cf;
-        }
-        @Override
-        public SelectableChannel channel() {
-            return chan;
         }
 
         @Override
-        public void handle() {
-            cf.complete(null);
+        public long write(ByteBuffer[] buffers, int start, int number)
+                throws IOException
+        {
+            return channel().write(buffers, start, number);
         }
 
         @Override
-        public int interestOps() {
-            return SelectionKey.OP_READ;
+        public void shutdownInput() throws IOException {
+            plainConnection.shutdownInput();
         }
 
         @Override
-        public void abort() {
-            close();
+        public void shutdownOutput() throws IOException {
+            plainConnection.shutdownOutput();
         }
 
-        @Override
-        public String toString() {
-            return super.toString() + "/" + chan;
-        }
+        private int readImpl(ByteBuffer buf) throws IOException {
+            int mark = buf.position();
+            int n;
+            n = channel().read(buf);
+            if (n == -1) {
+                return -1;
     }
-
-    @Override
-    boolean isSecure() {
-        return false;
+            Utils.flipToMark(buf, mark);
+            return n;
     }
-
-    @Override
-    boolean isProxied() {
-        return false;
     }
 
-    @Override
-    public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver,
-                                  Consumer<Throwable> errorReceiver,
-                                  Supplier<ByteBufferReference> readBufferSupplier) {
-        this.asyncReceiver = asyncReceiver;
-        this.errorReceiver = errorReceiver;
-        this.readBufferSupplier = readBufferSupplier;
+    // Support for WebSocket/RawChannelImpl which unfortunately
+    // still depends on synchronous read/writes.
+    // It should be removed when RawChannelImpl moves to using asynchronous APIs.
+    @Override
+    DetachedConnectionChannel detachChannel() {
+        client().cancelRegistration(channel());
+        return new PlainDetachedChannel(this);
     }
 
-    @Override
-    CompletableFuture<Void> whenReceivingResponse() {
-        CompletableFuture<Void> cf = new MinimalFuture<>();
-        try {
-            ReceiveResponseEvent evt = new ReceiveResponseEvent(cf);
-            client.registerEvent(evt);
-        } catch (IOException e) {
-            cf.completeExceptionally(e);
-        }
-        return cf;
-    }
 }
< prev index next >