9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.net.InetSocketAddress; 30 import java.net.StandardSocketOptions; 31 import java.nio.ByteBuffer; 32 import java.nio.channels.SelectableChannel; 33 import java.nio.channels.SelectionKey; 34 import java.nio.channels.SocketChannel; 35 import java.util.concurrent.CompletableFuture; 36 import java.util.function.Consumer; 37 import java.util.function.Supplier; 38 39 import jdk.incubator.http.internal.common.AsyncWriteQueue; 40 import jdk.incubator.http.internal.common.ByteBufferReference; 41 import jdk.incubator.http.internal.common.Log; 42 import jdk.incubator.http.internal.common.MinimalFuture; 43 import jdk.incubator.http.internal.common.Utils; 44 45 /** 46 * Plain raw TCP connection direct to destination. 2 modes 47 * 1) Blocking used by http/1. In this case the connect is actually non 48 * blocking but the request is sent blocking. The first byte of a response 49 * is received non-blocking and the remainder of the response is received 50 * blocking 51 * 2) Non-blocking. In this case (for http/2) the connection is actually opened 52 * blocking but all reads and writes are done non-blocking under the 53 * control of a Http2Connection object. 54 */ 55 class PlainHttpConnection extends HttpConnection implements AsyncConnection { 56 57 protected final SocketChannel chan; 58 private volatile boolean connected; 59 private boolean closed; 60 61 // should be volatile to provide proper synchronization(visibility) action 62 private volatile Consumer<ByteBufferReference> asyncReceiver; 63 private volatile Consumer<Throwable> errorReceiver; 64 private volatile Supplier<ByteBufferReference> readBufferSupplier; 65 private boolean asyncReading; 66 67 private final AsyncWriteQueue asyncOutputQ = new AsyncWriteQueue(this::asyncOutput); 68 69 private final Object reading = new Object(); 70 71 @Override 72 public void startReading() { 73 try { 74 synchronized(reading) { 75 asyncReading = true; 76 } 77 client.registerEvent(new ReadEvent()); 78 } catch (IOException e) { 79 shutdown(); 80 } 81 } 82 83 @Override 84 public void stopAsyncReading() { 85 synchronized(reading) { 86 asyncReading = false; 87 } 88 client.cancelRegistration(chan); 89 } 90 91 class ConnectEvent extends AsyncEvent { 92 CompletableFuture<Void> cf; 93 94 ConnectEvent(CompletableFuture<Void> cf) { 95 super(AsyncEvent.BLOCKING); 96 this.cf = cf; 97 } 98 99 @Override 100 public SelectableChannel channel() { 101 return chan; 102 } 103 104 @Override 105 public int interestOps() { 106 return SelectionKey.OP_CONNECT; 107 } 108 109 @Override 110 public void handle() { 111 try { 112 chan.finishConnect(); 113 } catch (IOException e) { 114 cf.completeExceptionally(e); 115 return; 116 } 117 connected = true; 118 cf.complete(null); 119 } 120 121 @Override 122 public void abort() { 123 close(); 124 } 125 } 126 127 @Override 128 public CompletableFuture<Void> connectAsync() { 129 CompletableFuture<Void> plainFuture = new MinimalFuture<>(); 130 try { 131 chan.configureBlocking(false); 132 chan.connect(address); 133 client.registerEvent(new ConnectEvent(plainFuture)); 134 } catch (IOException e) { 135 plainFuture.completeExceptionally(e); 136 } 137 return plainFuture; 138 } 139 140 @Override 141 public void connect() throws IOException { 142 chan.connect(address); 143 connected = true; 144 } 145 146 @Override 147 SocketChannel channel() { 148 return chan; 149 } 150 151 PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) { 152 super(addr, client); 153 try { 154 this.chan = SocketChannel.open(); 155 int bufsize = client.getReceiveBufferSize(); 156 chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize); 157 chan.setOption(StandardSocketOptions.TCP_NODELAY, true); 158 } catch (IOException e) { 159 throw new InternalError(e); 160 } 161 } 162 163 @Override 164 long write(ByteBuffer[] buffers, int start, int number) throws IOException { 165 if (getMode() != Mode.ASYNC) { 166 return chan.write(buffers, start, number); 167 } 168 // async 169 buffers = Utils.reduce(buffers, start, number); 170 long n = Utils.remaining(buffers); 171 asyncOutputQ.put(ByteBufferReference.toReferences(buffers)); 172 flushAsync(); 173 return n; 174 } 175 176 @Override 177 long write(ByteBuffer buffer) throws IOException { 178 if (getMode() != Mode.ASYNC) { 179 return chan.write(buffer); 180 } 181 // async 182 long n = buffer.remaining(); 183 asyncOutputQ.put(ByteBufferReference.toReferences(buffer)); 184 flushAsync(); 185 return n; 186 } 187 188 // handle registered WriteEvent; invoked from SelectorManager thread 189 void flushRegistered() { 190 if (getMode() == Mode.ASYNC) { 191 try { 192 asyncOutputQ.flushDelayed(); 193 } catch (IOException e) { 194 // Only IOException caused by closed Queue is expected here 195 shutdown(); 196 } 197 } 198 } 199 200 @Override 201 public void writeAsync(ByteBufferReference[] buffers) throws IOException { 202 if (getMode() != Mode.ASYNC) { 203 chan.write(ByteBufferReference.toBuffers(buffers)); 204 ByteBufferReference.clear(buffers); 205 } else { 206 asyncOutputQ.put(buffers); 207 } 208 } 209 210 @Override 211 public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException { 212 if (getMode() != Mode.ASYNC) { 213 chan.write(ByteBufferReference.toBuffers(buffers)); 214 ByteBufferReference.clear(buffers); 215 } else { 216 // Unordered frames are sent before existing frames. 217 asyncOutputQ.putFirst(buffers); 218 } 219 } 220 221 @Override 222 public void flushAsync() throws IOException { 223 if (getMode() == Mode.ASYNC) { 224 asyncOutputQ.flush(); 225 } 226 } 227 228 @Override 229 public void enableCallback() { 230 // not used 231 assert false; 232 } 233 234 boolean asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) { 235 try { 236 ByteBuffer[] bufs = ByteBufferReference.toBuffers(refs); 237 while (Utils.remaining(bufs) > 0) { 238 long n = chan.write(bufs); 239 if (n == 0) { 240 delayCallback.setDelayed(refs); 241 client.registerEvent(new WriteEvent()); 242 return false; 243 } 244 } 245 ByteBufferReference.clear(refs); 246 } catch (IOException e) { 247 shutdown(); 248 } 249 return true; 250 } 251 252 @Override 253 public String toString() { 254 return "PlainHttpConnection: " + super.toString(); 255 } 256 257 /** 258 * Close this connection 259 */ 260 @Override 261 public synchronized void close() { 262 if (closed) { 263 return; 264 } 265 closed = true; 266 try { 267 Log.logError("Closing: " + toString()); 268 chan.close(); 269 } catch (IOException e) {} 270 } 271 272 @Override 273 void shutdownInput() throws IOException { 274 chan.shutdownInput(); 275 } 276 277 @Override 278 void shutdownOutput() throws IOException { 279 chan.shutdownOutput(); 280 } 281 282 void shutdown() { 283 close(); 284 errorReceiver.accept(new IOException("Connection aborted")); 285 } 286 287 void asyncRead() { 288 synchronized (reading) { 289 try { 290 while (asyncReading) { 291 ByteBufferReference buf = readBufferSupplier.get(); 292 int n = chan.read(buf.get()); 293 if (n == -1) { 294 throw new IOException(); 295 } 296 if (n == 0) { 297 buf.clear(); 298 return; 299 } 300 buf.get().flip(); 301 asyncReceiver.accept(buf); 302 } 303 } catch (IOException e) { 304 shutdown(); 305 } 306 } 307 } 308 309 @Override 310 protected ByteBuffer readImpl() throws IOException { 311 ByteBuffer dst = ByteBuffer.allocate(8192); 312 int n = readImpl(dst); 313 if (n > 0) { 314 return dst; 315 } else if (n == 0) { 316 return Utils.EMPTY_BYTEBUFFER; 317 } else { 318 return null; 319 } 320 } 321 322 private int readImpl(ByteBuffer buf) throws IOException { 323 int mark = buf.position(); 324 int n; 325 // FIXME: this hack works in conjunction with the corresponding change 326 // in jdk.incubator.http.RawChannel.registerEvent 327 //if ((n = buffer.remaining()) != 0) { 328 //buf.put(buffer); 329 //} else { 330 n = chan.read(buf); 331 //} 332 if (n == -1) { 333 return -1; 334 } 335 Utils.flipToMark(buf, mark); 336 // String s = "Receive (" + n + " bytes) "; 337 //debugPrint(s, buf); 338 return n; 339 } 340 341 @Override 342 ConnectionPool.CacheKey cacheKey() { 343 return new ConnectionPool.CacheKey(address, null); 344 } 345 346 @Override 347 synchronized boolean connected() { 348 return connected; 349 } 350 351 // used for all output in HTTP/2 352 class WriteEvent extends AsyncEvent { 353 WriteEvent() { 354 super(0); 355 } 356 357 @Override 358 public SelectableChannel channel() { 359 return chan; 360 } 361 362 @Override 363 public int interestOps() { 364 return SelectionKey.OP_WRITE; 365 } 366 367 @Override 368 public void handle() { 369 flushRegistered(); 370 } 371 372 @Override 373 public void abort() { 374 shutdown(); 375 } 376 } 377 378 // used for all input in HTTP/2 379 class ReadEvent extends AsyncEvent { 380 ReadEvent() { 381 super(AsyncEvent.REPEATING); // && !BLOCKING 382 } 383 384 @Override 385 public SelectableChannel channel() { 386 return chan; 387 } 388 389 @Override 390 public int interestOps() { 391 return SelectionKey.OP_READ; 392 } 393 394 @Override 395 public void handle() { 396 asyncRead(); 397 } 398 399 @Override 400 public void abort() { 401 shutdown(); 402 } 403 404 @Override 405 public String toString() { 406 return super.toString() + "/" + chan; 407 } 408 } 409 410 // used in blocking channels only 411 class ReceiveResponseEvent extends AsyncEvent { 412 CompletableFuture<Void> cf; 413 414 ReceiveResponseEvent(CompletableFuture<Void> cf) { 415 super(AsyncEvent.BLOCKING); 416 this.cf = cf; 417 } 418 @Override 419 public SelectableChannel channel() { 420 return chan; 421 } 422 423 @Override 424 public void handle() { 425 cf.complete(null); 426 } 427 428 @Override 429 public int interestOps() { 430 return SelectionKey.OP_READ; 431 } 432 433 @Override 434 public void abort() { 435 close(); 436 } 437 438 @Override 439 public String toString() { 440 return super.toString() + "/" + chan; 441 } 442 } 443 444 @Override 445 boolean isSecure() { 446 return false; 447 } 448 449 @Override 450 boolean isProxied() { 451 return false; 452 } 453 454 @Override 455 public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver, 456 Consumer<Throwable> errorReceiver, 457 Supplier<ByteBufferReference> readBufferSupplier) { 458 this.asyncReceiver = asyncReceiver; 459 this.errorReceiver = errorReceiver; 460 this.readBufferSupplier = readBufferSupplier; 461 } 462 463 @Override 464 CompletableFuture<Void> whenReceivingResponse() { 465 CompletableFuture<Void> cf = new MinimalFuture<>(); 466 try { 467 ReceiveResponseEvent evt = new ReceiveResponseEvent(cf); 468 client.registerEvent(evt); 469 } catch (IOException e) { 470 cf.completeExceptionally(e); 471 } 472 return cf; 473 } 474 } | 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.lang.System.Logger.Level; 30 import java.net.InetSocketAddress; 31 import java.net.StandardSocketOptions; 32 import java.nio.ByteBuffer; 33 import java.nio.channels.SelectableChannel; 34 import java.nio.channels.SelectionKey; 35 import java.nio.channels.SocketChannel; 36 import java.security.AccessController; 37 import java.security.PrivilegedActionException; 38 import java.security.PrivilegedExceptionAction; 39 import java.util.concurrent.CompletableFuture; 40 import jdk.incubator.http.internal.common.FlowTube; 41 import jdk.incubator.http.internal.common.Log; 42 import jdk.incubator.http.internal.common.MinimalFuture; 43 import jdk.incubator.http.internal.common.Utils; 44 45 /** 46 * Plain raw TCP connection direct to destination. 47 * The connection operates in asynchronous non-blocking mode. 48 * All reads and writes are done non-blocking. 49 */ 50 class PlainHttpConnection extends HttpConnection { 51 52 private final Object reading = new Object(); 53 protected final SocketChannel chan; 54 private final FlowTube tube; 55 private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading); 56 private volatile boolean connected; 57 private boolean closed; 58 59 // should be volatile to provide proper synchronization(visibility) action 60 61 final class ConnectEvent extends AsyncEvent { 62 private final CompletableFuture<Void> cf; 63 64 ConnectEvent(CompletableFuture<Void> cf) { 65 this.cf = cf; 66 } 67 68 @Override 69 public SelectableChannel channel() { 70 return chan; 71 } 72 73 @Override 74 public int interestOps() { 75 return SelectionKey.OP_CONNECT; 76 } 77 78 @Override 79 public void handle() { 80 assert !connected : "Already connected"; 81 assert !chan.isBlocking() : "Unexpected blocking channel"; 82 try { 83 debug.log(Level.DEBUG, "ConnectEvent: finishing connect"); 84 boolean finished = chan.finishConnect(); 85 assert finished : "Expected channel to be connected"; 86 debug.log(Level.DEBUG, 87 "ConnectEvent: connect finished: %s", finished); 88 connected = true; 89 // complete async since the event runs on the SelectorManager thread 90 cf.completeAsync(() -> null, client().theExecutor()); 91 } catch (IOException e) { 92 client().theExecutor().execute( () -> cf.completeExceptionally(e)); 93 } 94 } 95 96 @Override 97 public void abort(IOException ioe) { 98 close(); 99 client().theExecutor().execute( () -> cf.completeExceptionally(ioe)); 100 } 101 } 102 103 @Override 104 public CompletableFuture<Void> connectAsync() { 105 assert !connected : "Already connected"; 106 assert !chan.isBlocking() : "Unexpected blocking channel"; 107 CompletableFuture<Void> cf = new MinimalFuture<>(); 108 try { 109 boolean finished = false; 110 PrivilegedExceptionAction<Boolean> pa = () -> chan.connect(address); 111 try { 112 finished = AccessController.doPrivileged(pa); 113 } catch (PrivilegedActionException e) { 114 cf.completeExceptionally(e.getCause()); 115 } 116 if (finished) { 117 debug.log(Level.DEBUG, "connect finished without blocking"); 118 connected = true; 119 cf.complete(null); 120 } else { 121 debug.log(Level.DEBUG, "registering connect event"); 122 client().registerEvent(new ConnectEvent(cf)); 123 } 124 } catch (Throwable throwable) { 125 cf.completeExceptionally(throwable); 126 } 127 return cf; 128 } 129 130 @Override 131 SocketChannel channel() { 132 return chan; 133 } 134 135 @Override 136 final FlowTube getConnectionFlow() { 137 return tube; 138 } 139 140 PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) { 141 super(addr, client); 142 try { 143 this.chan = SocketChannel.open(); 144 chan.configureBlocking(false); 145 int bufsize = client.getReceiveBufferSize(); 146 chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize); 147 chan.setOption(StandardSocketOptions.TCP_NODELAY, true); 148 // wrap the connected channel in a Tube for async reading and writing 149 tube = new SocketTube(client(), chan, Utils::getBuffer); 150 } catch (IOException e) { 151 throw new InternalError(e); 152 } 153 } 154 155 @Override 156 HttpPublisher publisher() { return writePublisher; } 157 158 159 @Override 160 public String toString() { 161 return "PlainHttpConnection: " + super.toString(); 162 } 163 164 /** 165 * Closes this connection 166 */ 167 @Override 168 public synchronized void close() { 169 if (closed) { 170 return; 171 } 172 closed = true; 173 try { 174 Log.logTrace("Closing: " + toString()); 175 chan.close(); 176 } catch (IOException e) {} 177 } 178 179 @Override 180 void shutdownInput() throws IOException { 181 debug.log(Level.DEBUG, "Shutting down input"); 182 chan.shutdownInput(); 183 } 184 185 @Override 186 void shutdownOutput() throws IOException { 187 debug.log(Level.DEBUG, "Shutting down output"); 188 chan.shutdownOutput(); 189 } 190 191 @Override 192 ConnectionPool.CacheKey cacheKey() { 193 return new ConnectionPool.CacheKey(address, null); 194 } 195 196 @Override 197 synchronized boolean connected() { 198 return connected; 199 } 200 201 202 @Override 203 boolean isSecure() { 204 return false; 205 } 206 207 @Override 208 boolean isProxied() { 209 return false; 210 } 211 212 // Support for WebSocket/RawChannelImpl which unfortunately 213 // still depends on synchronous read/writes. 214 // It should be removed when RawChannelImpl moves to using asynchronous APIs. 215 private static final class PlainDetachedChannel 216 extends DetachedConnectionChannel { 217 final PlainHttpConnection plainConnection; 218 boolean closed; 219 PlainDetachedChannel(PlainHttpConnection conn) { 220 // We're handing the connection channel over to a web socket. 221 // We need the selector manager's thread to stay alive until 222 // the WebSocket is closed. 223 conn.client().webSocketOpen(); 224 this.plainConnection = conn; 225 } 226 227 @Override 228 SocketChannel channel() { 229 return plainConnection.channel(); 230 } 231 232 @Override 233 ByteBuffer read() throws IOException { 234 ByteBuffer dst = ByteBuffer.allocate(8192); 235 int n = readImpl(dst); 236 if (n > 0) { 237 return dst; 238 } else if (n == 0) { 239 return Utils.EMPTY_BYTEBUFFER; 240 } else { 241 return null; 242 } 243 } 244 245 @Override 246 public void close() { 247 HttpClientImpl client = plainConnection.client(); 248 try { 249 plainConnection.close(); 250 } finally { 251 // notify the HttpClientImpl that the websocket is no 252 // no longer operating. 253 synchronized(this) { 254 if (closed == true) return; 255 closed = true; 256 } 257 client.webSocketClose(); 258 } 259 } 260 261 @Override 262 public long write(ByteBuffer[] buffers, int start, int number) 263 throws IOException 264 { 265 return channel().write(buffers, start, number); 266 } 267 268 @Override 269 public void shutdownInput() throws IOException { 270 plainConnection.shutdownInput(); 271 } 272 273 @Override 274 public void shutdownOutput() throws IOException { 275 plainConnection.shutdownOutput(); 276 } 277 278 private int readImpl(ByteBuffer buf) throws IOException { 279 int mark = buf.position(); 280 int n; 281 n = channel().read(buf); 282 if (n == -1) { 283 return -1; 284 } 285 Utils.flipToMark(buf, mark); 286 return n; 287 } 288 } 289 290 // Support for WebSocket/RawChannelImpl which unfortunately 291 // still depends on synchronous read/writes. 292 // It should be removed when RawChannelImpl moves to using asynchronous APIs. 293 @Override 294 DetachedConnectionChannel detachChannel() { 295 client().cancelRegistration(channel()); 296 return new PlainDetachedChannel(this); 297 } 298 299 } |