1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 */ 24 package java.net.http; 25 26 import java.io.IOException; 27 import java.net.Authenticator; 28 import java.net.CookieManager; 29 import java.net.ProxySelector; 30 import java.net.URI; 31 import static java.net.http.Utils.BUFSIZE; 32 import java.nio.ByteBuffer; 33 import java.nio.channels.ClosedChannelException; 34 import java.nio.channels.SelectableChannel; 35 import java.nio.channels.SelectionKey; 36 import static java.nio.channels.SelectionKey.OP_CONNECT; 37 import static java.nio.channels.SelectionKey.OP_READ; 38 import static java.nio.channels.SelectionKey.OP_WRITE; 39 import java.nio.channels.Selector; 40 import java.util.LinkedList; 41 import java.util.List; 42 import java.util.Set; 43 import java.util.concurrent.ExecutorService; 44 import java.security.NoSuchAlgorithmException; 45 import java.util.Iterator; 46 import java.util.ListIterator; 47 import java.util.Optional; 48 import java.util.concurrent.Executors; 49 import java.util.concurrent.ThreadFactory; 50 import javax.net.ssl.SSLContext; 51 import javax.net.ssl.SSLParameters; 52 53 /** 54 * Client implementation. Contains all configuration information and also 55 * the selector manager thread which allows async events to be registered 56 * and delivered when they occur. See AsyncEvent. 57 */ 58 class HttpClientImpl extends HttpClient implements BufferHandler { 59 60 private final CookieManager cookieManager; 61 private final Redirect followRedirects; 62 private final ProxySelector proxySelector; 63 private final Authenticator authenticator; 64 private final Version version; 65 private boolean pipelining = false; 66 private final ConnectionPool connections; 67 private final ExecutorWrapper executor; 68 // Security parameters 69 private final SSLContext sslContext; 70 private final SSLParameters sslParams; 71 private final SelectorManager selmgr; 72 private final FilterFactory filters; 73 private final Http2ClientImpl client2; 74 private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); 75 private final LinkedList<TimeoutEvent> timeouts; 76 77 //@Override 78 void debugPrint() { 79 selmgr.debugPrint(); 80 client2.debugPrint(); 81 } 82 83 public static HttpClientImpl create(HttpClientBuilderImpl builder) { 84 HttpClientImpl impl = new HttpClientImpl(builder); 85 impl.start(); 86 return impl; 87 } 88 89 private HttpClientImpl(HttpClientBuilderImpl builder) { 90 if (builder.sslContext == null) { 91 try { 92 sslContext = SSLContext.getDefault(); 93 } catch (NoSuchAlgorithmException ex) { 94 throw new InternalError(ex); 95 } 96 } else { 97 sslContext = builder.sslContext; 98 } 99 ExecutorService ex = builder.executor; 100 if (ex == null) { 101 ex = Executors.newCachedThreadPool((r) -> { 102 Thread t = defaultFactory.newThread(r); 103 t.setDaemon(true); 104 return t; 105 }); 106 } else { 107 ex = builder.executor; 108 } 109 client2 = new Http2ClientImpl(this); 110 executor = ExecutorWrapper.wrap(ex); 111 cookieManager = builder.cookieManager; 112 followRedirects = builder.followRedirects == null ? 113 Redirect.NEVER : builder.followRedirects; 114 this.proxySelector = builder.proxy; 115 authenticator = builder.authenticator; 116 version = builder.version; 117 sslParams = builder.sslParams; 118 connections = new ConnectionPool(); 119 connections.start(); 120 timeouts = new LinkedList<>(); 121 try { 122 selmgr = new SelectorManager(); 123 } catch (IOException e) { 124 // unlikely 125 throw new InternalError(e); 126 } 127 selmgr.setDaemon(true); 128 selmgr.setName("HttpSelector"); 129 filters = new FilterFactory(); 130 initFilters(); 131 } 132 133 private void start() { 134 selmgr.start(); 135 } 136 137 /** 138 * Wait for activity on given exchange (assuming blocking = false). 139 * It's a no-op if blocking = true. In particular, the following occurs 140 * in the SelectorManager thread. 141 * 142 * 1) mark the connection non-blocking 143 * 2) add to selector 144 * 3) If selector fires for this exchange then 145 * 4) - mark connection as blocking 146 * 5) - call AsyncEvent.handle() 147 * 148 * If exchange needs to block again, then call registerEvent() again 149 */ 150 void registerEvent(AsyncEvent exchange) throws IOException { 151 selmgr.register(exchange); 152 } 153 154 Http2ClientImpl client2() { 155 return client2; 156 } 157 158 LinkedList<ByteBuffer> freelist = new LinkedList<>(); 159 160 @Override 161 public synchronized ByteBuffer getBuffer() { 162 if (freelist.isEmpty()) { 163 return ByteBuffer.allocate(BUFSIZE); 164 } 165 return freelist.removeFirst(); 166 } 167 168 @Override 169 public synchronized void returnBuffer(ByteBuffer buffer) { 170 buffer.clear(); 171 freelist.add(buffer); 172 } 173 174 175 // Main loop for this client's selector 176 177 class SelectorManager extends Thread { 178 final Selector selector; 179 boolean closed; 180 181 final List<AsyncEvent> readyList; 182 final List<AsyncEvent> registrations; 183 184 List<AsyncEvent> debugList; 185 186 SelectorManager() throws IOException { 187 readyList = new LinkedList<>(); 188 registrations = new LinkedList<>(); 189 debugList = new LinkedList<>(); 190 selector = Selector.open(); 191 } 192 193 // This returns immediately. So caller not allowed to send/receive 194 // on connection. 195 196 synchronized void register(AsyncEvent e) throws IOException { 197 registrations.add(e); 198 selector.wakeup(); 199 } 200 201 void wakeupSelector() { 202 selector.wakeup(); 203 } 204 205 synchronized void shutdown() { 206 closed = true; 207 try { 208 selector.close(); 209 } catch (IOException e) {} 210 } 211 212 private List<AsyncEvent> copy(List<AsyncEvent> list) { 213 LinkedList<AsyncEvent> c = new LinkedList<>(); 214 for (AsyncEvent e : list) { 215 c.add(e); 216 } 217 return c; 218 } 219 220 synchronized void debugPrint() { 221 System.err.println("Selecting on:"); 222 for (AsyncEvent e : debugList) { 223 System.err.println(opvals(e.interestOps())); 224 } 225 } 226 227 String opvals(int i) { 228 StringBuilder sb = new StringBuilder(); 229 if ((i & OP_READ) != 0) 230 sb.append("OP_READ "); 231 if ((i & OP_CONNECT) != 0) 232 sb.append("OP_CONNECT "); 233 if ((i & OP_WRITE) != 0) 234 sb.append("OP_WRITE "); 235 return sb.toString(); 236 } 237 238 @Override 239 public void run() { 240 try { 241 while (true) { 242 synchronized (this) { 243 debugList = copy(registrations); 244 for (AsyncEvent exchange : registrations) { 245 SelectableChannel c = exchange.channel(); 246 try { 247 c.configureBlocking(false); 248 SelectionKey key = c.keyFor(selector); 249 SelectorAttachment sa; 250 if (key == null) { 251 sa = new SelectorAttachment(c); 252 } else { 253 sa = (SelectorAttachment)key.attachment(); 254 } 255 sa.register(exchange); 256 } catch (IOException e) { 257 Log.logError("HttpClientImpl: " + e); 258 c.close(); 259 // let the exchange deal with it 260 handleEvent(exchange); 261 } 262 } 263 registrations.clear(); 264 } 265 long timeval = getTimeoutValue(); 266 long now = System.currentTimeMillis(); 267 int n = selector.select(timeval); 268 if (n == 0) { 269 signalTimeouts(now); 270 continue; 271 } 272 Set<SelectionKey> keys = selector.selectedKeys(); 273 274 for (SelectionKey key : keys) { 275 SelectorAttachment sa = (SelectorAttachment)key.attachment(); 276 int eventsOccurred = key.readyOps(); 277 for (AsyncEvent ev : sa.events(eventsOccurred)) { 278 readyList.add(ev); 279 } 280 sa.resetInterestOps(eventsOccurred); 281 } 282 selector.selectNow(); // complete cancellation 283 selector.selectedKeys().clear(); 284 285 for (AsyncEvent exchange : readyList) { 286 if (exchange instanceof AsyncEvent.Blocking) { 287 exchange.channel().configureBlocking(true); 288 } else { 289 assert exchange instanceof AsyncEvent.NonBlocking; 290 } 291 executor.synchronize(); 292 handleEvent(exchange); // will be delegated to executor 293 } 294 readyList.clear(); 295 } 296 } catch (Throwable e) { 297 if (!closed) { 298 System.err.println("HttpClientImpl terminating on error"); 299 // This terminates thread. So, better just print stack trace 300 String err = Utils.stackTrace(e); 301 Log.logError("HttpClientImpl: fatal error: " + err); 302 } 303 } 304 } 305 306 void handleEvent(AsyncEvent e) { 307 if (closed) { 308 e.abort(); 309 } else { 310 e.handle(); 311 } 312 } 313 314 /** 315 * Tracks multiple user level registrations associated with one 316 * NIO registration (SelectionKey). In this implementation, 317 * registrations are one-off and when an event is posted 318 * the registration is cancelled until explicitly registered 319 * again. 320 * no external synchronization required as this class is only used 321 * by the SelectorManager thread. One of these objects 322 * required per connection. 323 */ 324 private class SelectorAttachment { 325 int interestops; 326 final SelectableChannel chan; 327 final LinkedList<AsyncEvent> pending; 328 329 SelectorAttachment(SelectableChannel chan) { 330 this.pending = new LinkedList<>(); 331 this.chan = chan; 332 } 333 334 void register(AsyncEvent e) throws ClosedChannelException { 335 int newops = e.interestOps(); 336 boolean reRegister = (interestops & newops) != newops; 337 this.interestops |= newops; 338 pending.add(e); 339 if (reRegister) { 340 // first time registration happens here also 341 chan.register(selector, interestops, this); 342 } 343 } 344 345 int interestOps() { 346 return interestops; 347 } 348 349 /** 350 * Returns Iterator<AsyncEvents> containing only events that are 351 * registered with the given interestops 352 * 353 * @param type 354 * @return 355 */ 356 Iterable<AsyncEvent> events(int interestop) { 357 return new Iterable<AsyncEvent>() { 358 @Override 359 public Iterator<AsyncEvent> iterator() { 360 return pending.stream() 361 .filter(ev -> (ev.interestOps() & interestop) != 0) 362 .iterator(); 363 } 364 }; 365 } 366 367 /** 368 * Remove any events with the given interestop, and if no events 369 * remaining, cancel the associated SelectionKey. 370 * 371 * @param interestops 372 */ 373 void resetInterestOps(int interestop) { 374 int size = pending.size(); 375 int newops = 0; 376 for (int i=0; i<size; i++) { 377 AsyncEvent ev = pending.get(i); 378 int evops = ev.interestOps(); 379 if ((evops & interestop) != 0) { 380 pending.remove(i); 381 } else { 382 newops |= evops; 383 } 384 } 385 this.interestops = newops; 386 SelectionKey key = chan.keyFor(selector); 387 if (newops == 0) { 388 key.cancel(); 389 } else { 390 key.interestOps(newops); 391 } 392 } 393 } 394 } 395 396 /** 397 * Creates a HttpRequest associated with this group. 398 * 399 * @throws IllegalStateException if the group has been stopped 400 */ 401 @Override 402 public HttpRequestBuilderImpl request() { 403 return new HttpRequestBuilderImpl(this, null); 404 } 405 406 /** 407 * Creates a HttpRequest associated with this group. 408 * 409 * @throws IllegalStateException if the group has been stopped 410 */ 411 @Override 412 public HttpRequestBuilderImpl request(URI uri) { 413 return new HttpRequestBuilderImpl(this, uri); 414 } 415 416 @Override 417 public SSLContext sslContext() { 418 Utils.checkNetPermission("getSSLContext"); 419 return sslContext; 420 } 421 422 @Override 423 public Optional<SSLParameters> sslParameters() { 424 return Optional.ofNullable(sslParams); 425 } 426 427 @Override 428 public Optional<Authenticator> authenticator() { 429 return Optional.ofNullable(authenticator); 430 } 431 432 @Override 433 public ExecutorService executorService() { 434 return executor.userExecutor(); 435 } 436 437 ExecutorWrapper executorWrapper() { 438 return executor; 439 } 440 441 @Override 442 public boolean pipelining() { 443 return this.pipelining; 444 } 445 446 ConnectionPool connectionPool() { 447 return connections; 448 } 449 450 @Override 451 public Redirect followRedirects() { 452 return followRedirects; 453 } 454 455 456 @Override 457 public Optional<CookieManager> cookieManager() { 458 return Optional.ofNullable(cookieManager); 459 } 460 461 @Override 462 public Optional<ProxySelector> proxy() { 463 return Optional.ofNullable(this.proxySelector); 464 } 465 466 @Override 467 public Version version() { 468 return version; 469 } 470 471 //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>(); 472 473 boolean getHttp2Allowed() { 474 return version.equals(Version.HTTP_2); 475 } 476 477 //void setHttp2NotSupported(String host) { 478 //http2NotSupported.put(host, false); 479 //} 480 481 final void initFilters() { 482 addFilter(AuthenticationFilter.class); 483 addFilter(RedirectFilter.class); 484 } 485 486 final void addFilter(Class<? extends HeaderFilter> f) { 487 filters.addFilter(f); 488 } 489 490 final List<HeaderFilter> filterChain() { 491 return filters.getFilterChain(); 492 } 493 494 // Timer controls. Timers are implemented through timed Selector.select() 495 // calls. 496 synchronized void registerTimer(TimeoutEvent event) { 497 long elapse = event.timevalMillis(); 498 ListIterator<TimeoutEvent> iter = timeouts.listIterator(); 499 long listval = 0; 500 event.delta = event.timeval; // in case list empty 501 TimeoutEvent next; 502 while (iter.hasNext()) { 503 next = iter.next(); 504 listval += next.delta; 505 if (elapse < listval) { 506 listval -= next.delta; 507 event.delta = elapse - listval; 508 next.delta -= event.delta; 509 iter.previous(); 510 break; 511 } else if (!iter.hasNext()) { 512 event.delta = event.timeval - listval ; 513 } 514 } 515 iter.add(event); 516 //debugPrintList("register"); 517 selmgr.wakeupSelector(); 518 } 519 520 void debugPrintList(String s) { 521 System.err.printf("%s: {", s); 522 for (TimeoutEvent e : timeouts) { 523 System.err.printf("(%d,%d) ", e.delta, e.timeval); 524 } 525 System.err.println("}"); 526 } 527 528 synchronized void signalTimeouts(long then) { 529 if (timeouts.isEmpty()) { 530 return; 531 } 532 long now = System.currentTimeMillis(); 533 long duration = now - then; 534 ListIterator<TimeoutEvent> iter = timeouts.listIterator(); 535 TimeoutEvent event = iter.next(); 536 long delta = event.delta; 537 if (duration < delta) { 538 event.delta -= duration; 539 return; 540 } 541 event.handle(); 542 iter.remove(); 543 while (iter.hasNext()) { 544 event = iter.next(); 545 if (event.delta == 0) { 546 event.handle(); 547 iter.remove(); 548 } else { 549 event.delta += delta; 550 break; 551 } 552 } 553 //debugPrintList("signalTimeouts"); 554 } 555 556 synchronized void cancelTimer(TimeoutEvent event) { 557 ListIterator<TimeoutEvent> iter = timeouts.listIterator(); 558 while (iter.hasNext()) { 559 TimeoutEvent ev = iter.next(); 560 if (event == ev) { 561 if (iter.hasNext()) { 562 // adjust 563 TimeoutEvent next = iter.next(); 564 next.delta += ev.delta; 565 iter.previous(); 566 } 567 iter.remove(); 568 } 569 } 570 } 571 572 // used for the connection window 573 int getReceiveBufferSize() { 574 return Utils.getIntegerNetProperty( 575 "sun.net.httpclient.connectionWindowSize", 256 * 1024 576 ); 577 } 578 579 // returns 0 meaning block forever, or a number of millis to block for 580 synchronized long getTimeoutValue() { 581 if (timeouts.isEmpty()) { 582 return 0; 583 } else { 584 return timeouts.get(0).delta; 585 } 586 } 587 }