1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.time.Duration; 30 import java.util.List; 31 import java.security.AccessControlContext; 32 import java.security.AccessController; 33 import java.util.concurrent.CompletableFuture; 34 import java.util.concurrent.CompletionException; 35 import java.util.concurrent.ExecutionException; 36 import java.util.function.BiFunction; 37 import java.util.concurrent.Executor; 38 import java.util.function.UnaryOperator; 39 40 import jdk.incubator.http.internal.common.Log; 41 import jdk.incubator.http.internal.common.MinimalFuture; 42 import jdk.incubator.http.internal.common.Pair; 43 import jdk.incubator.http.internal.common.Utils; 44 import static jdk.incubator.http.internal.common.Pair.pair; 45 46 /** 47 * Encapsulates multiple Exchanges belonging to one HttpRequestImpl. 48 * - manages filters 49 * - retries due to filters. 50 * - I/O errors and most other exceptions get returned directly to user 51 * 52 * Creates a new Exchange for each request/response interaction 53 */ 54 class MultiExchange<U,T> { 55 56 private final HttpRequest userRequest; // the user request 57 private final HttpRequestImpl request; // a copy of the user request 58 final AccessControlContext acc; 59 final HttpClientImpl client; 60 final HttpResponse.BodyHandler<T> responseHandler; 61 final ExecutorWrapper execWrapper; 62 final Executor executor; 63 final HttpResponse.MultiProcessor<U,T> multiResponseHandler; 64 HttpRequestImpl currentreq; // used for async only 65 Exchange<T> exchange; // the current exchange 66 Exchange<T> previous; 67 int attempts; 68 // Maximum number of times a request will be retried/redirected 69 // for any reason 70 71 static final int DEFAULT_MAX_ATTEMPTS = 5; 72 static final int max_attempts = Utils.getIntegerNetProperty( 73 "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS 74 ); 75 76 private final List<HeaderFilter> filters; 77 TimedEvent timedEvent; 78 volatile boolean cancelled; 79 final PushGroup<U,T> pushGroup; 80 81 /** 82 * Filter fields. These are attached as required by filters 83 * and only used by the filter implementations. This could be 84 * generalised into Objects that are passed explicitly to the filters 85 * (one per MultiExchange object, and one per Exchange object possibly) 86 */ 87 volatile AuthenticationFilter.AuthInfo serverauth, proxyauth; 88 // RedirectHandler 89 volatile int numberOfRedirects = 0; 90 91 /** 92 * MultiExchange with one final response. 93 */ 94 MultiExchange(HttpRequest req, 95 HttpClientImpl client, 96 HttpResponse.BodyHandler<T> responseHandler) { 97 this.previous = null; 98 this.userRequest = req; 99 this.request = new HttpRequestImpl(req); 100 this.currentreq = request; 101 this.attempts = 0; 102 this.client = client; 103 this.filters = client.filterChain(); 104 if (System.getSecurityManager() != null) { 105 this.acc = AccessController.getContext(); 106 } else { 107 this.acc = null; 108 } 109 this.execWrapper = new ExecutorWrapper(client.executor(), acc); 110 this.executor = execWrapper.executor(); 111 this.responseHandler = responseHandler; 112 this.exchange = new Exchange<>(request, this); 113 this.multiResponseHandler = null; 114 this.pushGroup = null; 115 } 116 117 /** 118 * MultiExchange with multiple responses (HTTP/2 server pushes). 119 */ 120 MultiExchange(HttpRequest req, 121 HttpClientImpl client, 122 HttpResponse.MultiProcessor<U, T> multiResponseHandler) { 123 this.previous = null; 124 this.userRequest = req; 125 this.request = new HttpRequestImpl(req); 126 this.currentreq = request; 127 this.attempts = 0; 128 this.client = client; 129 this.filters = client.filterChain(); 130 if (System.getSecurityManager() != null) { 131 this.acc = AccessController.getContext(); 132 } else { 133 this.acc = null; 134 } 135 this.execWrapper = new ExecutorWrapper(client.executor(), acc); 136 this.executor = execWrapper.executor(); 137 this.multiResponseHandler = multiResponseHandler; 138 this.pushGroup = new PushGroup<>(multiResponseHandler, request); 139 this.exchange = new Exchange<>(request, this); 140 this.responseHandler = pushGroup.mainResponseHandler(); 141 } 142 143 public HttpResponseImpl<T> response() throws IOException, InterruptedException { 144 HttpRequestImpl r = request; 145 if (r.duration() != null) { 146 timedEvent = new TimedEvent(r.duration()); 147 client.registerTimer(timedEvent); 148 } 149 while (attempts < max_attempts) { 150 try { 151 attempts++; 152 Exchange<T> currExchange = getExchange(); 153 requestFilters(r); 154 Response response = currExchange.response(); 155 HttpRequestImpl newreq = responseFilters(response); 156 if (newreq == null) { 157 if (attempts > 1) { 158 Log.logError("Succeeded on attempt: " + attempts); 159 } 160 T body = currExchange.readBody(responseHandler); 161 cancelTimer(); 162 return new HttpResponseImpl<>(userRequest, response, body, currExchange); 163 } 164 //response.body(HttpResponse.ignoreBody()); 165 setExchange(new Exchange<>(newreq, this, acc)); 166 r = newreq; 167 } catch (IOException e) { 168 if (cancelled) { 169 throw new HttpTimeoutException("Request timed out"); 170 } 171 throw e; 172 } 173 } 174 cancelTimer(); 175 throw new IOException("Retry limit exceeded"); 176 } 177 178 CompletableFuture<Void> multiCompletionCF() { 179 return pushGroup.groupResult(); 180 } 181 182 private synchronized Exchange<T> getExchange() { 183 return exchange; 184 } 185 186 HttpClientImpl client() { 187 return client; 188 } 189 190 HttpClient.Redirect followRedirects() { 191 return client.followRedirects(); 192 } 193 194 HttpClient.Version version() { 195 return request.version().orElse(client.version()); 196 } 197 198 private synchronized void setExchange(Exchange<T> exchange) { 199 this.exchange = exchange; 200 } 201 202 private void cancelTimer() { 203 if (timedEvent != null) { 204 client.cancelTimer(timedEvent); 205 } 206 } 207 208 private void requestFilters(HttpRequestImpl r) throws IOException { 209 Log.logTrace("Applying request filters"); 210 for (HeaderFilter filter : filters) { 211 Log.logTrace("Applying {0}", filter); 212 filter.request(r, this); 213 } 214 Log.logTrace("All filters applied"); 215 } 216 217 private HttpRequestImpl responseFilters(Response response) throws IOException 218 { 219 Log.logTrace("Applying response filters"); 220 for (HeaderFilter filter : filters) { 221 Log.logTrace("Applying {0}", filter); 222 HttpRequestImpl newreq = filter.response(response); 223 if (newreq != null) { 224 Log.logTrace("New request: stopping filters"); 225 return newreq; 226 } 227 } 228 Log.logTrace("All filters applied"); 229 return null; 230 } 231 232 public void cancel() { 233 cancelled = true; 234 getExchange().cancel(); 235 } 236 237 public void cancel(IOException cause) { 238 cancelled = true; 239 getExchange().cancel(cause); 240 } 241 242 public CompletableFuture<HttpResponseImpl<T>> responseAsync() { 243 CompletableFuture<Void> start = new MinimalFuture<>(); 244 CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start); 245 start.completeAsync( () -> null, executor); // trigger execution 246 return cf; 247 } 248 249 private CompletableFuture<HttpResponseImpl<T>> responseAsync0(CompletableFuture<Void> start) { 250 return start.thenCompose( v -> responseAsyncImpl()) 251 .thenCompose((Response r) -> { 252 Exchange<T> exch = getExchange(); 253 return exch.readBodyAsync(responseHandler) 254 .thenApply((T body) -> new HttpResponseImpl<>(userRequest, r, body, exch)); 255 }); 256 } 257 258 CompletableFuture<U> multiResponseAsync() { 259 CompletableFuture<Void> start = new MinimalFuture<>(); 260 CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start); 261 CompletableFuture<HttpResponse<T>> mainResponse = 262 cf.thenApply((HttpResponseImpl<T> b) -> { 263 multiResponseHandler.onResponse(b); 264 return (HttpResponse<T>)b; 265 }); 266 267 pushGroup.setMainResponse(mainResponse); 268 // set up house-keeping related to multi-response 269 mainResponse.thenAccept((r) -> { 270 // All push promises received by now. 271 pushGroup.noMorePushes(true); 272 }); 273 CompletableFuture<U> res = multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF()); 274 start.completeAsync( () -> null, executor); // trigger execution 275 return res; 276 } 277 278 private CompletableFuture<Response> responseAsyncImpl() { 279 CompletableFuture<Response> cf; 280 if (++attempts > max_attempts) { 281 cf = MinimalFuture.failedFuture(new IOException("Too many retries")); 282 } else { 283 if (currentreq.duration() != null) { 284 timedEvent = new TimedEvent(currentreq.duration()); 285 client.registerTimer(timedEvent); 286 } 287 try { 288 // 1. Apply request filters 289 requestFilters(currentreq); 290 } catch (IOException e) { 291 return MinimalFuture.failedFuture(e); 292 } 293 Exchange<T> exch = getExchange(); 294 // 2. get response 295 cf = exch.responseAsync() 296 .thenCompose((Response response) -> { 297 HttpRequestImpl newrequest = null; 298 try { 299 // 3. Apply response filters 300 newrequest = responseFilters(response); 301 } catch (IOException e) { 302 return MinimalFuture.failedFuture(e); 303 } 304 // 4. Check filter result and repeat or continue 305 if (newrequest == null) { 306 if (attempts > 1) { 307 Log.logError("Succeeded on attempt: " + attempts); 308 } 309 return MinimalFuture.completedFuture(response); 310 } else { 311 currentreq = newrequest; 312 setExchange(new Exchange<>(currentreq, this, acc)); 313 //reads body off previous, and then waits for next response 314 return responseAsyncImpl(); 315 } 316 }) 317 // 5. Handle errors and cancel any timer set 318 .handle((response, ex) -> { 319 cancelTimer(); 320 if (ex == null) { 321 assert response != null; 322 return MinimalFuture.completedFuture(response); 323 } 324 // all exceptions thrown are handled here 325 CompletableFuture<Response> error = getExceptionalCF(ex); 326 if (error == null) { 327 return responseAsyncImpl(); 328 } else { 329 return error; 330 } 331 }) 332 .thenCompose(UnaryOperator.identity()); 333 } 334 return cf; 335 } 336 337 /** 338 * Take a Throwable and return a suitable CompletableFuture that is 339 * completed exceptionally. 340 */ 341 private CompletableFuture<Response> getExceptionalCF(Throwable t) { 342 if ((t instanceof CompletionException) || (t instanceof ExecutionException)) { 343 if (t.getCause() != null) { 344 t = t.getCause(); 345 } 346 } 347 if (cancelled && t instanceof IOException) { 348 t = new HttpTimeoutException("request timed out"); 349 } 350 return MinimalFuture.failedFuture(t); 351 } 352 353 class TimedEvent extends TimeoutEvent { 354 TimedEvent(Duration duration) { 355 super(duration); 356 } 357 @Override 358 public void handle() { 359 cancel(new HttpTimeoutException("request timed out")); 360 } 361 } 362 }