< prev index next >
   1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  */
  24 package java.net.http;
  25 
  26 import java.io.IOException;
  27 import java.net.InetSocketAddress;
  28 import java.net.StandardSocketOptions;
  29 import java.nio.ByteBuffer;
  30 import java.nio.channels.SelectableChannel;
  31 import java.nio.channels.SelectionKey;
  32 import java.nio.channels.SocketChannel;
  33 import java.util.concurrent.CompletableFuture;
  34 import java.util.function.Consumer;
  35 
  36 /**
  37  * Plain raw TCP connection direct to destination. 2 modes
  38  * 1) Blocking used by http/1. In this case the connect is actually non
  39  *    blocking but the request is sent blocking. The first byte of a response
  40  *    is received non-blocking and the remainder of the response is received
  41  *    blocking
  42  * 2) Non-blocking. In this case (for http/2) the connection is actually opened
  43  *    blocking but all reads and writes are done non-blocking under the
  44  *    control of a Http2Connection object.
  45  */
  46 class PlainHttpConnection extends HttpConnection implements AsyncConnection {
  47 
  48     protected SocketChannel chan;
  49     private volatile boolean connected;
  50     private boolean closed;
  51     Consumer<ByteBuffer> asyncReceiver;
  52     Consumer<Throwable> errorReceiver;
  53     Queue<ByteBuffer> asyncOutputQ;
  54     final Object reading = new Object();
  55     final Object writing = new Object();
  56 
  57     @Override
  58     public void startReading() {
  59         try {
  60             client.registerEvent(new ReadEvent());
  61         } catch (IOException e) {
  62             shutdown();
  63         }
  64     }
  65 
  66     class ConnectEvent extends AsyncEvent {
  67         CompletableFuture<Void> cf;
  68 
  69         ConnectEvent(CompletableFuture<Void> cf) {
  70             super(AsyncEvent.BLOCKING);
  71             this.cf = cf;
  72         }
  73 
  74         @Override
  75         public SelectableChannel channel() {
  76             return chan;
  77         }
  78 
  79         @Override
  80         public int interestOps() {
  81             return SelectionKey.OP_CONNECT;
  82         }
  83 
  84         @Override
  85         public void handle() {
  86             try {
  87                 chan.finishConnect();
  88             } catch (IOException e) {
  89                 cf.completeExceptionally(e);
  90             }
  91             connected = true;
  92             cf.complete(null);
  93         }
  94 
  95         @Override
  96         public void abort() {
  97             close();
  98         }
  99     }
 100 
 101     @Override
 102     public CompletableFuture<Void> connectAsync() {
 103         CompletableFuture<Void> plainFuture = new CompletableFuture<>();
 104         try {
 105             chan.configureBlocking(false);
 106             chan.connect(address);
 107             client.registerEvent(new ConnectEvent(plainFuture));
 108         } catch (IOException e) {
 109             plainFuture.completeExceptionally(e);
 110         }
 111         return plainFuture;
 112     }
 113 
 114     @Override
 115     public void connect() throws IOException {
 116         chan.connect(address);
 117         connected = true;
 118     }
 119 
 120     @Override
 121     SocketChannel channel() {
 122         return chan;
 123     }
 124 
 125     PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
 126         super(addr, client);
 127         try {
 128             this.chan = SocketChannel.open();
 129             int bufsize = client.getReceiveBufferSize();
 130             chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
 131         } catch (IOException e) {
 132             throw new InternalError(e);
 133         }
 134     }
 135 
 136     @Override
 137     long write(ByteBuffer[] buffers, int start, int number) throws IOException {
 138         if (mode != Mode.ASYNC)
 139             return chan.write(buffers, start, number);
 140         // async
 141         synchronized(writing) {
 142             int qlen = asyncOutputQ.size();
 143             ByteBuffer[] bufs = Utils.reduce(buffers, start, number);
 144             long n = Utils.remaining(bufs);
 145             asyncOutputQ.putAll(bufs);
 146             if (qlen == 0)
 147                 asyncOutput();
 148             return n;
 149         }
 150     }
 151 
 152     ByteBuffer asyncBuffer = null;
 153 
 154     void asyncOutput() {
 155         synchronized (writing) {
 156             try {
 157                 while (true) {
 158                     if (asyncBuffer == null) {
 159                         asyncBuffer = asyncOutputQ.poll();
 160                         if (asyncBuffer == null) {
 161                             return;
 162                         }
 163                     }
 164                     if (!asyncBuffer.hasRemaining()) {
 165                         asyncBuffer = null;
 166                         continue;
 167                     }
 168                     int n = chan.write(asyncBuffer);
 169                     //System.err.printf("Written %d bytes to chan\n", n);
 170                     if (n == 0) {
 171                         client.registerEvent(new WriteEvent());
 172                         return;
 173                     }
 174                 }
 175             } catch (IOException e) {
 176                 shutdown();
 177             }
 178         }
 179     }
 180 
 181     @Override
 182     long write(ByteBuffer buffer) throws IOException {
 183         if (mode != Mode.ASYNC)
 184             return chan.write(buffer);
 185         // async
 186         synchronized(writing) {
 187             int qlen = asyncOutputQ.size();
 188             long n = buffer.remaining();
 189             asyncOutputQ.put(buffer);
 190             if (qlen == 0)
 191                 asyncOutput();
 192             return n;
 193         }
 194     }
 195 
 196     @Override
 197     public String toString() {
 198         return "PlainHttpConnection: " + super.toString();
 199     }
 200 
 201     /**
 202      * Close this connection
 203      */
 204     @Override
 205     public synchronized void close() {
 206         if (closed)
 207             return;
 208         closed = true;
 209         try {
 210             Log.logError("Closing: " + toString());
 211             //System.out.println("Closing: " + this);
 212             chan.close();
 213         } catch (IOException e) {}
 214     }
 215 
 216     @Override
 217     protected ByteBuffer readImpl(int length) throws IOException {
 218         ByteBuffer buf = getBuffer(); // TODO not using length
 219         int n = chan.read(buf);
 220         if (n == -1) {
 221             return null;
 222         }
 223         buf.flip();
 224         String s = "Receive (" + n + " bytes) ";
 225         //debugPrint(s, buf);
 226         return buf;
 227     }
 228 
 229     void shutdown() {
 230         close();
 231         errorReceiver.accept(new IOException("Connection aborted"));
 232     }
 233 
 234     void asyncRead() {
 235         synchronized (reading) {
 236             try {
 237                 while (true) {
 238                     ByteBuffer buf = getBuffer();
 239                     int n = chan.read(buf);
 240                     //System.err.printf("Read %d bytes from chan\n", n);
 241                     if (n == -1) {
 242                         throw new IOException();
 243                     }
 244                     if (n == 0) {
 245                         returnBuffer(buf);
 246                         return;
 247                     }
 248                     buf.flip();
 249                     asyncReceiver.accept(buf);
 250                 }
 251             } catch (IOException e) {
 252                 shutdown();
 253             }
 254         }
 255     }
 256 
 257     @Override
 258     protected int readImpl(ByteBuffer buf) throws IOException {
 259         int mark = buf.position();
 260         int n;
 261         // FIXME: this hack works in conjunction with the corresponding change
 262         // in java.net.http.RawChannel.registerEvent
 263         if ((n = buffer.remaining()) != 0) {
 264             buf.put(buffer);
 265         } else {
 266             n = chan.read(buf);
 267         }
 268         if (n == -1) {
 269             return -1;
 270         }
 271         Utils.flipToMark(buf, mark);
 272         String s = "Receive (" + n + " bytes) ";
 273         //debugPrint(s, buf);
 274         return n;
 275     }
 276 
 277     @Override
 278     ConnectionPool.CacheKey cacheKey() {
 279         return new ConnectionPool.CacheKey(address, null);
 280     }
 281 
 282     @Override
 283     synchronized boolean connected() {
 284         return connected;
 285     }
 286 
 287     // used for all output in HTTP/2
 288     class WriteEvent extends AsyncEvent {
 289         WriteEvent() {
 290             super(0);
 291         }
 292 
 293         @Override
 294         public SelectableChannel channel() {
 295             return chan;
 296         }
 297 
 298         @Override
 299         public int interestOps() {
 300             return SelectionKey.OP_WRITE;
 301         }
 302 
 303         @Override
 304         public void handle() {
 305             asyncOutput();
 306         }
 307 
 308         @Override
 309         public void abort() {
 310             shutdown();
 311         }
 312     }
 313 
 314     // used for all input in HTTP/2
 315     class ReadEvent extends AsyncEvent {
 316         ReadEvent() {
 317             super(AsyncEvent.REPEATING); // && !BLOCKING
 318         }
 319 
 320         @Override
 321         public SelectableChannel channel() {
 322             return chan;
 323         }
 324 
 325         @Override
 326         public int interestOps() {
 327             return SelectionKey.OP_READ;
 328         }
 329 
 330         @Override
 331         public void handle() {
 332             asyncRead();
 333         }
 334 
 335         @Override
 336         public void abort() {
 337             shutdown();
 338         }
 339 
 340     }
 341 
 342     // used in blocking channels only
 343     class ReceiveResponseEvent extends AsyncEvent {
 344         CompletableFuture<Void> cf;
 345 
 346         ReceiveResponseEvent(CompletableFuture<Void> cf) {
 347             super(AsyncEvent.BLOCKING);
 348             this.cf = cf;
 349         }
 350         @Override
 351         public SelectableChannel channel() {
 352             return chan;
 353         }
 354 
 355         @Override
 356         public void handle() {
 357             cf.complete(null);
 358         }
 359 
 360         @Override
 361         public int interestOps() {
 362             return SelectionKey.OP_READ;
 363         }
 364 
 365         @Override
 366         public void abort() {
 367             close();
 368         }
 369     }
 370 
 371     @Override
 372     boolean isSecure() {
 373         return false;
 374     }
 375 
 376     @Override
 377     boolean isProxied() {
 378         return false;
 379     }
 380 
 381     @Override
 382     public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver,
 383             Consumer<Throwable> errorReceiver) {
 384         this.asyncReceiver = asyncReceiver;
 385         this.errorReceiver = errorReceiver;
 386         asyncOutputQ = new Queue<>();
 387         asyncOutputQ.registerPutCallback(this::asyncOutput);
 388     }
 389 
 390     @Override
 391     CompletableFuture<Void> whenReceivingResponse() {
 392         CompletableFuture<Void> cf = new CompletableFuture<>();
 393         try {
 394             client.registerEvent(new ReceiveResponseEvent(cf));
 395         } catch (IOException e) {
 396             cf.completeExceptionally(e);
 397         }
 398         return cf;
 399     }
 400 }
< prev index next >