1 /*
   2  * Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.IOException;
  29 import java.net.ConnectException;
  30 import java.net.InetSocketAddress;
  31 import java.net.StandardSocketOptions;
  32 import java.nio.channels.SelectableChannel;
  33 import java.nio.channels.SelectionKey;
  34 import java.nio.channels.SocketChannel;
  35 import java.security.AccessController;
  36 import java.security.PrivilegedActionException;
  37 import java.security.PrivilegedExceptionAction;
  38 import java.time.Duration;
  39 import java.util.concurrent.CompletableFuture;
  40 import jdk.internal.net.http.common.FlowTube;
  41 import jdk.internal.net.http.common.Log;
  42 import jdk.internal.net.http.common.MinimalFuture;
  43 import jdk.internal.net.http.common.Utils;
  44 
  45 /**
  46  * Plain raw TCP connection direct to destination.
  47  * The connection operates in asynchronous non-blocking mode.
  48  * All reads and writes are done non-blocking.
  49  */
  50 class PlainHttpConnection extends HttpConnection {
  51 
  52     private final Object reading = new Object();
  53     protected final SocketChannel chan;
  54     private final SocketTube tube; // need SocketTube to call signalClosed().
  55     private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
  56     private volatile boolean connected;
  57     private boolean closed;
  58     private volatile ConnectTimerEvent connectTimerEvent;  // may be null
  59 
  60     // should be volatile to provide proper synchronization(visibility) action
  61 
  62     /**
  63      * Returns a ConnectTimerEvent iff there is a connect timeout duration,
  64      * otherwise null.
  65      */
  66     private ConnectTimerEvent newConnectTimer(Exchange<?> exchange,
  67                                               CompletableFuture<Void> cf) {
  68         Duration duration = exchange.remainingConnectTimeout().orElse(null);
  69         if (duration != null) {
  70             ConnectTimerEvent cte = new ConnectTimerEvent(duration, exchange, cf);
  71             return cte;
  72         }
  73         return null;
  74     }
  75 
  76     final class ConnectTimerEvent extends TimeoutEvent {
  77         private final CompletableFuture<Void> cf;
  78         private final Exchange<?> exchange;
  79 
  80         ConnectTimerEvent(Duration duration,
  81                           Exchange<?> exchange,
  82                           CompletableFuture<Void> cf) {
  83             super(duration);
  84             this.exchange = exchange;
  85             this.cf = cf;
  86         }
  87 
  88         @Override
  89         public void handle() {
  90             if (debug.on()) {
  91                 debug.log("HTTP connect timed out");
  92             }
  93             ConnectException ce = new ConnectException("HTTP connect timed out");
  94             exchange.multi.cancel(ce);
  95             client().theExecutor().execute(() -> cf.completeExceptionally(ce));
  96         }
  97 
  98         @Override
  99         public String toString() {
 100             return "ConnectTimerEvent, " + super.toString();
 101         }
 102     }
 103 
 104     final class ConnectEvent extends AsyncEvent {
 105         private final CompletableFuture<Void> cf;
 106 
 107         ConnectEvent(CompletableFuture<Void> cf) {
 108             this.cf = cf;
 109         }
 110 
 111         @Override
 112         public SelectableChannel channel() {
 113             return chan;
 114         }
 115 
 116         @Override
 117         public int interestOps() {
 118             return SelectionKey.OP_CONNECT;
 119         }
 120 
 121         @Override
 122         public void handle() {
 123             try {
 124                 assert !connected : "Already connected";
 125                 assert !chan.isBlocking() : "Unexpected blocking channel";
 126                 if (debug.on())
 127                     debug.log("ConnectEvent: finishing connect");
 128                 boolean finished = chan.finishConnect();
 129                 assert finished : "Expected channel to be connected";
 130                 if (debug.on())
 131                     debug.log("ConnectEvent: connect finished: %s Local addr: %s",
 132                               finished, chan.getLocalAddress());
 133                 // complete async since the event runs on the SelectorManager thread
 134                 cf.completeAsync(() -> null, client().theExecutor());
 135             } catch (Throwable e) {
 136                 Throwable t = Utils.toConnectException(e);
 137                 client().theExecutor().execute( () -> cf.completeExceptionally(t));
 138                 close();
 139             }
 140         }
 141 
 142         @Override
 143         public void abort(IOException ioe) {
 144             client().theExecutor().execute( () -> cf.completeExceptionally(ioe));
 145             close();
 146         }
 147     }
 148 
 149     @Override
 150     public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
 151         CompletableFuture<Void> cf = new MinimalFuture<>();
 152         try {
 153             assert !connected : "Already connected";
 154             assert !chan.isBlocking() : "Unexpected blocking channel";
 155             boolean finished;
 156 
 157             connectTimerEvent = newConnectTimer(exchange, cf);
 158             if (connectTimerEvent != null) {
 159                 if (debug.on())
 160                     debug.log("registering connect timer: " + connectTimerEvent);
 161                 client().registerTimer(connectTimerEvent);
 162             }
 163 
 164             PrivilegedExceptionAction<Boolean> pa =
 165                     () -> chan.connect(Utils.resolveAddress(address));
 166             try {
 167                  finished = AccessController.doPrivileged(pa);
 168             } catch (PrivilegedActionException e) {
 169                throw e.getCause();
 170             }
 171             if (finished) {
 172                 if (debug.on()) debug.log("connect finished without blocking");
 173                 cf.complete(null);
 174             } else {
 175                 if (debug.on()) debug.log("registering connect event");
 176                 client().registerEvent(new ConnectEvent(cf));
 177             }
 178         } catch (Throwable throwable) {
 179             cf.completeExceptionally(Utils.toConnectException(throwable));
 180             try {
 181                 close();
 182             } catch (Exception x) {
 183                 if (debug.on())
 184                     debug.log("Failed to close channel after unsuccessful connect");
 185             }
 186         }
 187         return cf;
 188     }
 189 
 190     @Override
 191     public CompletableFuture<Void> finishConnect() {
 192         assert connected == false;
 193         if (debug.on()) debug.log("finishConnect, setting connected=true");
 194         connected = true;
 195         if (connectTimerEvent != null)
 196             client().cancelTimer(connectTimerEvent);
 197         return MinimalFuture.completedFuture(null);
 198     }
 199 
 200     @Override
 201     SocketChannel channel() {
 202         return chan;
 203     }
 204 
 205     @Override
 206     final FlowTube getConnectionFlow() {
 207         return tube;
 208     }
 209 
 210     PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
 211         super(addr, client);
 212         try {
 213             this.chan = SocketChannel.open();
 214             chan.configureBlocking(false);
 215             trySetReceiveBufferSize(client.getReceiveBufferSize());
 216             if (debug.on()) {
 217                 int bufsize = getInitialBufferSize();
 218                 debug.log("Initial receive buffer size is: %d", bufsize);
 219             }
 220             chan.setOption(StandardSocketOptions.TCP_NODELAY, true);
 221             // wrap the channel in a Tube for async reading and writing
 222             tube = new SocketTube(client(), chan, Utils::getBuffer);
 223         } catch (IOException e) {
 224             throw new InternalError(e);
 225         }
 226     }
 227 
 228     private int getInitialBufferSize() {
 229         try {
 230             return chan.getOption(StandardSocketOptions.SO_RCVBUF);
 231         } catch(IOException x) {
 232             if (debug.on())
 233                 debug.log("Failed to get initial receive buffer size on %s", chan);
 234         }
 235         return 0;
 236     }
 237 
 238     private void trySetReceiveBufferSize(int bufsize) {
 239         try {
 240             if (bufsize > 0) {
 241                 chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
 242             }
 243         } catch(IOException x) {
 244             if (debug.on())
 245                 debug.log("Failed to set receive buffer size to %d on %s",
 246                           bufsize, chan);
 247         }
 248     }
 249 
 250     @Override
 251     HttpPublisher publisher() { return writePublisher; }
 252 
 253 
 254     @Override
 255     public String toString() {
 256         return "PlainHttpConnection: " + super.toString();
 257     }
 258 
 259     /**
 260      * Closes this connection
 261      */
 262     @Override
 263     public void close() {
 264         synchronized (this) {
 265             if (closed) {
 266                 return;
 267             }
 268             closed = true;
 269         }
 270         try {
 271             Log.logTrace("Closing: " + toString());
 272             if (debug.on())
 273                 debug.log("Closing channel: " + client().debugInterestOps(chan));
 274             if (connectTimerEvent != null)
 275                 client().cancelTimer(connectTimerEvent);
 276             chan.close();
 277             tube.signalClosed();
 278         } catch (IOException e) {
 279             Log.logTrace("Closing resulted in " + e);
 280         }
 281     }
 282 
 283 
 284     @Override
 285     ConnectionPool.CacheKey cacheKey() {
 286         return new ConnectionPool.CacheKey(address, null);
 287     }
 288 
 289     @Override
 290     synchronized boolean connected() {
 291         return connected;
 292     }
 293 
 294 
 295     @Override
 296     boolean isSecure() {
 297         return false;
 298     }
 299 
 300     @Override
 301     boolean isProxied() {
 302         return false;
 303     }
 304 
 305 }