< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java

Print this page




   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import javax.net.ssl.SSLParameters;
  29 import java.io.Closeable;
  30 import java.io.IOException;

  31 import java.net.InetSocketAddress;
  32 import java.nio.ByteBuffer;
  33 import java.nio.channels.SocketChannel;



  34 import java.util.concurrent.CompletableFuture;
  35 
  36 import jdk.incubator.http.internal.common.ByteBufferReference;









  37 
  38 /**
  39  * Wraps socket channel layer and takes care of SSL also.
  40  *
  41  * Subtypes are:
  42  *      PlainHttpConnection: regular direct TCP connection to server
  43  *      PlainProxyConnection: plain text proxy connection
  44  *      PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
  45  *      SSLConnection: TLS channel direct to server
  46  *      SSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
  47  */
  48 abstract class HttpConnection implements Closeable {
  49 
  50     enum Mode {
  51         BLOCKING,
  52         NON_BLOCKING,
  53         ASYNC
  54     }
  55 
  56     protected Mode mode;
  57 
  58     // address we are connected to. Could be a server or a proxy
  59     final InetSocketAddress address;
  60     final HttpClientImpl client;

  61 
  62     HttpConnection(InetSocketAddress address, HttpClientImpl client) {
  63         this.address = address;
  64         this.client = client;

  65     }
  66 
  67     /**
  68      * Public API to this class. addr is the ultimate destination. Any proxies
  69      * etc are figured out from the request. Returns an instance of one of the
  70      * following
  71      *      PlainHttpConnection
  72      *      PlainTunnelingConnection
  73      *      SSLConnection
  74      *      SSLTunnelConnection
  75      *
  76      * When object returned, connect() or connectAsync() must be called, which
  77      * when it returns/completes, the connection is usable for requests.
  78      */
  79     public static HttpConnection getConnection(
  80             InetSocketAddress addr, HttpClientImpl client, HttpRequestImpl request)
  81     {
  82         return getConnectionImpl(addr, client, request, false);
  83     }
  84 
  85     /**
  86      * Called specifically to get an async connection for HTTP/2 over SSL.
  87      */
  88     public static HttpConnection getConnection(InetSocketAddress addr,
  89         HttpClientImpl client, HttpRequestImpl request, boolean isHttp2) {
  90 
  91         return getConnectionImpl(addr, client, request, isHttp2);





  92     }
  93 
  94     public abstract void connect() throws IOException, InterruptedException;
  95 
  96     public abstract CompletableFuture<Void> connectAsync();
  97 
  98     /**
  99      * Returns whether this connection is connected to its destination
 100      */
 101     abstract boolean connected();
 102 

 103     abstract boolean isSecure();
 104 

 105     abstract boolean isProxied();
 106 
 107     /**
 108      * Completes when the first byte of the response is available to be read.
 109      */
 110     abstract CompletableFuture<Void> whenReceivingResponse();
 111 
 112     final boolean isOpen() {
 113         return channel().isOpen();

 114     }
 115 
 116     /* Returns either a plain HTTP connection or a plain tunnelling connection
 117      * for proxied WebSocket */
 118     private static HttpConnection getPlainConnection(InetSocketAddress addr,
 119                                                      InetSocketAddress proxy,
 120                                                      HttpRequestImpl request,
 121                                                      HttpClientImpl client) {
 122         if (request.isWebSocket() && proxy != null) {
 123             return new PlainTunnelingConnection(addr, proxy, client);
 124         } else {
 125             if (proxy == null) {
 126                 return new PlainHttpConnection(addr, client);
 127             } else {
 128                 return new PlainProxyConnection(proxy, client);
 129             }
 130         }
 131     }
 132 
 133     private static HttpConnection getSSLConnection(InetSocketAddress addr,
 134             InetSocketAddress proxy, HttpRequestImpl request,
 135             String[] alpn, boolean isHttp2, HttpClientImpl client)
 136     {
 137         if (proxy != null) {
 138             if (!isHttp2) {
 139                 return new SSLTunnelConnection(addr, client, proxy);
 140             } else {
 141                 return new AsyncSSLTunnelConnection(addr, client, alpn, proxy);
 142             }
 143         } else if (!isHttp2) {
 144             return new SSLConnection(addr, client, alpn);
 145         } else {
 146             return new AsyncSSLConnection(addr, client, alpn);
 147         }
 148     }
 149 
 150     /**
 151      * Main factory method.   Gets a HttpConnection, either cached or new if
 152      * none available.










 153      */
 154     private static HttpConnection getConnectionImpl(InetSocketAddress addr,
 155             HttpClientImpl client,
 156             HttpRequestImpl request, boolean isHttp2)
 157     {
 158         HttpConnection c = null;
 159         InetSocketAddress proxy = request.proxy(client);
 160         if (proxy != null && proxy.isUnresolved()) {
 161             // The default proxy selector may select a proxy whose
 162             // address is unresolved. We must resolve the address
 163             // before using it to connect.
 164             proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort());
 165         }
 166         boolean secure = request.secure();
 167         ConnectionPool pool = client.connectionPool();
 168         String[] alpn =  null;
 169 
 170         if (secure && isHttp2) {
 171             alpn = new String[2];
 172             alpn[0] = "h2";
 173             alpn[1] = "http/1.1";
 174         }
 175 
 176         if (!secure) {
 177             c = pool.getConnection(false, addr, proxy);
 178             if (c != null) {



 179                 return c;
 180             } else {
 181                 return getPlainConnection(addr, proxy, request, client);
 182             }
 183         } else {
 184             if (!isHttp2) { // if http2 we don't cache connections
 185                 c = pool.getConnection(true, addr, proxy);
 186             }
 187             if (c != null) {



 188                 return c;
 189             } else {
 190                 return getSSLConnection(addr, proxy, request, alpn, isHttp2, client);





 191             }
 192         }
























 193     }
 194 
 195     void returnToCache(HttpHeaders hdrs) {
 196         if (hdrs == null) {
 197             // the connection was closed by server
 198             close();
 199             return;
 200         }
 201         if (!isOpen()) {
 202             return;
 203         }





 204         ConnectionPool pool = client.connectionPool();
 205         boolean keepAlive = hdrs.firstValue("Connection")
 206                 .map((s) -> !s.equalsIgnoreCase("close"))
 207                 .orElse(true);
 208 
 209         if (keepAlive) {

 210             pool.returnToPool(this);
 211         } else {
 212             close();
 213         }
 214     }
 215 
 216     /**
 217      * Also check that the number of bytes written is what was expected. This
 218      * could be different if the buffer is user-supplied and its internal
 219      * pointers were manipulated in a race condition.
 220      */
 221     final void checkWrite(long expected, ByteBuffer buffer) throws IOException {
 222         long written = write(buffer);
 223         if (written != expected) {
 224             throw new IOException("incorrect number of bytes written");
 225         }
 226     }
 227 
 228     final void checkWrite(long expected,
 229                           ByteBuffer[] buffers,
 230                           int start,
 231                           int length)
 232         throws IOException
 233     {
 234         long written = write(buffers, start, length);
 235         if (written != expected) {
 236             throw new IOException("incorrect number of bytes written");
 237         }
 238     }
 239 
 240     abstract SocketChannel channel();
 241 
 242     final InetSocketAddress address() {
 243         return address;
 244     }
 245 
 246     synchronized void configureMode(Mode mode) throws IOException {
 247         this.mode = mode;
 248         if (mode == Mode.BLOCKING) {
 249             channel().configureBlocking(true);
 250         } else {
 251             channel().configureBlocking(false);
 252         }
 253     }
 254 
 255     synchronized Mode getMode() {
 256         return mode;
 257     }

 258 
 259     abstract ConnectionPool.CacheKey cacheKey();




 260 
 261     // overridden in SSL only
 262     SSLParameters sslParameters() {
 263         return null;
 264     }
 265 
 266     // Methods to be implemented for Plain TCP and SSL
 267 






 268     abstract long write(ByteBuffer[] buffers, int start, int number)
 269         throws IOException;










 270 
 271     abstract long write(ByteBuffer buffer) throws IOException;



 272 
 273     // Methods to be implemented for Plain TCP (async mode) and AsyncSSL
 274 
 275     /**
 276      * In {@linkplain Mode#ASYNC async mode}, this method puts buffers at the
 277      * end of the send queue; Otherwise, it is equivalent to {@link
 278      * #write(ByteBuffer[], int, int) write(buffers, 0, buffers.length)}.
 279      * When in async mode, calling this method should later be followed by
 280      * subsequent flushAsync invocation.
 281      * That allows multiple threads to put buffers into the queue while some other
 282      * thread is writing.
 283      */
 284     abstract void writeAsync(ByteBufferReference[] buffers) throws IOException;


























 285 
 286     /**
 287      * In {@linkplain Mode#ASYNC async mode}, this method may put
 288      * buffers at the beginning of send queue, breaking frames sequence and
 289      * allowing to write these buffers before other buffers in the queue;
 290      * Otherwise, it is equivalent to {@link
 291      * #write(ByteBuffer[], int, int) write(buffers, 0, buffers.length)}.
 292      * When in async mode, calling this method should later be followed by
 293      * subsequent flushAsync invocation.
 294      * That allows multiple threads to put buffers into the queue while some other
 295      * thread is writing.
 296      */
 297     abstract void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException;
 298 
 299     /**
 300      * This method should be called after  any writeAsync/writeAsyncUnordered
 301      * invocation.
 302      * If there is a race to flushAsync from several threads one thread
 303      * (race winner) capture flush operation and write the whole queue content.
 304      * Other threads (race losers) exits from the method (not blocking)
 305      * and continue execution.
 306      */
 307     abstract void flushAsync() throws IOException;
 308 
 309     /**
 310      * Closes this connection, by returning the socket to its connection pool.
 311      */
 312     @Override
 313     public abstract void close();







 314 
 315     abstract void shutdownInput() throws IOException;




 316 
 317     abstract void shutdownOutput() throws IOException;










 318 
 319     /**
 320      * Puts position to limit and limit to capacity so we can resume reading
 321      * into this buffer, but if required > 0 then limit may be reduced so that
 322      * no more than required bytes are read next time.
 323      */
 324     static void resumeChannelRead(ByteBuffer buf, int required) {
 325         int limit = buf.limit();
 326         buf.position(limit);
 327         int capacity = buf.capacity() - limit;
 328         if (required > 0 && required < capacity) {
 329             buf.limit(limit + required);
 330         } else {
 331             buf.limit(buf.capacity());
 332         }







 333     }
 334 
 335     final ByteBuffer read() throws IOException {
 336         ByteBuffer b = readImpl();
 337         return b;


 338     }
 339 
 340     /*
 341      * Returns a ByteBuffer with the data available at the moment, or null if
 342      * reached EOF.
 343      */
 344     protected abstract ByteBuffer readImpl() throws IOException;






 345 
 346     @Override
 347     public String toString() {
 348         return "HttpConnection: " + channel().toString();
 349     }
 350 }


   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 

  28 import java.io.Closeable;
  29 import java.io.IOException;
  30 import java.lang.System.Logger.Level;
  31 import java.net.InetSocketAddress;
  32 import java.nio.ByteBuffer;
  33 import java.nio.channels.SocketChannel;
  34 import java.util.IdentityHashMap;
  35 import java.util.List;
  36 import java.util.Map;
  37 import java.util.concurrent.CompletableFuture;
  38 import java.util.concurrent.CompletionStage;
  39 import java.util.concurrent.ConcurrentLinkedDeque;
  40 import java.util.concurrent.Flow;
  41 import jdk.incubator.http.HttpClient.Version;
  42 import jdk.incubator.http.internal.common.Demand;
  43 import jdk.incubator.http.internal.common.FlowTube;
  44 import jdk.incubator.http.internal.common.SequentialScheduler;
  45 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
  46 import jdk.incubator.http.internal.common.Log;
  47 import jdk.incubator.http.internal.common.Utils;
  48 import static jdk.incubator.http.HttpClient.Version.HTTP_2;
  49 
  50 /**
  51  * Wraps socket channel layer and takes care of SSL also.
  52  *
  53  * Subtypes are:
  54  *      PlainHttpConnection: regular direct TCP connection to server
  55  *      PlainProxyConnection: plain text proxy connection
  56  *      PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
  57  *      AsyncSSLConnection: TLS channel direct to server
  58  *      AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
  59  */
  60 abstract class HttpConnection implements Closeable {
  61 
  62     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
  63     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
  64     final static System.Logger DEBUG_LOGGER = Utils.getDebugLogger(
  65             () -> "HttpConnection(SocketTube(?))", DEBUG);



  66 
  67     /** The address this connection is connected to. Could be a server or a proxy. */
  68     final InetSocketAddress address;
  69     private final HttpClientImpl client;
  70     private final TrailingOperations trailingOperations;
  71 
  72     HttpConnection(InetSocketAddress address, HttpClientImpl client) {
  73         this.address = address;
  74         this.client = client;
  75         trailingOperations = new TrailingOperations();
  76     }
  77 
  78     private static final class TrailingOperations {
  79         private final Map<CompletionStage<?>, Boolean> operations =
  80                 new IdentityHashMap<>();
  81         void add(CompletionStage<?> cf) {
  82             synchronized(operations) {
  83                 cf.whenComplete((r,t)-> remove(cf));
  84                 operations.put(cf, Boolean.TRUE);
  85             }
  86         }
  87         boolean remove(CompletionStage<?> cf) {
  88             synchronized(operations) {
  89                 return operations.remove(cf);
  90             }
  91         }


  92     }
  93 
  94     final void addTrailingOperation(CompletionStage<?> cf) {
  95         trailingOperations.add(cf);
  96     }


  97 
  98 //    final void removeTrailingOperation(CompletableFuture<?> cf) {
  99 //        trailingOperations.remove(cf);
 100 //    }
 101 
 102     final HttpClientImpl client() {
 103         return client;
 104     }
 105 
 106     //public abstract void connect() throws IOException, InterruptedException;
 107 
 108     public abstract CompletableFuture<Void> connectAsync();
 109 
 110     /** Tells whether, or not, this connection is connected to its destination. */


 111     abstract boolean connected();
 112 
 113     /** Tells whether, or not, this connection is secure ( over SSL ) */
 114     abstract boolean isSecure();
 115 
 116     /** Tells whether, or not, this connection is proxied. */
 117     abstract boolean isProxied();
 118 
 119     /** Tells whether, or not, this connection is open. */




 120     final boolean isOpen() {
 121         return channel().isOpen() &&
 122                 (connected() ? !getConnectionFlow().isFinished() : true);
 123     }
 124 
 125     interface HttpPublisher extends FlowTube.TubePublisher {
 126         void enqueue(List<ByteBuffer> buffers) throws IOException;
 127         void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;
 128         void signalEnqueued() throws IOException;











 129     }
 130 
 131     /**
 132      * Returns the HTTP publisher associated with this connection.  May be null
 133      * if invoked before connecting.
 134      */
 135     abstract HttpPublisher publisher();











 136 
 137     /**
 138      * Factory for retrieving HttpConnections. A connection can be retrieved
 139      * from the connection pool, or a new one created if none available.
 140      *
 141      * The given {@code addr} is the ultimate destination. Any proxies,
 142      * etc, are determined from the request. Returns a concrete instance which
 143      * is one of the following:
 144      *      {@link PlainHttpConnection}
 145      *      {@link PlainTunnelingConnection}
 146      *
 147      * The returned connection, if not from the connection pool, must have its,
 148      * connect() or connectAsync() method invoked, which ( when it completes
 149      * successfully ) renders the connection usable for requests.
 150      */
 151     public static HttpConnection getConnection(InetSocketAddress addr,
 152                                                HttpClientImpl client,
 153                                                HttpRequestImpl request,
 154                                                Version version) {
 155         HttpConnection c = null;
 156         InetSocketAddress proxy = request.proxy();
 157         if (proxy != null && proxy.isUnresolved()) {
 158             // The default proxy selector may select a proxy whose  address is
 159             // unresolved. We must resolve the address before connecting to it.

 160             proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort());
 161         }
 162         boolean secure = request.secure();
 163         ConnectionPool pool = client.connectionPool();







 164 
 165         if (!secure) {
 166             c = pool.getConnection(false, addr, proxy);
 167             if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
 168                 final HttpConnection conn = c;
 169                 DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow()
 170                             + ": plain connection retrieved from HTTP/1.1 pool");
 171                 return c;
 172             } else {
 173                 return getPlainConnection(addr, proxy, request, client);
 174             }
 175         } else {  // secure
 176             if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
 177                 c = pool.getConnection(true, addr, proxy);
 178             }
 179             if (c != null && c.isOpen()) {
 180                 final HttpConnection conn = c;
 181                 DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow()
 182                             + ": SSL connection retrieved from HTTP/1.1 pool");
 183                 return c;
 184             } else {
 185                 String[] alpn = null;
 186                 if (version == HTTP_2) {
 187                     alpn = new String[] { "h2", "http/1.1" };
 188                 }
 189                 return getSSLConnection(addr, proxy, alpn, client);
 190             }
 191         }
 192     }
 193 
 194     private static HttpConnection getSSLConnection(InetSocketAddress addr,
 195                                                    InetSocketAddress proxy,
 196                                                    String[] alpn,
 197                                                    HttpClientImpl client) {
 198         if (proxy != null)
 199             return new AsyncSSLTunnelConnection(addr, client, alpn, proxy);
 200         else
 201             return new AsyncSSLConnection(addr, client, alpn);
 202     }
 203 
 204     /* Returns either a plain HTTP connection or a plain tunnelling connection
 205      * for proxied WebSocket */
 206     private static HttpConnection getPlainConnection(InetSocketAddress addr,
 207                                                      InetSocketAddress proxy,
 208                                                      HttpRequestImpl request,
 209                                                      HttpClientImpl client) {
 210         if (request.isWebSocket() && proxy != null)
 211             return new PlainTunnelingConnection(addr, proxy, client);
 212 
 213         if (proxy == null)
 214             return new PlainHttpConnection(addr, client);
 215         else
 216             return new PlainProxyConnection(proxy, client);
 217     }
 218 
 219     void closeOrReturnToCache(HttpHeaders hdrs) {
 220         if (hdrs == null) {
 221             // the connection was closed by server, eof
 222             close();
 223             return;
 224         }
 225         if (!isOpen()) {
 226             return;
 227         }
 228         HttpClientImpl client = client();
 229         if (client == null) {
 230             close();
 231             return;
 232         }
 233         ConnectionPool pool = client.connectionPool();
 234         boolean keepAlive = hdrs.firstValue("Connection")
 235                 .map((s) -> !s.equalsIgnoreCase("close"))
 236                 .orElse(true);
 237 
 238         if (keepAlive) {
 239             Log.logTrace("Returning connection to the pool: {0}", this);
 240             pool.returnToPool(this);
 241         } else {
 242             close();
 243         }
 244     }
 245 
























 246     abstract SocketChannel channel();
 247 
 248     final InetSocketAddress address() {
 249         return address;
 250     }
 251 
 252     abstract ConnectionPool.CacheKey cacheKey();







 253 
 254 //    // overridden in SSL only
 255 //    SSLParameters sslParameters() {
 256 //        return null;
 257 //    }
 258 
 259     /**
 260      * Closes this connection, by returning the socket to its connection pool.
 261      */
 262     @Override
 263     public abstract void close();
 264 
 265     abstract void shutdownInput() throws IOException;



 266 
 267     abstract void shutdownOutput() throws IOException;
 268 
 269     // Support for WebSocket/RawChannelImpl which unfortunately
 270     // still depends on synchronous read/writes.
 271     // It should be removed when RawChannelImpl moves to using asynchronous APIs.
 272     abstract static class DetachedConnectionChannel implements Closeable {
 273         DetachedConnectionChannel() {}
 274         abstract SocketChannel channel();
 275         abstract long write(ByteBuffer[] buffers, int start, int number)
 276                 throws IOException;
 277         abstract void shutdownInput() throws IOException;
 278         abstract void shutdownOutput() throws IOException;
 279         abstract ByteBuffer read() throws IOException;
 280         @Override
 281         public abstract void close();
 282         @Override
 283         public String toString() {
 284             return this.getClass().getSimpleName() + ": " + channel().toString();
 285         }
 286     }
 287 
 288     // Support for WebSocket/RawChannelImpl which unfortunately
 289     // still depends on synchronous read/writes.
 290     // It should be removed when RawChannelImpl moves to using asynchronous APIs.
 291     abstract DetachedConnectionChannel detachChannel();
 292 
 293     abstract FlowTube getConnectionFlow();
 294 
 295     /**
 296      * A publisher that makes it possible to publish (write)
 297      * ordered (normal priority) and unordered (high priority)
 298      * buffers downstream.




 299      */
 300     final class PlainHttpPublisher implements HttpPublisher {
 301         final Object reading;
 302         PlainHttpPublisher() {
 303             this(new Object());
 304         }
 305         PlainHttpPublisher(Object readingLock) {
 306             this.reading = readingLock;
 307         }
 308         final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();
 309         volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
 310         volatile HttpWriteSubscription subscription;
 311         final SequentialScheduler writeScheduler =
 312                     new SequentialScheduler(this::flushTask);
 313         @Override
 314         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 315             synchronized (reading) {
 316                 //assert this.subscription == null;
 317                 //assert this.subscriber == null;
 318                 if (subscription == null) {
 319                     subscription = new HttpWriteSubscription();
 320                 }
 321                 this.subscriber = subscriber;
 322             }
 323             // TODO: should we do this in the flow?
 324             subscriber.onSubscribe(subscription);
 325             signal();
 326         }
 327 
 328         void flushTask(DeferredCompleter completer) {
 329             try {
 330                 HttpWriteSubscription sub = subscription;
 331                 if (sub != null) sub.flush();
 332             } finally {
 333                 completer.complete();
 334             }
 335         }




 336 
 337         void signal() {
 338             writeScheduler.runOrSchedule();
 339         }
 340 
 341         final class HttpWriteSubscription implements Flow.Subscription {
 342             final Demand demand = new Demand();



 343 



 344             @Override
 345             public void request(long n) {
 346                 if (n <= 0) throw new IllegalArgumentException("non-positive request");
 347                 demand.increase(n);
 348                 debug.log(Level.DEBUG, () -> "HttpPublisher: got request of "
 349                             + n + " from "
 350                             + getConnectionFlow());
 351                 writeScheduler.runOrSchedule();
 352             }
 353 
 354             @Override
 355             public void cancel() {
 356                 debug.log(Level.DEBUG, () -> "HttpPublisher: cancelled by "
 357                           + getConnectionFlow());
 358             }
 359 
 360             void flush() {
 361                 while (!queue.isEmpty() && demand.tryDecrement()) {
 362                     List<ByteBuffer> elem = queue.poll();
 363                     debug.log(Level.DEBUG, () -> "HttpPublisher: sending "
 364                                 + Utils.remaining(elem) + " bytes ("
 365                                 + elem.size() + " buffers) to "
 366                                 + getConnectionFlow());
 367                     subscriber.onNext(elem);
 368                 }
 369             }
 370         }
 371 
 372         @Override
 373         public void enqueue(List<ByteBuffer> buffers) throws IOException {
 374             queue.add(buffers);
 375             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
 376             debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes);








 377         }
 378 
 379         @Override
 380         public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
 381             // Unordered frames are sent before existing frames.
 382             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
 383             queue.addFirst(buffers);
 384             debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes);
 385         }
 386 
 387         @Override
 388         public void signalEnqueued() throws IOException {
 389             debug.log(Level.DEBUG, "signalling the publisher of the write queue");
 390             signal();
 391         }
 392     }
 393 
 394     String dbgTag = null;
 395     final String dbgString() {
 396         FlowTube flow = getConnectionFlow();
 397         String tag = dbgTag;
 398         if (tag == null && flow != null) {
 399             dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")";
 400         } else if (tag == null) {
 401             tag = this.getClass().getSimpleName() + "(?)";
 402         }
 403         return tag;
 404     }
 405 
 406     @Override
 407     public String toString() {
 408         return "HttpConnection: " + channel().toString();
 409     }
 410 }
< prev index next >