1 /* 2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.Closeable; 29 import java.io.IOException; 30 import java.net.InetSocketAddress; 31 import java.nio.ByteBuffer; 32 import java.nio.channels.SocketChannel; 33 import java.util.Arrays; 34 import java.util.IdentityHashMap; 35 import java.util.List; 36 import java.util.Map; 37 import java.util.TreeMap; 38 import java.util.concurrent.CompletableFuture; 39 import java.util.concurrent.CompletionStage; 40 import java.util.concurrent.ConcurrentLinkedDeque; 41 import java.util.concurrent.Flow; 42 import java.util.function.BiPredicate; 43 import java.util.function.Predicate; 44 import java.net.http.HttpClient; 45 import java.net.http.HttpClient.Version; 46 import java.net.http.HttpHeaders; 47 import jdk.internal.net.http.common.Demand; 48 import jdk.internal.net.http.common.FlowTube; 49 import jdk.internal.net.http.common.Logger; 50 import jdk.internal.net.http.common.SequentialScheduler; 51 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter; 52 import jdk.internal.net.http.common.Log; 53 import jdk.internal.net.http.common.Utils; 54 import static java.net.http.HttpClient.Version.HTTP_2; 55 56 /** 57 * Wraps socket channel layer and takes care of SSL also. 58 * 59 * Subtypes are: 60 * PlainHttpConnection: regular direct TCP connection to server 61 * PlainProxyConnection: plain text proxy connection 62 * PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server 63 * AsyncSSLConnection: TLS channel direct to server 64 * AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel 65 */ 66 abstract class HttpConnection implements Closeable { 67 68 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 69 final static Logger DEBUG_LOGGER = Utils.getDebugLogger( 70 () -> "HttpConnection(SocketTube(?))", Utils.DEBUG); 71 72 /** The address this connection is connected to. Could be a server or a proxy. */ 73 final InetSocketAddress address; 74 private final HttpClientImpl client; 75 private final TrailingOperations trailingOperations; 76 77 HttpConnection(InetSocketAddress address, HttpClientImpl client) { 78 this.address = address; 79 this.client = client; 80 trailingOperations = new TrailingOperations(); 81 } 82 83 private static final class TrailingOperations { 84 private final Map<CompletionStage<?>, Boolean> operations = 85 new IdentityHashMap<>(); 86 void add(CompletionStage<?> cf) { 87 synchronized(operations) { 88 cf.whenComplete((r,t)-> remove(cf)); 89 operations.put(cf, Boolean.TRUE); 90 } 91 } 92 boolean remove(CompletionStage<?> cf) { 93 synchronized(operations) { 94 return operations.remove(cf); 95 } 96 } 97 } 98 99 final void addTrailingOperation(CompletionStage<?> cf) { 100 trailingOperations.add(cf); 101 } 102 103 // final void removeTrailingOperation(CompletableFuture<?> cf) { 104 // trailingOperations.remove(cf); 105 // } 106 107 final HttpClientImpl client() { 108 return client; 109 } 110 111 /** 112 * Initiates the connect and returns a CompletableFuture that completes 113 * when the initial connection phase has been established or an error 114 * occurs. The given exchange provides access connect timeout configuration. 115 */ 116 public abstract CompletableFuture<Void> connectAsync(Exchange<?> exchange); 117 118 /** Completes the connection phase. Must be called after connectAsync. */ 119 public abstract CompletableFuture<Void> finishConnect(); 120 121 /** Tells whether, or not, this connection is connected to its destination. */ 122 abstract boolean connected(); 123 124 /** Tells whether, or not, this connection is secure ( over SSL ) */ 125 abstract boolean isSecure(); 126 127 /** 128 * Tells whether, or not, this connection is proxied. 129 * Returns true for tunnel connections, or clear connection to 130 * any host through proxy. 131 */ 132 abstract boolean isProxied(); 133 134 /** Tells whether, or not, this connection is open. */ 135 final boolean isOpen() { 136 return channel().isOpen() && 137 (connected() ? !getConnectionFlow().isFinished() : true); 138 } 139 140 interface HttpPublisher extends FlowTube.TubePublisher { 141 void enqueue(List<ByteBuffer> buffers) throws IOException; 142 void enqueueUnordered(List<ByteBuffer> buffers) throws IOException; 143 void signalEnqueued() throws IOException; 144 } 145 146 /** 147 * Returns the HTTP publisher associated with this connection. May be null 148 * if invoked before connecting. 149 */ 150 abstract HttpPublisher publisher(); 151 152 // HTTP/2 MUST use TLS version 1.2 or higher for HTTP/2 over TLS 153 private static final Predicate<String> testRequiredHTTP2TLSVersion = proto -> 154 proto.equals("TLSv1.2") || proto.equals("TLSv1.3"); 155 156 /** 157 * Returns true if the given client's SSL parameter protocols contains at 158 * least one TLS version that HTTP/2 requires. 159 */ 160 private static final boolean hasRequiredHTTP2TLSVersion(HttpClient client) { 161 String[] protos = client.sslParameters().getProtocols(); 162 if (protos != null) { 163 return Arrays.stream(protos).filter(testRequiredHTTP2TLSVersion).findAny().isPresent(); 164 } else { 165 return false; 166 } 167 } 168 169 /** 170 * Factory for retrieving HttpConnections. A connection can be retrieved 171 * from the connection pool, or a new one created if none available. 172 * 173 * The given {@code addr} is the ultimate destination. Any proxies, 174 * etc, are determined from the request. Returns a concrete instance which 175 * is one of the following: 176 * {@link PlainHttpConnection} 177 * {@link PlainTunnelingConnection} 178 * 179 * The returned connection, if not from the connection pool, must have its, 180 * connect() or connectAsync() method invoked, which ( when it completes 181 * successfully ) renders the connection usable for requests. 182 */ 183 public static HttpConnection getConnection(InetSocketAddress addr, 184 HttpClientImpl client, 185 HttpRequestImpl request, 186 Version version) { 187 // The default proxy selector may select a proxy whose address is 188 // unresolved. We must resolve the address before connecting to it. 189 InetSocketAddress proxy = Utils.resolveAddress(request.proxy()); 190 HttpConnection c = null; 191 boolean secure = request.secure(); 192 ConnectionPool pool = client.connectionPool(); 193 194 if (!secure) { 195 c = pool.getConnection(false, addr, proxy); 196 if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) { 197 final HttpConnection conn = c; 198 if (DEBUG_LOGGER.on()) 199 DEBUG_LOGGER.log(conn.getConnectionFlow() 200 + ": plain connection retrieved from HTTP/1.1 pool"); 201 return c; 202 } else { 203 return getPlainConnection(addr, proxy, request, client); 204 } 205 } else { // secure 206 if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool 207 c = pool.getConnection(true, addr, proxy); 208 } 209 if (c != null && c.isOpen()) { 210 final HttpConnection conn = c; 211 if (DEBUG_LOGGER.on()) 212 DEBUG_LOGGER.log(conn.getConnectionFlow() 213 + ": SSL connection retrieved from HTTP/1.1 pool"); 214 return c; 215 } else { 216 String[] alpn = null; 217 if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) { 218 alpn = new String[] { "h2", "http/1.1" }; 219 } 220 return getSSLConnection(addr, proxy, alpn, request, client); 221 } 222 } 223 } 224 225 private static HttpConnection getSSLConnection(InetSocketAddress addr, 226 InetSocketAddress proxy, 227 String[] alpn, 228 HttpRequestImpl request, 229 HttpClientImpl client) { 230 if (proxy != null) 231 return new AsyncSSLTunnelConnection(addr, client, alpn, proxy, 232 proxyTunnelHeaders(request)); 233 else 234 return new AsyncSSLConnection(addr, client, alpn); 235 } 236 237 /** 238 * This method is used to build a filter that will accept or 239 * veto (header-name, value) tuple for transmission on the 240 * wire. 241 * The filter is applied to the headers when sending the headers 242 * to the remote party. 243 * Which tuple is accepted/vetoed depends on: 244 * <pre> 245 * - whether the connection is a tunnel connection 246 * [talking to a server through a proxy tunnel] 247 * - whether the method is CONNECT 248 * [establishing a CONNECT tunnel through a proxy] 249 * - whether the request is using a proxy 250 * (and the connection is not a tunnel) 251 * [talking to a server through a proxy] 252 * - whether the request is a direct connection to 253 * a server (no tunnel, no proxy). 254 * </pre> 255 * @param request 256 * @return 257 */ 258 BiPredicate<String,String> headerFilter(HttpRequestImpl request) { 259 if (isTunnel()) { 260 // talking to a server through a proxy tunnel 261 // don't send proxy-* headers to a plain server 262 assert !request.isConnect(); 263 return Utils.NO_PROXY_HEADERS_FILTER; 264 } else if (request.isConnect()) { 265 // establishing a proxy tunnel 266 // check for proxy tunnel disabled schemes 267 // assert !this.isTunnel(); 268 assert request.proxy() == null; 269 return Utils.PROXY_TUNNEL_FILTER; 270 } else if (request.proxy() != null) { 271 // talking to a server through a proxy (no tunnel) 272 // check for proxy disabled schemes 273 // assert !isTunnel() && !request.isConnect(); 274 return Utils.PROXY_FILTER; 275 } else { 276 // talking to a server directly (no tunnel, no proxy) 277 // don't send proxy-* headers to a plain server 278 // assert request.proxy() == null && !request.isConnect(); 279 return Utils.NO_PROXY_HEADERS_FILTER; 280 } 281 } 282 283 // Composes a new immutable HttpHeaders that combines the 284 // user and system header but only keeps those headers that 285 // start with "proxy-" 286 private static HttpHeaders proxyTunnelHeaders(HttpRequestImpl request) { 287 Map<String, List<String>> combined = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); 288 combined.putAll(request.getSystemHeadersBuilder().map()); 289 combined.putAll(request.headers().map()); // let user override system 290 291 // keep only proxy-* - and also strip authorization headers 292 // for disabled schemes 293 return HttpHeaders.of(combined, Utils.PROXY_TUNNEL_FILTER); 294 } 295 296 /* Returns either a plain HTTP connection or a plain tunnelling connection 297 * for proxied WebSocket */ 298 private static HttpConnection getPlainConnection(InetSocketAddress addr, 299 InetSocketAddress proxy, 300 HttpRequestImpl request, 301 HttpClientImpl client) { 302 if (request.isWebSocket() && proxy != null) 303 return new PlainTunnelingConnection(addr, proxy, client, 304 proxyTunnelHeaders(request)); 305 306 if (proxy == null) 307 return new PlainHttpConnection(addr, client); 308 else 309 return new PlainProxyConnection(proxy, client); 310 } 311 312 void closeOrReturnToCache(HttpHeaders hdrs) { 313 if (hdrs == null) { 314 // the connection was closed by server, eof 315 close(); 316 return; 317 } 318 if (!isOpen()) { 319 return; 320 } 321 HttpClientImpl client = client(); 322 if (client == null) { 323 close(); 324 return; 325 } 326 ConnectionPool pool = client.connectionPool(); 327 boolean keepAlive = hdrs.firstValue("Connection") 328 .map((s) -> !s.equalsIgnoreCase("close")) 329 .orElse(true); 330 331 if (keepAlive) { 332 Log.logTrace("Returning connection to the pool: {0}", this); 333 pool.returnToPool(this); 334 } else { 335 close(); 336 } 337 } 338 339 /* Tells whether or not this connection is a tunnel through a proxy */ 340 boolean isTunnel() { return false; } 341 342 abstract SocketChannel channel(); 343 344 final InetSocketAddress address() { 345 return address; 346 } 347 348 abstract ConnectionPool.CacheKey cacheKey(); 349 350 /** 351 * Closes this connection, by returning the socket to its connection pool. 352 */ 353 @Override 354 public abstract void close(); 355 356 abstract FlowTube getConnectionFlow(); 357 358 /** 359 * A publisher that makes it possible to publish (write) ordered (normal 360 * priority) and unordered (high priority) buffers downstream. 361 */ 362 final class PlainHttpPublisher implements HttpPublisher { 363 final Object reading; 364 PlainHttpPublisher() { 365 this(new Object()); 366 } 367 PlainHttpPublisher(Object readingLock) { 368 this.reading = readingLock; 369 } 370 final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>(); 371 final ConcurrentLinkedDeque<List<ByteBuffer>> priority = new ConcurrentLinkedDeque<>(); 372 volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber; 373 volatile HttpWriteSubscription subscription; 374 final SequentialScheduler writeScheduler = 375 new SequentialScheduler(this::flushTask); 376 @Override 377 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { 378 synchronized (reading) { 379 //assert this.subscription == null; 380 //assert this.subscriber == null; 381 if (subscription == null) { 382 subscription = new HttpWriteSubscription(); 383 } 384 this.subscriber = subscriber; 385 } 386 // TODO: should we do this in the flow? 387 subscriber.onSubscribe(subscription); 388 signal(); 389 } 390 391 void flushTask(DeferredCompleter completer) { 392 try { 393 HttpWriteSubscription sub = subscription; 394 if (sub != null) sub.flush(); 395 } finally { 396 completer.complete(); 397 } 398 } 399 400 void signal() { 401 writeScheduler.runOrSchedule(); 402 } 403 404 final class HttpWriteSubscription implements Flow.Subscription { 405 final Demand demand = new Demand(); 406 407 @Override 408 public void request(long n) { 409 if (n <= 0) throw new IllegalArgumentException("non-positive request"); 410 demand.increase(n); 411 if (debug.on()) 412 debug.log("HttpPublisher: got request of " + n + " from " 413 + getConnectionFlow()); 414 writeScheduler.runOrSchedule(); 415 } 416 417 @Override 418 public void cancel() { 419 if (debug.on()) 420 debug.log("HttpPublisher: cancelled by " + getConnectionFlow()); 421 } 422 423 private boolean isEmpty() { 424 return queue.isEmpty() && priority.isEmpty(); 425 } 426 427 private List<ByteBuffer> poll() { 428 List<ByteBuffer> elem = priority.poll(); 429 return elem == null ? queue.poll() : elem; 430 } 431 432 void flush() { 433 while (!isEmpty() && demand.tryDecrement()) { 434 List<ByteBuffer> elem = poll(); 435 if (debug.on()) 436 debug.log("HttpPublisher: sending " 437 + Utils.remaining(elem) + " bytes (" 438 + elem.size() + " buffers) to " 439 + getConnectionFlow()); 440 subscriber.onNext(elem); 441 } 442 } 443 } 444 445 @Override 446 public void enqueue(List<ByteBuffer> buffers) throws IOException { 447 queue.add(buffers); 448 int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); 449 debug.log("added %d bytes to the write queue", bytes); 450 } 451 452 @Override 453 public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException { 454 // Unordered frames are sent before existing frames. 455 int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); 456 priority.add(buffers); 457 debug.log("added %d bytes in the priority write queue", bytes); 458 } 459 460 @Override 461 public void signalEnqueued() throws IOException { 462 debug.log("signalling the publisher of the write queue"); 463 signal(); 464 } 465 } 466 467 String dbgTag; 468 final String dbgString() { 469 FlowTube flow = getConnectionFlow(); 470 String tag = dbgTag; 471 if (tag == null && flow != null) { 472 dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")"; 473 } else if (tag == null) { 474 tag = this.getClass().getSimpleName() + "(?)"; 475 } 476 return tag; 477 } 478 479 @Override 480 public String toString() { 481 return "HttpConnection: " + channel().toString(); 482 } 483 }