1 /* 2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import javax.net.ssl.SSLContext; 29 import javax.net.ssl.SSLParameters; 30 import java.io.IOException; 31 import java.io.UncheckedIOException; 32 import java.lang.ref.Reference; 33 import java.lang.ref.WeakReference; 34 import java.net.Authenticator; 35 import java.net.ConnectException; 36 import java.net.CookieHandler; 37 import java.net.ProxySelector; 38 import java.net.http.HttpConnectTimeoutException; 39 import java.net.http.HttpTimeoutException; 40 import java.nio.ByteBuffer; 41 import java.nio.channels.CancelledKeyException; 42 import java.nio.channels.ClosedChannelException; 43 import java.nio.channels.SelectableChannel; 44 import java.nio.channels.SelectionKey; 45 import java.nio.channels.Selector; 46 import java.nio.channels.SocketChannel; 47 import java.security.AccessControlContext; 48 import java.security.AccessController; 49 import java.security.NoSuchAlgorithmException; 50 import java.security.PrivilegedAction; 51 import java.time.Duration; 52 import java.time.Instant; 53 import java.time.temporal.ChronoUnit; 54 import java.util.ArrayList; 55 import java.util.HashSet; 56 import java.util.Iterator; 57 import java.util.LinkedList; 58 import java.util.List; 59 import java.util.Objects; 60 import java.util.Optional; 61 import java.util.Set; 62 import java.util.TreeSet; 63 import java.util.concurrent.CompletableFuture; 64 import java.util.concurrent.CompletionException; 65 import java.util.concurrent.ExecutionException; 66 import java.util.concurrent.Executor; 67 import java.util.concurrent.Executors; 68 import java.util.concurrent.ThreadFactory; 69 import java.util.concurrent.atomic.AtomicInteger; 70 import java.util.concurrent.atomic.AtomicLong; 71 import java.util.function.BooleanSupplier; 72 import java.util.stream.Stream; 73 import java.net.http.HttpClient; 74 import java.net.http.HttpRequest; 75 import java.net.http.HttpResponse; 76 import java.net.http.HttpResponse.BodyHandler; 77 import java.net.http.HttpResponse.PushPromiseHandler; 78 import java.net.http.WebSocket; 79 import jdk.internal.net.http.common.BufferSupplier; 80 import jdk.internal.net.http.common.Log; 81 import jdk.internal.net.http.common.Logger; 82 import jdk.internal.net.http.common.Pair; 83 import jdk.internal.net.http.common.Utils; 84 import jdk.internal.net.http.common.OperationTrackers.Trackable; 85 import jdk.internal.net.http.common.OperationTrackers.Tracker; 86 import jdk.internal.net.http.websocket.BuilderImpl; 87 import jdk.internal.misc.InnocuousThread; 88 89 /** 90 * Client implementation. Contains all configuration information and also 91 * the selector manager thread which allows async events to be registered 92 * and delivered when they occur. See AsyncEvent. 93 */ 94 final class HttpClientImpl extends HttpClient implements Trackable { 95 96 static final boolean DEBUGELAPSED = Utils.TESTING || Utils.DEBUG; // dev flag 97 static final boolean DEBUGTIMEOUT = false; // dev flag 98 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 99 final Logger debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED); 100 final Logger debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT); 101 static final AtomicLong CLIENT_IDS = new AtomicLong(); 102 103 // Define the default factory as a static inner class 104 // that embeds all the necessary logic to avoid 105 // the risk of using a lambda that might keep a reference on the 106 // HttpClient instance from which it was created (helps with 107 // heapdump analysis). 108 private static final class DefaultThreadFactory implements ThreadFactory { 109 private final String namePrefix; 110 private final AtomicInteger nextId = new AtomicInteger(); 111 112 DefaultThreadFactory(long clientID) { 113 namePrefix = "HttpClient-" + clientID + "-Worker-"; 114 } 115 116 @Override 117 public Thread newThread(Runnable r) { 118 String name = namePrefix + nextId.getAndIncrement(); 119 Thread t; 120 if (System.getSecurityManager() == null) { 121 t = new Thread(null, r, name, 0, false); 122 } else { 123 t = InnocuousThread.newThread(name, r); 124 } 125 t.setDaemon(true); 126 return t; 127 } 128 } 129 130 /** 131 * A DelegatingExecutor is an executor that delegates tasks to 132 * a wrapped executor when it detects that the current thread 133 * is the SelectorManager thread. If the current thread is not 134 * the selector manager thread the given task is executed inline. 135 */ 136 final static class DelegatingExecutor implements Executor { 137 private final BooleanSupplier isInSelectorThread; 138 private final Executor delegate; 139 DelegatingExecutor(BooleanSupplier isInSelectorThread, Executor delegate) { 140 this.isInSelectorThread = isInSelectorThread; 141 this.delegate = delegate; 142 } 143 144 Executor delegate() { 145 return delegate; 146 } 147 148 @Override 149 public void execute(Runnable command) { 150 if (isInSelectorThread.getAsBoolean()) { 151 delegate.execute(command); 152 } else { 153 command.run(); 154 } 155 } 156 } 157 158 private final CookieHandler cookieHandler; 159 private final Duration connectTimeout; 160 private final Redirect followRedirects; 161 private final Optional<ProxySelector> userProxySelector; 162 private final ProxySelector proxySelector; 163 private final Authenticator authenticator; 164 private final Version version; 165 private final ConnectionPool connections; 166 private final DelegatingExecutor delegatingExecutor; 167 private final boolean isDefaultExecutor; 168 // Security parameters 169 private final SSLContext sslContext; 170 private final SSLParameters sslParams; 171 private final SelectorManager selmgr; 172 private final FilterFactory filters; 173 private final Http2ClientImpl client2; 174 private final long id; 175 private final String dbgTag; 176 177 // The SSL DirectBuffer Supplier provides the ability to recycle 178 // buffers used between the socket reader and the SSLEngine, or 179 // more precisely between the SocketTube publisher and the 180 // SSLFlowDelegate reader. 181 private final SSLDirectBufferSupplier sslBufferSupplier 182 = new SSLDirectBufferSupplier(this); 183 184 // This reference is used to keep track of the facade HttpClient 185 // that was returned to the application code. 186 // It makes it possible to know when the application no longer 187 // holds any reference to the HttpClient. 188 // Unfortunately, this information is not enough to know when 189 // to exit the SelectorManager thread. Because of the asynchronous 190 // nature of the API, we also need to wait until all pending operations 191 // have completed. 192 private final WeakReference<HttpClientFacade> facadeRef; 193 194 // This counter keeps track of the number of operations pending 195 // on the HttpClient. The SelectorManager thread will wait 196 // until there are no longer any pending operations and the 197 // facadeRef is cleared before exiting. 198 // 199 // The pendingOperationCount is incremented every time a send/sendAsync 200 // operation is invoked on the HttpClient, and is decremented when 201 // the HttpResponse<T> object is returned to the user. 202 // However, at this point, the body may not have been fully read yet. 203 // This is the case when the response T is implemented as a streaming 204 // subscriber (such as an InputStream). 205 // 206 // To take care of this issue the pendingOperationCount will additionally 207 // be incremented/decremented in the following cases: 208 // 209 // 1. For HTTP/2 it is incremented when a stream is added to the 210 // Http2Connection streams map, and decreased when the stream is removed 211 // from the map. This should also take care of push promises. 212 // 2. For WebSocket the count is increased when creating a 213 // DetachedConnectionChannel for the socket, and decreased 214 // when the channel is closed. 215 // In addition, the HttpClient facade is passed to the WebSocket builder, 216 // (instead of the client implementation delegate). 217 // 3. For HTTP/1.1 the count is incremented before starting to parse the body 218 // response, and decremented when the parser has reached the end of the 219 // response body flow. 220 // 221 // This should ensure that the selector manager thread remains alive until 222 // the response has been fully received or the web socket is closed. 223 private final AtomicLong pendingOperationCount = new AtomicLong(); 224 private final AtomicLong pendingWebSocketCount = new AtomicLong(); 225 private final AtomicLong pendingHttpRequestCount = new AtomicLong(); 226 private final AtomicLong pendingHttp2StreamCount = new AtomicLong(); 227 228 /** A Set of, deadline first, ordered timeout events. */ 229 private final TreeSet<TimeoutEvent> timeouts; 230 231 /** 232 * This is a bit tricky: 233 * 1. an HttpClientFacade has a final HttpClientImpl field. 234 * 2. an HttpClientImpl has a final WeakReference<HttpClientFacade> field, 235 * where the referent is the facade created for that instance. 236 * 3. We cannot just create the HttpClientFacade in the HttpClientImpl 237 * constructor, because it would be only weakly referenced and could 238 * be GC'ed before we can return it. 239 * The solution is to use an instance of SingleFacadeFactory which will 240 * allow the caller of new HttpClientImpl(...) to retrieve the facade 241 * after the HttpClientImpl has been created. 242 */ 243 private static final class SingleFacadeFactory { 244 HttpClientFacade facade; 245 HttpClientFacade createFacade(HttpClientImpl impl) { 246 assert facade == null; 247 return (facade = new HttpClientFacade(impl)); 248 } 249 } 250 251 static HttpClientFacade create(HttpClientBuilderImpl builder) { 252 SingleFacadeFactory facadeFactory = new SingleFacadeFactory(); 253 HttpClientImpl impl = new HttpClientImpl(builder, facadeFactory); 254 impl.start(); 255 assert facadeFactory.facade != null; 256 assert impl.facadeRef.get() == facadeFactory.facade; 257 return facadeFactory.facade; 258 } 259 260 private HttpClientImpl(HttpClientBuilderImpl builder, 261 SingleFacadeFactory facadeFactory) { 262 id = CLIENT_IDS.incrementAndGet(); 263 dbgTag = "HttpClientImpl(" + id +")"; 264 if (builder.sslContext == null) { 265 try { 266 sslContext = SSLContext.getDefault(); 267 } catch (NoSuchAlgorithmException ex) { 268 throw new InternalError(ex); 269 } 270 } else { 271 sslContext = builder.sslContext; 272 } 273 Executor ex = builder.executor; 274 if (ex == null) { 275 ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id)); 276 isDefaultExecutor = true; 277 } else { 278 isDefaultExecutor = false; 279 } 280 delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex); 281 facadeRef = new WeakReference<>(facadeFactory.createFacade(this)); 282 client2 = new Http2ClientImpl(this); 283 cookieHandler = builder.cookieHandler; 284 connectTimeout = builder.connectTimeout; 285 followRedirects = builder.followRedirects == null ? 286 Redirect.NEVER : builder.followRedirects; 287 this.userProxySelector = Optional.ofNullable(builder.proxy); 288 this.proxySelector = userProxySelector 289 .orElseGet(HttpClientImpl::getDefaultProxySelector); 290 if (debug.on()) 291 debug.log("proxySelector is %s (user-supplied=%s)", 292 this.proxySelector, userProxySelector.isPresent()); 293 authenticator = builder.authenticator; 294 if (builder.version == null) { 295 version = HttpClient.Version.HTTP_2; 296 } else { 297 version = builder.version; 298 } 299 if (builder.sslParams == null) { 300 sslParams = getDefaultParams(sslContext); 301 } else { 302 sslParams = builder.sslParams; 303 } 304 connections = new ConnectionPool(id); 305 connections.start(); 306 timeouts = new TreeSet<>(); 307 try { 308 selmgr = new SelectorManager(this); 309 } catch (IOException e) { 310 // unlikely 311 throw new InternalError(e); 312 } 313 selmgr.setDaemon(true); 314 filters = new FilterFactory(); 315 initFilters(); 316 assert facadeRef.get() != null; 317 } 318 319 private void start() { 320 selmgr.start(); 321 } 322 323 // Called from the SelectorManager thread, just before exiting. 324 // Clears the HTTP/1.1 and HTTP/2 cache, ensuring that the connections 325 // that may be still lingering there are properly closed (and their 326 // possibly still opened SocketChannel released). 327 private void stop() { 328 // Clears HTTP/1.1 cache and close its connections 329 connections.stop(); 330 // Clears HTTP/2 cache and close its connections. 331 client2.stop(); 332 } 333 334 private static SSLParameters getDefaultParams(SSLContext ctx) { 335 SSLParameters params = ctx.getSupportedSSLParameters(); 336 String[] protocols = params.getProtocols(); 337 boolean found13 = false; 338 for (String proto : protocols) { 339 if (proto.equals("TLSv1.3")) { 340 found13 = true; 341 break; 342 } 343 } 344 if (found13) 345 params.setProtocols(new String[] {"TLSv1.3", "TLSv1.2"}); 346 else 347 params.setProtocols(new String[] {"TLSv1.2"}); 348 return params; 349 } 350 351 private static ProxySelector getDefaultProxySelector() { 352 PrivilegedAction<ProxySelector> action = ProxySelector::getDefault; 353 return AccessController.doPrivileged(action); 354 } 355 356 // Returns the facade that was returned to the application code. 357 // May be null if that facade is no longer referenced. 358 final HttpClientFacade facade() { 359 return facadeRef.get(); 360 } 361 362 // Increments the pendingOperationCount. 363 final long reference() { 364 pendingHttpRequestCount.incrementAndGet(); 365 return pendingOperationCount.incrementAndGet(); 366 } 367 368 // Decrements the pendingOperationCount. 369 final long unreference() { 370 final long count = pendingOperationCount.decrementAndGet(); 371 final long httpCount = pendingHttpRequestCount.decrementAndGet(); 372 final long http2Count = pendingHttp2StreamCount.get(); 373 final long webSocketCount = pendingWebSocketCount.get(); 374 if (count == 0 && facade() == null) { 375 selmgr.wakeupSelector(); 376 } 377 assert httpCount >= 0 : "count of HTTP/1.1 operations < 0"; 378 assert http2Count >= 0 : "count of HTTP/2 operations < 0"; 379 assert webSocketCount >= 0 : "count of WS operations < 0"; 380 assert count >= 0 : "count of pending operations < 0"; 381 return count; 382 } 383 384 // Increments the pendingOperationCount. 385 final long streamReference() { 386 pendingHttp2StreamCount.incrementAndGet(); 387 return pendingOperationCount.incrementAndGet(); 388 } 389 390 // Decrements the pendingOperationCount. 391 final long streamUnreference() { 392 final long count = pendingOperationCount.decrementAndGet(); 393 final long http2Count = pendingHttp2StreamCount.decrementAndGet(); 394 final long httpCount = pendingHttpRequestCount.get(); 395 final long webSocketCount = pendingWebSocketCount.get(); 396 if (count == 0 && facade() == null) { 397 selmgr.wakeupSelector(); 398 } 399 assert httpCount >= 0 : "count of HTTP/1.1 operations < 0"; 400 assert http2Count >= 0 : "count of HTTP/2 operations < 0"; 401 assert webSocketCount >= 0 : "count of WS operations < 0"; 402 assert count >= 0 : "count of pending operations < 0"; 403 return count; 404 } 405 406 // Increments the pendingOperationCount. 407 final long webSocketOpen() { 408 pendingWebSocketCount.incrementAndGet(); 409 return pendingOperationCount.incrementAndGet(); 410 } 411 412 // Decrements the pendingOperationCount. 413 final long webSocketClose() { 414 final long count = pendingOperationCount.decrementAndGet(); 415 final long webSocketCount = pendingWebSocketCount.decrementAndGet(); 416 final long httpCount = pendingHttpRequestCount.get(); 417 final long http2Count = pendingHttp2StreamCount.get(); 418 if (count == 0 && facade() == null) { 419 selmgr.wakeupSelector(); 420 } 421 assert httpCount >= 0 : "count of HTTP/1.1 operations < 0"; 422 assert http2Count >= 0 : "count of HTTP/2 operations < 0"; 423 assert webSocketCount >= 0 : "count of WS operations < 0"; 424 assert count >= 0 : "count of pending operations < 0"; 425 return count; 426 } 427 428 // Returns the pendingOperationCount. 429 final long referenceCount() { 430 return pendingOperationCount.get(); 431 } 432 433 final static class HttpClientTracker implements Tracker { 434 final AtomicLong httpCount; 435 final AtomicLong http2Count; 436 final AtomicLong websocketCount; 437 final AtomicLong operationsCount; 438 final Reference<?> reference; 439 final String name; 440 HttpClientTracker(AtomicLong http, 441 AtomicLong http2, 442 AtomicLong ws, 443 AtomicLong ops, 444 Reference<?> ref, 445 String name) { 446 this.httpCount = http; 447 this.http2Count = http2; 448 this.websocketCount = ws; 449 this.operationsCount = ops; 450 this.reference = ref; 451 this.name = name; 452 } 453 @Override 454 public long getOutstandingOperations() { 455 return operationsCount.get(); 456 } 457 @Override 458 public long getOutstandingHttpOperations() { 459 return httpCount.get(); 460 } 461 @Override 462 public long getOutstandingHttp2Streams() { return http2Count.get(); } 463 @Override 464 public long getOutstandingWebSocketOperations() { 465 return websocketCount.get(); 466 } 467 @Override 468 public boolean isFacadeReferenced() { 469 return reference.get() != null; 470 } 471 @Override 472 public String getName() { 473 return name; 474 } 475 } 476 477 public Tracker getOperationsTracker() { 478 return new HttpClientTracker(pendingHttpRequestCount, 479 pendingHttp2StreamCount, 480 pendingWebSocketCount, 481 pendingOperationCount, 482 facadeRef, 483 dbgTag); 484 } 485 486 // Called by the SelectorManager thread to figure out whether it's time 487 // to terminate. 488 final boolean isReferenced() { 489 HttpClient facade = facade(); 490 return facade != null || referenceCount() > 0; 491 } 492 493 /** 494 * Wait for activity on given exchange. 495 * The following occurs in the SelectorManager thread. 496 * 497 * 1) add to selector 498 * 2) If selector fires for this exchange then 499 * call AsyncEvent.handle() 500 * 501 * If exchange needs to change interest ops, then call registerEvent() again. 502 */ 503 void registerEvent(AsyncEvent exchange) throws IOException { 504 selmgr.register(exchange); 505 } 506 507 /** 508 * Allows an AsyncEvent to modify its interestOps. 509 * @param event The modified event. 510 */ 511 void eventUpdated(AsyncEvent event) throws ClosedChannelException { 512 assert !(event instanceof AsyncTriggerEvent); 513 selmgr.eventUpdated(event); 514 } 515 516 boolean isSelectorThread() { 517 return Thread.currentThread() == selmgr; 518 } 519 520 Http2ClientImpl client2() { 521 return client2; 522 } 523 524 private void debugCompleted(String tag, long startNanos, HttpRequest req) { 525 if (debugelapsed.on()) { 526 debugelapsed.log(tag + " elapsed " 527 + (System.nanoTime() - startNanos)/1000_000L 528 + " millis for " + req.method() 529 + " to " + req.uri()); 530 } 531 } 532 533 @Override 534 public <T> HttpResponse<T> 535 send(HttpRequest req, BodyHandler<T> responseHandler) 536 throws IOException, InterruptedException 537 { 538 CompletableFuture<HttpResponse<T>> cf = null; 539 try { 540 cf = sendAsync(req, responseHandler, null, null); 541 return cf.get(); 542 } catch (InterruptedException ie) { 543 if (cf != null ) 544 cf.cancel(true); 545 throw ie; 546 } catch (ExecutionException e) { 547 final Throwable throwable = e.getCause(); 548 final String msg = throwable.getMessage(); 549 550 if (throwable instanceof IllegalArgumentException) { 551 throw new IllegalArgumentException(msg, throwable); 552 } else if (throwable instanceof SecurityException) { 553 throw new SecurityException(msg, throwable); 554 } else if (throwable instanceof HttpConnectTimeoutException) { 555 HttpConnectTimeoutException hcte = new HttpConnectTimeoutException(msg); 556 hcte.initCause(throwable); 557 throw hcte; 558 } else if (throwable instanceof HttpTimeoutException) { 559 throw new HttpTimeoutException(msg); 560 } else if (throwable instanceof ConnectException) { 561 ConnectException ce = new ConnectException(msg); 562 ce.initCause(throwable); 563 throw ce; 564 } else if (throwable instanceof IOException) { 565 throw new IOException(msg, throwable); 566 } else { 567 throw new IOException(msg, throwable); 568 } 569 } 570 } 571 572 private static final Executor ASYNC_POOL = new CompletableFuture<Void>().defaultExecutor(); 573 574 @Override 575 public <T> CompletableFuture<HttpResponse<T>> 576 sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler) 577 { 578 return sendAsync(userRequest, responseHandler, null); 579 } 580 581 @Override 582 public <T> CompletableFuture<HttpResponse<T>> 583 sendAsync(HttpRequest userRequest, 584 BodyHandler<T> responseHandler, 585 PushPromiseHandler<T> pushPromiseHandler) { 586 return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate); 587 } 588 589 private <T> CompletableFuture<HttpResponse<T>> 590 sendAsync(HttpRequest userRequest, 591 BodyHandler<T> responseHandler, 592 PushPromiseHandler<T> pushPromiseHandler, 593 Executor exchangeExecutor) { 594 595 Objects.requireNonNull(userRequest); 596 Objects.requireNonNull(responseHandler); 597 598 AccessControlContext acc = null; 599 if (System.getSecurityManager() != null) 600 acc = AccessController.getContext(); 601 602 // Clone the, possibly untrusted, HttpRequest 603 HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector); 604 if (requestImpl.method().equals("CONNECT")) 605 throw new IllegalArgumentException("Unsupported method CONNECT"); 606 607 long start = DEBUGELAPSED ? System.nanoTime() : 0; 608 reference(); 609 try { 610 if (debugelapsed.on()) 611 debugelapsed.log("ClientImpl (async) send %s", userRequest); 612 613 // When using sendAsync(...) we explicitly pass the 614 // executor's delegate as exchange executor to force 615 // asynchronous scheduling of the exchange. 616 // When using send(...) we don't specify any executor 617 // and default to using the client's delegating executor 618 // which only spawns asynchronous tasks if it detects 619 // that the current thread is the selector manager 620 // thread. This will cause everything to execute inline 621 // until we need to schedule some event with the selector. 622 Executor executor = exchangeExecutor == null 623 ? this.delegatingExecutor : exchangeExecutor; 624 625 MultiExchange<T> mex = new MultiExchange<>(userRequest, 626 requestImpl, 627 this, 628 responseHandler, 629 pushPromiseHandler, 630 acc); 631 CompletableFuture<HttpResponse<T>> res = 632 mex.responseAsync(executor).whenComplete((b,t) -> unreference()); 633 if (DEBUGELAPSED) { 634 res = res.whenComplete( 635 (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest)); 636 } 637 638 // makes sure that any dependent actions happen in the CF default 639 // executor. This is only needed for sendAsync(...), when 640 // exchangeExecutor is non-null. 641 if (exchangeExecutor != null) { 642 res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL); 643 } 644 return res; 645 } catch(Throwable t) { 646 unreference(); 647 debugCompleted("ClientImpl (async)", start, userRequest); 648 throw t; 649 } 650 } 651 652 // Main loop for this client's selector 653 private final static class SelectorManager extends Thread { 654 655 // For testing purposes we have an internal System property that 656 // can control the frequency at which the selector manager will wake 657 // up when there are no pending operations. 658 // Increasing the frequency (shorter delays) might allow the selector 659 // to observe that the facade is no longer referenced and might allow 660 // the selector thread to terminate more timely - for when nothing is 661 // ongoing it will only check for that condition every NODEADLINE ms. 662 // To avoid misuse of the property, the delay that can be specified 663 // is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default 664 // value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms 665 // The property is -Djdk.internal.httpclient.selectorTimeout=<millis> 666 private static final int MIN_NODEADLINE = 1000; // ms 667 private static final int MAX_NODEADLINE = 1000 * 1200; // ms 668 private static final int DEF_NODEADLINE = 3000; // ms 669 private static final long NODEADLINE; // default is DEF_NODEADLINE ms 670 static { 671 // ensure NODEADLINE is initialized with some valid value. 672 long deadline = Utils.getIntegerProperty( 673 "jdk.internal.httpclient.selectorTimeout", 674 DEF_NODEADLINE); // millis 675 if (deadline <= 0) deadline = DEF_NODEADLINE; 676 deadline = Math.max(deadline, MIN_NODEADLINE); 677 NODEADLINE = Math.min(deadline, MAX_NODEADLINE); 678 } 679 680 private final Selector selector; 681 private volatile boolean closed; 682 private final List<AsyncEvent> registrations; 683 private final List<AsyncTriggerEvent> deregistrations; 684 private final Logger debug; 685 private final Logger debugtimeout; 686 HttpClientImpl owner; 687 ConnectionPool pool; 688 689 SelectorManager(HttpClientImpl ref) throws IOException { 690 super(null, null, 691 "HttpClient-" + ref.id + "-SelectorManager", 692 0, false); 693 owner = ref; 694 debug = ref.debug; 695 debugtimeout = ref.debugtimeout; 696 pool = ref.connectionPool(); 697 registrations = new ArrayList<>(); 698 deregistrations = new ArrayList<>(); 699 selector = Selector.open(); 700 } 701 702 void eventUpdated(AsyncEvent e) throws ClosedChannelException { 703 if (Thread.currentThread() == this) { 704 SelectionKey key = e.channel().keyFor(selector); 705 if (key != null && key.isValid()) { 706 SelectorAttachment sa = (SelectorAttachment) key.attachment(); 707 sa.register(e); 708 } else if (e.interestOps() != 0){ 709 // We don't care about paused events. 710 // These are actually handled by 711 // SelectorAttachment::resetInterestOps later on. 712 // But if we reach here when trying to resume an 713 // event then it's better to fail fast. 714 if (debug.on()) debug.log("No key for channel"); 715 e.abort(new IOException("No key for channel")); 716 } 717 } else { 718 register(e); 719 } 720 } 721 722 // This returns immediately. So caller not allowed to send/receive 723 // on connection. 724 synchronized void register(AsyncEvent e) { 725 registrations.add(e); 726 selector.wakeup(); 727 } 728 729 synchronized void cancel(SocketChannel e) { 730 SelectionKey key = e.keyFor(selector); 731 if (key != null) { 732 key.cancel(); 733 } 734 selector.wakeup(); 735 } 736 737 void wakeupSelector() { 738 selector.wakeup(); 739 } 740 741 synchronized void shutdown() { 742 Log.logTrace("{0}: shutting down", getName()); 743 if (debug.on()) debug.log("SelectorManager shutting down"); 744 closed = true; 745 try { 746 selector.close(); 747 } catch (IOException ignored) { 748 } finally { 749 owner.stop(); 750 } 751 } 752 753 @Override 754 public void run() { 755 List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>(); 756 List<AsyncEvent> readyList = new ArrayList<>(); 757 List<Runnable> resetList = new ArrayList<>(); 758 try { 759 if (Log.channel()) Log.logChannel(getName() + ": starting"); 760 while (!Thread.currentThread().isInterrupted()) { 761 synchronized (this) { 762 assert errorList.isEmpty(); 763 assert readyList.isEmpty(); 764 assert resetList.isEmpty(); 765 for (AsyncTriggerEvent event : deregistrations) { 766 event.handle(); 767 } 768 deregistrations.clear(); 769 for (AsyncEvent event : registrations) { 770 if (event instanceof AsyncTriggerEvent) { 771 readyList.add(event); 772 continue; 773 } 774 SelectableChannel chan = event.channel(); 775 SelectionKey key = null; 776 try { 777 key = chan.keyFor(selector); 778 SelectorAttachment sa; 779 if (key == null || !key.isValid()) { 780 if (key != null) { 781 // key is canceled. 782 // invoke selectNow() to purge it 783 // before registering the new event. 784 selector.selectNow(); 785 } 786 sa = new SelectorAttachment(chan, selector); 787 } else { 788 sa = (SelectorAttachment) key.attachment(); 789 } 790 // may throw IOE if channel closed: that's OK 791 sa.register(event); 792 if (!chan.isOpen()) { 793 throw new IOException("Channel closed"); 794 } 795 } catch (IOException e) { 796 Log.logTrace("{0}: {1}", getName(), e); 797 if (debug.on()) 798 debug.log("Got " + e.getClass().getName() 799 + " while handling registration events"); 800 chan.close(); 801 // let the event abort deal with it 802 errorList.add(new Pair<>(event, e)); 803 if (key != null) { 804 key.cancel(); 805 selector.selectNow(); 806 } 807 } 808 } 809 registrations.clear(); 810 selector.selectedKeys().clear(); 811 } 812 813 for (AsyncEvent event : readyList) { 814 assert event instanceof AsyncTriggerEvent; 815 event.handle(); 816 } 817 readyList.clear(); 818 819 for (Pair<AsyncEvent,IOException> error : errorList) { 820 // an IOException was raised and the channel closed. 821 handleEvent(error.first, error.second); 822 } 823 errorList.clear(); 824 825 // Check whether client is still alive, and if not, 826 // gracefully stop this thread 827 if (!owner.isReferenced()) { 828 Log.logTrace("{0}: {1}", 829 getName(), 830 "HttpClient no longer referenced. Exiting..."); 831 return; 832 } 833 834 // Timeouts will have milliseconds granularity. It is important 835 // to handle them in a timely fashion. 836 long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline(); 837 if (debugtimeout.on()) 838 debugtimeout.log("next timeout: %d", nextTimeout); 839 840 // Keep-alive have seconds granularity. It's not really an 841 // issue if we keep connections linger a bit more in the keep 842 // alive cache. 843 long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline(); 844 if (debugtimeout.on()) 845 debugtimeout.log("next expired: %d", nextExpiry); 846 847 assert nextTimeout >= 0; 848 assert nextExpiry >= 0; 849 850 // Don't wait for ever as it might prevent the thread to 851 // stop gracefully. millis will be 0 if no deadline was found. 852 if (nextTimeout <= 0) nextTimeout = NODEADLINE; 853 854 // Clip nextExpiry at NODEADLINE limit. The default 855 // keep alive is 1200 seconds (half an hour) - we don't 856 // want to wait that long. 857 if (nextExpiry <= 0) nextExpiry = NODEADLINE; 858 else nextExpiry = Math.min(NODEADLINE, nextExpiry); 859 860 // takes the least of the two. 861 long millis = Math.min(nextExpiry, nextTimeout); 862 863 if (debugtimeout.on()) 864 debugtimeout.log("Next deadline is %d", 865 (millis == 0 ? NODEADLINE : millis)); 866 //debugPrint(selector); 867 int n = selector.select(millis == 0 ? NODEADLINE : millis); 868 if (n == 0) { 869 // Check whether client is still alive, and if not, 870 // gracefully stop this thread 871 if (!owner.isReferenced()) { 872 Log.logTrace("{0}: {1}", 873 getName(), 874 "HttpClient no longer referenced. Exiting..."); 875 return; 876 } 877 owner.purgeTimeoutsAndReturnNextDeadline(); 878 continue; 879 } 880 881 Set<SelectionKey> keys = selector.selectedKeys(); 882 assert errorList.isEmpty(); 883 884 for (SelectionKey key : keys) { 885 SelectorAttachment sa = (SelectorAttachment) key.attachment(); 886 if (!key.isValid()) { 887 IOException ex = sa.chan.isOpen() 888 ? new IOException("Invalid key") 889 : new ClosedChannelException(); 890 sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex))); 891 sa.pending.clear(); 892 continue; 893 } 894 895 int eventsOccurred; 896 try { 897 eventsOccurred = key.readyOps(); 898 } catch (CancelledKeyException ex) { 899 IOException io = Utils.getIOException(ex); 900 sa.pending.forEach(e -> errorList.add(new Pair<>(e,io))); 901 sa.pending.clear(); 902 continue; 903 } 904 sa.events(eventsOccurred).forEach(readyList::add); 905 resetList.add(() -> sa.resetInterestOps(eventsOccurred)); 906 } 907 908 selector.selectNow(); // complete cancellation 909 selector.selectedKeys().clear(); 910 911 // handle selected events 912 readyList.forEach((e) -> handleEvent(e, null)); 913 readyList.clear(); 914 915 // handle errors (closed channels etc...) 916 errorList.forEach((p) -> handleEvent(p.first, p.second)); 917 errorList.clear(); 918 919 // reset interest ops for selected channels 920 resetList.forEach(r -> r.run()); 921 resetList.clear(); 922 923 } 924 } catch (Throwable e) { 925 if (!closed) { 926 // This terminates thread. So, better just print stack trace 927 String err = Utils.stackTrace(e); 928 Log.logError("{0}: {1}: {2}", getName(), 929 "HttpClientImpl shutting down due to fatal error", err); 930 } 931 if (debug.on()) debug.log("shutting down", e); 932 if (Utils.ASSERTIONSENABLED && !debug.on()) { 933 e.printStackTrace(System.err); // always print the stack 934 } 935 } finally { 936 if (Log.channel()) Log.logChannel(getName() + ": stopping"); 937 shutdown(); 938 } 939 } 940 941 // void debugPrint(Selector selector) { 942 // System.err.println("Selector: debugprint start"); 943 // Set<SelectionKey> keys = selector.keys(); 944 // for (SelectionKey key : keys) { 945 // SelectableChannel c = key.channel(); 946 // int ops = key.interestOps(); 947 // System.err.printf("selector chan:%s ops:%d\n", c, ops); 948 // } 949 // System.err.println("Selector: debugprint end"); 950 // } 951 952 /** Handles the given event. The given ioe may be null. */ 953 void handleEvent(AsyncEvent event, IOException ioe) { 954 if (closed || ioe != null) { 955 event.abort(ioe); 956 } else { 957 event.handle(); 958 } 959 } 960 } 961 962 final String debugInterestOps(SelectableChannel channel) { 963 try { 964 SelectionKey key = channel.keyFor(selmgr.selector); 965 if (key == null) return "channel not registered with selector"; 966 String keyInterestOps = key.isValid() 967 ? "key.interestOps=" + key.interestOps() : "invalid key"; 968 return String.format("channel registered with selector, %s, sa.interestOps=%s", 969 keyInterestOps, 970 ((SelectorAttachment)key.attachment()).interestOps); 971 } catch (Throwable t) { 972 return String.valueOf(t); 973 } 974 } 975 976 /** 977 * Tracks multiple user level registrations associated with one NIO 978 * registration (SelectionKey). In this implementation, registrations 979 * are one-off and when an event is posted the registration is cancelled 980 * until explicitly registered again. 981 * 982 * <p> No external synchronization required as this class is only used 983 * by the SelectorManager thread. One of these objects required per 984 * connection. 985 */ 986 private static class SelectorAttachment { 987 private final SelectableChannel chan; 988 private final Selector selector; 989 private final Set<AsyncEvent> pending; 990 private final static Logger debug = 991 Utils.getDebugLogger("SelectorAttachment"::toString, Utils.DEBUG); 992 private int interestOps; 993 994 SelectorAttachment(SelectableChannel chan, Selector selector) { 995 this.pending = new HashSet<>(); 996 this.chan = chan; 997 this.selector = selector; 998 } 999 1000 void register(AsyncEvent e) throws ClosedChannelException { 1001 int newOps = e.interestOps(); 1002 // re register interest if we are not already interested 1003 // in the event. If the event is paused, then the pause will 1004 // be taken into account later when resetInterestOps is called. 1005 boolean reRegister = (interestOps & newOps) != newOps; 1006 interestOps |= newOps; 1007 pending.add(e); 1008 if (debug.on()) 1009 debug.log("Registering %s for %d (%s)", e, newOps, reRegister); 1010 if (reRegister) { 1011 // first time registration happens here also 1012 try { 1013 chan.register(selector, interestOps, this); 1014 } catch (Throwable x) { 1015 abortPending(x); 1016 } 1017 } else if (!chan.isOpen()) { 1018 abortPending(new ClosedChannelException()); 1019 } 1020 } 1021 1022 /** 1023 * Returns a Stream<AsyncEvents> containing only events that are 1024 * registered with the given {@code interestOps}. 1025 */ 1026 Stream<AsyncEvent> events(int interestOps) { 1027 return pending.stream() 1028 .filter(ev -> (ev.interestOps() & interestOps) != 0); 1029 } 1030 1031 /** 1032 * Removes any events with the given {@code interestOps}, and if no 1033 * events remaining, cancels the associated SelectionKey. 1034 */ 1035 void resetInterestOps(int interestOps) { 1036 int newOps = 0; 1037 1038 Iterator<AsyncEvent> itr = pending.iterator(); 1039 while (itr.hasNext()) { 1040 AsyncEvent event = itr.next(); 1041 int evops = event.interestOps(); 1042 if (event.repeating()) { 1043 newOps |= evops; 1044 continue; 1045 } 1046 if ((evops & interestOps) != 0) { 1047 itr.remove(); 1048 } else { 1049 newOps |= evops; 1050 } 1051 } 1052 1053 this.interestOps = newOps; 1054 SelectionKey key = chan.keyFor(selector); 1055 if (newOps == 0 && key != null && pending.isEmpty()) { 1056 key.cancel(); 1057 } else { 1058 try { 1059 if (key == null || !key.isValid()) { 1060 throw new CancelledKeyException(); 1061 } 1062 key.interestOps(newOps); 1063 // double check after 1064 if (!chan.isOpen()) { 1065 abortPending(new ClosedChannelException()); 1066 return; 1067 } 1068 assert key.interestOps() == newOps; 1069 } catch (CancelledKeyException x) { 1070 // channel may have been closed 1071 if (debug.on()) debug.log("key cancelled for " + chan); 1072 abortPending(x); 1073 } 1074 } 1075 } 1076 1077 void abortPending(Throwable x) { 1078 if (!pending.isEmpty()) { 1079 AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]); 1080 pending.clear(); 1081 IOException io = Utils.getIOException(x); 1082 for (AsyncEvent event : evts) { 1083 event.abort(io); 1084 } 1085 } 1086 } 1087 } 1088 1089 /*package-private*/ SSLContext theSSLContext() { 1090 return sslContext; 1091 } 1092 1093 @Override 1094 public SSLContext sslContext() { 1095 return sslContext; 1096 } 1097 1098 @Override 1099 public SSLParameters sslParameters() { 1100 return Utils.copySSLParameters(sslParams); 1101 } 1102 1103 @Override 1104 public Optional<Authenticator> authenticator() { 1105 return Optional.ofNullable(authenticator); 1106 } 1107 1108 /*package-private*/ final DelegatingExecutor theExecutor() { 1109 return delegatingExecutor; 1110 } 1111 1112 @Override 1113 public final Optional<Executor> executor() { 1114 return isDefaultExecutor 1115 ? Optional.empty() 1116 : Optional.of(delegatingExecutor.delegate()); 1117 } 1118 1119 ConnectionPool connectionPool() { 1120 return connections; 1121 } 1122 1123 @Override 1124 public Redirect followRedirects() { 1125 return followRedirects; 1126 } 1127 1128 1129 @Override 1130 public Optional<CookieHandler> cookieHandler() { 1131 return Optional.ofNullable(cookieHandler); 1132 } 1133 1134 @Override 1135 public Optional<Duration> connectTimeout() { 1136 return Optional.ofNullable(connectTimeout); 1137 } 1138 1139 @Override 1140 public Optional<ProxySelector> proxy() { 1141 return this.userProxySelector; 1142 } 1143 1144 // Return the effective proxy that this client uses. 1145 ProxySelector proxySelector() { 1146 return proxySelector; 1147 } 1148 1149 @Override 1150 public WebSocket.Builder newWebSocketBuilder() { 1151 // Make sure to pass the HttpClientFacade to the WebSocket builder. 1152 // This will ensure that the facade is not released before the 1153 // WebSocket has been created, at which point the pendingOperationCount 1154 // will have been incremented by the RawChannelTube. 1155 // See RawChannelTube. 1156 return new BuilderImpl(this.facade(), proxySelector); 1157 } 1158 1159 @Override 1160 public Version version() { 1161 return version; 1162 } 1163 1164 String dbgString() { 1165 return dbgTag; 1166 } 1167 1168 @Override 1169 public String toString() { 1170 // Used by tests to get the client's id and compute the 1171 // name of the SelectorManager thread. 1172 return super.toString() + ("(" + id + ")"); 1173 } 1174 1175 private void initFilters() { 1176 addFilter(AuthenticationFilter.class); 1177 addFilter(RedirectFilter.class); 1178 if (this.cookieHandler != null) { 1179 addFilter(CookieFilter.class); 1180 } 1181 } 1182 1183 private void addFilter(Class<? extends HeaderFilter> f) { 1184 filters.addFilter(f); 1185 } 1186 1187 final LinkedList<HeaderFilter> filterChain() { 1188 return filters.getFilterChain(); 1189 } 1190 1191 // Timer controls. 1192 // Timers are implemented through timed Selector.select() calls. 1193 1194 synchronized void registerTimer(TimeoutEvent event) { 1195 Log.logTrace("Registering timer {0}", event); 1196 timeouts.add(event); 1197 selmgr.wakeupSelector(); 1198 } 1199 1200 synchronized void cancelTimer(TimeoutEvent event) { 1201 Log.logTrace("Canceling timer {0}", event); 1202 timeouts.remove(event); 1203 } 1204 1205 /** 1206 * Purges ( handles ) timer events that have passed their deadline, and 1207 * returns the amount of time, in milliseconds, until the next earliest 1208 * event. A return value of 0 means that there are no events. 1209 */ 1210 private long purgeTimeoutsAndReturnNextDeadline() { 1211 long diff = 0L; 1212 List<TimeoutEvent> toHandle = null; 1213 int remaining = 0; 1214 // enter critical section to retrieve the timeout event to handle 1215 synchronized(this) { 1216 if (timeouts.isEmpty()) return 0L; 1217 1218 Instant now = Instant.now(); 1219 Iterator<TimeoutEvent> itr = timeouts.iterator(); 1220 while (itr.hasNext()) { 1221 TimeoutEvent event = itr.next(); 1222 diff = now.until(event.deadline(), ChronoUnit.MILLIS); 1223 if (diff <= 0) { 1224 itr.remove(); 1225 toHandle = (toHandle == null) ? new ArrayList<>() : toHandle; 1226 toHandle.add(event); 1227 } else { 1228 break; 1229 } 1230 } 1231 remaining = timeouts.size(); 1232 } 1233 1234 // can be useful for debugging 1235 if (toHandle != null && Log.trace()) { 1236 Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling " 1237 + toHandle.size() + " events, " 1238 + "remaining " + remaining 1239 + ", next deadline: " + (diff < 0 ? 0L : diff)); 1240 } 1241 1242 // handle timeout events out of critical section 1243 if (toHandle != null) { 1244 Throwable failed = null; 1245 for (TimeoutEvent event : toHandle) { 1246 try { 1247 Log.logTrace("Firing timer {0}", event); 1248 event.handle(); 1249 } catch (Error | RuntimeException e) { 1250 // Not expected. Handle remaining events then throw... 1251 // If e is an OOME or SOE it might simply trigger a new 1252 // error from here - but in this case there's not much we 1253 // could do anyway. Just let it flow... 1254 if (failed == null) failed = e; 1255 else failed.addSuppressed(e); 1256 Log.logTrace("Failed to handle event {0}: {1}", event, e); 1257 } 1258 } 1259 if (failed instanceof Error) throw (Error) failed; 1260 if (failed instanceof RuntimeException) throw (RuntimeException) failed; 1261 } 1262 1263 // return time to wait until next event. 0L if there's no more events. 1264 return diff < 0 ? 0L : diff; 1265 } 1266 1267 // used for the connection window 1268 int getReceiveBufferSize() { 1269 return Utils.getIntegerNetProperty( 1270 "jdk.httpclient.receiveBufferSize", 1271 0 // only set the size if > 0 1272 ); 1273 } 1274 1275 // Optimization for reading SSL encrypted data 1276 // -------------------------------------------- 1277 1278 // Returns a BufferSupplier that can be used for reading 1279 // encrypted bytes of the channel. These buffers can then 1280 // be recycled by the SSLFlowDelegate::Reader after their 1281 // content has been copied in the SSLFlowDelegate::Reader 1282 // readBuf. 1283 // Because allocating, reading, copying, and recycling 1284 // all happen in the SelectorManager thread, 1285 // then this BufferSupplier can be shared between all 1286 // the SSL connections managed by this client. 1287 BufferSupplier getSSLBufferSupplier() { 1288 return sslBufferSupplier; 1289 } 1290 1291 // An implementation of BufferSupplier that manages a pool of 1292 // maximum 3 direct byte buffers (SocketTube.MAX_BUFFERS) that 1293 // are used for reading encrypted bytes off the channel before 1294 // copying and subsequent unwrapping. 1295 private static final class SSLDirectBufferSupplier implements BufferSupplier { 1296 private static final int POOL_SIZE = SocketTube.MAX_BUFFERS; 1297 private final ByteBuffer[] pool = new ByteBuffer[POOL_SIZE]; 1298 private final HttpClientImpl client; 1299 private final Logger debug; 1300 private int tail, count; // no need for volatile: only accessed in SM thread. 1301 1302 SSLDirectBufferSupplier(HttpClientImpl client) { 1303 this.client = Objects.requireNonNull(client); 1304 this.debug = client.debug; 1305 } 1306 1307 // Gets a buffer from the pool, or allocates a new one if needed. 1308 @Override 1309 public ByteBuffer get() { 1310 assert client.isSelectorThread(); 1311 assert tail <= POOL_SIZE : "allocate tail is " + tail; 1312 ByteBuffer buf; 1313 if (tail == 0) { 1314 if (debug.on()) { 1315 // should not appear more than SocketTube.MAX_BUFFERS 1316 debug.log("ByteBuffer.allocateDirect(%d)", Utils.BUFSIZE); 1317 } 1318 assert count++ < POOL_SIZE : "trying to allocate more than " 1319 + POOL_SIZE + " buffers"; 1320 buf = ByteBuffer.allocateDirect(Utils.BUFSIZE); 1321 } else { 1322 assert tail > 0 : "non positive tail value: " + tail; 1323 tail--; 1324 buf = pool[tail]; 1325 pool[tail] = null; 1326 } 1327 assert buf.isDirect(); 1328 assert buf.position() == 0; 1329 assert buf.hasRemaining(); 1330 assert buf.limit() == Utils.BUFSIZE; 1331 assert tail < POOL_SIZE; 1332 assert tail >= 0; 1333 return buf; 1334 } 1335 1336 // Returns the given buffer to the pool. 1337 @Override 1338 public void recycle(ByteBuffer buffer) { 1339 assert client.isSelectorThread(); 1340 assert buffer.isDirect(); 1341 assert !buffer.hasRemaining(); 1342 assert tail < POOL_SIZE : "recycle tail is " + tail; 1343 assert tail >= 0; 1344 buffer.position(0); 1345 buffer.limit(buffer.capacity()); 1346 // don't fail if assertions are off. we have asserted above. 1347 if (tail < POOL_SIZE) { 1348 pool[tail] = buffer; 1349 tail++; 1350 } 1351 assert tail <= POOL_SIZE; 1352 assert tail > 0; 1353 } 1354 } 1355 1356 }