< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java
Print this page
*** 23,138 ****
* questions.
*/
package jdk.incubator.http;
- import jdk.incubator.http.internal.common.ByteBufferReference;
- import jdk.incubator.http.internal.common.MinimalFuture;
- import jdk.incubator.http.HttpResponse.BodyHandler;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CompletableFuture;
! import java.util.function.Consumer;
! import java.util.function.Supplier;
/**
* A plain text socket tunnel through a proxy. Uses "CONNECT" but does not
* encrypt. Used by WebSocket, as well as HTTP over SSL + Proxy.
* Wrapped in SSLTunnelConnection or AsyncSSLTunnelConnection for encryption.
*/
! class PlainTunnelingConnection extends HttpConnection implements AsyncConnection {
// Underlying plain connection; the constructor creates it for the proxy
// address and the methods of this class forward their work to it.
final PlainHttpConnection delegate;
// Address of the proxy the tunnel goes through (also used in cacheKey()).
protected final InetSocketAddress proxyAddr;
// Set to true once a CONNECT request has been answered with status 200.
private volatile boolean connected;
/**
 * Asynchronously establishes the tunnel: connects the delegate, then sends
 * a CONNECT request for {@code address} through a MultiExchange.
 *
 * @return a future that completes normally once a 200 response has been
 *         received (after setting {@code connected} to true), or
 *         exceptionally with an IOException (message: Tunnel failed) for
 *         any other status code
 */
@Override
public CompletableFuture<Void> connectAsync() {
return delegate.connectAsync()
.thenCompose((Void v) -> {
! HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address);
! MultiExchange<Void,Void> mconnectExchange = new MultiExchange<>(req, client, this::ignore);
! return mconnectExchange.responseAsync()
! .thenCompose((HttpResponseImpl<Void> resp) -> {
CompletableFuture<Void> cf = new MinimalFuture<>();
if (resp.statusCode() != 200) {
! cf.completeExceptionally(new IOException("Tunnel failed"));
} else {
connected = true;
cf.complete(null);
}
return cf;
});
});
}
// Callback handed to the CONNECT MultiExchange in connectAsync(): the status
// and headers are not inspected and a body-discarding processor is returned.
- private HttpResponse.BodyProcessor<Void> ignore(int status, HttpHeaders hdrs) {
- return HttpResponse.BodyProcessor.discard((Void)null);
- }
-
/**
 * Synchronous tunnel set-up: connects the delegate, sends a CONNECT request
 * for {@code address} over it, and marks this connection as connected when
 * the response status is 200.
 *
 * @throws IOException with message Tunnel failed when the status is not 200
 * @throws InterruptedException NOTE(review): nothing in this body throws it
 *         directly; presumably it comes from delegate.connect() or
 *         responseImpl() - confirm
 */
@Override
! public void connect() throws IOException, InterruptedException {
! delegate.connect();
! HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address);
! MultiExchange<Void,Void> mul = new MultiExchange<>(req, client, BodyHandler.<Void>discard(null));
! Exchange<Void> connectExchange = new Exchange<>(req, mul);
! Response r = connectExchange.responseImpl(delegate);
! if (r.statusCode() != 200) {
! throw new IOException("Tunnel failed");
! }
! connected = true;
! }
/**
 * Reports whether the tunnel has been established.
 *
 * @return the {@code connected} flag, which connect() and connectAsync()
 *         set to true only after a 200 response to CONNECT
 */
@Override
boolean connected() {
return connected;
}
// Creates a tunnel to addr through the given proxy. The target address and
// the client are passed to the superclass; the delegate connection is
// created for the proxy address, not for addr.
- protected PlainTunnelingConnection(InetSocketAddress addr,
- InetSocketAddress proxy,
- HttpClientImpl client) {
- super(addr, client);
- this.proxyAddr = proxy;
- delegate = new PlainHttpConnection(proxy, client);
- }
-
/**
 * @return the socket channel reported by the delegate connection
 */
@Override
SocketChannel channel() {
return delegate.channel();
}
/**
 * Builds the connection pool key for this tunnel from the proxy address,
 * passing null as the first CacheKey argument.
 * NOTE(review): presumably the first argument is the destination address;
 * verify against ConnectionPool.CacheKey.
 */
@Override
! ConnectionPool.CacheKey cacheKey() {
! return new ConnectionPool.CacheKey(null, proxyAddr);
! }
!
// Gathering write: forwards the buffer array, start index and count to the
// delegate unchanged and returns the value the delegate returns.
! @Override
! long write(ByteBuffer[] buffers, int start, int number) throws IOException {
! return delegate.write(buffers, start, number);
}
// Single-buffer write: forwards the buffer to the delegate and returns the
// value the delegate returns.
@Override
! long write(ByteBuffer buffer) throws IOException {
! return delegate.write(buffer);
! }
!
// Forwards the buffers to delegate.writeAsync(); an IOException thrown by
// the delegate propagates to the caller.
! @Override
! public void writeAsync(ByteBufferReference[] buffers) throws IOException {
! delegate.writeAsync(buffers);
! }
!
// Forwards the buffers to delegate.writeAsyncUnordered(); an IOException
// thrown by the delegate propagates to the caller.
! @Override
! public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
! delegate.writeAsyncUnordered(buffers);
! }
!
// Forwards to delegate.flushAsync(); an IOException thrown by the delegate
// propagates to the caller.
! @Override
! public void flushAsync() throws IOException {
! delegate.flushAsync();
}
@Override
public void close() {
delegate.close();
--- 23,115 ----
* questions.
*/
package jdk.incubator.http;
import java.io.IOException;
+ import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CompletableFuture;
! import jdk.incubator.http.internal.common.FlowTube;
! import jdk.incubator.http.internal.common.MinimalFuture;
! import static jdk.incubator.http.HttpResponse.BodyHandler.discard;
/**
* A plain text socket tunnel through a proxy. Uses "CONNECT" but does not
* encrypt. Used by WebSocket, as well as HTTP over SSL + Proxy.
* Wrapped in SSLTunnelConnection or AsyncSSLTunnelConnection for encryption.
*/
! final class PlainTunnelingConnection extends HttpConnection {
// Underlying plain connection; the constructor creates it for the proxy
// address and the methods of this class forward their work to it.
final PlainHttpConnection delegate;
// Address of the proxy the tunnel goes through (also used in cacheKey()).
protected final InetSocketAddress proxyAddr;
// Set to true by connectAsync() once CONNECT has been answered with 200.
private volatile boolean connected;
// Creates a tunnel to addr through the given proxy. The target address and
// the client are passed to the superclass; the delegate connection is
// created for the proxy address, not for addr.
+ protected PlainTunnelingConnection(InetSocketAddress addr,
+ InetSocketAddress proxy,
+ HttpClientImpl client) {
+ super(addr, client);
+ this.proxyAddr = proxy;
+ delegate = new PlainHttpConnection(proxy, client);
+ }
+
/**
 * Asynchronously establishes the tunnel through the proxy.
 * <p>
 * Steps: connect the delegate, then send a CONNECT request for
 * {@code address} on the delegate through a dedicated Exchange, then
 * inspect the status code of the response.
 *
 * @return a future that completes normally (after setting
 *         {@code connected} to true) when the status is 200, and
 *         exceptionally with an IOException whose message includes the
 *         status code otherwise
 */
@Override
public CompletableFuture<Void> connectAsync() {
+ debug.log(Level.DEBUG, "Connecting plain connection");
return delegate.connectAsync()
.thenCompose((Void v) -> {
! debug.log(Level.DEBUG, "sending HTTP/1.1 CONNECT");
! HttpClientImpl client = client();
! assert client != null;
! HttpRequestImpl req = new HttpRequestImpl("CONNECT", address);
! MultiExchange<Void,Void> mulEx = new MultiExchange<>(null, req, client, discard(null), null);
! Exchange<Void> connectExchange = new Exchange<>(req, mulEx);
!
// The exchange is run directly on the delegate connection.
! return connectExchange
! .responseAsyncImpl(delegate)
! .thenCompose((Response resp) -> {
CompletableFuture<Void> cf = new MinimalFuture<>();
+ debug.log(Level.DEBUG, "got response: %d", resp.statusCode());
if (resp.statusCode() != 200) {
! cf.completeExceptionally(new IOException(
! "Tunnel failed, got: "+ resp.statusCode()));
} else {
// The drained buffer is expected to be empty; this is only verified
// by the assert below, so it is checked only when assertions are enabled.
+ // get the initial/remaining bytes
+ ByteBuffer b = ((Http1Exchange<?>)connectExchange.exchImpl).drainLeftOverBytes();
+ int remaining = b.remaining();
+ assert remaining == 0: "Unexpected remaining: " + remaining;
connected = true;
cf.complete(null);
}
return cf;
});
});
}
/**
 * @return the publisher reported by the delegate connection
 */
@Override
! HttpPublisher publisher() { return delegate.publisher(); }
/**
 * Reports whether the tunnel has been established.
 *
 * @return the {@code connected} flag, which connectAsync() sets to true
 *         only after a 200 response to CONNECT
 */
@Override
boolean connected() {
return connected;
}
/**
 * @return the socket channel reported by the delegate connection
 */
@Override
SocketChannel channel() {
return delegate.channel();
}
/**
 * @return the FlowTube reported by the delegate connection
 */
@Override
! FlowTube getConnectionFlow() {
! return delegate.getConnectionFlow();
}
/**
 * Builds the connection pool key for this tunnel from the proxy address,
 * passing null as the first CacheKey argument.
 * NOTE(review): presumably the first argument is the destination address;
 * verify against ConnectionPool.CacheKey.
 */
@Override
! ConnectionPool.CacheKey cacheKey() {
! return new ConnectionPool.CacheKey(null, proxyAddr);
}
@Override
public void close() {
delegate.close();
*** 148,199 ****
// Forwards to delegate.shutdownOutput(); an IOException thrown by the
// delegate propagates to the caller.
void shutdownOutput() throws IOException {
delegate.shutdownOutput();
}
// Returns the future that the delegate returns from its own
// whenReceivingResponse().
@Override
- CompletableFuture<Void> whenReceivingResponse() {
- return delegate.whenReceivingResponse();
- }
-
// Returns the buffer that delegate.readImpl() returns; an IOException
// thrown by the delegate propagates to the caller.
- @Override
- protected ByteBuffer readImpl() throws IOException {
- return delegate.readImpl();
- }
-
// Always false: per the class description this tunnel does not encrypt.
- @Override
boolean isSecure() {
return false;
}
/**
 * @return always true; this connection is a tunnel through a proxy
 */
@Override
boolean isProxied() {
return true;
}
// Passes the three callbacks (receiver, error receiver, read buffer
// supplier) to the delegate unchanged.
@Override
! public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver,
! Consumer<Throwable> errorReceiver,
! Supplier<ByteBufferReference> readBufferSupplier) {
! delegate.setAsyncCallbacks(asyncReceiver, errorReceiver, readBufferSupplier);
! }
!
// Forwards to delegate.startReading().
! @Override
! public void startReading() {
! delegate.startReading();
! }
!
// Forwards to delegate.stopAsyncReading().
! @Override
! public void stopAsyncReading() {
! delegate.stopAsyncReading();
! }
!
// Forwards to delegate.enableCallback().
! @Override
! public void enableCallback() {
! delegate.enableCallback();
! }
!
// Applies the mode to this connection through the superclass first, then to
// the delegate, while holding the monitor of this object (synchronized).
! @Override
! synchronized void configureMode(Mode mode) throws IOException {
! super.configureMode(mode);
! delegate.configureMode(mode);
}
}
--- 125,146 ----
// Forwards to delegate.shutdownOutput(); an IOException thrown by the
// delegate propagates to the caller.
void shutdownOutput() throws IOException {
delegate.shutdownOutput();
}
/**
 * @return always false; per the class description this tunnel does not
 *         encrypt
 */
@Override
boolean isSecure() {
return false;
}
/**
 * @return always true; this connection is a tunnel through a proxy
 */
@Override
boolean isProxied() {
return true;
}
+ // Support for WebSocket/RawChannelImpl which unfortunately
+ // still depends on synchronous read/writes.
+ // It should be removed when RawChannelImpl moves to using asynchronous APIs.
// Returns whatever the delegate returns from its own detachChannel().
@Override
! DetachedConnectionChannel detachChannel() {
! return delegate.detachChannel();
}
}
< prev index next >