< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java

Print this page

        

@@ -1,7 +1,7 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 only, as
  * published by the Free Software Foundation.  Oracle designates this

@@ -23,21 +23,22 @@
  * questions.
  */
 
 package jdk.incubator.http;
 
-import java.io.IOException;
+import java.lang.System.Logger.Level;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Base64;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-
+import java.util.concurrent.CompletableFuture;
+import jdk.incubator.http.internal.common.MinimalFuture;
 import jdk.incubator.http.internal.common.Utils;
 import jdk.incubator.http.internal.frame.SettingsFrame;
 import static jdk.incubator.http.internal.frame.SettingsFrame.INITIAL_WINDOW_SIZE;
 import static jdk.incubator.http.internal.frame.SettingsFrame.ENABLE_PUSH;
 import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE;

@@ -47,27 +48,44 @@
 /**
  *  Http2 specific aspects of HttpClientImpl
  */
 class Http2ClientImpl {
 
+    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+    final static System.Logger debug =
+            Utils.getDebugLogger("Http2ClientImpl"::toString, DEBUG);
+
     private final HttpClientImpl client;
 
     Http2ClientImpl(HttpClientImpl client) {
         this.client = client;
     }
 
     /* Map key is "scheme:host:port" */
     private final Map<String,Http2Connection> connections = new ConcurrentHashMap<>();
 
     private final Set<String> opening = Collections.synchronizedSet(new HashSet<>());
+    private final Map<String,Set<CompletableFuture<Http2Connection>>> waiting =
+    Collections.synchronizedMap(new HashMap<>());
 
-    boolean haveConnectionFor(URI uri, InetSocketAddress proxy) {
-        return connections.containsKey(Http2Connection.keyFor(uri,proxy));
+    private void addToWaiting(String key, CompletableFuture<Http2Connection> cf) {
+        synchronized (waiting) {
+            Set<CompletableFuture<Http2Connection>> waiters = waiting.get(key);
+            if (waiters == null) {
+                waiters = new HashSet<>();
+                waiting.put(key, waiters);
+            }
+            waiters.add(cf);
+        }
     }
 
+//    boolean haveConnectionFor(URI uri, InetSocketAddress proxy) {
+//        return connections.containsKey(Http2Connection.keyFor(uri,proxy));
+//    }
+
     /**
-     * If a https request then blocks and waits until a connection is opened.
+     * For an https request, asynchronously waits until a connection is opened.
      * Returns null if the request is 'http' as a different (upgrade)
      * mechanism is used.
      *
      * Only one connection per destination is created. Blocks when opening
      * connection, or when waiting for connection to be opened.

@@ -76,54 +94,63 @@
      * If the request is secure (https) then we open the connection here.
      * If not, then the more complicated upgrade from 1.1 to 2 happens (not here)
      * In latter case, when the Http2Connection is connected, putConnection() must
      * be called to store it.
      */
-    Http2Connection getConnectionFor(HttpRequestImpl req)
-            throws IOException, InterruptedException {
+    CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req) {
         URI uri = req.uri();
-        InetSocketAddress proxy = req.proxy(client);
+        InetSocketAddress proxy = req.proxy();
         String key = Http2Connection.keyFor(uri, proxy);
+
+        synchronized (opening) {
         Http2Connection connection = connections.get(key);
         if (connection != null) { // fast path if connection already exists
-            return connection;
+                return CompletableFuture.completedFuture(connection);
         }
-        synchronized (opening) {
-            while ((connection = connections.get(key)) == null) {
+
                 if (!req.secure()) {
-                    return null;
+                return MinimalFuture.completedFuture(null);
                 }
+
                 if (!opening.contains(key)) {
+                debug.log(Level.DEBUG, "Opening: %s", key);
                     opening.add(key);
-                    break;
                 } else {
-                    opening.wait();
+                CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
+                addToWaiting(key, cf);
+                return cf;
                 }
             }
-        }
-        if (connection != null) {
-            return connection;
-        }
-        // we are opening the connection here blocking until it is done.
-        try {
-            connection = new Http2Connection(req, this);
-        } catch (Throwable t) {
+        return Http2Connection
+                .createAsync(req, this)
+                .whenComplete((conn, t) -> {
+                    debug.log(Level.DEBUG,
+                            "waking up dependents with created connection");
             synchronized (opening) {
+                        Set<CompletableFuture<Http2Connection>> waiters = waiting.remove(key);
+                        debug.log(Level.DEBUG, "Opening completed: %s", key);
                 opening.remove(key);
-                opening.notifyAll();
-            }
-            throw t;
+                        final Throwable cause = Utils.getCompletionCause(t);
+                        if (waiters == null) {
+                            debug.log(Level.DEBUG, "no dependent to wake up");
+                            return;
+                        } else if (cause instanceof Http2Connection.ALPNException) {
+                            waiters.forEach((cf1) -> cf1.completeAsync(() -> null,
+                                    client.theExecutor()));
+                        } else if (cause != null) {
+                            debug.log(Level.DEBUG,
+                                    () -> "waking up dependants: failed: " + cause);
+                            waiters.forEach((cf1) -> cf1.completeExceptionally(cause));
+                        } else  {
+                            debug.log(Level.DEBUG, "waking up dependants: succeeded");
+                            waiters.forEach((cf1) -> cf1.completeAsync(() -> conn,
+                                    client.theExecutor()));
         }
-        synchronized (opening) {
-            connections.put(key, connection);
-            opening.remove(key);
-            opening.notifyAll();
         }
-        return connection;
+                });
     }
 
-
     /*
      * TODO: If there isn't a connection to the same destination, then
      * store it. If there is already a connection, then close it
      */
     void putConnection(Http2Connection c) {

@@ -132,10 +159,20 @@
 
     void deleteConnection(Http2Connection c) {
         connections.remove(c.key());
     }
 
+    void stop() {
+        debug.log(Level.DEBUG, "stopping");
+        connections.values().forEach(this::close);
+        connections.clear();
+    }
+
+    private void close(Http2Connection h2c) {
+        try { h2c.close(); } catch (Throwable t) {}
+    }
+
     HttpClientImpl client() {
         return client;
     }
 
     /** Returns the client settings as a base64 (url) encoded string */
< prev index next >