/*
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.net.Authenticator;
import java.net.CookieManager;
import java.net.InetSocketAddress;
import java.net.ProxySelector;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import jdk.incubator.http.internal.common.ByteBufferReference;

/**
 * @summary Verifies that the ConnectionPool won't prevent an HttpClient
 *          from being GC'ed. Verifies that the ConnectionPool has at most
 *          one CacheCleaner thread running.
 * @bug 8187044
 * @author danielfuchs
 */
public class ConnectionPoolTest {

    /**
     * Counts the threads, as reported by the platform thread MXBean, whose
     * name starts with {@code "HTTP-Cache-cleaner"}.
     *
     * @return the number of matching threads in the current thread dump
     * @throws ClassNotFoundException declared for compatibility with
     *         existing callers; the current implementation never throws it
     */
    static long getActiveCleaners() throws ClassNotFoundException {
        // Alternatives that were considered instead of a thread dump:
        // ConnectionPool.ACTIVE_CLEANER_COUNTER.get()
        // ConnectionPoolTest.class.getModule().addReads(
        //      Class.forName("java.lang.management.ManagementFactory").getModule());
        return java.util.stream.Stream.of(ManagementFactory.getThreadMXBean()
                        .dumpAllThreads(false, false))
                .filter(t -> t.getThreadName().startsWith("HTTP-Cache-cleaner"))
                .count();
    }

    public static void main(String[] args) throws Exception {
        testCacheCleaners();
    }

    /**
     * Fails the test if {@code cleaners} exceeds {@code max}.
     *
     * The caller passes the value it already sampled, so the number shown
     * in the failure message is the number that actually failed the check.
     */
    private static void failIfTooManyCleaners(long cleaners, long max) {
        if (cleaners > max) {
            throw new RuntimeException("Too many CacheCleaner active: "
                    + cleaners);
        }
    }

    /**
     * Takes the secure connection cached for {@code addr} / {@code proxy}
     * out of {@code pool}, failing the test if the pool has none.
     */
    private static HttpConnection takeFromPool(ConnectionPool pool,
                                               InetSocketAddress addr,
                                               InetSocketAddress proxy) {
        HttpConnection found = pool.getConnection(true, addr, proxy);
        if (found == null) {
            throw new RuntimeException("connection not found for " + addr);
        }
        return found;
    }

    /**
     * Exercises the pool and checks, at each step, how many cache cleaner
     * threads are alive; then drops every strong reference to the stub
     * client and waits for it to be garbage collected and for the cleaner
     * thread to stop.
     *
     * @throws Exception if any check fails or the thread is interrupted
     */
    public static void testCacheCleaners() throws Exception {
        ConnectionPool pool = new ConnectionPool();
        HttpClient client = new HttpClientStub(pool);
        InetSocketAddress proxy = InetSocketAddress.createUnresolved("bar", 80);

        System.out.println("Adding 10 connections to pool");
        for (int i = 0; i < 10; i++) {
            InetSocketAddress addr = InetSocketAddress.createUnresolved("foo" + i, 80);
            HttpConnection c1 = new HttpConnectionStub(client, addr, proxy, true);
            pool.returnToPool(c1);
        }
        while (getActiveCleaners() == 0) {
            System.out.println("Waiting for cleaner to start");
            Thread.sleep(10);
        }
        long active = getActiveCleaners();
        System.out.println("Active CacheCleaners: " + active);
        failIfTooManyCleaners(active, 1);

        System.out.println("Removing 9 connections from pool");
        for (int i = 0; i < 9; i++) {
            InetSocketAddress addr = InetSocketAddress.createUnresolved("foo" + i, 80);
            takeFromPool(pool, addr, proxy);
        }
        active = getActiveCleaners();
        System.out.println("Active CacheCleaners: " + active);
        if (active != 1) {
            throw new RuntimeException("Wrong number of CacheCleaner active: "
                    + active);
        }

        System.out.println("Removing last connection from pool");
        for (int i = 9; i < 10; i++) {
            InetSocketAddress addr = InetSocketAddress.createUnresolved("foo" + i, 80);
            takeFromPool(pool, addr, proxy);
        }
        active = getActiveCleaners();
        System.out.println("Active CacheCleaners: " + active
                + " (may be 0 or may still be 1)");
        failIfTooManyCleaners(active, 1);

        InetSocketAddress addr = InetSocketAddress.createUnresolved("foo", 80);
        HttpConnection c = new HttpConnectionStub(client, addr, proxy, true);
        System.out.println("Adding/Removing one connection from pool 20 times in a loop");
        for (int i = 0; i < 20; i++) {
            pool.returnToPool(c);
            HttpConnection c2 = takeFromPool(pool, addr, proxy);
            if (c2 != c) {
                throw new RuntimeException("wrong connection found for " + addr);
            }
        }
        failIfTooManyCleaners(getActiveCleaners(), 1);

        ReferenceQueue<HttpClient> queue = new ReferenceQueue<>();
        WeakReference<HttpClient> weak = new WeakReference<>(client, queue);
        System.gc();
        Reference.reachabilityFence(pool);
        // Drop every strong reference this method holds on the stubs so that
        // the weak reference above can be cleared and enqueued.
        client = null; pool = null; c = null;
        while (true) {
            long cleaners = getActiveCleaners();
            System.out.println("Waiting for GC to release stub HttpClient;"
                    + " active cache cleaners: " + cleaners);
            System.gc();
            Reference<?> ref = queue.remove(1000);
            if (ref == weak) {
                System.out.println("Stub HttpClient GC'ed");
                break;
            }
        }
        while (getActiveCleaners() > 0) {
            System.out.println("Waiting for CacheCleaner to stop");
            Thread.sleep(1000);
        }
        active = getActiveCleaners();
        System.out.println("Active CacheCleaners: " + active);
        failIfTooManyCleaners(active, 0);
    }

    /**
     * Always throws {@link InternalError}; used as the body of stub methods
     * that the test does not expect to be called.
     */
    static <T> T error() {
        throw new InternalError("Should not reach here: wrong test assumptions!");
    }

    // Emulates an HttpConnection that has a strong reference to its HttpClient.
    static class HttpConnectionStub extends HttpConnection {

        public HttpConnectionStub(HttpClient client,
                                  InetSocketAddress address,
                                  InetSocketAddress proxy,
                                  boolean secured) {
            super(address, null);
            this.key = ConnectionPool.cacheKey(address, proxy);
            this.address = address;
            this.proxy = proxy;
            this.secured = secured;
            this.client = client;
        }

        final InetSocketAddress proxy;
        final InetSocketAddress address;
        final boolean secured;
        final ConnectionPool.CacheKey key;
        final HttpClient client;

        // All these return something
        @Override boolean connected() {return true;}
        @Override boolean isSecure() {return secured;}
        @Override boolean isProxied() {return proxy!=null;}
        @Override ConnectionPool.CacheKey cacheKey() {return key;}
        @Override public void close() {}
        @Override void shutdownInput() throws IOException {}
        @Override void shutdownOutput() throws IOException {}
        @Override
        public String toString() {
            return "HttpConnectionStub: " + address + " proxy: " + proxy;
        }

        // All these throw errors
        @Override
        public void connect() throws IOException, InterruptedException {error();}
        @Override public CompletableFuture<Void> connectAsync() {return error();}
        @Override SocketChannel channel() {return error();}
        @Override void flushAsync() throws IOException {error();}
        @Override
        protected ByteBuffer readImpl() throws IOException {return error();}
        @Override CompletableFuture<Void> whenReceivingResponse() {return error();}
        @Override
        long write(ByteBuffer[] buffers, int start, int number) throws IOException {
            throw (Error)error();
        }
        @Override
        long write(ByteBuffer buffer) throws IOException {throw (Error)error();}
        @Override
        void writeAsync(ByteBufferReference[] buffers) throws IOException {
            error();
        }
        @Override
        void writeAsyncUnordered(ByteBufferReference[] buffers)
                throws IOException {
            error();
        }
    }

    // Emulates an HttpClient that has a strong reference to its connection pool.
    static class HttpClientStub extends HttpClient {
        public HttpClientStub(ConnectionPool pool) {
            this.pool = pool;
        }
        final ConnectionPool pool;
        @Override public Optional<CookieManager> cookieManager() {return error();}
        @Override public HttpClient.Redirect followRedirects() {return error();}
        @Override public Optional<ProxySelector> proxy() {return error();}
        @Override public SSLContext sslContext() {return error();}
        @Override public Optional<SSLParameters> sslParameters() {return error();}
        @Override public Optional<Authenticator> authenticator() {return error();}
        @Override public HttpClient.Version version() {return HttpClient.Version.HTTP_1_1;}
        @Override public Executor executor() {return error();}
        @Override
        public <T> HttpResponse<T> send(HttpRequest req,
                                        HttpResponse.BodyHandler<T> responseBodyHandler)
                throws IOException, InterruptedException {
            return error();
        }
        @Override
        public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest req,
                HttpResponse.BodyHandler<T> responseBodyHandler) {
            return error();
        }
        @Override
        public <U, T> CompletableFuture<U> sendAsync(HttpRequest req,
                HttpResponse.MultiProcessor<U, T> multiProcessor) {
            return error();
        }
    }

}

/*
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 */
/*
 * See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.Authenticator;
import java.net.CookieHandler;
import java.net.InetSocketAddress;
import java.net.ProxySelector;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.stream.IntStream;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import jdk.incubator.http.internal.common.FlowTube;

/**
 * @summary Verifies that the ConnectionPool correctly handles
 *          connection deadlines and purges the right connections
 *          from the cache.
 * @bug 8187044 8187111
 * @author danielfuchs
 */
public class ConnectionPoolTest {

    /**
     * Counts the threads, as reported by the platform thread MXBean, whose
     * name starts with {@code "HTTP-Cache-cleaner"}.
     *
     * @return the number of matching threads in the current thread dump
     * @throws ClassNotFoundException declared for compatibility with
     *         existing callers; the current implementation never throws it
     */
    static long getActiveCleaners() throws ClassNotFoundException {
        // Alternatives that were considered instead of a thread dump:
        // ConnectionPool.ACTIVE_CLEANER_COUNTER.get()
        // ConnectionPoolTest.class.getModule().addReads(
        //      Class.forName("java.lang.management.ManagementFactory").getModule());
        return java.util.stream.Stream.of(ManagementFactory.getThreadMXBean()
                        .dumpAllThreads(false, false))
                .filter(t -> t.getThreadName().startsWith("HTTP-Cache-cleaner"))
                .count();
    }

    public static void main(String[] args) throws Exception {
        testCacheCleaners();
    }

    /** Fails the test if the purge delay returned by the pool is not the expected one. */
    private static void checkPurgeDelay(long purge, long expected) {
        if (purge != expected) {
            throw new RuntimeException("Bad purge delay: " + purge
                    + ", expected " + expected);
        }
    }

    /** Counts the stubs in {@code connections} that still report being connected. */
    private static long countOpened(HttpConnectionStub[] connections) {
        return java.util.stream.Stream.of(connections)
                .filter(HttpConnectionStub::connected).count();
    }

    /**
     * Returns {@code count} stub connections to the pool, each with a random
     * keep-alive of 10 to 100 seconds, then repeatedly asks the pool to purge
     * expired connections at simulated instants and checks that exactly the
     * connections whose keep-alive has elapsed were closed.
     *
     * Time is simulated: the instants passed to the pool are computed from a
     * fixed start instant, so the test never sleeps.
     *
     * @throws Exception if any check fails
     */
    public static void testCacheCleaners() throws Exception {
        final int count = 20;
        ConnectionPool pool = new ConnectionPool(666);
        HttpClient client = new HttpClientStub(pool);
        InetSocketAddress proxy = InetSocketAddress.createUnresolved("bar", 80);
        // Derive the number from 'count' so the log cannot drift out of sync
        // with the number of connections actually added.
        System.out.println("Adding " + count + " connections to pool");
        Random random = new Random();

        Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
        int[] keepAlives = new int[count];
        HttpConnectionStub[] connections = new HttpConnectionStub[count];

        // The test expects a delay of 0 while the pool is empty.
        long purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now);
        long expected = 0;
        checkPurgeDelay(purge, expected);

        // After each addition the test expects the delay to equal the
        // smallest keep-alive added so far, expressed in milliseconds.
        expected = Long.MAX_VALUE;
        for (int i = 0; i < count; i++) {
            InetSocketAddress addr = InetSocketAddress.createUnresolved("foo" + i, 80);
            keepAlives[i] = random.nextInt(10) * 10 + 10;
            connections[i] = new HttpConnectionStub(client, addr, proxy, true);
            System.out.println("Adding connection: " + now
                    + " keepAlive: " + keepAlives[i]
                    + " /" + connections[i]);
            pool.returnToPool(connections[i], now, keepAlives[i]);
            expected = Math.min(expected, keepAlives[i] * 1000);
            purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now);
            checkPurgeDelay(purge, expected);
        }

        int min = IntStream.of(keepAlives).min().getAsInt();
        int max = IntStream.of(keepAlives).max().getAsInt();
        int mean = (min + max)/2;
        System.out.println("min=" + min + ", max=" + max + ", mean=" + mean);
        purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now);
        System.out.println("first purge would be in " + purge + " ms");
        if (purge/1000 - min != 0) {
            throw new RuntimeException("expected " + min + " got " + purge/1000);
        }
        long opened = countOpened(connections);
        if (opened != count) {
            throw new RuntimeException("Opened: expected "
                    + count + " got " + opened);
        }

        // Jump to the midpoint between the shortest and longest keep-alive,
        // then keep advancing by whatever delay the pool returns until it
        // returns a non-positive delay.
        purge = mean * 1000;
        System.out.println("start purging at " + purge + " ms");
        Instant next = now;
        do {
            System.out.println("next purge is in " + purge + " ms");
            next = next.plus(purge, ChronoUnit.MILLIS);
            purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(next);
            long k = now.until(next, ChronoUnit.SECONDS);
            System.out.println("now is " + k + "s from start");
            for (int i = 0; i < count; i++) {
                // A connection must still be open iff its keep-alive has not
                // yet elapsed at the simulated instant.
                if (connections[i].connected() != (k < keepAlives[i])) {
                    throw new RuntimeException("Bad connection state for "
                            + i
                            + "\n\t connected=" + connections[i].connected()
                            + "\n\t keepAlive=" + keepAlives[i]
                            + "\n\t elapsed=" + k);
                }
            }
        } while (purge > 0);
        opened = countOpened(connections);
        if (opened != 0) {
            throw new RuntimeException("Closed: expected "
                    + count + " got "
                    + (count-opened));
        }
    }

    /**
     * Always throws {@link InternalError}; used as the body of stub methods
     * that the test does not expect to be called.
     */
    static <T> T error() {
        throw new InternalError("Should not reach here: wrong test assumptions!");
    }

    // Minimal FlowTube whose isFinished() mirrors the closed flag of the
    // connection stub that owns it.
    static class FlowTubeStub implements FlowTube {
        final HttpConnectionStub conn;
        FlowTubeStub(HttpConnectionStub conn) { this.conn = conn; }
        @Override
        public void onSubscribe(Flow.Subscription subscription) { }
        @Override public void onError(Throwable error) { error(); }
        @Override public void onComplete() { error(); }
        @Override public void onNext(List<ByteBuffer> item) { error();}
        @Override
        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
        }
        @Override public boolean isFinished() { return conn.closed; }
    }

    // Emulates an HttpConnection that has a strong reference to its HttpClient.
    static class HttpConnectionStub extends HttpConnection {

        public HttpConnectionStub(HttpClient client,
                                  InetSocketAddress address,
                                  InetSocketAddress proxy,
                                  boolean secured) {
            super(address, null);
            this.key = ConnectionPool.cacheKey(address, proxy);
            this.address = address;
            this.proxy = proxy;
            this.secured = secured;
            this.client = client;
            this.flow = new FlowTubeStub(this);
        }

        final InetSocketAddress proxy;
        final InetSocketAddress address;
        final boolean secured;
        final ConnectionPool.CacheKey key;
        final HttpClient client;
        final FlowTubeStub flow;
        // Set by close(); read by connected() and by FlowTubeStub.isFinished().
        volatile boolean closed;

        // All these return something
        @Override boolean connected() {return !closed;}
        @Override boolean isSecure() {return secured;}
        @Override boolean isProxied() {return proxy!=null;}
        @Override ConnectionPool.CacheKey cacheKey() {return key;}
        @Override void shutdownInput() throws IOException {}
        @Override void shutdownOutput() throws IOException {}
        @Override
        public void close() {
            closed=true;
            System.out.println("closed: " + this);
        }
        @Override
        public String toString() {
            return "HttpConnectionStub: " + address + " proxy: " + proxy;
        }

        // All these throw errors
        @Override public HttpPublisher publisher() {return error();}
        @Override public CompletableFuture<Void> connectAsync() {return error();}
        @Override SocketChannel channel() {return error();}
        @Override
        HttpConnection.DetachedConnectionChannel detachChannel() {
            return error();
        }
        @Override
        FlowTube getConnectionFlow() {return flow;}
    }

    // Emulates an HttpClient that has a strong reference to its connection pool.
    static class HttpClientStub extends HttpClient {
        public HttpClientStub(ConnectionPool pool) {
            this.pool = pool;
        }
        final ConnectionPool pool;
        @Override public Optional<CookieHandler> cookieHandler() {return error();}
        @Override public HttpClient.Redirect followRedirects() {return error();}
        @Override public Optional<ProxySelector> proxy() {return error();}
        @Override public SSLContext sslContext() {return error();}
        @Override public SSLParameters sslParameters() {return error();}
        @Override public Optional<Authenticator> authenticator() {return error();}
        @Override public HttpClient.Version version() {return HttpClient.Version.HTTP_1_1;}
        @Override public Optional<Executor> executor() {return error();}
        @Override
        public <T> HttpResponse<T> send(HttpRequest req,
                                        HttpResponse.BodyHandler<T> responseBodyHandler)
                throws IOException, InterruptedException {
            return error();
        }
        @Override
        public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest req,
                HttpResponse.BodyHandler<T> responseBodyHandler) {
            return error();
        }
        @Override
        public <U, T> CompletableFuture<U> sendAsync(HttpRequest req,
                HttpResponse.MultiSubscriber<U, T> multiSubscriber) {
            return error();
        }
    }

}