1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.Closeable;
  29 import java.io.IOException;
  30 import java.lang.System.Logger.Level;
  31 import java.net.InetSocketAddress;
  32 import java.nio.ByteBuffer;
  33 import java.nio.channels.SocketChannel;
  34 import java.util.IdentityHashMap;
  35 import java.util.List;
  36 import java.util.Map;
  37 import java.util.concurrent.CompletableFuture;
  38 import java.util.concurrent.CompletionStage;
  39 import java.util.concurrent.ConcurrentLinkedDeque;
  40 import java.util.concurrent.Flow;
  41 import jdk.incubator.http.HttpClient.Version;
  42 import jdk.incubator.http.internal.common.Demand;
  43 import jdk.incubator.http.internal.common.FlowTube;
  44 import jdk.incubator.http.internal.common.SequentialScheduler;
  45 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
  46 import jdk.incubator.http.internal.common.Log;
  47 import jdk.incubator.http.internal.common.Utils;
  48 import static jdk.incubator.http.HttpClient.Version.HTTP_2;
  49 
  50 /**
  51  * Wraps socket channel layer and takes care of SSL also.
  52  *
  53  * Subtypes are:
  54  *      PlainHttpConnection: regular direct TCP connection to server
  55  *      PlainProxyConnection: plain text proxy connection
  56  *      PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
  57  *      AsyncSSLConnection: TLS channel direct to server
  58  *      AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
  59  */
  60 abstract class HttpConnection implements Closeable {
  61 
  62     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
  63     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
  64     final static System.Logger DEBUG_LOGGER = Utils.getDebugLogger(
  65             () -> "HttpConnection(SocketTube(?))", DEBUG);
  66 
  67     /** The address this connection is connected to. Could be a server or a proxy. */
  68     final InetSocketAddress address;
  69     private final HttpClientImpl client;
  70     private final TrailingOperations trailingOperations;
  71 
  72     HttpConnection(InetSocketAddress address, HttpClientImpl client) {
  73         this.address = address;
  74         this.client = client;
  75         trailingOperations = new TrailingOperations();
  76     }
  77 
  78     private static final class TrailingOperations {
  79         private final Map<CompletionStage<?>, Boolean> operations =
  80                 new IdentityHashMap<>();
  81         void add(CompletionStage<?> cf) {
  82             synchronized(operations) {
  83                 cf.whenComplete((r,t)-> remove(cf));
  84                 operations.put(cf, Boolean.TRUE);
  85             }
  86         }
  87         boolean remove(CompletionStage<?> cf) {
  88             synchronized(operations) {
  89                 return operations.remove(cf);
  90             }
  91         }
  92     }
  93 
  94     final void addTrailingOperation(CompletionStage<?> cf) {
  95         trailingOperations.add(cf);
  96     }
  97 
  98 //    final void removeTrailingOperation(CompletableFuture<?> cf) {
  99 //        trailingOperations.remove(cf);
 100 //    }
 101 
 102     final HttpClientImpl client() {
 103         return client;
 104     }
 105 
 106     //public abstract void connect() throws IOException, InterruptedException;
 107 
 108     public abstract CompletableFuture<Void> connectAsync();
 109 
 110     /** Tells whether, or not, this connection is connected to its destination. */
 111     abstract boolean connected();
 112 
 113     /** Tells whether, or not, this connection is secure ( over SSL ) */
 114     abstract boolean isSecure();
 115 
 116     /** Tells whether, or not, this connection is proxied. */
 117     abstract boolean isProxied();
 118 
 119     /** Tells whether, or not, this connection is open. */
 120     final boolean isOpen() {
 121         return channel().isOpen() &&
 122                 (connected() ? !getConnectionFlow().isFinished() : true);
 123     }
 124 
 125     interface HttpPublisher extends FlowTube.TubePublisher {
 126         void enqueue(List<ByteBuffer> buffers) throws IOException;
 127         void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;
 128         void signalEnqueued() throws IOException;
 129     }
 130 
 131     /**
 132      * Returns the HTTP publisher associated with this connection.  May be null
 133      * if invoked before connecting.
 134      */
 135     abstract HttpPublisher publisher();
 136 
 137     /**
 138      * Factory for retrieving HttpConnections. A connection can be retrieved
 139      * from the connection pool, or a new one created if none available.
 140      *
 141      * The given {@code addr} is the ultimate destination. Any proxies,
 142      * etc, are determined from the request. Returns a concrete instance which
 143      * is one of the following:
 144      *      {@link PlainHttpConnection}
 145      *      {@link PlainTunnelingConnection}
 146      *
 147      * The returned connection, if not from the connection pool, must have its,
 148      * connect() or connectAsync() method invoked, which ( when it completes
 149      * successfully ) renders the connection usable for requests.
 150      */
 151     public static HttpConnection getConnection(InetSocketAddress addr,
 152                                                HttpClientImpl client,
 153                                                HttpRequestImpl request,
 154                                                Version version) {
 155         HttpConnection c = null;
 156         InetSocketAddress proxy = request.proxy();
 157         if (proxy != null && proxy.isUnresolved()) {
 158             // The default proxy selector may select a proxy whose  address is
 159             // unresolved. We must resolve the address before connecting to it.
 160             proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort());
 161         }
 162         boolean secure = request.secure();
 163         ConnectionPool pool = client.connectionPool();
 164 
 165         if (!secure) {
 166             c = pool.getConnection(false, addr, proxy);
 167             if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
 168                 final HttpConnection conn = c;
 169                 DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow()
 170                             + ": plain connection retrieved from HTTP/1.1 pool");
 171                 return c;
 172             } else {
 173                 return getPlainConnection(addr, proxy, request, client);
 174             }
 175         } else {  // secure
 176             if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
 177                 c = pool.getConnection(true, addr, proxy);
 178             }
 179             if (c != null && c.isOpen()) {
 180                 final HttpConnection conn = c;
 181                 DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow()
 182                             + ": SSL connection retrieved from HTTP/1.1 pool");
 183                 return c;
 184             } else {
 185                 String[] alpn = null;
 186                 if (version == HTTP_2) {
 187                     alpn = new String[] { "h2", "http/1.1" };
 188                 }
 189                 return getSSLConnection(addr, proxy, alpn, client);
 190             }
 191         }
 192     }
 193 
 194     private static HttpConnection getSSLConnection(InetSocketAddress addr,
 195                                                    InetSocketAddress proxy,
 196                                                    String[] alpn,
 197                                                    HttpClientImpl client) {
 198         if (proxy != null)
 199             return new AsyncSSLTunnelConnection(addr, client, alpn, proxy);
 200         else
 201             return new AsyncSSLConnection(addr, client, alpn);
 202     }
 203 
 204     /* Returns either a plain HTTP connection or a plain tunnelling connection
 205      * for proxied WebSocket */
 206     private static HttpConnection getPlainConnection(InetSocketAddress addr,
 207                                                      InetSocketAddress proxy,
 208                                                      HttpRequestImpl request,
 209                                                      HttpClientImpl client) {
 210         if (request.isWebSocket() && proxy != null)
 211             return new PlainTunnelingConnection(addr, proxy, client);
 212 
 213         if (proxy == null)
 214             return new PlainHttpConnection(addr, client);
 215         else
 216             return new PlainProxyConnection(proxy, client);
 217     }
 218 
 219     void closeOrReturnToCache(HttpHeaders hdrs) {
 220         if (hdrs == null) {
 221             // the connection was closed by server, eof
 222             close();
 223             return;
 224         }
 225         if (!isOpen()) {
 226             return;
 227         }
 228         HttpClientImpl client = client();
 229         if (client == null) {
 230             close();
 231             return;
 232         }
 233         ConnectionPool pool = client.connectionPool();
 234         boolean keepAlive = hdrs.firstValue("Connection")
 235                 .map((s) -> !s.equalsIgnoreCase("close"))
 236                 .orElse(true);
 237 
 238         if (keepAlive) {
 239             Log.logTrace("Returning connection to the pool: {0}", this);
 240             pool.returnToPool(this);
 241         } else {
 242             close();
 243         }
 244     }
 245 
 246     abstract SocketChannel channel();
 247 
 248     final InetSocketAddress address() {
 249         return address;
 250     }
 251 
 252     abstract ConnectionPool.CacheKey cacheKey();
 253 
 254 //    // overridden in SSL only
 255 //    SSLParameters sslParameters() {
 256 //        return null;
 257 //    }
 258 
 259     /**
 260      * Closes this connection, by returning the socket to its connection pool.
 261      */
 262     @Override
 263     public abstract void close();
 264 
 265     abstract void shutdownInput() throws IOException;
 266 
 267     abstract void shutdownOutput() throws IOException;
 268 
 269     // Support for WebSocket/RawChannelImpl which unfortunately
 270     // still depends on synchronous read/writes.
 271     // It should be removed when RawChannelImpl moves to using asynchronous APIs.
 272     abstract static class DetachedConnectionChannel implements Closeable {
 273         DetachedConnectionChannel() {}
 274         abstract SocketChannel channel();
 275         abstract long write(ByteBuffer[] buffers, int start, int number)
 276                 throws IOException;
 277         abstract void shutdownInput() throws IOException;
 278         abstract void shutdownOutput() throws IOException;
 279         abstract ByteBuffer read() throws IOException;
 280         @Override
 281         public abstract void close();
 282         @Override
 283         public String toString() {
 284             return this.getClass().getSimpleName() + ": " + channel().toString();
 285         }
 286     }
 287 
 288     // Support for WebSocket/RawChannelImpl which unfortunately
 289     // still depends on synchronous read/writes.
 290     // It should be removed when RawChannelImpl moves to using asynchronous APIs.
 291     abstract DetachedConnectionChannel detachChannel();
 292 
 293     abstract FlowTube getConnectionFlow();
 294 
 295     /**
 296      * A publisher that makes it possible to publish (write)
 297      * ordered (normal priority) and unordered (high priority)
 298      * buffers downstream.
 299      */
 300     final class PlainHttpPublisher implements HttpPublisher {
 301         final Object reading;
 302         PlainHttpPublisher() {
 303             this(new Object());
 304         }
 305         PlainHttpPublisher(Object readingLock) {
 306             this.reading = readingLock;
 307         }
 308         final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();
 309         volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
 310         volatile HttpWriteSubscription subscription;
 311         final SequentialScheduler writeScheduler =
 312                     new SequentialScheduler(this::flushTask);
 313         @Override
 314         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 315             synchronized (reading) {
 316                 //assert this.subscription == null;
 317                 //assert this.subscriber == null;
 318                 if (subscription == null) {
 319                     subscription = new HttpWriteSubscription();
 320                 }
 321                 this.subscriber = subscriber;
 322             }
 323             // TODO: should we do this in the flow?
 324             subscriber.onSubscribe(subscription);
 325             signal();
 326         }
 327 
 328         void flushTask(DeferredCompleter completer) {
 329             try {
 330                 HttpWriteSubscription sub = subscription;
 331                 if (sub != null) sub.flush();
 332             } finally {
 333                 completer.complete();
 334             }
 335         }
 336 
 337         void signal() {
 338             writeScheduler.runOrSchedule();
 339         }
 340 
 341         final class HttpWriteSubscription implements Flow.Subscription {
 342             final Demand demand = new Demand();
 343 
 344             @Override
 345             public void request(long n) {
 346                 if (n <= 0) throw new IllegalArgumentException("non-positive request");
 347                 demand.increase(n);
 348                 debug.log(Level.DEBUG, () -> "HttpPublisher: got request of "
 349                             + n + " from "
 350                             + getConnectionFlow());
 351                 writeScheduler.runOrSchedule();
 352             }
 353 
 354             @Override
 355             public void cancel() {
 356                 debug.log(Level.DEBUG, () -> "HttpPublisher: cancelled by "
 357                           + getConnectionFlow());
 358             }
 359 
 360             void flush() {
 361                 while (!queue.isEmpty() && demand.tryDecrement()) {
 362                     List<ByteBuffer> elem = queue.poll();
 363                     debug.log(Level.DEBUG, () -> "HttpPublisher: sending "
 364                                 + Utils.remaining(elem) + " bytes ("
 365                                 + elem.size() + " buffers) to "
 366                                 + getConnectionFlow());
 367                     subscriber.onNext(elem);
 368                 }
 369             }
 370         }
 371 
 372         @Override
 373         public void enqueue(List<ByteBuffer> buffers) throws IOException {
 374             queue.add(buffers);
 375             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
 376             debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes);
 377         }
 378 
 379         @Override
 380         public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
 381             // Unordered frames are sent before existing frames.
 382             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
 383             queue.addFirst(buffers);
 384             debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes);
 385         }
 386 
 387         @Override
 388         public void signalEnqueued() throws IOException {
 389             debug.log(Level.DEBUG, "signalling the publisher of the write queue");
 390             signal();
 391         }
 392     }
 393 
 394     String dbgTag = null;
 395     final String dbgString() {
 396         FlowTube flow = getConnectionFlow();
 397         String tag = dbgTag;
 398         if (tag == null && flow != null) {
 399             dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")";
 400         } else if (tag == null) {
 401             tag = this.getClass().getSimpleName() + "(?)";
 402         }
 403         return tag;
 404     }
 405 
 406     @Override
 407     public String toString() {
 408         return "HttpConnection: " + channel().toString();
 409     }
 410 }