< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java

Print this page




  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.net.URI;
  30 import java.nio.ByteBuffer;
  31 import java.util.ArrayList;

  32 import java.util.List;
  33 import java.util.Optional;
  34 import java.util.concurrent.CompletableFuture;
  35 import java.util.concurrent.CompletionException;
  36 import java.util.concurrent.ExecutionException;
  37 import java.util.concurrent.Executor;
  38 import java.util.concurrent.Flow;
  39 import java.util.concurrent.Flow.Subscription;
  40 import java.util.concurrent.TimeUnit;
  41 import java.util.concurrent.TimeoutException;
  42 import java.util.function.Consumer;
  43 
  44 import jdk.incubator.http.internal.common.*;
  45 import jdk.incubator.http.internal.frame.*;
  46 import jdk.incubator.http.internal.hpack.DecodingCallback;

  47 
  48 /**
  49  * HTTP/2 stream handling.
  50  *
  51  * REQUESTS
  52  *
  53  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
  54  *
  55  * sendRequest() -- sendHeadersOnly() + sendBody()
  56  *
  57  * sendBody() -- in calling thread: obeys all flow control (so may block)
  58  *               obtains data from request body processor and places on connection
  59  *               outbound Q.
  60  *
  61  * sendBodyAsync() -- calls sendBody() in an executor thread.
  62  *
  63  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
  64  *
  65  * sendRequestAsync() -- calls sendRequest() in an executor thread
  66  *


 105      * the stream's first frame is sent.
 106      */
 107     protected volatile int streamid;
 108 
 109     long responseContentLen = -1; // starts at -1; NOTE(review): presumably means "not yet known" -- confirm where it is assigned
 110     long responseBytesProcessed = 0;
 111     long requestContentLen;
 112 
         // Owning connection. receiveDataFrame() reports the payload length of
         // every DATA frame to connection.windowUpdater.
 113     final Http2Connection connection;
 114     HttpClientImpl client;
 115     final HttpRequestImpl request;
 116     final DecodingCallback rspHeadersConsumer;
 117     HttpHeadersImpl responseHeaders;
 118     final HttpHeadersImpl requestHeaders;
 119     final HttpHeadersImpl requestPseudoHeaders;
         // receiveData() obtains the response body future from this processor
         // via getBody().toCompletableFuture().
 120     HttpResponse.BodyProcessor<T> responseProcessor;
 121     final HttpRequest.BodyProcessor requestProcessor;
 122     volatile int responseCode;
 123     volatile Response response;
 124     volatile CompletableFuture<Response> responseCF;
         // Receives response body data from receiveDataFrame(): each non-empty
         // ByteBuffer of a DATA frame is passed to acceptData() individually,
         // and Optional.empty() is passed once a frame carries END_STREAM.
 125     final AbstractPushPublisher<ByteBuffer> publisher;
         // Created eagerly as a MinimalFuture.
         // NOTE(review): presumably completed when the request body has been
         // sent -- confirm where it is completed.
 126     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
 127 
 128     /** True if END_STREAM has been seen in a frame received on this stream. */
 129     private volatile boolean remotelyClosed;
 130     private volatile boolean closed;
 131     private volatile boolean endStreamSent;
 132 
 133     // state flags
 134     boolean requestSent, responseReceived, responseHeadersReceived;
 135 
 136     /**
 137      * A reference to this Stream's connection Send Window controller. The
 138      * stream MUST acquire the appropriate amount of Send Window before
 139      * sending any data. Will be null for PushStreams, as they cannot send data.
 140      */
 141     private final WindowController windowController;
         // Stream-level window updater, distinct from connection.windowUpdater.
         // receiveDataFrame() calls update(len) on it only for DATA frames that
         // do not carry END_STREAM.
 142     private final WindowUpdateSender windowUpdater;
 143 
 144     @Override
 145     HttpConnection connection() {


 183         StringBuilder sb = new StringBuilder();
 184         sb.append("streamid: ")
 185                 .append(streamid);
 186         return sb.toString();
 187     }
 188 
 189     private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
             // Processes a single inbound frame for this stream.
             //  - ResetFrame: delegated to handleReset(); returns true.
             //  - any other non-DataFrame: not expected here (assertion); returns true.
             //  - DataFrame: each non-empty payload buffer is handed to the publisher,
             //    then the payload length is reported to the window updaters.
             // Returns false only for a DataFrame carrying END_STREAM, true otherwise.
             // NOTE(review): callers presumably use the result to decide whether more
             // frames should be read -- confirm at the call site, which is not shown here.
 190         if (frame instanceof ResetFrame) {
 191             handleReset((ResetFrame) frame);
 192             return true;
 193         } else if (!(frame instanceof DataFrame)) {
                 // Only fails when assertions are enabled (-ea); otherwise the
                 // unexpected frame is ignored and true is returned.
 194             assert false;
 195             return true;
 196         }
 197         DataFrame df = (DataFrame) frame;
 198         // RFC 7540 6.1:
 199         // The entire DATA frame payload is included in flow control,
 200         // including the Pad Length and Padding fields if present
 201         int len = df.payloadLength();
 202         ByteBufferReference[] buffers = df.getData();
             // Buffers are delivered one at a time; buffers with nothing remaining
             // are skipped, so the publisher never sees an empty ByteBuffer here.
 203         for (ByteBufferReference b : buffers) {
 204             ByteBuffer buf = b.get();
 205             if (buf.hasRemaining()) {
 206                 publisher.acceptData(Optional.of(buf));
 207             }

 208         }
             // The connection-level updater is notified for every DATA frame,
             // including the one that carries END_STREAM.
 209         connection.windowUpdater.update(len);
 210         if (df.getFlag(DataFrame.END_STREAM)) {
 211             setEndStreamReceived();
                 // An empty Optional is pushed after the last data.
                 // NOTE(review): presumably the end-of-body signal -- confirm
                 // against AbstractPushPublisher.acceptData().
 212             publisher.acceptData(Optional.empty());
 213             return false;
 214         }
 215         // Don't send window update on a stream which is
 216         // closed or half closed.
 217         windowUpdater.update(len);
 218         return true;
 219     }
 220 
 221     // pushes entire response body into response processor
 222     // blocking when required by local or remote flow control
 223     CompletableFuture<T> receiveData(Executor executor) {
 224         CompletableFuture<T> cf = responseProcessor
 225                 .getBody()
 226                 .toCompletableFuture();
 227         Consumer<Throwable> onError = e -> {




  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.net.URI;
  30 import java.nio.ByteBuffer;
  31 import java.util.ArrayList;
  32 import java.util.Arrays;
  33 import java.util.List;
  34 import java.util.Optional;
  35 import java.util.concurrent.CompletableFuture;
  36 import java.util.concurrent.CompletionException;
  37 import java.util.concurrent.ExecutionException;
  38 import java.util.concurrent.Executor;
  39 import java.util.concurrent.Flow;
  40 import java.util.concurrent.Flow.Subscription;
  41 import java.util.concurrent.TimeUnit;
  42 import java.util.concurrent.TimeoutException;
  43 import java.util.function.Consumer;
  44 
  45 import jdk.incubator.http.internal.common.*;
  46 import jdk.incubator.http.internal.frame.*;
  47 import jdk.incubator.http.internal.hpack.DecodingCallback;
  48 import static java.util.stream.Collectors.toList;
  49 
  50 /**
  51  * HTTP/2 stream handling.
  52  *
  53  * REQUESTS
  54  *
  55  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
  56  *
  57  * sendRequest() -- sendHeadersOnly() + sendBody()
  58  *
  59  * sendBody() -- in calling thread: obeys all flow control (so may block)
  60  *               obtains data from request body processor and places on connection
  61  *               outbound Q.
  62  *
  63  * sendBodyAsync() -- calls sendBody() in an executor thread.
  64  *
  65  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
  66  *
  67  * sendRequestAsync() -- calls sendRequest() in an executor thread
  68  *


 107      * the stream's first frame is sent.
 108      */
 109     protected volatile int streamid;
 110 
 111     long responseContentLen = -1; // starts at -1; NOTE(review): presumably means "not yet known" -- confirm where it is assigned
 112     long responseBytesProcessed = 0;
 113     long requestContentLen;
 114 
         // Owning connection. receiveDataFrame() reports the payload length of
         // every DATA frame to connection.windowUpdater.
 115     final Http2Connection connection;
 116     HttpClientImpl client;
 117     final HttpRequestImpl request;
 118     final DecodingCallback rspHeadersConsumer;
 119     HttpHeadersImpl responseHeaders;
 120     final HttpHeadersImpl requestHeaders;
 121     final HttpHeadersImpl requestPseudoHeaders;
         // receiveData() obtains the response body future from this processor
         // via getBody().toCompletableFuture().
 122     HttpResponse.BodyProcessor<T> responseProcessor;
 123     final HttpRequest.BodyProcessor requestProcessor;
 124     volatile int responseCode;
 125     volatile Response response;
 126     volatile CompletableFuture<Response> responseCF;
         // Receives response body data from receiveDataFrame(): the non-empty
         // buffers of one DATA frame are passed to acceptData() together as a
         // single List, and Optional.empty() is passed once a frame carries
         // END_STREAM.
 127     final AbstractPushPublisher<List<ByteBuffer>> publisher;
         // Created eagerly as a MinimalFuture.
         // NOTE(review): presumably completed when the request body has been
         // sent -- confirm where it is completed.
 128     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
 129 
 130     /** True if END_STREAM has been seen in a frame received on this stream. */
 131     private volatile boolean remotelyClosed;
 132     private volatile boolean closed;
 133     private volatile boolean endStreamSent;
 134 
 135     // state flags
 136     boolean requestSent, responseReceived, responseHeadersReceived;
 137 
 138     /**
 139      * A reference to this Stream's connection Send Window controller. The
 140      * stream MUST acquire the appropriate amount of Send Window before
 141      * sending any data. Will be null for PushStreams, as they cannot send data.
 142      */
 143     private final WindowController windowController;
         // Stream-level window updater, distinct from connection.windowUpdater.
         // receiveDataFrame() calls update(len) on it only for DATA frames that
         // do not carry END_STREAM.
 144     private final WindowUpdateSender windowUpdater;
 145 
 146     @Override
 147     HttpConnection connection() {


 185         StringBuilder sb = new StringBuilder();
 186         sb.append("streamid: ")
 187                 .append(streamid);
 188         return sb.toString();
 189     }
 190 
 191     private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
             // Processes a single inbound frame for this stream.
             //  - ResetFrame: delegated to handleReset(); returns true.
             //  - any other non-DataFrame: not expected here (assertion); returns true.
             //  - DataFrame: its non-empty payload buffers are handed to the publisher
             //    as one list, then the payload length is reported to the window updaters.
             // Returns false only for a DataFrame carrying END_STREAM, true otherwise.
             // NOTE(review): callers presumably use the result to decide whether more
             // frames should be read -- confirm at the call site, which is not shown here.
 192         if (frame instanceof ResetFrame) {
 193             handleReset((ResetFrame) frame);
 194             return true;
 195         } else if (!(frame instanceof DataFrame)) {
                 // Only fails when assertions are enabled (-ea); otherwise the
                 // unexpected frame is ignored and true is returned.
 196             assert false;
 197             return true;
 198         }
 199         DataFrame df = (DataFrame) frame;
 200         // RFC 7540 6.1:
 201         // The entire DATA frame payload is included in flow control,
 202         // including the Pad Length and Padding fields if present
 203         int len = df.payloadLength();
 204         ByteBufferReference[] buffers = df.getData();
             // Unwrap the references and drop buffers with nothing remaining, so
             // the list given to the publisher contains only readable buffers.
 205         List<ByteBuffer> dsts = Arrays.stream(buffers)
 206                 .map(ByteBufferReference::get)
 207                 .filter(ByteBuffer::hasRemaining)
 208                 .collect(toList());
             // A frame with no readable payload produces no publisher call at all;
             // it is still counted for flow control below.
 209         if (!dsts.isEmpty()) {
 210             publisher.acceptData(Optional.of(dsts));
 211         }
             // The connection-level updater is notified for every DATA frame,
             // including the one that carries END_STREAM.
 212         connection.windowUpdater.update(len);
 213         if (df.getFlag(DataFrame.END_STREAM)) {
 214             setEndStreamReceived();
                 // An empty Optional is pushed after the last data.
                 // NOTE(review): presumably the end-of-body signal -- confirm
                 // against AbstractPushPublisher.acceptData().
 215             publisher.acceptData(Optional.empty());
 216             return false;
 217         }
 218         // Don't send window update on a stream which is
 219         // closed or half closed.
 220         windowUpdater.update(len);
 221         return true;
 222     }
 223 
 224     // pushes entire response body into response processor
 225     // blocking when required by local or remote flow control
 226     CompletableFuture<T> receiveData(Executor executor) {
 227         CompletableFuture<T> cf = responseProcessor
 228                 .getBody()
 229                 .toCompletableFuture();
 230         Consumer<Throwable> onError = e -> {


< prev index next >