< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java

Print this page

        

@@ -24,30 +24,27 @@
  */
 
 package jdk.incubator.http;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
+import java.lang.System.Logger.Level;
 import java.net.InetSocketAddress;
 import java.net.ProxySelector;
-import java.net.SocketPermission;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URLPermission;
 import java.security.AccessControlContext;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import jdk.incubator.http.internal.common.MinimalFuture;
 import jdk.incubator.http.internal.common.Utils;
 import jdk.incubator.http.internal.common.Log;
 
+import static jdk.incubator.http.internal.common.Utils.permissionForProxy;
+
 /**
  * One request/response exchange (handles 100/101 intermediate response also).
  * depth field used to track number of times a new request is being sent
  * for a given API request. If limit exceeded exception is thrown.
  *

@@ -59,33 +56,36 @@
  *    (CONNECT proxying uses its own Exchange, so check done there)
  *
  */
 final class Exchange<T> {
 
+    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+    final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+
     final HttpRequestImpl request;
     final HttpClientImpl client;
     volatile ExchangeImpl<T> exchImpl;
+    volatile CompletableFuture<? extends ExchangeImpl<T>> exchangeCF;
     // used to record possible cancellation raised before the exchImpl
     // has been established.
     private volatile IOException failed;
-    final List<SocketPermission> permissions = new LinkedList<>();
     final AccessControlContext acc;
     final MultiExchange<?,T> multi;
     final Executor parentExecutor;
-    final HttpRequest.BodyProcessor requestProcessor;
     boolean upgrading; // to HTTP/2
     final PushGroup<?,T> pushGroup;
+    final String dbgTag;
 
     Exchange(HttpRequestImpl request, MultiExchange<?,T> multi) {
         this.request = request;
         this.upgrading = false;
         this.client = multi.client();
         this.multi = multi;
         this.acc = multi.acc;
         this.parentExecutor = multi.executor;
-        this.requestProcessor = request.requestProcessor;
         this.pushGroup = multi.pushGroup;
+        this.dbgTag = "Exchange";
     }
 
     /* If different AccessControlContext to be used  */
     Exchange(HttpRequestImpl request,
              MultiExchange<?,T> multi,

@@ -95,12 +95,12 @@
         this.acc = acc;
         this.upgrading = false;
         this.client = multi.client();
         this.multi = multi;
         this.parentExecutor = multi.executor;
-        this.requestProcessor = request.requestProcessor;
         this.pushGroup = multi.pushGroup;
+        this.dbgTag = "Exchange";
     }
 
     PushGroup<?,T> getPushGroup() {
         return pushGroup;
     }

@@ -115,22 +115,39 @@
 
     HttpClientImpl client() {
         return client;
     }
 
-    public Response response() throws IOException, InterruptedException {
-        return responseImpl(null);
-    }
 
-    public T readBody(HttpResponse.BodyHandler<T> responseHandler) throws IOException {
+    public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {
         // The connection will not be returned to the pool in the case of WebSocket
-        return exchImpl.readBody(responseHandler, !request.isWebSocket());
+        return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor)
+                .whenComplete((r,t) -> exchImpl.completed());
     }
 
-    public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {
-        // The connection will not be returned to the pool in the case of WebSocket
-        return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor);
+    /**
+     * Called after a redirect or similar kind of retry where a body might
+     * be sent but we don't want it. Should send a RESET in h2. For http/1.1
+     * we can consume small quantity of data, or close the connection in
+     * other cases.
+     */
+    public CompletableFuture<Void> ignoreBody() {
+        return exchImpl.ignoreBody();
+    }
+
+    /**
+     * Called when a new exchange is created to replace this exchange.
+     * At this point it is guaranteed that readBody/readBodyAsync will
+     * not be called.
+     */
+    public void released() {
+        ExchangeImpl<?> impl = exchImpl;
+        if (impl != null) impl.released();
+        // Don't set exchImpl to null here. We need to keep
+        // it alive until it's replaced by a Stream in wrapForUpgrade.
+        // Setting it to null here might get it GC'ed too early, because
+        // the Http1Response is now only weakly referenced by the Selector.
     }
 
     public void cancel() {
         // cancel can be called concurrently before or at the same time
         // that the exchange impl is being established.

@@ -151,196 +168,136 @@
         // Otherwise record it so that it can be propagated once the
         // exchange impl has been established.
         ExchangeImpl<?> impl = exchImpl;
         if (impl != null) {
             // propagate the exception to the impl
+            debug.log(Level.DEBUG, "Cancelling exchImpl: %s", impl);
             impl.cancel(cause);
         } else {
-            try {
                 // no impl yet. record the exception
                 failed = cause;
                 // now call checkCancelled to recheck the impl.
                 // if the failed state is set and the impl is not null, reset
                 // the failed state and propagate the exception to the impl.
-                checkCancelled(false);
-            } catch (IOException x) {
-                // should not happen - we passed 'false' above
-                throw new UncheckedIOException(x);
-            }
+            checkCancelled();
         }
     }
 
     // This method will raise an exception if one was reported and if
     // it is possible to do so. If the exception can be raised, then
     // the failed state will be reset. Otherwise, the failed state
     // will persist until the exception can be raised and the failed state
     // can be cleared.
     // Takes care of possible race conditions.
-    private void checkCancelled(boolean throwIfNoImpl) throws IOException {
+    private void checkCancelled() {
         ExchangeImpl<?> impl = null;
         IOException cause = null;
+        CompletableFuture<? extends ExchangeImpl<T>> cf = null;
         if (failed != null) {
             synchronized(this) {
                 cause = failed;
                 impl = exchImpl;
-                if (throwIfNoImpl || impl != null) {
-                    // The exception will be raised by one of the two methods
-                    // below: reset the failed state.
-                    failed = null;
-                }
+                cf = exchangeCF;
             }
         }
         if (cause == null) return;
         if (impl != null) {
             // The exception is raised by propagating it to the impl.
+            debug.log(Level.DEBUG, "Cancelling exchImpl: %s", impl);
             impl.cancel(cause);
-        } else if (throwIfNoImpl) {
-            // The exception is raised by throwing it immediately
-            throw cause;
+            failed = null;
         } else {
             Log.logTrace("Exchange: request [{0}/timeout={1}ms] no impl is set."
                          + "\n\tCan''t cancel yet with {2}",
                          request.uri(),
-                         request.duration() == null ? -1 :
+                         request.timeout().isPresent() ?
                          // calling duration.toMillis() can throw an exception.
                          // this is just debugging, we don't care if it overflows.
-                         (request.duration().getSeconds() * 1000
-                          + request.duration().getNano() / 1000000),
+                         (request.timeout().get().getSeconds() * 1000
+                          + request.timeout().get().getNano() / 1000000) : -1,
                          cause);
+            if (cf != null) cf.completeExceptionally(cause);
         }
     }
 
     public void h2Upgrade() {
         upgrading = true;
         request.setH2Upgrade(client.client2());
     }
 
-    static final SocketPermission[] SOCKET_ARRAY = new SocketPermission[0];
-
-    Response responseImpl(HttpConnection connection)
-        throws IOException, InterruptedException
-    {
-        SecurityException e = securityCheck(acc);
-        if (e != null) {
-            throw e;
-        }
-
-        if (permissions.size() > 0) {
-            try {
-                return AccessController.doPrivileged(
-                        (PrivilegedExceptionAction<Response>)() ->
-                             responseImpl0(connection),
-                        null,
-                        permissions.toArray(SOCKET_ARRAY));
-            } catch (Throwable ee) {
-                if (ee instanceof PrivilegedActionException) {
-                    ee = ee.getCause();
-                }
-                if (ee instanceof IOException) {
-                    throw (IOException) ee;
-                } else {
-                    throw new RuntimeException(ee); // TODO: fix
-                }
-            }
-        } else {
-            return responseImpl0(connection);
-        }
+    synchronized IOException getCancelCause() {
+        return failed;
     }
 
     // get/set the exchange impl, solving race condition issues with
     // potential concurrent calls to cancel() or cancel(IOException)
-    private void establishExchange(HttpConnection connection)
-        throws IOException, InterruptedException
-    {
+    private CompletableFuture<? extends ExchangeImpl<T>>
+    establishExchange(HttpConnection connection) {
+        if (debug.isLoggable(Level.DEBUG)) {
+            debug.log(Level.DEBUG,
+                    "establishing exchange for %s,%n\t proxy=%s",
+                    request,
+                    request.proxy());
+        }
         // check if we have been cancelled first.
-        checkCancelled(true);
-        // not yet cancelled: create/get a new impl
-        exchImpl = ExchangeImpl.get(this, connection);
-        // recheck for cancelled, in case of race conditions
-        checkCancelled(true);
-        // now we're good to go. because exchImpl is no longer null
-        // cancel() will be able to propagate directly to the impl
-        // after this point.
+        Throwable t = getCancelCause();
+        checkCancelled();
+        if (t != null) {
+            return MinimalFuture.failedFuture(t);
     }
 
-    private Response responseImpl0(HttpConnection connection)
-        throws IOException, InterruptedException
-    {
-        establishExchange(connection);
-        if (request.expectContinue()) {
-            Log.logTrace("Sending Expect: 100-Continue");
-            request.addSystemHeader("Expect", "100-Continue");
-            exchImpl.sendHeadersOnly();
-
-            Log.logTrace("Waiting for 407-Expectation-Failed or 100-Continue");
-            Response resp = exchImpl.getResponse();
-            HttpResponseImpl.logResponse(resp);
-            int rcode = resp.statusCode();
-            if (rcode != 100) {
-                Log.logTrace("Expectation failed: Received {0}",
-                             rcode);
-                if (upgrading && rcode == 101) {
-                    throw new IOException(
-                        "Unable to handle 101 while waiting for 100-Continue");
-                }
-                return resp;
-            }
-
-            Log.logTrace("Received 100-Continue: sending body");
-            exchImpl.sendBody();
-
-            Log.logTrace("Body sent: waiting for response");
-            resp = exchImpl.getResponse();
-            HttpResponseImpl.logResponse(resp);
-
-            return checkForUpgrade(resp, exchImpl);
-        } else {
-            exchImpl.sendHeadersOnly();
-            exchImpl.sendBody();
-            Response resp = exchImpl.getResponse();
-            HttpResponseImpl.logResponse(resp);
-            return checkForUpgrade(resp, exchImpl);
+        CompletableFuture<? extends ExchangeImpl<T>> cf, res;
+        cf = ExchangeImpl.get(this, connection);
+        // We should probably use a VarHandle to get/set exchangeCF
+        // instead - as we need CAS semantics.
+        synchronized (this) { exchangeCF = cf; };
+        res = cf.whenComplete((r,x) -> {
+            synchronized(Exchange.this) {
+                if (exchangeCF == cf) exchangeCF = null;
         }
+        });
+        checkCancelled();
+        return res.thenCompose((eimpl) -> {
+                    // recheck for cancelled, in case of race conditions
+                    exchImpl = eimpl;
+                    IOException tt = getCancelCause();
+                    checkCancelled();
+                    if (tt != null) {
+                        return MinimalFuture.failedFuture(tt);
+                    } else {
+                        // Now we're good to go. Because exchImpl is no longer
+                        // null cancel() will be able to propagate directly to
+                        // the impl after this point ( if needed ).
+                        return MinimalFuture.completedFuture(eimpl);
+                    } });
     }
 
     // Completed HttpResponse will be null if response succeeded
     // will be a non null responseAsync if expect continue returns an error
 
     public CompletableFuture<Response> responseAsync() {
         return responseAsyncImpl(null);
     }
 
     CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) {
-        SecurityException e = securityCheck(acc);
+        SecurityException e = checkPermissions();
         if (e != null) {
             return MinimalFuture.failedFuture(e);
-        }
-        if (permissions.size() > 0) {
-            return AccessController.doPrivileged(
-                    (PrivilegedAction<CompletableFuture<Response>>)() ->
-                        responseAsyncImpl0(connection),
-                    null,
-                    permissions.toArray(SOCKET_ARRAY));
         } else {
             return responseAsyncImpl0(connection);
         }
     }
 
     CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
-        try {
-            establishExchange(connection);
-        } catch (IOException | InterruptedException e) {
-            return MinimalFuture.failedFuture(e);
-        }
         if (request.expectContinue()) {
             request.addSystemHeader("Expect", "100-Continue");
             Log.logTrace("Sending Expect: 100-Continue");
-            return exchImpl
-                    .sendHeadersAsync()
+            return establishExchange(connection)
+                    .thenCompose((ex) -> ex.sendHeadersAsync())
                     .thenCompose(v -> exchImpl.getResponseAsync(parentExecutor))
                     .thenCompose((Response r1) -> {
-                        HttpResponseImpl.logResponse(r1);
+                        Log.logResponse(r1::toString);
                         int rcode = r1.statusCode();
                         if (rcode == 100) {
                             Log.logTrace("Received 100-Continue: sending body");
                             CompletableFuture<Response> cf =
                                     exchImpl.sendBodyAsync()

@@ -359,12 +316,12 @@
                             return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor)
                                   .thenApply(v ->  r1);
                         }
                     });
         } else {
-            CompletableFuture<Response> cf = exchImpl
-                    .sendHeadersAsync()
+            CompletableFuture<Response> cf = establishExchange(connection)
+                    .thenCompose((ex) -> ex.sendHeadersAsync())
                     .thenCompose(ExchangeImpl::sendBodyAsync)
                     .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
             cf = wrapForUpgrade(cf);
             cf = wrapForLog(cf);
             return cf;

@@ -379,19 +336,19 @@
     }
 
     private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) {
         if (Log.requests()) {
             return cf.thenApply(response -> {
-                HttpResponseImpl.logResponse(response);
+                Log.logResponse(response::toString);
                 return response;
             });
         }
         return cf;
     }
 
-    HttpResponse.BodyProcessor<T> ignoreBody(int status, HttpHeaders hdrs) {
-        return HttpResponse.BodyProcessor.discard((T)null);
+    HttpResponse.BodySubscriber<T> ignoreBody(int status, HttpHeaders hdrs) {
+        return HttpResponse.BodySubscriber.discard((T)null);
     }
 
     // if this response was received in reply to an upgrade
     // then create the Http2Connection from the HttpConnection
     // initialize it and wait for the real response on a newly created Stream

@@ -404,54 +361,63 @@
         if (upgrading && (rcode == 101)) {
             Http1Exchange<T> e = (Http1Exchange<T>)ex;
             // check for 101 switching protocols
             // 101 responses are not supposed to contain a body.
             //    => should we fail if there is one?
+            debug.log(Level.DEBUG, "Upgrading async %s", e.connection());
             return e.readBodyAsync(this::ignoreBody, false, parentExecutor)
-                .thenCompose((T v) -> // v is null
-                     Http2Connection.createAsync(e.connection(),
+                .thenCompose((T v) -> {// v is null
+                    debug.log(Level.DEBUG, "Ignored body");
+                    // we pass e::drainLeftOverBytes to allow the ByteBuffers to accumulate
+                    // while we build the Http2Connection
+                    return Http2Connection.createAsync(e.connection(),
                                                  client.client2(),
-                                                 this, e.getBuffer())
+                                                 this, e::drainLeftOverBytes)
                         .thenCompose((Http2Connection c) -> {
                             c.putConnection();
                             Stream<T> s = c.getStream(1);
+                            if (s == null) {
+                                // s can be null if an exception occurred
+                                // asynchronously while sending the preface.
+                                // Always fail here: continuing would NPE on s.
+                                Throwable t = c.getRecordedCause();
+                                return MinimalFuture.failedFuture(t == null
+                                        ? new IOException("Can't get stream 1")
+                                        : new IOException("Can't get stream 1: " + t, t));
+                            }
+                            exchImpl.released();
+                            Throwable t;
+                            // There's a race condition window where an external
+                            // thread (SelectorManager) might complete the
+                            // exchange in timeout at the same time where we're
+                            // trying to switch the exchange impl.
+                            // 'failed' will be reset to null after
+                            // exchImpl.cancel() has completed, so either we
+                            // will observe failed != null here, or we will
+                            // observe e.getCancelCause() != null, or the
+                            // timeout exception will be routed to 's'.
+                            // Either way, we need to relay it to s.
+                            synchronized (this) {
                             exchImpl = s;
+                                t = failed;
+                            }
+                            // Check whether the HTTP/1.1 was cancelled.
+                            if (t == null) t = e.getCancelCause();
+                            // if HTTP/1.1 exchange was timed out, don't
+                            // try to go further.
+                            if (t instanceof HttpTimeoutException) {
+                                 s.cancelImpl(t);
+                                 return MinimalFuture.failedFuture(t);
+                            }
+                            debug.log(Level.DEBUG, "Getting response async %s", s);
                             return s.getResponseAsync(null);
-                        })
+                        });}
                 );
         }
         return MinimalFuture.completedFuture(resp);
     }
 
-    private Response checkForUpgrade(Response resp,
-                                             ExchangeImpl<T> ex)
-        throws IOException, InterruptedException
-    {
-        int rcode = resp.statusCode();
-        if (upgrading && (rcode == 101)) {
-            Http1Exchange<T> e = (Http1Exchange<T>) ex;
-
-            // 101 responses are not supposed to contain a body.
-            //    => should we fail if there is one?
-            //    => readBody called here by analogy with
-            //       checkForUpgradeAsync above
-            e.readBody(this::ignoreBody, false);
-
-            // must get connection from Http1Exchange
-            Http2Connection h2con = new Http2Connection(e.connection(),
-                                                        client.client2(),
-                                                        this, e.getBuffer());
-            h2con.putConnection();
-            Stream<T> s = h2con.getStream(1);
-            exchImpl = s;
-            Response xx = s.getResponse();
-            HttpResponseImpl.logResponse(xx);
-            return xx;
-        }
-        return resp;
-    }
-
     private URI getURIForSecurityCheck() {
         URI u;
         String method = request.method();
         InetSocketAddress authority = request.authority();
         URI uri = request.uri();

@@ -474,93 +440,67 @@
         }
         return u;
     }
 
     /**
-     * Do the security check and return any exception.
-     * Return null if no check needed or passes.
-     *
-     * Also adds any generated permissions to the "permissions" list.
+     * Returns the security permission required for the given details.
+     * If method is CONNECT, then uri must be of form "scheme://host:port"
+     */
+    private static URLPermission permissionForServer(URI uri,
+                                                     String method,
+                                                     Map<String, List<String>> headers) {
+        if (method.equals("CONNECT")) {
+            return new URLPermission(uri.toString(), "CONNECT");
+        } else {
+            return Utils.permissionForServer(uri, method, headers.keySet().stream());
+        }
+    }
+
+    /**
+     * Performs the necessary security permission checks required to retrieve
+     * the response. Returns a security exception representing the denied
+     * permission, or null if all checks pass or there is no security manager.
      */
-    private SecurityException securityCheck(AccessControlContext acc) {
+    private SecurityException checkPermissions() {
+        String method = request.method();
         SecurityManager sm = System.getSecurityManager();
-        if (sm == null) {
+        if (sm == null || method.equals("CONNECT")) {
+            // tunneling will have a null acc, which is fine. The proxy
+            // permission check will have already been preformed.
             return null;
         }
 
-        String method = request.method();
         HttpHeaders userHeaders = request.getUserHeaders();
         URI u = getURIForSecurityCheck();
-        URLPermission p = Utils.getPermission(u, method, userHeaders.map());
+        URLPermission p = permissionForServer(u, method, userHeaders.map());
 
         try {
             assert acc != null;
             sm.checkPermission(p, acc);
-            permissions.add(getSocketPermissionFor(u));
         } catch (SecurityException e) {
             return e;
         }
-        ProxySelector ps = client.proxy().orElse(null);
+        ProxySelector ps = client.proxySelector();
         if (ps != null) {
-            InetSocketAddress proxy = (InetSocketAddress)
-                    ps.select(u).get(0).address(); // TODO: check this
-            // may need additional check
             if (!method.equals("CONNECT")) {
-                // a direct http proxy. Need to check access to proxy
-                try {
-                    u = new URI("socket", null, proxy.getHostString(),
-                        proxy.getPort(), null, null, null);
-                } catch (URISyntaxException e) {
-                    throw new InternalError(e); // shouldn't happen
-                }
-                p = new URLPermission(u.toString(), "CONNECT");
+                // a non-tunneling HTTP proxy. Need to check access
+                URLPermission proxyPerm = permissionForProxy(request.proxy());
+                if (proxyPerm != null) {
                 try {
-                    sm.checkPermission(p, acc);
+                        sm.checkPermission(proxyPerm, acc);
                 } catch (SecurityException e) {
-                    permissions.clear();
                     return e;
                 }
-                String sockperm = proxy.getHostString() +
-                        ":" + Integer.toString(proxy.getPort());
-
-                permissions.add(new SocketPermission(sockperm, "connect,resolve"));
             }
         }
-        return null;
     }
-
-    HttpClient.Redirect followRedirects() {
-        return client.followRedirects();
+        return null;
     }
 
     HttpClient.Version version() {
         return multi.version();
     }
 
-    private static SocketPermission getSocketPermissionFor(URI url) {
-        if (System.getSecurityManager() == null) {
-            return null;
-        }
-
-        StringBuilder sb = new StringBuilder();
-        String host = url.getHost();
-        sb.append(host);
-        int port = url.getPort();
-        if (port == -1) {
-            String scheme = url.getScheme();
-            if ("http".equals(scheme)) {
-                sb.append(":80");
-            } else { // scheme must be https
-                sb.append(":443");
-            }
-        } else {
-            sb.append(':')
-              .append(Integer.toString(port));
-        }
-        String target = sb.toString();
-        return new SocketPermission(target, "connect");
-    }
-
-    AccessControlContext getAccessControlContext() {
-        return acc;
+    String dbgString() {
+        return dbgTag;
     }
 }
< prev index next >