< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java

Print this page

        

@@ -23,116 +23,93 @@
  * questions.
  */
 
 package jdk.incubator.http;
 
-import jdk.incubator.http.internal.common.ByteBufferReference;
-import jdk.incubator.http.internal.common.MinimalFuture;
-import jdk.incubator.http.HttpResponse.BodyHandler;
-
 import java.io.IOException;
+import java.lang.System.Logger.Level;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
+import jdk.incubator.http.internal.common.FlowTube;
+import jdk.incubator.http.internal.common.MinimalFuture;
+import static jdk.incubator.http.HttpResponse.BodyHandler.discard;
 
 /**
  * A plain text socket tunnel through a proxy. Uses "CONNECT" but does not
  * encrypt. Used by WebSocket, as well as HTTP over SSL + Proxy.
  * Wrapped in SSLTunnelConnection or AsyncSSLTunnelConnection for encryption.
  */
-class PlainTunnelingConnection extends HttpConnection implements AsyncConnection {
+final class PlainTunnelingConnection extends HttpConnection {
 
     final PlainHttpConnection delegate;
     protected final InetSocketAddress proxyAddr;
     private volatile boolean connected;
 
+    protected PlainTunnelingConnection(InetSocketAddress addr,
+                                       InetSocketAddress proxy,
+                                       HttpClientImpl client) {
+        super(addr, client);
+        this.proxyAddr = proxy;
+        delegate = new PlainHttpConnection(proxy, client);
+    }
+
     @Override
     public CompletableFuture<Void> connectAsync() {
+        debug.log(Level.DEBUG, "Connecting plain connection");
         return delegate.connectAsync()
             .thenCompose((Void v) -> {
-                HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address);
-                MultiExchange<Void,Void> mconnectExchange = new MultiExchange<>(req, client, this::ignore);
-                return mconnectExchange.responseAsync()
-                    .thenCompose((HttpResponseImpl<Void> resp) -> {
+                debug.log(Level.DEBUG, "sending HTTP/1.1 CONNECT");
+                HttpClientImpl client = client();
+                assert client != null;
+                HttpRequestImpl req = new HttpRequestImpl("CONNECT", address);
+                MultiExchange<Void,Void> mulEx = new MultiExchange<>(null, req, client, discard(null), null);
+                Exchange<Void> connectExchange = new Exchange<>(req, mulEx);
+
+                return connectExchange
+                        .responseAsyncImpl(delegate)
+                        .thenCompose((Response resp) -> {
                         CompletableFuture<Void> cf = new MinimalFuture<>();
+                            debug.log(Level.DEBUG, "got response: %d", resp.statusCode());
                         if (resp.statusCode() != 200) {
-                            cf.completeExceptionally(new IOException("Tunnel failed"));
+                                cf.completeExceptionally(new IOException(
+                                        "Tunnel failed, got: "+ resp.statusCode()));
                         } else {
+                                // get the initial/remaining bytes
+                                ByteBuffer b = ((Http1Exchange<?>)connectExchange.exchImpl).drainLeftOverBytes();
+                                int remaining = b.remaining();
+                                assert remaining == 0: "Unexpected remaining: " + remaining;
                             connected = true;
                             cf.complete(null);
                         }
                         return cf;
                     });
             });
     }
 
-    private HttpResponse.BodyProcessor<Void> ignore(int status, HttpHeaders hdrs) {
-        return HttpResponse.BodyProcessor.discard((Void)null);
-    }
-
     @Override
-    public void connect() throws IOException, InterruptedException {
-        delegate.connect();
-        HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address);
-        MultiExchange<Void,Void> mul = new MultiExchange<>(req, client, BodyHandler.<Void>discard(null));
-        Exchange<Void> connectExchange = new Exchange<>(req, mul);
-        Response r = connectExchange.responseImpl(delegate);
-        if (r.statusCode() != 200) {
-            throw new IOException("Tunnel failed");
-        }
-        connected = true;
-    }
+    HttpPublisher publisher() { return delegate.publisher(); }
 
     @Override
     boolean connected() {
         return connected;
     }
 
-    protected PlainTunnelingConnection(InetSocketAddress addr,
-                                       InetSocketAddress proxy,
-                                       HttpClientImpl client) {
-        super(addr, client);
-        this.proxyAddr = proxy;
-        delegate = new PlainHttpConnection(proxy, client);
-    }
-
     @Override
     SocketChannel channel() {
         return delegate.channel();
     }
 
     @Override
-    ConnectionPool.CacheKey cacheKey() {
-        return new ConnectionPool.CacheKey(null, proxyAddr);
-    }
-
-    @Override
-    long write(ByteBuffer[] buffers, int start, int number) throws IOException {
-        return delegate.write(buffers, start, number);
+    FlowTube getConnectionFlow() {
+        return delegate.getConnectionFlow();
     }
 
     @Override
-    long write(ByteBuffer buffer) throws IOException {
-        return delegate.write(buffer);
-    }
-
-    @Override
-    public void writeAsync(ByteBufferReference[] buffers) throws IOException {
-        delegate.writeAsync(buffers);
-    }
-
-    @Override
-    public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
-        delegate.writeAsyncUnordered(buffers);
-    }
-
-    @Override
-    public void flushAsync() throws IOException {
-        delegate.flushAsync();
+    ConnectionPool.CacheKey cacheKey() {
+        return new ConnectionPool.CacheKey(null, proxyAddr);
     }
 
     @Override
     public void close() {
         delegate.close();

@@ -148,52 +125,22 @@
     void shutdownOutput() throws IOException {
         delegate.shutdownOutput();
     }
 
     @Override
-    CompletableFuture<Void> whenReceivingResponse() {
-        return delegate.whenReceivingResponse();
-    }
-
-    @Override
-    protected ByteBuffer readImpl() throws IOException {
-        return delegate.readImpl();
-    }
-
-    @Override
     boolean isSecure() {
         return false;
     }
 
     @Override
     boolean isProxied() {
         return true;
     }
 
+    // Support for WebSocket/RawChannelImpl which unfortunately
+    // still depends on synchronous read/writes.
+    // It should be removed when RawChannelImpl moves to using asynchronous APIs.
     @Override
-    public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver,
-            Consumer<Throwable> errorReceiver,
-            Supplier<ByteBufferReference> readBufferSupplier) {
-        delegate.setAsyncCallbacks(asyncReceiver, errorReceiver, readBufferSupplier);
-    }
-
-    @Override
-    public void startReading() {
-        delegate.startReading();
-    }
-
-    @Override
-    public void stopAsyncReading() {
-        delegate.stopAsyncReading();
-    }
-
-    @Override
-    public void enableCallback() {
-        delegate.enableCallback();
-    }
-
-    @Override
-    synchronized void configureMode(Mode mode) throws IOException {
-        super.configureMode(mode);
-        delegate.configureMode(mode);
+    DetachedConnectionChannel detachChannel() {
+        return delegate.detachChannel();
     }
 }
< prev index next >