< prev index next >

src/java.httpclient/share/classes/java/net/http/Stream.java

Print this page
rev 15333 : JDK-8162497 fix obtainSendWindow deadlock
rev 15334 : JDK-8161004 bulk sendWindowUpdate
rev 15335 : Async Queues


  77  *               stream inputQ. Obeys remote and local flow control so may block.
  78  *               Calls user response body processor with data buffers.
  79  *
  80  * responseBodyAsync() -- calls responseBody() in an executor thread.
  81  *
  82  * incoming() -- entry point called from connection reader thread. Frames are
  83  *               either handled immediately without blocking or for data frames
  84  *               placed on the stream's inputQ which is consumed by the stream's
  85  *               reader thread.
  86  *
  87  * PushedStream sub class
  88  * ======================
  89  * Sending side methods are not used because the request comes from a PUSH_PROMISE
  90  * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
  91  * is created. PushedStream does not use responseCF list as there can be only
  92  * one response. The CF is created when the object is created, and it is
  93  * completed when the response HEADERS frame is received.
  94  */
  95 class Stream extends ExchangeImpl {
  96 
  97     final Queue<Http2Frame> inputQ;
  98 
  99     volatile int streamid;
 100 
 101     long responseContentLen = -1;
 102     long responseBytesProcessed = 0;
 103     long requestContentLen;
 104 
 105     Http2Connection connection;
 106     HttpClientImpl client;
 107     final HttpRequestImpl request;
 108     final DecodingCallback rspHeadersConsumer;
 109     HttpHeadersImpl responseHeaders;
 110     final HttpHeadersImpl requestHeaders;
 111     final HttpHeadersImpl requestPseudoHeaders;
 112     HttpResponse.BodyProcessor<?> responseProcessor;
 113     final HttpRequest.BodyProcessor requestProcessor;
 114     HttpResponse response;
 115 
 116     // state flags
 117     boolean requestSent, responseReceived;
 118 
 119     final FlowController userRequestFlowController =
 120             new FlowController();
 121     final FlowController remoteRequestFlowController =
 122             new FlowController();
 123     final FlowController responseFlowController =
 124             new FlowController();

 125 
 126     final ExecutorWrapper executor;
 127 
 128     @Override
 129     @SuppressWarnings("unchecked")
 130     <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
             // Starts delivery of the response body to the given processor and
             // returns a future for the resulting body object.
 131         this.responseProcessor = processor;
 132         CompletableFuture<T> cf;
 133         try {
 134             T body = processor.onResponseBodyStart(
 135                     responseContentLen, responseHeaders,
 136                     responseFlowController); // TODO: filter headers
 137             if (body != null) {
                     // The processor supplied its body object up front: return it
                     // already completed while the data is pumped in the background.
                     // The future of that background read is discarded here, so a
                     // failure in it does not reach cf.
 138                 cf = CompletableFuture.completedFuture(body);
 139                 receiveDataAsync(processor);
 140             } else
 141                 cf = receiveDataAsync(processor);
 142         } catch (IOException e) {
 143             cf = CompletableFuture.failedFuture(e);
 144         }
 145         PushGroup<?> pg = request.pushGroup();
 146         if (pg != null) {
 147             // if an error occurs make sure it is recorded in the PushGroup
                 // (the callback runs on every completion; e is null on success)
 148             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 149         }
 150         return cf;
 151     }
 152 
 153     @Override
 154     public String toString() {
             // Identifies the stream by its id only: "streamid: " followed by the id.
 155         StringBuilder sb = new StringBuilder();
 156         sb.append("streamid: ")
 157                 .append(streamid);
 158         return sb.toString();
 159     }
 160 
 161     // Pushes the entire response body into the response processor,
 162     // blocking when required by local or remote flow control.
         // Returns after the frame carrying END_STREAM has been delivered.
         // Throws IOException if sending a window update fails or if the
         // thread is interrupted while waiting (interrupt status is restored).
 163     void receiveData() throws IOException {
 164         Http2Frame frame;
             // Tracked separately from the frame so the loop condition never
             // dereferences a frame that was skipped by the 'continue' below.
 165         boolean endOfStream = false;
 166         try {
 167             do {
 168                 frame = inputQ.take();
 169                 if (!(frame instanceof DataFrame)) {
                         // only data frames are expected on inputQ; skip anything else
 170                     assert false;
 171                     continue;
 172                 }
 173                 DataFrame df = (DataFrame) frame;
                     endOfStream = df.getFlag(DataFrame.END_STREAM);
 174                 int len = df.getDataLength();
 175                 ByteBuffer[] buffers = df.getData();
 176                 for (ByteBuffer b : buffers) {
                         // wait for the user's flow controller before each buffer
 177                     responseFlowController.take();
 178                     responseProcessor.onResponseBodyChunk(b);
 179                 }
                     // give the consumed bytes back to the sender
 180                 sendWindowUpdate(len);
 181             } while (!endOfStream);
 182         } catch (InterruptedException e) {
                 // keep the interrupt visible to callers after wrapping it
                 Thread.currentThread().interrupt();
 183             throw new IOException(e);
 184         }
 185     }
 186 
 187     private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) {
             // Runs receiveData() on the executor and completes the returned future
             // with processor.onResponseComplete(), or exceptionally with whatever
             // was thrown along the way.
 188         CompletableFuture<T> cf = new CompletableFuture<>();
 189         executor.execute(() -> {
 190             try {
 191                 receiveData();
 192                 T body = processor.onResponseComplete();
 193                 cf.complete(body);
                     // cf is already completed here, so a failure in
                     // responseReceived() cannot change its outcome.
 194                 responseReceived();
 195             } catch (Throwable t) {
 196                 cf.completeExceptionally(t);
 197             }
 198         }, null);
 199         return cf;
 200     }
 201 
 202     private void sendWindowUpdate(int increment)
 203             throws IOException, InterruptedException {
             // Sends two WINDOW_UPDATE frames carrying the same increment in one
             // batch: one for this stream and one for stream id 0.
             // NOTE(review): stream id 0 presumably addresses the connection-level
             // window as in HTTP/2 flow control -- confirm against Http2Connection.
 204         if (increment == 0)
 205             return;
 206         LinkedList<Http2Frame> list = new LinkedList<>();
 207         WindowUpdateFrame frame = new WindowUpdateFrame();
 208         frame.streamid(streamid);
 209         frame.setUpdate(increment);
 210         list.add(frame);
 211         frame = new WindowUpdateFrame();
 212         frame.streamid(0);
 213         frame.setUpdate(increment);
 214         list.add(frame);
 215         connection.sendFrames(list);
 216     }
 217 
 218     @Override
 219     CompletableFuture<Void> sendBodyAsync() {
             // Sends the request body on the executor. The returned future
             // completes with null once sendBodyImpl() returns, or exceptionally
             // with whatever it threw.
 220         final CompletableFuture<Void> cf = new CompletableFuture<>();
 221         executor.execute(() -> {
 222             try {
 223                 sendBodyImpl();
 224                 cf.complete(null);
 225             } catch (Throwable t) {
                     // Catch everything, not just the checked exceptions: an
                     // unchecked failure would otherwise leave cf incomplete forever.
 226                 cf.completeExceptionally(t);
 227             }
 228         }, null);
 229         return cf;
 230     }
 231 
 232     @SuppressWarnings("unchecked")
 233     Stream(HttpClientImpl client, Http2Connection connection, Exchange e) {
             // Creates a stream for the request carried by the given exchange;
             // the request body processor is taken from that request.
 234         super(e);
 235         this.client = client;
 236         this.connection = connection;
 237         this.request = e.request();
 238         this.requestProcessor = request.requestProcessor();
 239         responseHeaders = new HttpHeadersImpl();
 240         requestHeaders = new HttpHeadersImpl();
             // every decoded response header is appended to responseHeaders
 241         rspHeadersConsumer = (name, value) -> {
 242             responseHeaders.addHeader(name.toString(), value.toString());
 243         };
 244         this.executor = client.executorWrapper();
 245         //this.response_cf = new CompletableFuture<HttpResponseImpl>();
 246         this.requestPseudoHeaders = new HttpHeadersImpl();
 247         // NEW
 248         this.inputQ = new Queue<>();
 249     }
 250 
 251     @SuppressWarnings("unchecked")
 252     Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) {
             // Creates a stream directly from a request, with no exchange and no
             // request body processor. PushedStream's constructor chains to this one.
 253         super(null);
 254         this.client = client;
 255         this.connection = connection;
 256         this.request = req;
 257         this.requestProcessor = null;
 258         responseHeaders = new HttpHeadersImpl();
 259         requestHeaders = new HttpHeadersImpl();
             // every decoded response header is appended to responseHeaders
 260         rspHeadersConsumer = (name, value) -> {
 261             responseHeaders.addHeader(name.toString(), value.toString());
 262         };
 263         this.executor = client.executorWrapper();
 264         //this.response_cf = new CompletableFuture<HttpResponseImpl>();
 265         this.requestPseudoHeaders = new HttpHeadersImpl();
 266         // NEW
 267         this.inputQ = new Queue<>();
 268     }
 269 
 270     /**
 271      * Entry point from Http2Connection reader thread.
 272      *
          * Header frames trigger response handling once the END_HEADERS flag is
          * seen; header frames without it are accepted and otherwise ignored here.
 273      * Data frames are queued on inputQ and will be removed by response body
          * thread. Every other frame type is passed to otherFrame().
 274      *
 275      * @param frame the frame received for this stream
 276      * @throws IOException if the frame cannot be handled, e.g. an unexpected
          *         frame type
          * @throws InterruptedException if interrupted while handling the frame
 277      */
 278     void incoming(Http2Frame frame) throws IOException, InterruptedException {
 279         if (frame instanceof HeaderFrame) {
 280             // Complete headers accumulated only when END_HEADERS is set.
 281             // It's okay if there are multiple HeaderFrames: the earlier ones
                 // must not fall through to otherFrame(), which rejects them.
                 if (((HeaderFrame)frame).endHeaders()) {
 282                 handleResponse();
                 }
 283         } else if (frame instanceof DataFrame) {
 284             inputQ.put(frame);
 285         } else {
 286             otherFrame(frame);
 287         }
 288     }
 289 
 290     void otherFrame(Http2Frame frame) throws IOException {
 291         switch (frame.type()) {
 292             case WindowUpdateFrame.TYPE:
 293                 incoming_windowUpdate((WindowUpdateFrame) frame);
 294                 break;
 295             case ResetFrame.TYPE:
 296                 incoming_reset((ResetFrame) frame);
 297                 break;
 298             case PriorityFrame.TYPE:
 299                 incoming_priority((PriorityFrame) frame);
 300                 break;
 301             default:
 302                 String msg = "Unexpected frame: " + frame.toString();
 303                 throw new IOException(msg);
 304         }


 323                 .firstValueAsLong("content-length")
 324                 .orElse(-1L);
 325         // different implementations for normal streams and pushed streams
 326         completeResponse(response);
 327     }
 328 
 329     void incoming_reset(ResetFrame frame) {
 330         // TODO: implement reset
             // Fails the pending response with an IOException whose message is the
             // textual form of the frame's error code.
 331         int error = frame.getErrorCode();
 332         IOException e = new IOException(ErrorFrame.stringForCode(error));
 333         completeResponseExceptionally(e);
             // NOTE(review): placeholder -- this still throws after the response has
             // been failed, and the call path (incoming -> otherFrame) is documented
             // as running on the connection reader thread; confirm the caller copes.
 334         throw new UnsupportedOperationException("Not implemented");
 335     }
 336 
 337     void incoming_priority(PriorityFrame frame) {
 338         // TODO: implement priority
             // Placeholder: any PRIORITY frame routed here by otherFrame() currently
             // results in an UnsupportedOperationException.
 339         throw new UnsupportedOperationException("Not implemented");
 340     }
 341 
 342     void incoming_windowUpdate(WindowUpdateFrame frame) {
             // Credits the frame's increment to remoteRequestFlowController, which
             // getDataFrame() draws on before building each DATA frame.
             // Increments that are not positive are ignored.
 343         int amount = frame.getUpdate();
 344         if (amount > 0)
 345             remoteRequestFlowController.accept(amount);
 346     }
 347 
 348     void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException {
 349         if (Log.requests()) {
 350             Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
 351         }
 352         PushGroup<?> pushGroup = request.pushGroup();
 353         if (pushGroup == null) {
 354             cancelImpl(new IllegalStateException("unexpected push promise"));
 355         }
 356         // get the handler and call it.
 357         BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph =
 358             pushGroup.pushHandler();
 359 
 360         CompletableFuture<HttpResponse> pushCF = pushStream
 361                 .getResponseAsync(null)
 362                 .thenApply(r -> (HttpResponse)r);
 363         boolean accept = ph.apply(pushReq, pushCF);
 364         if (!accept) {
 365             IOException ex = new IOException("Stream cancelled by user");


 490     @Override
 491     void sendBody() throws IOException, InterruptedException {
             // Blocking variant: sends the request body on the calling thread.
 492         sendBodyImpl();
 493     }
 494 
 495     void registerStream(int id) {
             // Records the assigned stream id and registers this stream with the
             // connection under that id.
 496         this.streamid = id;
 497         connection.putStream(this, streamid);
 498     }
 499 
 500     DataFrame getDataFrame() throws IOException, InterruptedException {
             // Builds the next DATA frame of the request body: waits on the user's
             // flow controller, lets the request processor fill one buffer, then
             // waits for enough stream and connection send window to cover it.
 501         userRequestFlowController.take();
 502         int maxpayloadLen = connection.getMaxSendFrameSize();
 503         ByteBuffer buffer = connection.getBuffer();
             // the processor may write at most one frame's worth of payload
 504         buffer.limit(maxpayloadLen);
 505         boolean complete = requestProcessor.onRequestBodyChunk(buffer);
 506         buffer.flip();
 507         int amount = buffer.remaining();
 508         // wait for flow control if necessary. Following method will block
 509         // until after headers frame is sent, so correct streamid is set.
 510         remoteRequestFlowController.take(amount);
 511         connection.obtainSendWindow(amount);
 512 
 513         DataFrame df = new DataFrame();
 514         df.streamid(streamid);
 515         if (complete) {
                 // onRequestBodyChunk returned true: flag this as the stream's last frame
 516             df.setFlag(DataFrame.END_STREAM);
 517         }
 518         df.setData(buffer);
 519         df.computeLength();
 520         return df;
 521     }
 522 
 523 
 524     @Override
 525     CompletableFuture<Void> sendHeadersAsync() {
             // Sends the headers on the calling thread and wraps the outcome in a
             // future that is already completed (or already failed) on return.
 526         try {
 527             sendHeadersOnly();
 528             return CompletableFuture.completedFuture(null);
 529         } catch (IOException | InterruptedException ex) {
 530             return CompletableFuture.failedFuture(ex);
 531         }
 532     }


 608                 if (!cf.isDone()) {
 609                     cf.completeExceptionally(t);
 610                     response_cfs.remove(cf);
 611                     return;
 612                 }
 613             }
 614             response_cfs.add(CompletableFuture.failedFuture(t));
 615         }
 616     }
 617 
 618     void sendBodyImpl() throws IOException, InterruptedException {
             // Sends the request body as a sequence of DATA frames, stopping after
             // the frame flagged END_STREAM. requestSent() is called in every
             // non-exceptional path, including when there is no body at all.
 619         if (requestContentLen == 0) {
 620             // no body
 621             requestSent();
 622             return;
 623         }
 624         DataFrame df;
 625         do {
                 // getDataFrame() blocks for flow control before returning
 626             df = getDataFrame();
 627             // TODO: check accumulated content length (if not checked below)
 628             connection.sendFrame(df);
 629         } while (!df.getFlag(DataFrame.END_STREAM));
 630         requestSent();
 631     }
 632 
 633     @Override
 634     void cancel() {
             // Cancels the stream with a generic "Cancelled" exception as the cause.
 635         cancelImpl(new Exception("Cancelled"));
 636     }
 637 
 638 
 639     void cancelImpl(Throwable e) {
             // Tears the stream down: closes the input queue, fails the pending
             // response with e, then asks the connection to reset the stream with
             // the CANCEL code.
 640         Log.logTrace("cancelling stream: {0}\n", e.toString());
 641         inputQ.close();
 642         completeResponseExceptionally(e);
 643         try {
 644             connection.resetStream(streamid, ResetFrame.CANCEL);
 645         } catch (IOException | InterruptedException ex) {
                 // best effort: a failure to send the reset is only logged
 646             Log.logError(ex);
 647         }
 648     }


 660         }, null);
 661         return cf;
 662     }
 663 
 664     @Override
 665     <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
             // Blocking counterpart of responseBodyAsync(): returns the body object
             // produced by the given processor.
 666         this.responseProcessor = processor;
 667         T body = processor.onResponseBodyStart(
 668                     responseContentLen, responseHeaders,
 669                     responseFlowController); // TODO: filter headers
 670         if (body == null) {
                 // no body object yet: read everything on this thread, then ask
                 // the processor for the finished result
 671             receiveData();
 672             body = processor.onResponseComplete();
 673         } else
                 // body object supplied up front: pump the data in the background;
                 // the returned future is not observed here
 674             receiveDataAsync(processor);
 675         responseReceived();
 676         return body;
 677     }
 678 
 679     // Called from Http2Connection reader thread. Credits the given amount to
         // the flow controller that getDataFrame() draws on before sending data.
 680     synchronized void updateOutgoingWindow(int update) {
 681         remoteRequestFlowController.accept(update);
 682     }
 683 
 684     void close(String msg) {
             // Closing is implemented as a plain cancel(); msg is currently unused.
 685         cancel();
 686     }
 687 
 688     static class PushedStream extends Stream {
 689         final PushGroup<?> pushGroup;
 690         final private Stream parent;      // used by server push streams
 691         // push streams need the response CF allocated up front as it is
 692         // given directly to user via the multi handler callback function.
 693         final CompletableFuture<HttpResponseImpl> pushCF;
 694         final HttpRequestImpl pushReq;
 695 
 696         PushedStream(PushGroup<?> pushGroup, HttpClientImpl client,
 697                 Http2Connection connection, Stream parent,
 698                 HttpRequestImpl pushReq) {
 699             super(client, connection, pushReq);
 700             this.pushGroup = pushGroup;
 701             this.pushReq = pushReq;


 834             checkIfCompleted();
 835         }
 836 
 837         synchronized void pushCompleted() {
                 // One outstanding push has finished: lower the count and re-check
                 // whether the overall result can be completed.
 838             remainingPushes--;
 839             checkIfCompleted();
 840         }
 841 
 842         synchronized void checkIfCompleted() {
                 // Completes resultCF with multiProcessor.onComplete() once no pushes
                 // remain, no error has been recorded and noMorePushes is set.
 843             if (remainingPushes == 0 && error == null && noMorePushes) {
 844                 T overallResult = multiProcessor.onComplete();
 845                 resultCF.complete(overallResult);
 846             }
 847         }
 848 
 849         synchronized void pushError(Throwable t) {
                 // Records a failure and fails resultCF with it. A null argument
                 // means "no error" and is ignored, so callers may pass the
                 // throwable of a completion callback unconditionally.
 850             if (t == null)
 851                 return;
 852             this.error = t;
 853             resultCF.completeExceptionally(t);












 854         }
 855     }
 856 }


  77  *               stream inputQ. Obeys remote and local flow control so may block.
  78  *               Calls user response body processor with data buffers.
  79  *
  80  * responseBodyAsync() -- calls responseBody() in an executor thread.
  81  *
  82  * incoming() -- entry point called from connection reader thread. Frames are
  83  *               either handled immediately without blocking or for data frames
  84  *               placed on the stream's inputQ which is consumed by the stream's
  85  *               reader thread.
  86  *
  87  * PushedStream sub class
  88  * ======================
  89  * Sending side methods are not used because the request comes from a PUSH_PROMISE
  90  * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
  91  * is created. PushedStream does not use responseCF list as there can be only
  92  * one response. The CF is created when the object is created, and it is
  93  * completed when the response HEADERS frame is received.
  94  */
  95 class Stream extends ExchangeImpl {
  96 
  97     final ClosableBlockingQueue<DataFrame> inputQ = new ClosableBlockingQueue<>();
  98 
  99     volatile int streamid;
 100 
 101     long responseContentLen = -1;
 102     long responseBytesProcessed = 0;
 103     long requestContentLen;
 104 
 105     Http2Connection connection;
 106     HttpClientImpl client;
 107     final HttpRequestImpl request;
 108     final DecodingCallback rspHeadersConsumer;
 109     HttpHeadersImpl responseHeaders;
 110     final HttpHeadersImpl requestHeaders;
 111     final HttpHeadersImpl requestPseudoHeaders;
 112     HttpResponse.BodyProcessor<?> responseProcessor;
 113     final HttpRequest.BodyProcessor requestProcessor;
 114     HttpResponse response;
 115     final WindowUpdateSender windowUpdater;
 116     // state flags
 117     boolean requestSent, responseReceived;
 118 
 119     final FlowController userRequestFlowController =
 120             new FlowController();


 121     final FlowController responseFlowController =
 122             new FlowController();
 123     final WindowControl outgoingWindow =  new WindowControl();
 124 
 125     final ExecutorWrapper executor;
 126 
 127     @Override
 128     @SuppressWarnings("unchecked")
 129     <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
             // Starts delivery of the response body to the given processor and
             // returns a future for the resulting body object.
 130         this.responseProcessor = processor;
 131         CompletableFuture<T> cf;
 132         try {
 133             T body = processor.onResponseBodyStart(
 134                     responseContentLen, responseHeaders,
 135                     responseFlowController); // TODO: filter headers
 136             if (body != null) {
                     // The processor supplied its body object up front: return it
                     // already completed while the data is pumped in the background.
                     // The future of that background read is discarded here, so a
                     // failure in it does not reach cf.
 137                 cf = CompletableFuture.completedFuture(body);
 138                 receiveDataAsync(processor);
 139             } else
 140                 cf = receiveDataAsync(processor);
 141         } catch (IOException e) {
 142             cf = CompletableFuture.failedFuture(e);
 143         }
 144         PushGroup<?> pg = request.pushGroup();
 145         if (pg != null) {
 146             // if an error occurs make sure it is recorded in the PushGroup
                 // (the callback runs on every completion; e is null on success)
 147             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 148         }
 149         return cf;
 150     }
 151 
 152     @Override
 153     public String toString() {
             // Identifies the stream by its id only: "streamid: " followed by the id.
 154         StringBuilder sb = new StringBuilder();
 155         sb.append("streamid: ")
 156                 .append(streamid);
 157         return sb.toString();
 158     }
 159 
 160     // Pushes the entire response body into the response processor,
 161     // blocking when required by local or remote flow control.
         // Returns after the frame carrying END_STREAM has been delivered.
         // Throws IOException if the thread is interrupted while waiting
         // (the interrupt status is restored before the exception is thrown).
 162     void receiveData() throws IOException {
 164         try {
 165             boolean endOfStream;
 166             do {
 167                 DataFrame df = inputQ.take();
 168                 endOfStream = df.getFlag(DataFrame.END_STREAM);
 169                 int len = df.getDataLength();
 170                 ByteBuffer[] buffers = df.getData();
 171                 for (ByteBuffer b : buffers) {
                         // wait for the user's flow controller before each buffer
 172                     responseFlowController.take();
 173                     responseProcessor.onResponseBodyChunk(b);
 174                 }
                     // the connection-level updater is always credited ...
 175                 connection.windowUpdater.update(len);
 176                 if(!endOfStream) {
 177                     // if we got the last frame in the stream we shouldn't send WindowUpdate
 178                     windowUpdater.update(len);
 179                 }
 180             } while (!endOfStream);
 181         } catch (InterruptedException e) {
                 // keep the interrupt visible to callers after wrapping it
                 Thread.currentThread().interrupt();
 182             throw new IOException(e);
 183         }
 184     }
 185 
 186     private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) {
             // Runs receiveData() on the executor and completes the returned future
             // with processor.onResponseComplete(), or exceptionally with whatever
             // was thrown along the way.
 187         CompletableFuture<T> cf = new CompletableFuture<>();
 188         executor.execute(() -> {
 189             try {
 190                 receiveData();
 191                 T body = processor.onResponseComplete();
 192                 cf.complete(body);
                     // cf is already completed here, so a failure in
                     // responseReceived() cannot change its outcome.
 193                 responseReceived();
 194             } catch (Throwable t) {
 195                 cf.completeExceptionally(t);
 196             }
 197         }, null);
 198         return cf;
 199     }
 200 
















 201     @Override
 202     CompletableFuture<Void> sendBodyAsync() {
             // Sends the request body on the executor. The returned future
             // completes with null once sendBodyImpl() returns, or exceptionally
             // with whatever it threw.
 203         final CompletableFuture<Void> cf = new CompletableFuture<>();
 204         executor.execute(() -> {
 205             try {
 206                 sendBodyImpl();
 207                 cf.complete(null);
 208             } catch (Throwable t) {
                     // Catch everything, not just the checked exceptions: an
                     // unchecked failure would otherwise leave cf incomplete forever.
 209                 cf.completeExceptionally(t);
 210             }
 211         }, null);
 212         return cf;
 213     }
 214 
 215     @SuppressWarnings("unchecked")
 216     Stream(HttpClientImpl client, Http2Connection connection, Exchange e) {
             // Creates a stream for the request carried by the given exchange;
             // the request body processor is taken from that request.
 217         super(e);
 218         this.client = client;
 219         this.connection = connection;
 220         this.request = e.request();
 221         this.requestProcessor = request.requestProcessor();
 222         responseHeaders = new HttpHeadersImpl();
 223         requestHeaders = new HttpHeadersImpl();
             // every decoded response header is appended to responseHeaders
 224         rspHeadersConsumer = (name, value) -> {
 225             responseHeaders.addHeader(name.toString(), value.toString());
 226         };
 227         this.executor = client.executorWrapper();
 228         //this.response_cf = new CompletableFuture<HttpResponseImpl>();
 229         this.requestPseudoHeaders = new HttpHeadersImpl();
 230         // NEW
             // per-stream sender of window updates; it reads this stream's id lazily
 231         this.windowUpdater = new StreamWindowUpdateSender(connection);
 232     }
 233 
 234     @SuppressWarnings("unchecked")
 235     Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) {
             // Creates a stream directly from a request, with no exchange and no
             // request body processor. PushedStream's constructor chains to this one.
 236         super(null);
 237         this.client = client;
 238         this.connection = connection;
 239         this.request = req;
 240         this.requestProcessor = null;
 241         responseHeaders = new HttpHeadersImpl();
 242         requestHeaders = new HttpHeadersImpl();
             // every decoded response header is appended to responseHeaders
 243         rspHeadersConsumer = (name, value) -> {
 244             responseHeaders.addHeader(name.toString(), value.toString());
 245         };
 246         this.executor = client.executorWrapper();
 247         //this.response_cf = new CompletableFuture<HttpResponseImpl>();
 248         this.requestPseudoHeaders = new HttpHeadersImpl();
 249         // NEW
             // per-stream sender of window updates; it reads this stream's id lazily
 250         this.windowUpdater = new StreamWindowUpdateSender(connection);
 251     }
 252 
 253     /**
 254      * Entry point from Http2Connection reader thread.
 255      *
          * Header frames trigger response handling once the END_HEADERS flag is
          * seen; header frames without it are accepted and otherwise ignored here.
 256      * Data frames are queued on inputQ and will be removed by response body
          * thread. Every other frame type is passed to otherFrame().
 257      *
 258      * @param frame the frame received for this stream
 259      * @throws IOException if the frame cannot be handled, e.g. an unexpected
          *         frame type
          * @throws InterruptedException if interrupted while handling the frame
 260      */
 261     void incoming(Http2Frame frame) throws IOException, InterruptedException {
 262         if (frame instanceof HeaderFrame) {
 263             // Complete headers accumulated only when END_HEADERS is set.
 264             // It's okay if there are multiple HeaderFrames: the earlier ones
                 // must not fall through to otherFrame(), which rejects them.
                 if (((HeaderFrame)frame).endHeaders()) {
 265                 handleResponse();
                 }
 266         } else if (frame instanceof DataFrame) {
 267             inputQ.put((DataFrame)frame);
 268         } else {
 269             otherFrame(frame);
 270         }
 271     }
 272 
 273     void otherFrame(Http2Frame frame) throws IOException {
 274         switch (frame.type()) {
 275             case WindowUpdateFrame.TYPE:
 276                 incoming_windowUpdate((WindowUpdateFrame) frame);
 277                 break;
 278             case ResetFrame.TYPE:
 279                 incoming_reset((ResetFrame) frame);
 280                 break;
 281             case PriorityFrame.TYPE:
 282                 incoming_priority((PriorityFrame) frame);
 283                 break;
 284             default:
 285                 String msg = "Unexpected frame: " + frame.toString();
 286                 throw new IOException(msg);
 287         }


 306                 .firstValueAsLong("content-length")
 307                 .orElse(-1L);
 308         // different implementations for normal streams and pushed streams
 309         completeResponse(response);
 310     }
 311 
 312     void incoming_reset(ResetFrame frame) {
 313         // TODO: implement reset
             // Fails the pending response with an IOException whose message is the
             // textual form of the frame's error code.
 314         int error = frame.getErrorCode();
 315         IOException e = new IOException(ErrorFrame.stringForCode(error));
 316         completeResponseExceptionally(e);
             // NOTE(review): placeholder -- this still throws after the response has
             // been failed, and the call path (incoming -> otherFrame) is documented
             // as running on the connection reader thread; confirm the caller copes.
 317         throw new UnsupportedOperationException("Not implemented");
 318     }
 319 
 320     void incoming_priority(PriorityFrame frame) {
 321         // TODO: implement priority
             // Placeholder: any PRIORITY frame routed here by otherFrame() currently
             // results in an UnsupportedOperationException.
 322         throw new UnsupportedOperationException("Not implemented");
 323     }
 324 
 325     void incoming_windowUpdate(WindowUpdateFrame frame) {
             // Feeds the frame's increment into outgoingWindow, the window that
             // getDataFrame() acquires from before building each DATA frame.
             // NOTE(review): the increment is passed on unchecked; presumably
             // WindowControl.update deals with non-positive values -- confirm.
 326         outgoingWindow.update(frame.getUpdate());


 327     }
 328 
 329     void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException {
 330         if (Log.requests()) {
 331             Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
 332         }
 333         PushGroup<?> pushGroup = request.pushGroup();
 334         if (pushGroup == null) {
 335             cancelImpl(new IllegalStateException("unexpected push promise"));
 336         }
 337         // get the handler and call it.
 338         BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph =
 339             pushGroup.pushHandler();
 340 
 341         CompletableFuture<HttpResponse> pushCF = pushStream
 342                 .getResponseAsync(null)
 343                 .thenApply(r -> (HttpResponse)r);
 344         boolean accept = ph.apply(pushReq, pushCF);
 345         if (!accept) {
 346             IOException ex = new IOException("Stream cancelled by user");


 471     @Override
 472     void sendBody() throws IOException, InterruptedException {
             // Blocking variant: sends the request body on the calling thread.
 473         sendBodyImpl();
 474     }
 475 
 476     void registerStream(int id) {
             // Records the assigned stream id and registers this stream with the
             // connection under that id.
 477         this.streamid = id;
 478         connection.putStream(this, streamid);
 479     }
 480 
 481     DataFrame getDataFrame() throws IOException, InterruptedException {
             // Builds the next DATA frame of the request body: waits on the user's
             // flow controller, lets the request processor fill one buffer, then
             // acquires that many bytes from the stream window and from the
             // connection send window.
 482         userRequestFlowController.take();
 483         int maxpayloadLen = connection.getMaxSendFrameSize();
 484         ByteBuffer buffer = connection.getBuffer();
             // the processor may write at most one frame's worth of payload
 485         buffer.limit(maxpayloadLen);
 486         boolean complete = requestProcessor.onRequestBodyChunk(buffer);
 487         buffer.flip();
 488         int amount = buffer.remaining();
 489         // wait for flow control if necessary. Following method will block
 490         // until after headers frame is sent, so correct streamid is set.
 491         outgoingWindow.acquire(amount);
 492         connection.connectionSendWindow.acquire(amount);

 493         DataFrame df = new DataFrame();
 494         df.streamid(streamid);
 495         if (complete) {
                 // onRequestBodyChunk returned true: flag this as the stream's last frame
 496             df.setFlag(DataFrame.END_STREAM);
 497         }
 498         df.setData(buffer);
 499         df.computeLength();
 500         return df;
 501     }
 502 
 503 
 504     @Override
 505     CompletableFuture<Void> sendHeadersAsync() {
             // Sends the headers on the calling thread and wraps the outcome in a
             // future that is already completed (or already failed) on return.
 506         try {
 507             sendHeadersOnly();
 508             return CompletableFuture.completedFuture(null);
 509         } catch (IOException | InterruptedException ex) {
 510             return CompletableFuture.failedFuture(ex);
 511         }
 512     }


 588                 if (!cf.isDone()) {
 589                     cf.completeExceptionally(t);
 590                     response_cfs.remove(cf);
 591                     return;
 592                 }
 593             }
 594             response_cfs.add(CompletableFuture.failedFuture(t));
 595         }
 596     }
 597 
 598     void sendBodyImpl() throws IOException, InterruptedException {
             // Sends the request body as a sequence of DATA frames, stopping after
             // the frame flagged END_STREAM. requestSent() is called in every
             // non-exceptional path, including when there is no body at all.
 599         if (requestContentLen == 0) {
 600             // no body
 601             requestSent();
 602             return;
 603         }
 604         DataFrame df;
 605         do {
                 // getDataFrame() blocks for flow control before returning
 606             df = getDataFrame();
 607             // TODO: check accumulated content length (if not checked below)
 608             connection.sendDataFrame(df);
 609         } while (!df.getFlag(DataFrame.END_STREAM));
 610         requestSent();
 611     }
 612 
 613     @Override
 614     void cancel() {
             // Cancels the stream with a generic "Cancelled" exception as the cause.
 615         cancelImpl(new Exception("Cancelled"));
 616     }
 617 
 618 
 619     void cancelImpl(Throwable e) {
             // Tears the stream down: closes the input queue, fails the pending
             // response with e, then asks the connection to reset the stream with
             // the CANCEL code.
 620         Log.logTrace("cancelling stream: {0}\n", e.toString());
 621         inputQ.close();
 622         completeResponseExceptionally(e);
 623         try {
 624             connection.resetStream(streamid, ResetFrame.CANCEL);
 625         } catch (IOException | InterruptedException ex) {
                 // best effort: a failure to send the reset is only logged
 626             Log.logError(ex);
 627         }
 628     }


 640         }, null);
 641         return cf;
 642     }
 643 
 644     @Override
 645     <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
             // Blocking counterpart of responseBodyAsync(): returns the body object
             // produced by the given processor.
 646         this.responseProcessor = processor;
 647         T body = processor.onResponseBodyStart(
 648                     responseContentLen, responseHeaders,
 649                     responseFlowController); // TODO: filter headers
 650         if (body == null) {
                 // no body object yet: read everything on this thread, then ask
                 // the processor for the finished result
 651             receiveData();
 652             body = processor.onResponseComplete();
 653         } else
                 // body object supplied up front: pump the data in the background;
                 // the returned future is not observed here
 654             receiveDataAsync(processor);
 655         responseReceived();
 656         return body;
 657     }
 658 
 659     // Called from Http2Connection reader thread. Feeds the given amount into
         // outgoingWindow, which getDataFrame() acquires from before sending data.
 660     void updateOutgoingWindow(int update) {
 661         outgoingWindow.update(update);
 662     }
 663 
 664     void close(String msg) {
             // Closing is implemented as a plain cancel(); msg is currently unused.
 665         cancel();
 666     }
 667 
 668     static class PushedStream extends Stream {
 669         final PushGroup<?> pushGroup;
 670         final private Stream parent;      // used by server push streams
 671         // push streams need the response CF allocated up front as it is
 672         // given directly to user via the multi handler callback function.
 673         final CompletableFuture<HttpResponseImpl> pushCF;
 674         final HttpRequestImpl pushReq;
 675 
 676         PushedStream(PushGroup<?> pushGroup, HttpClientImpl client,
 677                 Http2Connection connection, Stream parent,
 678                 HttpRequestImpl pushReq) {
 679             super(client, connection, pushReq);
 680             this.pushGroup = pushGroup;
 681             this.pushReq = pushReq;


 814             checkIfCompleted();
 815         }
 816 
 817         synchronized void pushCompleted() {
                 // One outstanding push has finished: lower the count and re-check
                 // whether the overall result can be completed.
 818             remainingPushes--;
 819             checkIfCompleted();
 820         }
 821 
 822         synchronized void checkIfCompleted() {
                 // Completes resultCF with multiProcessor.onComplete() once no pushes
                 // remain, no error has been recorded and noMorePushes is set.
 823             if (remainingPushes == 0 && error == null && noMorePushes) {
 824                 T overallResult = multiProcessor.onComplete();
 825                 resultCF.complete(overallResult);
 826             }
 827         }
 828 
 829         synchronized void pushError(Throwable t) {
                 // Records a failure and fails resultCF with it. A null argument
                 // means "no error" and is ignored, so callers may pass the
                 // throwable of a completion callback unconditionally.
 830             if (t == null)
 831                 return;
 832             this.error = t;
 833             resultCF.completeExceptionally(t);
 834         }
 835     }
 836 
 837     class StreamWindowUpdateSender extends WindowUpdateSender {
             // WindowUpdateSender bound to the enclosing Stream. It is a non-static
             // inner class on purpose: getStreamId() reads the enclosing stream's
             // streamid field at call time rather than capturing a value at
             // construction.
 838 
 839         StreamWindowUpdateSender(Http2Connection connection) {
 840             super(connection);
 841         }
 842 
 843         @Override
 844         int getStreamId() {
 845             return streamid;
 846         }
 847     }
 848 }
< prev index next >