< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java

Print this page

        

*** 26,110 **** package jdk.incubator.http; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLParameters; import java.io.IOException; import java.lang.ref.WeakReference; import java.net.Authenticator; ! import java.net.CookieManager; import java.net.ProxySelector; ! import java.net.URI; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.security.NoSuchAlgorithmException; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.stream.Stream; import jdk.incubator.http.internal.common.Log; import jdk.incubator.http.internal.common.Utils; import jdk.incubator.http.internal.websocket.BuilderImpl; /** * Client implementation. Contains all configuration information and also * the selector manager thread which allows async events to be registered * and delivered when they occur. See AsyncEvent. */ class HttpClientImpl extends HttpClient { // Define the default factory as a static inner class // that embeds all the necessary logic to avoid // the risk of using a lambda that might keep a reference on the // HttpClient instance from which it was created (helps with // heapdump analysis). private static final class DefaultThreadFactory implements ThreadFactory { ! private DefaultThreadFactory() {} @Override public Thread newThread(Runnable r) { ! Thread t = new Thread(null, r, "HttpClient_worker", 0, true); t.setDaemon(true); return t; } - static final ThreadFactory INSTANCE = new DefaultThreadFactory(); } ! private final CookieManager cookieManager; private final Redirect followRedirects; private final ProxySelector proxySelector; private final Authenticator authenticator; private final Version version; private final ConnectionPool connections; private final Executor executor; // Security parameters private final SSLContext sslContext; private final SSLParameters sslParams; private final SelectorManager selmgr; private final FilterFactory filters; private final Http2ClientImpl client2; /** A Set of, deadline first, ordered timeout events. */ private final TreeSet<TimeoutEvent> timeouts; ! public static HttpClientImpl create(HttpClientBuilderImpl builder) { ! HttpClientImpl impl = new HttpClientImpl(builder); impl.start(); ! return impl; } ! private HttpClientImpl(HttpClientBuilderImpl builder) { if (builder.sslContext == null) { try { sslContext = SSLContext.getDefault(); } catch (NoSuchAlgorithmException ex) { throw new InternalError(ex); --- 26,214 ---- package jdk.incubator.http; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLParameters; import java.io.IOException; + import java.lang.System.Logger.Level; import java.lang.ref.WeakReference; import java.net.Authenticator; ! import java.net.CookieHandler; import java.net.ProxySelector; ! import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; + import java.security.AccessControlContext; + import java.security.AccessController; import java.security.NoSuchAlgorithmException; + import java.security.PrivilegedAction; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; + import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; + import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; + import java.util.concurrent.atomic.AtomicInteger; + import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; + import jdk.incubator.http.HttpResponse.BodyHandler; + import jdk.incubator.http.HttpResponse.MultiSubscriber; import jdk.incubator.http.internal.common.Log; + import jdk.incubator.http.internal.common.Pair; import jdk.incubator.http.internal.common.Utils; import jdk.incubator.http.internal.websocket.BuilderImpl; + import jdk.internal.misc.InnocuousThread; /** * Client implementation. Contains all configuration information and also * the selector manager thread which allows async events to be registered * and delivered when they occur. See AsyncEvent. */ class HttpClientImpl extends HttpClient { + static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. + static final boolean DEBUGELAPSED = Utils.TESTING || DEBUG; // Revisit: temporary dev flag. + static final boolean DEBUGTIMEOUT = false; // Revisit: temporary dev flag. + final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); + final System.Logger debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED); + final System.Logger debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT); + static final AtomicLong CLIENT_IDS = new AtomicLong(); + // Define the default factory as a static inner class // that embeds all the necessary logic to avoid // the risk of using a lambda that might keep a reference on the // HttpClient instance from which it was created (helps with // heapdump analysis). private static final class DefaultThreadFactory implements ThreadFactory { ! private final String namePrefix; ! private final AtomicInteger nextId = new AtomicInteger(); ! ! DefaultThreadFactory(long clientID) { ! namePrefix = "HttpClient-" + clientID + "-Worker-"; ! } ! @Override public Thread newThread(Runnable r) { ! String name = namePrefix + nextId.getAndIncrement(); ! Thread t; ! if (System.getSecurityManager() == null) { ! t = new Thread(null, r, name, 0, false); ! } else { ! t = InnocuousThread.newThread(name, r); ! } t.setDaemon(true); return t; } } ! private final CookieHandler cookieHandler; private final Redirect followRedirects; + private final Optional<ProxySelector> userProxySelector; private final ProxySelector proxySelector; private final Authenticator authenticator; private final Version version; private final ConnectionPool connections; private final Executor executor; + private final boolean isDefaultExecutor; // Security parameters private final SSLContext sslContext; private final SSLParameters sslParams; private final SelectorManager selmgr; private final FilterFactory filters; private final Http2ClientImpl client2; + private final long id; + private final String dbgTag; + + // This reference is used to keep track of the facade HttpClient + // that was returned to the application code. + // It makes it possible to know when the application no longer + // holds any reference to the HttpClient. + // Unfortunately, this information is not enough to know when + // to exit the SelectorManager thread. Because of the asynchronous + // nature of the API, we also need to wait until all pending operations + // have completed. + private final WeakReference<HttpClientFacade> facadeRef; + + // This counter keeps track of the number of operations pending + // on the HttpClient. The SelectorManager thread will wait + // until there are no longer any pending operations and the + // facadeRef is cleared before exiting. + // + // The pendingOperationCount is incremented every time a send/sendAsync + // operation is invoked on the HttpClient, and is decremented when + // the HttpResponse<T> object is returned to the user. + // However, at this point, the body may not have been fully read yet. + // This is the case when the response T is implemented as a streaming + // subscriber (such as an InputStream). + // + // To take care of this issue the pendingOperationCount will additionally + // be incremented/decremented in the following cases: + // + // 1. For HTTP/2 it is incremented when a stream is added to the + // Http2Connection streams map, and decreased when the stream is removed + // from the map. This should also take care of push promises. + // 2. For WebSocket the count is increased when creating a + // DetachedConnectionChannel for the socket, and decreased + // when the the channel is closed. + // In addition, the HttpClient facade is passed to the WebSocket builder, + // (instead of the client implementation delegate). + // 3. For HTTP/1.1 the count is incremented before starting to parse the body + // response, and decremented when the parser has reached the end of the + // response body flow. + // + // This should ensure that the selector manager thread remains alive until + // the response has been fully received or the web socket is closed. + private final AtomicLong pendingOperationCount = new AtomicLong(); + private final AtomicLong pendingWebSocketCount = new AtomicLong(); + private final AtomicLong pendingHttpRequestCount = new AtomicLong(); /** A Set of, deadline first, ordered timeout events. */ private final TreeSet<TimeoutEvent> timeouts; ! /** ! * This is a bit tricky: ! * 1. an HttpClientFacade has a final HttpClientImpl field. ! * 2. an HttpClientImpl has a final WeakReference<HttpClientFacade> field, ! * where the referent is the facade created for that instance. ! * 3. We cannot just create the HttpClientFacade in the HttpClientImpl ! * constructor, because it would be only weakly referenced and could ! * be GC'ed before we can return it. ! * The solution is to use an instance of SingleFacadeFactory which will ! * allow the caller of new HttpClientImpl(...) to retrieve the facade ! * after the HttpClientImpl has been created. ! */ ! private static final class SingleFacadeFactory { ! HttpClientFacade facade; ! HttpClientFacade createFacade(HttpClientImpl impl) { ! assert facade == null; ! return (facade = new HttpClientFacade(impl)); ! } ! } ! ! static HttpClientFacade create(HttpClientBuilderImpl builder) { ! SingleFacadeFactory facadeFactory = new SingleFacadeFactory(); ! HttpClientImpl impl = new HttpClientImpl(builder, facadeFactory); impl.start(); ! assert facadeFactory.facade != null; ! assert impl.facadeRef.get() == facadeFactory.facade; ! return facadeFactory.facade; } ! private HttpClientImpl(HttpClientBuilderImpl builder, ! SingleFacadeFactory facadeFactory) { ! id = CLIENT_IDS.incrementAndGet(); ! dbgTag = "HttpClientImpl(" + id +")"; if (builder.sslContext == null) { try { sslContext = SSLContext.getDefault(); } catch (NoSuchAlgorithmException ex) { throw new InternalError(ex);
*** 112,131 **** } else { sslContext = builder.sslContext; } Executor ex = builder.executor; if (ex == null) { ! ex = Executors.newCachedThreadPool(DefaultThreadFactory.INSTANCE); } else { ex = builder.executor; } client2 = new Http2ClientImpl(this); executor = ex; ! cookieManager = builder.cookieManager; followRedirects = builder.followRedirects == null ? Redirect.NEVER : builder.followRedirects; ! this.proxySelector = builder.proxy; authenticator = builder.authenticator; if (builder.version == null) { version = HttpClient.Version.HTTP_2; } else { version = builder.version; --- 216,242 ---- } else { sslContext = builder.sslContext; } Executor ex = builder.executor; if (ex == null) { ! ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id)); ! isDefaultExecutor = true; } else { ex = builder.executor; + isDefaultExecutor = false; } + facadeRef = new WeakReference<>(facadeFactory.createFacade(this)); client2 = new Http2ClientImpl(this); executor = ex; ! cookieHandler = builder.cookieHandler; followRedirects = builder.followRedirects == null ? Redirect.NEVER : builder.followRedirects; ! this.userProxySelector = Optional.ofNullable(builder.proxy); ! this.proxySelector = userProxySelector ! .orElseGet(HttpClientImpl::getDefaultProxySelector); ! debug.log(Level.DEBUG, "proxySelector is %s (user-supplied=%s)", ! this.proxySelector, userProxySelector.isPresent()); authenticator = builder.authenticator; if (builder.version == null) { version = HttpClient.Version.HTTP_2; } else { version = builder.version;
*** 133,143 **** if (builder.sslParams == null) { sslParams = getDefaultParams(sslContext); } else { sslParams = builder.sslParams; } ! connections = new ConnectionPool(); connections.start(); timeouts = new TreeSet<>(); try { selmgr = new SelectorManager(this); } catch (IOException e) { --- 244,254 ---- if (builder.sslParams == null) { sslParams = getDefaultParams(sslContext); } else { sslParams = builder.sslParams; } ! connections = new ConnectionPool(id); connections.start(); timeouts = new TreeSet<>(); try { selmgr = new SelectorManager(this); } catch (IOException e) {
*** 145,178 **** throw new InternalError(e); } selmgr.setDaemon(true); filters = new FilterFactory(); initFilters(); } private void start() { selmgr.start(); } private static SSLParameters getDefaultParams(SSLContext ctx) { SSLParameters params = ctx.getSupportedSSLParameters(); params.setProtocols(new String[]{"TLSv1.2"}); return params; } /** ! * Wait for activity on given exchange (assuming blocking = false). ! * It's a no-op if blocking = true. In particular, the following occurs ! * in the SelectorManager thread. * ! * 1) mark the connection non-blocking ! * 2) add to selector ! * 3) If selector fires for this exchange then ! * 4) - mark connection as blocking ! * 5) - call AsyncEvent.handle() * ! * If exchange needs to block again, then call registerEvent() again */ void registerEvent(AsyncEvent exchange) throws IOException { selmgr.register(exchange); } --- 256,361 ---- throw new InternalError(e); } selmgr.setDaemon(true); filters = new FilterFactory(); initFilters(); + assert facadeRef.get() != null; } private void start() { selmgr.start(); } + // Called from the SelectorManager thread, just before exiting. + // Clears the HTTP/1.1 and HTTP/2 cache, ensuring that the connections + // that may be still lingering there are properly closed (and their + // possibly still opened SocketChannel released). + private void stop() { + // Clears HTTP/1.1 cache and close its connections + connections.stop(); + // Clears HTTP/2 cache and close its connections. + client2.stop(); + } + private static SSLParameters getDefaultParams(SSLContext ctx) { SSLParameters params = ctx.getSupportedSSLParameters(); params.setProtocols(new String[]{"TLSv1.2"}); return params; } + private static ProxySelector getDefaultProxySelector() { + PrivilegedAction<ProxySelector> action = ProxySelector::getDefault; + return AccessController.doPrivileged(action); + } + + // Returns the facade that was returned to the application code. + // May be null if that facade is no longer referenced. + final HttpClientFacade facade() { + return facadeRef.get(); + } + + // Increments the pendingOperationCount. + final long reference() { + pendingHttpRequestCount.incrementAndGet(); + return pendingOperationCount.incrementAndGet(); + } + + // Decrements the pendingOperationCount. + final long unreference() { + final long count = pendingOperationCount.decrementAndGet(); + final long httpCount = pendingHttpRequestCount.decrementAndGet(); + final long webSocketCount = pendingWebSocketCount.get(); + if (count == 0 && facade() == null) { + selmgr.wakeupSelector(); + } + assert httpCount >= 0 : "count of HTTP operations < 0"; + assert webSocketCount >= 0 : "count of WS operations < 0"; + assert count >= 0 : "count of pending operations < 0"; + return count; + } + + // Increments the pendingOperationCount. + final long webSocketOpen() { + pendingWebSocketCount.incrementAndGet(); + return pendingOperationCount.incrementAndGet(); + } + + // Decrements the pendingOperationCount. + final long webSocketClose() { + final long count = pendingOperationCount.decrementAndGet(); + final long webSocketCount = pendingWebSocketCount.decrementAndGet(); + final long httpCount = pendingHttpRequestCount.get(); + if (count == 0 && facade() == null) { + selmgr.wakeupSelector(); + } + assert httpCount >= 0 : "count of HTTP operations < 0"; + assert webSocketCount >= 0 : "count of WS operations < 0"; + assert count >= 0 : "count of pending operations < 0"; + return count; + } + + // Returns the pendingOperationCount. + final long referenceCount() { + return pendingOperationCount.get(); + } + + // Called by the SelectorManager thread to figure out whether it's time + // to terminate. + final boolean isReferenced() { + HttpClient facade = facade(); + return facade != null || referenceCount() > 0; + } + /** ! * Wait for activity on given exchange. ! * The following occurs in the SelectorManager thread. * ! * 1) add to selector ! * 2) If selector fires for this exchange then ! * call AsyncEvent.handle() * ! * If exchange needs to change interest ops, then call registerEvent() again. */ void registerEvent(AsyncEvent exchange) throws IOException { selmgr.register(exchange); }
*** 182,316 **** */ void cancelRegistration(SocketChannel s) { selmgr.cancel(s); } ! ! Http2ClientImpl client2() { ! return client2; ! } ! ! /* ! @Override ! public ByteBuffer getBuffer() { ! return pool.getBuffer(); } ! // SSL buffers are larger. Manage separately ! ! int size = 16 * 1024; ! ! ByteBuffer getSSLBuffer() { ! return ByteBuffer.allocate(size); } ! /** ! * Return a new buffer that's a bit bigger than the given one ! * ! * @param buf ! * @return ! * ! ByteBuffer reallocSSLBuffer(ByteBuffer buf) { ! size = buf.capacity() * 12 / 10; // 20% bigger ! return ByteBuffer.allocate(size); } ! synchronized void returnSSLBuffer(ByteBuffer buf) { ! if (buf.capacity() >= size) ! sslBuffers.add(0, buf); } - - @Override - public void returnBuffer(ByteBuffer buffer) { - pool.returnBuffer(buffer); } - */ @Override public <T> HttpResponse<T> ! send(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler) throws IOException, InterruptedException { ! MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler); ! return mex.response(); } @Override public <T> CompletableFuture<HttpResponse<T>> ! sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler) { ! MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler); ! return mex.responseAsync() ! .thenApply((HttpResponseImpl<T> b) -> (HttpResponse<T>) b); ! } ! ! @Override ! public <U, T> CompletableFuture<U> ! sendAsync(HttpRequest req, HttpResponse.MultiProcessor<U, T> responseHandler) { ! MultiExchange<U,T> mex = new MultiExchange<>(req, this, responseHandler); ! return mex.multiResponseAsync(); ! } ! ! // new impl. Should get rid of above ! /* ! static class BufferPool implements BufferHandler { ! final LinkedList<ByteBuffer> freelist = new LinkedList<>(); ! ! @Override ! public synchronized ByteBuffer getBuffer() { ! ByteBuffer buf; ! while (!freelist.isEmpty()) { ! buf = freelist.removeFirst(); ! buf.clear(); ! return buf; } - return ByteBuffer.allocate(BUFSIZE); } @Override ! public synchronized void returnBuffer(ByteBuffer buffer) { ! assert buffer.capacity() > 0; ! freelist.add(buffer); ! } ! } ! static BufferPool pool = new BufferPool(); ! static BufferHandler pool() { ! return pool; } ! */ // Main loop for this client's selector private final static class SelectorManager extends Thread { ! private static final long NODEADLINE = 3000L; private final Selector selector; private volatile boolean closed; - private final List<AsyncEvent> readyList; private final List<AsyncEvent> registrations; ! ! // Uses a weak reference to the HttpClient owning this ! // selector: a strong reference prevents its garbage ! // collection while the thread is running. ! // We want the thread to exit gracefully when the ! // HttpClient that owns it gets GC'ed. ! WeakReference<HttpClientImpl> ownerRef; SelectorManager(HttpClientImpl ref) throws IOException { ! super(null, null, "SelectorManager", 0, false); ! ownerRef = new WeakReference<>(ref); ! readyList = new ArrayList<>(); registrations = new ArrayList<>(); selector = Selector.open(); } // This returns immediately. So caller not allowed to send/receive // on connection. ! ! synchronized void register(AsyncEvent e) throws IOException { registrations.add(e); selector.wakeup(); } synchronized void cancel(SocketChannel e) { --- 365,564 ---- */ void cancelRegistration(SocketChannel s) { selmgr.cancel(s); } ! /** ! * Allows an AsyncEvent to modify its interestOps. ! * @param event The modified event. ! */ ! void eventUpdated(AsyncEvent event) throws ClosedChannelException { ! assert !(event instanceof AsyncTriggerEvent); ! selmgr.eventUpdated(event); } ! boolean isSelectorThread() { ! return Thread.currentThread() == selmgr; } ! Http2ClientImpl client2() { ! return client2; } ! private void debugCompleted(String tag, long startNanos, HttpRequest req) { ! if (debugelapsed.isLoggable(Level.DEBUG)) { ! debugelapsed.log(Level.DEBUG, () -> tag + " elapsed " ! + (System.nanoTime() - startNanos)/1000_000L ! + " millis for " + req.method() ! + " to " + req.uri()); } } @Override public <T> HttpResponse<T> ! send(HttpRequest req, BodyHandler<T> responseHandler) throws IOException, InterruptedException { ! try { ! return sendAsync(req, responseHandler).get(); ! } catch (ExecutionException e) { ! Throwable t = e.getCause(); ! if (t instanceof Error) ! throw (Error)t; ! if (t instanceof RuntimeException) ! throw (RuntimeException)t; ! else if (t instanceof IOException) ! throw Utils.getIOException(t); ! else ! throw new InternalError("Unexpected exception", t); ! } } @Override public <T> CompletableFuture<HttpResponse<T>> ! sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler) { ! AccessControlContext acc = null; ! if (System.getSecurityManager() != null) ! acc = AccessController.getContext(); ! ! // Clone the, possibly untrusted, HttpRequest ! HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector, acc); ! if (requestImpl.method().equals("CONNECT")) ! throw new IllegalArgumentException("Unsupported method CONNECT"); ! long start = DEBUGELAPSED ? System.nanoTime() : 0; ! reference(); ! try { ! debugelapsed.log(Level.DEBUG, "ClientImpl (async) send %s", userRequest); ! MultiExchange<Void,T> mex = new MultiExchange<>(userRequest, ! requestImpl, ! this, ! responseHandler, ! acc); ! CompletableFuture<HttpResponse<T>> res = ! mex.responseAsync().whenComplete((b,t) -> unreference()); ! if (DEBUGELAPSED) { ! res = res.whenComplete( ! (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest)); ! } ! // makes sure that any dependent actions happen in the executor ! if (acc != null) { ! res.whenCompleteAsync((r, t) -> { /* do nothing */}, ! new PrivilegedExecutor(executor, acc)); ! } ! ! return res; ! } catch(Throwable t) { ! unreference(); ! debugCompleted("ClientImpl (async)", start, userRequest); ! throw t; } } @Override ! public <U, T> CompletableFuture<U> ! sendAsync(HttpRequest userRequest, MultiSubscriber<U, T> responseHandler) { ! AccessControlContext acc = null; ! if (System.getSecurityManager() != null) ! acc = AccessController.getContext(); ! ! // Clone the, possibly untrusted, HttpRequest ! HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector, acc); ! if (requestImpl.method().equals("CONNECT")) ! throw new IllegalArgumentException("Unsupported method CONNECT"); ! long start = DEBUGELAPSED ? System.nanoTime() : 0; ! reference(); ! try { ! debugelapsed.log(Level.DEBUG, "ClientImpl (async) send multi %s", userRequest); ! MultiExchange<U,T> mex = new MultiExchange<>(userRequest, ! requestImpl, ! this, ! responseHandler, ! acc); ! CompletableFuture<U> res = mex.multiResponseAsync() ! .whenComplete((b,t) -> unreference()); ! if (DEBUGELAPSED) { ! res = res.whenComplete( ! (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest)); ! } ! // makes sure that any dependent actions happen in the executor ! if (acc != null) { ! res.whenCompleteAsync((r, t) -> { /* do nothing */}, ! new PrivilegedExecutor(executor, acc)); ! } ! ! return res; ! } catch(Throwable t) { ! unreference(); ! debugCompleted("ClientImpl (async)", start, userRequest); ! throw t; ! } } ! // Main loop for this client's selector private final static class SelectorManager extends Thread { ! // For testing purposes we have an internal System property that ! // can control the frequency at which the selector manager will wake ! // up when there are no pending operations. ! // Increasing the frequency (shorter delays) might allow the selector ! // to observe that the facade is no longer referenced and might allow ! // the selector thread to terminate more timely - for when nothing is ! // ongoing it will only check for that condition every NODEADLINE ms. ! // To avoid misuse of the property, the delay that can be specified ! // is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default ! // value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms ! // The property is -Djdk.httpclient.internal.selector.timeout=<millis> ! private static final int MIN_NODEADLINE = 1000; // ms ! private static final int MAX_NODEADLINE = 1000 * 1200; // ms ! private static final int DEF_NODEADLINE = 3000; // ms ! private static final long NODEADLINE; // default is DEF_NODEADLINE ms ! static { ! // ensure NODEADLINE is initialized with some valid value. ! long deadline = Utils.getIntegerNetProperty( ! "jdk.httpclient.internal.selector.timeout", ! DEF_NODEADLINE); // millis ! if (deadline <= 0) deadline = DEF_NODEADLINE; ! deadline = Math.max(deadline, MIN_NODEADLINE); ! NODEADLINE = Math.min(deadline, MAX_NODEADLINE); ! } ! private final Selector selector; private volatile boolean closed; private final List<AsyncEvent> registrations; ! private final System.Logger debug; ! private final System.Logger debugtimeout; ! HttpClientImpl owner; ! ConnectionPool pool; SelectorManager(HttpClientImpl ref) throws IOException { ! super(null, null, "HttpClient-" + ref.id + "-SelectorManager", 0, false); ! owner = ref; ! debug = ref.debug; ! debugtimeout = ref.debugtimeout; ! pool = ref.connectionPool(); registrations = new ArrayList<>(); selector = Selector.open(); } + void eventUpdated(AsyncEvent e) throws ClosedChannelException { + if (Thread.currentThread() == this) { + SelectionKey key = e.channel().keyFor(selector); + SelectorAttachment sa = (SelectorAttachment) key.attachment(); + if (sa != null) sa.register(e); + } else { + register(e); + } + } + // This returns immediately. So caller not allowed to send/receive // on connection. ! synchronized void register(AsyncEvent e) { registrations.add(e); selector.wakeup(); } synchronized void cancel(SocketChannel e) {
*** 324,443 **** void wakeupSelector() { selector.wakeup(); } synchronized void shutdown() { closed = true; try { selector.close(); ! } catch (IOException ignored) { } } @Override public void run() { try { while (!Thread.currentThread().isInterrupted()) { - HttpClientImpl client; synchronized (this) { ! for (AsyncEvent exchange : registrations) { ! SelectableChannel c = exchange.channel(); try { ! c.configureBlocking(false); ! SelectionKey key = c.keyFor(selector); SelectorAttachment sa; if (key == null || !key.isValid()) { if (key != null) { // key is canceled. // invoke selectNow() to purge it // before registering the new event. selector.selectNow(); } ! sa = new SelectorAttachment(c, selector); } else { sa = (SelectorAttachment) key.attachment(); } ! sa.register(exchange); } catch (IOException e) { ! Log.logError("HttpClientImpl: " + e); ! c.close(); ! // let the exchange deal with it ! handleEvent(exchange); } } registrations.clear(); } // Check whether client is still alive, and if not, // gracefully stop this thread ! if ((client = ownerRef.get()) == null) { Log.logTrace("HttpClient no longer referenced. Exiting..."); return; } - long millis = client.purgeTimeoutsAndReturnNextDeadline(); - client = null; // don't hold onto the client ref ! //debugPrint(selector); // Don't wait for ever as it might prevent the thread to // stop gracefully. millis will be 0 if no deadline was found. int n = selector.select(millis == 0 ? NODEADLINE : millis); if (n == 0) { // Check whether client is still alive, and if not, // gracefully stop this thread ! if ((client = ownerRef.get()) == null) { Log.logTrace("HttpClient no longer referenced. Exiting..."); return; } ! client.purgeTimeoutsAndReturnNextDeadline(); ! client = null; // don't hold onto the client ref continue; } Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { SelectorAttachment sa = (SelectorAttachment) key.attachment(); ! int eventsOccurred = key.readyOps(); sa.events(eventsOccurred).forEach(readyList::add); sa.resetInterestOps(eventsOccurred); } selector.selectNow(); // complete cancellation selector.selectedKeys().clear(); ! for (AsyncEvent exchange : readyList) { ! if (exchange.blocking()) { ! exchange.channel().configureBlocking(true); ! } ! handleEvent(exchange); // will be delegated to executor } readyList.clear(); } } catch (Throwable e) { if (!closed) { // This terminates thread. So, better just print stack trace String err = Utils.stackTrace(e); Log.logError("HttpClientImpl: fatal error: " + err); } } finally { shutdown(); } } ! void debugPrint(Selector selector) { ! System.err.println("Selector: debugprint start"); ! Set<SelectionKey> keys = selector.keys(); ! for (SelectionKey key : keys) { ! SelectableChannel c = key.channel(); ! int ops = key.interestOps(); ! System.err.printf("selector chan:%s ops:%d\n", c, ops); ! } ! System.err.println("Selector: debugprint end"); ! } ! ! void handleEvent(AsyncEvent e) { ! if (closed) { ! e.abort(); } else { ! e.handle(); } } } /** --- 572,774 ---- void wakeupSelector() { selector.wakeup(); } synchronized void shutdown() { + debug.log(Level.DEBUG, "SelectorManager shutting down"); closed = true; try { selector.close(); ! } catch (IOException ignored) { ! } finally { ! owner.stop(); ! } } @Override public void run() { + List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>(); + List<AsyncEvent> readyList = new ArrayList<>(); try { while (!Thread.currentThread().isInterrupted()) { synchronized (this) { ! assert errorList.isEmpty(); ! assert readyList.isEmpty(); ! for (AsyncEvent event : registrations) { ! if (event instanceof AsyncTriggerEvent) { ! readyList.add(event); ! continue; ! } ! SelectableChannel chan = event.channel(); ! SelectionKey key = null; try { ! key = chan.keyFor(selector); SelectorAttachment sa; if (key == null || !key.isValid()) { if (key != null) { // key is canceled. // invoke selectNow() to purge it // before registering the new event. selector.selectNow(); } ! sa = new SelectorAttachment(chan, selector); } else { sa = (SelectorAttachment) key.attachment(); } ! // may throw IOE if channel closed: that's OK ! sa.register(event); ! if (!chan.isOpen()) { ! throw new IOException("Channel closed"); ! } } catch (IOException e) { ! Log.logTrace("HttpClientImpl: " + e); ! debug.log(Level.DEBUG, () -> ! "Got " + e.getClass().getName() ! + " while handling" ! + " registration events"); ! chan.close(); ! // let the event abort deal with it ! errorList.add(new Pair<>(event, e)); ! if (key != null) { ! key.cancel(); ! selector.selectNow(); ! } } } registrations.clear(); + selector.selectedKeys().clear(); + } + + for (AsyncEvent event : readyList) { + assert event instanceof AsyncTriggerEvent; + event.handle(); } + readyList.clear(); + + for (Pair<AsyncEvent,IOException> error : errorList) { + // an IOException was raised and the channel closed. + handleEvent(error.first, error.second); + } + errorList.clear(); // Check whether client is still alive, and if not, // gracefully stop this thread ! if (!owner.isReferenced()) { Log.logTrace("HttpClient no longer referenced. Exiting..."); return; } ! // Timeouts will have milliseconds granularity. It is important ! // to handle them in a timely fashion. ! long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline(); ! debugtimeout.log(Level.DEBUG, "next timeout: %d", nextTimeout); ! ! // Keep-alive have seconds granularity. It's not really an ! // issue if we keep connections linger a bit more in the keep ! // alive cache. ! long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline(); ! debugtimeout.log(Level.DEBUG, "next expired: %d", nextExpiry); ! ! assert nextTimeout >= 0; ! assert nextExpiry >= 0; ! // Don't wait for ever as it might prevent the thread to // stop gracefully. millis will be 0 if no deadline was found. + if (nextTimeout <= 0) nextTimeout = NODEADLINE; + + // Clip nextExpiry at NODEADLINE limit. The default + // keep alive is 1200 seconds (half an hour) - we don't + // want to wait that long. + if (nextExpiry <= 0) nextExpiry = NODEADLINE; + else nextExpiry = Math.min(NODEADLINE, nextExpiry); + + // takes the least of the two. + long millis = Math.min(nextExpiry, nextTimeout); + + debugtimeout.log(Level.DEBUG, "Next deadline is %d", + (millis == 0 ? NODEADLINE : millis)); + //debugPrint(selector); int n = selector.select(millis == 0 ? NODEADLINE : millis); if (n == 0) { // Check whether client is still alive, and if not, // gracefully stop this thread ! if (!owner.isReferenced()) { Log.logTrace("HttpClient no longer referenced. Exiting..."); return; } ! owner.purgeTimeoutsAndReturnNextDeadline(); continue; } Set<SelectionKey> keys = selector.selectedKeys(); + assert errorList.isEmpty(); for (SelectionKey key : keys) { SelectorAttachment sa = (SelectorAttachment) key.attachment(); ! if (!key.isValid()) { ! IOException ex = sa.chan.isOpen() ! ? new IOException("Invalid key") ! : new ClosedChannelException(); ! sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex))); ! sa.pending.clear(); ! continue; ! } ! ! int eventsOccurred; ! try { ! eventsOccurred = key.readyOps(); ! } catch (CancelledKeyException ex) { ! IOException io = Utils.getIOException(ex); ! sa.pending.forEach(e -> errorList.add(new Pair<>(e,io))); ! sa.pending.clear(); ! continue; ! } sa.events(eventsOccurred).forEach(readyList::add); sa.resetInterestOps(eventsOccurred); } selector.selectNow(); // complete cancellation selector.selectedKeys().clear(); ! for (AsyncEvent event : readyList) { ! handleEvent(event, null); // will be delegated to executor } readyList.clear(); + errorList.forEach((p) -> handleEvent(p.first, p.second)); + errorList.clear(); } } catch (Throwable e) { + //e.printStackTrace(); if (!closed) { // This terminates thread. So, better just print stack trace String err = Utils.stackTrace(e); Log.logError("HttpClientImpl: fatal error: " + err); } + debug.log(Level.DEBUG, "shutting down", e); + if (Utils.ASSERTIONSENABLED && !debug.isLoggable(Level.DEBUG)) { + e.printStackTrace(System.err); // always print the stack + } } finally { shutdown(); } } ! // void debugPrint(Selector selector) { ! // System.err.println("Selector: debugprint start"); ! // Set<SelectionKey> keys = selector.keys(); ! // for (SelectionKey key : keys) { ! // SelectableChannel c = key.channel(); ! // int ops = key.interestOps(); ! // System.err.printf("selector chan:%s ops:%d\n", c, ops); ! // } ! // System.err.println("Selector: debugprint end"); ! // } ! ! /** Handles the given event. The given ioe may be null. */ ! void handleEvent(AsyncEvent event, IOException ioe) { ! if (closed || ioe != null) { ! event.abort(ioe); } else { ! event.handle(); } } } /**
*** 451,465 **** * connection. */ private static class SelectorAttachment { private final SelectableChannel chan; private final Selector selector; ! private final ArrayList<AsyncEvent> pending; private int interestOps; SelectorAttachment(SelectableChannel chan, Selector selector) { ! this.pending = new ArrayList<>(); this.chan = chan; this.selector = selector; } void register(AsyncEvent e) throws ClosedChannelException { --- 782,798 ---- * connection. */ private static class SelectorAttachment { private final SelectableChannel chan; private final Selector selector; ! private final Set<AsyncEvent> pending; ! private final static System.Logger debug = ! Utils.getDebugLogger("SelectorAttachment"::toString, DEBUG); private int interestOps; SelectorAttachment(SelectableChannel chan, Selector selector) { ! this.pending = new HashSet<>(); this.chan = chan; this.selector = selector; } void register(AsyncEvent e) throws ClosedChannelException {
*** 504,542 **** } } this.interestOps = newOps; SelectionKey key = chan.keyFor(selector); ! if (newOps == 0) { key.cancel(); } else { key.interestOps(newOps); } } } @Override public SSLContext sslContext() { - Utils.checkNetPermission("getSSLContext"); return sslContext; } @Override ! public Optional<SSLParameters> sslParameters() { ! return Optional.ofNullable(sslParams); } @Override public Optional<Authenticator> authenticator() { return Optional.ofNullable(authenticator); } ! @Override ! public Executor executor() { return executor; } ConnectionPool connectionPool() { return connections; } @Override --- 837,899 ---- } } this.interestOps = newOps; SelectionKey key = chan.keyFor(selector); ! if (newOps == 0 && pending.isEmpty()) { key.cancel(); } else { + try { key.interestOps(newOps); + } catch (CancelledKeyException x) { + // channel may have been closed + debug.log(Level.DEBUG, "key cancelled for " + chan); + abortPending(x); } } } + void abortPending(Throwable x) { + if (!pending.isEmpty()) { + AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]); + pending.clear(); + IOException io = Utils.getIOException(x); + for (AsyncEvent event : evts) { + event.abort(io); + } + } + } + } + + /*package-private*/ SSLContext theSSLContext() { + return sslContext; + } + @Override public SSLContext sslContext() { return sslContext; } @Override ! public SSLParameters sslParameters() { ! return Utils.copySSLParameters(sslParams); } @Override public Optional<Authenticator> authenticator() { return Optional.ofNullable(authenticator); } ! /*package-private*/ final Executor theExecutor() { return executor; } + @Override + public final Optional<Executor> executor() { + return isDefaultExecutor ? Optional.empty() : Optional.of(executor); + } + ConnectionPool connectionPool() { return connections; } @Override
*** 544,583 **** return followRedirects; } @Override ! public Optional<CookieManager> cookieManager() { ! return Optional.ofNullable(cookieManager); } @Override public Optional<ProxySelector> proxy() { ! return Optional.ofNullable(this.proxySelector); } @Override ! public WebSocket.Builder newWebSocketBuilder(URI uri, ! WebSocket.Listener listener) { ! return new BuilderImpl(this, uri, listener); } @Override public Version version() { return version; } ! //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>(); ! boolean getHttp2Allowed() { ! return version.equals(Version.HTTP_2); } private void initFilters() { addFilter(AuthenticationFilter.class); addFilter(RedirectFilter.class); ! if (this.cookieManager != null) { addFilter(CookieFilter.class); } } private void addFilter(Class<? extends HeaderFilter> f) { --- 901,954 ---- return followRedirects; } @Override ! public Optional<CookieHandler> cookieHandler() { ! return Optional.ofNullable(cookieHandler); } @Override public Optional<ProxySelector> proxy() { ! return this.userProxySelector; ! } ! ! // Return the effective proxy that this client uses. ! ProxySelector proxySelector() { ! return proxySelector; } @Override ! public WebSocket.Builder newWebSocketBuilder() { ! // Make sure to pass the HttpClientFacade to the WebSocket builder. ! // This will ensure that the facade is not released before the ! // WebSocket has been created, at which point the pendingOperationCount ! // will have been incremented by the DetachedConnectionChannel ! // (see PlainHttpConnection.detachChannel()) ! return new BuilderImpl(this.facade(), proxySelector); } @Override public Version version() { return version; } ! String dbgString() { ! return dbgTag; ! } ! @Override ! public String toString() { ! // Used by tests to get the client's id and compute the ! // name of the SelectorManager thread. ! return super.toString() + ("(" + id + ")"); } private void initFilters() { addFilter(AuthenticationFilter.class); addFilter(RedirectFilter.class); ! if (this.cookieHandler != null) { addFilter(CookieFilter.class); } } private void addFilter(Class<? extends HeaderFilter> f) {
< prev index next >