< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java

Print this page

        

@@ -1,7 +1,7 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 only, as
  * published by the Free Software Foundation.  Oracle designates this

@@ -24,14 +24,17 @@
  */
 
 package jdk.incubator.http;
 
 import java.io.IOException;
+import java.lang.System.Logger.Level;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.Function;
 import jdk.incubator.http.internal.common.MinimalFuture;
 import static jdk.incubator.http.HttpClient.Version.HTTP_1_1;
+import jdk.incubator.http.internal.common.Utils;
 
 /**
  * Splits request so that headers and body can be sent separately with optional
  * (multiple) responses in between (e.g. 100 Continue). Also request and
  * response always sent/received in different calls.

@@ -44,10 +47,14 @@
  *
  * These implementation classes are where work is allocated to threads.
  */
 abstract class ExchangeImpl<T> {
 
+    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+    private static final System.Logger DEBUG_LOGGER =
+            Utils.getDebugLogger("ExchangeImpl"::toString, DEBUG);
+
     final Exchange<T> exchange;
 
     ExchangeImpl(Exchange<T> e) {
         // e == null means a http/2 pushed stream
         this.exchange = e;

@@ -66,96 +73,131 @@
 
     /**
      * Initiates a new exchange and assigns it to a connection if one exists
      * already. connection usually null.
      */
-    static <U> ExchangeImpl<U> get(Exchange<U> exchange, HttpConnection connection)
-        throws IOException, InterruptedException
+    static <U> CompletableFuture<? extends ExchangeImpl<U>>
+    get(Exchange<U> exchange, HttpConnection connection)
     {
-        HttpRequestImpl req = exchange.request();
         if (exchange.version() == HTTP_1_1) {
-            return new Http1Exchange<>(exchange, connection);
+            DEBUG_LOGGER.log(Level.DEBUG, "get: HTTP/1.1: new Http1Exchange");
+            return createHttp1Exchange(exchange, connection);
         } else {
             Http2ClientImpl c2 = exchange.client().client2(); // TODO: improve
             HttpRequestImpl request = exchange.request();
-            Http2Connection c;
-            try {
-                c = c2.getConnectionFor(request);
-            } catch (Http2Connection.ALPNException e) {
-                // failed to negotiate "h2"
-                AbstractAsyncSSLConnection as = e.getConnection();
-                as.stopAsyncReading();
-                HttpConnection sslc = as.downgrade();
-                ExchangeImpl<U> ex = new Http1Exchange<>(exchange, sslc);
+            CompletableFuture<Http2Connection> c2f = c2.getConnectionFor(request);
+            DEBUG_LOGGER.log(Level.DEBUG, "get: Trying to get HTTP/2 connection");
+            return c2f.handle((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection))
+                    .thenCompose(Function.identity());
+        }
+    }
+
+    private static <U> CompletableFuture<? extends ExchangeImpl<U>>
+    createExchangeImpl(Http2Connection c,
+                       Throwable t,
+                       Exchange<U> exchange,
+                       HttpConnection connection)
+    {
+        DEBUG_LOGGER.log(Level.DEBUG, "handling HTTP/2 connection creation result");
+        if (t != null) {
+            DEBUG_LOGGER.log(Level.DEBUG,
+                             "handling HTTP/2 connection creation failed: %s",
+                             (Object)t);
+            t = Utils.getCompletionCause(t);
+            if (t instanceof Http2Connection.ALPNException) {
+                Http2Connection.ALPNException ee = (Http2Connection.ALPNException)t;
+                AbstractAsyncSSLConnection as = ee.getConnection();
+                DEBUG_LOGGER.log(Level.DEBUG, "downgrading to HTTP/1.1 with: %s", as);
+                CompletableFuture<? extends ExchangeImpl<U>> ex =
+                        createHttp1Exchange(exchange, as);
                 return ex;
+            } else {
+                DEBUG_LOGGER.log(Level.DEBUG, "HTTP/2 connection creation failed "
+                                  + "with unexpected exception: %s", (Object)t);
+                return CompletableFuture.failedFuture(t);
+            }
             }
             if (c == null) {
                 // no existing connection. Send request with HTTP 1 and then
                 // upgrade if successful
-                ExchangeImpl<U> ex = new Http1Exchange<>(exchange, connection);
+            DEBUG_LOGGER.log(Level.DEBUG, "new Http1Exchange, try to upgrade");
+            return createHttp1Exchange(exchange, connection)
+                    .thenApply((e) -> {
                 exchange.h2Upgrade();
+                        return e;
+                    });
+        } else {
+            DEBUG_LOGGER.log(Level.DEBUG, "creating HTTP/2 streams");
+            Stream<U> s = c.createStream(exchange);
+            CompletableFuture<? extends ExchangeImpl<U>> ex = MinimalFuture.completedFuture(s);
                 return ex;
             }
-            return c.createStream(exchange);
+    }
+
+    private static <T> CompletableFuture<Http1Exchange<T>>
+    createHttp1Exchange(Exchange<T> ex, HttpConnection as)
+    {
+        try {
+            return MinimalFuture.completedFuture(new Http1Exchange<>(ex, as));
+        } catch (Throwable e) {
+            return MinimalFuture.failedFuture(e);
         }
     }
 
     /* The following methods have separate HTTP/1.1 and HTTP/2 implementations */
 
-    /**
-     * Sends the request headers only. May block until all sent.
-     */
-    abstract void sendHeadersOnly() throws IOException, InterruptedException;
+    abstract CompletableFuture<ExchangeImpl<T>> sendHeadersAsync();
 
-    // Blocking impl but in async style
+    /** Sends a request body, after request headers have been sent. */
+    abstract CompletableFuture<ExchangeImpl<T>> sendBodyAsync();
 
-    CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
-        // this is blocking. cf will already be completed.
-        return MinimalFuture.supply(() -> {
-            sendHeadersOnly();
-            return this;
-        });
-    }
+    abstract CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
+                                                boolean returnConnectionToPool,
+                                                Executor executor);
 
     /**
-     * Gets response by blocking if necessary. This may be an
-     * intermediate response (like 101) or a final response 200 etc. Returns
-     * before body is read.
+     * Ignores/consumes the body.
      */
-    abstract Response getResponse() throws IOException;
+    abstract CompletableFuture<Void> ignoreBody();
 
-    abstract T readBody(HttpResponse.BodyHandler<T> handler,
-                        boolean returnConnectionToPool) throws IOException;
+    /** Gets the response headers. Completes before body is read. */
+    abstract CompletableFuture<Response> getResponseAsync(Executor executor);
 
-    abstract CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
-                                                boolean returnConnectionToPool,
-                                                Executor executor);
+
+    /** Cancels a request.  Not currently exposed through API. */
+    abstract void cancel();
 
     /**
-     * Async version of getResponse. Completes before body is read.
+     * Cancels a request with a cause.  Not currently exposed through API.
      */
-    abstract CompletableFuture<Response> getResponseAsync(Executor executor);
+    abstract void cancel(IOException cause);
 
     /**
-     * Sends a request body after request headers.
+     * Called when the exchange is released, so that cleanup actions may be
+     * performed - such as deregistering callbacks.
+     * Typically, released is called during upgrade, when an HTTP/2 stream
+     * takes over from an Http1Exchange, or when a new exchange is created
+     * during a multi exchange before the final response body was received.
      */
-    abstract void sendBody() throws IOException, InterruptedException;
+    abstract void released();
 
-    // Async version of sendBody(). This only used when body sent separately
-    // to headers (100 continue)
-    CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
-        return MinimalFuture.supply(() -> {
-            sendBody();
-            return this;
-        });
-    }
+    /**
+     * Called when the exchange is completed, so that cleanup actions may be
+     * performed - such as deregistering callbacks.
+     * Typically, completed is called at the end of the exchange, when the
+     * final response body has been received (or an error has caused the
+     * completion of the exchange).
+     */
+    abstract void completed();
 
     /**
-     * Cancels a request.  Not currently exposed through API.
+     * Returns true if this exchange was canceled.
+     * @return true if this exchange was canceled.
      */
-    abstract void cancel();
+    abstract boolean isCanceled();
 
     /**
-     * Cancels a request with a cause.  Not currently exposed through API.
+     * Returns the cause for which this exchange was canceled, if available.
+     * @return the cause for which this exchange was canceled, if available.
      */
-    abstract void cancel(IOException cause);
+    abstract Throwable getCancelCause();
 }
< prev index next >