1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.time.Duration;
  30 import java.util.List;
  31 import java.security.AccessControlContext;
  32 import java.security.AccessController;
  33 import java.util.concurrent.CompletableFuture;
  34 import java.util.concurrent.CompletionException;
  35 import java.util.concurrent.ExecutionException;
  36 import java.util.function.BiFunction;
  37 import java.util.concurrent.Executor;
  38 import java.util.function.UnaryOperator;
  39 
  40 import jdk.incubator.http.internal.common.Log;
  41 import jdk.incubator.http.internal.common.MinimalFuture;
  42 import jdk.incubator.http.internal.common.Pair;
  43 import jdk.incubator.http.internal.common.Utils;
  44 import static jdk.incubator.http.internal.common.Pair.pair;
  45 
  46 /**
  47  * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
  48  * - manages filters
  49  * - retries due to filters.
  50  * - I/O errors and most other exceptions get returned directly to user
  51  *
  52  * Creates a new Exchange for each request/response interaction
  53  */
  54 class MultiExchange<U,T> {
  55 
  56     private final HttpRequest userRequest; // the user request
  57     private final HttpRequestImpl request; // a copy of the user request
  58     final AccessControlContext acc;
  59     final HttpClientImpl client;
  60     final HttpResponse.BodyHandler<T> responseHandler;
  61     final ExecutorWrapper execWrapper;
  62     final Executor executor;
  63     final HttpResponse.MultiProcessor<U,T> multiResponseHandler;
  64     HttpRequestImpl currentreq; // used for async only
  65     Exchange<T> exchange; // the current exchange
  66     Exchange<T> previous;
  67     int attempts;
  68     // Maximum number of times a request will be retried/redirected
  69     // for any reason
  70 
  71     static final int DEFAULT_MAX_ATTEMPTS = 5;
  72     static final int max_attempts = Utils.getIntegerNetProperty(
  73             "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
  74     );
  75 
  76     private final List<HeaderFilter> filters;
  77     TimedEvent timedEvent;
  78     volatile boolean cancelled;
  79     final PushGroup<U,T> pushGroup;
  80 
  81     /**
  82      * Filter fields. These are attached as required by filters
  83      * and only used by the filter implementations. This could be
  84      * generalised into Objects that are passed explicitly to the filters
  85      * (one per MultiExchange object, and one per Exchange object possibly)
  86      */
  87     volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
  88     // RedirectHandler
  89     volatile int numberOfRedirects = 0;
  90 
  91     /**
  92      * MultiExchange with one final response.
  93      */
  94     MultiExchange(HttpRequest req,
  95                   HttpClientImpl client,
  96                   HttpResponse.BodyHandler<T> responseHandler) {
  97         this.previous = null;
  98         this.userRequest = req;
  99         this.request = new HttpRequestImpl(req);
 100         this.currentreq = request;
 101         this.attempts = 0;
 102         this.client = client;
 103         this.filters = client.filterChain();
 104         if (System.getSecurityManager() != null) {
 105             this.acc = AccessController.getContext();
 106         } else {
 107             this.acc = null;
 108         }
 109         this.execWrapper = new ExecutorWrapper(client.executor(), acc);
 110         this.executor = execWrapper.executor();
 111         this.responseHandler = responseHandler;
 112         this.exchange = new Exchange<>(request, this);
 113         this.multiResponseHandler = null;
 114         this.pushGroup = null;
 115     }
 116 
 117     /**
 118      * MultiExchange with multiple responses (HTTP/2 server pushes).
 119      */
 120     MultiExchange(HttpRequest req,
 121                   HttpClientImpl client,
 122                   HttpResponse.MultiProcessor<U, T> multiResponseHandler) {
 123         this.previous = null;
 124         this.userRequest = req;
 125         this.request = new HttpRequestImpl(req);
 126         this.currentreq = request;
 127         this.attempts = 0;
 128         this.client = client;
 129         this.filters = client.filterChain();
 130         if (System.getSecurityManager() != null) {
 131             this.acc = AccessController.getContext();
 132         } else {
 133             this.acc = null;
 134         }
 135         this.execWrapper = new ExecutorWrapper(client.executor(), acc);
 136         this.executor = execWrapper.executor();
 137         this.multiResponseHandler = multiResponseHandler;
 138         this.pushGroup = new PushGroup<>(multiResponseHandler, request);
 139         this.exchange = new Exchange<>(request, this);
 140         this.responseHandler = pushGroup.mainResponseHandler();
 141     }
 142 
 143     public HttpResponseImpl<T> response() throws IOException, InterruptedException {
 144         HttpRequestImpl r = request;
 145         if (r.duration() != null) {
 146             timedEvent = new TimedEvent(r.duration());
 147             client.registerTimer(timedEvent);
 148         }
 149         while (attempts < max_attempts) {
 150             try {
 151                 attempts++;
 152                 Exchange<T> currExchange = getExchange();
 153                 requestFilters(r);
 154                 Response response = currExchange.response();
 155                 HttpRequestImpl newreq = responseFilters(response);
 156                 if (newreq == null) {
 157                     if (attempts > 1) {
 158                         Log.logError("Succeeded on attempt: " + attempts);
 159                     }
 160                     T body = currExchange.readBody(responseHandler);
 161                     cancelTimer();
 162                     return new HttpResponseImpl<>(userRequest, response, body, currExchange);
 163                 }
 164                 //response.body(HttpResponse.ignoreBody());
 165                 setExchange(new Exchange<>(newreq, this, acc));
 166                 r = newreq;
 167             } catch (IOException e) {
 168                 if (cancelled) {
 169                     throw new HttpTimeoutException("Request timed out");
 170                 }
 171                 throw e;
 172             }
 173         }
 174         cancelTimer();
 175         throw new IOException("Retry limit exceeded");
 176     }
 177 
 178     CompletableFuture<Void> multiCompletionCF() {
 179         return pushGroup.groupResult();
 180     }
 181 
 182     private synchronized Exchange<T> getExchange() {
 183         return exchange;
 184     }
 185 
 186     HttpClientImpl client() {
 187         return client;
 188     }
 189 
 190     HttpClient.Redirect followRedirects() {
 191         return client.followRedirects();
 192     }
 193 
 194     HttpClient.Version version() {
 195         return request.version().orElse(client.version());
 196     }
 197 
 198     private synchronized void setExchange(Exchange<T> exchange) {
 199         this.exchange = exchange;
 200     }
 201 
 202     private void cancelTimer() {
 203         if (timedEvent != null) {
 204             client.cancelTimer(timedEvent);
 205         }
 206     }
 207 
 208     private void requestFilters(HttpRequestImpl r) throws IOException {
 209         Log.logTrace("Applying request filters");
 210         for (HeaderFilter filter : filters) {
 211             Log.logTrace("Applying {0}", filter);
 212             filter.request(r, this);
 213         }
 214         Log.logTrace("All filters applied");
 215     }
 216 
 217     private HttpRequestImpl responseFilters(Response response) throws IOException
 218     {
 219         Log.logTrace("Applying response filters");
 220         for (HeaderFilter filter : filters) {
 221             Log.logTrace("Applying {0}", filter);
 222             HttpRequestImpl newreq = filter.response(response);
 223             if (newreq != null) {
 224                 Log.logTrace("New request: stopping filters");
 225                 return newreq;
 226             }
 227         }
 228         Log.logTrace("All filters applied");
 229         return null;
 230     }
 231 
 232     public void cancel() {
 233         cancelled = true;
 234         getExchange().cancel();
 235     }
 236 
 237     public void cancel(IOException cause) {
 238         cancelled = true;
 239         getExchange().cancel(cause);
 240     }
 241 
 242     public CompletableFuture<HttpResponseImpl<T>> responseAsync() {
 243         CompletableFuture<Void> start = new MinimalFuture<>();
 244         CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
 245         start.completeAsync( () -> null, executor); // trigger execution
 246         return cf;
 247     }
 248 
 249     private CompletableFuture<HttpResponseImpl<T>> responseAsync0(CompletableFuture<Void> start) {
 250         return start.thenCompose( v -> responseAsyncImpl())
 251             .thenCompose((Response r) -> {
 252                 Exchange<T> exch = getExchange();
 253                 return exch.readBodyAsync(responseHandler)
 254                         .thenApply((T body) ->  new HttpResponseImpl<>(userRequest, r, body, exch));
 255             });
 256     }
 257 
 258     CompletableFuture<U> multiResponseAsync() {
 259         CompletableFuture<Void> start = new MinimalFuture<>();
 260         CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
 261         CompletableFuture<HttpResponse<T>> mainResponse =
 262                 cf.thenApply((HttpResponseImpl<T> b) -> {
 263                       multiResponseHandler.onResponse(b);
 264                       return (HttpResponse<T>)b;
 265                    });
 266 
 267         pushGroup.setMainResponse(mainResponse);
 268         // set up house-keeping related to multi-response
 269         mainResponse.thenAccept((r) -> {
 270             // All push promises received by now.
 271             pushGroup.noMorePushes(true);
 272         });
 273         CompletableFuture<U> res = multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF());
 274         start.completeAsync( () -> null, executor); // trigger execution
 275         return res;
 276     }
 277 
 278     private CompletableFuture<Response> responseAsyncImpl() {
 279         CompletableFuture<Response> cf;
 280         if (++attempts > max_attempts) {
 281             cf = MinimalFuture.failedFuture(new IOException("Too many retries"));
 282         } else {
 283             if (currentreq.duration() != null) {
 284                 timedEvent = new TimedEvent(currentreq.duration());
 285                 client.registerTimer(timedEvent);
 286             }
 287             try {
 288                 // 1. Apply request filters
 289                 requestFilters(currentreq);
 290             } catch (IOException e) {
 291                 return MinimalFuture.failedFuture(e);
 292             }
 293             Exchange<T> exch = getExchange();
 294             // 2. get response
 295             cf = exch.responseAsync()
 296                 .thenCompose((Response response) -> {
 297                     HttpRequestImpl newrequest = null;
 298                     try {
 299                         // 3. Apply response filters
 300                         newrequest = responseFilters(response);
 301                     } catch (IOException e) {
 302                         return MinimalFuture.failedFuture(e);
 303                     }
 304                     // 4. Check filter result and repeat or continue
 305                     if (newrequest == null) {
 306                         if (attempts > 1) {
 307                             Log.logError("Succeeded on attempt: " + attempts);
 308                         }
 309                         return MinimalFuture.completedFuture(response);
 310                     } else {
 311                         currentreq = newrequest;
 312                         setExchange(new Exchange<>(currentreq, this, acc));
 313                         //reads body off previous, and then waits for next response
 314                         return responseAsyncImpl();
 315                     }
 316                 })
 317             // 5. Handle errors and cancel any timer set
 318             .handle((response, ex) -> {
 319                 cancelTimer();
 320                 if (ex == null) {
 321                     assert response != null;
 322                     return MinimalFuture.completedFuture(response);
 323                 }
 324                 // all exceptions thrown are handled here
 325                 CompletableFuture<Response> error = getExceptionalCF(ex);
 326                 if (error == null) {
 327                     return responseAsyncImpl();
 328                 } else {
 329                     return error;
 330                 }
 331             })
 332             .thenCompose(UnaryOperator.identity());
 333         }
 334         return cf;
 335     }
 336 
 337     /**
 338      * Take a Throwable and return a suitable CompletableFuture that is
 339      * completed exceptionally.
 340      */
 341     private CompletableFuture<Response> getExceptionalCF(Throwable t) {
 342         if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
 343             if (t.getCause() != null) {
 344                 t = t.getCause();
 345             }
 346         }
 347         if (cancelled && t instanceof IOException) {
 348             t = new HttpTimeoutException("request timed out");
 349         }
 350         return MinimalFuture.failedFuture(t);
 351     }
 352 
 353     class TimedEvent extends TimeoutEvent {
 354         TimedEvent(Duration duration) {
 355             super(duration);
 356         }
 357         @Override
 358         public void handle() {
 359             cancel(new HttpTimeoutException("request timed out"));
 360         }
 361     }
 362 }