--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java 2017-11-30 04:03:51.748420200 -0800 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java 2017-11-30 04:03:51.540402015 -0800 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -26,10 +26,13 @@ package jdk.incubator.http; import java.io.IOException; +import java.lang.System.Logger.Level; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.function.Function; import jdk.incubator.http.internal.common.MinimalFuture; import static jdk.incubator.http.HttpClient.Version.HTTP_1_1; +import jdk.incubator.http.internal.common.Utils; /** * Splits request so that headers and body can be sent separately with optional @@ -46,6 +49,10 @@ */ abstract class ExchangeImpl { + static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. + private static final System.Logger DEBUG_LOGGER = + Utils.getDebugLogger("ExchangeImpl"::toString, DEBUG); + final Exchange exchange; ExchangeImpl(Exchange e) { @@ -68,94 +75,129 @@ * Initiates a new exchange and assigns it to a connection if one exists * already. connection usually null. */ - static ExchangeImpl get(Exchange exchange, HttpConnection connection) - throws IOException, InterruptedException + static CompletableFuture> + get(Exchange exchange, HttpConnection connection) { - HttpRequestImpl req = exchange.request(); if (exchange.version() == HTTP_1_1) { - return new Http1Exchange<>(exchange, connection); + DEBUG_LOGGER.log(Level.DEBUG, "get: HTTP/1.1: new Http1Exchange"); + return createHttp1Exchange(exchange, connection); } else { Http2ClientImpl c2 = exchange.client().client2(); // TODO: improve HttpRequestImpl request = exchange.request(); - Http2Connection c; - try { - c = c2.getConnectionFor(request); - } catch (Http2Connection.ALPNException e) { - // failed to negotiate "h2" - AbstractAsyncSSLConnection as = e.getConnection(); - as.stopAsyncReading(); - HttpConnection sslc = as.downgrade(); - ExchangeImpl ex = new Http1Exchange<>(exchange, sslc); - return ex; - } - if (c == null) { - // no existing connection. Send request with HTTP 1 and then - // upgrade if successful - ExchangeImpl ex = new Http1Exchange<>(exchange, connection); - exchange.h2Upgrade(); + CompletableFuture c2f = c2.getConnectionFor(request); + DEBUG_LOGGER.log(Level.DEBUG, "get: Trying to get HTTP/2 connection"); + return c2f.handle((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection)) + .thenCompose(Function.identity()); + } + } + + private static CompletableFuture> + createExchangeImpl(Http2Connection c, + Throwable t, + Exchange exchange, + HttpConnection connection) + { + DEBUG_LOGGER.log(Level.DEBUG, "handling HTTP/2 connection creation result"); + if (t != null) { + DEBUG_LOGGER.log(Level.DEBUG, + "handling HTTP/2 connection creation failed: %s", + (Object)t); + t = Utils.getCompletionCause(t); + if (t instanceof Http2Connection.ALPNException) { + Http2Connection.ALPNException ee = (Http2Connection.ALPNException)t; + AbstractAsyncSSLConnection as = ee.getConnection(); + DEBUG_LOGGER.log(Level.DEBUG, "downgrading to HTTP/1.1 with: %s", as); + CompletableFuture> ex = + createHttp1Exchange(exchange, as); return ex; + } else { + DEBUG_LOGGER.log(Level.DEBUG, "HTTP/2 connection creation failed " + + "with unexpected exception: %s", (Object)t); + return CompletableFuture.failedFuture(t); } - return c.createStream(exchange); + } + if (c == null) { + // no existing connection. Send request with HTTP 1 and then + // upgrade if successful + DEBUG_LOGGER.log(Level.DEBUG, "new Http1Exchange, try to upgrade"); + return createHttp1Exchange(exchange, connection) + .thenApply((e) -> { + exchange.h2Upgrade(); + return e; + }); + } else { + DEBUG_LOGGER.log(Level.DEBUG, "creating HTTP/2 streams"); + Stream s = c.createStream(exchange); + CompletableFuture> ex = MinimalFuture.completedFuture(s); + return ex; + } + } + + private static CompletableFuture> + createHttp1Exchange(Exchange ex, HttpConnection as) + { + try { + return MinimalFuture.completedFuture(new Http1Exchange<>(ex, as)); + } catch (Throwable e) { + return MinimalFuture.failedFuture(e); } } /* The following methods have separate HTTP/1.1 and HTTP/2 implementations */ - /** - * Sends the request headers only. May block until all sent. - */ - abstract void sendHeadersOnly() throws IOException, InterruptedException; + abstract CompletableFuture> sendHeadersAsync(); - // Blocking impl but in async style + /** Sends a request body, after request headers have been sent. */ + abstract CompletableFuture> sendBodyAsync(); - CompletableFuture> sendHeadersAsync() { - // this is blocking. cf will already be completed. - return MinimalFuture.supply(() -> { - sendHeadersOnly(); - return this; - }); - } + abstract CompletableFuture readBodyAsync(HttpResponse.BodyHandler handler, + boolean returnConnectionToPool, + Executor executor); /** - * Gets response by blocking if necessary. This may be an - * intermediate response (like 101) or a final response 200 etc. Returns - * before body is read. + * Ignore/consume the body. */ - abstract Response getResponse() throws IOException; + abstract CompletableFuture ignoreBody(); - abstract T readBody(HttpResponse.BodyHandler handler, - boolean returnConnectionToPool) throws IOException; + /** Gets the response headers. Completes before body is read. */ + abstract CompletableFuture getResponseAsync(Executor executor); - abstract CompletableFuture readBodyAsync(HttpResponse.BodyHandler handler, - boolean returnConnectionToPool, - Executor executor); + + /** Cancels a request. Not currently exposed through API. */ + abstract void cancel(); /** - * Async version of getResponse. Completes before body is read. + * Cancels a request with a cause. Not currently exposed through API. */ - abstract CompletableFuture getResponseAsync(Executor executor); + abstract void cancel(IOException cause); /** - * Sends a request body after request headers. + * Called when the exchange is released, so that cleanup actions may be + * performed - such as deregistering callbacks. + * Typically released is called during upgrade, when an HTTP/2 stream + * takes over from an Http1Exchange, or when a new exchange is created + * during a multi exchange before the final response body was received. */ - abstract void sendBody() throws IOException, InterruptedException; + abstract void released(); - // Async version of sendBody(). This only used when body sent separately - // to headers (100 continue) - CompletableFuture> sendBodyAsync() { - return MinimalFuture.supply(() -> { - sendBody(); - return this; - }); - } + /** + * Called when the exchange is completed, so that cleanup actions may be + * performed - such as deregistering callbacks. + * Typically, completed is called at the end of the exchange, when the + * final response body has been received (or an error has caused the + * completion of the exchange). + */ + abstract void completed(); /** - * Cancels a request. Not currently exposed through API. + * Returns true if this exchange was canceled. + * @return true if this exchange was canceled. */ - abstract void cancel(); + abstract boolean isCanceled(); /** - * Cancels a request with a cause. Not currently exposed through API. + * Returns the cause for which this exchange was canceled, if available. + * @return the cause for which this exchange was canceled, if available. */ - abstract void cancel(IOException cause); + abstract Throwable getCancelCause(); }