< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java
Print this page
*** 23,349 ****
* questions.
*/
package jdk.incubator.http;
- import javax.net.ssl.SSLParameters;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CompletableFuture;
!
! import jdk.incubator.http.internal.common.ByteBufferReference;
/**
* Wraps socket channel layer and takes care of SSL also.
*
* Subtypes are:
* PlainHttpConnection: regular direct TCP connection to server
* PlainProxyConnection: plain text proxy connection
* PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
! * SSLConnection: TLS channel direct to server
! * SSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
*/
abstract class HttpConnection implements Closeable {
! /**
! * I/O mode of a connection. As applied by configureMode, BLOCKING puts the
! * underlying channel into blocking mode; NON_BLOCKING and ASYNC both put it
! * into non-blocking mode.
! */
! enum Mode {
! BLOCKING,
! NON_BLOCKING,
! ASYNC
! }
!
! protected Mode mode;
! // address we are connected to. Could be a server or a proxy
final InetSocketAddress address;
! final HttpClientImpl client;
HttpConnection(InetSocketAddress address, HttpClientImpl client) {
this.address = address;
this.client = client;
}
! /**
! * Public API to this class. addr is the ultimate destination. Any proxies
! * etc are figured out from the request. Returns an instance of one of the
! * following
! * PlainHttpConnection
! * PlainTunnelingConnection
! * SSLConnection
! * SSLTunnelConnection
! *
! * When object returned, connect() or connectAsync() must be called, which
! * when it returns/completes, the connection is usable for requests.
! */
! public static HttpConnection getConnection(
! InetSocketAddress addr, HttpClientImpl client, HttpRequestImpl request)
! {
! return getConnectionImpl(addr, client, request, false);
}
! /**
! * Called specifically to get an async connection for HTTP/2 over SSL.
! */
! public static HttpConnection getConnection(InetSocketAddress addr,
! HttpClientImpl client, HttpRequestImpl request, boolean isHttp2) {
! return getConnectionImpl(addr, client, request, isHttp2);
}
! public abstract void connect() throws IOException, InterruptedException;
public abstract CompletableFuture<Void> connectAsync();
! /**
! * Returns whether this connection is connected to its destination
! */
abstract boolean connected();
abstract boolean isSecure();
abstract boolean isProxied();
! /**
! * Completes when the first byte of the response is available to be read.
! */
! abstract CompletableFuture<Void> whenReceivingResponse();
!
final boolean isOpen() {
! return channel().isOpen();
}
! /* Returns either a plain HTTP connection or a plain tunnelling connection
! * for proxied WebSocket */
! private static HttpConnection getPlainConnection(InetSocketAddress addr,
! InetSocketAddress proxy,
! HttpRequestImpl request,
! HttpClientImpl client) {
! if (request.isWebSocket() && proxy != null) {
! return new PlainTunnelingConnection(addr, proxy, client);
! } else {
! if (proxy == null) {
! return new PlainHttpConnection(addr, client);
! } else {
! return new PlainProxyConnection(proxy, client);
! }
! }
}
! private static HttpConnection getSSLConnection(InetSocketAddress addr,
! InetSocketAddress proxy, HttpRequestImpl request,
! String[] alpn, boolean isHttp2, HttpClientImpl client)
! {
! if (proxy != null) {
! if (!isHttp2) {
! return new SSLTunnelConnection(addr, client, proxy);
! } else {
! return new AsyncSSLTunnelConnection(addr, client, alpn, proxy);
! }
! } else if (!isHttp2) {
! return new SSLConnection(addr, client, alpn);
! } else {
! return new AsyncSSLConnection(addr, client, alpn);
! }
! }
/**
! * Main factory method. Gets a HttpConnection, either cached or new if
! * none available.
! *
! * @param addr the ultimate destination address
! * @param client the client whose connection pool is consulted
! * @param request the request; supplies the proxy and whether TLS is needed
! * @param isHttp2 true when the connection is wanted for HTTP/2; secure
! * HTTP/2 connections are never looked up in the pool
! * @return a pooled connection when one is found, otherwise a new one
*/
! private static HttpConnection getConnectionImpl(InetSocketAddress addr,
HttpClientImpl client,
! HttpRequestImpl request, boolean isHttp2)
! {
HttpConnection c = null;
! InetSocketAddress proxy = request.proxy(client);
if (proxy != null && proxy.isUnresolved()) {
! // The default proxy selector may select a proxy whose
! // address is unresolved. We must resolve the address
! // before using it to connect.
proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort());
}
boolean secure = request.secure();
ConnectionPool pool = client.connectionPool();
- String[] alpn = null;
-
- if (secure && isHttp2) {
- alpn = new String[2];
- alpn[0] = "h2";
- alpn[1] = "http/1.1";
- }
if (!secure) {
// plain text: always try the pool first
c = pool.getConnection(false, addr, proxy);
! if (c != null) {
return c;
} else {
return getPlainConnection(addr, proxy, request, client);
}
! } else {
! if (!isHttp2) { // if http2 we don't cache connections
c = pool.getConnection(true, addr, proxy);
}
! if (c != null) {
return c;
} else {
! return getSSLConnection(addr, proxy, request, alpn, isHttp2, client);
}
}
}
! /*
!  * Ends the use of this connection for an exchange. A null header set
!  * closes it; an already-closed connection is left alone; otherwise it is
!  * pooled unless the first "Connection" header value is "close".
!  */
! void returnToCache(HttpHeaders hdrs) {
!     if (hdrs == null) {
!         // no headers: the server closed the connection
!         close();
!         return;
!     }
!     if (isOpen()) {
!         ConnectionPool cache = client.connectionPool();
!         boolean closeRequested = hdrs.firstValue("Connection")
!                 .filter(value -> value.equalsIgnoreCase("close"))
!                 .isPresent();
!         if (closeRequested) {
!             close();
!         } else {
!             cache.returnToPool(this);
!         }
!     }
}
- /**
-  * Writes the buffer and verifies that exactly {@code expected} bytes went
-  * out. The count can differ when the buffer is user-supplied and its
-  * internal pointers were manipulated in a race condition.
-  */
- final void checkWrite(long expected, ByteBuffer buffer) throws IOException {
-     final long actual = write(buffer);
-     if (actual == expected) {
-         return;
-     }
-     throw new IOException("incorrect number of bytes written");
- }
-
- /* Gathering variant: writes buffers[start .. start+length) and verifies the byte count. */
- final void checkWrite(long expected,
-                       ByteBuffer[] buffers,
-                       int start,
-                       int length)
-     throws IOException
- {
-     final long actual = write(buffers, start, length);
-     if (actual == expected) {
-         return;
-     }
-     throw new IOException("incorrect number of bytes written");
- }
-
abstract SocketChannel channel();
final InetSocketAddress address() {
return address;
}
! synchronized void configureMode(Mode mode) throws IOException {
! this.mode = mode;
! if (mode == Mode.BLOCKING) {
! channel().configureBlocking(true);
! } else {
! channel().configureBlocking(false);
! }
! }
! synchronized Mode getMode() {
! return mode;
! }
! abstract ConnectionPool.CacheKey cacheKey();
! // overridden in SSL only
! SSLParameters sslParameters() {
! return null;
! }
! // Methods to be implemented for Plain TCP and SSL
abstract long write(ByteBuffer[] buffers, int start, int number)
throws IOException;
! abstract long write(ByteBuffer buffer) throws IOException;
! // Methods to be implemented for Plain TCP (async mode) and AsyncSSL
/**
! * In {@linkplain Mode#ASYNC async mode}, this method puts buffers at the
! * end of the send queue; Otherwise, it is equivalent to {@link
! * #write(ByteBuffer[], int, int) write(buffers, 0, buffers.length)}.
! * When in async mode, calling this method should later be followed by
! * subsequent flushAsync invocation.
! * That allows multiple threads to put buffers into the queue while some other
! * thread is writing.
*/
! abstract void writeAsync(ByteBufferReference[] buffers) throws IOException;
! /**
! * In {@linkplain Mode#ASYNC async mode}, this method may put
! * buffers at the beginning of send queue, breaking frames sequence and
! * allowing to write these buffers before other buffers in the queue;
! * Otherwise, it is equivalent to {@link
! * #write(ByteBuffer[], int, int) write(buffers, 0, buffers.length)}.
! * When in async mode, calling this method should later be followed by
! * subsequent flushAsync invocation.
! * That allows multiple threads to put buffers into the queue while some other
! * thread is writing.
! */
! abstract void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException;
! /**
! * This method should be called after any writeAsync/writeAsyncUnordered
! * invocation.
! * If there is a race to flushAsync from several threads one thread
! * (race winner) capture flush operation and write the whole queue content.
! * Other threads (race losers) exits from the method (not blocking)
! * and continue execution.
! */
! abstract void flushAsync() throws IOException;
- /**
- * Closes this connection, by returning the socket to its connection pool.
- */
@Override
! public abstract void close();
! abstract void shutdownInput() throws IOException;
! abstract void shutdownOutput() throws IOException;
! /**
! * Puts position to limit and limit to capacity so we can resume reading
! * into this buffer, but if required > 0 then limit may be reduced so that
! * no more than required bytes are read next time.
! */
! static void resumeChannelRead(ByteBuffer buf, int required) {
! int limit = buf.limit();
! buf.position(limit);
! int capacity = buf.capacity() - limit;
! if (required > 0 && required < capacity) {
! buf.limit(limit + required);
! } else {
! buf.limit(buf.capacity());
}
}
! /* Reads whatever readImpl() currently provides and hands it straight back. */
! final ByteBuffer read() throws IOException {
!     return readImpl();
}
! /*
! * Returns a ByteBuffer with the data available at the moment, or null if
! * reached EOF.
! */
! protected abstract ByteBuffer readImpl() throws IOException;
@Override
public String toString() {
return "HttpConnection: " + channel().toString();
}
--- 23,409 ----
* questions.
*/
package jdk.incubator.http;
import java.io.Closeable;
import java.io.IOException;
+ import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
+ import java.util.IdentityHashMap;
+ import java.util.List;
+ import java.util.Map;
import java.util.concurrent.CompletableFuture;
! import java.util.concurrent.CompletionStage;
! import java.util.concurrent.ConcurrentLinkedDeque;
! import java.util.concurrent.Flow;
! import jdk.incubator.http.HttpClient.Version;
! import jdk.incubator.http.internal.common.Demand;
! import jdk.incubator.http.internal.common.FlowTube;
! import jdk.incubator.http.internal.common.SequentialScheduler;
! import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
! import jdk.incubator.http.internal.common.Log;
! import jdk.incubator.http.internal.common.Utils;
! import static jdk.incubator.http.HttpClient.Version.HTTP_2;
/**
* Wraps socket channel layer and takes care of SSL also.
*
* Subtypes are:
* PlainHttpConnection: regular direct TCP connection to server
* PlainProxyConnection: plain text proxy connection
* PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
! * AsyncSSLConnection: TLS channel direct to server
! * AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
*/
abstract class HttpConnection implements Closeable {
! static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
! final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
! final static System.Logger DEBUG_LOGGER = Utils.getDebugLogger(
! () -> "HttpConnection(SocketTube(?))", DEBUG);
! /** The address this connection is connected to. Could be a server or a proxy. */
final InetSocketAddress address;
! private final HttpClientImpl client;
! private final TrailingOperations trailingOperations;
HttpConnection(InetSocketAddress address, HttpClientImpl client) {
this.address = address;
this.client = client;
+ trailingOperations = new TrailingOperations();
}
! /**
!  * Identity-keyed registry of completion stages added through
!  * addTrailingOperation. A stage is dropped from the registry as soon as
!  * it completes, normally or exceptionally.
!  * NOTE(review): presumably this keeps pending operations strongly
!  * reachable for the lifetime of the connection - confirm with callers.
!  */
! private static final class TrailingOperations {
!     private final Map<CompletionStage<?>, Boolean> operations =
!             new IdentityHashMap<>();
!
!     /**
!      * Registers {@code cf} and arranges for it to be removed on completion.
!      */
!     void add(CompletionStage<?> cf) {
!         synchronized (operations) {
!             operations.put(cf, Boolean.TRUE);
!         }
!         // Register the callback only after the entry is in the map: for a
!         // stage that is already complete the callback may run right here,
!         // and it must find the entry. Done outside the lock so no foreign
!         // code runs while the monitor is held.
!         cf.whenComplete((r, t) -> remove(cf));
!     }
!
!     /**
!      * Removes {@code cf} from the registry.
!      *
!      * @return true if the stage was registered, false otherwise
!      */
!     boolean remove(CompletionStage<?> cf) {
!         synchronized (operations) {
!             // Map.remove yields null for an absent key; compare instead
!             // of unboxing so an absent key gives false, not an NPE.
!             return operations.remove(cf) != null;
!         }
!     }
}
! final void addTrailingOperation(CompletionStage<?> cf) {
! trailingOperations.add(cf);
! }
! // final void removeTrailingOperation(CompletableFuture<?> cf) {
! // trailingOperations.remove(cf);
! // }
!
! final HttpClientImpl client() {
! return client;
}
! //public abstract void connect() throws IOException, InterruptedException;
public abstract CompletableFuture<Void> connectAsync();
! /** Tells whether, or not, this connection is connected to its destination. */
abstract boolean connected();
+ /** Tells whether, or not, this connection is secure (over SSL). */
abstract boolean isSecure();
+ /** Tells whether, or not, this connection is proxied. */
abstract boolean isProxied();
! /**
!  * Tells whether this connection is open: the channel must be open and,
!  * once the connection is connected, its flow must not have finished.
!  */
final boolean isOpen() {
!     if (!channel().isOpen()) {
!         return false;
!     }
!     return !connected() || !getConnectionFlow().isFinished();
}
! /**
! * A tube publisher that buffers are handed to for writing. The notes below
! * describe what the PlainHttpPublisher implementation in this file does;
! * other implementations are not visible here.
! */
! interface HttpPublisher extends FlowTube.TubePublisher {
! // PlainHttpPublisher: appends the buffers at the tail of its write queue.
! void enqueue(List<ByteBuffer> buffers) throws IOException;
! // PlainHttpPublisher: inserts the buffers at the head of its write queue,
! // ahead of anything already queued.
! void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;
! // PlainHttpPublisher: triggers its write scheduler so queued buffers get flushed.
! void signalEnqueued() throws IOException;
}
! /**
! * Returns the HTTP publisher associated with this connection. May be null
! * if invoked before connecting.
! */
! abstract HttpPublisher publisher();
/**
! * Factory for retrieving HttpConnections. A connection can be retrieved
! * from the connection pool, or a new one created if none available.
! *
! * The given {@code addr} is the ultimate destination. Any proxies,
! * etc, are determined from the request. Returns a concrete instance which
! * is one of the following:
! * {@link PlainHttpConnection}
! * {@link PlainTunnelingConnection}
! *
! * The returned connection, if not from the connection pool, must have its,
! * connect() or connectAsync() method invoked, which ( when it completes
! * successfully ) renders the connection usable for requests.
*/
! public static HttpConnection getConnection(InetSocketAddress addr,
HttpClientImpl client,
! HttpRequestImpl request,
! Version version) {
HttpConnection c = null;
! InetSocketAddress proxy = request.proxy();
if (proxy != null && proxy.isUnresolved()) {
! // The default proxy selector may select a proxy whose address is
! // unresolved. We must resolve the address before connecting to it.
proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort());
}
boolean secure = request.secure();
ConnectionPool pool = client.connectionPool();
if (!secure) {
c = pool.getConnection(false, addr, proxy);
! if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
! final HttpConnection conn = c;
! DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow()
! + ": plain connection retrieved from HTTP/1.1 pool");
return c;
} else {
return getPlainConnection(addr, proxy, request, client);
}
! } else { // secure
! if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
c = pool.getConnection(true, addr, proxy);
}
! if (c != null && c.isOpen()) {
! final HttpConnection conn = c;
! DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow()
! + ": SSL connection retrieved from HTTP/1.1 pool");
return c;
} else {
! String[] alpn = null;
! if (version == HTTP_2) {
! alpn = new String[] { "h2", "http/1.1" };
! }
! return getSSLConnection(addr, proxy, alpn, client);
! }
}
}
+
+ /* Creates a new TLS connection: direct when there is no proxy, tunnelled otherwise. */
+ private static HttpConnection getSSLConnection(InetSocketAddress addr,
+                                                InetSocketAddress proxy,
+                                                String[] alpn,
+                                                HttpClientImpl client) {
+     return (proxy == null)
+             ? new AsyncSSLConnection(addr, client, alpn)
+             : new AsyncSSLTunnelConnection(addr, client, alpn, proxy);
+ }
+
+ /* Returns either a plain HTTP connection or a plain tunnelling connection
+ * for proxied WebSocket */
+ private static HttpConnection getPlainConnection(InetSocketAddress addr,
+ InetSocketAddress proxy,
+ HttpRequestImpl request,
+ HttpClientImpl client) {
+ if (request.isWebSocket() && proxy != null)
+ return new PlainTunnelingConnection(addr, proxy, client);
+
+ if (proxy == null)
+ return new PlainHttpConnection(addr, client);
+ else
+ return new PlainProxyConnection(proxy, client);
}
! /**
!  * Disposes of this connection at the end of an exchange: it is returned
!  * to the client's connection pool when it can be reused, and closed in
!  * every other case.
!  *
!  * @param hdrs the response headers, or {@code null} when the server
!  *             closed the connection (EOF)
!  */
! void closeOrReturnToCache(HttpHeaders hdrs) {
    if (hdrs == null) {
!       // the connection was closed by server, eof
        close();
        return;
    }
    if (!isOpen()) {
+       // isOpen() is also false when the channel is still open but the
+       // connection flow has finished. Such a connection is not reusable,
+       // so close it instead of abandoning it half-open.
+       close();
        return;
    }
+   HttpClientImpl client = client();
+   if (client == null) {
+       // no client, hence no pool to return to
+       close();
+       return;
+   }
    ConnectionPool pool = client.connectionPool();
    // Keep-alive unless the first "Connection" header value is "close".
    boolean keepAlive = hdrs.firstValue("Connection")
            .map((s) -> !s.equalsIgnoreCase("close"))
            .orElse(true);
    if (keepAlive) {
+       Log.logTrace("Returning connection to the pool: {0}", this);
        pool.returnToPool(this);
    } else {
        close();
    }
}
abstract SocketChannel channel();
final InetSocketAddress address() {
return address;
}
! abstract ConnectionPool.CacheKey cacheKey();
! // // overridden in SSL only
! // SSLParameters sslParameters() {
! // return null;
! // }
! /**
! * Closes this connection, by returning the socket to its connection pool.
! */
! @Override
! public abstract void close();
! abstract void shutdownInput() throws IOException;
! abstract void shutdownOutput() throws IOException;
+ // Support for WebSocket/RawChannelImpl which unfortunately
+ // still depends on synchronous read/writes.
+ // It should be removed when RawChannelImpl moves to using asynchronous APIs.
+ // Abstraction over a channel detached from its connection; obtained from
+ // detachChannel(). Only toString() has a body here, everything else is
+ // left to subclasses.
+ abstract static class DetachedConnectionChannel implements Closeable {
+ DetachedConnectionChannel() {}
+ // The underlying socket channel; also used by toString().
+ abstract SocketChannel channel();
+ // Gathering write. NOTE(review): presumably writes `number` buffers
+ // starting at index `start` and returns the byte count - confirm in subclasses.
+ abstract long write(ByteBuffer[] buffers, int start, int number)
throws IOException;
+ abstract void shutdownInput() throws IOException;
+ abstract void shutdownOutput() throws IOException;
+ // NOTE(review): return value on EOF is not specified here - confirm in subclasses.
+ abstract ByteBuffer read() throws IOException;
+ @Override
+ public abstract void close();
+ // Renders as "<simple class name>: <channel>".
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName() + ": " + channel().toString();
+ }
+ }
! // Support for WebSocket/RawChannelImpl which unfortunately
! // still depends on synchronous read/writes.
! // It should be removed when RawChannelImpl moves to using asynchronous APIs.
! abstract DetachedConnectionChannel detachChannel();
! abstract FlowTube getConnectionFlow();
/**
! * A publisher that makes it possible to publish (write)
! * ordered (normal priority) and unordered (high priority)
! * buffers downstream.
! *
! * Buffer lists are parked in a deque by enqueue/enqueueUnordered and are
! * handed to the subscriber by HttpWriteSubscription.flush(), which runs
! * through writeScheduler and delivers one list per unit of outstanding
! * demand.
*/
! final class PlainHttpPublisher implements HttpPublisher {
! // Monitor guarding installation of the subscription and subscriber.
! final Object reading;
! PlainHttpPublisher() {
! this(new Object());
! }
! PlainHttpPublisher(Object readingLock) {
! this.reading = readingLock;
! }
! // Pending writes; ordered lists go to the tail, unordered ones to the head.
! final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();
! volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
! volatile HttpWriteSubscription subscription;
! // Runs flushTask whenever a flush is signalled.
! final SequentialScheduler writeScheduler =
! new SequentialScheduler(this::flushTask);
! // Installs the subscriber, creating the subscription on first use, then
! // calls onSubscribe and triggers a flush of anything already queued.
! @Override
! public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
! synchronized (reading) {
! //assert this.subscription == null;
! //assert this.subscriber == null;
! if (subscription == null) {
! subscription = new HttpWriteSubscription();
! }
! this.subscriber = subscriber;
! }
! // TODO: should we do this in the flow?
! subscriber.onSubscribe(subscription);
! signal();
! }
! // Scheduler task: flushes through the current subscription, if any, and
! // always completes the completer, even when flush() throws.
! void flushTask(DeferredCompleter completer) {
! try {
! HttpWriteSubscription sub = subscription;
! if (sub != null) sub.flush();
! } finally {
! completer.complete();
! }
! }
! // Asks the write scheduler to run (or schedule) the flush task.
! void signal() {
! writeScheduler.runOrSchedule();
! }
!
! // Subscription handed to the subscriber; tracks its demand and drains
! // the queue against it.
! final class HttpWriteSubscription implements Flow.Subscription {
! final Demand demand = new Demand();
// Adds n to the demand and triggers a flush; throws
// IllegalArgumentException for a non-positive n.
@Override
! public void request(long n) {
! if (n <= 0) throw new IllegalArgumentException("non-positive request");
! demand.increase(n);
! debug.log(Level.DEBUG, () -> "HttpPublisher: got request of "
! + n + " from "
! + getConnectionFlow());
! writeScheduler.runOrSchedule();
! }
! // Only logs the cancellation; no state is changed here.
! @Override
! public void cancel() {
! debug.log(Level.DEBUG, () -> "HttpPublisher: cancelled by "
! + getConnectionFlow());
! }
! // Delivers queued buffer lists to the subscriber, one per unit of
! // demand, until the queue is empty or the demand is exhausted.
! void flush() {
! while (!queue.isEmpty() && demand.tryDecrement()) {
! List<ByteBuffer> elem = queue.poll();
! debug.log(Level.DEBUG, () -> "HttpPublisher: sending "
! + Utils.remaining(elem) + " bytes ("
! + elem.size() + " buffers) to "
! + getConnectionFlow());
! subscriber.onNext(elem);
! }
! }
! }
! // Appends the buffers at the tail of the queue. Does not trigger a
! // flush by itself; see signalEnqueued().
! @Override
! public void enqueue(List<ByteBuffer> buffers) throws IOException {
! queue.add(buffers);
! int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
! debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes);
}
+
+ // Inserts the buffers at the head of the queue. Does not trigger a
+ // flush by itself; see signalEnqueued().
+ @Override
+ public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
+ // Unordered frames are sent before existing frames.
+ int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
+ queue.addFirst(buffers);
+ debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes);
}
! // Triggers a flush of whatever has been enqueued.
! @Override
! public void signalEnqueued() throws IOException {
! debug.log(Level.DEBUG, "signalling the publisher of the write queue");
! signal();
! }
}
! String dbgTag = null;
! /*
!  * Tag used by the debug logger: "<simple class name>(<flow>)". The tag is
!  * cached in dbgTag once the connection flow is available; until then a
!  * "(?)" placeholder is returned and nothing is cached.
!  */
! final String dbgString() {
!     final FlowTube tube = getConnectionFlow();
!     final String cached = dbgTag;
!     if (cached != null) {
!         return cached;
!     }
!     final String simpleName = this.getClass().getSimpleName();
!     if (tube == null) {
!         return simpleName + "(?)";
!     }
!     final String fresh = simpleName + "(" + tube + ")";
!     dbgTag = fresh;
!     return fresh;
! }
@Override
public String toString() {
return "HttpConnection: " + channel().toString();
}
< prev index next >