1 /*
   2  * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.Closeable;
  29 import java.io.IOException;
  30 import java.net.InetSocketAddress;
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.SocketChannel;
  33 import java.util.Arrays;
  34 import java.util.IdentityHashMap;
  35 import java.util.List;
  36 import java.util.Map;
  37 import java.util.TreeMap;
  38 import java.util.concurrent.CompletableFuture;
  39 import java.util.concurrent.CompletionStage;
  40 import java.util.concurrent.ConcurrentLinkedDeque;
  41 import java.util.concurrent.Flow;
  42 import java.util.function.BiPredicate;
  43 import java.util.function.Predicate;
  44 import java.net.http.HttpClient;
  45 import java.net.http.HttpClient.Version;
  46 import java.net.http.HttpHeaders;
  47 import jdk.internal.net.http.common.Demand;
  48 import jdk.internal.net.http.common.FlowTube;
  49 import jdk.internal.net.http.common.Logger;
  50 import jdk.internal.net.http.common.SequentialScheduler;
  51 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
  52 import jdk.internal.net.http.common.Log;
  53 import jdk.internal.net.http.common.Utils;
  54 import static java.net.http.HttpClient.Version.HTTP_2;
  55 
  56 /**
  57  * Wraps socket channel layer and takes care of SSL also.
  58  *
  59  * Subtypes are:
  60  *      PlainHttpConnection: regular direct TCP connection to server
  61  *      PlainProxyConnection: plain text proxy connection
  62  *      PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
  63  *      AsyncSSLConnection: TLS channel direct to server
  64  *      AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
  65  */
  66 abstract class HttpConnection implements Closeable {
  67 
  68     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
  69     final static Logger DEBUG_LOGGER = Utils.getDebugLogger(
  70             () -> "HttpConnection(SocketTube(?))", Utils.DEBUG);
  71 
  72     /** The address this connection is connected to. Could be a server or a proxy. */
  73     final InetSocketAddress address;
  74     private final HttpClientImpl client;
  75     private final TrailingOperations trailingOperations;
  76 
  77     HttpConnection(InetSocketAddress address, HttpClientImpl client) {
  78         this.address = address;
  79         this.client = client;
  80         trailingOperations = new TrailingOperations();
  81     }
  82 
  83     private static final class TrailingOperations {
  84         private final Map<CompletionStage<?>, Boolean> operations =
  85                 new IdentityHashMap<>();
  86         void add(CompletionStage<?> cf) {
  87             synchronized(operations) {
  88                 operations.put(cf, Boolean.TRUE);
  89                 cf.whenComplete((r,t)-> remove(cf));
  90             }
  91         }
  92         boolean remove(CompletionStage<?> cf) {
  93             synchronized(operations) {
  94                 return operations.remove(cf);
  95             }
  96         }
  97     }
  98 
  99     final void addTrailingOperation(CompletionStage<?> cf) {
 100         trailingOperations.add(cf);
 101     }
 102 
 103 //    final void removeTrailingOperation(CompletableFuture<?> cf) {
 104 //        trailingOperations.remove(cf);
 105 //    }
 106 
 107     final HttpClientImpl client() {
 108         return client;
 109     }
 110 
 111     /**
 112      * Initiates the connect phase.
 113      *
 114      * Returns a CompletableFuture that completes when the underlying
 115      * TCP connection has been established or an error occurs.
 116      */
 117     public abstract CompletableFuture<Void> connectAsync(Exchange<?> exchange);
 118 
 119     /**
 120      * Finishes the connection phase.
 121      *
 122      * Returns a CompletableFuture that completes when any additional,
 123      * type specific, setup has been done. Must be called after connectAsync. */
 124     public abstract CompletableFuture<Void> finishConnect();
 125 
 126     /** Tells whether, or not, this connection is connected to its destination. */
 127     abstract boolean connected();
 128 
 129     /** Tells whether, or not, this connection is secure ( over SSL ) */
 130     abstract boolean isSecure();
 131 
 132     /**
 133      * Tells whether, or not, this connection is proxied.
 134      * Returns true for tunnel connections, or clear connection to
 135      * any host through proxy.
 136      */
 137     abstract boolean isProxied();
 138 
 139     /**
 140      * Returns the address of the proxy used by this connection.
 141      * Returns the proxy address for tunnel connections, or
 142      * clear connection to any host through proxy.
 143      * Returns {@code null} otherwise.
 144      */
 145     abstract InetSocketAddress proxy();
 146 
 147     /** Tells whether, or not, this connection is open. */
 148     final boolean isOpen() {
 149         return channel().isOpen() &&
 150                 (connected() ? !getConnectionFlow().isFinished() : true);
 151     }
 152 
 153     interface HttpPublisher extends FlowTube.TubePublisher {
 154         void enqueue(List<ByteBuffer> buffers) throws IOException;
 155         void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;
 156         void signalEnqueued() throws IOException;
 157     }
 158 
 159     /**
 160      * Returns the HTTP publisher associated with this connection.  May be null
 161      * if invoked before connecting.
 162      */
 163     abstract HttpPublisher publisher();
 164 
 165     // HTTP/2 MUST use TLS version 1.2 or higher for HTTP/2 over TLS
 166     private static final Predicate<String> testRequiredHTTP2TLSVersion = proto ->
 167             proto.equals("TLSv1.2") || proto.equals("TLSv1.3");
 168 
 169    /**
 170     * Returns true if the given client's SSL parameter protocols contains at
 171     * least one TLS version that HTTP/2 requires.
 172     */
 173    private static final boolean hasRequiredHTTP2TLSVersion(HttpClient client) {
 174        String[] protos = client.sslParameters().getProtocols();
 175        if (protos != null) {
 176            return Arrays.stream(protos).filter(testRequiredHTTP2TLSVersion).findAny().isPresent();
 177        } else {
 178            return false;
 179        }
 180    }
 181 
 182     /**
 183      * Factory for retrieving HttpConnections. A connection can be retrieved
 184      * from the connection pool, or a new one created if none available.
 185      *
 186      * The given {@code addr} is the ultimate destination. Any proxies,
 187      * etc, are determined from the request. Returns a concrete instance which
 188      * is one of the following:
 189      *      {@link PlainHttpConnection}
 190      *      {@link PlainTunnelingConnection}
 191      *
 192      * The returned connection, if not from the connection pool, must have its,
 193      * connect() or connectAsync() method invoked, which ( when it completes
 194      * successfully ) renders the connection usable for requests.
 195      */
 196     public static HttpConnection getConnection(InetSocketAddress addr,
 197                                                HttpClientImpl client,
 198                                                HttpRequestImpl request,
 199                                                Version version) {
 200         // The default proxy selector may select a proxy whose  address is
 201         // unresolved. We must resolve the address before connecting to it.
 202         InetSocketAddress proxy = Utils.resolveAddress(request.proxy());
 203         HttpConnection c = null;
 204         boolean secure = request.secure();
 205         ConnectionPool pool = client.connectionPool();
 206 
 207         if (!secure) {
 208             c = pool.getConnection(false, addr, proxy);
 209             if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
 210                 final HttpConnection conn = c;
 211                 if (DEBUG_LOGGER.on())
 212                     DEBUG_LOGGER.log(conn.getConnectionFlow()
 213                                      + ": plain connection retrieved from HTTP/1.1 pool");
 214                 return c;
 215             } else {
 216                 return getPlainConnection(addr, proxy, request, client);
 217             }
 218         } else {  // secure
 219             if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
 220                 c = pool.getConnection(true, addr, proxy);
 221             }
 222             if (c != null && c.isOpen()) {
 223                 final HttpConnection conn = c;
 224                 if (DEBUG_LOGGER.on())
 225                     DEBUG_LOGGER.log(conn.getConnectionFlow()
 226                                      + ": SSL connection retrieved from HTTP/1.1 pool");
 227                 return c;
 228             } else {
 229                 String[] alpn = null;
 230                 if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {
 231                     alpn = new String[] { "h2", "http/1.1" };
 232                 }
 233                 return getSSLConnection(addr, proxy, alpn, request, client);
 234             }
 235         }
 236     }
 237 
 238     private static HttpConnection getSSLConnection(InetSocketAddress addr,
 239                                                    InetSocketAddress proxy,
 240                                                    String[] alpn,
 241                                                    HttpRequestImpl request,
 242                                                    HttpClientImpl client) {
 243         if (proxy != null)
 244             return new AsyncSSLTunnelConnection(addr, client, alpn, proxy,
 245                                                 proxyTunnelHeaders(request));
 246         else
 247             return new AsyncSSLConnection(addr, client, alpn);
 248     }
 249 
 250     /**
 251      * This method is used to build a filter that will accept or
 252      * veto (header-name, value) tuple for transmission on the
 253      * wire.
 254      * The filter is applied to the headers when sending the headers
 255      * to the remote party.
 256      * Which tuple is accepted/vetoed depends on:
 257      * <pre>
 258      *    - whether the connection is a tunnel connection
 259      *      [talking to a server through a proxy tunnel]
 260      *    - whether the method is CONNECT
 261      *      [establishing a CONNECT tunnel through a proxy]
 262      *    - whether the request is using a proxy
 263      *      (and the connection is not a tunnel)
 264      *      [talking to a server through a proxy]
 265      *    - whether the request is a direct connection to
 266      *      a server (no tunnel, no proxy).
 267      * </pre>
 268      * @param request
 269      * @return
 270      */
 271     BiPredicate<String,String> headerFilter(HttpRequestImpl request) {
 272         if (isTunnel()) {
 273             // talking to a server through a proxy tunnel
 274             // don't send proxy-* headers to a plain server
 275             assert !request.isConnect();
 276             return Utils.NO_PROXY_HEADERS_FILTER;
 277         } else if (request.isConnect()) {
 278             // establishing a proxy tunnel
 279             // check for proxy tunnel disabled schemes
 280             // assert !this.isTunnel();
 281             assert request.proxy() == null;
 282             return Utils.PROXY_TUNNEL_FILTER;
 283         } else if (request.proxy() != null) {
 284             // talking to a server through a proxy (no tunnel)
 285             // check for proxy disabled schemes
 286             // assert !isTunnel() && !request.isConnect();
 287             return Utils.PROXY_FILTER;
 288         } else {
 289             // talking to a server directly (no tunnel, no proxy)
 290             // don't send proxy-* headers to a plain server
 291             // assert request.proxy() == null && !request.isConnect();
 292             return Utils.NO_PROXY_HEADERS_FILTER;
 293         }
 294     }
 295 
 296     // Composes a new immutable HttpHeaders that combines the
 297     // user and system header but only keeps those headers that
 298     // start with "proxy-"
 299     private static HttpHeaders proxyTunnelHeaders(HttpRequestImpl request) {
 300         Map<String, List<String>> combined = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
 301         combined.putAll(request.getSystemHeadersBuilder().map());
 302         combined.putAll(request.headers().map()); // let user override system
 303 
 304         // keep only proxy-* - and also strip authorization headers
 305         // for disabled schemes
 306         return HttpHeaders.of(combined, Utils.PROXY_TUNNEL_FILTER);
 307     }
 308 
 309     /* Returns either a plain HTTP connection or a plain tunnelling connection
 310      * for proxied WebSocket */
 311     private static HttpConnection getPlainConnection(InetSocketAddress addr,
 312                                                      InetSocketAddress proxy,
 313                                                      HttpRequestImpl request,
 314                                                      HttpClientImpl client) {
 315         if (request.isWebSocket() && proxy != null)
 316             return new PlainTunnelingConnection(addr, proxy, client,
 317                                                 proxyTunnelHeaders(request));
 318 
 319         if (proxy == null)
 320             return new PlainHttpConnection(addr, client);
 321         else
 322             return new PlainProxyConnection(proxy, client);
 323     }
 324 
 325     void closeOrReturnToCache(HttpHeaders hdrs) {
 326         if (hdrs == null) {
 327             // the connection was closed by server, eof
 328             Log.logTrace("Cannot return connection to pool: closing {0}", this);
 329             close();
 330             return;
 331         }
 332         HttpClientImpl client = client();
 333         if (client == null) {
 334             Log.logTrace("Client released: closing {0}", this);
 335             close();
 336             return;
 337         }
 338         ConnectionPool pool = client.connectionPool();
 339         boolean keepAlive = hdrs.firstValue("Connection")
 340                 .map((s) -> !s.equalsIgnoreCase("close"))
 341                 .orElse(true);
 342 
 343         if (keepAlive && isOpen()) {
 344             Log.logTrace("Returning connection to the pool: {0}", this);
 345             pool.returnToPool(this);
 346         } else {
 347             Log.logTrace("Closing connection (keepAlive={0}, isOpen={1}): {2}",
 348                     keepAlive, isOpen(), this);
 349             close();
 350         }
 351     }
 352 
 353     /* Tells whether or not this connection is a tunnel through a proxy */
 354     boolean isTunnel() { return false; }
 355 
 356     abstract SocketChannel channel();
 357 
 358     final InetSocketAddress address() {
 359         return address;
 360     }
 361 
 362     abstract ConnectionPool.CacheKey cacheKey();
 363 
 364     /**
 365      * Closes this connection, by returning the socket to its connection pool.
 366      */
 367     @Override
 368     public abstract void close();
 369 
 370     abstract FlowTube getConnectionFlow();
 371 
 372     /**
 373      * A publisher that makes it possible to publish (write) ordered (normal
 374      * priority) and unordered (high priority) buffers downstream.
 375      */
 376     final class PlainHttpPublisher implements HttpPublisher {
 377         final Object reading;
 378         PlainHttpPublisher() {
 379             this(new Object());
 380         }
 381         PlainHttpPublisher(Object readingLock) {
 382             this.reading = readingLock;
 383         }
 384         final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();
 385         final ConcurrentLinkedDeque<List<ByteBuffer>> priority = new ConcurrentLinkedDeque<>();
 386         volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
 387         volatile HttpWriteSubscription subscription;
 388         final SequentialScheduler writeScheduler =
 389                     new SequentialScheduler(this::flushTask);
 390         @Override
 391         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 392             synchronized (reading) {
 393                 //assert this.subscription == null;
 394                 //assert this.subscriber == null;
 395                 if (subscription == null) {
 396                     subscription = new HttpWriteSubscription();
 397                 }
 398                 this.subscriber = subscriber;
 399             }
 400             // TODO: should we do this in the flow?
 401             subscriber.onSubscribe(subscription);
 402             signal();
 403         }
 404 
 405         void flushTask(DeferredCompleter completer) {
 406             try {
 407                 HttpWriteSubscription sub = subscription;
 408                 if (sub != null) sub.flush();
 409             } finally {
 410                 completer.complete();
 411             }
 412         }
 413 
 414         void signal() {
 415             writeScheduler.runOrSchedule();
 416         }
 417 
 418         final class HttpWriteSubscription implements Flow.Subscription {
 419             final Demand demand = new Demand();
 420 
 421             @Override
 422             public void request(long n) {
 423                 if (n <= 0) throw new IllegalArgumentException("non-positive request");
 424                 demand.increase(n);
 425                 if (debug.on())
 426                     debug.log("HttpPublisher: got request of "  + n + " from "
 427                                + getConnectionFlow());
 428                 writeScheduler.runOrSchedule();
 429             }
 430 
 431             @Override
 432             public void cancel() {
 433                 if (debug.on())
 434                     debug.log("HttpPublisher: cancelled by " + getConnectionFlow());
 435             }
 436 
 437             private boolean isEmpty() {
 438                 return queue.isEmpty() && priority.isEmpty();
 439             }
 440 
 441             private List<ByteBuffer> poll() {
 442                 List<ByteBuffer> elem = priority.poll();
 443                 return elem == null ? queue.poll() : elem;
 444             }
 445 
 446             void flush() {
 447                 while (!isEmpty() && demand.tryDecrement()) {
 448                     List<ByteBuffer> elem = poll();
 449                     if (debug.on())
 450                         debug.log("HttpPublisher: sending "
 451                                     + Utils.remaining(elem) + " bytes ("
 452                                     + elem.size() + " buffers) to "
 453                                     + getConnectionFlow());
 454                     subscriber.onNext(elem);
 455                 }
 456             }
 457         }
 458 
 459         @Override
 460         public void enqueue(List<ByteBuffer> buffers) throws IOException {
 461             queue.add(buffers);
 462             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
 463             debug.log("added %d bytes to the write queue", bytes);
 464         }
 465 
 466         @Override
 467         public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
 468             // Unordered frames are sent before existing frames.
 469             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
 470             priority.add(buffers);
 471             debug.log("added %d bytes in the priority write queue", bytes);
 472         }
 473 
 474         @Override
 475         public void signalEnqueued() throws IOException {
 476             debug.log("signalling the publisher of the write queue");
 477             signal();
 478         }
 479     }
 480 
 481     String dbgTag;
 482     final String dbgString() {
 483         FlowTube flow = getConnectionFlow();
 484         String tag = dbgTag;
 485         if (tag == null && flow != null) {
 486             dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")";
 487         } else if (tag == null) {
 488             tag = this.getClass().getSimpleName() + "(?)";
 489         }
 490         return tag;
 491     }
 492 
 493     @Override
 494     public String toString() {
 495         return "HttpConnection: " + channel().toString();
 496     }
 497 }