1 /*
   2  * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.IOException;
  29 import java.net.ConnectException;
  30 import java.net.http.HttpConnectTimeoutException;
  31 import java.time.Duration;
  32 import java.util.Iterator;
  33 import java.util.LinkedList;
  34 import java.security.AccessControlContext;
  35 import java.util.Objects;
  36 import java.util.Optional;
  37 import java.util.concurrent.CompletableFuture;
  38 import java.util.concurrent.CompletionStage;
  39 import java.util.concurrent.CompletionException;
  40 import java.util.concurrent.ExecutionException;
  41 import java.util.concurrent.Executor;
  42 import java.util.concurrent.Flow;
  43 import java.util.concurrent.atomic.AtomicInteger;
  44 import java.util.concurrent.atomic.AtomicLong;
  45 import java.util.function.Function;
  46 
  47 import java.net.http.HttpClient;
  48 import java.net.http.HttpHeaders;
  49 import java.net.http.HttpRequest;
  50 import java.net.http.HttpResponse;
  51 import java.net.http.HttpResponse.BodySubscriber;
  52 import java.net.http.HttpResponse.PushPromiseHandler;
  53 import java.net.http.HttpTimeoutException;
  54 import jdk.internal.net.http.common.Log;
  55 import jdk.internal.net.http.common.Logger;
  56 import jdk.internal.net.http.common.MinimalFuture;
  57 import jdk.internal.net.http.common.ConnectionExpiredException;
  58 import jdk.internal.net.http.common.Utils;
  59 import static jdk.internal.net.http.common.MinimalFuture.completedFuture;
  60 import static jdk.internal.net.http.common.MinimalFuture.failedFuture;
  61 
  62 /**
  63  * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
  64  * - manages filters
  65  * - retries due to filters.
  66  * - I/O errors and most other exceptions get returned directly to user
  67  *
  68  * Creates a new Exchange for each request/response interaction
  69  */
  70 class MultiExchange<T> {
  71 
    // Debug logger for this class, obtained from Utils.getDebugLogger with
    // the constant name "MultiExchange".
    static final Logger debug =
            Utils.getDebugLogger("MultiExchange"::toString, Utils.DEBUG);

    private final HttpRequest userRequest; // the user request
    private final HttpRequestImpl request; // a copy of the user request
    private final ConnectTimeoutTracker connectTimeout; // null if no timeout
    // Access control context supplied by the creator; handed to the Exchanges
    // created on redirect and, when non-null, used to wrap the push executor.
    final AccessControlContext acc;
    final HttpClientImpl client;
    final HttpResponse.BodyHandler<T> responseHandler;
    final HttpClientImpl.DelegatingExecutor executor; // client.theExecutor()
    // Incremented on every call to responseAsyncImpl() and compared
    // against max_attempts.
    final AtomicInteger attempts = new AtomicInteger();
    HttpRequestImpl currentreq; // used for retries & redirect
    HttpRequestImpl previousreq; // used for retries & redirect
    Exchange<T> exchange; // the current exchange
    Exchange<T> previous; // only ever assigned null in this file
    // Cause of the most recent retry; reported as the cause of the
    // "Too many retries" IOException.
    volatile Throwable retryCause;
    // True once the current request has been retried after a retryable
    // failure; cleared when a response filter produces a new request.
    volatile boolean expiredOnce;
    // Most recent HttpResponse built by this object; passed to the next
    // HttpResponseImpl that gets constructed.
    volatile HttpResponse<T> response = null;

    // Maximum number of times a request will be retried/redirected
    // for any reason

    static final int DEFAULT_MAX_ATTEMPTS = 5;
    // Read from the "jdk.httpclient.redirects.retrylimit" net property,
    // with DEFAULT_MAX_ATTEMPTS as the fallback value.
    static final int max_attempts = Utils.getIntegerNetProperty(
            "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
    );

    // Header filters obtained from client.filterChain(): applied in order to
    // requests and in reverse order to responses.
    private final LinkedList<HeaderFilter> filters;
    // Timer registered with the client while an attempt is in flight, set
    // only when the current request has a timeout (may be null).
    ResponseTimerEvent responseTimerEvent;
    // Set by cancel(IOException); consulted when mapping failures in
    // getExceptionalCF().
    volatile boolean cancelled;
    // Non-null only when a PushPromiseHandler was supplied at construction.
    final PushGroup<T> pushGroup;

    /**
     * Filter fields. These are attached as required by filters
     * and only used by the filter implementations. This could be
     * generalised into Objects that are passed explicitly to the filters
     * (one per MultiExchange object, and one per Exchange object possibly)
     */
    volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
    // RedirectHandler
    volatile int numberOfRedirects = 0;
 113 
 114     // This class is used to keep track of the connection timeout
 115     // across retries, when a ConnectException causes a retry.
 116     // In that case - we will retry the connect, but we don't
 117     // want to double the timeout by starting a new timer with
 118     // the full connectTimeout again.
 119     // Instead we use the ConnectTimeoutTracker to return a new
 120     // duration that takes into account the time spent in the
 121     // first connect attempt.
 122     // If however, the connection gets connected, but we later
 123     // retry the whole operation, then we reset the timer before
 124     // retrying (since the connection used for the second request
 125     // will not necessarily be the same: it could be a new
 126     // unconnected connection) - see getExceptionalCF().
 127     private static final class ConnectTimeoutTracker {
 128         final Duration max;
 129         final AtomicLong startTime = new AtomicLong();
 130         ConnectTimeoutTracker(Duration connectTimeout) {
 131             this.max = Objects.requireNonNull(connectTimeout);
 132         }
 133 
 134         Duration getRemaining() {
 135             long now = System.nanoTime();
 136             long previous = startTime.compareAndExchange(0, now);
 137             if (previous == 0 || max.isZero()) return max;
 138             Duration remaining = max.minus(Duration.ofNanos(now - previous));
 139             assert remaining.compareTo(max) <= 0;
 140             return remaining.isNegative() ? Duration.ZERO : remaining;
 141         }
 142 
 143         void reset() { startTime.set(0); }
 144     }
 145 
 146     /**
 147      * MultiExchange with one final response.
 148      */
 149     MultiExchange(HttpRequest userRequest,
 150                   HttpRequestImpl requestImpl,
 151                   HttpClientImpl client,
 152                   HttpResponse.BodyHandler<T> responseHandler,
 153                   PushPromiseHandler<T> pushPromiseHandler,
 154                   AccessControlContext acc) {
 155         this.previous = null;
 156         this.userRequest = userRequest;
 157         this.request = requestImpl;
 158         this.currentreq = request;
 159         this.previousreq = null;
 160         this.client = client;
 161         this.filters = client.filterChain();
 162         this.acc = acc;
 163         this.executor = client.theExecutor();
 164         this.responseHandler = responseHandler;
 165 
 166         if (pushPromiseHandler != null) {
 167             Executor executor = acc == null
 168                     ? this.executor.delegate()
 169                     : new PrivilegedExecutor(this.executor.delegate(), acc);
 170             this.pushGroup = new PushGroup<>(pushPromiseHandler, request, executor);
 171         } else {
 172             pushGroup = null;
 173         }
 174         this.connectTimeout = client.connectTimeout()
 175                 .map(ConnectTimeoutTracker::new).orElse(null);
 176         this.exchange = new Exchange<>(request, this);
 177     }
 178 
 179     synchronized Exchange<T> getExchange() {
 180         return exchange;
 181     }
 182 
 183     HttpClientImpl client() {
 184         return client;
 185     }
 186 
 187     HttpClient.Version version() {
 188         HttpClient.Version vers = request.version().orElse(client.version());
 189         if (vers == HttpClient.Version.HTTP_2 && !request.secure() && request.proxy() != null)
 190             vers = HttpClient.Version.HTTP_1_1;
 191         return vers;
 192     }
 193 
 194     private synchronized void setExchange(Exchange<T> exchange) {
 195         if (this.exchange != null && exchange != this.exchange) {
 196             this.exchange.released();
 197         }
 198         this.exchange = exchange;
 199     }
 200 
 201     public Optional<Duration> remainingConnectTimeout() {
 202         return Optional.ofNullable(connectTimeout)
 203                 .map(ConnectTimeoutTracker::getRemaining);
 204     }
 205 
 206     private void cancelTimer() {
 207         if (responseTimerEvent != null) {
 208             client.cancelTimer(responseTimerEvent);
 209         }
 210     }
 211 
 212     private void requestFilters(HttpRequestImpl r) throws IOException {
 213         Log.logTrace("Applying request filters");
 214         for (HeaderFilter filter : filters) {
 215             Log.logTrace("Applying {0}", filter);
 216             filter.request(r, this);
 217         }
 218         Log.logTrace("All filters applied");
 219     }
 220 
 221     private HttpRequestImpl responseFilters(Response response) throws IOException
 222     {
 223         Log.logTrace("Applying response filters");
 224         Iterator<HeaderFilter> reverseItr = filters.descendingIterator();
 225         while (reverseItr.hasNext()) {
 226             HeaderFilter filter = reverseItr.next();
 227             Log.logTrace("Applying {0}", filter);
 228             HttpRequestImpl newreq = filter.response(response);
 229             if (newreq != null) {
 230                 Log.logTrace("New request: stopping filters");
 231                 return newreq;
 232             }
 233         }
 234         Log.logTrace("All filters applied");
 235         return null;
 236     }
 237 
 238     public void cancel(IOException cause) {
 239         cancelled = true;
 240         getExchange().cancel(cause);
 241     }
 242 
 243     public CompletableFuture<HttpResponse<T>> responseAsync(Executor executor) {
 244         CompletableFuture<Void> start = new MinimalFuture<>();
 245         CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
 246         start.completeAsync( () -> null, executor); // trigger execution
 247         return cf;
 248     }
 249 
 250     // return true if the response is a type where a response body is never possible
 251     // and therefore doesn't have to include header information which indicates no
 252     // body is present. This is distinct from responses that also do not contain
 253     // response bodies (possibly ever) but which are required to have content length
 254     // info in the header (eg 205). Those cases do not have to be handled specially
 255 
 256     private static boolean bodyNotPermitted(Response r) {
 257         return r.statusCode == 204;
 258     }
 259 
 260     private boolean bodyIsPresent(Response r) {
 261         HttpHeaders headers = r.headers();
 262         if (headers.firstValueAsLong("Content-length").orElse(0L) != 0L)
 263             return true;
 264         if (headers.firstValue("Transfer-encoding").isPresent())
 265             return true;
 266         return false;
 267     }
 268 
 269     // Call the user's body handler to get an empty body object
 270 
 271     private CompletableFuture<HttpResponse<T>> handleNoBody(Response r, Exchange<T> exch) {
 272         BodySubscriber<T> bs = responseHandler.apply(new ResponseInfoImpl(r.statusCode(),
 273                 r.headers(), r.version()));
 274         CompletionStage<T> cs = bs.getBody();
 275         bs.onSubscribe(new NullSubscription());
 276         bs.onComplete();
 277         MinimalFuture<HttpResponse<T>> result = new MinimalFuture<>();
 278         cs.whenComplete((nullBody, exception) -> {
 279             if (exception != null)
 280                 result.completeExceptionally(exception);
 281             else {
 282                 this.response =
 283                         new HttpResponseImpl<>(r.request(), r, this.response, nullBody, exch);
 284                 result.complete(this.response);
 285             }
 286         });
 287         // ensure that the connection is closed or returned to the pool.
 288         return result.whenComplete(exch::nullBody);
 289     }
 290 
 291     private CompletableFuture<HttpResponse<T>>
 292     responseAsync0(CompletableFuture<Void> start) {
 293         return start.thenCompose( v -> responseAsyncImpl())
 294                     .thenCompose((Response r) -> {
 295                         Exchange<T> exch = getExchange();
 296                         if (bodyNotPermitted(r)) {
 297                             if (bodyIsPresent(r)) {
 298                                 IOException ioe = new IOException(
 299                                     "unexpected content length header with 204 response");
 300                                 exch.cancel(ioe);
 301                                 return MinimalFuture.failedFuture(ioe);
 302                             } else
 303                                 return handleNoBody(r, exch);
 304                         }
 305                         return exch.readBodyAsync(responseHandler)
 306                             .thenApply((T body) -> {
 307                                 this.response =
 308                                     new HttpResponseImpl<>(r.request(), r, this.response, body, exch);
 309                                 return this.response;
 310                             });
 311                     });
 312     }
 313 
 314     static class NullSubscription implements Flow.Subscription {
 315         @Override
 316         public void request(long n) {
 317         }
 318 
 319         @Override
 320         public void cancel() {
 321         }
 322     }
 323 
 324     private CompletableFuture<Response> responseAsyncImpl() {
 325         CompletableFuture<Response> cf;
 326         if (attempts.incrementAndGet() > max_attempts) {
 327             cf = failedFuture(new IOException("Too many retries", retryCause));
 328         } else {
 329             if (currentreq.timeout().isPresent()) {
 330                 responseTimerEvent = ResponseTimerEvent.of(this);
 331                 client.registerTimer(responseTimerEvent);
 332             }
 333             try {
 334                 // 1. apply request filters
 335                 // if currentreq == previousreq the filters have already
 336                 // been applied once. Applying them a second time might
 337                 // cause some headers values to be added twice: for
 338                 // instance, the same cookie might be added again.
 339                 if (currentreq != previousreq) {
 340                     requestFilters(currentreq);
 341                 }
 342             } catch (IOException e) {
 343                 return failedFuture(e);
 344             }
 345             Exchange<T> exch = getExchange();
 346             // 2. get response
 347             cf = exch.responseAsync()
 348                      .thenCompose((Response response) -> {
 349                         HttpRequestImpl newrequest;
 350                         try {
 351                             // 3. apply response filters
 352                             newrequest = responseFilters(response);
 353                         } catch (IOException e) {
 354                             return failedFuture(e);
 355                         }
 356                         // 4. check filter result and repeat or continue
 357                         if (newrequest == null) {
 358                             if (attempts.get() > 1) {
 359                                 Log.logError("Succeeded on attempt: " + attempts);
 360                             }
 361                             return completedFuture(response);
 362                         } else {
 363                             this.response =
 364                                 new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
 365                             Exchange<T> oldExch = exch;
 366                             if (currentreq.isWebSocket()) {
 367                                 // need to close the connection and open a new one.
 368                                 exch.exchImpl.connection().close();
 369                             }
 370                             return exch.ignoreBody().handle((r,t) -> {
 371                                 previousreq = currentreq;
 372                                 currentreq = newrequest;
 373                                 expiredOnce = false;
 374                                 setExchange(new Exchange<>(currentreq, this, acc));
 375                                 return responseAsyncImpl();
 376                             }).thenCompose(Function.identity());
 377                         } })
 378                      .handle((response, ex) -> {
 379                         // 5. handle errors and cancel any timer set
 380                         cancelTimer();
 381                         if (ex == null) {
 382                             assert response != null;
 383                             return completedFuture(response);
 384                         }
 385                         // all exceptions thrown are handled here
 386                         CompletableFuture<Response> errorCF = getExceptionalCF(ex);
 387                         if (errorCF == null) {
 388                             return responseAsyncImpl();
 389                         } else {
 390                             return errorCF;
 391                         } })
 392                      .thenCompose(Function.identity());
 393         }
 394         return cf;
 395     }
 396 
 397     private static boolean retryPostValue() {
 398         String s = Utils.getNetProperty("jdk.httpclient.enableAllMethodRetry");
 399         if (s == null)
 400             return false;
 401         return s.isEmpty() ? true : Boolean.parseBoolean(s);
 402     }
 403 
 404     private static boolean disableRetryConnect() {
 405         String s = Utils.getNetProperty("jdk.httpclient.disableRetryConnect");
 406         if (s == null)
 407             return false;
 408         return s.isEmpty() ? true : Boolean.parseBoolean(s);
 409     }
 410 
    /**
     * True if ALL requests (even non-idempotent ones) can be automatically
     * retried. Computed once from retryPostValue().
     */
    private static final boolean RETRY_ALWAYS = retryPostValue();
    /**
     * True if a ConnectException should cause a retry. Enabled by default;
     * computed once as the negation of disableRetryConnect().
     */
    private static final boolean RETRY_CONNECT = !disableRetryConnect();
 415 
 416     /** Returns true is given request has an idempotent method. */
 417     private static boolean isIdempotentRequest(HttpRequest request) {
 418         String method = request.method();
 419         switch (method) {
 420             case "GET" :
 421             case "HEAD" :
 422                 return true;
 423             default :
 424                 return false;
 425         }
 426     }
 427 
 428     /** Returns true if the given request can be automatically retried. */
 429     private static boolean canRetryRequest(HttpRequest request) {
 430         if (RETRY_ALWAYS)
 431             return true;
 432         if (isIdempotentRequest(request))
 433             return true;
 434         return false;
 435     }
 436 
 437     private boolean retryOnFailure(Throwable t) {
 438         return t instanceof ConnectionExpiredException
 439                 || (RETRY_CONNECT && (t instanceof ConnectException));
 440     }
 441 
 442     private Throwable retryCause(Throwable t) {
 443         Throwable cause = t instanceof ConnectionExpiredException ? t.getCause() : t;
 444         return cause == null ? t : cause;
 445     }
 446 
    /**
     * Takes a Throwable and returns a suitable CompletableFuture that is
     * completed exceptionally, or null.
     *
     * A null return value tells the caller (responseAsyncImpl) to retry the
     * current request. This happens only for failures accepted by
     * retryOnFailure(), and only once per request (tracked by expiredOnce).
     *
     * @param t the failure; a CompletionException or ExecutionException is
     *          unwrapped one level when it has a cause
     * @return a future failed with the exception to report, or null to retry
     */
    private CompletableFuture<Response> getExceptionalCF(Throwable t) {
        // Unwrap (one level only) the wrappers added by CompletableFuture.
        if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
            if (t.getCause() != null) {
                t = t.getCause();
            }
        }
        if (cancelled && t instanceof IOException) {
            // cancel() was called: report the failure as a timeout, unless
            // it already is an HttpTimeoutException.
            if (!(t instanceof HttpTimeoutException)) {
                t = toTimeoutException((IOException)t);
            }
        } else if (retryOnFailure(t)) {
            Throwable cause = retryCause(t);

            if (!(t instanceof ConnectException)) {
                // we may need to start a new connection, and if so
                // we want to start with a fresh connect timeout again.
                if (connectTimeout != null) connectTimeout.reset();
                if (!canRetryRequest(currentreq)) {
                    return failedFuture(cause); // fails with original cause
                }
            } // ConnectException: retry, but don't reset the connectTimeout.

            // allow the retry mechanism to do its work
            retryCause = cause;
            if (!expiredOnce) {
                if (debug.on())
                    debug.log(t.getClass().getSimpleName() + " (async): retrying...", t);
                expiredOnce = true;
                // The connection was abruptly closed.
                // We return null to retry the same request a second time.
                // The request filters have already been applied to the
                // currentreq, so we set previousreq = currentreq to
                // prevent them from being applied again.
                previousreq = currentreq;
                return null;
            } else {
                if (debug.on()) {
                    debug.log(t.getClass().getSimpleName()
                            + " (async): already retried once.", t);
                }
                // Second failure for the same request: give up and report
                // the underlying cause.
                t = cause;
            }
        }
        return failedFuture(t);
    }
 496 
 497     private HttpTimeoutException toTimeoutException(IOException ioe) {
 498         HttpTimeoutException t = null;
 499 
 500         // more specific, "request timed out", when connected
 501         Exchange<?> exchange = getExchange();
 502         if (exchange != null) {
 503             ExchangeImpl<?> exchangeImpl = exchange.exchImpl;
 504             if (exchangeImpl != null) {
 505                 if (exchangeImpl.connection().connected()) {
 506                     t = new HttpTimeoutException("request timed out");
 507                     t.initCause(ioe);
 508                 }
 509             }
 510         }
 511         if (t == null) {
 512             t = new HttpConnectTimeoutException("HTTP connect timed out");
 513             t.initCause(new ConnectException("HTTP connect timed out"));
 514         }
 515         return t;
 516     }
 517 }