< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ConnectionPool.java

Print this page


   1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.lang.ref.WeakReference;

  29 import java.net.InetSocketAddress;





  30 import java.util.HashMap;

  31 import java.util.LinkedList;

  32 import java.util.ListIterator;
  33 import java.util.Objects;
  34 import java.util.concurrent.atomic.AtomicLong;
  35 import java.util.concurrent.atomic.AtomicReference;


  36 import jdk.incubator.http.internal.common.Utils;
  37 
  38 /**
  39  * Http 1.1 connection pool.
  40  */
  41 final class ConnectionPool {
  42 
  43     // These counters are used to distribute ids for debugging.
  44     // The ACTIVE_CLEANER_COUNTER will tell how many CacheCleaner
  45     // are active at a given time. It will increase when a new
  46     // CacheCleaner is started and decrease when it exits.
  47     static final AtomicLong ACTIVE_CLEANER_COUNTER = new AtomicLong();
  48     // The POOL_IDS_COUNTER increases each time a new ConnectionPool
  49     // is created. It may wrap and become negative but will never be
  50     // decremented.
  51     static final AtomicLong POOL_IDS_COUNTER = new AtomicLong();
  52     // The cleanerCounter is used to name cleaner threads within
  53     // a connection pool, and increments monotonically.
  54     // It may wrap and become negative but will never be
  55     // decremented.
  56     final AtomicLong cleanerCounter = new AtomicLong();
  57 
  58     static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
  59             "jdk.httpclient.keepalive.timeout", 1200); // keep-alive added to "now" when pooling; seconds


  60 
  61     // Pools of idle connections, one for plain and one for SSL connections
  62 
  63     final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
  64     final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
  65     // A monotonically increasing id for this connection pool.
  66     // It may be negative (that's OK)
  67     // Mostly used for debugging purposes when looking at thread dumps.
  68     // Global scope.
  69     final long poolID = POOL_IDS_COUNTER.incrementAndGet();
  70     final AtomicReference<CacheCleaner> cleanerRef; // the registered cleaner thread, or null when none is registered
  71 
  72     /**
  73      * Entries in connection pool are keyed by destination address and/or
  74      * proxy address:
  75      * case 1: plain TCP not via proxy (destination only)
  76      * case 2: plain TCP via proxy (proxy only)
  77      * case 3: SSL not via proxy (destination only)
  78      * case 4: SSL over tunnel (destination and proxy)
  79      */
  80     static class CacheKey {
  81         final InetSocketAddress proxy;
  82         final InetSocketAddress destination;
  83 
  84         CacheKey(InetSocketAddress destination, InetSocketAddress proxy) {
  85             this.proxy = proxy;
  86             this.destination = destination;
  87         }
  88 
  89         @Override
  90         public boolean equals(Object obj) {


  93             }
  94             if (getClass() != obj.getClass()) {
  95                 return false;
  96             }
  97             final CacheKey other = (CacheKey) obj;
  98             if (!Objects.equals(this.proxy, other.proxy)) {
  99                 return false;
 100             }
 101             if (!Objects.equals(this.destination, other.destination)) {
 102                 return false;
 103             }
 104             return true;
 105         }
 106 
 107         @Override
 108         public int hashCode() {
 109             return Objects.hash(proxy, destination);
 110         }
 111     }
 112 
 113     static class ExpiryEntry {
 114         final HttpConnection connection;
 115         final long expiry; // absolute time in seconds of expiry time
 116         ExpiryEntry(HttpConnection connection, long expiry) {
 117             this.connection = connection;
 118             this.expiry = expiry;
 119         }
 120     }
 121 
 122     final LinkedList<ExpiryEntry> expiryList;
 123 
 124     /**
 125      * There should be one of these per HttpClient.
 126      */
 127     ConnectionPool() {

 128         plainPool = new HashMap<>();
 129         sslPool = new HashMap<>();
 130         expiryList = new LinkedList<>();
 131         cleanerRef = new AtomicReference<>();



 132     }
 133 
 134     void start() {

 135     }
 136 
 137     static CacheKey cacheKey(InetSocketAddress destination,
 138                              InetSocketAddress proxy)
 139     {
 140         return new CacheKey(destination, proxy);
 141     }
 142 
 143     synchronized HttpConnection getConnection(boolean secure,
 144                                               InetSocketAddress addr,
 145                                               InetSocketAddress proxy) {

 146         CacheKey key = new CacheKey(addr, proxy);
 147         HttpConnection c = secure ? findConnection(key, sslPool)
 148                                   : findConnection(key, plainPool);
 149         //System.out.println ("getConnection returning: " + c);
 150         return c;
 151     }
 152 
 153     /**
 154      * Returns the connection to the pool.
 155      */
 156     synchronized void returnToPool(HttpConnection conn) {




















 157         if (conn instanceof PlainHttpConnection) {
 158             putConnection(conn, plainPool);
 159         } else {

 160             putConnection(conn, sslPool);
 161         }
 162         addToExpiryList(conn);

 163         //System.out.println("Return to pool: " + conn);
 164     }
 165 











 166     private HttpConnection
 167     findConnection(CacheKey key,
 168                    HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
 169         LinkedList<HttpConnection> l = pool.get(key);
 170         if (l == null || l.isEmpty()) {
 171             return null;
 172         } else {
 173             HttpConnection c = l.removeFirst();
 174             removeFromExpiryList(c);
 175             return c;
 176         }
 177     }
 178 
 179     /* called from cache cleaner only  */
 180     private void
 181     removeFromPool(HttpConnection c,
 182                    HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
 183         //System.out.println("cacheCleaner removing: " + c);
 184         LinkedList<HttpConnection> l = pool.get(c.cacheKey());
 185         assert l != null;
 186         boolean wasPresent = l.remove(c);
 187         assert wasPresent;




 188     }
 189 
 190     private void
 191     putConnection(HttpConnection c,
 192                   HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
 193         CacheKey key = c.cacheKey();
 194         LinkedList<HttpConnection> l = pool.get(key);
 195         if (l == null) {
 196             l = new LinkedList<>();
 197             pool.put(key, l);
 198         }
 199         l.add(c);
 200     }
 201 
 202     static String makeCleanerName(long poolId, long cleanerId) {
 203         return "HTTP-Cache-cleaner-" + poolId + "-" + cleanerId;
 204     }
 205 
 206     // only runs while entries exist in cache
 207     final static class CacheCleaner extends Thread {
 208 
 209         volatile boolean stopping;
 210         // A monotically increasing id. May wrap and become negative (that's OK)
 211         // Mostly used for debugging purposes when looking at thread dumps.
 212         // Scoped per connection pool.
 213         final long cleanerID;
 214         // A reference to the owning ConnectionPool.
 215         // This reference's referent may become null if the HttpClientImpl
 216         // that owns this pool is GC'ed.
 217         final WeakReference<ConnectionPool> ownerRef;





 218 
 219         CacheCleaner(ConnectionPool owner) {
 220             this(owner, owner.cleanerCounter.incrementAndGet());








 221         }
 222 
 223         CacheCleaner(ConnectionPool owner, long cleanerID) {
 224             super(null, null, makeCleanerName(owner.poolID, cleanerID), 0, false);
 225             this.cleanerID = cleanerID;
 226             this.ownerRef = new WeakReference<>(owner);
 227             setDaemon(true);
 228         }
 229 
 230         synchronized boolean stopping() {
 231             return stopping || ownerRef.get() == null;
 232         }
 233 
 234         synchronized void stopCleaner() {
 235             stopping = true;
 236         }
 237 
 238         @Override
 239         public void run() {
 240             ACTIVE_CLEANER_COUNTER.incrementAndGet();
 241             try {
 242                 while (!stopping()) {





 243                     try {
 244                         Thread.sleep(3000);
 245                     } catch (InterruptedException e) {}
 246                     ConnectionPool owner = ownerRef.get();
 247                     if (owner == null) return;
 248                     owner.cleanCache(this);
 249                     owner = null;


 250                 }
 251             } finally {
 252                 ACTIVE_CLEANER_COUNTER.decrementAndGet();
 253             }
 254         }








 255     }
 256 
 257     synchronized void removeFromExpiryList(HttpConnection c) {
 258         if (c == null) {

















































 259             return;
 260         }
 261         ListIterator<ExpiryEntry> li = expiryList.listIterator();











 262         while (li.hasNext()) {
 263             ExpiryEntry e = li.next();
 264             if (e.connection.equals(c)) {
 265                 li.remove();

 266                 return;
 267             }
 268         }
 269         CacheCleaner cleaner = this.cleanerRef.get();
 270         if (expiryList.isEmpty() && cleaner != null) {
 271             this.cleanerRef.compareAndSet(cleaner, null);
 272             cleaner.stopCleaner();
 273             cleaner.interrupt();
 274         }
 275     }
 276 
 277     private void cleanCache(CacheCleaner cleaner) {
 278         long now = System.currentTimeMillis() / 1000;
 279         LinkedList<HttpConnection> closelist = new LinkedList<>();
 280 
 281         synchronized (this) {
 282             ListIterator<ExpiryEntry> li = expiryList.listIterator();











 283             while (li.hasNext()) {
 284                 ExpiryEntry entry = li.next();
 285                 if (entry.expiry <= now) {


 286                     li.remove();
 287                     HttpConnection c = entry.connection;
 288                     closelist.add(c);


























 289                     if (c instanceof PlainHttpConnection) {
 290                         removeFromPool(c, plainPool);
 291                     } else {

 292                         removeFromPool(c, sslPool);
 293                     }

 294                 }
 295             }
 296             if (expiryList.isEmpty() && cleaner != null) {
 297                 this.cleanerRef.compareAndSet(cleaner, null);
 298                 cleaner.stopCleaner();
 299             }
 300         }
 301         for (HttpConnection c : closelist) {
 302             //System.out.println ("KAC: closing " + c);
 303             c.close();
 304         }
 305     }
 306 
 307     private synchronized void addToExpiryList(HttpConnection conn) {
 308         long now = System.currentTimeMillis() / 1000;
 309         long then = now + KEEP_ALIVE;
 310         if (expiryList.isEmpty()) {
 311             CacheCleaner cleaner = new CacheCleaner(this);
 312             if (this.cleanerRef.compareAndSet(null, cleaner)) {
 313                 cleaner.start();







 314             }
 315             expiryList.add(new ExpiryEntry(conn, then));
 316             return;




 317         }
 318 
 319         ListIterator<ExpiryEntry> li = expiryList.listIterator();
 320         while (li.hasNext()) {
 321             ExpiryEntry entry = li.next();
 322 
 323             if (then > entry.expiry) {
 324                 li.previous();
 325                 // insert here
 326                 li.add(new ExpiryEntry(conn, then));
 327                 return;











 328             }




 329         }
 330         // first element of list
 331         expiryList.add(new ExpiryEntry(conn, then));
 332     }

 333 }
   1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.net.InetSocketAddress;
  31 import java.nio.ByteBuffer;
  32 import java.time.Instant;
  33 import java.time.temporal.ChronoUnit;
  34 import java.util.ArrayList;
  35 import java.util.Collections;
  36 import java.util.HashMap;
  37 import java.util.Iterator;
  38 import java.util.LinkedList;
  39 import java.util.List;
  40 import java.util.ListIterator;
  41 import java.util.Objects;
  42 import java.util.Optional;
  43 import java.util.concurrent.Flow;
  44 import java.util.stream.Collectors;
  45 import jdk.incubator.http.internal.common.FlowTube;
  46 import jdk.incubator.http.internal.common.Utils;
  47 
  48 /**
  49  * Http 1.1 connection pool.
  50  */
  51 final class ConnectionPool {
  52 















  53     static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
  54             "jdk.httpclient.keepalive.timeout", 1200); // keep-alive passed to the expiry list; seconds
  55     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
  56     final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
  57 
  58     // Pools of idle connections, one for plain and one for SSL connections
  59 
  60     private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
  61     private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
  62     private final ExpiryList expiryList; // idle connections sorted by expiry deadline
  63     private final String dbgTag; // used for debug
  64     boolean stopped; // set by stop(); afterwards getConnection returns null and returned connections are closed



  65 
  66     /**
  67      * Entries in connection pool are keyed by destination address and/or
  68      * proxy address:
  69      * case 1: plain TCP not via proxy (destination only)
  70      * case 2: plain TCP via proxy (proxy only)
  71      * case 3: SSL not via proxy (destination only)
  72      * case 4: SSL over tunnel (destination and proxy)
  73      */
  74     static class CacheKey {
  75         final InetSocketAddress proxy;
  76         final InetSocketAddress destination;
  77 
  78         CacheKey(InetSocketAddress destination, InetSocketAddress proxy) {
  79             this.proxy = proxy;
  80             this.destination = destination;
  81         }
  82 
  83         @Override
  84         public boolean equals(Object obj) {


  87             }
  88             if (getClass() != obj.getClass()) {
  89                 return false;
  90             }
  91             final CacheKey other = (CacheKey) obj;
  92             if (!Objects.equals(this.proxy, other.proxy)) {
  93                 return false;
  94             }
  95             if (!Objects.equals(this.destination, other.destination)) {
  96                 return false;
  97             }
  98             return true;
  99         }
 100 
 101         @Override
 102         public int hashCode() {
 103             return Objects.hash(proxy, destination);
 104         }
 105     }
 106 
 107     ConnectionPool(long clientId) {
 108         this("ConnectionPool("+clientId+")");




 109     }



 110 
 111     /**
 112      * There should be one of these per HttpClient.
 113      */
 114     private ConnectionPool(String tag) {
 115         dbgTag = tag;
 116         plainPool = new HashMap<>();
 117         sslPool = new HashMap<>();
 118         expiryList = new ExpiryList();
 119     }
 120 
 121     final String dbgString() {
 122         return dbgTag;
 123     }
 124 
 125     void start() {
 126         assert !stopped : "Already stopped";
 127     }
 128 
 129     static CacheKey cacheKey(InetSocketAddress destination,
 130                              InetSocketAddress proxy)
 131     {
 132         return new CacheKey(destination, proxy);
 133     }
 134 
 135     synchronized HttpConnection getConnection(boolean secure,
 136                                               InetSocketAddress addr,
 137                                               InetSocketAddress proxy) {
 138         if (stopped) return null;
 139         CacheKey key = new CacheKey(addr, proxy);
 140         HttpConnection c = secure ? findConnection(key, sslPool)
 141                                   : findConnection(key, plainPool);
 142         //System.out.println ("getConnection returning: " + c);
 143         return c;
 144     }
 145 
 146     /**
 147      * Returns the connection to the pool.
 148      */
 149     void returnToPool(HttpConnection conn) {
 150         returnToPool(conn, Instant.now(), KEEP_ALIVE);
 151     }
 152 
 153     // Called also by whitebox tests
 154     void returnToPool(HttpConnection conn, Instant now, long keepAlive) {
 155 
 156         // Don't call registerCleanupTrigger while holding a lock,
 157         // but register it before the connection is added to the pool,
 158         // since we don't want to trigger the cleanup if the connection
 159         // is not in the pool.
 160         CleanupTrigger cleanup = registerCleanupTrigger(conn);
 161 
 162         // it's possible that cleanup may have been called.
 163         synchronized(this) {
 164             if (cleanup.isDone()) {
 165                 return;
 166             } else if (stopped) {
 167                 conn.close();
 168                 return;
 169             }
 170             if (conn instanceof PlainHttpConnection) {
 171                 putConnection(conn, plainPool);
 172             } else {
 173                 assert conn.isSecure();
 174                 putConnection(conn, sslPool);
 175             }
 176             expiryList.add(conn, now, keepAlive);
 177         }
 178         //System.out.println("Return to pool: " + conn);
 179     }
 180 
 181     private CleanupTrigger registerCleanupTrigger(HttpConnection conn) {
 182         // Connect the connection flow to a pub/sub pair that will take the
 183         // connection out of the pool and close it if anything happens
 184         // while the connection is sitting in the pool.
 185         CleanupTrigger cleanup = new CleanupTrigger(conn);
 186         FlowTube flow = conn.getConnectionFlow();
 187         debug.log(Level.DEBUG, "registering %s", cleanup);
 188         flow.connectFlows(cleanup, cleanup);
 189         return cleanup;
 190     }
 191 
 192     private HttpConnection
 193     findConnection(CacheKey key,
 194                    HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
 195         LinkedList<HttpConnection> l = pool.get(key);
 196         if (l == null || l.isEmpty()) {
 197             return null;
 198         } else {
 199             HttpConnection c = l.removeFirst();
 200             expiryList.remove(c);
 201             return c;
 202         }
 203     }
 204 
 205     /* called from cache cleaner only  */
 206     private boolean
 207     removeFromPool(HttpConnection c,
 208                    HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
 209         //System.out.println("cacheCleaner removing: " + c);
 210         assert Thread.holdsLock(this);
 211         CacheKey k = c.cacheKey();
 212         List<HttpConnection> l = pool.get(k);
 213         if (l == null || l.isEmpty()) {
 214             pool.remove(k);
 215             return false;
 216         }
 217         return l.remove(c);
 218     }
 219 
 220     private void
 221     putConnection(HttpConnection c,
 222                   HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
 223         CacheKey key = c.cacheKey();
 224         LinkedList<HttpConnection> l = pool.get(key);
 225         if (l == null) {
 226             l = new LinkedList<>();
 227             pool.put(key, l);
 228         }
 229         l.add(c);
 230     }
 231 
 232     /**
 233      * Purge expired connection and return the number of milliseconds
 234      * in which the next connection is scheduled to expire.
 235      * If no connections are scheduled to be purged return 0.
 236      * @return the delay in milliseconds in which the next connection will
 237      *         expire.
 238      */
 239     long purgeExpiredConnectionsAndReturnNextDeadline() {
 240         if (!expiryList.purgeMaybeRequired()) return 0;
 241         return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now());
 242     }
 243 
 244     // Used for whitebox testing
 245     long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) {
 246         long nextPurge = 0;
 247 
 248         // We may be in the process of adding new elements
 249         // to the expiry list - but those elements will not
 250         // have outlast their keep alive timer yet since we're
 251         // just adding them.
 252         if (!expiryList.purgeMaybeRequired()) return nextPurge;
 253 
 254         List<HttpConnection> closelist;
 255         synchronized (this) {
 256             closelist = expiryList.purgeUntil(now);
 257             for (HttpConnection c : closelist) {
 258                 if (c instanceof PlainHttpConnection) {
 259                     boolean wasPresent = removeFromPool(c, plainPool);
 260                     assert wasPresent;
 261                 } else {
 262                     boolean wasPresent = removeFromPool(c, sslPool);
 263                     assert wasPresent;
 264                 }






 265             }
 266             nextPurge = now.until(
 267                     expiryList.nextExpiryDeadline().orElse(now),
 268                     ChronoUnit.MILLIS);
 269         }
 270         closelist.forEach(this::close);
 271         return nextPurge;

 272     }
 273 
 274     private void close(HttpConnection c) {


 275         try {
 276             c.close();
 277         } catch (Throwable e) {} // ignore
 278     }
 279 
 280     void stop() {
 281         List<HttpConnection> closelist = Collections.emptyList();
 282         try {
 283             synchronized (this) {
 284                 stopped = true;
 285                 closelist = expiryList.stream()
 286                     .map(e -> e.connection)
 287                     .collect(Collectors.toList());
 288                 expiryList.clear();
 289                 plainPool.clear();
 290                 sslPool.clear();
 291             }
 292         } finally {
 293             closelist.forEach(this::close);
 294         }
 295     }
 296 
 297     static final class ExpiryEntry {
 298         final HttpConnection connection;
 299         final Instant expiry; // absolute time in seconds of expiry time
 300         ExpiryEntry(HttpConnection connection, Instant expiry) {
 301             this.connection = connection;
 302             this.expiry = expiry;
 303         }
 304     }
 305 
 306     /**
 307      * Manages a LinkedList of sorted ExpiryEntry. The entry with the closer
 308      * deadline is at the tail of the list, and the entry with the farther
 309      * deadline is at the head. In the most common situation, new elements
 310      * will need to be added at the head (or close to it), and expired elements
 311      * will need to be purged from the tail.
 312      */
 313     private static final class ExpiryList {
 314         private final LinkedList<ExpiryEntry> list = new LinkedList<>();
 315         private volatile boolean mayContainEntries;
 316 
 317         // A loosely accurate boolean whose value is computed
 318         // at the end of each operation performed on ExpiryList;
 319         // Does not require synchronizing on the ConnectionPool.
 320         boolean purgeMaybeRequired() {
 321             return mayContainEntries;
 322         }
 323 
 324         // Returns the next expiry deadline
 325         // should only be called while holding a synchronization
 326         // lock on the ConnectionPool
 327         Optional<Instant> nextExpiryDeadline() {
 328             if (list.isEmpty()) return Optional.empty();
 329             else return Optional.of(list.getLast().expiry);
 330         }
 331 
 332         // should only be called while holding a synchronization
 333         // lock on the ConnectionPool
 334         void add(HttpConnection conn) {
 335             add(conn, Instant.now(), KEEP_ALIVE);
 336         }
 337 
 338         // Used by whitebox test.
 339         void add(HttpConnection conn, Instant now, long keepAlive) {
 340             Instant then = now.truncatedTo(ChronoUnit.SECONDS)
 341                     .plus(keepAlive, ChronoUnit.SECONDS);
 342 
 343             // Elements with the farther deadline are at the head of
 344             // the list. It's more likely that the new element will
 345             // have the farthest deadline, and will need to be inserted
 346             // at the head of the list, so we're using an ascending
 347             // list iterator to find the right insertion point.
 348             ListIterator<ExpiryEntry> li = list.listIterator();
 349             while (li.hasNext()) {
 350                 ExpiryEntry entry = li.next();
 351 
 352                 if (then.isAfter(entry.expiry)) {
 353                     li.previous();
 354                     // insert here
 355                     li.add(new ExpiryEntry(conn, then));
 356                     mayContainEntries = true;
 357                     return;
 358                 }
 359             }
 360             // last (or first) element of list (the last element is
 361             // the first when the list is empty)
 362             list.add(new ExpiryEntry(conn, then));
 363             mayContainEntries = true;
 364         }
 365 
 366         // should only be called while holding a synchronization
 367         // lock on the ConnectionPool
 368         void remove(HttpConnection c) {
 369             if (c == null || list.isEmpty()) return;
 370             ListIterator<ExpiryEntry> li = list.listIterator();
 371             while (li.hasNext()) {
 372                 ExpiryEntry e = li.next();
 373                 if (e.connection.equals(c)) {
 374                     li.remove();
 375                     mayContainEntries = !list.isEmpty();
 376                     return;
 377                 }
 378             }





 379         }





 380 
 381         // should only be called while holding a synchronization
 382         // lock on the ConnectionPool.
 383         // Purge all elements whose deadline is before now (now included).
 384         List<HttpConnection> purgeUntil(Instant now) {
 385             if (list.isEmpty()) return Collections.emptyList();
 386 
 387             List<HttpConnection> closelist = new ArrayList<>();
 388 
 389             // elements with the closest deadlines are at the tail
 390             // of the queue, so we're going to use a descending iterator
 391             // to remove them, and stop when we find the first element
 392             // that has not expired yet.
 393             Iterator<ExpiryEntry> li = list.descendingIterator();
 394             while (li.hasNext()) {
 395                 ExpiryEntry entry = li.next();
 396                 // use !isAfter instead of isBefore in order to
 397                 // remove the entry if its expiry == now
 398                 if (!entry.expiry.isAfter(now)) {
 399                     li.remove();
 400                     HttpConnection c = entry.connection;
 401                     closelist.add(c);
 402                 } else break; // the list is sorted
 403             }
 404             mayContainEntries = !list.isEmpty();
 405             return closelist;
 406         }
 407 
 408         // should only be called while holding a synchronization
 409         // lock on the ConnectionPool
 410         java.util.stream.Stream<ExpiryEntry> stream() {
 411             return list.stream();
 412         }
 413 
 414         // should only be called while holding a synchronization
 415         // lock on the ConnectionPool
 416         void clear() {
 417             list.clear();
 418             mayContainEntries = false;
 419         }
 420     }
 421 
 422     void cleanup(HttpConnection c, Throwable error) {
 423         debug.log(Level.DEBUG,
 424                   "%s : ConnectionPool.cleanup(%s)",
 425                   String.valueOf(c.getConnectionFlow()),
 426                   error);
 427         synchronized(this) {
 428             if (c instanceof PlainHttpConnection) {
 429                 removeFromPool(c, plainPool);
 430             } else {
 431                 assert c.isSecure();
 432                 removeFromPool(c, sslPool);
 433             }
 434             expiryList.remove(c);
 435         }








 436         c.close();
 437     }

 438 
 439     /**
 440      * An object that subscribes to the flow while the connection is in
 441      * the pool. Anything that comes in will cause the connection to be closed
 442      * and removed from the pool.
 443      */
 444     private final class CleanupTrigger implements
 445             FlowTube.TubeSubscriber, FlowTube.TubePublisher,
 446             Flow.Subscription {
 447 
 448         private final HttpConnection connection;
 449         private volatile boolean done;
 450 
 451         public CleanupTrigger(HttpConnection connection) {
 452             this.connection = connection;
 453         }
 454 
 455         public boolean isDone() { return done;}
 456 
 457         private void triggerCleanup(Throwable error) {
 458             done = true;
 459             cleanup(connection, error);
 460         }
 461 
 462         @Override public void request(long n) {}
 463         @Override public void cancel() {}

 464 
 465         @Override
 466         public void onSubscribe(Flow.Subscription subscription) {
 467             subscription.request(1);
 468         }
 469         @Override
 470         public void onError(Throwable error) { triggerCleanup(error); }
 471         @Override
 472         public void onComplete() { triggerCleanup(null); }
 473         @Override
 474         public void onNext(List<ByteBuffer> item) {
 475             triggerCleanup(new IOException("Data received while in pool"));
 476         }
 477 
 478         @Override
 479         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 480             subscriber.onSubscribe(this);
 481         }
 482 
 483         @Override
 484         public String toString() {
 485             return "CleanupTrigger(" + connection.getConnectionFlow() + ")";
 486         }
 487 

 488     }
 489 
 490 }
< prev index next >