--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java 2017-11-30 04:03:51.172369842 -0800 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java 2017-11-30 04:03:50.863342827 -0800 @@ -26,26 +26,23 @@ package jdk.incubator.http; import java.io.IOException; -import java.io.UncheckedIOException; +import java.lang.System.Logger.Level; import java.net.InetSocketAddress; import java.net.ProxySelector; -import java.net.SocketPermission; import java.net.URI; import java.net.URISyntaxException; import java.net.URLPermission; import java.security.AccessControlContext; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; -import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import jdk.incubator.http.internal.common.MinimalFuture; import jdk.incubator.http.internal.common.Utils; import jdk.incubator.http.internal.common.Log; +import static jdk.incubator.http.internal.common.Utils.permissionForProxy; + /** * One request/response exchange (handles 100/101 intermediate response also). * depth field used to track number of times a new request is being sent @@ -61,19 +58,22 @@ */ final class Exchange { + static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. + final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); + final HttpRequestImpl request; final HttpClientImpl client; volatile ExchangeImpl exchImpl; + volatile CompletableFuture> exchangeCF; // used to record possible cancellation raised before the exchImpl // has been established. private volatile IOException failed; - final List permissions = new LinkedList<>(); final AccessControlContext acc; final MultiExchange multi; final Executor parentExecutor; - final HttpRequest.BodyProcessor requestProcessor; boolean upgrading; // to HTTP/2 final PushGroup pushGroup; + final String dbgTag; Exchange(HttpRequestImpl request, MultiExchange multi) { this.request = request; @@ -82,8 +82,8 @@ this.multi = multi; this.acc = multi.acc; this.parentExecutor = multi.executor; - this.requestProcessor = request.requestProcessor; this.pushGroup = multi.pushGroup; + this.dbgTag = "Exchange"; } /* If different AccessControlContext to be used */ @@ -97,8 +97,8 @@ this.client = multi.client(); this.multi = multi; this.parentExecutor = multi.executor; - this.requestProcessor = request.requestProcessor; this.pushGroup = multi.pushGroup; + this.dbgTag = "Exchange"; } PushGroup getPushGroup() { @@ -117,18 +117,35 @@ return client; } - public Response response() throws IOException, InterruptedException { - return responseImpl(null); - } - public T readBody(HttpResponse.BodyHandler responseHandler) throws IOException { + public CompletableFuture readBodyAsync(HttpResponse.BodyHandler handler) { // The connection will not be returned to the pool in the case of WebSocket - return exchImpl.readBody(responseHandler, !request.isWebSocket()); + return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor) + .whenComplete((r,t) -> exchImpl.completed()); } - public CompletableFuture readBodyAsync(HttpResponse.BodyHandler handler) { - // The connection will not be returned to the pool in the case of WebSocket - return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor); + /** + * Called after a redirect or similar kind of retry where a body might + * be sent but we don't want it. Should send a RESET in h2. For http/1.1 + * we can consume small quantity of data, or close the connection in + * other cases. + */ + public CompletableFuture ignoreBody() { + return exchImpl.ignoreBody(); + } + + /** + * Called when a new exchange is created to replace this exchange. + * At this point it is guaranteed that readBody/readBodyAsync will + * not be called. + */ + public void released() { + ExchangeImpl impl = exchImpl; + if (impl != null) impl.released(); + // Don't set exchImpl to null here. We need to keep + // it alive until it's replaced by a Stream in wrapForUpgrade. + // Setting it to null here might get it GC'ed too early, because + // the Http1Response is now only weakly referenced by the Selector. } public void cancel() { @@ -153,19 +170,15 @@ ExchangeImpl impl = exchImpl; if (impl != null) { // propagate the exception to the impl + debug.log(Level.DEBUG, "Cancelling exchImpl: %s", exchImpl); impl.cancel(cause); } else { - try { - // no impl yet. record the exception - failed = cause; - // now call checkCancelled to recheck the impl. - // if the failed state is set and the impl is not null, reset - // the failed state and propagate the exception to the impl. - checkCancelled(false); - } catch (IOException x) { - // should not happen - we passed 'false' above - throw new UncheckedIOException(x); - } + // no impl yet. record the exception + failed = cause; + // now call checkCancelled to recheck the impl. + // if the failed state is set and the impl is not null, reset + // the failed state and propagate the exception to the impl. + checkCancelled(); } } @@ -175,37 +188,34 @@ // will persist until the exception can be raised and the failed state // can be cleared. // Takes care of possible race conditions. - private void checkCancelled(boolean throwIfNoImpl) throws IOException { + private void checkCancelled() { ExchangeImpl impl = null; IOException cause = null; + CompletableFuture> cf = null; if (failed != null) { synchronized(this) { cause = failed; impl = exchImpl; - if (throwIfNoImpl || impl != null) { - // The exception will be raised by one of the two methods - // below: reset the failed state. - failed = null; - } + cf = exchangeCF; } } if (cause == null) return; if (impl != null) { // The exception is raised by propagating it to the impl. + debug.log(Level.DEBUG, "Cancelling exchImpl: %s", impl); impl.cancel(cause); - } else if (throwIfNoImpl) { - // The exception is raised by throwing it immediately - throw cause; + failed = null; } else { Log.logTrace("Exchange: request [{0}/timeout={1}ms] no impl is set." + "\n\tCan''t cancel yet with {2}", request.uri(), - request.duration() == null ? -1 : + request.timeout().isPresent() ? // calling duration.toMillis() can throw an exception. // this is just debugging, we don't care if it overflows. - (request.duration().getSeconds() * 1000 - + request.duration().getNano() / 1000000), + (request.timeout().get().getSeconds() * 1000 + + request.timeout().get().getNano() / 1000000) : -1, cause); + if (cf != null) cf.completeExceptionally(cause); } } @@ -214,92 +224,51 @@ request.setH2Upgrade(client.client2()); } - static final SocketPermission[] SOCKET_ARRAY = new SocketPermission[0]; - - Response responseImpl(HttpConnection connection) - throws IOException, InterruptedException - { - SecurityException e = securityCheck(acc); - if (e != null) { - throw e; - } - - if (permissions.size() > 0) { - try { - return AccessController.doPrivileged( - (PrivilegedExceptionAction)() -> - responseImpl0(connection), - null, - permissions.toArray(SOCKET_ARRAY)); - } catch (Throwable ee) { - if (ee instanceof PrivilegedActionException) { - ee = ee.getCause(); - } - if (ee instanceof IOException) { - throw (IOException) ee; - } else { - throw new RuntimeException(ee); // TODO: fix - } - } - } else { - return responseImpl0(connection); - } + synchronized IOException getCancelCause() { + return failed; } // get/set the exchange impl, solving race condition issues with // potential concurrent calls to cancel() or cancel(IOException) - private void establishExchange(HttpConnection connection) - throws IOException, InterruptedException - { + private CompletableFuture> + establishExchange(HttpConnection connection) { + if (debug.isLoggable(Level.DEBUG)) { + debug.log(Level.DEBUG, + "establishing exchange for %s,%n\t proxy=%s", + request, + request.proxy()); + } // check if we have been cancelled first. - checkCancelled(true); - // not yet cancelled: create/get a new impl - exchImpl = ExchangeImpl.get(this, connection); - // recheck for cancelled, in case of race conditions - checkCancelled(true); - // now we're good to go. because exchImpl is no longer null - // cancel() will be able to propagate directly to the impl - // after this point. - } - - private Response responseImpl0(HttpConnection connection) - throws IOException, InterruptedException - { - establishExchange(connection); - if (request.expectContinue()) { - Log.logTrace("Sending Expect: 100-Continue"); - request.addSystemHeader("Expect", "100-Continue"); - exchImpl.sendHeadersOnly(); + Throwable t = getCancelCause(); + checkCancelled(); + if (t != null) { + return MinimalFuture.failedFuture(t); + } - Log.logTrace("Waiting for 407-Expectation-Failed or 100-Continue"); - Response resp = exchImpl.getResponse(); - HttpResponseImpl.logResponse(resp); - int rcode = resp.statusCode(); - if (rcode != 100) { - Log.logTrace("Expectation failed: Received {0}", - rcode); - if (upgrading && rcode == 101) { - throw new IOException( - "Unable to handle 101 while waiting for 100-Continue"); - } - return resp; + CompletableFuture> cf, res; + cf = ExchangeImpl.get(this, connection); + // We should probably use a VarHandle to get/set exchangeCF + // instead - as we need CAS semantics. + synchronized (this) { exchangeCF = cf; }; + res = cf.whenComplete((r,x) -> { + synchronized(Exchange.this) { + if (exchangeCF == cf) exchangeCF = null; } - - Log.logTrace("Received 100-Continue: sending body"); - exchImpl.sendBody(); - - Log.logTrace("Body sent: waiting for response"); - resp = exchImpl.getResponse(); - HttpResponseImpl.logResponse(resp); - - return checkForUpgrade(resp, exchImpl); - } else { - exchImpl.sendHeadersOnly(); - exchImpl.sendBody(); - Response resp = exchImpl.getResponse(); - HttpResponseImpl.logResponse(resp); - return checkForUpgrade(resp, exchImpl); - } + }); + checkCancelled(); + return res.thenCompose((eimpl) -> { + // recheck for cancelled, in case of race conditions + exchImpl = eimpl; + IOException tt = getCancelCause(); + checkCancelled(); + if (tt != null) { + return MinimalFuture.failedFuture(tt); + } else { + // Now we're good to go. Because exchImpl is no longer + // null cancel() will be able to propagate directly to + // the impl after this point ( if needed ). + return MinimalFuture.completedFuture(eimpl); + } }); } // Completed HttpResponse will be null if response succeeded @@ -310,35 +279,23 @@ } CompletableFuture responseAsyncImpl(HttpConnection connection) { - SecurityException e = securityCheck(acc); + SecurityException e = checkPermissions(); if (e != null) { return MinimalFuture.failedFuture(e); - } - if (permissions.size() > 0) { - return AccessController.doPrivileged( - (PrivilegedAction>)() -> - responseAsyncImpl0(connection), - null, - permissions.toArray(SOCKET_ARRAY)); } else { return responseAsyncImpl0(connection); } } CompletableFuture responseAsyncImpl0(HttpConnection connection) { - try { - establishExchange(connection); - } catch (IOException | InterruptedException e) { - return MinimalFuture.failedFuture(e); - } if (request.expectContinue()) { request.addSystemHeader("Expect", "100-Continue"); Log.logTrace("Sending Expect: 100-Continue"); - return exchImpl - .sendHeadersAsync() + return establishExchange(connection) + .thenCompose((ex) -> ex.sendHeadersAsync()) .thenCompose(v -> exchImpl.getResponseAsync(parentExecutor)) .thenCompose((Response r1) -> { - HttpResponseImpl.logResponse(r1); + Log.logResponse(r1::toString); int rcode = r1.statusCode(); if (rcode == 100) { Log.logTrace("Received 100-Continue: sending body"); @@ -361,8 +318,8 @@ } }); } else { - CompletableFuture cf = exchImpl - .sendHeadersAsync() + CompletableFuture cf = establishExchange(connection) + .thenCompose((ex) -> ex.sendHeadersAsync()) .thenCompose(ExchangeImpl::sendBodyAsync) .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor)); cf = wrapForUpgrade(cf); @@ -381,15 +338,15 @@ private CompletableFuture wrapForLog(CompletableFuture cf) { if (Log.requests()) { return cf.thenApply(response -> { - HttpResponseImpl.logResponse(response); + Log.logResponse(response::toString); return response; }); } return cf; } - HttpResponse.BodyProcessor ignoreBody(int status, HttpHeaders hdrs) { - return HttpResponse.BodyProcessor.discard((T)null); + HttpResponse.BodySubscriber ignoreBody(int status, HttpHeaders hdrs) { + return HttpResponse.BodySubscriber.discard((T)null); } // if this response was received in reply to an upgrade @@ -406,50 +363,59 @@ // check for 101 switching protocols // 101 responses are not supposed to contain a body. // => should we fail if there is one? + debug.log(Level.DEBUG, "Upgrading async %s" + e.connection()); return e.readBodyAsync(this::ignoreBody, false, parentExecutor) - .thenCompose((T v) -> // v is null - Http2Connection.createAsync(e.connection(), + .thenCompose((T v) -> {// v is null + debug.log(Level.DEBUG, "Ignored body"); + // we pass e::getBuffer to allow the ByteBuffers to accumulate + // while we build the Http2Connection + return Http2Connection.createAsync(e.connection(), client.client2(), - this, e.getBuffer()) + this, e::drainLeftOverBytes) .thenCompose((Http2Connection c) -> { c.putConnection(); Stream s = c.getStream(1); - exchImpl = s; + if (s == null) { + // s can be null if an exception occurred + // asynchronously while sending the preface. + Throwable t = c.getRecordedCause(); + if (t != null) { + return MinimalFuture.failedFuture( + new IOException("Can't get stream 1: " + t, t)); + } + } + exchImpl.released(); + Throwable t; + // There's a race condition window where an external + // thread (SelectorManager) might complete the + // exchange in timeout at the same time where we're + // trying to switch the exchange impl. + // 'failed' will be reset to null after + // exchImpl.cancel() has completed, so either we + // will observe failed != null here, or we will + // observe e.getCancelCause() != null, or the + // timeout exception will be routed to 's'. + // Either way, we need to relay it to s. + synchronized (this) { + exchImpl = s; + t = failed; + } + // Check whether the HTTP/1.1 was cancelled. + if (t == null) t = e.getCancelCause(); + // if HTTP/1.1 exchange was timed out, don't + // try to go further. + if (t instanceof HttpTimeoutException) { + s.cancelImpl(t); + return MinimalFuture.failedFuture(t); + } + debug.log(Level.DEBUG, "Getting response async %s" + s); return s.getResponseAsync(null); - }) + });} ); } return MinimalFuture.completedFuture(resp); } - private Response checkForUpgrade(Response resp, - ExchangeImpl ex) - throws IOException, InterruptedException - { - int rcode = resp.statusCode(); - if (upgrading && (rcode == 101)) { - Http1Exchange e = (Http1Exchange) ex; - - // 101 responses are not supposed to contain a body. - // => should we fail if there is one? - // => readBody called here by analogy with - // checkForUpgradeAsync above - e.readBody(this::ignoreBody, false); - - // must get connection from Http1Exchange - Http2Connection h2con = new Http2Connection(e.connection(), - client.client2(), - this, e.getBuffer()); - h2con.putConnection(); - Stream s = h2con.getStream(1); - exchImpl = s; - Response xx = s.getResponse(); - HttpResponseImpl.logResponse(xx); - return xx; - } - return resp; - } - private URI getURIForSecurityCheck() { URI u; String method = request.method(); @@ -476,91 +442,65 @@ } /** - * Do the security check and return any exception. - * Return null if no check needed or passes. - * - * Also adds any generated permissions to the "permissions" list. + * Returns the security permission required for the given details. + * If method is CONNECT, then uri must be of form "scheme://host:port" */ - private SecurityException securityCheck(AccessControlContext acc) { + private static URLPermission permissionForServer(URI uri, + String method, + Map> headers) { + if (method.equals("CONNECT")) { + return new URLPermission(uri.toString(), "CONNECT"); + } else { + return Utils.permissionForServer(uri, method, headers.keySet().stream()); + } + } + + /** + * Performs the necessary security permission checks required to retrieve + * the response. Returns a security exception representing the denied + * permission, or null if all checks pass or there is no security manager. + */ + private SecurityException checkPermissions() { + String method = request.method(); SecurityManager sm = System.getSecurityManager(); - if (sm == null) { + if (sm == null || method.equals("CONNECT")) { + // tunneling will have a null acc, which is fine. The proxy + // permission check will have already been preformed. return null; } - String method = request.method(); HttpHeaders userHeaders = request.getUserHeaders(); URI u = getURIForSecurityCheck(); - URLPermission p = Utils.getPermission(u, method, userHeaders.map()); + URLPermission p = permissionForServer(u, method, userHeaders.map()); try { assert acc != null; sm.checkPermission(p, acc); - permissions.add(getSocketPermissionFor(u)); } catch (SecurityException e) { return e; } - ProxySelector ps = client.proxy().orElse(null); + ProxySelector ps = client.proxySelector(); if (ps != null) { - InetSocketAddress proxy = (InetSocketAddress) - ps.select(u).get(0).address(); // TODO: check this - // may need additional check if (!method.equals("CONNECT")) { - // a direct http proxy. Need to check access to proxy - try { - u = new URI("socket", null, proxy.getHostString(), - proxy.getPort(), null, null, null); - } catch (URISyntaxException e) { - throw new InternalError(e); // shouldn't happen - } - p = new URLPermission(u.toString(), "CONNECT"); - try { - sm.checkPermission(p, acc); - } catch (SecurityException e) { - permissions.clear(); - return e; + // a non-tunneling HTTP proxy. Need to check access + URLPermission proxyPerm = permissionForProxy(request.proxy()); + if (proxyPerm != null) { + try { + sm.checkPermission(proxyPerm, acc); + } catch (SecurityException e) { + return e; + } } - String sockperm = proxy.getHostString() + - ":" + Integer.toString(proxy.getPort()); - - permissions.add(new SocketPermission(sockperm, "connect,resolve")); } } return null; } - HttpClient.Redirect followRedirects() { - return client.followRedirects(); - } - HttpClient.Version version() { return multi.version(); } - private static SocketPermission getSocketPermissionFor(URI url) { - if (System.getSecurityManager() == null) { - return null; - } - - StringBuilder sb = new StringBuilder(); - String host = url.getHost(); - sb.append(host); - int port = url.getPort(); - if (port == -1) { - String scheme = url.getScheme(); - if ("http".equals(scheme)) { - sb.append(":80"); - } else { // scheme must be https - sb.append(":443"); - } - } else { - sb.append(':') - .append(Integer.toString(port)); - } - String target = sb.toString(); - return new SocketPermission(target, "connect"); - } - - AccessControlContext getAccessControlContext() { - return acc; + String dbgString() { + return dbgTag; } }