1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.net.InetSocketAddress; 30 import java.net.StandardSocketOptions; 31 import java.nio.ByteBuffer; 32 import java.nio.channels.SelectableChannel; 33 import java.nio.channels.SelectionKey; 34 import java.nio.channels.SocketChannel; 35 import java.util.concurrent.CompletableFuture; 36 import java.util.function.Consumer; 37 import java.util.function.Supplier; 38 39 import jdk.incubator.http.internal.common.AsyncWriteQueue; 40 import jdk.incubator.http.internal.common.ByteBufferReference; 41 import jdk.incubator.http.internal.common.Log; 42 import jdk.incubator.http.internal.common.MinimalFuture; 43 import jdk.incubator.http.internal.common.Utils; 44 45 /** 46 * Plain raw TCP connection direct to destination. 2 modes 47 * 1) Blocking used by http/1. In this case the connect is actually non 48 * blocking but the request is sent blocking. The first byte of a response 49 * is received non-blocking and the remainder of the response is received 50 * blocking 51 * 2) Non-blocking. In this case (for http/2) the connection is actually opened 52 * blocking but all reads and writes are done non-blocking under the 53 * control of a Http2Connection object. 54 */ 55 class PlainHttpConnection extends HttpConnection implements AsyncConnection { 56 57 protected final SocketChannel chan; 58 private volatile boolean connected; 59 private boolean closed; 60 61 // should be volatile to provide proper synchronization(visibility) action 62 private volatile Consumer<ByteBufferReference> asyncReceiver; 63 private volatile Consumer<Throwable> errorReceiver; 64 private volatile Supplier<ByteBufferReference> readBufferSupplier; 65 private boolean asyncReading; 66 67 private final AsyncWriteQueue asyncOutputQ = new AsyncWriteQueue(this::asyncOutput); 68 69 private final Object reading = new Object(); 70 71 @Override 72 public void startReading() { 73 try { 74 synchronized(reading) { 75 asyncReading = true; 76 } 77 client.registerEvent(new ReadEvent()); 78 } catch (IOException e) { 79 shutdown(); 80 } 81 } 82 83 @Override 84 public void stopAsyncReading() { 85 synchronized(reading) { 86 asyncReading = false; 87 } 88 client.cancelRegistration(chan); 89 } 90 91 class ConnectEvent extends AsyncEvent { 92 CompletableFuture<Void> cf; 93 94 ConnectEvent(CompletableFuture<Void> cf) { 95 super(AsyncEvent.BLOCKING); 96 this.cf = cf; 97 } 98 99 @Override 100 public SelectableChannel channel() { 101 return chan; 102 } 103 104 @Override 105 public int interestOps() { 106 return SelectionKey.OP_CONNECT; 107 } 108 109 @Override 110 public void handle() { 111 try { 112 chan.finishConnect(); 113 } catch (IOException e) { 114 cf.completeExceptionally(e); 115 return; 116 } 117 connected = true; 118 cf.complete(null); 119 } 120 121 @Override 122 public void abort() { 123 close(); 124 } 125 } 126 127 @Override 128 public CompletableFuture<Void> connectAsync() { 129 CompletableFuture<Void> plainFuture = new MinimalFuture<>(); 130 try { 131 chan.configureBlocking(false); 132 chan.connect(address); 133 client.registerEvent(new ConnectEvent(plainFuture)); 134 } catch (IOException e) { 135 plainFuture.completeExceptionally(e); 136 } 137 return plainFuture; 138 } 139 140 @Override 141 public void connect() throws IOException { 142 chan.connect(address); 143 connected = true; 144 } 145 146 @Override 147 SocketChannel channel() { 148 return chan; 149 } 150 151 PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) { 152 super(addr, client); 153 try { 154 this.chan = SocketChannel.open(); 155 int bufsize = client.getReceiveBufferSize(); 156 chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize); 157 chan.setOption(StandardSocketOptions.TCP_NODELAY, true); 158 } catch (IOException e) { 159 throw new InternalError(e); 160 } 161 } 162 163 @Override 164 long write(ByteBuffer[] buffers, int start, int number) throws IOException { 165 if (getMode() != Mode.ASYNC) { 166 return chan.write(buffers, start, number); 167 } 168 // async 169 buffers = Utils.reduce(buffers, start, number); 170 long n = Utils.remaining(buffers); 171 asyncOutputQ.put(ByteBufferReference.toReferences(buffers)); 172 flushAsync(); 173 return n; 174 } 175 176 @Override 177 long write(ByteBuffer buffer) throws IOException { 178 if (getMode() != Mode.ASYNC) { 179 return chan.write(buffer); 180 } 181 // async 182 long n = buffer.remaining(); 183 asyncOutputQ.put(ByteBufferReference.toReferences(buffer)); 184 flushAsync(); 185 return n; 186 } 187 188 // handle registered WriteEvent; invoked from SelectorManager thread 189 void flushRegistered() { 190 if (getMode() == Mode.ASYNC) { 191 try { 192 asyncOutputQ.flushDelayed(); 193 } catch (IOException e) { 194 // Only IOException caused by closed Queue is expected here 195 shutdown(); 196 } 197 } 198 } 199 200 @Override 201 public void writeAsync(ByteBufferReference[] buffers) throws IOException { 202 if (getMode() != Mode.ASYNC) { 203 chan.write(ByteBufferReference.toBuffers(buffers)); 204 ByteBufferReference.clear(buffers); 205 } else { 206 asyncOutputQ.put(buffers); 207 } 208 } 209 210 @Override 211 public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException { 212 if (getMode() != Mode.ASYNC) { 213 chan.write(ByteBufferReference.toBuffers(buffers)); 214 ByteBufferReference.clear(buffers); 215 } else { 216 // Unordered frames are sent before existing frames. 217 asyncOutputQ.putFirst(buffers); 218 } 219 } 220 221 @Override 222 public void flushAsync() throws IOException { 223 if (getMode() == Mode.ASYNC) { 224 asyncOutputQ.flush(); 225 } 226 } 227 228 @Override 229 public void enableCallback() { 230 // not used 231 assert false; 232 } 233 234 void asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) { 235 try { 236 ByteBuffer[] bufs = ByteBufferReference.toBuffers(refs); 237 while (Utils.remaining(bufs) > 0) { 238 long n = chan.write(bufs); 239 if (n == 0) { 240 delayCallback.setDelayed(refs); 241 client.registerEvent(new WriteEvent()); 242 return; 243 } 244 } 245 ByteBufferReference.clear(refs); 246 } catch (IOException e) { 247 shutdown(); 248 } 249 } 250 251 @Override 252 public String toString() { 253 return "PlainHttpConnection: " + super.toString(); 254 } 255 256 /** 257 * Close this connection 258 */ 259 @Override 260 public synchronized void close() { 261 if (closed) { 262 return; 263 } 264 closed = true; 265 try { 266 Log.logError("Closing: " + toString()); 267 chan.close(); 268 } catch (IOException e) {} 269 } 270 271 @Override 272 void shutdownInput() throws IOException { 273 chan.shutdownInput(); 274 } 275 276 @Override 277 void shutdownOutput() throws IOException { 278 chan.shutdownOutput(); 279 } 280 281 void shutdown() { 282 close(); 283 errorReceiver.accept(new IOException("Connection aborted")); 284 } 285 286 void asyncRead() { 287 synchronized (reading) { 288 try { 289 while (asyncReading) { 290 ByteBufferReference buf = readBufferSupplier.get(); 291 int n = chan.read(buf.get()); 292 if (n == -1) { 293 throw new IOException(); 294 } 295 if (n == 0) { 296 buf.clear(); 297 return; 298 } 299 buf.get().flip(); 300 asyncReceiver.accept(buf); 301 } 302 } catch (IOException e) { 303 shutdown(); 304 } 305 } 306 } 307 308 @Override 309 protected ByteBuffer readImpl() throws IOException { 310 ByteBuffer dst = ByteBuffer.allocate(8192); 311 int n = readImpl(dst); 312 if (n > 0) { 313 return dst; 314 } else if (n == 0) { 315 return Utils.EMPTY_BYTEBUFFER; 316 } else { 317 return null; 318 } 319 } 320 321 private int readImpl(ByteBuffer buf) throws IOException { 322 int mark = buf.position(); 323 int n; 324 // FIXME: this hack works in conjunction with the corresponding change 325 // in jdk.incubator.http.RawChannel.registerEvent 326 //if ((n = buffer.remaining()) != 0) { 327 //buf.put(buffer); 328 //} else { 329 n = chan.read(buf); 330 //} 331 if (n == -1) { 332 return -1; 333 } 334 Utils.flipToMark(buf, mark); 335 // String s = "Receive (" + n + " bytes) "; 336 //debugPrint(s, buf); 337 return n; 338 } 339 340 @Override 341 ConnectionPool.CacheKey cacheKey() { 342 return new ConnectionPool.CacheKey(address, null); 343 } 344 345 @Override 346 synchronized boolean connected() { 347 return connected; 348 } 349 350 // used for all output in HTTP/2 351 class WriteEvent extends AsyncEvent { 352 WriteEvent() { 353 super(0); 354 } 355 356 @Override 357 public SelectableChannel channel() { 358 return chan; 359 } 360 361 @Override 362 public int interestOps() { 363 return SelectionKey.OP_WRITE; 364 } 365 366 @Override 367 public void handle() { 368 flushRegistered(); 369 } 370 371 @Override 372 public void abort() { 373 shutdown(); 374 } 375 } 376 377 // used for all input in HTTP/2 378 class ReadEvent extends AsyncEvent { 379 ReadEvent() { 380 super(AsyncEvent.REPEATING); // && !BLOCKING 381 } 382 383 @Override 384 public SelectableChannel channel() { 385 return chan; 386 } 387 388 @Override 389 public int interestOps() { 390 return SelectionKey.OP_READ; 391 } 392 393 @Override 394 public void handle() { 395 asyncRead(); 396 } 397 398 @Override 399 public void abort() { 400 shutdown(); 401 } 402 403 @Override 404 public String toString() { 405 return super.toString() + "/" + chan; 406 } 407 } 408 409 // used in blocking channels only 410 class ReceiveResponseEvent extends AsyncEvent { 411 CompletableFuture<Void> cf; 412 413 ReceiveResponseEvent(CompletableFuture<Void> cf) { 414 super(AsyncEvent.BLOCKING); 415 this.cf = cf; 416 } 417 @Override 418 public SelectableChannel channel() { 419 return chan; 420 } 421 422 @Override 423 public void handle() { 424 cf.complete(null); 425 } 426 427 @Override 428 public int interestOps() { 429 return SelectionKey.OP_READ; 430 } 431 432 @Override 433 public void abort() { 434 close(); 435 } 436 437 @Override 438 public String toString() { 439 return super.toString() + "/" + chan; 440 } 441 } 442 443 @Override 444 boolean isSecure() { 445 return false; 446 } 447 448 @Override 449 boolean isProxied() { 450 return false; 451 } 452 453 @Override 454 public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver, 455 Consumer<Throwable> errorReceiver, 456 Supplier<ByteBufferReference> readBufferSupplier) { 457 this.asyncReceiver = asyncReceiver; 458 this.errorReceiver = errorReceiver; 459 this.readBufferSupplier = readBufferSupplier; 460 } 461 462 @Override 463 CompletableFuture<Void> whenReceivingResponse() { 464 CompletableFuture<Void> cf = new MinimalFuture<>(); 465 try { 466 ReceiveResponseEvent evt = new ReceiveResponseEvent(cf); 467 client.registerEvent(evt); 468 } catch (IOException e) { 469 cf.completeExceptionally(e); 470 } 471 return cf; 472 } 473 }