1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.lang.System.Logger.Level; 30 import java.net.InetSocketAddress; 31 import java.net.ProxySelector; 32 import java.net.URI; 33 import java.net.URISyntaxException; 34 import java.net.URLPermission; 35 import java.security.AccessControlContext; 36 import java.util.List; 37 import java.util.Map; 38 import java.util.concurrent.CompletableFuture; 39 import java.util.concurrent.Executor; 40 import jdk.incubator.http.internal.common.MinimalFuture; 41 import jdk.incubator.http.internal.common.Utils; 42 import jdk.incubator.http.internal.common.Log; 43 44 import static jdk.incubator.http.internal.common.Utils.permissionForProxy; 45 46 /** 47 * One request/response exchange (handles 100/101 intermediate response also). 48 * depth field used to track number of times a new request is being sent 49 * for a given API request. If limit exceeded exception is thrown. 50 * 51 * Security check is performed here: 52 * - uses AccessControlContext captured at API level 53 * - checks for appropriate URLPermission for request 54 * - if permission allowed, grants equivalent SocketPermission to call 55 * - in case of direct HTTP proxy, checks additionally for access to proxy 56 * (CONNECT proxying uses its own Exchange, so check done there) 57 * 58 */ 59 final class Exchange<T> { 60 61 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. 62 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); 63 64 final HttpRequestImpl request; 65 final HttpClientImpl client; 66 volatile ExchangeImpl<T> exchImpl; 67 volatile CompletableFuture<? extends ExchangeImpl<T>> exchangeCF; 68 // used to record possible cancellation raised before the exchImpl 69 // has been established. 70 private volatile IOException failed; 71 final AccessControlContext acc; 72 final MultiExchange<?,T> multi; 73 final Executor parentExecutor; 74 boolean upgrading; // to HTTP/2 75 final PushGroup<?,T> pushGroup; 76 final String dbgTag; 77 78 Exchange(HttpRequestImpl request, MultiExchange<?,T> multi) { 79 this.request = request; 80 this.upgrading = false; 81 this.client = multi.client(); 82 this.multi = multi; 83 this.acc = multi.acc; 84 this.parentExecutor = multi.executor; 85 this.pushGroup = multi.pushGroup; 86 this.dbgTag = "Exchange"; 87 } 88 89 /* If different AccessControlContext to be used */ 90 Exchange(HttpRequestImpl request, 91 MultiExchange<?,T> multi, 92 AccessControlContext acc) 93 { 94 this.request = request; 95 this.acc = acc; 96 this.upgrading = false; 97 this.client = multi.client(); 98 this.multi = multi; 99 this.parentExecutor = multi.executor; 100 this.pushGroup = multi.pushGroup; 101 this.dbgTag = "Exchange"; 102 } 103 104 PushGroup<?,T> getPushGroup() { 105 return pushGroup; 106 } 107 108 Executor executor() { 109 return parentExecutor; 110 } 111 112 public HttpRequestImpl request() { 113 return request; 114 } 115 116 HttpClientImpl client() { 117 return client; 118 } 119 120 121 public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) { 122 // The connection will not be returned to the pool in the case of WebSocket 123 return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor) 124 .whenComplete((r,t) -> exchImpl.completed()); 125 } 126 127 /** 128 * Called after a redirect or similar kind of retry where a body might 129 * be sent but we don't want it. Should send a RESET in h2. For http/1.1 130 * we can consume small quantity of data, or close the connection in 131 * other cases. 132 */ 133 public CompletableFuture<Void> ignoreBody() { 134 return exchImpl.ignoreBody(); 135 } 136 137 /** 138 * Called when a new exchange is created to replace this exchange. 139 * At this point it is guaranteed that readBody/readBodyAsync will 140 * not be called. 141 */ 142 public void released() { 143 ExchangeImpl<?> impl = exchImpl; 144 if (impl != null) impl.released(); 145 // Don't set exchImpl to null here. We need to keep 146 // it alive until it's replaced by a Stream in wrapForUpgrade. 147 // Setting it to null here might get it GC'ed too early, because 148 // the Http1Response is now only weakly referenced by the Selector. 149 } 150 151 public void cancel() { 152 // cancel can be called concurrently before or at the same time 153 // that the exchange impl is being established. 154 // In that case we won't be able to propagate the cancellation 155 // right away 156 if (exchImpl != null) { 157 exchImpl.cancel(); 158 } else { 159 // no impl - can't cancel impl yet. 160 // call cancel(IOException) instead which takes care 161 // of race conditions between impl/cancel. 162 cancel(new IOException("Request cancelled")); 163 } 164 } 165 166 public void cancel(IOException cause) { 167 // If the impl is non null, propagate the exception right away. 168 // Otherwise record it so that it can be propagated once the 169 // exchange impl has been established. 170 ExchangeImpl<?> impl = exchImpl; 171 if (impl != null) { 172 // propagate the exception to the impl 173 debug.log(Level.DEBUG, "Cancelling exchImpl: %s", exchImpl); 174 impl.cancel(cause); 175 } else { 176 // no impl yet. record the exception 177 failed = cause; 178 // now call checkCancelled to recheck the impl. 179 // if the failed state is set and the impl is not null, reset 180 // the failed state and propagate the exception to the impl. 181 checkCancelled(); 182 } 183 } 184 185 // This method will raise an exception if one was reported and if 186 // it is possible to do so. If the exception can be raised, then 187 // the failed state will be reset. Otherwise, the failed state 188 // will persist until the exception can be raised and the failed state 189 // can be cleared. 190 // Takes care of possible race conditions. 191 private void checkCancelled() { 192 ExchangeImpl<?> impl = null; 193 IOException cause = null; 194 CompletableFuture<? extends ExchangeImpl<T>> cf = null; 195 if (failed != null) { 196 synchronized(this) { 197 cause = failed; 198 impl = exchImpl; 199 cf = exchangeCF; 200 } 201 } 202 if (cause == null) return; 203 if (impl != null) { 204 // The exception is raised by propagating it to the impl. 205 debug.log(Level.DEBUG, "Cancelling exchImpl: %s", impl); 206 impl.cancel(cause); 207 failed = null; 208 } else { 209 Log.logTrace("Exchange: request [{0}/timeout={1}ms] no impl is set." 210 + "\n\tCan''t cancel yet with {2}", 211 request.uri(), 212 request.timeout().isPresent() ? 213 // calling duration.toMillis() can throw an exception. 214 // this is just debugging, we don't care if it overflows. 215 (request.timeout().get().getSeconds() * 1000 216 + request.timeout().get().getNano() / 1000000) : -1, 217 cause); 218 if (cf != null) cf.completeExceptionally(cause); 219 } 220 } 221 222 public void h2Upgrade() { 223 upgrading = true; 224 request.setH2Upgrade(client.client2()); 225 } 226 227 synchronized IOException getCancelCause() { 228 return failed; 229 } 230 231 // get/set the exchange impl, solving race condition issues with 232 // potential concurrent calls to cancel() or cancel(IOException) 233 private CompletableFuture<? extends ExchangeImpl<T>> 234 establishExchange(HttpConnection connection) { 235 if (debug.isLoggable(Level.DEBUG)) { 236 debug.log(Level.DEBUG, 237 "establishing exchange for %s,%n\t proxy=%s", 238 request, 239 request.proxy()); 240 } 241 // check if we have been cancelled first. 242 Throwable t = getCancelCause(); 243 checkCancelled(); 244 if (t != null) { 245 return MinimalFuture.failedFuture(t); 246 } 247 248 CompletableFuture<? extends ExchangeImpl<T>> cf, res; 249 cf = ExchangeImpl.get(this, connection); 250 // We should probably use a VarHandle to get/set exchangeCF 251 // instead - as we need CAS semantics. 252 synchronized (this) { exchangeCF = cf; }; 253 res = cf.whenComplete((r,x) -> { 254 synchronized(Exchange.this) { 255 if (exchangeCF == cf) exchangeCF = null; 256 } 257 }); 258 checkCancelled(); 259 return res.thenCompose((eimpl) -> { 260 // recheck for cancelled, in case of race conditions 261 exchImpl = eimpl; 262 IOException tt = getCancelCause(); 263 checkCancelled(); 264 if (tt != null) { 265 return MinimalFuture.failedFuture(tt); 266 } else { 267 // Now we're good to go. Because exchImpl is no longer 268 // null cancel() will be able to propagate directly to 269 // the impl after this point ( if needed ). 270 return MinimalFuture.completedFuture(eimpl); 271 } }); 272 } 273 274 // Completed HttpResponse will be null if response succeeded 275 // will be a non null responseAsync if expect continue returns an error 276 277 public CompletableFuture<Response> responseAsync() { 278 return responseAsyncImpl(null); 279 } 280 281 CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) { 282 SecurityException e = checkPermissions(); 283 if (e != null) { 284 return MinimalFuture.failedFuture(e); 285 } else { 286 return responseAsyncImpl0(connection); 287 } 288 } 289 290 CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) { 291 if (request.expectContinue()) { 292 request.addSystemHeader("Expect", "100-Continue"); 293 Log.logTrace("Sending Expect: 100-Continue"); 294 return establishExchange(connection) 295 .thenCompose((ex) -> ex.sendHeadersAsync()) 296 .thenCompose(v -> exchImpl.getResponseAsync(parentExecutor)) 297 .thenCompose((Response r1) -> { 298 Log.logResponse(r1::toString); 299 int rcode = r1.statusCode(); 300 if (rcode == 100) { 301 Log.logTrace("Received 100-Continue: sending body"); 302 CompletableFuture<Response> cf = 303 exchImpl.sendBodyAsync() 304 .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor)); 305 cf = wrapForUpgrade(cf); 306 cf = wrapForLog(cf); 307 return cf; 308 } else { 309 Log.logTrace("Expectation failed: Received {0}", 310 rcode); 311 if (upgrading && rcode == 101) { 312 IOException failed = new IOException( 313 "Unable to handle 101 while waiting for 100"); 314 return MinimalFuture.failedFuture(failed); 315 } 316 return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor) 317 .thenApply(v -> r1); 318 } 319 }); 320 } else { 321 CompletableFuture<Response> cf = establishExchange(connection) 322 .thenCompose((ex) -> ex.sendHeadersAsync()) 323 .thenCompose(ExchangeImpl::sendBodyAsync) 324 .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor)); 325 cf = wrapForUpgrade(cf); 326 cf = wrapForLog(cf); 327 return cf; 328 } 329 } 330 331 private CompletableFuture<Response> wrapForUpgrade(CompletableFuture<Response> cf) { 332 if (upgrading) { 333 return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl)); 334 } 335 return cf; 336 } 337 338 private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) { 339 if (Log.requests()) { 340 return cf.thenApply(response -> { 341 Log.logResponse(response::toString); 342 return response; 343 }); 344 } 345 return cf; 346 } 347 348 HttpResponse.BodySubscriber<T> ignoreBody(int status, HttpHeaders hdrs) { 349 return HttpResponse.BodySubscriber.discard((T)null); 350 } 351 352 // if this response was received in reply to an upgrade 353 // then create the Http2Connection from the HttpConnection 354 // initialize it and wait for the real response on a newly created Stream 355 356 private CompletableFuture<Response> 357 checkForUpgradeAsync(Response resp, 358 ExchangeImpl<T> ex) { 359 360 int rcode = resp.statusCode(); 361 if (upgrading && (rcode == 101)) { 362 Http1Exchange<T> e = (Http1Exchange<T>)ex; 363 // check for 101 switching protocols 364 // 101 responses are not supposed to contain a body. 365 // => should we fail if there is one? 366 debug.log(Level.DEBUG, "Upgrading async %s" + e.connection()); 367 return e.readBodyAsync(this::ignoreBody, false, parentExecutor) 368 .thenCompose((T v) -> {// v is null 369 debug.log(Level.DEBUG, "Ignored body"); 370 // we pass e::getBuffer to allow the ByteBuffers to accumulate 371 // while we build the Http2Connection 372 return Http2Connection.createAsync(e.connection(), 373 client.client2(), 374 this, e::drainLeftOverBytes) 375 .thenCompose((Http2Connection c) -> { 376 c.putConnection(); 377 Stream<T> s = c.getStream(1); 378 if (s == null) { 379 // s can be null if an exception occurred 380 // asynchronously while sending the preface. 381 Throwable t = c.getRecordedCause(); 382 if (t != null) { 383 return MinimalFuture.failedFuture( 384 new IOException("Can't get stream 1: " + t, t)); 385 } 386 } 387 exchImpl.released(); 388 Throwable t; 389 // There's a race condition window where an external 390 // thread (SelectorManager) might complete the 391 // exchange in timeout at the same time where we're 392 // trying to switch the exchange impl. 393 // 'failed' will be reset to null after 394 // exchImpl.cancel() has completed, so either we 395 // will observe failed != null here, or we will 396 // observe e.getCancelCause() != null, or the 397 // timeout exception will be routed to 's'. 398 // Either way, we need to relay it to s. 399 synchronized (this) { 400 exchImpl = s; 401 t = failed; 402 } 403 // Check whether the HTTP/1.1 was cancelled. 404 if (t == null) t = e.getCancelCause(); 405 // if HTTP/1.1 exchange was timed out, don't 406 // try to go further. 407 if (t instanceof HttpTimeoutException) { 408 s.cancelImpl(t); 409 return MinimalFuture.failedFuture(t); 410 } 411 debug.log(Level.DEBUG, "Getting response async %s" + s); 412 return s.getResponseAsync(null); 413 });} 414 ); 415 } 416 return MinimalFuture.completedFuture(resp); 417 } 418 419 private URI getURIForSecurityCheck() { 420 URI u; 421 String method = request.method(); 422 InetSocketAddress authority = request.authority(); 423 URI uri = request.uri(); 424 425 // CONNECT should be restricted at API level 426 if (method.equalsIgnoreCase("CONNECT")) { 427 try { 428 u = new URI("socket", 429 null, 430 authority.getHostString(), 431 authority.getPort(), 432 null, 433 null, 434 null); 435 } catch (URISyntaxException e) { 436 throw new InternalError(e); // shouldn't happen 437 } 438 } else { 439 u = uri; 440 } 441 return u; 442 } 443 444 /** 445 * Returns the security permission required for the given details. 446 * If method is CONNECT, then uri must be of form "scheme://host:port" 447 */ 448 private static URLPermission permissionForServer(URI uri, 449 String method, 450 Map<String, List<String>> headers) { 451 if (method.equals("CONNECT")) { 452 return new URLPermission(uri.toString(), "CONNECT"); 453 } else { 454 return Utils.permissionForServer(uri, method, headers.keySet().stream()); 455 } 456 } 457 458 /** 459 * Performs the necessary security permission checks required to retrieve 460 * the response. Returns a security exception representing the denied 461 * permission, or null if all checks pass or there is no security manager. 462 */ 463 private SecurityException checkPermissions() { 464 String method = request.method(); 465 SecurityManager sm = System.getSecurityManager(); 466 if (sm == null || method.equals("CONNECT")) { 467 // tunneling will have a null acc, which is fine. The proxy 468 // permission check will have already been preformed. 469 return null; 470 } 471 472 HttpHeaders userHeaders = request.getUserHeaders(); 473 URI u = getURIForSecurityCheck(); 474 URLPermission p = permissionForServer(u, method, userHeaders.map()); 475 476 try { 477 assert acc != null; 478 sm.checkPermission(p, acc); 479 } catch (SecurityException e) { 480 return e; 481 } 482 ProxySelector ps = client.proxySelector(); 483 if (ps != null) { 484 if (!method.equals("CONNECT")) { 485 // a non-tunneling HTTP proxy. Need to check access 486 URLPermission proxyPerm = permissionForProxy(request.proxy()); 487 if (proxyPerm != null) { 488 try { 489 sm.checkPermission(proxyPerm, acc); 490 } catch (SecurityException e) { 491 return e; 492 } 493 } 494 } 495 } 496 return null; 497 } 498 499 HttpClient.Version version() { 500 return multi.version(); 501 } 502 503 String dbgString() { 504 return dbgTag; 505 } 506 }