8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import javax.net.ssl.SSLParameters; 29 import java.io.Closeable; 30 import java.io.IOException; 31 import java.net.InetSocketAddress; 32 import java.nio.ByteBuffer; 33 import java.nio.channels.SocketChannel; 34 import java.util.concurrent.CompletableFuture; 35 36 import jdk.incubator.http.internal.common.ByteBufferReference; 37 38 /** 39 * Wraps socket channel layer and takes care of SSL also. 40 * 41 * Subtypes are: 42 * PlainHttpConnection: regular direct TCP connection to server 43 * PlainProxyConnection: plain text proxy connection 44 * PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server 45 * SSLConnection: TLS channel direct to server 46 * SSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel 47 */ 48 abstract class HttpConnection implements Closeable { 49 50 enum Mode { 51 BLOCKING, 52 NON_BLOCKING, 53 ASYNC 54 } 55 56 protected Mode mode; 57 58 // address we are connected to. Could be a server or a proxy 59 final InetSocketAddress address; 60 final HttpClientImpl client; 61 62 HttpConnection(InetSocketAddress address, HttpClientImpl client) { 63 this.address = address; 64 this.client = client; 65 } 66 67 /** 68 * Public API to this class. addr is the ultimate destination. Any proxies 69 * etc are figured out from the request. Returns an instance of one of the 70 * following 71 * PlainHttpConnection 72 * PlainTunnelingConnection 73 * SSLConnection 74 * SSLTunnelConnection 75 * 76 * When object returned, connect() or connectAsync() must be called, which 77 * when it returns/completes, the connection is usable for requests. 78 */ 79 public static HttpConnection getConnection( 80 InetSocketAddress addr, HttpClientImpl client, HttpRequestImpl request) 81 { 82 return getConnectionImpl(addr, client, request, false); 83 } 84 85 /** 86 * Called specifically to get an async connection for HTTP/2 over SSL. 87 */ 88 public static HttpConnection getConnection(InetSocketAddress addr, 89 HttpClientImpl client, HttpRequestImpl request, boolean isHttp2) { 90 91 return getConnectionImpl(addr, client, request, isHttp2); 92 } 93 94 public abstract void connect() throws IOException, InterruptedException; 95 96 public abstract CompletableFuture<Void> connectAsync(); 97 98 /** 99 * Returns whether this connection is connected to its destination 100 */ 101 abstract boolean connected(); 102 103 abstract boolean isSecure(); 104 105 abstract boolean isProxied(); 106 107 /** 108 * Completes when the first byte of the response is available to be read. 109 */ 110 abstract CompletableFuture<Void> whenReceivingResponse(); 111 112 final boolean isOpen() { 113 return channel().isOpen(); 114 } 115 116 /* Returns either a plain HTTP connection or a plain tunnelling connection 117 * for proxied WebSocket */ 118 private static HttpConnection getPlainConnection(InetSocketAddress addr, 119 InetSocketAddress proxy, 120 HttpRequestImpl request, 121 HttpClientImpl client) { 122 if (request.isWebSocket() && proxy != null) { 123 return new PlainTunnelingConnection(addr, proxy, client); 124 } else { 125 if (proxy == null) { 126 return new PlainHttpConnection(addr, client); 127 } else { 128 return new PlainProxyConnection(proxy, client); 129 } 130 } 131 } 132 133 private static HttpConnection getSSLConnection(InetSocketAddress addr, 134 InetSocketAddress proxy, HttpRequestImpl request, 135 String[] alpn, boolean isHttp2, HttpClientImpl client) 136 { 137 if (proxy != null) { 138 if (!isHttp2) { 139 return new SSLTunnelConnection(addr, client, proxy); 140 } else { 141 return new AsyncSSLTunnelConnection(addr, client, alpn, proxy); 142 } 143 } else if (!isHttp2) { 144 return new SSLConnection(addr, client, alpn); 145 } else { 146 return new AsyncSSLConnection(addr, client, alpn); 147 } 148 } 149 150 /** 151 * Main factory method. Gets a HttpConnection, either cached or new if 152 * none available. 153 */ 154 private static HttpConnection getConnectionImpl(InetSocketAddress addr, 155 HttpClientImpl client, 156 HttpRequestImpl request, boolean isHttp2) 157 { 158 HttpConnection c = null; 159 InetSocketAddress proxy = request.proxy(client); 160 if (proxy != null && proxy.isUnresolved()) { 161 // The default proxy selector may select a proxy whose 162 // address is unresolved. We must resolve the address 163 // before using it to connect. 164 proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort()); 165 } 166 boolean secure = request.secure(); 167 ConnectionPool pool = client.connectionPool(); 168 String[] alpn = null; 169 170 if (secure && isHttp2) { 171 alpn = new String[2]; 172 alpn[0] = "h2"; 173 alpn[1] = "http/1.1"; 174 } 175 176 if (!secure) { 177 c = pool.getConnection(false, addr, proxy); 178 if (c != null) { 179 return c; 180 } else { 181 return getPlainConnection(addr, proxy, request, client); 182 } 183 } else { 184 if (!isHttp2) { // if http2 we don't cache connections 185 c = pool.getConnection(true, addr, proxy); 186 } 187 if (c != null) { 188 return c; 189 } else { 190 return getSSLConnection(addr, proxy, request, alpn, isHttp2, client); 191 } 192 } 193 } 194 195 void returnToCache(HttpHeaders hdrs) { 196 if (hdrs == null) { 197 // the connection was closed by server 198 close(); 199 return; 200 } 201 if (!isOpen()) { 202 return; 203 } 204 ConnectionPool pool = client.connectionPool(); 205 boolean keepAlive = hdrs.firstValue("Connection") 206 .map((s) -> !s.equalsIgnoreCase("close")) 207 .orElse(true); 208 209 if (keepAlive) { 210 pool.returnToPool(this); 211 } else { 212 close(); 213 } 214 } 215 216 /** 217 * Also check that the number of bytes written is what was expected. This 218 * could be different if the buffer is user-supplied and its internal 219 * pointers were manipulated in a race condition. 220 */ 221 final void checkWrite(long expected, ByteBuffer buffer) throws IOException { 222 long written = write(buffer); 223 if (written != expected) { 224 throw new IOException("incorrect number of bytes written"); 225 } 226 } 227 228 final void checkWrite(long expected, 229 ByteBuffer[] buffers, 230 int start, 231 int length) 232 throws IOException 233 { 234 long written = write(buffers, start, length); 235 if (written != expected) { 236 throw new IOException("incorrect number of bytes written"); 237 } 238 } 239 240 abstract SocketChannel channel(); 241 242 final InetSocketAddress address() { 243 return address; 244 } 245 246 synchronized void configureMode(Mode mode) throws IOException { 247 this.mode = mode; 248 if (mode == Mode.BLOCKING) { 249 channel().configureBlocking(true); 250 } else { 251 channel().configureBlocking(false); 252 } 253 } 254 255 synchronized Mode getMode() { 256 return mode; 257 } 258 259 abstract ConnectionPool.CacheKey cacheKey(); 260 261 // overridden in SSL only 262 SSLParameters sslParameters() { 263 return null; 264 } 265 266 // Methods to be implemented for Plain TCP and SSL 267 268 abstract long write(ByteBuffer[] buffers, int start, int number) 269 throws IOException; 270 271 abstract long write(ByteBuffer buffer) throws IOException; 272 273 // Methods to be implemented for Plain TCP (async mode) and AsyncSSL 274 275 /** 276 * In {@linkplain Mode#ASYNC async mode}, this method puts buffers at the 277 * end of the send queue; Otherwise, it is equivalent to {@link 278 * #write(ByteBuffer[], int, int) write(buffers, 0, buffers.length)}. 279 * When in async mode, calling this method should later be followed by 280 * subsequent flushAsync invocation. 281 * That allows multiple threads to put buffers into the queue while some other 282 * thread is writing. 283 */ 284 abstract void writeAsync(ByteBufferReference[] buffers) throws IOException; 285 286 /** 287 * In {@linkplain Mode#ASYNC async mode}, this method may put 288 * buffers at the beginning of send queue, breaking frames sequence and 289 * allowing to write these buffers before other buffers in the queue; 290 * Otherwise, it is equivalent to {@link 291 * #write(ByteBuffer[], int, int) write(buffers, 0, buffers.length)}. 292 * When in async mode, calling this method should later be followed by 293 * subsequent flushAsync invocation. 294 * That allows multiple threads to put buffers into the queue while some other 295 * thread is writing. 296 */ 297 abstract void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException; 298 299 /** 300 * This method should be called after any writeAsync/writeAsyncUnordered 301 * invocation. 302 * If there is a race to flushAsync from several threads one thread 303 * (race winner) capture flush operation and write the whole queue content. 304 * Other threads (race losers) exits from the method (not blocking) 305 * and continue execution. 306 */ 307 abstract void flushAsync() throws IOException; 308 309 /** 310 * Closes this connection, by returning the socket to its connection pool. 311 */ 312 @Override 313 public abstract void close(); 314 315 abstract void shutdownInput() throws IOException; 316 317 abstract void shutdownOutput() throws IOException; 318 319 /** 320 * Puts position to limit and limit to capacity so we can resume reading 321 * into this buffer, but if required > 0 then limit may be reduced so that 322 * no more than required bytes are read next time. 323 */ 324 static void resumeChannelRead(ByteBuffer buf, int required) { 325 int limit = buf.limit(); 326 buf.position(limit); 327 int capacity = buf.capacity() - limit; 328 if (required > 0 && required < capacity) { 329 buf.limit(limit + required); 330 } else { 331 buf.limit(buf.capacity()); 332 } 333 } 334 335 final ByteBuffer read() throws IOException { 336 ByteBuffer b = readImpl(); 337 return b; 338 } 339 340 /* 341 * Returns a ByteBuffer with the data available at the moment, or null if 342 * reached EOF. 343 */ 344 protected abstract ByteBuffer readImpl() throws IOException; 345 346 @Override 347 public String toString() { 348 return "HttpConnection: " + channel().toString(); 349 } 350 } | 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.Closeable; 29 import java.io.IOException; 30 import java.lang.System.Logger.Level; 31 import java.net.InetSocketAddress; 32 import java.nio.ByteBuffer; 33 import java.nio.channels.SocketChannel; 34 import java.util.IdentityHashMap; 35 import java.util.List; 36 import java.util.Map; 37 import java.util.concurrent.CompletableFuture; 38 import java.util.concurrent.CompletionStage; 39 import java.util.concurrent.ConcurrentLinkedDeque; 40 import java.util.concurrent.Flow; 41 import jdk.incubator.http.HttpClient.Version; 42 import jdk.incubator.http.internal.common.Demand; 43 import jdk.incubator.http.internal.common.FlowTube; 44 import jdk.incubator.http.internal.common.SequentialScheduler; 45 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter; 46 import jdk.incubator.http.internal.common.Log; 47 import jdk.incubator.http.internal.common.Utils; 48 import static jdk.incubator.http.HttpClient.Version.HTTP_2; 49 50 /** 51 * Wraps socket channel layer and takes care of SSL also. 52 * 53 * Subtypes are: 54 * PlainHttpConnection: regular direct TCP connection to server 55 * PlainProxyConnection: plain text proxy connection 56 * PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server 57 * AsyncSSLConnection: TLS channel direct to server 58 * AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel 59 */ 60 abstract class HttpConnection implements Closeable { 61 62 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. 63 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); 64 final static System.Logger DEBUG_LOGGER = Utils.getDebugLogger( 65 () -> "HttpConnection(SocketTube(?))", DEBUG); 66 67 /** The address this connection is connected to. Could be a server or a proxy. */ 68 final InetSocketAddress address; 69 private final HttpClientImpl client; 70 private final TrailingOperations trailingOperations; 71 72 HttpConnection(InetSocketAddress address, HttpClientImpl client) { 73 this.address = address; 74 this.client = client; 75 trailingOperations = new TrailingOperations(); 76 } 77 78 private static final class TrailingOperations { 79 private final Map<CompletionStage<?>, Boolean> operations = 80 new IdentityHashMap<>(); 81 void add(CompletionStage<?> cf) { 82 synchronized(operations) { 83 cf.whenComplete((r,t)-> remove(cf)); 84 operations.put(cf, Boolean.TRUE); 85 } 86 } 87 boolean remove(CompletionStage<?> cf) { 88 synchronized(operations) { 89 return operations.remove(cf); 90 } 91 } 92 } 93 94 final void addTrailingOperation(CompletionStage<?> cf) { 95 trailingOperations.add(cf); 96 } 97 98 // final void removeTrailingOperation(CompletableFuture<?> cf) { 99 // trailingOperations.remove(cf); 100 // } 101 102 final HttpClientImpl client() { 103 return client; 104 } 105 106 //public abstract void connect() throws IOException, InterruptedException; 107 108 public abstract CompletableFuture<Void> connectAsync(); 109 110 /** Tells whether, or not, this connection is connected to its destination. */ 111 abstract boolean connected(); 112 113 /** Tells whether, or not, this connection is secure ( over SSL ) */ 114 abstract boolean isSecure(); 115 116 /** Tells whether, or not, this connection is proxied. */ 117 abstract boolean isProxied(); 118 119 /** Tells whether, or not, this connection is open. */ 120 final boolean isOpen() { 121 return channel().isOpen() && 122 (connected() ? !getConnectionFlow().isFinished() : true); 123 } 124 125 interface HttpPublisher extends FlowTube.TubePublisher { 126 void enqueue(List<ByteBuffer> buffers) throws IOException; 127 void enqueueUnordered(List<ByteBuffer> buffers) throws IOException; 128 void signalEnqueued() throws IOException; 129 } 130 131 /** 132 * Returns the HTTP publisher associated with this connection. May be null 133 * if invoked before connecting. 134 */ 135 abstract HttpPublisher publisher(); 136 137 /** 138 * Factory for retrieving HttpConnections. A connection can be retrieved 139 * from the connection pool, or a new one created if none available. 140 * 141 * The given {@code addr} is the ultimate destination. Any proxies, 142 * etc, are determined from the request. Returns a concrete instance which 143 * is one of the following: 144 * {@link PlainHttpConnection} 145 * {@link PlainTunnelingConnection} 146 * 147 * The returned connection, if not from the connection pool, must have its, 148 * connect() or connectAsync() method invoked, which ( when it completes 149 * successfully ) renders the connection usable for requests. 150 */ 151 public static HttpConnection getConnection(InetSocketAddress addr, 152 HttpClientImpl client, 153 HttpRequestImpl request, 154 Version version) { 155 HttpConnection c = null; 156 InetSocketAddress proxy = request.proxy(); 157 if (proxy != null && proxy.isUnresolved()) { 158 // The default proxy selector may select a proxy whose address is 159 // unresolved. We must resolve the address before connecting to it. 160 proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort()); 161 } 162 boolean secure = request.secure(); 163 ConnectionPool pool = client.connectionPool(); 164 165 if (!secure) { 166 c = pool.getConnection(false, addr, proxy); 167 if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) { 168 final HttpConnection conn = c; 169 DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow() 170 + ": plain connection retrieved from HTTP/1.1 pool"); 171 return c; 172 } else { 173 return getPlainConnection(addr, proxy, request, client); 174 } 175 } else { // secure 176 if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool 177 c = pool.getConnection(true, addr, proxy); 178 } 179 if (c != null && c.isOpen()) { 180 final HttpConnection conn = c; 181 DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow() 182 + ": SSL connection retrieved from HTTP/1.1 pool"); 183 return c; 184 } else { 185 String[] alpn = null; 186 if (version == HTTP_2) { 187 alpn = new String[] { "h2", "http/1.1" }; 188 } 189 return getSSLConnection(addr, proxy, alpn, client); 190 } 191 } 192 } 193 194 private static HttpConnection getSSLConnection(InetSocketAddress addr, 195 InetSocketAddress proxy, 196 String[] alpn, 197 HttpClientImpl client) { 198 if (proxy != null) 199 return new AsyncSSLTunnelConnection(addr, client, alpn, proxy); 200 else 201 return new AsyncSSLConnection(addr, client, alpn); 202 } 203 204 /* Returns either a plain HTTP connection or a plain tunnelling connection 205 * for proxied WebSocket */ 206 private static HttpConnection getPlainConnection(InetSocketAddress addr, 207 InetSocketAddress proxy, 208 HttpRequestImpl request, 209 HttpClientImpl client) { 210 if (request.isWebSocket() && proxy != null) 211 return new PlainTunnelingConnection(addr, proxy, client); 212 213 if (proxy == null) 214 return new PlainHttpConnection(addr, client); 215 else 216 return new PlainProxyConnection(proxy, client); 217 } 218 219 void closeOrReturnToCache(HttpHeaders hdrs) { 220 if (hdrs == null) { 221 // the connection was closed by server, eof 222 close(); 223 return; 224 } 225 if (!isOpen()) { 226 return; 227 } 228 HttpClientImpl client = client(); 229 if (client == null) { 230 close(); 231 return; 232 } 233 ConnectionPool pool = client.connectionPool(); 234 boolean keepAlive = hdrs.firstValue("Connection") 235 .map((s) -> !s.equalsIgnoreCase("close")) 236 .orElse(true); 237 238 if (keepAlive) { 239 Log.logTrace("Returning connection to the pool: {0}", this); 240 pool.returnToPool(this); 241 } else { 242 close(); 243 } 244 } 245 246 abstract SocketChannel channel(); 247 248 final InetSocketAddress address() { 249 return address; 250 } 251 252 abstract ConnectionPool.CacheKey cacheKey(); 253 254 // // overridden in SSL only 255 // SSLParameters sslParameters() { 256 // return null; 257 // } 258 259 /** 260 * Closes this connection, by returning the socket to its connection pool. 261 */ 262 @Override 263 public abstract void close(); 264 265 abstract void shutdownInput() throws IOException; 266 267 abstract void shutdownOutput() throws IOException; 268 269 // Support for WebSocket/RawChannelImpl which unfortunately 270 // still depends on synchronous read/writes. 271 // It should be removed when RawChannelImpl moves to using asynchronous APIs. 272 abstract static class DetachedConnectionChannel implements Closeable { 273 DetachedConnectionChannel() {} 274 abstract SocketChannel channel(); 275 abstract long write(ByteBuffer[] buffers, int start, int number) 276 throws IOException; 277 abstract void shutdownInput() throws IOException; 278 abstract void shutdownOutput() throws IOException; 279 abstract ByteBuffer read() throws IOException; 280 @Override 281 public abstract void close(); 282 @Override 283 public String toString() { 284 return this.getClass().getSimpleName() + ": " + channel().toString(); 285 } 286 } 287 288 // Support for WebSocket/RawChannelImpl which unfortunately 289 // still depends on synchronous read/writes. 290 // It should be removed when RawChannelImpl moves to using asynchronous APIs. 291 abstract DetachedConnectionChannel detachChannel(); 292 293 abstract FlowTube getConnectionFlow(); 294 295 /** 296 * A publisher that makes it possible to publish (write) 297 * ordered (normal priority) and unordered (high priority) 298 * buffers downstream. 299 */ 300 final class PlainHttpPublisher implements HttpPublisher { 301 final Object reading; 302 PlainHttpPublisher() { 303 this(new Object()); 304 } 305 PlainHttpPublisher(Object readingLock) { 306 this.reading = readingLock; 307 } 308 final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>(); 309 volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber; 310 volatile HttpWriteSubscription subscription; 311 final SequentialScheduler writeScheduler = 312 new SequentialScheduler(this::flushTask); 313 @Override 314 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { 315 synchronized (reading) { 316 //assert this.subscription == null; 317 //assert this.subscriber == null; 318 if (subscription == null) { 319 subscription = new HttpWriteSubscription(); 320 } 321 this.subscriber = subscriber; 322 } 323 // TODO: should we do this in the flow? 324 subscriber.onSubscribe(subscription); 325 signal(); 326 } 327 328 void flushTask(DeferredCompleter completer) { 329 try { 330 HttpWriteSubscription sub = subscription; 331 if (sub != null) sub.flush(); 332 } finally { 333 completer.complete(); 334 } 335 } 336 337 void signal() { 338 writeScheduler.runOrSchedule(); 339 } 340 341 final class HttpWriteSubscription implements Flow.Subscription { 342 final Demand demand = new Demand(); 343 344 @Override 345 public void request(long n) { 346 if (n <= 0) throw new IllegalArgumentException("non-positive request"); 347 demand.increase(n); 348 debug.log(Level.DEBUG, () -> "HttpPublisher: got request of " 349 + n + " from " 350 + getConnectionFlow()); 351 writeScheduler.runOrSchedule(); 352 } 353 354 @Override 355 public void cancel() { 356 debug.log(Level.DEBUG, () -> "HttpPublisher: cancelled by " 357 + getConnectionFlow()); 358 } 359 360 void flush() { 361 while (!queue.isEmpty() && demand.tryDecrement()) { 362 List<ByteBuffer> elem = queue.poll(); 363 debug.log(Level.DEBUG, () -> "HttpPublisher: sending " 364 + Utils.remaining(elem) + " bytes (" 365 + elem.size() + " buffers) to " 366 + getConnectionFlow()); 367 subscriber.onNext(elem); 368 } 369 } 370 } 371 372 @Override 373 public void enqueue(List<ByteBuffer> buffers) throws IOException { 374 queue.add(buffers); 375 int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); 376 debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes); 377 } 378 379 @Override 380 public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException { 381 // Unordered frames are sent before existing frames. 382 int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); 383 queue.addFirst(buffers); 384 debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes); 385 } 386 387 @Override 388 public void signalEnqueued() throws IOException { 389 debug.log(Level.DEBUG, "signalling the publisher of the write queue"); 390 signal(); 391 } 392 } 393 394 String dbgTag = null; 395 final String dbgString() { 396 FlowTube flow = getConnectionFlow(); 397 String tag = dbgTag; 398 if (tag == null && flow != null) { 399 dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")"; 400 } else if (tag == null) { 401 tag = this.getClass().getSimpleName() + "(?)"; 402 } 403 return tag; 404 } 405 406 @Override 407 public String toString() { 408 return "HttpConnection: " + channel().toString(); 409 } 410 } |