--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java 2017-11-30 04:04:07.082760726 -0800 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java 2017-11-30 04:04:06.901744901 -0800 @@ -25,71 +25,27 @@ package jdk.incubator.http; -import jdk.incubator.http.internal.common.ByteBufferReference; -import jdk.incubator.http.internal.common.MinimalFuture; -import jdk.incubator.http.HttpResponse.BodyHandler; - import java.io.IOException; +import java.lang.System.Logger.Level; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; -import java.util.function.Supplier; +import jdk.incubator.http.internal.common.FlowTube; +import jdk.incubator.http.internal.common.MinimalFuture; +import static jdk.incubator.http.HttpResponse.BodyHandler.discard; /** * A plain text socket tunnel through a proxy. Uses "CONNECT" but does not * encrypt. Used by WebSocket, as well as HTTP over SSL + Proxy. * Wrapped in SSLTunnelConnection or AsyncSSLTunnelConnection for encryption. */ -class PlainTunnelingConnection extends HttpConnection implements AsyncConnection { +final class PlainTunnelingConnection extends HttpConnection { final PlainHttpConnection delegate; protected final InetSocketAddress proxyAddr; private volatile boolean connected; - @Override - public CompletableFuture connectAsync() { - return delegate.connectAsync() - .thenCompose((Void v) -> { - HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address); - MultiExchange mconnectExchange = new MultiExchange<>(req, client, this::ignore); - return mconnectExchange.responseAsync() - .thenCompose((HttpResponseImpl resp) -> { - CompletableFuture cf = new MinimalFuture<>(); - if (resp.statusCode() != 200) { - cf.completeExceptionally(new IOException("Tunnel failed")); - } else { - connected = true; - cf.complete(null); - } - return cf; - }); - }); - } - - private HttpResponse.BodyProcessor ignore(int status, HttpHeaders hdrs) { - return HttpResponse.BodyProcessor.discard((Void)null); - } - - @Override - public void connect() throws IOException, InterruptedException { - delegate.connect(); - HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address); - MultiExchange mul = new MultiExchange<>(req, client, BodyHandler.discard(null)); - Exchange connectExchange = new Exchange<>(req, mul); - Response r = connectExchange.responseImpl(delegate); - if (r.statusCode() != 200) { - throw new IOException("Tunnel failed"); - } - connected = true; - } - - @Override - boolean connected() { - return connected; - } - protected PlainTunnelingConnection(InetSocketAddress addr, InetSocketAddress proxy, HttpClientImpl client) { @@ -99,38 +55,59 @@ } @Override - SocketChannel channel() { - return delegate.channel(); - } - - @Override - ConnectionPool.CacheKey cacheKey() { - return new ConnectionPool.CacheKey(null, proxyAddr); + public CompletableFuture connectAsync() { + debug.log(Level.DEBUG, "Connecting plain connection"); + return delegate.connectAsync() + .thenCompose((Void v) -> { + debug.log(Level.DEBUG, "sending HTTP/1.1 CONNECT"); + HttpClientImpl client = client(); + assert client != null; + HttpRequestImpl req = new HttpRequestImpl("CONNECT", address); + MultiExchange mulEx = new MultiExchange<>(null, req, client, discard(null), null); + Exchange connectExchange = new Exchange<>(req, mulEx); + + return connectExchange + .responseAsyncImpl(delegate) + .thenCompose((Response resp) -> { + CompletableFuture cf = new MinimalFuture<>(); + debug.log(Level.DEBUG, "got response: %d", resp.statusCode()); + if (resp.statusCode() != 200) { + cf.completeExceptionally(new IOException( + "Tunnel failed, got: "+ resp.statusCode())); + } else { + // get the initial/remaining bytes + ByteBuffer b = ((Http1Exchange)connectExchange.exchImpl).drainLeftOverBytes(); + int remaining = b.remaining(); + assert remaining == 0: "Unexpected remaining: " + remaining; + connected = true; + cf.complete(null); + } + return cf; + }); + }); } @Override - long write(ByteBuffer[] buffers, int start, int number) throws IOException { - return delegate.write(buffers, start, number); - } + HttpPublisher publisher() { return delegate.publisher(); } @Override - long write(ByteBuffer buffer) throws IOException { - return delegate.write(buffer); + boolean connected() { + return connected; } @Override - public void writeAsync(ByteBufferReference[] buffers) throws IOException { - delegate.writeAsync(buffers); + SocketChannel channel() { + return delegate.channel(); } @Override - public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException { - delegate.writeAsyncUnordered(buffers); + FlowTube getConnectionFlow() { + return delegate.getConnectionFlow(); } @Override - public void flushAsync() throws IOException { - delegate.flushAsync(); + ConnectionPool.CacheKey cacheKey() { + return new ConnectionPool.CacheKey(null, proxyAddr); } @Override @@ -150,16 +127,6 @@ } @Override - CompletableFuture whenReceivingResponse() { - return delegate.whenReceivingResponse(); - } - - @Override - protected ByteBuffer readImpl() throws IOException { - return delegate.readImpl(); - } - - @Override boolean isSecure() { return false; } @@ -169,31 +136,11 @@ return true; } + // Support for WebSocket/RawChannelImpl which unfortunately + // still depends on synchronous read/writes. + // It should be removed when RawChannelImpl moves to using asynchronous APIs. @Override - public void setAsyncCallbacks(Consumer asyncReceiver, - Consumer errorReceiver, - Supplier readBufferSupplier) { - delegate.setAsyncCallbacks(asyncReceiver, errorReceiver, readBufferSupplier); - } - - @Override - public void startReading() { - delegate.startReading(); - } - - @Override - public void stopAsyncReading() { - delegate.stopAsyncReading(); - } - - @Override - public void enableCallback() { - delegate.enableCallback(); - } - - @Override - synchronized void configureMode(Mode mode) throws IOException { - super.configureMode(mode); - delegate.configureMode(mode); + DetachedConnectionChannel detachChannel() { + return delegate.detachChannel(); } }