1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  */
  24 package java.net.http;
  25 
  26 import java.io.IOException;
  27 import java.net.Authenticator;
  28 import java.net.CookieManager;
  29 import java.net.ProxySelector;
  30 import java.net.URI;
  31 import static java.net.http.Utils.BUFSIZE;
  32 import java.nio.ByteBuffer;
  33 import java.nio.channels.ClosedChannelException;
  34 import java.nio.channels.SelectableChannel;
  35 import java.nio.channels.SelectionKey;
  36 import static java.nio.channels.SelectionKey.OP_CONNECT;
  37 import static java.nio.channels.SelectionKey.OP_READ;
  38 import static java.nio.channels.SelectionKey.OP_WRITE;
  39 import java.nio.channels.Selector;
  40 import java.util.*;
  41 import java.util.stream.Stream;
  42 import java.util.concurrent.ExecutorService;
  43 import java.security.NoSuchAlgorithmException;
  44 import java.util.concurrent.Executors;
  45 import java.util.concurrent.ThreadFactory;
  46 import javax.net.ssl.SSLContext;
  47 import javax.net.ssl.SSLParameters;
  48 
  49 /**
  50  * Client implementation. Contains all configuration information and also
  51  * the selector manager thread which allows async events to be registered
  52  * and delivered when they occur. See AsyncEvent.
  53  */
  54 class HttpClientImpl extends HttpClient implements BufferHandler {
  55 
  56     private final CookieManager cookieManager;
  57     private final Redirect followRedirects;
  58     private final ProxySelector proxySelector;
  59     private final Authenticator authenticator;
  60     private final Version version;
  61     private boolean pipelining = false;
  62     private final ConnectionPool connections;
  63     private final ExecutorWrapper executor;
  64     // Security parameters
  65     private final SSLContext sslContext;
  66     private final SSLParameters sslParams;
  67     private final SelectorManager selmgr;
  68     private final FilterFactory filters;
  69     private final Http2ClientImpl client2;
  70     private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
  71     private final LinkedList<TimeoutEvent> timeouts;
  72 
  73     //@Override
  74     void debugPrint() {
  75         selmgr.debugPrint();
  76         client2.debugPrint();
  77     }
  78 
  79     public static HttpClientImpl create(HttpClientBuilderImpl builder) {
  80         HttpClientImpl impl = new HttpClientImpl(builder);
  81         impl.start();
  82         return impl;
  83     }
  84 
  85     private HttpClientImpl(HttpClientBuilderImpl builder) {
  86         if (builder.sslContext == null) {
  87             try {
  88                 sslContext = SSLContext.getDefault();
  89             } catch (NoSuchAlgorithmException ex) {
  90                 throw new InternalError(ex);
  91             }
  92         } else {
  93             sslContext = builder.sslContext;
  94         }
  95         ExecutorService ex = builder.executor;
  96         if (ex == null) {
  97             ex = Executors.newCachedThreadPool((r) -> {
  98                 Thread t = defaultFactory.newThread(r);
  99                 t.setDaemon(true);
 100                 return t;
 101             });
 102         } else {
 103             ex = builder.executor;
 104         }
 105         client2 = new Http2ClientImpl(this);
 106         executor = ExecutorWrapper.wrap(ex);
 107         cookieManager = builder.cookieManager;
 108         followRedirects = builder.followRedirects == null ?
 109                 Redirect.NEVER : builder.followRedirects;
 110         this.proxySelector = builder.proxy;
 111         authenticator = builder.authenticator;
 112         version = builder.version;
 113         sslParams = builder.sslParams;
 114         connections = new ConnectionPool();
 115         connections.start();
 116         timeouts = new LinkedList<>();
 117         try {
 118             selmgr = new SelectorManager();
 119         } catch (IOException e) {
 120             // unlikely
 121             throw new InternalError(e);
 122         }
 123         selmgr.setDaemon(true);
 124         selmgr.setName("HttpSelector");
 125         filters = new FilterFactory();
 126         initFilters();
 127     }
 128 
 129     private void start() {
 130         selmgr.start();
 131     }
 132 
 133     /**
 134      * Wait for activity on given exchange (assuming blocking = false).
 135      * It's a no-op if blocking = true. In particular, the following occurs
 136      * in the SelectorManager thread.
 137      *
 138      *  1) mark the connection non-blocking
 139      *  2) add to selector
 140      *  3) If selector fires for this exchange then
 141      *  4)   - mark connection as blocking
 142      *  5)   - call AsyncEvent.handle()
 143      *
 144      *  If exchange needs to block again, then call registerEvent() again
 145      */
 146     void registerEvent(AsyncEvent exchange) throws IOException {
 147         selmgr.register(exchange);
 148     }
 149 
 150     Http2ClientImpl client2() {
 151         return client2;
 152     }
 153 
 154     LinkedList<ByteBuffer> freelist = new LinkedList<>();
 155 
 156     @Override
 157     public synchronized ByteBuffer getBuffer() {
 158         if (freelist.isEmpty()) {
 159             return ByteBuffer.allocate(BUFSIZE);
 160         }
 161         return freelist.removeFirst();
 162     }
 163 
 164     @Override
 165     public synchronized void returnBuffer(ByteBuffer buffer) {
 166         buffer.clear();
 167         freelist.add(buffer);
 168     }
 169 
 170 
 171     // Main loop for this client's selector
 172 
 173     class SelectorManager extends Thread {
 174         final Selector selector;
 175         boolean closed;
 176 
 177         final List<AsyncEvent> readyList;
 178         final List<AsyncEvent> registrations;
 179 
 180         List<AsyncEvent> debugList;
 181 
 182         SelectorManager() throws IOException {
 183             readyList = new LinkedList<>();
 184             registrations = new LinkedList<>();
 185             debugList = new LinkedList<>();
 186             selector = Selector.open();
 187         }
 188 
 189         // This returns immediately. So caller not allowed to send/receive
 190         // on connection.
 191 
 192         synchronized void register(AsyncEvent e) throws IOException {
 193             registrations.add(e);
 194             selector.wakeup();
 195         }
 196 
 197         void wakeupSelector() {
 198             selector.wakeup();
 199         }
 200 
 201         synchronized void shutdown() {
 202             closed = true;
 203             try {
 204                 selector.close();
 205             } catch (IOException e) {}
 206         }
 207 
 208         private List<AsyncEvent> copy(List<AsyncEvent> list) {
 209             LinkedList<AsyncEvent> c = new LinkedList<>();
 210             for (AsyncEvent e : list) {
 211                 c.add(e);
 212             }
 213             return c;
 214         }
 215 
 216         synchronized void debugPrint() {
 217             System.err.println("Selecting on:");
 218             for (AsyncEvent e : debugList) {
 219                 System.err.println(opvals(e.interestOps()));
 220             }
 221         }
 222 
 223         String opvals(int i) {
 224             StringBuilder sb = new StringBuilder();
 225             if ((i & OP_READ) != 0)
 226                 sb.append("OP_READ ");
 227             if ((i & OP_CONNECT) != 0)
 228                 sb.append("OP_CONNECT ");
 229             if ((i & OP_WRITE) != 0)
 230                 sb.append("OP_WRITE ");
 231             return sb.toString();
 232         }
 233 
 234         @Override
 235         public void run() {
 236             try {
 237                 while (true) {
 238                     synchronized (this) {
 239                         debugList = copy(registrations);
 240                         for (AsyncEvent exchange : registrations) {
 241                             SelectableChannel c = exchange.channel();
 242                             try {
 243                                 c.configureBlocking(false);
 244                                 SelectionKey key = c.keyFor(selector);
 245                                 SelectorAttachment sa;
 246                                 if (key == null) {
 247                                     sa = new SelectorAttachment(c, selector);
 248                                 } else {
 249                                     sa = (SelectorAttachment)key.attachment();
 250                                 }
 251                                 sa.register(exchange);
 252                             } catch (IOException e) {
 253                                 Log.logError("HttpClientImpl: " + e);
 254                                 c.close();
 255                                 // let the exchange deal with it
 256                                 handleEvent(exchange);
 257                             }
 258                         }
 259                         registrations.clear();
 260                     }
 261                     long timeval = getTimeoutValue();
 262                     long now = System.currentTimeMillis();
 263                     int n = selector.select(timeval);
 264                     if (n == 0) {
 265                         signalTimeouts(now);
 266                         continue;
 267                     }
 268                     Set<SelectionKey> keys = selector.selectedKeys();
 269 
 270                     for (SelectionKey key : keys) {
 271                         SelectorAttachment sa = (SelectorAttachment)key.attachment();
 272                         int eventsOccurred = key.readyOps();
 273                         sa.events(eventsOccurred).forEach(readyList::add);
 274                         sa.resetInterestOps(eventsOccurred);
 275                     }
 276                     selector.selectNow(); // complete cancellation
 277                     selector.selectedKeys().clear();
 278 
 279                     for (AsyncEvent exchange : readyList) {
 280                         if (exchange instanceof AsyncEvent.Blocking) {
 281                             exchange.channel().configureBlocking(true);
 282                         } else {
 283                             assert exchange instanceof AsyncEvent.NonBlocking;
 284                         }
 285                         executor.synchronize();
 286                         handleEvent(exchange); // will be delegated to executor
 287                     }
 288                     readyList.clear();
 289                 }
 290             } catch (Throwable e) {
 291                 if (!closed) {
 292                     System.err.println("HttpClientImpl terminating on error");
 293                     // This terminates thread. So, better just print stack trace
 294                     String err = Utils.stackTrace(e);
 295                     Log.logError("HttpClientImpl: fatal error: " + err);
 296                 }
 297             }
 298         }
 299 
 300         void handleEvent(AsyncEvent e) {
 301             if (closed) {
 302                 e.abort();
 303             } else {
 304                 e.handle();
 305             }
 306         }
 307     }
 308 
 309     /**
 310      * Tracks multiple user level registrations associated with one NIO
 311      * registration (SelectionKey). In this implementation, registrations
 312      * are one-off and when an event is posted the registration is cancelled
 313      * until explicitly registered again.
 314      *
 315      * <p> No external synchronization required as this class is only used
 316      * by the SelectorManager thread. One of these objects required per
 317      * connection.
 318      */
 319     private static class SelectorAttachment {
 320         private final SelectableChannel chan;
 321         private final Selector selector;
 322         private final ArrayList<AsyncEvent> pending;
 323         private int interestops;
 324 
 325         SelectorAttachment(SelectableChannel chan, Selector selector) {
 326             this.pending = new ArrayList<>();
 327             this.chan = chan;
 328             this.selector = selector;
 329         }
 330 
 331         void register(AsyncEvent e) throws ClosedChannelException {
 332             int newops = e.interestOps();
 333             boolean reRegister = (interestops & newops) != newops;
 334             interestops |= newops;
 335             pending.add(e);
 336             if (reRegister) {
 337                 // first time registration happens here also
 338                 chan.register(selector, interestops, this);
 339             }
 340         }
 341 
 342         int interestOps() {
 343             return interestops;
 344         }
 345 
 346         /**
 347          * Returns an Iterator<AsyncEvents> containing only events that are
 348          * registered with the given {@code interestop}.
 349          */
 350         Stream<AsyncEvent> events(int interestop) {
 351             return pending.stream()
 352                           .filter(ev -> (ev.interestOps() & interestop) != 0);
 353         }
 354 
 355         /**
 356          * Removes any events with the given {@code interestop}, and if no
 357          * events remaining, cancels the associated SelectionKey.
 358          */
 359         void resetInterestOps(int interestop) {
 360             int newops = 0;
 361 
 362             Iterator<AsyncEvent> itr = pending.iterator();
 363             while (itr.hasNext()) {
 364                 AsyncEvent event = itr.next();
 365                 int evops = event.interestOps();
 366                 if ((evops & interestop) != 0) {
 367                     itr.remove();
 368                 } else {
 369                     newops |= evops;
 370                 }
 371             }
 372 
 373             interestops = newops;
 374             SelectionKey key = chan.keyFor(selector);
 375             if (newops == 0) {
 376                 key.cancel();
 377             } else {
 378                 key.interestOps(newops);
 379             }
 380         }
 381     }
 382 
 383     /**
 384      * Creates a HttpRequest associated with this group.
 385      *
 386      * @throws IllegalStateException if the group has been stopped
 387      */
 388     @Override
 389     public HttpRequestBuilderImpl request() {
 390         return new HttpRequestBuilderImpl(this, null);
 391     }
 392 
 393     /**
 394      * Creates a HttpRequest associated with this group.
 395      *
 396      * @throws IllegalStateException if the group has been stopped
 397      */
 398     @Override
 399     public HttpRequestBuilderImpl request(URI uri) {
 400         return new HttpRequestBuilderImpl(this, uri);
 401     }
 402 
 403     @Override
 404     public SSLContext sslContext() {
 405         Utils.checkNetPermission("getSSLContext");
 406         return sslContext;
 407     }
 408 
 409     @Override
 410     public Optional<SSLParameters> sslParameters() {
 411         return Optional.ofNullable(sslParams);
 412     }
 413 
 414     @Override
 415     public Optional<Authenticator> authenticator() {
 416         return Optional.ofNullable(authenticator);
 417     }
 418 
 419     @Override
 420     public ExecutorService executorService() {
 421         return executor.userExecutor();
 422     }
 423 
 424     ExecutorWrapper executorWrapper() {
 425         return executor;
 426     }
 427 
 428     @Override
 429     public boolean pipelining() {
 430         return this.pipelining;
 431     }
 432 
 433     ConnectionPool connectionPool() {
 434         return connections;
 435     }
 436 
 437     @Override
 438     public Redirect followRedirects() {
 439         return followRedirects;
 440     }
 441 
 442 
 443     @Override
 444     public Optional<CookieManager> cookieManager() {
 445         return Optional.ofNullable(cookieManager);
 446     }
 447 
 448     @Override
 449     public Optional<ProxySelector> proxy() {
 450         return Optional.ofNullable(this.proxySelector);
 451     }
 452 
 453     @Override
 454     public Version version() {
 455         return version;
 456     }
 457 
 458     //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>();
 459 
 460     boolean getHttp2Allowed() {
 461         return version.equals(Version.HTTP_2);
 462     }
 463 
 464     //void setHttp2NotSupported(String host) {
 465         //http2NotSupported.put(host, false);
 466     //}
 467 
 468     final void initFilters() {
 469         addFilter(AuthenticationFilter.class);
 470         addFilter(RedirectFilter.class);
 471     }
 472 
 473     final void addFilter(Class<? extends HeaderFilter> f) {
 474         filters.addFilter(f);
 475     }
 476 
 477     final List<HeaderFilter> filterChain() {
 478         return filters.getFilterChain();
 479     }
 480 
 481     // Timer controls. Timers are implemented through timed Selector.select()
 482     // calls.
 483     synchronized void registerTimer(TimeoutEvent event) {
 484         long elapse = event.timevalMillis();
 485         ListIterator<TimeoutEvent> iter = timeouts.listIterator();
 486         long listval = 0;
 487         event.delta = event.timeval; // in case list empty
 488         TimeoutEvent next;
 489         while (iter.hasNext()) {
 490             next = iter.next();
 491             listval += next.delta;
 492             if (elapse < listval) {
 493                 listval -= next.delta;
 494                 event.delta = elapse - listval;
 495                 next.delta -= event.delta;
 496                 iter.previous();
 497                 break;
 498             } else if (!iter.hasNext()) {
 499                 event.delta = event.timeval - listval ;
 500             }
 501         }
 502         iter.add(event);
 503         //debugPrintList("register");
 504         selmgr.wakeupSelector();
 505     }
 506 
 507     void debugPrintList(String s) {
 508         System.err.printf("%s: {", s);
 509         for (TimeoutEvent e : timeouts) {
 510             System.err.printf("(%d,%d) ", e.delta, e.timeval);
 511         }
 512         System.err.println("}");
 513     }
 514 
 515     synchronized void signalTimeouts(long then) {
 516         if (timeouts.isEmpty()) {
 517             return;
 518         }
 519         long now = System.currentTimeMillis();
 520         long duration = now - then;
 521         ListIterator<TimeoutEvent> iter = timeouts.listIterator();
 522         TimeoutEvent event = iter.next();
 523         long delta = event.delta;
 524         if (duration < delta) {
 525             event.delta -= duration;
 526             return;
 527         }
 528         event.handle();
 529         iter.remove();
 530         while (iter.hasNext()) {
 531             event = iter.next();
 532             if (event.delta == 0) {
 533                 event.handle();
 534                 iter.remove();
 535             } else {
 536                 event.delta += delta;
 537                 break;
 538             }
 539         }
 540         //debugPrintList("signalTimeouts");
 541     }
 542 
 543     synchronized void cancelTimer(TimeoutEvent event) {
 544         ListIterator<TimeoutEvent> iter = timeouts.listIterator();
 545         while (iter.hasNext()) {
 546             TimeoutEvent ev = iter.next();
 547             if (event == ev) {
 548                 if (iter.hasNext()) {
 549                     // adjust
 550                     TimeoutEvent next = iter.next();
 551                     next.delta += ev.delta;
 552                     iter.previous();
 553                 }
 554                 iter.remove();
 555             }
 556         }
 557     }
 558 
 559     // used for the connection window
 560     int getReceiveBufferSize() {
 561         return Utils.getIntegerNetProperty(
 562                 "sun.net.httpclient.connectionWindowSize", 256 * 1024
 563         );
 564     }
 565 
 566     // returns 0 meaning block forever, or a number of millis to block for
 567     synchronized long getTimeoutValue() {
 568         if (timeouts.isEmpty()) {
 569             return 0;
 570         } else {
 571             return timeouts.get(0).delta;
 572         }
 573     }
 574 }