1 /*
   2  * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.net.InetSocketAddress;
  31 import java.nio.ByteBuffer;
  32 import java.time.Instant;
  33 import java.time.temporal.ChronoUnit;
  34 import java.util.ArrayList;
  35 import java.util.Collections;
  36 import java.util.HashMap;
  37 import java.util.Iterator;
  38 import java.util.LinkedList;
  39 import java.util.List;
  40 import java.util.ListIterator;
  41 import java.util.Objects;
  42 import java.util.Optional;
  43 import java.util.concurrent.Flow;
  44 import java.util.stream.Collectors;
  45 import jdk.internal.net.http.common.FlowTube;
  46 import jdk.internal.net.http.common.Logger;
  47 import jdk.internal.net.http.common.Utils;
  48 
  49 /**
 * HTTP/1.1 connection pool.
  51  */
  52 final class ConnectionPool {
  53 
    // Default keep-alive duration for pooled connections, read from the
    // "jdk.httpclient.keepalive.timeout" net property (default 1200).
    // It is interpreted as a number of seconds when an expiry deadline
    // is computed.
    static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
            "jdk.httpclient.keepalive.timeout", 1200); // seconds
    // Maximum number of idle connections kept in the pool, read from the
    // "jdk.httpclient.connectionPoolSize" net property. The limit is only
    // enforced when the value is positive; the default 0 means no limit.
    static final long MAX_POOL_SIZE = Utils.getIntegerNetProperty(
            "jdk.httpclient.connectionPoolSize", 0); // unbounded
    // Debug logger; dbgString() is handed to it as a supplier.
    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

    // Pools of idle connections

    // Idle connections that are instances of PlainHttpConnection, by key.
    private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
    // All other idle connections (asserted to be secure), by key.
    private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
    // Every pooled connection together with its expiry deadline,
    // kept sorted by deadline.
    private final ExpiryList expiryList;
    private final String dbgTag; // used for debug
    // Set to true by stop(); read and written while holding the
    // lock on this ConnectionPool.
    boolean stopped;
  67 
  68     /**
  69      * Entries in connection pool are keyed by destination address and/or
  70      * proxy address:
  71      * case 1: plain TCP not via proxy (destination only)
  72      * case 2: plain TCP via proxy (proxy only)
  73      * case 3: SSL not via proxy (destination only)
  74      * case 4: SSL over tunnel (destination and proxy)
  75      */
  76     static class CacheKey {
  77         final InetSocketAddress proxy;
  78         final InetSocketAddress destination;
  79 
  80         CacheKey(InetSocketAddress destination, InetSocketAddress proxy) {
  81             this.proxy = proxy;
  82             this.destination = destination;
  83         }
  84 
  85         @Override
  86         public boolean equals(Object obj) {
  87             if (obj == null) {
  88                 return false;
  89             }
  90             if (getClass() != obj.getClass()) {
  91                 return false;
  92             }
  93             final CacheKey other = (CacheKey) obj;
  94             if (!Objects.equals(this.proxy, other.proxy)) {
  95                 return false;
  96             }
  97             if (!Objects.equals(this.destination, other.destination)) {
  98                 return false;
  99             }
 100             return true;
 101         }
 102 
 103         @Override
 104         public int hashCode() {
 105             return Objects.hash(proxy, destination);
 106         }
 107     }
 108 
 109     ConnectionPool(long clientId) {
 110         this("ConnectionPool("+clientId+")");
 111     }
 112 
 113     /**
 114      * There should be one of these per HttpClient.
 115      */
 116     private ConnectionPool(String tag) {
 117         dbgTag = tag;
 118         plainPool = new HashMap<>();
 119         sslPool = new HashMap<>();
 120         expiryList = new ExpiryList();
 121     }
 122 
 123     final String dbgString() {
 124         return dbgTag;
 125     }
 126 
 127     synchronized void start() {
 128         assert !stopped : "Already stopped";
 129     }
 130 
 131     static CacheKey cacheKey(InetSocketAddress destination,
 132                              InetSocketAddress proxy)
 133     {
 134         return new CacheKey(destination, proxy);
 135     }
 136 
 137     synchronized HttpConnection getConnection(boolean secure,
 138                                               InetSocketAddress addr,
 139                                               InetSocketAddress proxy) {
 140         if (stopped) return null;
 141         CacheKey key = new CacheKey(addr, proxy);
 142         HttpConnection c = secure ? findConnection(key, sslPool)
 143                                   : findConnection(key, plainPool);
 144         //System.out.println ("getConnection returning: " + c);
 145         assert c == null || c.isSecure() == secure;
 146         return c;
 147     }
 148 
 149     /**
 150      * Returns the connection to the pool.
 151      */
 152     void returnToPool(HttpConnection conn) {
 153         returnToPool(conn, Instant.now(), KEEP_ALIVE);
 154     }
 155 
 156     // Called also by whitebox tests
 157     void returnToPool(HttpConnection conn, Instant now, long keepAlive) {
 158 
 159         assert (conn instanceof PlainHttpConnection) || conn.isSecure()
 160             : "Attempting to return unsecure connection to SSL pool: "
 161                 + conn.getClass();
 162 
 163         // Don't call registerCleanupTrigger while holding a lock,
 164         // but register it before the connection is added to the pool,
 165         // since we don't want to trigger the cleanup if the connection
 166         // is not in the pool.
 167         CleanupTrigger cleanup = registerCleanupTrigger(conn);
 168 
 169         // it's possible that cleanup may have been called.
 170         HttpConnection toClose = null;
 171         synchronized(this) {
 172             if (cleanup.isDone()) {
 173                 return;
 174             } else if (stopped) {
 175                 conn.close();
 176                 return;
 177             }
 178             if (MAX_POOL_SIZE > 0 && expiryList.size() >= MAX_POOL_SIZE) {
 179                 toClose = expiryList.removeOldest();
 180                 if (toClose != null) removeFromPool(toClose);
 181             }
 182             if (conn instanceof PlainHttpConnection) {
 183                 putConnection(conn, plainPool);
 184             } else {
 185                 assert conn.isSecure();
 186                 putConnection(conn, sslPool);
 187             }
 188             expiryList.add(conn, now, keepAlive);
 189         }
 190         if (toClose != null) {
 191             if (debug.on()) {
 192                 debug.log("Maximum pool size reached: removing oldest connection %s",
 193                           toClose.dbgString());
 194             }
 195             close(toClose);
 196         }
 197         //System.out.println("Return to pool: " + conn);
 198     }
 199 
 200     private CleanupTrigger registerCleanupTrigger(HttpConnection conn) {
 201         // Connect the connection flow to a pub/sub pair that will take the
 202         // connection out of the pool and close it if anything happens
 203         // while the connection is sitting in the pool.
 204         CleanupTrigger cleanup = new CleanupTrigger(conn);
 205         FlowTube flow = conn.getConnectionFlow();
 206         if (debug.on()) debug.log("registering %s", cleanup);
 207         flow.connectFlows(cleanup, cleanup);
 208         return cleanup;
 209     }
 210 
 211     private HttpConnection
 212     findConnection(CacheKey key,
 213                    HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
 214         LinkedList<HttpConnection> l = pool.get(key);
 215         if (l == null || l.isEmpty()) {
 216             return null;
 217         } else {
 218             HttpConnection c = l.removeFirst();
 219             expiryList.remove(c);
 220             return c;
 221         }
 222     }
 223 
 224     /* called from cache cleaner only  */
 225     private boolean
 226     removeFromPool(HttpConnection c,
 227                    HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
 228         //System.out.println("cacheCleaner removing: " + c);
 229         assert Thread.holdsLock(this);
 230         CacheKey k = c.cacheKey();
 231         List<HttpConnection> l = pool.get(k);
 232         if (l == null || l.isEmpty()) {
 233             pool.remove(k);
 234             return false;
 235         }
 236         return l.remove(c);
 237     }
 238 
 239     private void
 240     putConnection(HttpConnection c,
 241                   HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
 242         CacheKey key = c.cacheKey();
 243         LinkedList<HttpConnection> l = pool.get(key);
 244         if (l == null) {
 245             l = new LinkedList<>();
 246             pool.put(key, l);
 247         }
 248         l.add(c);
 249     }
 250 
 251     /**
 252      * Purge expired connection and return the number of milliseconds
 253      * in which the next connection is scheduled to expire.
 254      * If no connections are scheduled to be purged return 0.
 255      * @return the delay in milliseconds in which the next connection will
 256      *         expire.
 257      */
 258     long purgeExpiredConnectionsAndReturnNextDeadline() {
 259         if (!expiryList.purgeMaybeRequired()) return 0;
 260         return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now());
 261     }
 262 
 263     // Used for whitebox testing
 264     long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) {
 265         long nextPurge = 0;
 266 
 267         // We may be in the process of adding new elements
 268         // to the expiry list - but those elements will not
 269         // have outlast their keep alive timer yet since we're
 270         // just adding them.
 271         if (!expiryList.purgeMaybeRequired()) return nextPurge;
 272 
 273         List<HttpConnection> closelist;
 274         synchronized (this) {
 275             closelist = expiryList.purgeUntil(now);
 276             for (HttpConnection c : closelist) {
 277                 if (c instanceof PlainHttpConnection) {
 278                     boolean wasPresent = removeFromPool(c, plainPool);
 279                     assert wasPresent;
 280                 } else {
 281                     boolean wasPresent = removeFromPool(c, sslPool);
 282                     assert wasPresent;
 283                 }
 284             }
 285             nextPurge = now.until(
 286                     expiryList.nextExpiryDeadline().orElse(now),
 287                     ChronoUnit.MILLIS);
 288         }
 289         closelist.forEach(this::close);
 290         return nextPurge;
 291     }
 292 
 293     private void close(HttpConnection c) {
 294         try {
 295             c.close();
 296         } catch (Throwable e) {} // ignore
 297     }
 298 
 299     void stop() {
 300         List<HttpConnection> closelist = Collections.emptyList();
 301         try {
 302             synchronized (this) {
 303                 stopped = true;
 304                 closelist = expiryList.stream()
 305                     .map(e -> e.connection)
 306                     .collect(Collectors.toList());
 307                 expiryList.clear();
 308                 plainPool.clear();
 309                 sslPool.clear();
 310             }
 311         } finally {
 312             closelist.forEach(this::close);
 313         }
 314     }
 315 
 316     static final class ExpiryEntry {
 317         final HttpConnection connection;
 318         final Instant expiry; // absolute time in seconds of expiry time
 319         ExpiryEntry(HttpConnection connection, Instant expiry) {
 320             this.connection = connection;
 321             this.expiry = expiry;
 322         }
 323     }
 324 
 325     /**
 326      * Manages a LinkedList of sorted ExpiryEntry. The entry with the closer
 327      * deadline is at the tail of the list, and the entry with the farther
 328      * deadline is at the head. In the most common situation, new elements
 329      * will need to be added at the head (or close to it), and expired elements
 330      * will need to be purged from the tail.
 331      */
 332     private static final class ExpiryList {
 333         private final LinkedList<ExpiryEntry> list = new LinkedList<>();
 334         private volatile boolean mayContainEntries;
 335 
 336         int size() { return list.size(); }
 337 
 338         // A loosely accurate boolean whose value is computed
 339         // at the end of each operation performed on ExpiryList;
 340         // Does not require synchronizing on the ConnectionPool.
 341         boolean purgeMaybeRequired() {
 342             return mayContainEntries;
 343         }
 344 
 345         // Returns the next expiry deadline
 346         // should only be called while holding a synchronization
 347         // lock on the ConnectionPool
 348         Optional<Instant> nextExpiryDeadline() {
 349             if (list.isEmpty()) return Optional.empty();
 350             else return Optional.of(list.getLast().expiry);
 351         }
 352 
 353         // should only be called while holding a synchronization
 354         // lock on the ConnectionPool
 355         HttpConnection removeOldest() {
 356             ExpiryEntry entry = list.pollLast();
 357             return entry == null ? null : entry.connection;
 358         }
 359 
 360         // should only be called while holding a synchronization
 361         // lock on the ConnectionPool
 362         void add(HttpConnection conn) {
 363             add(conn, Instant.now(), KEEP_ALIVE);
 364         }
 365 
 366         // Used by whitebox test.
 367         void add(HttpConnection conn, Instant now, long keepAlive) {
 368             Instant then = now.truncatedTo(ChronoUnit.SECONDS)
 369                     .plus(keepAlive, ChronoUnit.SECONDS);
 370 
 371             // Elements with the farther deadline are at the head of
 372             // the list. It's more likely that the new element will
 373             // have the farthest deadline, and will need to be inserted
 374             // at the head of the list, so we're using an ascending
 375             // list iterator to find the right insertion point.
 376             ListIterator<ExpiryEntry> li = list.listIterator();
 377             while (li.hasNext()) {
 378                 ExpiryEntry entry = li.next();
 379 
 380                 if (then.isAfter(entry.expiry)) {
 381                     li.previous();
 382                     // insert here
 383                     li.add(new ExpiryEntry(conn, then));
 384                     mayContainEntries = true;
 385                     return;
 386                 }
 387             }
 388             // last (or first) element of list (the last element is
 389             // the first when the list is empty)
 390             list.add(new ExpiryEntry(conn, then));
 391             mayContainEntries = true;
 392         }
 393 
 394         // should only be called while holding a synchronization
 395         // lock on the ConnectionPool
 396         void remove(HttpConnection c) {
 397             if (c == null || list.isEmpty()) return;
 398             ListIterator<ExpiryEntry> li = list.listIterator();
 399             while (li.hasNext()) {
 400                 ExpiryEntry e = li.next();
 401                 if (e.connection.equals(c)) {
 402                     li.remove();
 403                     mayContainEntries = !list.isEmpty();
 404                     return;
 405                 }
 406             }
 407         }
 408 
 409         // should only be called while holding a synchronization
 410         // lock on the ConnectionPool.
 411         // Purge all elements whose deadline is before now (now included).
 412         List<HttpConnection> purgeUntil(Instant now) {
 413             if (list.isEmpty()) return Collections.emptyList();
 414 
 415             List<HttpConnection> closelist = new ArrayList<>();
 416 
 417             // elements with the closest deadlines are at the tail
 418             // of the queue, so we're going to use a descending iterator
 419             // to remove them, and stop when we find the first element
 420             // that has not expired yet.
 421             Iterator<ExpiryEntry> li = list.descendingIterator();
 422             while (li.hasNext()) {
 423                 ExpiryEntry entry = li.next();
 424                 // use !isAfter instead of isBefore in order to
 425                 // remove the entry if its expiry == now
 426                 if (!entry.expiry.isAfter(now)) {
 427                     li.remove();
 428                     HttpConnection c = entry.connection;
 429                     closelist.add(c);
 430                 } else break; // the list is sorted
 431             }
 432             mayContainEntries = !list.isEmpty();
 433             return closelist;
 434         }
 435 
 436         // should only be called while holding a synchronization
 437         // lock on the ConnectionPool
 438         java.util.stream.Stream<ExpiryEntry> stream() {
 439             return list.stream();
 440         }
 441 
 442         // should only be called while holding a synchronization
 443         // lock on the ConnectionPool
 444         void clear() {
 445             list.clear();
 446             mayContainEntries = false;
 447         }
 448     }
 449 
 450     // Remove a connection from the pool.
 451     // should only be called while holding a synchronization
 452     // lock on the ConnectionPool
 453     private void removeFromPool(HttpConnection c) {
 454         assert Thread.holdsLock(this);
 455         if (c instanceof PlainHttpConnection) {
 456             removeFromPool(c, plainPool);
 457         } else {
 458             assert c.isSecure() : "connection " + c + " is not secure!";
 459             removeFromPool(c, sslPool);
 460         }
 461     }
 462 
 463     // Used by tests
 464     synchronized boolean contains(HttpConnection c) {
 465         final CacheKey key = c.cacheKey();
 466         List<HttpConnection> list;
 467         if ((list = plainPool.get(key)) != null) {
 468             if (list.contains(c)) return true;
 469         }
 470         if ((list = sslPool.get(key)) != null) {
 471             if (list.contains(c)) return true;
 472         }
 473         return false;
 474     }
 475 
 476     void cleanup(HttpConnection c, Throwable error) {
 477         if (debug.on())
 478             debug.log("%s : ConnectionPool.cleanup(%s)",
 479                     String.valueOf(c.getConnectionFlow()), error);
 480         synchronized(this) {
 481             removeFromPool(c);
 482             expiryList.remove(c);
 483         }
 484         c.close();
 485     }
 486 
 487     /**
 488      * An object that subscribes to the flow while the connection is in
 489      * the pool. Anything that comes in will cause the connection to be closed
 490      * and removed from the pool.
 491      */
 492     private final class CleanupTrigger implements
 493             FlowTube.TubeSubscriber, FlowTube.TubePublisher,
 494             Flow.Subscription {
 495 
 496         private final HttpConnection connection;
 497         private volatile boolean done;
 498 
 499         public CleanupTrigger(HttpConnection connection) {
 500             this.connection = connection;
 501         }
 502 
 503         public boolean isDone() { return done;}
 504 
 505         private void triggerCleanup(Throwable error) {
 506             done = true;
 507             cleanup(connection, error);
 508         }
 509 
 510         @Override public void request(long n) {}
 511         @Override public void cancel() {}
 512 
 513         @Override
 514         public void onSubscribe(Flow.Subscription subscription) {
 515             subscription.request(1);
 516         }
 517         @Override
 518         public void onError(Throwable error) { triggerCleanup(error); }
 519         @Override
 520         public void onComplete() { triggerCleanup(null); }
 521         @Override
 522         public void onNext(List<ByteBuffer> item) {
 523             triggerCleanup(new IOException("Data received while in pool"));
 524         }
 525 
 526         @Override
 527         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 528             subscriber.onSubscribe(this);
 529         }
 530 
 531         @Override
 532         public String toString() {
 533             return "CleanupTrigger(" + connection.getConnectionFlow() + ")";
 534         }
 535     }
 536 }