< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ConnectionPool.java
Print this page
*** 1,7 ****
/*
! * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
--- 1,7 ----
/*
! * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
*** 23,75 ****
* questions.
*/
package jdk.incubator.http;
! import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Objects;
! import java.util.concurrent.atomic.AtomicLong;
! import java.util.concurrent.atomic.AtomicReference;
import jdk.incubator.http.internal.common.Utils;
/**
* Http 1.1 connection pool.
*/
final class ConnectionPool {
- // These counters are used to distribute ids for debugging
- // The ACTIVE_CLEANER_COUNTER will tell how many CacheCleaner
- // are active at a given time. It will increase when a new
- // CacheCleaner is started and decrease when it exits.
- static final AtomicLong ACTIVE_CLEANER_COUNTER = new AtomicLong();
- // The POOL_IDS_COUNTER increases each time a new ConnectionPool
- // is created. It may wrap and become negative but will never be
- // decremented.
- static final AtomicLong POOL_IDS_COUNTER = new AtomicLong();
- // The cleanerCounter is used to name cleaner threads within a
- // a connection pool, and increments monotically.
- // It may wrap and become negative but will never be
- // decremented.
- final AtomicLong cleanerCounter = new AtomicLong();
-
static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
"jdk.httpclient.keepalive.timeout", 1200); // seconds
// Pools of idle connections
! final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
! final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
! // A monotically increasing id for this connection pool.
! // It may be negative (that's OK)
! // Mostly used for debugging purposes when looking at thread dumps.
! // Global scope.
! final long poolID = POOL_IDS_COUNTER.incrementAndGet();
! final AtomicReference<CacheCleaner> cleanerRef;
/**
* Entries in connection pool are keyed by destination address and/or
* proxy address:
* case 1: plain TCP not via proxy (destination only)
--- 23,69 ----
* questions.
*/
package jdk.incubator.http;
! import java.io.IOException;
! import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
+ import java.nio.ByteBuffer;
+ import java.time.Instant;
+ import java.time.temporal.ChronoUnit;
+ import java.util.ArrayList;
+ import java.util.Collections;
import java.util.HashMap;
+ import java.util.Iterator;
import java.util.LinkedList;
+ import java.util.List;
import java.util.ListIterator;
import java.util.Objects;
! import java.util.Optional;
! import java.util.concurrent.Flow;
! import java.util.stream.Collectors;
! import jdk.incubator.http.internal.common.FlowTube;
import jdk.incubator.http.internal.common.Utils;
/**
* Http 1.1 connection pool.
*/
final class ConnectionPool {
static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
"jdk.httpclient.keepalive.timeout", 1200); // seconds
+ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+ final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
// Pools of idle connections
! private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
! private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
! private final ExpiryList expiryList;
! private final String dbgTag; // used for debug
! boolean stopped;
/**
* Entries in connection pool are keyed by destination address and/or
* proxy address:
* case 1: plain TCP not via proxy (destination only)
*** 108,139 ****
public int hashCode() {
return Objects.hash(proxy, destination);
}
}
! static class ExpiryEntry {
! final HttpConnection connection;
! final long expiry; // absolute time in seconds of expiry time
! ExpiryEntry(HttpConnection connection, long expiry) {
! this.connection = connection;
! this.expiry = expiry;
}
- }
-
- final LinkedList<ExpiryEntry> expiryList;
/**
* There should be one of these per HttpClient.
*/
! ConnectionPool() {
plainPool = new HashMap<>();
sslPool = new HashMap<>();
! expiryList = new LinkedList<>();
! cleanerRef = new AtomicReference<>();
}
void start() {
}
static CacheKey cacheKey(InetSocketAddress destination,
InetSocketAddress proxy)
{
--- 102,131 ----
public int hashCode() {
return Objects.hash(proxy, destination);
}
}
! ConnectionPool(long clientId) {
! this("ConnectionPool("+clientId+")");
}
/**
* There should be one of these per HttpClient.
*/
! private ConnectionPool(String tag) {
! dbgTag = tag;
plainPool = new HashMap<>();
sslPool = new HashMap<>();
! expiryList = new ExpiryList();
! }
!
! final String dbgString() {
! return dbgTag;
}
void start() {
+ assert !stopped : "Already stopped";
}
static CacheKey cacheKey(InetSocketAddress destination,
InetSocketAddress proxy)
{
*** 141,192 ****
}
synchronized HttpConnection getConnection(boolean secure,
InetSocketAddress addr,
InetSocketAddress proxy) {
CacheKey key = new CacheKey(addr, proxy);
HttpConnection c = secure ? findConnection(key, sslPool)
: findConnection(key, plainPool);
//System.out.println ("getConnection returning: " + c);
return c;
}
/**
* Returns the connection to the pool.
*/
! synchronized void returnToPool(HttpConnection conn) {
if (conn instanceof PlainHttpConnection) {
putConnection(conn, plainPool);
} else {
putConnection(conn, sslPool);
}
! addToExpiryList(conn);
//System.out.println("Return to pool: " + conn);
}
private HttpConnection
findConnection(CacheKey key,
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
LinkedList<HttpConnection> l = pool.get(key);
if (l == null || l.isEmpty()) {
return null;
} else {
HttpConnection c = l.removeFirst();
! removeFromExpiryList(c);
return c;
}
}
/* called from cache cleaner only */
! private void
removeFromPool(HttpConnection c,
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
//System.out.println("cacheCleaner removing: " + c);
! LinkedList<HttpConnection> l = pool.get(c.cacheKey());
! assert l != null;
! boolean wasPresent = l.remove(c);
! assert wasPresent;
}
private void
putConnection(HttpConnection c,
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
--- 133,222 ----
}
synchronized HttpConnection getConnection(boolean secure,
InetSocketAddress addr,
InetSocketAddress proxy) {
+ if (stopped) return null;
CacheKey key = new CacheKey(addr, proxy);
HttpConnection c = secure ? findConnection(key, sslPool)
: findConnection(key, plainPool);
//System.out.println ("getConnection returning: " + c);
return c;
}
/**
* Returns the connection to the pool.
*/
! void returnToPool(HttpConnection conn) {
! returnToPool(conn, Instant.now(), KEEP_ALIVE);
! }
!
! // Called also by whitebox tests
! void returnToPool(HttpConnection conn, Instant now, long keepAlive) {
!
! // Don't call registerCleanupTrigger while holding a lock,
! // but register it before the connection is added to the pool,
! // since we don't want to trigger the cleanup if the connection
! // is not in the pool.
! CleanupTrigger cleanup = registerCleanupTrigger(conn);
!
! // it's possible that cleanup may have been called.
! synchronized(this) {
! if (cleanup.isDone()) {
! return;
! } else if (stopped) {
! conn.close();
! return;
! }
if (conn instanceof PlainHttpConnection) {
putConnection(conn, plainPool);
} else {
+ assert conn.isSecure();
putConnection(conn, sslPool);
}
! expiryList.add(conn, now, keepAlive);
! }
//System.out.println("Return to pool: " + conn);
}
+ private CleanupTrigger registerCleanupTrigger(HttpConnection conn) {
+ // Connect the connection flow to a pub/sub pair that will take the
+ // connection out of the pool and close it if anything happens
+ // while the connection is sitting in the pool.
+ CleanupTrigger cleanup = new CleanupTrigger(conn);
+ FlowTube flow = conn.getConnectionFlow();
+ debug.log(Level.DEBUG, "registering %s", cleanup);
+ flow.connectFlows(cleanup, cleanup);
+ return cleanup;
+ }
+
private HttpConnection
findConnection(CacheKey key,
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
LinkedList<HttpConnection> l = pool.get(key);
if (l == null || l.isEmpty()) {
return null;
} else {
HttpConnection c = l.removeFirst();
! expiryList.remove(c);
return c;
}
}
/* called from cache cleaner only */
! private boolean
removeFromPool(HttpConnection c,
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
//System.out.println("cacheCleaner removing: " + c);
! assert Thread.holdsLock(this);
! CacheKey k = c.cacheKey();
! List<HttpConnection> l = pool.get(k);
! if (l == null || l.isEmpty()) {
! pool.remove(k);
! return false;
! }
! return l.remove(c);
}
private void
putConnection(HttpConnection c,
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
*** 197,333 ****
pool.put(key, l);
}
l.add(c);
}
! static String makeCleanerName(long poolId, long cleanerId) {
! return "HTTP-Cache-cleaner-" + poolId + "-" + cleanerId;
! }
!
! // only runs while entries exist in cache
! final static class CacheCleaner extends Thread {
!
! volatile boolean stopping;
! // A monotically increasing id. May wrap and become negative (that's OK)
! // Mostly used for debugging purposes when looking at thread dumps.
! // Scoped per connection pool.
! final long cleanerID;
! // A reference to the owning ConnectionPool.
! // This reference's referent may become null if the HttpClientImpl
! // that owns this pool is GC'ed.
! final WeakReference<ConnectionPool> ownerRef;
! CacheCleaner(ConnectionPool owner) {
! this(owner, owner.cleanerCounter.incrementAndGet());
}
-
- CacheCleaner(ConnectionPool owner, long cleanerID) {
- super(null, null, makeCleanerName(owner.poolID, cleanerID), 0, false);
- this.cleanerID = cleanerID;
- this.ownerRef = new WeakReference<>(owner);
- setDaemon(true);
}
!
! synchronized boolean stopping() {
! return stopping || ownerRef.get() == null;
}
!
! synchronized void stopCleaner() {
! stopping = true;
}
! @Override
! public void run() {
! ACTIVE_CLEANER_COUNTER.incrementAndGet();
try {
! while (!stopping()) {
try {
! Thread.sleep(3000);
! } catch (InterruptedException e) {}
! ConnectionPool owner = ownerRef.get();
! if (owner == null) return;
! owner.cleanCache(this);
! owner = null;
}
} finally {
! ACTIVE_CLEANER_COUNTER.decrementAndGet();
}
}
}
! synchronized void removeFromExpiryList(HttpConnection c) {
! if (c == null) {
return;
}
! ListIterator<ExpiryEntry> li = expiryList.listIterator();
while (li.hasNext()) {
ExpiryEntry e = li.next();
if (e.connection.equals(c)) {
li.remove();
return;
}
}
- CacheCleaner cleaner = this.cleanerRef.get();
- if (expiryList.isEmpty() && cleaner != null) {
- this.cleanerRef.compareAndSet(cleaner, null);
- cleaner.stopCleaner();
- cleaner.interrupt();
}
- }
-
- private void cleanCache(CacheCleaner cleaner) {
- long now = System.currentTimeMillis() / 1000;
- LinkedList<HttpConnection> closelist = new LinkedList<>();
! synchronized (this) {
! ListIterator<ExpiryEntry> li = expiryList.listIterator();
while (li.hasNext()) {
ExpiryEntry entry = li.next();
! if (entry.expiry <= now) {
li.remove();
HttpConnection c = entry.connection;
closelist.add(c);
if (c instanceof PlainHttpConnection) {
removeFromPool(c, plainPool);
} else {
removeFromPool(c, sslPool);
}
}
- }
- if (expiryList.isEmpty() && cleaner != null) {
- this.cleanerRef.compareAndSet(cleaner, null);
- cleaner.stopCleaner();
- }
- }
- for (HttpConnection c : closelist) {
- //System.out.println ("KAC: closing " + c);
c.close();
}
- }
! private synchronized void addToExpiryList(HttpConnection conn) {
! long now = System.currentTimeMillis() / 1000;
! long then = now + KEEP_ALIVE;
! if (expiryList.isEmpty()) {
! CacheCleaner cleaner = new CacheCleaner(this);
! if (this.cleanerRef.compareAndSet(null, cleaner)) {
! cleaner.start();
}
! expiryList.add(new ExpiryEntry(conn, then));
! return;
}
! ListIterator<ExpiryEntry> li = expiryList.listIterator();
! while (li.hasNext()) {
! ExpiryEntry entry = li.next();
! if (then > entry.expiry) {
! li.previous();
! // insert here
! li.add(new ExpiryEntry(conn, then));
! return;
}
}
! // first element of list
! expiryList.add(new ExpiryEntry(conn, then));
}
}
--- 227,490 ----
pool.put(key, l);
}
l.add(c);
}
! /**
! * Purge expired connection and return the number of milliseconds
! * in which the next connection is scheduled to expire.
! * If no connections are scheduled to be purged return 0.
! * @return the delay in milliseconds in which the next connection will
! * expire.
! */
! long purgeExpiredConnectionsAndReturnNextDeadline() {
! if (!expiryList.purgeMaybeRequired()) return 0;
! return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now());
! }
!
! // Used for whitebox testing
! long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) {
! long nextPurge = 0;
!
! // We may be in the process of adding new elements
! // to the expiry list - but those elements will not
! // have outlast their keep alive timer yet since we're
! // just adding them.
! if (!expiryList.purgeMaybeRequired()) return nextPurge;
! List<HttpConnection> closelist;
! synchronized (this) {
! closelist = expiryList.purgeUntil(now);
! for (HttpConnection c : closelist) {
! if (c instanceof PlainHttpConnection) {
! boolean wasPresent = removeFromPool(c, plainPool);
! assert wasPresent;
! } else {
! boolean wasPresent = removeFromPool(c, sslPool);
! assert wasPresent;
}
}
! nextPurge = now.until(
! expiryList.nextExpiryDeadline().orElse(now),
! ChronoUnit.MILLIS);
}
! closelist.forEach(this::close);
! return nextPurge;
}
! private void close(HttpConnection c) {
try {
! c.close();
! } catch (Throwable e) {} // ignore
! }
!
! void stop() {
! List<HttpConnection> closelist = Collections.emptyList();
try {
! synchronized (this) {
! stopped = true;
! closelist = expiryList.stream()
! .map(e -> e.connection)
! .collect(Collectors.toList());
! expiryList.clear();
! plainPool.clear();
! sslPool.clear();
}
} finally {
! closelist.forEach(this::close);
}
}
+
+ static final class ExpiryEntry {
+ final HttpConnection connection;
+ final Instant expiry; // absolute time in seconds of expiry time
+ ExpiryEntry(HttpConnection connection, Instant expiry) {
+ this.connection = connection;
+ this.expiry = expiry;
+ }
}
! /**
! * Manages a LinkedList of sorted ExpiryEntry. The entry with the closer
! * deadline is at the tail of the list, and the entry with the farther
! * deadline is at the head. In the most common situation, new elements
! * will need to be added at the head (or close to it), and expired elements
! * will need to be purged from the tail.
! */
! private static final class ExpiryList {
! private final LinkedList<ExpiryEntry> list = new LinkedList<>();
! private volatile boolean mayContainEntries;
!
! // A loosely accurate boolean whose value is computed
! // at the end of each operation performed on ExpiryList;
! // Does not require synchronizing on the ConnectionPool.
! boolean purgeMaybeRequired() {
! return mayContainEntries;
! }
!
! // Returns the next expiry deadline
! // should only be called while holding a synchronization
! // lock on the ConnectionPool
! Optional<Instant> nextExpiryDeadline() {
! if (list.isEmpty()) return Optional.empty();
! else return Optional.of(list.getLast().expiry);
! }
!
! // should only be called while holding a synchronization
! // lock on the ConnectionPool
! void add(HttpConnection conn) {
! add(conn, Instant.now(), KEEP_ALIVE);
! }
!
! // Used by whitebox test.
! void add(HttpConnection conn, Instant now, long keepAlive) {
! Instant then = now.truncatedTo(ChronoUnit.SECONDS)
! .plus(keepAlive, ChronoUnit.SECONDS);
!
! // Elements with the farther deadline are at the head of
! // the list. It's more likely that the new element will
! // have the farthest deadline, and will need to be inserted
! // at the head of the list, so we're using an ascending
! // list iterator to find the right insertion point.
! ListIterator<ExpiryEntry> li = list.listIterator();
! while (li.hasNext()) {
! ExpiryEntry entry = li.next();
!
! if (then.isAfter(entry.expiry)) {
! li.previous();
! // insert here
! li.add(new ExpiryEntry(conn, then));
! mayContainEntries = true;
return;
}
! }
! // last (or first) element of list (the last element is
! // the first when the list is empty)
! list.add(new ExpiryEntry(conn, then));
! mayContainEntries = true;
! }
!
! // should only be called while holding a synchronization
! // lock on the ConnectionPool
! void remove(HttpConnection c) {
! if (c == null || list.isEmpty()) return;
! ListIterator<ExpiryEntry> li = list.listIterator();
while (li.hasNext()) {
ExpiryEntry e = li.next();
if (e.connection.equals(c)) {
li.remove();
+ mayContainEntries = !list.isEmpty();
return;
}
}
}
! // should only be called while holding a synchronization
! // lock on the ConnectionPool.
! // Purge all elements whose deadline is before now (now included).
! List<HttpConnection> purgeUntil(Instant now) {
! if (list.isEmpty()) return Collections.emptyList();
!
! List<HttpConnection> closelist = new ArrayList<>();
!
! // elements with the closest deadlines are at the tail
! // of the queue, so we're going to use a descending iterator
! // to remove them, and stop when we find the first element
! // that has not expired yet.
! Iterator<ExpiryEntry> li = list.descendingIterator();
while (li.hasNext()) {
ExpiryEntry entry = li.next();
! // use !isAfter instead of isBefore in order to
! // remove the entry if its expiry == now
! if (!entry.expiry.isAfter(now)) {
li.remove();
HttpConnection c = entry.connection;
closelist.add(c);
+ } else break; // the list is sorted
+ }
+ mayContainEntries = !list.isEmpty();
+ return closelist;
+ }
+
+ // should only be called while holding a synchronization
+ // lock on the ConnectionPool
+ java.util.stream.Stream<ExpiryEntry> stream() {
+ return list.stream();
+ }
+
+ // should only be called while holding a synchronization
+ // lock on the ConnectionPool
+ void clear() {
+ list.clear();
+ mayContainEntries = false;
+ }
+ }
+
+ void cleanup(HttpConnection c, Throwable error) {
+ debug.log(Level.DEBUG,
+ "%s : ConnectionPool.cleanup(%s)",
+ String.valueOf(c.getConnectionFlow()),
+ error);
+ synchronized(this) {
if (c instanceof PlainHttpConnection) {
removeFromPool(c, plainPool);
} else {
+ assert c.isSecure();
removeFromPool(c, sslPool);
}
+ expiryList.remove(c);
}
c.close();
}
! /**
! * An object that subscribes to the flow while the connection is in
! * the pool. Anything that comes in will cause the connection to be closed
! * and removed from the pool.
! */
! private final class CleanupTrigger implements
! FlowTube.TubeSubscriber, FlowTube.TubePublisher,
! Flow.Subscription {
!
! private final HttpConnection connection;
! private volatile boolean done;
!
! public CleanupTrigger(HttpConnection connection) {
! this.connection = connection;
}
!
! public boolean isDone() { return done;}
!
! private void triggerCleanup(Throwable error) {
! done = true;
! cleanup(connection, error);
}
! @Override public void request(long n) {}
! @Override public void cancel() {}
! @Override
! public void onSubscribe(Flow.Subscription subscription) {
! subscription.request(1);
! }
! @Override
! public void onError(Throwable error) { triggerCleanup(error); }
! @Override
! public void onComplete() { triggerCleanup(null); }
! @Override
! public void onNext(List<ByteBuffer> item) {
! triggerCleanup(new IOException("Data received while in pool"));
! }
!
! @Override
! public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
! subscriber.onSubscribe(this);
}
+
+ @Override
+ public String toString() {
+ return "CleanupTrigger(" + connection.getConnectionFlow() + ")";
}
!
}
+
}
< prev index next >