1 /*
   2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.net.InetSocketAddress;
  31 import java.nio.ByteBuffer;
  32 import java.time.Instant;
  33 import java.time.temporal.ChronoUnit;
  34 import java.util.ArrayList;
  35 import java.util.Collections;
  36 import java.util.HashMap;
  37 import java.util.Iterator;
  38 import java.util.LinkedList;
  39 import java.util.List;
  40 import java.util.ListIterator;
  41 import java.util.Objects;
  42 import java.util.Optional;
  43 import java.util.concurrent.Flow;
  44 import java.util.stream.Collectors;
  45 import jdk.internal.net.http.common.FlowTube;
  46 import jdk.internal.net.http.common.Logger;
  47 import jdk.internal.net.http.common.Utils;
  48 
  49 /**
  50  * Http 1.1 connection pool.
  51  */
  52 final class ConnectionPool {
  53 
    // Keep-alive period applied to connections returned to the pool, in
    // seconds. Obtained from Utils.getIntegerNetProperty for the key
    // "jdk.httpclient.keepalive.timeout"; 1200 is passed as the second
    // argument (presumably the fallback when the property is unset).
    static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
            "jdk.httpclient.keepalive.timeout", 1200); // seconds
    // Upper bound on the number of pooled connections. The bound is only
    // enforced by returnToPool when this value is > 0; 0 is passed as the
    // second argument (presumably the fallback), which leaves the pool
    // unbounded.
    static final long MAX_POOL_SIZE = Utils.getIntegerNetProperty(
            "jdk.httpclient.connectionPoolSize", 0); // unbounded
    // Debug logger; dbgString() is supplied as the source of its tag.
    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

    // Pools of idle connections

    // Idle plain (PlainHttpConnection) connections, grouped by cache key.
    private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
    // Idle connections that are not PlainHttpConnection instances (asserted
    // to be secure where they are added), grouped by cache key.
    private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
    // All pooled connections ordered by expiry deadline.
    private final ExpiryList expiryList;
    private final String dbgTag; // used for debug
    // Set to true by stop(); read and written while holding the pool lock.
    boolean stopped;
  67 
  68     /**
  69      * Entries in connection pool are keyed by destination address and/or
  70      * proxy address:
  71      * case 1: plain TCP not via proxy (destination only)
  72      * case 2: plain TCP via proxy (proxy only)
  73      * case 3: SSL not via proxy (destination only)
  74      * case 4: SSL over tunnel (destination and proxy)
  75      */
  76     static class CacheKey {
  77         final InetSocketAddress proxy;
  78         final InetSocketAddress destination;
  79 
  80         CacheKey(InetSocketAddress destination, InetSocketAddress proxy) {
  81             this.proxy = proxy;
  82             this.destination = destination;
  83         }
  84 
  85         @Override
  86         public boolean equals(Object obj) {
  87             if (obj == null) {
  88                 return false;
  89             }
  90             if (getClass() != obj.getClass()) {
  91                 return false;
  92             }
  93             final CacheKey other = (CacheKey) obj;
  94             if (!Objects.equals(this.proxy, other.proxy)) {
  95                 return false;
  96             }
  97             if (!Objects.equals(this.destination, other.destination)) {
  98                 return false;
  99             }
 100             return true;
 101         }
 102 
 103         @Override
 104         public int hashCode() {
 105             return Objects.hash(proxy, destination);
 106         }
 107     }
 108 
 109     ConnectionPool(long clientId) {
 110         this("ConnectionPool("+clientId+")");
 111     }
 112 
 113     /**
 114      * There should be one of these per HttpClient.
 115      */
 116     private ConnectionPool(String tag) {
 117         dbgTag = tag;
 118         plainPool = new HashMap<>();
 119         sslPool = new HashMap<>();
 120         expiryList = new ExpiryList();
 121     }
 122 
 123     final String dbgString() {
 124         return dbgTag;
 125     }
 126 
 127     synchronized void start() {
 128         assert !stopped : "Already stopped";
 129     }
 130 
 131     static CacheKey cacheKey(InetSocketAddress destination,
 132                              InetSocketAddress proxy)
 133     {
 134         return new CacheKey(destination, proxy);
 135     }
 136 
 137     synchronized HttpConnection getConnection(boolean secure,
 138                                               InetSocketAddress addr,
 139                                               InetSocketAddress proxy) {
 140         if (stopped) return null;
 141         CacheKey key = new CacheKey(addr, proxy);
 142         HttpConnection c = secure ? findConnection(key, sslPool)
 143                                   : findConnection(key, plainPool);
 144         //System.out.println ("getConnection returning: " + c);
 145         return c;
 146     }
 147 
 148     /**
 149      * Returns the connection to the pool.
 150      */
 151     void returnToPool(HttpConnection conn) {
 152         returnToPool(conn, Instant.now(), KEEP_ALIVE);
 153     }
 154 
 155     // Called also by whitebox tests
 156     void returnToPool(HttpConnection conn, Instant now, long keepAlive) {
 157 
 158         // Don't call registerCleanupTrigger while holding a lock,
 159         // but register it before the connection is added to the pool,
 160         // since we don't want to trigger the cleanup if the connection
 161         // is not in the pool.
 162         CleanupTrigger cleanup = registerCleanupTrigger(conn);
 163 
 164         // it's possible that cleanup may have been called.
 165         HttpConnection toClose = null;
 166         synchronized(this) {
 167             if (cleanup.isDone()) {
 168                 return;
 169             } else if (stopped) {
 170                 conn.close();
 171                 return;
 172             }
 173             if (MAX_POOL_SIZE > 0 && expiryList.size() >= MAX_POOL_SIZE) {
 174                 toClose = expiryList.removeOldest();
 175                 if (toClose != null) removeFromPool(toClose);
 176             }
 177             if (conn instanceof PlainHttpConnection) {
 178                 putConnection(conn, plainPool);
 179             } else {
 180                 assert conn.isSecure();
 181                 putConnection(conn, sslPool);
 182             }
 183             expiryList.add(conn, now, keepAlive);
 184         }
 185         if (toClose != null) {
 186             if (debug.on()) {
 187                 debug.log("Maximum pool size reached: removing oldest connection %s",
 188                           toClose.dbgString());
 189             }
 190             close(toClose);
 191         }
 192         //System.out.println("Return to pool: " + conn);
 193     }
 194 
 195     private CleanupTrigger registerCleanupTrigger(HttpConnection conn) {
 196         // Connect the connection flow to a pub/sub pair that will take the
 197         // connection out of the pool and close it if anything happens
 198         // while the connection is sitting in the pool.
 199         CleanupTrigger cleanup = new CleanupTrigger(conn);
 200         FlowTube flow = conn.getConnectionFlow();
 201         if (debug.on()) debug.log("registering %s", cleanup);
 202         flow.connectFlows(cleanup, cleanup);
 203         return cleanup;
 204     }
 205 
 206     private HttpConnection
 207     findConnection(CacheKey key,
 208                    HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
 209         LinkedList<HttpConnection> l = pool.get(key);
 210         if (l == null || l.isEmpty()) {
 211             return null;
 212         } else {
 213             HttpConnection c = l.removeFirst();
 214             expiryList.remove(c);
 215             return c;
 216         }
 217     }
 218 
 219     /* called from cache cleaner only  */
 220     private boolean
 221     removeFromPool(HttpConnection c,
 222                    HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
 223         //System.out.println("cacheCleaner removing: " + c);
 224         assert Thread.holdsLock(this);
 225         CacheKey k = c.cacheKey();
 226         List<HttpConnection> l = pool.get(k);
 227         if (l == null || l.isEmpty()) {
 228             pool.remove(k);
 229             return false;
 230         }
 231         return l.remove(c);
 232     }
 233 
 234     private void
 235     putConnection(HttpConnection c,
 236                   HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
 237         CacheKey key = c.cacheKey();
 238         LinkedList<HttpConnection> l = pool.get(key);
 239         if (l == null) {
 240             l = new LinkedList<>();
 241             pool.put(key, l);
 242         }
 243         l.add(c);
 244     }
 245 
 246     /**
 247      * Purge expired connection and return the number of milliseconds
 248      * in which the next connection is scheduled to expire.
 249      * If no connections are scheduled to be purged return 0.
 250      * @return the delay in milliseconds in which the next connection will
 251      *         expire.
 252      */
 253     long purgeExpiredConnectionsAndReturnNextDeadline() {
 254         if (!expiryList.purgeMaybeRequired()) return 0;
 255         return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now());
 256     }
 257 
 258     // Used for whitebox testing
 259     long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) {
 260         long nextPurge = 0;
 261 
 262         // We may be in the process of adding new elements
 263         // to the expiry list - but those elements will not
 264         // have outlast their keep alive timer yet since we're
 265         // just adding them.
 266         if (!expiryList.purgeMaybeRequired()) return nextPurge;
 267 
 268         List<HttpConnection> closelist;
 269         synchronized (this) {
 270             closelist = expiryList.purgeUntil(now);
 271             for (HttpConnection c : closelist) {
 272                 if (c instanceof PlainHttpConnection) {
 273                     boolean wasPresent = removeFromPool(c, plainPool);
 274                     assert wasPresent;
 275                 } else {
 276                     boolean wasPresent = removeFromPool(c, sslPool);
 277                     assert wasPresent;
 278                 }
 279             }
 280             nextPurge = now.until(
 281                     expiryList.nextExpiryDeadline().orElse(now),
 282                     ChronoUnit.MILLIS);
 283         }
 284         closelist.forEach(this::close);
 285         return nextPurge;
 286     }
 287 
 288     private void close(HttpConnection c) {
 289         try {
 290             c.close();
 291         } catch (Throwable e) {} // ignore
 292     }
 293 
 294     void stop() {
 295         List<HttpConnection> closelist = Collections.emptyList();
 296         try {
 297             synchronized (this) {
 298                 stopped = true;
 299                 closelist = expiryList.stream()
 300                     .map(e -> e.connection)
 301                     .collect(Collectors.toList());
 302                 expiryList.clear();
 303                 plainPool.clear();
 304                 sslPool.clear();
 305             }
 306         } finally {
 307             closelist.forEach(this::close);
 308         }
 309     }
 310 
 311     static final class ExpiryEntry {
 312         final HttpConnection connection;
 313         final Instant expiry; // absolute time in seconds of expiry time
 314         ExpiryEntry(HttpConnection connection, Instant expiry) {
 315             this.connection = connection;
 316             this.expiry = expiry;
 317         }
 318     }
 319 
 320     /**
 321      * Manages a LinkedList of sorted ExpiryEntry. The entry with the closer
 322      * deadline is at the tail of the list, and the entry with the farther
 323      * deadline is at the head. In the most common situation, new elements
 324      * will need to be added at the head (or close to it), and expired elements
 325      * will need to be purged from the tail.
 326      */
 327     private static final class ExpiryList {
 328         private final LinkedList<ExpiryEntry> list = new LinkedList<>();
 329         private volatile boolean mayContainEntries;
 330 
 331         int size() { return list.size(); }
 332 
 333         // A loosely accurate boolean whose value is computed
 334         // at the end of each operation performed on ExpiryList;
 335         // Does not require synchronizing on the ConnectionPool.
 336         boolean purgeMaybeRequired() {
 337             return mayContainEntries;
 338         }
 339 
 340         // Returns the next expiry deadline
 341         // should only be called while holding a synchronization
 342         // lock on the ConnectionPool
 343         Optional<Instant> nextExpiryDeadline() {
 344             if (list.isEmpty()) return Optional.empty();
 345             else return Optional.of(list.getLast().expiry);
 346         }
 347 
 348         // should only be called while holding a synchronization
 349         // lock on the ConnectionPool
 350         HttpConnection removeOldest() {
 351             ExpiryEntry entry = list.pollLast();
 352             return entry == null ? null : entry.connection;
 353         }
 354 
 355         // should only be called while holding a synchronization
 356         // lock on the ConnectionPool
 357         void add(HttpConnection conn) {
 358             add(conn, Instant.now(), KEEP_ALIVE);
 359         }
 360 
 361         // Used by whitebox test.
 362         void add(HttpConnection conn, Instant now, long keepAlive) {
 363             Instant then = now.truncatedTo(ChronoUnit.SECONDS)
 364                     .plus(keepAlive, ChronoUnit.SECONDS);
 365 
 366             // Elements with the farther deadline are at the head of
 367             // the list. It's more likely that the new element will
 368             // have the farthest deadline, and will need to be inserted
 369             // at the head of the list, so we're using an ascending
 370             // list iterator to find the right insertion point.
 371             ListIterator<ExpiryEntry> li = list.listIterator();
 372             while (li.hasNext()) {
 373                 ExpiryEntry entry = li.next();
 374 
 375                 if (then.isAfter(entry.expiry)) {
 376                     li.previous();
 377                     // insert here
 378                     li.add(new ExpiryEntry(conn, then));
 379                     mayContainEntries = true;
 380                     return;
 381                 }
 382             }
 383             // last (or first) element of list (the last element is
 384             // the first when the list is empty)
 385             list.add(new ExpiryEntry(conn, then));
 386             mayContainEntries = true;
 387         }
 388 
 389         // should only be called while holding a synchronization
 390         // lock on the ConnectionPool
 391         void remove(HttpConnection c) {
 392             if (c == null || list.isEmpty()) return;
 393             ListIterator<ExpiryEntry> li = list.listIterator();
 394             while (li.hasNext()) {
 395                 ExpiryEntry e = li.next();
 396                 if (e.connection.equals(c)) {
 397                     li.remove();
 398                     mayContainEntries = !list.isEmpty();
 399                     return;
 400                 }
 401             }
 402         }
 403 
 404         // should only be called while holding a synchronization
 405         // lock on the ConnectionPool.
 406         // Purge all elements whose deadline is before now (now included).
 407         List<HttpConnection> purgeUntil(Instant now) {
 408             if (list.isEmpty()) return Collections.emptyList();
 409 
 410             List<HttpConnection> closelist = new ArrayList<>();
 411 
 412             // elements with the closest deadlines are at the tail
 413             // of the queue, so we're going to use a descending iterator
 414             // to remove them, and stop when we find the first element
 415             // that has not expired yet.
 416             Iterator<ExpiryEntry> li = list.descendingIterator();
 417             while (li.hasNext()) {
 418                 ExpiryEntry entry = li.next();
 419                 // use !isAfter instead of isBefore in order to
 420                 // remove the entry if its expiry == now
 421                 if (!entry.expiry.isAfter(now)) {
 422                     li.remove();
 423                     HttpConnection c = entry.connection;
 424                     closelist.add(c);
 425                 } else break; // the list is sorted
 426             }
 427             mayContainEntries = !list.isEmpty();
 428             return closelist;
 429         }
 430 
 431         // should only be called while holding a synchronization
 432         // lock on the ConnectionPool
 433         java.util.stream.Stream<ExpiryEntry> stream() {
 434             return list.stream();
 435         }
 436 
 437         // should only be called while holding a synchronization
 438         // lock on the ConnectionPool
 439         void clear() {
 440             list.clear();
 441             mayContainEntries = false;
 442         }
 443     }
 444 
 445     // Remove a connection from the pool.
 446     // should only be called while holding a synchronization
 447     // lock on the ConnectionPool
 448     private void removeFromPool(HttpConnection c) {
 449         assert Thread.holdsLock(this);
 450         if (c instanceof PlainHttpConnection) {
 451             removeFromPool(c, plainPool);
 452         } else {
 453             assert c.isSecure();
 454             removeFromPool(c, sslPool);
 455         }
 456     }
 457 
 458     // Used by tests
 459     synchronized boolean contains(HttpConnection c) {
 460         final CacheKey key = c.cacheKey();
 461         List<HttpConnection> list;
 462         if ((list = plainPool.get(key)) != null) {
 463             if (list.contains(c)) return true;
 464         }
 465         if ((list = sslPool.get(key)) != null) {
 466             if (list.contains(c)) return true;
 467         }
 468         return false;
 469     }
 470 
 471     void cleanup(HttpConnection c, Throwable error) {
 472         if (debug.on())
 473             debug.log("%s : ConnectionPool.cleanup(%s)",
 474                     String.valueOf(c.getConnectionFlow()), error);
 475         synchronized(this) {
 476             removeFromPool(c);
 477             expiryList.remove(c);
 478         }
 479         c.close();
 480     }
 481 
 482     /**
 483      * An object that subscribes to the flow while the connection is in
 484      * the pool. Anything that comes in will cause the connection to be closed
 485      * and removed from the pool.
 486      */
 487     private final class CleanupTrigger implements
 488             FlowTube.TubeSubscriber, FlowTube.TubePublisher,
 489             Flow.Subscription {
 490 
 491         private final HttpConnection connection;
 492         private volatile boolean done;
 493 
 494         public CleanupTrigger(HttpConnection connection) {
 495             this.connection = connection;
 496         }
 497 
 498         public boolean isDone() { return done;}
 499 
 500         private void triggerCleanup(Throwable error) {
 501             done = true;
 502             cleanup(connection, error);
 503         }
 504 
 505         @Override public void request(long n) {}
 506         @Override public void cancel() {}
 507 
 508         @Override
 509         public void onSubscribe(Flow.Subscription subscription) {
 510             subscription.request(1);
 511         }
 512         @Override
 513         public void onError(Throwable error) { triggerCleanup(error); }
 514         @Override
 515         public void onComplete() { triggerCleanup(null); }
 516         @Override
 517         public void onNext(List<ByteBuffer> item) {
 518             triggerCleanup(new IOException("Data received while in pool"));
 519         }
 520 
 521         @Override
 522         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 523             subscriber.onSubscribe(this);
 524         }
 525 
 526         @Override
 527         public String toString() {
 528             return "CleanupTrigger(" + connection.getConnectionFlow() + ")";
 529         }
 530     }
 531 }