--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java 2017-11-30 04:03:58.260989528 -0800 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java 2017-11-30 04:03:58.058971867 -0800 @@ -28,33 +28,45 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.SSLParameters; import java.io.IOException; +import java.lang.System.Logger.Level; import java.lang.ref.WeakReference; import java.net.Authenticator; -import java.net.CookieManager; +import java.net.CookieHandler; import java.net.ProxySelector; -import java.net.URI; +import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; +import java.security.AccessControlContext; +import java.security.AccessController; import java.security.NoSuchAlgorithmException; +import java.security.PrivilegedAction; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; +import jdk.incubator.http.HttpResponse.BodyHandler; +import jdk.incubator.http.HttpResponse.MultiSubscriber; import jdk.incubator.http.internal.common.Log; +import jdk.incubator.http.internal.common.Pair; import jdk.incubator.http.internal.common.Utils; import jdk.incubator.http.internal.websocket.BuilderImpl; +import jdk.internal.misc.InnocuousThread; /** * Client implementation. Contains all configuration information and also @@ -63,46 +75,138 @@ */ class HttpClientImpl extends HttpClient { + static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. + static final boolean DEBUGELAPSED = Utils.TESTING || DEBUG; // Revisit: temporary dev flag. + static final boolean DEBUGTIMEOUT = false; // Revisit: temporary dev flag. + final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); + final System.Logger debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED); + final System.Logger debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT); + static final AtomicLong CLIENT_IDS = new AtomicLong(); + // Define the default factory as a static inner class // that embeds all the necessary logic to avoid // the risk of using a lambda that might keep a reference on the // HttpClient instance from which it was created (helps with // heapdump analysis). private static final class DefaultThreadFactory implements ThreadFactory { - private DefaultThreadFactory() {} + private final String namePrefix; + private final AtomicInteger nextId = new AtomicInteger(); + + DefaultThreadFactory(long clientID) { + namePrefix = "HttpClient-" + clientID + "-Worker-"; + } + @Override public Thread newThread(Runnable r) { - Thread t = new Thread(null, r, "HttpClient_worker", 0, true); + String name = namePrefix + nextId.getAndIncrement(); + Thread t; + if (System.getSecurityManager() == null) { + t = new Thread(null, r, name, 0, false); + } else { + t = InnocuousThread.newThread(name, r); + } t.setDaemon(true); return t; } - static final ThreadFactory INSTANCE = new DefaultThreadFactory(); } - private final CookieManager cookieManager; + private final CookieHandler cookieHandler; private final Redirect followRedirects; + private final Optional userProxySelector; private final ProxySelector proxySelector; private final Authenticator authenticator; private final Version version; private final ConnectionPool connections; private final Executor executor; + private final boolean isDefaultExecutor; // Security parameters private final SSLContext sslContext; private final SSLParameters sslParams; private final SelectorManager selmgr; private final FilterFactory filters; private final Http2ClientImpl client2; + private final long id; + private final String dbgTag; + + // This reference is used to keep track of the facade HttpClient + // that was returned to the application code. + // It makes it possible to know when the application no longer + // holds any reference to the HttpClient. + // Unfortunately, this information is not enough to know when + // to exit the SelectorManager thread. Because of the asynchronous + // nature of the API, we also need to wait until all pending operations + // have completed. + private final WeakReference facadeRef; + + // This counter keeps track of the number of operations pending + // on the HttpClient. The SelectorManager thread will wait + // until there are no longer any pending operations and the + // facadeRef is cleared before exiting. + // + // The pendingOperationCount is incremented every time a send/sendAsync + // operation is invoked on the HttpClient, and is decremented when + // the HttpResponse object is returned to the user. + // However, at this point, the body may not have been fully read yet. + // This is the case when the response T is implemented as a streaming + // subscriber (such as an InputStream). + // + // To take care of this issue the pendingOperationCount will additionally + // be incremented/decremented in the following cases: + // + // 1. For HTTP/2 it is incremented when a stream is added to the + // Http2Connection streams map, and decreased when the stream is removed + // from the map. This should also take care of push promises. + // 2. For WebSocket the count is increased when creating a + // DetachedConnectionChannel for the socket, and decreased + // when the the channel is closed. + // In addition, the HttpClient facade is passed to the WebSocket builder, + // (instead of the client implementation delegate). + // 3. For HTTP/1.1 the count is incremented before starting to parse the body + // response, and decremented when the parser has reached the end of the + // response body flow. + // + // This should ensure that the selector manager thread remains alive until + // the response has been fully received or the web socket is closed. + private final AtomicLong pendingOperationCount = new AtomicLong(); + private final AtomicLong pendingWebSocketCount = new AtomicLong(); + private final AtomicLong pendingHttpRequestCount = new AtomicLong(); /** A Set of, deadline first, ordered timeout events. */ private final TreeSet timeouts; - public static HttpClientImpl create(HttpClientBuilderImpl builder) { - HttpClientImpl impl = new HttpClientImpl(builder); + /** + * This is a bit tricky: + * 1. an HttpClientFacade has a final HttpClientImpl field. + * 2. an HttpClientImpl has a final WeakReference field, + * where the referent is the facade created for that instance. + * 3. We cannot just create the HttpClientFacade in the HttpClientImpl + * constructor, because it would be only weakly referenced and could + * be GC'ed before we can return it. + * The solution is to use an instance of SingleFacadeFactory which will + * allow the caller of new HttpClientImpl(...) to retrieve the facade + * after the HttpClientImpl has been created. + */ + private static final class SingleFacadeFactory { + HttpClientFacade facade; + HttpClientFacade createFacade(HttpClientImpl impl) { + assert facade == null; + return (facade = new HttpClientFacade(impl)); + } + } + + static HttpClientFacade create(HttpClientBuilderImpl builder) { + SingleFacadeFactory facadeFactory = new SingleFacadeFactory(); + HttpClientImpl impl = new HttpClientImpl(builder, facadeFactory); impl.start(); - return impl; + assert facadeFactory.facade != null; + assert impl.facadeRef.get() == facadeFactory.facade; + return facadeFactory.facade; } - private HttpClientImpl(HttpClientBuilderImpl builder) { + private HttpClientImpl(HttpClientBuilderImpl builder, + SingleFacadeFactory facadeFactory) { + id = CLIENT_IDS.incrementAndGet(); + dbgTag = "HttpClientImpl(" + id +")"; if (builder.sslContext == null) { try { sslContext = SSLContext.getDefault(); @@ -114,16 +218,23 @@ } Executor ex = builder.executor; if (ex == null) { - ex = Executors.newCachedThreadPool(DefaultThreadFactory.INSTANCE); + ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id)); + isDefaultExecutor = true; } else { ex = builder.executor; + isDefaultExecutor = false; } + facadeRef = new WeakReference<>(facadeFactory.createFacade(this)); client2 = new Http2ClientImpl(this); executor = ex; - cookieManager = builder.cookieManager; + cookieHandler = builder.cookieHandler; followRedirects = builder.followRedirects == null ? Redirect.NEVER : builder.followRedirects; - this.proxySelector = builder.proxy; + this.userProxySelector = Optional.ofNullable(builder.proxy); + this.proxySelector = userProxySelector + .orElseGet(HttpClientImpl::getDefaultProxySelector); + debug.log(Level.DEBUG, "proxySelector is %s (user-supplied=%s)", + this.proxySelector, userProxySelector.isPresent()); authenticator = builder.authenticator; if (builder.version == null) { version = HttpClient.Version.HTTP_2; @@ -135,7 +246,7 @@ } else { sslParams = builder.sslParams; } - connections = new ConnectionPool(); + connections = new ConnectionPool(id); connections.start(); timeouts = new TreeSet<>(); try { @@ -147,30 +258,102 @@ selmgr.setDaemon(true); filters = new FilterFactory(); initFilters(); + assert facadeRef.get() != null; } private void start() { selmgr.start(); } + // Called from the SelectorManager thread, just before exiting. + // Clears the HTTP/1.1 and HTTP/2 cache, ensuring that the connections + // that may be still lingering there are properly closed (and their + // possibly still opened SocketChannel released). + private void stop() { + // Clears HTTP/1.1 cache and close its connections + connections.stop(); + // Clears HTTP/2 cache and close its connections. + client2.stop(); + } + private static SSLParameters getDefaultParams(SSLContext ctx) { SSLParameters params = ctx.getSupportedSSLParameters(); params.setProtocols(new String[]{"TLSv1.2"}); return params; } + private static ProxySelector getDefaultProxySelector() { + PrivilegedAction action = ProxySelector::getDefault; + return AccessController.doPrivileged(action); + } + + // Returns the facade that was returned to the application code. + // May be null if that facade is no longer referenced. + final HttpClientFacade facade() { + return facadeRef.get(); + } + + // Increments the pendingOperationCount. + final long reference() { + pendingHttpRequestCount.incrementAndGet(); + return pendingOperationCount.incrementAndGet(); + } + + // Decrements the pendingOperationCount. + final long unreference() { + final long count = pendingOperationCount.decrementAndGet(); + final long httpCount = pendingHttpRequestCount.decrementAndGet(); + final long webSocketCount = pendingWebSocketCount.get(); + if (count == 0 && facade() == null) { + selmgr.wakeupSelector(); + } + assert httpCount >= 0 : "count of HTTP operations < 0"; + assert webSocketCount >= 0 : "count of WS operations < 0"; + assert count >= 0 : "count of pending operations < 0"; + return count; + } + + // Increments the pendingOperationCount. + final long webSocketOpen() { + pendingWebSocketCount.incrementAndGet(); + return pendingOperationCount.incrementAndGet(); + } + + // Decrements the pendingOperationCount. + final long webSocketClose() { + final long count = pendingOperationCount.decrementAndGet(); + final long webSocketCount = pendingWebSocketCount.decrementAndGet(); + final long httpCount = pendingHttpRequestCount.get(); + if (count == 0 && facade() == null) { + selmgr.wakeupSelector(); + } + assert httpCount >= 0 : "count of HTTP operations < 0"; + assert webSocketCount >= 0 : "count of WS operations < 0"; + assert count >= 0 : "count of pending operations < 0"; + return count; + } + + // Returns the pendingOperationCount. + final long referenceCount() { + return pendingOperationCount.get(); + } + + // Called by the SelectorManager thread to figure out whether it's time + // to terminate. + final boolean isReferenced() { + HttpClient facade = facade(); + return facade != null || referenceCount() > 0; + } + /** - * Wait for activity on given exchange (assuming blocking = false). - * It's a no-op if blocking = true. In particular, the following occurs - * in the SelectorManager thread. + * Wait for activity on given exchange. + * The following occurs in the SelectorManager thread. * - * 1) mark the connection non-blocking - * 2) add to selector - * 3) If selector fires for this exchange then - * 4) - mark connection as blocking - * 5) - call AsyncEvent.handle() + * 1) add to selector + * 2) If selector fires for this exchange then + * call AsyncEvent.handle() * - * If exchange needs to block again, then call registerEvent() again + * If exchange needs to change interest ops, then call registerEvent() again. */ void registerEvent(AsyncEvent exchange) throws IOException { selmgr.register(exchange); @@ -184,131 +367,196 @@ selmgr.cancel(s); } - - Http2ClientImpl client2() { - return client2; - } - - /* - @Override - public ByteBuffer getBuffer() { - return pool.getBuffer(); + /** + * Allows an AsyncEvent to modify its interestOps. + * @param event The modified event. + */ + void eventUpdated(AsyncEvent event) throws ClosedChannelException { + assert !(event instanceof AsyncTriggerEvent); + selmgr.eventUpdated(event); } - // SSL buffers are larger. Manage separately - - int size = 16 * 1024; - - ByteBuffer getSSLBuffer() { - return ByteBuffer.allocate(size); + boolean isSelectorThread() { + return Thread.currentThread() == selmgr; } - /** - * Return a new buffer that's a bit bigger than the given one - * - * @param buf - * @return - * - ByteBuffer reallocSSLBuffer(ByteBuffer buf) { - size = buf.capacity() * 12 / 10; // 20% bigger - return ByteBuffer.allocate(size); - } - - synchronized void returnSSLBuffer(ByteBuffer buf) { - if (buf.capacity() >= size) - sslBuffers.add(0, buf); + Http2ClientImpl client2() { + return client2; } - @Override - public void returnBuffer(ByteBuffer buffer) { - pool.returnBuffer(buffer); + private void debugCompleted(String tag, long startNanos, HttpRequest req) { + if (debugelapsed.isLoggable(Level.DEBUG)) { + debugelapsed.log(Level.DEBUG, () -> tag + " elapsed " + + (System.nanoTime() - startNanos)/1000_000L + + " millis for " + req.method() + + " to " + req.uri()); + } } - */ @Override public HttpResponse - send(HttpRequest req, HttpResponse.BodyHandler responseHandler) + send(HttpRequest req, BodyHandler responseHandler) throws IOException, InterruptedException { - MultiExchange mex = new MultiExchange<>(req, this, responseHandler); - return mex.response(); + try { + return sendAsync(req, responseHandler).get(); + } catch (ExecutionException e) { + Throwable t = e.getCause(); + if (t instanceof Error) + throw (Error)t; + if (t instanceof RuntimeException) + throw (RuntimeException)t; + else if (t instanceof IOException) + throw Utils.getIOException(t); + else + throw new InternalError("Unexpected exception", t); + } } @Override public CompletableFuture> - sendAsync(HttpRequest req, HttpResponse.BodyHandler responseHandler) + sendAsync(HttpRequest userRequest, BodyHandler responseHandler) { - MultiExchange mex = new MultiExchange<>(req, this, responseHandler); - return mex.responseAsync() - .thenApply((HttpResponseImpl b) -> (HttpResponse) b); + AccessControlContext acc = null; + if (System.getSecurityManager() != null) + acc = AccessController.getContext(); + + // Clone the, possibly untrusted, HttpRequest + HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector, acc); + if (requestImpl.method().equals("CONNECT")) + throw new IllegalArgumentException("Unsupported method CONNECT"); + + long start = DEBUGELAPSED ? System.nanoTime() : 0; + reference(); + try { + debugelapsed.log(Level.DEBUG, "ClientImpl (async) send %s", userRequest); + + MultiExchange mex = new MultiExchange<>(userRequest, + requestImpl, + this, + responseHandler, + acc); + CompletableFuture> res = + mex.responseAsync().whenComplete((b,t) -> unreference()); + if (DEBUGELAPSED) { + res = res.whenComplete( + (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest)); + } + // makes sure that any dependent actions happen in the executor + if (acc != null) { + res.whenCompleteAsync((r, t) -> { /* do nothing */}, + new PrivilegedExecutor(executor, acc)); + } + + return res; + } catch(Throwable t) { + unreference(); + debugCompleted("ClientImpl (async)", start, userRequest); + throw t; + } } @Override public CompletableFuture - sendAsync(HttpRequest req, HttpResponse.MultiProcessor responseHandler) { - MultiExchange mex = new MultiExchange<>(req, this, responseHandler); - return mex.multiResponseAsync(); - } - - // new impl. Should get rid of above - /* - static class BufferPool implements BufferHandler { - - final LinkedList freelist = new LinkedList<>(); + sendAsync(HttpRequest userRequest, MultiSubscriber responseHandler) { + AccessControlContext acc = null; + if (System.getSecurityManager() != null) + acc = AccessController.getContext(); + + // Clone the, possibly untrusted, HttpRequest + HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector, acc); + if (requestImpl.method().equals("CONNECT")) + throw new IllegalArgumentException("Unsupported method CONNECT"); - @Override - public synchronized ByteBuffer getBuffer() { - ByteBuffer buf; + long start = DEBUGELAPSED ? System.nanoTime() : 0; + reference(); + try { + debugelapsed.log(Level.DEBUG, "ClientImpl (async) send multi %s", userRequest); - while (!freelist.isEmpty()) { - buf = freelist.removeFirst(); - buf.clear(); - return buf; + MultiExchange mex = new MultiExchange<>(userRequest, + requestImpl, + this, + responseHandler, + acc); + CompletableFuture res = mex.multiResponseAsync() + .whenComplete((b,t) -> unreference()); + if (DEBUGELAPSED) { + res = res.whenComplete( + (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest)); + } + // makes sure that any dependent actions happen in the executor + if (acc != null) { + res.whenCompleteAsync((r, t) -> { /* do nothing */}, + new PrivilegedExecutor(executor, acc)); } - return ByteBuffer.allocate(BUFSIZE); - } - @Override - public synchronized void returnBuffer(ByteBuffer buffer) { - assert buffer.capacity() > 0; - freelist.add(buffer); + return res; + } catch(Throwable t) { + unreference(); + debugCompleted("ClientImpl (async)", start, userRequest); + throw t; } } - static BufferPool pool = new BufferPool(); - - static BufferHandler pool() { - return pool; - } -*/ // Main loop for this client's selector private final static class SelectorManager extends Thread { - private static final long NODEADLINE = 3000L; + // For testing purposes we have an internal System property that + // can control the frequency at which the selector manager will wake + // up when there are no pending operations. + // Increasing the frequency (shorter delays) might allow the selector + // to observe that the facade is no longer referenced and might allow + // the selector thread to terminate more timely - for when nothing is + // ongoing it will only check for that condition every NODEADLINE ms. + // To avoid misuse of the property, the delay that can be specified + // is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default + // value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms + // The property is -Djdk.httpclient.internal.selector.timeout= + private static final int MIN_NODEADLINE = 1000; // ms + private static final int MAX_NODEADLINE = 1000 * 1200; // ms + private static final int DEF_NODEADLINE = 3000; // ms + private static final long NODEADLINE; // default is DEF_NODEADLINE ms + static { + // ensure NODEADLINE is initialized with some valid value. + long deadline = Utils.getIntegerNetProperty( + "jdk.httpclient.internal.selector.timeout", + DEF_NODEADLINE); // millis + if (deadline <= 0) deadline = DEF_NODEADLINE; + deadline = Math.max(deadline, MIN_NODEADLINE); + NODEADLINE = Math.min(deadline, MAX_NODEADLINE); + } + private final Selector selector; private volatile boolean closed; - private final List readyList; private final List registrations; - - // Uses a weak reference to the HttpClient owning this - // selector: a strong reference prevents its garbage - // collection while the thread is running. - // We want the thread to exit gracefully when the - // HttpClient that owns it gets GC'ed. - WeakReference ownerRef; + private final System.Logger debug; + private final System.Logger debugtimeout; + HttpClientImpl owner; + ConnectionPool pool; SelectorManager(HttpClientImpl ref) throws IOException { - super(null, null, "SelectorManager", 0, false); - ownerRef = new WeakReference<>(ref); - readyList = new ArrayList<>(); + super(null, null, "HttpClient-" + ref.id + "-SelectorManager", 0, false); + owner = ref; + debug = ref.debug; + debugtimeout = ref.debugtimeout; + pool = ref.connectionPool(); registrations = new ArrayList<>(); selector = Selector.open(); } + void eventUpdated(AsyncEvent e) throws ClosedChannelException { + if (Thread.currentThread() == this) { + SelectionKey key = e.channel().keyFor(selector); + SelectorAttachment sa = (SelectorAttachment) key.attachment(); + if (sa != null) sa.register(e); + } else { + register(e); + } + } + // This returns immediately. So caller not allowed to send/receive // on connection. - - synchronized void register(AsyncEvent e) throws IOException { + synchronized void register(AsyncEvent e) { registrations.add(e); selector.wakeup(); } @@ -326,23 +574,34 @@ } synchronized void shutdown() { + debug.log(Level.DEBUG, "SelectorManager shutting down"); closed = true; try { selector.close(); - } catch (IOException ignored) { } + } catch (IOException ignored) { + } finally { + owner.stop(); + } } @Override public void run() { + List> errorList = new ArrayList<>(); + List readyList = new ArrayList<>(); try { while (!Thread.currentThread().isInterrupted()) { - HttpClientImpl client; synchronized (this) { - for (AsyncEvent exchange : registrations) { - SelectableChannel c = exchange.channel(); + assert errorList.isEmpty(); + assert readyList.isEmpty(); + for (AsyncEvent event : registrations) { + if (event instanceof AsyncTriggerEvent) { + readyList.add(event); + continue; + } + SelectableChannel chan = event.channel(); + SelectionKey key = null; try { - c.configureBlocking(false); - SelectionKey key = c.keyFor(selector); + key = chan.keyFor(selector); SelectorAttachment sa; if (key == null || !key.isValid()) { if (key != null) { @@ -351,91 +610,163 @@ // before registering the new event. selector.selectNow(); } - sa = new SelectorAttachment(c, selector); + sa = new SelectorAttachment(chan, selector); } else { sa = (SelectorAttachment) key.attachment(); } - sa.register(exchange); + // may throw IOE if channel closed: that's OK + sa.register(event); + if (!chan.isOpen()) { + throw new IOException("Channel closed"); + } } catch (IOException e) { - Log.logError("HttpClientImpl: " + e); - c.close(); - // let the exchange deal with it - handleEvent(exchange); + Log.logTrace("HttpClientImpl: " + e); + debug.log(Level.DEBUG, () -> + "Got " + e.getClass().getName() + + " while handling" + + " registration events"); + chan.close(); + // let the event abort deal with it + errorList.add(new Pair<>(event, e)); + if (key != null) { + key.cancel(); + selector.selectNow(); + } } } registrations.clear(); + selector.selectedKeys().clear(); + } + + for (AsyncEvent event : readyList) { + assert event instanceof AsyncTriggerEvent; + event.handle(); } + readyList.clear(); + + for (Pair error : errorList) { + // an IOException was raised and the channel closed. + handleEvent(error.first, error.second); + } + errorList.clear(); // Check whether client is still alive, and if not, // gracefully stop this thread - if ((client = ownerRef.get()) == null) { + if (!owner.isReferenced()) { Log.logTrace("HttpClient no longer referenced. Exiting..."); return; } - long millis = client.purgeTimeoutsAndReturnNextDeadline(); - client = null; // don't hold onto the client ref - //debugPrint(selector); + // Timeouts will have milliseconds granularity. It is important + // to handle them in a timely fashion. + long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline(); + debugtimeout.log(Level.DEBUG, "next timeout: %d", nextTimeout); + + // Keep-alive have seconds granularity. It's not really an + // issue if we keep connections linger a bit more in the keep + // alive cache. + long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline(); + debugtimeout.log(Level.DEBUG, "next expired: %d", nextExpiry); + + assert nextTimeout >= 0; + assert nextExpiry >= 0; + // Don't wait for ever as it might prevent the thread to // stop gracefully. millis will be 0 if no deadline was found. + if (nextTimeout <= 0) nextTimeout = NODEADLINE; + + // Clip nextExpiry at NODEADLINE limit. The default + // keep alive is 1200 seconds (half an hour) - we don't + // want to wait that long. + if (nextExpiry <= 0) nextExpiry = NODEADLINE; + else nextExpiry = Math.min(NODEADLINE, nextExpiry); + + // takes the least of the two. + long millis = Math.min(nextExpiry, nextTimeout); + + debugtimeout.log(Level.DEBUG, "Next deadline is %d", + (millis == 0 ? NODEADLINE : millis)); + //debugPrint(selector); int n = selector.select(millis == 0 ? NODEADLINE : millis); if (n == 0) { // Check whether client is still alive, and if not, // gracefully stop this thread - if ((client = ownerRef.get()) == null) { + if (!owner.isReferenced()) { Log.logTrace("HttpClient no longer referenced. Exiting..."); return; } - client.purgeTimeoutsAndReturnNextDeadline(); - client = null; // don't hold onto the client ref + owner.purgeTimeoutsAndReturnNextDeadline(); continue; } Set keys = selector.selectedKeys(); + assert errorList.isEmpty(); for (SelectionKey key : keys) { SelectorAttachment sa = (SelectorAttachment) key.attachment(); - int eventsOccurred = key.readyOps(); + if (!key.isValid()) { + IOException ex = sa.chan.isOpen() + ? new IOException("Invalid key") + : new ClosedChannelException(); + sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex))); + sa.pending.clear(); + continue; + } + + int eventsOccurred; + try { + eventsOccurred = key.readyOps(); + } catch (CancelledKeyException ex) { + IOException io = Utils.getIOException(ex); + sa.pending.forEach(e -> errorList.add(new Pair<>(e,io))); + sa.pending.clear(); + continue; + } sa.events(eventsOccurred).forEach(readyList::add); sa.resetInterestOps(eventsOccurred); } selector.selectNow(); // complete cancellation selector.selectedKeys().clear(); - for (AsyncEvent exchange : readyList) { - if (exchange.blocking()) { - exchange.channel().configureBlocking(true); - } - handleEvent(exchange); // will be delegated to executor + for (AsyncEvent event : readyList) { + handleEvent(event, null); // will be delegated to executor } readyList.clear(); + errorList.forEach((p) -> handleEvent(p.first, p.second)); + errorList.clear(); } } catch (Throwable e) { + //e.printStackTrace(); if (!closed) { // This terminates thread. So, better just print stack trace String err = Utils.stackTrace(e); Log.logError("HttpClientImpl: fatal error: " + err); } + debug.log(Level.DEBUG, "shutting down", e); + if (Utils.ASSERTIONSENABLED && !debug.isLoggable(Level.DEBUG)) { + e.printStackTrace(System.err); // always print the stack + } } finally { shutdown(); } } - void debugPrint(Selector selector) { - System.err.println("Selector: debugprint start"); - Set keys = selector.keys(); - for (SelectionKey key : keys) { - SelectableChannel c = key.channel(); - int ops = key.interestOps(); - System.err.printf("selector chan:%s ops:%d\n", c, ops); - } - System.err.println("Selector: debugprint end"); - } - - void handleEvent(AsyncEvent e) { - if (closed) { - e.abort(); +// void debugPrint(Selector selector) { +// System.err.println("Selector: debugprint start"); +// Set keys = selector.keys(); +// for (SelectionKey key : keys) { +// SelectableChannel c = key.channel(); +// int ops = key.interestOps(); +// System.err.printf("selector chan:%s ops:%d\n", c, ops); +// } +// System.err.println("Selector: debugprint end"); +// } + + /** Handles the given event. The given ioe may be null. */ + void handleEvent(AsyncEvent event, IOException ioe) { + if (closed || ioe != null) { + event.abort(ioe); } else { - e.handle(); + event.handle(); } } } @@ -453,11 +784,13 @@ private static class SelectorAttachment { private final SelectableChannel chan; private final Selector selector; - private final ArrayList pending; + private final Set pending; + private final static System.Logger debug = + Utils.getDebugLogger("SelectorAttachment"::toString, DEBUG); private int interestOps; SelectorAttachment(SelectableChannel chan, Selector selector) { - this.pending = new ArrayList<>(); + this.pending = new HashSet<>(); this.chan = chan; this.selector = selector; } @@ -506,23 +839,43 @@ this.interestOps = newOps; SelectionKey key = chan.keyFor(selector); - if (newOps == 0) { + if (newOps == 0 && pending.isEmpty()) { key.cancel(); } else { - key.interestOps(newOps); + try { + key.interestOps(newOps); + } catch (CancelledKeyException x) { + // channel may have been closed + debug.log(Level.DEBUG, "key cancelled for " + chan); + abortPending(x); + } + } + } + + void abortPending(Throwable x) { + if (!pending.isEmpty()) { + AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]); + pending.clear(); + IOException io = Utils.getIOException(x); + for (AsyncEvent event : evts) { + event.abort(io); + } } } } + /*package-private*/ SSLContext theSSLContext() { + return sslContext; + } + @Override public SSLContext sslContext() { - Utils.checkNetPermission("getSSLContext"); return sslContext; } @Override - public Optional sslParameters() { - return Optional.ofNullable(sslParams); + public SSLParameters sslParameters() { + return Utils.copySSLParameters(sslParams); } @Override @@ -530,11 +883,15 @@ return Optional.ofNullable(authenticator); } - @Override - public Executor executor() { + /*package-private*/ final Executor theExecutor() { return executor; } + @Override + public final Optional executor() { + return isDefaultExecutor ? Optional.empty() : Optional.of(executor); + } + ConnectionPool connectionPool() { return connections; } @@ -546,19 +903,28 @@ @Override - public Optional cookieManager() { - return Optional.ofNullable(cookieManager); + public Optional cookieHandler() { + return Optional.ofNullable(cookieHandler); } @Override public Optional proxy() { - return Optional.ofNullable(this.proxySelector); + return this.userProxySelector; + } + + // Return the effective proxy that this client uses. + ProxySelector proxySelector() { + return proxySelector; } @Override - public WebSocket.Builder newWebSocketBuilder(URI uri, - WebSocket.Listener listener) { - return new BuilderImpl(this, uri, listener); + public WebSocket.Builder newWebSocketBuilder() { + // Make sure to pass the HttpClientFacade to the WebSocket builder. + // This will ensure that the facade is not released before the + // WebSocket has been created, at which point the pendingOperationCount + // will have been incremented by the DetachedConnectionChannel + // (see PlainHttpConnection.detachChannel()) + return new BuilderImpl(this.facade(), proxySelector); } @Override @@ -566,16 +932,21 @@ return version; } - //private final HashMap http2NotSupported = new HashMap<>(); + String dbgString() { + return dbgTag; + } - boolean getHttp2Allowed() { - return version.equals(Version.HTTP_2); + @Override + public String toString() { + // Used by tests to get the client's id and compute the + // name of the SelectorManager thread. + return super.toString() + ("(" + id + ")"); } private void initFilters() { addFilter(AuthenticationFilter.class); addFilter(RedirectFilter.class); - if (this.cookieManager != null) { + if (this.cookieHandler != null) { addFilter(CookieFilter.class); } }