< prev index next >

src/java.httpclient/share/classes/java/net/http/PlainHttpConnection.java

Print this page




  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  */
  24 package java.net.http;
  25 
  26 import java.io.IOException;
  27 import java.net.InetSocketAddress;
  28 import java.net.StandardSocketOptions;
  29 import java.nio.ByteBuffer;
  30 import java.nio.channels.SelectableChannel;
  31 import java.nio.channels.SelectionKey;
  32 import java.nio.channels.SocketChannel;
  33 import java.util.concurrent.CompletableFuture;

  34 
  35 /**
  36  * Plain raw TCP connection direct to destination







  37  */
  38 class PlainHttpConnection extends HttpConnection {
  39 
  40     protected SocketChannel chan;
  41     private volatile boolean connected;
  42     private boolean closed;





  43 
  44     class ConnectEvent extends AsyncEvent implements AsyncEvent.Blocking {
  45         CompletableFuture<Void> cf;
  46 
  47         ConnectEvent(CompletableFuture<Void> cf) {

  48             this.cf = cf;
  49         }
  50 
  51         @Override
  52         public SelectableChannel channel() {
  53             return chan;
  54         }
  55 
  56         @Override
  57         public int interestOps() {
  58             return SelectionKey.OP_CONNECT;
  59         }
  60 
  61         @Override
  62         public void handle() {
  63             try {
  64                 chan.finishConnect();
  65             } catch (IOException e) {
  66                 cf.completeExceptionally(e);
  67             }


  95     }
  96 
  97     @Override
  98     SocketChannel channel() {
  99         return chan;
 100     }
 101 
 102     PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
 103         super(addr, client);
 104         try {
 105             this.chan = SocketChannel.open();
 106             int bufsize = client.getReceiveBufferSize();
 107             chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
 108         } catch (IOException e) {
 109             throw new InternalError(e);
 110         }
 111     }
 112 
 113     @Override
 114     long write(ByteBuffer[] buffers, int start, int number) throws IOException {
 115         //debugPrint("Send", buffers, start, number);
 116         return chan.write(buffers, start, number);







































 117     }
 118 
 119     @Override
 120     long write(ByteBuffer buffer) throws IOException {
 121         //debugPrint("Send", buffer);
 122         return chan.write(buffer);









 123     }
 124 
 125     @Override
 126     public String toString() {
 127         return "PlainHttpConnection: " + super.toString();
 128     }
 129 
 130     /**
 131      * Close this connection
 132      */
 133     @Override
 134     synchronized void close() {
 135         if (closed)
 136             return;
 137         closed = true;
 138         try {
 139             Log.logError("Closing: " + toString());
 140             //System.out.println("Closing: " + this);
 141             chan.close();
 142         } catch (IOException e) {}
 143     }
 144 
 145     @Override
 146     protected ByteBuffer readImpl(int length) throws IOException {
 147         ByteBuffer buf = getBuffer(); // TODO not using length
 148         int n = chan.read(buf);
 149         if (n == -1) {
 150             return null;
 151         }
 152         buf.flip();
 153         String s = "Receive (" + n + " bytes) ";
 154         //debugPrint(s, buf);
 155         return buf;
 156     }
 157 




























 158     @Override
 159     protected int readImpl(ByteBuffer buf) throws IOException {
 160         int mark = buf.position();
 161         int n = chan.read(buf);







 162         if (n == -1) {
 163             return -1;
 164         }
 165         Utils.flipToMark(buffer, mark);
 166         String s = "Receive (" + n + " bytes) ";
 167         //debugPrint(s, buf);
 168         return n;
 169     }
 170 
 171     @Override
 172     ConnectionPool.CacheKey cacheKey() {
 173         return new ConnectionPool.CacheKey(address, null);
 174     }
 175 
 176     @Override
 177     synchronized boolean connected() {
 178         return connected;
 179     }
 180 
 181     class ReceiveResponseEvent extends AsyncEvent implements AsyncEvent.Blocking {
























































 182         CompletableFuture<Void> cf;
 183 
 184         ReceiveResponseEvent(CompletableFuture<Void> cf) {

 185             this.cf = cf;
 186         }
 187         @Override
 188         public SelectableChannel channel() {
 189             return chan;
 190         }
 191 
 192         @Override
 193         public void handle() {
 194             cf.complete(null);
 195         }
 196 
 197         @Override
 198         public int interestOps() {
 199             return SelectionKey.OP_READ;
 200         }
 201 
 202         @Override
 203         public void abort() {
 204             close();
 205         }
 206     }
 207 
 208     @Override
 209     boolean isSecure() {
 210         return false;
 211     }
 212 
 213     @Override
 214     boolean isProxied() {
 215         return false;














 216     }
 217 
 218     @Override
 219     CompletableFuture<Void> whenReceivingResponse() {
 220         CompletableFuture<Void> cf = new CompletableFuture<>();
 221         try {
 222             client.registerEvent(new ReceiveResponseEvent(cf));
 223         } catch (IOException e) {
 224             cf.completeExceptionally(e);
 225         }
 226         return cf;
 227     }
 228 }


  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  */
  24 package java.net.http;
  25 
  26 import java.io.IOException;
  27 import java.net.InetSocketAddress;
  28 import java.net.StandardSocketOptions;
  29 import java.nio.ByteBuffer;
  30 import java.nio.channels.SelectableChannel;
  31 import java.nio.channels.SelectionKey;
  32 import java.nio.channels.SocketChannel;
  33 import java.util.concurrent.CompletableFuture;
  34 import java.util.function.Consumer;
  35 
  36 /**
  37  * Plain raw TCP connection direct to destination. It operates in one of two modes:
  38  * 1) Blocking, used by HTTP/1. In this case the connect is actually
  39  *    non-blocking, but the request is sent blocking. The first byte of a response
  40  *    is received non-blocking and the remainder of the response is received
  41  *    blocking.
  42  * 2) Non-blocking, used by HTTP/2. In this case the connection is actually opened
  43  *    blocking, but all reads and writes are done non-blocking under the
  44  *    control of an Http2Connection object.
  45  */
  46 class PlainHttpConnection extends HttpConnection implements AsyncConnection {
  47 
  48     protected SocketChannel chan;
  49     private volatile boolean connected;
  50     private boolean closed;
  51     Consumer<ByteBuffer> asyncReceiver;
  52     Consumer<Throwable> errorReceiver;
  53     Queue<ByteBuffer> asyncOutputQ;
  54     final Object reading = new Object();
  55     final Object writing = new Object();
  56 
  57     class ConnectEvent extends AsyncEvent {
  58         CompletableFuture<Void> cf;
  59 
  60         ConnectEvent(CompletableFuture<Void> cf) {
  61             super(AsyncEvent.BLOCKING);
  62             this.cf = cf;
  63         }
  64 
  65         @Override
  66         public SelectableChannel channel() {
  67             return chan;
  68         }
  69 
  70         @Override
  71         public int interestOps() {
  72             return SelectionKey.OP_CONNECT;
  73         }
  74 
  75         @Override
  76         public void handle() {
  77             try {
  78                 chan.finishConnect();
  79             } catch (IOException e) {
  80                 cf.completeExceptionally(e);
  81             }


 109     }
 110 
 111     @Override
 112     SocketChannel channel() {
 113         return chan;
 114     }
 115 
 116     PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
 117         super(addr, client);
 118         try {
 119             this.chan = SocketChannel.open();
 120             int bufsize = client.getReceiveBufferSize();
 121             chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
 122         } catch (IOException e) {
 123             throw new InternalError(e);
 124         }
 125     }
 126 
 127     @Override
 128     long write(ByteBuffer[] buffers, int start, int number) throws IOException {
 129         if (mode != Mode.ASYNC)
 130             return chan.write(buffers, start, number);
 131         // async
 132         synchronized(writing) {
 133             int qlen = asyncOutputQ.size();
 134             ByteBuffer[] bufs = Utils.reduce(buffers, start, number);
 135             long n = Utils.remaining(bufs);
 136             asyncOutputQ.putAll(bufs);
 137             if (qlen == 0)
 138                 asyncOutput();
 139             return n;
 140         }
 141     }
 142 
 143     ByteBuffer asyncBuffer = null;
 144 
 145     void asyncOutput() {
 146         synchronized (writing) {
 147             try {
 148                 while (true) {
 149                     if (asyncBuffer == null) {
 150                         asyncBuffer = asyncOutputQ.poll();
 151                         if (asyncBuffer == null) {
 152                             return;
 153                         }
 154                     }
 155                     if (!asyncBuffer.hasRemaining()) {
 156                         asyncBuffer = null;
 157                         continue;
 158                     }
 159                     int n = chan.write(asyncBuffer);
 160                     //System.err.printf("Written %d bytes to chan\n", n);
 161                     if (n == 0) {
 162                         client.registerEvent(new WriteEvent());
 163                         return;
 164                     }
 165                 }
 166             } catch (IOException e) {
 167                 shutdown();
 168             }
 169         }
 170     }
 171 
 172     @Override
 173     long write(ByteBuffer buffer) throws IOException {
 174         if (mode != Mode.ASYNC)
 175             return chan.write(buffer);
 176         // async
 177         synchronized(writing) {
 178             int qlen = asyncOutputQ.size();
 179             long n = buffer.remaining();
 180             asyncOutputQ.put(buffer);
 181             if (qlen == 0)
 182                 asyncOutput();
 183             return n;
 184         }
 185     }
 186 
 187     @Override
 188     public String toString() {
 189         return "PlainHttpConnection: " + super.toString();
 190     }
 191 
 192     /**
 193      * Close this connection
 194      */
 195     @Override
 196     public synchronized void close() {
 197         if (closed)
 198             return;
 199         closed = true;
 200         try {
 201             Log.logError("Closing: " + toString());
 202             //System.out.println("Closing: " + this);
 203             chan.close();
 204         } catch (IOException e) {}
 205     }
 206 
 207     @Override
 208     protected ByteBuffer readImpl(int length) throws IOException {
 209         ByteBuffer buf = getBuffer(); // TODO not using length
 210         int n = chan.read(buf);
 211         if (n == -1) {
 212             return null;
 213         }
 214         buf.flip();
 215         String s = "Receive (" + n + " bytes) ";
 216         //debugPrint(s, buf);
 217         return buf;
 218     }
 219 
 220     void shutdown() {
 221         close();
 222         errorReceiver.accept(new IOException("Connection aborted"));
 223     }
 224 
 225     void asyncRead() {
 226         synchronized (reading) {
 227             try {
 228                 while (true) {
 229                     ByteBuffer buf = getBuffer();
 230                     int n = chan.read(buf);
 231                     //System.err.printf("Read %d bytes from chan\n", n);
 232                     if (n == -1) {
 233                         throw new IOException();
 234                     }
 235                     if (n == 0) {
 236                         returnBuffer(buf);
 237                         return;
 238                     }
 239                     buf.flip();
 240                     asyncReceiver.accept(buf);
 241                 }
 242             } catch (IOException e) {
 243                 shutdown();
 244             }
 245         }
 246     }
 247 
 248     @Override
 249     protected int readImpl(ByteBuffer buf) throws IOException {
 250         int mark = buf.position();
 251         int n;
 252         // FIXME: this hack works in conjunction with the corresponding change
 253         // in java.net.http.RawChannel.registerEvent
 254         if ((n = buffer.remaining()) != 0) {
 255             buf.put(buffer);
 256         } else {
 257             n = chan.read(buf);
 258         }
 259         if (n == -1) {
 260             return -1;
 261         }
 262         Utils.flipToMark(buf, mark);
 263         String s = "Receive (" + n + " bytes) ";
 264         //debugPrint(s, buf);
 265         return n;
 266     }
 267 
 268     @Override
 269     ConnectionPool.CacheKey cacheKey() {
 270         return new ConnectionPool.CacheKey(address, null);
 271     }
 272 
 273     @Override
 274     synchronized boolean connected() {
 275         return connected;
 276     }
 277 
 278     // used for all output in HTTP/2
 279     class WriteEvent extends AsyncEvent {
 280         WriteEvent() {
 281             super(0);
 282         }
 283 
 284         @Override
 285         public SelectableChannel channel() {
 286             return chan;
 287         }
 288 
 289         @Override
 290         public int interestOps() {
 291             return SelectionKey.OP_WRITE;
 292         }
 293 
 294         @Override
 295         public void handle() {
 296             asyncOutput();
 297         }
 298 
 299         @Override
 300         public void abort() {
 301             shutdown();
 302         }
 303     }
 304 
 305     // used for all input in HTTP/2
 306     class ReadEvent extends AsyncEvent {
 307         ReadEvent() {
 308             super(AsyncEvent.REPEATING); // && !BLOCKING
 309         }
 310 
 311         @Override
 312         public SelectableChannel channel() {
 313             return chan;
 314         }
 315 
 316         @Override
 317         public int interestOps() {
 318             return SelectionKey.OP_READ;
 319         }
 320 
 321         @Override
 322         public void handle() {
 323             asyncRead();
 324         }
 325 
 326         @Override
 327         public void abort() {
 328             shutdown();
 329         }
 330 
 331     }
 332 
 333     // used in blocking channels only
 334     class ReceiveResponseEvent extends AsyncEvent {
 335         CompletableFuture<Void> cf;
 336 
 337         ReceiveResponseEvent(CompletableFuture<Void> cf) {
 338             super(AsyncEvent.BLOCKING);
 339             this.cf = cf;
 340         }
 341         @Override
 342         public SelectableChannel channel() {
 343             return chan;
 344         }
 345 
 346         @Override
 347         public void handle() {
 348             cf.complete(null);
 349         }
 350 
 351         @Override
 352         public int interestOps() {
 353             return SelectionKey.OP_READ;
 354         }
 355 
 356         @Override
 357         public void abort() {
 358             close();
 359         }
 360     }
 361 
 362     @Override
 363     boolean isSecure() {
 364         return false;
 365     }
 366 
 367     @Override
 368     boolean isProxied() {
 369         return false;
 370     }
 371 
 372     @Override
 373     public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver,
 374             Consumer<Throwable> errorReceiver) {
 375         this.asyncReceiver = asyncReceiver;
 376         this.errorReceiver = errorReceiver;
 377         asyncOutputQ = new Queue<>();
 378         asyncOutputQ.registerPutCallback(this::asyncOutput);
 379         try {
 380             client.registerEvent(new ReadEvent());
 381         } catch (IOException e) {
 382             shutdown();
 383         }
 384     }
 385 
 386     @Override
 387     CompletableFuture<Void> whenReceivingResponse() {
 388         CompletableFuture<Void> cf = new CompletableFuture<>();
 389         try {
 390             client.registerEvent(new ReceiveResponseEvent(cf));
 391         } catch (IOException e) {
 392             cf.completeExceptionally(e);
 393         }
 394         return cf;
 395     }
 396 }
< prev index next >