1 /*
   2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.net.InetSocketAddress;
  31 import java.net.ProxySelector;
  32 import java.net.URI;
  33 import java.net.URISyntaxException;
  34 import java.net.URLPermission;
  35 import java.security.AccessControlContext;
  36 import java.util.List;
  37 import java.util.Map;
  38 import java.util.concurrent.CompletableFuture;
  39 import java.util.concurrent.Executor;
  40 import java.util.function.Function;
  41 import java.net.http.HttpClient;
  42 import java.net.http.HttpHeaders;
  43 import java.net.http.HttpResponse;
  44 import java.net.http.HttpTimeoutException;
  45 
  46 import jdk.internal.net.http.common.Logger;
  47 import jdk.internal.net.http.common.MinimalFuture;
  48 import jdk.internal.net.http.common.Utils;
  49 import jdk.internal.net.http.common.Log;
  50 
  51 import static jdk.internal.net.http.common.Utils.permissionForProxy;
  52 
  53 /**
  54  * One request/response exchange (handles 100/101 intermediate response also).
  55  * depth field used to track number of times a new request is being sent
  56  * for a given API request. If limit exceeded exception is thrown.
  57  *
  58  * Security check is performed here:
  59  * - uses AccessControlContext captured at API level
  60  * - checks for appropriate URLPermission for request
  61  * - if permission allowed, grants equivalent SocketPermission to call
  62  * - in case of direct HTTP proxy, checks additionally for access to proxy
  63  *    (CONNECT proxying uses its own Exchange, so check done there)
  64  *
  65  */
  66 final class Exchange<T> {
  67 
  68     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
  69 
  70     final HttpRequestImpl request;
  71     final HttpClientImpl client;
  72     volatile ExchangeImpl<T> exchImpl;
  73     volatile CompletableFuture<? extends ExchangeImpl<T>> exchangeCF;
  74     volatile CompletableFuture<Void> bodyIgnored;
  75 
  76     // used to record possible cancellation raised before the exchImpl
  77     // has been established.
  78     private volatile IOException failed;
  79     final AccessControlContext acc;
  80     final MultiExchange<T> multi;
  81     final Executor parentExecutor;
  82     boolean upgrading; // to HTTP/2
  83     final PushGroup<T> pushGroup;
  84     final String dbgTag;
  85 
  86     Exchange(HttpRequestImpl request, MultiExchange<T> multi) {
  87         this.request = request;
  88         this.upgrading = false;
  89         this.client = multi.client();
  90         this.multi = multi;
  91         this.acc = multi.acc;
  92         this.parentExecutor = multi.executor;
  93         this.pushGroup = multi.pushGroup;
  94         this.dbgTag = "Exchange";
  95     }
  96 
  97     /* If different AccessControlContext to be used  */
  98     Exchange(HttpRequestImpl request,
  99              MultiExchange<T> multi,
 100              AccessControlContext acc)
 101     {
 102         this.request = request;
 103         this.acc = acc;
 104         this.upgrading = false;
 105         this.client = multi.client();
 106         this.multi = multi;
 107         this.parentExecutor = multi.executor;
 108         this.pushGroup = multi.pushGroup;
 109         this.dbgTag = "Exchange";
 110     }
 111 
 112     PushGroup<T> getPushGroup() {
 113         return pushGroup;
 114     }
 115 
 116     Executor executor() {
 117         return parentExecutor;
 118     }
 119 
 120     public HttpRequestImpl request() {
 121         return request;
 122     }
 123 
 124     HttpClientImpl client() {
 125         return client;
 126     }
 127 
 128 
 129     public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {
 130         // If we received a 407 while establishing the exchange
 131         // there will be no body to read: bodyIgnored will be true,
 132         // and exchImpl will be null (if we were trying to establish
 133         // an HTTP/2 tunnel through an HTTP/1.1 proxy)
 134         if (bodyIgnored != null) return MinimalFuture.completedFuture(null);
 135 
 136         // The connection will not be returned to the pool in the case of WebSocket
 137         return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor)
 138                 .whenComplete((r,t) -> exchImpl.completed());
 139     }
 140 
 141     /**
 142      * Called after a redirect or similar kind of retry where a body might
 143      * be sent but we don't want it. Should send a RESET in h2. For http/1.1
 144      * we can consume small quantity of data, or close the connection in
 145      * other cases.
 146      */
 147     public CompletableFuture<Void> ignoreBody() {
 148         if (bodyIgnored != null) return bodyIgnored;
 149         return exchImpl.ignoreBody();
 150     }
 151 
 152     /**
 153      * Called when a new exchange is created to replace this exchange.
 154      * At this point it is guaranteed that readBody/readBodyAsync will
 155      * not be called.
 156      */
 157     public void released() {
 158         ExchangeImpl<?> impl = exchImpl;
 159         if (impl != null) impl.released();
 160         // Don't set exchImpl to null here. We need to keep
 161         // it alive until it's replaced by a Stream in wrapForUpgrade.
 162         // Setting it to null here might get it GC'ed too early, because
 163         // the Http1Response is now only weakly referenced by the Selector.
 164     }
 165 
 166     public void cancel() {
 167         // cancel can be called concurrently before or at the same time
 168         // that the exchange impl is being established.
 169         // In that case we won't be able to propagate the cancellation
 170         // right away
 171         if (exchImpl != null) {
 172             exchImpl.cancel();
 173         } else {
 174             // no impl - can't cancel impl yet.
 175             // call cancel(IOException) instead which takes care
 176             // of race conditions between impl/cancel.
 177             cancel(new IOException("Request cancelled"));
 178         }
 179     }
 180 
 181     public void cancel(IOException cause) {
 182         // If the impl is non null, propagate the exception right away.
 183         // Otherwise record it so that it can be propagated once the
 184         // exchange impl has been established.
 185         ExchangeImpl<?> impl = exchImpl;
 186         if (impl != null) {
 187             // propagate the exception to the impl
 188             if (debug.on()) debug.log("Cancelling exchImpl: %s", exchImpl);
 189             impl.cancel(cause);
 190         } else {
 191             // no impl yet. record the exception
 192             failed = cause;
 193             // now call checkCancelled to recheck the impl.
 194             // if the failed state is set and the impl is not null, reset
 195             // the failed state and propagate the exception to the impl.
 196             checkCancelled();
 197         }
 198     }
 199 
 200     // This method will raise an exception if one was reported and if
 201     // it is possible to do so. If the exception can be raised, then
 202     // the failed state will be reset. Otherwise, the failed state
 203     // will persist until the exception can be raised and the failed state
 204     // can be cleared.
 205     // Takes care of possible race conditions.
 206     private void checkCancelled() {
 207         ExchangeImpl<?> impl = null;
 208         IOException cause = null;
 209         CompletableFuture<? extends ExchangeImpl<T>> cf = null;
 210         if (failed != null) {
 211             synchronized(this) {
 212                 cause = failed;
 213                 impl = exchImpl;
 214                 cf = exchangeCF;
 215             }
 216         }
 217         if (cause == null) return;
 218         if (impl != null) {
 219             // The exception is raised by propagating it to the impl.
 220             if (debug.on()) debug.log("Cancelling exchImpl: %s", impl);
 221             impl.cancel(cause);
 222             failed = null;
 223         } else {
 224             Log.logTrace("Exchange: request [{0}/timeout={1}ms] no impl is set."
 225                          + "\n\tCan''t cancel yet with {2}",
 226                          request.uri(),
 227                          request.timeout().isPresent() ?
 228                          // calling duration.toMillis() can throw an exception.
 229                          // this is just debugging, we don't care if it overflows.
 230                          (request.timeout().get().getSeconds() * 1000
 231                           + request.timeout().get().getNano() / 1000000) : -1,
 232                          cause);
 233             if (cf != null) cf.completeExceptionally(cause);
 234         }
 235     }
 236 
 237     public void h2Upgrade() {
 238         upgrading = true;
 239         request.setH2Upgrade(client.client2());
 240     }
 241 
 242     synchronized IOException getCancelCause() {
 243         return failed;
 244     }
 245 
 246     // get/set the exchange impl, solving race condition issues with
 247     // potential concurrent calls to cancel() or cancel(IOException)
 248     private CompletableFuture<? extends ExchangeImpl<T>>
 249     establishExchange(HttpConnection connection) {
 250         if (debug.on()) {
 251             debug.log("establishing exchange for %s,%n\t proxy=%s",
 252                       request, request.proxy());
 253         }
 254         // check if we have been cancelled first.
 255         Throwable t = getCancelCause();
 256         checkCancelled();
 257         if (t != null) {
 258             return MinimalFuture.failedFuture(t);
 259         }
 260 
 261         CompletableFuture<? extends ExchangeImpl<T>> cf, res;
 262         cf = ExchangeImpl.get(this, connection);
 263         // We should probably use a VarHandle to get/set exchangeCF
 264         // instead - as we need CAS semantics.
 265         synchronized (this) { exchangeCF = cf; };
 266         res = cf.whenComplete((r,x) -> {
 267             synchronized(Exchange.this) {
 268                 if (exchangeCF == cf) exchangeCF = null;
 269             }
 270         });
 271         checkCancelled();
 272         return res.thenCompose((eimpl) -> {
 273                     // recheck for cancelled, in case of race conditions
 274                     exchImpl = eimpl;
 275                     IOException tt = getCancelCause();
 276                     checkCancelled();
 277                     if (tt != null) {
 278                         return MinimalFuture.failedFuture(tt);
 279                     } else {
 280                         // Now we're good to go. Because exchImpl is no longer
 281                         // null cancel() will be able to propagate directly to
 282                         // the impl after this point ( if needed ).
 283                         return MinimalFuture.completedFuture(eimpl);
 284                     } });
 285     }
 286 
 287     // Completed HttpResponse will be null if response succeeded
 288     // will be a non null responseAsync if expect continue returns an error
 289 
 290     public CompletableFuture<Response> responseAsync() {
 291         return responseAsyncImpl(null);
 292     }
 293 
 294     CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) {
 295         SecurityException e = checkPermissions();
 296         if (e != null) {
 297             return MinimalFuture.failedFuture(e);
 298         } else {
 299             return responseAsyncImpl0(connection);
 300         }
 301     }
 302 
 303     // check whether the headersSentCF was completed exceptionally with
 304     // ProxyAuthorizationRequired. If so the Response embedded in the
 305     // exception is returned. Otherwise we proceed.
 306     private CompletableFuture<Response> checkFor407(ExchangeImpl<T> ex, Throwable t,
 307                                                     Function<ExchangeImpl<T>,CompletableFuture<Response>> andThen) {
 308         t = Utils.getCompletionCause(t);
 309         if (t instanceof ProxyAuthenticationRequired) {
 310             if (debug.on()) debug.log("checkFor407: ProxyAuthenticationRequired: building synthetic response");
 311             bodyIgnored = MinimalFuture.completedFuture(null);
 312             Response proxyResponse = ((ProxyAuthenticationRequired)t).proxyResponse;
 313             HttpConnection c = ex == null ? null : ex.connection();
 314             Response syntheticResponse = new Response(request, this,
 315                     proxyResponse.headers, c, proxyResponse.statusCode,
 316                     proxyResponse.version, true);
 317             return MinimalFuture.completedFuture(syntheticResponse);
 318         } else if (t != null) {
 319             if (debug.on()) debug.log("checkFor407: no response - %s", (Object)t);
 320             return MinimalFuture.failedFuture(t);
 321         } else {
 322             if (debug.on()) debug.log("checkFor407: all clear");
 323             return andThen.apply(ex);
 324         }
 325     }
 326 
 327     // After sending the request headers, if no ProxyAuthorizationRequired
 328     // was raised and the expectContinue flag is on, we need to wait
 329     // for the 100-Continue response
 330     private CompletableFuture<Response> expectContinue(ExchangeImpl<T> ex) {
 331         assert request.expectContinue();
 332         return ex.getResponseAsync(parentExecutor)
 333                 .thenCompose((Response r1) -> {
 334             Log.logResponse(r1::toString);
 335             int rcode = r1.statusCode();
 336             if (rcode == 100) {
 337                 Log.logTrace("Received 100-Continue: sending body");
 338                 if (debug.on()) debug.log("Received 100-Continue for %s", r1);
 339                 CompletableFuture<Response> cf =
 340                         exchImpl.sendBodyAsync()
 341                                 .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
 342                 cf = wrapForUpgrade(cf);
 343                 cf = wrapForLog(cf);
 344                 return cf;
 345             } else {
 346                 Log.logTrace("Expectation failed: Received {0}",
 347                         rcode);
 348                 if (debug.on()) debug.log("Expect-Continue failed (%d) for: %s", rcode, r1);
 349                 if (upgrading && rcode == 101) {
 350                     IOException failed = new IOException(
 351                             "Unable to handle 101 while waiting for 100");
 352                     return MinimalFuture.failedFuture(failed);
 353                 }
 354                 return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor)
 355                         .thenApply(v ->  r1);
 356             }
 357         });
 358     }
 359 
 360     // After sending the request headers, if no ProxyAuthorizationRequired
 361     // was raised and the expectContinue flag is off, we can immediately
 362     // send the request body and proceed.
 363     private CompletableFuture<Response> sendRequestBody(ExchangeImpl<T> ex) {
 364         assert !request.expectContinue();
 365         if (debug.on()) debug.log("sendRequestBody");
 366         CompletableFuture<Response> cf = ex.sendBodyAsync()
 367                 .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
 368         cf = wrapForUpgrade(cf);
 369         cf = wrapForLog(cf);
 370         return cf;
 371     }
 372 
 373     CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
 374         Function<ExchangeImpl<T>, CompletableFuture<Response>> after407Check;
 375         bodyIgnored = null;
 376         if (request.expectContinue()) {
 377             request.addSystemHeader("Expect", "100-Continue");
 378             Log.logTrace("Sending Expect: 100-Continue");
 379             // wait for 100-Continue before sending body
 380             after407Check = this::expectContinue;
 381         } else {
 382             // send request body and proceed.
 383             after407Check = this::sendRequestBody;
 384         }
 385         // The ProxyAuthorizationRequired can be triggered either by
 386         // establishExchange (case of HTTP/2 SSL tunneling through HTTP/1.1 proxy
 387         // or by sendHeaderAsync (case of HTTP/1.1 SSL tunneling through HTTP/1.1 proxy
 388         // Therefore we handle it with a call to this checkFor407(...) after these
 389         // two places.
 390         Function<ExchangeImpl<T>, CompletableFuture<Response>> afterExch407Check =
 391                 (ex) -> ex.sendHeadersAsync()
 392                         .handle((r,t) -> this.checkFor407(r, t, after407Check))
 393                         .thenCompose(Function.identity());
 394         return establishExchange(connection)
 395                 .handle((r,t) -> this.checkFor407(r,t, afterExch407Check))
 396                 .thenCompose(Function.identity());
 397     }
 398 
 399     private CompletableFuture<Response> wrapForUpgrade(CompletableFuture<Response> cf) {
 400         if (upgrading) {
 401             return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl));
 402         }
 403         return cf;
 404     }
 405 
 406     private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) {
 407         if (Log.requests()) {
 408             return cf.thenApply(response -> {
 409                 Log.logResponse(response::toString);
 410                 return response;
 411             });
 412         }
 413         return cf;
 414     }
 415 
 416     HttpResponse.BodySubscriber<T> ignoreBody(HttpResponse.ResponseInfo hdrs) {
 417         return HttpResponse.BodySubscribers.replacing(null);
 418     }
 419 
 420     // if this response was received in reply to an upgrade
 421     // then create the Http2Connection from the HttpConnection
 422     // initialize it and wait for the real response on a newly created Stream
 423 
 424     private CompletableFuture<Response>
 425     checkForUpgradeAsync(Response resp,
 426                          ExchangeImpl<T> ex) {
 427 
 428         int rcode = resp.statusCode();
 429         if (upgrading && (rcode == 101)) {
 430             Http1Exchange<T> e = (Http1Exchange<T>)ex;
 431             // check for 101 switching protocols
 432             // 101 responses are not supposed to contain a body.
 433             //    => should we fail if there is one?
 434             if (debug.on()) debug.log("Upgrading async %s", e.connection());
 435             return e.readBodyAsync(this::ignoreBody, false, parentExecutor)
 436                 .thenCompose((T v) -> {// v is null
 437                     debug.log("Ignored body");
 438                     // we pass e::getBuffer to allow the ByteBuffers to accumulate
 439                     // while we build the Http2Connection
 440                     return Http2Connection.createAsync(e.connection(),
 441                                                  client.client2(),
 442                                                  this, e::drainLeftOverBytes)
 443                         .thenCompose((Http2Connection c) -> {
 444                             boolean cached = c.offerConnection();
 445                             Stream<T> s = c.getStream(1);
 446 
 447                             if (s == null) {
 448                                 // s can be null if an exception occurred
 449                                 // asynchronously while sending the preface.
 450                                 Throwable t = c.getRecordedCause();
 451                                 IOException ioe;
 452                                 if (t != null) {
 453                                     if (!cached)
 454                                         c.close();
 455                                     ioe = new IOException("Can't get stream 1: " + t, t);
 456                                 } else {
 457                                     ioe = new IOException("Can't get stream 1");
 458                                 }
 459                                 return MinimalFuture.failedFuture(ioe);
 460                             }
 461                             exchImpl.released();
 462                             Throwable t;
 463                             // There's a race condition window where an external
 464                             // thread (SelectorManager) might complete the
 465                             // exchange in timeout at the same time where we're
 466                             // trying to switch the exchange impl.
 467                             // 'failed' will be reset to null after
 468                             // exchImpl.cancel() has completed, so either we
 469                             // will observe failed != null here, or we will
 470                             // observe e.getCancelCause() != null, or the
 471                             // timeout exception will be routed to 's'.
 472                             // Either way, we need to relay it to s.
 473                             synchronized (this) {
 474                                 exchImpl = s;
 475                                 t = failed;
 476                             }
 477                             // Check whether the HTTP/1.1 was cancelled.
 478                             if (t == null) t = e.getCancelCause();
 479                             // if HTTP/1.1 exchange was timed out, don't
 480                             // try to go further.
 481                             if (t instanceof HttpTimeoutException) {
 482                                  s.cancelImpl(t);
 483                                  return MinimalFuture.failedFuture(t);
 484                             }
 485                             if (debug.on())
 486                                 debug.log("Getting response async %s", s);
 487                             return s.getResponseAsync(null);
 488                         });}
 489                 );
 490         }
 491         return MinimalFuture.completedFuture(resp);
 492     }
 493 
 494     private URI getURIForSecurityCheck() {
 495         URI u;
 496         String method = request.method();
 497         InetSocketAddress authority = request.authority();
 498         URI uri = request.uri();
 499 
 500         // CONNECT should be restricted at API level
 501         if (method.equalsIgnoreCase("CONNECT")) {
 502             try {
 503                 u = new URI("socket",
 504                              null,
 505                              authority.getHostString(),
 506                              authority.getPort(),
 507                              null,
 508                              null,
 509                              null);
 510             } catch (URISyntaxException e) {
 511                 throw new InternalError(e); // shouldn't happen
 512             }
 513         } else {
 514             u = uri;
 515         }
 516         return u;
 517     }
 518 
 519     /**
 520      * Returns the security permission required for the given details.
 521      * If method is CONNECT, then uri must be of form "scheme://host:port"
 522      */
 523     private static URLPermission permissionForServer(URI uri,
 524                                                      String method,
 525                                                      Map<String, List<String>> headers) {
 526         if (method.equals("CONNECT")) {
 527             return new URLPermission(uri.toString(), "CONNECT");
 528         } else {
 529             return Utils.permissionForServer(uri, method, headers.keySet().stream());
 530         }
 531     }
 532 
 533     /**
 534      * Performs the necessary security permission checks required to retrieve
 535      * the response. Returns a security exception representing the denied
 536      * permission, or null if all checks pass or there is no security manager.
 537      */
 538     private SecurityException checkPermissions() {
 539         String method = request.method();
 540         SecurityManager sm = System.getSecurityManager();
 541         if (sm == null || method.equals("CONNECT")) {
 542             // tunneling will have a null acc, which is fine. The proxy
 543             // permission check will have already been preformed.
 544             return null;
 545         }
 546 
 547         HttpHeaders userHeaders = request.getUserHeaders();
 548         URI u = getURIForSecurityCheck();
 549         URLPermission p = permissionForServer(u, method, userHeaders.map());
 550 
 551         try {
 552             assert acc != null;
 553             sm.checkPermission(p, acc);
 554         } catch (SecurityException e) {
 555             return e;
 556         }
 557         ProxySelector ps = client.proxySelector();
 558         if (ps != null) {
 559             if (!method.equals("CONNECT")) {
 560                 // a non-tunneling HTTP proxy. Need to check access
 561                 URLPermission proxyPerm = permissionForProxy(request.proxy());
 562                 if (proxyPerm != null) {
 563                     try {
 564                         sm.checkPermission(proxyPerm, acc);
 565                     } catch (SecurityException e) {
 566                         return e;
 567                     }
 568                 }
 569             }
 570         }
 571         return null;
 572     }
 573 
 574     HttpClient.Version version() {
 575         return multi.version();
 576     }
 577 
 578     String dbgString() {
 579         return dbgTag;
 580     }
 581 }