< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java
Print this page
*** 1,7 ****
/*
! * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
--- 1,7 ----
/*
! * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
*** 23,43 ****
* questions.
*/
package jdk.incubator.http;
! import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Base64;
import java.util.Collections;
- import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
!
import jdk.incubator.http.internal.common.Utils;
import jdk.incubator.http.internal.frame.SettingsFrame;
import static jdk.incubator.http.internal.frame.SettingsFrame.INITIAL_WINDOW_SIZE;
import static jdk.incubator.http.internal.frame.SettingsFrame.ENABLE_PUSH;
import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE;
--- 23,44 ----
* questions.
*/
package jdk.incubator.http;
! import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
+ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
! import java.util.concurrent.CompletableFuture;
! import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;
import jdk.incubator.http.internal.frame.SettingsFrame;
import static jdk.incubator.http.internal.frame.SettingsFrame.INITIAL_WINDOW_SIZE;
import static jdk.incubator.http.internal.frame.SettingsFrame.ENABLE_PUSH;
import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE;
*** 47,73 ****
/**
* Http2 specific aspects of HttpClientImpl
*/
class Http2ClientImpl {
private final HttpClientImpl client;
Http2ClientImpl(HttpClientImpl client) {
this.client = client;
}
/* Map key is "scheme:host:port" */
private final Map<String,Http2Connection> connections = new ConcurrentHashMap<>();
private final Set<String> opening = Collections.synchronizedSet(new HashSet<>());
! boolean haveConnectionFor(URI uri, InetSocketAddress proxy) {
! return connections.containsKey(Http2Connection.keyFor(uri,proxy));
}
/**
! * If a https request then blocks and waits until a connection is opened.
* Returns null if the request is 'http' as a different (upgrade)
* mechanism is used.
*
* Only one connection per destination is created. Blocks when opening
* connection, or when waiting for connection to be opened.
--- 48,91 ----
/**
* Http2 specific aspects of HttpClientImpl
*/
class Http2ClientImpl {
+ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+ final static System.Logger debug =
+ Utils.getDebugLogger("Http2ClientImpl"::toString, DEBUG);
+
private final HttpClientImpl client;
Http2ClientImpl(HttpClientImpl client) {
this.client = client;
}
/* Map key is "scheme:host:port" */
private final Map<String,Http2Connection> connections = new ConcurrentHashMap<>();
private final Set<String> opening = Collections.synchronizedSet(new HashSet<>());
+ private final Map<String,Set<CompletableFuture<Http2Connection>>> waiting =
+ Collections.synchronizedMap(new HashMap<>());
! private void addToWaiting(String key, CompletableFuture<Http2Connection> cf) {
! synchronized (waiting) {
! Set<CompletableFuture<Http2Connection>> waiters = waiting.get(key);
! if (waiters == null) {
! waiters = new HashSet<>();
! waiting.put(key, waiters);
! }
! waiters.add(cf);
! }
}
+ // boolean haveConnectionFor(URI uri, InetSocketAddress proxy) {
+ // return connections.containsKey(Http2Connection.keyFor(uri,proxy));
+ // }
+
/**
! * If a https request then async waits until a connection is opened.
* Returns null if the request is 'http' as a different (upgrade)
* mechanism is used.
*
* Only one connection per destination is created. Blocks when opening
* connection, or when waiting for connection to be opened.
*** 76,129 ****
* If the request is secure (https) then we open the connection here.
* If not, then the more complicated upgrade from 1.1 to 2 happens (not here)
* In latter case, when the Http2Connection is connected, putConnection() must
* be called to store it.
*/
! Http2Connection getConnectionFor(HttpRequestImpl req)
! throws IOException, InterruptedException {
URI uri = req.uri();
! InetSocketAddress proxy = req.proxy(client);
String key = Http2Connection.keyFor(uri, proxy);
Http2Connection connection = connections.get(key);
if (connection != null) { // fast path if connection already exists
! return connection;
}
! synchronized (opening) {
! while ((connection = connections.get(key)) == null) {
if (!req.secure()) {
! return null;
}
if (!opening.contains(key)) {
opening.add(key);
- break;
} else {
! opening.wait();
}
}
! }
! if (connection != null) {
! return connection;
! }
! // we are opening the connection here blocking until it is done.
! try {
! connection = new Http2Connection(req, this);
! } catch (Throwable t) {
synchronized (opening) {
opening.remove(key);
! opening.notifyAll();
! }
! throw t;
}
- synchronized (opening) {
- connections.put(key, connection);
- opening.remove(key);
- opening.notifyAll();
}
! return connection;
}
-
/*
* TODO: If there isn't a connection to the same destination, then
* store it. If there is already a connection, then close it
*/
void putConnection(Http2Connection c) {
--- 94,156 ----
* If the request is secure (https) then we open the connection here.
* If not, then the more complicated upgrade from 1.1 to 2 happens (not here)
* In latter case, when the Http2Connection is connected, putConnection() must
* be called to store it.
*/
! CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req) {
URI uri = req.uri();
! InetSocketAddress proxy = req.proxy();
String key = Http2Connection.keyFor(uri, proxy);
+
+ synchronized (opening) {
Http2Connection connection = connections.get(key);
if (connection != null) { // fast path if connection already exists
! return CompletableFuture.completedFuture(connection);
}
!
if (!req.secure()) {
! return MinimalFuture.completedFuture(null);
}
+
if (!opening.contains(key)) {
+ debug.log(Level.DEBUG, "Opening: %s", key);
opening.add(key);
} else {
! CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
! addToWaiting(key, cf);
! return cf;
}
}
! return Http2Connection
! .createAsync(req, this)
! .whenComplete((conn, t) -> {
! debug.log(Level.DEBUG,
! "waking up dependents with created connection");
synchronized (opening) {
+ Set<CompletableFuture<Http2Connection>> waiters = waiting.remove(key);
+ debug.log(Level.DEBUG, "Opening completed: %s", key);
opening.remove(key);
! final Throwable cause = Utils.getCompletionCause(t);
! if (waiters == null) {
! debug.log(Level.DEBUG, "no dependent to wake up");
! return;
! } else if (cause instanceof Http2Connection.ALPNException) {
! waiters.forEach((cf1) -> cf1.completeAsync(() -> null,
! client.theExecutor()));
! } else if (cause != null) {
! debug.log(Level.DEBUG,
! () -> "waking up dependants: failed: " + cause);
! waiters.forEach((cf1) -> cf1.completeExceptionally(cause));
! } else {
! debug.log(Level.DEBUG, "waking up dependants: succeeded");
! waiters.forEach((cf1) -> cf1.completeAsync(() -> conn,
! client.theExecutor()));
}
}
! });
}
/*
* TODO: If there isn't a connection to the same destination, then
* store it. If there is already a connection, then close it
*/
void putConnection(Http2Connection c) {
*** 132,141 ****
--- 159,178 ----
void deleteConnection(Http2Connection c) {
connections.remove(c.key());
}
+ void stop() {
+ debug.log(Level.DEBUG, "stopping");
+ connections.values().forEach(this::close);
+ connections.clear();
+ }
+
+ private void close(Http2Connection h2c) {
+ try { h2c.close(); } catch (Throwable t) {}
+ }
+
HttpClientImpl client() {
return client;
}
/** Returns the client settings as a base64 (url) encoded string */
< prev index next >