< prev index next >

src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java

Print this page




   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.IOException;

  29 import java.net.InetSocketAddress;
  30 import java.net.StandardSocketOptions;
  31 import java.nio.channels.SelectableChannel;
  32 import java.nio.channels.SelectionKey;
  33 import java.nio.channels.SocketChannel;
  34 import java.security.AccessController;
  35 import java.security.PrivilegedActionException;
  36 import java.security.PrivilegedExceptionAction;

  37 import java.util.concurrent.CompletableFuture;
  38 import jdk.internal.net.http.common.FlowTube;
  39 import jdk.internal.net.http.common.Log;
  40 import jdk.internal.net.http.common.MinimalFuture;
  41 import jdk.internal.net.http.common.Utils;
  42 
  43 /**
  44  * Plain raw TCP connection direct to destination.
  45  * The connection operates in asynchronous non-blocking mode.
  46  * All reads and writes are done non-blocking.
  47  */
  48 class PlainHttpConnection extends HttpConnection {
  49 
  50     private final Object reading = new Object();
  51     protected final SocketChannel chan;
  52     private final SocketTube tube; // need SocketTube to call signalClosed().
  53     private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
  54     private volatile boolean connected;
  55     private boolean closed;

  56 
  57     // should be volatile to provide proper synchronization(visibility) action
  58 










































  59     final class ConnectEvent extends AsyncEvent {
  60         private final CompletableFuture<Void> cf;
  61 
  62         ConnectEvent(CompletableFuture<Void> cf) {
  63             this.cf = cf;
  64         }
  65 
  66         @Override
  67         public SelectableChannel channel() {
  68             return chan;
  69         }
  70 
  71         @Override
  72         public int interestOps() {
  73             return SelectionKey.OP_CONNECT;
  74         }
  75 
  76         @Override
  77         public void handle() {
  78             try {
  79                 assert !connected : "Already connected";
  80                 assert !chan.isBlocking() : "Unexpected blocking channel";
  81                 if (debug.on())
  82                     debug.log("ConnectEvent: finishing connect");
  83                 boolean finished = chan.finishConnect();
  84                 assert finished : "Expected channel to be connected";
  85                 if (debug.on())
  86                     debug.log("ConnectEvent: connect finished: %s Local addr: %s",
  87                               finished, chan.getLocalAddress());
  88                 connected = true;
  89                 // complete async since the event runs on the SelectorManager thread
  90                 cf.completeAsync(() -> null, client().theExecutor());
  91             } catch (Throwable e) {
  92                 Throwable t = Utils.toConnectException(e);
  93                 client().theExecutor().execute( () -> cf.completeExceptionally(t));
  94                 close();
  95             }
  96         }
  97 
  98         @Override
  99         public void abort(IOException ioe) {
 100             client().theExecutor().execute( () -> cf.completeExceptionally(ioe));
 101             close();
 102         }
 103     }
 104 
 105     @Override
 106     public CompletableFuture<Void> connectAsync() {
 107         CompletableFuture<Void> cf = new MinimalFuture<>();
 108         try {
 109             assert !connected : "Already connected";
 110             assert !chan.isBlocking() : "Unexpected blocking channel";
 111             boolean finished = false;








 112             PrivilegedExceptionAction<Boolean> pa =
 113                     () -> chan.connect(Utils.resolveAddress(address));
 114             try {
 115                  finished = AccessController.doPrivileged(pa);
 116             } catch (PrivilegedActionException e) {
 117                throw e.getCause();
 118             }
 119             if (finished) {
 120                 if (debug.on()) debug.log("connect finished without blocking");
 121                 connected = true;
 122                 cf.complete(null);
 123             } else {
 124                 if (debug.on()) debug.log("registering connect event");
 125                 client().registerEvent(new ConnectEvent(cf));
 126             }
 127         } catch (Throwable throwable) {
 128             cf.completeExceptionally(Utils.toConnectException(throwable));
 129             try {
 130                 close();
 131             } catch (Exception x) {
 132                 if (debug.on())
 133                     debug.log("Failed to close channel after unsuccessful connect");
 134             }
 135         }
 136         return cf;
 137     }
 138 
 139     @Override










 140     SocketChannel channel() {
 141         return chan;
 142     }
 143 
 144     @Override
 145     final FlowTube getConnectionFlow() {
 146         return tube;
 147     }
 148 
 149     PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
 150         super(addr, client);
 151         try {
 152             this.chan = SocketChannel.open();
 153             chan.configureBlocking(false);
 154             trySetReceiveBufferSize(client.getReceiveBufferSize());
 155             if (debug.on()) {
 156                 int bufsize = getInitialBufferSize();
 157                 debug.log("Initial receive buffer size is: %d", bufsize);
 158             }
 159             chan.setOption(StandardSocketOptions.TCP_NODELAY, true);


 193     @Override
 194     public String toString() {
 195         return "PlainHttpConnection: " + super.toString();
 196     }
 197 
 198     /**
 199      * Closes this connection
 200      */
 201     @Override
 202     public void close() {
 203         synchronized (this) {
 204             if (closed) {
 205                 return;
 206             }
 207             closed = true;
 208         }
 209         try {
 210             Log.logTrace("Closing: " + toString());
 211             if (debug.on())
 212                 debug.log("Closing channel: " + client().debugInterestOps(chan));


 213             chan.close();
 214             tube.signalClosed();
 215         } catch (IOException e) {
 216             Log.logTrace("Closing resulted in " + e);
 217         }
 218     }
 219 
 220 
 221     @Override
 222     ConnectionPool.CacheKey cacheKey() {
 223         return new ConnectionPool.CacheKey(address, null);
 224     }
 225 
 226     @Override
 227     synchronized boolean connected() {
 228         return connected;
 229     }
 230 
 231 
 232     @Override


   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.IOException;
  29 import java.net.ConnectException;
  30 import java.net.InetSocketAddress;
  31 import java.net.StandardSocketOptions;
  32 import java.nio.channels.SelectableChannel;
  33 import java.nio.channels.SelectionKey;
  34 import java.nio.channels.SocketChannel;
  35 import java.security.AccessController;
  36 import java.security.PrivilegedActionException;
  37 import java.security.PrivilegedExceptionAction;
  38 import java.time.Duration;
  39 import java.util.concurrent.CompletableFuture;
  40 import jdk.internal.net.http.common.FlowTube;
  41 import jdk.internal.net.http.common.Log;
  42 import jdk.internal.net.http.common.MinimalFuture;
  43 import jdk.internal.net.http.common.Utils;
  44 
  45 /**
  46  * Plain raw TCP connection direct to destination.
  47  * The connection operates in asynchronous non-blocking mode.
  48  * All reads and writes are done non-blocking.
  49  */
  50 class PlainHttpConnection extends HttpConnection {
  51 
  52     private final Object reading = new Object();
  53     protected final SocketChannel chan;
  54     private final SocketTube tube; // need SocketTube to call signalClosed().
  55     private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
  56     private volatile boolean connected;
  57     private boolean closed;
  58     private volatile ConnectTimerEvent connectTimerEvent;  // may be null
  59 
  60     // should be volatile to provide proper synchronization(visibility) action
  61 
  62     /**
  63      * Returns a ConnectTimerEvent iff there is a connect timeout duration,
  64      * otherwise null.
  65      */
  66     private ConnectTimerEvent newConnectTimer(Exchange<?> exchange,
  67                                               CompletableFuture<Void> cf) {
  68         Duration duration = client().connectTimeout().orElse(null);
  69         if (duration != null) {
  70             ConnectTimerEvent cte = new ConnectTimerEvent(duration, exchange, cf);
  71             return cte;
  72         }
  73         return null;
  74     }
  75 
  76     final class ConnectTimerEvent extends TimeoutEvent {
  77         private final CompletableFuture<Void> cf;
  78         private final Exchange<?> exchange;
  79 
  80         ConnectTimerEvent(Duration duration,
  81                           Exchange<?> exchange,
  82                           CompletableFuture<Void> cf) {
  83             super(duration);
  84             this.exchange = exchange;
  85             this.cf = cf;
  86         }
  87 
  88         @Override
  89         public void handle() {
  90             if (debug.on()) {
  91                 debug.log("HTTP connect timed out");
  92             }
  93             ConnectException ce = new ConnectException("HTTP connect timed out");
  94             exchange.multi.cancel(ce);
  95             client().theExecutor().execute(() -> cf.completeExceptionally(ce));
  96         }
  97 
  98         @Override
  99         public String toString() {
 100             return "ConnectTimerEvent, " + super.toString();
 101         }
 102     }
 103 
 104     final class ConnectEvent extends AsyncEvent {
 105         private final CompletableFuture<Void> cf;
 106 
 107         ConnectEvent(CompletableFuture<Void> cf) {
 108             this.cf = cf;
 109         }
 110 
 111         @Override
 112         public SelectableChannel channel() {
 113             return chan;
 114         }
 115 
 116         @Override
 117         public int interestOps() {
 118             return SelectionKey.OP_CONNECT;
 119         }
 120 
 121         @Override
 122         public void handle() {
 123             try {
 124                 assert !connected : "Already connected";
 125                 assert !chan.isBlocking() : "Unexpected blocking channel";
 126                 if (debug.on())
 127                     debug.log("ConnectEvent: finishing connect");
 128                 boolean finished = chan.finishConnect();
 129                 assert finished : "Expected channel to be connected";
 130                 if (debug.on())
 131                     debug.log("ConnectEvent: connect finished: %s Local addr: %s",
 132                               finished, chan.getLocalAddress());

 133                 // complete async since the event runs on the SelectorManager thread
 134                 cf.completeAsync(() -> null, client().theExecutor());
 135             } catch (Throwable e) {
 136                 Throwable t = Utils.toConnectException(e);
 137                 client().theExecutor().execute( () -> cf.completeExceptionally(t));
 138                 close();
 139             }
 140         }
 141 
 142         @Override
 143         public void abort(IOException ioe) {
 144             client().theExecutor().execute( () -> cf.completeExceptionally(ioe));
 145             close();
 146         }
 147     }
 148 
 149     @Override
 150     public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
 151         CompletableFuture<Void> cf = new MinimalFuture<>();
 152         try {
 153             assert !connected : "Already connected";
 154             assert !chan.isBlocking() : "Unexpected blocking channel";
 155             boolean finished;
 156 
 157             connectTimerEvent = newConnectTimer(exchange, cf);
 158             if (connectTimerEvent != null) {
 159                 if (debug.on())
 160                     debug.log("registering connect timer: " + connectTimerEvent);
 161                 client().registerTimer(connectTimerEvent);
 162             }
 163 
 164             PrivilegedExceptionAction<Boolean> pa =
 165                     () -> chan.connect(Utils.resolveAddress(address));
 166             try {
 167                  finished = AccessController.doPrivileged(pa);
 168             } catch (PrivilegedActionException e) {
 169                throw e.getCause();
 170             }
 171             if (finished) {
 172                 if (debug.on()) debug.log("connect finished without blocking");

 173                 cf.complete(null);
 174             } else {
 175                 if (debug.on()) debug.log("registering connect event");
 176                 client().registerEvent(new ConnectEvent(cf));
 177             }
 178         } catch (Throwable throwable) {
 179             cf.completeExceptionally(Utils.toConnectException(throwable));
 180             try {
 181                 close();
 182             } catch (Exception x) {
 183                 if (debug.on())
 184                     debug.log("Failed to close channel after unsuccessful connect");
 185             }
 186         }
 187         return cf;
 188     }
 189 
 190     @Override
 191     public CompletableFuture<Void> finishConnect() {
 192         assert connected == false;
 193         if (debug.on()) debug.log("finishConnect, setting connected=true");
 194         connected = true;
 195         if (connectTimerEvent != null)
 196             client().cancelTimer(connectTimerEvent);
 197         return MinimalFuture.completedFuture(null);
 198     }
 199 
 200     @Override
 201     SocketChannel channel() {
 202         return chan;
 203     }
 204 
 205     @Override
 206     final FlowTube getConnectionFlow() {
 207         return tube;
 208     }
 209 
 210     PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
 211         super(addr, client);
 212         try {
 213             this.chan = SocketChannel.open();
 214             chan.configureBlocking(false);
 215             trySetReceiveBufferSize(client.getReceiveBufferSize());
 216             if (debug.on()) {
 217                 int bufsize = getInitialBufferSize();
 218                 debug.log("Initial receive buffer size is: %d", bufsize);
 219             }
 220             chan.setOption(StandardSocketOptions.TCP_NODELAY, true);


 254     @Override
 255     public String toString() {
 256         return "PlainHttpConnection: " + super.toString();
 257     }
 258 
 259     /**
 260      * Closes this connection
 261      */
 262     @Override
 263     public void close() {
 264         synchronized (this) {
 265             if (closed) {
 266                 return;
 267             }
 268             closed = true;
 269         }
 270         try {
 271             Log.logTrace("Closing: " + toString());
 272             if (debug.on())
 273                 debug.log("Closing channel: " + client().debugInterestOps(chan));
 274             if (connectTimerEvent != null)
 275                 client().cancelTimer(connectTimerEvent);
 276             chan.close();
 277             tube.signalClosed();
 278         } catch (IOException e) {
 279             Log.logTrace("Closing resulted in " + e);
 280         }
 281     }
 282 
 283 
 284     @Override
 285     ConnectionPool.CacheKey cacheKey() {
 286         return new ConnectionPool.CacheKey(address, null);
 287     }
 288 
 289     @Override
 290     synchronized boolean connected() {
 291         return connected;
 292     }
 293 
 294 
 295     @Override
< prev index next >