1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 */ 24 package java.net.http; 25 26 import java.io.IOException; 27 import java.net.Authenticator; 28 import java.net.CookieManager; 29 import java.net.ProxySelector; 30 import java.net.URI; 31 import static java.net.http.Utils.BUFSIZE; 32 import java.nio.ByteBuffer; 33 import java.nio.channels.ClosedChannelException; 34 import java.nio.channels.SelectableChannel; 35 import java.nio.channels.SelectionKey; 36 import static java.nio.channels.SelectionKey.OP_CONNECT; 37 import static java.nio.channels.SelectionKey.OP_READ; 38 import static java.nio.channels.SelectionKey.OP_WRITE; 39 import java.nio.channels.Selector; 40 import java.util.*; 41 import java.util.stream.Stream; 42 import java.util.concurrent.ExecutorService; 43 import java.security.NoSuchAlgorithmException; 44 import java.util.concurrent.Executors; 45 import java.util.concurrent.ThreadFactory; 46 import javax.net.ssl.SSLContext; 47 import javax.net.ssl.SSLParameters; 48 49 /** 50 * Client implementation. Contains all configuration information and also 51 * the selector manager thread which allows async events to be registered 52 * and delivered when they occur. See AsyncEvent. 53 */ 54 class HttpClientImpl extends HttpClient implements BufferHandler { 55 56 private final CookieManager cookieManager; 57 private final Redirect followRedirects; 58 private final ProxySelector proxySelector; 59 private final Authenticator authenticator; 60 private final Version version; 61 private boolean pipelining = false; 62 private final ConnectionPool connections; 63 private final ExecutorWrapper executor; 64 // Security parameters 65 private final SSLContext sslContext; 66 private final SSLParameters sslParams; 67 private final SelectorManager selmgr; 68 private final FilterFactory filters; 69 private final Http2ClientImpl client2; 70 private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); 71 private final LinkedList<TimeoutEvent> timeouts; 72 73 //@Override 74 void debugPrint() { 75 selmgr.debugPrint(); 76 client2.debugPrint(); 77 } 78 79 public static HttpClientImpl create(HttpClientBuilderImpl builder) { 80 HttpClientImpl impl = new HttpClientImpl(builder); 81 impl.start(); 82 return impl; 83 } 84 85 private HttpClientImpl(HttpClientBuilderImpl builder) { 86 if (builder.sslContext == null) { 87 try { 88 sslContext = SSLContext.getDefault(); 89 } catch (NoSuchAlgorithmException ex) { 90 throw new InternalError(ex); 91 } 92 } else { 93 sslContext = builder.sslContext; 94 } 95 ExecutorService ex = builder.executor; 96 if (ex == null) { 97 ex = Executors.newCachedThreadPool((r) -> { 98 Thread t = defaultFactory.newThread(r); 99 t.setDaemon(true); 100 return t; 101 }); 102 } else { 103 ex = builder.executor; 104 } 105 client2 = new Http2ClientImpl(this); 106 executor = ExecutorWrapper.wrap(ex); 107 cookieManager = builder.cookieManager; 108 followRedirects = builder.followRedirects == null ? 109 Redirect.NEVER : builder.followRedirects; 110 this.proxySelector = builder.proxy; 111 authenticator = builder.authenticator; 112 version = builder.version; 113 sslParams = builder.sslParams; 114 connections = new ConnectionPool(); 115 connections.start(); 116 timeouts = new LinkedList<>(); 117 try { 118 selmgr = new SelectorManager(); 119 } catch (IOException e) { 120 // unlikely 121 throw new InternalError(e); 122 } 123 selmgr.setDaemon(true); 124 selmgr.setName("HttpSelector"); 125 filters = new FilterFactory(); 126 initFilters(); 127 } 128 129 private void start() { 130 selmgr.start(); 131 } 132 133 /** 134 * Wait for activity on given exchange (assuming blocking = false). 135 * It's a no-op if blocking = true. In particular, the following occurs 136 * in the SelectorManager thread. 137 * 138 * 1) mark the connection non-blocking 139 * 2) add to selector 140 * 3) If selector fires for this exchange then 141 * 4) - mark connection as blocking 142 * 5) - call AsyncEvent.handle() 143 * 144 * If exchange needs to block again, then call registerEvent() again 145 */ 146 void registerEvent(AsyncEvent exchange) throws IOException { 147 selmgr.register(exchange); 148 } 149 150 Http2ClientImpl client2() { 151 return client2; 152 } 153 154 LinkedList<ByteBuffer> freelist = new LinkedList<>(); 155 156 @Override 157 public synchronized ByteBuffer getBuffer() { 158 if (freelist.isEmpty()) { 159 return ByteBuffer.allocate(BUFSIZE); 160 } 161 return freelist.removeFirst(); 162 } 163 164 @Override 165 public synchronized void returnBuffer(ByteBuffer buffer) { 166 buffer.clear(); 167 freelist.add(buffer); 168 } 169 170 171 // Main loop for this client's selector 172 173 class SelectorManager extends Thread { 174 final Selector selector; 175 boolean closed; 176 177 final List<AsyncEvent> readyList; 178 final List<AsyncEvent> registrations; 179 180 List<AsyncEvent> debugList; 181 182 SelectorManager() throws IOException { 183 readyList = new LinkedList<>(); 184 registrations = new LinkedList<>(); 185 debugList = new LinkedList<>(); 186 selector = Selector.open(); 187 } 188 189 // This returns immediately. So caller not allowed to send/receive 190 // on connection. 191 192 synchronized void register(AsyncEvent e) throws IOException { 193 registrations.add(e); 194 selector.wakeup(); 195 } 196 197 void wakeupSelector() { 198 selector.wakeup(); 199 } 200 201 synchronized void shutdown() { 202 closed = true; 203 try { 204 selector.close(); 205 } catch (IOException e) {} 206 } 207 208 private List<AsyncEvent> copy(List<AsyncEvent> list) { 209 LinkedList<AsyncEvent> c = new LinkedList<>(); 210 for (AsyncEvent e : list) { 211 c.add(e); 212 } 213 return c; 214 } 215 216 synchronized void debugPrint() { 217 System.err.println("Selecting on:"); 218 for (AsyncEvent e : debugList) { 219 System.err.println(opvals(e.interestOps())); 220 } 221 } 222 223 String opvals(int i) { 224 StringBuilder sb = new StringBuilder(); 225 if ((i & OP_READ) != 0) 226 sb.append("OP_READ "); 227 if ((i & OP_CONNECT) != 0) 228 sb.append("OP_CONNECT "); 229 if ((i & OP_WRITE) != 0) 230 sb.append("OP_WRITE "); 231 return sb.toString(); 232 } 233 234 @Override 235 public void run() { 236 try { 237 while (true) { 238 synchronized (this) { 239 debugList = copy(registrations); 240 for (AsyncEvent exchange : registrations) { 241 SelectableChannel c = exchange.channel(); 242 try { 243 c.configureBlocking(false); 244 SelectionKey key = c.keyFor(selector); 245 SelectorAttachment sa; 246 if (key == null) { 247 sa = new SelectorAttachment(c, selector); 248 } else { 249 sa = (SelectorAttachment)key.attachment(); 250 } 251 sa.register(exchange); 252 } catch (IOException e) { 253 Log.logError("HttpClientImpl: " + e); 254 c.close(); 255 // let the exchange deal with it 256 handleEvent(exchange); 257 } 258 } 259 registrations.clear(); 260 } 261 long timeval = getTimeoutValue(); 262 long now = System.currentTimeMillis(); 263 int n = selector.select(timeval); 264 if (n == 0) { 265 signalTimeouts(now); 266 continue; 267 } 268 Set<SelectionKey> keys = selector.selectedKeys(); 269 270 for (SelectionKey key : keys) { 271 SelectorAttachment sa = (SelectorAttachment)key.attachment(); 272 int eventsOccurred = key.readyOps(); 273 sa.events(eventsOccurred).forEach(readyList::add); 274 sa.resetInterestOps(eventsOccurred); 275 } 276 selector.selectNow(); // complete cancellation 277 selector.selectedKeys().clear(); 278 279 for (AsyncEvent exchange : readyList) { 280 if (exchange instanceof AsyncEvent.Blocking) { 281 exchange.channel().configureBlocking(true); 282 } else { 283 assert exchange instanceof AsyncEvent.NonBlocking; 284 } 285 executor.synchronize(); 286 handleEvent(exchange); // will be delegated to executor 287 } 288 readyList.clear(); 289 } 290 } catch (Throwable e) { 291 if (!closed) { 292 System.err.println("HttpClientImpl terminating on error"); 293 // This terminates thread. So, better just print stack trace 294 String err = Utils.stackTrace(e); 295 Log.logError("HttpClientImpl: fatal error: " + err); 296 } 297 } 298 } 299 300 void handleEvent(AsyncEvent e) { 301 if (closed) { 302 e.abort(); 303 } else { 304 e.handle(); 305 } 306 } 307 } 308 309 /** 310 * Tracks multiple user level registrations associated with one NIO 311 * registration (SelectionKey). In this implementation, registrations 312 * are one-off and when an event is posted the registration is cancelled 313 * until explicitly registered again. 314 * 315 * <p> No external synchronization required as this class is only used 316 * by the SelectorManager thread. One of these objects required per 317 * connection. 318 */ 319 private static class SelectorAttachment { 320 private final SelectableChannel chan; 321 private final Selector selector; 322 private final ArrayList<AsyncEvent> pending; 323 private int interestops; 324 325 SelectorAttachment(SelectableChannel chan, Selector selector) { 326 this.pending = new ArrayList<>(); 327 this.chan = chan; 328 this.selector = selector; 329 } 330 331 void register(AsyncEvent e) throws ClosedChannelException { 332 int newops = e.interestOps(); 333 boolean reRegister = (interestops & newops) != newops; 334 interestops |= newops; 335 pending.add(e); 336 if (reRegister) { 337 // first time registration happens here also 338 chan.register(selector, interestops, this); 339 } 340 } 341 342 int interestOps() { 343 return interestops; 344 } 345 346 /** 347 * Returns an Iterator<AsyncEvents> containing only events that are 348 * registered with the given {@code interestop}. 349 */ 350 Stream<AsyncEvent> events(int interestop) { 351 return pending.stream() 352 .filter(ev -> (ev.interestOps() & interestop) != 0); 353 } 354 355 /** 356 * Removes any events with the given {@code interestop}, and if no 357 * events remaining, cancels the associated SelectionKey. 358 */ 359 void resetInterestOps(int interestop) { 360 int newops = 0; 361 362 Iterator<AsyncEvent> itr = pending.iterator(); 363 while (itr.hasNext()) { 364 AsyncEvent event = itr.next(); 365 int evops = event.interestOps(); 366 if ((evops & interestop) != 0) { 367 itr.remove(); 368 } else { 369 newops |= evops; 370 } 371 } 372 373 interestops = newops; 374 SelectionKey key = chan.keyFor(selector); 375 if (newops == 0) { 376 key.cancel(); 377 } else { 378 key.interestOps(newops); 379 } 380 } 381 } 382 383 /** 384 * Creates a HttpRequest associated with this group. 385 * 386 * @throws IllegalStateException if the group has been stopped 387 */ 388 @Override 389 public HttpRequestBuilderImpl request() { 390 return new HttpRequestBuilderImpl(this, null); 391 } 392 393 /** 394 * Creates a HttpRequest associated with this group. 395 * 396 * @throws IllegalStateException if the group has been stopped 397 */ 398 @Override 399 public HttpRequestBuilderImpl request(URI uri) { 400 return new HttpRequestBuilderImpl(this, uri); 401 } 402 403 @Override 404 public SSLContext sslContext() { 405 Utils.checkNetPermission("getSSLContext"); 406 return sslContext; 407 } 408 409 @Override 410 public Optional<SSLParameters> sslParameters() { 411 return Optional.ofNullable(sslParams); 412 } 413 414 @Override 415 public Optional<Authenticator> authenticator() { 416 return Optional.ofNullable(authenticator); 417 } 418 419 @Override 420 public ExecutorService executorService() { 421 return executor.userExecutor(); 422 } 423 424 ExecutorWrapper executorWrapper() { 425 return executor; 426 } 427 428 @Override 429 public boolean pipelining() { 430 return this.pipelining; 431 } 432 433 ConnectionPool connectionPool() { 434 return connections; 435 } 436 437 @Override 438 public Redirect followRedirects() { 439 return followRedirects; 440 } 441 442 443 @Override 444 public Optional<CookieManager> cookieManager() { 445 return Optional.ofNullable(cookieManager); 446 } 447 448 @Override 449 public Optional<ProxySelector> proxy() { 450 return Optional.ofNullable(this.proxySelector); 451 } 452 453 @Override 454 public Version version() { 455 return version; 456 } 457 458 //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>(); 459 460 boolean getHttp2Allowed() { 461 return version.equals(Version.HTTP_2); 462 } 463 464 //void setHttp2NotSupported(String host) { 465 //http2NotSupported.put(host, false); 466 //} 467 468 final void initFilters() { 469 addFilter(AuthenticationFilter.class); 470 addFilter(RedirectFilter.class); 471 } 472 473 final void addFilter(Class<? extends HeaderFilter> f) { 474 filters.addFilter(f); 475 } 476 477 final List<HeaderFilter> filterChain() { 478 return filters.getFilterChain(); 479 } 480 481 // Timer controls. Timers are implemented through timed Selector.select() 482 // calls. 483 synchronized void registerTimer(TimeoutEvent event) { 484 long elapse = event.timevalMillis(); 485 ListIterator<TimeoutEvent> iter = timeouts.listIterator(); 486 long listval = 0; 487 event.delta = event.timeval; // in case list empty 488 TimeoutEvent next; 489 while (iter.hasNext()) { 490 next = iter.next(); 491 listval += next.delta; 492 if (elapse < listval) { 493 listval -= next.delta; 494 event.delta = elapse - listval; 495 next.delta -= event.delta; 496 iter.previous(); 497 break; 498 } else if (!iter.hasNext()) { 499 event.delta = event.timeval - listval ; 500 } 501 } 502 iter.add(event); 503 //debugPrintList("register"); 504 selmgr.wakeupSelector(); 505 } 506 507 void debugPrintList(String s) { 508 System.err.printf("%s: {", s); 509 for (TimeoutEvent e : timeouts) { 510 System.err.printf("(%d,%d) ", e.delta, e.timeval); 511 } 512 System.err.println("}"); 513 } 514 515 synchronized void signalTimeouts(long then) { 516 if (timeouts.isEmpty()) { 517 return; 518 } 519 long now = System.currentTimeMillis(); 520 long duration = now - then; 521 ListIterator<TimeoutEvent> iter = timeouts.listIterator(); 522 TimeoutEvent event = iter.next(); 523 long delta = event.delta; 524 if (duration < delta) { 525 event.delta -= duration; 526 return; 527 } 528 event.handle(); 529 iter.remove(); 530 while (iter.hasNext()) { 531 event = iter.next(); 532 if (event.delta == 0) { 533 event.handle(); 534 iter.remove(); 535 } else { 536 event.delta += delta; 537 break; 538 } 539 } 540 //debugPrintList("signalTimeouts"); 541 } 542 543 synchronized void cancelTimer(TimeoutEvent event) { 544 ListIterator<TimeoutEvent> iter = timeouts.listIterator(); 545 while (iter.hasNext()) { 546 TimeoutEvent ev = iter.next(); 547 if (event == ev) { 548 if (iter.hasNext()) { 549 // adjust 550 TimeoutEvent next = iter.next(); 551 next.delta += ev.delta; 552 iter.previous(); 553 } 554 iter.remove(); 555 } 556 } 557 } 558 559 // used for the connection window 560 int getReceiveBufferSize() { 561 return Utils.getIntegerNetProperty( 562 "sun.net.httpclient.connectionWindowSize", 256 * 1024 563 ); 564 } 565 566 // returns 0 meaning block forever, or a number of millis to block for 567 synchronized long getTimeoutValue() { 568 if (timeouts.isEmpty()) { 569 return 0; 570 } else { 571 return timeouts.get(0).delta; 572 } 573 } 574 }