1 /*
   2  * Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 package jdk.internal.net.http;
  25 
  26 import java.io.IOException;
  27 import java.lang.management.ManagementFactory;
  28 import java.lang.ref.Reference;
  29 import java.net.Authenticator;
  30 import java.net.CookieHandler;
  31 import java.net.InetSocketAddress;
  32 import java.net.ProxySelector;
  33 import java.net.Socket;
  34 import java.net.SocketAddress;
  35 import java.net.SocketOption;
  36 import java.net.http.HttpHeaders;
  37 import java.nio.ByteBuffer;
  38 import java.nio.channels.SocketChannel;
  39 import java.nio.channels.spi.SelectorProvider;
  40 import java.time.Duration;
  41 import java.util.HashMap;
  42 import java.util.List;
  43 import java.util.Map;
  44 import java.util.Optional;
  45 import java.util.Random;
  46 import java.util.Set;
  47 import java.util.concurrent.CompletableFuture;
  48 import java.util.concurrent.Executor;
  49 import java.util.concurrent.Flow;
  50 import java.util.stream.IntStream;
  51 import java.time.Instant;
  52 import java.time.temporal.ChronoUnit;
  53 import javax.net.ssl.SSLContext;
  54 import javax.net.ssl.SSLParameters;
  55 import java.net.http.HttpClient;
  56 import java.net.http.HttpRequest;
  57 import java.net.http.HttpResponse;
  58 import jdk.internal.net.http.common.FlowTube;
  59 
  60 /**
  61  * @summary Verifies that the ConnectionPool correctly handle
  62  *          connection deadlines and purges the right connections
  63  *          from the cache.
  64  * @bug 8187044 8187111 8221395
  65  * @author danielfuchs
  66  */
  67 public class ConnectionPoolTest {
  68 
  69     static long getActiveCleaners() throws ClassNotFoundException {
  70         // ConnectionPool.ACTIVE_CLEANER_COUNTER.get()
  71         // ConnectionPoolTest.class.getModule().addReads(
  72         //      Class.forName("java.lang.management.ManagementFactory").getModule());
  73         return java.util.stream.Stream.of(ManagementFactory.getThreadMXBean()
  74                 .dumpAllThreads(false, false))
  75               .filter(t -> t.getThreadName().startsWith("HTTP-Cache-cleaner"))
  76               .count();
  77     }
  78 
  79     public static void main(String[] args) throws Exception {
  80         if (args.length == 0) {
  81             args = new String[] {"testCacheCleaners"};
  82         }
  83         for (String arg : args) {
  84             if ("testCacheCleaners".equals(arg)) {
  85                 testCacheCleaners();
  86             } else if ("testPoolSize".equals(arg)) {
  87                 assert args.length == 1 : "testPoolSize should be run in its own VM";
  88                 testPoolSize();
  89             } else if ("testCloseOrReturnToPool".equals(arg)) {
  90                 assert args.length == 1 : "testCloseOrReturnToPool should be run in its own VM";
  91                 testCloseOrReturnToPool();
  92             } else throw new RuntimeException("unknown test case: " + arg);
  93         }
  94     }
  95 
  96     public static void testCacheCleaners() throws Exception {
  97         ConnectionPool pool = new ConnectionPool(666);
  98         HttpClient client = new HttpClientStub(pool);
  99         InetSocketAddress proxy = InetSocketAddress.createUnresolved("bar", 80);
 100         System.out.println("Adding 20 connections to pool");
 101         Random random = new Random();
 102 
 103         final int count = 20;
 104         Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
 105         int[] keepAlives = new int[count];
 106         HttpConnectionStub[] connections = new HttpConnectionStub[count];
 107         long purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now);
 108         long expected = 0;
 109         if (purge != expected) {
 110             throw new RuntimeException("Bad purge delay: " + purge
 111                                         + ", expected " + expected);
 112         }
 113         expected = Long.MAX_VALUE;
 114         for (int i=0; i<count; i++) {
 115             InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
 116             keepAlives[i] = random.nextInt(10) * 10  + 10;
 117             connections[i] = new HttpConnectionStub(client, addr, proxy, true);
 118             System.out.println("Adding connection: " + now
 119                                 + " keepAlive: " + keepAlives[i]
 120                                 + " /" + connections[i]);
 121             pool.returnToPool(connections[i], now, keepAlives[i]);
 122             expected = Math.min(expected, keepAlives[i] * 1000);
 123             purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now);
 124             if (purge != expected) {
 125                 throw new RuntimeException("Bad purge delay: " + purge
 126                                         + ", expected " + expected);
 127             }
 128         }
 129         int min = IntStream.of(keepAlives).min().getAsInt();
 130         int max = IntStream.of(keepAlives).max().getAsInt();
 131         int mean = (min + max)/2;
 132         System.out.println("min=" + min + ", max=" + max + ", mean=" + mean);
 133         purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now);
 134         System.out.println("first purge would be in " + purge + " ms");
 135         if (Math.abs(purge/1000 - min) > 0) {
 136             throw new RuntimeException("expected " + min + " got " + purge/1000);
 137         }
 138         long opened = java.util.stream.Stream.of(connections)
 139                      .filter(HttpConnectionStub::connected).count();
 140         if (opened != count) {
 141             throw new RuntimeException("Opened: expected "
 142                                        + count + " got " + opened);
 143         }
 144         purge = mean * 1000;
 145         System.out.println("start purging at " + purge + " ms");
 146         Instant next = now;
 147         do {
 148            System.out.println("next purge is in " + purge + " ms");
 149            next = next.plus(purge, ChronoUnit.MILLIS);
 150            purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(next);
 151            long k = now.until(next, ChronoUnit.SECONDS);
 152            System.out.println("now is " + k + "s from start");
 153            for (int i=0; i<count; i++) {
 154                if (connections[i].connected() != (k < keepAlives[i])) {
 155                    throw new RuntimeException("Bad connection state for "
 156                              + i
 157                              + "\n\t connected=" + connections[i].connected()
 158                              + "\n\t keepAlive=" + keepAlives[i]
 159                              + "\n\t elapsed=" + k);
 160                }
 161            }
 162         } while (purge > 0);
 163         opened = java.util.stream.Stream.of(connections)
 164                      .filter(HttpConnectionStub::connected).count();
 165         if (opened != 0) {
 166            throw new RuntimeException("Closed: expected "
 167                                        + count + " got "
 168                                        + (count-opened));
 169         }
 170     }
 171 
 172     public static void testPoolSize() throws Exception {
 173         final int MAX_POOL_SIZE = 10;
 174         System.setProperty("jdk.httpclient.connectionPoolSize",
 175                 String.valueOf(MAX_POOL_SIZE));
 176         ConnectionPool pool = new ConnectionPool(666);
 177         HttpClient client = new HttpClientStub(pool);
 178         InetSocketAddress proxy = InetSocketAddress.createUnresolved("bar", 80);
 179         System.out.println("Adding 20 connections to pool");
 180         Random random = new Random();
 181 
 182         final int count = 20;
 183         Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
 184         int[] keepAlives = new int[count];
 185         HttpConnectionStub[] connections = new HttpConnectionStub[count];
 186         long purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now);
 187         long expected = 0;
 188         if (purge != expected) {
 189             throw new RuntimeException("Bad purge delay: " + purge
 190                     + ", expected " + expected);
 191         }
 192         expected = Long.MAX_VALUE;
 193         int previous = 0;
 194         for (int i=0; i<count; i++) {
 195             InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
 196             keepAlives[i] = random.nextInt(10) * 10  + 5 + previous;
 197             previous = keepAlives[i];
 198             connections[i] = new HttpConnectionStub(client, addr, proxy, true);
 199             System.out.println("Adding connection: " + now
 200                     + " keepAlive: " + keepAlives[i]
 201                     + " /" + connections[i]);
 202             pool.returnToPool(connections[i], now, keepAlives[i]);
 203             if (i < MAX_POOL_SIZE) {
 204                 expected = Math.min(expected, keepAlives[i] * 1000);
 205             } else {
 206                 expected = keepAlives[i-MAX_POOL_SIZE+1] * 1000;
 207                 if (pool.contains(connections[i-MAX_POOL_SIZE])) {
 208                     throw new RuntimeException("Connection[" + i + "]/"
 209                             + connections[i] + " should have been removed");
 210                 }
 211             }
 212             purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now);
 213             if (purge != expected) {
 214                 throw new RuntimeException("Bad purge delay for " + i + ": "
 215                         + purge + ", expected " + expected);
 216             }
 217         }
 218 
 219         long opened = java.util.stream.Stream.of(connections)
 220                 .filter(HttpConnectionStub::connected).count();
 221         if (opened != MAX_POOL_SIZE) {
 222             throw new RuntimeException("Opened: expected "
 223                     + count + " got " + opened);
 224         }
 225         for (int i=0 ; i<count; i++) {
 226             boolean closed = (i < count - MAX_POOL_SIZE);
 227             if (connections[i].closed != closed) {
 228                 throw new RuntimeException("connection[" + i + "] should be "
 229                         + (closed ? "closed" : "opened"));
 230             }
 231             if (pool.contains(connections[i]) == closed) {
 232                 throw new RuntimeException("Connection[" + i + "]/"
 233                         + connections[i] + " should "
 234                         + (closed ? "" : "not ")
 235                         + "have been removed");
 236             }
 237         }
 238     }
 239 
 240     public static void testCloseOrReturnToPool() throws Exception {
 241         HttpClientFacade facade = (HttpClientFacade)HttpClient.newHttpClient();
 242         HttpClientImpl client = facade.impl;
 243         ConnectionPool pool = client.connectionPool();
 244         InetSocketAddress proxy = InetSocketAddress.createUnresolved("bar", 80);
 245 
 246         InetSocketAddress addr = InetSocketAddress.createUnresolved("foo1", 80);
 247         HttpConnectionStub conn1 = new HttpConnectionStub(facade, client, addr, proxy, true);
 248         HttpHeaders hdrs = HttpHeaders.of(new HashMap<>(), (s1,s2) -> true);
 249         HttpConnection conn;
 250 
 251         conn1.reopen();
 252         if (!conn1.isOpen()) {
 253             throw new RuntimeException("conn1 finished");
 254         }
 255 
 256         conn1.closeOrReturnToCache(hdrs);
 257 
 258         // Check we can find conn1 in the pool
 259         if (conn1 != (conn = pool.getConnection(true, addr, proxy))) {
 260             throw new RuntimeException("conn1 not returned, got: " + conn);
 261         }
 262         System.out.println("Found connection in the pool: " + conn );
 263 
 264         // Try to return it with no headers: the connection should
 265         // be closed and not returned to the pool (EOF).
 266         conn.closeOrReturnToCache(null);
 267         if ((conn = pool.getConnection(true, addr, proxy)) != null) {
 268             throw new RuntimeException(conn + " found in the pool!");
 269         }
 270         if (!conn1.closed) {
 271             throw new RuntimeException("conn1 not closed!");
 272         }
 273         System.out.println("EOF connection successfully closed when returned to pool");
 274 
 275         // reopen the connection
 276         conn1.reopen();
 277         if (!conn1.isOpen()) {
 278             throw new RuntimeException("conn1 finished");
 279         }
 280 
 281         // Try to return it with empty headers: the connection should
 282         // be returned to the pool.
 283         conn1.closeOrReturnToCache(hdrs);
 284         if (conn1 != (conn = pool.getConnection(true, addr, proxy))) {
 285             throw new RuntimeException("conn1 not returned to pool, got: " + conn);
 286         }
 287         if (conn1.closed) {
 288             throw new RuntimeException("conn1 closed");
 289         }
 290         if (!conn1.isOpen()) {
 291             throw new RuntimeException("conn1 finished");
 292         }
 293 
 294         System.out.println("Keep alive connection successfully returned to pool");
 295 
 296         // Try to return it with connection: close headers: the connection should
 297         // not be returned to the pool, and should be closed.
 298         HttpHeaders hdrs2 = HttpHeaders.of(Map.of("connection", List.of("close")), (s1, s2) -> true);
 299         conn1.closeOrReturnToCache(hdrs2);
 300         if ((conn = pool.getConnection(true, addr, proxy)) != null) {
 301             throw new RuntimeException(conn + " found in the pool!");
 302         }
 303         if (!conn1.closed) {
 304             throw new RuntimeException("conn1 not closed!");
 305         }
 306         System.out.println("Close connection successfully closed when returned to pool");
 307 
 308         // reopen and finish the connection.
 309         conn1.reopen();
 310         conn1.finish(true);
 311         if (conn1.closed) {
 312             throw new RuntimeException("conn1 closed");
 313         }
 314         if (conn1.isOpen()) {
 315             throw new RuntimeException("conn1 is opened!");
 316         }
 317         conn1.closeOrReturnToCache(hdrs2);
 318         if ((conn = pool.getConnection(true, addr, proxy)) != null) {
 319             throw new RuntimeException(conn + " found in the pool!");
 320         }
 321         if (!conn1.closed) {
 322             throw new RuntimeException("conn1 not closed!");
 323         }
 324         System.out.println("Finished 'close' connection successfully closed when returned to pool");
 325 
 326         // reopen and finish the connection.
 327         conn1.reopen();
 328         conn1.finish(true);
 329         if (conn1.closed) {
 330             throw new RuntimeException("conn1 closed");
 331         }
 332         if (conn1.isOpen()) {
 333             throw new RuntimeException("conn1 is opened!");
 334         }
 335         conn1.closeOrReturnToCache(hdrs);
 336         if ((conn = pool.getConnection(true, addr, proxy)) != null) {
 337             throw new RuntimeException(conn + " found in the pool!");
 338         }
 339         if (!conn1.closed) {
 340             throw new RuntimeException("conn1 not closed!");
 341         }
 342         System.out.println("Finished keep-alive connection successfully closed when returned to pool");
 343 
 344         Reference.reachabilityFence(facade);
 345     }
 346 
 347     static <T> T error() {
 348         throw new InternalError("Should not reach here: wrong test assumptions!");
 349     }
 350 
 351     static class FlowTubeStub implements FlowTube {
 352         final HttpConnectionStub conn;
 353         FlowTubeStub(HttpConnectionStub conn) { this.conn = conn; }
 354         @Override
 355         public void onSubscribe(Flow.Subscription subscription) { }
 356         @Override public void onError(Throwable error) { error(); }
 357         @Override public void onComplete() { error(); }
 358         @Override public void onNext(List<ByteBuffer> item) { error();}
 359         @Override
 360         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 361         }
 362         @Override public boolean isFinished() { return conn.finished; }
 363     }
 364 
 365     static class SocketChannelStub extends SocketChannel {
 366 
 367         SocketChannelStub() { super(SelectorProvider.provider()); }
 368 
 369         @Override
 370         public SocketChannel bind(SocketAddress local) throws IOException {
 371             return error();
 372         }
 373         @Override
 374         public <T> SocketChannel setOption(SocketOption<T> name, T value) throws IOException {
 375             return error();
 376         }
 377         @Override
 378         public SocketChannel shutdownInput() throws IOException {
 379             return error();
 380         }
 381         @Override
 382         public SocketChannel shutdownOutput() throws IOException {
 383             return error();
 384         }
 385         @Override
 386         public Socket socket() { return error(); }
 387         @Override
 388         public boolean isConnected() { return true; }
 389         @Override
 390         public boolean isConnectionPending() { return false; }
 391         @Override
 392         public boolean connect(SocketAddress remote) throws IOException {
 393             return error();
 394         }
 395         @Override
 396         public boolean finishConnect() throws IOException {
 397             return error();
 398         }
 399         @Override
 400         public SocketAddress getRemoteAddress() throws IOException {
 401             return error();
 402         }
 403         @Override
 404         public int read(ByteBuffer dst) throws IOException {
 405             return error();
 406         }
 407         @Override
 408         public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
 409             return error();
 410         }
 411         @Override
 412         public int write(ByteBuffer src) throws IOException {
 413             return error();
 414         }
 415         @Override
 416         public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
 417             return 0;
 418         }
 419         @Override
 420         public SocketAddress getLocalAddress() throws IOException {
 421             return error();
 422         }
 423         @Override
 424         public <T> T getOption(SocketOption<T> name) throws IOException {
 425             return error();
 426         }
 427         @Override
 428         public Set<SocketOption<?>> supportedOptions() {
 429             return error();
 430         }
 431         @Override
 432         protected void implCloseSelectableChannel() throws IOException {
 433             error();
 434         }
 435         @Override
 436         protected void implConfigureBlocking(boolean block) throws IOException {
 437             error();
 438         }
 439     }
 440 
 441     // Emulates an HttpConnection that has a strong reference to its HttpClient.
 442     static class HttpConnectionStub extends HttpConnection {
 443 
 444         public HttpConnectionStub(
 445                 HttpClient client,
 446                 InetSocketAddress address,
 447                 InetSocketAddress proxy,
 448                 boolean secured) {
 449             this(client, null, address, proxy, secured);
 450         }
 451         public HttpConnectionStub(
 452                 HttpClient client,
 453                 HttpClientImpl impl,
 454                 InetSocketAddress address,
 455                 InetSocketAddress proxy,
 456                 boolean secured) {
 457             super(address, impl);
 458             this.key = ConnectionPool.cacheKey(address, proxy);
 459             this.address = address;
 460             this.proxy = proxy;
 461             this.secured = secured;
 462             this.client = client;
 463             this.channel = new SocketChannelStub();
 464             this.flow = new FlowTubeStub(this);
 465         }
 466 
 467         final InetSocketAddress proxy;
 468         final InetSocketAddress address;
 469         final boolean secured;
 470         final ConnectionPool.CacheKey key;
 471         final HttpClient client;
 472         final FlowTubeStub flow;
 473         final SocketChannel channel;
 474         volatile boolean closed, finished;
 475 
 476         // Used for testing closeOrReturnToPool.
 477         void finish(boolean finished) { this.finished = finished; }
 478         void reopen() { closed = finished = false;}
 479 
 480         // All these return something
 481         @Override boolean connected() {return !closed;}
 482         @Override boolean isSecure() {return secured;}
 483         @Override boolean isProxied() {return proxy!=null;}
 484         @Override InetSocketAddress proxy() { return proxy; }
 485         @Override ConnectionPool.CacheKey cacheKey() {return key;}
 486         @Override FlowTube getConnectionFlow() {return flow;}
 487         @Override SocketChannel channel() {return channel;}
 488         @Override
 489         public void close() {
 490             closed=finished=true;
 491             System.out.println("closed: " + this);
 492         }
 493         @Override
 494         public String toString() {
 495             return "HttpConnectionStub: " + address + " proxy: " + proxy;
 496         }
 497 
 498 
 499         // All these throw errors
 500         @Override public HttpPublisher publisher() {return error();}
 501         @Override public CompletableFuture<Void> connectAsync(Exchange<?> e) {return error();}
 502         @Override public CompletableFuture<Void> finishConnect() {return error();}
 503     }
 504     // Emulates an HttpClient that has a strong reference to its connection pool.
 505     static class HttpClientStub extends HttpClient {
 506         public HttpClientStub(ConnectionPool pool) {
 507             this.pool = pool;
 508         }
 509         final ConnectionPool pool;
 510         @Override public Optional<CookieHandler> cookieHandler() {return error();}
 511         @Override public Optional<Duration> connectTimeout() {return error();}
 512         @Override public HttpClient.Redirect followRedirects() {return error();}
 513         @Override public Optional<ProxySelector> proxy() {return error();}
 514         @Override public SSLContext sslContext() {return error();}
 515         @Override public SSLParameters sslParameters() {return error();}
 516         @Override public Optional<Authenticator> authenticator() {return error();}
 517         @Override public HttpClient.Version version() {return HttpClient.Version.HTTP_1_1;}
 518         @Override public Optional<Executor> executor() {return error();}
 519         @Override
 520         public <T> HttpResponse<T> send(HttpRequest req,
 521                                         HttpResponse.BodyHandler<T> responseBodyHandler)
 522                 throws IOException, InterruptedException {
 523             return error();
 524         }
 525         @Override
 526         public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest req,
 527                 HttpResponse.BodyHandler<T> responseBodyHandler) {
 528             return error();
 529         }
 530         @Override
 531         public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest req,
 532                 HttpResponse.BodyHandler<T> bodyHandler,
 533                 HttpResponse.PushPromiseHandler<T> multiHandler) {
 534             return error();
 535         }
 536     }
 537 
 538 }