1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.time.Duration;
  31 import java.util.List;
  32 import java.security.AccessControlContext;
  33 import java.util.concurrent.CompletableFuture;
  34 import java.util.concurrent.CompletionException;
  35 import java.util.concurrent.ExecutionException;
  36 import java.util.concurrent.Executor;
  37 import java.util.concurrent.atomic.AtomicInteger;
  38 import java.util.function.Function;
  39 import jdk.incubator.http.HttpResponse.UntrustedBodyHandler;
  40 import jdk.incubator.http.internal.common.Log;
  41 import jdk.incubator.http.internal.common.MinimalFuture;
  42 import jdk.incubator.http.internal.common.ConnectionExpiredException;
  43 import jdk.incubator.http.internal.common.Utils;
  44 import static jdk.incubator.http.internal.common.MinimalFuture.completedFuture;
  45 import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
  46 
  47 /**
  48  * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
  49  * - manages filters
  50  * - retries due to filters.
  51  * - I/O errors and most other exceptions get returned directly to user
  52  *
  53  * Creates a new Exchange for each request/response interaction
  54  */
  55 class MultiExchange<U,T> {
  56 
  57     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
  58     static final System.Logger DEBUG_LOGGER =
  59             Utils.getDebugLogger("MultiExchange"::toString, DEBUG);
  60 
  61     private final HttpRequest userRequest; // the user request
  62     private final HttpRequestImpl request; // a copy of the user request
  63     final AccessControlContext acc;
  64     final HttpClientImpl client;
  65     final HttpResponse.BodyHandler<T> responseHandler;
  66     final Executor executor;
  67     final HttpResponse.MultiSubscriber<U,T> multiResponseSubscriber;
  68     final AtomicInteger attempts = new AtomicInteger();
  69     HttpRequestImpl currentreq; // used for async only
  70     Exchange<T> exchange; // the current exchange
  71     Exchange<T> previous;
  72     volatile Throwable retryCause;
  73     volatile boolean expiredOnce;
  74     volatile HttpResponse<T> response = null;
  75 
  76     // Maximum number of times a request will be retried/redirected
  77     // for any reason
  78 
  79     static final int DEFAULT_MAX_ATTEMPTS = 5;
  80     static final int max_attempts = Utils.getIntegerNetProperty(
  81             "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
  82     );
  83 
  84     private final List<HeaderFilter> filters;
  85     TimedEvent timedEvent;
  86     volatile boolean cancelled;
  87     final PushGroup<U,T> pushGroup;
  88 
  89     /**
  90      * Filter fields. These are attached as required by filters
  91      * and only used by the filter implementations. This could be
  92      * generalised into Objects that are passed explicitly to the filters
  93      * (one per MultiExchange object, and one per Exchange object possibly)
  94      */
  95     volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
  96     // RedirectHandler
  97     volatile int numberOfRedirects = 0;
  98 
  99     /**
 100      * MultiExchange with one final response.
 101      */
 102     MultiExchange(HttpRequest userRequest,
 103                   HttpRequestImpl requestImpl,
 104                   HttpClientImpl client,
 105                   HttpResponse.BodyHandler<T> responseHandler,
 106                   AccessControlContext acc) {
 107         this.previous = null;
 108         this.userRequest = userRequest;
 109         this.request = requestImpl;
 110         this.currentreq = request;
 111         this.client = client;
 112         this.filters = client.filterChain();
 113         this.acc = acc;
 114         this.executor = client.theExecutor();
 115         this.responseHandler = responseHandler;
 116         if (acc != null) {
 117             // Restricts the file publisher with the senders ACC, if any
 118             if (responseHandler instanceof UntrustedBodyHandler)
 119                 ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc);
 120         }
 121         this.exchange = new Exchange<>(request, this);
 122         this.multiResponseSubscriber = null;
 123         this.pushGroup = null;
 124     }
 125 
 126     /**
 127      * MultiExchange with multiple responses (HTTP/2 server pushes).
 128      */
 129     MultiExchange(HttpRequest userRequest,
 130                   HttpRequestImpl requestImpl,
 131                   HttpClientImpl client,
 132                   HttpResponse.MultiSubscriber<U, T> multiResponseSubscriber,
 133                   AccessControlContext acc) {
 134         this.previous = null;
 135         this.userRequest = userRequest;
 136         this.request = requestImpl;
 137         this.currentreq = request;
 138         this.client = client;
 139         this.filters = client.filterChain();
 140         this.acc = acc;
 141         this.executor = client.theExecutor();
 142         this.multiResponseSubscriber = multiResponseSubscriber;
 143         this.pushGroup = new PushGroup<>(multiResponseSubscriber, request, acc);
 144         this.exchange = new Exchange<>(request, this);
 145         this.responseHandler = pushGroup.mainResponseHandler();
 146     }
 147 
 148 //    CompletableFuture<Void> multiCompletionCF() {
 149 //        return pushGroup.groupResult();
 150 //    }
 151 
 152     private synchronized Exchange<T> getExchange() {
 153         return exchange;
 154     }
 155 
 156     HttpClientImpl client() {
 157         return client;
 158     }
 159 
 160 //    HttpClient.Redirect followRedirects() {
 161 //        return client.followRedirects();
 162 //    }
 163 
 164     HttpClient.Version version() {
 165         return request.version().orElse(client.version());
 166     }
 167 
 168     private synchronized void setExchange(Exchange<T> exchange) {
 169         if (this.exchange != null && exchange != this.exchange) {
 170             this.exchange.released();
 171         }
 172         this.exchange = exchange;
 173     }
 174 
 175     private void cancelTimer() {
 176         if (timedEvent != null) {
 177             client.cancelTimer(timedEvent);
 178         }
 179     }
 180 
 181     private void requestFilters(HttpRequestImpl r) throws IOException {
 182         Log.logTrace("Applying request filters");
 183         for (HeaderFilter filter : filters) {
 184             Log.logTrace("Applying {0}", filter);
 185             filter.request(r, this);
 186         }
 187         Log.logTrace("All filters applied");
 188     }
 189 
 190     private HttpRequestImpl responseFilters(Response response) throws IOException
 191     {
 192         Log.logTrace("Applying response filters");
 193         for (HeaderFilter filter : filters) {
 194             Log.logTrace("Applying {0}", filter);
 195             HttpRequestImpl newreq = filter.response(response);
 196             if (newreq != null) {
 197                 Log.logTrace("New request: stopping filters");
 198                 return newreq;
 199             }
 200         }
 201         Log.logTrace("All filters applied");
 202         return null;
 203     }
 204 
 205 //    public void cancel() {
 206 //        cancelled = true;
 207 //        getExchange().cancel();
 208 //    }
 209 
 210     public void cancel(IOException cause) {
 211         cancelled = true;
 212         getExchange().cancel(cause);
 213     }
 214 
 215     public CompletableFuture<HttpResponse<T>> responseAsync() {
 216         CompletableFuture<Void> start = new MinimalFuture<>();
 217         CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
 218         start.completeAsync( () -> null, executor); // trigger execution
 219         return cf;
 220     }
 221 
 222     private CompletableFuture<HttpResponse<T>>
 223     responseAsync0(CompletableFuture<Void> start) {
 224         return start.thenCompose( v -> responseAsyncImpl())
 225                     .thenCompose((Response r) -> {
 226                         Exchange<T> exch = getExchange();
 227                         return exch.readBodyAsync(responseHandler)
 228                             .thenApply((T body) -> {
 229                                 this.response =
 230                                     new HttpResponseImpl<>(userRequest, r, this.response, body, exch);
 231                                 return this.response;
 232                             });
 233                     });
 234     }
 235 
 236     CompletableFuture<U> multiResponseAsync() {
 237         CompletableFuture<Void> start = new MinimalFuture<>();
 238         CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
 239         CompletableFuture<HttpResponse<T>> mainResponse =
 240                 cf.thenApply(b -> {
 241                         multiResponseSubscriber.onResponse(b);
 242                         pushGroup.noMorePushes(true);
 243                         return b; });
 244         pushGroup.setMainResponse(mainResponse);
 245         CompletableFuture<U> res = multiResponseSubscriber.completion(pushGroup.groupResult(),
 246                                                                       pushGroup.pushesCF());
 247         start.completeAsync( () -> null, executor); // trigger execution
 248         return res;
 249     }
 250 
 251     private CompletableFuture<Response> responseAsyncImpl() {
 252         CompletableFuture<Response> cf;
 253         if (attempts.incrementAndGet() > max_attempts) {
 254             cf = failedFuture(new IOException("Too many retries", retryCause));
 255         } else {
 256             if (currentreq.timeout().isPresent()) {
 257                 timedEvent = new TimedEvent(currentreq.timeout().get());
 258                 client.registerTimer(timedEvent);
 259             }
 260             try {
 261                 // 1. apply request filters
 262                 requestFilters(currentreq);
 263             } catch (IOException e) {
 264                 return failedFuture(e);
 265             }
 266             Exchange<T> exch = getExchange();
 267             // 2. get response
 268             cf = exch.responseAsync()
 269                      .thenCompose((Response response) -> {
 270                         HttpRequestImpl newrequest;
 271                         try {
 272                             // 3. apply response filters
 273                             newrequest = responseFilters(response);
 274                         } catch (IOException e) {
 275                             return failedFuture(e);
 276                         }
 277                         // 4. check filter result and repeat or continue
 278                         if (newrequest == null) {
 279                             if (attempts.get() > 1) {
 280                                 Log.logError("Succeeded on attempt: " + attempts);
 281                             }
 282                             return completedFuture(response);
 283                         } else {
 284                             this.response =
 285                                 new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
 286                             Exchange<T> oldExch = exch;
 287                             return exch.ignoreBody().handle((r,t) -> {
 288                                 currentreq = newrequest;
 289                                 expiredOnce = false;
 290                                 setExchange(new Exchange<>(currentreq, this, acc));
 291                                 return responseAsyncImpl();
 292                             }).thenCompose(Function.identity());
 293                         } })
 294                      .handle((response, ex) -> {
 295                         // 5. handle errors and cancel any timer set
 296                         cancelTimer();
 297                         if (ex == null) {
 298                             assert response != null;
 299                             return completedFuture(response);
 300                         }
 301                         // all exceptions thrown are handled here
 302                         CompletableFuture<Response> errorCF = getExceptionalCF(ex);
 303                         if (errorCF == null) {
 304                             return responseAsyncImpl();
 305                         } else {
 306                             return errorCF;
 307                         } })
 308                      .thenCompose(Function.identity());
 309         }
 310         return cf;
 311     }
 312 
 313     /**
 314      * Takes a Throwable and returns a suitable CompletableFuture that is
 315      * completed exceptionally, or null.
 316      */
 317     private CompletableFuture<Response> getExceptionalCF(Throwable t) {
 318         if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
 319             if (t.getCause() != null) {
 320                 t = t.getCause();
 321             }
 322         }
 323         if (cancelled && t instanceof IOException) {
 324             t = new HttpTimeoutException("request timed out");
 325         } else if (t instanceof ConnectionExpiredException) {
 326             // allow the retry mechanism to do its work
 327             // ####: method (GET,HEAD, not POST?), no bytes written or read ( differentiate? )
 328             if (t.getCause() != null) retryCause = t.getCause();
 329             if (!expiredOnce) {
 330                 DEBUG_LOGGER.log(Level.DEBUG,
 331                     "MultiExchange: ConnectionExpiredException (async): retrying...",
 332                     t);
 333                 expiredOnce = true;
 334                 return null;
 335             } else {
 336                 DEBUG_LOGGER.log(Level.DEBUG,
 337                     "MultiExchange: ConnectionExpiredException (async): already retried once.",
 338                     t);
 339                 if (t.getCause() != null) t = t.getCause();
 340             }
 341         }
 342         return failedFuture(t);
 343     }
 344 
 345     class TimedEvent extends TimeoutEvent {
 346         TimedEvent(Duration duration) {
 347             super(duration);
 348         }
 349         @Override
 350         public void handle() {
 351             DEBUG_LOGGER.log(Level.DEBUG,
 352                     "Cancelling MultiExchange due to timeout for request %s",
 353                      request);
 354             cancel(new HttpTimeoutException("request timed out"));
 355         }
 356     }
 357 }