< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java
Print this page
@@ -23,327 +23,387 @@
* questions.
*/
package jdk.incubator.http;
-import javax.net.ssl.SSLParameters;
import java.io.Closeable;
import java.io.IOException;
+import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
-
-import jdk.incubator.http.internal.common.ByteBufferReference;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Flow;
+import jdk.incubator.http.HttpClient.Version;
+import jdk.incubator.http.internal.common.Demand;
+import jdk.incubator.http.internal.common.FlowTube;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
+import jdk.incubator.http.internal.common.Log;
+import jdk.incubator.http.internal.common.Utils;
+import static jdk.incubator.http.HttpClient.Version.HTTP_2;
/**
* Wraps socket channel layer and takes care of SSL also.
*
* Subtypes are:
* PlainHttpConnection: regular direct TCP connection to server
* PlainProxyConnection: plain text proxy connection
* PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
- * SSLConnection: TLS channel direct to server
- * SSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
+ * AsyncSSLConnection: TLS channel direct to server
+ * AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
*/
abstract class HttpConnection implements Closeable {
- enum Mode {
- BLOCKING,
- NON_BLOCKING,
- ASYNC
- }
-
- protected Mode mode;
+ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+ final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+ final static System.Logger DEBUG_LOGGER = Utils.getDebugLogger(
+ () -> "HttpConnection(SocketTube(?))", DEBUG);
- // address we are connected to. Could be a server or a proxy
+ /** The address this connection is connected to. Could be a server or a proxy. */
final InetSocketAddress address;
- final HttpClientImpl client;
+ private final HttpClientImpl client;
+ private final TrailingOperations trailingOperations;
+ /**
+ * Creates a connection for the given address and client, with an empty
+ * set of trailing operations.
+ */
HttpConnection(InetSocketAddress address, HttpClientImpl client) {
this.address = address;
this.client = client;
+ trailingOperations = new TrailingOperations();
}
- /**
- * Public API to this class. addr is the ultimate destination. Any proxies
- * etc are figured out from the request. Returns an instance of one of the
- * following
- * PlainHttpConnection
- * PlainTunnelingConnection
- * SSLConnection
- * SSLTunnelConnection
- *
- * When object returned, connect() or connectAsync() must be called, which
- * when it returns/completes, the connection is usable for requests.
- */
- public static HttpConnection getConnection(
- InetSocketAddress addr, HttpClientImpl client, HttpRequestImpl request)
- {
- return getConnectionImpl(addr, client, request, false);
+    /**
+     * Tracks completion stages that are still pending on this connection.
+     * Stages are keyed by identity and are removed again when they complete.
+     */
+    private static final class TrailingOperations {
+        private final Map<CompletionStage<?>, Boolean> operations =
+                new IdentityHashMap<>();
+
+        /**
+         * Registers {@code cf}; it is unregistered when it completes.
+         *
+         * @param cf the stage to track
+         */
+        void add(CompletionStage<?> cf) {
+            synchronized (operations) {
+                operations.put(cf, Boolean.TRUE);
+            }
+            // Attach the callback only after the stage has been recorded, and
+            // outside the lock: for an already-completed stage the callback
+            // may run right here, and must find the entry in order to remove it.
+            cf.whenComplete((r, t) -> remove(cf));
+        }
+
+        /**
+         * Unregisters {@code cf}.
+         *
+         * @param cf the stage to stop tracking
+         * @return {@code true} if {@code cf} was registered, {@code false} otherwise
+         */
+        boolean remove(CompletionStage<?> cf) {
+            synchronized (operations) {
+                // Map.remove returns null for an unknown key; compare instead
+                // of auto-unboxing, which would throw NullPointerException.
+                return operations.remove(cf) != null;
+            }
+        }
     }
- /**
- * Called specifically to get an async connection for HTTP/2 over SSL.
- */
- public static HttpConnection getConnection(InetSocketAddress addr,
- HttpClientImpl client, HttpRequestImpl request, boolean isHttp2) {
+ /**
+ * Registers the given stage as a trailing operation of this connection,
+ * by delegating to {@code trailingOperations.add(cf)}.
+ */
+ final void addTrailingOperation(CompletionStage<?> cf) {
+ trailingOperations.add(cf);
+ }
- return getConnectionImpl(addr, client, request, isHttp2);
+// final void removeTrailingOperation(CompletableFuture<?> cf) {
+// trailingOperations.remove(cf);
+// }
+
+ /** Returns the {@code HttpClientImpl} that was passed to the constructor. */
+ final HttpClientImpl client() {
+ return client;
}
- public abstract void connect() throws IOException, InterruptedException;
+ //public abstract void connect() throws IOException, InterruptedException;
public abstract CompletableFuture<Void> connectAsync();
- /**
- * Returns whether this connection is connected to its destination
- */
+ /** Tells whether, or not, this connection is connected to its destination. */
abstract boolean connected();
+ /** Tells whether, or not, this connection is secure ( over SSL ) */
abstract boolean isSecure();
+ /** Tells whether, or not, this connection is proxied. */
abstract boolean isProxied();
- /**
- * Completes when the first byte of the response is available to be read.
- */
- abstract CompletableFuture<Void> whenReceivingResponse();
-
+    /**
+     * Tells whether, or not, this connection is open: its channel must be
+     * open and, once connected, its connection flow must not have finished.
+     */
     final boolean isOpen() {
-        return channel().isOpen();
+        if (!channel().isOpen()) {
+            return false;
+        }
+        return !connected() || !getConnectionFlow().isFinished();
     }
- /* Returns either a plain HTTP connection or a plain tunnelling connection
- * for proxied WebSocket */
- private static HttpConnection getPlainConnection(InetSocketAddress addr,
- InetSocketAddress proxy,
- HttpRequestImpl request,
- HttpClientImpl client) {
- if (request.isWebSocket() && proxy != null) {
- return new PlainTunnelingConnection(addr, proxy, client);
- } else {
- if (proxy == null) {
- return new PlainHttpConnection(addr, client);
- } else {
- return new PlainProxyConnection(proxy, client);
- }
- }
+ /**
+ * A {@code FlowTube.TubePublisher} to which outgoing buffers are handed
+ * before they are published. The per-method notes below describe what the
+ * {@code PlainHttpPublisher} implementation in this class does.
+ */
+ interface HttpPublisher extends FlowTube.TubePublisher {
+ // Adds the buffers at the tail of the write queue.
+ void enqueue(List<ByteBuffer> buffers) throws IOException;
+ // Inserts the buffers at the head of the write queue, ahead of pending ones.
+ void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;
+ // Triggers the write scheduler so that queued buffers can be flushed.
+ void signalEnqueued() throws IOException;
}
- private static HttpConnection getSSLConnection(InetSocketAddress addr,
- InetSocketAddress proxy, HttpRequestImpl request,
- String[] alpn, boolean isHttp2, HttpClientImpl client)
- {
- if (proxy != null) {
- if (!isHttp2) {
- return new SSLTunnelConnection(addr, client, proxy);
- } else {
- return new AsyncSSLTunnelConnection(addr, client, alpn, proxy);
- }
- } else if (!isHttp2) {
- return new SSLConnection(addr, client, alpn);
- } else {
- return new AsyncSSLConnection(addr, client, alpn);
- }
- }
+ /**
+ * Returns the HTTP publisher associated with this connection. May be null
+ * if invoked before connecting.
+ */
+ abstract HttpPublisher publisher();
/**
- * Main factory method. Gets a HttpConnection, either cached or new if
- * none available.
+ * Factory for retrieving HttpConnections. A connection can be retrieved
+ * from the connection pool, or a new one created if none available.
+ *
+ * The given {@code addr} is the ultimate destination. Any proxies,
+ * etc, are determined from the request. Returns a concrete instance which
+ * is one of the following:
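+ * {@link PlainHttpConnection}
+ * {@link PlainProxyConnection}
+ * {@link PlainTunnelingConnection}
+ * {@link AsyncSSLConnection}
+ * {@link AsyncSSLTunnelConnection}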
+ *
+ * The returned connection, if not from the connection pool, must have its
+ * connectAsync() method invoked, which ( when it completes successfully )
+ * renders the connection usable for requests.
*/
- private static HttpConnection getConnectionImpl(InetSocketAddress addr,
+ public static HttpConnection getConnection(InetSocketAddress addr,
HttpClientImpl client,
- HttpRequestImpl request, boolean isHttp2)
- {
+ HttpRequestImpl request,
+ Version version) {
HttpConnection c = null;
- InetSocketAddress proxy = request.proxy(client);
+ InetSocketAddress proxy = request.proxy();
if (proxy != null && proxy.isUnresolved()) {
- // The default proxy selector may select a proxy whose
- // address is unresolved. We must resolve the address
- // before using it to connect.
+ // The default proxy selector may select a proxy whose address is
+ // unresolved. We must resolve the address before connecting to it.
proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort());
}
boolean secure = request.secure();
ConnectionPool pool = client.connectionPool();
- String[] alpn = null;
-
- if (secure && isHttp2) {
- alpn = new String[2];
- alpn[0] = "h2";
- alpn[1] = "http/1.1";
- }
if (!secure) {
+ // Plain requests always consult the pool, whatever the requested version.
c = pool.getConnection(false, addr, proxy);
- if (c != null) {
+ if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
+ final HttpConnection conn = c;
+ DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow()
+ + ": plain connection retrieved from HTTP/1.1 pool");
return c;
} else {
+ // NOTE(review): a pooled connection that is no longer open is dropped
+ // here without close() being called on it - confirm nothing leaks.
return getPlainConnection(addr, proxy, request, client);
}
- } else {
- if (!isHttp2) { // if http2 we don't cache connections
+ } else { // secure
+ if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
c = pool.getConnection(true, addr, proxy);
}
- if (c != null) {
+ if (c != null && c.isOpen()) {
+ final HttpConnection conn = c;
+ DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow()
+ + ": SSL connection retrieved from HTTP/1.1 pool");
return c;
} else {
- return getSSLConnection(addr, proxy, request, alpn, isHttp2, client);
+ // When HTTP/2 is requested both protocol names are passed on, "h2"
+ // first; otherwise alpn stays null.
+ String[] alpn = null;
+ if (version == HTTP_2) {
+ alpn = new String[] { "h2", "http/1.1" };
+ }
+ return getSSLConnection(addr, proxy, alpn, client);
+ }
}
}
+
+    /*
+     * Creates a new TLS connection: an AsyncSSLTunnelConnection through the
+     * proxy when one is given, an AsyncSSLConnection to addr otherwise.
+     */
+    private static HttpConnection getSSLConnection(InetSocketAddress addr,
+                                                   InetSocketAddress proxy,
+                                                   String[] alpn,
+                                                   HttpClientImpl client) {
+        if (proxy == null) {
+            return new AsyncSSLConnection(addr, client, alpn);
+        }
+        return new AsyncSSLTunnelConnection(addr, client, alpn, proxy);
+    }
+
+    /*
+     * Creates a new plain-text connection: a tunnelling connection for a
+     * proxied WebSocket request, a proxy connection for any other proxied
+     * request, and a direct connection when there is no proxy.
+     */
+    private static HttpConnection getPlainConnection(InetSocketAddress addr,
+                                                     InetSocketAddress proxy,
+                                                     HttpRequestImpl request,
+                                                     HttpClientImpl client) {
+        final boolean tunnel = request.isWebSocket() && proxy != null;
+        if (tunnel) {
+            return new PlainTunnelingConnection(addr, proxy, client);
+        }
+        if (proxy != null) {
+            return new PlainProxyConnection(proxy, client);
+        }
+        return new PlainHttpConnection(addr, client);
     }
+ /**
+ * Returns this connection to the client's connection pool, or closes it.
+ * The connection is closed when {@code hdrs} is null, when the client is
+ * null, or when the first "Connection" header value is "close" (ignoring
+ * case). Nothing is done if the connection is no longer open.
+ *
+ * @param hdrs the headers consulted for a "Connection" value; may be null
+ */
- void returnToCache(HttpHeaders hdrs) {
+ void closeOrReturnToCache(HttpHeaders hdrs) {
if (hdrs == null) {
- // the connection was closed by server
+ // the connection was closed by server, eof
close();
return;
}
if (!isOpen()) {
return;
}
+ HttpClientImpl client = client();
+ if (client == null) {
+ close();
+ return;
+ }
ConnectionPool pool = client.connectionPool();
+ // Only the first "Connection" value is examined; the connection is kept
+ // alive when the header is absent.
boolean keepAlive = hdrs.firstValue("Connection")
.map((s) -> !s.equalsIgnoreCase("close"))
.orElse(true);
if (keepAlive) {
+ Log.logTrace("Returning connection to the pool: {0}", this);
pool.returnToPool(this);
} else {
close();
}
}
- /**
- * Also check that the number of bytes written is what was expected. This
- * could be different if the buffer is user-supplied and its internal
- * pointers were manipulated in a race condition.
- */
- final void checkWrite(long expected, ByteBuffer buffer) throws IOException {
- long written = write(buffer);
- if (written != expected) {
- throw new IOException("incorrect number of bytes written");
- }
- }
-
- final void checkWrite(long expected,
- ByteBuffer[] buffers,
- int start,
- int length)
- throws IOException
- {
- long written = write(buffers, start, length);
- if (written != expected) {
- throw new IOException("incorrect number of bytes written");
- }
- }
-
abstract SocketChannel channel();
final InetSocketAddress address() {
return address;
}
- synchronized void configureMode(Mode mode) throws IOException {
- this.mode = mode;
- if (mode == Mode.BLOCKING) {
- channel().configureBlocking(true);
- } else {
- channel().configureBlocking(false);
- }
- }
+ abstract ConnectionPool.CacheKey cacheKey();
- synchronized Mode getMode() {
- return mode;
- }
+// // overridden in SSL only
+// SSLParameters sslParameters() {
+// return null;
+// }
- abstract ConnectionPool.CacheKey cacheKey();
+ /**
+ * Closes this connection, by returning the socket to its connection pool.
+ */
+ @Override
+ public abstract void close();
- // overridden in SSL only
- SSLParameters sslParameters() {
- return null;
- }
+ abstract void shutdownInput() throws IOException;
- // Methods to be implemented for Plain TCP and SSL
+ abstract void shutdownOutput() throws IOException;
+ // Support for WebSocket/RawChannelImpl which unfortunately
+ // still depends on synchronous read/writes.
+ // It should be removed when RawChannelImpl moves to using asynchronous APIs.
+ abstract static class DetachedConnectionChannel implements Closeable {
+ DetachedConnectionChannel() {}
+ // The socket channel backing this detached connection.
+ abstract SocketChannel channel();
abstract long write(ByteBuffer[] buffers, int start, int number)
throws IOException;
+ abstract void shutdownInput() throws IOException;
+ abstract void shutdownOutput() throws IOException;
+ abstract ByteBuffer read() throws IOException;
+ // Narrows Closeable.close(): no checked exception is declared here.
+ @Override
+ public abstract void close();
+ // Renders as "<simple class name>: <channel>".
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName() + ": " + channel().toString();
+ }
+ }
- abstract long write(ByteBuffer buffer) throws IOException;
+ // Support for WebSocket/RawChannelImpl which unfortunately
+ // still depends on synchronous read/writes.
+ // It should be removed when RawChannelImpl moves to using asynchronous APIs.
+ abstract DetachedConnectionChannel detachChannel();
- // Methods to be implemented for Plain TCP (async mode) and AsyncSSL
+ abstract FlowTube getConnectionFlow();
/**
- * In {@linkplain Mode#ASYNC async mode}, this method puts buffers at the
- * end of the send queue; Otherwise, it is equivalent to {@link
- * #write(ByteBuffer[], int, int) write(buffers, 0, buffers.length)}.
- * When in async mode, calling this method should later be followed by
- * subsequent flushAsync invocation.
- * That allows multiple threads to put buffers into the queue while some other
- * thread is writing.
+ * A publisher that makes it possible to publish (write)
+ * ordered (normal priority) and unordered (high priority)
+ * buffers downstream.
*/
- abstract void writeAsync(ByteBufferReference[] buffers) throws IOException;
+ final class PlainHttpPublisher implements HttpPublisher {
+ final Object reading;
+ // Creates a publisher whose subscribe() synchronizes on a private lock object.
+ PlainHttpPublisher() {
+ this(new Object());
+ }
+ // Creates a publisher whose subscribe() synchronizes on the supplied readingLock.
+ PlainHttpPublisher(Object readingLock) {
+ this.reading = readingLock;
+ }
+ final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();
+ volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
+ volatile HttpWriteSubscription subscription;
+ final SequentialScheduler writeScheduler =
+ new SequentialScheduler(this::flushTask);
+ /**
+ * Installs {@code subscriber} as the current subscriber, creating the
+ * write subscription on first use, then hands that subscription to the
+ * subscriber and triggers the write scheduler.
+ */
+ @Override
+ public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+ synchronized (reading) {
+ //assert this.subscription == null;
+ //assert this.subscriber == null;
+ // A later subscriber replaces the previous one but is given the same
+ // subscription object, and therefore the same demand counter.
+ if (subscription == null) {
+ subscription = new HttpWriteSubscription();
+ }
+ this.subscriber = subscriber;
+ }
+ // TODO: should we do this in the flow?
+ subscriber.onSubscribe(subscription);
+ signal();
+ }
- /**
- * In {@linkplain Mode#ASYNC async mode}, this method may put
- * buffers at the beginning of send queue, breaking frames sequence and
- * allowing to write these buffers before other buffers in the queue;
- * Otherwise, it is equivalent to {@link
- * #write(ByteBuffer[], int, int) write(buffers, 0, buffers.length)}.
- * When in async mode, calling this method should later be followed by
- * subsequent flushAsync invocation.
- * That allows multiple threads to put buffers into the queue while some other
- * thread is writing.
- */
- abstract void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException;
+        /**
+         * Task run by the write scheduler: flushes the write queue through the
+         * current subscription, if there is one, and always completes the
+         * given completer.
+         */
+        void flushTask(DeferredCompleter completer) {
+            final HttpWriteSubscription current = subscription;
+            try {
+                if (current != null) {
+                    current.flush();
+                }
+            } finally {
+                completer.complete();
+            }
+        }
- /**
- * This method should be called after any writeAsync/writeAsyncUnordered
- * invocation.
- * If there is a race to flushAsync from several threads one thread
- * (race winner) capture flush operation and write the whole queue content.
- * Other threads (race losers) exits from the method (not blocking)
- * and continue execution.
- */
- abstract void flushAsync() throws IOException;
+ // Triggers the write scheduler, whose task is flushTask.
+ void signal() {
+ writeScheduler.runOrSchedule();
+ }
+
+ // The subscription handed to the subscriber: it accumulates the demand
+ // requested by the subscriber and drains the write queue against it.
+ final class HttpWriteSubscription implements Flow.Subscription {
+ final Demand demand = new Demand();
- /**
- * Closes this connection, by returning the socket to its connection pool.
- */
@Override
- public abstract void close();
+ public void request(long n) {
+ // NOTE(review): Flow.Subscription.request documents that a non-positive
+ // n is reported through Subscriber.onError; here it is thrown instead.
+ if (n <= 0) throw new IllegalArgumentException("non-positive request");
+ demand.increase(n);
+ debug.log(Level.DEBUG, () -> "HttpPublisher: got request of "
+ + n + " from "
+ + getConnectionFlow());
+ writeScheduler.runOrSchedule();
+ }
- abstract void shutdownInput() throws IOException;
+ @Override
+ public void cancel() {
+ // NOTE(review): only logs; nothing here prevents flush() from
+ // delivering further items after cancellation.
+ debug.log(Level.DEBUG, () -> "HttpPublisher: cancelled by "
+ + getConnectionFlow());
+ }
- abstract void shutdownOutput() throws IOException;
+ // Delivers queued buffer lists to the subscriber, one per unit of
+ // demand, until the queue is empty or the demand is exhausted.
+ void flush() {
+ while (!queue.isEmpty() && demand.tryDecrement()) {
+ List<ByteBuffer> elem = queue.poll();
+ debug.log(Level.DEBUG, () -> "HttpPublisher: sending "
+ + Utils.remaining(elem) + " bytes ("
+ + elem.size() + " buffers) to "
+ + getConnectionFlow());
+ subscriber.onNext(elem);
+ }
+ }
+ }
- /**
- * Puts position to limit and limit to capacity so we can resume reading
- * into this buffer, but if required > 0 then limit may be reduced so that
- * no more than required bytes are read next time.
- */
- static void resumeChannelRead(ByteBuffer buf, int required) {
- int limit = buf.limit();
- buf.position(limit);
- int capacity = buf.capacity() - limit;
- if (required > 0 && required < capacity) {
- buf.limit(limit + required);
- } else {
- buf.limit(buf.capacity());
+        /**
+         * Adds {@code buffers} at the tail of the write queue. This method
+         * does not itself trigger a flush.
+         *
+         * @param buffers the buffers to queue
+         */
+        @Override
+        public void enqueue(List<ByteBuffer> buffers) throws IOException {
+            // Measure before queueing, as enqueueUnordered does: once queued,
+            // the buffers may be handed to the subscriber by a concurrent flush.
+            int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
+            queue.add(buffers);
+            // System.Logger formats with java.text.MessageFormat, so a
+            // printf-style "%d" would be logged verbatim; build the message.
+            debug.log(Level.DEBUG, () -> "added " + bytes + " bytes to the write queue");
         }
+
+        /**
+         * Inserts {@code buffers} at the head of the write queue, ahead of
+         * anything already pending. This method does not itself trigger a
+         * flush.
+         *
+         * @param buffers the buffers to queue
+         */
+        @Override
+        public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
+            // Unordered frames are sent before existing frames.
+            int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
+            queue.addFirst(buffers);
+            // System.Logger formats with java.text.MessageFormat, so a
+            // printf-style "%d" would be logged verbatim; build the message.
+            debug.log(Level.DEBUG, () -> "inserted " + bytes + " bytes in the write queue");
         }
- final ByteBuffer read() throws IOException {
- ByteBuffer b = readImpl();
- return b;
+ /**
+ * Logs, then triggers the write scheduler through {@code signal()} so
+ * that queued buffers can be flushed.
+ */
+ @Override
+ public void signalEnqueued() throws IOException {
+ debug.log(Level.DEBUG, "signalling the publisher of the write queue");
+ signal();
+ }
}
- /*
- * Returns a ByteBuffer with the data available at the moment, or null if
- * reached EOF.
- */
- protected abstract ByteBuffer readImpl() throws IOException;
+ String dbgTag = null;
+    /**
+     * Returns the tag used by the debug logger: the simple class name
+     * followed by the connection flow in parentheses, or by "(?)" while
+     * there is no flow. Only a tag built from a non-null flow is cached
+     * in {@code dbgTag}.
+     */
+    final String dbgString() {
+        final FlowTube tube = getConnectionFlow();
+        final String cached = dbgTag;
+        if (cached != null) {
+            return cached;
+        }
+        final String name = this.getClass().getSimpleName();
+        if (tube == null) {
+            return name + "(?)";
+        }
+        final String fresh = name + "(" + tube + ")";
+        dbgTag = fresh;
+        return fresh;
+    }
+ /** Returns "HttpConnection: " followed by the string form of {@code channel()}. */
@Override
public String toString() {
return "HttpConnection: " + channel().toString();
}
< prev index next >