< prev index next >

src/java.httpclient/share/classes/java/net/http/Stream.java

Print this page
rev 14860 : fix Memory leak in HTTP2Connection.streams
rev 14682 : 8158651: ConcurrentModification exceptions in httpclient
Reviewed-by: rriggs
rev 14584 : 8157105: HTTP/2 client hangs in blocking mode if an invalid frame has been received
Reviewed-by: rriggs
rev 14431 : 8156710: HttpTimeoutException should be thrown if server doesn't respond
Reviewed-by: michaelm
rev 14348 : 8087124: HTTP/2 implementation
Reviewed-by: chegar
rev 13785 : 8087112: HTTP API and HTTP/1.1 implementation
Reviewed-by: alanb, chegar, coffeys, psandoz, rriggs


 600      * same as above but for errors
 601      *
 602      * @param t
 603      */
 604     void completeResponseExceptionally(Throwable t) {
 605         synchronized (response_cfs) {
 606             for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
 607                 if (!cf.isDone()) {
 608                     cf.completeExceptionally(t);
 609                     response_cfs.remove(cf);
 610                     return;
 611                 }
 612             }
 613             response_cfs.add(CompletableFuture.failedFuture(t));
 614         }
 615     }
 616 
 617     void sendBodyImpl() throws IOException, InterruptedException {
 618         if (requestContentLen == 0) {
 619             // no body

 620             return;
 621         }
 622         DataFrame df;
 623         do {
 624             df = getDataFrame();
 625             // TODO: check accumulated content length (if not checked below)
 626             connection.sendFrame(df);
 627         } while (!df.getFlag(DataFrame.END_STREAM));
 628         requestSent();
 629     }
 630 
 631     @Override
 632     void cancel() {
 633         cancelImpl(new Exception("Cancelled"));
 634     }
 635 
 636 
 637     void cancelImpl(Throwable e) {
 638         Log.logTrace("cancelling stream: {0}\n", e.toString());
 639         inputQ.close();


 650         CompletableFuture<Void> cf = new CompletableFuture<>();
 651         executor.execute(() -> {
 652            try {
 653                sendRequest();
 654                cf.complete(null);
 655            } catch (IOException |InterruptedException e) {
 656                cf.completeExceptionally(e);
 657            }
 658         }, null);
 659         return cf;
 660     }
 661 
 662     @Override
 663     <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
 664         this.responseProcessor = processor;
 665         T body = processor.onResponseBodyStart(
 666                     responseContentLen, responseHeaders,
 667                     responseFlowController); // TODO: filter headers
 668         if (body == null) {
 669             receiveData();
 670             return processor.onResponseComplete();
 671         } else
 672             receiveDataAsync(processor);
 673         responseReceived();
 674         return body;
 675     }
 676 
 677     // called from Http2Connection reader thread
 678     synchronized void updateOutgoingWindow(int update) {
 679         remoteRequestFlowController.accept(update);
 680     }
 681 
 682     void close(String msg) {
 683         cancel();
 684     }
 685 
 686     static class PushedStream extends Stream {
 687         final PushGroup<?> pushGroup;
 688         final private Stream parent;      // used by server push streams
 689         // push streams need the response CF allocated up front as it is
 690         // given directly to user via the multi handler callback function.




 600      * same as above but for errors
 601      *
 602      * @param t
 603      */
 604     void completeResponseExceptionally(Throwable t) {
 605         synchronized (response_cfs) {
 606             for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
 607                 if (!cf.isDone()) {
 608                     cf.completeExceptionally(t);
 609                     response_cfs.remove(cf);
 610                     return;
 611                 }
 612             }
 613             response_cfs.add(CompletableFuture.failedFuture(t));
 614         }
 615     }
 616 
 617     void sendBodyImpl() throws IOException, InterruptedException {
 618         if (requestContentLen == 0) {
 619             // no body
 620             requestSent();
 621             return;
 622         }
 623         DataFrame df;
 624         do {
 625             df = getDataFrame();
 626             // TODO: check accumulated content length (if not checked below)
 627             connection.sendFrame(df);
 628         } while (!df.getFlag(DataFrame.END_STREAM));
 629         requestSent();
 630     }
 631 
 632     @Override
 633     void cancel() {
 634         cancelImpl(new Exception("Cancelled"));
 635     }
 636 
 637 
 638     void cancelImpl(Throwable e) {
 639         Log.logTrace("cancelling stream: {0}\n", e.toString());
 640         inputQ.close();


 651         CompletableFuture<Void> cf = new CompletableFuture<>();
 652         executor.execute(() -> {
 653            try {
 654                sendRequest();
 655                cf.complete(null);
 656            } catch (IOException |InterruptedException e) {
 657                cf.completeExceptionally(e);
 658            }
 659         }, null);
 660         return cf;
 661     }
 662 
 663     @Override
 664     <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
 665         this.responseProcessor = processor;
 666         T body = processor.onResponseBodyStart(
 667                     responseContentLen, responseHeaders,
 668                     responseFlowController); // TODO: filter headers
 669         if (body == null) {
 670             receiveData();
 671             body = processor.onResponseComplete();
 672         } else
 673             receiveDataAsync(processor);
 674         responseReceived();
 675         return body;
 676     }
 677 
 678     // called from Http2Connection reader thread
 679     synchronized void updateOutgoingWindow(int update) {
 680         remoteRequestFlowController.accept(update);
 681     }
 682 
 683     void close(String msg) {
 684         cancel();
 685     }
 686 
 687     static class PushedStream extends Stream {
 688         final PushGroup<?> pushGroup;
 689         final private Stream parent;      // used by server push streams
 690         // push streams need the response CF allocated up front as it is
 691         // given directly to user via the multi handler callback function.


< prev index next >