1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 */ 24 package java.net.http; 25 26 import java.io.IOException; 27 import java.net.InetSocketAddress; 28 import java.net.StandardSocketOptions; 29 import java.nio.ByteBuffer; 30 import java.nio.channels.SelectableChannel; 31 import java.nio.channels.SelectionKey; 32 import java.nio.channels.SocketChannel; 33 import java.util.concurrent.CompletableFuture; 34 import java.util.function.Consumer; 35 36 /** 37 * Plain raw TCP connection direct to destination. 2 modes 38 * 1) Blocking used by http/1. In this case the connect is actually non 39 * blocking but the request is sent blocking. The first byte of a response 40 * is received non-blocking and the remainder of the response is received 41 * blocking 42 * 2) Non-blocking. In this case (for http/2) the connection is actually opened 43 * blocking but all reads and writes are done non-blocking under the 44 * control of a Http2Connection object. 45 */ 46 class PlainHttpConnection extends HttpConnection implements AsyncConnection { 47 48 protected SocketChannel chan; 49 private volatile boolean connected; 50 private boolean closed; 51 Consumer<ByteBuffer> asyncReceiver; 52 Consumer<Throwable> errorReceiver; 53 Queue<ByteBuffer> asyncOutputQ; 54 final Object reading = new Object(); 55 final Object writing = new Object(); 56 57 @Override 58 public void startReading() { 59 try { 60 client.registerEvent(new ReadEvent()); 61 } catch (IOException e) { 62 shutdown(); 63 } 64 } 65 66 class ConnectEvent extends AsyncEvent { 67 CompletableFuture<Void> cf; 68 69 ConnectEvent(CompletableFuture<Void> cf) { 70 super(AsyncEvent.BLOCKING); 71 this.cf = cf; 72 } 73 74 @Override 75 public SelectableChannel channel() { 76 return chan; 77 } 78 79 @Override 80 public int interestOps() { 81 return SelectionKey.OP_CONNECT; 82 } 83 84 @Override 85 public void handle() { 86 try { 87 chan.finishConnect(); 88 } catch (IOException e) { 89 cf.completeExceptionally(e); 90 } 91 connected = true; 92 cf.complete(null); 93 } 94 95 @Override 96 public void abort() { 97 close(); 98 } 99 } 100 101 @Override 102 public CompletableFuture<Void> connectAsync() { 103 CompletableFuture<Void> plainFuture = new CompletableFuture<>(); 104 try { 105 chan.configureBlocking(false); 106 chan.connect(address); 107 client.registerEvent(new ConnectEvent(plainFuture)); 108 } catch (IOException e) { 109 plainFuture.completeExceptionally(e); 110 } 111 return plainFuture; 112 } 113 114 @Override 115 public void connect() throws IOException { 116 chan.connect(address); 117 connected = true; 118 } 119 120 @Override 121 SocketChannel channel() { 122 return chan; 123 } 124 125 PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) { 126 super(addr, client); 127 try { 128 this.chan = SocketChannel.open(); 129 int bufsize = client.getReceiveBufferSize(); 130 chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize); 131 chan.setOption(StandardSocketOptions.TCP_NODELAY, true); 132 } catch (IOException e) { 133 throw new InternalError(e); 134 } 135 } 136 137 @Override 138 long write(ByteBuffer[] buffers, int start, int number) throws IOException { 139 if (mode != Mode.ASYNC) 140 return chan.write(buffers, start, number); 141 // async 142 synchronized(writing) { 143 int qlen = asyncOutputQ.size(); 144 ByteBuffer[] bufs = Utils.reduce(buffers, start, number); 145 long n = Utils.remaining(bufs); 146 asyncOutputQ.putAll(bufs); 147 if (qlen == 0) 148 asyncOutput(); 149 return n; 150 } 151 } 152 153 ByteBuffer asyncBuffer = null; 154 155 void asyncOutput() { 156 synchronized (writing) { 157 try { 158 while (true) { 159 if (asyncBuffer == null) { 160 asyncBuffer = asyncOutputQ.poll(); 161 if (asyncBuffer == null) { 162 return; 163 } 164 } 165 if (!asyncBuffer.hasRemaining()) { 166 asyncBuffer = null; 167 continue; 168 } 169 int n = chan.write(asyncBuffer); 170 //System.err.printf("Written %d bytes to chan\n", n); 171 if (n == 0) { 172 client.registerEvent(new WriteEvent()); 173 return; 174 } 175 } 176 } catch (IOException e) { 177 shutdown(); 178 } 179 } 180 } 181 182 @Override 183 long write(ByteBuffer buffer) throws IOException { 184 if (mode != Mode.ASYNC) 185 return chan.write(buffer); 186 // async 187 synchronized(writing) { 188 int qlen = asyncOutputQ.size(); 189 long n = buffer.remaining(); 190 asyncOutputQ.put(buffer); 191 if (qlen == 0) 192 asyncOutput(); 193 return n; 194 } 195 } 196 197 @Override 198 public String toString() { 199 return "PlainHttpConnection: " + super.toString(); 200 } 201 202 /** 203 * Close this connection 204 */ 205 @Override 206 public synchronized void close() { 207 if (closed) 208 return; 209 closed = true; 210 try { 211 Log.logError("Closing: " + toString()); 212 //System.out.println("Closing: " + this); 213 chan.close(); 214 } catch (IOException e) {} 215 } 216 217 @Override 218 protected ByteBuffer readImpl(int length) throws IOException { 219 ByteBuffer buf = getBuffer(); // TODO not using length 220 int n = chan.read(buf); 221 if (n == -1) { 222 return null; 223 } 224 buf.flip(); 225 String s = "Receive (" + n + " bytes) "; 226 //debugPrint(s, buf); 227 return buf; 228 } 229 230 void shutdown() { 231 close(); 232 errorReceiver.accept(new IOException("Connection aborted")); 233 } 234 235 void asyncRead() { 236 synchronized (reading) { 237 try { 238 while (true) { 239 ByteBuffer buf = getBuffer(); 240 int n = chan.read(buf); 241 //System.err.printf("Read %d bytes from chan\n", n); 242 if (n == -1) { 243 throw new IOException(); 244 } 245 if (n == 0) { 246 returnBuffer(buf); 247 return; 248 } 249 buf.flip(); 250 asyncReceiver.accept(buf); 251 } 252 } catch (IOException e) { 253 shutdown(); 254 } 255 } 256 } 257 258 @Override 259 protected int readImpl(ByteBuffer buf) throws IOException { 260 int mark = buf.position(); 261 int n; 262 // FIXME: this hack works in conjunction with the corresponding change 263 // in java.net.http.RawChannel.registerEvent 264 if ((n = buffer.remaining()) != 0) { 265 buf.put(buffer); 266 } else { 267 n = chan.read(buf); 268 } 269 if (n == -1) { 270 return -1; 271 } 272 Utils.flipToMark(buf, mark); 273 String s = "Receive (" + n + " bytes) "; 274 //debugPrint(s, buf); 275 return n; 276 } 277 278 @Override 279 ConnectionPool.CacheKey cacheKey() { 280 return new ConnectionPool.CacheKey(address, null); 281 } 282 283 @Override 284 synchronized boolean connected() { 285 return connected; 286 } 287 288 // used for all output in HTTP/2 289 class WriteEvent extends AsyncEvent { 290 WriteEvent() { 291 super(0); 292 } 293 294 @Override 295 public SelectableChannel channel() { 296 return chan; 297 } 298 299 @Override 300 public int interestOps() { 301 return SelectionKey.OP_WRITE; 302 } 303 304 @Override 305 public void handle() { 306 asyncOutput(); 307 } 308 309 @Override 310 public void abort() { 311 shutdown(); 312 } 313 } 314 315 // used for all input in HTTP/2 316 class ReadEvent extends AsyncEvent { 317 ReadEvent() { 318 super(AsyncEvent.REPEATING); // && !BLOCKING 319 } 320 321 @Override 322 public SelectableChannel channel() { 323 return chan; 324 } 325 326 @Override 327 public int interestOps() { 328 return SelectionKey.OP_READ; 329 } 330 331 @Override 332 public void handle() { 333 asyncRead(); 334 } 335 336 @Override 337 public void abort() { 338 shutdown(); 339 } 340 341 } 342 343 // used in blocking channels only 344 class ReceiveResponseEvent extends AsyncEvent { 345 CompletableFuture<Void> cf; 346 347 ReceiveResponseEvent(CompletableFuture<Void> cf) { 348 super(AsyncEvent.BLOCKING); 349 this.cf = cf; 350 } 351 @Override 352 public SelectableChannel channel() { 353 return chan; 354 } 355 356 @Override 357 public void handle() { 358 cf.complete(null); 359 } 360 361 @Override 362 public int interestOps() { 363 return SelectionKey.OP_READ; 364 } 365 366 @Override 367 public void abort() { 368 close(); 369 } 370 } 371 372 @Override 373 boolean isSecure() { 374 return false; 375 } 376 377 @Override 378 boolean isProxied() { 379 return false; 380 } 381 382 @Override 383 public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver, 384 Consumer<Throwable> errorReceiver) { 385 this.asyncReceiver = asyncReceiver; 386 this.errorReceiver = errorReceiver; 387 asyncOutputQ = new Queue<>(); 388 asyncOutputQ.registerPutCallback(this::asyncOutput); 389 } 390 391 @Override 392 CompletableFuture<Void> whenReceivingResponse() { 393 CompletableFuture<Void> cf = new CompletableFuture<>(); 394 try { 395 client.registerEvent(new ReceiveResponseEvent(cf)); 396 } catch (IOException e) { 397 cf.completeExceptionally(e); 398 } 399 return cf; 400 } 401 }