1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package java.net.http; 27 28 import javax.net.ssl.SSLContext; 29 import javax.net.ssl.SSLParameters; 30 import java.io.IOException; 31 import java.net.Authenticator; 32 import java.net.CookieManager; 33 import java.net.ProxySelector; 34 import java.net.URI; 35 import java.nio.ByteBuffer; 36 import java.nio.channels.ClosedChannelException; 37 import java.nio.channels.SelectableChannel; 38 import java.nio.channels.SelectionKey; 39 import java.nio.channels.Selector; 40 import java.nio.channels.SocketChannel; 41 import java.security.NoSuchAlgorithmException; 42 import java.util.ArrayList; 43 import java.util.Iterator; 44 import java.util.LinkedList; 45 import java.util.List; 46 import java.util.ListIterator; 47 import java.util.Optional; 48 import java.util.Set; 49 import java.util.concurrent.ExecutorService; 50 import java.util.concurrent.Executors; 51 import java.util.concurrent.ThreadFactory; 52 import java.util.stream.Stream; 53 54 import static java.net.http.Utils.BUFSIZE; 55 56 /** 57 * Client implementation. Contains all configuration information and also 58 * the selector manager thread which allows async events to be registered 59 * and delivered when they occur. See AsyncEvent. 60 */ 61 class HttpClientImpl extends HttpClient implements BufferHandler { 62 63 private static final ThreadFactory defaultFactory = 64 (r -> new Thread(null, r, "HttpClient_worker", 0, true)); 65 66 private final CookieManager cookieManager; 67 private final Redirect followRedirects; 68 private final ProxySelector proxySelector; 69 private final Authenticator authenticator; 70 private final Version version; 71 private boolean pipelining = false; 72 private final ConnectionPool connections; 73 private final ExecutorWrapper executor; 74 // Security parameters 75 private final SSLContext sslContext; 76 private final SSLParameters sslParams; 77 private final SelectorManager selmgr; 78 private final FilterFactory filters; 79 private final Http2ClientImpl client2; 80 private final LinkedList<TimeoutEvent> timeouts; 81 82 public static HttpClientImpl create(HttpClientBuilderImpl builder) { 83 HttpClientImpl impl = new HttpClientImpl(builder); 84 impl.start(); 85 return impl; 86 } 87 88 private HttpClientImpl(HttpClientBuilderImpl builder) { 89 if (builder.sslContext == null) { 90 try { 91 sslContext = SSLContext.getDefault(); 92 } catch (NoSuchAlgorithmException ex) { 93 throw new InternalError(ex); 94 } 95 } else { 96 sslContext = builder.sslContext; 97 } 98 ExecutorService ex = builder.executor; 99 if (ex == null) { 100 ex = Executors.newCachedThreadPool((r) -> { 101 Thread t = defaultFactory.newThread(r); 102 t.setDaemon(true); 103 return t; 104 }); 105 } else { 106 ex = builder.executor; 107 } 108 client2 = new Http2ClientImpl(this); 109 executor = ExecutorWrapper.wrap(ex); 110 cookieManager = builder.cookieManager; 111 followRedirects = builder.followRedirects == null ? 112 Redirect.NEVER : builder.followRedirects; 113 this.proxySelector = builder.proxy; 114 authenticator = builder.authenticator; 115 version = builder.version; 116 if (builder.sslParams == null) 117 sslParams = getDefaultParams(sslContext); 118 else 119 sslParams = builder.sslParams; 120 connections = new ConnectionPool(); 121 connections.start(); 122 timeouts = new LinkedList<>(); 123 try { 124 selmgr = new SelectorManager(); 125 } catch (IOException e) { 126 // unlikely 127 throw new InternalError(e); 128 } 129 selmgr.setDaemon(true); 130 filters = new FilterFactory(); 131 initFilters(); 132 } 133 134 private void start() { 135 selmgr.start(); 136 } 137 138 private static SSLParameters getDefaultParams(SSLContext ctx) { 139 SSLParameters params = ctx.getSupportedSSLParameters(); 140 params.setProtocols(new String[]{"TLSv1.2"}); 141 return params; 142 } 143 144 /** 145 * Wait for activity on given exchange (assuming blocking = false). 146 * It's a no-op if blocking = true. In particular, the following occurs 147 * in the SelectorManager thread. 148 * 149 * 1) mark the connection non-blocking 150 * 2) add to selector 151 * 3) If selector fires for this exchange then 152 * 4) - mark connection as blocking 153 * 5) - call AsyncEvent.handle() 154 * 155 * If exchange needs to block again, then call registerEvent() again 156 */ 157 void registerEvent(AsyncEvent exchange) throws IOException { 158 selmgr.register(exchange); 159 } 160 161 /** 162 * Only used from RawChannel to disconnect the channel from 163 * the selector 164 */ 165 void cancelRegistration(SocketChannel s) { 166 selmgr.cancel(s); 167 } 168 169 170 Http2ClientImpl client2() { 171 return client2; 172 } 173 174 /** 175 * We keep one size of buffer on free list. That size may increase 176 * depending on demand. If that happens we dispose of free buffers 177 * that are smaller than new size. 178 */ 179 private final LinkedList<ByteBuffer> freelist = new LinkedList<>(); 180 int currentSize = BUFSIZE; 181 182 @Override 183 public synchronized ByteBuffer getBuffer(int size) { 184 185 ByteBuffer buf; 186 if (size == -1) 187 size = currentSize; 188 189 if (size > currentSize) 190 currentSize = size; 191 192 while (!freelist.isEmpty()) { 193 buf = freelist.removeFirst(); 194 if (buf.capacity() < currentSize) 195 continue; 196 buf.clear(); 197 return buf; 198 } 199 return ByteBuffer.allocate(size); 200 } 201 202 @Override 203 public synchronized void returnBuffer(ByteBuffer buffer) { 204 freelist.add(buffer); 205 } 206 207 @Override 208 public synchronized void setMinBufferSize(int n) { 209 currentSize = Math.max(n, currentSize); 210 } 211 212 // Main loop for this client's selector 213 private final class SelectorManager extends Thread { 214 215 private final Selector selector; 216 private volatile boolean closed; 217 private final List<AsyncEvent> readyList; 218 private final List<AsyncEvent> registrations; 219 220 SelectorManager() throws IOException { 221 super(null, null, "SelectorManager", 0, false); 222 readyList = new ArrayList<>(); 223 registrations = new ArrayList<>(); 224 selector = Selector.open(); 225 } 226 227 // This returns immediately. So caller not allowed to send/receive 228 // on connection. 229 230 synchronized void register(AsyncEvent e) throws IOException { 231 registrations.add(e); 232 selector.wakeup(); 233 } 234 235 synchronized void cancel(SocketChannel e) { 236 SelectionKey key = e.keyFor(selector); 237 if (key != null) 238 key.cancel(); 239 selector.wakeup(); 240 } 241 242 void wakeupSelector() { 243 selector.wakeup(); 244 } 245 246 synchronized void shutdown() { 247 closed = true; 248 try { 249 selector.close(); 250 } catch (IOException ignored) { } 251 } 252 253 @Override 254 public void run() { 255 try { 256 while (!Thread.currentThread().isInterrupted()) { 257 synchronized (this) { 258 for (AsyncEvent exchange : registrations) { 259 SelectableChannel c = exchange.channel(); 260 try { 261 c.configureBlocking(false); 262 SelectionKey key = c.keyFor(selector); 263 SelectorAttachment sa; 264 if (key == null) { 265 sa = new SelectorAttachment(c, selector); 266 } else { 267 sa = (SelectorAttachment) key.attachment(); 268 } 269 sa.register(exchange); 270 } catch (IOException e) { 271 Log.logError("HttpClientImpl: " + e); 272 c.close(); 273 // let the exchange deal with it 274 handleEvent(exchange); 275 } 276 } 277 registrations.clear(); 278 } 279 long timeval = getTimeoutValue(); 280 long now = System.currentTimeMillis(); 281 //debugPrint(selector); 282 int n = selector.select(timeval); 283 if (n == 0) { 284 signalTimeouts(now); 285 continue; 286 } 287 Set<SelectionKey> keys = selector.selectedKeys(); 288 289 for (SelectionKey key : keys) { 290 SelectorAttachment sa = (SelectorAttachment) key.attachment(); 291 int eventsOccurred = key.readyOps(); 292 sa.events(eventsOccurred).forEach(readyList::add); 293 sa.resetInterestOps(eventsOccurred); 294 } 295 selector.selectNow(); // complete cancellation 296 selector.selectedKeys().clear(); 297 298 for (AsyncEvent exchange : readyList) { 299 if (exchange.blocking()) { 300 exchange.channel().configureBlocking(true); 301 } 302 executor.synchronize(); 303 handleEvent(exchange); // will be delegated to executor 304 } 305 readyList.clear(); 306 } 307 } catch (Throwable e) { 308 if (!closed) { 309 // This terminates thread. So, better just print stack trace 310 String err = Utils.stackTrace(e); 311 Log.logError("HttpClientImpl: fatal error: " + err); 312 } 313 } finally { 314 shutdown(); 315 } 316 } 317 318 void debugPrint(Selector selector) { 319 System.err.println("Selector: debugprint start"); 320 Set<SelectionKey> keys = selector.keys(); 321 for (SelectionKey key : keys) { 322 SelectableChannel c = key.channel(); 323 int ops = key.interestOps(); 324 System.err.printf("selector chan:%s ops:%d\n", c, ops); 325 } 326 System.err.println("Selector: debugprint end"); 327 } 328 329 void handleEvent(AsyncEvent e) { 330 if (closed) { 331 e.abort(); 332 } else { 333 e.handle(); 334 } 335 } 336 } 337 338 /** 339 * Tracks multiple user level registrations associated with one NIO 340 * registration (SelectionKey). In this implementation, registrations 341 * are one-off and when an event is posted the registration is cancelled 342 * until explicitly registered again. 343 * 344 * <p> No external synchronization required as this class is only used 345 * by the SelectorManager thread. One of these objects required per 346 * connection. 347 */ 348 private static class SelectorAttachment { 349 private final SelectableChannel chan; 350 private final Selector selector; 351 private final ArrayList<AsyncEvent> pending; 352 private int interestOps; 353 354 SelectorAttachment(SelectableChannel chan, Selector selector) { 355 this.pending = new ArrayList<>(); 356 this.chan = chan; 357 this.selector = selector; 358 } 359 360 void register(AsyncEvent e) throws ClosedChannelException { 361 int newOps = e.interestOps(); 362 boolean reRegister = (interestOps & newOps) != newOps; 363 interestOps |= newOps; 364 pending.add(e); 365 if (reRegister) { 366 // first time registration happens here also 367 chan.register(selector, interestOps, this); 368 } 369 } 370 371 /** 372 * Returns a Stream<AsyncEvents> containing only events that are 373 * registered with the given {@code interestOps}. 374 */ 375 Stream<AsyncEvent> events(int interestOps) { 376 return pending.stream() 377 .filter(ev -> (ev.interestOps() & interestOps) != 0); 378 } 379 380 /** 381 * Removes any events with the given {@code interestOps}, and if no 382 * events remaining, cancels the associated SelectionKey. 383 */ 384 void resetInterestOps(int interestOps) { 385 int newOps = 0; 386 387 Iterator<AsyncEvent> itr = pending.iterator(); 388 while (itr.hasNext()) { 389 AsyncEvent event = itr.next(); 390 int evops = event.interestOps(); 391 if (event.repeating()) { 392 newOps |= evops; 393 continue; 394 } 395 if ((evops & interestOps) != 0) { 396 itr.remove(); 397 } else { 398 newOps |= evops; 399 } 400 } 401 402 this.interestOps = newOps; 403 SelectionKey key = chan.keyFor(selector); 404 if (newOps == 0) { 405 key.cancel(); 406 } else { 407 key.interestOps(newOps); 408 } 409 } 410 } 411 412 /** 413 * Creates a HttpRequest associated with this group. 414 * 415 * @throws IllegalStateException 416 * if the group has been stopped 417 */ 418 @Override 419 public HttpRequestBuilderImpl request() { 420 return new HttpRequestBuilderImpl(this, null); 421 } 422 423 /** 424 * Creates a HttpRequest associated with this group. 425 * 426 * @throws IllegalStateException 427 * if the group has been stopped 428 */ 429 @Override 430 public HttpRequestBuilderImpl request(URI uri) { 431 return new HttpRequestBuilderImpl(this, uri); 432 } 433 434 @Override 435 public SSLContext sslContext() { 436 Utils.checkNetPermission("getSSLContext"); 437 return sslContext; 438 } 439 440 @Override 441 public Optional<SSLParameters> sslParameters() { 442 return Optional.ofNullable(sslParams); 443 } 444 445 @Override 446 public Optional<Authenticator> authenticator() { 447 return Optional.ofNullable(authenticator); 448 } 449 450 @Override 451 public ExecutorService executorService() { 452 return executor.userExecutor(); 453 } 454 455 ExecutorWrapper executorWrapper() { 456 return executor; 457 } 458 459 @Override 460 public boolean pipelining() { 461 return this.pipelining; 462 } 463 464 ConnectionPool connectionPool() { 465 return connections; 466 } 467 468 @Override 469 public Redirect followRedirects() { 470 return followRedirects; 471 } 472 473 474 @Override 475 public Optional<CookieManager> cookieManager() { 476 return Optional.ofNullable(cookieManager); 477 } 478 479 @Override 480 public Optional<ProxySelector> proxy() { 481 return Optional.ofNullable(this.proxySelector); 482 } 483 484 @Override 485 public Version version() { 486 return version; 487 } 488 489 //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>(); 490 491 boolean getHttp2Allowed() { 492 return version.equals(Version.HTTP_2); 493 } 494 495 private void initFilters() { 496 addFilter(AuthenticationFilter.class); 497 addFilter(RedirectFilter.class); 498 } 499 500 private void addFilter(Class<? extends HeaderFilter> f) { 501 filters.addFilter(f); 502 } 503 504 final List<HeaderFilter> filterChain() { 505 return filters.getFilterChain(); 506 } 507 508 // Timer controls. Timers are implemented through timed Selector.select() 509 // calls. 510 synchronized void registerTimer(TimeoutEvent event) { 511 long elapse = event.timevalMillis(); 512 ListIterator<TimeoutEvent> iter = timeouts.listIterator(); 513 long listval = 0; 514 event.delta = event.timeval; // in case list empty 515 TimeoutEvent next; 516 while (iter.hasNext()) { 517 next = iter.next(); 518 listval += next.delta; 519 if (elapse < listval) { 520 listval -= next.delta; 521 event.delta = elapse - listval; 522 next.delta -= event.delta; 523 iter.previous(); 524 break; 525 } else if (!iter.hasNext()) { 526 event.delta = event.timeval - listval; 527 } 528 } 529 iter.add(event); 530 selmgr.wakeupSelector(); 531 } 532 533 private synchronized void signalTimeouts(long then) { 534 if (timeouts.isEmpty()) { 535 return; 536 } 537 long now = System.currentTimeMillis(); 538 long duration = now - then; 539 ListIterator<TimeoutEvent> iter = timeouts.listIterator(); 540 TimeoutEvent event = iter.next(); 541 long delta = event.delta; 542 if (duration < delta) { 543 event.delta -= duration; 544 return; 545 } 546 event.handle(); 547 iter.remove(); 548 while (iter.hasNext()) { 549 event = iter.next(); 550 if (event.delta == 0) { 551 event.handle(); 552 iter.remove(); 553 } else { 554 event.delta += delta; 555 break; 556 } 557 } 558 } 559 560 synchronized void cancelTimer(TimeoutEvent event) { 561 ListIterator<TimeoutEvent> iter = timeouts.listIterator(); 562 while (iter.hasNext()) { 563 TimeoutEvent ev = iter.next(); 564 if (event == ev) { 565 if (iter.hasNext()) { 566 // adjust 567 TimeoutEvent next = iter.next(); 568 next.delta += ev.delta; 569 iter.previous(); 570 } 571 iter.remove(); 572 } 573 } 574 } 575 576 // used for the connection window 577 int getReceiveBufferSize() { 578 return Utils.getIntegerNetProperty( 579 "java.net.httpclient.connectionWindowSize", 256 * 1024 580 ); 581 } 582 583 // returns 0 meaning block forever, or a number of millis to block for 584 private synchronized long getTimeoutValue() { 585 if (timeouts.isEmpty()) { 586 return 0; 587 } else { 588 return timeouts.get(0).delta; 589 } 590 } 591 }