1 /* 2 * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.Closeable; 29 import java.io.IOException; 30 import java.net.InetSocketAddress; 31 import java.nio.ByteBuffer; 32 import java.nio.channels.SocketChannel; 33 import java.util.Arrays; 34 import java.util.IdentityHashMap; 35 import java.util.List; 36 import java.util.Map; 37 import java.util.TreeMap; 38 import java.util.concurrent.CompletableFuture; 39 import java.util.concurrent.CompletionStage; 40 import java.util.concurrent.ConcurrentLinkedDeque; 41 import java.util.concurrent.Flow; 42 import java.util.function.BiPredicate; 43 import java.util.function.Predicate; 44 import java.net.http.HttpClient; 45 import java.net.http.HttpClient.Version; 46 import java.net.http.HttpHeaders; 47 import jdk.internal.net.http.common.Demand; 48 import jdk.internal.net.http.common.FlowTube; 49 import jdk.internal.net.http.common.Logger; 50 import jdk.internal.net.http.common.SequentialScheduler; 51 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter; 52 import jdk.internal.net.http.common.Log; 53 import jdk.internal.net.http.common.Utils; 54 import static java.net.http.HttpClient.Version.HTTP_2; 55 56 /** 57 * Wraps socket channel layer and takes care of SSL also. 58 * 59 * Subtypes are: 60 * PlainHttpConnection: regular direct TCP connection to server 61 * PlainProxyConnection: plain text proxy connection 62 * PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server 63 * AsyncSSLConnection: TLS channel direct to server 64 * AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel 65 */ 66 abstract class HttpConnection implements Closeable { 67 68 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 69 final static Logger DEBUG_LOGGER = Utils.getDebugLogger( 70 () -> "HttpConnection(SocketTube(?))", Utils.DEBUG); 71 72 /** The address this connection is connected to. Could be a server or a proxy. */ 73 final InetSocketAddress address; 74 private final HttpClientImpl client; 75 private final TrailingOperations trailingOperations; 76 77 HttpConnection(InetSocketAddress address, HttpClientImpl client) { 78 this.address = address; 79 this.client = client; 80 trailingOperations = new TrailingOperations(); 81 } 82 83 private static final class TrailingOperations { 84 private final Map<CompletionStage<?>, Boolean> operations = 85 new IdentityHashMap<>(); 86 void add(CompletionStage<?> cf) { 87 synchronized(operations) { 88 operations.put(cf, Boolean.TRUE); 89 cf.whenComplete((r,t)-> remove(cf)); 90 } 91 } 92 boolean remove(CompletionStage<?> cf) { 93 synchronized(operations) { 94 return operations.remove(cf); 95 } 96 } 97 } 98 99 final void addTrailingOperation(CompletionStage<?> cf) { 100 trailingOperations.add(cf); 101 } 102 103 // final void removeTrailingOperation(CompletableFuture<?> cf) { 104 // trailingOperations.remove(cf); 105 // } 106 107 final HttpClientImpl client() { 108 return client; 109 } 110 111 /** 112 * Initiates the connect phase. 113 * 114 * Returns a CompletableFuture that completes when the underlying 115 * TCP connection has been established or an error occurs. 116 */ 117 public abstract CompletableFuture<Void> connectAsync(Exchange<?> exchange); 118 119 /** 120 * Finishes the connection phase. 121 * 122 * Returns a CompletableFuture that completes when any additional, 123 * type specific, setup has been done. Must be called after connectAsync. */ 124 public abstract CompletableFuture<Void> finishConnect(); 125 126 /** Tells whether, or not, this connection is connected to its destination. */ 127 abstract boolean connected(); 128 129 /** Tells whether, or not, this connection is secure ( over SSL ) */ 130 abstract boolean isSecure(); 131 132 /** 133 * Tells whether, or not, this connection is proxied. 134 * Returns true for tunnel connections, or clear connection to 135 * any host through proxy. 136 */ 137 abstract boolean isProxied(); 138 139 /** 140 * Returns the address of the proxy used by this connection. 141 * Returns the proxy address for tunnel connections, or 142 * clear connection to any host through proxy. 143 * Returns {@code null} otherwise. 144 */ 145 abstract InetSocketAddress proxy(); 146 147 /** Tells whether, or not, this connection is open. */ 148 final boolean isOpen() { 149 return channel().isOpen() && 150 (connected() ? !getConnectionFlow().isFinished() : true); 151 } 152 153 interface HttpPublisher extends FlowTube.TubePublisher { 154 void enqueue(List<ByteBuffer> buffers) throws IOException; 155 void enqueueUnordered(List<ByteBuffer> buffers) throws IOException; 156 void signalEnqueued() throws IOException; 157 } 158 159 /** 160 * Returns the HTTP publisher associated with this connection. May be null 161 * if invoked before connecting. 162 */ 163 abstract HttpPublisher publisher(); 164 165 // HTTP/2 MUST use TLS version 1.2 or higher for HTTP/2 over TLS 166 private static final Predicate<String> testRequiredHTTP2TLSVersion = proto -> 167 proto.equals("TLSv1.2") || proto.equals("TLSv1.3"); 168 169 /** 170 * Returns true if the given client's SSL parameter protocols contains at 171 * least one TLS version that HTTP/2 requires. 172 */ 173 private static final boolean hasRequiredHTTP2TLSVersion(HttpClient client) { 174 String[] protos = client.sslParameters().getProtocols(); 175 if (protos != null) { 176 return Arrays.stream(protos).filter(testRequiredHTTP2TLSVersion).findAny().isPresent(); 177 } else { 178 return false; 179 } 180 } 181 182 /** 183 * Factory for retrieving HttpConnections. A connection can be retrieved 184 * from the connection pool, or a new one created if none available. 185 * 186 * The given {@code addr} is the ultimate destination. Any proxies, 187 * etc, are determined from the request. Returns a concrete instance which 188 * is one of the following: 189 * {@link PlainHttpConnection} 190 * {@link PlainTunnelingConnection} 191 * 192 * The returned connection, if not from the connection pool, must have its, 193 * connect() or connectAsync() method invoked, which ( when it completes 194 * successfully ) renders the connection usable for requests. 195 */ 196 public static HttpConnection getConnection(InetSocketAddress addr, 197 HttpClientImpl client, 198 HttpRequestImpl request, 199 Version version) { 200 // The default proxy selector may select a proxy whose address is 201 // unresolved. We must resolve the address before connecting to it. 202 InetSocketAddress proxy = Utils.resolveAddress(request.proxy()); 203 HttpConnection c = null; 204 boolean secure = request.secure(); 205 ConnectionPool pool = client.connectionPool(); 206 207 if (!secure) { 208 c = pool.getConnection(false, addr, proxy); 209 if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) { 210 final HttpConnection conn = c; 211 if (DEBUG_LOGGER.on()) 212 DEBUG_LOGGER.log(conn.getConnectionFlow() 213 + ": plain connection retrieved from HTTP/1.1 pool"); 214 return c; 215 } else { 216 return getPlainConnection(addr, proxy, request, client); 217 } 218 } else { // secure 219 if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool 220 c = pool.getConnection(true, addr, proxy); 221 } 222 if (c != null && c.isOpen()) { 223 final HttpConnection conn = c; 224 if (DEBUG_LOGGER.on()) 225 DEBUG_LOGGER.log(conn.getConnectionFlow() 226 + ": SSL connection retrieved from HTTP/1.1 pool"); 227 return c; 228 } else { 229 String[] alpn = null; 230 if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) { 231 alpn = new String[] { "h2", "http/1.1" }; 232 } 233 return getSSLConnection(addr, proxy, alpn, request, client); 234 } 235 } 236 } 237 238 private static HttpConnection getSSLConnection(InetSocketAddress addr, 239 InetSocketAddress proxy, 240 String[] alpn, 241 HttpRequestImpl request, 242 HttpClientImpl client) { 243 if (proxy != null) 244 return new AsyncSSLTunnelConnection(addr, client, alpn, proxy, 245 proxyTunnelHeaders(request)); 246 else 247 return new AsyncSSLConnection(addr, client, alpn); 248 } 249 250 /** 251 * This method is used to build a filter that will accept or 252 * veto (header-name, value) tuple for transmission on the 253 * wire. 254 * The filter is applied to the headers when sending the headers 255 * to the remote party. 256 * Which tuple is accepted/vetoed depends on: 257 * <pre> 258 * - whether the connection is a tunnel connection 259 * [talking to a server through a proxy tunnel] 260 * - whether the method is CONNECT 261 * [establishing a CONNECT tunnel through a proxy] 262 * - whether the request is using a proxy 263 * (and the connection is not a tunnel) 264 * [talking to a server through a proxy] 265 * - whether the request is a direct connection to 266 * a server (no tunnel, no proxy). 267 * </pre> 268 * @param request 269 * @return 270 */ 271 BiPredicate<String,String> headerFilter(HttpRequestImpl request) { 272 if (isTunnel()) { 273 // talking to a server through a proxy tunnel 274 // don't send proxy-* headers to a plain server 275 assert !request.isConnect(); 276 return Utils.NO_PROXY_HEADERS_FILTER; 277 } else if (request.isConnect()) { 278 // establishing a proxy tunnel 279 // check for proxy tunnel disabled schemes 280 // assert !this.isTunnel(); 281 assert request.proxy() == null; 282 return Utils.PROXY_TUNNEL_FILTER; 283 } else if (request.proxy() != null) { 284 // talking to a server through a proxy (no tunnel) 285 // check for proxy disabled schemes 286 // assert !isTunnel() && !request.isConnect(); 287 return Utils.PROXY_FILTER; 288 } else { 289 // talking to a server directly (no tunnel, no proxy) 290 // don't send proxy-* headers to a plain server 291 // assert request.proxy() == null && !request.isConnect(); 292 return Utils.NO_PROXY_HEADERS_FILTER; 293 } 294 } 295 296 // Composes a new immutable HttpHeaders that combines the 297 // user and system header but only keeps those headers that 298 // start with "proxy-" 299 private static HttpHeaders proxyTunnelHeaders(HttpRequestImpl request) { 300 Map<String, List<String>> combined = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); 301 combined.putAll(request.getSystemHeadersBuilder().map()); 302 combined.putAll(request.headers().map()); // let user override system 303 304 // keep only proxy-* - and also strip authorization headers 305 // for disabled schemes 306 return HttpHeaders.of(combined, Utils.PROXY_TUNNEL_FILTER); 307 } 308 309 /* Returns either a plain HTTP connection or a plain tunnelling connection 310 * for proxied WebSocket */ 311 private static HttpConnection getPlainConnection(InetSocketAddress addr, 312 InetSocketAddress proxy, 313 HttpRequestImpl request, 314 HttpClientImpl client) { 315 if (request.isWebSocket() && proxy != null) 316 return new PlainTunnelingConnection(addr, proxy, client, 317 proxyTunnelHeaders(request)); 318 319 if (proxy == null) 320 return new PlainHttpConnection(addr, client); 321 else 322 return new PlainProxyConnection(proxy, client); 323 } 324 325 void closeOrReturnToCache(HttpHeaders hdrs) { 326 if (hdrs == null) { 327 // the connection was closed by server, eof 328 Log.logTrace("Cannot return connection to pool: closing {0}", this); 329 close(); 330 return; 331 } 332 HttpClientImpl client = client(); 333 if (client == null) { 334 Log.logTrace("Client released: closing {0}", this); 335 close(); 336 return; 337 } 338 ConnectionPool pool = client.connectionPool(); 339 boolean keepAlive = hdrs.firstValue("Connection") 340 .map((s) -> !s.equalsIgnoreCase("close")) 341 .orElse(true); 342 343 if (keepAlive && isOpen()) { 344 Log.logTrace("Returning connection to the pool: {0}", this); 345 pool.returnToPool(this); 346 } else { 347 Log.logTrace("Closing connection (keepAlive={0}, isOpen={1}): {2}", 348 keepAlive, isOpen(), this); 349 close(); 350 } 351 } 352 353 /* Tells whether or not this connection is a tunnel through a proxy */ 354 boolean isTunnel() { return false; } 355 356 abstract SocketChannel channel(); 357 358 final InetSocketAddress address() { 359 return address; 360 } 361 362 abstract ConnectionPool.CacheKey cacheKey(); 363 364 /** 365 * Closes this connection, by returning the socket to its connection pool. 366 */ 367 @Override 368 public abstract void close(); 369 370 abstract FlowTube getConnectionFlow(); 371 372 /** 373 * A publisher that makes it possible to publish (write) ordered (normal 374 * priority) and unordered (high priority) buffers downstream. 375 */ 376 final class PlainHttpPublisher implements HttpPublisher { 377 final Object reading; 378 PlainHttpPublisher() { 379 this(new Object()); 380 } 381 PlainHttpPublisher(Object readingLock) { 382 this.reading = readingLock; 383 } 384 final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>(); 385 final ConcurrentLinkedDeque<List<ByteBuffer>> priority = new ConcurrentLinkedDeque<>(); 386 volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber; 387 volatile HttpWriteSubscription subscription; 388 final SequentialScheduler writeScheduler = 389 new SequentialScheduler(this::flushTask); 390 @Override 391 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { 392 synchronized (reading) { 393 //assert this.subscription == null; 394 //assert this.subscriber == null; 395 if (subscription == null) { 396 subscription = new HttpWriteSubscription(); 397 } 398 this.subscriber = subscriber; 399 } 400 // TODO: should we do this in the flow? 401 subscriber.onSubscribe(subscription); 402 signal(); 403 } 404 405 void flushTask(DeferredCompleter completer) { 406 try { 407 HttpWriteSubscription sub = subscription; 408 if (sub != null) sub.flush(); 409 } finally { 410 completer.complete(); 411 } 412 } 413 414 void signal() { 415 writeScheduler.runOrSchedule(); 416 } 417 418 final class HttpWriteSubscription implements Flow.Subscription { 419 final Demand demand = new Demand(); 420 421 @Override 422 public void request(long n) { 423 if (n <= 0) throw new IllegalArgumentException("non-positive request"); 424 demand.increase(n); 425 if (debug.on()) 426 debug.log("HttpPublisher: got request of " + n + " from " 427 + getConnectionFlow()); 428 writeScheduler.runOrSchedule(); 429 } 430 431 @Override 432 public void cancel() { 433 if (debug.on()) 434 debug.log("HttpPublisher: cancelled by " + getConnectionFlow()); 435 } 436 437 private boolean isEmpty() { 438 return queue.isEmpty() && priority.isEmpty(); 439 } 440 441 private List<ByteBuffer> poll() { 442 List<ByteBuffer> elem = priority.poll(); 443 return elem == null ? queue.poll() : elem; 444 } 445 446 void flush() { 447 while (!isEmpty() && demand.tryDecrement()) { 448 List<ByteBuffer> elem = poll(); 449 if (debug.on()) 450 debug.log("HttpPublisher: sending " 451 + Utils.remaining(elem) + " bytes (" 452 + elem.size() + " buffers) to " 453 + getConnectionFlow()); 454 subscriber.onNext(elem); 455 } 456 } 457 } 458 459 @Override 460 public void enqueue(List<ByteBuffer> buffers) throws IOException { 461 queue.add(buffers); 462 int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); 463 debug.log("added %d bytes to the write queue", bytes); 464 } 465 466 @Override 467 public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException { 468 // Unordered frames are sent before existing frames. 469 int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); 470 priority.add(buffers); 471 debug.log("added %d bytes in the priority write queue", bytes); 472 } 473 474 @Override 475 public void signalEnqueued() throws IOException { 476 debug.log("signalling the publisher of the write queue"); 477 signal(); 478 } 479 } 480 481 String dbgTag; 482 final String dbgString() { 483 FlowTube flow = getConnectionFlow(); 484 String tag = dbgTag; 485 if (tag == null && flow != null) { 486 dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")"; 487 } else if (tag == null) { 488 tag = this.getClass().getSimpleName() + "(?)"; 489 } 490 return tag; 491 } 492 493 @Override 494 public String toString() { 495 return "HttpConnection: " + channel().toString(); 496 } 497 }