< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java
Print this page
*** 1,7 ****
/*
! * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
--- 1,7 ----
/*
! * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
*** 24,37 ****
--- 24,40 ----
*/
package jdk.incubator.http;
import java.io.IOException;
+ import java.lang.System.Logger.Level;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+ import java.util.function.Function;
import jdk.incubator.http.internal.common.MinimalFuture;
import static jdk.incubator.http.HttpClient.Version.HTTP_1_1;
+ import jdk.incubator.http.internal.common.Utils;
/**
* Splits request so that headers and body can be sent separately with optional
* (multiple) responses in between (e.g. 100 Continue). Also, the request and
* response are always sent/received in different calls.
*** 44,53 ****
--- 47,60 ----
*
* These implementation classes are where work is allocated to threads.
*/
abstract class ExchangeImpl<T> {
+ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+ private static final System.Logger DEBUG_LOGGER =
+ Utils.getDebugLogger("ExchangeImpl"::toString, DEBUG);
+
final Exchange<T> exchange;
ExchangeImpl(Exchange<T> e) {
// e == null means an HTTP/2 pushed stream
this.exchange = e;
*** 66,161 ****
/**
* Initiates a new exchange and assigns it to a connection if one exists
* already. connection usually null.
*/
! static <U> ExchangeImpl<U> get(Exchange<U> exchange, HttpConnection connection)
! throws IOException, InterruptedException
{
- HttpRequestImpl req = exchange.request();
if (exchange.version() == HTTP_1_1) {
! return new Http1Exchange<>(exchange, connection);
} else {
Http2ClientImpl c2 = exchange.client().client2(); // TODO: improve
HttpRequestImpl request = exchange.request();
! Http2Connection c;
! try {
! c = c2.getConnectionFor(request);
! } catch (Http2Connection.ALPNException e) {
! // failed to negotiate "h2"
! AbstractAsyncSSLConnection as = e.getConnection();
! as.stopAsyncReading();
! HttpConnection sslc = as.downgrade();
! ExchangeImpl<U> ex = new Http1Exchange<>(exchange, sslc);
return ex;
}
if (c == null) {
// no existing connection. Send request with HTTP 1 and then
// upgrade if successful
! ExchangeImpl<U> ex = new Http1Exchange<>(exchange, connection);
exchange.h2Upgrade();
return ex;
}
! return c.createStream(exchange);
}
}
/* The following methods have separate HTTP/1.1 and HTTP/2 implementations */
! /**
! * Sends the request headers only. May block until all sent.
! */
! abstract void sendHeadersOnly() throws IOException, InterruptedException;
! // Blocking impl but in async style
! CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
! // this is blocking. cf will already be completed.
! return MinimalFuture.supply(() -> {
! sendHeadersOnly();
! return this;
! });
! }
/**
! * Gets response by blocking if necessary. This may be an
! * intermediate response (like 101) or a final response 200 etc. Returns
! * before body is read.
*/
! abstract Response getResponse() throws IOException;
! abstract T readBody(HttpResponse.BodyHandler<T> handler,
! boolean returnConnectionToPool) throws IOException;
! abstract CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
! boolean returnConnectionToPool,
! Executor executor);
/**
! * Async version of getResponse. Completes before body is read.
*/
! abstract CompletableFuture<Response> getResponseAsync(Executor executor);
/**
! * Sends a request body after request headers.
*/
! abstract void sendBody() throws IOException, InterruptedException;
! // Async version of sendBody(). This only used when body sent separately
! // to headers (100 continue)
! CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
! return MinimalFuture.supply(() -> {
! sendBody();
! return this;
! });
! }
/**
! * Cancels a request. Not currently exposed through API.
*/
! abstract void cancel();
/**
! * Cancels a request with a cause. Not currently exposed through API.
*/
! abstract void cancel(IOException cause);
}
--- 73,203 ----
/**
* Initiates a new exchange and assigns it to a connection if one exists
* already. connection usually null.
*/
! static <U> CompletableFuture<? extends ExchangeImpl<U>>
! get(Exchange<U> exchange, HttpConnection connection)
{
if (exchange.version() == HTTP_1_1) {
! DEBUG_LOGGER.log(Level.DEBUG, "get: HTTP/1.1: new Http1Exchange");
! return createHttp1Exchange(exchange, connection);
} else {
Http2ClientImpl c2 = exchange.client().client2(); // TODO: improve
HttpRequestImpl request = exchange.request();
! CompletableFuture<Http2Connection> c2f = c2.getConnectionFor(request);
! DEBUG_LOGGER.log(Level.DEBUG, "get: Trying to get HTTP/2 connection");
! return c2f.handle((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection))
! .thenCompose(Function.identity());
! }
! }
!
! private static <U> CompletableFuture<? extends ExchangeImpl<U>>
! createExchangeImpl(Http2Connection c,
! Throwable t,
! Exchange<U> exchange,
! HttpConnection connection)
! {
! DEBUG_LOGGER.log(Level.DEBUG, "handling HTTP/2 connection creation result");
! if (t != null) {
! DEBUG_LOGGER.log(Level.DEBUG,
! "handling HTTP/2 connection creation failed: %s",
! (Object)t);
! t = Utils.getCompletionCause(t);
! if (t instanceof Http2Connection.ALPNException) {
! Http2Connection.ALPNException ee = (Http2Connection.ALPNException)t;
! AbstractAsyncSSLConnection as = ee.getConnection();
! DEBUG_LOGGER.log(Level.DEBUG, "downgrading to HTTP/1.1 with: %s", as);
! CompletableFuture<? extends ExchangeImpl<U>> ex =
! createHttp1Exchange(exchange, as);
return ex;
+ } else {
+ DEBUG_LOGGER.log(Level.DEBUG, "HTTP/2 connection creation failed "
+ + "with unexpected exception: %s", (Object)t);
+ return CompletableFuture.failedFuture(t);
+ }
}
if (c == null) {
// no existing connection. Send request with HTTP 1 and then
// upgrade if successful
! DEBUG_LOGGER.log(Level.DEBUG, "new Http1Exchange, try to upgrade");
! return createHttp1Exchange(exchange, connection)
! .thenApply((e) -> {
exchange.h2Upgrade();
+ return e;
+ });
+ } else {
+ DEBUG_LOGGER.log(Level.DEBUG, "creating HTTP/2 streams");
+ Stream<U> s = c.createStream(exchange);
+ CompletableFuture<? extends ExchangeImpl<U>> ex = MinimalFuture.completedFuture(s);
return ex;
}
! }
!
! private static <T> CompletableFuture<Http1Exchange<T>>
! createHttp1Exchange(Exchange<T> ex, HttpConnection as)
! {
! try {
! return MinimalFuture.completedFuture(new Http1Exchange<>(ex, as));
! } catch (Throwable e) {
! return MinimalFuture.failedFuture(e);
}
}
/* The following methods have separate HTTP/1.1 and HTTP/2 implementations */
! abstract CompletableFuture<ExchangeImpl<T>> sendHeadersAsync();
! /** Sends a request body, after request headers have been sent. */
! abstract CompletableFuture<ExchangeImpl<T>> sendBodyAsync();
! abstract CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
! boolean returnConnectionToPool,
! Executor executor);
/**
! * Ignore/consume the body.
*/
! abstract CompletableFuture<Void> ignoreBody();
! /** Gets the response headers. Completes before body is read. */
! abstract CompletableFuture<Response> getResponseAsync(Executor executor);
!
! /** Cancels a request. Not currently exposed through API. */
! abstract void cancel();
/**
! * Cancels a request with a cause. Not currently exposed through API.
*/
! abstract void cancel(IOException cause);
/**
! * Called when the exchange is released, so that cleanup actions may be
! * performed - such as deregistering callbacks.
! * Typically released is called during upgrade, when an HTTP/2 stream
! * takes over from an Http1Exchange, or when a new exchange is created
! * during a multi exchange before the final response body was received.
*/
! abstract void released();
! /**
! * Called when the exchange is completed, so that cleanup actions may be
! * performed - such as deregistering callbacks.
! * Typically, completed is called at the end of the exchange, when the
! * final response body has been received (or an error has caused the
! * completion of the exchange).
! */
! abstract void completed();
/**
! * Returns true if this exchange was canceled.
! * @return true if this exchange was canceled.
*/
! abstract boolean isCanceled();
/**
! * Returns the cause for which this exchange was canceled, if available.
! * @return the cause for which this exchange was canceled, if available.
*/
! abstract Throwable getCancelCause();
}
< prev index next >