/*
 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.CompletableFuture;
import jdk.incubator.http.internal.common.FlowTube;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;

/**
 * Plain raw TCP connection direct to destination.
 * The connection operates in asynchronous non-blocking mode.
 * All reads and writes are done non-blocking.
 */
class PlainHttpConnection extends HttpConnection {

    private final Object reading = new Object();
    protected final SocketChannel chan;
    private final FlowTube tube;
    private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);

    // Must be volatile to provide a proper synchronization (visibility)
    // action: it is set by ConnectEvent.handle(), which runs on the
    // SelectorManager thread, and by connectAsync(), and is read through
    // connected().
    private volatile boolean connected;

    // Guarded by 'this': only accessed from the synchronized close() method.
    private boolean closed;

    /**
     * Selector event used to finish a non-blocking connect that did not
     * complete immediately in {@link #connectAsync()}.
     */
    final class ConnectEvent extends AsyncEvent {
        private final CompletableFuture<Void> cf;

        /**
         * @param cf the future to complete once the connect has finished
         *           or failed
         */
        ConnectEvent(CompletableFuture<Void> cf) {
            this.cf = cf;
        }

        @Override
        public SelectableChannel channel() {
            return chan;
        }

        @Override
        public int interestOps() {
            return SelectionKey.OP_CONNECT;
        }

        /**
         * Finishes the pending connect and completes the future. Both the
         * normal and the exceptional completion are handed over to the
         * client's executor.
         */
        @Override
        public void handle() {
            try {
                assert !connected : "Already connected";
                assert !chan.isBlocking() : "Unexpected blocking channel";
                debug.log(Level.DEBUG, "ConnectEvent: finishing connect");
                boolean finished = chan.finishConnect();
                assert finished : "Expected channel to be connected";
                debug.log(Level.DEBUG,
                          "ConnectEvent: connect finished: %s", finished);
                connected = true;
                // complete async since the event runs on the SelectorManager thread
                cf.completeAsync(() -> null, client().theExecutor());
            } catch (Throwable e) {
                client().theExecutor().execute( () -> cf.completeExceptionally(e));
            }
        }

        /**
         * Closes the connection and completes the future exceptionally
         * with the given exception, on the client's executor.
         */
        @Override
        public void abort(IOException ioe) {
            close();
            client().theExecutor().execute( () -> cf.completeExceptionally(ioe));
        }
    }

    /**
     * Initiates a non-blocking connect to {@code address}.
     *
     * <p>If the connect completes immediately the returned future is
     * already completed. Otherwise a {@link ConnectEvent} is registered
     * with the client and the future is completed when that event is
     * handled. If initiating the connect fails, the future is completed
     * exceptionally and no event is registered.
     *
     * @return a future that completes when the connection is established,
     *         or completes exceptionally if it could not be established
     */
    @Override
    public CompletableFuture<Void> connectAsync() {
        CompletableFuture<Void> cf = new MinimalFuture<>();
        try {
            assert !connected : "Already connected";
            assert !chan.isBlocking() : "Unexpected blocking channel";
            final boolean finished;
            PrivilegedExceptionAction<Boolean> pa = () -> chan.connect(address);
            try {
                finished = AccessController.doPrivileged(pa);
            } catch (PrivilegedActionException e) {
                // Rethrow the underlying failure so that it is reported
                // through the future by the outer handler. Falling through
                // here would register a ConnectEvent for a connect that
                // has already failed.
                throw e.getCause();
            }
            if (finished) {
                debug.log(Level.DEBUG, "connect finished without blocking");
                connected = true;
                cf.complete(null);
            } else {
                debug.log(Level.DEBUG, "registering connect event");
                client().registerEvent(new ConnectEvent(cf));
            }
        } catch (Throwable throwable) {
            cf.completeExceptionally(throwable);
        }
        return cf;
    }

    @Override
    SocketChannel channel() {
        return chan;
    }

    @Override
    final FlowTube getConnectionFlow() {
        return tube;
    }

    /**
     * Opens a non-blocking socket channel for the given address. The
     * channel is not connected until {@link #connectAsync()} is called.
     *
     * <p>The receive buffer size is taken from the client; if it cannot
     * be applied, 256K is tried instead. {@code TCP_NODELAY} is enabled.
     *
     * @param addr   the destination address
     * @param client the client this connection belongs to
     * @throws InternalError if the channel cannot be opened or configured
     */
    PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
        super(addr, client);
        try {
            this.chan = SocketChannel.open();
            chan.configureBlocking(false);
            int bufsize = client.getReceiveBufferSize();
            if (!trySetReceiveBufferSize(bufsize)) {
                trySetReceiveBufferSize(256*1024);
            }
            chan.setOption(StandardSocketOptions.TCP_NODELAY, true);
            // wrap the connected channel in a Tube for async reading and writing
            tube = new SocketTube(client(), chan, Utils::getBuffer);
        } catch (IOException e) {
            throw new InternalError(e);
        }
    }

    /**
     * Attempts to set {@code SO_RCVBUF} on the channel.
     *
     * @param bufsize the requested receive buffer size, in bytes
     * @return {@code true} if the option was set, {@code false} if setting
     *         it failed with an {@code IOException} (which is only logged)
     */
    private boolean trySetReceiveBufferSize(int bufsize) {
        try {
            chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
            return true;
        } catch(IOException x) {
            debug.log(Level.DEBUG,
                      "Failed to set receive buffer size to %d on %s",
                      bufsize, chan);
        }
        return false;
    }

    @Override
    HttpPublisher publisher() { return writePublisher; }


    @Override
    public String toString() {
        return "PlainHttpConnection: " + super.toString();
    }

    /**
     * Closes this connection. Calls after the first one are no-ops.
     * Closing is best-effort: a failure to close the channel is logged
     * and otherwise ignored.
     */
    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        try {
            Log.logTrace("Closing: " + toString());
            chan.close();
        } catch (IOException e) {
            // Nothing useful can be done about it, but leave a trace
            // rather than dropping the failure silently.
            Log.logTrace("Closing resulted in " + e);
        }
    }

    @Override
    void shutdownInput() throws IOException {
        debug.log(Level.DEBUG, "Shutting down input");
        chan.shutdownInput();
    }

    @Override
    void shutdownOutput() throws IOException {
        debug.log(Level.DEBUG, "Shutting down output");
        chan.shutdownOutput();
    }

    @Override
    ConnectionPool.CacheKey cacheKey() {
        // NOTE(review): the second argument is presumably the proxy
        // address, null for a direct connection - confirm against CacheKey.
        return new ConnectionPool.CacheKey(address, null);
    }

    @Override
    synchronized boolean connected() {
        return connected;
    }


    @Override
    boolean isSecure() {
        return false;
    }

    @Override
    boolean isProxied() {
        return false;
    }

    // Support for WebSocket/RawChannelImpl which unfortunately
    // still depends on synchronous read/writes.
    // It should be removed when RawChannelImpl moves to using asynchronous APIs.
    private static final class PlainDetachedChannel
            extends DetachedConnectionChannel {
        final PlainHttpConnection plainConnection;
        // Guarded by 'this'; ensures webSocketClose() is called at most once.
        boolean closed;

        PlainDetachedChannel(PlainHttpConnection conn) {
            // We're handing the connection channel over to a web socket.
            // We need the selector manager's thread to stay alive until
            // the WebSocket is closed.
            conn.client().webSocketOpen();
            this.plainConnection = conn;
        }

        @Override
        SocketChannel channel() {
            return plainConnection.channel();
        }

        /**
         * Reads from the channel into a newly allocated 8192-byte buffer.
         *
         * @return the buffer if at least one byte was read,
         *         {@code Utils.EMPTY_BYTEBUFFER} if no bytes were read,
         *         or {@code null} if the channel reported end-of-stream
         * @throws IOException if the read fails
         */
        @Override
        ByteBuffer read() throws IOException {
            ByteBuffer dst = ByteBuffer.allocate(8192);
            int n = readImpl(dst);
            if (n > 0) {
                return dst;
            } else if (n == 0) {
                return Utils.EMPTY_BYTEBUFFER;
            } else {
                return null;
            }
        }

        /**
         * Closes the underlying connection and, the first time this method
         * is called, notifies the client that the web socket is no longer
         * operating.
         */
        @Override
        public void close() {
            HttpClientImpl client = plainConnection.client();
            try {
                plainConnection.close();
            } finally {
                // notify the HttpClientImpl that the websocket is
                // no longer operating - but only once. The flag is computed
                // first so that this finally clause completes normally and
                // never discards an exception thrown by the close above.
                boolean firstClose;
                synchronized (this) {
                    firstClose = !closed;
                    closed = true;
                }
                if (firstClose) {
                    client.webSocketClose();
                }
            }
        }

        @Override
        public long write(ByteBuffer[] buffers, int start, int number)
                throws IOException
        {
            return channel().write(buffers, start, number);
        }

        @Override
        public void shutdownInput() throws IOException {
            plainConnection.shutdownInput();
        }

        @Override
        public void shutdownOutput() throws IOException {
            plainConnection.shutdownOutput();
        }

        /**
         * Performs a single read from the channel into {@code buf}.
         *
         * @param buf the buffer to read into
         * @return the number of bytes read, possibly zero, or {@code -1}
         *         if the channel has reached end-of-stream (in which case
         *         the buffer is left as is)
         * @throws IOException if the read fails
         */
        private int readImpl(ByteBuffer buf) throws IOException {
            int mark = buf.position();
            int n;
            n = channel().read(buf);
            if (n == -1) {
                return -1;
            }
            // NOTE(review): presumably makes the bytes written since
            // 'mark' readable by the caller - confirm in Utils.flipToMark.
            Utils.flipToMark(buf, mark);
            return n;
        }
    }

    // Support for WebSocket/RawChannelImpl which unfortunately
    // still depends on synchronous read/writes.
    // It should be removed when RawChannelImpl moves to using asynchronous APIs.
    @Override
    DetachedConnectionChannel detachChannel() {
        client().cancelRegistration(channel());
        return new PlainDetachedChannel(this);
    }

}