/*
 * Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Utils;

/**
 * Plain raw TCP connection direct to destination.
 * The connection operates in asynchronous non-blocking mode.
 * All reads and writes are done non-blocking.
 */
class PlainHttpConnection extends HttpConnection {

    private final Object reading = new Object();
    protected final SocketChannel chan;
    private final SocketTube tube; // need SocketTube to call signalClosed().
    private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);

    // 'connected' and 'connectTimerEvent' are volatile to provide a proper
    // synchronization (visibility) action between the threads that write
    // and read them. 'closed' is only accessed under synchronized (this)
    // in close().
    private volatile boolean connected;
    private boolean closed;
    private volatile ConnectTimerEvent connectTimerEvent;  // may be null

    /**
     * Returns a ConnectTimerEvent iff there is a connect timeout duration,
     * otherwise null.
     *
     * @param exchange the exchange whose remaining connect timeout is used
     * @param cf       the connect future the timer completes exceptionally
     *                 when it fires
     */
    private ConnectTimerEvent newConnectTimer(Exchange<?> exchange,
                                              CompletableFuture<Void> cf) {
        Duration duration = exchange.remainingConnectTimeout().orElse(null);
        if (duration == null) {
            return null;
        }
        return new ConnectTimerEvent(duration, exchange, cf);
    }

    /**
     * Timeout event fired when the connect timeout elapses. It cancels the
     * exchange and completes the connect future exceptionally with a
     * {@link ConnectException}.
     */
    final class ConnectTimerEvent extends TimeoutEvent {
        private final CompletableFuture<Void> cf;
        private final Exchange<?> exchange;

        ConnectTimerEvent(Duration duration,
                          Exchange<?> exchange,
                          CompletableFuture<Void> cf) {
            super(duration);
            this.exchange = exchange;
            this.cf = cf;
        }

        @Override
        public void handle() {
            if (debug.on()) {
                debug.log("HTTP connect timed out");
            }
            ConnectException ce = new ConnectException("HTTP connect timed out");
            exchange.multi.cancel(ce);
            // complete the future on the client's executor rather than on
            // the thread that fired the timer
            client().theExecutor().execute(() -> cf.completeExceptionally(ce));
        }

        @Override
        public String toString() {
            return "ConnectTimerEvent, " + super.toString();
        }
    }

    /**
     * Event registered for {@code OP_CONNECT} on the channel. When handled,
     * it finishes the pending connect and completes the connect future.
     */
    final class ConnectEvent extends AsyncEvent {
        private final CompletableFuture<Void> cf;

        ConnectEvent(CompletableFuture<Void> cf) {
            this.cf = cf;
        }

        @Override
        public SelectableChannel channel() {
            return chan;
        }

        @Override
        public int interestOps() {
            return SelectionKey.OP_CONNECT;
        }

        @Override
        public void handle() {
            try {
                assert !connected : "Already connected";
                assert !chan.isBlocking() : "Unexpected blocking channel";
                if (debug.on())
                    debug.log("ConnectEvent: finishing connect");
                boolean finished = chan.finishConnect();
                assert finished : "Expected channel to be connected";
                if (debug.on())
                    debug.log("ConnectEvent: connect finished: %s Local addr: %s",
                              finished, chan.getLocalAddress());
                // complete async since the event runs on the SelectorManager thread
                cf.completeAsync(() -> null, client().theExecutor());
            } catch (Throwable e) {
                Throwable t = Utils.toConnectException(e);
                client().theExecutor().execute(() -> cf.completeExceptionally(t));
                close();
            }
        }

        @Override
        public void abort(IOException ioe) {
            client().theExecutor().execute(() -> cf.completeExceptionally(ioe));
            close();
        }
    }

    /**
     * Starts a non-blocking connect to the destination address.
     *
     * <p>If a connect timeout is configured for the exchange, a timer is
     * registered first. If the connect completes immediately the returned
     * future is completed right away; otherwise a {@link ConnectEvent} is
     * registered and the future is completed when that event is handled.
     * Any failure completes the future exceptionally and closes this
     * connection.
     *
     * @param exchange the exchange on whose behalf the connection is made
     * @return a future that completes when the connect has finished
     */
    @Override
    public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
        CompletableFuture<Void> cf = new MinimalFuture<>();
        try {
            assert !connected : "Already connected";
            assert !chan.isBlocking() : "Unexpected blocking channel";
            boolean finished;

            // read the volatile field once: work on a local copy
            ConnectTimerEvent timer = newConnectTimer(exchange, cf);
            connectTimerEvent = timer;
            if (timer != null) {
                if (debug.on())
                    debug.log("registering connect timer: " + timer);
                client().registerTimer(timer);
            }

            PrivilegedExceptionAction<Boolean> pa =
                    () -> chan.connect(Utils.resolveAddress(address));
            try {
                 finished = AccessController.doPrivileged(pa);
            } catch (PrivilegedActionException e) {
               throw e.getCause();
            }
            if (finished) {
                if (debug.on()) debug.log("connect finished without blocking");
                cf.complete(null);
            } else {
                if (debug.on()) debug.log("registering connect event");
                client().registerEvent(new ConnectEvent(cf));
            }
        } catch (Throwable throwable) {
            cf.completeExceptionally(Utils.toConnectException(throwable));
            try {
                close();
            } catch (Exception x) {
                if (debug.on())
                    debug.log("Failed to close channel after unsuccessful connect");
            }
        }
        return cf;
    }

    /**
     * Marks this connection as connected and cancels the connect timer,
     * if one was registered.
     *
     * @return an already completed future
     */
    @Override
    public CompletableFuture<Void> finishConnect() {
        assert connected == false;
        if (debug.on()) debug.log("finishConnect, setting connected=true");
        connected = true;
        ConnectTimerEvent timer = connectTimerEvent;
        if (timer != null)
            client().cancelTimer(timer);
        return MinimalFuture.completedFuture(null);
    }

    @Override
    SocketChannel channel() {
        return chan;
    }

    @Override
    final FlowTube getConnectionFlow() {
        return tube;
    }

    /**
     * Opens a non-blocking socket channel, configures its socket options
     * and wraps it in a {@code SocketTube}.
     *
     * @param addr   the destination address
     * @param client the owning HTTP client
     * @throws InternalError if the channel cannot be opened or configured;
     *         a channel that was already opened is closed before the error
     *         is thrown
     */
    PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
        super(addr, client);
        SocketChannel opened = null;
        try {
            opened = SocketChannel.open();
            this.chan = opened;
            chan.configureBlocking(false);
            trySetReceiveBufferSize(client.getReceiveBufferSize());
            if (debug.on()) {
                int bufsize = getInitialBufferSize();
                debug.log("Initial receive buffer size is: %d", bufsize);
            }
            chan.setOption(StandardSocketOptions.TCP_NODELAY, true);
            // wrap the channel in a Tube for async reading and writing
            tube = new SocketTube(client(), chan, Utils::getBuffer);
        } catch (IOException e) {
            // do not leak the file descriptor if configuration failed
            // after the channel was successfully opened
            if (opened != null) {
                try {
                    opened.close();
                } catch (IOException suppressed) {
                    e.addSuppressed(suppressed);
                }
            }
            throw new InternalError(e);
        }
    }

    /**
     * Returns the channel's current {@code SO_RCVBUF} value, or 0 if it
     * cannot be read.
     */
    private int getInitialBufferSize() {
        try {
            return chan.getOption(StandardSocketOptions.SO_RCVBUF);
        } catch (IOException x) {
            if (debug.on())
                debug.log("Failed to get initial receive buffer size on %s", chan);
        }
        return 0;
    }

    /**
     * Sets {@code SO_RCVBUF} to {@code bufsize} if it is positive. This is
     * best effort: a failure is only logged.
     */
    private void trySetReceiveBufferSize(int bufsize) {
        try {
            if (bufsize > 0) {
                chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
            }
        } catch (IOException x) {
            if (debug.on())
                debug.log("Failed to set receive buffer size to %d on %s",
                          bufsize, chan);
        }
    }

    @Override
    HttpPublisher publisher() { return writePublisher; }


    @Override
    public String toString() {
        return "PlainHttpConnection: " + super.toString();
    }

    /**
     * Closes this connection. Only the first call has any effect.
     *
     * <p>Cancels the connect timer if there is one, closes the channel and
     * signals the tube that the connection has been closed. The tube is
     * signalled even if closing the channel throws.
     */
    @Override
    public void close() {
        synchronized (this) {
            if (closed) {
                return;
            }
            closed = true;
        }
        try {
            Log.logTrace("Closing: " + toString());
            if (debug.on())
                debug.log("Closing channel: " + client().debugInterestOps(chan));
            ConnectTimerEvent timer = connectTimerEvent;
            if (timer != null)
                client().cancelTimer(timer);
            try {
                chan.close();
            } finally {
                // this connection is already marked closed: make sure the
                // tube is notified even when chan.close() fails
                tube.signalClosed();
            }
        } catch (IOException e) {
            Log.logTrace("Closing resulted in " + e);
        }
    }


    @Override
    ConnectionPool.CacheKey cacheKey() {
        return new ConnectionPool.CacheKey(address, null);
    }

    @Override
    synchronized boolean connected() {
        return connected;
    }


    @Override
    boolean isSecure() {
        return false;
    }

    @Override
    boolean isProxied() {
        return false;
    }

}