1 /*
   2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.Closeable;
  29 import java.io.IOException;
  30 import java.net.InetSocketAddress;
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.SocketChannel;
  33 import java.util.Arrays;
  34 import java.util.IdentityHashMap;
  35 import java.util.List;
  36 import java.util.Map;
  37 import java.util.TreeMap;
  38 import java.util.concurrent.CompletableFuture;
  39 import java.util.concurrent.CompletionStage;
  40 import java.util.concurrent.ConcurrentLinkedDeque;
  41 import java.util.concurrent.Flow;
  42 import java.util.function.BiPredicate;
  43 import java.util.function.Predicate;
  44 import java.net.http.HttpClient;
  45 import java.net.http.HttpClient.Version;
  46 import java.net.http.HttpHeaders;
  47 import jdk.internal.net.http.common.Demand;
  48 import jdk.internal.net.http.common.FlowTube;
  49 import jdk.internal.net.http.common.Logger;
  50 import jdk.internal.net.http.common.SequentialScheduler;
  51 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
  52 import jdk.internal.net.http.common.Log;
  53 import jdk.internal.net.http.common.Utils;
  54 import static java.net.http.HttpClient.Version.HTTP_2;
  55 
  56 /**
  57  * Wraps socket channel layer and takes care of SSL also.
  58  *
  59  * Subtypes are:
  60  *      PlainHttpConnection: regular direct TCP connection to server
  61  *      PlainProxyConnection: plain text proxy connection
  62  *      PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
  63  *      AsyncSSLConnection: TLS channel direct to server
  64  *      AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
  65  */
  66 abstract class HttpConnection implements Closeable {
  67 
  68     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
  69     final static Logger DEBUG_LOGGER = Utils.getDebugLogger(
  70             () -> "HttpConnection(SocketTube(?))", Utils.DEBUG);
  71 
  72     /** The address this connection is connected to. Could be a server or a proxy. */
  73     final InetSocketAddress address;
  74     private final HttpClientImpl client;
  75     private final TrailingOperations trailingOperations;
  76 
  77     HttpConnection(InetSocketAddress address, HttpClientImpl client) {
  78         this.address = address;
  79         this.client = client;
  80         trailingOperations = new TrailingOperations();
  81     }
  82 
  83     private static final class TrailingOperations {
  84         private final Map<CompletionStage<?>, Boolean> operations =
  85                 new IdentityHashMap<>();
  86         void add(CompletionStage<?> cf) {
  87             synchronized(operations) {
  88                 cf.whenComplete((r,t)-> remove(cf));
  89                 operations.put(cf, Boolean.TRUE);
  90             }
  91         }
  92         boolean remove(CompletionStage<?> cf) {
  93             synchronized(operations) {
  94                 return operations.remove(cf);
  95             }
  96         }
  97     }
  98 
  99     final void addTrailingOperation(CompletionStage<?> cf) {
 100         trailingOperations.add(cf);
 101     }
 102 
 103 //    final void removeTrailingOperation(CompletableFuture<?> cf) {
 104 //        trailingOperations.remove(cf);
 105 //    }
 106 
 107     final HttpClientImpl client() {
 108         return client;
 109     }
 110 
 111     /**
 112      * Initiates the connect and returns a CompletableFuture that completes
 113      * when the initial connection phase has been established or an error
 114      * occurs. The given exchange provides access connect timeout configuration.
 115      */
 116     public abstract CompletableFuture<Void> connectAsync(Exchange<?> exchange);
 117 
 118     /** Completes the connection phase. Must be called after connectAsync. */
 119     public abstract CompletableFuture<Void> finishConnect();
 120 
 121     /** Tells whether, or not, this connection is connected to its destination. */
 122     abstract boolean connected();
 123 
 124     /** Tells whether, or not, this connection is secure ( over SSL ) */
 125     abstract boolean isSecure();
 126 
 127     /**
 128      * Tells whether, or not, this connection is proxied.
 129      * Returns true for tunnel connections, or clear connection to
 130      * any host through proxy.
 131      */
 132     abstract boolean isProxied();
 133 
 134     /** Tells whether, or not, this connection is open. */
 135     final boolean isOpen() {
 136         return channel().isOpen() &&
 137                 (connected() ? !getConnectionFlow().isFinished() : true);
 138     }
 139 
 140     interface HttpPublisher extends FlowTube.TubePublisher {
 141         void enqueue(List<ByteBuffer> buffers) throws IOException;
 142         void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;
 143         void signalEnqueued() throws IOException;
 144     }
 145 
 146     /**
 147      * Returns the HTTP publisher associated with this connection.  May be null
 148      * if invoked before connecting.
 149      */
 150     abstract HttpPublisher publisher();
 151 
 152     // HTTP/2 MUST use TLS version 1.2 or higher for HTTP/2 over TLS
 153     private static final Predicate<String> testRequiredHTTP2TLSVersion = proto ->
 154             proto.equals("TLSv1.2") || proto.equals("TLSv1.3");
 155 
 156    /**
 157     * Returns true if the given client's SSL parameter protocols contains at
 158     * least one TLS version that HTTP/2 requires.
 159     */
 160    private static final boolean hasRequiredHTTP2TLSVersion(HttpClient client) {
 161        String[] protos = client.sslParameters().getProtocols();
 162        if (protos != null) {
 163            return Arrays.stream(protos).filter(testRequiredHTTP2TLSVersion).findAny().isPresent();
 164        } else {
 165            return false;
 166        }
 167    }
 168 
 169     /**
 170      * Factory for retrieving HttpConnections. A connection can be retrieved
 171      * from the connection pool, or a new one created if none available.
 172      *
 173      * The given {@code addr} is the ultimate destination. Any proxies,
 174      * etc, are determined from the request. Returns a concrete instance which
 175      * is one of the following:
 176      *      {@link PlainHttpConnection}
 177      *      {@link PlainTunnelingConnection}
 178      *
 179      * The returned connection, if not from the connection pool, must have its,
 180      * connect() or connectAsync() method invoked, which ( when it completes
 181      * successfully ) renders the connection usable for requests.
 182      */
 183     public static HttpConnection getConnection(InetSocketAddress addr,
 184                                                HttpClientImpl client,
 185                                                HttpRequestImpl request,
 186                                                Version version) {
 187         // The default proxy selector may select a proxy whose  address is
 188         // unresolved. We must resolve the address before connecting to it.
 189         InetSocketAddress proxy = Utils.resolveAddress(request.proxy());
 190         HttpConnection c = null;
 191         boolean secure = request.secure();
 192         ConnectionPool pool = client.connectionPool();
 193 
 194         if (!secure) {
 195             c = pool.getConnection(false, addr, proxy);
 196             if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
 197                 final HttpConnection conn = c;
 198                 if (DEBUG_LOGGER.on())
 199                     DEBUG_LOGGER.log(conn.getConnectionFlow()
 200                                      + ": plain connection retrieved from HTTP/1.1 pool");
 201                 return c;
 202             } else {
 203                 return getPlainConnection(addr, proxy, request, client);
 204             }
 205         } else {  // secure
 206             if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
 207                 c = pool.getConnection(true, addr, proxy);
 208             }
 209             if (c != null && c.isOpen()) {
 210                 final HttpConnection conn = c;
 211                 if (DEBUG_LOGGER.on())
 212                     DEBUG_LOGGER.log(conn.getConnectionFlow()
 213                                      + ": SSL connection retrieved from HTTP/1.1 pool");
 214                 return c;
 215             } else {
 216                 String[] alpn = null;
 217                 if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {
 218                     alpn = new String[] { "h2", "http/1.1" };
 219                 }
 220                 return getSSLConnection(addr, proxy, alpn, request, client);
 221             }
 222         }
 223     }
 224 
 225     private static HttpConnection getSSLConnection(InetSocketAddress addr,
 226                                                    InetSocketAddress proxy,
 227                                                    String[] alpn,
 228                                                    HttpRequestImpl request,
 229                                                    HttpClientImpl client) {
 230         if (proxy != null)
 231             return new AsyncSSLTunnelConnection(addr, client, alpn, proxy,
 232                                                 proxyTunnelHeaders(request));
 233         else
 234             return new AsyncSSLConnection(addr, client, alpn);
 235     }
 236 
 237     /**
 238      * This method is used to build a filter that will accept or
 239      * veto (header-name, value) tuple for transmission on the
 240      * wire.
 241      * The filter is applied to the headers when sending the headers
 242      * to the remote party.
 243      * Which tuple is accepted/vetoed depends on:
 244      * <pre>
 245      *    - whether the connection is a tunnel connection
 246      *      [talking to a server through a proxy tunnel]
 247      *    - whether the method is CONNECT
 248      *      [establishing a CONNECT tunnel through a proxy]
 249      *    - whether the request is using a proxy
 250      *      (and the connection is not a tunnel)
 251      *      [talking to a server through a proxy]
 252      *    - whether the request is a direct connection to
 253      *      a server (no tunnel, no proxy).
 254      * </pre>
 255      * @param request
 256      * @return
 257      */
 258     BiPredicate<String,String> headerFilter(HttpRequestImpl request) {
 259         if (isTunnel()) {
 260             // talking to a server through a proxy tunnel
 261             // don't send proxy-* headers to a plain server
 262             assert !request.isConnect();
 263             return Utils.NO_PROXY_HEADERS_FILTER;
 264         } else if (request.isConnect()) {
 265             // establishing a proxy tunnel
 266             // check for proxy tunnel disabled schemes
 267             // assert !this.isTunnel();
 268             assert request.proxy() == null;
 269             return Utils.PROXY_TUNNEL_FILTER;
 270         } else if (request.proxy() != null) {
 271             // talking to a server through a proxy (no tunnel)
 272             // check for proxy disabled schemes
 273             // assert !isTunnel() && !request.isConnect();
 274             return Utils.PROXY_FILTER;
 275         } else {
 276             // talking to a server directly (no tunnel, no proxy)
 277             // don't send proxy-* headers to a plain server
 278             // assert request.proxy() == null && !request.isConnect();
 279             return Utils.NO_PROXY_HEADERS_FILTER;
 280         }
 281     }
 282 
 283     // Composes a new immutable HttpHeaders that combines the
 284     // user and system header but only keeps those headers that
 285     // start with "proxy-"
 286     private static HttpHeaders proxyTunnelHeaders(HttpRequestImpl request) {
 287         Map<String, List<String>> combined = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
 288         combined.putAll(request.getSystemHeadersBuilder().map());
 289         combined.putAll(request.headers().map()); // let user override system
 290 
 291         // keep only proxy-* - and also strip authorization headers
 292         // for disabled schemes
 293         return HttpHeaders.of(combined, Utils.PROXY_TUNNEL_FILTER);
 294     }
 295 
 296     /* Returns either a plain HTTP connection or a plain tunnelling connection
 297      * for proxied WebSocket */
 298     private static HttpConnection getPlainConnection(InetSocketAddress addr,
 299                                                      InetSocketAddress proxy,
 300                                                      HttpRequestImpl request,
 301                                                      HttpClientImpl client) {
 302         if (request.isWebSocket() && proxy != null)
 303             return new PlainTunnelingConnection(addr, proxy, client,
 304                                                 proxyTunnelHeaders(request));
 305 
 306         if (proxy == null)
 307             return new PlainHttpConnection(addr, client);
 308         else
 309             return new PlainProxyConnection(proxy, client);
 310     }
 311 
 312     void closeOrReturnToCache(HttpHeaders hdrs) {
 313         if (hdrs == null) {
 314             // the connection was closed by server, eof
 315             close();
 316             return;
 317         }
 318         if (!isOpen()) {
 319             return;
 320         }
 321         HttpClientImpl client = client();
 322         if (client == null) {
 323             close();
 324             return;
 325         }
 326         ConnectionPool pool = client.connectionPool();
 327         boolean keepAlive = hdrs.firstValue("Connection")
 328                 .map((s) -> !s.equalsIgnoreCase("close"))
 329                 .orElse(true);
 330 
 331         if (keepAlive) {
 332             Log.logTrace("Returning connection to the pool: {0}", this);
 333             pool.returnToPool(this);
 334         } else {
 335             close();
 336         }
 337     }
 338 
 339     /* Tells whether or not this connection is a tunnel through a proxy */
 340     boolean isTunnel() { return false; }
 341 
 342     abstract SocketChannel channel();
 343 
 344     final InetSocketAddress address() {
 345         return address;
 346     }
 347 
 348     abstract ConnectionPool.CacheKey cacheKey();
 349 
 350     /**
 351      * Closes this connection, by returning the socket to its connection pool.
 352      */
 353     @Override
 354     public abstract void close();
 355 
 356     abstract FlowTube getConnectionFlow();
 357 
 358     /**
 359      * A publisher that makes it possible to publish (write) ordered (normal
 360      * priority) and unordered (high priority) buffers downstream.
 361      */
 362     final class PlainHttpPublisher implements HttpPublisher {
 363         final Object reading;
 364         PlainHttpPublisher() {
 365             this(new Object());
 366         }
 367         PlainHttpPublisher(Object readingLock) {
 368             this.reading = readingLock;
 369         }
 370         final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();
 371         final ConcurrentLinkedDeque<List<ByteBuffer>> priority = new ConcurrentLinkedDeque<>();
 372         volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
 373         volatile HttpWriteSubscription subscription;
 374         final SequentialScheduler writeScheduler =
 375                     new SequentialScheduler(this::flushTask);
 376         @Override
 377         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 378             synchronized (reading) {
 379                 //assert this.subscription == null;
 380                 //assert this.subscriber == null;
 381                 if (subscription == null) {
 382                     subscription = new HttpWriteSubscription();
 383                 }
 384                 this.subscriber = subscriber;
 385             }
 386             // TODO: should we do this in the flow?
 387             subscriber.onSubscribe(subscription);
 388             signal();
 389         }
 390 
 391         void flushTask(DeferredCompleter completer) {
 392             try {
 393                 HttpWriteSubscription sub = subscription;
 394                 if (sub != null) sub.flush();
 395             } finally {
 396                 completer.complete();
 397             }
 398         }
 399 
 400         void signal() {
 401             writeScheduler.runOrSchedule();
 402         }
 403 
 404         final class HttpWriteSubscription implements Flow.Subscription {
 405             final Demand demand = new Demand();
 406 
 407             @Override
 408             public void request(long n) {
 409                 if (n <= 0) throw new IllegalArgumentException("non-positive request");
 410                 demand.increase(n);
 411                 if (debug.on())
 412                     debug.log("HttpPublisher: got request of "  + n + " from "
 413                                + getConnectionFlow());
 414                 writeScheduler.runOrSchedule();
 415             }
 416 
 417             @Override
 418             public void cancel() {
 419                 if (debug.on())
 420                     debug.log("HttpPublisher: cancelled by " + getConnectionFlow());
 421             }
 422 
 423             private boolean isEmpty() {
 424                 return queue.isEmpty() && priority.isEmpty();
 425             }
 426 
 427             private List<ByteBuffer> poll() {
 428                 List<ByteBuffer> elem = priority.poll();
 429                 return elem == null ? queue.poll() : elem;
 430             }
 431 
 432             void flush() {
 433                 while (!isEmpty() && demand.tryDecrement()) {
 434                     List<ByteBuffer> elem = poll();
 435                     if (debug.on())
 436                         debug.log("HttpPublisher: sending "
 437                                     + Utils.remaining(elem) + " bytes ("
 438                                     + elem.size() + " buffers) to "
 439                                     + getConnectionFlow());
 440                     subscriber.onNext(elem);
 441                 }
 442             }
 443         }
 444 
 445         @Override
 446         public void enqueue(List<ByteBuffer> buffers) throws IOException {
 447             queue.add(buffers);
 448             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
 449             debug.log("added %d bytes to the write queue", bytes);
 450         }
 451 
 452         @Override
 453         public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
 454             // Unordered frames are sent before existing frames.
 455             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
 456             priority.add(buffers);
 457             debug.log("added %d bytes in the priority write queue", bytes);
 458         }
 459 
 460         @Override
 461         public void signalEnqueued() throws IOException {
 462             debug.log("signalling the publisher of the write queue");
 463             signal();
 464         }
 465     }
 466 
 467     String dbgTag;
 468     final String dbgString() {
 469         FlowTube flow = getConnectionFlow();
 470         String tag = dbgTag;
 471         if (tag == null && flow != null) {
 472             dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")";
 473         } else if (tag == null) {
 474             tag = this.getClass().getSimpleName() + "(?)";
 475         }
 476         return tag;
 477     }
 478 
 479     @Override
 480     public String toString() {
 481         return "HttpConnection: " + channel().toString();
 482     }
 483 }