--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java 2017-11-30 04:03:58.910046268 -0800 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java 2017-11-30 04:03:58.681026247 -0800 @@ -25,15 +25,27 @@ package jdk.incubator.http; -import javax.net.ssl.SSLParameters; import java.io.Closeable; import java.io.IOException; +import java.lang.System.Logger.Level; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; - -import jdk.incubator.http.internal.common.ByteBufferReference; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Flow; +import jdk.incubator.http.HttpClient.Version; +import jdk.incubator.http.internal.common.Demand; +import jdk.incubator.http.internal.common.FlowTube; +import jdk.incubator.http.internal.common.SequentialScheduler; +import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter; +import jdk.incubator.http.internal.common.Log; +import jdk.incubator.http.internal.common.Utils; +import static jdk.incubator.http.HttpClient.Version.HTTP_2; /** * Wraps socket channel layer and takes care of SSL also. @@ -42,269 +54,207 @@ * PlainHttpConnection: regular direct TCP connection to server * PlainProxyConnection: plain text proxy connection * PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server - * SSLConnection: TLS channel direct to server - * SSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel + * AsyncSSLConnection: TLS channel direct to server + * AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel */ abstract class HttpConnection implements Closeable { - enum Mode { - BLOCKING, - NON_BLOCKING, - ASYNC - } - - protected Mode mode; + static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. + final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); + final static System.Logger DEBUG_LOGGER = Utils.getDebugLogger( + () -> "HttpConnection(SocketTube(?))", DEBUG); - // address we are connected to. Could be a server or a proxy + /** The address this connection is connected to. Could be a server or a proxy. */ final InetSocketAddress address; - final HttpClientImpl client; + private final HttpClientImpl client; + private final TrailingOperations trailingOperations; HttpConnection(InetSocketAddress address, HttpClientImpl client) { this.address = address; this.client = client; + trailingOperations = new TrailingOperations(); } - /** - * Public API to this class. addr is the ultimate destination. Any proxies - * etc are figured out from the request. Returns an instance of one of the - * following - * PlainHttpConnection - * PlainTunnelingConnection - * SSLConnection - * SSLTunnelConnection - * - * When object returned, connect() or connectAsync() must be called, which - * when it returns/completes, the connection is usable for requests. - */ - public static HttpConnection getConnection( - InetSocketAddress addr, HttpClientImpl client, HttpRequestImpl request) - { - return getConnectionImpl(addr, client, request, false); + private static final class TrailingOperations { + private final Map, Boolean> operations = + new IdentityHashMap<>(); + void add(CompletionStage cf) { + synchronized(operations) { + cf.whenComplete((r,t)-> remove(cf)); + operations.put(cf, Boolean.TRUE); + } + } + boolean remove(CompletionStage cf) { + synchronized(operations) { + return operations.remove(cf); + } + } } - /** - * Called specifically to get an async connection for HTTP/2 over SSL. - */ - public static HttpConnection getConnection(InetSocketAddress addr, - HttpClientImpl client, HttpRequestImpl request, boolean isHttp2) { + final void addTrailingOperation(CompletionStage cf) { + trailingOperations.add(cf); + } + +// final void removeTrailingOperation(CompletableFuture cf) { +// trailingOperations.remove(cf); +// } - return getConnectionImpl(addr, client, request, isHttp2); + final HttpClientImpl client() { + return client; } - public abstract void connect() throws IOException, InterruptedException; + //public abstract void connect() throws IOException, InterruptedException; public abstract CompletableFuture connectAsync(); - /** - * Returns whether this connection is connected to its destination - */ + /** Tells whether, or not, this connection is connected to its destination. */ abstract boolean connected(); + /** Tells whether, or not, this connection is secure ( over SSL ) */ abstract boolean isSecure(); + /** Tells whether, or not, this connection is proxied. */ abstract boolean isProxied(); - /** - * Completes when the first byte of the response is available to be read. - */ - abstract CompletableFuture whenReceivingResponse(); - + /** Tells whether, or not, this connection is open. */ final boolean isOpen() { - return channel().isOpen(); + return channel().isOpen() && + (connected() ? !getConnectionFlow().isFinished() : true); } - /* Returns either a plain HTTP connection or a plain tunnelling connection - * for proxied WebSocket */ - private static HttpConnection getPlainConnection(InetSocketAddress addr, - InetSocketAddress proxy, - HttpRequestImpl request, - HttpClientImpl client) { - if (request.isWebSocket() && proxy != null) { - return new PlainTunnelingConnection(addr, proxy, client); - } else { - if (proxy == null) { - return new PlainHttpConnection(addr, client); - } else { - return new PlainProxyConnection(proxy, client); - } - } + interface HttpPublisher extends FlowTube.TubePublisher { + void enqueue(List buffers) throws IOException; + void enqueueUnordered(List buffers) throws IOException; + void signalEnqueued() throws IOException; } - private static HttpConnection getSSLConnection(InetSocketAddress addr, - InetSocketAddress proxy, HttpRequestImpl request, - String[] alpn, boolean isHttp2, HttpClientImpl client) - { - if (proxy != null) { - if (!isHttp2) { - return new SSLTunnelConnection(addr, client, proxy); - } else { - return new AsyncSSLTunnelConnection(addr, client, alpn, proxy); - } - } else if (!isHttp2) { - return new SSLConnection(addr, client, alpn); - } else { - return new AsyncSSLConnection(addr, client, alpn); - } - } + /** + * Returns the HTTP publisher associated with this connection. May be null + * if invoked before connecting. + */ + abstract HttpPublisher publisher(); /** - * Main factory method. Gets a HttpConnection, either cached or new if - * none available. + * Factory for retrieving HttpConnections. A connection can be retrieved + * from the connection pool, or a new one created if none available. + * + * The given {@code addr} is the ultimate destination. Any proxies, + * etc, are determined from the request. Returns a concrete instance which + * is one of the following: + * {@link PlainHttpConnection} + * {@link PlainTunnelingConnection} + * + * The returned connection, if not from the connection pool, must have its, + * connect() or connectAsync() method invoked, which ( when it completes + * successfully ) renders the connection usable for requests. */ - private static HttpConnection getConnectionImpl(InetSocketAddress addr, - HttpClientImpl client, - HttpRequestImpl request, boolean isHttp2) - { + public static HttpConnection getConnection(InetSocketAddress addr, + HttpClientImpl client, + HttpRequestImpl request, + Version version) { HttpConnection c = null; - InetSocketAddress proxy = request.proxy(client); + InetSocketAddress proxy = request.proxy(); if (proxy != null && proxy.isUnresolved()) { - // The default proxy selector may select a proxy whose - // address is unresolved. We must resolve the address - // before using it to connect. + // The default proxy selector may select a proxy whose address is + // unresolved. We must resolve the address before connecting to it. proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort()); } boolean secure = request.secure(); ConnectionPool pool = client.connectionPool(); - String[] alpn = null; - - if (secure && isHttp2) { - alpn = new String[2]; - alpn[0] = "h2"; - alpn[1] = "http/1.1"; - } if (!secure) { c = pool.getConnection(false, addr, proxy); - if (c != null) { + if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) { + final HttpConnection conn = c; + DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow() + + ": plain connection retrieved from HTTP/1.1 pool"); return c; } else { return getPlainConnection(addr, proxy, request, client); } - } else { - if (!isHttp2) { // if http2 we don't cache connections + } else { // secure + if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool c = pool.getConnection(true, addr, proxy); } - if (c != null) { + if (c != null && c.isOpen()) { + final HttpConnection conn = c; + DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow() + + ": SSL connection retrieved from HTTP/1.1 pool"); return c; } else { - return getSSLConnection(addr, proxy, request, alpn, isHttp2, client); + String[] alpn = null; + if (version == HTTP_2) { + alpn = new String[] { "h2", "http/1.1" }; + } + return getSSLConnection(addr, proxy, alpn, client); } } } - void returnToCache(HttpHeaders hdrs) { + private static HttpConnection getSSLConnection(InetSocketAddress addr, + InetSocketAddress proxy, + String[] alpn, + HttpClientImpl client) { + if (proxy != null) + return new AsyncSSLTunnelConnection(addr, client, alpn, proxy); + else + return new AsyncSSLConnection(addr, client, alpn); + } + + /* Returns either a plain HTTP connection or a plain tunnelling connection + * for proxied WebSocket */ + private static HttpConnection getPlainConnection(InetSocketAddress addr, + InetSocketAddress proxy, + HttpRequestImpl request, + HttpClientImpl client) { + if (request.isWebSocket() && proxy != null) + return new PlainTunnelingConnection(addr, proxy, client); + + if (proxy == null) + return new PlainHttpConnection(addr, client); + else + return new PlainProxyConnection(proxy, client); + } + + void closeOrReturnToCache(HttpHeaders hdrs) { if (hdrs == null) { - // the connection was closed by server + // the connection was closed by server, eof close(); return; } if (!isOpen()) { return; } + HttpClientImpl client = client(); + if (client == null) { + close(); + return; + } ConnectionPool pool = client.connectionPool(); boolean keepAlive = hdrs.firstValue("Connection") .map((s) -> !s.equalsIgnoreCase("close")) .orElse(true); if (keepAlive) { + Log.logTrace("Returning connection to the pool: {0}", this); pool.returnToPool(this); } else { close(); } } - /** - * Also check that the number of bytes written is what was expected. This - * could be different if the buffer is user-supplied and its internal - * pointers were manipulated in a race condition. - */ - final void checkWrite(long expected, ByteBuffer buffer) throws IOException { - long written = write(buffer); - if (written != expected) { - throw new IOException("incorrect number of bytes written"); - } - } - - final void checkWrite(long expected, - ByteBuffer[] buffers, - int start, - int length) - throws IOException - { - long written = write(buffers, start, length); - if (written != expected) { - throw new IOException("incorrect number of bytes written"); - } - } - abstract SocketChannel channel(); final InetSocketAddress address() { return address; } - synchronized void configureMode(Mode mode) throws IOException { - this.mode = mode; - if (mode == Mode.BLOCKING) { - channel().configureBlocking(true); - } else { - channel().configureBlocking(false); - } - } - - synchronized Mode getMode() { - return mode; - } - abstract ConnectionPool.CacheKey cacheKey(); - // overridden in SSL only - SSLParameters sslParameters() { - return null; - } - - // Methods to be implemented for Plain TCP and SSL - - abstract long write(ByteBuffer[] buffers, int start, int number) - throws IOException; - - abstract long write(ByteBuffer buffer) throws IOException; - - // Methods to be implemented for Plain TCP (async mode) and AsyncSSL - - /** - * In {@linkplain Mode#ASYNC async mode}, this method puts buffers at the - * end of the send queue; Otherwise, it is equivalent to {@link - * #write(ByteBuffer[], int, int) write(buffers, 0, buffers.length)}. - * When in async mode, calling this method should later be followed by - * subsequent flushAsync invocation. - * That allows multiple threads to put buffers into the queue while some other - * thread is writing. - */ - abstract void writeAsync(ByteBufferReference[] buffers) throws IOException; - - /** - * In {@linkplain Mode#ASYNC async mode}, this method may put - * buffers at the beginning of send queue, breaking frames sequence and - * allowing to write these buffers before other buffers in the queue; - * Otherwise, it is equivalent to {@link - * #write(ByteBuffer[], int, int) write(buffers, 0, buffers.length)}. - * When in async mode, calling this method should later be followed by - * subsequent flushAsync invocation. - * That allows multiple threads to put buffers into the queue while some other - * thread is writing. - */ - abstract void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException; - - /** - * This method should be called after any writeAsync/writeAsyncUnordered - * invocation. - * If there is a race to flushAsync from several threads one thread - * (race winner) capture flush operation and write the whole queue content. - * Other threads (race losers) exits from the method (not blocking) - * and continue execution. - */ - abstract void flushAsync() throws IOException; +// // overridden in SSL only +// SSLParameters sslParameters() { +// return null; +// } /** * Closes this connection, by returning the socket to its connection pool. @@ -316,32 +266,142 @@ abstract void shutdownOutput() throws IOException; - /** - * Puts position to limit and limit to capacity so we can resume reading - * into this buffer, but if required > 0 then limit may be reduced so that - * no more than required bytes are read next time. - */ - static void resumeChannelRead(ByteBuffer buf, int required) { - int limit = buf.limit(); - buf.position(limit); - int capacity = buf.capacity() - limit; - if (required > 0 && required < capacity) { - buf.limit(limit + required); - } else { - buf.limit(buf.capacity()); + // Support for WebSocket/RawChannelImpl which unfortunately + // still depends on synchronous read/writes. + // It should be removed when RawChannelImpl moves to using asynchronous APIs. + abstract static class DetachedConnectionChannel implements Closeable { + DetachedConnectionChannel() {} + abstract SocketChannel channel(); + abstract long write(ByteBuffer[] buffers, int start, int number) + throws IOException; + abstract void shutdownInput() throws IOException; + abstract void shutdownOutput() throws IOException; + abstract ByteBuffer read() throws IOException; + @Override + public abstract void close(); + @Override + public String toString() { + return this.getClass().getSimpleName() + ": " + channel().toString(); } } - final ByteBuffer read() throws IOException { - ByteBuffer b = readImpl(); - return b; + // Support for WebSocket/RawChannelImpl which unfortunately + // still depends on synchronous read/writes. + // It should be removed when RawChannelImpl moves to using asynchronous APIs. + abstract DetachedConnectionChannel detachChannel(); + + abstract FlowTube getConnectionFlow(); + + /** + * A publisher that makes it possible to publish (write) + * ordered (normal priority) and unordered (high priority) + * buffers downstream. + */ + final class PlainHttpPublisher implements HttpPublisher { + final Object reading; + PlainHttpPublisher() { + this(new Object()); + } + PlainHttpPublisher(Object readingLock) { + this.reading = readingLock; + } + final ConcurrentLinkedDeque> queue = new ConcurrentLinkedDeque<>(); + volatile Flow.Subscriber> subscriber; + volatile HttpWriteSubscription subscription; + final SequentialScheduler writeScheduler = + new SequentialScheduler(this::flushTask); + @Override + public void subscribe(Flow.Subscriber> subscriber) { + synchronized (reading) { + //assert this.subscription == null; + //assert this.subscriber == null; + if (subscription == null) { + subscription = new HttpWriteSubscription(); + } + this.subscriber = subscriber; + } + // TODO: should we do this in the flow? + subscriber.onSubscribe(subscription); + signal(); + } + + void flushTask(DeferredCompleter completer) { + try { + HttpWriteSubscription sub = subscription; + if (sub != null) sub.flush(); + } finally { + completer.complete(); + } + } + + void signal() { + writeScheduler.runOrSchedule(); + } + + final class HttpWriteSubscription implements Flow.Subscription { + final Demand demand = new Demand(); + + @Override + public void request(long n) { + if (n <= 0) throw new IllegalArgumentException("non-positive request"); + demand.increase(n); + debug.log(Level.DEBUG, () -> "HttpPublisher: got request of " + + n + " from " + + getConnectionFlow()); + writeScheduler.runOrSchedule(); + } + + @Override + public void cancel() { + debug.log(Level.DEBUG, () -> "HttpPublisher: cancelled by " + + getConnectionFlow()); + } + + void flush() { + while (!queue.isEmpty() && demand.tryDecrement()) { + List elem = queue.poll(); + debug.log(Level.DEBUG, () -> "HttpPublisher: sending " + + Utils.remaining(elem) + " bytes (" + + elem.size() + " buffers) to " + + getConnectionFlow()); + subscriber.onNext(elem); + } + } + } + + @Override + public void enqueue(List buffers) throws IOException { + queue.add(buffers); + int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); + debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes); + } + + @Override + public void enqueueUnordered(List buffers) throws IOException { + // Unordered frames are sent before existing frames. + int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); + queue.addFirst(buffers); + debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes); + } + + @Override + public void signalEnqueued() throws IOException { + debug.log(Level.DEBUG, "signalling the publisher of the write queue"); + signal(); + } } - /* - * Returns a ByteBuffer with the data available at the moment, or null if - * reached EOF. - */ - protected abstract ByteBuffer readImpl() throws IOException; + String dbgTag = null; + final String dbgString() { + FlowTube flow = getConnectionFlow(); + String tag = dbgTag; + if (tag == null && flow != null) { + dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")"; + } else if (tag == null) { + tag = this.getClass().getSimpleName() + "(?)"; + } + return tag; + } @Override public String toString() {