/*
 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.lang.System.Logger.Level;
import java.time.Duration;
import java.util.List;
import java.security.AccessControlContext;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import jdk.incubator.http.HttpResponse.UntrustedBodyHandler;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.ConnectionExpiredException;
import jdk.incubator.http.internal.common.Utils;
import static jdk.incubator.http.internal.common.MinimalFuture.completedFuture;
import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;

/**
 * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
 * - manages filters
 * - retries due to filters.
 * - I/O errors and most other exceptions get returned directly to user
 *
 * Creates a new Exchange for each request/response interaction.
 *
 * @param <U> the aggregate result type produced by the multi-response
 *            subscriber (only used by the multi-response constructor)
 * @param <T> the response body type
 */
class MultiExchange<U,T> {

    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    static final System.Logger DEBUG_LOGGER =
            Utils.getDebugLogger("MultiExchange"::toString, DEBUG);

    private final HttpRequest userRequest; // the user request
    private final HttpRequestImpl request; // a copy of the user request
    final AccessControlContext acc;
    final HttpClientImpl client;
    final HttpResponse.BodyHandler<T> responseHandler;
    final Executor executor;
    final HttpResponse.MultiSubscriber<U,T> multiResponseSubscriber;
    // number of times responseAsyncImpl() has been entered for this request
    final AtomicInteger attempts = new AtomicInteger();
    HttpRequestImpl currentreq; // used for async only
    Exchange<T> exchange;      // the current exchange
    Exchange<T> previous;
    // cause of the last ConnectionExpiredException, reported if retries run out
    volatile Throwable retryCause;
    // true once the current request has been retried after a
    // ConnectionExpiredException; reset when a filter supplies a new request
    volatile boolean expiredOnce;
    volatile HttpResponse<T> response = null;

    // Maximum number of times a request will be retried/redirected
    // for any reason

    static final int DEFAULT_MAX_ATTEMPTS = 5;
    static final int max_attempts = Utils.getIntegerNetProperty(
            "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
    );

    private final List<HeaderFilter> filters;
    // timer registered for the attempt currently in flight, if the request
    // declares a timeout; null until the first such attempt
    TimedEvent timedEvent;
    volatile boolean cancelled;
    final PushGroup<U,T> pushGroup;

    /**
     * Filter fields. These are attached as required by filters
     * and only used by the filter implementations. This could be
     * generalised into Objects that are passed explicitly to the filters
     * (one per MultiExchange object, and one per Exchange object possibly)
     */
    volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
    // RedirectHandler
    volatile int numberOfRedirects = 0;

    /**
     * MultiExchange with one final response.
     *
     * @param userRequest     the request as supplied by the user
     * @param requestImpl     the internal copy of the user request
     * @param client          the client; supplies the filter chain and executor
     * @param responseHandler the handler used to read the response body
     * @param acc             the caller's access control context, may be null
     */
    MultiExchange(HttpRequest userRequest,
                  HttpRequestImpl requestImpl,
                  HttpClientImpl client,
                  HttpResponse.BodyHandler<T> responseHandler,
                  AccessControlContext acc) {
        this.previous = null;
        this.userRequest = userRequest;
        this.request = requestImpl;
        this.currentreq = request;
        this.client = client;
        this.filters = client.filterChain();
        this.acc = acc;
        this.executor = client.theExecutor();
        this.responseHandler = responseHandler;
        // Restricts the file publisher with the senders ACC, if any
        if (acc != null && responseHandler instanceof UntrustedBodyHandler) {
            ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc);
        }
        this.exchange = new Exchange<>(request, this);
        this.multiResponseSubscriber = null;
        this.pushGroup = null;
    }

    /**
     * MultiExchange with multiple responses (HTTP/2 server pushes).
     *
     * @param userRequest             the request as supplied by the user
     * @param requestImpl             the internal copy of the user request
     * @param client                  the client; supplies the filter chain
     *                                and executor
     * @param multiResponseSubscriber the subscriber notified of the main
     *                                response; also used to build the push group
     * @param acc                     the caller's access control context
     */
    MultiExchange(HttpRequest userRequest,
                  HttpRequestImpl requestImpl,
                  HttpClientImpl client,
                  HttpResponse.MultiSubscriber<U, T> multiResponseSubscriber,
                  AccessControlContext acc) {
        this.previous = null;
        this.userRequest = userRequest;
        this.request = requestImpl;
        this.currentreq = request;
        this.client = client;
        this.filters = client.filterChain();
        this.acc = acc;
        this.executor = client.theExecutor();
        this.multiResponseSubscriber = multiResponseSubscriber;
        this.pushGroup = new PushGroup<>(multiResponseSubscriber, request, acc);
        this.exchange = new Exchange<>(request, this);
        // the body handler for the main response is supplied by the push group
        this.responseHandler = pushGroup.mainResponseHandler();
    }

//    CompletableFuture<Void> multiCompletionCF() {
//        return pushGroup.groupResult();
//    }

    /** Returns the current exchange. */
    private synchronized Exchange<T> getExchange() {
        return exchange;
    }

    /** Returns the client this multi-exchange belongs to. */
    HttpClientImpl client() {
        return client;
    }

//    HttpClient.Redirect followRedirects() {
//        return client.followRedirects();
//    }

    /**
     * Returns the HTTP version set on the request, or the client's version
     * if the request does not specify one.
     */
    HttpClient.Version version() {
        return request.version().orElse(client.version());
    }

    /**
     * Installs a new current exchange. If a different, non-null exchange
     * was previously installed, its {@code released()} method is called
     * first.
     */
    private synchronized void setExchange(Exchange<T> exchange) {
        if (this.exchange != null && exchange != this.exchange) {
            this.exchange.released();
        }
        this.exchange = exchange;
    }

    /** Cancels the most recently registered timer, if any. */
    private void cancelTimer() {
        if (timedEvent != null) {
            client.cancelTimer(timedEvent);
        }
    }

    /**
     * Applies every filter of the chain, in order, to the given request.
     *
     * @throws IOException if a filter fails
     */
    private void requestFilters(HttpRequestImpl r) throws IOException {
        Log.logTrace("Applying request filters");
        for (HeaderFilter filter : filters) {
            Log.logTrace("Applying {0}", filter);
            filter.request(r, this);
        }
        Log.logTrace("All filters applied");
    }

    /**
     * Applies the filters of the chain, in order, to the given response,
     * stopping at the first filter that returns a new request.
     *
     * @return the follow-up request produced by a filter, or {@code null}
     *         if no filter produced one
     * @throws IOException if a filter fails
     */
    private HttpRequestImpl responseFilters(Response response) throws IOException
    {
        Log.logTrace("Applying response filters");
        for (HeaderFilter filter : filters) {
            Log.logTrace("Applying {0}", filter);
            HttpRequestImpl newreq = filter.response(response);
            if (newreq != null) {
                Log.logTrace("New request: stopping filters");
                return newreq;
            }
        }
        Log.logTrace("All filters applied");
        return null;
    }

//    public void cancel() {
//        cancelled = true;
//        getExchange().cancel();
//    }

    /**
     * Marks this multi-exchange as cancelled and cancels the current
     * exchange with the given cause.
     */
    public void cancel(IOException cause) {
        cancelled = true;
        getExchange().cancel(cause);
    }

    /**
     * Sends the request asynchronously and returns a future for the final
     * response. The pipeline is built first and then triggered on the
     * client's executor.
     */
    public CompletableFuture<HttpResponse<T>> responseAsync() {
        CompletableFuture<Void> start = new MinimalFuture<>();
        CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
        start.completeAsync( () -> null, executor); // trigger execution
        return cf;
    }

    /**
     * Builds the pipeline that, once {@code start} completes, obtains the
     * response headers (following retries and filter-requested follow-up
     * requests), reads the body with the response handler, and records the
     * resulting {@code HttpResponseImpl} in {@link #response}.
     */
    private CompletableFuture<HttpResponse<T>>
    responseAsync0(CompletableFuture<Void> start) {
        return start.thenCompose( v -> responseAsyncImpl())
                    .thenCompose((Response r) -> {
                        Exchange<T> exch = getExchange();
                        return exch.readBodyAsync(responseHandler)
                            .thenApply((T body) -> {
                                this.response =
                                    new HttpResponseImpl<>(userRequest, r, this.response, body, exch);
                                return this.response;
                            });
                    });
    }

    /**
     * Sends the request asynchronously for the multi-response case. The
     * main response is passed to the multi-response subscriber, after which
     * the push group is told that no more pushes are expected. The returned
     * future is the one produced by the subscriber's {@code completion}
     * method.
     */
    CompletableFuture<U> multiResponseAsync() {
        CompletableFuture<Void> start = new MinimalFuture<>();
        CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
        CompletableFuture<HttpResponse<T>> mainResponse =
                cf.thenApply(b -> {
                        multiResponseSubscriber.onResponse(b);
                        pushGroup.noMorePushes(true);
                        return b; });
        pushGroup.setMainResponse(mainResponse);
        CompletableFuture<U> res = multiResponseSubscriber.completion(pushGroup.groupResult(),
                                                                      pushGroup.pushesCF());
        start.completeAsync( () -> null, executor); // trigger execution
        return res;
    }

    /**
     * Performs one attempt at obtaining a response for {@link #currentreq},
     * recursing for filter-requested follow-up requests and for a single
     * retry after a {@code ConnectionExpiredException}.
     *
     * @return a future completed with the final response, or completed
     *         exceptionally; fails with "Too many retries" once more than
     *         {@link #max_attempts} attempts have been made
     */
    private CompletableFuture<Response> responseAsyncImpl() {
        CompletableFuture<Response> cf;
        if (attempts.incrementAndGet() > max_attempts) {
            cf = failedFuture(new IOException("Too many retries", retryCause));
        } else {
            if (currentreq.timeout().isPresent()) {
                timedEvent = new TimedEvent(currentreq.timeout().get());
                client.registerTimer(timedEvent);
            }
            try {
                // 1. apply request filters
                requestFilters(currentreq);
            } catch (IOException e) {
                // This early return bypasses the handle() stage below, so
                // the timer registered above must be cancelled here.
                cancelTimer();
                return failedFuture(e);
            }
            Exchange<T> exch = getExchange();
            // 2. get response
            cf = exch.responseAsync()
                     .thenCompose((Response response) -> {
                        HttpRequestImpl newrequest;
                        try {
                            // 3. apply response filters
                            newrequest = responseFilters(response);
                        } catch (IOException e) {
                            return failedFuture(e);
                        }
                        // 4. check filter result and repeat or continue
                        if (newrequest == null) {
                            if (attempts.get() > 1) {
                                Log.logError("Succeeded on attempt: " + attempts);
                            }
                            return completedFuture(response);
                        } else {
                            // This attempt has received its response: cancel
                            // its timer now. The nested responseAsyncImpl()
                            // call may overwrite timedEvent, after which the
                            // handle() stage below could no longer reach it.
                            cancelTimer();
                            this.response =
                                new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
                            // any failure while discarding the body is ignored
                            return exch.ignoreBody().handle((r,t) -> {
                                currentreq = newrequest;
                                expiredOnce = false;
                                setExchange(new Exchange<>(currentreq, this, acc));
                                return responseAsyncImpl();
                            }).thenCompose(Function.identity());
                        } })
                     .handle((response, ex) -> {
                        // 5. handle errors and cancel any timer set
                        cancelTimer();
                        if (ex == null) {
                            assert response != null;
                            return completedFuture(response);
                        }
                        // all exceptions thrown are handled here
                        CompletableFuture<Response> errorCF = getExceptionalCF(ex);
                        if (errorCF == null) {
                            // null means: retry the current request
                            return responseAsyncImpl();
                        } else {
                            return errorCF;
                        } })
                     .thenCompose(Function.identity());
        }
        return cf;
    }

    /**
     * Takes a Throwable and returns a suitable CompletableFuture that is
     * completed exceptionally, or null.
     *
     * A {@code CompletionException} or {@code ExecutionException} is first
     * unwrapped to its cause, if it has one. If this multi-exchange has been
     * cancelled, an {@code IOException} is reported as an
     * {@code HttpTimeoutException} that keeps the original exception as its
     * cause. For a {@code ConnectionExpiredException}, {@code null} is
     * returned the first time, which tells the caller to retry; a second
     * occurrence is reported through its cause when it has one.
     */
    private CompletableFuture<Response> getExceptionalCF(Throwable t) {
        if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
            if (t.getCause() != null) {
                t = t.getCause();
            }
        }
        if (cancelled && t instanceof IOException) {
            // Same type and message as before, but keep the underlying
            // failure reachable through getCause() for diagnosis.
            HttpTimeoutException timeout = new HttpTimeoutException("request timed out");
            timeout.initCause(t);
            t = timeout;
        } else if (t instanceof ConnectionExpiredException) {
            // allow the retry mechanism to do its work
            // ####: method (GET,HEAD, not POST?), no bytes written or read ( differentiate? )
            if (t.getCause() != null) retryCause = t.getCause();
            if (!expiredOnce) {
                DEBUG_LOGGER.log(Level.DEBUG,
                    "MultiExchange: ConnectionExpiredException (async): retrying...",
                    t);
                expiredOnce = true;
                return null;
            } else {
                DEBUG_LOGGER.log(Level.DEBUG,
                    "MultiExchange: ConnectionExpiredException (async): already retried once.",
                    t);
                if (t.getCause() != null) t = t.getCause();
            }
        }
        return failedFuture(t);
    }

    /**
     * Timeout event for one attempt. When handled, it cancels the enclosing
     * multi-exchange with an {@code HttpTimeoutException}.
     */
    class TimedEvent extends TimeoutEvent {
        TimedEvent(Duration duration) {
            super(duration);
        }
        @Override
        public void handle() {
            DEBUG_LOGGER.log(Level.DEBUG,
                    "Cancelling MultiExchange due to timeout for request %s",
                     request);
            cancel(new HttpTimeoutException("request timed out"));
        }
    }
}