< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java

Print this page




   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;

  29 import java.net.InetSocketAddress;
  30 import java.net.StandardSocketOptions;
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.SelectableChannel;
  33 import java.nio.channels.SelectionKey;
  34 import java.nio.channels.SocketChannel;



  35 import java.util.concurrent.CompletableFuture;
  36 import java.util.function.Consumer;
  37 import java.util.function.Supplier;
  38 
  39 import jdk.incubator.http.internal.common.AsyncWriteQueue;
  40 import jdk.incubator.http.internal.common.ByteBufferReference;
  41 import jdk.incubator.http.internal.common.Log;
  42 import jdk.incubator.http.internal.common.MinimalFuture;
  43 import jdk.incubator.http.internal.common.Utils;
  44 
  45 /**
  46  * Plain raw TCP connection direct to destination. 2 modes
  47  * 1) Blocking used by http/1. In this case the connect is actually non
  48  *    blocking but the request is sent blocking. The first byte of a response
  49  *    is received non-blocking and the remainder of the response is received
  50  *    blocking
  51  * 2) Non-blocking. In this case (for http/2) the connection is actually opened
  52  *    blocking but all reads and writes are done non-blocking under the
  53  *    control of a Http2Connection object.
  54  */
  55 class PlainHttpConnection extends HttpConnection implements AsyncConnection {
  56 

  57     protected final SocketChannel chan;


  58     private volatile boolean connected;
  59     private boolean closed;
  60 
  61     // should be volatile to provide proper synchronization(visibility) action
  62     private volatile Consumer<ByteBufferReference> asyncReceiver;
  63     private volatile Consumer<Throwable> errorReceiver;
  64     private volatile Supplier<ByteBufferReference> readBufferSupplier;
  65     private boolean asyncReading;
  66 
  67     private final AsyncWriteQueue asyncOutputQ = new AsyncWriteQueue(this::asyncOutput);
  68 
  69     private final Object reading = new Object();
  70 
  71     @Override
  72     public void startReading() {
  73         try {
  74             synchronized(reading) {
  75                 asyncReading = true;
  76             }
  77             client.registerEvent(new ReadEvent());
  78         } catch (IOException e) {
  79             shutdown();
  80         }
  81     }
  82 
  83     @Override
  84     public void stopAsyncReading() {
  85         synchronized(reading) {
  86             asyncReading = false;
  87         }
  88         client.cancelRegistration(chan);
  89     }
  90 
  91     class ConnectEvent extends AsyncEvent {
  92         CompletableFuture<Void> cf;
  93 
  94         ConnectEvent(CompletableFuture<Void> cf) {
  95             super(AsyncEvent.BLOCKING);
  96             this.cf = cf;
  97         }
  98 
  99         @Override
 100         public SelectableChannel channel() {
 101             return chan;
 102         }
 103 
 104         @Override
 105         public int interestOps() {
 106             return SelectionKey.OP_CONNECT;
 107         }
 108 
 109         @Override
 110         public void handle() {


 111             try {
 112                 chan.finishConnect();







 113             } catch (IOException e) {
 114                 cf.completeExceptionally(e);
 115                 return;
 116             }
 117             connected = true;
 118             cf.complete(null);
 119         }
 120 
 121         @Override
 122         public void abort() {
 123             close();

 124         }
 125     }
 126 
 127     @Override
 128     public CompletableFuture<Void> connectAsync() {
 129         CompletableFuture<Void> plainFuture = new MinimalFuture<>();


 130         try {
 131             chan.configureBlocking(false);
 132             chan.connect(address);
 133             client.registerEvent(new ConnectEvent(plainFuture));
 134         } catch (IOException e) {
 135             plainFuture.completeExceptionally(e);
 136         }
 137         return plainFuture;
 138     }
 139 
 140     @Override
 141     public void connect() throws IOException {
 142         chan.connect(address);
 143         connected = true;









 144     }
 145 
 146     @Override
 147     SocketChannel channel() {
 148         return chan;
 149     }
 150 





 151     PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
 152         super(addr, client);
 153         try {
 154             this.chan = SocketChannel.open();

 155             int bufsize = client.getReceiveBufferSize();
 156             chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
 157             chan.setOption(StandardSocketOptions.TCP_NODELAY, true);


 158         } catch (IOException e) {
 159             throw new InternalError(e);
 160         }
 161     }
 162 
 163     @Override
 164     long write(ByteBuffer[] buffers, int start, int number) throws IOException {
 165         if (getMode() != Mode.ASYNC) {
 166             return chan.write(buffers, start, number);
 167         }
 168         // async
 169         buffers = Utils.reduce(buffers, start, number);
 170         long n = Utils.remaining(buffers);
 171         asyncOutputQ.put(ByteBufferReference.toReferences(buffers));
 172         flushAsync();
 173         return n;
 174     }
 175 
 176     @Override
 177     long write(ByteBuffer buffer) throws IOException {
 178         if (getMode() != Mode.ASYNC) {
 179             return chan.write(buffer);
 180         }
 181         // async
 182         long n = buffer.remaining();
 183         asyncOutputQ.put(ByteBufferReference.toReferences(buffer));
 184         flushAsync();
 185         return n;
 186     }
 187 
 188     // handle registered WriteEvent; invoked from SelectorManager thread
 189     void flushRegistered() {
 190         if (getMode() == Mode.ASYNC) {
 191             try {
 192                 asyncOutputQ.flushDelayed();
 193             } catch (IOException e) {
 194                 // Only IOException caused by closed Queue is expected here
 195                 shutdown();
 196             }
 197         }
 198     }
 199 
 200     @Override
 201     public void writeAsync(ByteBufferReference[] buffers) throws IOException {
 202         if (getMode() != Mode.ASYNC) {
 203             chan.write(ByteBufferReference.toBuffers(buffers));
 204             ByteBufferReference.clear(buffers);
 205         } else {
 206             asyncOutputQ.put(buffers);
 207         }
 208     }
 209 
 210     @Override
 211     public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
 212         if (getMode() != Mode.ASYNC) {
 213             chan.write(ByteBufferReference.toBuffers(buffers));
 214             ByteBufferReference.clear(buffers);
 215         } else {
 216             // Unordered frames are sent before existing frames.
 217             asyncOutputQ.putFirst(buffers);
 218         }
 219     }
 220 
 221     @Override
 222     public void flushAsync() throws IOException {
 223         if (getMode() == Mode.ASYNC) {
 224             asyncOutputQ.flush();
 225         }
 226     }
 227 
 228     @Override
 229     public void enableCallback() {
 230         // not used
 231         assert false;
 232     }
 233 
 234     boolean asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
 235         try {
 236             ByteBuffer[] bufs = ByteBufferReference.toBuffers(refs);
 237             while (Utils.remaining(bufs) > 0) {
 238                 long n = chan.write(bufs);
 239                 if (n == 0) {
 240                     delayCallback.setDelayed(refs);
 241                     client.registerEvent(new WriteEvent());
 242                     return false;
 243                 }
 244             }
 245             ByteBufferReference.clear(refs);
 246         } catch (IOException e) {
 247             shutdown();
 248         }
 249         return true;
 250     }
 251 
 252     @Override
 253     public String toString() {
 254         return "PlainHttpConnection: " + super.toString();
 255     }
 256 
 257     /**
 258      * Close this connection
 259      */
 260     @Override
 261     public synchronized void close() {
 262         if (closed) {
 263             return;
 264         }
 265         closed = true;
 266         try {
 267             Log.logError("Closing: " + toString());
 268             chan.close();
 269         } catch (IOException e) {}
 270     }
 271 
 272     @Override
 273     void shutdownInput() throws IOException {

 274         chan.shutdownInput();
 275     }
 276 
 277     @Override
 278     void shutdownOutput() throws IOException {

 279         chan.shutdownOutput();
 280     }
 281 
 282     void shutdown() {
 283         close();
 284         errorReceiver.accept(new IOException("Connection aborted"));
 285     }
 286 
 287     void asyncRead() {
 288         synchronized (reading) {
 289             try {
 290                 while (asyncReading) {
 291                     ByteBufferReference buf = readBufferSupplier.get();
 292                     int n = chan.read(buf.get());
 293                     if (n == -1) {
 294                         throw new IOException();
 295                     }
 296                     if (n == 0) {
 297                         buf.clear();
 298                         return;
 299                     }
 300                     buf.get().flip();
 301                     asyncReceiver.accept(buf);
 302                 }
 303             } catch (IOException e) {
 304                 shutdown();
 305             }
 306         }
 307     }
 308 
 309     @Override
 310     protected ByteBuffer readImpl() throws IOException {
 311         ByteBuffer dst = ByteBuffer.allocate(8192);
 312         int n = readImpl(dst);
 313         if (n > 0) {
 314             return dst;
 315         } else if (n == 0) {
 316             return Utils.EMPTY_BYTEBUFFER;
 317         } else {
 318             return null;
 319         }
 320     }
 321 
 322     private int readImpl(ByteBuffer buf) throws IOException {
 323         int mark = buf.position();
 324         int n;
 325         // FIXME: this hack works in conjunction with the corresponding change
 326         // in jdk.incubator.http.RawChannel.registerEvent
 327         //if ((n = buffer.remaining()) != 0) {
 328             //buf.put(buffer);
 329         //} else {
 330             n = chan.read(buf);
 331         //}
 332         if (n == -1) {
 333             return -1;
 334         }
 335         Utils.flipToMark(buf, mark);
 336         // String s = "Receive (" + n + " bytes) ";
 337         //debugPrint(s, buf);
 338         return n;
 339     }
 340 
 341     @Override
 342     ConnectionPool.CacheKey cacheKey() {
 343         return new ConnectionPool.CacheKey(address, null);
 344     }
 345 
 346     @Override
 347     synchronized boolean connected() {
 348         return connected;
 349     }
 350 
 351     // used for all output in HTTP/2
 352     class WriteEvent extends AsyncEvent {
 353         WriteEvent() {
 354             super(0);
 355         }
 356 
 357         @Override
 358         public SelectableChannel channel() {
 359             return chan;
 360         }
 361 
 362         @Override
 363         public int interestOps() {
 364             return SelectionKey.OP_WRITE;
 365         }
 366 
 367         @Override
 368         public void handle() {
 369             flushRegistered();
 370         }
 371 
 372         @Override
 373         public void abort() {
 374             shutdown();
 375         }
 376     }
 377 
 378     // used for all input in HTTP/2
 379     class ReadEvent extends AsyncEvent {
 380         ReadEvent() {
 381             super(AsyncEvent.REPEATING); // && !BLOCKING









 382         }
 383 
 384         @Override
 385         public SelectableChannel channel() {
 386             return chan;
 387         }
 388 
 389         @Override
 390         public int interestOps() {
 391             return SelectionKey.OP_READ;
 392         }
 393 
 394         @Override
 395         public void handle() {
 396             asyncRead();







 397         }
 398 
 399         @Override
 400         public void abort() {
 401             shutdown();
 402         }
 403 
 404         @Override
 405         public String toString() {
 406             return super.toString() + "/" + chan;








 407         }

 408     }
 409 
 410     // used in blocking channels only
 411     class ReceiveResponseEvent extends AsyncEvent {
 412         CompletableFuture<Void> cf;
 413 
 414         ReceiveResponseEvent(CompletableFuture<Void> cf) {
 415             super(AsyncEvent.BLOCKING);
 416             this.cf = cf;
 417         }
 418         @Override
 419         public SelectableChannel channel() {
 420             return chan;
 421         }
 422 
 423         @Override
 424         public void handle() {
 425             cf.complete(null);


 426         }
 427 
 428         @Override
 429         public int interestOps() {
 430             return SelectionKey.OP_READ;
 431         }
 432 
 433         @Override
 434         public void abort() {
 435             close();
 436         }
 437 
 438         @Override
 439         public String toString() {
 440             return super.toString() + "/" + chan;
 441         }


 442     }
 443 
 444     @Override
 445     boolean isSecure() {
 446         return false;
 447     }
 448 
 449     @Override
 450     boolean isProxied() {
 451         return false;
 452     }
 453 
 454     @Override
 455     public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver,
 456                                   Consumer<Throwable> errorReceiver,
 457                                   Supplier<ByteBufferReference> readBufferSupplier) {
 458         this.asyncReceiver = asyncReceiver;
 459         this.errorReceiver = errorReceiver;
 460         this.readBufferSupplier = readBufferSupplier;
 461     }
 462 
 463     @Override
 464     CompletableFuture<Void> whenReceivingResponse() {
 465         CompletableFuture<Void> cf = new MinimalFuture<>();
 466         try {
 467             ReceiveResponseEvent evt = new ReceiveResponseEvent(cf);
 468             client.registerEvent(evt);
 469         } catch (IOException e) {
 470             cf.completeExceptionally(e);
 471         }
 472         return cf;
 473     }
 474 }


   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.net.InetSocketAddress;
  31 import java.net.StandardSocketOptions;
  32 import java.nio.ByteBuffer;
  33 import java.nio.channels.SelectableChannel;
  34 import java.nio.channels.SelectionKey;
  35 import java.nio.channels.SocketChannel;
  36 import java.security.AccessController;
  37 import java.security.PrivilegedActionException;
  38 import java.security.PrivilegedExceptionAction;
  39 import java.util.concurrent.CompletableFuture;
  40 import jdk.incubator.http.internal.common.FlowTube;




  41 import jdk.incubator.http.internal.common.Log;
  42 import jdk.incubator.http.internal.common.MinimalFuture;
  43 import jdk.incubator.http.internal.common.Utils;
  44 
  45 /**
  46  * Plain raw TCP connection direct to destination.
  47  * The connection operates in asynchronous non-blocking mode.
  48  * All reads and writes are done non-blocking.





  49  */
  50 class PlainHttpConnection extends HttpConnection {
  51 
  52     private final Object reading = new Object();
  53     protected final SocketChannel chan;
  54     private final FlowTube tube;
  55     private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
  56     private volatile boolean connected;
  57     private boolean closed;
  58 
  59     // should be volatile to provide proper synchronization(visibility) action






  60 
  61     final class ConnectEvent extends AsyncEvent {
  62         private final CompletableFuture<Void> cf;






















  63 
  64         ConnectEvent(CompletableFuture<Void> cf) {

  65             this.cf = cf;
  66         }
  67 
  68         @Override
  69         public SelectableChannel channel() {
  70             return chan;
  71         }
  72 
  73         @Override
  74         public int interestOps() {
  75             return SelectionKey.OP_CONNECT;
  76         }
  77 
  78         @Override
  79         public void handle() {
  80             assert !connected : "Already connected";
  81             assert !chan.isBlocking() : "Unexpected blocking channel";
  82             try {
  83                 debug.log(Level.DEBUG, "ConnectEvent: finishing connect");
  84                 boolean finished = chan.finishConnect();
  85                 assert finished : "Expected channel to be connected";
  86                 debug.log(Level.DEBUG,
  87                           "ConnectEvent: connect finished: %s", finished);
  88                 connected = true;
  89                 // complete async since the event runs on the SelectorManager thread
  90                 cf.completeAsync(() -> null, client().theExecutor());
  91             } catch (IOException e) {
  92                 client().theExecutor().execute( () -> cf.completeExceptionally(e));

  93             }


  94         }
  95 
  96         @Override
  97         public void abort(IOException ioe) {
  98             close();
  99             client().theExecutor().execute( () -> cf.completeExceptionally(ioe));
 100         }
 101     }
 102 
 103     @Override
 104     public CompletableFuture<Void> connectAsync() {
 105         assert !connected : "Already connected";
 106         assert !chan.isBlocking() : "Unexpected blocking channel";
 107         CompletableFuture<Void> cf = new MinimalFuture<>();
 108         try {
 109             boolean finished = false;
 110             PrivilegedExceptionAction<Boolean> pa = () -> chan.connect(address);
 111             try {
 112                  finished = AccessController.doPrivileged(pa);
 113             } catch (PrivilegedActionException e) {
 114                 cf.completeExceptionally(e.getCause());

 115             }
 116             if (finished) {
 117                 debug.log(Level.DEBUG, "connect finished without blocking");


 118                 connected = true;
 119                 cf.complete(null);
 120             } else {
 121                 debug.log(Level.DEBUG, "registering connect event");
 122                 client().registerEvent(new ConnectEvent(cf));
 123             }
 124         } catch (Throwable throwable) {
 125             cf.completeExceptionally(throwable);
 126         }
 127         return cf;
 128     }
 129 
 130     @Override
 131     SocketChannel channel() {
 132         return chan;
 133     }
 134 
 135     @Override
 136     final FlowTube getConnectionFlow() {
 137         return tube;
 138     }
 139 
 140     PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
 141         super(addr, client);
 142         try {
 143             this.chan = SocketChannel.open();
 144             chan.configureBlocking(false);
 145             int bufsize = client.getReceiveBufferSize();
 146             chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
 147             chan.setOption(StandardSocketOptions.TCP_NODELAY, true);
 148             // wrap the connected channel in a Tube for async reading and writing
 149             tube = new SocketTube(client(), chan, Utils::getBuffer);
 150         } catch (IOException e) {
 151             throw new InternalError(e);
 152         }
 153     }
 154 
 155     @Override
 156     HttpPublisher publisher() { return writePublisher; }










 157 











































































 158 
 159     @Override
 160     public String toString() {
 161         return "PlainHttpConnection: " + super.toString();
 162     }
 163 
 164     /**
 165      * Closes this connection
 166      */
 167     @Override
 168     public synchronized void close() {
 169         if (closed) {
 170             return;
 171         }
 172         closed = true;
 173         try {
 174             Log.logTrace("Closing: " + toString());
 175             chan.close();
 176         } catch (IOException e) {}
 177     }
 178 
 179     @Override
 180     void shutdownInput() throws IOException {
 181         debug.log(Level.DEBUG, "Shutting down input");
 182         chan.shutdownInput();
 183     }
 184 
 185     @Override
 186     void shutdownOutput() throws IOException {
 187         debug.log(Level.DEBUG, "Shutting down output");
 188         chan.shutdownOutput();
 189     }
 190 



























































 191     @Override
 192     ConnectionPool.CacheKey cacheKey() {
 193         return new ConnectionPool.CacheKey(address, null);
 194     }
 195 
 196     @Override
 197     synchronized boolean connected() {
 198         return connected;
 199     }
 200 















 201 
 202     @Override
 203     boolean isSecure() {
 204         return false;
 205     }
 206 
 207     @Override
 208     boolean isProxied() {
 209         return false;

 210     }
 211 
 212     // Support for WebSocket/RawChannelImpl which unfortunately
 213     // still depends on synchronous read/writes.
 214     // It should be removed when RawChannelImpl moves to using asynchronous APIs.
 215     private static final class PlainDetachedChannel
 216             extends DetachedConnectionChannel {
 217         final PlainHttpConnection plainConnection;
 218         boolean closed;
 219         PlainDetachedChannel(PlainHttpConnection conn) {
 220             // We're handing the connection channel over to a web socket.
 221             // We need the selector manager's thread to stay alive until
 222             // the WebSocket is closed.
 223             conn.client().webSocketOpen();
 224             this.plainConnection = conn;
 225         }
 226 
 227         @Override
 228         SocketChannel channel() {
 229             return plainConnection.channel();





 230         }
 231 
 232         @Override
 233         ByteBuffer read() throws IOException {
 234             ByteBuffer dst = ByteBuffer.allocate(8192);
 235             int n = readImpl(dst);
 236             if (n > 0) {
 237                 return dst;
 238             } else if (n == 0) {
 239                 return Utils.EMPTY_BYTEBUFFER;
 240             } else {
 241                 return null;
 242             }




 243         }
 244 
 245         @Override
 246         public void close() {
 247             HttpClientImpl client = plainConnection.client();
 248             try {
 249                 plainConnection.close();
 250             } finally {
 251                 // notify the HttpClientImpl that the websocket is no
 252                 // no longer operating.
 253                 synchronized(this) {
 254                     if (closed == true) return;
 255                     closed = true;
 256                 }
 257                 client.webSocketClose();
 258             }












 259         }
 260 
 261         @Override
 262         public long write(ByteBuffer[] buffers, int start, int number)
 263                 throws IOException
 264         {
 265             return channel().write(buffers, start, number);
 266         }
 267 
 268         @Override
 269         public void shutdownInput() throws IOException {
 270             plainConnection.shutdownInput();
 271         }
 272 
 273         @Override
 274         public void shutdownOutput() throws IOException {
 275             plainConnection.shutdownOutput();
 276         }
 277 
 278         private int readImpl(ByteBuffer buf) throws IOException {
 279             int mark = buf.position();
 280             int n;
 281             n = channel().read(buf);
 282             if (n == -1) {
 283                 return -1;
 284             }
 285             Utils.flipToMark(buf, mark);
 286             return n;


 287         }




 288     }
 289 
 290     // Support for WebSocket/RawChannelImpl which unfortunately
 291     // still depends on synchronous read/writes.
 292     // It should be removed when RawChannelImpl moves to using asynchronous APIs.
 293     @Override
 294     DetachedConnectionChannel detachChannel() {
 295         client().cancelRegistration(channel());
 296         return new PlainDetachedChannel(this);
 297     }
 298 











 299 }
< prev index next >