1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.lang.ref.WeakReference; 29 import java.net.InetSocketAddress; 30 import java.util.HashMap; 31 import java.util.LinkedList; 32 import java.util.ListIterator; 33 import java.util.Objects; 34 import java.util.concurrent.atomic.AtomicLong; 35 import java.util.concurrent.atomic.AtomicReference; 36 import jdk.incubator.http.internal.common.Utils; 37 38 /** 39 * Http 1.1 connection pool. 40 */ 41 final class ConnectionPool { 42 43 // These counters are used to distribute ids for debugging 44 // The ACTIVE_CLEANER_COUNTER will tell how many CacheCleaner 45 // are active at a given time. It will increase when a new 46 // CacheCleaner is started and decrease when it exits. 47 static final AtomicLong ACTIVE_CLEANER_COUNTER = new AtomicLong(); 48 // The POOL_IDS_COUNTER increases each time a new ConnectionPool 49 // is created. It may wrap and become negative but will never be 50 // decremented. 51 static final AtomicLong POOL_IDS_COUNTER = new AtomicLong(); 52 // The cleanerCounter is used to name cleaner threads within a 53 // a connection pool, and increments monotically. 54 // It may wrap and become negative but will never be 55 // decremented. 56 final AtomicLong cleanerCounter = new AtomicLong(); 57 58 static final long KEEP_ALIVE = Utils.getIntegerNetProperty( 59 "jdk.httpclient.keepalive.timeout", 1200); // seconds 60 61 // Pools of idle connections 62 63 final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool; 64 final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool; 65 // A monotically increasing id for this connection pool. 66 // It may be negative (that's OK) 67 // Mostly used for debugging purposes when looking at thread dumps. 68 // Global scope. 69 final long poolID = POOL_IDS_COUNTER.incrementAndGet(); 70 final AtomicReference<CacheCleaner> cleanerRef; 71 72 /** 73 * Entries in connection pool are keyed by destination address and/or 74 * proxy address: 75 * case 1: plain TCP not via proxy (destination only) 76 * case 2: plain TCP via proxy (proxy only) 77 * case 3: SSL not via proxy (destination only) 78 * case 4: SSL over tunnel (destination and proxy) 79 */ 80 static class CacheKey { 81 final InetSocketAddress proxy; 82 final InetSocketAddress destination; 83 84 CacheKey(InetSocketAddress destination, InetSocketAddress proxy) { 85 this.proxy = proxy; 86 this.destination = destination; 87 } 88 89 @Override 90 public boolean equals(Object obj) { 93 } 94 if (getClass() != obj.getClass()) { 95 return false; 96 } 97 final CacheKey other = (CacheKey) obj; 98 if (!Objects.equals(this.proxy, other.proxy)) { 99 return false; 100 } 101 if (!Objects.equals(this.destination, other.destination)) { 102 return false; 103 } 104 return true; 105 } 106 107 @Override 108 public int hashCode() { 109 return Objects.hash(proxy, destination); 110 } 111 } 112 113 static class ExpiryEntry { 114 final HttpConnection connection; 115 final long expiry; // absolute time in seconds of expiry time 116 ExpiryEntry(HttpConnection connection, long expiry) { 117 this.connection = connection; 118 this.expiry = expiry; 119 } 120 } 121 122 final LinkedList<ExpiryEntry> expiryList; 123 124 /** 125 * There should be one of these per HttpClient. 126 */ 127 ConnectionPool() { 128 plainPool = new HashMap<>(); 129 sslPool = new HashMap<>(); 130 expiryList = new LinkedList<>(); 131 cleanerRef = new AtomicReference<>(); 132 } 133 134 void start() { 135 } 136 137 static CacheKey cacheKey(InetSocketAddress destination, 138 InetSocketAddress proxy) 139 { 140 return new CacheKey(destination, proxy); 141 } 142 143 synchronized HttpConnection getConnection(boolean secure, 144 InetSocketAddress addr, 145 InetSocketAddress proxy) { 146 CacheKey key = new CacheKey(addr, proxy); 147 HttpConnection c = secure ? findConnection(key, sslPool) 148 : findConnection(key, plainPool); 149 //System.out.println ("getConnection returning: " + c); 150 return c; 151 } 152 153 /** 154 * Returns the connection to the pool. 155 */ 156 synchronized void returnToPool(HttpConnection conn) { 157 if (conn instanceof PlainHttpConnection) { 158 putConnection(conn, plainPool); 159 } else { 160 putConnection(conn, sslPool); 161 } 162 addToExpiryList(conn); 163 //System.out.println("Return to pool: " + conn); 164 } 165 166 private HttpConnection 167 findConnection(CacheKey key, 168 HashMap<CacheKey,LinkedList<HttpConnection>> pool) { 169 LinkedList<HttpConnection> l = pool.get(key); 170 if (l == null || l.isEmpty()) { 171 return null; 172 } else { 173 HttpConnection c = l.removeFirst(); 174 removeFromExpiryList(c); 175 return c; 176 } 177 } 178 179 /* called from cache cleaner only */ 180 private void 181 removeFromPool(HttpConnection c, 182 HashMap<CacheKey,LinkedList<HttpConnection>> pool) { 183 //System.out.println("cacheCleaner removing: " + c); 184 LinkedList<HttpConnection> l = pool.get(c.cacheKey()); 185 assert l != null; 186 boolean wasPresent = l.remove(c); 187 assert wasPresent; 188 } 189 190 private void 191 putConnection(HttpConnection c, 192 HashMap<CacheKey,LinkedList<HttpConnection>> pool) { 193 CacheKey key = c.cacheKey(); 194 LinkedList<HttpConnection> l = pool.get(key); 195 if (l == null) { 196 l = new LinkedList<>(); 197 pool.put(key, l); 198 } 199 l.add(c); 200 } 201 202 static String makeCleanerName(long poolId, long cleanerId) { 203 return "HTTP-Cache-cleaner-" + poolId + "-" + cleanerId; 204 } 205 206 // only runs while entries exist in cache 207 final static class CacheCleaner extends Thread { 208 209 volatile boolean stopping; 210 // A monotically increasing id. May wrap and become negative (that's OK) 211 // Mostly used for debugging purposes when looking at thread dumps. 212 // Scoped per connection pool. 213 final long cleanerID; 214 // A reference to the owning ConnectionPool. 215 // This reference's referent may become null if the HttpClientImpl 216 // that owns this pool is GC'ed. 217 final WeakReference<ConnectionPool> ownerRef; 218 219 CacheCleaner(ConnectionPool owner) { 220 this(owner, owner.cleanerCounter.incrementAndGet()); 221 } 222 223 CacheCleaner(ConnectionPool owner, long cleanerID) { 224 super(null, null, makeCleanerName(owner.poolID, cleanerID), 0, false); 225 this.cleanerID = cleanerID; 226 this.ownerRef = new WeakReference<>(owner); 227 setDaemon(true); 228 } 229 230 synchronized boolean stopping() { 231 return stopping || ownerRef.get() == null; 232 } 233 234 synchronized void stopCleaner() { 235 stopping = true; 236 } 237 238 @Override 239 public void run() { 240 ACTIVE_CLEANER_COUNTER.incrementAndGet(); 241 try { 242 while (!stopping()) { 243 try { 244 Thread.sleep(3000); 245 } catch (InterruptedException e) {} 246 ConnectionPool owner = ownerRef.get(); 247 if (owner == null) return; 248 owner.cleanCache(this); 249 owner = null; 250 } 251 } finally { 252 ACTIVE_CLEANER_COUNTER.decrementAndGet(); 253 } 254 } 255 } 256 257 synchronized void removeFromExpiryList(HttpConnection c) { 258 if (c == null) { 259 return; 260 } 261 ListIterator<ExpiryEntry> li = expiryList.listIterator(); 262 while (li.hasNext()) { 263 ExpiryEntry e = li.next(); 264 if (e.connection.equals(c)) { 265 li.remove(); 266 return; 267 } 268 } 269 CacheCleaner cleaner = this.cleanerRef.get(); 270 if (expiryList.isEmpty() && cleaner != null) { 271 this.cleanerRef.compareAndSet(cleaner, null); 272 cleaner.stopCleaner(); 273 cleaner.interrupt(); 274 } 275 } 276 277 private void cleanCache(CacheCleaner cleaner) { 278 long now = System.currentTimeMillis() / 1000; 279 LinkedList<HttpConnection> closelist = new LinkedList<>(); 280 281 synchronized (this) { 282 ListIterator<ExpiryEntry> li = expiryList.listIterator(); 283 while (li.hasNext()) { 284 ExpiryEntry entry = li.next(); 285 if (entry.expiry <= now) { 286 li.remove(); 287 HttpConnection c = entry.connection; 288 closelist.add(c); 289 if (c instanceof PlainHttpConnection) { 290 removeFromPool(c, plainPool); 291 } else { 292 removeFromPool(c, sslPool); 293 } 294 } 295 } 296 if (expiryList.isEmpty() && cleaner != null) { 297 this.cleanerRef.compareAndSet(cleaner, null); 298 cleaner.stopCleaner(); 299 } 300 } 301 for (HttpConnection c : closelist) { 302 //System.out.println ("KAC: closing " + c); 303 c.close(); 304 } 305 } 306 307 private synchronized void addToExpiryList(HttpConnection conn) { 308 long now = System.currentTimeMillis() / 1000; 309 long then = now + KEEP_ALIVE; 310 if (expiryList.isEmpty()) { 311 CacheCleaner cleaner = new CacheCleaner(this); 312 if (this.cleanerRef.compareAndSet(null, cleaner)) { 313 cleaner.start(); 314 } 315 expiryList.add(new ExpiryEntry(conn, then)); 316 return; 317 } 318 319 ListIterator<ExpiryEntry> li = expiryList.listIterator(); 320 while (li.hasNext()) { 321 ExpiryEntry entry = li.next(); 322 323 if (then > entry.expiry) { 324 li.previous(); 325 // insert here 326 li.add(new ExpiryEntry(conn, then)); 327 return; 328 } 329 } 330 // first element of list 331 expiryList.add(new ExpiryEntry(conn, then)); 332 } 333 } | 1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.lang.System.Logger.Level; 30 import java.net.InetSocketAddress; 31 import java.nio.ByteBuffer; 32 import java.time.Instant; 33 import java.time.temporal.ChronoUnit; 34 import java.util.ArrayList; 35 import java.util.Collections; 36 import java.util.HashMap; 37 import java.util.Iterator; 38 import java.util.LinkedList; 39 import java.util.List; 40 import java.util.ListIterator; 41 import java.util.Objects; 42 import java.util.Optional; 43 import java.util.concurrent.Flow; 44 import java.util.stream.Collectors; 45 import jdk.incubator.http.internal.common.FlowTube; 46 import jdk.incubator.http.internal.common.Utils; 47 48 /** 49 * Http 1.1 connection pool. 50 */ 51 final class ConnectionPool { 52 53 static final long KEEP_ALIVE = Utils.getIntegerNetProperty( 54 "jdk.httpclient.keepalive.timeout", 1200); // seconds 55 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. 56 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); 57 58 // Pools of idle connections 59 60 private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool; 61 private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool; 62 private final ExpiryList expiryList; 63 private final String dbgTag; // used for debug 64 boolean stopped; 65 66 /** 67 * Entries in connection pool are keyed by destination address and/or 68 * proxy address: 69 * case 1: plain TCP not via proxy (destination only) 70 * case 2: plain TCP via proxy (proxy only) 71 * case 3: SSL not via proxy (destination only) 72 * case 4: SSL over tunnel (destination and proxy) 73 */ 74 static class CacheKey { 75 final InetSocketAddress proxy; 76 final InetSocketAddress destination; 77 78 CacheKey(InetSocketAddress destination, InetSocketAddress proxy) { 79 this.proxy = proxy; 80 this.destination = destination; 81 } 82 83 @Override 84 public boolean equals(Object obj) { 87 } 88 if (getClass() != obj.getClass()) { 89 return false; 90 } 91 final CacheKey other = (CacheKey) obj; 92 if (!Objects.equals(this.proxy, other.proxy)) { 93 return false; 94 } 95 if (!Objects.equals(this.destination, other.destination)) { 96 return false; 97 } 98 return true; 99 } 100 101 @Override 102 public int hashCode() { 103 return Objects.hash(proxy, destination); 104 } 105 } 106 107 ConnectionPool(long clientId) { 108 this("ConnectionPool("+clientId+")"); 109 } 110 111 /** 112 * There should be one of these per HttpClient. 113 */ 114 private ConnectionPool(String tag) { 115 dbgTag = tag; 116 plainPool = new HashMap<>(); 117 sslPool = new HashMap<>(); 118 expiryList = new ExpiryList(); 119 } 120 121 final String dbgString() { 122 return dbgTag; 123 } 124 125 void start() { 126 assert !stopped : "Already stopped"; 127 } 128 129 static CacheKey cacheKey(InetSocketAddress destination, 130 InetSocketAddress proxy) 131 { 132 return new CacheKey(destination, proxy); 133 } 134 135 synchronized HttpConnection getConnection(boolean secure, 136 InetSocketAddress addr, 137 InetSocketAddress proxy) { 138 if (stopped) return null; 139 CacheKey key = new CacheKey(addr, proxy); 140 HttpConnection c = secure ? findConnection(key, sslPool) 141 : findConnection(key, plainPool); 142 //System.out.println ("getConnection returning: " + c); 143 return c; 144 } 145 146 /** 147 * Returns the connection to the pool. 148 */ 149 void returnToPool(HttpConnection conn) { 150 returnToPool(conn, Instant.now(), KEEP_ALIVE); 151 } 152 153 // Called also by whitebox tests 154 void returnToPool(HttpConnection conn, Instant now, long keepAlive) { 155 156 // Don't call registerCleanupTrigger while holding a lock, 157 // but register it before the connection is added to the pool, 158 // since we don't want to trigger the cleanup if the connection 159 // is not in the pool. 160 CleanupTrigger cleanup = registerCleanupTrigger(conn); 161 162 // it's possible that cleanup may have been called. 163 synchronized(this) { 164 if (cleanup.isDone()) { 165 return; 166 } else if (stopped) { 167 conn.close(); 168 return; 169 } 170 if (conn instanceof PlainHttpConnection) { 171 putConnection(conn, plainPool); 172 } else { 173 assert conn.isSecure(); 174 putConnection(conn, sslPool); 175 } 176 expiryList.add(conn, now, keepAlive); 177 } 178 //System.out.println("Return to pool: " + conn); 179 } 180 181 private CleanupTrigger registerCleanupTrigger(HttpConnection conn) { 182 // Connect the connection flow to a pub/sub pair that will take the 183 // connection out of the pool and close it if anything happens 184 // while the connection is sitting in the pool. 185 CleanupTrigger cleanup = new CleanupTrigger(conn); 186 FlowTube flow = conn.getConnectionFlow(); 187 debug.log(Level.DEBUG, "registering %s", cleanup); 188 flow.connectFlows(cleanup, cleanup); 189 return cleanup; 190 } 191 192 private HttpConnection 193 findConnection(CacheKey key, 194 HashMap<CacheKey,LinkedList<HttpConnection>> pool) { 195 LinkedList<HttpConnection> l = pool.get(key); 196 if (l == null || l.isEmpty()) { 197 return null; 198 } else { 199 HttpConnection c = l.removeFirst(); 200 expiryList.remove(c); 201 return c; 202 } 203 } 204 205 /* called from cache cleaner only */ 206 private boolean 207 removeFromPool(HttpConnection c, 208 HashMap<CacheKey,LinkedList<HttpConnection>> pool) { 209 //System.out.println("cacheCleaner removing: " + c); 210 assert Thread.holdsLock(this); 211 CacheKey k = c.cacheKey(); 212 List<HttpConnection> l = pool.get(k); 213 if (l == null || l.isEmpty()) { 214 pool.remove(k); 215 return false; 216 } 217 return l.remove(c); 218 } 219 220 private void 221 putConnection(HttpConnection c, 222 HashMap<CacheKey,LinkedList<HttpConnection>> pool) { 223 CacheKey key = c.cacheKey(); 224 LinkedList<HttpConnection> l = pool.get(key); 225 if (l == null) { 226 l = new LinkedList<>(); 227 pool.put(key, l); 228 } 229 l.add(c); 230 } 231 232 /** 233 * Purge expired connection and return the number of milliseconds 234 * in which the next connection is scheduled to expire. 235 * If no connections are scheduled to be purged return 0. 236 * @return the delay in milliseconds in which the next connection will 237 * expire. 238 */ 239 long purgeExpiredConnectionsAndReturnNextDeadline() { 240 if (!expiryList.purgeMaybeRequired()) return 0; 241 return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now()); 242 } 243 244 // Used for whitebox testing 245 long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) { 246 long nextPurge = 0; 247 248 // We may be in the process of adding new elements 249 // to the expiry list - but those elements will not 250 // have outlast their keep alive timer yet since we're 251 // just adding them. 252 if (!expiryList.purgeMaybeRequired()) return nextPurge; 253 254 List<HttpConnection> closelist; 255 synchronized (this) { 256 closelist = expiryList.purgeUntil(now); 257 for (HttpConnection c : closelist) { 258 if (c instanceof PlainHttpConnection) { 259 boolean wasPresent = removeFromPool(c, plainPool); 260 assert wasPresent; 261 } else { 262 boolean wasPresent = removeFromPool(c, sslPool); 263 assert wasPresent; 264 } 265 } 266 nextPurge = now.until( 267 expiryList.nextExpiryDeadline().orElse(now), 268 ChronoUnit.MILLIS); 269 } 270 closelist.forEach(this::close); 271 return nextPurge; 272 } 273 274 private void close(HttpConnection c) { 275 try { 276 c.close(); 277 } catch (Throwable e) {} // ignore 278 } 279 280 void stop() { 281 List<HttpConnection> closelist = Collections.emptyList(); 282 try { 283 synchronized (this) { 284 stopped = true; 285 closelist = expiryList.stream() 286 .map(e -> e.connection) 287 .collect(Collectors.toList()); 288 expiryList.clear(); 289 plainPool.clear(); 290 sslPool.clear(); 291 } 292 } finally { 293 closelist.forEach(this::close); 294 } 295 } 296 297 static final class ExpiryEntry { 298 final HttpConnection connection; 299 final Instant expiry; // absolute time in seconds of expiry time 300 ExpiryEntry(HttpConnection connection, Instant expiry) { 301 this.connection = connection; 302 this.expiry = expiry; 303 } 304 } 305 306 /** 307 * Manages a LinkedList of sorted ExpiryEntry. The entry with the closer 308 * deadline is at the tail of the list, and the entry with the farther 309 * deadline is at the head. In the most common situation, new elements 310 * will need to be added at the head (or close to it), and expired elements 311 * will need to be purged from the tail. 312 */ 313 private static final class ExpiryList { 314 private final LinkedList<ExpiryEntry> list = new LinkedList<>(); 315 private volatile boolean mayContainEntries; 316 317 // A loosely accurate boolean whose value is computed 318 // at the end of each operation performed on ExpiryList; 319 // Does not require synchronizing on the ConnectionPool. 320 boolean purgeMaybeRequired() { 321 return mayContainEntries; 322 } 323 324 // Returns the next expiry deadline 325 // should only be called while holding a synchronization 326 // lock on the ConnectionPool 327 Optional<Instant> nextExpiryDeadline() { 328 if (list.isEmpty()) return Optional.empty(); 329 else return Optional.of(list.getLast().expiry); 330 } 331 332 // should only be called while holding a synchronization 333 // lock on the ConnectionPool 334 void add(HttpConnection conn) { 335 add(conn, Instant.now(), KEEP_ALIVE); 336 } 337 338 // Used by whitebox test. 339 void add(HttpConnection conn, Instant now, long keepAlive) { 340 Instant then = now.truncatedTo(ChronoUnit.SECONDS) 341 .plus(keepAlive, ChronoUnit.SECONDS); 342 343 // Elements with the farther deadline are at the head of 344 // the list. It's more likely that the new element will 345 // have the farthest deadline, and will need to be inserted 346 // at the head of the list, so we're using an ascending 347 // list iterator to find the right insertion point. 348 ListIterator<ExpiryEntry> li = list.listIterator(); 349 while (li.hasNext()) { 350 ExpiryEntry entry = li.next(); 351 352 if (then.isAfter(entry.expiry)) { 353 li.previous(); 354 // insert here 355 li.add(new ExpiryEntry(conn, then)); 356 mayContainEntries = true; 357 return; 358 } 359 } 360 // last (or first) element of list (the last element is 361 // the first when the list is empty) 362 list.add(new ExpiryEntry(conn, then)); 363 mayContainEntries = true; 364 } 365 366 // should only be called while holding a synchronization 367 // lock on the ConnectionPool 368 void remove(HttpConnection c) { 369 if (c == null || list.isEmpty()) return; 370 ListIterator<ExpiryEntry> li = list.listIterator(); 371 while (li.hasNext()) { 372 ExpiryEntry e = li.next(); 373 if (e.connection.equals(c)) { 374 li.remove(); 375 mayContainEntries = !list.isEmpty(); 376 return; 377 } 378 } 379 } 380 381 // should only be called while holding a synchronization 382 // lock on the ConnectionPool. 383 // Purge all elements whose deadline is before now (now included). 384 List<HttpConnection> purgeUntil(Instant now) { 385 if (list.isEmpty()) return Collections.emptyList(); 386 387 List<HttpConnection> closelist = new ArrayList<>(); 388 389 // elements with the closest deadlines are at the tail 390 // of the queue, so we're going to use a descending iterator 391 // to remove them, and stop when we find the first element 392 // that has not expired yet. 393 Iterator<ExpiryEntry> li = list.descendingIterator(); 394 while (li.hasNext()) { 395 ExpiryEntry entry = li.next(); 396 // use !isAfter instead of isBefore in order to 397 // remove the entry if its expiry == now 398 if (!entry.expiry.isAfter(now)) { 399 li.remove(); 400 HttpConnection c = entry.connection; 401 closelist.add(c); 402 } else break; // the list is sorted 403 } 404 mayContainEntries = !list.isEmpty(); 405 return closelist; 406 } 407 408 // should only be called while holding a synchronization 409 // lock on the ConnectionPool 410 java.util.stream.Stream<ExpiryEntry> stream() { 411 return list.stream(); 412 } 413 414 // should only be called while holding a synchronization 415 // lock on the ConnectionPool 416 void clear() { 417 list.clear(); 418 mayContainEntries = false; 419 } 420 } 421 422 void cleanup(HttpConnection c, Throwable error) { 423 debug.log(Level.DEBUG, 424 "%s : ConnectionPool.cleanup(%s)", 425 String.valueOf(c.getConnectionFlow()), 426 error); 427 synchronized(this) { 428 if (c instanceof PlainHttpConnection) { 429 removeFromPool(c, plainPool); 430 } else { 431 assert c.isSecure(); 432 removeFromPool(c, sslPool); 433 } 434 expiryList.remove(c); 435 } 436 c.close(); 437 } 438 439 /** 440 * An object that subscribes to the flow while the connection is in 441 * the pool. Anything that comes in will cause the connection to be closed 442 * and removed from the pool. 443 */ 444 private final class CleanupTrigger implements 445 FlowTube.TubeSubscriber, FlowTube.TubePublisher, 446 Flow.Subscription { 447 448 private final HttpConnection connection; 449 private volatile boolean done; 450 451 public CleanupTrigger(HttpConnection connection) { 452 this.connection = connection; 453 } 454 455 public boolean isDone() { return done;} 456 457 private void triggerCleanup(Throwable error) { 458 done = true; 459 cleanup(connection, error); 460 } 461 462 @Override public void request(long n) {} 463 @Override public void cancel() {} 464 465 @Override 466 public void onSubscribe(Flow.Subscription subscription) { 467 subscription.request(1); 468 } 469 @Override 470 public void onError(Throwable error) { triggerCleanup(error); } 471 @Override 472 public void onComplete() { triggerCleanup(null); } 473 @Override 474 public void onNext(List<ByteBuffer> item) { 475 triggerCleanup(new IOException("Data received while in pool")); 476 } 477 478 @Override 479 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { 480 subscriber.onSubscribe(this); 481 } 482 483 @Override 484 public String toString() { 485 return "CleanupTrigger(" + connection.getConnectionFlow() + ")"; 486 } 487 488 } 489 490 } |