< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java
Print this page
*** 26,110 ****
package jdk.incubator.http;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.Authenticator;
! import java.net.CookieManager;
import java.net.ProxySelector;
! import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Stream;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.Utils;
import jdk.incubator.http.internal.websocket.BuilderImpl;
/**
* Client implementation. Contains all configuration information and also
* the selector manager thread which allows async events to be registered
* and delivered when they occur. See AsyncEvent.
*/
class HttpClientImpl extends HttpClient {
// Define the default factory as a static inner class
// that embeds all the necessary logic to avoid
// the risk of using a lambda that might keep a reference on the
// HttpClient instance from which it was created (helps with
// heapdump analysis).
private static final class DefaultThreadFactory implements ThreadFactory {
! private DefaultThreadFactory() {}
@Override
public Thread newThread(Runnable r) {
! Thread t = new Thread(null, r, "HttpClient_worker", 0, true);
t.setDaemon(true);
return t;
}
- static final ThreadFactory INSTANCE = new DefaultThreadFactory();
}
! private final CookieManager cookieManager;
private final Redirect followRedirects;
private final ProxySelector proxySelector;
private final Authenticator authenticator;
private final Version version;
private final ConnectionPool connections;
private final Executor executor;
// Security parameters
private final SSLContext sslContext;
private final SSLParameters sslParams;
private final SelectorManager selmgr;
private final FilterFactory filters;
private final Http2ClientImpl client2;
/** A Set of, deadline first, ordered timeout events. */
private final TreeSet<TimeoutEvent> timeouts;
! public static HttpClientImpl create(HttpClientBuilderImpl builder) {
! HttpClientImpl impl = new HttpClientImpl(builder);
impl.start();
! return impl;
}
! private HttpClientImpl(HttpClientBuilderImpl builder) {
if (builder.sslContext == null) {
try {
sslContext = SSLContext.getDefault();
} catch (NoSuchAlgorithmException ex) {
throw new InternalError(ex);
--- 26,214 ----
package jdk.incubator.http;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import java.io.IOException;
+ import java.lang.System.Logger.Level;
import java.lang.ref.WeakReference;
import java.net.Authenticator;
! import java.net.CookieHandler;
import java.net.ProxySelector;
! import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
+ import java.security.AccessControlContext;
+ import java.security.AccessController;
import java.security.NoSuchAlgorithmException;
+ import java.security.PrivilegedAction;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
+ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
+ import jdk.incubator.http.HttpResponse.BodyHandler;
+ import jdk.incubator.http.HttpResponse.MultiSubscriber;
import jdk.incubator.http.internal.common.Log;
+ import jdk.incubator.http.internal.common.Pair;
import jdk.incubator.http.internal.common.Utils;
import jdk.incubator.http.internal.websocket.BuilderImpl;
+ import jdk.internal.misc.InnocuousThread;
/**
* Client implementation. Contains all configuration information and also
* the selector manager thread which allows async events to be registered
* and delivered when they occur. See AsyncEvent.
*/
class HttpClientImpl extends HttpClient {
+ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+ static final boolean DEBUGELAPSED = Utils.TESTING || DEBUG; // Revisit: temporary dev flag.
+ static final boolean DEBUGTIMEOUT = false; // Revisit: temporary dev flag.
+ final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+ final System.Logger debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED);
+ final System.Logger debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT);
+ static final AtomicLong CLIENT_IDS = new AtomicLong();
+
// Define the default factory as a static inner class
// that embeds all the necessary logic to avoid
// the risk of using a lambda that might keep a reference on the
// HttpClient instance from which it was created (helps with
// heapdump analysis).
private static final class DefaultThreadFactory implements ThreadFactory {
! private final String namePrefix;
! private final AtomicInteger nextId = new AtomicInteger();
!
! DefaultThreadFactory(long clientID) {
! namePrefix = "HttpClient-" + clientID + "-Worker-";
! }
!
@Override
public Thread newThread(Runnable r) {
! String name = namePrefix + nextId.getAndIncrement();
! Thread t;
! if (System.getSecurityManager() == null) {
! t = new Thread(null, r, name, 0, false);
! } else {
! t = InnocuousThread.newThread(name, r);
! }
t.setDaemon(true);
return t;
}
}
! private final CookieHandler cookieHandler;
private final Redirect followRedirects;
+ private final Optional<ProxySelector> userProxySelector;
private final ProxySelector proxySelector;
private final Authenticator authenticator;
private final Version version;
private final ConnectionPool connections;
private final Executor executor;
+ private final boolean isDefaultExecutor;
// Security parameters
private final SSLContext sslContext;
private final SSLParameters sslParams;
private final SelectorManager selmgr;
private final FilterFactory filters;
private final Http2ClientImpl client2;
+ private final long id;
+ private final String dbgTag;
+
+ // This reference is used to keep track of the facade HttpClient
+ // that was returned to the application code.
+ // It makes it possible to know when the application no longer
+ // holds any reference to the HttpClient.
+ // Unfortunately, this information is not enough to know when
+ // to exit the SelectorManager thread. Because of the asynchronous
+ // nature of the API, we also need to wait until all pending operations
+ // have completed.
+ private final WeakReference<HttpClientFacade> facadeRef;
+
+ // This counter keeps track of the number of operations pending
+ // on the HttpClient. The SelectorManager thread will wait
+ // until there are no longer any pending operations and the
+ // facadeRef is cleared before exiting.
+ //
+ // The pendingOperationCount is incremented every time a send/sendAsync
+ // operation is invoked on the HttpClient, and is decremented when
+ // the HttpResponse<T> object is returned to the user.
+ // However, at this point, the body may not have been fully read yet.
+ // This is the case when the response T is implemented as a streaming
+ // subscriber (such as an InputStream).
+ //
+ // To take care of this issue the pendingOperationCount will additionally
+ // be incremented/decremented in the following cases:
+ //
+ // 1. For HTTP/2 it is incremented when a stream is added to the
+ // Http2Connection streams map, and decreased when the stream is removed
+ // from the map. This should also take care of push promises.
+ // 2. For WebSocket the count is increased when creating a
+ // DetachedConnectionChannel for the socket, and decreased
+ // when the the channel is closed.
+ // In addition, the HttpClient facade is passed to the WebSocket builder,
+ // (instead of the client implementation delegate).
+ // 3. For HTTP/1.1 the count is incremented before starting to parse the body
+ // response, and decremented when the parser has reached the end of the
+ // response body flow.
+ //
+ // This should ensure that the selector manager thread remains alive until
+ // the response has been fully received or the web socket is closed.
+ private final AtomicLong pendingOperationCount = new AtomicLong();
+ private final AtomicLong pendingWebSocketCount = new AtomicLong();
+ private final AtomicLong pendingHttpRequestCount = new AtomicLong();
/** A Set of, deadline first, ordered timeout events. */
private final TreeSet<TimeoutEvent> timeouts;
! /**
! * This is a bit tricky:
! * 1. an HttpClientFacade has a final HttpClientImpl field.
! * 2. an HttpClientImpl has a final WeakReference<HttpClientFacade> field,
! * where the referent is the facade created for that instance.
! * 3. We cannot just create the HttpClientFacade in the HttpClientImpl
! * constructor, because it would be only weakly referenced and could
! * be GC'ed before we can return it.
! * The solution is to use an instance of SingleFacadeFactory which will
! * allow the caller of new HttpClientImpl(...) to retrieve the facade
! * after the HttpClientImpl has been created.
! */
! private static final class SingleFacadeFactory {
! HttpClientFacade facade;
! HttpClientFacade createFacade(HttpClientImpl impl) {
! assert facade == null;
! return (facade = new HttpClientFacade(impl));
! }
! }
!
! static HttpClientFacade create(HttpClientBuilderImpl builder) {
! SingleFacadeFactory facadeFactory = new SingleFacadeFactory();
! HttpClientImpl impl = new HttpClientImpl(builder, facadeFactory);
impl.start();
! assert facadeFactory.facade != null;
! assert impl.facadeRef.get() == facadeFactory.facade;
! return facadeFactory.facade;
}
! private HttpClientImpl(HttpClientBuilderImpl builder,
! SingleFacadeFactory facadeFactory) {
! id = CLIENT_IDS.incrementAndGet();
! dbgTag = "HttpClientImpl(" + id +")";
if (builder.sslContext == null) {
try {
sslContext = SSLContext.getDefault();
} catch (NoSuchAlgorithmException ex) {
throw new InternalError(ex);
*** 112,131 ****
} else {
sslContext = builder.sslContext;
}
Executor ex = builder.executor;
if (ex == null) {
! ex = Executors.newCachedThreadPool(DefaultThreadFactory.INSTANCE);
} else {
ex = builder.executor;
}
client2 = new Http2ClientImpl(this);
executor = ex;
! cookieManager = builder.cookieManager;
followRedirects = builder.followRedirects == null ?
Redirect.NEVER : builder.followRedirects;
! this.proxySelector = builder.proxy;
authenticator = builder.authenticator;
if (builder.version == null) {
version = HttpClient.Version.HTTP_2;
} else {
version = builder.version;
--- 216,242 ----
} else {
sslContext = builder.sslContext;
}
Executor ex = builder.executor;
if (ex == null) {
! ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
! isDefaultExecutor = true;
} else {
ex = builder.executor;
+ isDefaultExecutor = false;
}
+ facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
client2 = new Http2ClientImpl(this);
executor = ex;
! cookieHandler = builder.cookieHandler;
followRedirects = builder.followRedirects == null ?
Redirect.NEVER : builder.followRedirects;
! this.userProxySelector = Optional.ofNullable(builder.proxy);
! this.proxySelector = userProxySelector
! .orElseGet(HttpClientImpl::getDefaultProxySelector);
! debug.log(Level.DEBUG, "proxySelector is %s (user-supplied=%s)",
! this.proxySelector, userProxySelector.isPresent());
authenticator = builder.authenticator;
if (builder.version == null) {
version = HttpClient.Version.HTTP_2;
} else {
version = builder.version;
*** 133,143 ****
if (builder.sslParams == null) {
sslParams = getDefaultParams(sslContext);
} else {
sslParams = builder.sslParams;
}
! connections = new ConnectionPool();
connections.start();
timeouts = new TreeSet<>();
try {
selmgr = new SelectorManager(this);
} catch (IOException e) {
--- 244,254 ----
if (builder.sslParams == null) {
sslParams = getDefaultParams(sslContext);
} else {
sslParams = builder.sslParams;
}
! connections = new ConnectionPool(id);
connections.start();
timeouts = new TreeSet<>();
try {
selmgr = new SelectorManager(this);
} catch (IOException e) {
*** 145,178 ****
throw new InternalError(e);
}
selmgr.setDaemon(true);
filters = new FilterFactory();
initFilters();
}
private void start() {
selmgr.start();
}
private static SSLParameters getDefaultParams(SSLContext ctx) {
SSLParameters params = ctx.getSupportedSSLParameters();
params.setProtocols(new String[]{"TLSv1.2"});
return params;
}
/**
! * Wait for activity on given exchange (assuming blocking = false).
! * It's a no-op if blocking = true. In particular, the following occurs
! * in the SelectorManager thread.
*
! * 1) mark the connection non-blocking
! * 2) add to selector
! * 3) If selector fires for this exchange then
! * 4) - mark connection as blocking
! * 5) - call AsyncEvent.handle()
*
! * If exchange needs to block again, then call registerEvent() again
*/
void registerEvent(AsyncEvent exchange) throws IOException {
selmgr.register(exchange);
}
--- 256,361 ----
throw new InternalError(e);
}
selmgr.setDaemon(true);
filters = new FilterFactory();
initFilters();
+ assert facadeRef.get() != null;
}
private void start() {
selmgr.start();
}
+ // Called from the SelectorManager thread, just before exiting.
+ // Clears the HTTP/1.1 and HTTP/2 cache, ensuring that the connections
+ // that may be still lingering there are properly closed (and their
+ // possibly still opened SocketChannel released).
+ private void stop() {
+ // Clears HTTP/1.1 cache and close its connections
+ connections.stop();
+ // Clears HTTP/2 cache and close its connections.
+ client2.stop();
+ }
+
private static SSLParameters getDefaultParams(SSLContext ctx) {
SSLParameters params = ctx.getSupportedSSLParameters();
params.setProtocols(new String[]{"TLSv1.2"});
return params;
}
+ private static ProxySelector getDefaultProxySelector() {
+ PrivilegedAction<ProxySelector> action = ProxySelector::getDefault;
+ return AccessController.doPrivileged(action);
+ }
+
+ // Returns the facade that was returned to the application code.
+ // May be null if that facade is no longer referenced.
+ final HttpClientFacade facade() {
+ return facadeRef.get();
+ }
+
+ // Increments the pendingOperationCount.
+ final long reference() {
+ pendingHttpRequestCount.incrementAndGet();
+ return pendingOperationCount.incrementAndGet();
+ }
+
+ // Decrements the pendingOperationCount.
+ final long unreference() {
+ final long count = pendingOperationCount.decrementAndGet();
+ final long httpCount = pendingHttpRequestCount.decrementAndGet();
+ final long webSocketCount = pendingWebSocketCount.get();
+ if (count == 0 && facade() == null) {
+ selmgr.wakeupSelector();
+ }
+ assert httpCount >= 0 : "count of HTTP operations < 0";
+ assert webSocketCount >= 0 : "count of WS operations < 0";
+ assert count >= 0 : "count of pending operations < 0";
+ return count;
+ }
+
+ // Increments the pendingOperationCount.
+ final long webSocketOpen() {
+ pendingWebSocketCount.incrementAndGet();
+ return pendingOperationCount.incrementAndGet();
+ }
+
+ // Decrements the pendingOperationCount.
+ final long webSocketClose() {
+ final long count = pendingOperationCount.decrementAndGet();
+ final long webSocketCount = pendingWebSocketCount.decrementAndGet();
+ final long httpCount = pendingHttpRequestCount.get();
+ if (count == 0 && facade() == null) {
+ selmgr.wakeupSelector();
+ }
+ assert httpCount >= 0 : "count of HTTP operations < 0";
+ assert webSocketCount >= 0 : "count of WS operations < 0";
+ assert count >= 0 : "count of pending operations < 0";
+ return count;
+ }
+
+ // Returns the pendingOperationCount.
+ final long referenceCount() {
+ return pendingOperationCount.get();
+ }
+
+ // Called by the SelectorManager thread to figure out whether it's time
+ // to terminate.
+ final boolean isReferenced() {
+ HttpClient facade = facade();
+ return facade != null || referenceCount() > 0;
+ }
+
/**
! * Wait for activity on given exchange.
! * The following occurs in the SelectorManager thread.
*
! * 1) add to selector
! * 2) If selector fires for this exchange then
! * call AsyncEvent.handle()
*
! * If exchange needs to change interest ops, then call registerEvent() again.
*/
void registerEvent(AsyncEvent exchange) throws IOException {
selmgr.register(exchange);
}
*** 182,316 ****
*/
void cancelRegistration(SocketChannel s) {
selmgr.cancel(s);
}
!
! Http2ClientImpl client2() {
! return client2;
! }
!
! /*
! @Override
! public ByteBuffer getBuffer() {
! return pool.getBuffer();
}
! // SSL buffers are larger. Manage separately
!
! int size = 16 * 1024;
!
! ByteBuffer getSSLBuffer() {
! return ByteBuffer.allocate(size);
}
! /**
! * Return a new buffer that's a bit bigger than the given one
! *
! * @param buf
! * @return
! *
! ByteBuffer reallocSSLBuffer(ByteBuffer buf) {
! size = buf.capacity() * 12 / 10; // 20% bigger
! return ByteBuffer.allocate(size);
}
! synchronized void returnSSLBuffer(ByteBuffer buf) {
! if (buf.capacity() >= size)
! sslBuffers.add(0, buf);
}
-
- @Override
- public void returnBuffer(ByteBuffer buffer) {
- pool.returnBuffer(buffer);
}
- */
@Override
public <T> HttpResponse<T>
! send(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
throws IOException, InterruptedException
{
! MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
! return mex.response();
}
@Override
public <T> CompletableFuture<HttpResponse<T>>
! sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
{
! MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
! return mex.responseAsync()
! .thenApply((HttpResponseImpl<T> b) -> (HttpResponse<T>) b);
! }
!
! @Override
! public <U, T> CompletableFuture<U>
! sendAsync(HttpRequest req, HttpResponse.MultiProcessor<U, T> responseHandler) {
! MultiExchange<U,T> mex = new MultiExchange<>(req, this, responseHandler);
! return mex.multiResponseAsync();
! }
!
! // new impl. Should get rid of above
! /*
! static class BufferPool implements BufferHandler {
! final LinkedList<ByteBuffer> freelist = new LinkedList<>();
!
! @Override
! public synchronized ByteBuffer getBuffer() {
! ByteBuffer buf;
! while (!freelist.isEmpty()) {
! buf = freelist.removeFirst();
! buf.clear();
! return buf;
}
- return ByteBuffer.allocate(BUFSIZE);
}
@Override
! public synchronized void returnBuffer(ByteBuffer buffer) {
! assert buffer.capacity() > 0;
! freelist.add(buffer);
! }
! }
! static BufferPool pool = new BufferPool();
! static BufferHandler pool() {
! return pool;
}
! */
// Main loop for this client's selector
private final static class SelectorManager extends Thread {
! private static final long NODEADLINE = 3000L;
private final Selector selector;
private volatile boolean closed;
- private final List<AsyncEvent> readyList;
private final List<AsyncEvent> registrations;
!
! // Uses a weak reference to the HttpClient owning this
! // selector: a strong reference prevents its garbage
! // collection while the thread is running.
! // We want the thread to exit gracefully when the
! // HttpClient that owns it gets GC'ed.
! WeakReference<HttpClientImpl> ownerRef;
SelectorManager(HttpClientImpl ref) throws IOException {
! super(null, null, "SelectorManager", 0, false);
! ownerRef = new WeakReference<>(ref);
! readyList = new ArrayList<>();
registrations = new ArrayList<>();
selector = Selector.open();
}
// This returns immediately. So caller not allowed to send/receive
// on connection.
!
! synchronized void register(AsyncEvent e) throws IOException {
registrations.add(e);
selector.wakeup();
}
synchronized void cancel(SocketChannel e) {
--- 365,564 ----
*/
void cancelRegistration(SocketChannel s) {
selmgr.cancel(s);
}
! /**
! * Allows an AsyncEvent to modify its interestOps.
! * @param event The modified event.
! */
! void eventUpdated(AsyncEvent event) throws ClosedChannelException {
! assert !(event instanceof AsyncTriggerEvent);
! selmgr.eventUpdated(event);
}
! boolean isSelectorThread() {
! return Thread.currentThread() == selmgr;
}
! Http2ClientImpl client2() {
! return client2;
}
! private void debugCompleted(String tag, long startNanos, HttpRequest req) {
! if (debugelapsed.isLoggable(Level.DEBUG)) {
! debugelapsed.log(Level.DEBUG, () -> tag + " elapsed "
! + (System.nanoTime() - startNanos)/1000_000L
! + " millis for " + req.method()
! + " to " + req.uri());
}
}
@Override
public <T> HttpResponse<T>
! send(HttpRequest req, BodyHandler<T> responseHandler)
throws IOException, InterruptedException
{
! try {
! return sendAsync(req, responseHandler).get();
! } catch (ExecutionException e) {
! Throwable t = e.getCause();
! if (t instanceof Error)
! throw (Error)t;
! if (t instanceof RuntimeException)
! throw (RuntimeException)t;
! else if (t instanceof IOException)
! throw Utils.getIOException(t);
! else
! throw new InternalError("Unexpected exception", t);
! }
}
@Override
public <T> CompletableFuture<HttpResponse<T>>
! sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)
{
! AccessControlContext acc = null;
! if (System.getSecurityManager() != null)
! acc = AccessController.getContext();
!
! // Clone the, possibly untrusted, HttpRequest
! HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector, acc);
! if (requestImpl.method().equals("CONNECT"))
! throw new IllegalArgumentException("Unsupported method CONNECT");
! long start = DEBUGELAPSED ? System.nanoTime() : 0;
! reference();
! try {
! debugelapsed.log(Level.DEBUG, "ClientImpl (async) send %s", userRequest);
! MultiExchange<Void,T> mex = new MultiExchange<>(userRequest,
! requestImpl,
! this,
! responseHandler,
! acc);
! CompletableFuture<HttpResponse<T>> res =
! mex.responseAsync().whenComplete((b,t) -> unreference());
! if (DEBUGELAPSED) {
! res = res.whenComplete(
! (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
! }
! // makes sure that any dependent actions happen in the executor
! if (acc != null) {
! res.whenCompleteAsync((r, t) -> { /* do nothing */},
! new PrivilegedExecutor(executor, acc));
! }
!
! return res;
! } catch(Throwable t) {
! unreference();
! debugCompleted("ClientImpl (async)", start, userRequest);
! throw t;
}
}
@Override
! public <U, T> CompletableFuture<U>
! sendAsync(HttpRequest userRequest, MultiSubscriber<U, T> responseHandler) {
! AccessControlContext acc = null;
! if (System.getSecurityManager() != null)
! acc = AccessController.getContext();
!
! // Clone the, possibly untrusted, HttpRequest
! HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector, acc);
! if (requestImpl.method().equals("CONNECT"))
! throw new IllegalArgumentException("Unsupported method CONNECT");
! long start = DEBUGELAPSED ? System.nanoTime() : 0;
! reference();
! try {
! debugelapsed.log(Level.DEBUG, "ClientImpl (async) send multi %s", userRequest);
! MultiExchange<U,T> mex = new MultiExchange<>(userRequest,
! requestImpl,
! this,
! responseHandler,
! acc);
! CompletableFuture<U> res = mex.multiResponseAsync()
! .whenComplete((b,t) -> unreference());
! if (DEBUGELAPSED) {
! res = res.whenComplete(
! (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
! }
! // makes sure that any dependent actions happen in the executor
! if (acc != null) {
! res.whenCompleteAsync((r, t) -> { /* do nothing */},
! new PrivilegedExecutor(executor, acc));
! }
!
! return res;
! } catch(Throwable t) {
! unreference();
! debugCompleted("ClientImpl (async)", start, userRequest);
! throw t;
! }
}
!
// Main loop for this client's selector
private final static class SelectorManager extends Thread {
! // For testing purposes we have an internal System property that
! // can control the frequency at which the selector manager will wake
! // up when there are no pending operations.
! // Increasing the frequency (shorter delays) might allow the selector
! // to observe that the facade is no longer referenced and might allow
! // the selector thread to terminate more timely - for when nothing is
! // ongoing it will only check for that condition every NODEADLINE ms.
! // To avoid misuse of the property, the delay that can be specified
! // is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default
! // value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms
! // The property is -Djdk.httpclient.internal.selector.timeout=<millis>
! private static final int MIN_NODEADLINE = 1000; // ms
! private static final int MAX_NODEADLINE = 1000 * 1200; // ms
! private static final int DEF_NODEADLINE = 3000; // ms
! private static final long NODEADLINE; // default is DEF_NODEADLINE ms
! static {
! // ensure NODEADLINE is initialized with some valid value.
! long deadline = Utils.getIntegerNetProperty(
! "jdk.httpclient.internal.selector.timeout",
! DEF_NODEADLINE); // millis
! if (deadline <= 0) deadline = DEF_NODEADLINE;
! deadline = Math.max(deadline, MIN_NODEADLINE);
! NODEADLINE = Math.min(deadline, MAX_NODEADLINE);
! }
!
private final Selector selector;
private volatile boolean closed;
private final List<AsyncEvent> registrations;
! private final System.Logger debug;
! private final System.Logger debugtimeout;
! HttpClientImpl owner;
! ConnectionPool pool;
SelectorManager(HttpClientImpl ref) throws IOException {
! super(null, null, "HttpClient-" + ref.id + "-SelectorManager", 0, false);
! owner = ref;
! debug = ref.debug;
! debugtimeout = ref.debugtimeout;
! pool = ref.connectionPool();
registrations = new ArrayList<>();
selector = Selector.open();
}
+ void eventUpdated(AsyncEvent e) throws ClosedChannelException {
+ if (Thread.currentThread() == this) {
+ SelectionKey key = e.channel().keyFor(selector);
+ SelectorAttachment sa = (SelectorAttachment) key.attachment();
+ if (sa != null) sa.register(e);
+ } else {
+ register(e);
+ }
+ }
+
// This returns immediately. So caller not allowed to send/receive
// on connection.
! synchronized void register(AsyncEvent e) {
registrations.add(e);
selector.wakeup();
}
synchronized void cancel(SocketChannel e) {
*** 324,443 ****
void wakeupSelector() {
selector.wakeup();
}
synchronized void shutdown() {
closed = true;
try {
selector.close();
! } catch (IOException ignored) { }
}
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
- HttpClientImpl client;
synchronized (this) {
! for (AsyncEvent exchange : registrations) {
! SelectableChannel c = exchange.channel();
try {
! c.configureBlocking(false);
! SelectionKey key = c.keyFor(selector);
SelectorAttachment sa;
if (key == null || !key.isValid()) {
if (key != null) {
// key is canceled.
// invoke selectNow() to purge it
// before registering the new event.
selector.selectNow();
}
! sa = new SelectorAttachment(c, selector);
} else {
sa = (SelectorAttachment) key.attachment();
}
! sa.register(exchange);
} catch (IOException e) {
! Log.logError("HttpClientImpl: " + e);
! c.close();
! // let the exchange deal with it
! handleEvent(exchange);
}
}
registrations.clear();
}
// Check whether client is still alive, and if not,
// gracefully stop this thread
! if ((client = ownerRef.get()) == null) {
Log.logTrace("HttpClient no longer referenced. Exiting...");
return;
}
- long millis = client.purgeTimeoutsAndReturnNextDeadline();
- client = null; // don't hold onto the client ref
! //debugPrint(selector);
// Don't wait for ever as it might prevent the thread to
// stop gracefully. millis will be 0 if no deadline was found.
int n = selector.select(millis == 0 ? NODEADLINE : millis);
if (n == 0) {
// Check whether client is still alive, and if not,
// gracefully stop this thread
! if ((client = ownerRef.get()) == null) {
Log.logTrace("HttpClient no longer referenced. Exiting...");
return;
}
! client.purgeTimeoutsAndReturnNextDeadline();
! client = null; // don't hold onto the client ref
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey key : keys) {
SelectorAttachment sa = (SelectorAttachment) key.attachment();
! int eventsOccurred = key.readyOps();
sa.events(eventsOccurred).forEach(readyList::add);
sa.resetInterestOps(eventsOccurred);
}
selector.selectNow(); // complete cancellation
selector.selectedKeys().clear();
! for (AsyncEvent exchange : readyList) {
! if (exchange.blocking()) {
! exchange.channel().configureBlocking(true);
! }
! handleEvent(exchange); // will be delegated to executor
}
readyList.clear();
}
} catch (Throwable e) {
if (!closed) {
// This terminates thread. So, better just print stack trace
String err = Utils.stackTrace(e);
Log.logError("HttpClientImpl: fatal error: " + err);
}
} finally {
shutdown();
}
}
! void debugPrint(Selector selector) {
! System.err.println("Selector: debugprint start");
! Set<SelectionKey> keys = selector.keys();
! for (SelectionKey key : keys) {
! SelectableChannel c = key.channel();
! int ops = key.interestOps();
! System.err.printf("selector chan:%s ops:%d\n", c, ops);
! }
! System.err.println("Selector: debugprint end");
! }
!
! void handleEvent(AsyncEvent e) {
! if (closed) {
! e.abort();
} else {
! e.handle();
}
}
}
/**
--- 572,774 ----
void wakeupSelector() {
selector.wakeup();
}
synchronized void shutdown() {
+ debug.log(Level.DEBUG, "SelectorManager shutting down");
closed = true;
try {
selector.close();
! } catch (IOException ignored) {
! } finally {
! owner.stop();
! }
}
@Override
public void run() {
+ List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
+ List<AsyncEvent> readyList = new ArrayList<>();
try {
while (!Thread.currentThread().isInterrupted()) {
synchronized (this) {
! assert errorList.isEmpty();
! assert readyList.isEmpty();
! for (AsyncEvent event : registrations) {
! if (event instanceof AsyncTriggerEvent) {
! readyList.add(event);
! continue;
! }
! SelectableChannel chan = event.channel();
! SelectionKey key = null;
try {
! key = chan.keyFor(selector);
SelectorAttachment sa;
if (key == null || !key.isValid()) {
if (key != null) {
// key is canceled.
// invoke selectNow() to purge it
// before registering the new event.
selector.selectNow();
}
! sa = new SelectorAttachment(chan, selector);
} else {
sa = (SelectorAttachment) key.attachment();
}
! // may throw IOE if channel closed: that's OK
! sa.register(event);
! if (!chan.isOpen()) {
! throw new IOException("Channel closed");
! }
} catch (IOException e) {
! Log.logTrace("HttpClientImpl: " + e);
! debug.log(Level.DEBUG, () ->
! "Got " + e.getClass().getName()
! + " while handling"
! + " registration events");
! chan.close();
! // let the event abort deal with it
! errorList.add(new Pair<>(event, e));
! if (key != null) {
! key.cancel();
! selector.selectNow();
! }
}
}
registrations.clear();
+ selector.selectedKeys().clear();
+ }
+
+ for (AsyncEvent event : readyList) {
+ assert event instanceof AsyncTriggerEvent;
+ event.handle();
}
+ readyList.clear();
+
+ for (Pair<AsyncEvent,IOException> error : errorList) {
+ // an IOException was raised and the channel closed.
+ handleEvent(error.first, error.second);
+ }
+ errorList.clear();
// Check whether client is still alive, and if not,
// gracefully stop this thread
! if (!owner.isReferenced()) {
Log.logTrace("HttpClient no longer referenced. Exiting...");
return;
}
! // Timeouts will have milliseconds granularity. It is important
! // to handle them in a timely fashion.
! long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline();
! debugtimeout.log(Level.DEBUG, "next timeout: %d", nextTimeout);
!
! // Keep-alive have seconds granularity. It's not really an
! // issue if we keep connections linger a bit more in the keep
! // alive cache.
! long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline();
! debugtimeout.log(Level.DEBUG, "next expired: %d", nextExpiry);
!
! assert nextTimeout >= 0;
! assert nextExpiry >= 0;
!
// Don't wait for ever as it might prevent the thread to
// stop gracefully. millis will be 0 if no deadline was found.
+ if (nextTimeout <= 0) nextTimeout = NODEADLINE;
+
+ // Clip nextExpiry at NODEADLINE limit. The default
+ // keep alive is 1200 seconds (half an hour) - we don't
+ // want to wait that long.
+ if (nextExpiry <= 0) nextExpiry = NODEADLINE;
+ else nextExpiry = Math.min(NODEADLINE, nextExpiry);
+
+ // takes the least of the two.
+ long millis = Math.min(nextExpiry, nextTimeout);
+
+ debugtimeout.log(Level.DEBUG, "Next deadline is %d",
+ (millis == 0 ? NODEADLINE : millis));
+ //debugPrint(selector);
int n = selector.select(millis == 0 ? NODEADLINE : millis);
if (n == 0) {
// Check whether client is still alive, and if not,
// gracefully stop this thread
! if (!owner.isReferenced()) {
Log.logTrace("HttpClient no longer referenced. Exiting...");
return;
}
! owner.purgeTimeoutsAndReturnNextDeadline();
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
+ assert errorList.isEmpty();
for (SelectionKey key : keys) {
SelectorAttachment sa = (SelectorAttachment) key.attachment();
! if (!key.isValid()) {
! IOException ex = sa.chan.isOpen()
! ? new IOException("Invalid key")
! : new ClosedChannelException();
! sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex)));
! sa.pending.clear();
! continue;
! }
!
! int eventsOccurred;
! try {
! eventsOccurred = key.readyOps();
! } catch (CancelledKeyException ex) {
! IOException io = Utils.getIOException(ex);
! sa.pending.forEach(e -> errorList.add(new Pair<>(e,io)));
! sa.pending.clear();
! continue;
! }
sa.events(eventsOccurred).forEach(readyList::add);
sa.resetInterestOps(eventsOccurred);
}
selector.selectNow(); // complete cancellation
selector.selectedKeys().clear();
! for (AsyncEvent event : readyList) {
! handleEvent(event, null); // will be delegated to executor
}
readyList.clear();
+ errorList.forEach((p) -> handleEvent(p.first, p.second));
+ errorList.clear();
}
} catch (Throwable e) {
+ //e.printStackTrace();
if (!closed) {
// This terminates thread. So, better just print stack trace
String err = Utils.stackTrace(e);
Log.logError("HttpClientImpl: fatal error: " + err);
}
+ debug.log(Level.DEBUG, "shutting down", e);
+ if (Utils.ASSERTIONSENABLED && !debug.isLoggable(Level.DEBUG)) {
+ e.printStackTrace(System.err); // always print the stack
+ }
} finally {
shutdown();
}
}
! // void debugPrint(Selector selector) {
! // System.err.println("Selector: debugprint start");
! // Set<SelectionKey> keys = selector.keys();
! // for (SelectionKey key : keys) {
! // SelectableChannel c = key.channel();
! // int ops = key.interestOps();
! // System.err.printf("selector chan:%s ops:%d\n", c, ops);
! // }
! // System.err.println("Selector: debugprint end");
! // }
!
! /** Handles the given event. The given ioe may be null. */
! void handleEvent(AsyncEvent event, IOException ioe) {
! if (closed || ioe != null) {
! event.abort(ioe);
} else {
! event.handle();
}
}
}
/**
*** 451,465 ****
* connection.
*/
private static class SelectorAttachment {
private final SelectableChannel chan;
private final Selector selector;
! private final ArrayList<AsyncEvent> pending;
private int interestOps;
SelectorAttachment(SelectableChannel chan, Selector selector) {
! this.pending = new ArrayList<>();
this.chan = chan;
this.selector = selector;
}
void register(AsyncEvent e) throws ClosedChannelException {
--- 782,798 ----
* connection.
*/
private static class SelectorAttachment {
private final SelectableChannel chan;
private final Selector selector;
! private final Set<AsyncEvent> pending;
! private final static System.Logger debug =
! Utils.getDebugLogger("SelectorAttachment"::toString, DEBUG);
private int interestOps;
SelectorAttachment(SelectableChannel chan, Selector selector) {
! this.pending = new HashSet<>();
this.chan = chan;
this.selector = selector;
}
void register(AsyncEvent e) throws ClosedChannelException {
*** 504,542 ****
}
}
this.interestOps = newOps;
SelectionKey key = chan.keyFor(selector);
! if (newOps == 0) {
key.cancel();
} else {
key.interestOps(newOps);
}
}
}
@Override
public SSLContext sslContext() {
- Utils.checkNetPermission("getSSLContext");
return sslContext;
}
@Override
! public Optional<SSLParameters> sslParameters() {
! return Optional.ofNullable(sslParams);
}
@Override
public Optional<Authenticator> authenticator() {
return Optional.ofNullable(authenticator);
}
! @Override
! public Executor executor() {
return executor;
}
ConnectionPool connectionPool() {
return connections;
}
@Override
--- 837,899 ----
}
}
this.interestOps = newOps;
SelectionKey key = chan.keyFor(selector);
! if (newOps == 0 && pending.isEmpty()) {
key.cancel();
} else {
+ try {
key.interestOps(newOps);
+ } catch (CancelledKeyException x) {
+ // channel may have been closed
+ debug.log(Level.DEBUG, "key cancelled for " + chan);
+ abortPending(x);
}
}
}
+ void abortPending(Throwable x) {
+ if (!pending.isEmpty()) {
+ AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]);
+ pending.clear();
+ IOException io = Utils.getIOException(x);
+ for (AsyncEvent event : evts) {
+ event.abort(io);
+ }
+ }
+ }
+ }
+
+ /*package-private*/ SSLContext theSSLContext() {
+ return sslContext;
+ }
+
@Override
public SSLContext sslContext() {
return sslContext;
}
@Override
! public SSLParameters sslParameters() {
! return Utils.copySSLParameters(sslParams);
}
@Override
public Optional<Authenticator> authenticator() {
return Optional.ofNullable(authenticator);
}
! /*package-private*/ final Executor theExecutor() {
return executor;
}
+ @Override
+ public final Optional<Executor> executor() {
+ return isDefaultExecutor ? Optional.empty() : Optional.of(executor);
+ }
+
ConnectionPool connectionPool() {
return connections;
}
@Override
*** 544,583 ****
return followRedirects;
}
@Override
! public Optional<CookieManager> cookieManager() {
! return Optional.ofNullable(cookieManager);
}
@Override
public Optional<ProxySelector> proxy() {
! return Optional.ofNullable(this.proxySelector);
}
@Override
! public WebSocket.Builder newWebSocketBuilder(URI uri,
! WebSocket.Listener listener) {
! return new BuilderImpl(this, uri, listener);
}
@Override
public Version version() {
return version;
}
! //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>();
! boolean getHttp2Allowed() {
! return version.equals(Version.HTTP_2);
}
private void initFilters() {
addFilter(AuthenticationFilter.class);
addFilter(RedirectFilter.class);
! if (this.cookieManager != null) {
addFilter(CookieFilter.class);
}
}
private void addFilter(Class<? extends HeaderFilter> f) {
--- 901,954 ----
return followRedirects;
}
@Override
! public Optional<CookieHandler> cookieHandler() {
! return Optional.ofNullable(cookieHandler);
}
@Override
public Optional<ProxySelector> proxy() {
! return this.userProxySelector;
! }
!
! // Return the effective proxy that this client uses.
! ProxySelector proxySelector() {
! return proxySelector;
}
@Override
! public WebSocket.Builder newWebSocketBuilder() {
! // Make sure to pass the HttpClientFacade to the WebSocket builder.
! // This will ensure that the facade is not released before the
! // WebSocket has been created, at which point the pendingOperationCount
! // will have been incremented by the DetachedConnectionChannel
! // (see PlainHttpConnection.detachChannel())
! return new BuilderImpl(this.facade(), proxySelector);
}
@Override
public Version version() {
return version;
}
! String dbgString() {
! return dbgTag;
! }
! @Override
! public String toString() {
! // Used by tests to get the client's id and compute the
! // name of the SelectorManager thread.
! return super.toString() + ("(" + id + ")");
}
private void initFilters() {
addFilter(AuthenticationFilter.class);
addFilter(RedirectFilter.class);
! if (this.cookieHandler != null) {
addFilter(CookieFilter.class);
}
}
private void addFilter(Class<? extends HeaderFilter> f) {
< prev index next >