< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ConnectionPool.java

Print this page

        

@@ -1,7 +1,7 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 only, as
  * published by the Free Software Foundation.  Oracle designates this

@@ -23,53 +23,47 @@
  * questions.
  */
 
 package jdk.incubator.http;
 
-import java.lang.ref.WeakReference;
+import java.io.IOException;
+import java.lang.System.Logger.Level;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.ListIterator;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Optional;
+import java.util.concurrent.Flow;
+import java.util.stream.Collectors;
+import jdk.incubator.http.internal.common.FlowTube;
 import jdk.incubator.http.internal.common.Utils;
 
 /**
  * Http 1.1 connection pool.
  */
 final class ConnectionPool {
 
-    // These counters are used to distribute ids for debugging
-    // The ACTIVE_CLEANER_COUNTER will tell how many CacheCleaner
-    // are active at a given time. It will increase when a new
-    // CacheCleaner is started and decrease when it exits.
-    static final AtomicLong ACTIVE_CLEANER_COUNTER = new AtomicLong();
-    // The POOL_IDS_COUNTER increases each time a new ConnectionPool
-    // is created. It may wrap and become negative but will never be
-    // decremented.
-    static final AtomicLong POOL_IDS_COUNTER = new AtomicLong();
-    // The cleanerCounter is used to name cleaner threads within a
-    // a connection pool, and increments monotically.
-    // It may wrap and become negative but will never be
-    // decremented.
-    final AtomicLong cleanerCounter = new AtomicLong();
-
     static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
             "jdk.httpclient.keepalive.timeout", 1200); // seconds
+    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+    final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
 
     // Pools of idle connections
 
-    final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
-    final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
-    // A monotically increasing id for this connection pool.
-    // It may be negative (that's OK)
-    // Mostly used for debugging purposes when looking at thread dumps.
-    // Global scope.
-    final long poolID = POOL_IDS_COUNTER.incrementAndGet();
-    final AtomicReference<CacheCleaner> cleanerRef;
+    private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
+    private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
+    private final ExpiryList expiryList;
+    private final String dbgTag; // used for debug
+    boolean stopped;
 
     /**
      * Entries in connection pool are keyed by destination address and/or
      * proxy address:
      * case 1: plain TCP not via proxy (destination only)

@@ -108,32 +102,30 @@
         public int hashCode() {
             return Objects.hash(proxy, destination);
         }
     }
 
-    static class ExpiryEntry {
-        final HttpConnection connection;
-        final long expiry; // absolute time in seconds of expiry time
-        ExpiryEntry(HttpConnection connection, long expiry) {
-            this.connection = connection;
-            this.expiry = expiry;
+    ConnectionPool(long clientId) {
+        this("ConnectionPool("+clientId+")");
         }
-    }
-
-    final LinkedList<ExpiryEntry> expiryList;
 
     /**
      * There should be one of these per HttpClient.
      */
-    ConnectionPool() {
+    private ConnectionPool(String tag) {
+        dbgTag = tag;
         plainPool = new HashMap<>();
         sslPool = new HashMap<>();
-        expiryList = new LinkedList<>();
-        cleanerRef = new AtomicReference<>();
+        expiryList = new ExpiryList();
+    }
+
+    final String dbgString() {
+        return dbgTag;
     }
 
     void start() {
+        assert !stopped : "Already stopped";
     }
 
     static CacheKey cacheKey(InetSocketAddress destination,
                              InetSocketAddress proxy)
     {

@@ -141,52 +133,90 @@
     }
 
     synchronized HttpConnection getConnection(boolean secure,
                                               InetSocketAddress addr,
                                               InetSocketAddress proxy) {
+        if (stopped) return null;
         CacheKey key = new CacheKey(addr, proxy);
         HttpConnection c = secure ? findConnection(key, sslPool)
                                   : findConnection(key, plainPool);
         //System.out.println ("getConnection returning: " + c);
         return c;
     }
 
     /**
      * Returns the connection to the pool.
      */
-    synchronized void returnToPool(HttpConnection conn) {
+    void returnToPool(HttpConnection conn) {
+        returnToPool(conn, Instant.now(), KEEP_ALIVE);
+    }
+
+    // Called also by whitebox tests
+    void returnToPool(HttpConnection conn, Instant now, long keepAlive) {
+
+        // Don't call registerCleanupTrigger while holding a lock,
+        // but register it before the connection is added to the pool,
+        // since we don't want to trigger the cleanup if the connection
+        // is not in the pool.
+        CleanupTrigger cleanup = registerCleanupTrigger(conn);
+
+        // it's possible that cleanup may have been called.
+        synchronized(this) {
+            if (cleanup.isDone()) {
+                return;
+            } else if (stopped) {
+                conn.close();
+                return;
+            }
         if (conn instanceof PlainHttpConnection) {
             putConnection(conn, plainPool);
         } else {
+                assert conn.isSecure();
             putConnection(conn, sslPool);
         }
-        addToExpiryList(conn);
+            expiryList.add(conn, now, keepAlive);
+        }
         //System.out.println("Return to pool: " + conn);
     }
 
+    private CleanupTrigger registerCleanupTrigger(HttpConnection conn) {
+        // Connect the connection flow to a pub/sub pair that will take the
+        // connection out of the pool and close it if anything happens
+        // while the connection is sitting in the pool.
+        CleanupTrigger cleanup = new CleanupTrigger(conn);
+        FlowTube flow = conn.getConnectionFlow();
+        debug.log(Level.DEBUG, "registering %s", cleanup);
+        flow.connectFlows(cleanup, cleanup);
+        return cleanup;
+    }
+
     private HttpConnection
     findConnection(CacheKey key,
                    HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
         LinkedList<HttpConnection> l = pool.get(key);
         if (l == null || l.isEmpty()) {
             return null;
         } else {
             HttpConnection c = l.removeFirst();
-            removeFromExpiryList(c);
+            expiryList.remove(c);
             return c;
         }
     }
 
     /* called from cache cleaner only  */
-    private void
+    private boolean
     removeFromPool(HttpConnection c,
                    HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
         //System.out.println("cacheCleaner removing: " + c);
-        LinkedList<HttpConnection> l = pool.get(c.cacheKey());
-        assert l != null;
-        boolean wasPresent = l.remove(c);
-        assert wasPresent;
+        assert Thread.holdsLock(this);
+        CacheKey k = c.cacheKey();
+        List<HttpConnection> l = pool.get(k);
+        if (l == null || l.isEmpty()) {
+            pool.remove(k);
+            return false;
+        }
+        return l.remove(c);
     }
 
     private void
     putConnection(HttpConnection c,
                   HashMap<CacheKey,LinkedList<HttpConnection>> pool) {

@@ -197,137 +227,264 @@
             pool.put(key, l);
         }
         l.add(c);
     }
 
-    static String makeCleanerName(long poolId, long cleanerId) {
-        return "HTTP-Cache-cleaner-" + poolId + "-" + cleanerId;
-    }
-
-    // only runs while entries exist in cache
-    final static class CacheCleaner extends Thread {
-
-        volatile boolean stopping;
-        // A monotically increasing id. May wrap and become negative (that's OK)
-        // Mostly used for debugging purposes when looking at thread dumps.
-        // Scoped per connection pool.
-        final long cleanerID;
-        // A reference to the owning ConnectionPool.
-        // This reference's referent may become null if the HttpClientImpl
-        // that owns this pool is GC'ed.
-        final WeakReference<ConnectionPool> ownerRef;
+    /**
+     * Purge expired connection and return the number of milliseconds
+     * in which the next connection is scheduled to expire.
+     * If no connections are scheduled to be purged return 0.
+     * @return the delay in milliseconds in which the next connection will
+     *         expire.
+     */
+    long purgeExpiredConnectionsAndReturnNextDeadline() {
+        if (!expiryList.purgeMaybeRequired()) return 0;
+        return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now());
+    }
+
+    // Used for whitebox testing
+    long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) {
+        long nextPurge = 0;
+
+        // We may be in the process of adding new elements
+        // to the expiry list - but those elements will not
+        // have outlast their keep alive timer yet since we're
+        // just adding them.
+        if (!expiryList.purgeMaybeRequired()) return nextPurge;
 
-        CacheCleaner(ConnectionPool owner) {
-            this(owner, owner.cleanerCounter.incrementAndGet());
+        List<HttpConnection> closelist;
+        synchronized (this) {
+            closelist = expiryList.purgeUntil(now);
+            for (HttpConnection c : closelist) {
+                if (c instanceof PlainHttpConnection) {
+                    boolean wasPresent = removeFromPool(c, plainPool);
+                    assert wasPresent;
+                } else {
+                    boolean wasPresent = removeFromPool(c, sslPool);
+                    assert wasPresent;
         }
-
-        CacheCleaner(ConnectionPool owner, long cleanerID) {
-            super(null, null, makeCleanerName(owner.poolID, cleanerID), 0, false);
-            this.cleanerID = cleanerID;
-            this.ownerRef = new WeakReference<>(owner);
-            setDaemon(true);
         }
-
-        synchronized boolean stopping() {
-            return stopping || ownerRef.get() == null;
+            nextPurge = now.until(
+                    expiryList.nextExpiryDeadline().orElse(now),
+                    ChronoUnit.MILLIS);
         }
-
-        synchronized void stopCleaner() {
-            stopping = true;
+        closelist.forEach(this::close);
+        return nextPurge;
         }
 
-        @Override
-        public void run() {
-            ACTIVE_CLEANER_COUNTER.incrementAndGet();
+    private void close(HttpConnection c) {
             try {
-                while (!stopping()) {
+            c.close();
+        } catch (Throwable e) {} // ignore
+    }
+
+    void stop() {
+        List<HttpConnection> closelist = Collections.emptyList();
                     try {
-                        Thread.sleep(3000);
-                    } catch (InterruptedException e) {}
-                    ConnectionPool owner = ownerRef.get();
-                    if (owner == null) return;
-                    owner.cleanCache(this);
-                    owner = null;
+            synchronized (this) {
+                stopped = true;
+                closelist = expiryList.stream()
+                    .map(e -> e.connection)
+                    .collect(Collectors.toList());
+                expiryList.clear();
+                plainPool.clear();
+                sslPool.clear();
                 }
             } finally {
-                ACTIVE_CLEANER_COUNTER.decrementAndGet();
+            closelist.forEach(this::close);
             }
         }
+
+    static final class ExpiryEntry {
+        final HttpConnection connection;
+        final Instant expiry; // absolute time in seconds of expiry time
+        ExpiryEntry(HttpConnection connection, Instant expiry) {
+            this.connection = connection;
+            this.expiry = expiry;
+        }
     }
 
-    synchronized void removeFromExpiryList(HttpConnection c) {
-        if (c == null) {
+    /**
+     * Manages a LinkedList of sorted ExpiryEntry. The entry with the closer
+     * deadline is at the tail of the list, and the entry with the farther
+     * deadline is at the head. In the most common situation, new elements
+     * will need to be added at the head (or close to it), and expired elements
+     * will need to be purged from the tail.
+     */
+    private static final class ExpiryList {
+        private final LinkedList<ExpiryEntry> list = new LinkedList<>();
+        private volatile boolean mayContainEntries;
+
+        // A loosely accurate boolean whose value is computed
+        // at the end of each operation performed on ExpiryList;
+        // Does not require synchronizing on the ConnectionPool.
+        boolean purgeMaybeRequired() {
+            return mayContainEntries;
+        }
+
+        // Returns the next expiry deadline
+        // should only be called while holding a synchronization
+        // lock on the ConnectionPool
+        Optional<Instant> nextExpiryDeadline() {
+            if (list.isEmpty()) return Optional.empty();
+            else return Optional.of(list.getLast().expiry);
+        }
+
+        // should only be called while holding a synchronization
+        // lock on the ConnectionPool
+        void add(HttpConnection conn) {
+            add(conn, Instant.now(), KEEP_ALIVE);
+        }
+
+        // Used by whitebox test.
+        void add(HttpConnection conn, Instant now, long keepAlive) {
+            Instant then = now.truncatedTo(ChronoUnit.SECONDS)
+                    .plus(keepAlive, ChronoUnit.SECONDS);
+
+            // Elements with the farther deadline are at the head of
+            // the list. It's more likely that the new element will
+            // have the farthest deadline, and will need to be inserted
+            // at the head of the list, so we're using an ascending
+            // list iterator to find the right insertion point.
+            ListIterator<ExpiryEntry> li = list.listIterator();
+            while (li.hasNext()) {
+                ExpiryEntry entry = li.next();
+
+                if (then.isAfter(entry.expiry)) {
+                    li.previous();
+                    // insert here
+                    li.add(new ExpiryEntry(conn, then));
+                    mayContainEntries = true;
             return;
         }
-        ListIterator<ExpiryEntry> li = expiryList.listIterator();
+            }
+            // last (or first) element of list (the last element is
+            // the first when the list is empty)
+            list.add(new ExpiryEntry(conn, then));
+            mayContainEntries = true;
+        }
+
+        // should only be called while holding a synchronization
+        // lock on the ConnectionPool
+        void remove(HttpConnection c) {
+            if (c == null || list.isEmpty()) return;
+            ListIterator<ExpiryEntry> li = list.listIterator();
         while (li.hasNext()) {
             ExpiryEntry e = li.next();
             if (e.connection.equals(c)) {
                 li.remove();
+                    mayContainEntries = !list.isEmpty();
                 return;
             }
         }
-        CacheCleaner cleaner = this.cleanerRef.get();
-        if (expiryList.isEmpty() && cleaner != null) {
-            this.cleanerRef.compareAndSet(cleaner, null);
-            cleaner.stopCleaner();
-            cleaner.interrupt();
         }
-    }
-
-    private void cleanCache(CacheCleaner cleaner) {
-        long now = System.currentTimeMillis() / 1000;
-        LinkedList<HttpConnection> closelist = new LinkedList<>();
 
-        synchronized (this) {
-            ListIterator<ExpiryEntry> li = expiryList.listIterator();
+        // should only be called while holding a synchronization
+        // lock on the ConnectionPool.
+        // Purge all elements whose deadline is before now (now included).
+        List<HttpConnection> purgeUntil(Instant now) {
+            if (list.isEmpty()) return Collections.emptyList();
+
+            List<HttpConnection> closelist = new ArrayList<>();
+
+            // elements with the closest deadlines are at the tail
+            // of the queue, so we're going to use a descending iterator
+            // to remove them, and stop when we find the first element
+            // that has not expired yet.
+            Iterator<ExpiryEntry> li = list.descendingIterator();
             while (li.hasNext()) {
                 ExpiryEntry entry = li.next();
-                if (entry.expiry <= now) {
+                // use !isAfter instead of isBefore in order to
+                // remove the entry if its expiry == now
+                if (!entry.expiry.isAfter(now)) {
                     li.remove();
                     HttpConnection c = entry.connection;
                     closelist.add(c);
+                } else break; // the list is sorted
+            }
+            mayContainEntries = !list.isEmpty();
+            return closelist;
+        }
+
+        // should only be called while holding a synchronization
+        // lock on the ConnectionPool
+        java.util.stream.Stream<ExpiryEntry> stream() {
+            return list.stream();
+        }
+
+        // should only be called while holding a synchronization
+        // lock on the ConnectionPool
+        void clear() {
+            list.clear();
+            mayContainEntries = false;
+        }
+    }
+
+    void cleanup(HttpConnection c, Throwable error) {
+        debug.log(Level.DEBUG,
+                  "%s : ConnectionPool.cleanup(%s)",
+                  String.valueOf(c.getConnectionFlow()),
+                  error);
+        synchronized(this) {
                     if (c instanceof PlainHttpConnection) {
                         removeFromPool(c, plainPool);
                     } else {
+                assert c.isSecure();
                         removeFromPool(c, sslPool);
                     }
+            expiryList.remove(c);
                 }
-            }
-            if (expiryList.isEmpty() && cleaner != null) {
-                this.cleanerRef.compareAndSet(cleaner, null);
-                cleaner.stopCleaner();
-            }
-        }
-        for (HttpConnection c : closelist) {
-            //System.out.println ("KAC: closing " + c);
             c.close();
         }
-    }
 
-    private synchronized void addToExpiryList(HttpConnection conn) {
-        long now = System.currentTimeMillis() / 1000;
-        long then = now + KEEP_ALIVE;
-        if (expiryList.isEmpty()) {
-            CacheCleaner cleaner = new CacheCleaner(this);
-            if (this.cleanerRef.compareAndSet(null, cleaner)) {
-                cleaner.start();
+    /**
+     * An object that subscribes to the flow while the connection is in
+     * the pool. Anything that comes in will cause the connection to be closed
+     * and removed from the pool.
+     */
+    private final class CleanupTrigger implements
+            FlowTube.TubeSubscriber, FlowTube.TubePublisher,
+            Flow.Subscription {
+
+        private final HttpConnection connection;
+        private volatile boolean done;
+
+        public CleanupTrigger(HttpConnection connection) {
+            this.connection = connection;
             }
-            expiryList.add(new ExpiryEntry(conn, then));
-            return;
+
+        public boolean isDone() { return done;}
+
+        private void triggerCleanup(Throwable error) {
+            done = true;
+            cleanup(connection, error);
         }
 
-        ListIterator<ExpiryEntry> li = expiryList.listIterator();
-        while (li.hasNext()) {
-            ExpiryEntry entry = li.next();
+        @Override public void request(long n) {}
+        @Override public void cancel() {}
 
-            if (then > entry.expiry) {
-                li.previous();
-                // insert here
-                li.add(new ExpiryEntry(conn, then));
-                return;
+        @Override
+        public void onSubscribe(Flow.Subscription subscription) {
+            subscription.request(1);
+        }
+        @Override
+        public void onError(Throwable error) { triggerCleanup(error); }
+        @Override
+        public void onComplete() { triggerCleanup(null); }
+        @Override
+        public void onNext(List<ByteBuffer> item) {
+            triggerCleanup(new IOException("Data received while in pool"));
+        }
+
+        @Override
+        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+            subscriber.onSubscribe(this);
             }
+
+        @Override
+        public String toString() {
+            return "CleanupTrigger(" + connection.getConnectionFlow() + ")";
         }
-        // first element of list
-        expiryList.add(new ExpiryEntry(conn, then));
+
     }
+
 }
< prev index next >