1 /* 2 * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.IOException; 29 import java.net.ConnectException; 30 import java.net.http.HttpConnectTimeoutException; 31 import java.time.Duration; 32 import java.util.Iterator; 33 import java.util.LinkedList; 34 import java.security.AccessControlContext; 35 import java.util.Objects; 36 import java.util.Optional; 37 import java.util.concurrent.CompletableFuture; 38 import java.util.concurrent.CompletionStage; 39 import java.util.concurrent.CompletionException; 40 import java.util.concurrent.ExecutionException; 41 import java.util.concurrent.Executor; 42 import java.util.concurrent.Flow; 43 import java.util.concurrent.atomic.AtomicInteger; 44 import java.util.concurrent.atomic.AtomicLong; 45 import java.util.function.Function; 46 47 import java.net.http.HttpClient; 48 import java.net.http.HttpHeaders; 49 import java.net.http.HttpRequest; 50 import java.net.http.HttpResponse; 51 import java.net.http.HttpResponse.BodySubscriber; 52 import java.net.http.HttpResponse.PushPromiseHandler; 53 import java.net.http.HttpTimeoutException; 54 import jdk.internal.net.http.common.Log; 55 import jdk.internal.net.http.common.Logger; 56 import jdk.internal.net.http.common.MinimalFuture; 57 import jdk.internal.net.http.common.ConnectionExpiredException; 58 import jdk.internal.net.http.common.Utils; 59 import static jdk.internal.net.http.common.MinimalFuture.completedFuture; 60 import static jdk.internal.net.http.common.MinimalFuture.failedFuture; 61 62 /** 63 * Encapsulates multiple Exchanges belonging to one HttpRequestImpl. 64 * - manages filters 65 * - retries due to filters. 66 * - I/O errors and most other exceptions get returned directly to user 67 * 68 * Creates a new Exchange for each request/response interaction 69 */ 70 class MultiExchange<T> { 71 72 static final Logger debug = 73 Utils.getDebugLogger("MultiExchange"::toString, Utils.DEBUG); 74 75 private final HttpRequest userRequest; // the user request 76 private final HttpRequestImpl request; // a copy of the user request 77 private final ConnectTimeoutTracker connectTimeout; // null if no timeout 78 final AccessControlContext acc; 79 final HttpClientImpl client; 80 final HttpResponse.BodyHandler<T> responseHandler; 81 final HttpClientImpl.DelegatingExecutor executor; 82 final AtomicInteger attempts = new AtomicInteger(); 83 HttpRequestImpl currentreq; // used for retries & redirect 84 HttpRequestImpl previousreq; // used for retries & redirect 85 Exchange<T> exchange; // the current exchange 86 Exchange<T> previous; 87 volatile Throwable retryCause; 88 volatile boolean expiredOnce; 89 volatile HttpResponse<T> response = null; 90 91 // Maximum number of times a request will be retried/redirected 92 // for any reason 93 94 static final int DEFAULT_MAX_ATTEMPTS = 5; 95 static final int max_attempts = Utils.getIntegerNetProperty( 96 "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS 97 ); 98 99 private final LinkedList<HeaderFilter> filters; 100 ResponseTimerEvent responseTimerEvent; 101 volatile boolean cancelled; 102 final PushGroup<T> pushGroup; 103 104 /** 105 * Filter fields. These are attached as required by filters 106 * and only used by the filter implementations. This could be 107 * generalised into Objects that are passed explicitly to the filters 108 * (one per MultiExchange object, and one per Exchange object possibly) 109 */ 110 volatile AuthenticationFilter.AuthInfo serverauth, proxyauth; 111 // RedirectHandler 112 volatile int numberOfRedirects = 0; 113 114 // This class is used to keep track of the connection timeout 115 // across retries, when a ConnectException causes a retry. 116 // In that case - we will retry the connect, but we don't 117 // want to double the timeout by starting a new timer with 118 // the full connectTimeout again. 119 // Instead we use the ConnectTimeoutTracker to return a new 120 // duration that takes into account the time spent in the 121 // first connect attempt. 122 // If however, the connection gets connected, but we later 123 // retry the whole operation, then we reset the timer before 124 // retrying (since the connection used for the second request 125 // will not necessarily be the same: it could be a new 126 // unconnected connection) - see getExceptionalCF(). 127 private static final class ConnectTimeoutTracker { 128 final Duration max; 129 final AtomicLong startTime = new AtomicLong(); 130 ConnectTimeoutTracker(Duration connectTimeout) { 131 this.max = Objects.requireNonNull(connectTimeout); 132 } 133 134 Duration getRemaining() { 135 long now = System.nanoTime(); 136 long previous = startTime.compareAndExchange(0, now); 137 if (previous == 0 || max.isZero()) return max; 138 Duration remaining = max.minus(Duration.ofNanos(now - previous)); 139 assert remaining.compareTo(max) <= 0; 140 return remaining.isNegative() ? Duration.ZERO : remaining; 141 } 142 143 void reset() { startTime.set(0); } 144 } 145 146 /** 147 * MultiExchange with one final response. 148 */ 149 MultiExchange(HttpRequest userRequest, 150 HttpRequestImpl requestImpl, 151 HttpClientImpl client, 152 HttpResponse.BodyHandler<T> responseHandler, 153 PushPromiseHandler<T> pushPromiseHandler, 154 AccessControlContext acc) { 155 this.previous = null; 156 this.userRequest = userRequest; 157 this.request = requestImpl; 158 this.currentreq = request; 159 this.previousreq = null; 160 this.client = client; 161 this.filters = client.filterChain(); 162 this.acc = acc; 163 this.executor = client.theExecutor(); 164 this.responseHandler = responseHandler; 165 166 if (pushPromiseHandler != null) { 167 Executor executor = acc == null 168 ? this.executor.delegate() 169 : new PrivilegedExecutor(this.executor.delegate(), acc); 170 this.pushGroup = new PushGroup<>(pushPromiseHandler, request, executor); 171 } else { 172 pushGroup = null; 173 } 174 this.connectTimeout = client.connectTimeout() 175 .map(ConnectTimeoutTracker::new).orElse(null); 176 this.exchange = new Exchange<>(request, this); 177 } 178 179 synchronized Exchange<T> getExchange() { 180 return exchange; 181 } 182 183 HttpClientImpl client() { 184 return client; 185 } 186 187 HttpClient.Version version() { 188 HttpClient.Version vers = request.version().orElse(client.version()); 189 if (vers == HttpClient.Version.HTTP_2 && !request.secure() && request.proxy() != null) 190 vers = HttpClient.Version.HTTP_1_1; 191 return vers; 192 } 193 194 private synchronized void setExchange(Exchange<T> exchange) { 195 if (this.exchange != null && exchange != this.exchange) { 196 this.exchange.released(); 197 } 198 this.exchange = exchange; 199 } 200 201 public Optional<Duration> remainingConnectTimeout() { 202 return Optional.ofNullable(connectTimeout) 203 .map(ConnectTimeoutTracker::getRemaining); 204 } 205 206 private void cancelTimer() { 207 if (responseTimerEvent != null) { 208 client.cancelTimer(responseTimerEvent); 209 } 210 } 211 212 private void requestFilters(HttpRequestImpl r) throws IOException { 213 Log.logTrace("Applying request filters"); 214 for (HeaderFilter filter : filters) { 215 Log.logTrace("Applying {0}", filter); 216 filter.request(r, this); 217 } 218 Log.logTrace("All filters applied"); 219 } 220 221 private HttpRequestImpl responseFilters(Response response) throws IOException 222 { 223 Log.logTrace("Applying response filters"); 224 Iterator<HeaderFilter> reverseItr = filters.descendingIterator(); 225 while (reverseItr.hasNext()) { 226 HeaderFilter filter = reverseItr.next(); 227 Log.logTrace("Applying {0}", filter); 228 HttpRequestImpl newreq = filter.response(response); 229 if (newreq != null) { 230 Log.logTrace("New request: stopping filters"); 231 return newreq; 232 } 233 } 234 Log.logTrace("All filters applied"); 235 return null; 236 } 237 238 public void cancel(IOException cause) { 239 cancelled = true; 240 getExchange().cancel(cause); 241 } 242 243 public CompletableFuture<HttpResponse<T>> responseAsync(Executor executor) { 244 CompletableFuture<Void> start = new MinimalFuture<>(); 245 CompletableFuture<HttpResponse<T>> cf = responseAsync0(start); 246 start.completeAsync( () -> null, executor); // trigger execution 247 return cf; 248 } 249 250 // return true if the response is a type where a response body is never possible 251 // and therefore doesn't have to include header information which indicates no 252 // body is present. This is distinct from responses that also do not contain 253 // response bodies (possibly ever) but which are required to have content length 254 // info in the header (eg 205). Those cases do not have to be handled specially 255 256 private static boolean bodyNotPermitted(Response r) { 257 return r.statusCode == 204; 258 } 259 260 private boolean bodyIsPresent(Response r) { 261 HttpHeaders headers = r.headers(); 262 if (headers.firstValueAsLong("Content-length").orElse(0L) != 0L) 263 return true; 264 if (headers.firstValue("Transfer-encoding").isPresent()) 265 return true; 266 return false; 267 } 268 269 // Call the user's body handler to get an empty body object 270 271 private CompletableFuture<HttpResponse<T>> handleNoBody(Response r, Exchange<T> exch) { 272 BodySubscriber<T> bs = responseHandler.apply(new ResponseInfoImpl(r.statusCode(), 273 r.headers(), r.version())); 274 CompletionStage<T> cs = bs.getBody(); 275 bs.onSubscribe(new NullSubscription()); 276 bs.onComplete(); 277 MinimalFuture<HttpResponse<T>> result = new MinimalFuture<>(); 278 cs.whenComplete((nullBody, exception) -> { 279 if (exception != null) 280 result.completeExceptionally(exception); 281 else { 282 this.response = 283 new HttpResponseImpl<>(r.request(), r, this.response, nullBody, exch); 284 result.complete(this.response); 285 } 286 }); 287 // ensure that the connection is closed or returned to the pool. 288 return result.whenComplete(exch::nullBody); 289 } 290 291 private CompletableFuture<HttpResponse<T>> 292 responseAsync0(CompletableFuture<Void> start) { 293 return start.thenCompose( v -> responseAsyncImpl()) 294 .thenCompose((Response r) -> { 295 Exchange<T> exch = getExchange(); 296 if (bodyNotPermitted(r)) { 297 if (bodyIsPresent(r)) { 298 IOException ioe = new IOException( 299 "unexpected content length header with 204 response"); 300 exch.cancel(ioe); 301 return MinimalFuture.failedFuture(ioe); 302 } else 303 return handleNoBody(r, exch); 304 } 305 return exch.readBodyAsync(responseHandler) 306 .thenApply((T body) -> { 307 this.response = 308 new HttpResponseImpl<>(r.request(), r, this.response, body, exch); 309 return this.response; 310 }); 311 }); 312 } 313 314 static class NullSubscription implements Flow.Subscription { 315 @Override 316 public void request(long n) { 317 } 318 319 @Override 320 public void cancel() { 321 } 322 } 323 324 private CompletableFuture<Response> responseAsyncImpl() { 325 CompletableFuture<Response> cf; 326 if (attempts.incrementAndGet() > max_attempts) { 327 cf = failedFuture(new IOException("Too many retries", retryCause)); 328 } else { 329 if (currentreq.timeout().isPresent()) { 330 responseTimerEvent = ResponseTimerEvent.of(this); 331 client.registerTimer(responseTimerEvent); 332 } 333 try { 334 // 1. apply request filters 335 // if currentreq == previousreq the filters have already 336 // been applied once. Applying them a second time might 337 // cause some headers values to be added twice: for 338 // instance, the same cookie might be added again. 339 if (currentreq != previousreq) { 340 requestFilters(currentreq); 341 } 342 } catch (IOException e) { 343 return failedFuture(e); 344 } 345 Exchange<T> exch = getExchange(); 346 // 2. get response 347 cf = exch.responseAsync() 348 .thenCompose((Response response) -> { 349 HttpRequestImpl newrequest; 350 try { 351 // 3. apply response filters 352 newrequest = responseFilters(response); 353 } catch (IOException e) { 354 return failedFuture(e); 355 } 356 // 4. check filter result and repeat or continue 357 if (newrequest == null) { 358 if (attempts.get() > 1) { 359 Log.logError("Succeeded on attempt: " + attempts); 360 } 361 return completedFuture(response); 362 } else { 363 this.response = 364 new HttpResponseImpl<>(currentreq, response, this.response, null, exch); 365 Exchange<T> oldExch = exch; 366 if (currentreq.isWebSocket()) { 367 // need to close the connection and open a new one. 368 exch.exchImpl.connection().close(); 369 } 370 return exch.ignoreBody().handle((r,t) -> { 371 previousreq = currentreq; 372 currentreq = newrequest; 373 expiredOnce = false; 374 setExchange(new Exchange<>(currentreq, this, acc)); 375 return responseAsyncImpl(); 376 }).thenCompose(Function.identity()); 377 } }) 378 .handle((response, ex) -> { 379 // 5. handle errors and cancel any timer set 380 cancelTimer(); 381 if (ex == null) { 382 assert response != null; 383 return completedFuture(response); 384 } 385 // all exceptions thrown are handled here 386 CompletableFuture<Response> errorCF = getExceptionalCF(ex); 387 if (errorCF == null) { 388 return responseAsyncImpl(); 389 } else { 390 return errorCF; 391 } }) 392 .thenCompose(Function.identity()); 393 } 394 return cf; 395 } 396 397 private static boolean retryPostValue() { 398 String s = Utils.getNetProperty("jdk.httpclient.enableAllMethodRetry"); 399 if (s == null) 400 return false; 401 return s.isEmpty() ? true : Boolean.parseBoolean(s); 402 } 403 404 private static boolean disableRetryConnect() { 405 String s = Utils.getNetProperty("jdk.httpclient.disableRetryConnect"); 406 if (s == null) 407 return false; 408 return s.isEmpty() ? true : Boolean.parseBoolean(s); 409 } 410 411 /** True if ALL ( even non-idempotent ) requests can be automatic retried. */ 412 private static final boolean RETRY_ALWAYS = retryPostValue(); 413 /** True if ConnectException should cause a retry. Enabled by default */ 414 private static final boolean RETRY_CONNECT = !disableRetryConnect(); 415 416 /** Returns true is given request has an idempotent method. */ 417 private static boolean isIdempotentRequest(HttpRequest request) { 418 String method = request.method(); 419 switch (method) { 420 case "GET" : 421 case "HEAD" : 422 return true; 423 default : 424 return false; 425 } 426 } 427 428 /** Returns true if the given request can be automatically retried. */ 429 private static boolean canRetryRequest(HttpRequest request) { 430 if (RETRY_ALWAYS) 431 return true; 432 if (isIdempotentRequest(request)) 433 return true; 434 return false; 435 } 436 437 private boolean retryOnFailure(Throwable t) { 438 return t instanceof ConnectionExpiredException 439 || (RETRY_CONNECT && (t instanceof ConnectException)); 440 } 441 442 private Throwable retryCause(Throwable t) { 443 Throwable cause = t instanceof ConnectionExpiredException ? t.getCause() : t; 444 return cause == null ? t : cause; 445 } 446 447 /** 448 * Takes a Throwable and returns a suitable CompletableFuture that is 449 * completed exceptionally, or null. 450 */ 451 private CompletableFuture<Response> getExceptionalCF(Throwable t) { 452 if ((t instanceof CompletionException) || (t instanceof ExecutionException)) { 453 if (t.getCause() != null) { 454 t = t.getCause(); 455 } 456 } 457 if (cancelled && t instanceof IOException) { 458 if (!(t instanceof HttpTimeoutException)) { 459 t = toTimeoutException((IOException)t); 460 } 461 } else if (retryOnFailure(t)) { 462 Throwable cause = retryCause(t); 463 464 if (!(t instanceof ConnectException)) { 465 // we may need to start a new connection, and if so 466 // we want to start with a fresh connect timeout again. 467 if (connectTimeout != null) connectTimeout.reset(); 468 if (!canRetryRequest(currentreq)) { 469 return failedFuture(cause); // fails with original cause 470 } 471 } // ConnectException: retry, but don't reset the connectTimeout. 472 473 // allow the retry mechanism to do its work 474 retryCause = cause; 475 if (!expiredOnce) { 476 if (debug.on()) 477 debug.log(t.getClass().getSimpleName() + " (async): retrying...", t); 478 expiredOnce = true; 479 // The connection was abruptly closed. 480 // We return null to retry the same request a second time. 481 // The request filters have already been applied to the 482 // currentreq, so we set previousreq = currentreq to 483 // prevent them from being applied again. 484 previousreq = currentreq; 485 return null; 486 } else { 487 if (debug.on()) { 488 debug.log(t.getClass().getSimpleName() 489 + " (async): already retried once.", t); 490 } 491 t = cause; 492 } 493 } 494 return failedFuture(t); 495 } 496 497 private HttpTimeoutException toTimeoutException(IOException ioe) { 498 HttpTimeoutException t = null; 499 500 // more specific, "request timed out", when connected 501 Exchange<?> exchange = getExchange(); 502 if (exchange != null) { 503 ExchangeImpl<?> exchangeImpl = exchange.exchImpl; 504 if (exchangeImpl != null) { 505 if (exchangeImpl.connection().connected()) { 506 t = new HttpTimeoutException("request timed out"); 507 t.initCause(ioe); 508 } 509 } 510 } 511 if (t == null) { 512 t = new HttpConnectTimeoutException("HTTP connect timed out"); 513 t.initCause(new ConnectException("HTTP connect timed out")); 514 } 515 return t; 516 } 517 }