< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java

Print this page

        

@@ -24,26 +24,27 @@
  */
 
 package jdk.incubator.http;
 
 import java.io.IOException;
+import java.lang.System.Logger.Level;
 import java.time.Duration;
 import java.util.List;
 import java.security.AccessControlContext;
-import java.security.AccessController;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
-import java.util.function.BiFunction;
 import java.util.concurrent.Executor;
-import java.util.function.UnaryOperator;
-
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import jdk.incubator.http.HttpResponse.UntrustedBodyHandler;
 import jdk.incubator.http.internal.common.Log;
 import jdk.incubator.http.internal.common.MinimalFuture;
-import jdk.incubator.http.internal.common.Pair;
+import jdk.incubator.http.internal.common.ConnectionExpiredException;
 import jdk.incubator.http.internal.common.Utils;
-import static jdk.incubator.http.internal.common.Pair.pair;
+import static jdk.incubator.http.internal.common.MinimalFuture.completedFuture;
+import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
 
 /**
  * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
  * - manages filters
  * - retries due to filters.

@@ -51,22 +52,29 @@
  *
  * Creates a new Exchange for each request/response interaction
  */
 class MultiExchange<U,T> {
 
+    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+    static final System.Logger DEBUG_LOGGER =
+            Utils.getDebugLogger("MultiExchange"::toString, DEBUG);
+
     private final HttpRequest userRequest; // the user request
     private final HttpRequestImpl request; // a copy of the user request
     final AccessControlContext acc;
     final HttpClientImpl client;
     final HttpResponse.BodyHandler<T> responseHandler;
-    final ExecutorWrapper execWrapper;
     final Executor executor;
-    final HttpResponse.MultiProcessor<U,T> multiResponseHandler;
+    final HttpResponse.MultiSubscriber<U,T> multiResponseSubscriber;
+    final AtomicInteger attempts = new AtomicInteger();
     HttpRequestImpl currentreq; // used for async only
     Exchange<T> exchange; // the current exchange
     Exchange<T> previous;
-    int attempts;
+    volatile Throwable retryCause;
+    volatile boolean expiredOnce;
+    volatile HttpResponse<T> response = null;
+
     // Maximum number of times a request will be retried/redirected
     // for any reason
 
     static final int DEFAULT_MAX_ATTEMPTS = 5;
     static final int max_attempts = Utils.getIntegerNetProperty(

@@ -89,115 +97,80 @@
     volatile int numberOfRedirects = 0;
 
     /**
      * MultiExchange with one final response.
      */
-    MultiExchange(HttpRequest req,
+    MultiExchange(HttpRequest userRequest,
+                  HttpRequestImpl requestImpl,
                   HttpClientImpl client,
-                  HttpResponse.BodyHandler<T> responseHandler) {
+                  HttpResponse.BodyHandler<T> responseHandler,
+                  AccessControlContext acc) {
         this.previous = null;
-        this.userRequest = req;
-        this.request = new HttpRequestImpl(req);
+        this.userRequest = userRequest;
+        this.request = requestImpl;
         this.currentreq = request;
-        this.attempts = 0;
         this.client = client;
         this.filters = client.filterChain();
-        if (System.getSecurityManager() != null) {
-            this.acc = AccessController.getContext();
-        } else {
-            this.acc = null;
-        }
-        this.execWrapper = new ExecutorWrapper(client.executor(), acc);
-        this.executor = execWrapper.executor();
+        this.acc = acc;
+        this.executor = client.theExecutor();
         this.responseHandler = responseHandler;
+        if (acc != null) {
+            // Restricts the file publisher with the senders ACC, if any
+            if (responseHandler instanceof UntrustedBodyHandler)
+                ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc);
+        }
         this.exchange = new Exchange<>(request, this);
-        this.multiResponseHandler = null;
+        this.multiResponseSubscriber = null;
         this.pushGroup = null;
     }
 
     /**
      * MultiExchange with multiple responses (HTTP/2 server pushes).
      */
-    MultiExchange(HttpRequest req,
+    MultiExchange(HttpRequest userRequest,
+                  HttpRequestImpl requestImpl,
                   HttpClientImpl client,
-                  HttpResponse.MultiProcessor<U, T> multiResponseHandler) {
+                  HttpResponse.MultiSubscriber<U, T> multiResponseSubscriber,
+                  AccessControlContext acc) {
         this.previous = null;
-        this.userRequest = req;
-        this.request = new HttpRequestImpl(req);
+        this.userRequest = userRequest;
+        this.request = requestImpl;
         this.currentreq = request;
-        this.attempts = 0;
         this.client = client;
         this.filters = client.filterChain();
-        if (System.getSecurityManager() != null) {
-            this.acc = AccessController.getContext();
-        } else {
-            this.acc = null;
-        }
-        this.execWrapper = new ExecutorWrapper(client.executor(), acc);
-        this.executor = execWrapper.executor();
-        this.multiResponseHandler = multiResponseHandler;
-        this.pushGroup = new PushGroup<>(multiResponseHandler, request);
+        this.acc = acc;
+        this.executor = client.theExecutor();
+        this.multiResponseSubscriber = multiResponseSubscriber;
+        this.pushGroup = new PushGroup<>(multiResponseSubscriber, request, acc);
         this.exchange = new Exchange<>(request, this);
         this.responseHandler = pushGroup.mainResponseHandler();
     }
 
-    public HttpResponseImpl<T> response() throws IOException, InterruptedException {
-        HttpRequestImpl r = request;
-        if (r.duration() != null) {
-            timedEvent = new TimedEvent(r.duration());
-            client.registerTimer(timedEvent);
-        }
-        while (attempts < max_attempts) {
-            try {
-                attempts++;
-                Exchange<T> currExchange = getExchange();
-                requestFilters(r);
-                Response response = currExchange.response();
-                HttpRequestImpl newreq = responseFilters(response);
-                if (newreq == null) {
-                    if (attempts > 1) {
-                        Log.logError("Succeeded on attempt: " + attempts);
-                    }
-                    T body = currExchange.readBody(responseHandler);
-                    cancelTimer();
-                    return new HttpResponseImpl<>(userRequest, response, body, currExchange);
-                }
-                //response.body(HttpResponse.ignoreBody());
-                setExchange(new Exchange<>(newreq, this, acc));
-                r = newreq;
-            } catch (IOException e) {
-                if (cancelled) {
-                    throw new HttpTimeoutException("Request timed out");
-                }
-                throw e;
-            }
-        }
-        cancelTimer();
-        throw new IOException("Retry limit exceeded");
-    }
-
-    CompletableFuture<Void> multiCompletionCF() {
-        return pushGroup.groupResult();
-    }
+//    CompletableFuture<Void> multiCompletionCF() {
+//        return pushGroup.groupResult();
+//    }
 
     private synchronized Exchange<T> getExchange() {
         return exchange;
     }
 
     HttpClientImpl client() {
         return client;
     }
 
-    HttpClient.Redirect followRedirects() {
-        return client.followRedirects();
-    }
+//    HttpClient.Redirect followRedirects() {
+//        return client.followRedirects();
+//    }
 
     HttpClient.Version version() {
         return request.version().orElse(client.version());
     }
 
     private synchronized void setExchange(Exchange<T> exchange) {
+        if (this.exchange != null && exchange != this.exchange) {
+            this.exchange.released();
+        }
         this.exchange = exchange;
     }
 
     private void cancelTimer() {
         if (timedEvent != null) {

@@ -227,136 +200,158 @@
         }
         Log.logTrace("All filters applied");
         return null;
     }
 
-    public void cancel() {
-        cancelled = true;
-        getExchange().cancel();
-    }
+//    public void cancel() {
+//        cancelled = true;
+//        getExchange().cancel();
+//    }
 
     public void cancel(IOException cause) {
         cancelled = true;
         getExchange().cancel(cause);
     }
 
-    public CompletableFuture<HttpResponseImpl<T>> responseAsync() {
+    public CompletableFuture<HttpResponse<T>> responseAsync() {
         CompletableFuture<Void> start = new MinimalFuture<>();
-        CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
+        CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
         start.completeAsync( () -> null, executor); // trigger execution
         return cf;
     }
 
-    private CompletableFuture<HttpResponseImpl<T>> responseAsync0(CompletableFuture<Void> start) {
+    private CompletableFuture<HttpResponse<T>>
+    responseAsync0(CompletableFuture<Void> start) {
         return start.thenCompose( v -> responseAsyncImpl())
             .thenCompose((Response r) -> {
                 Exchange<T> exch = getExchange();
                 return exch.readBodyAsync(responseHandler)
-                        .thenApply((T body) ->  new HttpResponseImpl<>(userRequest, r, body, exch));
+                            .thenApply((T body) -> {
+                                this.response =
+                                    new HttpResponseImpl<>(userRequest, r, this.response, body, exch);
+                                return this.response;
+                            });
             });
     }
 
     CompletableFuture<U> multiResponseAsync() {
         CompletableFuture<Void> start = new MinimalFuture<>();
-        CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
+        CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
         CompletableFuture<HttpResponse<T>> mainResponse =
-                cf.thenApply((HttpResponseImpl<T> b) -> {
-                      multiResponseHandler.onResponse(b);
-                      return (HttpResponse<T>)b;
-                   });
-
-        pushGroup.setMainResponse(mainResponse);
-        // set up house-keeping related to multi-response
-        mainResponse.thenAccept((r) -> {
-            // All push promises received by now.
+                cf.thenApply(b -> {
+                        multiResponseSubscriber.onResponse(b);
             pushGroup.noMorePushes(true);
-        });
-        CompletableFuture<U> res = multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF());
+                        return b; });
+        pushGroup.setMainResponse(mainResponse);
+        CompletableFuture<U> res = multiResponseSubscriber.completion(pushGroup.groupResult(),
+                                                                      pushGroup.pushesCF());
         start.completeAsync( () -> null, executor); // trigger execution
         return res;
     }
 
     private CompletableFuture<Response> responseAsyncImpl() {
         CompletableFuture<Response> cf;
-        if (++attempts > max_attempts) {
-            cf = MinimalFuture.failedFuture(new IOException("Too many retries"));
+        if (attempts.incrementAndGet() > max_attempts) {
+            cf = failedFuture(new IOException("Too many retries", retryCause));
         } else {
-            if (currentreq.duration() != null) {
-                timedEvent = new TimedEvent(currentreq.duration());
+            if (currentreq.timeout().isPresent()) {
+                timedEvent = new TimedEvent(currentreq.timeout().get());
                 client.registerTimer(timedEvent);
             }
             try {
-                // 1. Apply request filters
+                // 1. apply request filters
                 requestFilters(currentreq);
             } catch (IOException e) {
-                return MinimalFuture.failedFuture(e);
+                return failedFuture(e);
             }
             Exchange<T> exch = getExchange();
             // 2. get response
             cf = exch.responseAsync()
                 .thenCompose((Response response) -> {
-                    HttpRequestImpl newrequest = null;
+                        HttpRequestImpl newrequest;
                     try {
-                        // 3. Apply response filters
+                            // 3. apply response filters
                         newrequest = responseFilters(response);
                     } catch (IOException e) {
-                        return MinimalFuture.failedFuture(e);
+                            return failedFuture(e);
                     }
-                    // 4. Check filter result and repeat or continue
+                        // 4. check filter result and repeat or continue
                     if (newrequest == null) {
-                        if (attempts > 1) {
+                            if (attempts.get() > 1) {
                             Log.logError("Succeeded on attempt: " + attempts);
                         }
-                        return MinimalFuture.completedFuture(response);
+                            return completedFuture(response);
                     } else {
+                            this.response =
+                                new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
+                            Exchange<T> oldExch = exch;
+                            return exch.ignoreBody().handle((r,t) -> {
                         currentreq = newrequest;
+                                expiredOnce = false;
                         setExchange(new Exchange<>(currentreq, this, acc));
-                        //reads body off previous, and then waits for next response
                         return responseAsyncImpl();
-                    }
-                })
-            // 5. Handle errors and cancel any timer set
+                            }).thenCompose(Function.identity());
+                        } })
             .handle((response, ex) -> {
+                        // 5. handle errors and cancel any timer set
                 cancelTimer();
                 if (ex == null) {
                     assert response != null;
-                    return MinimalFuture.completedFuture(response);
+                            return completedFuture(response);
                 }
                 // all exceptions thrown are handled here
-                CompletableFuture<Response> error = getExceptionalCF(ex);
-                if (error == null) {
+                        CompletableFuture<Response> errorCF = getExceptionalCF(ex);
+                        if (errorCF == null) {
                     return responseAsyncImpl();
                 } else {
-                    return error;
-                }
-            })
-            .thenCompose(UnaryOperator.identity());
+                            return errorCF;
+                        } })
+                     .thenCompose(Function.identity());
         }
         return cf;
     }
 
     /**
-     * Take a Throwable and return a suitable CompletableFuture that is
-     * completed exceptionally.
+     * Takes a Throwable and returns a suitable CompletableFuture that is
+     * completed exceptionally, or null.
      */
     private CompletableFuture<Response> getExceptionalCF(Throwable t) {
         if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
             if (t.getCause() != null) {
                 t = t.getCause();
             }
         }
         if (cancelled && t instanceof IOException) {
             t = new HttpTimeoutException("request timed out");
+        } else if (t instanceof ConnectionExpiredException) {
+            // allow the retry mechanism to do its work
+            // ####: method (GET,HEAD, not POST?), no bytes written or read ( differentiate? )
+            if (t.getCause() != null) retryCause = t.getCause();
+            if (!expiredOnce) {
+                DEBUG_LOGGER.log(Level.DEBUG,
+                    "MultiExchange: ConnectionExpiredException (async): retrying...",
+                    t);
+                expiredOnce = true;
+                return null;
+            } else {
+                DEBUG_LOGGER.log(Level.DEBUG,
+                    "MultiExchange: ConnectionExpiredException (async): already retried once.",
+                    t);
+                if (t.getCause() != null) t = t.getCause();
+            }
         }
-        return MinimalFuture.failedFuture(t);
+        return failedFuture(t);
     }
 
     class TimedEvent extends TimeoutEvent {
         TimedEvent(Duration duration) {
             super(duration);
         }
         @Override
         public void handle() {
+            DEBUG_LOGGER.log(Level.DEBUG,
+                    "Cancelling MultiExchange due to timeout for request %s",
+                     request);
             cancel(new HttpTimeoutException("request timed out"));
         }
     }
 }
< prev index next >