1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  */
  24 package java.net.http;
  25 
  26 import java.io.IOException;
  27 import java.net.InetSocketAddress;
  28 import java.net.StandardSocketOptions;
  29 import java.nio.ByteBuffer;
  30 import java.nio.channels.SelectableChannel;
  31 import java.nio.channels.SelectionKey;
  32 import java.nio.channels.SocketChannel;
  33 import java.util.concurrent.CompletableFuture;
  34 import java.util.function.Consumer;
  35 
  36 /**
  37  * Plain raw TCP connection direct to destination. 2 modes
  38  * 1) Blocking used by http/1. In this case the connect is actually non
  39  *    blocking but the request is sent blocking. The first byte of a response
  40  *    is received non-blocking and the remainder of the response is received
  41  *    blocking
  42  * 2) Non-blocking. In this case (for http/2) the connection is actually opened
  43  *    blocking but all reads and writes are done non-blocking under the
  44  *    control of a Http2Connection object.
  45  */
  46 class PlainHttpConnection extends HttpConnection implements AsyncConnection {
  47 
  48     protected SocketChannel chan;
  49     private volatile boolean connected;
  50     private boolean closed;
  51     Consumer<ByteBuffer> asyncReceiver;
  52     Consumer<Throwable> errorReceiver;
  53     Queue<ByteBuffer> asyncOutputQ;
  54     final Object reading = new Object();
  55     final Object writing = new Object();
  56 
  57     @Override
  58     public void startReading() {
  59         try {
  60             client.registerEvent(new ReadEvent());
  61         } catch (IOException e) {
  62             shutdown();
  63         }
  64     }
  65 
  66     class ConnectEvent extends AsyncEvent {
  67         CompletableFuture<Void> cf;
  68 
  69         ConnectEvent(CompletableFuture<Void> cf) {
  70             super(AsyncEvent.BLOCKING);
  71             this.cf = cf;
  72         }
  73 
  74         @Override
  75         public SelectableChannel channel() {
  76             return chan;
  77         }
  78 
  79         @Override
  80         public int interestOps() {
  81             return SelectionKey.OP_CONNECT;
  82         }
  83 
  84         @Override
  85         public void handle() {
  86             try {
  87                 chan.finishConnect();
  88             } catch (IOException e) {
  89                 cf.completeExceptionally(e);
  90             }
  91             connected = true;
  92             cf.complete(null);
  93         }
  94 
  95         @Override
  96         public void abort() {
  97             close();
  98         }
  99     }
 100 
 101     @Override
 102     public CompletableFuture<Void> connectAsync() {
 103         CompletableFuture<Void> plainFuture = new CompletableFuture<>();
 104         try {
 105             chan.configureBlocking(false);
 106             chan.connect(address);
 107             client.registerEvent(new ConnectEvent(plainFuture));
 108         } catch (IOException e) {
 109             plainFuture.completeExceptionally(e);
 110         }
 111         return plainFuture;
 112     }
 113 
 114     @Override
 115     public void connect() throws IOException {
 116         chan.connect(address);
 117         connected = true;
 118     }
 119 
 120     @Override
 121     SocketChannel channel() {
 122         return chan;
 123     }
 124 
 125     PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
 126         super(addr, client);
 127         try {
 128             this.chan = SocketChannel.open();
 129             int bufsize = client.getReceiveBufferSize();
 130             chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
 131             chan.setOption(StandardSocketOptions.TCP_NODELAY, true);
 132         } catch (IOException e) {
 133             throw new InternalError(e);
 134         }
 135     }
 136 
 137     @Override
 138     long write(ByteBuffer[] buffers, int start, int number) throws IOException {
 139         if (mode != Mode.ASYNC)
 140             return chan.write(buffers, start, number);
 141         // async
 142         synchronized(writing) {
 143             int qlen = asyncOutputQ.size();
 144             ByteBuffer[] bufs = Utils.reduce(buffers, start, number);
 145             long n = Utils.remaining(bufs);
 146             asyncOutputQ.putAll(bufs);
 147             if (qlen == 0)
 148                 asyncOutput();
 149             return n;
 150         }
 151     }
 152 
 153     ByteBuffer asyncBuffer = null;
 154 
 155     void asyncOutput() {
 156         synchronized (writing) {
 157             try {
 158                 while (true) {
 159                     if (asyncBuffer == null) {
 160                         asyncBuffer = asyncOutputQ.poll();
 161                         if (asyncBuffer == null) {
 162                             return;
 163                         }
 164                     }
 165                     if (!asyncBuffer.hasRemaining()) {
 166                         asyncBuffer = null;
 167                         continue;
 168                     }
 169                     int n = chan.write(asyncBuffer);
 170                     //System.err.printf("Written %d bytes to chan\n", n);
 171                     if (n == 0) {
 172                         client.registerEvent(new WriteEvent());
 173                         return;
 174                     }
 175                 }
 176             } catch (IOException e) {
 177                 shutdown();
 178             }
 179         }
 180     }
 181 
 182     @Override
 183     long write(ByteBuffer buffer) throws IOException {
 184         if (mode != Mode.ASYNC)
 185             return chan.write(buffer);
 186         // async
 187         synchronized(writing) {
 188             int qlen = asyncOutputQ.size();
 189             long n = buffer.remaining();
 190             asyncOutputQ.put(buffer);
 191             if (qlen == 0)
 192                 asyncOutput();
 193             return n;
 194         }
 195     }
 196 
 197     @Override
 198     public String toString() {
 199         return "PlainHttpConnection: " + super.toString();
 200     }
 201 
 202     /**
 203      * Close this connection
 204      */
 205     @Override
 206     public synchronized void close() {
 207         if (closed)
 208             return;
 209         closed = true;
 210         try {
 211             Log.logError("Closing: " + toString());
 212             //System.out.println("Closing: " + this);
 213             chan.close();
 214         } catch (IOException e) {}
 215     }
 216 
 217     @Override
 218     protected ByteBuffer readImpl(int length) throws IOException {
 219         ByteBuffer buf = getBuffer(); // TODO not using length
 220         int n = chan.read(buf);
 221         if (n == -1) {
 222             return null;
 223         }
 224         buf.flip();
 225         String s = "Receive (" + n + " bytes) ";
 226         //debugPrint(s, buf);
 227         return buf;
 228     }
 229 
 230     void shutdown() {
 231         close();
 232         errorReceiver.accept(new IOException("Connection aborted"));
 233     }
 234 
 235     void asyncRead() {
 236         synchronized (reading) {
 237             try {
 238                 while (true) {
 239                     ByteBuffer buf = getBuffer();
 240                     int n = chan.read(buf);
 241                     //System.err.printf("Read %d bytes from chan\n", n);
 242                     if (n == -1) {
 243                         throw new IOException();
 244                     }
 245                     if (n == 0) {
 246                         returnBuffer(buf);
 247                         return;
 248                     }
 249                     buf.flip();
 250                     asyncReceiver.accept(buf);
 251                 }
 252             } catch (IOException e) {
 253                 shutdown();
 254             }
 255         }
 256     }
 257 
 258     @Override
 259     protected int readImpl(ByteBuffer buf) throws IOException {
 260         int mark = buf.position();
 261         int n;
 262         // FIXME: this hack works in conjunction with the corresponding change
 263         // in java.net.http.RawChannel.registerEvent
 264         if ((n = buffer.remaining()) != 0) {
 265             buf.put(buffer);
 266         } else {
 267             n = chan.read(buf);
 268         }
 269         if (n == -1) {
 270             return -1;
 271         }
 272         Utils.flipToMark(buf, mark);
 273         String s = "Receive (" + n + " bytes) ";
 274         //debugPrint(s, buf);
 275         return n;
 276     }
 277 
 278     @Override
 279     ConnectionPool.CacheKey cacheKey() {
 280         return new ConnectionPool.CacheKey(address, null);
 281     }
 282 
 283     @Override
 284     synchronized boolean connected() {
 285         return connected;
 286     }
 287 
 288     // used for all output in HTTP/2
 289     class WriteEvent extends AsyncEvent {
 290         WriteEvent() {
 291             super(0);
 292         }
 293 
 294         @Override
 295         public SelectableChannel channel() {
 296             return chan;
 297         }
 298 
 299         @Override
 300         public int interestOps() {
 301             return SelectionKey.OP_WRITE;
 302         }
 303 
 304         @Override
 305         public void handle() {
 306             asyncOutput();
 307         }
 308 
 309         @Override
 310         public void abort() {
 311             shutdown();
 312         }
 313     }
 314 
 315     // used for all input in HTTP/2
 316     class ReadEvent extends AsyncEvent {
 317         ReadEvent() {
 318             super(AsyncEvent.REPEATING); // && !BLOCKING
 319         }
 320 
 321         @Override
 322         public SelectableChannel channel() {
 323             return chan;
 324         }
 325 
 326         @Override
 327         public int interestOps() {
 328             return SelectionKey.OP_READ;
 329         }
 330 
 331         @Override
 332         public void handle() {
 333             asyncRead();
 334         }
 335 
 336         @Override
 337         public void abort() {
 338             shutdown();
 339         }
 340 
 341     }
 342 
 343     // used in blocking channels only
 344     class ReceiveResponseEvent extends AsyncEvent {
 345         CompletableFuture<Void> cf;
 346 
 347         ReceiveResponseEvent(CompletableFuture<Void> cf) {
 348             super(AsyncEvent.BLOCKING);
 349             this.cf = cf;
 350         }
 351         @Override
 352         public SelectableChannel channel() {
 353             return chan;
 354         }
 355 
 356         @Override
 357         public void handle() {
 358             cf.complete(null);
 359         }
 360 
 361         @Override
 362         public int interestOps() {
 363             return SelectionKey.OP_READ;
 364         }
 365 
 366         @Override
 367         public void abort() {
 368             close();
 369         }
 370     }
 371 
 372     @Override
 373     boolean isSecure() {
 374         return false;
 375     }
 376 
 377     @Override
 378     boolean isProxied() {
 379         return false;
 380     }
 381 
 382     @Override
 383     public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver,
 384             Consumer<Throwable> errorReceiver) {
 385         this.asyncReceiver = asyncReceiver;
 386         this.errorReceiver = errorReceiver;
 387         asyncOutputQ = new Queue<>();
 388         asyncOutputQ.registerPutCallback(this::asyncOutput);
 389     }
 390 
 391     @Override
 392     CompletableFuture<Void> whenReceivingResponse() {
 393         CompletableFuture<Void> cf = new CompletableFuture<>();
 394         try {
 395             client.registerEvent(new ReceiveResponseEvent(cf));
 396         } catch (IOException e) {
 397             cf.completeExceptionally(e);
 398         }
 399         return cf;
 400     }
 401 }