600 * same as above but for errors
601 *
602 * @param t
603 */
604 void completeResponseExceptionally(Throwable t) {
605 synchronized (response_cfs) {
606 for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
607 if (!cf.isDone()) {
608 cf.completeExceptionally(t);
609 response_cfs.remove(cf);
610 return;
611 }
612 }
613 response_cfs.add(CompletableFuture.failedFuture(t));
614 }
615 }
616
617 void sendBodyImpl() throws IOException, InterruptedException {
618 if (requestContentLen == 0) {
619 // no body
620 return;
621 }
622 DataFrame df;
623 do {
624 df = getDataFrame();
625 // TODO: check accumulated content length (if not checked below)
626 connection.sendFrame(df);
627 } while (!df.getFlag(DataFrame.END_STREAM));
628 requestSent();
629 }
630
631 @Override
632 void cancel() {
633 cancelImpl(new Exception("Cancelled"));
634 }
635
636
637 void cancelImpl(Throwable e) {
638 Log.logTrace("cancelling stream: {0}\n", e.toString());
639 inputQ.close();
650 CompletableFuture<Void> cf = new CompletableFuture<>();
651 executor.execute(() -> {
652 try {
653 sendRequest();
654 cf.complete(null);
655 } catch (IOException |InterruptedException e) {
656 cf.completeExceptionally(e);
657 }
658 }, null);
659 return cf;
660 }
661
662 @Override
663 <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
664 this.responseProcessor = processor;
665 T body = processor.onResponseBodyStart(
666 responseContentLen, responseHeaders,
667 responseFlowController); // TODO: filter headers
668 if (body == null) {
669 receiveData();
670 return processor.onResponseComplete();
671 } else
672 receiveDataAsync(processor);
673 responseReceived();
674 return body;
675 }
676
677 // called from Http2Connection reader thread
678 synchronized void updateOutgoingWindow(int update) {
679 remoteRequestFlowController.accept(update);
680 }
681
682 void close(String msg) {
683 cancel();
684 }
685
686 static class PushedStream extends Stream {
687 final PushGroup<?> pushGroup;
688 final private Stream parent; // used by server push streams
689 // push streams need the response CF allocated up front as it is
690 // given directly to user via the multi handler callback function.
|
600 * same as above but for errors
601 *
602 * @param t
603 */
604 void completeResponseExceptionally(Throwable t) {
605 synchronized (response_cfs) {
606 for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
607 if (!cf.isDone()) {
608 cf.completeExceptionally(t);
609 response_cfs.remove(cf);
610 return;
611 }
612 }
613 response_cfs.add(CompletableFuture.failedFuture(t));
614 }
615 }
616
617 void sendBodyImpl() throws IOException, InterruptedException {
618 if (requestContentLen == 0) {
619 // no body
620 requestSent();
621 return;
622 }
623 DataFrame df;
624 do {
625 df = getDataFrame();
626 // TODO: check accumulated content length (if not checked below)
627 connection.sendFrame(df);
628 } while (!df.getFlag(DataFrame.END_STREAM));
629 requestSent();
630 }
631
632 @Override
633 void cancel() {
634 cancelImpl(new Exception("Cancelled"));
635 }
636
637
638 void cancelImpl(Throwable e) {
639 Log.logTrace("cancelling stream: {0}\n", e.toString());
640 inputQ.close();
651 CompletableFuture<Void> cf = new CompletableFuture<>();
652 executor.execute(() -> {
653 try {
654 sendRequest();
655 cf.complete(null);
656 } catch (IOException |InterruptedException e) {
657 cf.completeExceptionally(e);
658 }
659 }, null);
660 return cf;
661 }
662
663 @Override
664 <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
665 this.responseProcessor = processor;
666 T body = processor.onResponseBodyStart(
667 responseContentLen, responseHeaders,
668 responseFlowController); // TODO: filter headers
669 if (body == null) {
670 receiveData();
671 body = processor.onResponseComplete();
672 } else
673 receiveDataAsync(processor);
674 responseReceived();
675 return body;
676 }
677
678 // called from Http2Connection reader thread
679 synchronized void updateOutgoingWindow(int update) {
680 remoteRequestFlowController.accept(update);
681 }
682
683 void close(String msg) {
684 cancel();
685 }
686
687 static class PushedStream extends Stream {
688 final PushGroup<?> pushGroup;
689 final private Stream parent; // used by server push streams
690 // push streams need the response CF allocated up front as it is
691 // given directly to user via the multi handler callback function.
|