1 /* 2 * Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.Closeable; 29 import java.io.IOException; 30 import java.net.InetSocketAddress; 31 import java.nio.ByteBuffer; 32 import java.nio.channels.SocketChannel; 33 import java.util.Arrays; 34 import java.util.IdentityHashMap; 35 import java.util.List; 36 import java.util.Map; 37 import java.util.TreeMap; 38 import java.util.concurrent.CompletableFuture; 39 import java.util.concurrent.CompletionStage; 40 import java.util.concurrent.ConcurrentLinkedDeque; 41 import java.util.concurrent.Flow; 42 import java.util.function.BiPredicate; 43 import java.util.function.Predicate; 44 import java.net.http.HttpClient; 45 import java.net.http.HttpClient.Version; 46 import java.net.http.HttpHeaders; 47 import jdk.internal.net.http.common.Demand; 48 import jdk.internal.net.http.common.FlowTube; 49 import jdk.internal.net.http.common.Logger; 50 import jdk.internal.net.http.common.SequentialScheduler; 51 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter; 52 import jdk.internal.net.http.common.Log; 53 import jdk.internal.net.http.common.Utils; 54 import static java.net.http.HttpClient.Version.HTTP_2; 55 56 /** 57 * Wraps socket channel layer and takes care of SSL also. 58 * 59 * Subtypes are: 60 * PlainHttpConnection: regular direct TCP connection to server 61 * PlainProxyConnection: plain text proxy connection 62 * PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server 63 * AsyncSSLConnection: TLS channel direct to server 64 * AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel 65 */ 66 abstract class HttpConnection implements Closeable { 67 68 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 69 final static Logger DEBUG_LOGGER = Utils.getDebugLogger( 70 () -> "HttpConnection(SocketTube(?))", Utils.DEBUG); 71 72 /** The address this connection is connected to. Could be a server or a proxy. */ 73 final InetSocketAddress address; 74 private final HttpClientImpl client; 75 private final TrailingOperations trailingOperations; 76 77 HttpConnection(InetSocketAddress address, HttpClientImpl client) { 78 this.address = address; 79 this.client = client; 80 trailingOperations = new TrailingOperations(); 81 } 82 83 private static final class TrailingOperations { 84 private final Map<CompletionStage<?>, Boolean> operations = 85 new IdentityHashMap<>(); 86 void add(CompletionStage<?> cf) { 87 synchronized(operations) { 88 operations.put(cf, Boolean.TRUE); 89 cf.whenComplete((r,t)-> remove(cf)); 90 } 91 } 92 boolean remove(CompletionStage<?> cf) { 93 synchronized(operations) { 94 return operations.remove(cf); 95 } 96 } 97 } 98 99 final void addTrailingOperation(CompletionStage<?> cf) { 100 trailingOperations.add(cf); 101 } 102 103 // final void removeTrailingOperation(CompletableFuture<?> cf) { 104 // trailingOperations.remove(cf); 105 // } 106 107 final HttpClientImpl client() { 108 return client; 109 } 110 111 /** 112 * Initiates the connect phase. 113 * 114 * Returns a CompletableFuture that completes when the underlying 115 * TCP connection has been established or an error occurs. 116 */ 117 public abstract CompletableFuture<Void> connectAsync(Exchange<?> exchange); 118 119 /** 120 * Finishes the connection phase. 121 * 122 * Returns a CompletableFuture that completes when any additional, 123 * type specific, setup has been done. Must be called after connectAsync. */ 124 public abstract CompletableFuture<Void> finishConnect(); 125 126 /** Tells whether, or not, this connection is connected to its destination. */ 127 abstract boolean connected(); 128 129 /** Tells whether, or not, this connection is secure ( over SSL ) */ 130 abstract boolean isSecure(); 131 132 /** 133 * Tells whether, or not, this connection is proxied. 134 * Returns true for tunnel connections, or clear connection to 135 * any host through proxy. 136 */ 137 abstract boolean isProxied(); 138 139 /** Tells whether, or not, this connection is open. */ 140 final boolean isOpen() { 141 return channel().isOpen() && 142 (connected() ? !getConnectionFlow().isFinished() : true); 143 } 144 145 interface HttpPublisher extends FlowTube.TubePublisher { 146 void enqueue(List<ByteBuffer> buffers) throws IOException; 147 void enqueueUnordered(List<ByteBuffer> buffers) throws IOException; 148 void signalEnqueued() throws IOException; 149 } 150 151 /** 152 * Returns the HTTP publisher associated with this connection. May be null 153 * if invoked before connecting. 154 */ 155 abstract HttpPublisher publisher(); 156 157 // HTTP/2 MUST use TLS version 1.2 or higher for HTTP/2 over TLS 158 private static final Predicate<String> testRequiredHTTP2TLSVersion = proto -> 159 proto.equals("TLSv1.2") || proto.equals("TLSv1.3"); 160 161 /** 162 * Returns true if the given client's SSL parameter protocols contains at 163 * least one TLS version that HTTP/2 requires. 164 */ 165 private static final boolean hasRequiredHTTP2TLSVersion(HttpClient client) { 166 String[] protos = client.sslParameters().getProtocols(); 167 if (protos != null) { 168 return Arrays.stream(protos).filter(testRequiredHTTP2TLSVersion).findAny().isPresent(); 169 } else { 170 return false; 171 } 172 } 173 174 /** 175 * Factory for retrieving HttpConnections. A connection can be retrieved 176 * from the connection pool, or a new one created if none available. 177 * 178 * The given {@code addr} is the ultimate destination. Any proxies, 179 * etc, are determined from the request. Returns a concrete instance which 180 * is one of the following: 181 * {@link PlainHttpConnection} 182 * {@link PlainTunnelingConnection} 183 * 184 * The returned connection, if not from the connection pool, must have its, 185 * connect() or connectAsync() method invoked, which ( when it completes 186 * successfully ) renders the connection usable for requests. 187 */ 188 public static HttpConnection getConnection(InetSocketAddress addr, 189 HttpClientImpl client, 190 HttpRequestImpl request, 191 Version version) { 192 // The default proxy selector may select a proxy whose address is 193 // unresolved. We must resolve the address before connecting to it. 194 InetSocketAddress proxy = Utils.resolveAddress(request.proxy()); 195 HttpConnection c = null; 196 boolean secure = request.secure(); 197 ConnectionPool pool = client.connectionPool(); 198 199 if (!secure) { 200 c = pool.getConnection(false, addr, proxy); 201 if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) { 202 final HttpConnection conn = c; 203 if (DEBUG_LOGGER.on()) 204 DEBUG_LOGGER.log(conn.getConnectionFlow() 205 + ": plain connection retrieved from HTTP/1.1 pool"); 206 return c; 207 } else { 208 return getPlainConnection(addr, proxy, request, client); 209 } 210 } else { // secure 211 if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool 212 c = pool.getConnection(true, addr, proxy); 213 } 214 if (c != null && c.isOpen()) { 215 final HttpConnection conn = c; 216 if (DEBUG_LOGGER.on()) 217 DEBUG_LOGGER.log(conn.getConnectionFlow() 218 + ": SSL connection retrieved from HTTP/1.1 pool"); 219 return c; 220 } else { 221 String[] alpn = null; 222 if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) { 223 alpn = new String[] { "h2", "http/1.1" }; 224 } 225 return getSSLConnection(addr, proxy, alpn, request, client); 226 } 227 } 228 } 229 230 private static HttpConnection getSSLConnection(InetSocketAddress addr, 231 InetSocketAddress proxy, 232 String[] alpn, 233 HttpRequestImpl request, 234 HttpClientImpl client) { 235 if (proxy != null) 236 return new AsyncSSLTunnelConnection(addr, client, alpn, proxy, 237 proxyTunnelHeaders(request)); 238 else 239 return new AsyncSSLConnection(addr, client, alpn); 240 } 241 242 /** 243 * This method is used to build a filter that will accept or 244 * veto (header-name, value) tuple for transmission on the 245 * wire. 246 * The filter is applied to the headers when sending the headers 247 * to the remote party. 248 * Which tuple is accepted/vetoed depends on: 249 * <pre> 250 * - whether the connection is a tunnel connection 251 * [talking to a server through a proxy tunnel] 252 * - whether the method is CONNECT 253 * [establishing a CONNECT tunnel through a proxy] 254 * - whether the request is using a proxy 255 * (and the connection is not a tunnel) 256 * [talking to a server through a proxy] 257 * - whether the request is a direct connection to 258 * a server (no tunnel, no proxy). 259 * </pre> 260 * @param request 261 * @return 262 */ 263 BiPredicate<String,String> headerFilter(HttpRequestImpl request) { 264 if (isTunnel()) { 265 // talking to a server through a proxy tunnel 266 // don't send proxy-* headers to a plain server 267 assert !request.isConnect(); 268 return Utils.NO_PROXY_HEADERS_FILTER; 269 } else if (request.isConnect()) { 270 // establishing a proxy tunnel 271 // check for proxy tunnel disabled schemes 272 // assert !this.isTunnel(); 273 assert request.proxy() == null; 274 return Utils.PROXY_TUNNEL_FILTER; 275 } else if (request.proxy() != null) { 276 // talking to a server through a proxy (no tunnel) 277 // check for proxy disabled schemes 278 // assert !isTunnel() && !request.isConnect(); 279 return Utils.PROXY_FILTER; 280 } else { 281 // talking to a server directly (no tunnel, no proxy) 282 // don't send proxy-* headers to a plain server 283 // assert request.proxy() == null && !request.isConnect(); 284 return Utils.NO_PROXY_HEADERS_FILTER; 285 } 286 } 287 288 // Composes a new immutable HttpHeaders that combines the 289 // user and system header but only keeps those headers that 290 // start with "proxy-" 291 private static HttpHeaders proxyTunnelHeaders(HttpRequestImpl request) { 292 Map<String, List<String>> combined = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); 293 combined.putAll(request.getSystemHeadersBuilder().map()); 294 combined.putAll(request.headers().map()); // let user override system 295 296 // keep only proxy-* - and also strip authorization headers 297 // for disabled schemes 298 return HttpHeaders.of(combined, Utils.PROXY_TUNNEL_FILTER); 299 } 300 301 /* Returns either a plain HTTP connection or a plain tunnelling connection 302 * for proxied WebSocket */ 303 private static HttpConnection getPlainConnection(InetSocketAddress addr, 304 InetSocketAddress proxy, 305 HttpRequestImpl request, 306 HttpClientImpl client) { 307 if (request.isWebSocket() && proxy != null) 308 return new PlainTunnelingConnection(addr, proxy, client, 309 proxyTunnelHeaders(request)); 310 311 if (proxy == null) 312 return new PlainHttpConnection(addr, client); 313 else 314 return new PlainProxyConnection(proxy, client); 315 } 316 317 void closeOrReturnToCache(HttpHeaders hdrs) { 318 if (hdrs == null) { 319 // the connection was closed by server, eof 320 Log.logTrace("Cannot return connection to pool: closing {0}", this); 321 close(); 322 return; 323 } 324 HttpClientImpl client = client(); 325 if (client == null) { 326 Log.logTrace("Client released: closing {0}", this); 327 close(); 328 return; 329 } 330 ConnectionPool pool = client.connectionPool(); 331 boolean keepAlive = hdrs.firstValue("Connection") 332 .map((s) -> !s.equalsIgnoreCase("close")) 333 .orElse(true); 334 335 if (keepAlive && isOpen()) { 336 Log.logTrace("Returning connection to the pool: {0}", this); 337 pool.returnToPool(this); 338 } else { 339 Log.logTrace("Closing connection (keepAlive={0}, isOpen={1}): {2}", 340 keepAlive, isOpen(), this); 341 close(); 342 } 343 } 344 345 /* Tells whether or not this connection is a tunnel through a proxy */ 346 boolean isTunnel() { return false; } 347 348 abstract SocketChannel channel(); 349 350 final InetSocketAddress address() { 351 return address; 352 } 353 354 abstract ConnectionPool.CacheKey cacheKey(); 355 356 /** 357 * Closes this connection, by returning the socket to its connection pool. 358 */ 359 @Override 360 public abstract void close(); 361 362 abstract FlowTube getConnectionFlow(); 363 364 /** 365 * A publisher that makes it possible to publish (write) ordered (normal 366 * priority) and unordered (high priority) buffers downstream. 367 */ 368 final class PlainHttpPublisher implements HttpPublisher { 369 final Object reading; 370 PlainHttpPublisher() { 371 this(new Object()); 372 } 373 PlainHttpPublisher(Object readingLock) { 374 this.reading = readingLock; 375 } 376 final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>(); 377 final ConcurrentLinkedDeque<List<ByteBuffer>> priority = new ConcurrentLinkedDeque<>(); 378 volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber; 379 volatile HttpWriteSubscription subscription; 380 final SequentialScheduler writeScheduler = 381 new SequentialScheduler(this::flushTask); 382 @Override 383 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { 384 synchronized (reading) { 385 //assert this.subscription == null; 386 //assert this.subscriber == null; 387 if (subscription == null) { 388 subscription = new HttpWriteSubscription(); 389 } 390 this.subscriber = subscriber; 391 } 392 // TODO: should we do this in the flow? 393 subscriber.onSubscribe(subscription); 394 signal(); 395 } 396 397 void flushTask(DeferredCompleter completer) { 398 try { 399 HttpWriteSubscription sub = subscription; 400 if (sub != null) sub.flush(); 401 } finally { 402 completer.complete(); 403 } 404 } 405 406 void signal() { 407 writeScheduler.runOrSchedule(); 408 } 409 410 final class HttpWriteSubscription implements Flow.Subscription { 411 final Demand demand = new Demand(); 412 413 @Override 414 public void request(long n) { 415 if (n <= 0) throw new IllegalArgumentException("non-positive request"); 416 demand.increase(n); 417 if (debug.on()) 418 debug.log("HttpPublisher: got request of " + n + " from " 419 + getConnectionFlow()); 420 writeScheduler.runOrSchedule(); 421 } 422 423 @Override 424 public void cancel() { 425 if (debug.on()) 426 debug.log("HttpPublisher: cancelled by " + getConnectionFlow()); 427 } 428 429 private boolean isEmpty() { 430 return queue.isEmpty() && priority.isEmpty(); 431 } 432 433 private List<ByteBuffer> poll() { 434 List<ByteBuffer> elem = priority.poll(); 435 return elem == null ? queue.poll() : elem; 436 } 437 438 void flush() { 439 while (!isEmpty() && demand.tryDecrement()) { 440 List<ByteBuffer> elem = poll(); 441 if (debug.on()) 442 debug.log("HttpPublisher: sending " 443 + Utils.remaining(elem) + " bytes (" 444 + elem.size() + " buffers) to " 445 + getConnectionFlow()); 446 subscriber.onNext(elem); 447 } 448 } 449 } 450 451 @Override 452 public void enqueue(List<ByteBuffer> buffers) throws IOException { 453 queue.add(buffers); 454 int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); 455 debug.log("added %d bytes to the write queue", bytes); 456 } 457 458 @Override 459 public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException { 460 // Unordered frames are sent before existing frames. 461 int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); 462 priority.add(buffers); 463 debug.log("added %d bytes in the priority write queue", bytes); 464 } 465 466 @Override 467 public void signalEnqueued() throws IOException { 468 debug.log("signalling the publisher of the write queue"); 469 signal(); 470 } 471 } 472 473 String dbgTag; 474 final String dbgString() { 475 FlowTube flow = getConnectionFlow(); 476 String tag = dbgTag; 477 if (tag == null && flow != null) { 478 dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")"; 479 } else if (tag == null) { 480 tag = this.getClass().getSimpleName() + "(?)"; 481 } 482 return tag; 483 } 484 485 @Override 486 public String toString() { 487 return "HttpConnection: " + channel().toString(); 488 } 489 }