< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java
Print this page
@@ -1,7 +1,7 @@
/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
@@ -24,14 +24,17 @@
*/
package jdk.incubator.http;
import java.io.IOException;
+import java.lang.System.Logger.Level;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.function.Function;
import jdk.incubator.http.internal.common.MinimalFuture;
import static jdk.incubator.http.HttpClient.Version.HTTP_1_1;
+import jdk.incubator.http.internal.common.Utils;
/**
* Splits request so that headers and body can be sent separately with optional
* (multiple) responses in between (e.g. 100 Continue). Also request and
* response always sent/received in different calls.
@@ -44,10 +47,14 @@
*
* These implementation classes are where work is allocated to threads.
*/
abstract class ExchangeImpl<T> {
+ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+ private static final System.Logger DEBUG_LOGGER =
+ Utils.getDebugLogger("ExchangeImpl"::toString, DEBUG);
+
final Exchange<T> exchange;
ExchangeImpl(Exchange<T> e) {
// e == null means a http/2 pushed stream
this.exchange = e;
@@ -66,96 +73,131 @@
/**
* Initiates a new exchange and assigns it to a connection if one exists
* already. connection usually null.
*/
- static <U> ExchangeImpl<U> get(Exchange<U> exchange, HttpConnection connection)
- throws IOException, InterruptedException
+ static <U> CompletableFuture<? extends ExchangeImpl<U>>
+ get(Exchange<U> exchange, HttpConnection connection)
{
- HttpRequestImpl req = exchange.request();
if (exchange.version() == HTTP_1_1) {
- return new Http1Exchange<>(exchange, connection);
+ DEBUG_LOGGER.log(Level.DEBUG, "get: HTTP/1.1: new Http1Exchange");
+ return createHttp1Exchange(exchange, connection);
} else {
Http2ClientImpl c2 = exchange.client().client2(); // TODO: improve
HttpRequestImpl request = exchange.request();
- Http2Connection c;
- try {
- c = c2.getConnectionFor(request);
- } catch (Http2Connection.ALPNException e) {
- // failed to negotiate "h2"
- AbstractAsyncSSLConnection as = e.getConnection();
- as.stopAsyncReading();
- HttpConnection sslc = as.downgrade();
- ExchangeImpl<U> ex = new Http1Exchange<>(exchange, sslc);
+ CompletableFuture<Http2Connection> c2f = c2.getConnectionFor(request);
+ DEBUG_LOGGER.log(Level.DEBUG, "get: Trying to get HTTP/2 connection");
+ return c2f.handle((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection))
+ .thenCompose(Function.identity());
+ }
+ }
+
+ private static <U> CompletableFuture<? extends ExchangeImpl<U>>
+ createExchangeImpl(Http2Connection c,
+ Throwable t,
+ Exchange<U> exchange,
+ HttpConnection connection)
+ {
+ DEBUG_LOGGER.log(Level.DEBUG, "handling HTTP/2 connection creation result");
+ if (t != null) {
+ DEBUG_LOGGER.log(Level.DEBUG,
+ "handling HTTP/2 connection creation failed: %s",
+ (Object)t);
+ t = Utils.getCompletionCause(t);
+ if (t instanceof Http2Connection.ALPNException) {
+ Http2Connection.ALPNException ee = (Http2Connection.ALPNException)t;
+ AbstractAsyncSSLConnection as = ee.getConnection();
+ DEBUG_LOGGER.log(Level.DEBUG, "downgrading to HTTP/1.1 with: %s", as);
+ CompletableFuture<? extends ExchangeImpl<U>> ex =
+ createHttp1Exchange(exchange, as);
return ex;
+ } else {
+ DEBUG_LOGGER.log(Level.DEBUG, "HTTP/2 connection creation failed "
+ + "with unexpected exception: %s", (Object)t);
+ return CompletableFuture.failedFuture(t);
+ }
}
if (c == null) {
// no existing connection. Send request with HTTP 1 and then
// upgrade if successful
- ExchangeImpl<U> ex = new Http1Exchange<>(exchange, connection);
+ DEBUG_LOGGER.log(Level.DEBUG, "new Http1Exchange, try to upgrade");
+ return createHttp1Exchange(exchange, connection)
+ .thenApply((e) -> {
exchange.h2Upgrade();
+ return e;
+ });
+ } else {
+ DEBUG_LOGGER.log(Level.DEBUG, "creating HTTP/2 streams");
+ Stream<U> s = c.createStream(exchange);
+ CompletableFuture<? extends ExchangeImpl<U>> ex = MinimalFuture.completedFuture(s);
return ex;
}
- return c.createStream(exchange);
+ }
+
+ private static <T> CompletableFuture<Http1Exchange<T>>
+ createHttp1Exchange(Exchange<T> ex, HttpConnection as)
+ {
+ try {
+ return MinimalFuture.completedFuture(new Http1Exchange<>(ex, as));
+ } catch (Throwable e) {
+ return MinimalFuture.failedFuture(e);
}
}
/* The following methods have separate HTTP/1.1 and HTTP/2 implementations */
- /**
- * Sends the request headers only. May block until all sent.
- */
- abstract void sendHeadersOnly() throws IOException, InterruptedException;
+ abstract CompletableFuture<ExchangeImpl<T>> sendHeadersAsync();
- // Blocking impl but in async style
+ /** Sends a request body, after request headers have been sent. */
+ abstract CompletableFuture<ExchangeImpl<T>> sendBodyAsync();
- CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
- // this is blocking. cf will already be completed.
- return MinimalFuture.supply(() -> {
- sendHeadersOnly();
- return this;
- });
- }
+ abstract CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
+ boolean returnConnectionToPool,
+ Executor executor);
/**
- * Gets response by blocking if necessary. This may be an
- * intermediate response (like 101) or a final response 200 etc. Returns
- * before body is read.
+ * Ignore/consume the body.
*/
- abstract Response getResponse() throws IOException;
+ abstract CompletableFuture<Void> ignoreBody();
- abstract T readBody(HttpResponse.BodyHandler<T> handler,
- boolean returnConnectionToPool) throws IOException;
+ /** Gets the response headers. Completes before body is read. */
+ abstract CompletableFuture<Response> getResponseAsync(Executor executor);
- abstract CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
- boolean returnConnectionToPool,
- Executor executor);
+
+ /** Cancels a request. Not currently exposed through API. */
+ abstract void cancel();
/**
- * Async version of getResponse. Completes before body is read.
+ * Cancels a request with a cause. Not currently exposed through API.
*/
- abstract CompletableFuture<Response> getResponseAsync(Executor executor);
+ abstract void cancel(IOException cause);
/**
- * Sends a request body after request headers.
+ * Called when the exchange is released, so that cleanup actions may be
+ * performed - such as deregistering callbacks.
+ * Typically released is called during upgrade, when an HTTP/2 stream
+ * takes over from an Http1Exchange, or when a new exchange is created
+ * during a multi exchange before the final response body was received.
*/
- abstract void sendBody() throws IOException, InterruptedException;
+ abstract void released();
- // Async version of sendBody(). This only used when body sent separately
- // to headers (100 continue)
- CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
- return MinimalFuture.supply(() -> {
- sendBody();
- return this;
- });
- }
+ /**
+ * Called when the exchange is completed, so that cleanup actions may be
+ * performed - such as deregistering callbacks.
+ * Typically, completed is called at the end of the exchange, when the
+ * final response body has been received (or an error has caused the
+ * completion of the exchange).
+ */
+ abstract void completed();
/**
- * Cancels a request. Not currently exposed through API.
+ * Returns true if this exchange was canceled.
+ * @return true if this exchange was canceled.
*/
- abstract void cancel();
+ abstract boolean isCanceled();
/**
- * Cancels a request with a cause. Not currently exposed through API.
+ * Returns the cause for which this exchange was canceled, if available.
+ * @return the cause for which this exchange was canceled, if available.
*/
- abstract void cancel(IOException cause);
+ abstract Throwable getCancelCause();
}
< prev index next >