1 /*
   2  * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 package jdk.internal.net.http;
  25 
  26 import java.io.IOException;
  27 import java.lang.management.ManagementFactory;
  28 import java.net.Authenticator;
  29 import java.net.CookieHandler;
  30 import java.net.InetSocketAddress;
  31 import java.net.ProxySelector;
  32 import java.nio.ByteBuffer;
  33 import java.nio.channels.SocketChannel;
  34 import java.time.Duration;
  35 import java.util.Arrays;
  36 import java.util.List;
  37 import java.util.Optional;
  38 import java.util.Random;
  39 import java.util.concurrent.CompletableFuture;
  40 import java.util.concurrent.Executor;
  41 import java.util.concurrent.Flow;
  42 import java.util.stream.IntStream;
  43 import java.time.Instant;
  44 import java.time.temporal.ChronoUnit;
  45 import javax.net.ssl.SSLContext;
  46 import javax.net.ssl.SSLParameters;
  47 import java.net.http.HttpClient;
  48 import java.net.http.HttpRequest;
  49 import java.net.http.HttpResponse;
  50 import jdk.internal.net.http.common.FlowTube;
  51 
  52 /**
 * @summary Verifies that the ConnectionPool correctly handles
  54  *          connection deadlines and purges the right connections
  55  *          from the cache.
  56  * @bug 8187044 8187111
  57  * @author danielfuchs
  58  */
  59 public class ConnectionPoolTest {
  60 
  61     static long getActiveCleaners() throws ClassNotFoundException {
  62         // ConnectionPool.ACTIVE_CLEANER_COUNTER.get()
  63         // ConnectionPoolTest.class.getModule().addReads(
  64         //      Class.forName("java.lang.management.ManagementFactory").getModule());
  65         return java.util.stream.Stream.of(ManagementFactory.getThreadMXBean()
  66                 .dumpAllThreads(false, false))
  67               .filter(t -> t.getThreadName().startsWith("HTTP-Cache-cleaner"))
  68               .count();
  69     }
  70 
  71     public static void main(String[] args) throws Exception {
  72         if (args.length == 0) {
  73             args = new String[] {"testCacheCleaners"};
  74         }
  75         for (String arg : args) {
  76             if ("testCacheCleaners".equals(arg)) {
  77                 testCacheCleaners();
  78             } else if ("testPoolSize".equals(arg)) {
  79                 assert args.length == 1 : "testPoolSize should be run in its own VM";
  80                 testPoolSize();
  81             }
  82         }
  83     }
  84 
  85     public static void testCacheCleaners() throws Exception {
  86         ConnectionPool pool = new ConnectionPool(666);
  87         HttpClient client = new HttpClientStub(pool);
  88         InetSocketAddress proxy = InetSocketAddress.createUnresolved("bar", 80);
  89         System.out.println("Adding 20 connections to pool");
  90         Random random = new Random();
  91 
  92         final int count = 20;
  93         Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
  94         int[] keepAlives = new int[count];
  95         HttpConnectionStub[] connections = new HttpConnectionStub[count];
  96         long purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now);
  97         long expected = 0;
  98         if (purge != expected) {
  99             throw new RuntimeException("Bad purge delay: " + purge
 100                                         + ", expected " + expected);
 101         }
 102         expected = Long.MAX_VALUE;
 103         for (int i=0; i<count; i++) {
 104             InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
 105             keepAlives[i] = random.nextInt(10) * 10  + 10;
 106             connections[i] = new HttpConnectionStub(client, addr, proxy, true);
 107             System.out.println("Adding connection: " + now
 108                                 + " keepAlive: " + keepAlives[i]
 109                                 + " /" + connections[i]);
 110             pool.returnToPool(connections[i], now, keepAlives[i]);
 111             expected = Math.min(expected, keepAlives[i] * 1000);
 112             purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now);
 113             if (purge != expected) {
 114                 throw new RuntimeException("Bad purge delay: " + purge
 115                                         + ", expected " + expected);
 116             }
 117         }
 118         int min = IntStream.of(keepAlives).min().getAsInt();
 119         int max = IntStream.of(keepAlives).max().getAsInt();
 120         int mean = (min + max)/2;
 121         System.out.println("min=" + min + ", max=" + max + ", mean=" + mean);
 122         purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now);
 123         System.out.println("first purge would be in " + purge + " ms");
 124         if (Math.abs(purge/1000 - min) > 0) {
 125             throw new RuntimeException("expected " + min + " got " + purge/1000);
 126         }
 127         long opened = java.util.stream.Stream.of(connections)
 128                      .filter(HttpConnectionStub::connected).count();
 129         if (opened != count) {
 130             throw new RuntimeException("Opened: expected "
 131                                        + count + " got " + opened);
 132         }
 133         purge = mean * 1000;
 134         System.out.println("start purging at " + purge + " ms");
 135         Instant next = now;
 136         do {
 137            System.out.println("next purge is in " + purge + " ms");
 138            next = next.plus(purge, ChronoUnit.MILLIS);
 139            purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(next);
 140            long k = now.until(next, ChronoUnit.SECONDS);
 141            System.out.println("now is " + k + "s from start");
 142            for (int i=0; i<count; i++) {
 143                if (connections[i].connected() != (k < keepAlives[i])) {
 144                    throw new RuntimeException("Bad connection state for "
 145                              + i
 146                              + "\n\t connected=" + connections[i].connected()
 147                              + "\n\t keepAlive=" + keepAlives[i]
 148                              + "\n\t elapsed=" + k);
 149                }
 150            }
 151         } while (purge > 0);
 152         opened = java.util.stream.Stream.of(connections)
 153                      .filter(HttpConnectionStub::connected).count();
 154         if (opened != 0) {
 155            throw new RuntimeException("Closed: expected "
 156                                        + count + " got "
 157                                        + (count-opened));
 158         }
 159     }
 160 
 161     public static void testPoolSize() throws Exception {
 162         final int MAX_POOL_SIZE = 10;
 163         System.setProperty("jdk.httpclient.connectionPoolSize",
 164                 String.valueOf(MAX_POOL_SIZE));
 165         ConnectionPool pool = new ConnectionPool(666);
 166         HttpClient client = new HttpClientStub(pool);
 167         InetSocketAddress proxy = InetSocketAddress.createUnresolved("bar", 80);
 168         System.out.println("Adding 20 connections to pool");
 169         Random random = new Random();
 170 
 171         final int count = 20;
 172         Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
 173         int[] keepAlives = new int[count];
 174         HttpConnectionStub[] connections = new HttpConnectionStub[count];
 175         long purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now);
 176         long expected = 0;
 177         if (purge != expected) {
 178             throw new RuntimeException("Bad purge delay: " + purge
 179                     + ", expected " + expected);
 180         }
 181         expected = Long.MAX_VALUE;
 182         int previous = 0;
 183         for (int i=0; i<count; i++) {
 184             InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
 185             keepAlives[i] = random.nextInt(10) * 10  + 5 + previous;
 186             previous = keepAlives[i];
 187             connections[i] = new HttpConnectionStub(client, addr, proxy, true);
 188             System.out.println("Adding connection: " + now
 189                     + " keepAlive: " + keepAlives[i]
 190                     + " /" + connections[i]);
 191             pool.returnToPool(connections[i], now, keepAlives[i]);
 192             if (i < MAX_POOL_SIZE) {
 193                 expected = Math.min(expected, keepAlives[i] * 1000);
 194             } else {
 195                 expected = keepAlives[i-MAX_POOL_SIZE+1] * 1000;
 196                 if (pool.contains(connections[i-MAX_POOL_SIZE])) {
 197                     throw new RuntimeException("Connection[" + i + "]/"
 198                             + connections[i] + " should have been removed");
 199                 }
 200             }
 201             purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now);
 202             if (purge != expected) {
 203                 throw new RuntimeException("Bad purge delay for " + i + ": "
 204                         + purge + ", expected " + expected);
 205             }
 206         }
 207 
 208         long opened = java.util.stream.Stream.of(connections)
 209                 .filter(HttpConnectionStub::connected).count();
 210         if (opened != MAX_POOL_SIZE) {
 211             throw new RuntimeException("Opened: expected "
 212                     + count + " got " + opened);
 213         }
 214         for (int i=0 ; i<count; i++) {
 215             boolean closed = (i < count - MAX_POOL_SIZE);
 216             if (connections[i].closed != closed) {
 217                 throw new RuntimeException("connection[" + i + "] should be "
 218                         + (closed ? "closed" : "opened"));
 219             }
 220             if (pool.contains(connections[i]) == closed) {
 221                 throw new RuntimeException("Connection[" + i + "]/"
 222                         + connections[i] + " should "
 223                         + (closed ? "" : "not ")
 224                         + "have been removed");
 225             }
 226         }
 227     }
 228 
 229     static <T> T error() {
 230         throw new InternalError("Should not reach here: wrong test assumptions!");
 231     }
 232 
 233     static class FlowTubeStub implements FlowTube {
 234         final HttpConnectionStub conn;
 235         FlowTubeStub(HttpConnectionStub conn) { this.conn = conn; }
 236         @Override
 237         public void onSubscribe(Flow.Subscription subscription) { }
 238         @Override public void onError(Throwable error) { error(); }
 239         @Override public void onComplete() { error(); }
 240         @Override public void onNext(List<ByteBuffer> item) { error();}
 241         @Override
 242         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 243         }
 244         @Override public boolean isFinished() { return conn.closed; }
 245     }
 246 
 247     // Emulates an HttpConnection that has a strong reference to its HttpClient.
 248     static class HttpConnectionStub extends HttpConnection {
 249 
 250         public HttpConnectionStub(HttpClient client,
 251                 InetSocketAddress address,
 252                 InetSocketAddress proxy,
 253                 boolean secured) {
 254             super(address, null);
 255             this.key = ConnectionPool.cacheKey(address, proxy);
 256             this.address = address;
 257             this.proxy = proxy;
 258             this.secured = secured;
 259             this.client = client;
 260             this.flow = new FlowTubeStub(this);
 261         }
 262 
 263         final InetSocketAddress proxy;
 264         final InetSocketAddress address;
 265         final boolean secured;
 266         final ConnectionPool.CacheKey key;
 267         final HttpClient client;
 268         final FlowTubeStub flow;
 269         volatile boolean closed;
 270 
 271         // All these return something
 272         @Override boolean connected() {return !closed;}
 273         @Override boolean isSecure() {return secured;}
 274         @Override boolean isProxied() {return proxy!=null;}
 275         @Override ConnectionPool.CacheKey cacheKey() {return key;}
 276         @Override
 277         public void close() {
 278             closed=true;
 279             System.out.println("closed: " + this);
 280         }
 281         @Override
 282         public String toString() {
 283             return "HttpConnectionStub: " + address + " proxy: " + proxy;
 284         }
 285 
 286         // All these throw errors
 287         @Override public HttpPublisher publisher() {return error();}
 288         @Override public CompletableFuture<Void> connectAsync(Exchange<?> e) {return error();}
 289         @Override public CompletableFuture<Void> finishConnect() {return error();}
 290         @Override SocketChannel channel() {return error();}
 291         @Override
 292         FlowTube getConnectionFlow() {return flow;}
 293     }
 294     // Emulates an HttpClient that has a strong reference to its connection pool.
 295     static class HttpClientStub extends HttpClient {
 296         public HttpClientStub(ConnectionPool pool) {
 297             this.pool = pool;
 298         }
 299         final ConnectionPool pool;
 300         @Override public Optional<CookieHandler> cookieHandler() {return error();}
 301         @Override public Optional<Duration> connectTimeout() {return error();}
 302         @Override public HttpClient.Redirect followRedirects() {return error();}
 303         @Override public Optional<ProxySelector> proxy() {return error();}
 304         @Override public SSLContext sslContext() {return error();}
 305         @Override public SSLParameters sslParameters() {return error();}
 306         @Override public Optional<Authenticator> authenticator() {return error();}
 307         @Override public HttpClient.Version version() {return HttpClient.Version.HTTP_1_1;}
 308         @Override public Optional<Executor> executor() {return error();}
 309         @Override
 310         public <T> HttpResponse<T> send(HttpRequest req,
 311                                         HttpResponse.BodyHandler<T> responseBodyHandler)
 312                 throws IOException, InterruptedException {
 313             return error();
 314         }
 315         @Override
 316         public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest req,
 317                 HttpResponse.BodyHandler<T> responseBodyHandler) {
 318             return error();
 319         }
 320         @Override
 321         public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest req,
 322                 HttpResponse.BodyHandler<T> bodyHandler,
 323                 HttpResponse.PushPromiseHandler<T> multiHandler) {
 324             return error();
 325         }
 326     }
 327 
 328 }