1 /* 2 * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.IOException; 29 import java.lang.System.Logger.Level; 30 import java.net.InetSocketAddress; 31 import java.nio.ByteBuffer; 32 import java.time.Instant; 33 import java.time.temporal.ChronoUnit; 34 import java.util.ArrayList; 35 import java.util.Collections; 36 import java.util.HashMap; 37 import java.util.Iterator; 38 import java.util.LinkedList; 39 import java.util.List; 40 import java.util.ListIterator; 41 import java.util.Objects; 42 import java.util.Optional; 43 import java.util.concurrent.Flow; 44 import java.util.stream.Collectors; 45 import jdk.internal.net.http.common.FlowTube; 46 import jdk.internal.net.http.common.Logger; 47 import jdk.internal.net.http.common.Utils; 48 49 /** 50 * Http 1.1 connection pool. 51 */ 52 final class ConnectionPool { 53 54 static final long KEEP_ALIVE = Utils.getIntegerNetProperty( 55 "jdk.httpclient.keepalive.timeout", 1200); // seconds 56 static final long MAX_POOL_SIZE = Utils.getIntegerNetProperty( 57 "jdk.httpclient.connectionPoolSize", 0); // unbounded 58 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 59 60 // Pools of idle connections 61 62 private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool; 63 private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool; 64 private final ExpiryList expiryList; 65 private final String dbgTag; // used for debug 66 boolean stopped; 67 68 /** 69 * Entries in connection pool are keyed by destination address and/or 70 * proxy address: 71 * case 1: plain TCP not via proxy (destination only) 72 * case 2: plain TCP via proxy (proxy only) 73 * case 3: SSL not via proxy (destination only) 74 * case 4: SSL over tunnel (destination and proxy) 75 */ 76 static class CacheKey { 77 final InetSocketAddress proxy; 78 final InetSocketAddress destination; 79 80 CacheKey(InetSocketAddress destination, InetSocketAddress proxy) { 81 this.proxy = proxy; 82 this.destination = destination; 83 } 84 85 @Override 86 public boolean equals(Object obj) { 87 if (obj == null) { 88 return false; 89 } 90 if (getClass() != obj.getClass()) { 91 return false; 92 } 93 final CacheKey other = (CacheKey) obj; 94 if (!Objects.equals(this.proxy, other.proxy)) { 95 return false; 96 } 97 if (!Objects.equals(this.destination, other.destination)) { 98 return false; 99 } 100 return true; 101 } 102 103 @Override 104 public int hashCode() { 105 return Objects.hash(proxy, destination); 106 } 107 } 108 109 ConnectionPool(long clientId) { 110 this("ConnectionPool("+clientId+")"); 111 } 112 113 /** 114 * There should be one of these per HttpClient. 115 */ 116 private ConnectionPool(String tag) { 117 dbgTag = tag; 118 plainPool = new HashMap<>(); 119 sslPool = new HashMap<>(); 120 expiryList = new ExpiryList(); 121 } 122 123 final String dbgString() { 124 return dbgTag; 125 } 126 127 synchronized void start() { 128 assert !stopped : "Already stopped"; 129 } 130 131 static CacheKey cacheKey(InetSocketAddress destination, 132 InetSocketAddress proxy) 133 { 134 return new CacheKey(destination, proxy); 135 } 136 137 synchronized HttpConnection getConnection(boolean secure, 138 InetSocketAddress addr, 139 InetSocketAddress proxy) { 140 if (stopped) return null; 141 CacheKey key = new CacheKey(addr, proxy); 142 HttpConnection c = secure ? findConnection(key, sslPool) 143 : findConnection(key, plainPool); 144 //System.out.println ("getConnection returning: " + c); 145 assert c == null || c.isSecure() == secure; 146 return c; 147 } 148 149 /** 150 * Returns the connection to the pool. 151 */ 152 void returnToPool(HttpConnection conn) { 153 returnToPool(conn, Instant.now(), KEEP_ALIVE); 154 } 155 156 // Called also by whitebox tests 157 void returnToPool(HttpConnection conn, Instant now, long keepAlive) { 158 159 assert (conn instanceof PlainHttpConnection) || conn.isSecure() 160 : "Attempting to return unsecure connection to SSL pool: " 161 + conn.getClass(); 162 163 // Don't call registerCleanupTrigger while holding a lock, 164 // but register it before the connection is added to the pool, 165 // since we don't want to trigger the cleanup if the connection 166 // is not in the pool. 167 CleanupTrigger cleanup = registerCleanupTrigger(conn); 168 169 // it's possible that cleanup may have been called. 170 HttpConnection toClose = null; 171 synchronized(this) { 172 if (cleanup.isDone()) { 173 return; 174 } else if (stopped) { 175 conn.close(); 176 return; 177 } 178 if (MAX_POOL_SIZE > 0 && expiryList.size() >= MAX_POOL_SIZE) { 179 toClose = expiryList.removeOldest(); 180 if (toClose != null) removeFromPool(toClose); 181 } 182 if (conn instanceof PlainHttpConnection) { 183 putConnection(conn, plainPool); 184 } else { 185 assert conn.isSecure(); 186 putConnection(conn, sslPool); 187 } 188 expiryList.add(conn, now, keepAlive); 189 } 190 if (toClose != null) { 191 if (debug.on()) { 192 debug.log("Maximum pool size reached: removing oldest connection %s", 193 toClose.dbgString()); 194 } 195 close(toClose); 196 } 197 //System.out.println("Return to pool: " + conn); 198 } 199 200 private CleanupTrigger registerCleanupTrigger(HttpConnection conn) { 201 // Connect the connection flow to a pub/sub pair that will take the 202 // connection out of the pool and close it if anything happens 203 // while the connection is sitting in the pool. 204 CleanupTrigger cleanup = new CleanupTrigger(conn); 205 FlowTube flow = conn.getConnectionFlow(); 206 if (debug.on()) debug.log("registering %s", cleanup); 207 flow.connectFlows(cleanup, cleanup); 208 return cleanup; 209 } 210 211 private HttpConnection 212 findConnection(CacheKey key, 213 HashMap<CacheKey,LinkedList<HttpConnection>> pool) { 214 LinkedList<HttpConnection> l = pool.get(key); 215 if (l == null || l.isEmpty()) { 216 return null; 217 } else { 218 HttpConnection c = l.removeFirst(); 219 expiryList.remove(c); 220 return c; 221 } 222 } 223 224 /* called from cache cleaner only */ 225 private boolean 226 removeFromPool(HttpConnection c, 227 HashMap<CacheKey,LinkedList<HttpConnection>> pool) { 228 //System.out.println("cacheCleaner removing: " + c); 229 assert Thread.holdsLock(this); 230 CacheKey k = c.cacheKey(); 231 List<HttpConnection> l = pool.get(k); 232 if (l == null || l.isEmpty()) { 233 pool.remove(k); 234 return false; 235 } 236 return l.remove(c); 237 } 238 239 private void 240 putConnection(HttpConnection c, 241 HashMap<CacheKey,LinkedList<HttpConnection>> pool) { 242 CacheKey key = c.cacheKey(); 243 LinkedList<HttpConnection> l = pool.get(key); 244 if (l == null) { 245 l = new LinkedList<>(); 246 pool.put(key, l); 247 } 248 l.add(c); 249 } 250 251 /** 252 * Purge expired connection and return the number of milliseconds 253 * in which the next connection is scheduled to expire. 254 * If no connections are scheduled to be purged return 0. 255 * @return the delay in milliseconds in which the next connection will 256 * expire. 257 */ 258 long purgeExpiredConnectionsAndReturnNextDeadline() { 259 if (!expiryList.purgeMaybeRequired()) return 0; 260 return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now()); 261 } 262 263 // Used for whitebox testing 264 long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) { 265 long nextPurge = 0; 266 267 // We may be in the process of adding new elements 268 // to the expiry list - but those elements will not 269 // have outlast their keep alive timer yet since we're 270 // just adding them. 271 if (!expiryList.purgeMaybeRequired()) return nextPurge; 272 273 List<HttpConnection> closelist; 274 synchronized (this) { 275 closelist = expiryList.purgeUntil(now); 276 for (HttpConnection c : closelist) { 277 if (c instanceof PlainHttpConnection) { 278 boolean wasPresent = removeFromPool(c, plainPool); 279 assert wasPresent; 280 } else { 281 boolean wasPresent = removeFromPool(c, sslPool); 282 assert wasPresent; 283 } 284 } 285 nextPurge = now.until( 286 expiryList.nextExpiryDeadline().orElse(now), 287 ChronoUnit.MILLIS); 288 } 289 closelist.forEach(this::close); 290 return nextPurge; 291 } 292 293 private void close(HttpConnection c) { 294 try { 295 c.close(); 296 } catch (Throwable e) {} // ignore 297 } 298 299 void stop() { 300 List<HttpConnection> closelist = Collections.emptyList(); 301 try { 302 synchronized (this) { 303 stopped = true; 304 closelist = expiryList.stream() 305 .map(e -> e.connection) 306 .collect(Collectors.toList()); 307 expiryList.clear(); 308 plainPool.clear(); 309 sslPool.clear(); 310 } 311 } finally { 312 closelist.forEach(this::close); 313 } 314 } 315 316 static final class ExpiryEntry { 317 final HttpConnection connection; 318 final Instant expiry; // absolute time in seconds of expiry time 319 ExpiryEntry(HttpConnection connection, Instant expiry) { 320 this.connection = connection; 321 this.expiry = expiry; 322 } 323 } 324 325 /** 326 * Manages a LinkedList of sorted ExpiryEntry. The entry with the closer 327 * deadline is at the tail of the list, and the entry with the farther 328 * deadline is at the head. In the most common situation, new elements 329 * will need to be added at the head (or close to it), and expired elements 330 * will need to be purged from the tail. 331 */ 332 private static final class ExpiryList { 333 private final LinkedList<ExpiryEntry> list = new LinkedList<>(); 334 private volatile boolean mayContainEntries; 335 336 int size() { return list.size(); } 337 338 // A loosely accurate boolean whose value is computed 339 // at the end of each operation performed on ExpiryList; 340 // Does not require synchronizing on the ConnectionPool. 341 boolean purgeMaybeRequired() { 342 return mayContainEntries; 343 } 344 345 // Returns the next expiry deadline 346 // should only be called while holding a synchronization 347 // lock on the ConnectionPool 348 Optional<Instant> nextExpiryDeadline() { 349 if (list.isEmpty()) return Optional.empty(); 350 else return Optional.of(list.getLast().expiry); 351 } 352 353 // should only be called while holding a synchronization 354 // lock on the ConnectionPool 355 HttpConnection removeOldest() { 356 ExpiryEntry entry = list.pollLast(); 357 return entry == null ? null : entry.connection; 358 } 359 360 // should only be called while holding a synchronization 361 // lock on the ConnectionPool 362 void add(HttpConnection conn) { 363 add(conn, Instant.now(), KEEP_ALIVE); 364 } 365 366 // Used by whitebox test. 367 void add(HttpConnection conn, Instant now, long keepAlive) { 368 Instant then = now.truncatedTo(ChronoUnit.SECONDS) 369 .plus(keepAlive, ChronoUnit.SECONDS); 370 371 // Elements with the farther deadline are at the head of 372 // the list. It's more likely that the new element will 373 // have the farthest deadline, and will need to be inserted 374 // at the head of the list, so we're using an ascending 375 // list iterator to find the right insertion point. 376 ListIterator<ExpiryEntry> li = list.listIterator(); 377 while (li.hasNext()) { 378 ExpiryEntry entry = li.next(); 379 380 if (then.isAfter(entry.expiry)) { 381 li.previous(); 382 // insert here 383 li.add(new ExpiryEntry(conn, then)); 384 mayContainEntries = true; 385 return; 386 } 387 } 388 // last (or first) element of list (the last element is 389 // the first when the list is empty) 390 list.add(new ExpiryEntry(conn, then)); 391 mayContainEntries = true; 392 } 393 394 // should only be called while holding a synchronization 395 // lock on the ConnectionPool 396 void remove(HttpConnection c) { 397 if (c == null || list.isEmpty()) return; 398 ListIterator<ExpiryEntry> li = list.listIterator(); 399 while (li.hasNext()) { 400 ExpiryEntry e = li.next(); 401 if (e.connection.equals(c)) { 402 li.remove(); 403 mayContainEntries = !list.isEmpty(); 404 return; 405 } 406 } 407 } 408 409 // should only be called while holding a synchronization 410 // lock on the ConnectionPool. 411 // Purge all elements whose deadline is before now (now included). 412 List<HttpConnection> purgeUntil(Instant now) { 413 if (list.isEmpty()) return Collections.emptyList(); 414 415 List<HttpConnection> closelist = new ArrayList<>(); 416 417 // elements with the closest deadlines are at the tail 418 // of the queue, so we're going to use a descending iterator 419 // to remove them, and stop when we find the first element 420 // that has not expired yet. 421 Iterator<ExpiryEntry> li = list.descendingIterator(); 422 while (li.hasNext()) { 423 ExpiryEntry entry = li.next(); 424 // use !isAfter instead of isBefore in order to 425 // remove the entry if its expiry == now 426 if (!entry.expiry.isAfter(now)) { 427 li.remove(); 428 HttpConnection c = entry.connection; 429 closelist.add(c); 430 } else break; // the list is sorted 431 } 432 mayContainEntries = !list.isEmpty(); 433 return closelist; 434 } 435 436 // should only be called while holding a synchronization 437 // lock on the ConnectionPool 438 java.util.stream.Stream<ExpiryEntry> stream() { 439 return list.stream(); 440 } 441 442 // should only be called while holding a synchronization 443 // lock on the ConnectionPool 444 void clear() { 445 list.clear(); 446 mayContainEntries = false; 447 } 448 } 449 450 // Remove a connection from the pool. 451 // should only be called while holding a synchronization 452 // lock on the ConnectionPool 453 private void removeFromPool(HttpConnection c) { 454 assert Thread.holdsLock(this); 455 if (c instanceof PlainHttpConnection) { 456 removeFromPool(c, plainPool); 457 } else { 458 assert c.isSecure() : "connection " + c + " is not secure!"; 459 removeFromPool(c, sslPool); 460 } 461 } 462 463 // Used by tests 464 synchronized boolean contains(HttpConnection c) { 465 final CacheKey key = c.cacheKey(); 466 List<HttpConnection> list; 467 if ((list = plainPool.get(key)) != null) { 468 if (list.contains(c)) return true; 469 } 470 if ((list = sslPool.get(key)) != null) { 471 if (list.contains(c)) return true; 472 } 473 return false; 474 } 475 476 void cleanup(HttpConnection c, Throwable error) { 477 if (debug.on()) 478 debug.log("%s : ConnectionPool.cleanup(%s)", 479 String.valueOf(c.getConnectionFlow()), error); 480 synchronized(this) { 481 removeFromPool(c); 482 expiryList.remove(c); 483 } 484 c.close(); 485 } 486 487 /** 488 * An object that subscribes to the flow while the connection is in 489 * the pool. Anything that comes in will cause the connection to be closed 490 * and removed from the pool. 491 */ 492 private final class CleanupTrigger implements 493 FlowTube.TubeSubscriber, FlowTube.TubePublisher, 494 Flow.Subscription { 495 496 private final HttpConnection connection; 497 private volatile boolean done; 498 499 public CleanupTrigger(HttpConnection connection) { 500 this.connection = connection; 501 } 502 503 public boolean isDone() { return done;} 504 505 private void triggerCleanup(Throwable error) { 506 done = true; 507 cleanup(connection, error); 508 } 509 510 @Override public void request(long n) {} 511 @Override public void cancel() {} 512 513 @Override 514 public void onSubscribe(Flow.Subscription subscription) { 515 subscription.request(1); 516 } 517 @Override 518 public void onError(Throwable error) { triggerCleanup(error); } 519 @Override 520 public void onComplete() { triggerCleanup(null); } 521 @Override 522 public void onNext(List<ByteBuffer> item) { 523 triggerCleanup(new IOException("Data received while in pool")); 524 } 525 526 @Override 527 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { 528 subscriber.onSubscribe(this); 529 } 530 531 @Override 532 public String toString() { 533 return "CleanupTrigger(" + connection.getConnectionFlow() + ")"; 534 } 535 } 536 }