< prev index next >

src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java

Print this page

        

*** 24,41 **** --- 24,43 ---- */ package jdk.internal.net.http; import java.io.IOException; + import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.security.AccessController; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; + import java.time.Duration; import java.util.concurrent.CompletableFuture; import jdk.internal.net.http.common.FlowTube; import jdk.internal.net.http.common.Log; import jdk.internal.net.http.common.MinimalFuture; import jdk.internal.net.http.common.Utils;
*** 51,63 **** --- 53,108 ---- protected final SocketChannel chan; private final SocketTube tube; // need SocketTube to call signalClosed(). private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading); private volatile boolean connected; private boolean closed; + private volatile ConnectTimerEvent connectTimerEvent; // may be null; volatile so that writes are visible to other threads + /** + * Returns a ConnectTimerEvent iff there is a connect timeout duration, + * otherwise null. + */ + private ConnectTimerEvent newConnectTimer(Exchange<?> exchange, + CompletableFuture<Void> cf) { + Duration duration = client().connectTimeout().orElse(null); + if (duration != null) { + ConnectTimerEvent cte = new ConnectTimerEvent(duration, exchange, cf); + return cte; + } + return null; + } + + final class ConnectTimerEvent extends TimeoutEvent { + private final CompletableFuture<Void> cf; + private final Exchange<?> exchange; + + ConnectTimerEvent(Duration duration, + Exchange<?> exchange, + CompletableFuture<Void> cf) { + super(duration); + this.exchange = exchange; + this.cf = cf; + } + + @Override + public void handle() { + if (debug.on()) { + debug.log("HTTP connect timed out"); + } + ConnectException ce = new ConnectException("HTTP connect timed out"); + exchange.multi.cancel(ce); + client().theExecutor().execute(() -> cf.completeExceptionally(ce)); + } + + @Override + public String toString() { + return "ConnectTimerEvent, " + super.toString(); + } + } + final class ConnectEvent extends AsyncEvent { private final CompletableFuture<Void> cf; ConnectEvent(CompletableFuture<Void> cf) { this.cf = cf;
*** 83,93 **** boolean finished = chan.finishConnect(); assert finished : "Expected channel to be connected"; if (debug.on()) debug.log("ConnectEvent: connect finished: %s Local addr: %s", finished, chan.getLocalAddress()); - connected = true; // complete async since the event runs on the SelectorManager thread cf.completeAsync(() -> null, client().theExecutor()); } catch (Throwable e) { Throwable t = Utils.toConnectException(e); client().theExecutor().execute( () -> cf.completeExceptionally(t)); --- 128,137 ----
*** 101,126 **** close(); } } @Override ! public CompletableFuture<Void> connectAsync() { CompletableFuture<Void> cf = new MinimalFuture<>(); try { assert !connected : "Already connected"; assert !chan.isBlocking() : "Unexpected blocking channel"; ! boolean finished = false; PrivilegedExceptionAction<Boolean> pa = () -> chan.connect(Utils.resolveAddress(address)); try { finished = AccessController.doPrivileged(pa); } catch (PrivilegedActionException e) { throw e.getCause(); } if (finished) { if (debug.on()) debug.log("connect finished without blocking"); - connected = true; cf.complete(null); } else { if (debug.on()) debug.log("registering connect event"); client().registerEvent(new ConnectEvent(cf)); } --- 145,177 ---- close(); } } @Override ! public CompletableFuture<Void> connectAsync(Exchange<?> exchange) { CompletableFuture<Void> cf = new MinimalFuture<>(); try { assert !connected : "Already connected"; assert !chan.isBlocking() : "Unexpected blocking channel"; ! boolean finished; ! ! connectTimerEvent = newConnectTimer(exchange, cf); ! if (connectTimerEvent != null) { ! if (debug.on()) ! debug.log("registering connect timer: " + connectTimerEvent); ! client().registerTimer(connectTimerEvent); ! } ! PrivilegedExceptionAction<Boolean> pa = () -> chan.connect(Utils.resolveAddress(address)); try { finished = AccessController.doPrivileged(pa); } catch (PrivilegedActionException e) { throw e.getCause(); } if (finished) { if (debug.on()) debug.log("connect finished without blocking"); cf.complete(null); } else { if (debug.on()) debug.log("registering connect event"); client().registerEvent(new ConnectEvent(cf)); }
*** 135,144 **** --- 186,205 ---- } return cf; } @Override + public CompletableFuture<Void> finishConnect() { + assert connected == false; + if (debug.on()) debug.log("finishConnect, setting connected=true"); + connected = true; + if (connectTimerEvent != null) + client().cancelTimer(connectTimerEvent); + return MinimalFuture.completedFuture(null); + } + + @Override SocketChannel channel() { return chan; } @Override
*** 208,217 **** --- 269,280 ---- } try { Log.logTrace("Closing: " + toString()); if (debug.on()) debug.log("Closing channel: " + client().debugInterestOps(chan)); + if (connectTimerEvent != null) + client().cancelTimer(connectTimerEvent); chan.close(); tube.signalClosed(); } catch (IOException e) { Log.logTrace("Closing resulted in " + e); }
< prev index next >