1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package java.net.http; 27 28 import java.io.IOException; 29 import java.net.InetSocketAddress; 30 import java.net.StandardSocketOptions; 31 import java.nio.ByteBuffer; 32 import java.nio.channels.SelectableChannel; 33 import java.nio.channels.SelectionKey; 34 import java.nio.channels.SocketChannel; 35 import java.util.concurrent.CompletableFuture; 36 import java.util.function.Consumer; 37 38 /** 39 * Plain raw TCP connection direct to destination. 2 modes 40 * 1) Blocking used by http/1. In this case the connect is actually non 41 * blocking but the request is sent blocking. The first byte of a response 42 * is received non-blocking and the remainder of the response is received 43 * blocking 44 * 2) Non-blocking. In this case (for http/2) the connection is actually opened 45 * blocking but all reads and writes are done non-blocking under the 46 * control of a Http2Connection object. 47 */ 48 class PlainHttpConnection extends HttpConnection implements AsyncConnection { 49 50 protected SocketChannel chan; 51 private volatile boolean connected; 52 private boolean closed; 53 Consumer<ByteBuffer> asyncReceiver; 54 Consumer<Throwable> errorReceiver; 55 Queue<ByteBuffer> asyncOutputQ; 56 final Object reading = new Object(); 57 final Object writing = new Object(); 58 59 @Override 60 public void startReading() { 61 try { 62 client.registerEvent(new ReadEvent()); 63 } catch (IOException e) { 64 shutdown(); 65 } 66 } 67 68 class ConnectEvent extends AsyncEvent { 69 CompletableFuture<Void> cf; 70 71 ConnectEvent(CompletableFuture<Void> cf) { 72 super(AsyncEvent.BLOCKING); 73 this.cf = cf; 74 } 75 76 @Override 77 public SelectableChannel channel() { 78 return chan; 79 } 80 81 @Override 82 public int interestOps() { 83 return SelectionKey.OP_CONNECT; 84 } 85 86 @Override 87 public void handle() { 88 try { 89 chan.finishConnect(); 90 } catch (IOException e) { 91 cf.completeExceptionally(e); 92 } 93 connected = true; 94 cf.complete(null); 95 } 96 97 @Override 98 public void abort() { 99 close(); 100 } 101 } 102 103 @Override 104 public CompletableFuture<Void> connectAsync() { 105 CompletableFuture<Void> plainFuture = new CompletableFuture<>(); 106 try { 107 chan.configureBlocking(false); 108 chan.connect(address); 109 client.registerEvent(new ConnectEvent(plainFuture)); 110 } catch (IOException e) { 111 plainFuture.completeExceptionally(e); 112 } 113 return plainFuture; 114 } 115 116 @Override 117 public void connect() throws IOException { 118 chan.connect(address); 119 connected = true; 120 } 121 122 @Override 123 SocketChannel channel() { 124 return chan; 125 } 126 127 PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) { 128 super(addr, client); 129 try { 130 this.chan = SocketChannel.open(); 131 int bufsize = client.getReceiveBufferSize(); 132 chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize); 133 chan.setOption(StandardSocketOptions.TCP_NODELAY, true); 134 } catch (IOException e) { 135 throw new InternalError(e); 136 } 137 } 138 139 @Override 140 long write(ByteBuffer[] buffers, int start, int number) throws IOException { 141 if (mode != Mode.ASYNC) 142 return chan.write(buffers, start, number); 143 // async 144 synchronized(writing) { 145 int qlen = asyncOutputQ.size(); 146 ByteBuffer[] bufs = Utils.reduce(buffers, start, number); 147 long n = Utils.remaining(bufs); 148 asyncOutputQ.putAll(bufs); 149 if (qlen == 0) 150 asyncOutput(); 151 return n; 152 } 153 } 154 155 ByteBuffer asyncBuffer = null; 156 157 void asyncOutput() { 158 synchronized (writing) { 159 try { 160 while (true) { 161 if (asyncBuffer == null) { 162 asyncBuffer = asyncOutputQ.poll(); 163 if (asyncBuffer == null) { 164 return; 165 } 166 } 167 if (!asyncBuffer.hasRemaining()) { 168 asyncBuffer = null; 169 continue; 170 } 171 int n = chan.write(asyncBuffer); 172 //System.err.printf("Written %d bytes to chan\n", n); 173 if (n == 0) { 174 client.registerEvent(new WriteEvent()); 175 return; 176 } 177 } 178 } catch (IOException e) { 179 shutdown(); 180 } 181 } 182 } 183 184 @Override 185 long write(ByteBuffer buffer) throws IOException { 186 if (mode != Mode.ASYNC) 187 return chan.write(buffer); 188 // async 189 synchronized(writing) { 190 int qlen = asyncOutputQ.size(); 191 long n = buffer.remaining(); 192 asyncOutputQ.put(buffer); 193 if (qlen == 0) 194 asyncOutput(); 195 return n; 196 } 197 } 198 199 @Override 200 public String toString() { 201 return "PlainHttpConnection: " + super.toString(); 202 } 203 204 /** 205 * Close this connection 206 */ 207 @Override 208 public synchronized void close() { 209 if (closed) 210 return; 211 closed = true; 212 try { 213 Log.logError("Closing: " + toString()); 214 //System.out.println("Closing: " + this); 215 chan.close(); 216 } catch (IOException e) {} 217 } 218 219 @Override 220 protected ByteBuffer readImpl(int length) throws IOException { 221 ByteBuffer buf = getBuffer(); // TODO not using length 222 int n = chan.read(buf); 223 if (n == -1) { 224 return null; 225 } 226 buf.flip(); 227 String s = "Receive (" + n + " bytes) "; 228 //debugPrint(s, buf); 229 return buf; 230 } 231 232 void shutdown() { 233 close(); 234 errorReceiver.accept(new IOException("Connection aborted")); 235 } 236 237 void asyncRead() { 238 synchronized (reading) { 239 try { 240 while (true) { 241 ByteBuffer buf = getBuffer(); 242 int n = chan.read(buf); 243 //System.err.printf("Read %d bytes from chan\n", n); 244 if (n == -1) { 245 throw new IOException(); 246 } 247 if (n == 0) { 248 returnBuffer(buf); 249 return; 250 } 251 buf.flip(); 252 asyncReceiver.accept(buf); 253 } 254 } catch (IOException e) { 255 shutdown(); 256 } 257 } 258 } 259 260 @Override 261 protected int readImpl(ByteBuffer buf) throws IOException { 262 int mark = buf.position(); 263 int n; 264 // FIXME: this hack works in conjunction with the corresponding change 265 // in java.net.http.RawChannel.registerEvent 266 if ((n = buffer.remaining()) != 0) { 267 buf.put(buffer); 268 } else { 269 n = chan.read(buf); 270 } 271 if (n == -1) { 272 return -1; 273 } 274 Utils.flipToMark(buf, mark); 275 String s = "Receive (" + n + " bytes) "; 276 //debugPrint(s, buf); 277 return n; 278 } 279 280 @Override 281 ConnectionPool.CacheKey cacheKey() { 282 return new ConnectionPool.CacheKey(address, null); 283 } 284 285 @Override 286 synchronized boolean connected() { 287 return connected; 288 } 289 290 // used for all output in HTTP/2 291 class WriteEvent extends AsyncEvent { 292 WriteEvent() { 293 super(0); 294 } 295 296 @Override 297 public SelectableChannel channel() { 298 return chan; 299 } 300 301 @Override 302 public int interestOps() { 303 return SelectionKey.OP_WRITE; 304 } 305 306 @Override 307 public void handle() { 308 asyncOutput(); 309 } 310 311 @Override 312 public void abort() { 313 shutdown(); 314 } 315 } 316 317 // used for all input in HTTP/2 318 class ReadEvent extends AsyncEvent { 319 ReadEvent() { 320 super(AsyncEvent.REPEATING); // && !BLOCKING 321 } 322 323 @Override 324 public SelectableChannel channel() { 325 return chan; 326 } 327 328 @Override 329 public int interestOps() { 330 return SelectionKey.OP_READ; 331 } 332 333 @Override 334 public void handle() { 335 asyncRead(); 336 } 337 338 @Override 339 public void abort() { 340 shutdown(); 341 } 342 343 } 344 345 // used in blocking channels only 346 class ReceiveResponseEvent extends AsyncEvent { 347 CompletableFuture<Void> cf; 348 349 ReceiveResponseEvent(CompletableFuture<Void> cf) { 350 super(AsyncEvent.BLOCKING); 351 this.cf = cf; 352 } 353 @Override 354 public SelectableChannel channel() { 355 return chan; 356 } 357 358 @Override 359 public void handle() { 360 cf.complete(null); 361 } 362 363 @Override 364 public int interestOps() { 365 return SelectionKey.OP_READ; 366 } 367 368 @Override 369 public void abort() { 370 close(); 371 } 372 } 373 374 @Override 375 boolean isSecure() { 376 return false; 377 } 378 379 @Override 380 boolean isProxied() { 381 return false; 382 } 383 384 @Override 385 public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver, 386 Consumer<Throwable> errorReceiver) { 387 this.asyncReceiver = asyncReceiver; 388 this.errorReceiver = errorReceiver; 389 asyncOutputQ = new Queue<>(); 390 asyncOutputQ.registerPutCallback(this::asyncOutput); 391 } 392 393 @Override 394 CompletableFuture<Void> whenReceivingResponse() { 395 CompletableFuture<Void> cf = new CompletableFuture<>(); 396 try { 397 client.registerEvent(new ReceiveResponseEvent(cf)); 398 } catch (IOException e) { 399 cf.completeExceptionally(e); 400 } 401 return cf; 402 } 403 }