1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.net.InetSocketAddress;
  31 import java.net.ProxySelector;
  32 import java.net.URI;
  33 import java.net.URISyntaxException;
  34 import java.net.URLPermission;
  35 import java.security.AccessControlContext;
  36 import java.util.List;
  37 import java.util.Map;
  38 import java.util.concurrent.CompletableFuture;
  39 import java.util.concurrent.Executor;
  40 import jdk.incubator.http.internal.common.MinimalFuture;
  41 import jdk.incubator.http.internal.common.Utils;
  42 import jdk.incubator.http.internal.common.Log;
  43 
  44 import static jdk.incubator.http.internal.common.Utils.permissionForProxy;
  45 
  46 /**
  47  * One request/response exchange (handles 100/101 intermediate response also).
  48  * depth field used to track number of times a new request is being sent
  49  * for a given API request. If limit exceeded exception is thrown.
  50  *
  51  * Security check is performed here:
  52  * - uses AccessControlContext captured at API level
  53  * - checks for appropriate URLPermission for request
  54  * - if permission allowed, grants equivalent SocketPermission to call
  55  * - in case of direct HTTP proxy, checks additionally for access to proxy
  56  *    (CONNECT proxying uses its own Exchange, so check done there)
  57  *
  58  */
  59 final class Exchange<T> {
  60 
  61     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
  62     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
  63 
  64     final HttpRequestImpl request;
  65     final HttpClientImpl client;
  66     volatile ExchangeImpl<T> exchImpl;
  67     volatile CompletableFuture<? extends ExchangeImpl<T>> exchangeCF;
  68     // used to record possible cancellation raised before the exchImpl
  69     // has been established.
  70     private volatile IOException failed;
  71     final AccessControlContext acc;
  72     final MultiExchange<?,T> multi;
  73     final Executor parentExecutor;
  74     boolean upgrading; // to HTTP/2
  75     final PushGroup<?,T> pushGroup;
  76     final String dbgTag;
  77 
  78     Exchange(HttpRequestImpl request, MultiExchange<?,T> multi) {
  79         this.request = request;
  80         this.upgrading = false;
  81         this.client = multi.client();
  82         this.multi = multi;
  83         this.acc = multi.acc;
  84         this.parentExecutor = multi.executor;
  85         this.pushGroup = multi.pushGroup;
  86         this.dbgTag = "Exchange";
  87     }
  88 
  89     /* If different AccessControlContext to be used  */
  90     Exchange(HttpRequestImpl request,
  91              MultiExchange<?,T> multi,
  92              AccessControlContext acc)
  93     {
  94         this.request = request;
  95         this.acc = acc;
  96         this.upgrading = false;
  97         this.client = multi.client();
  98         this.multi = multi;
  99         this.parentExecutor = multi.executor;
 100         this.pushGroup = multi.pushGroup;
 101         this.dbgTag = "Exchange";
 102     }
 103 
 104     PushGroup<?,T> getPushGroup() {
 105         return pushGroup;
 106     }
 107 
 108     Executor executor() {
 109         return parentExecutor;
 110     }
 111 
 112     public HttpRequestImpl request() {
 113         return request;
 114     }
 115 
 116     HttpClientImpl client() {
 117         return client;
 118     }
 119 
 120 
 121     public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {
 122         // The connection will not be returned to the pool in the case of WebSocket
 123         return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor)
 124                 .whenComplete((r,t) -> exchImpl.completed());
 125     }
 126 
 127     /**
 128      * Called after a redirect or similar kind of retry where a body might
 129      * be sent but we don't want it. Should send a RESET in h2. For http/1.1
 130      * we can consume small quantity of data, or close the connection in
 131      * other cases.
 132      */
 133     public CompletableFuture<Void> ignoreBody() {
 134         return exchImpl.ignoreBody();
 135     }
 136 
 137     /**
 138      * Called when a new exchange is created to replace this exchange.
 139      * At this point it is guaranteed that readBody/readBodyAsync will
 140      * not be called.
 141      */
 142     public void released() {
 143         ExchangeImpl<?> impl = exchImpl;
 144         if (impl != null) impl.released();
 145         // Don't set exchImpl to null here. We need to keep
 146         // it alive until it's replaced by a Stream in wrapForUpgrade.
 147         // Setting it to null here might get it GC'ed too early, because
 148         // the Http1Response is now only weakly referenced by the Selector.
 149     }
 150 
 151     public void cancel() {
 152         // cancel can be called concurrently before or at the same time
 153         // that the exchange impl is being established.
 154         // In that case we won't be able to propagate the cancellation
 155         // right away
 156         if (exchImpl != null) {
 157             exchImpl.cancel();
 158         } else {
 159             // no impl - can't cancel impl yet.
 160             // call cancel(IOException) instead which takes care
 161             // of race conditions between impl/cancel.
 162             cancel(new IOException("Request cancelled"));
 163         }
 164     }
 165 
 166     public void cancel(IOException cause) {
 167         // If the impl is non null, propagate the exception right away.
 168         // Otherwise record it so that it can be propagated once the
 169         // exchange impl has been established.
 170         ExchangeImpl<?> impl = exchImpl;
 171         if (impl != null) {
 172             // propagate the exception to the impl
 173             debug.log(Level.DEBUG, "Cancelling exchImpl: %s", exchImpl);
 174             impl.cancel(cause);
 175         } else {
 176             // no impl yet. record the exception
 177             failed = cause;
 178             // now call checkCancelled to recheck the impl.
 179             // if the failed state is set and the impl is not null, reset
 180             // the failed state and propagate the exception to the impl.
 181             checkCancelled();
 182         }
 183     }
 184 
 185     // This method will raise an exception if one was reported and if
 186     // it is possible to do so. If the exception can be raised, then
 187     // the failed state will be reset. Otherwise, the failed state
 188     // will persist until the exception can be raised and the failed state
 189     // can be cleared.
 190     // Takes care of possible race conditions.
 191     private void checkCancelled() {
 192         ExchangeImpl<?> impl = null;
 193         IOException cause = null;
 194         CompletableFuture<? extends ExchangeImpl<T>> cf = null;
 195         if (failed != null) {
 196             synchronized(this) {
 197                 cause = failed;
 198                 impl = exchImpl;
 199                 cf = exchangeCF;
 200             }
 201         }
 202         if (cause == null) return;
 203         if (impl != null) {
 204             // The exception is raised by propagating it to the impl.
 205             debug.log(Level.DEBUG, "Cancelling exchImpl: %s", impl);
 206             impl.cancel(cause);
 207             failed = null;
 208         } else {
 209             Log.logTrace("Exchange: request [{0}/timeout={1}ms] no impl is set."
 210                          + "\n\tCan''t cancel yet with {2}",
 211                          request.uri(),
 212                          request.timeout().isPresent() ?
 213                          // calling duration.toMillis() can throw an exception.
 214                          // this is just debugging, we don't care if it overflows.
 215                          (request.timeout().get().getSeconds() * 1000
 216                           + request.timeout().get().getNano() / 1000000) : -1,
 217                          cause);
 218             if (cf != null) cf.completeExceptionally(cause);
 219         }
 220     }
 221 
 222     public void h2Upgrade() {
 223         upgrading = true;
 224         request.setH2Upgrade(client.client2());
 225     }
 226 
 227     synchronized IOException getCancelCause() {
 228         return failed;
 229     }
 230 
 231     // get/set the exchange impl, solving race condition issues with
 232     // potential concurrent calls to cancel() or cancel(IOException)
 233     private CompletableFuture<? extends ExchangeImpl<T>>
 234     establishExchange(HttpConnection connection) {
 235         if (debug.isLoggable(Level.DEBUG)) {
 236             debug.log(Level.DEBUG,
 237                     "establishing exchange for %s,%n\t proxy=%s",
 238                     request,
 239                     request.proxy());
 240         }
 241         // check if we have been cancelled first.
 242         Throwable t = getCancelCause();
 243         checkCancelled();
 244         if (t != null) {
 245             return MinimalFuture.failedFuture(t);
 246         }
 247 
 248         CompletableFuture<? extends ExchangeImpl<T>> cf, res;
 249         cf = ExchangeImpl.get(this, connection);
 250         // We should probably use a VarHandle to get/set exchangeCF
 251         // instead - as we need CAS semantics.
 252         synchronized (this) { exchangeCF = cf; };
 253         res = cf.whenComplete((r,x) -> {
 254             synchronized(Exchange.this) {
 255                 if (exchangeCF == cf) exchangeCF = null;
 256             }
 257         });
 258         checkCancelled();
 259         return res.thenCompose((eimpl) -> {
 260                     // recheck for cancelled, in case of race conditions
 261                     exchImpl = eimpl;
 262                     IOException tt = getCancelCause();
 263                     checkCancelled();
 264                     if (tt != null) {
 265                         return MinimalFuture.failedFuture(tt);
 266                     } else {
 267                         // Now we're good to go. Because exchImpl is no longer
 268                         // null cancel() will be able to propagate directly to
 269                         // the impl after this point ( if needed ).
 270                         return MinimalFuture.completedFuture(eimpl);
 271                     } });
 272     }
 273 
 274     // Completed HttpResponse will be null if response succeeded
 275     // will be a non null responseAsync if expect continue returns an error
 276 
 277     public CompletableFuture<Response> responseAsync() {
 278         return responseAsyncImpl(null);
 279     }
 280 
 281     CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) {
 282         SecurityException e = checkPermissions();
 283         if (e != null) {
 284             return MinimalFuture.failedFuture(e);
 285         } else {
 286             return responseAsyncImpl0(connection);
 287         }
 288     }
 289 
 290     CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
 291         if (request.expectContinue()) {
 292             request.addSystemHeader("Expect", "100-Continue");
 293             Log.logTrace("Sending Expect: 100-Continue");
 294             return establishExchange(connection)
 295                     .thenCompose((ex) -> ex.sendHeadersAsync())
 296                     .thenCompose(v -> exchImpl.getResponseAsync(parentExecutor))
 297                     .thenCompose((Response r1) -> {
 298                         Log.logResponse(r1::toString);
 299                         int rcode = r1.statusCode();
 300                         if (rcode == 100) {
 301                             Log.logTrace("Received 100-Continue: sending body");
 302                             CompletableFuture<Response> cf =
 303                                     exchImpl.sendBodyAsync()
 304                                             .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
 305                             cf = wrapForUpgrade(cf);
 306                             cf = wrapForLog(cf);
 307                             return cf;
 308                         } else {
 309                             Log.logTrace("Expectation failed: Received {0}",
 310                                          rcode);
 311                             if (upgrading && rcode == 101) {
 312                                 IOException failed = new IOException(
 313                                         "Unable to handle 101 while waiting for 100");
 314                                 return MinimalFuture.failedFuture(failed);
 315                             }
 316                             return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor)
 317                                   .thenApply(v ->  r1);
 318                         }
 319                     });
 320         } else {
 321             CompletableFuture<Response> cf = establishExchange(connection)
 322                     .thenCompose((ex) -> ex.sendHeadersAsync())
 323                     .thenCompose(ExchangeImpl::sendBodyAsync)
 324                     .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
 325             cf = wrapForUpgrade(cf);
 326             cf = wrapForLog(cf);
 327             return cf;
 328         }
 329     }
 330 
 331     private CompletableFuture<Response> wrapForUpgrade(CompletableFuture<Response> cf) {
 332         if (upgrading) {
 333             return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl));
 334         }
 335         return cf;
 336     }
 337 
 338     private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) {
 339         if (Log.requests()) {
 340             return cf.thenApply(response -> {
 341                 Log.logResponse(response::toString);
 342                 return response;
 343             });
 344         }
 345         return cf;
 346     }
 347 
 348     HttpResponse.BodySubscriber<T> ignoreBody(int status, HttpHeaders hdrs) {
 349         return HttpResponse.BodySubscriber.discard((T)null);
 350     }
 351 
 352     // if this response was received in reply to an upgrade
 353     // then create the Http2Connection from the HttpConnection
 354     // initialize it and wait for the real response on a newly created Stream
 355 
 356     private CompletableFuture<Response>
 357     checkForUpgradeAsync(Response resp,
 358                          ExchangeImpl<T> ex) {
 359 
 360         int rcode = resp.statusCode();
 361         if (upgrading && (rcode == 101)) {
 362             Http1Exchange<T> e = (Http1Exchange<T>)ex;
 363             // check for 101 switching protocols
 364             // 101 responses are not supposed to contain a body.
 365             //    => should we fail if there is one?
 366             debug.log(Level.DEBUG, "Upgrading async %s" + e.connection());
 367             return e.readBodyAsync(this::ignoreBody, false, parentExecutor)
 368                 .thenCompose((T v) -> {// v is null
 369                     debug.log(Level.DEBUG, "Ignored body");
 370                     // we pass e::getBuffer to allow the ByteBuffers to accumulate
 371                     // while we build the Http2Connection
 372                     return Http2Connection.createAsync(e.connection(),
 373                                                  client.client2(),
 374                                                  this, e::drainLeftOverBytes)
 375                         .thenCompose((Http2Connection c) -> {
 376                             c.putConnection();
 377                             Stream<T> s = c.getStream(1);
 378                             if (s == null) {
 379                                 // s can be null if an exception occurred
 380                                 // asynchronously while sending the preface.
 381                                 Throwable t = c.getRecordedCause();
 382                                 if (t != null) {
 383                                     return MinimalFuture.failedFuture(
 384                                             new IOException("Can't get stream 1: " + t, t));
 385                                 }
 386                             }
 387                             exchImpl.released();
 388                             Throwable t;
 389                             // There's a race condition window where an external
 390                             // thread (SelectorManager) might complete the
 391                             // exchange in timeout at the same time where we're
 392                             // trying to switch the exchange impl.
 393                             // 'failed' will be reset to null after
 394                             // exchImpl.cancel() has completed, so either we
 395                             // will observe failed != null here, or we will
 396                             // observe e.getCancelCause() != null, or the
 397                             // timeout exception will be routed to 's'.
 398                             // Either way, we need to relay it to s.
 399                             synchronized (this) {
 400                                 exchImpl = s;
 401                                 t = failed;
 402                             }
 403                             // Check whether the HTTP/1.1 was cancelled.
 404                             if (t == null) t = e.getCancelCause();
 405                             // if HTTP/1.1 exchange was timed out, don't
 406                             // try to go further.
 407                             if (t instanceof HttpTimeoutException) {
 408                                  s.cancelImpl(t);
 409                                  return MinimalFuture.failedFuture(t);
 410                             }
 411                             debug.log(Level.DEBUG, "Getting response async %s" + s);
 412                             return s.getResponseAsync(null);
 413                         });}
 414                 );
 415         }
 416         return MinimalFuture.completedFuture(resp);
 417     }
 418 
 419     private URI getURIForSecurityCheck() {
 420         URI u;
 421         String method = request.method();
 422         InetSocketAddress authority = request.authority();
 423         URI uri = request.uri();
 424 
 425         // CONNECT should be restricted at API level
 426         if (method.equalsIgnoreCase("CONNECT")) {
 427             try {
 428                 u = new URI("socket",
 429                              null,
 430                              authority.getHostString(),
 431                              authority.getPort(),
 432                              null,
 433                              null,
 434                              null);
 435             } catch (URISyntaxException e) {
 436                 throw new InternalError(e); // shouldn't happen
 437             }
 438         } else {
 439             u = uri;
 440         }
 441         return u;
 442     }
 443 
 444     /**
 445      * Returns the security permission required for the given details.
 446      * If method is CONNECT, then uri must be of form "scheme://host:port"
 447      */
 448     private static URLPermission permissionForServer(URI uri,
 449                                                      String method,
 450                                                      Map<String, List<String>> headers) {
 451         if (method.equals("CONNECT")) {
 452             return new URLPermission(uri.toString(), "CONNECT");
 453         } else {
 454             return Utils.permissionForServer(uri, method, headers.keySet().stream());
 455         }
 456     }
 457 
 458     /**
 459      * Performs the necessary security permission checks required to retrieve
 460      * the response. Returns a security exception representing the denied
 461      * permission, or null if all checks pass or there is no security manager.
 462      */
 463     private SecurityException checkPermissions() {
 464         String method = request.method();
 465         SecurityManager sm = System.getSecurityManager();
 466         if (sm == null || method.equals("CONNECT")) {
 467             // tunneling will have a null acc, which is fine. The proxy
 468             // permission check will have already been preformed.
 469             return null;
 470         }
 471 
 472         HttpHeaders userHeaders = request.getUserHeaders();
 473         URI u = getURIForSecurityCheck();
 474         URLPermission p = permissionForServer(u, method, userHeaders.map());
 475 
 476         try {
 477             assert acc != null;
 478             sm.checkPermission(p, acc);
 479         } catch (SecurityException e) {
 480             return e;
 481         }
 482         ProxySelector ps = client.proxySelector();
 483         if (ps != null) {
 484             if (!method.equals("CONNECT")) {
 485                 // a non-tunneling HTTP proxy. Need to check access
 486                 URLPermission proxyPerm = permissionForProxy(request.proxy());
 487                 if (proxyPerm != null) {
 488                     try {
 489                         sm.checkPermission(proxyPerm, acc);
 490                     } catch (SecurityException e) {
 491                         return e;
 492                     }
 493                 }
 494             }
 495         }
 496         return null;
 497     }
 498 
 499     HttpClient.Version version() {
 500         return multi.version();
 501     }
 502 
 503     String dbgString() {
 504         return dbgTag;
 505     }
 506 }