77 * stream inputQ. Obeys remote and local flow control so may block.
78 * Calls user response body processor with data buffers.
79 *
80 * responseBodyAsync() -- calls responseBody() in an executor thread.
81 *
82 * incoming() -- entry point called from connection reader thread. Frames are
83 * either handled immediately without blocking or for data frames
84 * placed on the stream's inputQ which is consumed by the stream's
85 * reader thread.
86 *
87 * PushedStream sub class
88 * ======================
89 * Sending side methods are not used because the request comes from a PUSH_PROMISE
90 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
91 * is created. PushedStream does not use responseCF list as there can be only
92 * one response. The CF is created when the object is created, and it is
93 * completed when the response HEADERS frame is received.
94 */
95 class Stream extends ExchangeImpl {
96
97 final Queue<Http2Frame> inputQ;
98
99 volatile int streamid;
100
101 long responseContentLen = -1;
102 long responseBytesProcessed = 0;
103 long requestContentLen;
104
105 Http2Connection connection;
106 HttpClientImpl client;
107 final HttpRequestImpl request;
108 final DecodingCallback rspHeadersConsumer;
109 HttpHeadersImpl responseHeaders;
110 final HttpHeadersImpl requestHeaders;
111 final HttpHeadersImpl requestPseudoHeaders;
112 HttpResponse.BodyProcessor<?> responseProcessor;
113 final HttpRequest.BodyProcessor requestProcessor;
114 HttpResponse response;
115
116 // state flags
117 boolean requestSent, responseReceived;
118
119 final FlowController userRequestFlowController =
120 new FlowController();
121 final FlowController remoteRequestFlowController =
122 new FlowController();
123 final FlowController responseFlowController =
124 new FlowController();
125
126 final ExecutorWrapper executor;
127
128 @Override
129 @SuppressWarnings("unchecked")
130 <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
131 this.responseProcessor = processor;
132 CompletableFuture<T> cf;
133 try {
134 T body = processor.onResponseBodyStart(
135 responseContentLen, responseHeaders,
136 responseFlowController); // TODO: filter headers
137 if (body != null) {
138 cf = CompletableFuture.completedFuture(body);
139 receiveDataAsync(processor);
140 } else
141 cf = receiveDataAsync(processor);
142 } catch (IOException e) {
143 cf = CompletableFuture.failedFuture(e);
144 }
145 PushGroup<?> pg = request.pushGroup();
146 if (pg != null) {
147 // if an error occurs make sure it is recorded in the PushGroup
148 cf = cf.whenComplete((t,e) -> pg.pushError(e));
149 }
150 return cf;
151 }
152
153 @Override
154 public String toString() {
155 StringBuilder sb = new StringBuilder();
156 sb.append("streamid: ")
157 .append(streamid);
158 return sb.toString();
159 }
160
161 // pushes entire response body into response processor
162 // blocking when required by local or remote flow control
163 void receiveData() throws IOException {
164 Http2Frame frame;
165 DataFrame df = null;
166 try {
167 do {
168 frame = inputQ.take();
169 if (!(frame instanceof DataFrame)) {
170 assert false;
171 continue;
172 }
173 df = (DataFrame) frame;
174 int len = df.getDataLength();
175 ByteBuffer[] buffers = df.getData();
176 for (ByteBuffer b : buffers) {
177 responseFlowController.take();
178 responseProcessor.onResponseBodyChunk(b);
179 }
180 sendWindowUpdate(len);
181 } while (!df.getFlag(DataFrame.END_STREAM));
182 } catch (InterruptedException e) {
183 throw new IOException(e);
184 }
185 }
186
187 private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) {
188 CompletableFuture<T> cf = new CompletableFuture<>();
189 executor.execute(() -> {
190 try {
191 receiveData();
192 T body = processor.onResponseComplete();
193 cf.complete(body);
194 responseReceived();
195 } catch (Throwable t) {
196 cf.completeExceptionally(t);
197 }
198 }, null);
199 return cf;
200 }
201
202 private void sendWindowUpdate(int increment)
203 throws IOException, InterruptedException {
204 if (increment == 0)
205 return;
206 LinkedList<Http2Frame> list = new LinkedList<>();
207 WindowUpdateFrame frame = new WindowUpdateFrame();
208 frame.streamid(streamid);
209 frame.setUpdate(increment);
210 list.add(frame);
211 frame = new WindowUpdateFrame();
212 frame.streamid(0);
213 frame.setUpdate(increment);
214 list.add(frame);
215 connection.sendFrames(list);
216 }
217
218 @Override
219 CompletableFuture<Void> sendBodyAsync() {
220 final CompletableFuture<Void> cf = new CompletableFuture<>();
221 executor.execute(() -> {
222 try {
223 sendBodyImpl();
224 cf.complete(null);
225 } catch (IOException | InterruptedException e) {
226 cf.completeExceptionally(e);
227 }
228 }, null);
229 return cf;
230 }
231
/**
 * Creates a stream for the request carried by the given exchange.
 *
 * @param client the owning client; also supplies the executor wrapper
 * @param connection the HTTP/2 connection this stream runs on
 * @param e the exchange whose request is sent on this stream
 */
@SuppressWarnings("unchecked")
Stream(HttpClientImpl client, Http2Connection connection, Exchange e) {
    super(e);
    this.client = client;
    this.connection = connection;
    this.request = e.request();
    this.requestProcessor = request.requestProcessor();
    this.responseHeaders = new HttpHeadersImpl();
    this.requestHeaders = new HttpHeadersImpl();
    // each decoded header is accumulated into responseHeaders
    this.rspHeadersConsumer = (name, value) -> {
        responseHeaders.addHeader(name.toString(), value.toString());
    };
    this.executor = client.executorWrapper();
    this.requestPseudoHeaders = new HttpHeadersImpl();
    this.inputQ = new Queue<>();
}
250
/**
 * Creates a stream directly from a request, with no exchange and no
 * request body processor.
 *
 * @param client the owning client; also supplies the executor wrapper
 * @param connection the HTTP/2 connection this stream runs on
 * @param req the request associated with this stream
 */
@SuppressWarnings("unchecked")
Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) {
    super(null);
    this.client = client;
    this.connection = connection;
    this.request = req;
    this.requestProcessor = null;
    this.responseHeaders = new HttpHeadersImpl();
    this.requestHeaders = new HttpHeadersImpl();
    // each decoded header is accumulated into responseHeaders
    this.rspHeadersConsumer = (name, value) -> {
        responseHeaders.addHeader(name.toString(), value.toString());
    };
    this.executor = client.executorWrapper();
    this.requestPseudoHeaders = new HttpHeadersImpl();
    this.inputQ = new Queue<>();
}
269
270 /**
271 * Entry point from Http2Connection reader thread.
272 *
273 * Data frames will be removed by response body thread.
274 *
275 * @param frame
276 * @throws IOException
277 */
278 void incoming(Http2Frame frame) throws IOException, InterruptedException {
279 if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) {
280 // Complete headers accumulated. handle response.
281 // It's okay if there are multiple HeaderFrames.
282 handleResponse();
283 } else if (frame instanceof DataFrame) {
284 inputQ.put(frame);
285 } else {
286 otherFrame(frame);
287 }
288 }
289
290 void otherFrame(Http2Frame frame) throws IOException {
291 switch (frame.type()) {
292 case WindowUpdateFrame.TYPE:
293 incoming_windowUpdate((WindowUpdateFrame) frame);
294 break;
295 case ResetFrame.TYPE:
296 incoming_reset((ResetFrame) frame);
297 break;
298 case PriorityFrame.TYPE:
299 incoming_priority((PriorityFrame) frame);
300 break;
301 default:
302 String msg = "Unexpected frame: " + frame.toString();
303 throw new IOException(msg);
304 }
323 .firstValueAsLong("content-length")
324 .orElse(-1L);
325 // different implementations for normal streams and pushed streams
326 completeResponse(response);
327 }
328
329 void incoming_reset(ResetFrame frame) {
330 // TODO: implement reset
331 int error = frame.getErrorCode();
332 IOException e = new IOException(ErrorFrame.stringForCode(error));
333 completeResponseExceptionally(e);
334 throw new UnsupportedOperationException("Not implemented");
335 }
336
337 void incoming_priority(PriorityFrame frame) {
338 // TODO: implement priority
339 throw new UnsupportedOperationException("Not implemented");
340 }
341
342 void incoming_windowUpdate(WindowUpdateFrame frame) {
343 int amount = frame.getUpdate();
344 if (amount > 0)
345 remoteRequestFlowController.accept(amount);
346 }
347
348 void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException {
349 if (Log.requests()) {
350 Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
351 }
352 PushGroup<?> pushGroup = request.pushGroup();
353 if (pushGroup == null) {
354 cancelImpl(new IllegalStateException("unexpected push promise"));
355 }
356 // get the handler and call it.
357 BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph =
358 pushGroup.pushHandler();
359
360 CompletableFuture<HttpResponse> pushCF = pushStream
361 .getResponseAsync(null)
362 .thenApply(r -> (HttpResponse)r);
363 boolean accept = ph.apply(pushReq, pushCF);
364 if (!accept) {
365 IOException ex = new IOException("Stream cancelled by user");
490 @Override
491 void sendBody() throws IOException, InterruptedException {
492 sendBodyImpl();
493 }
494
495 void registerStream(int id) {
496 this.streamid = id;
497 connection.putStream(this, streamid);
498 }
499
500 DataFrame getDataFrame() throws IOException, InterruptedException {
501 userRequestFlowController.take();
502 int maxpayloadLen = connection.getMaxSendFrameSize();
503 ByteBuffer buffer = connection.getBuffer();
504 buffer.limit(maxpayloadLen);
505 boolean complete = requestProcessor.onRequestBodyChunk(buffer);
506 buffer.flip();
507 int amount = buffer.remaining();
508 // wait for flow control if necessary. Following method will block
509 // until after headers frame is sent, so correct streamid is set.
510 remoteRequestFlowController.take(amount);
511 connection.obtainSendWindow(amount);
512
513 DataFrame df = new DataFrame();
514 df.streamid(streamid);
515 if (complete) {
516 df.setFlag(DataFrame.END_STREAM);
517 }
518 df.setData(buffer);
519 df.computeLength();
520 return df;
521 }
522
523
524 @Override
525 CompletableFuture<Void> sendHeadersAsync() {
526 try {
527 sendHeadersOnly();
528 return CompletableFuture.completedFuture(null);
529 } catch (IOException | InterruptedException ex) {
530 return CompletableFuture.failedFuture(ex);
531 }
532 }
608 if (!cf.isDone()) {
609 cf.completeExceptionally(t);
610 response_cfs.remove(cf);
611 return;
612 }
613 }
614 response_cfs.add(CompletableFuture.failedFuture(t));
615 }
616 }
617
618 void sendBodyImpl() throws IOException, InterruptedException {
619 if (requestContentLen == 0) {
620 // no body
621 requestSent();
622 return;
623 }
624 DataFrame df;
625 do {
626 df = getDataFrame();
627 // TODO: check accumulated content length (if not checked below)
628 connection.sendFrame(df);
629 } while (!df.getFlag(DataFrame.END_STREAM));
630 requestSent();
631 }
632
633 @Override
634 void cancel() {
635 cancelImpl(new Exception("Cancelled"));
636 }
637
638
639 void cancelImpl(Throwable e) {
640 Log.logTrace("cancelling stream: {0}\n", e.toString());
641 inputQ.close();
642 completeResponseExceptionally(e);
643 try {
644 connection.resetStream(streamid, ResetFrame.CANCEL);
645 } catch (IOException | InterruptedException ex) {
646 Log.logError(ex);
647 }
648 }
660 }, null);
661 return cf;
662 }
663
664 @Override
665 <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
666 this.responseProcessor = processor;
667 T body = processor.onResponseBodyStart(
668 responseContentLen, responseHeaders,
669 responseFlowController); // TODO: filter headers
670 if (body == null) {
671 receiveData();
672 body = processor.onResponseComplete();
673 } else
674 receiveDataAsync(processor);
675 responseReceived();
676 return body;
677 }
678
679 // called from Http2Connection reader thread
680 synchronized void updateOutgoingWindow(int update) {
681 remoteRequestFlowController.accept(update);
682 }
683
684 void close(String msg) {
685 cancel();
686 }
687
688 static class PushedStream extends Stream {
689 final PushGroup<?> pushGroup;
690 final private Stream parent; // used by server push streams
691 // push streams need the response CF allocated up front as it is
692 // given directly to user via the multi handler callback function.
693 final CompletableFuture<HttpResponseImpl> pushCF;
694 final HttpRequestImpl pushReq;
695
696 PushedStream(PushGroup<?> pushGroup, HttpClientImpl client,
697 Http2Connection connection, Stream parent,
698 HttpRequestImpl pushReq) {
699 super(client, connection, pushReq);
700 this.pushGroup = pushGroup;
701 this.pushReq = pushReq;
834 checkIfCompleted();
835 }
836
837 synchronized void pushCompleted() {
838 remainingPushes--;
839 checkIfCompleted();
840 }
841
842 synchronized void checkIfCompleted() {
843 if (remainingPushes == 0 && error == null && noMorePushes) {
844 T overallResult = multiProcessor.onComplete();
845 resultCF.complete(overallResult);
846 }
847 }
848
849 synchronized void pushError(Throwable t) {
850 if (t == null)
851 return;
852 this.error = t;
853 resultCF.completeExceptionally(t);
854 }
855 }
856 }
|
77 * stream inputQ. Obeys remote and local flow control so may block.
78 * Calls user response body processor with data buffers.
79 *
80 * responseBodyAsync() -- calls responseBody() in an executor thread.
81 *
82 * incoming() -- entry point called from connection reader thread. Frames are
83 * either handled immediately without blocking or for data frames
84 * placed on the stream's inputQ which is consumed by the stream's
85 * reader thread.
86 *
87 * PushedStream sub class
88 * ======================
89 * Sending side methods are not used because the request comes from a PUSH_PROMISE
90 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
91 * is created. PushedStream does not use responseCF list as there can be only
92 * one response. The CF is created when the object is created, and it is
93 * completed when the response HEADERS frame is received.
94 */
95 class Stream extends ExchangeImpl {
96
97 final ClosableBlockingQueue<DataFrame> inputQ = new ClosableBlockingQueue<>();
98
99 volatile int streamid;
100
101 long responseContentLen = -1;
102 long responseBytesProcessed = 0;
103 long requestContentLen;
104
105 Http2Connection connection;
106 HttpClientImpl client;
107 final HttpRequestImpl request;
108 final DecodingCallback rspHeadersConsumer;
109 HttpHeadersImpl responseHeaders;
110 final HttpHeadersImpl requestHeaders;
111 final HttpHeadersImpl requestPseudoHeaders;
112 HttpResponse.BodyProcessor<?> responseProcessor;
113 final HttpRequest.BodyProcessor requestProcessor;
114 HttpResponse response;
115 final WindowUpdateSender windowUpdater;
116 // state flags
117 boolean requestSent, responseReceived;
118
119 final FlowController userRequestFlowController =
120 new FlowController();
121 final FlowController responseFlowController =
122 new FlowController();
123 final WindowControl outgoingWindow = new WindowControl();
124
125 final ExecutorWrapper executor;
126
127 @Override
128 @SuppressWarnings("unchecked")
129 <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
130 this.responseProcessor = processor;
131 CompletableFuture<T> cf;
132 try {
133 T body = processor.onResponseBodyStart(
134 responseContentLen, responseHeaders,
135 responseFlowController); // TODO: filter headers
136 if (body != null) {
137 cf = CompletableFuture.completedFuture(body);
138 receiveDataAsync(processor);
139 } else
140 cf = receiveDataAsync(processor);
141 } catch (IOException e) {
142 cf = CompletableFuture.failedFuture(e);
143 }
144 PushGroup<?> pg = request.pushGroup();
145 if (pg != null) {
146 // if an error occurs make sure it is recorded in the PushGroup
147 cf = cf.whenComplete((t,e) -> pg.pushError(e));
148 }
149 return cf;
150 }
151
152 @Override
153 public String toString() {
154 StringBuilder sb = new StringBuilder();
155 sb.append("streamid: ")
156 .append(streamid);
157 return sb.toString();
158 }
159
160 // pushes entire response body into response processor
161 // blocking when required by local or remote flow control
162 void receiveData() throws IOException {
163 DataFrame df = null;
164 try {
165 boolean endOfStream;
166 do {
167 df = inputQ.take();
168 endOfStream = df.getFlag(DataFrame.END_STREAM);
169 int len = df.getDataLength();
170 ByteBuffer[] buffers = df.getData();
171 for (ByteBuffer b : buffers) {
172 responseFlowController.take();
173 responseProcessor.onResponseBodyChunk(b);
174 }
175 connection.windowUpdater.update(len);
176 if(!endOfStream) {
177 // if we got the last frame in the stream we shouldn't send WindowUpdate
178 windowUpdater.update(len);
179 }
180 } while (!endOfStream);
181 } catch (InterruptedException e) {
182 throw new IOException(e);
183 }
184 }
185
186 private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) {
187 CompletableFuture<T> cf = new CompletableFuture<>();
188 executor.execute(() -> {
189 try {
190 receiveData();
191 T body = processor.onResponseComplete();
192 cf.complete(body);
193 responseReceived();
194 } catch (Throwable t) {
195 cf.completeExceptionally(t);
196 }
197 }, null);
198 return cf;
199 }
200
201 @Override
202 CompletableFuture<Void> sendBodyAsync() {
203 final CompletableFuture<Void> cf = new CompletableFuture<>();
204 executor.execute(() -> {
205 try {
206 sendBodyImpl();
207 cf.complete(null);
208 } catch (IOException | InterruptedException e) {
209 cf.completeExceptionally(e);
210 }
211 }, null);
212 return cf;
213 }
214
/**
 * Creates a stream for the request carried by the given exchange.
 *
 * @param client the owning client; also supplies the executor wrapper
 * @param connection the HTTP/2 connection this stream runs on
 * @param e the exchange whose request is sent on this stream
 */
@SuppressWarnings("unchecked")
Stream(HttpClientImpl client, Http2Connection connection, Exchange e) {
    super(e);
    this.client = client;
    this.connection = connection;
    this.request = e.request();
    this.requestProcessor = request.requestProcessor();
    this.responseHeaders = new HttpHeadersImpl();
    this.requestHeaders = new HttpHeadersImpl();
    // each decoded header is accumulated into responseHeaders
    this.rspHeadersConsumer = (name, value) -> {
        responseHeaders.addHeader(name.toString(), value.toString());
    };
    this.executor = client.executorWrapper();
    this.requestPseudoHeaders = new HttpHeadersImpl();
    this.windowUpdater = new StreamWindowUpdateSender(connection);
}
233
/**
 * Creates a stream directly from a request, with no exchange and no
 * request body processor.
 *
 * @param client the owning client; also supplies the executor wrapper
 * @param connection the HTTP/2 connection this stream runs on
 * @param req the request associated with this stream
 */
@SuppressWarnings("unchecked")
Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) {
    super(null);
    this.client = client;
    this.connection = connection;
    this.request = req;
    this.requestProcessor = null;
    this.responseHeaders = new HttpHeadersImpl();
    this.requestHeaders = new HttpHeadersImpl();
    // each decoded header is accumulated into responseHeaders
    this.rspHeadersConsumer = (name, value) -> {
        responseHeaders.addHeader(name.toString(), value.toString());
    };
    this.executor = client.executorWrapper();
    this.requestPseudoHeaders = new HttpHeadersImpl();
    this.windowUpdater = new StreamWindowUpdateSender(connection);
}
252
253 /**
254 * Entry point from Http2Connection reader thread.
255 *
256 * Data frames will be removed by response body thread.
257 *
258 * @param frame
259 * @throws IOException
260 */
261 void incoming(Http2Frame frame) throws IOException, InterruptedException {
262 if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) {
263 // Complete headers accumulated. handle response.
264 // It's okay if there are multiple HeaderFrames.
265 handleResponse();
266 } else if (frame instanceof DataFrame) {
267 inputQ.put((DataFrame)frame);
268 } else {
269 otherFrame(frame);
270 }
271 }
272
273 void otherFrame(Http2Frame frame) throws IOException {
274 switch (frame.type()) {
275 case WindowUpdateFrame.TYPE:
276 incoming_windowUpdate((WindowUpdateFrame) frame);
277 break;
278 case ResetFrame.TYPE:
279 incoming_reset((ResetFrame) frame);
280 break;
281 case PriorityFrame.TYPE:
282 incoming_priority((PriorityFrame) frame);
283 break;
284 default:
285 String msg = "Unexpected frame: " + frame.toString();
286 throw new IOException(msg);
287 }
306 .firstValueAsLong("content-length")
307 .orElse(-1L);
308 // different implementations for normal streams and pushed streams
309 completeResponse(response);
310 }
311
312 void incoming_reset(ResetFrame frame) {
313 // TODO: implement reset
314 int error = frame.getErrorCode();
315 IOException e = new IOException(ErrorFrame.stringForCode(error));
316 completeResponseExceptionally(e);
317 throw new UnsupportedOperationException("Not implemented");
318 }
319
320 void incoming_priority(PriorityFrame frame) {
321 // TODO: implement priority
322 throw new UnsupportedOperationException("Not implemented");
323 }
324
325 void incoming_windowUpdate(WindowUpdateFrame frame) {
326 outgoingWindow.update(frame.getUpdate());
327 }
328
329 void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException {
330 if (Log.requests()) {
331 Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
332 }
333 PushGroup<?> pushGroup = request.pushGroup();
334 if (pushGroup == null) {
335 cancelImpl(new IllegalStateException("unexpected push promise"));
336 }
337 // get the handler and call it.
338 BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph =
339 pushGroup.pushHandler();
340
341 CompletableFuture<HttpResponse> pushCF = pushStream
342 .getResponseAsync(null)
343 .thenApply(r -> (HttpResponse)r);
344 boolean accept = ph.apply(pushReq, pushCF);
345 if (!accept) {
346 IOException ex = new IOException("Stream cancelled by user");
471 @Override
472 void sendBody() throws IOException, InterruptedException {
473 sendBodyImpl();
474 }
475
476 void registerStream(int id) {
477 this.streamid = id;
478 connection.putStream(this, streamid);
479 }
480
481 DataFrame getDataFrame() throws IOException, InterruptedException {
482 userRequestFlowController.take();
483 int maxpayloadLen = connection.getMaxSendFrameSize();
484 ByteBuffer buffer = connection.getBuffer();
485 buffer.limit(maxpayloadLen);
486 boolean complete = requestProcessor.onRequestBodyChunk(buffer);
487 buffer.flip();
488 int amount = buffer.remaining();
489 // wait for flow control if necessary. Following method will block
490 // until after headers frame is sent, so correct streamid is set.
491 outgoingWindow.acquire(amount);
492 connection.connectionSendWindow.acquire(amount);
493 DataFrame df = new DataFrame();
494 df.streamid(streamid);
495 if (complete) {
496 df.setFlag(DataFrame.END_STREAM);
497 }
498 df.setData(buffer);
499 df.computeLength();
500 return df;
501 }
502
503
504 @Override
505 CompletableFuture<Void> sendHeadersAsync() {
506 try {
507 sendHeadersOnly();
508 return CompletableFuture.completedFuture(null);
509 } catch (IOException | InterruptedException ex) {
510 return CompletableFuture.failedFuture(ex);
511 }
512 }
588 if (!cf.isDone()) {
589 cf.completeExceptionally(t);
590 response_cfs.remove(cf);
591 return;
592 }
593 }
594 response_cfs.add(CompletableFuture.failedFuture(t));
595 }
596 }
597
598 void sendBodyImpl() throws IOException, InterruptedException {
599 if (requestContentLen == 0) {
600 // no body
601 requestSent();
602 return;
603 }
604 DataFrame df;
605 do {
606 df = getDataFrame();
607 // TODO: check accumulated content length (if not checked below)
608 connection.sendDataFrame(df);
609 } while (!df.getFlag(DataFrame.END_STREAM));
610 requestSent();
611 }
612
613 @Override
614 void cancel() {
615 cancelImpl(new Exception("Cancelled"));
616 }
617
618
619 void cancelImpl(Throwable e) {
620 Log.logTrace("cancelling stream: {0}\n", e.toString());
621 inputQ.close();
622 completeResponseExceptionally(e);
623 try {
624 connection.resetStream(streamid, ResetFrame.CANCEL);
625 } catch (IOException | InterruptedException ex) {
626 Log.logError(ex);
627 }
628 }
640 }, null);
641 return cf;
642 }
643
644 @Override
645 <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
646 this.responseProcessor = processor;
647 T body = processor.onResponseBodyStart(
648 responseContentLen, responseHeaders,
649 responseFlowController); // TODO: filter headers
650 if (body == null) {
651 receiveData();
652 body = processor.onResponseComplete();
653 } else
654 receiveDataAsync(processor);
655 responseReceived();
656 return body;
657 }
658
659 // called from Http2Connection reader thread
660 void updateOutgoingWindow(int update) {
661 outgoingWindow.update(update);
662 }
663
664 void close(String msg) {
665 cancel();
666 }
667
668 static class PushedStream extends Stream {
669 final PushGroup<?> pushGroup;
670 final private Stream parent; // used by server push streams
671 // push streams need the response CF allocated up front as it is
672 // given directly to user via the multi handler callback function.
673 final CompletableFuture<HttpResponseImpl> pushCF;
674 final HttpRequestImpl pushReq;
675
676 PushedStream(PushGroup<?> pushGroup, HttpClientImpl client,
677 Http2Connection connection, Stream parent,
678 HttpRequestImpl pushReq) {
679 super(client, connection, pushReq);
680 this.pushGroup = pushGroup;
681 this.pushReq = pushReq;
814 checkIfCompleted();
815 }
816
817 synchronized void pushCompleted() {
818 remainingPushes--;
819 checkIfCompleted();
820 }
821
822 synchronized void checkIfCompleted() {
823 if (remainingPushes == 0 && error == null && noMorePushes) {
824 T overallResult = multiProcessor.onComplete();
825 resultCF.complete(overallResult);
826 }
827 }
828
829 synchronized void pushError(Throwable t) {
830 if (t == null)
831 return;
832 this.error = t;
833 resultCF.completeExceptionally(t);
834 }
835 }
836
837 class StreamWindowUpdateSender extends WindowUpdateSender {
838
839 StreamWindowUpdateSender(Http2Connection connection) {
840 super(connection);
841 }
842
843 @Override
844 int getStreamId() {
845 return streamid;
846 }
847 }
848 }
|