< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java

Print this page

        

*** 23,138 **** * questions. */ package jdk.incubator.http; - import jdk.incubator.http.internal.common.ByteBufferReference; - import jdk.incubator.http.internal.common.MinimalFuture; - import jdk.incubator.http.HttpResponse.BodyHandler; - import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.concurrent.CompletableFuture; ! import java.util.function.Consumer; ! import java.util.function.Supplier; /** * A plain text socket tunnel through a proxy. Uses "CONNECT" but does not * encrypt. Used by WebSocket, as well as HTTP over SSL + Proxy. * Wrapped in SSLTunnelConnection or AsyncSSLTunnelConnection for encryption. */ ! class PlainTunnelingConnection extends HttpConnection implements AsyncConnection { final PlainHttpConnection delegate; protected final InetSocketAddress proxyAddr; private volatile boolean connected; @Override public CompletableFuture<Void> connectAsync() { return delegate.connectAsync() .thenCompose((Void v) -> { ! HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address); ! MultiExchange<Void,Void> mconnectExchange = new MultiExchange<>(req, client, this::ignore); ! return mconnectExchange.responseAsync() ! .thenCompose((HttpResponseImpl<Void> resp) -> { CompletableFuture<Void> cf = new MinimalFuture<>(); if (resp.statusCode() != 200) { ! cf.completeExceptionally(new IOException("Tunnel failed")); } else { connected = true; cf.complete(null); } return cf; }); }); } - private HttpResponse.BodyProcessor<Void> ignore(int status, HttpHeaders hdrs) { - return HttpResponse.BodyProcessor.discard((Void)null); - } - @Override ! public void connect() throws IOException, InterruptedException { ! delegate.connect(); ! HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address); ! MultiExchange<Void,Void> mul = new MultiExchange<>(req, client, BodyHandler.<Void>discard(null)); ! Exchange<Void> connectExchange = new Exchange<>(req, mul); ! Response r = connectExchange.responseImpl(delegate); ! if (r.statusCode() != 200) { ! throw new IOException("Tunnel failed"); ! } ! connected = true; ! } @Override boolean connected() { return connected; } - protected PlainTunnelingConnection(InetSocketAddress addr, - InetSocketAddress proxy, - HttpClientImpl client) { - super(addr, client); - this.proxyAddr = proxy; - delegate = new PlainHttpConnection(proxy, client); - } - @Override SocketChannel channel() { return delegate.channel(); } @Override ! ConnectionPool.CacheKey cacheKey() { ! return new ConnectionPool.CacheKey(null, proxyAddr); ! } ! ! @Override ! long write(ByteBuffer[] buffers, int start, int number) throws IOException { ! return delegate.write(buffers, start, number); } @Override ! long write(ByteBuffer buffer) throws IOException { ! return delegate.write(buffer); ! } ! ! @Override ! public void writeAsync(ByteBufferReference[] buffers) throws IOException { ! delegate.writeAsync(buffers); ! } ! ! @Override ! public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException { ! delegate.writeAsyncUnordered(buffers); ! } ! ! @Override ! public void flushAsync() throws IOException { ! delegate.flushAsync(); } @Override public void close() { delegate.close(); --- 23,115 ---- * questions. */ package jdk.incubator.http; import java.io.IOException; + import java.lang.System.Logger.Level; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.concurrent.CompletableFuture; ! import jdk.incubator.http.internal.common.FlowTube; ! import jdk.incubator.http.internal.common.MinimalFuture; ! import static jdk.incubator.http.HttpResponse.BodyHandler.discard; /** * A plain text socket tunnel through a proxy. Uses "CONNECT" but does not * encrypt. Used by WebSocket, as well as HTTP over SSL + Proxy. * Wrapped in SSLTunnelConnection or AsyncSSLTunnelConnection for encryption. */ ! final class PlainTunnelingConnection extends HttpConnection { final PlainHttpConnection delegate; protected final InetSocketAddress proxyAddr; private volatile boolean connected; + protected PlainTunnelingConnection(InetSocketAddress addr, + InetSocketAddress proxy, + HttpClientImpl client) { + super(addr, client); + this.proxyAddr = proxy; + delegate = new PlainHttpConnection(proxy, client); + } + @Override public CompletableFuture<Void> connectAsync() { + debug.log(Level.DEBUG, "Connecting plain connection"); return delegate.connectAsync() .thenCompose((Void v) -> { ! debug.log(Level.DEBUG, "sending HTTP/1.1 CONNECT"); ! HttpClientImpl client = client(); ! assert client != null; ! HttpRequestImpl req = new HttpRequestImpl("CONNECT", address); ! MultiExchange<Void,Void> mulEx = new MultiExchange<>(null, req, client, discard(null), null); ! Exchange<Void> connectExchange = new Exchange<>(req, mulEx); ! ! return connectExchange ! .responseAsyncImpl(delegate) ! .thenCompose((Response resp) -> { CompletableFuture<Void> cf = new MinimalFuture<>(); + debug.log(Level.DEBUG, "got response: %d", resp.statusCode()); if (resp.statusCode() != 200) { ! cf.completeExceptionally(new IOException( ! "Tunnel failed, got: "+ resp.statusCode())); } else { + // get the initial/remaining bytes + ByteBuffer b = ((Http1Exchange<?>)connectExchange.exchImpl).drainLeftOverBytes(); + int remaining = b.remaining(); + assert remaining == 0: "Unexpected remaining: " + remaining; connected = true; cf.complete(null); } return cf; }); }); } @Override ! HttpPublisher publisher() { return delegate.publisher(); } @Override boolean connected() { return connected; } @Override SocketChannel channel() { return delegate.channel(); } @Override ! FlowTube getConnectionFlow() { ! return delegate.getConnectionFlow(); } @Override ! ConnectionPool.CacheKey cacheKey() { ! return new ConnectionPool.CacheKey(null, proxyAddr); } @Override public void close() { delegate.close();
*** 148,199 **** void shutdownOutput() throws IOException { delegate.shutdownOutput(); } @Override - CompletableFuture<Void> whenReceivingResponse() { - return delegate.whenReceivingResponse(); - } - - @Override - protected ByteBuffer readImpl() throws IOException { - return delegate.readImpl(); - } - - @Override boolean isSecure() { return false; } @Override boolean isProxied() { return true; } @Override ! public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver, ! Consumer<Throwable> errorReceiver, ! Supplier<ByteBufferReference> readBufferSupplier) { ! delegate.setAsyncCallbacks(asyncReceiver, errorReceiver, readBufferSupplier); ! } ! ! @Override ! public void startReading() { ! delegate.startReading(); ! } ! ! @Override ! public void stopAsyncReading() { ! delegate.stopAsyncReading(); ! } ! ! @Override ! public void enableCallback() { ! delegate.enableCallback(); ! } ! ! @Override ! synchronized void configureMode(Mode mode) throws IOException { ! super.configureMode(mode); ! delegate.configureMode(mode); } } --- 125,146 ---- void shutdownOutput() throws IOException { delegate.shutdownOutput(); } @Override boolean isSecure() { return false; } @Override boolean isProxied() { return true; } + // Support for WebSocket/RawChannelImpl which unfortunately + // still depends on synchronous read/writes. + // It should be removed when RawChannelImpl moves to using asynchronous APIs. @Override ! DetachedConnectionChannel detachChannel() { ! return delegate.detachChannel(); } }
< prev index next >