1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.net.InetSocketAddress;
  30 import java.net.StandardSocketOptions;
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.SelectableChannel;
  33 import java.nio.channels.SelectionKey;
  34 import java.nio.channels.SocketChannel;
  35 import java.util.concurrent.CompletableFuture;
  36 import java.util.function.Consumer;
  37 import java.util.function.Supplier;
  38 
  39 import jdk.incubator.http.internal.common.AsyncWriteQueue;
  40 import jdk.incubator.http.internal.common.ByteBufferReference;
  41 import jdk.incubator.http.internal.common.Log;
  42 import jdk.incubator.http.internal.common.MinimalFuture;
  43 import jdk.incubator.http.internal.common.Utils;
  44 
  45 /**
  46  * Plain raw TCP connection direct to destination. 2 modes
  47  * 1) Blocking used by http/1. In this case the connect is actually non
  48  *    blocking but the request is sent blocking. The first byte of a response
  49  *    is received non-blocking and the remainder of the response is received
  50  *    blocking
  51  * 2) Non-blocking. In this case (for http/2) the connection is actually opened
  52  *    blocking but all reads and writes are done non-blocking under the
  53  *    control of a Http2Connection object.
  54  */
  55 class PlainHttpConnection extends HttpConnection implements AsyncConnection {
  56 
  57     protected final SocketChannel chan;
  58     private volatile boolean connected;
  59     private boolean closed;
  60 
  61     // should be volatile to provide proper synchronization(visibility) action
  62     private volatile Consumer<ByteBufferReference> asyncReceiver;
  63     private volatile Consumer<Throwable> errorReceiver;
  64     private volatile Supplier<ByteBufferReference> readBufferSupplier;
  65     private boolean asyncReading;
  66 
  67     private final AsyncWriteQueue asyncOutputQ = new AsyncWriteQueue(this::asyncOutput);
  68 
  69     private final Object reading = new Object();
  70 
  71     @Override
  72     public void startReading() {
  73         try {
  74             synchronized(reading) {
  75                 asyncReading = true;
  76             }
  77             client.registerEvent(new ReadEvent());
  78         } catch (IOException e) {
  79             shutdown();
  80         }
  81     }
  82 
  83     @Override
  84     public void stopAsyncReading() {
  85         synchronized(reading) {
  86             asyncReading = false;
  87         }
  88         client.cancelRegistration(chan);
  89     }
  90 
  91     class ConnectEvent extends AsyncEvent {
  92         CompletableFuture<Void> cf;
  93 
  94         ConnectEvent(CompletableFuture<Void> cf) {
  95             super(AsyncEvent.BLOCKING);
  96             this.cf = cf;
  97         }
  98 
  99         @Override
 100         public SelectableChannel channel() {
 101             return chan;
 102         }
 103 
 104         @Override
 105         public int interestOps() {
 106             return SelectionKey.OP_CONNECT;
 107         }
 108 
 109         @Override
 110         public void handle() {
 111             try {
 112                 chan.finishConnect();
 113             } catch (IOException e) {
 114                 cf.completeExceptionally(e);
 115                 return;
 116             }
 117             connected = true;
 118             cf.complete(null);
 119         }
 120 
 121         @Override
 122         public void abort() {
 123             close();
 124         }
 125     }
 126 
 127     @Override
 128     public CompletableFuture<Void> connectAsync() {
 129         CompletableFuture<Void> plainFuture = new MinimalFuture<>();
 130         try {
 131             chan.configureBlocking(false);
 132             chan.connect(address);
 133             client.registerEvent(new ConnectEvent(plainFuture));
 134         } catch (IOException e) {
 135             plainFuture.completeExceptionally(e);
 136         }
 137         return plainFuture;
 138     }
 139 
 140     @Override
 141     public void connect() throws IOException {
 142         chan.connect(address);
 143         connected = true;
 144     }
 145 
 146     @Override
 147     SocketChannel channel() {
 148         return chan;
 149     }
 150 
 151     PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
 152         super(addr, client);
 153         try {
 154             this.chan = SocketChannel.open();
 155             int bufsize = client.getReceiveBufferSize();
 156             chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
 157             chan.setOption(StandardSocketOptions.TCP_NODELAY, true);
 158         } catch (IOException e) {
 159             throw new InternalError(e);
 160         }
 161     }
 162 
 163     @Override
 164     long write(ByteBuffer[] buffers, int start, int number) throws IOException {
 165         if (getMode() != Mode.ASYNC) {
 166             return chan.write(buffers, start, number);
 167         }
 168         // async
 169         buffers = Utils.reduce(buffers, start, number);
 170         long n = Utils.remaining(buffers);
 171         asyncOutputQ.put(ByteBufferReference.toReferences(buffers));
 172         flushAsync();
 173         return n;
 174     }
 175 
 176     @Override
 177     long write(ByteBuffer buffer) throws IOException {
 178         if (getMode() != Mode.ASYNC) {
 179             return chan.write(buffer);
 180         }
 181         // async
 182         long n = buffer.remaining();
 183         asyncOutputQ.put(ByteBufferReference.toReferences(buffer));
 184         flushAsync();
 185         return n;
 186     }
 187 
 188     // handle registered WriteEvent; invoked from SelectorManager thread
 189     void flushRegistered() {
 190         if (getMode() == Mode.ASYNC) {
 191             try {
 192                 asyncOutputQ.flushDelayed();
 193             } catch (IOException e) {
 194                 // Only IOException caused by closed Queue is expected here
 195                 shutdown();
 196             }
 197         }
 198     }
 199 
 200     @Override
 201     public void writeAsync(ByteBufferReference[] buffers) throws IOException {
 202         if (getMode() != Mode.ASYNC) {
 203             chan.write(ByteBufferReference.toBuffers(buffers));
 204             ByteBufferReference.clear(buffers);
 205         } else {
 206             asyncOutputQ.put(buffers);
 207         }
 208     }
 209 
 210     @Override
 211     public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
 212         if (getMode() != Mode.ASYNC) {
 213             chan.write(ByteBufferReference.toBuffers(buffers));
 214             ByteBufferReference.clear(buffers);
 215         } else {
 216             // Unordered frames are sent before existing frames.
 217             asyncOutputQ.putFirst(buffers);
 218         }
 219     }
 220 
 221     @Override
 222     public void flushAsync() throws IOException {
 223         if (getMode() == Mode.ASYNC) {
 224             asyncOutputQ.flush();
 225         }
 226     }
 227 
 228     @Override
 229     public void enableCallback() {
 230         // not used
 231         assert false;
 232     }
 233 
 234     void asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
 235         try {
 236             ByteBuffer[] bufs = ByteBufferReference.toBuffers(refs);
 237             while (Utils.remaining(bufs) > 0) {
 238                 long n = chan.write(bufs);
 239                 if (n == 0) {
 240                     delayCallback.setDelayed(refs);
 241                     client.registerEvent(new WriteEvent());
 242                     return;
 243                 }
 244             }
 245             ByteBufferReference.clear(refs);
 246         } catch (IOException e) {
 247             shutdown();
 248         }
 249     }
 250 
 251     @Override
 252     public String toString() {
 253         return "PlainHttpConnection: " + super.toString();
 254     }
 255 
 256     /**
 257      * Close this connection
 258      */
 259     @Override
 260     public synchronized void close() {
 261         if (closed) {
 262             return;
 263         }
 264         closed = true;
 265         try {
 266             Log.logError("Closing: " + toString());
 267             chan.close();
 268         } catch (IOException e) {}
 269     }
 270 
 271     @Override
 272     void shutdownInput() throws IOException {
 273         chan.shutdownInput();
 274     }
 275 
 276     @Override
 277     void shutdownOutput() throws IOException {
 278         chan.shutdownOutput();
 279     }
 280 
 281     void shutdown() {
 282         close();
 283         errorReceiver.accept(new IOException("Connection aborted"));
 284     }
 285 
 286     void asyncRead() {
 287         synchronized (reading) {
 288             try {
 289                 while (asyncReading) {
 290                     ByteBufferReference buf = readBufferSupplier.get();
 291                     int n = chan.read(buf.get());
 292                     if (n == -1) {
 293                         throw new IOException();
 294                     }
 295                     if (n == 0) {
 296                         buf.clear();
 297                         return;
 298                     }
 299                     buf.get().flip();
 300                     asyncReceiver.accept(buf);
 301                 }
 302             } catch (IOException e) {
 303                 shutdown();
 304             }
 305         }
 306     }
 307 
 308     @Override
 309     protected ByteBuffer readImpl() throws IOException {
 310         ByteBuffer dst = ByteBuffer.allocate(8192);
 311         int n = readImpl(dst);
 312         if (n > 0) {
 313             return dst;
 314         } else if (n == 0) {
 315             return Utils.EMPTY_BYTEBUFFER;
 316         } else {
 317             return null;
 318         }
 319     }
 320 
 321     private int readImpl(ByteBuffer buf) throws IOException {
 322         int mark = buf.position();
 323         int n;
 324         // FIXME: this hack works in conjunction with the corresponding change
 325         // in jdk.incubator.http.RawChannel.registerEvent
 326         //if ((n = buffer.remaining()) != 0) {
 327             //buf.put(buffer);
 328         //} else {
 329             n = chan.read(buf);
 330         //}
 331         if (n == -1) {
 332             return -1;
 333         }
 334         Utils.flipToMark(buf, mark);
 335         // String s = "Receive (" + n + " bytes) ";
 336         //debugPrint(s, buf);
 337         return n;
 338     }
 339 
 340     @Override
 341     ConnectionPool.CacheKey cacheKey() {
 342         return new ConnectionPool.CacheKey(address, null);
 343     }
 344 
 345     @Override
 346     synchronized boolean connected() {
 347         return connected;
 348     }
 349 
 350     // used for all output in HTTP/2
 351     class WriteEvent extends AsyncEvent {
 352         WriteEvent() {
 353             super(0);
 354         }
 355 
 356         @Override
 357         public SelectableChannel channel() {
 358             return chan;
 359         }
 360 
 361         @Override
 362         public int interestOps() {
 363             return SelectionKey.OP_WRITE;
 364         }
 365 
 366         @Override
 367         public void handle() {
 368             flushRegistered();
 369         }
 370 
 371         @Override
 372         public void abort() {
 373             shutdown();
 374         }
 375     }
 376 
 377     // used for all input in HTTP/2
 378     class ReadEvent extends AsyncEvent {
 379         ReadEvent() {
 380             super(AsyncEvent.REPEATING); // && !BLOCKING
 381         }
 382 
 383         @Override
 384         public SelectableChannel channel() {
 385             return chan;
 386         }
 387 
 388         @Override
 389         public int interestOps() {
 390             return SelectionKey.OP_READ;
 391         }
 392 
 393         @Override
 394         public void handle() {
 395             asyncRead();
 396         }
 397 
 398         @Override
 399         public void abort() {
 400             shutdown();
 401         }
 402 
 403         @Override
 404         public String toString() {
 405             return super.toString() + "/" + chan;
 406         }
 407     }
 408 
 409     // used in blocking channels only
 410     class ReceiveResponseEvent extends AsyncEvent {
 411         CompletableFuture<Void> cf;
 412 
 413         ReceiveResponseEvent(CompletableFuture<Void> cf) {
 414             super(AsyncEvent.BLOCKING);
 415             this.cf = cf;
 416         }
 417         @Override
 418         public SelectableChannel channel() {
 419             return chan;
 420         }
 421 
 422         @Override
 423         public void handle() {
 424             cf.complete(null);
 425         }
 426 
 427         @Override
 428         public int interestOps() {
 429             return SelectionKey.OP_READ;
 430         }
 431 
 432         @Override
 433         public void abort() {
 434             close();
 435         }
 436 
 437         @Override
 438         public String toString() {
 439             return super.toString() + "/" + chan;
 440         }
 441     }
 442 
 443     @Override
 444     boolean isSecure() {
 445         return false;
 446     }
 447 
 448     @Override
 449     boolean isProxied() {
 450         return false;
 451     }
 452 
 453     @Override
 454     public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver,
 455                                   Consumer<Throwable> errorReceiver,
 456                                   Supplier<ByteBufferReference> readBufferSupplier) {
 457         this.asyncReceiver = asyncReceiver;
 458         this.errorReceiver = errorReceiver;
 459         this.readBufferSupplier = readBufferSupplier;
 460     }
 461 
 462     @Override
 463     CompletableFuture<Void> whenReceivingResponse() {
 464         CompletableFuture<Void> cf = new MinimalFuture<>();
 465         try {
 466             ReceiveResponseEvent evt = new ReceiveResponseEvent(cf);
 467             client.registerEvent(evt);
 468         } catch (IOException e) {
 469             cf.completeExceptionally(e);
 470         }
 471         return cf;
 472     }
 473 }