1 /*
   2  * Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.Closeable;
  29 import java.io.IOException;
  30 import java.net.InetSocketAddress;
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.SocketChannel;
  33 import java.util.Arrays;
  34 import java.util.IdentityHashMap;
  35 import java.util.List;
  36 import java.util.Map;
  37 import java.util.TreeMap;
  38 import java.util.concurrent.CompletableFuture;
  39 import java.util.concurrent.CompletionStage;
  40 import java.util.concurrent.ConcurrentLinkedDeque;
  41 import java.util.concurrent.Flow;
  42 import java.util.function.BiPredicate;
  43 import java.util.function.Predicate;
  44 import java.net.http.HttpClient;
  45 import java.net.http.HttpClient.Version;
  46 import java.net.http.HttpHeaders;
  47 import jdk.internal.net.http.common.Demand;
  48 import jdk.internal.net.http.common.FlowTube;
  49 import jdk.internal.net.http.common.Logger;
  50 import jdk.internal.net.http.common.SequentialScheduler;
  51 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
  52 import jdk.internal.net.http.common.Log;
  53 import jdk.internal.net.http.common.Utils;
  54 import static java.net.http.HttpClient.Version.HTTP_2;
  55 
  56 /**
  57  * Wraps socket channel layer and takes care of SSL also.
  58  *
  59  * Subtypes are:
  60  *      PlainHttpConnection: regular direct TCP connection to server
  61  *      PlainProxyConnection: plain text proxy connection
  62  *      PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
  63  *      AsyncSSLConnection: TLS channel direct to server
  64  *      AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
  65  */
  66 abstract class HttpConnection implements Closeable {
  67 
    // Per-instance debug logger; its tag is produced lazily by dbgString().
    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
    // Logger for the static factory methods, where no instance (and hence
    // no per-connection tag) is available; it uses a fixed placeholder tag.
    final static Logger DEBUG_LOGGER = Utils.getDebugLogger(
            () -> "HttpConnection(SocketTube(?))", Utils.DEBUG);

    /** The address this connection is connected to. Could be a server or a proxy. */
    final InetSocketAddress address;
    // The client passed to the constructor; exposed through client().
    private final HttpClientImpl client;
    // Completion stages registered through addTrailingOperation().
    private final TrailingOperations trailingOperations;
  76 
    /**
     * Records the peer address and owning client, and creates an empty
     * set of trailing operations.
     *
     * @param address the address this connection connects to (a server
     *                or a proxy)
     * @param client  the client this connection is created for
     */
    HttpConnection(InetSocketAddress address, HttpClientImpl client) {
        this.address = address;
        this.client = client;
        trailingOperations = new TrailingOperations();
    }
  82 
  83     private static final class TrailingOperations {
  84         private final Map<CompletionStage<?>, Boolean> operations =
  85                 new IdentityHashMap<>();
  86         void add(CompletionStage<?> cf) {
  87             synchronized(operations) {
  88                 operations.put(cf, Boolean.TRUE);
  89                 cf.whenComplete((r,t)-> remove(cf));
  90             }
  91         }
  92         boolean remove(CompletionStage<?> cf) {
  93             synchronized(operations) {
  94                 return operations.remove(cf);
  95             }
  96         }
  97     }
  98 
    /**
     * Registers a completion stage with this connection's set of trailing
     * operations; the stage is dropped from the set when it completes.
     *
     * @param cf the stage to track
     */
    final void addTrailingOperation(CompletionStage<?> cf) {
        trailingOperations.add(cf);
    }
 102 
 103 //    final void removeTrailingOperation(CompletableFuture<?> cf) {
 104 //        trailingOperations.remove(cf);
 105 //    }
 106 
    /**
     * Returns the client this connection was created with.
     *
     * @return the client given to the constructor
     */
    final HttpClientImpl client() {
        return client;
    }
 110 
    /**
     * Initiates the connect phase.
     *
     * Returns a CompletableFuture that completes when the underlying
     * TCP connection has been established or an error occurs.
     *
     * @param exchange the exchange passed by the caller
     *                 (NOTE(review): presumably the exchange this connection
     *                 is being opened for - confirm against implementations)
     * @return a future tracking the outcome of the connect attempt
     */
    public abstract CompletableFuture<Void> connectAsync(Exchange<?> exchange);
 118 
    /**
     * Finishes the connection phase.
     *
     * Returns a CompletableFuture that completes when any additional,
     * type specific, setup has been done. Must be called after connectAsync.
     *
     * @return a future that completes once the type specific setup is done
     */
    public abstract CompletableFuture<Void> finishConnect();
 125 
    /**
     * Tells whether, or not, this connection is connected to its destination.
     *
     * @return {@code true} if this connection is connected
     */
    abstract boolean connected();
 128 
    /**
     * Tells whether, or not, this connection is secure (over SSL).
     *
     * @return {@code true} if this connection is secure
     */
    abstract boolean isSecure();
 131 
    /**
     * Tells whether, or not, this connection is proxied.
     * Returns true for tunnel connections, or clear connection to
     * any host through proxy.
     *
     * @return {@code true} if this connection goes through a proxy
     */
    abstract boolean isProxied();
 138 
 139     /** Tells whether, or not, this connection is open. */
 140     final boolean isOpen() {
 141         return channel().isOpen() &&
 142                 (connected() ? !getConnectionFlow().isFinished() : true);
 143     }
 144 
    /**
     * A tube publisher to which lists of buffers can be queued for
     * delivery downstream.
     */
    interface HttpPublisher extends FlowTube.TubePublisher {
        /** Queues the given buffers at normal priority. */
        void enqueue(List<ByteBuffer> buffers) throws IOException;
        /**
         * Queues the given buffers at high priority. The PlainHttpPublisher
         * implementation in this file sends such buffers ahead of any
         * normally queued ones.
         */
        void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;
        /**
         * Signals that buffers have been queued. The PlainHttpPublisher
         * implementation in this file reacts by running its write scheduler.
         */
        void signalEnqueued() throws IOException;
    }
 150 
    /**
     * Returns the HTTP publisher associated with this connection.  May be null
     * if invoked before connecting.
     *
     * @return the publisher used to write to this connection, possibly
     *         {@code null} before the connection is connected
     */
    abstract HttpPublisher publisher();
 156 
 157     // HTTP/2 MUST use TLS version 1.2 or higher for HTTP/2 over TLS
 158     private static final Predicate<String> testRequiredHTTP2TLSVersion = proto ->
 159             proto.equals("TLSv1.2") || proto.equals("TLSv1.3");
 160 
 161    /**
 162     * Returns true if the given client's SSL parameter protocols contains at
 163     * least one TLS version that HTTP/2 requires.
 164     */
 165    private static final boolean hasRequiredHTTP2TLSVersion(HttpClient client) {
 166        String[] protos = client.sslParameters().getProtocols();
 167        if (protos != null) {
 168            return Arrays.stream(protos).filter(testRequiredHTTP2TLSVersion).findAny().isPresent();
 169        } else {
 170            return false;
 171        }
 172    }
 173 
 174     /**
 175      * Factory for retrieving HttpConnections. A connection can be retrieved
 176      * from the connection pool, or a new one created if none available.
 177      *
 178      * The given {@code addr} is the ultimate destination. Any proxies,
 179      * etc, are determined from the request. Returns a concrete instance which
 180      * is one of the following:
 181      *      {@link PlainHttpConnection}
 182      *      {@link PlainTunnelingConnection}
 183      *
 184      * The returned connection, if not from the connection pool, must have its,
 185      * connect() or connectAsync() method invoked, which ( when it completes
 186      * successfully ) renders the connection usable for requests.
 187      */
 188     public static HttpConnection getConnection(InetSocketAddress addr,
 189                                                HttpClientImpl client,
 190                                                HttpRequestImpl request,
 191                                                Version version) {
 192         // The default proxy selector may select a proxy whose  address is
 193         // unresolved. We must resolve the address before connecting to it.
 194         InetSocketAddress proxy = Utils.resolveAddress(request.proxy());
 195         HttpConnection c = null;
 196         boolean secure = request.secure();
 197         ConnectionPool pool = client.connectionPool();
 198 
 199         if (!secure) {
 200             c = pool.getConnection(false, addr, proxy);
 201             if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
 202                 final HttpConnection conn = c;
 203                 if (DEBUG_LOGGER.on())
 204                     DEBUG_LOGGER.log(conn.getConnectionFlow()
 205                                      + ": plain connection retrieved from HTTP/1.1 pool");
 206                 return c;
 207             } else {
 208                 return getPlainConnection(addr, proxy, request, client);
 209             }
 210         } else {  // secure
 211             if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
 212                 c = pool.getConnection(true, addr, proxy);
 213             }
 214             if (c != null && c.isOpen()) {
 215                 final HttpConnection conn = c;
 216                 if (DEBUG_LOGGER.on())
 217                     DEBUG_LOGGER.log(conn.getConnectionFlow()
 218                                      + ": SSL connection retrieved from HTTP/1.1 pool");
 219                 return c;
 220             } else {
 221                 String[] alpn = null;
 222                 if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {
 223                     alpn = new String[] { "h2", "http/1.1" };
 224                 }
 225                 return getSSLConnection(addr, proxy, alpn, request, client);
 226             }
 227         }
 228     }
 229 
 230     private static HttpConnection getSSLConnection(InetSocketAddress addr,
 231                                                    InetSocketAddress proxy,
 232                                                    String[] alpn,
 233                                                    HttpRequestImpl request,
 234                                                    HttpClientImpl client) {
 235         if (proxy != null)
 236             return new AsyncSSLTunnelConnection(addr, client, alpn, proxy,
 237                                                 proxyTunnelHeaders(request));
 238         else
 239             return new AsyncSSLConnection(addr, client, alpn);
 240     }
 241 
 242     /**
 243      * This method is used to build a filter that will accept or
 244      * veto (header-name, value) tuple for transmission on the
 245      * wire.
 246      * The filter is applied to the headers when sending the headers
 247      * to the remote party.
 248      * Which tuple is accepted/vetoed depends on:
 249      * <pre>
 250      *    - whether the connection is a tunnel connection
 251      *      [talking to a server through a proxy tunnel]
 252      *    - whether the method is CONNECT
 253      *      [establishing a CONNECT tunnel through a proxy]
 254      *    - whether the request is using a proxy
 255      *      (and the connection is not a tunnel)
 256      *      [talking to a server through a proxy]
 257      *    - whether the request is a direct connection to
 258      *      a server (no tunnel, no proxy).
 259      * </pre>
 260      * @param request
 261      * @return
 262      */
 263     BiPredicate<String,String> headerFilter(HttpRequestImpl request) {
 264         if (isTunnel()) {
 265             // talking to a server through a proxy tunnel
 266             // don't send proxy-* headers to a plain server
 267             assert !request.isConnect();
 268             return Utils.NO_PROXY_HEADERS_FILTER;
 269         } else if (request.isConnect()) {
 270             // establishing a proxy tunnel
 271             // check for proxy tunnel disabled schemes
 272             // assert !this.isTunnel();
 273             assert request.proxy() == null;
 274             return Utils.PROXY_TUNNEL_FILTER;
 275         } else if (request.proxy() != null) {
 276             // talking to a server through a proxy (no tunnel)
 277             // check for proxy disabled schemes
 278             // assert !isTunnel() && !request.isConnect();
 279             return Utils.PROXY_FILTER;
 280         } else {
 281             // talking to a server directly (no tunnel, no proxy)
 282             // don't send proxy-* headers to a plain server
 283             // assert request.proxy() == null && !request.isConnect();
 284             return Utils.NO_PROXY_HEADERS_FILTER;
 285         }
 286     }
 287 
 288     // Composes a new immutable HttpHeaders that combines the
 289     // user and system header but only keeps those headers that
 290     // start with "proxy-"
 291     private static HttpHeaders proxyTunnelHeaders(HttpRequestImpl request) {
 292         Map<String, List<String>> combined = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
 293         combined.putAll(request.getSystemHeadersBuilder().map());
 294         combined.putAll(request.headers().map()); // let user override system
 295 
 296         // keep only proxy-* - and also strip authorization headers
 297         // for disabled schemes
 298         return HttpHeaders.of(combined, Utils.PROXY_TUNNEL_FILTER);
 299     }
 300 
 301     /* Returns either a plain HTTP connection or a plain tunnelling connection
 302      * for proxied WebSocket */
 303     private static HttpConnection getPlainConnection(InetSocketAddress addr,
 304                                                      InetSocketAddress proxy,
 305                                                      HttpRequestImpl request,
 306                                                      HttpClientImpl client) {
 307         if (request.isWebSocket() && proxy != null)
 308             return new PlainTunnelingConnection(addr, proxy, client,
 309                                                 proxyTunnelHeaders(request));
 310 
 311         if (proxy == null)
 312             return new PlainHttpConnection(addr, client);
 313         else
 314             return new PlainProxyConnection(proxy, client);
 315     }
 316 
 317     void closeOrReturnToCache(HttpHeaders hdrs) {
 318         if (hdrs == null) {
 319             // the connection was closed by server, eof
 320             Log.logTrace("Cannot return connection to pool: closing {0}", this);
 321             close();
 322             return;
 323         }
 324         HttpClientImpl client = client();
 325         if (client == null) {
 326             Log.logTrace("Client released: closing {0}", this);
 327             close();
 328             return;
 329         }
 330         ConnectionPool pool = client.connectionPool();
 331         boolean keepAlive = hdrs.firstValue("Connection")
 332                 .map((s) -> !s.equalsIgnoreCase("close"))
 333                 .orElse(true);
 334 
 335         if (keepAlive && isOpen()) {
 336             Log.logTrace("Returning connection to the pool: {0}", this);
 337             pool.returnToPool(this);
 338         } else {
 339             Log.logTrace("Closing connection (keepAlive={0}, isOpen={1}): {2}",
 340                     keepAlive, isOpen(), this);
 341             close();
 342         }
 343     }
 344 
    /**
     * Tells whether or not this connection is a tunnel through a proxy.
     * This base implementation always answers {@code false}.
     *
     * @return {@code true} if this connection is a tunnel
     */
    boolean isTunnel() { return false; }
 347 
    /**
     * Returns the socket channel of this connection; isOpen() and
     * toString() rely on it.
     *
     * @return the socket channel
     */
    abstract SocketChannel channel();
 349 
    /**
     * Returns the address this connection is connected to, which could
     * be a server or a proxy.
     *
     * @return the address given to the constructor
     */
    final InetSocketAddress address() {
        return address;
    }
 353 
    /**
     * Returns the connection pool cache key of this connection.
     * NOTE(review): presumably the key under which the pool files this
     * connection - confirm against ConnectionPool.
     *
     * @return the cache key
     */
    abstract ConnectionPool.CacheKey cacheKey();
 355 
    /**
     * Closes this connection, by returning the socket to its connection pool.
     *
     * NOTE(review): closeOrReturnToCache() in this class treats calling
     * close() and returning the connection to the pool as alternatives,
     * so the wording above may be out of date - confirm against the
     * implementations.
     */
    @Override
    public abstract void close();
 361 
    /**
     * Returns the flow tube of this connection. dbgString() tolerates a
     * {@code null} result; isOpen() consults the tube only once the
     * connection is connected.
     *
     * @return the connection's flow tube
     */
    abstract FlowTube getConnectionFlow();
 363 
 364     /**
 365      * A publisher that makes it possible to publish (write) ordered (normal
 366      * priority) and unordered (high priority) buffers downstream.
 367      */
 368     final class PlainHttpPublisher implements HttpPublisher {
 369         final Object reading;
 370         PlainHttpPublisher() {
 371             this(new Object());
 372         }
 373         PlainHttpPublisher(Object readingLock) {
 374             this.reading = readingLock;
 375         }
 376         final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();
 377         final ConcurrentLinkedDeque<List<ByteBuffer>> priority = new ConcurrentLinkedDeque<>();
 378         volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
 379         volatile HttpWriteSubscription subscription;
 380         final SequentialScheduler writeScheduler =
 381                     new SequentialScheduler(this::flushTask);
 382         @Override
 383         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 384             synchronized (reading) {
 385                 //assert this.subscription == null;
 386                 //assert this.subscriber == null;
 387                 if (subscription == null) {
 388                     subscription = new HttpWriteSubscription();
 389                 }
 390                 this.subscriber = subscriber;
 391             }
 392             // TODO: should we do this in the flow?
 393             subscriber.onSubscribe(subscription);
 394             signal();
 395         }
 396 
 397         void flushTask(DeferredCompleter completer) {
 398             try {
 399                 HttpWriteSubscription sub = subscription;
 400                 if (sub != null) sub.flush();
 401             } finally {
 402                 completer.complete();
 403             }
 404         }
 405 
 406         void signal() {
 407             writeScheduler.runOrSchedule();
 408         }
 409 
 410         final class HttpWriteSubscription implements Flow.Subscription {
 411             final Demand demand = new Demand();
 412 
 413             @Override
 414             public void request(long n) {
 415                 if (n <= 0) throw new IllegalArgumentException("non-positive request");
 416                 demand.increase(n);
 417                 if (debug.on())
 418                     debug.log("HttpPublisher: got request of "  + n + " from "
 419                                + getConnectionFlow());
 420                 writeScheduler.runOrSchedule();
 421             }
 422 
 423             @Override
 424             public void cancel() {
 425                 if (debug.on())
 426                     debug.log("HttpPublisher: cancelled by " + getConnectionFlow());
 427             }
 428 
 429             private boolean isEmpty() {
 430                 return queue.isEmpty() && priority.isEmpty();
 431             }
 432 
 433             private List<ByteBuffer> poll() {
 434                 List<ByteBuffer> elem = priority.poll();
 435                 return elem == null ? queue.poll() : elem;
 436             }
 437 
 438             void flush() {
 439                 while (!isEmpty() && demand.tryDecrement()) {
 440                     List<ByteBuffer> elem = poll();
 441                     if (debug.on())
 442                         debug.log("HttpPublisher: sending "
 443                                     + Utils.remaining(elem) + " bytes ("
 444                                     + elem.size() + " buffers) to "
 445                                     + getConnectionFlow());
 446                     subscriber.onNext(elem);
 447                 }
 448             }
 449         }
 450 
 451         @Override
 452         public void enqueue(List<ByteBuffer> buffers) throws IOException {
 453             queue.add(buffers);
 454             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
 455             debug.log("added %d bytes to the write queue", bytes);
 456         }
 457 
 458         @Override
 459         public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
 460             // Unordered frames are sent before existing frames.
 461             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
 462             priority.add(buffers);
 463             debug.log("added %d bytes in the priority write queue", bytes);
 464         }
 465 
 466         @Override
 467         public void signalEnqueued() throws IOException {
 468             debug.log("signalling the publisher of the write queue");
 469             signal();
 470         }
 471     }
 472 
 473     String dbgTag;
 474     final String dbgString() {
 475         FlowTube flow = getConnectionFlow();
 476         String tag = dbgTag;
 477         if (tag == null && flow != null) {
 478             dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")";
 479         } else if (tag == null) {
 480             tag = this.getClass().getSimpleName() + "(?)";
 481         }
 482         return tag;
 483     }
 484 
    /**
     * Returns a description of this connection made of a fixed prefix
     * followed by the string form of its socket channel.
     */
    @Override
    public String toString() {
        return "HttpConnection: " + channel().toString();
    }
 489 }