< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java

Print this page

        

*** 23,349 **** * questions. */ package jdk.incubator.http; - import javax.net.ssl.SSLParameters; import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.concurrent.CompletableFuture; ! ! import jdk.incubator.http.internal.common.ByteBufferReference; /** * Wraps socket channel layer and takes care of SSL also. * * Subtypes are: * PlainHttpConnection: regular direct TCP connection to server * PlainProxyConnection: plain text proxy connection * PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server ! * SSLConnection: TLS channel direct to server ! * SSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel */ abstract class HttpConnection implements Closeable { ! enum Mode { ! BLOCKING, ! NON_BLOCKING, ! ASYNC ! } ! ! protected Mode mode; ! // address we are connected to. Could be a server or a proxy final InetSocketAddress address; ! final HttpClientImpl client; HttpConnection(InetSocketAddress address, HttpClientImpl client) { this.address = address; this.client = client; } ! /** ! * Public API to this class. addr is the ultimate destination. Any proxies ! * etc are figured out from the request. Returns an instance of one of the ! * following ! * PlainHttpConnection ! * PlainTunnelingConnection ! * SSLConnection ! * SSLTunnelConnection ! * ! * When object returned, connect() or connectAsync() must be called, which ! * when it returns/completes, the connection is usable for requests. ! */ ! public static HttpConnection getConnection( ! InetSocketAddress addr, HttpClientImpl client, HttpRequestImpl request) ! { ! return getConnectionImpl(addr, client, request, false); } ! /** ! * Called specifically to get an async connection for HTTP/2 over SSL. ! */ ! public static HttpConnection getConnection(InetSocketAddress addr, ! HttpClientImpl client, HttpRequestImpl request, boolean isHttp2) { ! return getConnectionImpl(addr, client, request, isHttp2); } ! public abstract void connect() throws IOException, InterruptedException; public abstract CompletableFuture<Void> connectAsync(); ! /** ! * Returns whether this connection is connected to its destination ! */ abstract boolean connected(); abstract boolean isSecure(); abstract boolean isProxied(); ! /** ! * Completes when the first byte of the response is available to be read. ! */ ! abstract CompletableFuture<Void> whenReceivingResponse(); ! final boolean isOpen() { ! return channel().isOpen(); } ! /* Returns either a plain HTTP connection or a plain tunnelling connection ! * for proxied WebSocket */ ! private static HttpConnection getPlainConnection(InetSocketAddress addr, ! InetSocketAddress proxy, ! HttpRequestImpl request, ! HttpClientImpl client) { ! if (request.isWebSocket() && proxy != null) { ! return new PlainTunnelingConnection(addr, proxy, client); ! } else { ! if (proxy == null) { ! return new PlainHttpConnection(addr, client); ! } else { ! return new PlainProxyConnection(proxy, client); ! } ! } } ! private static HttpConnection getSSLConnection(InetSocketAddress addr, ! InetSocketAddress proxy, HttpRequestImpl request, ! String[] alpn, boolean isHttp2, HttpClientImpl client) ! { ! if (proxy != null) { ! if (!isHttp2) { ! return new SSLTunnelConnection(addr, client, proxy); ! } else { ! return new AsyncSSLTunnelConnection(addr, client, alpn, proxy); ! } ! } else if (!isHttp2) { ! return new SSLConnection(addr, client, alpn); ! } else { ! return new AsyncSSLConnection(addr, client, alpn); ! } ! } /** ! * Main factory method. Gets a HttpConnection, either cached or new if ! * none available. */ ! private static HttpConnection getConnectionImpl(InetSocketAddress addr, HttpClientImpl client, ! HttpRequestImpl request, boolean isHttp2) ! { HttpConnection c = null; ! InetSocketAddress proxy = request.proxy(client); if (proxy != null && proxy.isUnresolved()) { ! // The default proxy selector may select a proxy whose ! // address is unresolved. We must resolve the address ! // before using it to connect. proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort()); } boolean secure = request.secure(); ConnectionPool pool = client.connectionPool(); - String[] alpn = null; - - if (secure && isHttp2) { - alpn = new String[2]; - alpn[0] = "h2"; - alpn[1] = "http/1.1"; - } if (!secure) { c = pool.getConnection(false, addr, proxy); ! if (c != null) { return c; } else { return getPlainConnection(addr, proxy, request, client); } ! } else { ! if (!isHttp2) { // if http2 we don't cache connections c = pool.getConnection(true, addr, proxy); } ! if (c != null) { return c; } else { ! return getSSLConnection(addr, proxy, request, alpn, isHttp2, client); } } } ! void returnToCache(HttpHeaders hdrs) { if (hdrs == null) { ! // the connection was closed by server close(); return; } if (!isOpen()) { return; } ConnectionPool pool = client.connectionPool(); boolean keepAlive = hdrs.firstValue("Connection") .map((s) -> !s.equalsIgnoreCase("close")) .orElse(true); if (keepAlive) { pool.returnToPool(this); } else { close(); } } - /** - * Also check that the number of bytes written is what was expected. This - * could be different if the buffer is user-supplied and its internal - * pointers were manipulated in a race condition. - */ - final void checkWrite(long expected, ByteBuffer buffer) throws IOException { - long written = write(buffer); - if (written != expected) { - throw new IOException("incorrect number of bytes written"); - } - } - - final void checkWrite(long expected, - ByteBuffer[] buffers, - int start, - int length) - throws IOException - { - long written = write(buffers, start, length); - if (written != expected) { - throw new IOException("incorrect number of bytes written"); - } - } - abstract SocketChannel channel(); final InetSocketAddress address() { return address; } ! synchronized void configureMode(Mode mode) throws IOException { ! this.mode = mode; ! if (mode == Mode.BLOCKING) { ! channel().configureBlocking(true); ! } else { ! channel().configureBlocking(false); ! } ! } ! synchronized Mode getMode() { ! return mode; ! } ! abstract ConnectionPool.CacheKey cacheKey(); ! // overridden in SSL only ! SSLParameters sslParameters() { ! return null; ! } ! // Methods to be implemented for Plain TCP and SSL abstract long write(ByteBuffer[] buffers, int start, int number) throws IOException; ! abstract long write(ByteBuffer buffer) throws IOException; ! // Methods to be implemented for Plain TCP (async mode) and AsyncSSL /** ! * In {@linkplain Mode#ASYNC async mode}, this method puts buffers at the ! * end of the send queue; Otherwise, it is equivalent to {@link ! * #write(ByteBuffer[], int, int) write(buffers, 0, buffers.length)}. ! * When in async mode, calling this method should later be followed by ! * subsequent flushAsync invocation. ! * That allows multiple threads to put buffers into the queue while some other ! * thread is writing. */ ! abstract void writeAsync(ByteBufferReference[] buffers) throws IOException; ! /** ! * In {@linkplain Mode#ASYNC async mode}, this method may put ! * buffers at the beginning of send queue, breaking frames sequence and ! * allowing to write these buffers before other buffers in the queue; ! * Otherwise, it is equivalent to {@link ! * #write(ByteBuffer[], int, int) write(buffers, 0, buffers.length)}. ! * When in async mode, calling this method should later be followed by ! * subsequent flushAsync invocation. ! * That allows multiple threads to put buffers into the queue while some other ! * thread is writing. ! */ ! abstract void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException; ! /** ! * This method should be called after any writeAsync/writeAsyncUnordered ! * invocation. ! * If there is a race to flushAsync from several threads one thread ! * (race winner) capture flush operation and write the whole queue content. ! * Other threads (race losers) exits from the method (not blocking) ! * and continue execution. ! */ ! abstract void flushAsync() throws IOException; - /** - * Closes this connection, by returning the socket to its connection pool. - */ @Override ! public abstract void close(); ! abstract void shutdownInput() throws IOException; ! abstract void shutdownOutput() throws IOException; ! /** ! * Puts position to limit and limit to capacity so we can resume reading ! * into this buffer, but if required > 0 then limit may be reduced so that ! * no more than required bytes are read next time. ! */ ! static void resumeChannelRead(ByteBuffer buf, int required) { ! int limit = buf.limit(); ! buf.position(limit); ! int capacity = buf.capacity() - limit; ! if (required > 0 && required < capacity) { ! buf.limit(limit + required); ! } else { ! buf.limit(buf.capacity()); } } ! final ByteBuffer read() throws IOException { ! ByteBuffer b = readImpl(); ! return b; } ! /* ! * Returns a ByteBuffer with the data available at the moment, or null if ! * reached EOF. ! */ ! protected abstract ByteBuffer readImpl() throws IOException; @Override public String toString() { return "HttpConnection: " + channel().toString(); } --- 23,409 ---- * questions. */ package jdk.incubator.http; import java.io.Closeable; import java.io.IOException; + import java.lang.System.Logger.Level; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; + import java.util.IdentityHashMap; + import java.util.List; + import java.util.Map; import java.util.concurrent.CompletableFuture; ! import java.util.concurrent.CompletionStage; ! import java.util.concurrent.ConcurrentLinkedDeque; ! import java.util.concurrent.Flow; ! import jdk.incubator.http.HttpClient.Version; ! import jdk.incubator.http.internal.common.Demand; ! import jdk.incubator.http.internal.common.FlowTube; ! import jdk.incubator.http.internal.common.SequentialScheduler; ! import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter; ! import jdk.incubator.http.internal.common.Log; ! import jdk.incubator.http.internal.common.Utils; ! import static jdk.incubator.http.HttpClient.Version.HTTP_2; /** * Wraps socket channel layer and takes care of SSL also. * * Subtypes are: * PlainHttpConnection: regular direct TCP connection to server * PlainProxyConnection: plain text proxy connection * PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server ! * AsyncSSLConnection: TLS channel direct to server ! * AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel */ abstract class HttpConnection implements Closeable { ! static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. ! final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); ! final static System.Logger DEBUG_LOGGER = Utils.getDebugLogger( ! () -> "HttpConnection(SocketTube(?))", DEBUG); ! /** The address this connection is connected to. Could be a server or a proxy. */ final InetSocketAddress address; ! private final HttpClientImpl client; ! private final TrailingOperations trailingOperations; HttpConnection(InetSocketAddress address, HttpClientImpl client) { this.address = address; this.client = client; + trailingOperations = new TrailingOperations(); } ! private static final class TrailingOperations { ! private final Map<CompletionStage<?>, Boolean> operations = ! new IdentityHashMap<>(); ! void add(CompletionStage<?> cf) { ! synchronized(operations) { ! cf.whenComplete((r,t)-> remove(cf)); ! operations.put(cf, Boolean.TRUE); ! } ! } ! boolean remove(CompletionStage<?> cf) { ! synchronized(operations) { ! return operations.remove(cf); ! } ! } } ! final void addTrailingOperation(CompletionStage<?> cf) { ! trailingOperations.add(cf); ! } ! // final void removeTrailingOperation(CompletableFuture<?> cf) { ! // trailingOperations.remove(cf); ! // } ! ! final HttpClientImpl client() { ! return client; } ! //public abstract void connect() throws IOException, InterruptedException; public abstract CompletableFuture<Void> connectAsync(); ! /** Tells whether, or not, this connection is connected to its destination. */ abstract boolean connected(); + /** Tells whether, or not, this connection is secure ( over SSL ) */ abstract boolean isSecure(); + /** Tells whether, or not, this connection is proxied. */ abstract boolean isProxied(); ! /** Tells whether, or not, this connection is open. */ final boolean isOpen() { ! return channel().isOpen() && ! (connected() ? !getConnectionFlow().isFinished() : true); } ! interface HttpPublisher extends FlowTube.TubePublisher { ! void enqueue(List<ByteBuffer> buffers) throws IOException; ! void enqueueUnordered(List<ByteBuffer> buffers) throws IOException; ! void signalEnqueued() throws IOException; } ! /** ! * Returns the HTTP publisher associated with this connection. May be null ! * if invoked before connecting. ! */ ! abstract HttpPublisher publisher(); /** ! * Factory for retrieving HttpConnections. A connection can be retrieved ! * from the connection pool, or a new one created if none available. ! * ! * The given {@code addr} is the ultimate destination. Any proxies, ! * etc, are determined from the request. Returns a concrete instance which ! * is one of the following: ! * {@link PlainHttpConnection} ! * {@link PlainTunnelingConnection} ! * ! * The returned connection, if not from the connection pool, must have its, ! * connect() or connectAsync() method invoked, which ( when it completes ! * successfully ) renders the connection usable for requests. */ ! public static HttpConnection getConnection(InetSocketAddress addr, HttpClientImpl client, ! HttpRequestImpl request, ! Version version) { HttpConnection c = null; ! InetSocketAddress proxy = request.proxy(); if (proxy != null && proxy.isUnresolved()) { ! // The default proxy selector may select a proxy whose address is ! // unresolved. We must resolve the address before connecting to it. proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort()); } boolean secure = request.secure(); ConnectionPool pool = client.connectionPool(); if (!secure) { c = pool.getConnection(false, addr, proxy); ! if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) { ! final HttpConnection conn = c; ! DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow() ! + ": plain connection retrieved from HTTP/1.1 pool"); return c; } else { return getPlainConnection(addr, proxy, request, client); } ! } else { // secure ! if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool c = pool.getConnection(true, addr, proxy); } ! if (c != null && c.isOpen()) { ! final HttpConnection conn = c; ! DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow() ! + ": SSL connection retrieved from HTTP/1.1 pool"); return c; } else { ! String[] alpn = null; ! if (version == HTTP_2) { ! alpn = new String[] { "h2", "http/1.1" }; ! } ! return getSSLConnection(addr, proxy, alpn, client); ! } } } + + private static HttpConnection getSSLConnection(InetSocketAddress addr, + InetSocketAddress proxy, + String[] alpn, + HttpClientImpl client) { + if (proxy != null) + return new AsyncSSLTunnelConnection(addr, client, alpn, proxy); + else + return new AsyncSSLConnection(addr, client, alpn); + } + + /* Returns either a plain HTTP connection or a plain tunnelling connection + * for proxied WebSocket */ + private static HttpConnection getPlainConnection(InetSocketAddress addr, + InetSocketAddress proxy, + HttpRequestImpl request, + HttpClientImpl client) { + if (request.isWebSocket() && proxy != null) + return new PlainTunnelingConnection(addr, proxy, client); + + if (proxy == null) + return new PlainHttpConnection(addr, client); + else + return new PlainProxyConnection(proxy, client); } ! void closeOrReturnToCache(HttpHeaders hdrs) { if (hdrs == null) { ! // the connection was closed by server, eof close(); return; } if (!isOpen()) { return; } + HttpClientImpl client = client(); + if (client == null) { + close(); + return; + } ConnectionPool pool = client.connectionPool(); boolean keepAlive = hdrs.firstValue("Connection") .map((s) -> !s.equalsIgnoreCase("close")) .orElse(true); if (keepAlive) { + Log.logTrace("Returning connection to the pool: {0}", this); pool.returnToPool(this); } else { close(); } } abstract SocketChannel channel(); final InetSocketAddress address() { return address; } ! abstract ConnectionPool.CacheKey cacheKey(); ! // // overridden in SSL only ! // SSLParameters sslParameters() { ! // return null; ! // } ! /** ! * Closes this connection, by returning the socket to its connection pool. ! */ ! @Override ! public abstract void close(); ! abstract void shutdownInput() throws IOException; ! abstract void shutdownOutput() throws IOException; + // Support for WebSocket/RawChannelImpl which unfortunately + // still depends on synchronous read/writes. + // It should be removed when RawChannelImpl moves to using asynchronous APIs. + abstract static class DetachedConnectionChannel implements Closeable { + DetachedConnectionChannel() {} + abstract SocketChannel channel(); abstract long write(ByteBuffer[] buffers, int start, int number) throws IOException; + abstract void shutdownInput() throws IOException; + abstract void shutdownOutput() throws IOException; + abstract ByteBuffer read() throws IOException; + @Override + public abstract void close(); + @Override + public String toString() { + return this.getClass().getSimpleName() + ": " + channel().toString(); + } + } ! // Support for WebSocket/RawChannelImpl which unfortunately ! // still depends on synchronous read/writes. ! // It should be removed when RawChannelImpl moves to using asynchronous APIs. ! abstract DetachedConnectionChannel detachChannel(); ! abstract FlowTube getConnectionFlow(); /** ! * A publisher that makes it possible to publish (write) ! * ordered (normal priority) and unordered (high priority) ! * buffers downstream. */ ! final class PlainHttpPublisher implements HttpPublisher { ! final Object reading; ! PlainHttpPublisher() { ! this(new Object()); ! } ! PlainHttpPublisher(Object readingLock) { ! this.reading = readingLock; ! } ! final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>(); ! volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber; ! volatile HttpWriteSubscription subscription; ! final SequentialScheduler writeScheduler = ! new SequentialScheduler(this::flushTask); ! @Override ! public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { ! synchronized (reading) { ! //assert this.subscription == null; ! //assert this.subscriber == null; ! if (subscription == null) { ! subscription = new HttpWriteSubscription(); ! } ! this.subscriber = subscriber; ! } ! // TODO: should we do this in the flow? ! subscriber.onSubscribe(subscription); ! signal(); ! } ! void flushTask(DeferredCompleter completer) { ! try { ! HttpWriteSubscription sub = subscription; ! if (sub != null) sub.flush(); ! } finally { ! completer.complete(); ! } ! } ! void signal() { ! writeScheduler.runOrSchedule(); ! } ! ! final class HttpWriteSubscription implements Flow.Subscription { ! final Demand demand = new Demand(); @Override ! public void request(long n) { ! if (n <= 0) throw new IllegalArgumentException("non-positive request"); ! demand.increase(n); ! debug.log(Level.DEBUG, () -> "HttpPublisher: got request of " ! + n + " from " ! + getConnectionFlow()); ! writeScheduler.runOrSchedule(); ! } ! @Override ! public void cancel() { ! debug.log(Level.DEBUG, () -> "HttpPublisher: cancelled by " ! + getConnectionFlow()); ! } ! void flush() { ! while (!queue.isEmpty() && demand.tryDecrement()) { ! List<ByteBuffer> elem = queue.poll(); ! debug.log(Level.DEBUG, () -> "HttpPublisher: sending " ! + Utils.remaining(elem) + " bytes (" ! + elem.size() + " buffers) to " ! + getConnectionFlow()); ! subscriber.onNext(elem); ! } ! } ! } ! @Override ! public void enqueue(List<ByteBuffer> buffers) throws IOException { ! queue.add(buffers); ! int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); ! debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes); } + + @Override + public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException { + // Unordered frames are sent before existing frames. + int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); + queue.addFirst(buffers); + debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes); } ! @Override ! public void signalEnqueued() throws IOException { ! debug.log(Level.DEBUG, "signalling the publisher of the write queue"); ! signal(); ! } } ! String dbgTag = null; ! final String dbgString() { ! FlowTube flow = getConnectionFlow(); ! String tag = dbgTag; ! if (tag == null && flow != null) { ! dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")"; ! } else if (tag == null) { ! tag = this.getClass().getSimpleName() + "(?)"; ! } ! return tag; ! } @Override public String toString() { return "HttpConnection: " + channel().toString(); }
< prev index next >