1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.io.UncheckedIOException;
  30 import java.net.InetSocketAddress;
  31 import java.net.ProxySelector;
  32 import java.net.SocketPermission;
  33 import java.net.URI;
  34 import java.net.URISyntaxException;
  35 import java.net.URLPermission;
  36 import java.security.AccessControlContext;
  37 import java.security.AccessController;
  38 import java.security.PrivilegedAction;
  39 import java.security.PrivilegedActionException;
  40 import java.security.PrivilegedExceptionAction;
  41 import java.util.LinkedList;
  42 import java.util.List;
  43 import java.util.concurrent.CompletableFuture;
  44 import java.util.concurrent.Executor;
  45 import jdk.incubator.http.internal.common.MinimalFuture;
  46 import jdk.incubator.http.internal.common.Utils;
  47 import jdk.incubator.http.internal.common.Log;
  48 
  49 /**
  50  * One request/response exchange (handles 100/101 intermediate response also).
  51  * depth field used to track number of times a new request is being sent
  52  * for a given API request. If limit exceeded exception is thrown.
  53  *
  54  * Security check is performed here:
  55  * - uses AccessControlContext captured at API level
  56  * - checks for appropriate URLPermission for request
  57  * - if permission allowed, grants equivalent SocketPermission to call
  58  * - in case of direct HTTP proxy, checks additionally for access to proxy
  59  *    (CONNECT proxying uses its own Exchange, so check done there)
  60  *
  61  */
  62 final class Exchange<T> {
  63 
  64     final HttpRequestImpl request;
  65     final HttpClientImpl client;
  66     volatile ExchangeImpl<T> exchImpl;
  67     // used to record possible cancellation raised before the exchImpl
  68     // has been established.
  69     private volatile IOException failed;
  70     final List<SocketPermission> permissions = new LinkedList<>();
  71     final AccessControlContext acc;
  72     final MultiExchange<?,T> multi;
  73     final Executor parentExecutor;
  74     final HttpRequest.BodyProcessor requestProcessor;
  75     boolean upgrading; // to HTTP/2
  76     final PushGroup<?,T> pushGroup;
  77 
  78     Exchange(HttpRequestImpl request, MultiExchange<?,T> multi) {
  79         this.request = request;
  80         this.upgrading = false;
  81         this.client = multi.client();
  82         this.multi = multi;
  83         this.acc = multi.acc;
  84         this.parentExecutor = multi.executor;
  85         this.requestProcessor = request.requestProcessor;
  86         this.pushGroup = multi.pushGroup;
  87     }
  88 
  89     /* If different AccessControlContext to be used  */
  90     Exchange(HttpRequestImpl request,
  91              MultiExchange<?,T> multi,
  92              AccessControlContext acc)
  93     {
  94         this.request = request;
  95         this.acc = acc;
  96         this.upgrading = false;
  97         this.client = multi.client();
  98         this.multi = multi;
  99         this.parentExecutor = multi.executor;
 100         this.requestProcessor = request.requestProcessor;
 101         this.pushGroup = multi.pushGroup;
 102     }
 103 
 104     PushGroup<?,T> getPushGroup() {
 105         return pushGroup;
 106     }
 107 
 108     Executor executor() {
 109         return parentExecutor;
 110     }
 111 
 112     public HttpRequestImpl request() {
 113         return request;
 114     }
 115 
 116     HttpClientImpl client() {
 117         return client;
 118     }
 119 
 120     public Response response() throws IOException, InterruptedException {
 121         return responseImpl(null);
 122     }
 123 
 124     public T readBody(HttpResponse.BodyHandler<T> responseHandler) throws IOException {
 125         // The connection will not be returned to the pool in the case of WebSocket
 126         return exchImpl.readBody(responseHandler, !request.isWebSocket());
 127     }
 128 
 129     public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {
 130         // The connection will not be returned to the pool in the case of WebSocket
 131         return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor);
 132     }
 133 
 134     public void cancel() {
 135         // cancel can be called concurrently before or at the same time
 136         // that the exchange impl is being established.
 137         // In that case we won't be able to propagate the cancellation
 138         // right away
 139         if (exchImpl != null) {
 140             exchImpl.cancel();
 141         } else {
 142             // no impl - can't cancel impl yet.
 143             // call cancel(IOException) instead which takes care
 144             // of race conditions between impl/cancel.
 145             cancel(new IOException("Request cancelled"));
 146         }
 147     }
 148 
 149     public void cancel(IOException cause) {
 150         // If the impl is non null, propagate the exception right away.
 151         // Otherwise record it so that it can be propagated once the
 152         // exchange impl has been established.
 153         ExchangeImpl<?> impl = exchImpl;
 154         if (impl != null) {
 155             // propagate the exception to the impl
 156             impl.cancel(cause);
 157         } else {
 158             try {
 159                 // no impl yet. record the exception
 160                 failed = cause;
 161                 // now call checkCancelled to recheck the impl.
 162                 // if the failed state is set and the impl is not null, reset
 163                 // the failed state and propagate the exception to the impl.
 164                 checkCancelled(false);
 165             } catch (IOException x) {
 166                 // should not happen - we passed 'false' above
 167                 throw new UncheckedIOException(x);
 168             }
 169         }
 170     }
 171 
 172     // This method will raise an exception if one was reported and if
 173     // it is possible to do so. If the exception can be raised, then
 174     // the failed state will be reset. Otherwise, the failed state
 175     // will persist until the exception can be raised and the failed state
 176     // can be cleared.
 177     // Takes care of possible race conditions.
 178     private void checkCancelled(boolean throwIfNoImpl) throws IOException {
 179         ExchangeImpl<?> impl = null;
 180         IOException cause = null;
 181         if (failed != null) {
 182             synchronized(this) {
 183                 cause = failed;
 184                 impl = exchImpl;
 185                 if (throwIfNoImpl || impl != null) {
 186                     // The exception will be raised by one of the two methods
 187                     // below: reset the failed state.
 188                     failed = null;
 189                 }
 190             }
 191         }
 192         if (cause == null) return;
 193         if (impl != null) {
 194             // The exception is raised by propagating it to the impl.
 195             impl.cancel(cause);
 196         } else if (throwIfNoImpl) {
 197             // The exception is raised by throwing it immediately
 198             throw cause;
 199         } else {
 200             Log.logTrace("Exchange: request [{0}/timeout={1}ms] no impl is set."
 201                          + "\n\tCan''t cancel yet with {2}",
 202                          request.uri(),
 203                          request.duration() == null ? -1 :
 204                          // calling duration.toMillis() can throw an exception.
 205                          // this is just debugging, we don't care if it overflows.
 206                          (request.duration().getSeconds() * 1000
 207                           + request.duration().getNano() / 1000000),
 208                          cause);
 209         }
 210     }
 211 
 212     public void h2Upgrade() {
 213         upgrading = true;
 214         request.setH2Upgrade(client.client2());
 215     }
 216 
 217     static final SocketPermission[] SOCKET_ARRAY = new SocketPermission[0];
 218 
 219     Response responseImpl(HttpConnection connection)
 220         throws IOException, InterruptedException
 221     {
 222         SecurityException e = securityCheck(acc);
 223         if (e != null) {
 224             throw e;
 225         }
 226 
 227         if (permissions.size() > 0) {
 228             try {
 229                 return AccessController.doPrivileged(
 230                         (PrivilegedExceptionAction<Response>)() ->
 231                              responseImpl0(connection),
 232                         null,
 233                         permissions.toArray(SOCKET_ARRAY));
 234             } catch (Throwable ee) {
 235                 if (ee instanceof PrivilegedActionException) {
 236                     ee = ee.getCause();
 237                 }
 238                 if (ee instanceof IOException) {
 239                     throw (IOException) ee;
 240                 } else {
 241                     throw new RuntimeException(ee); // TODO: fix
 242                 }
 243             }
 244         } else {
 245             return responseImpl0(connection);
 246         }
 247     }
 248 
 249     // get/set the exchange impl, solving race condition issues with
 250     // potential concurrent calls to cancel() or cancel(IOException)
 251     private void establishExchange(HttpConnection connection)
 252         throws IOException, InterruptedException
 253     {
 254         // check if we have been cancelled first.
 255         checkCancelled(true);
 256         // not yet cancelled: create/get a new impl
 257         exchImpl = ExchangeImpl.get(this, connection);
 258         // recheck for cancelled, in case of race conditions
 259         checkCancelled(true);
 260         // now we're good to go. because exchImpl is no longer null
 261         // cancel() will be able to propagate directly to the impl
 262         // after this point.
 263     }
 264 
 265     private Response responseImpl0(HttpConnection connection)
 266         throws IOException, InterruptedException
 267     {
 268         establishExchange(connection);
 269         if (request.expectContinue()) {
 270             Log.logTrace("Sending Expect: 100-Continue");
 271             request.addSystemHeader("Expect", "100-Continue");
 272             exchImpl.sendHeadersOnly();
 273 
 274             Log.logTrace("Waiting for 407-Expectation-Failed or 100-Continue");
 275             Response resp = exchImpl.getResponse();
 276             HttpResponseImpl.logResponse(resp);
 277             int rcode = resp.statusCode();
 278             if (rcode != 100) {
 279                 Log.logTrace("Expectation failed: Received {0}",
 280                              rcode);
 281                 if (upgrading && rcode == 101) {
 282                     throw new IOException(
 283                         "Unable to handle 101 while waiting for 100-Continue");
 284                 }
 285                 return resp;
 286             }
 287 
 288             Log.logTrace("Received 100-Continue: sending body");
 289             exchImpl.sendBody();
 290 
 291             Log.logTrace("Body sent: waiting for response");
 292             resp = exchImpl.getResponse();
 293             HttpResponseImpl.logResponse(resp);
 294 
 295             return checkForUpgrade(resp, exchImpl);
 296         } else {
 297             exchImpl.sendHeadersOnly();
 298             exchImpl.sendBody();
 299             Response resp = exchImpl.getResponse();
 300             HttpResponseImpl.logResponse(resp);
 301             return checkForUpgrade(resp, exchImpl);
 302         }
 303     }
 304 
 305     // Completed HttpResponse will be null if response succeeded
 306     // will be a non null responseAsync if expect continue returns an error
 307 
 308     public CompletableFuture<Response> responseAsync() {
 309         return responseAsyncImpl(null);
 310     }
 311 
 312     CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) {
 313         SecurityException e = securityCheck(acc);
 314         if (e != null) {
 315             return MinimalFuture.failedFuture(e);
 316         }
 317         if (permissions.size() > 0) {
 318             return AccessController.doPrivileged(
 319                     (PrivilegedAction<CompletableFuture<Response>>)() ->
 320                         responseAsyncImpl0(connection),
 321                     null,
 322                     permissions.toArray(SOCKET_ARRAY));
 323         } else {
 324             return responseAsyncImpl0(connection);
 325         }
 326     }
 327 
 328     CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
 329         try {
 330             establishExchange(connection);
 331         } catch (IOException | InterruptedException e) {
 332             return MinimalFuture.failedFuture(e);
 333         }
 334         if (request.expectContinue()) {
 335             request.addSystemHeader("Expect", "100-Continue");
 336             Log.logTrace("Sending Expect: 100-Continue");
 337             return exchImpl
 338                     .sendHeadersAsync()
 339                     .thenCompose(v -> exchImpl.getResponseAsync(parentExecutor))
 340                     .thenCompose((Response r1) -> {
 341                         HttpResponseImpl.logResponse(r1);
 342                         int rcode = r1.statusCode();
 343                         if (rcode == 100) {
 344                             Log.logTrace("Received 100-Continue: sending body");
 345                             CompletableFuture<Response> cf =
 346                                     exchImpl.sendBodyAsync()
 347                                             .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
 348                             cf = wrapForUpgrade(cf);
 349                             cf = wrapForLog(cf);
 350                             return cf;
 351                         } else {
 352                             Log.logTrace("Expectation failed: Received {0}",
 353                                          rcode);
 354                             if (upgrading && rcode == 101) {
 355                                 IOException failed = new IOException(
 356                                         "Unable to handle 101 while waiting for 100");
 357                                 return MinimalFuture.failedFuture(failed);
 358                             }
 359                             return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor)
 360                                   .thenApply(v ->  r1);
 361                         }
 362                     });
 363         } else {
 364             CompletableFuture<Response> cf = exchImpl
 365                     .sendHeadersAsync()
 366                     .thenCompose(ExchangeImpl::sendBodyAsync)
 367                     .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
 368             cf = wrapForUpgrade(cf);
 369             cf = wrapForLog(cf);
 370             return cf;
 371         }
 372     }
 373 
 374     private CompletableFuture<Response> wrapForUpgrade(CompletableFuture<Response> cf) {
 375         if (upgrading) {
 376             return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl));
 377         }
 378         return cf;
 379     }
 380 
 381     private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) {
 382         if (Log.requests()) {
 383             return cf.thenApply(response -> {
 384                 HttpResponseImpl.logResponse(response);
 385                 return response;
 386             });
 387         }
 388         return cf;
 389     }
 390 
 391     HttpResponse.BodyProcessor<T> ignoreBody(int status, HttpHeaders hdrs) {
 392         return HttpResponse.BodyProcessor.discard((T)null);
 393     }
 394 
 395     // if this response was received in reply to an upgrade
 396     // then create the Http2Connection from the HttpConnection
 397     // initialize it and wait for the real response on a newly created Stream
 398 
 399     private CompletableFuture<Response>
 400     checkForUpgradeAsync(Response resp,
 401                          ExchangeImpl<T> ex) {
 402 
 403         int rcode = resp.statusCode();
 404         if (upgrading && (rcode == 101)) {
 405             Http1Exchange<T> e = (Http1Exchange<T>)ex;
 406             // check for 101 switching protocols
 407             // 101 responses are not supposed to contain a body.
 408             //    => should we fail if there is one?
 409             return e.readBodyAsync(this::ignoreBody, false, parentExecutor)
 410                 .thenCompose((T v) -> // v is null
 411                      Http2Connection.createAsync(e.connection(),
 412                                                  client.client2(),
 413                                                  this, e.getBuffer())
 414                         .thenCompose((Http2Connection c) -> {
 415                             c.putConnection();
 416                             Stream<T> s = c.getStream(1);
 417                             exchImpl = s;
 418                             return s.getResponseAsync(null);
 419                         })
 420                 );
 421         }
 422         return MinimalFuture.completedFuture(resp);
 423     }
 424 
 425     private Response checkForUpgrade(Response resp,
 426                                              ExchangeImpl<T> ex)
 427         throws IOException, InterruptedException
 428     {
 429         int rcode = resp.statusCode();
 430         if (upgrading && (rcode == 101)) {
 431             Http1Exchange<T> e = (Http1Exchange<T>) ex;
 432 
 433             // 101 responses are not supposed to contain a body.
 434             //    => should we fail if there is one?
 435             //    => readBody called here by analogy with
 436             //       checkForUpgradeAsync above
 437             e.readBody(this::ignoreBody, false);
 438 
 439             // must get connection from Http1Exchange
 440             Http2Connection h2con = new Http2Connection(e.connection(),
 441                                                         client.client2(),
 442                                                         this, e.getBuffer());
 443             h2con.putConnection();
 444             Stream<T> s = h2con.getStream(1);
 445             exchImpl = s;
 446             Response xx = s.getResponse();
 447             HttpResponseImpl.logResponse(xx);
 448             return xx;
 449         }
 450         return resp;
 451     }
 452 
 453     private URI getURIForSecurityCheck() {
 454         URI u;
 455         String method = request.method();
 456         InetSocketAddress authority = request.authority();
 457         URI uri = request.uri();
 458 
 459         // CONNECT should be restricted at API level
 460         if (method.equalsIgnoreCase("CONNECT")) {
 461             try {
 462                 u = new URI("socket",
 463                              null,
 464                              authority.getHostString(),
 465                              authority.getPort(),
 466                              null,
 467                              null,
 468                              null);
 469             } catch (URISyntaxException e) {
 470                 throw new InternalError(e); // shouldn't happen
 471             }
 472         } else {
 473             u = uri;
 474         }
 475         return u;
 476     }
 477 
 478     /**
 479      * Do the security check and return any exception.
 480      * Return null if no check needed or passes.
 481      *
 482      * Also adds any generated permissions to the "permissions" list.
 483      */
 484     private SecurityException securityCheck(AccessControlContext acc) {
 485         SecurityManager sm = System.getSecurityManager();
 486         if (sm == null) {
 487             return null;
 488         }
 489 
 490         String method = request.method();
 491         HttpHeaders userHeaders = request.getUserHeaders();
 492         URI u = getURIForSecurityCheck();
 493         URLPermission p = Utils.getPermission(u, method, userHeaders.map());
 494 
 495         try {
 496             assert acc != null;
 497             sm.checkPermission(p, acc);
 498             permissions.add(getSocketPermissionFor(u));
 499         } catch (SecurityException e) {
 500             return e;
 501         }
 502         ProxySelector ps = client.proxy().orElse(null);
 503         if (ps != null) {
 504             InetSocketAddress proxy = (InetSocketAddress)
 505                     ps.select(u).get(0).address(); // TODO: check this
 506             // may need additional check
 507             if (!method.equals("CONNECT")) {
 508                 // a direct http proxy. Need to check access to proxy
 509                 try {
 510                     u = new URI("socket", null, proxy.getHostString(),
 511                         proxy.getPort(), null, null, null);
 512                 } catch (URISyntaxException e) {
 513                     throw new InternalError(e); // shouldn't happen
 514                 }
 515                 p = new URLPermission(u.toString(), "CONNECT");
 516                 try {
 517                     sm.checkPermission(p, acc);
 518                 } catch (SecurityException e) {
 519                     permissions.clear();
 520                     return e;
 521                 }
 522                 String sockperm = proxy.getHostString() +
 523                         ":" + Integer.toString(proxy.getPort());
 524 
 525                 permissions.add(new SocketPermission(sockperm, "connect,resolve"));
 526             }
 527         }
 528         return null;
 529     }
 530 
 531     HttpClient.Redirect followRedirects() {
 532         return client.followRedirects();
 533     }
 534 
 535     HttpClient.Version version() {
 536         return multi.version();
 537     }
 538 
 539     private static SocketPermission getSocketPermissionFor(URI url) {
 540         if (System.getSecurityManager() == null) {
 541             return null;
 542         }
 543 
 544         StringBuilder sb = new StringBuilder();
 545         String host = url.getHost();
 546         sb.append(host);
 547         int port = url.getPort();
 548         if (port == -1) {
 549             String scheme = url.getScheme();
 550             if ("http".equals(scheme)) {
 551                 sb.append(":80");
 552             } else { // scheme must be https
 553                 sb.append(":443");
 554             }
 555         } else {
 556             sb.append(':')
 557               .append(Integer.toString(port));
 558         }
 559         String target = sb.toString();
 560         return new SocketPermission(target, "connect");
 561     }
 562 
 563     AccessControlContext getAccessControlContext() {
 564         return acc;
 565     }
 566 }