1 /*
   2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import javax.net.ssl.SSLContext;
  29 import javax.net.ssl.SSLParameters;
  30 import java.io.IOException;
  31 import java.io.UncheckedIOException;
  32 import java.lang.ref.Reference;
  33 import java.lang.ref.WeakReference;
  34 import java.net.Authenticator;
  35 import java.net.ConnectException;
  36 import java.net.CookieHandler;
  37 import java.net.ProxySelector;
  38 import java.net.http.HttpConnectTimeoutException;
  39 import java.net.http.HttpTimeoutException;
  40 import java.nio.ByteBuffer;
  41 import java.nio.channels.CancelledKeyException;
  42 import java.nio.channels.ClosedChannelException;
  43 import java.nio.channels.SelectableChannel;
  44 import java.nio.channels.SelectionKey;
  45 import java.nio.channels.Selector;
  46 import java.nio.channels.SocketChannel;
  47 import java.security.AccessControlContext;
  48 import java.security.AccessController;
  49 import java.security.NoSuchAlgorithmException;
  50 import java.security.PrivilegedAction;
  51 import java.time.Duration;
  52 import java.time.Instant;
  53 import java.time.temporal.ChronoUnit;
  54 import java.util.ArrayList;
  55 import java.util.HashSet;
  56 import java.util.Iterator;
  57 import java.util.LinkedList;
  58 import java.util.List;
  59 import java.util.Objects;
  60 import java.util.Optional;
  61 import java.util.Set;
  62 import java.util.TreeSet;
  63 import java.util.concurrent.CompletableFuture;
  64 import java.util.concurrent.CompletionException;
  65 import java.util.concurrent.ExecutionException;
  66 import java.util.concurrent.Executor;
  67 import java.util.concurrent.Executors;
  68 import java.util.concurrent.ThreadFactory;
  69 import java.util.concurrent.atomic.AtomicInteger;
  70 import java.util.concurrent.atomic.AtomicLong;
  71 import java.util.function.BooleanSupplier;
  72 import java.util.stream.Stream;
  73 import java.net.http.HttpClient;
  74 import java.net.http.HttpRequest;
  75 import java.net.http.HttpResponse;
  76 import java.net.http.HttpResponse.BodyHandler;
  77 import java.net.http.HttpResponse.PushPromiseHandler;
  78 import java.net.http.WebSocket;
  79 import jdk.internal.net.http.common.BufferSupplier;
  80 import jdk.internal.net.http.common.Log;
  81 import jdk.internal.net.http.common.Logger;
  82 import jdk.internal.net.http.common.Pair;
  83 import jdk.internal.net.http.common.Utils;
  84 import jdk.internal.net.http.common.OperationTrackers.Trackable;
  85 import jdk.internal.net.http.common.OperationTrackers.Tracker;
  86 import jdk.internal.net.http.websocket.BuilderImpl;
  87 import jdk.internal.misc.InnocuousThread;
  88 
  89 /**
  90  * Client implementation. Contains all configuration information and also
  91  * the selector manager thread which allows async events to be registered
  92  * and delivered when they occur. See AsyncEvent.
  93  */
  94 final class HttpClientImpl extends HttpClient implements Trackable {
  95 
    // Development flag: elapsed-time logging is enabled when either
    // Utils.TESTING or Utils.DEBUG is set.
    static final boolean DEBUGELAPSED = Utils.TESTING || Utils.DEBUG;  // dev flag
    // Development flag for timeout logging; hard-wired to false here.
    static final boolean DEBUGTIMEOUT = false; // dev flag
    // Per-instance loggers. Each is obtained from Utils.getDebugLogger with
    // this::dbgString as first argument and the corresponding flag as second.
    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
    final Logger debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED);
    final Logger debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT);
    // Shared counter from which the constructor draws each client's numeric id.
    static final AtomicLong CLIENT_IDS = new AtomicLong();
 102 
 103     // Define the default factory as a static inner class
 104     // that embeds all the necessary logic to avoid
 105     // the risk of using a lambda that might keep a reference on the
 106     // HttpClient instance from which it was created (helps with
 107     // heapdump analysis).
 108     private static final class DefaultThreadFactory implements ThreadFactory {
 109         private final String namePrefix;
 110         private final AtomicInteger nextId = new AtomicInteger();
 111 
 112         DefaultThreadFactory(long clientID) {
 113             namePrefix = "HttpClient-" + clientID + "-Worker-";
 114         }
 115 
 116         @Override
 117         public Thread newThread(Runnable r) {
 118             String name = namePrefix + nextId.getAndIncrement();
 119             Thread t;
 120             if (System.getSecurityManager() == null) {
 121                 t = new Thread(null, r, name, 0, false);
 122             } else {
 123                 t = InnocuousThread.newThread(name, r);
 124             }
 125             t.setDaemon(true);
 126             return t;
 127         }
 128     }
 129 
 130     /**
 131      * A DelegatingExecutor is an executor that delegates tasks to
 132      * a wrapped executor when it detects that the current thread
 133      * is the SelectorManager thread. If the current thread is not
 134      * the selector manager thread the given task is executed inline.
 135      */
 136     final static class DelegatingExecutor implements Executor {
 137         private final BooleanSupplier isInSelectorThread;
 138         private final Executor delegate;
 139         DelegatingExecutor(BooleanSupplier isInSelectorThread, Executor delegate) {
 140             this.isInSelectorThread = isInSelectorThread;
 141             this.delegate = delegate;
 142         }
 143 
 144         Executor delegate() {
 145             return delegate;
 146         }
 147 
 148         @Override
 149         public void execute(Runnable command) {
 150             if (isInSelectorThread.getAsBoolean()) {
 151                 delegate.execute(command);
 152             } else {
 153                 command.run();
 154             }
 155         }
 156     }
 157 
    // Client configuration, all assigned once in the constructor from the
    // HttpClientBuilderImpl (with defaults substituted where noted).
    private final CookieHandler cookieHandler;
    private final Duration connectTimeout;
    // Redirect.NEVER when the builder did not specify a policy.
    private final Redirect followRedirects;
    // The proxy selector supplied through the builder, if any.
    private final Optional<ProxySelector> userProxySelector;
    // The user-supplied selector when present, the default selector otherwise.
    private final ProxySelector proxySelector;
    private final Authenticator authenticator;
    // Version.HTTP_2 when the builder did not specify a version.
    private final Version version;
    private final ConnectionPool connections;
    // Wraps either the builder-supplied executor or a cached thread pool
    // created by the constructor.
    private final DelegatingExecutor delegatingExecutor;
    // True when the executor was created by the constructor rather than
    // supplied through the builder.
    private final boolean isDefaultExecutor;
    // Security parameters
    private final SSLContext sslContext;
    private final SSLParameters sslParams;
    private final SelectorManager selmgr;
    private final FilterFactory filters;
    private final Http2ClientImpl client2;
    // Unique id drawn from CLIENT_IDS; also used in thread names.
    private final long id;
    // "HttpClientImpl(<id>)"; passed as the name of the operations tracker.
    private final String dbgTag;

    // The SSL DirectBuffer Supplier provides the ability to recycle
    // buffers used between the socket reader and the SSLEngine, or
    // more precisely between the SocketTube publisher and the
    // SSLFlowDelegate reader.
    private final SSLDirectBufferSupplier sslBufferSupplier
            = new SSLDirectBufferSupplier(this);
 183 
    // This reference is used to keep track of the facade HttpClient
    // that was returned to the application code.
    // It makes it possible to know when the application no longer
    // holds any reference to the HttpClient.
    // Unfortunately, this information alone is not enough to know when
    // to exit the SelectorManager thread. Because of the asynchronous
    // nature of the API, we also need to wait until all pending operations
    // have completed.
    // The reference is weak so that it does not itself keep the facade alive.
    private final WeakReference<HttpClientFacade> facadeRef;
 193 
 194     // This counter keeps track of the number of operations pending
 195     // on the HttpClient. The SelectorManager thread will wait
 196     // until there are no longer any pending operations and the
 197     // facadeRef is cleared before exiting.
 198     //
 199     // The pendingOperationCount is incremented every time a send/sendAsync
 200     // operation is invoked on the HttpClient, and is decremented when
 201     // the HttpResponse<T> object is returned to the user.
 202     // However, at this point, the body may not have been fully read yet.
 203     // This is the case when the response T is implemented as a streaming
 204     // subscriber (such as an InputStream).
 205     //
 206     // To take care of this issue the pendingOperationCount will additionally
 207     // be incremented/decremented in the following cases:
 208     //
 209     // 1. For HTTP/2  it is incremented when a stream is added to the
 210     //    Http2Connection streams map, and decreased when the stream is removed
 211     //    from the map. This should also take care of push promises.
 212     // 2. For WebSocket the count is increased when creating a
 213     //    DetachedConnectionChannel for the socket, and decreased
 214     //    when the channel is closed.
 215     //    In addition, the HttpClient facade is passed to the WebSocket builder,
 216     //    (instead of the client implementation delegate).
 217     // 3. For HTTP/1.1 the count is incremented before starting to parse the body
 218     //    response, and decremented when the parser has reached the end of the
 219     //    response body flow.
 220     //
 221     // This should ensure that the selector manager thread remains alive until
 222     // the response has been fully received or the web socket is closed.
    // Overall count: every increment/decrement of one of the three
    // specific counters below is paired with one on this counter.
    private final AtomicLong pendingOperationCount = new AtomicLong();
    // Updated by webSocketOpen() / webSocketClose().
    private final AtomicLong pendingWebSocketCount = new AtomicLong();
    // Updated by reference() / unreference().
    private final AtomicLong pendingHttpRequestCount = new AtomicLong();
    // Updated by streamReference() / streamUnreference().
    private final AtomicLong pendingHttp2StreamCount = new AtomicLong();

    /** A Set of, deadline first, ordered timeout events. */
    private final TreeSet<TimeoutEvent> timeouts;
 230 
 231     /**
 232      * This is a bit tricky:
 233      * 1. an HttpClientFacade has a final HttpClientImpl field.
 234      * 2. an HttpClientImpl has a final WeakReference<HttpClientFacade> field,
 235      *    where the referent is the facade created for that instance.
 236      * 3. We cannot just create the HttpClientFacade in the HttpClientImpl
 237      *    constructor, because it would be only weakly referenced and could
 238      *    be GC'ed before we can return it.
 239      * The solution is to use an instance of SingleFacadeFactory which will
 240      * allow the caller of new HttpClientImpl(...) to retrieve the facade
 241      * after the HttpClientImpl has been created.
 242      */
 243     private static final class SingleFacadeFactory {
 244         HttpClientFacade facade;
 245         HttpClientFacade createFacade(HttpClientImpl impl) {
 246             assert facade == null;
 247             return (facade = new HttpClientFacade(impl));
 248         }
 249     }
 250 
 251     static HttpClientFacade create(HttpClientBuilderImpl builder) {
 252         SingleFacadeFactory facadeFactory = new SingleFacadeFactory();
 253         HttpClientImpl impl = new HttpClientImpl(builder, facadeFactory);
 254         impl.start();
 255         assert facadeFactory.facade != null;
 256         assert impl.facadeRef.get() == facadeFactory.facade;
 257         return facadeFactory.facade;
 258     }
 259 
    // Initializes the client from the builder configuration, substituting
    // defaults for whatever the builder left unset. The facade is created
    // through facadeFactory so that the caller can keep a strong reference
    // to it. The selector manager thread is created here but not started.
    private HttpClientImpl(HttpClientBuilderImpl builder,
                           SingleFacadeFactory facadeFactory) {
        id = CLIENT_IDS.incrementAndGet();
        dbgTag = "HttpClientImpl(" + id +")";
        // SSL context: builder-supplied, or the platform default.
        if (builder.sslContext == null) {
            try {
                sslContext = SSLContext.getDefault();
            } catch (NoSuchAlgorithmException ex) {
                throw new InternalError(ex);
            }
        } else {
            sslContext = builder.sslContext;
        }
        // Executor: builder-supplied, or a cached thread pool whose threads
        // are produced by DefaultThreadFactory.
        Executor ex = builder.executor;
        if (ex == null) {
            ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
            isDefaultExecutor = true;
        } else {
            isDefaultExecutor = false;
        }
        delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
        // Only a weak reference to the facade is kept here; the factory
        // holds the strong one until create() returns it.
        facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
        client2 = new Http2ClientImpl(this);
        cookieHandler = builder.cookieHandler;
        connectTimeout = builder.connectTimeout;
        followRedirects = builder.followRedirects == null ?
                Redirect.NEVER : builder.followRedirects;
        // Proxy selector: builder-supplied, or the default one.
        this.userProxySelector = Optional.ofNullable(builder.proxy);
        this.proxySelector = userProxySelector
                .orElseGet(HttpClientImpl::getDefaultProxySelector);
        if (debug.on())
            debug.log("proxySelector is %s (user-supplied=%s)",
                      this.proxySelector, userProxySelector.isPresent());
        authenticator = builder.authenticator;
        // Protocol version defaults to HTTP/2.
        if (builder.version == null) {
            version = HttpClient.Version.HTTP_2;
        } else {
            version = builder.version;
        }
        // SSL parameters: builder-supplied, or derived from the SSL context.
        if (builder.sslParams == null) {
            sslParams = getDefaultParams(sslContext);
        } else {
            sslParams = builder.sslParams;
        }
        connections = new ConnectionPool(id);
        connections.start();
        timeouts = new TreeSet<>();
        try {
            selmgr = new SelectorManager(this);
        } catch (IOException e) {
            // unlikely
            throw new InternalError(e);
        }
        selmgr.setDaemon(true);
        filters = new FilterFactory();
        initFilters();
        // The factory still holds the facade strongly at this point.
        assert facadeRef.get() != null;
    }
 318 
 319     private void start() {
 320         selmgr.start();
 321     }
 322 
 323     // Called from the SelectorManager thread, just before exiting.
 324     // Clears the HTTP/1.1 and HTTP/2 cache, ensuring that the connections
 325     // that may be still lingering there are properly closed (and their
 326     // possibly still opened SocketChannel released).
 327     private void stop() {
 328         // Clears HTTP/1.1 cache and close its connections
 329         connections.stop();
 330         // Clears HTTP/2 cache and close its connections.
 331         client2.stop();
 332     }
 333 
 334     private static SSLParameters getDefaultParams(SSLContext ctx) {
 335         SSLParameters params = ctx.getSupportedSSLParameters();
 336         String[] protocols = params.getProtocols();
 337         boolean found13 = false;
 338         for (String proto : protocols) {
 339             if (proto.equals("TLSv1.3")) {
 340                 found13 = true;
 341                 break;
 342             }
 343         }
 344         if (found13)
 345             params.setProtocols(new String[] {"TLSv1.3", "TLSv1.2"});
 346         else
 347             params.setProtocols(new String[] {"TLSv1.2"});
 348         return params;
 349     }
 350 
 351     private static ProxySelector getDefaultProxySelector() {
 352         PrivilegedAction<ProxySelector> action = ProxySelector::getDefault;
 353         return AccessController.doPrivileged(action);
 354     }
 355 
 356     // Returns the facade that was returned to the application code.
 357     // May be null if that facade is no longer referenced.
 358     final HttpClientFacade facade() {
 359         return facadeRef.get();
 360     }
 361 
 362     // Increments the pendingOperationCount.
 363     final long reference() {
 364         pendingHttpRequestCount.incrementAndGet();
 365         return pendingOperationCount.incrementAndGet();
 366     }
 367 
 368     // Decrements the pendingOperationCount.
 369     final long unreference() {
 370         final long count = pendingOperationCount.decrementAndGet();
 371         final long httpCount = pendingHttpRequestCount.decrementAndGet();
 372         final long http2Count = pendingHttp2StreamCount.get();
 373         final long webSocketCount = pendingWebSocketCount.get();
 374         if (count == 0 && facade() == null) {
 375             selmgr.wakeupSelector();
 376         }
 377         assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
 378         assert http2Count >= 0 : "count of HTTP/2 operations < 0";
 379         assert webSocketCount >= 0 : "count of WS operations < 0";
 380         assert count >= 0 : "count of pending operations < 0";
 381         return count;
 382     }
 383 
 384     // Increments the pendingOperationCount.
 385     final long streamReference() {
 386         pendingHttp2StreamCount.incrementAndGet();
 387         return pendingOperationCount.incrementAndGet();
 388     }
 389 
 390     // Decrements the pendingOperationCount.
 391     final long streamUnreference() {
 392         final long count = pendingOperationCount.decrementAndGet();
 393         final long http2Count = pendingHttp2StreamCount.decrementAndGet();
 394         final long httpCount = pendingHttpRequestCount.get();
 395         final long webSocketCount = pendingWebSocketCount.get();
 396         if (count == 0 && facade() == null) {
 397             selmgr.wakeupSelector();
 398         }
 399         assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
 400         assert http2Count >= 0 : "count of HTTP/2 operations < 0";
 401         assert webSocketCount >= 0 : "count of WS operations < 0";
 402         assert count >= 0 : "count of pending operations < 0";
 403         return count;
 404     }
 405 
 406     // Increments the pendingOperationCount.
 407     final long webSocketOpen() {
 408         pendingWebSocketCount.incrementAndGet();
 409         return pendingOperationCount.incrementAndGet();
 410     }
 411 
 412     // Decrements the pendingOperationCount.
 413     final long webSocketClose() {
 414         final long count = pendingOperationCount.decrementAndGet();
 415         final long webSocketCount = pendingWebSocketCount.decrementAndGet();
 416         final long httpCount = pendingHttpRequestCount.get();
 417         final long http2Count = pendingHttp2StreamCount.get();
 418         if (count == 0 && facade() == null) {
 419             selmgr.wakeupSelector();
 420         }
 421         assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
 422         assert http2Count >= 0 : "count of HTTP/2 operations < 0";
 423         assert webSocketCount >= 0 : "count of WS operations < 0";
 424         assert count >= 0 : "count of pending operations < 0";
 425         return count;
 426     }
 427 
 428     // Returns the pendingOperationCount.
 429     final long referenceCount() {
 430         return pendingOperationCount.get();
 431     }
 432 
 433     final static class HttpClientTracker implements Tracker {
 434         final AtomicLong httpCount;
 435         final AtomicLong http2Count;
 436         final AtomicLong websocketCount;
 437         final AtomicLong operationsCount;
 438         final Reference<?> reference;
 439         final String name;
 440         HttpClientTracker(AtomicLong http,
 441                           AtomicLong http2,
 442                           AtomicLong ws,
 443                           AtomicLong ops,
 444                           Reference<?> ref,
 445                           String name) {
 446             this.httpCount = http;
 447             this.http2Count = http2;
 448             this.websocketCount = ws;
 449             this.operationsCount = ops;
 450             this.reference = ref;
 451             this.name = name;
 452         }
 453         @Override
 454         public long getOutstandingOperations() {
 455             return operationsCount.get();
 456         }
 457         @Override
 458         public long getOutstandingHttpOperations() {
 459             return httpCount.get();
 460         }
 461         @Override
 462         public long getOutstandingHttp2Streams() { return http2Count.get(); }
 463         @Override
 464         public long getOutstandingWebSocketOperations() {
 465             return websocketCount.get();
 466         }
 467         @Override
 468         public boolean isFacadeReferenced() {
 469             return reference.get() != null;
 470         }
 471         @Override
 472         public String getName() {
 473             return name;
 474         }
 475     }
 476 
 477     public Tracker getOperationsTracker() {
 478         return new HttpClientTracker(pendingHttpRequestCount,
 479                 pendingHttp2StreamCount,
 480                 pendingWebSocketCount,
 481                 pendingOperationCount,
 482                 facadeRef,
 483                 dbgTag);
 484     }
 485 
 486     // Called by the SelectorManager thread to figure out whether it's time
 487     // to terminate.
 488     final boolean isReferenced() {
 489         HttpClient facade = facade();
 490         return facade != null || referenceCount() > 0;
 491     }
 492 
 493     /**
 494      * Wait for activity on given exchange.
 495      * The following occurs in the SelectorManager thread.
 496      *
 497      *  1) add to selector
 498      *  2) If selector fires for this exchange then
 499      *     call AsyncEvent.handle()
 500      *
 501      * If exchange needs to change interest ops, then call registerEvent() again.
 502      */
 503     void registerEvent(AsyncEvent exchange) throws IOException {
 504         selmgr.register(exchange);
 505     }
 506 
 507     /**
 508      * Allows an AsyncEvent to modify its interestOps.
 509      * @param event The modified event.
 510      */
 511     void eventUpdated(AsyncEvent event) throws ClosedChannelException {
 512         assert !(event instanceof AsyncTriggerEvent);
 513         selmgr.eventUpdated(event);
 514     }
 515 
 516     boolean isSelectorThread() {
 517         return Thread.currentThread() == selmgr;
 518     }
 519 
 520     Http2ClientImpl client2() {
 521         return client2;
 522     }
 523 
 524     private void debugCompleted(String tag, long startNanos, HttpRequest req) {
 525         if (debugelapsed.on()) {
 526             debugelapsed.log(tag + " elapsed "
 527                     + (System.nanoTime() - startNanos)/1000_000L
 528                     + " millis for " + req.method()
 529                     + " to " + req.uri());
 530         }
 531     }
 532 
 533     @Override
 534     public <T> HttpResponse<T>
 535     send(HttpRequest req, BodyHandler<T> responseHandler)
 536         throws IOException, InterruptedException
 537     {
 538         CompletableFuture<HttpResponse<T>> cf = null;
 539         try {
 540             cf = sendAsync(req, responseHandler, null, null);
 541             return cf.get();
 542         } catch (InterruptedException ie) {
 543             if (cf != null )
 544                 cf.cancel(true);
 545             throw ie;
 546         } catch (ExecutionException e) {
 547             final Throwable throwable = e.getCause();
 548             final String msg = throwable.getMessage();
 549 
 550             if (throwable instanceof IllegalArgumentException) {
 551                 throw new IllegalArgumentException(msg, throwable);
 552             } else if (throwable instanceof SecurityException) {
 553                 throw new SecurityException(msg, throwable);
 554             } else if (throwable instanceof HttpConnectTimeoutException) {
 555                 HttpConnectTimeoutException hcte = new HttpConnectTimeoutException(msg);
 556                 hcte.initCause(throwable);
 557                 throw hcte;
 558             } else if (throwable instanceof HttpTimeoutException) {
 559                 throw new HttpTimeoutException(msg);
 560             } else if (throwable instanceof ConnectException) {
 561                 ConnectException ce = new ConnectException(msg);
 562                 ce.initCause(throwable);
 563                 throw ce;
 564             } else if (throwable instanceof IOException) {
 565                 throw new IOException(msg, throwable);
 566             } else {
 567                 throw new IOException(msg, throwable);
 568             }
 569         }
 570     }
 571 
    // The default asynchronous executor of CompletableFuture, obtained from
    // a throwaway instance. The private sendAsync uses it to run dependent
    // actions of the futures it returns when an exchange executor is given.
    private static final Executor ASYNC_POOL = new CompletableFuture<Void>().defaultExecutor();
 573 
 574     @Override
 575     public <T> CompletableFuture<HttpResponse<T>>
 576     sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)
 577     {
 578         return sendAsync(userRequest, responseHandler, null);
 579     }
 580 
 581     @Override
 582     public <T> CompletableFuture<HttpResponse<T>>
 583     sendAsync(HttpRequest userRequest,
 584               BodyHandler<T> responseHandler,
 585               PushPromiseHandler<T> pushPromiseHandler) {
 586         return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate);
 587     }
 588 
 589     private <T> CompletableFuture<HttpResponse<T>>
 590     sendAsync(HttpRequest userRequest,
 591               BodyHandler<T> responseHandler,
 592               PushPromiseHandler<T> pushPromiseHandler,
 593               Executor exchangeExecutor)    {
 594 
 595         Objects.requireNonNull(userRequest);
 596         Objects.requireNonNull(responseHandler);
 597 
 598         AccessControlContext acc = null;
 599         if (System.getSecurityManager() != null)
 600             acc = AccessController.getContext();
 601 
 602         // Clone the, possibly untrusted, HttpRequest
 603         HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector);
 604         if (requestImpl.method().equals("CONNECT"))
 605             throw new IllegalArgumentException("Unsupported method CONNECT");
 606 
 607         long start = DEBUGELAPSED ? System.nanoTime() : 0;
 608         reference();
 609         try {
 610             if (debugelapsed.on())
 611                 debugelapsed.log("ClientImpl (async) send %s", userRequest);
 612 
 613             // When using sendAsync(...) we explicitly pass the
 614             // executor's delegate as exchange executor to force
 615             // asynchronous scheduling of the exchange.
 616             // When using send(...) we don't specify any executor
 617             // and default to using the client's delegating executor
 618             // which only spawns asynchronous tasks if it detects
 619             // that the current thread is the selector manager
 620             // thread. This will cause everything to execute inline
 621             // until we need to schedule some event with the selector.
 622             Executor executor = exchangeExecutor == null
 623                     ? this.delegatingExecutor : exchangeExecutor;
 624 
 625             MultiExchange<T> mex = new MultiExchange<>(userRequest,
 626                                                             requestImpl,
 627                                                             this,
 628                                                             responseHandler,
 629                                                             pushPromiseHandler,
 630                                                             acc);
 631             CompletableFuture<HttpResponse<T>> res =
 632                     mex.responseAsync(executor).whenComplete((b,t) -> unreference());
 633             if (DEBUGELAPSED) {
 634                 res = res.whenComplete(
 635                         (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
 636             }
 637 
 638             // makes sure that any dependent actions happen in the CF default
 639             // executor. This is only needed for sendAsync(...), when
 640             // exchangeExecutor is non-null.
 641             if (exchangeExecutor != null) {
 642                 res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
 643             }
 644             return res;
 645         } catch(Throwable t) {
 646             unreference();
 647             debugCompleted("ClientImpl (async)", start, userRequest);
 648             throw t;
 649         }
 650     }
 651 
 652     // Main loop for this client's selector
 653     private final static class SelectorManager extends Thread {
 654 
 655         // For testing purposes we have an internal System property that
 656         // can control the frequency at which the selector manager will wake
 657         // up when there are no pending operations.
 658         // Increasing the frequency (shorter delays) might allow the selector
 659         // to observe that the facade is no longer referenced and might allow
 660         // the selector thread to terminate more timely - for when nothing is
 661         // ongoing it will only check for that condition every NODEADLINE ms.
 662         // To avoid misuse of the property, the delay that can be specified
 663         // is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default
 664         // value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms
 665         // The property is -Djdk.internal.httpclient.selectorTimeout=<millis>
 666         private static final int MIN_NODEADLINE = 1000; // ms
 667         private static final int MAX_NODEADLINE = 1000 * 1200; // ms
 668         private static final int DEF_NODEADLINE = 3000; // ms
 669         private static final long NODEADLINE; // default is DEF_NODEADLINE ms
 670         static {
 671             // ensure NODEADLINE is initialized with some valid value.
 672             long deadline =  Utils.getIntegerProperty(
 673                 "jdk.internal.httpclient.selectorTimeout",
 674                 DEF_NODEADLINE); // millis
 675             if (deadline <= 0) deadline = DEF_NODEADLINE;
 676             deadline = Math.max(deadline, MIN_NODEADLINE);
 677             NODEADLINE = Math.min(deadline, MAX_NODEADLINE);
 678         }
 679 
        // The selector opened by the constructor; register(), cancel() and
        // wakeupSelector() wake it up.
        private final Selector selector;
        private volatile boolean closed;
        // Events queued by register() while holding this object's monitor.
        private final List<AsyncEvent> registrations;
        private final List<AsyncTriggerEvent> deregistrations;
        // Loggers shared with the owning client.
        private final Logger debug;
        private final Logger debugtimeout;
        // The client this selector manager belongs to, and that client's
        // connection pool.
        HttpClientImpl owner;
        ConnectionPool pool;
 688 
 689         SelectorManager(HttpClientImpl ref) throws IOException {
 690             super(null, null,
 691                   "HttpClient-" + ref.id + "-SelectorManager",
 692                   0, false);
 693             owner = ref;
 694             debug = ref.debug;
 695             debugtimeout = ref.debugtimeout;
 696             pool = ref.connectionPool();
 697             registrations = new ArrayList<>();
 698             deregistrations = new ArrayList<>();
 699             selector = Selector.open();
 700         }
 701 
 702         void eventUpdated(AsyncEvent e) throws ClosedChannelException {
 703             if (Thread.currentThread() == this) {
 704                 SelectionKey key = e.channel().keyFor(selector);
 705                 if (key != null && key.isValid()) {
 706                     SelectorAttachment sa = (SelectorAttachment) key.attachment();
 707                     sa.register(e);
 708                 } else if (e.interestOps() != 0){
 709                     // We don't care about paused events.
 710                     // These are actually handled by
 711                     // SelectorAttachment::resetInterestOps later on.
 712                     // But if we reach here when trying to resume an
 713                     // event then it's better to fail fast.
 714                     if (debug.on()) debug.log("No key for channel");
 715                     e.abort(new IOException("No key for channel"));
 716                 }
 717             } else {
 718                 register(e);
 719             }
 720         }
 721 
 722         // This returns immediately. So caller not allowed to send/receive
 723         // on connection.
 724         synchronized void register(AsyncEvent e) {
 725             registrations.add(e);
 726             selector.wakeup();
 727         }
 728 
 729         synchronized void cancel(SocketChannel e) {
 730             SelectionKey key = e.keyFor(selector);
 731             if (key != null) {
 732                 key.cancel();
 733             }
 734             selector.wakeup();
 735         }
 736 
 737         void wakeupSelector() {
 738             selector.wakeup();
 739         }
 740 
 741         synchronized void shutdown() {
 742             Log.logTrace("{0}: shutting down", getName());
 743             if (debug.on()) debug.log("SelectorManager shutting down");
 744             closed = true;
 745             try {
 746                 selector.close();
 747             } catch (IOException ignored) {
 748             } finally {
 749                 owner.stop();
 750             }
 751         }
 752 
 753         @Override
 754         public void run() {
 755             List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
 756             List<AsyncEvent> readyList = new ArrayList<>();
 757             List<Runnable> resetList = new ArrayList<>();
 758             try {
 759                 if (Log.channel()) Log.logChannel(getName() + ": starting");
 760                 while (!Thread.currentThread().isInterrupted()) {
 761                     synchronized (this) {
 762                         assert errorList.isEmpty();
 763                         assert readyList.isEmpty();
 764                         assert resetList.isEmpty();
 765                         for (AsyncTriggerEvent event : deregistrations) {
 766                             event.handle();
 767                         }
 768                         deregistrations.clear();
 769                         for (AsyncEvent event : registrations) {
 770                             if (event instanceof AsyncTriggerEvent) {
 771                                 readyList.add(event);
 772                                 continue;
 773                             }
 774                             SelectableChannel chan = event.channel();
 775                             SelectionKey key = null;
 776                             try {
 777                                 key = chan.keyFor(selector);
 778                                 SelectorAttachment sa;
 779                                 if (key == null || !key.isValid()) {
 780                                     if (key != null) {
 781                                         // key is canceled.
 782                                         // invoke selectNow() to purge it
 783                                         // before registering the new event.
 784                                         selector.selectNow();
 785                                     }
 786                                     sa = new SelectorAttachment(chan, selector);
 787                                 } else {
 788                                     sa = (SelectorAttachment) key.attachment();
 789                                 }
 790                                 // may throw IOE if channel closed: that's OK
 791                                 sa.register(event);
 792                                 if (!chan.isOpen()) {
 793                                     throw new IOException("Channel closed");
 794                                 }
 795                             } catch (IOException e) {
 796                                 Log.logTrace("{0}: {1}", getName(), e);
 797                                 if (debug.on())
 798                                     debug.log("Got " + e.getClass().getName()
 799                                               + " while handling registration events");
 800                                 chan.close();
 801                                 // let the event abort deal with it
 802                                 errorList.add(new Pair<>(event, e));
 803                                 if (key != null) {
 804                                     key.cancel();
 805                                     selector.selectNow();
 806                                 }
 807                             }
 808                         }
 809                         registrations.clear();
 810                         selector.selectedKeys().clear();
 811                     }
 812 
 813                     for (AsyncEvent event : readyList) {
 814                         assert event instanceof AsyncTriggerEvent;
 815                         event.handle();
 816                     }
 817                     readyList.clear();
 818 
 819                     for (Pair<AsyncEvent,IOException> error : errorList) {
 820                         // an IOException was raised and the channel closed.
 821                         handleEvent(error.first, error.second);
 822                     }
 823                     errorList.clear();
 824 
 825                     // Check whether client is still alive, and if not,
 826                     // gracefully stop this thread
 827                     if (!owner.isReferenced()) {
 828                         Log.logTrace("{0}: {1}",
 829                                 getName(),
 830                                 "HttpClient no longer referenced. Exiting...");
 831                         return;
 832                     }
 833 
 834                     // Timeouts will have milliseconds granularity. It is important
 835                     // to handle them in a timely fashion.
 836                     long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline();
 837                     if (debugtimeout.on())
 838                         debugtimeout.log("next timeout: %d", nextTimeout);
 839 
 840                     // Keep-alive have seconds granularity. It's not really an
 841                     // issue if we keep connections linger a bit more in the keep
 842                     // alive cache.
 843                     long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline();
 844                     if (debugtimeout.on())
 845                         debugtimeout.log("next expired: %d", nextExpiry);
 846 
 847                     assert nextTimeout >= 0;
 848                     assert nextExpiry >= 0;
 849 
 850                     // Don't wait for ever as it might prevent the thread to
 851                     // stop gracefully. millis will be 0 if no deadline was found.
 852                     if (nextTimeout <= 0) nextTimeout = NODEADLINE;
 853 
 854                     // Clip nextExpiry at NODEADLINE limit. The default
 855                     // keep alive is 1200 seconds (half an hour) - we don't
 856                     // want to wait that long.
 857                     if (nextExpiry <= 0) nextExpiry = NODEADLINE;
 858                     else nextExpiry = Math.min(NODEADLINE, nextExpiry);
 859 
 860                     // takes the least of the two.
 861                     long millis = Math.min(nextExpiry, nextTimeout);
 862 
 863                     if (debugtimeout.on())
 864                         debugtimeout.log("Next deadline is %d",
 865                                          (millis == 0 ? NODEADLINE : millis));
 866                     //debugPrint(selector);
 867                     int n = selector.select(millis == 0 ? NODEADLINE : millis);
 868                     if (n == 0) {
 869                         // Check whether client is still alive, and if not,
 870                         // gracefully stop this thread
 871                         if (!owner.isReferenced()) {
 872                             Log.logTrace("{0}: {1}",
 873                                     getName(),
 874                                     "HttpClient no longer referenced. Exiting...");
 875                             return;
 876                         }
 877                         owner.purgeTimeoutsAndReturnNextDeadline();
 878                         continue;
 879                     }
 880 
 881                     Set<SelectionKey> keys = selector.selectedKeys();
 882                     assert errorList.isEmpty();
 883 
 884                     for (SelectionKey key : keys) {
 885                         SelectorAttachment sa = (SelectorAttachment) key.attachment();
 886                         if (!key.isValid()) {
 887                             IOException ex = sa.chan.isOpen()
 888                                     ? new IOException("Invalid key")
 889                                     : new ClosedChannelException();
 890                             sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex)));
 891                             sa.pending.clear();
 892                             continue;
 893                         }
 894 
 895                         int eventsOccurred;
 896                         try {
 897                             eventsOccurred = key.readyOps();
 898                         } catch (CancelledKeyException ex) {
 899                             IOException io = Utils.getIOException(ex);
 900                             sa.pending.forEach(e -> errorList.add(new Pair<>(e,io)));
 901                             sa.pending.clear();
 902                             continue;
 903                         }
 904                         sa.events(eventsOccurred).forEach(readyList::add);
 905                         resetList.add(() -> sa.resetInterestOps(eventsOccurred));
 906                     }
 907 
 908                     selector.selectNow(); // complete cancellation
 909                     selector.selectedKeys().clear();
 910 
 911                     // handle selected events
 912                     readyList.forEach((e) -> handleEvent(e, null));
 913                     readyList.clear();
 914 
 915                     // handle errors (closed channels etc...)
 916                     errorList.forEach((p) -> handleEvent(p.first, p.second));
 917                     errorList.clear();
 918 
 919                     // reset interest ops for selected channels
 920                     resetList.forEach(r -> r.run());
 921                     resetList.clear();
 922 
 923                 }
 924             } catch (Throwable e) {
 925                 if (!closed) {
 926                     // This terminates thread. So, better just print stack trace
 927                     String err = Utils.stackTrace(e);
 928                     Log.logError("{0}: {1}: {2}", getName(),
 929                             "HttpClientImpl shutting down due to fatal error", err);
 930                 }
 931                 if (debug.on()) debug.log("shutting down", e);
 932                 if (Utils.ASSERTIONSENABLED && !debug.on()) {
 933                     e.printStackTrace(System.err); // always print the stack
 934                 }
 935             } finally {
 936                 if (Log.channel()) Log.logChannel(getName() + ": stopping");
 937                 shutdown();
 938             }
 939         }
 940 
 941 //        void debugPrint(Selector selector) {
 942 //            System.err.println("Selector: debugprint start");
 943 //            Set<SelectionKey> keys = selector.keys();
 944 //            for (SelectionKey key : keys) {
 945 //                SelectableChannel c = key.channel();
 946 //                int ops = key.interestOps();
 947 //                System.err.printf("selector chan:%s ops:%d\n", c, ops);
 948 //            }
 949 //            System.err.println("Selector: debugprint end");
 950 //        }
 951 
 952         /** Handles the given event. The given ioe may be null. */
 953         void handleEvent(AsyncEvent event, IOException ioe) {
 954             if (closed || ioe != null) {
 955                 event.abort(ioe);
 956             } else {
 957                 event.handle();
 958             }
 959         }
 960     }
 961 
 962     final String debugInterestOps(SelectableChannel channel) {
 963         try {
 964             SelectionKey key = channel.keyFor(selmgr.selector);
 965             if (key == null) return "channel not registered with selector";
 966             String keyInterestOps = key.isValid()
 967                     ? "key.interestOps=" + key.interestOps() : "invalid key";
 968             return String.format("channel registered with selector, %s, sa.interestOps=%s",
 969                                  keyInterestOps,
 970                                  ((SelectorAttachment)key.attachment()).interestOps);
 971         } catch (Throwable t) {
 972             return String.valueOf(t);
 973         }
 974     }
 975 
 976     /**
 977      * Tracks multiple user level registrations associated with one NIO
 978      * registration (SelectionKey). In this implementation, registrations
 979      * are one-off and when an event is posted the registration is cancelled
 980      * until explicitly registered again.
 981      *
 982      * <p> No external synchronization required as this class is only used
 983      * by the SelectorManager thread. One of these objects required per
 984      * connection.
 985      */
 986     private static class SelectorAttachment {
 987         private final SelectableChannel chan;
 988         private final Selector selector;
 989         private final Set<AsyncEvent> pending;
 990         private final static Logger debug =
 991                 Utils.getDebugLogger("SelectorAttachment"::toString, Utils.DEBUG);
 992         private int interestOps;
 993 
 994         SelectorAttachment(SelectableChannel chan, Selector selector) {
 995             this.pending = new HashSet<>();
 996             this.chan = chan;
 997             this.selector = selector;
 998         }
 999 
1000         void register(AsyncEvent e) throws ClosedChannelException {
1001             int newOps = e.interestOps();
1002             // re register interest if we are not already interested
1003             // in the event. If the event is paused, then the pause will
1004             // be taken into account later when resetInterestOps is called.
1005             boolean reRegister = (interestOps & newOps) != newOps;
1006             interestOps |= newOps;
1007             pending.add(e);
1008             if (debug.on())
1009                 debug.log("Registering %s for %d (%s)", e, newOps, reRegister);
1010             if (reRegister) {
1011                 // first time registration happens here also
1012                 try {
1013                     chan.register(selector, interestOps, this);
1014                 } catch (Throwable x) {
1015                     abortPending(x);
1016                 }
1017             } else if (!chan.isOpen()) {
1018                 abortPending(new ClosedChannelException());
1019             }
1020         }
1021 
1022         /**
1023          * Returns a Stream<AsyncEvents> containing only events that are
1024          * registered with the given {@code interestOps}.
1025          */
1026         Stream<AsyncEvent> events(int interestOps) {
1027             return pending.stream()
1028                     .filter(ev -> (ev.interestOps() & interestOps) != 0);
1029         }
1030 
1031         /**
1032          * Removes any events with the given {@code interestOps}, and if no
1033          * events remaining, cancels the associated SelectionKey.
1034          */
1035         void resetInterestOps(int interestOps) {
1036             int newOps = 0;
1037 
1038             Iterator<AsyncEvent> itr = pending.iterator();
1039             while (itr.hasNext()) {
1040                 AsyncEvent event = itr.next();
1041                 int evops = event.interestOps();
1042                 if (event.repeating()) {
1043                     newOps |= evops;
1044                     continue;
1045                 }
1046                 if ((evops & interestOps) != 0) {
1047                     itr.remove();
1048                 } else {
1049                     newOps |= evops;
1050                 }
1051             }
1052 
1053             this.interestOps = newOps;
1054             SelectionKey key = chan.keyFor(selector);
1055             if (newOps == 0 && key != null && pending.isEmpty()) {
1056                 key.cancel();
1057             } else {
1058                 try {
1059                     if (key == null || !key.isValid()) {
1060                         throw new CancelledKeyException();
1061                     }
1062                     key.interestOps(newOps);
1063                     // double check after
1064                     if (!chan.isOpen()) {
1065                         abortPending(new ClosedChannelException());
1066                         return;
1067                     }
1068                     assert key.interestOps() == newOps;
1069                 } catch (CancelledKeyException x) {
1070                     // channel may have been closed
1071                     if (debug.on()) debug.log("key cancelled for " + chan);
1072                     abortPending(x);
1073                 }
1074             }
1075         }
1076 
1077         void abortPending(Throwable x) {
1078             if (!pending.isEmpty()) {
1079                 AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]);
1080                 pending.clear();
1081                 IOException io = Utils.getIOException(x);
1082                 for (AsyncEvent event : evts) {
1083                     event.abort(io);
1084                 }
1085             }
1086         }
1087     }
1088 
1089     /*package-private*/ SSLContext theSSLContext() {
1090         return sslContext;
1091     }
1092 
1093     @Override
1094     public SSLContext sslContext() {
1095         return sslContext;
1096     }
1097 
1098     @Override
1099     public SSLParameters sslParameters() {
1100         return Utils.copySSLParameters(sslParams);
1101     }
1102 
1103     @Override
1104     public Optional<Authenticator> authenticator() {
1105         return Optional.ofNullable(authenticator);
1106     }
1107 
1108     /*package-private*/ final DelegatingExecutor theExecutor() {
1109         return delegatingExecutor;
1110     }
1111 
1112     @Override
1113     public final Optional<Executor> executor() {
1114         return isDefaultExecutor
1115                 ? Optional.empty()
1116                 : Optional.of(delegatingExecutor.delegate());
1117     }
1118 
1119     ConnectionPool connectionPool() {
1120         return connections;
1121     }
1122 
1123     @Override
1124     public Redirect followRedirects() {
1125         return followRedirects;
1126     }
1127 
1128 
1129     @Override
1130     public Optional<CookieHandler> cookieHandler() {
1131         return Optional.ofNullable(cookieHandler);
1132     }
1133 
1134     @Override
1135     public Optional<Duration> connectTimeout() {
1136         return Optional.ofNullable(connectTimeout);
1137     }
1138 
1139     @Override
1140     public Optional<ProxySelector> proxy() {
1141         return this.userProxySelector;
1142     }
1143 
1144     // Return the effective proxy that this client uses.
1145     ProxySelector proxySelector() {
1146         return proxySelector;
1147     }
1148 
1149     @Override
1150     public WebSocket.Builder newWebSocketBuilder() {
1151         // Make sure to pass the HttpClientFacade to the WebSocket builder.
1152         // This will ensure that the facade is not released before the
1153         // WebSocket has been created, at which point the pendingOperationCount
1154         // will have been incremented by the RawChannelTube.
1155         // See RawChannelTube.
1156         return new BuilderImpl(this.facade(), proxySelector);
1157     }
1158 
1159     @Override
1160     public Version version() {
1161         return version;
1162     }
1163 
1164     String dbgString() {
1165         return dbgTag;
1166     }
1167 
1168     @Override
1169     public String toString() {
1170         // Used by tests to get the client's id and compute the
1171         // name of the SelectorManager thread.
1172         return super.toString() + ("(" + id + ")");
1173     }
1174 
1175     private void initFilters() {
1176         addFilter(AuthenticationFilter.class);
1177         addFilter(RedirectFilter.class);
1178         if (this.cookieHandler != null) {
1179             addFilter(CookieFilter.class);
1180         }
1181     }
1182 
1183     private void addFilter(Class<? extends HeaderFilter> f) {
1184         filters.addFilter(f);
1185     }
1186 
1187     final LinkedList<HeaderFilter> filterChain() {
1188         return filters.getFilterChain();
1189     }
1190 
1191     // Timer controls.
1192     // Timers are implemented through timed Selector.select() calls.
1193 
1194     synchronized void registerTimer(TimeoutEvent event) {
1195         Log.logTrace("Registering timer {0}", event);
1196         timeouts.add(event);
1197         selmgr.wakeupSelector();
1198     }
1199 
1200     synchronized void cancelTimer(TimeoutEvent event) {
1201         Log.logTrace("Canceling timer {0}", event);
1202         timeouts.remove(event);
1203     }
1204 
1205     /**
1206      * Purges ( handles ) timer events that have passed their deadline, and
1207      * returns the amount of time, in milliseconds, until the next earliest
1208      * event. A return value of 0 means that there are no events.
1209      */
1210     private long purgeTimeoutsAndReturnNextDeadline() {
1211         long diff = 0L;
1212         List<TimeoutEvent> toHandle = null;
1213         int remaining = 0;
1214         // enter critical section to retrieve the timeout event to handle
1215         synchronized(this) {
1216             if (timeouts.isEmpty()) return 0L;
1217 
1218             Instant now = Instant.now();
1219             Iterator<TimeoutEvent> itr = timeouts.iterator();
1220             while (itr.hasNext()) {
1221                 TimeoutEvent event = itr.next();
1222                 diff = now.until(event.deadline(), ChronoUnit.MILLIS);
1223                 if (diff <= 0) {
1224                     itr.remove();
1225                     toHandle = (toHandle == null) ? new ArrayList<>() : toHandle;
1226                     toHandle.add(event);
1227                 } else {
1228                     break;
1229                 }
1230             }
1231             remaining = timeouts.size();
1232         }
1233 
1234         // can be useful for debugging
1235         if (toHandle != null && Log.trace()) {
1236             Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling "
1237                     +  toHandle.size() + " events, "
1238                     + "remaining " + remaining
1239                     + ", next deadline: " + (diff < 0 ? 0L : diff));
1240         }
1241 
1242         // handle timeout events out of critical section
1243         if (toHandle != null) {
1244             Throwable failed = null;
1245             for (TimeoutEvent event : toHandle) {
1246                 try {
1247                    Log.logTrace("Firing timer {0}", event);
1248                    event.handle();
1249                 } catch (Error | RuntimeException e) {
1250                     // Not expected. Handle remaining events then throw...
1251                     // If e is an OOME or SOE it might simply trigger a new
1252                     // error from here - but in this case there's not much we
1253                     // could do anyway. Just let it flow...
1254                     if (failed == null) failed = e;
1255                     else failed.addSuppressed(e);
1256                     Log.logTrace("Failed to handle event {0}: {1}", event, e);
1257                 }
1258             }
1259             if (failed instanceof Error) throw (Error) failed;
1260             if (failed instanceof RuntimeException) throw (RuntimeException) failed;
1261         }
1262 
1263         // return time to wait until next event. 0L if there's no more events.
1264         return diff < 0 ? 0L : diff;
1265     }
1266 
1267     // used for the connection window
1268     int getReceiveBufferSize() {
1269         return Utils.getIntegerNetProperty(
1270                 "jdk.httpclient.receiveBufferSize",
1271                 0 // only set the size if > 0
1272         );
1273     }
1274 
1275     // Optimization for reading SSL encrypted data
1276     // --------------------------------------------
1277 
1278     // Returns a BufferSupplier that can be used for reading
1279     // encrypted bytes of the channel. These buffers can then
1280     // be recycled by the SSLFlowDelegate::Reader after their
1281     // content has been copied in the SSLFlowDelegate::Reader
1282     // readBuf.
1283     // Because allocating, reading, copying, and recycling
1284     // all happen in the SelectorManager thread,
1285     // then this BufferSupplier can be shared between all
1286     // the SSL connections managed by this client.
1287     BufferSupplier getSSLBufferSupplier() {
1288         return sslBufferSupplier;
1289     }
1290 
1291     // An implementation of BufferSupplier that manages a pool of
1292     // maximum 3 direct byte buffers (SocketTube.MAX_BUFFERS) that
1293     // are used for reading encrypted bytes off the channel before
1294     // copying and subsequent unwrapping.
1295     private static final class SSLDirectBufferSupplier implements BufferSupplier {
1296         private static final int POOL_SIZE = SocketTube.MAX_BUFFERS;
1297         private final ByteBuffer[] pool = new ByteBuffer[POOL_SIZE];
1298         private final HttpClientImpl client;
1299         private final Logger debug;
1300         private int tail, count; // no need for volatile: only accessed in SM thread.
1301 
1302         SSLDirectBufferSupplier(HttpClientImpl client) {
1303             this.client = Objects.requireNonNull(client);
1304             this.debug = client.debug;
1305         }
1306 
1307         // Gets a buffer from the pool, or allocates a new one if needed.
1308         @Override
1309         public ByteBuffer get() {
1310             assert client.isSelectorThread();
1311             assert tail <= POOL_SIZE : "allocate tail is " + tail;
1312             ByteBuffer buf;
1313             if (tail == 0) {
1314                 if (debug.on()) {
1315                     // should not appear more than SocketTube.MAX_BUFFERS
1316                     debug.log("ByteBuffer.allocateDirect(%d)", Utils.BUFSIZE);
1317                 }
1318                 assert count++ < POOL_SIZE : "trying to allocate more than "
1319                             + POOL_SIZE + " buffers";
1320                 buf = ByteBuffer.allocateDirect(Utils.BUFSIZE);
1321             } else {
1322                 assert tail > 0 : "non positive tail value: " + tail;
1323                 tail--;
1324                 buf = pool[tail];
1325                 pool[tail] = null;
1326             }
1327             assert buf.isDirect();
1328             assert buf.position() == 0;
1329             assert buf.hasRemaining();
1330             assert buf.limit() == Utils.BUFSIZE;
1331             assert tail < POOL_SIZE;
1332             assert tail >= 0;
1333             return buf;
1334         }
1335 
1336         // Returns the given buffer to the pool.
1337         @Override
1338         public void recycle(ByteBuffer buffer) {
1339             assert client.isSelectorThread();
1340             assert buffer.isDirect();
1341             assert !buffer.hasRemaining();
1342             assert tail < POOL_SIZE : "recycle tail is " + tail;
1343             assert tail >= 0;
1344             buffer.position(0);
1345             buffer.limit(buffer.capacity());
1346             // don't fail if assertions are off. we have asserted above.
1347             if (tail < POOL_SIZE) {
1348                 pool[tail] = buffer;
1349                 tail++;
1350             }
1351             assert tail <= POOL_SIZE;
1352             assert tail > 0;
1353         }
1354     }
1355 
1356 }