/* * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package jdk.incubator.http; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLParameters; import java.io.IOException; import java.lang.System.Logger.Level; import java.lang.ref.WeakReference; import java.net.Authenticator; import java.net.CookieHandler; import java.net.ProxySelector; import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.security.AccessControlContext; import java.security.AccessController; import java.security.NoSuchAlgorithmException; import java.security.PrivilegedAction; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import jdk.incubator.http.HttpResponse.BodyHandler; import jdk.incubator.http.HttpResponse.MultiSubscriber; import jdk.incubator.http.internal.common.Log; import jdk.incubator.http.internal.common.Pair; import jdk.incubator.http.internal.common.Utils; import jdk.incubator.http.internal.websocket.BuilderImpl; import jdk.internal.misc.InnocuousThread; /** * Client implementation. Contains all configuration information and also * the selector manager thread which allows async events to be registered * and delivered when they occur. See AsyncEvent. */ class HttpClientImpl extends HttpClient { static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. static final boolean DEBUGELAPSED = Utils.TESTING || DEBUG; // Revisit: temporary dev flag. static final boolean DEBUGTIMEOUT = false; // Revisit: temporary dev flag. final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); final System.Logger debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED); final System.Logger debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT); static final AtomicLong CLIENT_IDS = new AtomicLong(); // Define the default factory as a static inner class // that embeds all the necessary logic to avoid // the risk of using a lambda that might keep a reference on the // HttpClient instance from which it was created (helps with // heapdump analysis). private static final class DefaultThreadFactory implements ThreadFactory { private final String namePrefix; private final AtomicInteger nextId = new AtomicInteger(); DefaultThreadFactory(long clientID) { namePrefix = "HttpClient-" + clientID + "-Worker-"; } @Override public Thread newThread(Runnable r) { String name = namePrefix + nextId.getAndIncrement(); Thread t; if (System.getSecurityManager() == null) { t = new Thread(null, r, name, 0, false); } else { t = InnocuousThread.newThread(name, r); } t.setDaemon(true); return t; } } private final CookieHandler cookieHandler; private final Redirect followRedirects; private final Optional userProxySelector; private final ProxySelector proxySelector; private final Authenticator authenticator; private final Version version; private final ConnectionPool connections; private final Executor executor; private final boolean isDefaultExecutor; // Security parameters private final SSLContext sslContext; private final SSLParameters sslParams; private final SelectorManager selmgr; private final FilterFactory filters; private final Http2ClientImpl client2; private final long id; private final String dbgTag; // This reference is used to keep track of the facade HttpClient // that was returned to the application code. // It makes it possible to know when the application no longer // holds any reference to the HttpClient. // Unfortunately, this information is not enough to know when // to exit the SelectorManager thread. Because of the asynchronous // nature of the API, we also need to wait until all pending operations // have completed. private final WeakReference facadeRef; // This counter keeps track of the number of operations pending // on the HttpClient. The SelectorManager thread will wait // until there are no longer any pending operations and the // facadeRef is cleared before exiting. // // The pendingOperationCount is incremented every time a send/sendAsync // operation is invoked on the HttpClient, and is decremented when // the HttpResponse object is returned to the user. // However, at this point, the body may not have been fully read yet. // This is the case when the response T is implemented as a streaming // subscriber (such as an InputStream). // // To take care of this issue the pendingOperationCount will additionally // be incremented/decremented in the following cases: // // 1. For HTTP/2 it is incremented when a stream is added to the // Http2Connection streams map, and decreased when the stream is removed // from the map. This should also take care of push promises. // 2. For WebSocket the count is increased when creating a // DetachedConnectionChannel for the socket, and decreased // when the the channel is closed. // In addition, the HttpClient facade is passed to the WebSocket builder, // (instead of the client implementation delegate). // 3. For HTTP/1.1 the count is incremented before starting to parse the body // response, and decremented when the parser has reached the end of the // response body flow. // // This should ensure that the selector manager thread remains alive until // the response has been fully received or the web socket is closed. private final AtomicLong pendingOperationCount = new AtomicLong(); private final AtomicLong pendingWebSocketCount = new AtomicLong(); private final AtomicLong pendingHttpRequestCount = new AtomicLong(); /** A Set of, deadline first, ordered timeout events. */ private final TreeSet timeouts; /** * This is a bit tricky: * 1. an HttpClientFacade has a final HttpClientImpl field. * 2. an HttpClientImpl has a final WeakReference field, * where the referent is the facade created for that instance. * 3. We cannot just create the HttpClientFacade in the HttpClientImpl * constructor, because it would be only weakly referenced and could * be GC'ed before we can return it. * The solution is to use an instance of SingleFacadeFactory which will * allow the caller of new HttpClientImpl(...) to retrieve the facade * after the HttpClientImpl has been created. */ private static final class SingleFacadeFactory { HttpClientFacade facade; HttpClientFacade createFacade(HttpClientImpl impl) { assert facade == null; return (facade = new HttpClientFacade(impl)); } } static HttpClientFacade create(HttpClientBuilderImpl builder) { SingleFacadeFactory facadeFactory = new SingleFacadeFactory(); HttpClientImpl impl = new HttpClientImpl(builder, facadeFactory); impl.start(); assert facadeFactory.facade != null; assert impl.facadeRef.get() == facadeFactory.facade; return facadeFactory.facade; } private HttpClientImpl(HttpClientBuilderImpl builder, SingleFacadeFactory facadeFactory) { id = CLIENT_IDS.incrementAndGet(); dbgTag = "HttpClientImpl(" + id +")"; if (builder.sslContext == null) { try { sslContext = SSLContext.getDefault(); } catch (NoSuchAlgorithmException ex) { throw new InternalError(ex); } } else { sslContext = builder.sslContext; } Executor ex = builder.executor; if (ex == null) { ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id)); isDefaultExecutor = true; } else { ex = builder.executor; isDefaultExecutor = false; } facadeRef = new WeakReference<>(facadeFactory.createFacade(this)); client2 = new Http2ClientImpl(this); executor = ex; cookieHandler = builder.cookieHandler; followRedirects = builder.followRedirects == null ? Redirect.NEVER : builder.followRedirects; this.userProxySelector = Optional.ofNullable(builder.proxy); this.proxySelector = userProxySelector .orElseGet(HttpClientImpl::getDefaultProxySelector); debug.log(Level.DEBUG, "proxySelector is %s (user-supplied=%s)", this.proxySelector, userProxySelector.isPresent()); authenticator = builder.authenticator; if (builder.version == null) { version = HttpClient.Version.HTTP_2; } else { version = builder.version; } if (builder.sslParams == null) { sslParams = getDefaultParams(sslContext); } else { sslParams = builder.sslParams; } connections = new ConnectionPool(id); connections.start(); timeouts = new TreeSet<>(); try { selmgr = new SelectorManager(this); } catch (IOException e) { // unlikely throw new InternalError(e); } selmgr.setDaemon(true); filters = new FilterFactory(); initFilters(); assert facadeRef.get() != null; } private void start() { selmgr.start(); } // Called from the SelectorManager thread, just before exiting. // Clears the HTTP/1.1 and HTTP/2 cache, ensuring that the connections // that may be still lingering there are properly closed (and their // possibly still opened SocketChannel released). private void stop() { // Clears HTTP/1.1 cache and close its connections connections.stop(); // Clears HTTP/2 cache and close its connections. client2.stop(); } private static SSLParameters getDefaultParams(SSLContext ctx) { SSLParameters params = ctx.getSupportedSSLParameters(); params.setProtocols(new String[]{"TLSv1.2"}); return params; } private static ProxySelector getDefaultProxySelector() { PrivilegedAction action = ProxySelector::getDefault; return AccessController.doPrivileged(action); } // Returns the facade that was returned to the application code. // May be null if that facade is no longer referenced. final HttpClientFacade facade() { return facadeRef.get(); } // Increments the pendingOperationCount. final long reference() { pendingHttpRequestCount.incrementAndGet(); return pendingOperationCount.incrementAndGet(); } // Decrements the pendingOperationCount. final long unreference() { final long count = pendingOperationCount.decrementAndGet(); final long httpCount = pendingHttpRequestCount.decrementAndGet(); final long webSocketCount = pendingWebSocketCount.get(); if (count == 0 && facade() == null) { selmgr.wakeupSelector(); } assert httpCount >= 0 : "count of HTTP operations < 0"; assert webSocketCount >= 0 : "count of WS operations < 0"; assert count >= 0 : "count of pending operations < 0"; return count; } // Increments the pendingOperationCount. final long webSocketOpen() { pendingWebSocketCount.incrementAndGet(); return pendingOperationCount.incrementAndGet(); } // Decrements the pendingOperationCount. final long webSocketClose() { final long count = pendingOperationCount.decrementAndGet(); final long webSocketCount = pendingWebSocketCount.decrementAndGet(); final long httpCount = pendingHttpRequestCount.get(); if (count == 0 && facade() == null) { selmgr.wakeupSelector(); } assert httpCount >= 0 : "count of HTTP operations < 0"; assert webSocketCount >= 0 : "count of WS operations < 0"; assert count >= 0 : "count of pending operations < 0"; return count; } // Returns the pendingOperationCount. final long referenceCount() { return pendingOperationCount.get(); } // Called by the SelectorManager thread to figure out whether it's time // to terminate. final boolean isReferenced() { HttpClient facade = facade(); return facade != null || referenceCount() > 0; } /** * Wait for activity on given exchange. * The following occurs in the SelectorManager thread. * * 1) add to selector * 2) If selector fires for this exchange then * call AsyncEvent.handle() * * If exchange needs to change interest ops, then call registerEvent() again. */ void registerEvent(AsyncEvent exchange) throws IOException { selmgr.register(exchange); } /** * Only used from RawChannel to disconnect the channel from * the selector */ void cancelRegistration(SocketChannel s) { selmgr.cancel(s); } /** * Allows an AsyncEvent to modify its interestOps. * @param event The modified event. */ void eventUpdated(AsyncEvent event) throws ClosedChannelException { assert !(event instanceof AsyncTriggerEvent); selmgr.eventUpdated(event); } boolean isSelectorThread() { return Thread.currentThread() == selmgr; } Http2ClientImpl client2() { return client2; } private void debugCompleted(String tag, long startNanos, HttpRequest req) { if (debugelapsed.isLoggable(Level.DEBUG)) { debugelapsed.log(Level.DEBUG, () -> tag + " elapsed " + (System.nanoTime() - startNanos)/1000_000L + " millis for " + req.method() + " to " + req.uri()); } } @Override public HttpResponse send(HttpRequest req, BodyHandler responseHandler) throws IOException, InterruptedException { try { return sendAsync(req, responseHandler).get(); } catch (ExecutionException e) { Throwable t = e.getCause(); if (t instanceof Error) throw (Error)t; if (t instanceof RuntimeException) throw (RuntimeException)t; else if (t instanceof IOException) throw Utils.getIOException(t); else throw new InternalError("Unexpected exception", t); } } @Override public CompletableFuture> sendAsync(HttpRequest userRequest, BodyHandler responseHandler) { AccessControlContext acc = null; if (System.getSecurityManager() != null) acc = AccessController.getContext(); // Clone the, possibly untrusted, HttpRequest HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector, acc); if (requestImpl.method().equals("CONNECT")) throw new IllegalArgumentException("Unsupported method CONNECT"); long start = DEBUGELAPSED ? System.nanoTime() : 0; reference(); try { debugelapsed.log(Level.DEBUG, "ClientImpl (async) send %s", userRequest); MultiExchange mex = new MultiExchange<>(userRequest, requestImpl, this, responseHandler, acc); CompletableFuture> res = mex.responseAsync().whenComplete((b,t) -> unreference()); if (DEBUGELAPSED) { res = res.whenComplete( (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest)); } // makes sure that any dependent actions happen in the executor if (acc != null) { res.whenCompleteAsync((r, t) -> { /* do nothing */}, new PrivilegedExecutor(executor, acc)); } return res; } catch(Throwable t) { unreference(); debugCompleted("ClientImpl (async)", start, userRequest); throw t; } } @Override public CompletableFuture sendAsync(HttpRequest userRequest, MultiSubscriber responseHandler) { AccessControlContext acc = null; if (System.getSecurityManager() != null) acc = AccessController.getContext(); // Clone the, possibly untrusted, HttpRequest HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector, acc); if (requestImpl.method().equals("CONNECT")) throw new IllegalArgumentException("Unsupported method CONNECT"); long start = DEBUGELAPSED ? System.nanoTime() : 0; reference(); try { debugelapsed.log(Level.DEBUG, "ClientImpl (async) send multi %s", userRequest); MultiExchange mex = new MultiExchange<>(userRequest, requestImpl, this, responseHandler, acc); CompletableFuture res = mex.multiResponseAsync() .whenComplete((b,t) -> unreference()); if (DEBUGELAPSED) { res = res.whenComplete( (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest)); } // makes sure that any dependent actions happen in the executor if (acc != null) { res.whenCompleteAsync((r, t) -> { /* do nothing */}, new PrivilegedExecutor(executor, acc)); } return res; } catch(Throwable t) { unreference(); debugCompleted("ClientImpl (async)", start, userRequest); throw t; } } // Main loop for this client's selector private final static class SelectorManager extends Thread { // For testing purposes we have an internal System property that // can control the frequency at which the selector manager will wake // up when there are no pending operations. // Increasing the frequency (shorter delays) might allow the selector // to observe that the facade is no longer referenced and might allow // the selector thread to terminate more timely - for when nothing is // ongoing it will only check for that condition every NODEADLINE ms. // To avoid misuse of the property, the delay that can be specified // is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default // value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms // The property is -Djdk.httpclient.internal.selector.timeout= private static final int MIN_NODEADLINE = 1000; // ms private static final int MAX_NODEADLINE = 1000 * 1200; // ms private static final int DEF_NODEADLINE = 3000; // ms private static final long NODEADLINE; // default is DEF_NODEADLINE ms static { // ensure NODEADLINE is initialized with some valid value. long deadline = Utils.getIntegerNetProperty( "jdk.httpclient.internal.selector.timeout", DEF_NODEADLINE); // millis if (deadline <= 0) deadline = DEF_NODEADLINE; deadline = Math.max(deadline, MIN_NODEADLINE); NODEADLINE = Math.min(deadline, MAX_NODEADLINE); } private final Selector selector; private volatile boolean closed; private final List registrations; private final System.Logger debug; private final System.Logger debugtimeout; HttpClientImpl owner; ConnectionPool pool; SelectorManager(HttpClientImpl ref) throws IOException { super(null, null, "HttpClient-" + ref.id + "-SelectorManager", 0, false); owner = ref; debug = ref.debug; debugtimeout = ref.debugtimeout; pool = ref.connectionPool(); registrations = new ArrayList<>(); selector = Selector.open(); } void eventUpdated(AsyncEvent e) throws ClosedChannelException { if (Thread.currentThread() == this) { SelectionKey key = e.channel().keyFor(selector); SelectorAttachment sa = (SelectorAttachment) key.attachment(); if (sa != null) sa.register(e); } else { register(e); } } // This returns immediately. So caller not allowed to send/receive // on connection. synchronized void register(AsyncEvent e) { registrations.add(e); selector.wakeup(); } synchronized void cancel(SocketChannel e) { SelectionKey key = e.keyFor(selector); if (key != null) { key.cancel(); } selector.wakeup(); } void wakeupSelector() { selector.wakeup(); } synchronized void shutdown() { debug.log(Level.DEBUG, "SelectorManager shutting down"); closed = true; try { selector.close(); } catch (IOException ignored) { } finally { owner.stop(); } } @Override public void run() { List> errorList = new ArrayList<>(); List readyList = new ArrayList<>(); try { while (!Thread.currentThread().isInterrupted()) { synchronized (this) { assert errorList.isEmpty(); assert readyList.isEmpty(); for (AsyncEvent event : registrations) { if (event instanceof AsyncTriggerEvent) { readyList.add(event); continue; } SelectableChannel chan = event.channel(); SelectionKey key = null; try { key = chan.keyFor(selector); SelectorAttachment sa; if (key == null || !key.isValid()) { if (key != null) { // key is canceled. // invoke selectNow() to purge it // before registering the new event. selector.selectNow(); } sa = new SelectorAttachment(chan, selector); } else { sa = (SelectorAttachment) key.attachment(); } // may throw IOE if channel closed: that's OK sa.register(event); if (!chan.isOpen()) { throw new IOException("Channel closed"); } } catch (IOException e) { Log.logTrace("HttpClientImpl: " + e); debug.log(Level.DEBUG, () -> "Got " + e.getClass().getName() + " while handling" + " registration events"); chan.close(); // let the event abort deal with it errorList.add(new Pair<>(event, e)); if (key != null) { key.cancel(); selector.selectNow(); } } } registrations.clear(); selector.selectedKeys().clear(); } for (AsyncEvent event : readyList) { assert event instanceof AsyncTriggerEvent; event.handle(); } readyList.clear(); for (Pair error : errorList) { // an IOException was raised and the channel closed. handleEvent(error.first, error.second); } errorList.clear(); // Check whether client is still alive, and if not, // gracefully stop this thread if (!owner.isReferenced()) { Log.logTrace("HttpClient no longer referenced. Exiting..."); return; } // Timeouts will have milliseconds granularity. It is important // to handle them in a timely fashion. long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline(); debugtimeout.log(Level.DEBUG, "next timeout: %d", nextTimeout); // Keep-alive have seconds granularity. It's not really an // issue if we keep connections linger a bit more in the keep // alive cache. long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline(); debugtimeout.log(Level.DEBUG, "next expired: %d", nextExpiry); assert nextTimeout >= 0; assert nextExpiry >= 0; // Don't wait for ever as it might prevent the thread to // stop gracefully. millis will be 0 if no deadline was found. if (nextTimeout <= 0) nextTimeout = NODEADLINE; // Clip nextExpiry at NODEADLINE limit. The default // keep alive is 1200 seconds (half an hour) - we don't // want to wait that long. if (nextExpiry <= 0) nextExpiry = NODEADLINE; else nextExpiry = Math.min(NODEADLINE, nextExpiry); // takes the least of the two. long millis = Math.min(nextExpiry, nextTimeout); debugtimeout.log(Level.DEBUG, "Next deadline is %d", (millis == 0 ? NODEADLINE : millis)); //debugPrint(selector); int n = selector.select(millis == 0 ? NODEADLINE : millis); if (n == 0) { // Check whether client is still alive, and if not, // gracefully stop this thread if (!owner.isReferenced()) { Log.logTrace("HttpClient no longer referenced. Exiting..."); return; } owner.purgeTimeoutsAndReturnNextDeadline(); continue; } Set keys = selector.selectedKeys(); assert errorList.isEmpty(); for (SelectionKey key : keys) { SelectorAttachment sa = (SelectorAttachment) key.attachment(); if (!key.isValid()) { IOException ex = sa.chan.isOpen() ? new IOException("Invalid key") : new ClosedChannelException(); sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex))); sa.pending.clear(); continue; } int eventsOccurred; try { eventsOccurred = key.readyOps(); } catch (CancelledKeyException ex) { IOException io = Utils.getIOException(ex); sa.pending.forEach(e -> errorList.add(new Pair<>(e,io))); sa.pending.clear(); continue; } sa.events(eventsOccurred).forEach(readyList::add); sa.resetInterestOps(eventsOccurred); } selector.selectNow(); // complete cancellation selector.selectedKeys().clear(); for (AsyncEvent event : readyList) { handleEvent(event, null); // will be delegated to executor } readyList.clear(); errorList.forEach((p) -> handleEvent(p.first, p.second)); errorList.clear(); } } catch (Throwable e) { //e.printStackTrace(); if (!closed) { // This terminates thread. So, better just print stack trace String err = Utils.stackTrace(e); Log.logError("HttpClientImpl: fatal error: " + err); } debug.log(Level.DEBUG, "shutting down", e); if (Utils.ASSERTIONSENABLED && !debug.isLoggable(Level.DEBUG)) { e.printStackTrace(System.err); // always print the stack } } finally { shutdown(); } } // void debugPrint(Selector selector) { // System.err.println("Selector: debugprint start"); // Set keys = selector.keys(); // for (SelectionKey key : keys) { // SelectableChannel c = key.channel(); // int ops = key.interestOps(); // System.err.printf("selector chan:%s ops:%d\n", c, ops); // } // System.err.println("Selector: debugprint end"); // } /** Handles the given event. The given ioe may be null. */ void handleEvent(AsyncEvent event, IOException ioe) { if (closed || ioe != null) { event.abort(ioe); } else { event.handle(); } } } /** * Tracks multiple user level registrations associated with one NIO * registration (SelectionKey). In this implementation, registrations * are one-off and when an event is posted the registration is cancelled * until explicitly registered again. * *

No external synchronization required as this class is only used * by the SelectorManager thread. One of these objects required per * connection. */ private static class SelectorAttachment { private final SelectableChannel chan; private final Selector selector; private final Set pending; private final static System.Logger debug = Utils.getDebugLogger("SelectorAttachment"::toString, DEBUG); private int interestOps; SelectorAttachment(SelectableChannel chan, Selector selector) { this.pending = new HashSet<>(); this.chan = chan; this.selector = selector; } void register(AsyncEvent e) throws ClosedChannelException { int newOps = e.interestOps(); boolean reRegister = (interestOps & newOps) != newOps; interestOps |= newOps; pending.add(e); if (reRegister) { // first time registration happens here also chan.register(selector, interestOps, this); } } /** * Returns a Stream containing only events that are * registered with the given {@code interestOps}. */ Stream events(int interestOps) { return pending.stream() .filter(ev -> (ev.interestOps() & interestOps) != 0); } /** * Removes any events with the given {@code interestOps}, and if no * events remaining, cancels the associated SelectionKey. */ void resetInterestOps(int interestOps) { int newOps = 0; Iterator itr = pending.iterator(); while (itr.hasNext()) { AsyncEvent event = itr.next(); int evops = event.interestOps(); if (event.repeating()) { newOps |= evops; continue; } if ((evops & interestOps) != 0) { itr.remove(); } else { newOps |= evops; } } this.interestOps = newOps; SelectionKey key = chan.keyFor(selector); if (newOps == 0 && pending.isEmpty()) { key.cancel(); } else { try { key.interestOps(newOps); } catch (CancelledKeyException x) { // channel may have been closed debug.log(Level.DEBUG, "key cancelled for " + chan); abortPending(x); } } } void abortPending(Throwable x) { if (!pending.isEmpty()) { AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]); pending.clear(); IOException io = Utils.getIOException(x); for (AsyncEvent event : evts) { event.abort(io); } } } } /*package-private*/ SSLContext theSSLContext() { return sslContext; } @Override public SSLContext sslContext() { return sslContext; } @Override public SSLParameters sslParameters() { return Utils.copySSLParameters(sslParams); } @Override public Optional authenticator() { return Optional.ofNullable(authenticator); } /*package-private*/ final Executor theExecutor() { return executor; } @Override public final Optional executor() { return isDefaultExecutor ? Optional.empty() : Optional.of(executor); } ConnectionPool connectionPool() { return connections; } @Override public Redirect followRedirects() { return followRedirects; } @Override public Optional cookieHandler() { return Optional.ofNullable(cookieHandler); } @Override public Optional proxy() { return this.userProxySelector; } // Return the effective proxy that this client uses. ProxySelector proxySelector() { return proxySelector; } @Override public WebSocket.Builder newWebSocketBuilder() { // Make sure to pass the HttpClientFacade to the WebSocket builder. // This will ensure that the facade is not released before the // WebSocket has been created, at which point the pendingOperationCount // will have been incremented by the DetachedConnectionChannel // (see PlainHttpConnection.detachChannel()) return new BuilderImpl(this.facade(), proxySelector); } @Override public Version version() { return version; } String dbgString() { return dbgTag; } @Override public String toString() { // Used by tests to get the client's id and compute the // name of the SelectorManager thread. return super.toString() + ("(" + id + ")"); } private void initFilters() { addFilter(AuthenticationFilter.class); addFilter(RedirectFilter.class); if (this.cookieHandler != null) { addFilter(CookieFilter.class); } } private void addFilter(Class f) { filters.addFilter(f); } final List filterChain() { return filters.getFilterChain(); } // Timer controls. // Timers are implemented through timed Selector.select() calls. synchronized void registerTimer(TimeoutEvent event) { Log.logTrace("Registering timer {0}", event); timeouts.add(event); selmgr.wakeupSelector(); } synchronized void cancelTimer(TimeoutEvent event) { Log.logTrace("Canceling timer {0}", event); timeouts.remove(event); } /** * Purges ( handles ) timer events that have passed their deadline, and * returns the amount of time, in milliseconds, until the next earliest * event. A return value of 0 means that there are no events. */ private long purgeTimeoutsAndReturnNextDeadline() { long diff = 0L; List toHandle = null; int remaining = 0; // enter critical section to retrieve the timeout event to handle synchronized(this) { if (timeouts.isEmpty()) return 0L; Instant now = Instant.now(); Iterator itr = timeouts.iterator(); while (itr.hasNext()) { TimeoutEvent event = itr.next(); diff = now.until(event.deadline(), ChronoUnit.MILLIS); if (diff <= 0) { itr.remove(); toHandle = (toHandle == null) ? new ArrayList<>() : toHandle; toHandle.add(event); } else { break; } } remaining = timeouts.size(); } // can be useful for debugging if (toHandle != null && Log.trace()) { Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling " + (toHandle == null ? 0 : toHandle.size()) + " events, " + "remaining " + remaining + ", next deadline: " + (diff < 0 ? 0L : diff)); } // handle timeout events out of critical section if (toHandle != null) { Throwable failed = null; for (TimeoutEvent event : toHandle) { try { Log.logTrace("Firing timer {0}", event); event.handle(); } catch (Error | RuntimeException e) { // Not expected. Handle remaining events then throw... // If e is an OOME or SOE it might simply trigger a new // error from here - but in this case there's not much we // could do anyway. Just let it flow... if (failed == null) failed = e; else failed.addSuppressed(e); Log.logTrace("Failed to handle event {0}: {1}", event, e); } } if (failed instanceof Error) throw (Error) failed; if (failed instanceof RuntimeException) throw (RuntimeException) failed; } // return time to wait until next event. 0L if there's no more events. return diff < 0 ? 0L : diff; } // used for the connection window int getReceiveBufferSize() { return Utils.getIntegerNetProperty( "jdk.httpclient.connectionWindowSize", 256 * 1024 ); } }