/*
 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.Closeable;
import java.io.IOException;
import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Flow;
import jdk.incubator.http.HttpClient.Version;
import jdk.incubator.http.internal.common.Demand;
import jdk.incubator.http.internal.common.FlowTube;
import jdk.incubator.http.internal.common.SequentialScheduler;
import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.Utils;
import static jdk.incubator.http.HttpClient.Version.HTTP_2;

/**
 * Wraps socket channel layer and takes care of SSL also.
 *
 * Subtypes are:
 *      PlainHttpConnection: regular direct TCP connection to server
 *      PlainProxyConnection: plain text proxy connection
 *      PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
 *      AsyncSSLConnection: TLS channel direct to server
 *      AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
 */
abstract class HttpConnection implements Closeable {

    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
    final static System.Logger DEBUG_LOGGER = Utils.getDebugLogger(
            () -> "HttpConnection(SocketTube(?))", DEBUG);

    /** The address this connection is connected to. Could be a server or a proxy. */
    final InetSocketAddress address;
    private final HttpClientImpl client;
    private final TrailingOperations trailingOperations;

    /**
     * Creates a connection bound to the given address and owning client.
     *
     * @param address the address this connection connects to (server or proxy)
     * @param client  the client this connection belongs to
     */
    HttpConnection(InetSocketAddress address, HttpClientImpl client) {
        this.address = address;
        this.client = client;
        trailingOperations = new TrailingOperations();
    }

    /**
     * Identity-keyed registry of completion stages attached to this
     * connection. A stage is registered by {@link #add} and unregistered
     * automatically once it completes.
     */
    private static final class TrailingOperations {
        private final Map<CompletionStage<?>, Boolean> operations =
                new IdentityHashMap<>();

        /**
         * Registers {@code cf} and arranges for it to be unregistered when
         * it completes, normally or exceptionally.
         */
        void add(CompletionStage<?> cf) {
            // Register BEFORE attaching the completion callback: if cf is
            // already complete the callback runs synchronously, and it must
            // find (and remove) the entry instead of racing ahead of put()
            // and leaving a stale entry behind forever.
            synchronized (operations) {
                operations.put(cf, Boolean.TRUE);
            }
            // Attached outside the lock so the callback is never invoked
            // while this thread is still holding it.
            cf.whenComplete((r, t) -> remove(cf));
        }

        /**
         * Unregisters {@code cf}.
         *
         * @return {@code true} if {@code cf} was registered, {@code false}
         *         otherwise
         */
        boolean remove(CompletionStage<?> cf) {
            synchronized (operations) {
                // Map.remove returns null for an absent key; compare instead
                // of auto-unboxing to avoid a NullPointerException.
                return operations.remove(cf) != null;
            }
        }
    }

    /** Registers a stage that is tracked by this connection until it completes. */
    final void addTrailingOperation(CompletionStage<?> cf) {
        trailingOperations.add(cf);
    }

//    final void removeTrailingOperation(CompletableFuture<?> cf) {
//        trailingOperations.remove(cf);
//    }

    /** Returns the client this connection was created with. */
    final HttpClientImpl client() {
        return client;
    }

    //public abstract void connect() throws IOException, InterruptedException;

    public abstract CompletableFuture<Void> connectAsync();

    /** Tells whether, or not, this connection is connected to its destination. */
    abstract boolean connected();

    /** Tells whether, or not, this connection is secure ( over SSL ) */
    abstract boolean isSecure();

    /** Tells whether, or not, this connection is proxied. */
    abstract boolean isProxied();

    /**
     * Tells whether, or not, this connection is open: its channel must be
     * open and, once connected, its connection flow must not be finished.
     */
    final boolean isOpen() {
        return channel().isOpen() &&
                (!connected() || !getConnectionFlow().isFinished());
    }

    /**
     * A tube publisher to which buffers can be queued for writing, either in
     * order or ahead of what is already queued.
     */
    interface HttpPublisher extends FlowTube.TubePublisher {
        void enqueue(List<ByteBuffer> buffers) throws IOException;
        void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;
        void signalEnqueued() throws IOException;
    }

    /**
     * Returns the HTTP publisher associated with this connection.  May be null
     * if invoked before connecting.
     */
    abstract HttpPublisher publisher();

    /**
     * Factory for retrieving HttpConnections. A connection can be retrieved
     * from the connection pool, or a new one created if none available.
     *
     * The given {@code addr} is the ultimate destination. Any proxies,
     * etc, are determined from the request. A newly created connection is
     * a concrete instance of one of the following:
     *      {@link PlainHttpConnection}
     *      {@link PlainProxyConnection}
     *      {@link PlainTunnelingConnection}
     *      {@link AsyncSSLConnection}
     *      {@link AsyncSSLTunnelConnection}
     *
     * The returned connection, if not from the connection pool, must have its,
     * connect() or connectAsync() method invoked, which ( when it completes
     * successfully ) renders the connection usable for requests.
     *
     * @param addr    the ultimate destination address
     * @param client  the client whose connection pool is consulted
     * @param request the request; supplies the proxy, the secure flag and
     *                whether this is a WebSocket request
     * @param version the requested HTTP version; for secure requests the
     *                pool is only consulted when this is not HTTP/2
     * @return a pooled connection that is still open, or a new unconnected one
     */
    public static HttpConnection getConnection(InetSocketAddress addr,
                                               HttpClientImpl client,
                                               HttpRequestImpl request,
                                               Version version) {
        HttpConnection c = null;
        InetSocketAddress proxy = request.proxy();
        if (proxy != null && proxy.isUnresolved()) {
            // The default proxy selector may select a proxy whose address is
            // unresolved. We must resolve the address before connecting to it.
            proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort());
        }
        boolean secure = request.secure();
        ConnectionPool pool = client.connectionPool();

        if (!secure) {
            c = pool.getConnection(false, addr, proxy);
            if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
                final HttpConnection conn = c;
                DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow()
                            + ": plain connection retrieved from HTTP/1.1 pool");
                return c;
            } else {
                return getPlainConnection(addr, proxy, request, client);
            }
        } else {  // secure
            if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
                c = pool.getConnection(true, addr, proxy);
            }
            if (c != null && c.isOpen()) {
                final HttpConnection conn = c;
                DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow()
                            + ": SSL connection retrieved from HTTP/1.1 pool");
                return c;
            } else {
                String[] alpn = null;
                if (version == HTTP_2) {
                    alpn = new String[] { "h2", "http/1.1" };
                }
                return getSSLConnection(addr, proxy, alpn, client);
            }
        }
    }

    /* Returns a TLS connection, tunnelled through the proxy when one is given */
    private static HttpConnection getSSLConnection(InetSocketAddress addr,
                                                   InetSocketAddress proxy,
                                                   String[] alpn,
                                                   HttpClientImpl client) {
        if (proxy != null)
            return new AsyncSSLTunnelConnection(addr, client, alpn, proxy);
        else
            return new AsyncSSLConnection(addr, client, alpn);
    }

    /* Returns either a plain HTTP connection or a plain tunnelling connection
     * for proxied WebSocket */
    private static HttpConnection getPlainConnection(InetSocketAddress addr,
                                                     InetSocketAddress proxy,
                                                     HttpRequestImpl request,
                                                     HttpClientImpl client) {
        if (request.isWebSocket() && proxy != null)
            return new PlainTunnelingConnection(addr, proxy, client);

        if (proxy == null)
            return new PlainHttpConnection(addr, client);
        else
            return new PlainProxyConnection(proxy, client);
    }

    /**
     * Closes this connection, or hands it back to the client's connection
     * pool, depending on the given response headers.
     *
     * The connection is closed when {@code hdrs} is null, when it has no
     * client, or when the first "Connection" header value is "close"
     * (ignoring case). Nothing is done if the connection is no longer open.
     * Otherwise it is returned to the pool.
     *
     * @param hdrs the response headers, or null if the server closed the
     *             connection
     */
    void closeOrReturnToCache(HttpHeaders hdrs) {
        if (hdrs == null) {
            // the connection was closed by server, eof
            close();
            return;
        }
        if (!isOpen()) {
            return;
        }
        HttpClientImpl client = client();
        if (client == null) {
            close();
            return;
        }
        ConnectionPool pool = client.connectionPool();
        boolean keepAlive = hdrs.firstValue("Connection")
                .map((s) -> !s.equalsIgnoreCase("close"))
                .orElse(true);

        if (keepAlive) {
            Log.logTrace("Returning connection to the pool: {0}", this);
            pool.returnToPool(this);
        } else {
            close();
        }
    }

    abstract SocketChannel channel();

    /** Returns the address this connection is connected to. */
    final InetSocketAddress address() {
        return address;
    }

    abstract ConnectionPool.CacheKey cacheKey();

//    // overridden in SSL only
//    SSLParameters sslParameters() {
//        return null;
//    }

    /**
     * Closes this connection, by returning the socket to its connection pool.
     */
    @Override
    public abstract void close();

    abstract void shutdownInput() throws IOException;

    abstract void shutdownOutput() throws IOException;

    // Support for WebSocket/RawChannelImpl which unfortunately
    // still depends on synchronous read/writes.
    // It should be removed when RawChannelImpl moves to using asynchronous APIs.
    abstract static class DetachedConnectionChannel implements Closeable {
        DetachedConnectionChannel() {}
        abstract SocketChannel channel();
        abstract long write(ByteBuffer[] buffers, int start, int number)
                throws IOException;
        abstract void shutdownInput() throws IOException;
        abstract void shutdownOutput() throws IOException;
        abstract ByteBuffer read() throws IOException;
        @Override
        public abstract void close();
        @Override
        public String toString() {
            return this.getClass().getSimpleName() + ": " + channel().toString();
        }
    }

    // Support for WebSocket/RawChannelImpl which unfortunately
    // still depends on synchronous read/writes.
    // It should be removed when RawChannelImpl moves to using asynchronous APIs.
    abstract DetachedConnectionChannel detachChannel();

    abstract FlowTube getConnectionFlow();

    /**
     * A publisher that makes it possible to publish (write)
     * ordered (normal priority) and unordered (high priority)
     * buffers downstream.
     */
    final class PlainHttpPublisher implements HttpPublisher {
        final Object reading;
        PlainHttpPublisher() {
            this(new Object());
        }
        PlainHttpPublisher(Object readingLock) {
            this.reading = readingLock;
        }
        final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();
        volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
        volatile HttpWriteSubscription subscription;
        final SequentialScheduler writeScheduler =
                    new SequentialScheduler(this::flushTask);

        /**
         * Installs the given subscriber, creating the write subscription on
         * first use, then hands the subscription to the subscriber and
         * triggers a flush of the write queue.
         */
        @Override
        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
            synchronized (reading) {
                //assert this.subscription == null;
                //assert this.subscriber == null;
                if (subscription == null) {
                    subscription = new HttpWriteSubscription();
                }
                this.subscriber = subscriber;
            }
            // TODO: should we do this in the flow?
            subscriber.onSubscribe(subscription);
            signal();
        }

        /**
         * Scheduler task: flushes the queue through the current
         * subscription, if any, and always completes the given completer.
         */
        void flushTask(DeferredCompleter completer) {
            try {
                HttpWriteSubscription sub = subscription;
                if (sub != null) sub.flush();
            } finally {
                completer.complete();
            }
        }

        /** Asks the write scheduler to run the flush task. */
        void signal() {
            writeScheduler.runOrSchedule();
        }

        /**
         * The subscription handed to the downstream subscriber; tracks its
         * demand and delivers queued buffers while demand is available.
         */
        final class HttpWriteSubscription implements Flow.Subscription {
            final Demand demand = new Demand();

            /**
             * Adds {@code n} to the outstanding demand and triggers a flush.
             *
             * @throws IllegalArgumentException if {@code n} is not positive
             */
            @Override
            public void request(long n) {
                if (n <= 0) throw new IllegalArgumentException("non-positive request");
                demand.increase(n);
                debug.log(Level.DEBUG, () -> "HttpPublisher: got request of "
                            + n + " from "
                            + getConnectionFlow());
                writeScheduler.runOrSchedule();
            }

            /** Only logs the cancellation; no other state is changed here. */
            @Override
            public void cancel() {
                debug.log(Level.DEBUG, () -> "HttpPublisher: cancelled by "
                          + getConnectionFlow());
            }

            /**
             * Delivers queued buffer lists to the subscriber, head first,
             * for as long as the queue is non-empty and demand remains.
             */
            void flush() {
                while (!queue.isEmpty() && demand.tryDecrement()) {
                    List<ByteBuffer> elem = queue.poll();
                    debug.log(Level.DEBUG, () -> "HttpPublisher: sending "
                                + Utils.remaining(elem) + " bytes ("
                                + elem.size() + " buffers) to "
                                + getConnectionFlow());
                    subscriber.onNext(elem);
                }
            }
        }

        /** Appends the buffers at the tail of the write queue. */
        @Override
        public void enqueue(List<ByteBuffer> buffers) throws IOException {
            queue.add(buffers);
            // Only pay for summing the buffer sizes when the message
            // will actually be logged.
            if (debug.isLoggable(Level.DEBUG)) {
                int bytes = remainingBytes(buffers);
                debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes);
            }
        }

        /** Inserts the buffers at the head of the write queue. */
        @Override
        public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
            // Unordered frames are sent before existing frames.
            // The size is sampled before the buffers become visible in the
            // queue, and only when the message will actually be logged.
            boolean logging = debug.isLoggable(Level.DEBUG);
            int bytes = logging ? remainingBytes(buffers) : 0;
            queue.addFirst(buffers);
            if (logging) {
                debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes);
            }
        }

        /** Triggers a flush of whatever has been enqueued so far. */
        @Override
        public void signalEnqueued() throws IOException {
            debug.log(Level.DEBUG, "signalling the publisher of the write queue");
            signal();
        }

        /** Returns the sum of the remaining bytes of all the given buffers. */
        private int remainingBytes(List<ByteBuffer> buffers) {
            return buffers.stream().mapToInt(ByteBuffer::remaining).sum();
        }
    }

    String dbgTag = null;

    /**
     * Returns the tag used by the debug logger. The tag is cached in
     * {@code dbgTag} once the connection flow is available; until then an
     * uncached placeholder ending in "(?)" is returned.
     */
    final String dbgString() {
        FlowTube flow = getConnectionFlow();
        String tag = dbgTag;
        if (tag == null && flow != null) {
            dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")";
        } else if (tag == null) {
            tag = this.getClass().getSimpleName() + "(?)";
        }
        return tag;
    }

    @Override
    public String toString() {
        return "HttpConnection: " + channel().toString();
    }
}