< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ConnectionPool.java
Print this page
@@ -1,7 +1,7 @@
/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
@@ -23,53 +23,47 @@
* questions.
*/
package jdk.incubator.http;
-import java.lang.ref.WeakReference;
+import java.io.IOException;
+import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
import java.util.ListIterator;
import java.util.Objects;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Optional;
+import java.util.concurrent.Flow;
+import java.util.stream.Collectors;
+import jdk.incubator.http.internal.common.FlowTube;
import jdk.incubator.http.internal.common.Utils;
/**
* Http 1.1 connection pool.
*/
final class ConnectionPool {
- // These counters are used to distribute ids for debugging
- // The ACTIVE_CLEANER_COUNTER will tell how many CacheCleaner
- // are active at a given time. It will increase when a new
- // CacheCleaner is started and decrease when it exits.
- static final AtomicLong ACTIVE_CLEANER_COUNTER = new AtomicLong();
- // The POOL_IDS_COUNTER increases each time a new ConnectionPool
- // is created. It may wrap and become negative but will never be
- // decremented.
- static final AtomicLong POOL_IDS_COUNTER = new AtomicLong();
- // The cleanerCounter is used to name cleaner threads within a
- // a connection pool, and increments monotically.
- // It may wrap and become negative but will never be
- // decremented.
- final AtomicLong cleanerCounter = new AtomicLong();
-
static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
"jdk.httpclient.keepalive.timeout", 1200); // seconds
+ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+ final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
// Pools of idle connections
- final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
- final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
- // A monotically increasing id for this connection pool.
- // It may be negative (that's OK)
- // Mostly used for debugging purposes when looking at thread dumps.
- // Global scope.
- final long poolID = POOL_IDS_COUNTER.incrementAndGet();
- final AtomicReference<CacheCleaner> cleanerRef;
+ private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
+ private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
+ private final ExpiryList expiryList;
+ private final String dbgTag; // used for debug
+ boolean stopped;
/**
* Entries in connection pool are keyed by destination address and/or
* proxy address:
* case 1: plain TCP not via proxy (destination only)
@@ -108,32 +102,30 @@
public int hashCode() {
return Objects.hash(proxy, destination);
}
}
- static class ExpiryEntry {
- final HttpConnection connection;
- final long expiry; // absolute time in seconds of expiry time
- ExpiryEntry(HttpConnection connection, long expiry) {
- this.connection = connection;
- this.expiry = expiry;
+ ConnectionPool(long clientId) {
+ this("ConnectionPool("+clientId+")");
}
- }
-
- final LinkedList<ExpiryEntry> expiryList;
/**
* There should be one of these per HttpClient.
*/
- ConnectionPool() {
+ private ConnectionPool(String tag) {
+ dbgTag = tag;
plainPool = new HashMap<>();
sslPool = new HashMap<>();
- expiryList = new LinkedList<>();
- cleanerRef = new AtomicReference<>();
+ expiryList = new ExpiryList();
+ }
+
+ final String dbgString() {
+ return dbgTag;
}
void start() {
+ assert !stopped : "Already stopped";
}
static CacheKey cacheKey(InetSocketAddress destination,
InetSocketAddress proxy)
{
@@ -141,52 +133,90 @@
}
synchronized HttpConnection getConnection(boolean secure,
InetSocketAddress addr,
InetSocketAddress proxy) {
+ if (stopped) return null;
CacheKey key = new CacheKey(addr, proxy);
HttpConnection c = secure ? findConnection(key, sslPool)
: findConnection(key, plainPool);
//System.out.println ("getConnection returning: " + c);
return c;
}
/**
* Returns the connection to the pool.
*/
- synchronized void returnToPool(HttpConnection conn) {
+ void returnToPool(HttpConnection conn) {
+ returnToPool(conn, Instant.now(), KEEP_ALIVE);
+ }
+
+ // Called also by whitebox tests
+ void returnToPool(HttpConnection conn, Instant now, long keepAlive) {
+
+ // Don't call registerCleanupTrigger while holding a lock,
+ // but register it before the connection is added to the pool,
+ // since we don't want to trigger the cleanup if the connection
+ // is not in the pool.
+ CleanupTrigger cleanup = registerCleanupTrigger(conn);
+
+ // it's possible that cleanup may have been called.
+ synchronized(this) {
+ if (cleanup.isDone()) {
+ return;
+ } else if (stopped) {
+ conn.close();
+ return;
+ }
if (conn instanceof PlainHttpConnection) {
putConnection(conn, plainPool);
} else {
+ assert conn.isSecure();
putConnection(conn, sslPool);
}
- addToExpiryList(conn);
+ expiryList.add(conn, now, keepAlive);
+ }
//System.out.println("Return to pool: " + conn);
}
+ private CleanupTrigger registerCleanupTrigger(HttpConnection conn) {
+ // Connect the connection flow to a pub/sub pair that will take the
+ // connection out of the pool and close it if anything happens
+ // while the connection is sitting in the pool.
+ CleanupTrigger cleanup = new CleanupTrigger(conn);
+ FlowTube flow = conn.getConnectionFlow();
+ debug.log(Level.DEBUG, "registering %s", cleanup);
+ flow.connectFlows(cleanup, cleanup);
+ return cleanup;
+ }
+
private HttpConnection
findConnection(CacheKey key,
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
LinkedList<HttpConnection> l = pool.get(key);
if (l == null || l.isEmpty()) {
return null;
} else {
HttpConnection c = l.removeFirst();
- removeFromExpiryList(c);
+ expiryList.remove(c);
return c;
}
}
/* called from the expiry purge and cleanup paths, with the pool lock held */
- private void
+ private boolean
removeFromPool(HttpConnection c,
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
//System.out.println("cacheCleaner removing: " + c);
- LinkedList<HttpConnection> l = pool.get(c.cacheKey());
- assert l != null;
- boolean wasPresent = l.remove(c);
- assert wasPresent;
+ assert Thread.holdsLock(this);
+ CacheKey k = c.cacheKey();
+ List<HttpConnection> l = pool.get(k);
+ if (l == null || l.isEmpty()) {
+ pool.remove(k);
+ return false;
+ }
+ return l.remove(c);
}
private void
putConnection(HttpConnection c,
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
@@ -197,137 +227,264 @@
pool.put(key, l);
}
l.add(c);
}
- static String makeCleanerName(long poolId, long cleanerId) {
- return "HTTP-Cache-cleaner-" + poolId + "-" + cleanerId;
- }
-
- // only runs while entries exist in cache
- final static class CacheCleaner extends Thread {
-
- volatile boolean stopping;
- // A monotically increasing id. May wrap and become negative (that's OK)
- // Mostly used for debugging purposes when looking at thread dumps.
- // Scoped per connection pool.
- final long cleanerID;
- // A reference to the owning ConnectionPool.
- // This reference's referent may become null if the HttpClientImpl
- // that owns this pool is GC'ed.
- final WeakReference<ConnectionPool> ownerRef;
+ /**
+ * Purges expired connections and returns the number of milliseconds
+ * until the next connection is scheduled to expire.
+ * If no connections are scheduled to be purged, returns 0.
+ * @return the delay, in milliseconds, after which the next connection
+ * will expire, or 0 if there is none.
+ */
+ long purgeExpiredConnectionsAndReturnNextDeadline() {
+ if (!expiryList.purgeMaybeRequired()) return 0;
+ return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now());
+ }
+
+ // Used for whitebox testing
+ long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) {
+ long nextPurge = 0;
+
+ // We may be in the process of adding new elements
+ // to the expiry list - but those elements will not
+ // have outlasted their keep alive timer yet since we're
+ // just adding them.
+ if (!expiryList.purgeMaybeRequired()) return nextPurge;
- CacheCleaner(ConnectionPool owner) {
- this(owner, owner.cleanerCounter.incrementAndGet());
+ List<HttpConnection> closelist;
+ synchronized (this) {
+ closelist = expiryList.purgeUntil(now);
+ for (HttpConnection c : closelist) {
+ if (c instanceof PlainHttpConnection) {
+ boolean wasPresent = removeFromPool(c, plainPool);
+ assert wasPresent;
+ } else {
+ boolean wasPresent = removeFromPool(c, sslPool);
+ assert wasPresent;
}
-
- CacheCleaner(ConnectionPool owner, long cleanerID) {
- super(null, null, makeCleanerName(owner.poolID, cleanerID), 0, false);
- this.cleanerID = cleanerID;
- this.ownerRef = new WeakReference<>(owner);
- setDaemon(true);
}
-
- synchronized boolean stopping() {
- return stopping || ownerRef.get() == null;
+ nextPurge = now.until(
+ expiryList.nextExpiryDeadline().orElse(now),
+ ChronoUnit.MILLIS);
}
-
- synchronized void stopCleaner() {
- stopping = true;
+ closelist.forEach(this::close);
+ return nextPurge;
}
- @Override
- public void run() {
- ACTIVE_CLEANER_COUNTER.incrementAndGet();
+ private void close(HttpConnection c) {
try {
- while (!stopping()) {
+ c.close();
+ } catch (Throwable e) {} // ignore
+ }
+
+ void stop() {
+ List<HttpConnection> closelist = Collections.emptyList();
try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {}
- ConnectionPool owner = ownerRef.get();
- if (owner == null) return;
- owner.cleanCache(this);
- owner = null;
+ synchronized (this) {
+ stopped = true;
+ closelist = expiryList.stream()
+ .map(e -> e.connection)
+ .collect(Collectors.toList());
+ expiryList.clear();
+ plainPool.clear();
+ sslPool.clear();
}
} finally {
- ACTIVE_CLEANER_COUNTER.decrementAndGet();
+ closelist.forEach(this::close);
}
}
+
+ static final class ExpiryEntry {
+ final HttpConnection connection;
+ final Instant expiry; // absolute point in time at which the connection expires
+ ExpiryEntry(HttpConnection connection, Instant expiry) {
+ this.connection = connection;
+ this.expiry = expiry;
+ }
}
- synchronized void removeFromExpiryList(HttpConnection c) {
- if (c == null) {
+ /**
+ * Manages a LinkedList of sorted ExpiryEntry. The entry with the closest
+ * deadline is at the tail of the list, and the entry with the farthest
+ * deadline is at the head. In the most common situation, new elements
+ * will need to be added at the head (or close to it), and expired elements
+ * will need to be purged from the tail.
+ */
+ private static final class ExpiryList {
+ private final LinkedList<ExpiryEntry> list = new LinkedList<>();
+ private volatile boolean mayContainEntries;
+
+ // A loosely accurate boolean whose value is recomputed
+ // whenever an operation modifies the ExpiryList.
+ // Reading it does not require synchronizing on the ConnectionPool.
+ boolean purgeMaybeRequired() {
+ return mayContainEntries;
+ }
+
+ // Returns the deadline of the entry that will expire next, or an
+ // empty Optional if the list is empty.
+ // Should only be called while holding a synchronization
+ // lock on the ConnectionPool.
+ Optional<Instant> nextExpiryDeadline() {
+ if (list.isEmpty()) return Optional.empty();
+ else return Optional.of(list.getLast().expiry);
+ }
+
+ // should only be called while holding a synchronization
+ // lock on the ConnectionPool
+ void add(HttpConnection conn) {
+ add(conn, Instant.now(), KEEP_ALIVE);
+ }
+
+ // Called by ConnectionPool.returnToPool with the pool lock held;
+ // also used by whitebox tests.
+ void add(HttpConnection conn, Instant now, long keepAlive) {
+ Instant then = now.truncatedTo(ChronoUnit.SECONDS)
+ .plus(keepAlive, ChronoUnit.SECONDS);
+
+ // Elements with the farthest deadline are at the head of
+ // the list. It's more likely that the new element will
+ // have the farthest deadline, and will need to be inserted
+ // at the head of the list, so we're using an ascending
+ // list iterator to find the right insertion point.
+ ListIterator<ExpiryEntry> li = list.listIterator();
+ while (li.hasNext()) {
+ ExpiryEntry entry = li.next();
+
+ if (then.isAfter(entry.expiry)) {
+ li.previous();
+ // insert here
+ li.add(new ExpiryEntry(conn, then));
+ mayContainEntries = true;
return;
}
- ListIterator<ExpiryEntry> li = expiryList.listIterator();
+ }
+ // No existing entry expires before the new one: append it at
+ // the tail (which is also the head when the list is empty).
+ list.add(new ExpiryEntry(conn, then));
+ mayContainEntries = true;
+ }
+
+ // should only be called while holding a synchronization
+ // lock on the ConnectionPool
+ void remove(HttpConnection c) {
+ if (c == null || list.isEmpty()) return;
+ ListIterator<ExpiryEntry> li = list.listIterator();
while (li.hasNext()) {
ExpiryEntry e = li.next();
if (e.connection.equals(c)) {
li.remove();
+ mayContainEntries = !list.isEmpty();
return;
}
}
- CacheCleaner cleaner = this.cleanerRef.get();
- if (expiryList.isEmpty() && cleaner != null) {
- this.cleanerRef.compareAndSet(cleaner, null);
- cleaner.stopCleaner();
- cleaner.interrupt();
}
- }
-
- private void cleanCache(CacheCleaner cleaner) {
- long now = System.currentTimeMillis() / 1000;
- LinkedList<HttpConnection> closelist = new LinkedList<>();
- synchronized (this) {
- ListIterator<ExpiryEntry> li = expiryList.listIterator();
+ // should only be called while holding a synchronization
+ // lock on the ConnectionPool.
+ // Purge all elements whose deadline is before now (now included).
+ List<HttpConnection> purgeUntil(Instant now) {
+ if (list.isEmpty()) return Collections.emptyList();
+
+ List<HttpConnection> closelist = new ArrayList<>();
+
+ // elements with the closest deadlines are at the tail
+ // of the queue, so we're going to use a descending iterator
+ // to remove them, and stop when we find the first element
+ // that has not expired yet.
+ Iterator<ExpiryEntry> li = list.descendingIterator();
while (li.hasNext()) {
ExpiryEntry entry = li.next();
- if (entry.expiry <= now) {
+ // use !isAfter instead of isBefore in order to
+ // remove the entry if its expiry == now
+ if (!entry.expiry.isAfter(now)) {
li.remove();
HttpConnection c = entry.connection;
closelist.add(c);
+ } else break; // the list is sorted
+ }
+ mayContainEntries = !list.isEmpty();
+ return closelist;
+ }
+
+ // should only be called while holding a synchronization
+ // lock on the ConnectionPool
+ java.util.stream.Stream<ExpiryEntry> stream() {
+ return list.stream();
+ }
+
+ // should only be called while holding a synchronization
+ // lock on the ConnectionPool
+ void clear() {
+ list.clear();
+ mayContainEntries = false;
+ }
+ }
+
+ void cleanup(HttpConnection c, Throwable error) {
+ debug.log(Level.DEBUG,
+ "%s : ConnectionPool.cleanup(%s)",
+ String.valueOf(c.getConnectionFlow()),
+ error);
+ synchronized(this) {
if (c instanceof PlainHttpConnection) {
removeFromPool(c, plainPool);
} else {
+ assert c.isSecure();
removeFromPool(c, sslPool);
}
+ expiryList.remove(c);
}
- }
- if (expiryList.isEmpty() && cleaner != null) {
- this.cleanerRef.compareAndSet(cleaner, null);
- cleaner.stopCleaner();
- }
- }
- for (HttpConnection c : closelist) {
- //System.out.println ("KAC: closing " + c);
c.close();
}
- }
- private synchronized void addToExpiryList(HttpConnection conn) {
- long now = System.currentTimeMillis() / 1000;
- long then = now + KEEP_ALIVE;
- if (expiryList.isEmpty()) {
- CacheCleaner cleaner = new CacheCleaner(this);
- if (this.cleanerRef.compareAndSet(null, cleaner)) {
- cleaner.start();
+ /**
+ * An object that subscribes to the flow while the connection is in
+ * the pool. Anything that comes in will cause the connection to be closed
+ * and removed from the pool.
+ */
+ private final class CleanupTrigger implements
+ FlowTube.TubeSubscriber, FlowTube.TubePublisher,
+ Flow.Subscription {
+
+ private final HttpConnection connection;
+ private volatile boolean done;
+
+ public CleanupTrigger(HttpConnection connection) {
+ this.connection = connection;
}
- expiryList.add(new ExpiryEntry(conn, then));
- return;
+
+ public boolean isDone() { return done;}
+
+ private void triggerCleanup(Throwable error) {
+ done = true;
+ cleanup(connection, error);
}
- ListIterator<ExpiryEntry> li = expiryList.listIterator();
- while (li.hasNext()) {
- ExpiryEntry entry = li.next();
+ @Override public void request(long n) {}
+ @Override public void cancel() {}
- if (then > entry.expiry) {
- li.previous();
- // insert here
- li.add(new ExpiryEntry(conn, then));
- return;
+ @Override
+ public void onSubscribe(Flow.Subscription subscription) {
+ subscription.request(1);
+ }
+ @Override
+ public void onError(Throwable error) { triggerCleanup(error); }
+ @Override
+ public void onComplete() { triggerCleanup(null); }
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ triggerCleanup(new IOException("Data received while in pool"));
+ }
+
+ @Override
+ public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+ subscriber.onSubscribe(this);
}
+
+ @Override
+ public String toString() {
+ return "CleanupTrigger(" + connection.getConnectionFlow() + ")";
}
- // first element of list
- expiryList.add(new ExpiryEntry(conn, then));
+
}
+
}
< prev index next >