1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import javax.net.ssl.SSLContext; 29 import javax.net.ssl.SSLParameters; 30 import java.io.IOException; 31 import java.lang.ref.WeakReference; 32 import java.net.Authenticator; 33 import java.net.CookieManager; 34 import java.net.ProxySelector; 35 import java.net.URI; 36 import java.nio.channels.ClosedChannelException; 37 import java.nio.channels.SelectableChannel; 38 import java.nio.channels.SelectionKey; 39 import java.nio.channels.Selector; 40 import java.nio.channels.SocketChannel; 41 import java.security.NoSuchAlgorithmException; 42 import java.time.Instant; 43 import java.time.temporal.ChronoUnit; 44 import java.util.ArrayList; 45 import java.util.Iterator; 46 import java.util.List; 47 import java.util.Optional; 48 import java.util.Set; 49 import java.util.TreeSet; 50 import java.util.concurrent.CompletableFuture; 51 import java.util.concurrent.Executor; 52 import java.util.concurrent.Executors; 53 import java.util.concurrent.ThreadFactory; 54 import java.util.stream.Stream; 55 import jdk.incubator.http.internal.common.Log; 56 import jdk.incubator.http.internal.common.Utils; 57 import jdk.incubator.http.internal.websocket.BuilderImpl; 58 59 /** 60 * Client implementation. Contains all configuration information and also 61 * the selector manager thread which allows async events to be registered 62 * and delivered when they occur. See AsyncEvent. 63 */ 64 class HttpClientImpl extends HttpClient { 65 66 // Define the default factory as a static inner class 67 // that embeds all the necessary logic to avoid 68 // the risk of using a lambda that might keep a reference on the 69 // HttpClient instance from which it was created (helps with 70 // heapdump analysis). 71 private static final class DefaultThreadFactory implements ThreadFactory { 72 private DefaultThreadFactory() {} 73 @Override 74 public Thread newThread(Runnable r) { 75 Thread t = new Thread(null, r, "HttpClient_worker", 0, true); 76 t.setDaemon(true); 77 return t; 78 } 79 static final ThreadFactory INSTANCE = new DefaultThreadFactory(); 80 } 81 82 private final CookieManager cookieManager; 83 private final Redirect followRedirects; 84 private final ProxySelector proxySelector; 85 private final Authenticator authenticator; 86 private final Version version; 87 private final ConnectionPool connections; 88 private final Executor executor; 89 // Security parameters 90 private final SSLContext sslContext; 91 private final SSLParameters sslParams; 92 private final SelectorManager selmgr; 93 private final FilterFactory filters; 94 private final Http2ClientImpl client2; 95 96 /** A Set of, deadline first, ordered timeout events. */ 97 private final TreeSet<TimeoutEvent> timeouts; 98 99 public static HttpClientImpl create(HttpClientBuilderImpl builder) { 100 HttpClientImpl impl = new HttpClientImpl(builder); 101 impl.start(); 102 return impl; 103 } 104 105 private HttpClientImpl(HttpClientBuilderImpl builder) { 106 if (builder.sslContext == null) { 107 try { 108 sslContext = SSLContext.getDefault(); 109 } catch (NoSuchAlgorithmException ex) { 110 throw new InternalError(ex); 111 } 112 } else { 113 sslContext = builder.sslContext; 114 } 115 Executor ex = builder.executor; 116 if (ex == null) { 117 ex = Executors.newCachedThreadPool(DefaultThreadFactory.INSTANCE); 118 } else { 119 ex = builder.executor; 120 } 121 client2 = new Http2ClientImpl(this); 122 executor = ex; 123 cookieManager = builder.cookieManager; 124 followRedirects = builder.followRedirects == null ? 125 Redirect.NEVER : builder.followRedirects; 126 this.proxySelector = builder.proxy; 127 authenticator = builder.authenticator; 128 if (builder.version == null) { 129 version = HttpClient.Version.HTTP_2; 130 } else { 131 version = builder.version; 132 } 133 if (builder.sslParams == null) { 134 sslParams = getDefaultParams(sslContext); 135 } else { 136 sslParams = builder.sslParams; 137 } 138 connections = new ConnectionPool(); 139 connections.start(); 140 timeouts = new TreeSet<>(); 141 try { 142 selmgr = new SelectorManager(this); 143 } catch (IOException e) { 144 // unlikely 145 throw new InternalError(e); 146 } 147 selmgr.setDaemon(true); 148 filters = new FilterFactory(); 149 initFilters(); 150 } 151 152 private void start() { 153 selmgr.start(); 154 } 155 156 private static SSLParameters getDefaultParams(SSLContext ctx) { 157 SSLParameters params = ctx.getSupportedSSLParameters(); 158 params.setProtocols(new String[]{"TLSv1.2"}); 159 return params; 160 } 161 162 /** 163 * Wait for activity on given exchange (assuming blocking = false). 164 * It's a no-op if blocking = true. In particular, the following occurs 165 * in the SelectorManager thread. 166 * 167 * 1) mark the connection non-blocking 168 * 2) add to selector 169 * 3) If selector fires for this exchange then 170 * 4) - mark connection as blocking 171 * 5) - call AsyncEvent.handle() 172 * 173 * If exchange needs to block again, then call registerEvent() again 174 */ 175 void registerEvent(AsyncEvent exchange) throws IOException { 176 selmgr.register(exchange); 177 } 178 179 /** 180 * Only used from RawChannel to disconnect the channel from 181 * the selector 182 */ 183 void cancelRegistration(SocketChannel s) { 184 selmgr.cancel(s); 185 } 186 187 188 Http2ClientImpl client2() { 189 return client2; 190 } 191 192 /* 193 @Override 194 public ByteBuffer getBuffer() { 195 return pool.getBuffer(); 196 } 197 198 // SSL buffers are larger. Manage separately 199 200 int size = 16 * 1024; 201 202 ByteBuffer getSSLBuffer() { 203 return ByteBuffer.allocate(size); 204 } 205 206 /** 207 * Return a new buffer that's a bit bigger than the given one 208 * 209 * @param buf 210 * @return 211 * 212 ByteBuffer reallocSSLBuffer(ByteBuffer buf) { 213 size = buf.capacity() * 12 / 10; // 20% bigger 214 return ByteBuffer.allocate(size); 215 } 216 217 synchronized void returnSSLBuffer(ByteBuffer buf) { 218 if (buf.capacity() >= size) 219 sslBuffers.add(0, buf); 220 } 221 222 @Override 223 public void returnBuffer(ByteBuffer buffer) { 224 pool.returnBuffer(buffer); 225 } 226 */ 227 228 @Override 229 public <T> HttpResponse<T> 230 send(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler) 231 throws IOException, InterruptedException 232 { 233 MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler); 234 return mex.response(); 235 } 236 237 @Override 238 public <T> CompletableFuture<HttpResponse<T>> 239 sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler) 240 { 241 MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler); 242 return mex.responseAsync() 243 .thenApply((HttpResponseImpl<T> b) -> (HttpResponse<T>) b); 244 } 245 246 @Override 247 public <U, T> CompletableFuture<U> 248 sendAsync(HttpRequest req, HttpResponse.MultiProcessor<U, T> responseHandler) { 249 MultiExchange<U,T> mex = new MultiExchange<>(req, this, responseHandler); 250 return mex.multiResponseAsync(); 251 } 252 253 // new impl. Should get rid of above 254 /* 255 static class BufferPool implements BufferHandler { 256 257 final LinkedList<ByteBuffer> freelist = new LinkedList<>(); 258 259 @Override 260 public synchronized ByteBuffer getBuffer() { 261 ByteBuffer buf; 262 263 while (!freelist.isEmpty()) { 264 buf = freelist.removeFirst(); 265 buf.clear(); 266 return buf; 267 } 268 return ByteBuffer.allocate(BUFSIZE); 269 } 270 271 @Override 272 public synchronized void returnBuffer(ByteBuffer buffer) { 273 assert buffer.capacity() > 0; 274 freelist.add(buffer); 275 } 276 } 277 278 static BufferPool pool = new BufferPool(); 279 280 static BufferHandler pool() { 281 return pool; 282 } 283 */ 284 // Main loop for this client's selector 285 private final static class SelectorManager extends Thread { 286 287 private static final long NODEADLINE = 3000L; 288 private final Selector selector; 289 private volatile boolean closed; 290 private final List<AsyncEvent> readyList; 291 private final List<AsyncEvent> registrations; 292 293 // Uses a weak reference to the HttpClient owning this 294 // selector: a strong reference prevents its garbage 295 // collection while the thread is running. 296 // We want the thread to exit gracefully when the 297 // HttpClient that owns it gets GC'ed. 298 WeakReference<HttpClientImpl> ownerRef; 299 300 SelectorManager(HttpClientImpl ref) throws IOException { 301 super(null, null, "SelectorManager", 0, false); 302 ownerRef = new WeakReference<>(ref); 303 readyList = new ArrayList<>(); 304 registrations = new ArrayList<>(); 305 selector = Selector.open(); 306 } 307 308 // This returns immediately. So caller not allowed to send/receive 309 // on connection. 310 311 synchronized void register(AsyncEvent e) throws IOException { 312 registrations.add(e); 313 selector.wakeup(); 314 } 315 316 synchronized void cancel(SocketChannel e) { 317 SelectionKey key = e.keyFor(selector); 318 if (key != null) { 319 key.cancel(); 320 } 321 selector.wakeup(); 322 } 323 324 void wakeupSelector() { 325 selector.wakeup(); 326 } 327 328 synchronized void shutdown() { 329 closed = true; 330 try { 331 selector.close(); 332 } catch (IOException ignored) { } 333 } 334 335 @Override 336 public void run() { 337 try { 338 while (!Thread.currentThread().isInterrupted()) { 339 HttpClientImpl client; 340 synchronized (this) { 341 for (AsyncEvent exchange : registrations) { 342 SelectableChannel c = exchange.channel(); 343 try { 344 c.configureBlocking(false); 345 SelectionKey key = c.keyFor(selector); 346 SelectorAttachment sa; 347 if (key == null || !key.isValid()) { 348 if (key != null) { 349 // key is canceled. 350 // invoke selectNow() to purge it 351 // before registering the new event. 352 selector.selectNow(); 353 } 354 sa = new SelectorAttachment(c, selector); 355 } else { 356 sa = (SelectorAttachment) key.attachment(); 357 } 358 sa.register(exchange); 359 } catch (IOException e) { 360 Log.logError("HttpClientImpl: " + e); 361 c.close(); 362 // let the exchange deal with it 363 handleEvent(exchange); 364 } 365 } 366 registrations.clear(); 367 } 368 369 // Check whether client is still alive, and if not, 370 // gracefully stop this thread 371 if ((client = ownerRef.get()) == null) { 372 Log.logTrace("HttpClient no longer referenced. Exiting..."); 373 return; 374 } 375 long millis = client.purgeTimeoutsAndReturnNextDeadline(); 376 client = null; // don't hold onto the client ref 377 378 //debugPrint(selector); 379 // Don't wait for ever as it might prevent the thread to 380 // stop gracefully. millis will be 0 if no deadline was found. 381 int n = selector.select(millis == 0 ? NODEADLINE : millis); 382 if (n == 0) { 383 // Check whether client is still alive, and if not, 384 // gracefully stop this thread 385 if ((client = ownerRef.get()) == null) { 386 Log.logTrace("HttpClient no longer referenced. Exiting..."); 387 return; 388 } 389 client.purgeTimeoutsAndReturnNextDeadline(); 390 client = null; // don't hold onto the client ref 391 continue; 392 } 393 Set<SelectionKey> keys = selector.selectedKeys(); 394 395 for (SelectionKey key : keys) { 396 SelectorAttachment sa = (SelectorAttachment) key.attachment(); 397 int eventsOccurred = key.readyOps(); 398 sa.events(eventsOccurred).forEach(readyList::add); 399 sa.resetInterestOps(eventsOccurred); 400 } 401 selector.selectNow(); // complete cancellation 402 selector.selectedKeys().clear(); 403 404 for (AsyncEvent exchange : readyList) { 405 if (exchange.blocking()) { 406 exchange.channel().configureBlocking(true); 407 } 408 handleEvent(exchange); // will be delegated to executor 409 } 410 readyList.clear(); 411 } 412 } catch (Throwable e) { 413 if (!closed) { 414 // This terminates thread. So, better just print stack trace 415 String err = Utils.stackTrace(e); 416 Log.logError("HttpClientImpl: fatal error: " + err); 417 } 418 } finally { 419 shutdown(); 420 } 421 } 422 423 void debugPrint(Selector selector) { 424 System.err.println("Selector: debugprint start"); 425 Set<SelectionKey> keys = selector.keys(); 426 for (SelectionKey key : keys) { 427 SelectableChannel c = key.channel(); 428 int ops = key.interestOps(); 429 System.err.printf("selector chan:%s ops:%d\n", c, ops); 430 } 431 System.err.println("Selector: debugprint end"); 432 } 433 434 void handleEvent(AsyncEvent e) { 435 if (closed) { 436 e.abort(); 437 } else { 438 e.handle(); 439 } 440 } 441 } 442 443 /** 444 * Tracks multiple user level registrations associated with one NIO 445 * registration (SelectionKey). In this implementation, registrations 446 * are one-off and when an event is posted the registration is cancelled 447 * until explicitly registered again. 448 * 449 * <p> No external synchronization required as this class is only used 450 * by the SelectorManager thread. One of these objects required per 451 * connection. 452 */ 453 private static class SelectorAttachment { 454 private final SelectableChannel chan; 455 private final Selector selector; 456 private final ArrayList<AsyncEvent> pending; 457 private int interestOps; 458 459 SelectorAttachment(SelectableChannel chan, Selector selector) { 460 this.pending = new ArrayList<>(); 461 this.chan = chan; 462 this.selector = selector; 463 } 464 465 void register(AsyncEvent e) throws ClosedChannelException { 466 int newOps = e.interestOps(); 467 boolean reRegister = (interestOps & newOps) != newOps; 468 interestOps |= newOps; 469 pending.add(e); 470 if (reRegister) { 471 // first time registration happens here also 472 chan.register(selector, interestOps, this); 473 } 474 } 475 476 /** 477 * Returns a Stream<AsyncEvents> containing only events that are 478 * registered with the given {@code interestOps}. 479 */ 480 Stream<AsyncEvent> events(int interestOps) { 481 return pending.stream() 482 .filter(ev -> (ev.interestOps() & interestOps) != 0); 483 } 484 485 /** 486 * Removes any events with the given {@code interestOps}, and if no 487 * events remaining, cancels the associated SelectionKey. 488 */ 489 void resetInterestOps(int interestOps) { 490 int newOps = 0; 491 492 Iterator<AsyncEvent> itr = pending.iterator(); 493 while (itr.hasNext()) { 494 AsyncEvent event = itr.next(); 495 int evops = event.interestOps(); 496 if (event.repeating()) { 497 newOps |= evops; 498 continue; 499 } 500 if ((evops & interestOps) != 0) { 501 itr.remove(); 502 } else { 503 newOps |= evops; 504 } 505 } 506 507 this.interestOps = newOps; 508 SelectionKey key = chan.keyFor(selector); 509 if (newOps == 0) { 510 key.cancel(); 511 } else { 512 key.interestOps(newOps); 513 } 514 } 515 } 516 517 @Override 518 public SSLContext sslContext() { 519 Utils.checkNetPermission("getSSLContext"); 520 return sslContext; 521 } 522 523 @Override 524 public Optional<SSLParameters> sslParameters() { 525 return Optional.ofNullable(sslParams); 526 } 527 528 @Override 529 public Optional<Authenticator> authenticator() { 530 return Optional.ofNullable(authenticator); 531 } 532 533 @Override 534 public Executor executor() { 535 return executor; 536 } 537 538 ConnectionPool connectionPool() { 539 return connections; 540 } 541 542 @Override 543 public Redirect followRedirects() { 544 return followRedirects; 545 } 546 547 548 @Override 549 public Optional<CookieManager> cookieManager() { 550 return Optional.ofNullable(cookieManager); 551 } 552 553 @Override 554 public Optional<ProxySelector> proxy() { 555 return Optional.ofNullable(this.proxySelector); 556 } 557 558 @Override 559 public WebSocket.Builder newWebSocketBuilder(URI uri, 560 WebSocket.Listener listener) { 561 return new BuilderImpl(this, uri, listener); 562 } 563 564 @Override 565 public Version version() { 566 return version; 567 } 568 569 //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>(); 570 571 boolean getHttp2Allowed() { 572 return version.equals(Version.HTTP_2); 573 } 574 575 private void initFilters() { 576 addFilter(AuthenticationFilter.class); 577 addFilter(RedirectFilter.class); 578 if (this.cookieManager != null) { 579 addFilter(CookieFilter.class); 580 } 581 } 582 583 private void addFilter(Class<? extends HeaderFilter> f) { 584 filters.addFilter(f); 585 } 586 587 final List<HeaderFilter> filterChain() { 588 return filters.getFilterChain(); 589 } 590 591 // Timer controls. 592 // Timers are implemented through timed Selector.select() calls. 593 594 synchronized void registerTimer(TimeoutEvent event) { 595 Log.logTrace("Registering timer {0}", event); 596 timeouts.add(event); 597 selmgr.wakeupSelector(); 598 } 599 600 synchronized void cancelTimer(TimeoutEvent event) { 601 Log.logTrace("Canceling timer {0}", event); 602 timeouts.remove(event); 603 } 604 605 /** 606 * Purges ( handles ) timer events that have passed their deadline, and 607 * returns the amount of time, in milliseconds, until the next earliest 608 * event. A return value of 0 means that there are no events. 609 */ 610 private long purgeTimeoutsAndReturnNextDeadline() { 611 long diff = 0L; 612 List<TimeoutEvent> toHandle = null; 613 int remaining = 0; 614 // enter critical section to retrieve the timeout event to handle 615 synchronized(this) { 616 if (timeouts.isEmpty()) return 0L; 617 618 Instant now = Instant.now(); 619 Iterator<TimeoutEvent> itr = timeouts.iterator(); 620 while (itr.hasNext()) { 621 TimeoutEvent event = itr.next(); 622 diff = now.until(event.deadline(), ChronoUnit.MILLIS); 623 if (diff <= 0) { 624 itr.remove(); 625 toHandle = (toHandle == null) ? new ArrayList<>() : toHandle; 626 toHandle.add(event); 627 } else { 628 break; 629 } 630 } 631 remaining = timeouts.size(); 632 } 633 634 // can be useful for debugging 635 if (toHandle != null && Log.trace()) { 636 Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling " 637 + (toHandle == null ? 0 : toHandle.size()) + " events, " 638 + "remaining " + remaining 639 + ", next deadline: " + (diff < 0 ? 0L : diff)); 640 } 641 642 // handle timeout events out of critical section 643 if (toHandle != null) { 644 Throwable failed = null; 645 for (TimeoutEvent event : toHandle) { 646 try { 647 Log.logTrace("Firing timer {0}", event); 648 event.handle(); 649 } catch (Error | RuntimeException e) { 650 // Not expected. Handle remaining events then throw... 651 // If e is an OOME or SOE it might simply trigger a new 652 // error from here - but in this case there's not much we 653 // could do anyway. Just let it flow... 654 if (failed == null) failed = e; 655 else failed.addSuppressed(e); 656 Log.logTrace("Failed to handle event {0}: {1}", event, e); 657 } 658 } 659 if (failed instanceof Error) throw (Error) failed; 660 if (failed instanceof RuntimeException) throw (RuntimeException) failed; 661 } 662 663 // return time to wait until next event. 0L if there's no more events. 664 return diff < 0 ? 0L : diff; 665 } 666 667 // used for the connection window 668 int getReceiveBufferSize() { 669 return Utils.getIntegerNetProperty( 670 "jdk.httpclient.connectionWindowSize", 256 * 1024 671 ); 672 } 673 }