1 /*
   2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.net.InetSocketAddress;
  31 import java.net.ProxySelector;
  32 import java.net.URI;
  33 import java.net.URISyntaxException;
  34 import java.net.URLPermission;
  35 import java.security.AccessControlContext;
  36 import java.util.List;
  37 import java.util.Map;
  38 import java.util.concurrent.CompletableFuture;
  39 import java.util.concurrent.Executor;
  40 import java.util.function.Function;
  41 import java.net.http.HttpClient;
  42 import java.net.http.HttpHeaders;
  43 import java.net.http.HttpResponse;
  44 import java.net.http.HttpTimeoutException;
  45 
  46 import jdk.internal.net.http.common.Logger;
  47 import jdk.internal.net.http.common.MinimalFuture;
  48 import jdk.internal.net.http.common.Utils;
  49 import jdk.internal.net.http.common.Log;
  50 
  51 import static jdk.internal.net.http.common.Utils.permissionForProxy;
  52 
  53 /**
  54  * One request/response exchange (handles 100/101 intermediate response also).
  55  * depth field used to track number of times a new request is being sent
  56  * for a given API request. If limit exceeded exception is thrown.
  57  *
  58  * Security check is performed here:
  59  * - uses AccessControlContext captured at API level
  60  * - checks for appropriate URLPermission for request
  61  * - if permission allowed, grants equivalent SocketPermission to call
  62  * - in case of direct HTTP proxy, checks additionally for access to proxy
  63  *    (CONNECT proxying uses its own Exchange, so check done there)
  64  *
  65  */
  66 final class Exchange<T> {
  67 
  68     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
  69 
  70     final HttpRequestImpl request;
  71     final HttpClientImpl client;
  72     volatile ExchangeImpl<T> exchImpl;
  73     volatile CompletableFuture<? extends ExchangeImpl<T>> exchangeCF;
  74     volatile CompletableFuture<Void> bodyIgnored;
  75 
  76     // used to record possible cancellation raised before the exchImpl
  77     // has been established.
  78     private volatile IOException failed;
  79     final AccessControlContext acc;
  80     final MultiExchange<T> multi;
  81     final Executor parentExecutor;
  82     boolean upgrading; // to HTTP/2
  83     final PushGroup<T> pushGroup;
  84     final String dbgTag;
  85 
  86     // Keeps track of the underlying connection when establishing an HTTP/2
  87     // exchange so that it can be aborted/timed out mid setup.
  88     final ConnectionAborter connectionAborter = new ConnectionAborter();
  89 
  90     Exchange(HttpRequestImpl request, MultiExchange<T> multi) {
  91         this.request = request;
  92         this.upgrading = false;
  93         this.client = multi.client();
  94         this.multi = multi;
  95         this.acc = multi.acc;
  96         this.parentExecutor = multi.executor;
  97         this.pushGroup = multi.pushGroup;
  98         this.dbgTag = "Exchange";
  99     }
 100 
 101     /* If different AccessControlContext to be used  */
 102     Exchange(HttpRequestImpl request,
 103              MultiExchange<T> multi,
 104              AccessControlContext acc)
 105     {
 106         this.request = request;
 107         this.acc = acc;
 108         this.upgrading = false;
 109         this.client = multi.client();
 110         this.multi = multi;
 111         this.parentExecutor = multi.executor;
 112         this.pushGroup = multi.pushGroup;
 113         this.dbgTag = "Exchange";
 114     }
 115 
 116     PushGroup<T> getPushGroup() {
 117         return pushGroup;
 118     }
 119 
 120     Executor executor() {
 121         return parentExecutor;
 122     }
 123 
 124     public HttpRequestImpl request() {
 125         return request;
 126     }
 127 
 128     HttpClientImpl client() {
 129         return client;
 130     }
 131 
 132     // Keeps track of the underlying connection when establishing an HTTP/2
 133     // exchange so that it can be aborted/timed out mid setup.
 134     static final class ConnectionAborter {
 135         private volatile HttpConnection connection;
 136 
 137         void connection(HttpConnection connection) {
 138             this.connection = connection;
 139         }
 140 
 141         void closeConnection() {
 142             HttpConnection connection = this.connection;
 143             this.connection = null;
 144             if (connection != null) {
 145                 try {
 146                     connection.close();
 147                 } catch (Throwable t) {
 148                     // ignore
 149                 }
 150             }
 151         }
 152     }
 153 
 154     public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {
 155         // If we received a 407 while establishing the exchange
 156         // there will be no body to read: bodyIgnored will be true,
 157         // and exchImpl will be null (if we were trying to establish
 158         // an HTTP/2 tunnel through an HTTP/1.1 proxy)
 159         if (bodyIgnored != null) return MinimalFuture.completedFuture(null);
 160 
 161         // The connection will not be returned to the pool in the case of WebSocket
 162         return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor)
 163                 .whenComplete((r,t) -> exchImpl.completed());
 164     }
 165 
 166     /**
 167      * Called after a redirect or similar kind of retry where a body might
 168      * be sent but we don't want it. Should send a RESET in h2. For http/1.1
 169      * we can consume small quantity of data, or close the connection in
 170      * other cases.
 171      */
 172     public CompletableFuture<Void> ignoreBody() {
 173         if (bodyIgnored != null) return bodyIgnored;
 174         return exchImpl.ignoreBody();
 175     }
 176 
 177     /**
 178      * Called when a new exchange is created to replace this exchange.
 179      * At this point it is guaranteed that readBody/readBodyAsync will
 180      * not be called.
 181      */
 182     public void released() {
 183         ExchangeImpl<?> impl = exchImpl;
 184         if (impl != null) impl.released();
 185         // Don't set exchImpl to null here. We need to keep
 186         // it alive until it's replaced by a Stream in wrapForUpgrade.
 187         // Setting it to null here might get it GC'ed too early, because
 188         // the Http1Response is now only weakly referenced by the Selector.
 189     }
 190 
 191     public void cancel() {
 192         // cancel can be called concurrently before or at the same time
 193         // that the exchange impl is being established.
 194         // In that case we won't be able to propagate the cancellation
 195         // right away
 196         if (exchImpl != null) {
 197             exchImpl.cancel();
 198         } else {
 199             // no impl - can't cancel impl yet.
 200             // call cancel(IOException) instead which takes care
 201             // of race conditions between impl/cancel.
 202             cancel(new IOException("Request cancelled"));
 203         }
 204     }
 205 
 206     public void cancel(IOException cause) {
 207         if (debug.on()) debug.log("cancel exchImpl: %s, with \"%s\"", exchImpl, cause);
 208         // If the impl is non null, propagate the exception right away.
 209         // Otherwise record it so that it can be propagated once the
 210         // exchange impl has been established.
 211         ExchangeImpl<?> impl = exchImpl;
 212         if (impl != null) {
 213             // propagate the exception to the impl
 214             if (debug.on()) debug.log("Cancelling exchImpl: %s", exchImpl);
 215             impl.cancel(cause);
 216         } else {
 217             // no impl yet. record the exception
 218             failed = cause;
 219 
 220             // abort/close the connection if setting up the exchange. This can
 221             // be important when setting up HTTP/2
 222             connectionAborter.closeConnection();
 223 
 224             // now call checkCancelled to recheck the impl.
 225             // if the failed state is set and the impl is not null, reset
 226             // the failed state and propagate the exception to the impl.
 227             checkCancelled();
 228         }
 229     }
 230 
 231     // This method will raise an exception if one was reported and if
 232     // it is possible to do so. If the exception can be raised, then
 233     // the failed state will be reset. Otherwise, the failed state
 234     // will persist until the exception can be raised and the failed state
 235     // can be cleared.
 236     // Takes care of possible race conditions.
 237     private void checkCancelled() {
 238         ExchangeImpl<?> impl = null;
 239         IOException cause = null;
 240         CompletableFuture<? extends ExchangeImpl<T>> cf = null;
 241         if (failed != null) {
 242             synchronized(this) {
 243                 cause = failed;
 244                 impl = exchImpl;
 245                 cf = exchangeCF;
 246             }
 247         }
 248         if (cause == null) return;
 249         if (impl != null) {
 250             // The exception is raised by propagating it to the impl.
 251             if (debug.on()) debug.log("Cancelling exchImpl: %s", impl);
 252             impl.cancel(cause);
 253             failed = null;
 254         } else {
 255             Log.logTrace("Exchange: request [{0}/timeout={1}ms] no impl is set."
 256                          + "\n\tCan''t cancel yet with {2}",
 257                          request.uri(),
 258                          request.timeout().isPresent() ?
 259                          // calling duration.toMillis() can throw an exception.
 260                          // this is just debugging, we don't care if it overflows.
 261                          (request.timeout().get().getSeconds() * 1000
 262                           + request.timeout().get().getNano() / 1000000) : -1,
 263                          cause);
 264             if (cf != null) cf.completeExceptionally(cause);
 265         }
 266     }
 267 
 268     public void h2Upgrade() {
 269         upgrading = true;
 270         request.setH2Upgrade(client.client2());
 271     }
 272 
 273     synchronized IOException getCancelCause() {
 274         return failed;
 275     }
 276 
 277     // get/set the exchange impl, solving race condition issues with
 278     // potential concurrent calls to cancel() or cancel(IOException)
 279     private CompletableFuture<? extends ExchangeImpl<T>>
 280     establishExchange(HttpConnection connection) {
 281         if (debug.on()) {
 282             debug.log("establishing exchange for %s,%n\t proxy=%s",
 283                       request, request.proxy());
 284         }
 285         // check if we have been cancelled first.
 286         Throwable t = getCancelCause();
 287         checkCancelled();
 288         if (t != null) {
 289             return MinimalFuture.failedFuture(t);
 290         }
 291 
 292         CompletableFuture<? extends ExchangeImpl<T>> cf, res;
 293         cf = ExchangeImpl.get(this, connection);
 294         // We should probably use a VarHandle to get/set exchangeCF
 295         // instead - as we need CAS semantics.
 296         synchronized (this) { exchangeCF = cf; };
 297         res = cf.whenComplete((r,x) -> {
 298             synchronized(Exchange.this) {
 299                 if (exchangeCF == cf) exchangeCF = null;
 300             }
 301         });
 302         checkCancelled();
 303         return res.thenCompose((eimpl) -> {
 304                     // recheck for cancelled, in case of race conditions
 305                     exchImpl = eimpl;
 306                     IOException tt = getCancelCause();
 307                     checkCancelled();
 308                     if (tt != null) {
 309                         return MinimalFuture.failedFuture(tt);
 310                     } else {
 311                         // Now we're good to go. Because exchImpl is no longer
 312                         // null cancel() will be able to propagate directly to
 313                         // the impl after this point ( if needed ).
 314                         return MinimalFuture.completedFuture(eimpl);
 315                     } });
 316     }
 317 
 318     // Completed HttpResponse will be null if response succeeded
 319     // will be a non null responseAsync if expect continue returns an error
 320 
 321     public CompletableFuture<Response> responseAsync() {
 322         return responseAsyncImpl(null);
 323     }
 324 
 325     CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) {
 326         SecurityException e = checkPermissions();
 327         if (e != null) {
 328             return MinimalFuture.failedFuture(e);
 329         } else {
 330             return responseAsyncImpl0(connection);
 331         }
 332     }
 333 
 334     // check whether the headersSentCF was completed exceptionally with
 335     // ProxyAuthorizationRequired. If so the Response embedded in the
 336     // exception is returned. Otherwise we proceed.
 337     private CompletableFuture<Response> checkFor407(ExchangeImpl<T> ex, Throwable t,
 338                                                     Function<ExchangeImpl<T>,CompletableFuture<Response>> andThen) {
 339         t = Utils.getCompletionCause(t);
 340         if (t instanceof ProxyAuthenticationRequired) {
 341             if (debug.on()) debug.log("checkFor407: ProxyAuthenticationRequired: building synthetic response");
 342             bodyIgnored = MinimalFuture.completedFuture(null);
 343             Response proxyResponse = ((ProxyAuthenticationRequired)t).proxyResponse;
 344             HttpConnection c = ex == null ? null : ex.connection();
 345             Response syntheticResponse = new Response(request, this,
 346                     proxyResponse.headers, c, proxyResponse.statusCode,
 347                     proxyResponse.version, true);
 348             return MinimalFuture.completedFuture(syntheticResponse);
 349         } else if (t != null) {
 350             if (debug.on()) debug.log("checkFor407: no response - %s", (Object)t);
 351             return MinimalFuture.failedFuture(t);
 352         } else {
 353             if (debug.on()) debug.log("checkFor407: all clear");
 354             return andThen.apply(ex);
 355         }
 356     }
 357 
 358     // After sending the request headers, if no ProxyAuthorizationRequired
 359     // was raised and the expectContinue flag is on, we need to wait
 360     // for the 100-Continue response
 361     private CompletableFuture<Response> expectContinue(ExchangeImpl<T> ex) {
 362         assert request.expectContinue();
 363         return ex.getResponseAsync(parentExecutor)
 364                 .thenCompose((Response r1) -> {
 365             Log.logResponse(r1::toString);
 366             int rcode = r1.statusCode();
 367             if (rcode == 100) {
 368                 Log.logTrace("Received 100-Continue: sending body");
 369                 if (debug.on()) debug.log("Received 100-Continue for %s", r1);
 370                 CompletableFuture<Response> cf =
 371                         exchImpl.sendBodyAsync()
 372                                 .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
 373                 cf = wrapForUpgrade(cf);
 374                 cf = wrapForLog(cf);
 375                 return cf;
 376             } else {
 377                 Log.logTrace("Expectation failed: Received {0}",
 378                         rcode);
 379                 if (debug.on()) debug.log("Expect-Continue failed (%d) for: %s", rcode, r1);
 380                 if (upgrading && rcode == 101) {
 381                     IOException failed = new IOException(
 382                             "Unable to handle 101 while waiting for 100");
 383                     return MinimalFuture.failedFuture(failed);
 384                 }
 385                 return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor)
 386                         .thenApply(v ->  r1);
 387             }
 388         });
 389     }
 390 
 391     // After sending the request headers, if no ProxyAuthorizationRequired
 392     // was raised and the expectContinue flag is off, we can immediately
 393     // send the request body and proceed.
 394     private CompletableFuture<Response> sendRequestBody(ExchangeImpl<T> ex) {
 395         assert !request.expectContinue();
 396         if (debug.on()) debug.log("sendRequestBody");
 397         CompletableFuture<Response> cf = ex.sendBodyAsync()
 398                 .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
 399         cf = wrapForUpgrade(cf);
 400         cf = wrapForLog(cf);
 401         return cf;
 402     }
 403 
 404     CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
 405         Function<ExchangeImpl<T>, CompletableFuture<Response>> after407Check;
 406         bodyIgnored = null;
 407         if (request.expectContinue()) {
 408             request.addSystemHeader("Expect", "100-Continue");
 409             Log.logTrace("Sending Expect: 100-Continue");
 410             // wait for 100-Continue before sending body
 411             after407Check = this::expectContinue;
 412         } else {
 413             // send request body and proceed.
 414             after407Check = this::sendRequestBody;
 415         }
 416         // The ProxyAuthorizationRequired can be triggered either by
 417         // establishExchange (case of HTTP/2 SSL tunneling through HTTP/1.1 proxy
 418         // or by sendHeaderAsync (case of HTTP/1.1 SSL tunneling through HTTP/1.1 proxy
 419         // Therefore we handle it with a call to this checkFor407(...) after these
 420         // two places.
 421         Function<ExchangeImpl<T>, CompletableFuture<Response>> afterExch407Check =
 422                 (ex) -> ex.sendHeadersAsync()
 423                         .handle((r,t) -> this.checkFor407(r, t, after407Check))
 424                         .thenCompose(Function.identity());
 425         return establishExchange(connection)
 426                 .handle((r,t) -> this.checkFor407(r,t, afterExch407Check))
 427                 .thenCompose(Function.identity());
 428     }
 429 
 430     private CompletableFuture<Response> wrapForUpgrade(CompletableFuture<Response> cf) {
 431         if (upgrading) {
 432             return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl));
 433         }
 434         return cf;
 435     }
 436 
 437     private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) {
 438         if (Log.requests()) {
 439             return cf.thenApply(response -> {
 440                 Log.logResponse(response::toString);
 441                 return response;
 442             });
 443         }
 444         return cf;
 445     }
 446 
 447     HttpResponse.BodySubscriber<T> ignoreBody(HttpResponse.ResponseInfo hdrs) {
 448         return HttpResponse.BodySubscribers.replacing(null);
 449     }
 450 
 451     // if this response was received in reply to an upgrade
 452     // then create the Http2Connection from the HttpConnection
 453     // initialize it and wait for the real response on a newly created Stream
 454 
 455     private CompletableFuture<Response>
 456     checkForUpgradeAsync(Response resp,
 457                          ExchangeImpl<T> ex) {
 458 
 459         int rcode = resp.statusCode();
 460         if (upgrading && (rcode == 101)) {
 461             Http1Exchange<T> e = (Http1Exchange<T>)ex;
 462             // check for 101 switching protocols
 463             // 101 responses are not supposed to contain a body.
 464             //    => should we fail if there is one?
 465             if (debug.on()) debug.log("Upgrading async %s", e.connection());
 466             return e.readBodyAsync(this::ignoreBody, false, parentExecutor)
 467                 .thenCompose((T v) -> {// v is null
 468                     debug.log("Ignored body");
 469                     // we pass e::getBuffer to allow the ByteBuffers to accumulate
 470                     // while we build the Http2Connection
 471                     return Http2Connection.createAsync(e.connection(),
 472                                                  client.client2(),
 473                                                  this, e::drainLeftOverBytes)
 474                         .thenCompose((Http2Connection c) -> {
 475                             boolean cached = c.offerConnection();
 476                             Stream<T> s = c.getStream(1);
 477 
 478                             if (s == null) {
 479                                 // s can be null if an exception occurred
 480                                 // asynchronously while sending the preface.
 481                                 Throwable t = c.getRecordedCause();
 482                                 IOException ioe;
 483                                 if (t != null) {
 484                                     if (!cached)
 485                                         c.close();
 486                                     ioe = new IOException("Can't get stream 1: " + t, t);
 487                                 } else {
 488                                     ioe = new IOException("Can't get stream 1");
 489                                 }
 490                                 return MinimalFuture.failedFuture(ioe);
 491                             }
 492                             exchImpl.released();
 493                             Throwable t;
 494                             // There's a race condition window where an external
 495                             // thread (SelectorManager) might complete the
 496                             // exchange in timeout at the same time where we're
 497                             // trying to switch the exchange impl.
 498                             // 'failed' will be reset to null after
 499                             // exchImpl.cancel() has completed, so either we
 500                             // will observe failed != null here, or we will
 501                             // observe e.getCancelCause() != null, or the
 502                             // timeout exception will be routed to 's'.
 503                             // Either way, we need to relay it to s.
 504                             synchronized (this) {
 505                                 exchImpl = s;
 506                                 t = failed;
 507                             }
 508                             // Check whether the HTTP/1.1 was cancelled.
 509                             if (t == null) t = e.getCancelCause();
 510                             // if HTTP/1.1 exchange was timed out, don't
 511                             // try to go further.
 512                             if (t instanceof HttpTimeoutException) {
 513                                  s.cancelImpl(t);
 514                                  return MinimalFuture.failedFuture(t);
 515                             }
 516                             if (debug.on())
 517                                 debug.log("Getting response async %s", s);
 518                             return s.getResponseAsync(null);
 519                         });}
 520                 );
 521         }
 522         return MinimalFuture.completedFuture(resp);
 523     }
 524 
 525     private URI getURIForSecurityCheck() {
 526         URI u;
 527         String method = request.method();
 528         InetSocketAddress authority = request.authority();
 529         URI uri = request.uri();
 530 
 531         // CONNECT should be restricted at API level
 532         if (method.equalsIgnoreCase("CONNECT")) {
 533             try {
 534                 u = new URI("socket",
 535                              null,
 536                              authority.getHostString(),
 537                              authority.getPort(),
 538                              null,
 539                              null,
 540                              null);
 541             } catch (URISyntaxException e) {
 542                 throw new InternalError(e); // shouldn't happen
 543             }
 544         } else {
 545             u = uri;
 546         }
 547         return u;
 548     }
 549 
 550     /**
 551      * Returns the security permission required for the given details.
 552      * If method is CONNECT, then uri must be of form "scheme://host:port"
 553      */
 554     private static URLPermission permissionForServer(URI uri,
 555                                                      String method,
 556                                                      Map<String, List<String>> headers) {
 557         if (method.equals("CONNECT")) {
 558             return new URLPermission(uri.toString(), "CONNECT");
 559         } else {
 560             return Utils.permissionForServer(uri, method, headers.keySet().stream());
 561         }
 562     }
 563 
 564     /**
 565      * Performs the necessary security permission checks required to retrieve
 566      * the response. Returns a security exception representing the denied
 567      * permission, or null if all checks pass or there is no security manager.
 568      */
 569     private SecurityException checkPermissions() {
 570         String method = request.method();
 571         SecurityManager sm = System.getSecurityManager();
 572         if (sm == null || method.equals("CONNECT")) {
 573             // tunneling will have a null acc, which is fine. The proxy
 574             // permission check will have already been preformed.
 575             return null;
 576         }
 577 
 578         HttpHeaders userHeaders = request.getUserHeaders();
 579         URI u = getURIForSecurityCheck();
 580         URLPermission p = permissionForServer(u, method, userHeaders.map());
 581 
 582         try {
 583             assert acc != null;
 584             sm.checkPermission(p, acc);
 585         } catch (SecurityException e) {
 586             return e;
 587         }
 588         ProxySelector ps = client.proxySelector();
 589         if (ps != null) {
 590             if (!method.equals("CONNECT")) {
 591                 // a non-tunneling HTTP proxy. Need to check access
 592                 URLPermission proxyPerm = permissionForProxy(request.proxy());
 593                 if (proxyPerm != null) {
 594                     try {
 595                         sm.checkPermission(proxyPerm, acc);
 596                     } catch (SecurityException e) {
 597                         return e;
 598                     }
 599                 }
 600             }
 601         }
 602         return null;
 603     }
 604 
 605     HttpClient.Version version() {
 606         return multi.version();
 607     }
 608 
 609     String dbgString() {
 610         return dbgTag;
 611     }
 612 }