14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 */ 24 package java.net.http; 25 26 import java.io.IOException; 27 import java.net.InetSocketAddress; 28 import java.net.StandardSocketOptions; 29 import java.nio.ByteBuffer; 30 import java.nio.channels.SelectableChannel; 31 import java.nio.channels.SelectionKey; 32 import java.nio.channels.SocketChannel; 33 import java.util.concurrent.CompletableFuture; 34 35 /** 36 * Plain raw TCP connection direct to destination 37 */ 38 class PlainHttpConnection extends HttpConnection { 39 40 protected SocketChannel chan; 41 private volatile boolean connected; 42 private boolean closed; 43 44 class ConnectEvent extends AsyncEvent implements AsyncEvent.Blocking { 45 CompletableFuture<Void> cf; 46 47 ConnectEvent(CompletableFuture<Void> cf) { 48 this.cf = cf; 49 } 50 51 @Override 52 public SelectableChannel channel() { 53 return chan; 54 } 55 56 @Override 57 public int interestOps() { 58 return SelectionKey.OP_CONNECT; 59 } 60 61 @Override 62 public void handle() { 63 try { 64 chan.finishConnect(); 65 } catch (IOException e) { 66 cf.completeExceptionally(e); 67 } 95 } 96 97 @Override 98 SocketChannel channel() { 99 return chan; 100 } 101 102 PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) { 103 super(addr, client); 104 try { 105 this.chan = SocketChannel.open(); 106 int bufsize = client.getReceiveBufferSize(); 107 chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize); 108 } catch (IOException e) { 109 throw new InternalError(e); 110 } 111 } 112 113 @Override 114 long write(ByteBuffer[] buffers, int start, int number) throws IOException { 115 //debugPrint("Send", buffers, start, number); 116 return chan.write(buffers, start, number); 117 } 118 119 @Override 120 long write(ByteBuffer buffer) throws IOException { 121 //debugPrint("Send", buffer); 122 return chan.write(buffer); 123 } 124 125 @Override 126 public String toString() { 127 return "PlainHttpConnection: " + super.toString(); 128 } 129 130 /** 131 * Close this connection 132 */ 133 @Override 134 synchronized void close() { 135 if (closed) 136 return; 137 closed = true; 138 try { 139 Log.logError("Closing: " + toString()); 140 //System.out.println("Closing: " + this); 141 chan.close(); 142 } catch (IOException e) {} 143 } 144 145 @Override 146 protected ByteBuffer readImpl(int length) throws IOException { 147 ByteBuffer buf = getBuffer(); // TODO not using length 148 int n = chan.read(buf); 149 if (n == -1) { 150 return null; 151 } 152 buf.flip(); 153 String s = "Receive (" + n + " bytes) "; 154 //debugPrint(s, buf); 155 return buf; 156 } 157 158 @Override 159 protected int readImpl(ByteBuffer buf) throws IOException { 160 int mark = buf.position(); 161 int n = chan.read(buf); 162 if (n == -1) { 163 return -1; 164 } 165 Utils.flipToMark(buffer, mark); 166 String s = "Receive (" + n + " bytes) "; 167 //debugPrint(s, buf); 168 return n; 169 } 170 171 @Override 172 ConnectionPool.CacheKey cacheKey() { 173 return new ConnectionPool.CacheKey(address, null); 174 } 175 176 @Override 177 synchronized boolean connected() { 178 return connected; 179 } 180 181 class ReceiveResponseEvent extends AsyncEvent implements AsyncEvent.Blocking { 182 CompletableFuture<Void> cf; 183 184 ReceiveResponseEvent(CompletableFuture<Void> cf) { 185 this.cf = cf; 186 } 187 @Override 188 public SelectableChannel channel() { 189 return chan; 190 } 191 192 @Override 193 public void handle() { 194 cf.complete(null); 195 } 196 197 @Override 198 public int interestOps() { 199 return SelectionKey.OP_READ; 200 } 201 202 @Override 203 public void abort() { 204 close(); 205 } 206 } 207 208 @Override 209 boolean isSecure() { 210 return false; 211 } 212 213 @Override 214 boolean isProxied() { 215 return false; 216 } 217 218 @Override 219 CompletableFuture<Void> whenReceivingResponse() { 220 CompletableFuture<Void> cf = new CompletableFuture<>(); 221 try { 222 client.registerEvent(new ReceiveResponseEvent(cf)); 223 } catch (IOException e) { 224 cf.completeExceptionally(e); 225 } 226 return cf; 227 } 228 } | 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 */ 24 package java.net.http; 25 26 import java.io.IOException; 27 import java.net.InetSocketAddress; 28 import java.net.StandardSocketOptions; 29 import java.nio.ByteBuffer; 30 import java.nio.channels.SelectableChannel; 31 import java.nio.channels.SelectionKey; 32 import java.nio.channels.SocketChannel; 33 import java.util.concurrent.CompletableFuture; 34 import java.util.function.Consumer; 35 36 /** 37 * Plain raw TCP connection direct to destination. 2 modes 38 * 1) Blocking used by http/1. In this case the connect is actually non 39 * blocking but the request is sent blocking. The first byte of a response 40 * is received non-blocking and the remainder of the response is received 41 * blocking 42 * 2) Non-blocking. In this case (for http/2) the connection is actually opened 43 * blocking but all reads and writes are done non-blocking under the 44 * control of a Http2Connection object. 45 */ 46 class PlainHttpConnection extends HttpConnection implements AsyncConnection { 47 48 protected SocketChannel chan; 49 private volatile boolean connected; 50 private boolean closed; 51 Consumer<ByteBuffer> asyncReceiver; 52 Consumer<Throwable> errorReceiver; 53 Queue<ByteBuffer> asyncOutputQ; 54 final Object reading = new Object(); 55 final Object writing = new Object(); 56 57 class ConnectEvent extends AsyncEvent { 58 CompletableFuture<Void> cf; 59 60 ConnectEvent(CompletableFuture<Void> cf) { 61 super(AsyncEvent.BLOCKING); 62 this.cf = cf; 63 } 64 65 @Override 66 public SelectableChannel channel() { 67 return chan; 68 } 69 70 @Override 71 public int interestOps() { 72 return SelectionKey.OP_CONNECT; 73 } 74 75 @Override 76 public void handle() { 77 try { 78 chan.finishConnect(); 79 } catch (IOException e) { 80 cf.completeExceptionally(e); 81 } 109 } 110 111 @Override 112 SocketChannel channel() { 113 return chan; 114 } 115 116 PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) { 117 super(addr, client); 118 try { 119 this.chan = SocketChannel.open(); 120 int bufsize = client.getReceiveBufferSize(); 121 chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize); 122 } catch (IOException e) { 123 throw new InternalError(e); 124 } 125 } 126 127 @Override 128 long write(ByteBuffer[] buffers, int start, int number) throws IOException { 129 if (mode != Mode.ASYNC) 130 return chan.write(buffers, start, number); 131 // async 132 synchronized(writing) { 133 int qlen = asyncOutputQ.size(); 134 ByteBuffer[] bufs = Utils.reduce(buffers, start, number); 135 long n = Utils.remaining(bufs); 136 asyncOutputQ.putAll(bufs); 137 if (qlen == 0) 138 asyncOutput(); 139 return n; 140 } 141 } 142 143 ByteBuffer asyncBuffer = null; 144 145 void asyncOutput() { 146 synchronized (writing) { 147 try { 148 while (true) { 149 if (asyncBuffer == null) { 150 asyncBuffer = asyncOutputQ.poll(); 151 if (asyncBuffer == null) { 152 return; 153 } 154 } 155 if (!asyncBuffer.hasRemaining()) { 156 asyncBuffer = null; 157 continue; 158 } 159 int n = chan.write(asyncBuffer); 160 //System.err.printf("Written %d bytes to chan\n", n); 161 if (n == 0) { 162 client.registerEvent(new WriteEvent()); 163 return; 164 } 165 } 166 } catch (IOException e) { 167 shutdown(); 168 } 169 } 170 } 171 172 @Override 173 long write(ByteBuffer buffer) throws IOException { 174 if (mode != Mode.ASYNC) 175 return chan.write(buffer); 176 // async 177 synchronized(writing) { 178 int qlen = asyncOutputQ.size(); 179 long n = buffer.remaining(); 180 asyncOutputQ.put(buffer); 181 if (qlen == 0) 182 asyncOutput(); 183 return n; 184 } 185 } 186 187 @Override 188 public String toString() { 189 return "PlainHttpConnection: " + super.toString(); 190 } 191 192 /** 193 * Close this connection 194 */ 195 @Override 196 public synchronized void close() { 197 if (closed) 198 return; 199 closed = true; 200 try { 201 Log.logError("Closing: " + toString()); 202 //System.out.println("Closing: " + this); 203 chan.close(); 204 } catch (IOException e) {} 205 } 206 207 @Override 208 protected ByteBuffer readImpl(int length) throws IOException { 209 ByteBuffer buf = getBuffer(); // TODO not using length 210 int n = chan.read(buf); 211 if (n == -1) { 212 return null; 213 } 214 buf.flip(); 215 String s = "Receive (" + n + " bytes) "; 216 //debugPrint(s, buf); 217 return buf; 218 } 219 220 void shutdown() { 221 close(); 222 errorReceiver.accept(new IOException("Connection aborted")); 223 } 224 225 void asyncRead() { 226 synchronized (reading) { 227 try { 228 while (true) { 229 ByteBuffer buf = getBuffer(); 230 int n = chan.read(buf); 231 //System.err.printf("Read %d bytes from chan\n", n); 232 if (n == -1) { 233 throw new IOException(); 234 } 235 if (n == 0) { 236 returnBuffer(buf); 237 return; 238 } 239 buf.flip(); 240 asyncReceiver.accept(buf); 241 } 242 } catch (IOException e) { 243 shutdown(); 244 } 245 } 246 } 247 248 @Override 249 protected int readImpl(ByteBuffer buf) throws IOException { 250 int mark = buf.position(); 251 int n; 252 // FIXME: this hack works in conjunction with the corresponding change 253 // in java.net.http.RawChannel.registerEvent 254 if ((n = buffer.remaining()) != 0) { 255 buf.put(buffer); 256 } else { 257 n = chan.read(buf); 258 } 259 if (n == -1) { 260 return -1; 261 } 262 Utils.flipToMark(buf, mark); 263 String s = "Receive (" + n + " bytes) "; 264 //debugPrint(s, buf); 265 return n; 266 } 267 268 @Override 269 ConnectionPool.CacheKey cacheKey() { 270 return new ConnectionPool.CacheKey(address, null); 271 } 272 273 @Override 274 synchronized boolean connected() { 275 return connected; 276 } 277 278 // used for all output in HTTP/2 279 class WriteEvent extends AsyncEvent { 280 WriteEvent() { 281 super(0); 282 } 283 284 @Override 285 public SelectableChannel channel() { 286 return chan; 287 } 288 289 @Override 290 public int interestOps() { 291 return SelectionKey.OP_WRITE; 292 } 293 294 @Override 295 public void handle() { 296 asyncOutput(); 297 } 298 299 @Override 300 public void abort() { 301 shutdown(); 302 } 303 } 304 305 // used for all input in HTTP/2 306 class ReadEvent extends AsyncEvent { 307 ReadEvent() { 308 super(AsyncEvent.REPEATING); // && !BLOCKING 309 } 310 311 @Override 312 public SelectableChannel channel() { 313 return chan; 314 } 315 316 @Override 317 public int interestOps() { 318 return SelectionKey.OP_READ; 319 } 320 321 @Override 322 public void handle() { 323 asyncRead(); 324 } 325 326 @Override 327 public void abort() { 328 shutdown(); 329 } 330 331 } 332 333 // used in blocking channels only 334 class ReceiveResponseEvent extends AsyncEvent { 335 CompletableFuture<Void> cf; 336 337 ReceiveResponseEvent(CompletableFuture<Void> cf) { 338 super(AsyncEvent.BLOCKING); 339 this.cf = cf; 340 } 341 @Override 342 public SelectableChannel channel() { 343 return chan; 344 } 345 346 @Override 347 public void handle() { 348 cf.complete(null); 349 } 350 351 @Override 352 public int interestOps() { 353 return SelectionKey.OP_READ; 354 } 355 356 @Override 357 public void abort() { 358 close(); 359 } 360 } 361 362 @Override 363 boolean isSecure() { 364 return false; 365 } 366 367 @Override 368 boolean isProxied() { 369 return false; 370 } 371 372 @Override 373 public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver, 374 Consumer<Throwable> errorReceiver) { 375 this.asyncReceiver = asyncReceiver; 376 this.errorReceiver = errorReceiver; 377 asyncOutputQ = new Queue<>(); 378 asyncOutputQ.registerPutCallback(this::asyncOutput); 379 try { 380 client.registerEvent(new ReadEvent()); 381 } catch (IOException e) { 382 shutdown(); 383 } 384 } 385 386 @Override 387 CompletableFuture<Void> whenReceivingResponse() { 388 CompletableFuture<Void> cf = new CompletableFuture<>(); 389 try { 390 client.registerEvent(new ReceiveResponseEvent(cf)); 391 } catch (IOException e) { 392 cf.completeExceptionally(e); 393 } 394 return cf; 395 } 396 } |