< prev index next >

src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java

Print this page

        

@@ -24,18 +24,20 @@
  */
 
 package jdk.internal.net.http;
 
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.StandardSocketOptions;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.security.AccessController;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 import jdk.internal.net.http.common.FlowTube;
 import jdk.internal.net.http.common.Log;
 import jdk.internal.net.http.common.MinimalFuture;
 import jdk.internal.net.http.common.Utils;

@@ -51,13 +53,56 @@
     protected final SocketChannel chan;
     private final SocketTube tube; // need SocketTube to call signalClosed().
     private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
     private volatile boolean connected;
     private boolean closed;
+    private volatile ConnectTimerEvent connectTimerEvent;  // may be null
 
     // should be volatile to provide proper synchronization(visibility) action
 
+    /**
+     * Returns a ConnectTimerEvent iff there is a connect timeout duration,
+     * otherwise null.
+     */
+    private ConnectTimerEvent newConnectTimer(Exchange<?> exchange,
+                                              CompletableFuture<Void> cf) {
+        Duration duration = client().connectTimeout().orElse(null);
+        if (duration != null) {
+            ConnectTimerEvent cte = new ConnectTimerEvent(duration, exchange, cf);
+            return cte;
+        }
+        return null;
+    }
+
+    final class ConnectTimerEvent extends TimeoutEvent {
+        private final CompletableFuture<Void> cf;
+        private final Exchange<?> exchange;
+
+        ConnectTimerEvent(Duration duration,
+                          Exchange<?> exchange,
+                          CompletableFuture<Void> cf) {
+            super(duration);
+            this.exchange = exchange;
+            this.cf = cf;
+        }
+
+        @Override
+        public void handle() {
+            if (debug.on()) {
+                debug.log("HTTP connect timed out");
+            }
+            ConnectException ce = new ConnectException("HTTP connect timed out");
+            exchange.multi.cancel(ce);
+            client().theExecutor().execute(() -> cf.completeExceptionally(ce));
+        }
+
+        @Override
+        public String toString() {
+            return "ConnectTimerEvent, " + super.toString();
+        }
+    }
+
     final class ConnectEvent extends AsyncEvent {
         private final CompletableFuture<Void> cf;
 
         ConnectEvent(CompletableFuture<Void> cf) {
             this.cf = cf;

@@ -83,11 +128,10 @@
                 boolean finished = chan.finishConnect();
                 assert finished : "Expected channel to be connected";
                 if (debug.on())
                     debug.log("ConnectEvent: connect finished: %s Local addr: %s",
                               finished, chan.getLocalAddress());
-                connected = true;
                 // complete async since the event runs on the SelectorManager thread
                 cf.completeAsync(() -> null, client().theExecutor());
             } catch (Throwable e) {
                 Throwable t = Utils.toConnectException(e);
                 client().theExecutor().execute( () -> cf.completeExceptionally(t));

@@ -101,26 +145,33 @@
             close();
         }
     }
 
     @Override
-    public CompletableFuture<Void> connectAsync() {
+    public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
         CompletableFuture<Void> cf = new MinimalFuture<>();
         try {
             assert !connected : "Already connected";
             assert !chan.isBlocking() : "Unexpected blocking channel";
-            boolean finished = false;
+            boolean finished;
+
+            connectTimerEvent = newConnectTimer(exchange, cf);
+            if (connectTimerEvent != null) {
+                if (debug.on())
+                    debug.log("registering connect timer: " + connectTimerEvent);
+                client().registerTimer(connectTimerEvent);
+            }
+
             PrivilegedExceptionAction<Boolean> pa =
                     () -> chan.connect(Utils.resolveAddress(address));
             try {
                  finished = AccessController.doPrivileged(pa);
             } catch (PrivilegedActionException e) {
                throw e.getCause();
             }
             if (finished) {
                 if (debug.on()) debug.log("connect finished without blocking");
-                connected = true;
                 cf.complete(null);
             } else {
                 if (debug.on()) debug.log("registering connect event");
                 client().registerEvent(new ConnectEvent(cf));
             }

@@ -135,10 +186,20 @@
         }
         return cf;
     }
 
     @Override
+    public CompletableFuture<Void> finishConnect() {
+        assert connected == false;
+        if (debug.on()) debug.log("finishConnect, setting connected=true");
+        connected = true;
+        if (connectTimerEvent != null)
+            client().cancelTimer(connectTimerEvent);
+        return MinimalFuture.completedFuture(null);
+    }
+
+    @Override
     SocketChannel channel() {
         return chan;
     }
 
     @Override

@@ -208,10 +269,12 @@
         }
         try {
             Log.logTrace("Closing: " + toString());
             if (debug.on())
                 debug.log("Closing channel: " + client().debugInterestOps(chan));
+            if (connectTimerEvent != null)
+                client().cancelTimer(connectTimerEvent);
             chan.close();
             tube.signalClosed();
         } catch (IOException e) {
             Log.logTrace("Closing resulted in " + e);
         }
< prev index next >