--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java 2017-11-30 04:04:04.534537960 -0800 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java 2017-11-30 04:04:04.331520213 -0800 @@ -26,22 +26,23 @@ package jdk.incubator.http; import java.io.IOException; +import java.lang.System.Logger.Level; import java.time.Duration; import java.util.List; import java.security.AccessControlContext; -import java.security.AccessController; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; -import java.util.function.BiFunction; import java.util.concurrent.Executor; -import java.util.function.UnaryOperator; - +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import jdk.incubator.http.HttpResponse.UntrustedBodyHandler; import jdk.incubator.http.internal.common.Log; import jdk.incubator.http.internal.common.MinimalFuture; -import jdk.incubator.http.internal.common.Pair; +import jdk.incubator.http.internal.common.ConnectionExpiredException; import jdk.incubator.http.internal.common.Utils; -import static jdk.incubator.http.internal.common.Pair.pair; +import static jdk.incubator.http.internal.common.MinimalFuture.completedFuture; +import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture; /** * Encapsulates multiple Exchanges belonging to one HttpRequestImpl. @@ -53,18 +54,25 @@ */ class MultiExchange { + static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. + static final System.Logger DEBUG_LOGGER = + Utils.getDebugLogger("MultiExchange"::toString, DEBUG); + private final HttpRequest userRequest; // the user request private final HttpRequestImpl request; // a copy of the user request final AccessControlContext acc; final HttpClientImpl client; final HttpResponse.BodyHandler responseHandler; - final ExecutorWrapper execWrapper; final Executor executor; - final HttpResponse.MultiProcessor multiResponseHandler; + final HttpResponse.MultiSubscriber multiResponseSubscriber; + final AtomicInteger attempts = new AtomicInteger(); HttpRequestImpl currentreq; // used for async only Exchange exchange; // the current exchange Exchange previous; - int attempts; + volatile Throwable retryCause; + volatile boolean expiredOnce; + volatile HttpResponse response = null; + // Maximum number of times a request will be retried/redirected // for any reason @@ -91,93 +99,55 @@ /** * MultiExchange with one final response. */ - MultiExchange(HttpRequest req, + MultiExchange(HttpRequest userRequest, + HttpRequestImpl requestImpl, HttpClientImpl client, - HttpResponse.BodyHandler responseHandler) { + HttpResponse.BodyHandler responseHandler, + AccessControlContext acc) { this.previous = null; - this.userRequest = req; - this.request = new HttpRequestImpl(req); + this.userRequest = userRequest; + this.request = requestImpl; this.currentreq = request; - this.attempts = 0; this.client = client; this.filters = client.filterChain(); - if (System.getSecurityManager() != null) { - this.acc = AccessController.getContext(); - } else { - this.acc = null; - } - this.execWrapper = new ExecutorWrapper(client.executor(), acc); - this.executor = execWrapper.executor(); + this.acc = acc; + this.executor = client.theExecutor(); this.responseHandler = responseHandler; + if (acc != null) { + // Restricts the file publisher with the senders ACC, if any + if (responseHandler instanceof UntrustedBodyHandler) + ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc); + } this.exchange = new Exchange<>(request, this); - this.multiResponseHandler = null; + this.multiResponseSubscriber = null; this.pushGroup = null; } /** * MultiExchange with multiple responses (HTTP/2 server pushes). */ - MultiExchange(HttpRequest req, + MultiExchange(HttpRequest userRequest, + HttpRequestImpl requestImpl, HttpClientImpl client, - HttpResponse.MultiProcessor multiResponseHandler) { + HttpResponse.MultiSubscriber multiResponseSubscriber, + AccessControlContext acc) { this.previous = null; - this.userRequest = req; - this.request = new HttpRequestImpl(req); + this.userRequest = userRequest; + this.request = requestImpl; this.currentreq = request; - this.attempts = 0; this.client = client; this.filters = client.filterChain(); - if (System.getSecurityManager() != null) { - this.acc = AccessController.getContext(); - } else { - this.acc = null; - } - this.execWrapper = new ExecutorWrapper(client.executor(), acc); - this.executor = execWrapper.executor(); - this.multiResponseHandler = multiResponseHandler; - this.pushGroup = new PushGroup<>(multiResponseHandler, request); + this.acc = acc; + this.executor = client.theExecutor(); + this.multiResponseSubscriber = multiResponseSubscriber; + this.pushGroup = new PushGroup<>(multiResponseSubscriber, request, acc); this.exchange = new Exchange<>(request, this); this.responseHandler = pushGroup.mainResponseHandler(); } - public HttpResponseImpl response() throws IOException, InterruptedException { - HttpRequestImpl r = request; - if (r.duration() != null) { - timedEvent = new TimedEvent(r.duration()); - client.registerTimer(timedEvent); - } - while (attempts < max_attempts) { - try { - attempts++; - Exchange currExchange = getExchange(); - requestFilters(r); - Response response = currExchange.response(); - HttpRequestImpl newreq = responseFilters(response); - if (newreq == null) { - if (attempts > 1) { - Log.logError("Succeeded on attempt: " + attempts); - } - T body = currExchange.readBody(responseHandler); - cancelTimer(); - return new HttpResponseImpl<>(userRequest, response, body, currExchange); - } - //response.body(HttpResponse.ignoreBody()); - setExchange(new Exchange<>(newreq, this, acc)); - r = newreq; - } catch (IOException e) { - if (cancelled) { - throw new HttpTimeoutException("Request timed out"); - } - throw e; - } - } - cancelTimer(); - throw new IOException("Retry limit exceeded"); - } - - CompletableFuture multiCompletionCF() { - return pushGroup.groupResult(); - } +// CompletableFuture multiCompletionCF() { +// return pushGroup.groupResult(); +// } private synchronized Exchange getExchange() { return exchange; @@ -187,15 +157,18 @@ return client; } - HttpClient.Redirect followRedirects() { - return client.followRedirects(); - } +// HttpClient.Redirect followRedirects() { +// return client.followRedirects(); +// } HttpClient.Version version() { return request.version().orElse(client.version()); } private synchronized void setExchange(Exchange exchange) { + if (this.exchange != null && exchange != this.exchange) { + this.exchange.released(); + } this.exchange = exchange; } @@ -229,114 +202,117 @@ return null; } - public void cancel() { - cancelled = true; - getExchange().cancel(); - } +// public void cancel() { +// cancelled = true; +// getExchange().cancel(); +// } public void cancel(IOException cause) { cancelled = true; getExchange().cancel(cause); } - public CompletableFuture> responseAsync() { + public CompletableFuture> responseAsync() { CompletableFuture start = new MinimalFuture<>(); - CompletableFuture> cf = responseAsync0(start); + CompletableFuture> cf = responseAsync0(start); start.completeAsync( () -> null, executor); // trigger execution return cf; } - private CompletableFuture> responseAsync0(CompletableFuture start) { + private CompletableFuture> + responseAsync0(CompletableFuture start) { return start.thenCompose( v -> responseAsyncImpl()) - .thenCompose((Response r) -> { - Exchange exch = getExchange(); - return exch.readBodyAsync(responseHandler) - .thenApply((T body) -> new HttpResponseImpl<>(userRequest, r, body, exch)); - }); + .thenCompose((Response r) -> { + Exchange exch = getExchange(); + return exch.readBodyAsync(responseHandler) + .thenApply((T body) -> { + this.response = + new HttpResponseImpl<>(userRequest, r, this.response, body, exch); + return this.response; + }); + }); } CompletableFuture multiResponseAsync() { CompletableFuture start = new MinimalFuture<>(); - CompletableFuture> cf = responseAsync0(start); + CompletableFuture> cf = responseAsync0(start); CompletableFuture> mainResponse = - cf.thenApply((HttpResponseImpl b) -> { - multiResponseHandler.onResponse(b); - return (HttpResponse)b; - }); - + cf.thenApply(b -> { + multiResponseSubscriber.onResponse(b); + pushGroup.noMorePushes(true); + return b; }); pushGroup.setMainResponse(mainResponse); - // set up house-keeping related to multi-response - mainResponse.thenAccept((r) -> { - // All push promises received by now. - pushGroup.noMorePushes(true); - }); - CompletableFuture res = multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF()); + CompletableFuture res = multiResponseSubscriber.completion(pushGroup.groupResult(), + pushGroup.pushesCF()); start.completeAsync( () -> null, executor); // trigger execution return res; } private CompletableFuture responseAsyncImpl() { CompletableFuture cf; - if (++attempts > max_attempts) { - cf = MinimalFuture.failedFuture(new IOException("Too many retries")); + if (attempts.incrementAndGet() > max_attempts) { + cf = failedFuture(new IOException("Too many retries", retryCause)); } else { - if (currentreq.duration() != null) { - timedEvent = new TimedEvent(currentreq.duration()); + if (currentreq.timeout().isPresent()) { + timedEvent = new TimedEvent(currentreq.timeout().get()); client.registerTimer(timedEvent); } try { - // 1. Apply request filters + // 1. apply request filters requestFilters(currentreq); } catch (IOException e) { - return MinimalFuture.failedFuture(e); + return failedFuture(e); } Exchange exch = getExchange(); // 2. get response cf = exch.responseAsync() - .thenCompose((Response response) -> { - HttpRequestImpl newrequest = null; - try { - // 3. Apply response filters - newrequest = responseFilters(response); - } catch (IOException e) { - return MinimalFuture.failedFuture(e); - } - // 4. Check filter result and repeat or continue - if (newrequest == null) { - if (attempts > 1) { - Log.logError("Succeeded on attempt: " + attempts); + .thenCompose((Response response) -> { + HttpRequestImpl newrequest; + try { + // 3. apply response filters + newrequest = responseFilters(response); + } catch (IOException e) { + return failedFuture(e); } - return MinimalFuture.completedFuture(response); - } else { - currentreq = newrequest; - setExchange(new Exchange<>(currentreq, this, acc)); - //reads body off previous, and then waits for next response - return responseAsyncImpl(); - } - }) - // 5. Handle errors and cancel any timer set - .handle((response, ex) -> { - cancelTimer(); - if (ex == null) { - assert response != null; - return MinimalFuture.completedFuture(response); - } - // all exceptions thrown are handled here - CompletableFuture error = getExceptionalCF(ex); - if (error == null) { - return responseAsyncImpl(); - } else { - return error; - } - }) - .thenCompose(UnaryOperator.identity()); + // 4. check filter result and repeat or continue + if (newrequest == null) { + if (attempts.get() > 1) { + Log.logError("Succeeded on attempt: " + attempts); + } + return completedFuture(response); + } else { + this.response = + new HttpResponseImpl<>(currentreq, response, this.response, null, exch); + Exchange oldExch = exch; + return exch.ignoreBody().handle((r,t) -> { + currentreq = newrequest; + expiredOnce = false; + setExchange(new Exchange<>(currentreq, this, acc)); + return responseAsyncImpl(); + }).thenCompose(Function.identity()); + } }) + .handle((response, ex) -> { + // 5. handle errors and cancel any timer set + cancelTimer(); + if (ex == null) { + assert response != null; + return completedFuture(response); + } + // all exceptions thrown are handled here + CompletableFuture errorCF = getExceptionalCF(ex); + if (errorCF == null) { + return responseAsyncImpl(); + } else { + return errorCF; + } }) + .thenCompose(Function.identity()); } return cf; } /** - * Take a Throwable and return a suitable CompletableFuture that is - * completed exceptionally. + * Takes a Throwable and returns a suitable CompletableFuture that is + * completed exceptionally, or null. */ private CompletableFuture getExceptionalCF(Throwable t) { if ((t instanceof CompletionException) || (t instanceof ExecutionException)) { @@ -346,8 +322,24 @@ } if (cancelled && t instanceof IOException) { t = new HttpTimeoutException("request timed out"); + } else if (t instanceof ConnectionExpiredException) { + // allow the retry mechanism to do its work + // ####: method (GET,HEAD, not POST?), no bytes written or read ( differentiate? ) + if (t.getCause() != null) retryCause = t.getCause(); + if (!expiredOnce) { + DEBUG_LOGGER.log(Level.DEBUG, + "MultiExchange: ConnectionExpiredException (async): retrying...", + t); + expiredOnce = true; + return null; + } else { + DEBUG_LOGGER.log(Level.DEBUG, + "MultiExchange: ConnectionExpiredException (async): already retried once.", + t); + if (t.getCause() != null) t = t.getCause(); + } } - return MinimalFuture.failedFuture(t); + return failedFuture(t); } class TimedEvent extends TimeoutEvent { @@ -356,6 +348,9 @@ } @Override public void handle() { + DEBUG_LOGGER.log(Level.DEBUG, + "Cancelling MultiExchange due to timeout for request %s", + request); cancel(new HttpTimeoutException("request timed out")); } }