/* * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package jdk.incubator.http; import java.io.IOException; import java.lang.System.Logger.Level; import java.time.Duration; import java.util.List; import java.security.AccessControlContext; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import jdk.incubator.http.HttpResponse.UntrustedBodyHandler; import jdk.incubator.http.internal.common.Log; import jdk.incubator.http.internal.common.MinimalFuture; import jdk.incubator.http.internal.common.ConnectionExpiredException; import jdk.incubator.http.internal.common.Utils; import static jdk.incubator.http.internal.common.MinimalFuture.completedFuture; import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture; /** * Encapsulates multiple Exchanges belonging to one HttpRequestImpl. * - manages filters * - retries due to filters. * - I/O errors and most other exceptions get returned directly to user * * Creates a new Exchange for each request/response interaction */ class MultiExchange { static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. static final System.Logger DEBUG_LOGGER = Utils.getDebugLogger("MultiExchange"::toString, DEBUG); private final HttpRequest userRequest; // the user request private final HttpRequestImpl request; // a copy of the user request final AccessControlContext acc; final HttpClientImpl client; final HttpResponse.BodyHandler responseHandler; final Executor executor; final HttpResponse.MultiSubscriber multiResponseSubscriber; final AtomicInteger attempts = new AtomicInteger(); HttpRequestImpl currentreq; // used for async only Exchange exchange; // the current exchange Exchange previous; volatile Throwable retryCause; volatile boolean expiredOnce; volatile HttpResponse response = null; // Maximum number of times a request will be retried/redirected // for any reason static final int DEFAULT_MAX_ATTEMPTS = 5; static final int max_attempts = Utils.getIntegerNetProperty( "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS ); private final List filters; TimedEvent timedEvent; volatile boolean cancelled; final PushGroup pushGroup; /** * Filter fields. These are attached as required by filters * and only used by the filter implementations. This could be * generalised into Objects that are passed explicitly to the filters * (one per MultiExchange object, and one per Exchange object possibly) */ volatile AuthenticationFilter.AuthInfo serverauth, proxyauth; // RedirectHandler volatile int numberOfRedirects = 0; /** * MultiExchange with one final response. */ MultiExchange(HttpRequest userRequest, HttpRequestImpl requestImpl, HttpClientImpl client, HttpResponse.BodyHandler responseHandler, AccessControlContext acc) { this.previous = null; this.userRequest = userRequest; this.request = requestImpl; this.currentreq = request; this.client = client; this.filters = client.filterChain(); this.acc = acc; this.executor = client.theExecutor(); this.responseHandler = responseHandler; if (acc != null) { // Restricts the file publisher with the senders ACC, if any if (responseHandler instanceof UntrustedBodyHandler) ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc); } this.exchange = new Exchange<>(request, this); this.multiResponseSubscriber = null; this.pushGroup = null; } /** * MultiExchange with multiple responses (HTTP/2 server pushes). */ MultiExchange(HttpRequest userRequest, HttpRequestImpl requestImpl, HttpClientImpl client, HttpResponse.MultiSubscriber multiResponseSubscriber, AccessControlContext acc) { this.previous = null; this.userRequest = userRequest; this.request = requestImpl; this.currentreq = request; this.client = client; this.filters = client.filterChain(); this.acc = acc; this.executor = client.theExecutor(); this.multiResponseSubscriber = multiResponseSubscriber; this.pushGroup = new PushGroup<>(multiResponseSubscriber, request, acc); this.exchange = new Exchange<>(request, this); this.responseHandler = pushGroup.mainResponseHandler(); } // CompletableFuture multiCompletionCF() { // return pushGroup.groupResult(); // } private synchronized Exchange getExchange() { return exchange; } HttpClientImpl client() { return client; } // HttpClient.Redirect followRedirects() { // return client.followRedirects(); // } HttpClient.Version version() { return request.version().orElse(client.version()); } private synchronized void setExchange(Exchange exchange) { if (this.exchange != null && exchange != this.exchange) { this.exchange.released(); } this.exchange = exchange; } private void cancelTimer() { if (timedEvent != null) { client.cancelTimer(timedEvent); } } private void requestFilters(HttpRequestImpl r) throws IOException { Log.logTrace("Applying request filters"); for (HeaderFilter filter : filters) { Log.logTrace("Applying {0}", filter); filter.request(r, this); } Log.logTrace("All filters applied"); } private HttpRequestImpl responseFilters(Response response) throws IOException { Log.logTrace("Applying response filters"); for (HeaderFilter filter : filters) { Log.logTrace("Applying {0}", filter); HttpRequestImpl newreq = filter.response(response); if (newreq != null) { Log.logTrace("New request: stopping filters"); return newreq; } } Log.logTrace("All filters applied"); return null; } // public void cancel() { // cancelled = true; // getExchange().cancel(); // } public void cancel(IOException cause) { cancelled = true; getExchange().cancel(cause); } public CompletableFuture> responseAsync() { CompletableFuture start = new MinimalFuture<>(); CompletableFuture> cf = responseAsync0(start); start.completeAsync( () -> null, executor); // trigger execution return cf; } private CompletableFuture> responseAsync0(CompletableFuture start) { return start.thenCompose( v -> responseAsyncImpl()) .thenCompose((Response r) -> { Exchange exch = getExchange(); return exch.readBodyAsync(responseHandler) .thenApply((T body) -> { this.response = new HttpResponseImpl<>(userRequest, r, this.response, body, exch); return this.response; }); }); } CompletableFuture multiResponseAsync() { CompletableFuture start = new MinimalFuture<>(); CompletableFuture> cf = responseAsync0(start); CompletableFuture> mainResponse = cf.thenApply(b -> { multiResponseSubscriber.onResponse(b); pushGroup.noMorePushes(true); return b; }); pushGroup.setMainResponse(mainResponse); CompletableFuture res = multiResponseSubscriber.completion(pushGroup.groupResult(), pushGroup.pushesCF()); start.completeAsync( () -> null, executor); // trigger execution return res; } private CompletableFuture responseAsyncImpl() { CompletableFuture cf; if (attempts.incrementAndGet() > max_attempts) { cf = failedFuture(new IOException("Too many retries", retryCause)); } else { if (currentreq.timeout().isPresent()) { timedEvent = new TimedEvent(currentreq.timeout().get()); client.registerTimer(timedEvent); } try { // 1. apply request filters requestFilters(currentreq); } catch (IOException e) { return failedFuture(e); } Exchange exch = getExchange(); // 2. get response cf = exch.responseAsync() .thenCompose((Response response) -> { HttpRequestImpl newrequest; try { // 3. apply response filters newrequest = responseFilters(response); } catch (IOException e) { return failedFuture(e); } // 4. check filter result and repeat or continue if (newrequest == null) { if (attempts.get() > 1) { Log.logError("Succeeded on attempt: " + attempts); } return completedFuture(response); } else { this.response = new HttpResponseImpl<>(currentreq, response, this.response, null, exch); Exchange oldExch = exch; return exch.ignoreBody().handle((r,t) -> { currentreq = newrequest; expiredOnce = false; setExchange(new Exchange<>(currentreq, this, acc)); return responseAsyncImpl(); }).thenCompose(Function.identity()); } }) .handle((response, ex) -> { // 5. handle errors and cancel any timer set cancelTimer(); if (ex == null) { assert response != null; return completedFuture(response); } // all exceptions thrown are handled here CompletableFuture errorCF = getExceptionalCF(ex); if (errorCF == null) { return responseAsyncImpl(); } else { return errorCF; } }) .thenCompose(Function.identity()); } return cf; } /** * Takes a Throwable and returns a suitable CompletableFuture that is * completed exceptionally, or null. */ private CompletableFuture getExceptionalCF(Throwable t) { if ((t instanceof CompletionException) || (t instanceof ExecutionException)) { if (t.getCause() != null) { t = t.getCause(); } } if (cancelled && t instanceof IOException) { t = new HttpTimeoutException("request timed out"); } else if (t instanceof ConnectionExpiredException) { // allow the retry mechanism to do its work // ####: method (GET,HEAD, not POST?), no bytes written or read ( differentiate? ) if (t.getCause() != null) retryCause = t.getCause(); if (!expiredOnce) { DEBUG_LOGGER.log(Level.DEBUG, "MultiExchange: ConnectionExpiredException (async): retrying...", t); expiredOnce = true; return null; } else { DEBUG_LOGGER.log(Level.DEBUG, "MultiExchange: ConnectionExpiredException (async): already retried once.", t); if (t.getCause() != null) t = t.getCause(); } } return failedFuture(t); } class TimedEvent extends TimeoutEvent { TimedEvent(Duration duration) { super(duration); } @Override public void handle() { DEBUG_LOGGER.log(Level.DEBUG, "Cancelling MultiExchange due to timeout for request %s", request); cancel(new HttpTimeoutException("request timed out")); } } }