1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import javax.net.ssl.SSLContext;
  29 import javax.net.ssl.SSLParameters;
  30 import java.io.IOException;
  31 import java.lang.ref.WeakReference;
  32 import java.net.Authenticator;
  33 import java.net.CookieManager;
  34 import java.net.ProxySelector;
  35 import java.net.URI;
  36 import java.nio.channels.ClosedChannelException;
  37 import java.nio.channels.SelectableChannel;
  38 import java.nio.channels.SelectionKey;
  39 import java.nio.channels.Selector;
  40 import java.nio.channels.SocketChannel;
  41 import java.security.NoSuchAlgorithmException;
  42 import java.time.Instant;
  43 import java.time.temporal.ChronoUnit;
  44 import java.util.ArrayList;
  45 import java.util.Iterator;
  46 import java.util.List;
  47 import java.util.Optional;
  48 import java.util.Set;
  49 import java.util.TreeSet;
  50 import java.util.concurrent.CompletableFuture;
  51 import java.util.concurrent.Executor;
  52 import java.util.concurrent.Executors;
  53 import java.util.concurrent.ThreadFactory;
  54 import java.util.stream.Stream;
  55 import jdk.incubator.http.internal.common.Log;
  56 import jdk.incubator.http.internal.common.Utils;
  57 import jdk.incubator.http.internal.websocket.BuilderImpl;
  58 
  59 /**
  60  * Client implementation. Contains all configuration information and also
  61  * the selector manager thread which allows async events to be registered
  62  * and delivered when they occur. See AsyncEvent.
  63  */
  64 class HttpClientImpl extends HttpClient {
  65 
  66     // Define the default factory as a static inner class
  67     // that embeds all the necessary logic to avoid
  68     // the risk of using a lambda that might keep a reference on the
  69     // HttpClient instance from which it was created (helps with
  70     // heapdump analysis).
  71     private static final class DefaultThreadFactory implements ThreadFactory {
  72         private DefaultThreadFactory() {}
  73         @Override
  74         public Thread newThread(Runnable r) {
  75             Thread t = new Thread(null, r, "HttpClient_worker", 0, true);
  76             t.setDaemon(true);
  77             return t;
  78         }
  79         static final ThreadFactory INSTANCE = new DefaultThreadFactory();
  80     }
  81 
    // Taken from the builder as is; may be null, in which case no
    // CookieFilter is installed (see initFilters()).
    private final CookieManager cookieManager;
    // Redirect policy; Redirect.NEVER when the builder does not specify one.
    private final Redirect followRedirects;
    // Taken from the builder as is; exposed through Optional.ofNullable.
    private final ProxySelector proxySelector;
    // Taken from the builder as is; exposed through Optional.ofNullable.
    private final Authenticator authenticator;
    // Preferred protocol version; HTTP_2 when the builder does not specify one.
    private final Version version;
    // Connection pool, created and started by the constructor.
    private final ConnectionPool connections;
    // The builder's executor, or a cached thread pool built on
    // DefaultThreadFactory when the builder has none.
    private final Executor executor;
    // Security parameters
    // The builder's context, or SSLContext.getDefault() when it has none.
    private final SSLContext sslContext;
    // The builder's parameters, or getDefaultParams(sslContext) when it has none.
    private final SSLParameters sslParams;
    // Selector thread; created (as a daemon) by the constructor and started
    // by create().
    private final SelectorManager selmgr;
    // Header filters, populated by initFilters().
    private final FilterFactory filters;
    // HTTP/2 client state, created with a reference to this client.
    private final Http2ClientImpl client2;

    /**
     * A Set of, deadline first, ordered timeout events.
     * All accesses are made while holding this client's monitor.
     */
    private final TreeSet<TimeoutEvent> timeouts;
  98 
  99     public static HttpClientImpl create(HttpClientBuilderImpl builder) {
 100         HttpClientImpl impl = new HttpClientImpl(builder);
 101         impl.start();
 102         return impl;
 103     }
 104 
 105     private HttpClientImpl(HttpClientBuilderImpl builder) {
 106         if (builder.sslContext == null) {
 107             try {
 108                 sslContext = SSLContext.getDefault();
 109             } catch (NoSuchAlgorithmException ex) {
 110                 throw new InternalError(ex);
 111             }
 112         } else {
 113             sslContext = builder.sslContext;
 114         }
 115         Executor ex = builder.executor;
 116         if (ex == null) {
 117             ex = Executors.newCachedThreadPool(DefaultThreadFactory.INSTANCE);
 118         } else {
 119             ex = builder.executor;
 120         }
 121         client2 = new Http2ClientImpl(this);
 122         executor = ex;
 123         cookieManager = builder.cookieManager;
 124         followRedirects = builder.followRedirects == null ?
 125                 Redirect.NEVER : builder.followRedirects;
 126         this.proxySelector = builder.proxy;
 127         authenticator = builder.authenticator;
 128         if (builder.version == null) {
 129             version = HttpClient.Version.HTTP_2;
 130         } else {
 131             version = builder.version;
 132         }
 133         if (builder.sslParams == null) {
 134             sslParams = getDefaultParams(sslContext);
 135         } else {
 136             sslParams = builder.sslParams;
 137         }
 138         connections = new ConnectionPool();
 139         connections.start();
 140         timeouts = new TreeSet<>();
 141         try {
 142             selmgr = new SelectorManager(this);
 143         } catch (IOException e) {
 144             // unlikely
 145             throw new InternalError(e);
 146         }
 147         selmgr.setDaemon(true);
 148         filters = new FilterFactory();
 149         initFilters();
 150     }
 151 
 152     private void start() {
 153         selmgr.start();
 154     }
 155 
 156     private static SSLParameters getDefaultParams(SSLContext ctx) {
 157         SSLParameters params = ctx.getSupportedSSLParameters();
 158         params.setProtocols(new String[]{"TLSv1.2"});
 159         return params;
 160     }
 161 
 162     /**
 163      * Wait for activity on given exchange (assuming blocking = false).
 164      * It's a no-op if blocking = true. In particular, the following occurs
 165      * in the SelectorManager thread.
 166      *
 167      *  1) mark the connection non-blocking
 168      *  2) add to selector
 169      *  3) If selector fires for this exchange then
 170      *  4)   - mark connection as blocking
 171      *  5)   - call AsyncEvent.handle()
 172      *
 173      * If exchange needs to block again, then call registerEvent() again
 174      */
 175     void registerEvent(AsyncEvent exchange) throws IOException {
 176         selmgr.register(exchange);
 177     }
 178 
 179     /**
 180      * Only used from RawChannel to disconnect the channel from
 181      * the selector
 182      */
 183     void cancelRegistration(SocketChannel s) {
 184         selmgr.cancel(s);
 185     }
 186 
 187 
 188     Http2ClientImpl client2() {
 189         return client2;
 190     }
 191 
 192     /*
 193     @Override
 194     public ByteBuffer getBuffer() {
 195         return pool.getBuffer();
 196     }
 197 
 198     // SSL buffers are larger. Manage separately
 199 
 200     int size = 16 * 1024;
 201 
 202     ByteBuffer getSSLBuffer() {
 203         return ByteBuffer.allocate(size);
 204     }
 205 
 206     /**
 207      * Return a new buffer that's a bit bigger than the given one
 208      *
 209      * @param buf
 210      * @return
 211      *
 212     ByteBuffer reallocSSLBuffer(ByteBuffer buf) {
 213         size = buf.capacity() * 12 / 10; // 20% bigger
 214         return ByteBuffer.allocate(size);
 215     }
 216 
 217     synchronized void returnSSLBuffer(ByteBuffer buf) {
 218         if (buf.capacity() >= size)
 219            sslBuffers.add(0, buf);
 220     }
 221 
 222     @Override
 223     public void returnBuffer(ByteBuffer buffer) {
 224         pool.returnBuffer(buffer);
 225     }
 226     */
 227 
 228     @Override
 229     public <T> HttpResponse<T>
 230     send(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
 231         throws IOException, InterruptedException
 232     {
 233         MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
 234         return mex.response();
 235     }
 236 
 237     @Override
 238     public <T> CompletableFuture<HttpResponse<T>>
 239     sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
 240     {
 241         MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
 242         return mex.responseAsync()
 243                   .thenApply((HttpResponseImpl<T> b) -> (HttpResponse<T>) b);
 244     }
 245 
 246     @Override
 247     public <U, T> CompletableFuture<U>
 248     sendAsync(HttpRequest req, HttpResponse.MultiProcessor<U, T> responseHandler) {
 249         MultiExchange<U,T> mex = new MultiExchange<>(req, this, responseHandler);
 250         return mex.multiResponseAsync();
 251     }
 252 
 253     // new impl. Should get rid of above
 254     /*
 255     static class BufferPool implements BufferHandler {
 256 
 257         final LinkedList<ByteBuffer> freelist = new LinkedList<>();
 258 
 259         @Override
 260         public synchronized ByteBuffer getBuffer() {
 261             ByteBuffer buf;
 262 
 263             while (!freelist.isEmpty()) {
 264                 buf = freelist.removeFirst();
 265                 buf.clear();
 266                 return buf;
 267             }
 268             return ByteBuffer.allocate(BUFSIZE);
 269         }
 270 
 271         @Override
 272         public synchronized void returnBuffer(ByteBuffer buffer) {
 273             assert buffer.capacity() > 0;
 274             freelist.add(buffer);
 275         }
 276     }
 277 
 278     static BufferPool pool = new BufferPool();
 279 
 280     static BufferHandler pool() {
 281         return pool;
 282     }
 283 */
 284     // Main loop for this client's selector
 285     private final static class SelectorManager extends Thread {
 286 
 287         private static final long NODEADLINE = 3000L;
 288         private final Selector selector;
 289         private volatile boolean closed;
 290         private final List<AsyncEvent> readyList;
 291         private final List<AsyncEvent> registrations;
 292 
 293         // Uses a weak reference to the HttpClient owning this
 294         // selector: a strong reference prevents its garbage
 295         // collection while the thread is running.
 296         // We want the thread to exit gracefully when the
 297         // HttpClient that owns it gets GC'ed.
 298         WeakReference<HttpClientImpl> ownerRef;
 299 
 300         SelectorManager(HttpClientImpl ref) throws IOException {
 301             super(null, null, "SelectorManager", 0, false);
 302             ownerRef = new WeakReference<>(ref);
 303             readyList = new ArrayList<>();
 304             registrations = new ArrayList<>();
 305             selector = Selector.open();
 306         }
 307 
 308         // This returns immediately. So caller not allowed to send/receive
 309         // on connection.
 310 
 311         synchronized void register(AsyncEvent e) throws IOException {
 312             registrations.add(e);
 313             selector.wakeup();
 314         }
 315 
 316         synchronized void cancel(SocketChannel e) {
 317             SelectionKey key = e.keyFor(selector);
 318             if (key != null) {
 319                 key.cancel();
 320             }
 321             selector.wakeup();
 322         }
 323 
 324         void wakeupSelector() {
 325             selector.wakeup();
 326         }
 327 
 328         synchronized void shutdown() {
 329             closed = true;
 330             try {
 331                 selector.close();
 332             } catch (IOException ignored) { }
 333         }
 334 
 335         @Override
 336         public void run() {
 337             try {
 338                 while (!Thread.currentThread().isInterrupted()) {
 339                     HttpClientImpl client;
 340                     synchronized (this) {
 341                         for (AsyncEvent exchange : registrations) {
 342                             SelectableChannel c = exchange.channel();
 343                             try {
 344                                 c.configureBlocking(false);
 345                                 SelectionKey key = c.keyFor(selector);
 346                                 SelectorAttachment sa;
 347                                 if (key == null || !key.isValid()) {
 348                                     if (key != null) {
 349                                         // key is canceled.
 350                                         // invoke selectNow() to purge it
 351                                         // before registering the new event.
 352                                         selector.selectNow();
 353                                     }
 354                                     sa = new SelectorAttachment(c, selector);
 355                                 } else {
 356                                     sa = (SelectorAttachment) key.attachment();
 357                                 }
 358                                 sa.register(exchange);
 359                             } catch (IOException e) {
 360                                 Log.logError("HttpClientImpl: " + e);
 361                                 c.close();
 362                                 // let the exchange deal with it
 363                                 handleEvent(exchange);
 364                             }
 365                         }
 366                         registrations.clear();
 367                     }
 368 
 369                     // Check whether client is still alive, and if not,
 370                     // gracefully stop this thread
 371                     if ((client = ownerRef.get()) == null) {
 372                         Log.logTrace("HttpClient no longer referenced. Exiting...");
 373                         return;
 374                     }
 375                     long millis = client.purgeTimeoutsAndReturnNextDeadline();
 376                     client = null; // don't hold onto the client ref
 377 
 378                     //debugPrint(selector);
 379                     // Don't wait for ever as it might prevent the thread to
 380                     // stop gracefully. millis will be 0 if no deadline was found.
 381                     int n = selector.select(millis == 0 ? NODEADLINE : millis);
 382                     if (n == 0) {
 383                         // Check whether client is still alive, and if not,
 384                         // gracefully stop this thread
 385                         if ((client = ownerRef.get()) == null) {
 386                             Log.logTrace("HttpClient no longer referenced. Exiting...");
 387                             return;
 388                         }
 389                         client.purgeTimeoutsAndReturnNextDeadline();
 390                         client = null; // don't hold onto the client ref
 391                         continue;
 392                     }
 393                     Set<SelectionKey> keys = selector.selectedKeys();
 394 
 395                     for (SelectionKey key : keys) {
 396                         SelectorAttachment sa = (SelectorAttachment) key.attachment();
 397                         int eventsOccurred = key.readyOps();
 398                         sa.events(eventsOccurred).forEach(readyList::add);
 399                         sa.resetInterestOps(eventsOccurred);
 400                     }
 401                     selector.selectNow(); // complete cancellation
 402                     selector.selectedKeys().clear();
 403 
 404                     for (AsyncEvent exchange : readyList) {
 405                         if (exchange.blocking()) {
 406                             exchange.channel().configureBlocking(true);
 407                         }
 408                         handleEvent(exchange); // will be delegated to executor
 409                     }
 410                     readyList.clear();
 411                 }
 412             } catch (Throwable e) {
 413                 if (!closed) {
 414                     // This terminates thread. So, better just print stack trace
 415                     String err = Utils.stackTrace(e);
 416                     Log.logError("HttpClientImpl: fatal error: " + err);
 417                 }
 418             } finally {
 419                 shutdown();
 420             }
 421         }
 422 
 423         void debugPrint(Selector selector) {
 424             System.err.println("Selector: debugprint start");
 425             Set<SelectionKey> keys = selector.keys();
 426             for (SelectionKey key : keys) {
 427                 SelectableChannel c = key.channel();
 428                 int ops = key.interestOps();
 429                 System.err.printf("selector chan:%s ops:%d\n", c, ops);
 430             }
 431             System.err.println("Selector: debugprint end");
 432         }
 433 
 434         void handleEvent(AsyncEvent e) {
 435             if (closed) {
 436                 e.abort();
 437             } else {
 438                 e.handle();
 439             }
 440         }
 441     }
 442 
 443     /**
 444      * Tracks multiple user level registrations associated with one NIO
 445      * registration (SelectionKey). In this implementation, registrations
 446      * are one-off and when an event is posted the registration is cancelled
 447      * until explicitly registered again.
 448      *
 449      * <p> No external synchronization required as this class is only used
 450      * by the SelectorManager thread. One of these objects required per
 451      * connection.
 452      */
 453     private static class SelectorAttachment {
 454         private final SelectableChannel chan;
 455         private final Selector selector;
 456         private final ArrayList<AsyncEvent> pending;
 457         private int interestOps;
 458 
 459         SelectorAttachment(SelectableChannel chan, Selector selector) {
 460             this.pending = new ArrayList<>();
 461             this.chan = chan;
 462             this.selector = selector;
 463         }
 464 
 465         void register(AsyncEvent e) throws ClosedChannelException {
 466             int newOps = e.interestOps();
 467             boolean reRegister = (interestOps & newOps) != newOps;
 468             interestOps |= newOps;
 469             pending.add(e);
 470             if (reRegister) {
 471                 // first time registration happens here also
 472                 chan.register(selector, interestOps, this);
 473             }
 474         }
 475 
 476         /**
 477          * Returns a Stream<AsyncEvents> containing only events that are
 478          * registered with the given {@code interestOps}.
 479          */
 480         Stream<AsyncEvent> events(int interestOps) {
 481             return pending.stream()
 482                     .filter(ev -> (ev.interestOps() & interestOps) != 0);
 483         }
 484 
 485         /**
 486          * Removes any events with the given {@code interestOps}, and if no
 487          * events remaining, cancels the associated SelectionKey.
 488          */
 489         void resetInterestOps(int interestOps) {
 490             int newOps = 0;
 491 
 492             Iterator<AsyncEvent> itr = pending.iterator();
 493             while (itr.hasNext()) {
 494                 AsyncEvent event = itr.next();
 495                 int evops = event.interestOps();
 496                 if (event.repeating()) {
 497                     newOps |= evops;
 498                     continue;
 499                 }
 500                 if ((evops & interestOps) != 0) {
 501                     itr.remove();
 502                 } else {
 503                     newOps |= evops;
 504                 }
 505             }
 506 
 507             this.interestOps = newOps;
 508             SelectionKey key = chan.keyFor(selector);
 509             if (newOps == 0) {
 510                 key.cancel();
 511             } else {
 512                 key.interestOps(newOps);
 513             }
 514         }
 515     }
 516 
 517     @Override
 518     public SSLContext sslContext() {
 519         Utils.checkNetPermission("getSSLContext");
 520         return sslContext;
 521     }
 522 
 523     @Override
 524     public Optional<SSLParameters> sslParameters() {
 525         return Optional.ofNullable(sslParams);
 526     }
 527 
 528     @Override
 529     public Optional<Authenticator> authenticator() {
 530         return Optional.ofNullable(authenticator);
 531     }
 532 
 533     @Override
 534     public Executor executor() {
 535         return executor;
 536     }
 537 
 538     ConnectionPool connectionPool() {
 539         return connections;
 540     }
 541 
 542     @Override
 543     public Redirect followRedirects() {
 544         return followRedirects;
 545     }
 546 
 547 
 548     @Override
 549     public Optional<CookieManager> cookieManager() {
 550         return Optional.ofNullable(cookieManager);
 551     }
 552 
 553     @Override
 554     public Optional<ProxySelector> proxy() {
 555         return Optional.ofNullable(this.proxySelector);
 556     }
 557 
 558     @Override
 559     public WebSocket.Builder newWebSocketBuilder(URI uri,
 560                                                  WebSocket.Listener listener) {
 561         return new BuilderImpl(this, uri, listener);
 562     }
 563 
 564     @Override
 565     public Version version() {
 566         return version;
 567     }
 568 
 569     //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>();
 570 
 571     boolean getHttp2Allowed() {
 572         return version.equals(Version.HTTP_2);
 573     }
 574 
 575     private void initFilters() {
 576         addFilter(AuthenticationFilter.class);
 577         addFilter(RedirectFilter.class);
 578         if (this.cookieManager != null) {
 579             addFilter(CookieFilter.class);
 580         }
 581     }
 582 
 583     private void addFilter(Class<? extends HeaderFilter> f) {
 584         filters.addFilter(f);
 585     }
 586 
 587     final List<HeaderFilter> filterChain() {
 588         return filters.getFilterChain();
 589     }
 590 
 591     // Timer controls.
 592     // Timers are implemented through timed Selector.select() calls.
 593 
 594     synchronized void registerTimer(TimeoutEvent event) {
 595         Log.logTrace("Registering timer {0}", event);
 596         timeouts.add(event);
 597         selmgr.wakeupSelector();
 598     }
 599 
 600     synchronized void cancelTimer(TimeoutEvent event) {
 601         Log.logTrace("Canceling timer {0}", event);
 602         timeouts.remove(event);
 603     }
 604 
 605     /**
 606      * Purges ( handles ) timer events that have passed their deadline, and
 607      * returns the amount of time, in milliseconds, until the next earliest
 608      * event. A return value of 0 means that there are no events.
 609      */
 610     private long purgeTimeoutsAndReturnNextDeadline() {
 611         long diff = 0L;
 612         List<TimeoutEvent> toHandle = null;
 613         int remaining = 0;
 614         // enter critical section to retrieve the timeout event to handle
 615         synchronized(this) {
 616             if (timeouts.isEmpty()) return 0L;
 617 
 618             Instant now = Instant.now();
 619             Iterator<TimeoutEvent> itr = timeouts.iterator();
 620             while (itr.hasNext()) {
 621                 TimeoutEvent event = itr.next();
 622                 diff = now.until(event.deadline(), ChronoUnit.MILLIS);
 623                 if (diff <= 0) {
 624                     itr.remove();
 625                     toHandle = (toHandle == null) ? new ArrayList<>() : toHandle;
 626                     toHandle.add(event);
 627                 } else {
 628                     break;
 629                 }
 630             }
 631             remaining = timeouts.size();
 632         }
 633 
 634         // can be useful for debugging
 635         if (toHandle != null && Log.trace()) {
 636             Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling "
 637                     + (toHandle == null ? 0 : toHandle.size()) + " events, "
 638                     + "remaining " + remaining
 639                     + ", next deadline: " + (diff < 0 ? 0L : diff));
 640         }
 641 
 642         // handle timeout events out of critical section
 643         if (toHandle != null) {
 644             Throwable failed = null;
 645             for (TimeoutEvent event : toHandle) {
 646                 try {
 647                    Log.logTrace("Firing timer {0}", event);
 648                    event.handle();
 649                 } catch (Error | RuntimeException e) {
 650                     // Not expected. Handle remaining events then throw...
 651                     // If e is an OOME or SOE it might simply trigger a new
 652                     // error from here - but in this case there's not much we
 653                     // could do anyway. Just let it flow...
 654                     if (failed == null) failed = e;
 655                     else failed.addSuppressed(e);
 656                     Log.logTrace("Failed to handle event {0}: {1}", event, e);
 657                 }
 658             }
 659             if (failed instanceof Error) throw (Error) failed;
 660             if (failed instanceof RuntimeException) throw (RuntimeException) failed;
 661         }
 662 
 663         // return time to wait until next event. 0L if there's no more events.
 664         return diff < 0 ? 0L : diff;
 665     }
 666 
 667     // used for the connection window
 668     int getReceiveBufferSize() {
 669         return Utils.getIntegerNetProperty(
 670                 "jdk.httpclient.connectionWindowSize", 256 * 1024
 671         );
 672     }
 673 }