1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package java.net.http;
  27 
  28 import javax.net.ssl.SSLContext;
  29 import javax.net.ssl.SSLParameters;
  30 import java.io.IOException;
  31 import java.net.Authenticator;
  32 import java.net.CookieManager;
  33 import java.net.ProxySelector;
  34 import java.net.URI;
  35 import java.nio.ByteBuffer;
  36 import java.nio.channels.ClosedChannelException;
  37 import java.nio.channels.SelectableChannel;
  38 import java.nio.channels.SelectionKey;
  39 import java.nio.channels.Selector;
  40 import java.nio.channels.SocketChannel;
  41 import java.security.NoSuchAlgorithmException;
  42 import java.util.ArrayList;
  43 import java.util.Iterator;
  44 import java.util.LinkedList;
  45 import java.util.List;
  46 import java.util.ListIterator;
  47 import java.util.Optional;
  48 import java.util.Set;
  49 import java.util.concurrent.ExecutorService;
  50 import java.util.concurrent.Executors;
  51 import java.util.concurrent.ThreadFactory;
  52 import java.util.stream.Stream;
  53 
  54 import static java.net.http.Utils.BUFSIZE;
  55 
  56 /**
  57  * Client implementation. Contains all configuration information and also
  58  * the selector manager thread which allows async events to be registered
  59  * and delivered when they occur. See AsyncEvent.
  60  */
  61 class HttpClientImpl extends HttpClient implements BufferHandler {
  62 
  63     private static final ThreadFactory defaultFactory =
  64             (r -> new Thread(null, r, "HttpClient_worker", 0, true));
  65 
  66     private final CookieManager cookieManager;
  67     private final Redirect followRedirects;
  68     private final ProxySelector proxySelector;
  69     private final Authenticator authenticator;
  70     private final Version version;
  71     private boolean pipelining = false;
  72     private final ConnectionPool connections;
  73     private final ExecutorWrapper executor;
  74     // Security parameters
  75     private final SSLContext sslContext;
  76     private final SSLParameters sslParams;
  77     private final SelectorManager selmgr;
  78     private final FilterFactory filters;
  79     private final Http2ClientImpl client2;
  80     private final LinkedList<TimeoutEvent> timeouts;
  81 
  82     public static HttpClientImpl create(HttpClientBuilderImpl builder) {
  83         HttpClientImpl impl = new HttpClientImpl(builder);
  84         impl.start();
  85         return impl;
  86     }
  87 
  88     private HttpClientImpl(HttpClientBuilderImpl builder) {
  89         if (builder.sslContext == null) {
  90             try {
  91                 sslContext = SSLContext.getDefault();
  92             } catch (NoSuchAlgorithmException ex) {
  93                 throw new InternalError(ex);
  94             }
  95         } else {
  96             sslContext = builder.sslContext;
  97         }
  98         ExecutorService ex = builder.executor;
  99         if (ex == null) {
 100             ex = Executors.newCachedThreadPool((r) -> {
 101                 Thread t = defaultFactory.newThread(r);
 102                 t.setDaemon(true);
 103                 return t;
 104             });
 105         } else {
 106             ex = builder.executor;
 107         }
 108         client2 = new Http2ClientImpl(this);
 109         executor = ExecutorWrapper.wrap(ex);
 110         cookieManager = builder.cookieManager;
 111         followRedirects = builder.followRedirects == null ?
 112                 Redirect.NEVER : builder.followRedirects;
 113         this.proxySelector = builder.proxy;
 114         authenticator = builder.authenticator;
 115         version = builder.version;
 116         if (builder.sslParams == null)
 117             sslParams = getDefaultParams(sslContext);
 118         else
 119             sslParams = builder.sslParams;
 120         connections = new ConnectionPool();
 121         connections.start();
 122         timeouts = new LinkedList<>();
 123         try {
 124             selmgr = new SelectorManager();
 125         } catch (IOException e) {
 126             // unlikely
 127             throw new InternalError(e);
 128         }
 129         selmgr.setDaemon(true);
 130         filters = new FilterFactory();
 131         initFilters();
 132     }
 133 
 134     private void start() {
 135         selmgr.start();
 136     }
 137 
 138     private static SSLParameters getDefaultParams(SSLContext ctx) {
 139         SSLParameters params = ctx.getSupportedSSLParameters();
 140         params.setProtocols(new String[]{"TLSv1.2"});
 141         return params;
 142     }
 143 
 144     /**
 145      * Wait for activity on given exchange (assuming blocking = false).
 146      * It's a no-op if blocking = true. In particular, the following occurs
 147      * in the SelectorManager thread.
 148      *
 149      *  1) mark the connection non-blocking
 150      *  2) add to selector
 151      *  3) If selector fires for this exchange then
 152      *  4)   - mark connection as blocking
 153      *  5)   - call AsyncEvent.handle()
 154      *
 155      * If exchange needs to block again, then call registerEvent() again
 156      */
 157     void registerEvent(AsyncEvent exchange) throws IOException {
 158         selmgr.register(exchange);
 159     }
 160 
 161     /**
 162      * Only used from RawChannel to disconnect the channel from
 163      * the selector
 164      */
 165     void cancelRegistration(SocketChannel s) {
 166         selmgr.cancel(s);
 167     }
 168 
 169 
 170     Http2ClientImpl client2() {
 171         return client2;
 172     }
 173 
 174     /**
 175      * We keep one size of buffer on free list. That size may increase
 176      * depending on demand. If that happens we dispose of free buffers
 177      * that are smaller than new size.
 178      */
 179     private final LinkedList<ByteBuffer> freelist = new LinkedList<>();
 180     int currentSize = BUFSIZE;
 181 
 182     @Override
 183     public synchronized ByteBuffer getBuffer(int size) {
 184 
 185         ByteBuffer buf;
 186         if (size == -1)
 187             size = currentSize;
 188 
 189         if (size > currentSize)
 190             currentSize = size;
 191 
 192         while (!freelist.isEmpty()) {
 193             buf = freelist.removeFirst();
 194             if (buf.capacity() < currentSize)
 195                 continue;
 196             buf.clear();
 197             return buf;
 198         }
 199         return ByteBuffer.allocate(size);
 200     }
 201 
 202     @Override
 203     public synchronized void returnBuffer(ByteBuffer buffer) {
 204         freelist.add(buffer);
 205     }
 206 
 207     @Override
 208     public synchronized void setMinBufferSize(int n) {
 209         currentSize = Math.max(n, currentSize);
 210     }
 211 
 212     // Main loop for this client's selector
 213     private final class SelectorManager extends Thread {
 214 
 215         private final Selector selector;
 216         private volatile boolean closed;
 217         private final List<AsyncEvent> readyList;
 218         private final List<AsyncEvent> registrations;
 219 
 220         SelectorManager() throws IOException {
 221             super(null, null, "SelectorManager", 0, false);
 222             readyList = new ArrayList<>();
 223             registrations = new ArrayList<>();
 224             selector = Selector.open();
 225         }
 226 
 227         // This returns immediately. So caller not allowed to send/receive
 228         // on connection.
 229 
 230         synchronized void register(AsyncEvent e) throws IOException {
 231             registrations.add(e);
 232             selector.wakeup();
 233         }
 234 
 235         synchronized void cancel(SocketChannel e) {
 236             SelectionKey key = e.keyFor(selector);
 237             if (key != null)
 238                 key.cancel();
 239             selector.wakeup();
 240         }
 241 
 242         void wakeupSelector() {
 243             selector.wakeup();
 244         }
 245 
 246         synchronized void shutdown() {
 247             closed = true;
 248             try {
 249                 selector.close();
 250             } catch (IOException ignored) { }
 251         }
 252 
 253         @Override
 254         public void run() {
 255             try {
 256                 while (!Thread.currentThread().isInterrupted()) {
 257                     synchronized (this) {
 258                         for (AsyncEvent exchange : registrations) {
 259                             SelectableChannel c = exchange.channel();
 260                             try {
 261                                 c.configureBlocking(false);
 262                                 SelectionKey key = c.keyFor(selector);
 263                                 SelectorAttachment sa;
 264                                 if (key == null) {
 265                                     sa = new SelectorAttachment(c, selector);
 266                                 } else {
 267                                     sa = (SelectorAttachment) key.attachment();
 268                                 }
 269                                 sa.register(exchange);
 270                             } catch (IOException e) {
 271                                 Log.logError("HttpClientImpl: " + e);
 272                                 c.close();
 273                                 // let the exchange deal with it
 274                                 handleEvent(exchange);
 275                             }
 276                         }
 277                         registrations.clear();
 278                     }
 279                     long timeval = getTimeoutValue();
 280                     long now = System.currentTimeMillis();
 281                     //debugPrint(selector);
 282                     int n = selector.select(timeval);
 283                     if (n == 0) {
 284                         signalTimeouts(now);
 285                         continue;
 286                     }
 287                     Set<SelectionKey> keys = selector.selectedKeys();
 288 
 289                     for (SelectionKey key : keys) {
 290                         SelectorAttachment sa = (SelectorAttachment) key.attachment();
 291                         int eventsOccurred = key.readyOps();
 292                         sa.events(eventsOccurred).forEach(readyList::add);
 293                         sa.resetInterestOps(eventsOccurred);
 294                     }
 295                     selector.selectNow(); // complete cancellation
 296                     selector.selectedKeys().clear();
 297 
 298                     for (AsyncEvent exchange : readyList) {
 299                         if (exchange.blocking()) {
 300                             exchange.channel().configureBlocking(true);
 301                         }
 302                         executor.synchronize();
 303                         handleEvent(exchange); // will be delegated to executor
 304                     }
 305                     readyList.clear();
 306                 }
 307             } catch (Throwable e) {
 308                 if (!closed) {
 309                     // This terminates thread. So, better just print stack trace
 310                     String err = Utils.stackTrace(e);
 311                     Log.logError("HttpClientImpl: fatal error: " + err);
 312                 }
 313             } finally {
 314                 shutdown();
 315             }
 316         }
 317 
 318         void debugPrint(Selector selector) {
 319             System.err.println("Selector: debugprint start");
 320             Set<SelectionKey> keys = selector.keys();
 321             for (SelectionKey key : keys) {
 322                 SelectableChannel c = key.channel();
 323                 int ops = key.interestOps();
 324                 System.err.printf("selector chan:%s ops:%d\n", c, ops);
 325             }
 326             System.err.println("Selector: debugprint end");
 327         }
 328 
 329         void handleEvent(AsyncEvent e) {
 330             if (closed) {
 331                 e.abort();
 332             } else {
 333                 e.handle();
 334             }
 335         }
 336     }
 337 
 338     /**
 339      * Tracks multiple user level registrations associated with one NIO
 340      * registration (SelectionKey). In this implementation, registrations
 341      * are one-off and when an event is posted the registration is cancelled
 342      * until explicitly registered again.
 343      *
 344      * <p> No external synchronization required as this class is only used
 345      * by the SelectorManager thread. One of these objects required per
 346      * connection.
 347      */
 348     private static class SelectorAttachment {
 349         private final SelectableChannel chan;
 350         private final Selector selector;
 351         private final ArrayList<AsyncEvent> pending;
 352         private int interestOps;
 353 
 354         SelectorAttachment(SelectableChannel chan, Selector selector) {
 355             this.pending = new ArrayList<>();
 356             this.chan = chan;
 357             this.selector = selector;
 358         }
 359 
 360         void register(AsyncEvent e) throws ClosedChannelException {
 361             int newOps = e.interestOps();
 362             boolean reRegister = (interestOps & newOps) != newOps;
 363             interestOps |= newOps;
 364             pending.add(e);
 365             if (reRegister) {
 366                 // first time registration happens here also
 367                 chan.register(selector, interestOps, this);
 368             }
 369         }
 370 
 371         /**
 372          * Returns a Stream<AsyncEvents> containing only events that are
 373          * registered with the given {@code interestOps}.
 374          */
 375         Stream<AsyncEvent> events(int interestOps) {
 376             return pending.stream()
 377                     .filter(ev -> (ev.interestOps() & interestOps) != 0);
 378         }
 379 
 380         /**
 381          * Removes any events with the given {@code interestOps}, and if no
 382          * events remaining, cancels the associated SelectionKey.
 383          */
 384         void resetInterestOps(int interestOps) {
 385             int newOps = 0;
 386 
 387             Iterator<AsyncEvent> itr = pending.iterator();
 388             while (itr.hasNext()) {
 389                 AsyncEvent event = itr.next();
 390                 int evops = event.interestOps();
 391                 if (event.repeating()) {
 392                     newOps |= evops;
 393                     continue;
 394                 }
 395                 if ((evops & interestOps) != 0) {
 396                     itr.remove();
 397                 } else {
 398                     newOps |= evops;
 399                 }
 400             }
 401 
 402             this.interestOps = newOps;
 403             SelectionKey key = chan.keyFor(selector);
 404             if (newOps == 0) {
 405                 key.cancel();
 406             } else {
 407                 key.interestOps(newOps);
 408             }
 409         }
 410     }
 411 
 412     /**
 413      * Creates a HttpRequest associated with this group.
 414      *
 415      * @throws IllegalStateException
 416      *         if the group has been stopped
 417      */
 418     @Override
 419     public HttpRequestBuilderImpl request() {
 420         return new HttpRequestBuilderImpl(this, null);
 421     }
 422 
 423     /**
 424      * Creates a HttpRequest associated with this group.
 425      *
 426      * @throws IllegalStateException
 427      *         if the group has been stopped
 428      */
 429     @Override
 430     public HttpRequestBuilderImpl request(URI uri) {
 431         return new HttpRequestBuilderImpl(this, uri);
 432     }
 433 
 434     @Override
 435     public SSLContext sslContext() {
 436         Utils.checkNetPermission("getSSLContext");
 437         return sslContext;
 438     }
 439 
 440     @Override
 441     public Optional<SSLParameters> sslParameters() {
 442         return Optional.ofNullable(sslParams);
 443     }
 444 
 445     @Override
 446     public Optional<Authenticator> authenticator() {
 447         return Optional.ofNullable(authenticator);
 448     }
 449 
 450     @Override
 451     public ExecutorService executorService() {
 452         return executor.userExecutor();
 453     }
 454 
 455     ExecutorWrapper executorWrapper() {
 456         return executor;
 457     }
 458 
 459     @Override
 460     public boolean pipelining() {
 461         return this.pipelining;
 462     }
 463 
 464     ConnectionPool connectionPool() {
 465         return connections;
 466     }
 467 
 468     @Override
 469     public Redirect followRedirects() {
 470         return followRedirects;
 471     }
 472 
 473 
 474     @Override
 475     public Optional<CookieManager> cookieManager() {
 476         return Optional.ofNullable(cookieManager);
 477     }
 478 
 479     @Override
 480     public Optional<ProxySelector> proxy() {
 481         return Optional.ofNullable(this.proxySelector);
 482     }
 483 
 484     @Override
 485     public Version version() {
 486         return version;
 487     }
 488 
 489     //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>();
 490 
 491     boolean getHttp2Allowed() {
 492         return version.equals(Version.HTTP_2);
 493     }
 494 
 495     private void initFilters() {
 496         addFilter(AuthenticationFilter.class);
 497         addFilter(RedirectFilter.class);
 498     }
 499 
 500     private void addFilter(Class<? extends HeaderFilter> f) {
 501         filters.addFilter(f);
 502     }
 503 
 504     final List<HeaderFilter> filterChain() {
 505         return filters.getFilterChain();
 506     }
 507 
 508     // Timer controls. Timers are implemented through timed Selector.select()
 509     // calls.
 510     synchronized void registerTimer(TimeoutEvent event) {
 511         long elapse = event.timevalMillis();
 512         ListIterator<TimeoutEvent> iter = timeouts.listIterator();
 513         long listval = 0;
 514         event.delta = event.timeval; // in case list empty
 515         TimeoutEvent next;
 516         while (iter.hasNext()) {
 517             next = iter.next();
 518             listval += next.delta;
 519             if (elapse < listval) {
 520                 listval -= next.delta;
 521                 event.delta = elapse - listval;
 522                 next.delta -= event.delta;
 523                 iter.previous();
 524                 break;
 525             } else if (!iter.hasNext()) {
 526                 event.delta = event.timeval - listval;
 527             }
 528         }
 529         iter.add(event);
 530         selmgr.wakeupSelector();
 531     }
 532 
 533     private synchronized void signalTimeouts(long then) {
 534         if (timeouts.isEmpty()) {
 535             return;
 536         }
 537         long now = System.currentTimeMillis();
 538         long duration = now - then;
 539         ListIterator<TimeoutEvent> iter = timeouts.listIterator();
 540         TimeoutEvent event = iter.next();
 541         long delta = event.delta;
 542         if (duration < delta) {
 543             event.delta -= duration;
 544             return;
 545         }
 546         event.handle();
 547         iter.remove();
 548         while (iter.hasNext()) {
 549             event = iter.next();
 550             if (event.delta == 0) {
 551                 event.handle();
 552                 iter.remove();
 553             } else {
 554                 event.delta += delta;
 555                 break;
 556             }
 557         }
 558     }
 559 
 560     synchronized void cancelTimer(TimeoutEvent event) {
 561         ListIterator<TimeoutEvent> iter = timeouts.listIterator();
 562         while (iter.hasNext()) {
 563             TimeoutEvent ev = iter.next();
 564             if (event == ev) {
 565                 if (iter.hasNext()) {
 566                     // adjust
 567                     TimeoutEvent next = iter.next();
 568                     next.delta += ev.delta;
 569                     iter.previous();
 570                 }
 571                 iter.remove();
 572             }
 573         }
 574     }
 575 
 576     // used for the connection window
 577     int getReceiveBufferSize() {
 578         return Utils.getIntegerNetProperty(
 579                 "java.net.httpclient.connectionWindowSize", 256 * 1024
 580         );
 581     }
 582 
 583     // returns 0 meaning block forever, or a number of millis to block for
 584     private synchronized long getTimeoutValue() {
 585         if (timeouts.isEmpty()) {
 586             return 0;
 587         } else {
 588             return timeouts.get(0).delta;
 589         }
 590     }
 591 }