< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java

Print this page

        

*** 1,7 **** /* ! * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this --- 1,7 ---- /* ! * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this
*** 24,37 **** --- 24,40 ---- */ package jdk.incubator.http; import java.io.IOException; + import java.lang.System.Logger.Level; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; + import java.util.function.Function; import jdk.incubator.http.internal.common.MinimalFuture; import static jdk.incubator.http.HttpClient.Version.HTTP_1_1; + import jdk.incubator.http.internal.common.Utils; /** * Splits request so that headers and body can be sent separately with optional * (multiple) responses in between (e.g. 100 Continue). Also request and * response always sent/received in different calls.
*** 44,53 **** --- 47,60 ---- * * These implementation classes are where work is allocated to threads. */ abstract class ExchangeImpl<T> { + static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. + private static final System.Logger DEBUG_LOGGER = + Utils.getDebugLogger("ExchangeImpl"::toString, DEBUG); + final Exchange<T> exchange; ExchangeImpl(Exchange<T> e) { // e == null means a http/2 pushed stream this.exchange = e;
*** 66,161 **** /** * Initiates a new exchange and assigns it to a connection if one exists * already. connection usually null. */ ! static <U> ExchangeImpl<U> get(Exchange<U> exchange, HttpConnection connection) ! throws IOException, InterruptedException { - HttpRequestImpl req = exchange.request(); if (exchange.version() == HTTP_1_1) { ! return new Http1Exchange<>(exchange, connection); } else { Http2ClientImpl c2 = exchange.client().client2(); // TODO: improve HttpRequestImpl request = exchange.request(); ! Http2Connection c; ! try { ! c = c2.getConnectionFor(request); ! } catch (Http2Connection.ALPNException e) { ! // failed to negotiate "h2" ! AbstractAsyncSSLConnection as = e.getConnection(); ! as.stopAsyncReading(); ! HttpConnection sslc = as.downgrade(); ! ExchangeImpl<U> ex = new Http1Exchange<>(exchange, sslc); return ex; } if (c == null) { // no existing connection. Send request with HTTP 1 and then // upgrade if successful ! ExchangeImpl<U> ex = new Http1Exchange<>(exchange, connection); exchange.h2Upgrade(); return ex; } ! return c.createStream(exchange); } } /* The following methods have separate HTTP/1.1 and HTTP/2 implementations */ ! /** ! * Sends the request headers only. May block until all sent. ! */ ! abstract void sendHeadersOnly() throws IOException, InterruptedException; ! // Blocking impl but in async style ! CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { ! // this is blocking. cf will already be completed. ! return MinimalFuture.supply(() -> { ! sendHeadersOnly(); ! return this; ! }); ! } /** ! * Gets response by blocking if necessary. This may be an ! * intermediate response (like 101) or a final response 200 etc. Returns ! * before body is read. */ ! abstract Response getResponse() throws IOException; ! abstract T readBody(HttpResponse.BodyHandler<T> handler, ! boolean returnConnectionToPool) throws IOException; ! abstract CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler, ! boolean returnConnectionToPool, ! Executor executor); /** ! * Async version of getResponse. Completes before body is read. */ ! abstract CompletableFuture<Response> getResponseAsync(Executor executor); /** ! * Sends a request body after request headers. */ ! abstract void sendBody() throws IOException, InterruptedException; ! // Async version of sendBody(). This only used when body sent separately ! // to headers (100 continue) ! CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { ! return MinimalFuture.supply(() -> { ! sendBody(); ! return this; ! }); ! } /** ! * Cancels a request. Not currently exposed through API. */ ! abstract void cancel(); /** ! * Cancels a request with a cause. Not currently exposed through API. */ ! abstract void cancel(IOException cause); } --- 73,203 ---- /** * Initiates a new exchange and assigns it to a connection if one exists * already. connection usually null. */ ! static <U> CompletableFuture<? extends ExchangeImpl<U>> ! get(Exchange<U> exchange, HttpConnection connection) { if (exchange.version() == HTTP_1_1) { ! DEBUG_LOGGER.log(Level.DEBUG, "get: HTTP/1.1: new Http1Exchange"); ! return createHttp1Exchange(exchange, connection); } else { Http2ClientImpl c2 = exchange.client().client2(); // TODO: improve HttpRequestImpl request = exchange.request(); ! CompletableFuture<Http2Connection> c2f = c2.getConnectionFor(request); ! DEBUG_LOGGER.log(Level.DEBUG, "get: Trying to get HTTP/2 connection"); ! return c2f.handle((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection)) ! .thenCompose(Function.identity()); ! } ! } ! ! private static <U> CompletableFuture<? extends ExchangeImpl<U>> ! createExchangeImpl(Http2Connection c, ! Throwable t, ! Exchange<U> exchange, ! HttpConnection connection) ! { ! DEBUG_LOGGER.log(Level.DEBUG, "handling HTTP/2 connection creation result"); ! if (t != null) { ! DEBUG_LOGGER.log(Level.DEBUG, ! "handling HTTP/2 connection creation failed: %s", ! (Object)t); ! t = Utils.getCompletionCause(t); ! if (t instanceof Http2Connection.ALPNException) { ! Http2Connection.ALPNException ee = (Http2Connection.ALPNException)t; ! AbstractAsyncSSLConnection as = ee.getConnection(); ! DEBUG_LOGGER.log(Level.DEBUG, "downgrading to HTTP/1.1 with: %s", as); ! CompletableFuture<? extends ExchangeImpl<U>> ex = ! createHttp1Exchange(exchange, as); return ex; + } else { + DEBUG_LOGGER.log(Level.DEBUG, "HTTP/2 connection creation failed " + + "with unexpected exception: %s", (Object)t); + return CompletableFuture.failedFuture(t); + } } if (c == null) { // no existing connection. Send request with HTTP 1 and then // upgrade if successful ! DEBUG_LOGGER.log(Level.DEBUG, "new Http1Exchange, try to upgrade"); ! return createHttp1Exchange(exchange, connection) ! .thenApply((e) -> { exchange.h2Upgrade(); + return e; + }); + } else { + DEBUG_LOGGER.log(Level.DEBUG, "creating HTTP/2 streams"); + Stream<U> s = c.createStream(exchange); + CompletableFuture<? extends ExchangeImpl<U>> ex = MinimalFuture.completedFuture(s); return ex; } ! } ! ! private static <T> CompletableFuture<Http1Exchange<T>> ! createHttp1Exchange(Exchange<T> ex, HttpConnection as) ! { ! try { ! return MinimalFuture.completedFuture(new Http1Exchange<>(ex, as)); ! } catch (Throwable e) { ! return MinimalFuture.failedFuture(e); } } /* The following methods have separate HTTP/1.1 and HTTP/2 implementations */ ! abstract CompletableFuture<ExchangeImpl<T>> sendHeadersAsync(); ! /** Sends a request body, after request headers have been sent. */ ! abstract CompletableFuture<ExchangeImpl<T>> sendBodyAsync(); ! abstract CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler, ! boolean returnConnectionToPool, ! Executor executor); /** ! * Ignore/consume the body. */ ! abstract CompletableFuture<Void> ignoreBody(); ! /** Gets the response headers. Completes before body is read. */ ! abstract CompletableFuture<Response> getResponseAsync(Executor executor); ! ! /** Cancels a request. Not currently exposed through API. */ ! abstract void cancel(); /** ! * Cancels a request with a cause. Not currently exposed through API. */ ! abstract void cancel(IOException cause); /** ! * Called when the exchange is released, so that cleanup actions may be ! * performed - such as deregistering callbacks. ! * Typically released is called during upgrade, when an HTTP/2 stream ! * takes over from an Http1Exchange, or when a new exchange is created ! * during a multi exchange before the final response body was received. */ ! abstract void released(); ! /** ! * Called when the exchange is completed, so that cleanup actions may be ! * performed - such as deregistering callbacks. ! * Typically, completed is called at the end of the exchange, when the ! * final response body has been received (or an error has caused the ! * completion of the exchange). ! */ ! abstract void completed(); /** ! * Returns true if this exchange was canceled. ! * @return true if this exchange was canceled. */ ! abstract boolean isCanceled(); /** ! * Returns the cause for which this exchange was canceled, if available. ! * @return the cause for which this exchange was canceled, if available. */ ! abstract Throwable getCancelCause(); }
< prev index next >