< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java
Print this page
@@ -23,116 +23,93 @@
* questions.
*/
package jdk.incubator.http;
-import jdk.incubator.http.internal.common.ByteBufferReference;
-import jdk.incubator.http.internal.common.MinimalFuture;
-import jdk.incubator.http.HttpResponse.BodyHandler;
-
import java.io.IOException;
+import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
+import jdk.incubator.http.internal.common.FlowTube;
+import jdk.incubator.http.internal.common.MinimalFuture;
+import static jdk.incubator.http.HttpResponse.BodyHandler.discard;
/**
* A plain text socket tunnel through a proxy. Uses "CONNECT" but does not
* encrypt. Used by WebSocket, as well as HTTP over SSL + Proxy.
* Wrapped in SSLTunnelConnection or AsyncSSLTunnelConnection for encryption.
*/
-class PlainTunnelingConnection extends HttpConnection implements AsyncConnection {
+final class PlainTunnelingConnection extends HttpConnection {
final PlainHttpConnection delegate;
protected final InetSocketAddress proxyAddr;
private volatile boolean connected;
+ protected PlainTunnelingConnection(InetSocketAddress addr,
+ InetSocketAddress proxy,
+ HttpClientImpl client) {
+ super(addr, client);
+ this.proxyAddr = proxy;
+ delegate = new PlainHttpConnection(proxy, client);
+ }
+
@Override
public CompletableFuture<Void> connectAsync() {
+ debug.log(Level.DEBUG, "Connecting plain connection");
return delegate.connectAsync()
.thenCompose((Void v) -> {
- HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address);
- MultiExchange<Void,Void> mconnectExchange = new MultiExchange<>(req, client, this::ignore);
- return mconnectExchange.responseAsync()
- .thenCompose((HttpResponseImpl<Void> resp) -> {
+ debug.log(Level.DEBUG, "sending HTTP/1.1 CONNECT");
+ HttpClientImpl client = client();
+ assert client != null;
+ HttpRequestImpl req = new HttpRequestImpl("CONNECT", address);
+ MultiExchange<Void,Void> mulEx = new MultiExchange<>(null, req, client, discard(null), null);
+ Exchange<Void> connectExchange = new Exchange<>(req, mulEx);
+
+ return connectExchange
+ .responseAsyncImpl(delegate)
+ .thenCompose((Response resp) -> {
CompletableFuture<Void> cf = new MinimalFuture<>();
+ debug.log(Level.DEBUG, "got response: %d", resp.statusCode());
if (resp.statusCode() != 200) {
- cf.completeExceptionally(new IOException("Tunnel failed"));
+ cf.completeExceptionally(new IOException(
+ "Tunnel failed, got: "+ resp.statusCode()));
} else {
+ // get the initial/remaining bytes
+ ByteBuffer b = ((Http1Exchange<?>)connectExchange.exchImpl).drainLeftOverBytes();
+ int remaining = b.remaining();
+ assert remaining == 0: "Unexpected remaining: " + remaining;
connected = true;
cf.complete(null);
}
return cf;
});
});
}
- private HttpResponse.BodyProcessor<Void> ignore(int status, HttpHeaders hdrs) {
- return HttpResponse.BodyProcessor.discard((Void)null);
- }
-
@Override
- public void connect() throws IOException, InterruptedException {
- delegate.connect();
- HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address);
- MultiExchange<Void,Void> mul = new MultiExchange<>(req, client, BodyHandler.<Void>discard(null));
- Exchange<Void> connectExchange = new Exchange<>(req, mul);
- Response r = connectExchange.responseImpl(delegate);
- if (r.statusCode() != 200) {
- throw new IOException("Tunnel failed");
- }
- connected = true;
- }
+ HttpPublisher publisher() { return delegate.publisher(); }
@Override
boolean connected() {
return connected;
}
- protected PlainTunnelingConnection(InetSocketAddress addr,
- InetSocketAddress proxy,
- HttpClientImpl client) {
- super(addr, client);
- this.proxyAddr = proxy;
- delegate = new PlainHttpConnection(proxy, client);
- }
-
@Override
SocketChannel channel() {
return delegate.channel();
}
@Override
- ConnectionPool.CacheKey cacheKey() {
- return new ConnectionPool.CacheKey(null, proxyAddr);
- }
-
- @Override
- long write(ByteBuffer[] buffers, int start, int number) throws IOException {
- return delegate.write(buffers, start, number);
+ FlowTube getConnectionFlow() {
+ return delegate.getConnectionFlow();
}
@Override
- long write(ByteBuffer buffer) throws IOException {
- return delegate.write(buffer);
- }
-
- @Override
- public void writeAsync(ByteBufferReference[] buffers) throws IOException {
- delegate.writeAsync(buffers);
- }
-
- @Override
- public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
- delegate.writeAsyncUnordered(buffers);
- }
-
- @Override
- public void flushAsync() throws IOException {
- delegate.flushAsync();
+ ConnectionPool.CacheKey cacheKey() {
+ return new ConnectionPool.CacheKey(null, proxyAddr);
}
@Override
public void close() {
delegate.close();
@@ -148,52 +125,22 @@
void shutdownOutput() throws IOException {
delegate.shutdownOutput();
}
@Override
- CompletableFuture<Void> whenReceivingResponse() {
- return delegate.whenReceivingResponse();
- }
-
- @Override
- protected ByteBuffer readImpl() throws IOException {
- return delegate.readImpl();
- }
-
- @Override
boolean isSecure() {
return false;
}
@Override
boolean isProxied() {
return true;
}
+ // Support for WebSocket/RawChannelImpl which unfortunately
+ // still depends on synchronous read/writes.
+ // It should be removed when RawChannelImpl moves to using asynchronous APIs.
@Override
- public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver,
- Consumer<Throwable> errorReceiver,
- Supplier<ByteBufferReference> readBufferSupplier) {
- delegate.setAsyncCallbacks(asyncReceiver, errorReceiver, readBufferSupplier);
- }
-
- @Override
- public void startReading() {
- delegate.startReading();
- }
-
- @Override
- public void stopAsyncReading() {
- delegate.stopAsyncReading();
- }
-
- @Override
- public void enableCallback() {
- delegate.enableCallback();
- }
-
- @Override
- synchronized void configureMode(Mode mode) throws IOException {
- super.configureMode(mode);
- delegate.configureMode(mode);
+ DetachedConnectionChannel detachChannel() {
+ return delegate.detachChannel();
}
}
< prev index next >