--- old/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java 2018-08-07 15:13:53.000000000 +0100 +++ new/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java 2018-08-07 15:13:52.000000000 +0100 @@ -26,6 +26,7 @@ package jdk.internal.net.http; import java.io.IOException; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.nio.channels.SelectableChannel; @@ -34,6 +35,7 @@ import java.security.AccessController; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import jdk.internal.net.http.common.FlowTube; import jdk.internal.net.http.common.Log; @@ -53,9 +55,52 @@ private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading); private volatile boolean connected; private boolean closed; + private volatile ConnectTimerEvent connectTimerEvent; // may be null // should be volatile to provide proper synchronization(visibility) action + /** + * Returns a ConnectTimerEvent iff there is a connect timeout duration, + * otherwise null. + */ + private ConnectTimerEvent newConnectTimer(Exchange exchange, + CompletableFuture cf) { + Duration duration = client().connectTimeout().orElse(null); + if (duration != null) { + ConnectTimerEvent cte = new ConnectTimerEvent(duration, exchange, cf); + return cte; + } + return null; + } + + final class ConnectTimerEvent extends TimeoutEvent { + private final CompletableFuture cf; + private final Exchange exchange; + + ConnectTimerEvent(Duration duration, + Exchange exchange, + CompletableFuture cf) { + super(duration); + this.exchange = exchange; + this.cf = cf; + } + + @Override + public void handle() { + if (debug.on()) { + debug.log("HTTP connect timed out"); + } + ConnectException ce = new ConnectException("HTTP connect timed out"); + exchange.multi.cancel(ce); + client().theExecutor().execute(() -> cf.completeExceptionally(ce)); + } + + @Override + public String toString() { + return "ConnectTimerEvent, " + super.toString(); + } + } + final class ConnectEvent extends AsyncEvent { private final CompletableFuture cf; @@ -85,7 +130,6 @@ if (debug.on()) debug.log("ConnectEvent: connect finished: %s Local addr: %s", finished, chan.getLocalAddress()); - connected = true; // complete async since the event runs on the SelectorManager thread cf.completeAsync(() -> null, client().theExecutor()); } catch (Throwable e) { @@ -103,12 +147,20 @@ } @Override - public CompletableFuture connectAsync() { + public CompletableFuture connectAsync(Exchange exchange) { CompletableFuture cf = new MinimalFuture<>(); try { assert !connected : "Already connected"; assert !chan.isBlocking() : "Unexpected blocking channel"; - boolean finished = false; + boolean finished; + + connectTimerEvent = newConnectTimer(exchange, cf); + if (connectTimerEvent != null) { + if (debug.on()) + debug.log("registering connect timer: " + connectTimerEvent); + client().registerTimer(connectTimerEvent); + } + PrivilegedExceptionAction pa = () -> chan.connect(Utils.resolveAddress(address)); try { @@ -118,7 +170,6 @@ } if (finished) { if (debug.on()) debug.log("connect finished without blocking"); - connected = true; cf.complete(null); } else { if (debug.on()) debug.log("registering connect event"); @@ -137,6 +188,16 @@ } @Override + public CompletableFuture finishConnect() { + assert connected == false; + if (debug.on()) debug.log("finishConnect, setting connected=true"); + connected = true; + if (connectTimerEvent != null) + client().cancelTimer(connectTimerEvent); + return MinimalFuture.completedFuture(null); + } + + @Override SocketChannel channel() { return chan; } @@ -210,6 +271,8 @@ Log.logTrace("Closing: " + toString()); if (debug.on()) debug.log("Closing channel: " + client().debugInterestOps(chan)); + if (connectTimerEvent != null) + client().cancelTimer(connectTimerEvent); chan.close(); tube.signalClosed(); } catch (IOException e) {