--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java 2017-11-30 04:03:56.038795264 -0800 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java 2017-11-30 04:03:55.838777778 -0800 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,17 +25,18 @@ package jdk.incubator.http; -import java.io.IOException; +import java.lang.System.Logger.Level; import java.net.InetSocketAddress; import java.net.URI; import java.util.Base64; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; - +import java.util.concurrent.CompletableFuture; +import jdk.incubator.http.internal.common.MinimalFuture; import jdk.incubator.http.internal.common.Utils; import jdk.incubator.http.internal.frame.SettingsFrame; import static jdk.incubator.http.internal.frame.SettingsFrame.INITIAL_WINDOW_SIZE; @@ -49,6 +50,10 @@ */ class Http2ClientImpl { + static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. + final static System.Logger debug = + Utils.getDebugLogger("Http2ClientImpl"::toString, DEBUG); + private final HttpClientImpl client; Http2ClientImpl(HttpClientImpl client) { @@ -59,13 +64,26 @@ private final Map connections = new ConcurrentHashMap<>(); private final Set opening = Collections.synchronizedSet(new HashSet<>()); + private final Map>> waiting = + Collections.synchronizedMap(new HashMap<>()); - boolean haveConnectionFor(URI uri, InetSocketAddress proxy) { - return connections.containsKey(Http2Connection.keyFor(uri,proxy)); + private void addToWaiting(String key, CompletableFuture cf) { + synchronized (waiting) { + Set> waiters = waiting.get(key); + if (waiters == null) { + waiters = new HashSet<>(); + waiting.put(key, waiters); + } + waiters.add(cf); + } } +// boolean haveConnectionFor(URI uri, InetSocketAddress proxy) { +// return connections.containsKey(Http2Connection.keyFor(uri,proxy)); +// } + /** - * If a https request then blocks and waits until a connection is opened. + * If a https request then async waits until a connection is opened. * Returns null if the request is 'http' as a different (upgrade) * mechanism is used. * @@ -78,50 +96,59 @@ * In latter case, when the Http2Connection is connected, putConnection() must * be called to store it. */ - Http2Connection getConnectionFor(HttpRequestImpl req) - throws IOException, InterruptedException { + CompletableFuture getConnectionFor(HttpRequestImpl req) { URI uri = req.uri(); - InetSocketAddress proxy = req.proxy(client); + InetSocketAddress proxy = req.proxy(); String key = Http2Connection.keyFor(uri, proxy); - Http2Connection connection = connections.get(key); - if (connection != null) { // fast path if connection already exists - return connection; - } + synchronized (opening) { - while ((connection = connections.get(key)) == null) { - if (!req.secure()) { - return null; - } - if (!opening.contains(key)) { - opening.add(key); - break; - } else { - opening.wait(); - } + Http2Connection connection = connections.get(key); + if (connection != null) { // fast path if connection already exists + return CompletableFuture.completedFuture(connection); } - } - if (connection != null) { - return connection; - } - // we are opening the connection here blocking until it is done. - try { - connection = new Http2Connection(req, this); - } catch (Throwable t) { - synchronized (opening) { - opening.remove(key); - opening.notifyAll(); + + if (!req.secure()) { + return MinimalFuture.completedFuture(null); + } + + if (!opening.contains(key)) { + debug.log(Level.DEBUG, "Opening: %s", key); + opening.add(key); + } else { + CompletableFuture cf = new MinimalFuture<>(); + addToWaiting(key, cf); + return cf; } - throw t; - } - synchronized (opening) { - connections.put(key, connection); - opening.remove(key); - opening.notifyAll(); } - return connection; + return Http2Connection + .createAsync(req, this) + .whenComplete((conn, t) -> { + debug.log(Level.DEBUG, + "waking up dependents with created connection"); + synchronized (opening) { + Set> waiters = waiting.remove(key); + debug.log(Level.DEBUG, "Opening completed: %s", key); + opening.remove(key); + final Throwable cause = Utils.getCompletionCause(t); + if (waiters == null) { + debug.log(Level.DEBUG, "no dependent to wake up"); + return; + } else if (cause instanceof Http2Connection.ALPNException) { + waiters.forEach((cf1) -> cf1.completeAsync(() -> null, + client.theExecutor())); + } else if (cause != null) { + debug.log(Level.DEBUG, + () -> "waking up dependants: failed: " + cause); + waiters.forEach((cf1) -> cf1.completeExceptionally(cause)); + } else { + debug.log(Level.DEBUG, "waking up dependants: succeeded"); + waiters.forEach((cf1) -> cf1.completeAsync(() -> conn, + client.theExecutor())); + } + } + }); } - /* * TODO: If there isn't a connection to the same destination, then * store it. If there is already a connection, then close it @@ -134,6 +161,16 @@ connections.remove(c.key()); } + void stop() { + debug.log(Level.DEBUG, "stopping"); + connections.values().forEach(this::close); + connections.clear(); + } + + private void close(Http2Connection h2c) { + try { h2c.close(); } catch (Throwable t) {} + } + HttpClientImpl client() { return client; }