1 /* 2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.IOException; 29 import java.lang.System.Logger.Level; 30 import java.net.InetSocketAddress; 31 import java.nio.ByteBuffer; 32 import java.time.Instant; 33 import java.time.temporal.ChronoUnit; 34 import java.util.ArrayList; 35 import java.util.Collections; 36 import java.util.HashMap; 37 import java.util.Iterator; 38 import java.util.LinkedList; 39 import java.util.List; 40 import java.util.ListIterator; 41 import java.util.Objects; 42 import java.util.Optional; 43 import java.util.concurrent.Flow; 44 import java.util.stream.Collectors; 45 import jdk.internal.net.http.common.FlowTube; 46 import jdk.internal.net.http.common.Logger; 47 import jdk.internal.net.http.common.Utils; 48 49 /** 50 * Http 1.1 connection pool. 51 */ 52 final class ConnectionPool { 53 54 static final long KEEP_ALIVE = Utils.getIntegerNetProperty( 55 "jdk.httpclient.keepalive.timeout", 1200); // seconds 56 static final long MAX_POOL_SIZE = Utils.getIntegerNetProperty( 57 "jdk.httpclient.connectionPoolSize", 0); // unbounded 58 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 59 60 // Pools of idle connections 61 62 private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool; 63 private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool; 64 private final ExpiryList expiryList; 65 private final String dbgTag; // used for debug 66 boolean stopped; 67 68 /** 69 * Entries in connection pool are keyed by destination address and/or 70 * proxy address: 71 * case 1: plain TCP not via proxy (destination only) 72 * case 2: plain TCP via proxy (proxy only) 73 * case 3: SSL not via proxy (destination only) 74 * case 4: SSL over tunnel (destination and proxy) 75 */ 76 static class CacheKey { 77 final InetSocketAddress proxy; 78 final InetSocketAddress destination; 79 80 CacheKey(InetSocketAddress destination, InetSocketAddress proxy) { 81 this.proxy = proxy; 82 this.destination = destination; 83 } 84 85 @Override 86 public boolean equals(Object obj) { 87 if (obj == null) { 88 return false; 89 } 90 if (getClass() != obj.getClass()) { 91 return false; 92 } 93 final CacheKey other = (CacheKey) obj; 94 if (!Objects.equals(this.proxy, other.proxy)) { 95 return false; 96 } 97 if (!Objects.equals(this.destination, other.destination)) { 98 return false; 99 } 100 return true; 101 } 102 103 @Override 104 public int hashCode() { 105 return Objects.hash(proxy, destination); 106 } 107 } 108 109 ConnectionPool(long clientId) { 110 this("ConnectionPool("+clientId+")"); 111 } 112 113 /** 114 * There should be one of these per HttpClient. 115 */ 116 private ConnectionPool(String tag) { 117 dbgTag = tag; 118 plainPool = new HashMap<>(); 119 sslPool = new HashMap<>(); 120 expiryList = new ExpiryList(); 121 } 122 123 final String dbgString() { 124 return dbgTag; 125 } 126 127 synchronized void start() { 128 assert !stopped : "Already stopped"; 129 } 130 131 static CacheKey cacheKey(InetSocketAddress destination, 132 InetSocketAddress proxy) 133 { 134 return new CacheKey(destination, proxy); 135 } 136 137 synchronized HttpConnection getConnection(boolean secure, 138 InetSocketAddress addr, 139 InetSocketAddress proxy) { 140 if (stopped) return null; 141 CacheKey key = new CacheKey(addr, proxy); 142 HttpConnection c = secure ? findConnection(key, sslPool) 143 : findConnection(key, plainPool); 144 //System.out.println ("getConnection returning: " + c); 145 return c; 146 } 147 148 /** 149 * Returns the connection to the pool. 150 */ 151 void returnToPool(HttpConnection conn) { 152 returnToPool(conn, Instant.now(), KEEP_ALIVE); 153 } 154 155 // Called also by whitebox tests 156 void returnToPool(HttpConnection conn, Instant now, long keepAlive) { 157 158 // Don't call registerCleanupTrigger while holding a lock, 159 // but register it before the connection is added to the pool, 160 // since we don't want to trigger the cleanup if the connection 161 // is not in the pool. 162 CleanupTrigger cleanup = registerCleanupTrigger(conn); 163 164 // it's possible that cleanup may have been called. 165 HttpConnection toClose = null; 166 synchronized(this) { 167 if (cleanup.isDone()) { 168 return; 169 } else if (stopped) { 170 conn.close(); 171 return; 172 } 173 if (MAX_POOL_SIZE > 0 && expiryList.size() >= MAX_POOL_SIZE) { 174 toClose = expiryList.removeOldest(); 175 if (toClose != null) removeFromPool(toClose); 176 } 177 if (conn instanceof PlainHttpConnection) { 178 putConnection(conn, plainPool); 179 } else { 180 assert conn.isSecure(); 181 putConnection(conn, sslPool); 182 } 183 expiryList.add(conn, now, keepAlive); 184 } 185 if (toClose != null) { 186 if (debug.on()) { 187 debug.log("Maximum pool size reached: removing oldest connection %s", 188 toClose.dbgString()); 189 } 190 close(toClose); 191 } 192 //System.out.println("Return to pool: " + conn); 193 } 194 195 private CleanupTrigger registerCleanupTrigger(HttpConnection conn) { 196 // Connect the connection flow to a pub/sub pair that will take the 197 // connection out of the pool and close it if anything happens 198 // while the connection is sitting in the pool. 199 CleanupTrigger cleanup = new CleanupTrigger(conn); 200 FlowTube flow = conn.getConnectionFlow(); 201 if (debug.on()) debug.log("registering %s", cleanup); 202 flow.connectFlows(cleanup, cleanup); 203 return cleanup; 204 } 205 206 private HttpConnection 207 findConnection(CacheKey key, 208 HashMap<CacheKey,LinkedList<HttpConnection>> pool) { 209 LinkedList<HttpConnection> l = pool.get(key); 210 if (l == null || l.isEmpty()) { 211 return null; 212 } else { 213 HttpConnection c = l.removeFirst(); 214 expiryList.remove(c); 215 return c; 216 } 217 } 218 219 /* called from cache cleaner only */ 220 private boolean 221 removeFromPool(HttpConnection c, 222 HashMap<CacheKey,LinkedList<HttpConnection>> pool) { 223 //System.out.println("cacheCleaner removing: " + c); 224 assert Thread.holdsLock(this); 225 CacheKey k = c.cacheKey(); 226 List<HttpConnection> l = pool.get(k); 227 if (l == null || l.isEmpty()) { 228 pool.remove(k); 229 return false; 230 } 231 return l.remove(c); 232 } 233 234 private void 235 putConnection(HttpConnection c, 236 HashMap<CacheKey,LinkedList<HttpConnection>> pool) { 237 CacheKey key = c.cacheKey(); 238 LinkedList<HttpConnection> l = pool.get(key); 239 if (l == null) { 240 l = new LinkedList<>(); 241 pool.put(key, l); 242 } 243 l.add(c); 244 } 245 246 /** 247 * Purge expired connection and return the number of milliseconds 248 * in which the next connection is scheduled to expire. 249 * If no connections are scheduled to be purged return 0. 250 * @return the delay in milliseconds in which the next connection will 251 * expire. 252 */ 253 long purgeExpiredConnectionsAndReturnNextDeadline() { 254 if (!expiryList.purgeMaybeRequired()) return 0; 255 return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now()); 256 } 257 258 // Used for whitebox testing 259 long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) { 260 long nextPurge = 0; 261 262 // We may be in the process of adding new elements 263 // to the expiry list - but those elements will not 264 // have outlast their keep alive timer yet since we're 265 // just adding them. 266 if (!expiryList.purgeMaybeRequired()) return nextPurge; 267 268 List<HttpConnection> closelist; 269 synchronized (this) { 270 closelist = expiryList.purgeUntil(now); 271 for (HttpConnection c : closelist) { 272 if (c instanceof PlainHttpConnection) { 273 boolean wasPresent = removeFromPool(c, plainPool); 274 assert wasPresent; 275 } else { 276 boolean wasPresent = removeFromPool(c, sslPool); 277 assert wasPresent; 278 } 279 } 280 nextPurge = now.until( 281 expiryList.nextExpiryDeadline().orElse(now), 282 ChronoUnit.MILLIS); 283 } 284 closelist.forEach(this::close); 285 return nextPurge; 286 } 287 288 private void close(HttpConnection c) { 289 try { 290 c.close(); 291 } catch (Throwable e) {} // ignore 292 } 293 294 void stop() { 295 List<HttpConnection> closelist = Collections.emptyList(); 296 try { 297 synchronized (this) { 298 stopped = true; 299 closelist = expiryList.stream() 300 .map(e -> e.connection) 301 .collect(Collectors.toList()); 302 expiryList.clear(); 303 plainPool.clear(); 304 sslPool.clear(); 305 } 306 } finally { 307 closelist.forEach(this::close); 308 } 309 } 310 311 static final class ExpiryEntry { 312 final HttpConnection connection; 313 final Instant expiry; // absolute time in seconds of expiry time 314 ExpiryEntry(HttpConnection connection, Instant expiry) { 315 this.connection = connection; 316 this.expiry = expiry; 317 } 318 } 319 320 /** 321 * Manages a LinkedList of sorted ExpiryEntry. The entry with the closer 322 * deadline is at the tail of the list, and the entry with the farther 323 * deadline is at the head. In the most common situation, new elements 324 * will need to be added at the head (or close to it), and expired elements 325 * will need to be purged from the tail. 326 */ 327 private static final class ExpiryList { 328 private final LinkedList<ExpiryEntry> list = new LinkedList<>(); 329 private volatile boolean mayContainEntries; 330 331 int size() { return list.size(); } 332 333 // A loosely accurate boolean whose value is computed 334 // at the end of each operation performed on ExpiryList; 335 // Does not require synchronizing on the ConnectionPool. 336 boolean purgeMaybeRequired() { 337 return mayContainEntries; 338 } 339 340 // Returns the next expiry deadline 341 // should only be called while holding a synchronization 342 // lock on the ConnectionPool 343 Optional<Instant> nextExpiryDeadline() { 344 if (list.isEmpty()) return Optional.empty(); 345 else return Optional.of(list.getLast().expiry); 346 } 347 348 // should only be called while holding a synchronization 349 // lock on the ConnectionPool 350 HttpConnection removeOldest() { 351 ExpiryEntry entry = list.pollLast(); 352 return entry == null ? null : entry.connection; 353 } 354 355 // should only be called while holding a synchronization 356 // lock on the ConnectionPool 357 void add(HttpConnection conn) { 358 add(conn, Instant.now(), KEEP_ALIVE); 359 } 360 361 // Used by whitebox test. 362 void add(HttpConnection conn, Instant now, long keepAlive) { 363 Instant then = now.truncatedTo(ChronoUnit.SECONDS) 364 .plus(keepAlive, ChronoUnit.SECONDS); 365 366 // Elements with the farther deadline are at the head of 367 // the list. It's more likely that the new element will 368 // have the farthest deadline, and will need to be inserted 369 // at the head of the list, so we're using an ascending 370 // list iterator to find the right insertion point. 371 ListIterator<ExpiryEntry> li = list.listIterator(); 372 while (li.hasNext()) { 373 ExpiryEntry entry = li.next(); 374 375 if (then.isAfter(entry.expiry)) { 376 li.previous(); 377 // insert here 378 li.add(new ExpiryEntry(conn, then)); 379 mayContainEntries = true; 380 return; 381 } 382 } 383 // last (or first) element of list (the last element is 384 // the first when the list is empty) 385 list.add(new ExpiryEntry(conn, then)); 386 mayContainEntries = true; 387 } 388 389 // should only be called while holding a synchronization 390 // lock on the ConnectionPool 391 void remove(HttpConnection c) { 392 if (c == null || list.isEmpty()) return; 393 ListIterator<ExpiryEntry> li = list.listIterator(); 394 while (li.hasNext()) { 395 ExpiryEntry e = li.next(); 396 if (e.connection.equals(c)) { 397 li.remove(); 398 mayContainEntries = !list.isEmpty(); 399 return; 400 } 401 } 402 } 403 404 // should only be called while holding a synchronization 405 // lock on the ConnectionPool. 406 // Purge all elements whose deadline is before now (now included). 407 List<HttpConnection> purgeUntil(Instant now) { 408 if (list.isEmpty()) return Collections.emptyList(); 409 410 List<HttpConnection> closelist = new ArrayList<>(); 411 412 // elements with the closest deadlines are at the tail 413 // of the queue, so we're going to use a descending iterator 414 // to remove them, and stop when we find the first element 415 // that has not expired yet. 416 Iterator<ExpiryEntry> li = list.descendingIterator(); 417 while (li.hasNext()) { 418 ExpiryEntry entry = li.next(); 419 // use !isAfter instead of isBefore in order to 420 // remove the entry if its expiry == now 421 if (!entry.expiry.isAfter(now)) { 422 li.remove(); 423 HttpConnection c = entry.connection; 424 closelist.add(c); 425 } else break; // the list is sorted 426 } 427 mayContainEntries = !list.isEmpty(); 428 return closelist; 429 } 430 431 // should only be called while holding a synchronization 432 // lock on the ConnectionPool 433 java.util.stream.Stream<ExpiryEntry> stream() { 434 return list.stream(); 435 } 436 437 // should only be called while holding a synchronization 438 // lock on the ConnectionPool 439 void clear() { 440 list.clear(); 441 mayContainEntries = false; 442 } 443 } 444 445 // Remove a connection from the pool. 446 // should only be called while holding a synchronization 447 // lock on the ConnectionPool 448 private void removeFromPool(HttpConnection c) { 449 assert Thread.holdsLock(this); 450 if (c instanceof PlainHttpConnection) { 451 removeFromPool(c, plainPool); 452 } else { 453 assert c.isSecure(); 454 removeFromPool(c, sslPool); 455 } 456 } 457 458 // Used by tests 459 synchronized boolean contains(HttpConnection c) { 460 final CacheKey key = c.cacheKey(); 461 List<HttpConnection> list; 462 if ((list = plainPool.get(key)) != null) { 463 if (list.contains(c)) return true; 464 } 465 if ((list = sslPool.get(key)) != null) { 466 if (list.contains(c)) return true; 467 } 468 return false; 469 } 470 471 void cleanup(HttpConnection c, Throwable error) { 472 if (debug.on()) 473 debug.log("%s : ConnectionPool.cleanup(%s)", 474 String.valueOf(c.getConnectionFlow()), error); 475 synchronized(this) { 476 removeFromPool(c); 477 expiryList.remove(c); 478 } 479 c.close(); 480 } 481 482 /** 483 * An object that subscribes to the flow while the connection is in 484 * the pool. Anything that comes in will cause the connection to be closed 485 * and removed from the pool. 486 */ 487 private final class CleanupTrigger implements 488 FlowTube.TubeSubscriber, FlowTube.TubePublisher, 489 Flow.Subscription { 490 491 private final HttpConnection connection; 492 private volatile boolean done; 493 494 public CleanupTrigger(HttpConnection connection) { 495 this.connection = connection; 496 } 497 498 public boolean isDone() { return done;} 499 500 private void triggerCleanup(Throwable error) { 501 done = true; 502 cleanup(connection, error); 503 } 504 505 @Override public void request(long n) {} 506 @Override public void cancel() {} 507 508 @Override 509 public void onSubscribe(Flow.Subscription subscription) { 510 subscription.request(1); 511 } 512 @Override 513 public void onError(Throwable error) { triggerCleanup(error); } 514 @Override 515 public void onComplete() { triggerCleanup(null); } 516 @Override 517 public void onNext(List<ByteBuffer> item) { 518 triggerCleanup(new IOException("Data received while in pool")); 519 } 520 521 @Override 522 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { 523 subscriber.onSubscribe(this); 524 } 525 526 @Override 527 public String toString() { 528 return "CleanupTrigger(" + connection.getConnectionFlow() + ")"; 529 } 530 } 531 }