< prev index next >

test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/ConnectionPoolTest.java

Print this page

        

*** 23,53 **** package jdk.incubator.http; import java.io.IOException; import java.lang.management.ManagementFactory; - import java.lang.ref.Reference; - import java.lang.ref.ReferenceQueue; - import java.lang.ref.WeakReference; import java.net.Authenticator; ! import java.net.CookieManager; import java.net.InetSocketAddress; import java.net.ProxySelector; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLParameters; ! import jdk.incubator.http.internal.common.ByteBufferReference; /** ! * @summary Verifies that the ConnectionPool won't prevent an HttpClient ! * from being GC'ed. Verifies that the ConnectionPool has at most ! * one CacheCleaner thread running. ! * @bug 8187044 * @author danielfuchs */ public class ConnectionPoolTest { static long getActiveCleaners() throws ClassNotFoundException { --- 23,56 ---- package jdk.incubator.http; import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.Authenticator; ! import java.net.CookieHandler; import java.net.InetSocketAddress; import java.net.ProxySelector; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; + import java.util.List; import java.util.Optional; + import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; + import java.util.concurrent.Flow; + import java.util.stream.IntStream; + import java.time.Instant; + import java.time.temporal.ChronoUnit; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLParameters; ! import jdk.incubator.http.internal.common.FlowTube; /** ! * @summary Verifies that the ConnectionPool correctly handle ! * connection deadlines and purges the right connections ! * from the cache. ! * @bug 8187044 8187111 * @author danielfuchs */ public class ConnectionPoolTest { static long getActiveCleaners() throws ClassNotFoundException {
*** 63,166 **** public static void main(String[] args) throws Exception { testCacheCleaners(); } public static void testCacheCleaners() throws Exception { ! ConnectionPool pool = new ConnectionPool(); HttpClient client = new HttpClientStub(pool); InetSocketAddress proxy = InetSocketAddress.createUnresolved("bar", 80); System.out.println("Adding 10 connections to pool"); ! for (int i=0; i<10; i++) { ! InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80); ! HttpConnection c1 = new HttpConnectionStub(client, addr, proxy, true); ! pool.returnToPool(c1); ! } ! while (getActiveCleaners() == 0) { ! System.out.println("Waiting for cleaner to start"); ! Thread.sleep(10); ! } ! System.out.println("Active CacheCleaners: " + getActiveCleaners()); ! if (getActiveCleaners() > 1) { ! throw new RuntimeException("Too many CacheCleaner active: " ! + getActiveCleaners()); ! } ! System.out.println("Removing 9 connections from pool"); ! for (int i=0; i<9; i++) { ! InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80); ! HttpConnection c2 = pool.getConnection(true, addr, proxy); ! if (c2 == null) { ! throw new RuntimeException("connection not found for " + addr); ! } ! } ! System.out.println("Active CacheCleaners: " + getActiveCleaners()); ! if (getActiveCleaners() != 1) { ! throw new RuntimeException("Wrong number of CacheCleaner active: " ! + getActiveCleaners()); } ! System.out.println("Removing last connection from pool"); ! for (int i=9; i<10; i++) { InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80); ! HttpConnection c2 = pool.getConnection(true, addr, proxy); ! if (c2 == null) { ! throw new RuntimeException("connection not found for " + addr); ! } ! } ! System.out.println("Active CacheCleaners: " + getActiveCleaners() ! + " (may be 0 or may still be 1)"); ! if (getActiveCleaners() > 1) { ! throw new RuntimeException("Too many CacheCleaner active: " ! + getActiveCleaners()); ! } ! InetSocketAddress addr = InetSocketAddress.createUnresolved("foo", 80); ! HttpConnection c = new HttpConnectionStub(client, addr, proxy, true); ! System.out.println("Adding/Removing one connection from pool 20 times in a loop"); ! for (int i=0; i<20; i++) { ! pool.returnToPool(c); ! HttpConnection c2 = pool.getConnection(true, addr, proxy); ! if (c2 == null) { ! throw new RuntimeException("connection not found for " + addr); ! } ! if (c2 != c) { ! throw new RuntimeException("wrong connection found for " + addr); ! } ! } ! if (getActiveCleaners() > 1) { ! throw new RuntimeException("Too many CacheCleaner active: " ! + getActiveCleaners()); ! } ! ReferenceQueue<HttpClient> queue = new ReferenceQueue<>(); ! WeakReference<HttpClient> weak = new WeakReference<>(client, queue); ! System.gc(); ! Reference.reachabilityFence(pool); ! client = null; pool = null; c = null; ! while (true) { ! long cleaners = getActiveCleaners(); ! System.out.println("Waiting for GC to release stub HttpClient;" ! + " active cache cleaners: " + cleaners); ! System.gc(); ! Reference<?> ref = queue.remove(1000); ! if (ref == weak) { ! System.out.println("Stub HttpClient GC'ed"); ! break; ! } ! } ! while (getActiveCleaners() > 0) { ! System.out.println("Waiting for CacheCleaner to stop"); ! Thread.sleep(1000); ! } ! System.out.println("Active CacheCleaners: " ! + getActiveCleaners()); ! ! if (getActiveCleaners() > 0) { ! throw new RuntimeException("Too many CacheCleaner active: " ! + getActiveCleaners()); } } static <T> T error() { throw new InternalError("Should not reach here: wrong test assumptions!"); } // Emulates an HttpConnection that has a strong reference to its HttpClient. static class HttpConnectionStub extends HttpConnection { public HttpConnectionStub(HttpClient client, InetSocketAddress address, --- 66,168 ---- public static void main(String[] args) throws Exception { testCacheCleaners(); } public static void testCacheCleaners() throws Exception { ! ConnectionPool pool = new ConnectionPool(666); HttpClient client = new HttpClientStub(pool); InetSocketAddress proxy = InetSocketAddress.createUnresolved("bar", 80); System.out.println("Adding 10 connections to pool"); ! Random random = new Random(); ! ! final int count = 20; ! Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS); ! int[] keepAlives = new int[count]; ! HttpConnectionStub[] connections = new HttpConnectionStub[count]; ! long purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now); ! long expected = 0; ! if (purge != expected) { ! throw new RuntimeException("Bad purge delay: " + purge ! + ", expected " + expected); } ! expected = Long.MAX_VALUE; ! for (int i=0; i<count; i++) { InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80); ! keepAlives[i] = random.nextInt(10) * 10 + 10; ! connections[i] = new HttpConnectionStub(client, addr, proxy, true); ! System.out.println("Adding connection: " + now ! + " keepAlive: " + keepAlives[i] ! + " /" + connections[i]); ! pool.returnToPool(connections[i], now, keepAlives[i]); ! expected = Math.min(expected, keepAlives[i] * 1000); ! purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now); ! if (purge != expected) { ! throw new RuntimeException("Bad purge delay: " + purge ! + ", expected " + expected); ! } ! } ! int min = IntStream.of(keepAlives).min().getAsInt(); ! int max = IntStream.of(keepAlives).max().getAsInt(); ! int mean = (min + max)/2; ! System.out.println("min=" + min + ", max=" + max + ", mean=" + mean); ! purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now); ! System.out.println("first purge would be in " + purge + " ms"); ! if (Math.abs(purge/1000 - min) > 0) { ! throw new RuntimeException("expected " + min + " got " + purge/1000); ! } ! long opened = java.util.stream.Stream.of(connections) ! .filter(HttpConnectionStub::connected).count(); ! if (opened != count) { ! throw new RuntimeException("Opened: expected " ! + count + " got " + opened); ! } ! purge = mean * 1000; ! System.out.println("start purging at " + purge + " ms"); ! Instant next = now; ! do { ! System.out.println("next purge is in " + purge + " ms"); ! next = next.plus(purge, ChronoUnit.MILLIS); ! purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(next); ! long k = now.until(next, ChronoUnit.SECONDS); ! System.out.println("now is " + k + "s from start"); ! for (int i=0; i<count; i++) { ! if (connections[i].connected() != (k < keepAlives[i])) { ! throw new RuntimeException("Bad connection state for " ! + i ! + "\n\t connected=" + connections[i].connected() ! + "\n\t keepAlive=" + keepAlives[i] ! + "\n\t elapsed=" + k); ! } ! } ! } while (purge > 0); ! opened = java.util.stream.Stream.of(connections) ! .filter(HttpConnectionStub::connected).count(); ! if (opened != 0) { ! throw new RuntimeException("Closed: expected " ! + count + " got " ! + (count-opened)); } } + static <T> T error() { throw new InternalError("Should not reach here: wrong test assumptions!"); } + static class FlowTubeStub implements FlowTube { + final HttpConnectionStub conn; + FlowTubeStub(HttpConnectionStub conn) { this.conn = conn; } + @Override + public void onSubscribe(Flow.Subscription subscription) { } + @Override public void onError(Throwable error) { error(); } + @Override public void onComplete() { error(); } + @Override public void onNext(List<ByteBuffer> item) { error();} + @Override + public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { + } + @Override public boolean isFinished() { return conn.closed; } + } + // Emulates an HttpConnection that has a strong reference to its HttpClient. static class HttpConnectionStub extends HttpConnection { public HttpConnectionStub(HttpClient client, InetSocketAddress address,
*** 170,238 **** this.key = ConnectionPool.cacheKey(address, proxy); this.address = address; this.proxy = proxy; this.secured = secured; this.client = client; } ! InetSocketAddress proxy; ! InetSocketAddress address; ! boolean secured; ! ConnectionPool.CacheKey key; ! HttpClient client; // All these return something ! @Override boolean connected() {return true;} @Override boolean isSecure() {return secured;} @Override boolean isProxied() {return proxy!=null;} @Override ConnectionPool.CacheKey cacheKey() {return key;} - @Override public void close() {} @Override void shutdownInput() throws IOException {} @Override void shutdownOutput() throws IOException {} public String toString() { return "HttpConnectionStub: " + address + " proxy: " + proxy; } // All these throw errors ! @Override ! public void connect() throws IOException, InterruptedException {error();} @Override public CompletableFuture<Void> connectAsync() {return error();} @Override SocketChannel channel() {return error();} - @Override void flushAsync() throws IOException {error();} - @Override - protected ByteBuffer readImpl() throws IOException {return error();} - @Override CompletableFuture<Void> whenReceivingResponse() {return error();} @Override ! long write(ByteBuffer[] buffers, int start, int number) throws IOException { ! throw (Error)error(); ! } ! @Override ! long write(ByteBuffer buffer) throws IOException {throw (Error)error();} ! @Override ! void writeAsync(ByteBufferReference[] buffers) throws IOException { ! error(); } @Override ! void writeAsyncUnordered(ByteBufferReference[] buffers) ! throws IOException { ! error(); ! } } // Emulates an HttpClient that has a strong reference to its connection pool. static class HttpClientStub extends HttpClient { public HttpClientStub(ConnectionPool pool) { this.pool = pool; } final ConnectionPool pool; ! @Override public Optional<CookieManager> cookieManager() {return error();} @Override public HttpClient.Redirect followRedirects() {return error();} @Override public Optional<ProxySelector> proxy() {return error();} @Override public SSLContext sslContext() {return error();} ! @Override public Optional<SSLParameters> sslParameters() {return error();} @Override public Optional<Authenticator> authenticator() {return error();} @Override public HttpClient.Version version() {return HttpClient.Version.HTTP_1_1;} ! @Override public Executor executor() {return error();} @Override public <T> HttpResponse<T> send(HttpRequest req, HttpResponse.BodyHandler<T> responseBodyHandler) throws IOException, InterruptedException { return error(); --- 172,234 ---- this.key = ConnectionPool.cacheKey(address, proxy); this.address = address; this.proxy = proxy; this.secured = secured; this.client = client; + this.flow = new FlowTubeStub(this); } ! final InetSocketAddress proxy; ! final InetSocketAddress address; ! final boolean secured; ! final ConnectionPool.CacheKey key; ! final HttpClient client; ! final FlowTubeStub flow; ! volatile boolean closed; // All these return something ! @Override boolean connected() {return !closed;} @Override boolean isSecure() {return secured;} @Override boolean isProxied() {return proxy!=null;} @Override ConnectionPool.CacheKey cacheKey() {return key;} @Override void shutdownInput() throws IOException {} @Override void shutdownOutput() throws IOException {} + @Override + public void close() { + closed=true; + System.out.println("closed: " + this); + } + @Override public String toString() { return "HttpConnectionStub: " + address + " proxy: " + proxy; } // All these throw errors ! @Override public HttpPublisher publisher() {return error();} @Override public CompletableFuture<Void> connectAsync() {return error();} @Override SocketChannel channel() {return error();} @Override ! HttpConnection.DetachedConnectionChannel detachChannel() { ! return error(); } @Override ! FlowTube getConnectionFlow() {return flow;} } // Emulates an HttpClient that has a strong reference to its connection pool. static class HttpClientStub extends HttpClient { public HttpClientStub(ConnectionPool pool) { this.pool = pool; } final ConnectionPool pool; ! @Override public Optional<CookieHandler> cookieHandler() {return error();} @Override public HttpClient.Redirect followRedirects() {return error();} @Override public Optional<ProxySelector> proxy() {return error();} @Override public SSLContext sslContext() {return error();} ! @Override public SSLParameters sslParameters() {return error();} @Override public Optional<Authenticator> authenticator() {return error();} @Override public HttpClient.Version version() {return HttpClient.Version.HTTP_1_1;} ! @Override public Optional<Executor> executor() {return error();} @Override public <T> HttpResponse<T> send(HttpRequest req, HttpResponse.BodyHandler<T> responseBodyHandler) throws IOException, InterruptedException { return error();
*** 242,252 **** HttpResponse.BodyHandler<T> responseBodyHandler) { return error(); } @Override public <U, T> CompletableFuture<U> sendAsync(HttpRequest req, ! HttpResponse.MultiProcessor<U, T> multiProcessor) { return error(); } } } --- 238,248 ---- HttpResponse.BodyHandler<T> responseBodyHandler) { return error(); } @Override public <U, T> CompletableFuture<U> sendAsync(HttpRequest req, ! HttpResponse.MultiSubscriber<U, T> multiSubscriber) { return error(); } } }
< prev index next >