12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package jdk.incubator.http;
27
28 import java.io.IOException;
29 import java.net.URI;
30 import java.nio.ByteBuffer;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Optional;
34 import java.util.concurrent.CompletableFuture;
35 import java.util.concurrent.CompletionException;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.Flow;
39 import java.util.concurrent.Flow.Subscription;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.TimeoutException;
42 import java.util.function.Consumer;
43
44 import jdk.incubator.http.internal.common.*;
45 import jdk.incubator.http.internal.frame.*;
46 import jdk.incubator.http.internal.hpack.DecodingCallback;
47
48 /**
49 * Http/2 Stream handling.
50 *
51 * REQUESTS
52 *
53 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
54 *
55 * sendRequest() -- sendHeadersOnly() + sendBody()
56 *
57 * sendBody() -- in calling thread: obeys all flow control (so may block)
58 * obtains data from request body processor and places on connection
59 * outbound Q.
60 *
61 * sendBodyAsync() -- calls sendBody() in an executor thread.
62 *
63 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
64 *
65 * sendRequestAsync() -- calls sendRequest() in an executor thread
66 *
105 * the stream's first frame is sent.
106 */
107 protected volatile int streamid;
108
109 long responseContentLen = -1;
110 long responseBytesProcessed = 0;
111 long requestContentLen;
112
113 final Http2Connection connection;
114 HttpClientImpl client;
115 final HttpRequestImpl request;
116 final DecodingCallback rspHeadersConsumer;
117 HttpHeadersImpl responseHeaders;
118 final HttpHeadersImpl requestHeaders;
119 final HttpHeadersImpl requestPseudoHeaders;
120 HttpResponse.BodyProcessor<T> responseProcessor;
121 final HttpRequest.BodyProcessor requestProcessor;
122 volatile int responseCode;
123 volatile Response response;
124 volatile CompletableFuture<Response> responseCF;
125 final AbstractPushPublisher<ByteBuffer> publisher;
126 final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
127
128 /** True if END_STREAM has been seen in a frame received on this stream. */
129 private volatile boolean remotelyClosed;
130 private volatile boolean closed;
131 private volatile boolean endStreamSent;
132
133 // state flags
134 boolean requestSent, responseReceived, responseHeadersReceived;
135
136 /**
137 * A reference to this Stream's connection Send Window controller. The
138 * stream MUST acquire the appropriate amount of Send Window before
139 * sending any data. Will be null for PushStreams, as they cannot send data.
140 */
141 private final WindowController windowController;
142 private final WindowUpdateSender windowUpdater;
143
144 @Override
145 HttpConnection connection() {
183 StringBuilder sb = new StringBuilder();
184 sb.append("streamid: ")
185 .append(streamid);
186 return sb.toString();
187 }
188
189 private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
190 if (frame instanceof ResetFrame) {
191 handleReset((ResetFrame) frame);
192 return true;
193 } else if (!(frame instanceof DataFrame)) {
194 assert false;
195 return true;
196 }
197 DataFrame df = (DataFrame) frame;
198 // RFC 7540 6.1:
199 // The entire DATA frame payload is included in flow control,
200 // including the Pad Length and Padding fields if present
201 int len = df.payloadLength();
202 ByteBufferReference[] buffers = df.getData();
203 for (ByteBufferReference b : buffers) {
204 ByteBuffer buf = b.get();
205 if (buf.hasRemaining()) {
206 publisher.acceptData(Optional.of(buf));
207 }
208 }
209 connection.windowUpdater.update(len);
210 if (df.getFlag(DataFrame.END_STREAM)) {
211 setEndStreamReceived();
212 publisher.acceptData(Optional.empty());
213 return false;
214 }
215 // Don't send window update on a stream which is
216 // closed or half closed.
217 windowUpdater.update(len);
218 return true;
219 }
220
221 // pushes entire response body into response processor
222 // blocking when required by local or remote flow control
223 CompletableFuture<T> receiveData(Executor executor) {
224 CompletableFuture<T> cf = responseProcessor
225 .getBody()
226 .toCompletableFuture();
227 Consumer<Throwable> onError = e -> {
|
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package jdk.incubator.http;
27
28 import java.io.IOException;
29 import java.net.URI;
30 import java.nio.ByteBuffer;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.List;
34 import java.util.Optional;
35 import java.util.concurrent.CompletableFuture;
36 import java.util.concurrent.CompletionException;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.Executor;
39 import java.util.concurrent.Flow;
40 import java.util.concurrent.Flow.Subscription;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.function.Consumer;
44
45 import jdk.incubator.http.internal.common.*;
46 import jdk.incubator.http.internal.frame.*;
47 import jdk.incubator.http.internal.hpack.DecodingCallback;
48 import static java.util.stream.Collectors.toList;
49
50 /**
51 * Http/2 Stream handling.
52 *
53 * REQUESTS
54 *
55 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
56 *
57 * sendRequest() -- sendHeadersOnly() + sendBody()
58 *
59 * sendBody() -- in calling thread: obeys all flow control (so may block)
60 * obtains data from request body processor and places on connection
61 * outbound Q.
62 *
63 * sendBodyAsync() -- calls sendBody() in an executor thread.
64 *
65 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
66 *
67 * sendRequestAsync() -- calls sendRequest() in an executor thread
68 *
107 * the stream's first frame is sent.
108 */
109 protected volatile int streamid;
110
111 long responseContentLen = -1;
112 long responseBytesProcessed = 0;
113 long requestContentLen;
114
115 final Http2Connection connection;
116 HttpClientImpl client;
117 final HttpRequestImpl request;
118 final DecodingCallback rspHeadersConsumer;
119 HttpHeadersImpl responseHeaders;
120 final HttpHeadersImpl requestHeaders;
121 final HttpHeadersImpl requestPseudoHeaders;
122 HttpResponse.BodyProcessor<T> responseProcessor;
123 final HttpRequest.BodyProcessor requestProcessor;
124 volatile int responseCode;
125 volatile Response response;
126 volatile CompletableFuture<Response> responseCF;
127 final AbstractPushPublisher<List<ByteBuffer>> publisher;
128 final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
129
130 /** True if END_STREAM has been seen in a frame received on this stream. */
131 private volatile boolean remotelyClosed;
132 private volatile boolean closed;
133 private volatile boolean endStreamSent;
134
135 // state flags
136 boolean requestSent, responseReceived, responseHeadersReceived;
137
138 /**
139 * A reference to this Stream's connection Send Window controller. The
140 * stream MUST acquire the appropriate amount of Send Window before
141 * sending any data. Will be null for PushStreams, as they cannot send data.
142 */
143 private final WindowController windowController;
144 private final WindowUpdateSender windowUpdater;
145
146 @Override
147 HttpConnection connection() {
185 StringBuilder sb = new StringBuilder();
186 sb.append("streamid: ")
187 .append(streamid);
188 return sb.toString();
189 }
190
191 private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
192 if (frame instanceof ResetFrame) {
193 handleReset((ResetFrame) frame);
194 return true;
195 } else if (!(frame instanceof DataFrame)) {
196 assert false;
197 return true;
198 }
199 DataFrame df = (DataFrame) frame;
200 // RFC 7540 6.1:
201 // The entire DATA frame payload is included in flow control,
202 // including the Pad Length and Padding fields if present
203 int len = df.payloadLength();
204 ByteBufferReference[] buffers = df.getData();
205 List<ByteBuffer> dsts = Arrays.stream(buffers)
206 .map(ByteBufferReference::get)
207 .filter(ByteBuffer::hasRemaining)
208 .collect(toList());
209 if (!dsts.isEmpty()) {
210 publisher.acceptData(Optional.of(dsts));
211 }
212 connection.windowUpdater.update(len);
213 if (df.getFlag(DataFrame.END_STREAM)) {
214 setEndStreamReceived();
215 publisher.acceptData(Optional.empty());
216 return false;
217 }
218 // Don't send window update on a stream which is
219 // closed or half closed.
220 windowUpdater.update(len);
221 return true;
222 }
223
224 // pushes entire response body into response processor
225 // blocking when required by local or remote flow control
226 CompletableFuture<T> receiveData(Executor executor) {
227 CompletableFuture<T> cf = responseProcessor
228 .getBody()
229 .toCompletableFuture();
230 Consumer<Throwable> onError = e -> {
|