1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package java.net.http;
  27 
  28 import java.io.IOException;
  29 import java.net.InetSocketAddress;
  30 import java.net.StandardSocketOptions;
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.SelectableChannel;
  33 import java.nio.channels.SelectionKey;
  34 import java.nio.channels.SocketChannel;
  35 import java.util.concurrent.CompletableFuture;
  36 import java.util.function.Consumer;
  37 
  38 /**
  39  * Plain raw TCP connection direct to destination. 2 modes
  40  * 1) Blocking used by http/1. In this case the connect is actually non
  41  *    blocking but the request is sent blocking. The first byte of a response
  42  *    is received non-blocking and the remainder of the response is received
  43  *    blocking
  44  * 2) Non-blocking. In this case (for http/2) the connection is actually opened
  45  *    blocking but all reads and writes are done non-blocking under the
  46  *    control of a Http2Connection object.
  47  */
  48 class PlainHttpConnection extends HttpConnection implements AsyncConnection {
  49 
  50     protected SocketChannel chan;
  51     private volatile boolean connected;
  52     private boolean closed;
  53     Consumer<ByteBuffer> asyncReceiver;
  54     Consumer<Throwable> errorReceiver;
  55     Queue<ByteBuffer> asyncOutputQ;
  56     final Object reading = new Object();
  57     final Object writing = new Object();
  58 
  59     @Override
  60     public void startReading() {
  61         try {
  62             client.registerEvent(new ReadEvent());
  63         } catch (IOException e) {
  64             shutdown();
  65         }
  66     }
  67 
  68     class ConnectEvent extends AsyncEvent {
  69         CompletableFuture<Void> cf;
  70 
  71         ConnectEvent(CompletableFuture<Void> cf) {
  72             super(AsyncEvent.BLOCKING);
  73             this.cf = cf;
  74         }
  75 
  76         @Override
  77         public SelectableChannel channel() {
  78             return chan;
  79         }
  80 
  81         @Override
  82         public int interestOps() {
  83             return SelectionKey.OP_CONNECT;
  84         }
  85 
  86         @Override
  87         public void handle() {
  88             try {
  89                 chan.finishConnect();
  90             } catch (IOException e) {
  91                 cf.completeExceptionally(e);
  92             }
  93             connected = true;
  94             cf.complete(null);
  95         }
  96 
  97         @Override
  98         public void abort() {
  99             close();
 100         }
 101     }
 102 
 103     @Override
 104     public CompletableFuture<Void> connectAsync() {
 105         CompletableFuture<Void> plainFuture = new CompletableFuture<>();
 106         try {
 107             chan.configureBlocking(false);
 108             chan.connect(address);
 109             client.registerEvent(new ConnectEvent(plainFuture));
 110         } catch (IOException e) {
 111             plainFuture.completeExceptionally(e);
 112         }
 113         return plainFuture;
 114     }
 115 
 116     @Override
 117     public void connect() throws IOException {
 118         chan.connect(address);
 119         connected = true;
 120     }
 121 
 122     @Override
 123     SocketChannel channel() {
 124         return chan;
 125     }
 126 
 127     PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
 128         super(addr, client);
 129         try {
 130             this.chan = SocketChannel.open();
 131             int bufsize = client.getReceiveBufferSize();
 132             chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
 133             chan.setOption(StandardSocketOptions.TCP_NODELAY, true);
 134         } catch (IOException e) {
 135             throw new InternalError(e);
 136         }
 137     }
 138 
 139     @Override
 140     long write(ByteBuffer[] buffers, int start, int number) throws IOException {
 141         if (mode != Mode.ASYNC)
 142             return chan.write(buffers, start, number);
 143         // async
 144         synchronized(writing) {
 145             int qlen = asyncOutputQ.size();
 146             ByteBuffer[] bufs = Utils.reduce(buffers, start, number);
 147             long n = Utils.remaining(bufs);
 148             asyncOutputQ.putAll(bufs);
 149             if (qlen == 0)
 150                 asyncOutput();
 151             return n;
 152         }
 153     }
 154 
 155     ByteBuffer asyncBuffer = null;
 156 
 157     void asyncOutput() {
 158         synchronized (writing) {
 159             try {
 160                 while (true) {
 161                     if (asyncBuffer == null) {
 162                         asyncBuffer = asyncOutputQ.poll();
 163                         if (asyncBuffer == null) {
 164                             return;
 165                         }
 166                     }
 167                     if (!asyncBuffer.hasRemaining()) {
 168                         asyncBuffer = null;
 169                         continue;
 170                     }
 171                     int n = chan.write(asyncBuffer);
 172                     //System.err.printf("Written %d bytes to chan\n", n);
 173                     if (n == 0) {
 174                         client.registerEvent(new WriteEvent());
 175                         return;
 176                     }
 177                 }
 178             } catch (IOException e) {
 179                 shutdown();
 180             }
 181         }
 182     }
 183 
 184     @Override
 185     long write(ByteBuffer buffer) throws IOException {
 186         if (mode != Mode.ASYNC)
 187             return chan.write(buffer);
 188         // async
 189         synchronized(writing) {
 190             int qlen = asyncOutputQ.size();
 191             long n = buffer.remaining();
 192             asyncOutputQ.put(buffer);
 193             if (qlen == 0)
 194                 asyncOutput();
 195             return n;
 196         }
 197     }
 198 
 199     @Override
 200     public String toString() {
 201         return "PlainHttpConnection: " + super.toString();
 202     }
 203 
 204     /**
 205      * Close this connection
 206      */
 207     @Override
 208     public synchronized void close() {
 209         if (closed)
 210             return;
 211         closed = true;
 212         try {
 213             Log.logError("Closing: " + toString());
 214             //System.out.println("Closing: " + this);
 215             chan.close();
 216         } catch (IOException e) {}
 217     }
 218 
 219     @Override
 220     protected ByteBuffer readImpl(int length) throws IOException {
 221         ByteBuffer buf = getBuffer(); // TODO not using length
 222         int n = chan.read(buf);
 223         if (n == -1) {
 224             return null;
 225         }
 226         buf.flip();
 227         String s = "Receive (" + n + " bytes) ";
 228         //debugPrint(s, buf);
 229         return buf;
 230     }
 231 
 232     void shutdown() {
 233         close();
 234         errorReceiver.accept(new IOException("Connection aborted"));
 235     }
 236 
 237     void asyncRead() {
 238         synchronized (reading) {
 239             try {
 240                 while (true) {
 241                     ByteBuffer buf = getBuffer();
 242                     int n = chan.read(buf);
 243                     //System.err.printf("Read %d bytes from chan\n", n);
 244                     if (n == -1) {
 245                         throw new IOException();
 246                     }
 247                     if (n == 0) {
 248                         returnBuffer(buf);
 249                         return;
 250                     }
 251                     buf.flip();
 252                     asyncReceiver.accept(buf);
 253                 }
 254             } catch (IOException e) {
 255                 shutdown();
 256             }
 257         }
 258     }
 259 
 260     @Override
 261     protected int readImpl(ByteBuffer buf) throws IOException {
 262         int mark = buf.position();
 263         int n;
 264         // FIXME: this hack works in conjunction with the corresponding change
 265         // in java.net.http.RawChannel.registerEvent
 266         if ((n = buffer.remaining()) != 0) {
 267             buf.put(buffer);
 268         } else {
 269             n = chan.read(buf);
 270         }
 271         if (n == -1) {
 272             return -1;
 273         }
 274         Utils.flipToMark(buf, mark);
 275         String s = "Receive (" + n + " bytes) ";
 276         //debugPrint(s, buf);
 277         return n;
 278     }
 279 
 280     @Override
 281     ConnectionPool.CacheKey cacheKey() {
 282         return new ConnectionPool.CacheKey(address, null);
 283     }
 284 
 285     @Override
 286     synchronized boolean connected() {
 287         return connected;
 288     }
 289 
 290     // used for all output in HTTP/2
 291     class WriteEvent extends AsyncEvent {
 292         WriteEvent() {
 293             super(0);
 294         }
 295 
 296         @Override
 297         public SelectableChannel channel() {
 298             return chan;
 299         }
 300 
 301         @Override
 302         public int interestOps() {
 303             return SelectionKey.OP_WRITE;
 304         }
 305 
 306         @Override
 307         public void handle() {
 308             asyncOutput();
 309         }
 310 
 311         @Override
 312         public void abort() {
 313             shutdown();
 314         }
 315     }
 316 
 317     // used for all input in HTTP/2
 318     class ReadEvent extends AsyncEvent {
 319         ReadEvent() {
 320             super(AsyncEvent.REPEATING); // && !BLOCKING
 321         }
 322 
 323         @Override
 324         public SelectableChannel channel() {
 325             return chan;
 326         }
 327 
 328         @Override
 329         public int interestOps() {
 330             return SelectionKey.OP_READ;
 331         }
 332 
 333         @Override
 334         public void handle() {
 335             asyncRead();
 336         }
 337 
 338         @Override
 339         public void abort() {
 340             shutdown();
 341         }
 342 
 343     }
 344 
 345     // used in blocking channels only
 346     class ReceiveResponseEvent extends AsyncEvent {
 347         CompletableFuture<Void> cf;
 348 
 349         ReceiveResponseEvent(CompletableFuture<Void> cf) {
 350             super(AsyncEvent.BLOCKING);
 351             this.cf = cf;
 352         }
 353         @Override
 354         public SelectableChannel channel() {
 355             return chan;
 356         }
 357 
 358         @Override
 359         public void handle() {
 360             cf.complete(null);
 361         }
 362 
 363         @Override
 364         public int interestOps() {
 365             return SelectionKey.OP_READ;
 366         }
 367 
 368         @Override
 369         public void abort() {
 370             close();
 371         }
 372     }
 373 
 374     @Override
 375     boolean isSecure() {
 376         return false;
 377     }
 378 
 379     @Override
 380     boolean isProxied() {
 381         return false;
 382     }
 383 
 384     @Override
 385     public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver,
 386             Consumer<Throwable> errorReceiver) {
 387         this.asyncReceiver = asyncReceiver;
 388         this.errorReceiver = errorReceiver;
 389         asyncOutputQ = new Queue<>();
 390         asyncOutputQ.registerPutCallback(this::asyncOutput);
 391     }
 392 
 393     @Override
 394     CompletableFuture<Void> whenReceivingResponse() {
 395         CompletableFuture<Void> cf = new CompletableFuture<>();
 396         try {
 397             client.registerEvent(new ReceiveResponseEvent(cf));
 398         } catch (IOException e) {
 399             cf.completeExceptionally(e);
 400         }
 401         return cf;
 402     }
 403 }