1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package java.net.http; 27 28 import sun.net.httpclient.hpack.DecodingCallback; 29 30 import java.io.IOException; 31 import java.net.URI; 32 import java.nio.ByteBuffer; 33 import java.util.LinkedList; 34 import java.util.ArrayList; 35 import java.util.List; 36 import java.util.concurrent.CompletableFuture; 37 import java.util.concurrent.CompletionException; 38 import java.util.concurrent.ExecutionException; 39 import java.util.concurrent.TimeUnit; 40 import java.util.concurrent.TimeoutException; 41 import java.util.function.BiFunction; 42 import java.util.function.LongConsumer; 43 44 /** 45 * Http/2 Stream handling. 46 * 47 * REQUESTS 48 * 49 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q 50 * 51 * sendRequest() -- sendHeadersOnly() + sendBody() 52 * 53 * sendBody() -- in calling thread: obeys all flow control (so may block) 54 * obtains data from request body processor and places on connection 55 * outbound Q. 56 * 57 * sendBodyAsync() -- calls sendBody() in an executor thread. 58 * 59 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block 60 * 61 * sendRequestAsync() -- calls sendRequest() in an executor thread 62 * 63 * RESPONSES 64 * 65 * Multiple responses can be received per request. Responses are queued up on 66 * a LinkedList of CF<HttpResponse> and the the first one on the list is completed 67 * with the next response 68 * 69 * getResponseAsync() -- queries list of response CFs and returns first one 70 * if one exists. Otherwise, creates one and adds it to list 71 * and returns it. Completion is achieved through the 72 * incoming() upcall from connection reader thread. 73 * 74 * getResponse() -- calls getResponseAsync() and waits for CF to complete 75 * 76 * responseBody() -- in calling thread: blocks for incoming DATA frames on 77 * stream inputQ. Obeys remote and local flow control so may block. 78 * Calls user response body processor with data buffers. 79 * 80 * responseBodyAsync() -- calls responseBody() in an executor thread. 81 * 82 * incoming() -- entry point called from connection reader thread. Frames are 83 * either handled immediately without blocking or for data frames 84 * placed on the stream's inputQ which is consumed by the stream's 85 * reader thread. 86 * 87 * PushedStream sub class 88 * ====================== 89 * Sending side methods are not used because the request comes from a PUSH_PROMISE 90 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream 91 * is created. PushedStream does not use responseCF list as there can be only 92 * one response. The CF is created when the object created and when the response 93 * HEADERS frame is received the object is completed. 94 */ 95 class Stream extends ExchangeImpl { 96 97 final ClosableBlockingQueue<DataFrame> inputQ = new ClosableBlockingQueue<>(); 98 99 volatile int streamid; 100 101 long responseContentLen = -1; 102 long responseBytesProcessed = 0; 103 long requestContentLen; 104 105 Http2Connection connection; 106 HttpClientImpl client; 107 final HttpRequestImpl request; 108 final DecodingCallback rspHeadersConsumer; 109 HttpHeadersImpl responseHeaders; 110 final HttpHeadersImpl requestHeaders; 111 final HttpHeadersImpl requestPseudoHeaders; 112 HttpResponse.BodyProcessor<?> responseProcessor; 113 final HttpRequest.BodyProcessor requestProcessor; 114 HttpResponse response; 115 final WindowUpdateSender windowUpdater; 116 // state flags 117 boolean requestSent, responseReceived; 118 119 final FlowController userRequestFlowController = 120 new FlowController(); 121 final FlowController responseFlowController = 122 new FlowController(); 123 final WindowControl outgoingWindow = new WindowControl(); 124 125 final ExecutorWrapper executor; 126 127 @Override 128 @SuppressWarnings("unchecked") 129 <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) { 130 this.responseProcessor = processor; 131 CompletableFuture<T> cf; 132 try { 133 T body = processor.onResponseBodyStart( 134 responseContentLen, responseHeaders, 135 responseFlowController); // TODO: filter headers 136 if (body != null) { 137 cf = CompletableFuture.completedFuture(body); 138 receiveDataAsync(processor); 139 } else 140 cf = receiveDataAsync(processor); 141 } catch (IOException e) { 142 cf = CompletableFuture.failedFuture(e); 143 } 144 PushGroup<?> pg = request.pushGroup(); 145 if (pg != null) { 146 // if an error occurs make sure it is recorded in the PushGroup 147 cf = cf.whenComplete((t,e) -> pg.pushError(e)); 148 } 149 return cf; 150 } 151 152 @Override 153 public String toString() { 154 StringBuilder sb = new StringBuilder(); 155 sb.append("streamid: ") 156 .append(streamid); 157 return sb.toString(); 158 } 159 160 // pushes entire response body into response processor 161 // blocking when required by local or remote flow control 162 void receiveData() throws IOException { 163 DataFrame df = null; 164 try { 165 boolean endOfStream; 166 do { 167 df = inputQ.take(); 168 endOfStream = df.getFlag(DataFrame.END_STREAM); 169 int len = df.getDataLength(); 170 ByteBuffer[] buffers = df.getData(); 171 for (ByteBuffer b : buffers) { 172 responseFlowController.take(); 173 responseProcessor.onResponseBodyChunk(b); 174 } 175 connection.windowUpdater.update(len); 176 if(!endOfStream) { 177 // if we got the last frame in the stream we shouldn't send WindowUpdate 178 windowUpdater.update(len); 179 } 180 } while (!endOfStream); 181 } catch (InterruptedException e) { 182 throw new IOException(e); 183 } 184 } 185 186 private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) { 187 CompletableFuture<T> cf = new CompletableFuture<>(); 188 executor.execute(() -> { 189 try { 190 receiveData(); 191 T body = processor.onResponseComplete(); 192 cf.complete(body); 193 responseReceived(); 194 } catch (Throwable t) { 195 cf.completeExceptionally(t); 196 } 197 }, null); 198 return cf; 199 } 200 201 @Override 202 CompletableFuture<Void> sendBodyAsync() { 203 final CompletableFuture<Void> cf = new CompletableFuture<>(); 204 executor.execute(() -> { 205 try { 206 sendBodyImpl(); 207 cf.complete(null); 208 } catch (IOException | InterruptedException e) { 209 cf.completeExceptionally(e); 210 } 211 }, null); 212 return cf; 213 } 214 215 @SuppressWarnings("unchecked") 216 Stream(HttpClientImpl client, Http2Connection connection, Exchange e) { 217 super(e); 218 this.client = client; 219 this.connection = connection; 220 this.request = e.request(); 221 this.requestProcessor = request.requestProcessor(); 222 responseHeaders = new HttpHeadersImpl(); 223 requestHeaders = new HttpHeadersImpl(); 224 rspHeadersConsumer = (name, value) -> { 225 responseHeaders.addHeader(name.toString(), value.toString()); 226 }; 227 this.executor = client.executorWrapper(); 228 //this.response_cf = new CompletableFuture<HttpResponseImpl>(); 229 this.requestPseudoHeaders = new HttpHeadersImpl(); 230 // NEW 231 this.windowUpdater = new StreamWindowUpdateSender(connection); 232 } 233 234 @SuppressWarnings("unchecked") 235 Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) { 236 super(null); 237 this.client = client; 238 this.connection = connection; 239 this.request = req; 240 this.requestProcessor = null; 241 responseHeaders = new HttpHeadersImpl(); 242 requestHeaders = new HttpHeadersImpl(); 243 rspHeadersConsumer = (name, value) -> { 244 responseHeaders.addHeader(name.toString(), value.toString()); 245 }; 246 this.executor = client.executorWrapper(); 247 //this.response_cf = new CompletableFuture<HttpResponseImpl>(); 248 this.requestPseudoHeaders = new HttpHeadersImpl(); 249 // NEW 250 this.windowUpdater = new StreamWindowUpdateSender(connection); 251 } 252 253 /** 254 * Entry point from Http2Connection reader thread. 255 * 256 * Data frames will be removed by response body thread. 257 * 258 * @param frame 259 * @throws IOException 260 */ 261 void incoming(Http2Frame frame) throws IOException, InterruptedException { 262 if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) { 263 // Complete headers accumulated. handle response. 264 // It's okay if there are multiple HeaderFrames. 265 handleResponse(); 266 } else if (frame instanceof DataFrame) { 267 inputQ.put((DataFrame)frame); 268 } else { 269 otherFrame(frame); 270 } 271 } 272 273 void otherFrame(Http2Frame frame) throws IOException { 274 switch (frame.type()) { 275 case WindowUpdateFrame.TYPE: 276 incoming_windowUpdate((WindowUpdateFrame) frame); 277 break; 278 case ResetFrame.TYPE: 279 incoming_reset((ResetFrame) frame); 280 break; 281 case PriorityFrame.TYPE: 282 incoming_priority((PriorityFrame) frame); 283 break; 284 default: 285 String msg = "Unexpected frame: " + frame.toString(); 286 throw new IOException(msg); 287 } 288 } 289 290 // The Hpack decoder decodes into one of these consumers of name,value pairs 291 292 DecodingCallback rspHeadersConsumer() { 293 return rspHeadersConsumer; 294 } 295 296 // create and return the HttpResponseImpl 297 protected void handleResponse() throws IOException { 298 HttpConnection c = connection.connection; // TODO: improve 299 long statusCode = responseHeaders 300 .firstValueAsLong(":status") 301 .orElseThrow(() -> new IOException("no statuscode in response")); 302 303 this.response = new HttpResponseImpl((int)statusCode, exchange, responseHeaders, null, 304 c.sslParameters(), HttpClient.Version.HTTP_2, c); 305 this.responseContentLen = responseHeaders 306 .firstValueAsLong("content-length") 307 .orElse(-1L); 308 // different implementations for normal streams and pushed streams 309 completeResponse(response); 310 } 311 312 void incoming_reset(ResetFrame frame) { 313 // TODO: implement reset 314 int error = frame.getErrorCode(); 315 IOException e = new IOException(ErrorFrame.stringForCode(error)); 316 completeResponseExceptionally(e); 317 throw new UnsupportedOperationException("Not implemented"); 318 } 319 320 void incoming_priority(PriorityFrame frame) { 321 // TODO: implement priority 322 throw new UnsupportedOperationException("Not implemented"); 323 } 324 325 void incoming_windowUpdate(WindowUpdateFrame frame) { 326 outgoingWindow.update(frame.getUpdate()); 327 } 328 329 void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException { 330 if (Log.requests()) { 331 Log.logRequest("PUSH_PROMISE: " + pushReq.toString()); 332 } 333 PushGroup<?> pushGroup = request.pushGroup(); 334 if (pushGroup == null) { 335 cancelImpl(new IllegalStateException("unexpected push promise")); 336 } 337 // get the handler and call it. 338 BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph = 339 pushGroup.pushHandler(); 340 341 CompletableFuture<HttpResponse> pushCF = pushStream 342 .getResponseAsync(null) 343 .thenApply(r -> (HttpResponse)r); 344 boolean accept = ph.apply(pushReq, pushCF); 345 if (!accept) { 346 IOException ex = new IOException("Stream cancelled by user"); 347 cancelImpl(ex); 348 pushCF.completeExceptionally(ex); 349 } else { 350 pushStream.requestSent(); 351 pushGroup.addPush(); 352 } 353 } 354 355 private OutgoingHeaders headerFrame(long contentLength) { 356 HttpHeadersImpl h = request.getSystemHeaders(); 357 if (contentLength > 0) { 358 h.setHeader("content-length", Long.toString(contentLength)); 359 } 360 setPseudoHeaderFields(); 361 OutgoingHeaders f = new OutgoingHeaders(h, request.getUserHeaders(), this); 362 if (contentLength == 0) { 363 f.setFlag(HeadersFrame.END_STREAM); 364 } 365 return f; 366 } 367 368 private void setPseudoHeaderFields() { 369 HttpHeadersImpl hdrs = requestPseudoHeaders; 370 String method = request.method(); 371 hdrs.setHeader(":method", method); 372 URI uri = request.uri(); 373 hdrs.setHeader(":scheme", uri.getScheme()); 374 // TODO: userinfo deprecated. Needs to be removed 375 hdrs.setHeader(":authority", uri.getAuthority()); 376 // TODO: ensure header names beginning with : not in user headers 377 String query = uri.getQuery(); 378 String path = uri.getPath(); 379 if (path == null) { 380 if (method.equalsIgnoreCase("OPTIONS")) { 381 path = "*"; 382 } else { 383 path = "/"; 384 } 385 } 386 if (query != null) { 387 path += "?" + query; 388 } 389 hdrs.setHeader(":path", path); 390 } 391 392 HttpHeadersImpl getRequestPseudoHeaders() { 393 return requestPseudoHeaders; 394 } 395 396 @Override 397 HttpResponseImpl getResponse() throws IOException { 398 try { 399 if (request.timeval() > 0) { 400 return getResponseAsync(null).get( 401 request.timeval(), TimeUnit.MILLISECONDS); 402 } else { 403 return getResponseAsync(null).join(); 404 } 405 } catch (TimeoutException e) { 406 throw new HttpTimeoutException("Response timed out"); 407 } catch (InterruptedException | ExecutionException | CompletionException e) { 408 Throwable t = e.getCause(); 409 if (t instanceof IOException) { 410 throw (IOException)t; 411 } 412 throw new IOException(e); 413 } 414 } 415 416 @Override 417 void sendRequest() throws IOException, InterruptedException { 418 sendHeadersOnly(); 419 sendBody(); 420 } 421 422 /** 423 * A simple general purpose blocking flow controller 424 */ 425 class FlowController implements LongConsumer { 426 int permits; 427 428 FlowController() { 429 this.permits = 0; 430 } 431 432 @Override 433 public synchronized void accept(long n) { 434 if (n < 1) { 435 throw new InternalError("FlowController.accept called with " + n); 436 } 437 if (permits == 0) { 438 permits += n; 439 notifyAll(); 440 } else { 441 permits += n; 442 } 443 } 444 445 public synchronized void take() throws InterruptedException { 446 take(1); 447 } 448 449 public synchronized void take(int amount) throws InterruptedException { 450 assert permits >= 0; 451 while (amount > 0) { 452 int n = Math.min(amount, permits); 453 permits -= n; 454 amount -= n; 455 if (amount > 0) 456 wait(); 457 } 458 } 459 } 460 461 @Override 462 void sendHeadersOnly() throws IOException, InterruptedException { 463 if (Log.requests() && request != null) { 464 Log.logRequest(request.toString()); 465 } 466 requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController); 467 OutgoingHeaders f = headerFrame(requestContentLen); 468 connection.sendFrame(f); 469 } 470 471 @Override 472 void sendBody() throws IOException, InterruptedException { 473 sendBodyImpl(); 474 } 475 476 void registerStream(int id) { 477 this.streamid = id; 478 connection.putStream(this, streamid); 479 } 480 481 DataFrame getDataFrame() throws IOException, InterruptedException { 482 userRequestFlowController.take(); 483 int maxpayloadLen = connection.getMaxSendFrameSize(); 484 ByteBuffer buffer = connection.getBuffer(); 485 buffer.limit(maxpayloadLen); 486 boolean complete = requestProcessor.onRequestBodyChunk(buffer); 487 buffer.flip(); 488 int amount = buffer.remaining(); 489 // wait for flow control if necessary. Following method will block 490 // until after headers frame is sent, so correct streamid is set. 491 outgoingWindow.acquire(amount); 492 connection.connectionSendWindow.acquire(amount); 493 DataFrame df = new DataFrame(); 494 df.streamid(streamid); 495 if (complete) { 496 df.setFlag(DataFrame.END_STREAM); 497 } 498 df.setData(buffer); 499 df.computeLength(); 500 return df; 501 } 502 503 504 @Override 505 CompletableFuture<Void> sendHeadersAsync() { 506 try { 507 sendHeadersOnly(); 508 return CompletableFuture.completedFuture(null); 509 } catch (IOException | InterruptedException ex) { 510 return CompletableFuture.failedFuture(ex); 511 } 512 } 513 514 /** 515 * A List of responses relating to this stream. Normally there is only 516 * one response, but intermediate responses like 100 are allowed 517 * and must be passed up to higher level before continuing. Deals with races 518 * such as if responses are returned before the CFs get created by 519 * getResponseAsync() 520 */ 521 522 final List<CompletableFuture<HttpResponseImpl>> response_cfs = new ArrayList<>(5); 523 524 @Override 525 CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) { 526 CompletableFuture<HttpResponseImpl> cf; 527 synchronized (response_cfs) { 528 if (!response_cfs.isEmpty()) { 529 cf = response_cfs.remove(0); 530 } else { 531 cf = new CompletableFuture<>(); 532 response_cfs.add(cf); 533 } 534 } 535 PushGroup<?> pg = request.pushGroup(); 536 if (pg != null) { 537 // if an error occurs make sure it is recorded in the PushGroup 538 cf = cf.whenComplete((t,e) -> pg.pushError(e)); 539 } 540 return cf; 541 } 542 543 /** 544 * Completes the first uncompleted CF on list, and removes it. If there is no 545 * uncompleted CF then creates one (completes it) and adds to list 546 */ 547 void completeResponse(HttpResponse r) { 548 HttpResponseImpl resp = (HttpResponseImpl)r; 549 synchronized (response_cfs) { 550 int cfs_len = response_cfs.size(); 551 for (int i=0; i<cfs_len; i++) { 552 CompletableFuture<HttpResponseImpl> cf = response_cfs.get(i); 553 if (!cf.isDone()) { 554 cf.complete(resp); 555 response_cfs.remove(cf); 556 return; 557 } 558 } 559 response_cfs.add(CompletableFuture.completedFuture(resp)); 560 } 561 } 562 563 // methods to update state and remove stream when finished 564 565 synchronized void requestSent() { 566 requestSent = true; 567 if (responseReceived) 568 connection.deleteStream(this); 569 } 570 571 synchronized void responseReceived() { 572 responseReceived = true; 573 if (requestSent) 574 connection.deleteStream(this); 575 PushGroup<?> pg = request.pushGroup(); 576 if (pg != null) 577 pg.noMorePushes(); 578 } 579 580 /** 581 * same as above but for errors 582 * 583 * @param t 584 */ 585 void completeResponseExceptionally(Throwable t) { 586 synchronized (response_cfs) { 587 for (CompletableFuture<HttpResponseImpl> cf : response_cfs) { 588 if (!cf.isDone()) { 589 cf.completeExceptionally(t); 590 response_cfs.remove(cf); 591 return; 592 } 593 } 594 response_cfs.add(CompletableFuture.failedFuture(t)); 595 } 596 } 597 598 void sendBodyImpl() throws IOException, InterruptedException { 599 if (requestContentLen == 0) { 600 // no body 601 requestSent(); 602 return; 603 } 604 DataFrame df; 605 do { 606 df = getDataFrame(); 607 // TODO: check accumulated content length (if not checked below) 608 connection.sendDataFrame(df); 609 } while (!df.getFlag(DataFrame.END_STREAM)); 610 requestSent(); 611 } 612 613 @Override 614 void cancel() { 615 cancelImpl(new Exception("Cancelled")); 616 } 617 618 619 void cancelImpl(Throwable e) { 620 Log.logTrace("cancelling stream: {0}\n", e.toString()); 621 inputQ.close(); 622 completeResponseExceptionally(e); 623 try { 624 connection.resetStream(streamid, ResetFrame.CANCEL); 625 } catch (IOException | InterruptedException ex) { 626 Log.logError(ex); 627 } 628 } 629 630 @Override 631 CompletableFuture<Void> sendRequestAsync() { 632 CompletableFuture<Void> cf = new CompletableFuture<>(); 633 executor.execute(() -> { 634 try { 635 sendRequest(); 636 cf.complete(null); 637 } catch (IOException |InterruptedException e) { 638 cf.completeExceptionally(e); 639 } 640 }, null); 641 return cf; 642 } 643 644 @Override 645 <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException { 646 this.responseProcessor = processor; 647 T body = processor.onResponseBodyStart( 648 responseContentLen, responseHeaders, 649 responseFlowController); // TODO: filter headers 650 if (body == null) { 651 receiveData(); 652 body = processor.onResponseComplete(); 653 } else 654 receiveDataAsync(processor); 655 responseReceived(); 656 return body; 657 } 658 659 // called from Http2Connection reader thread 660 void updateOutgoingWindow(int update) { 661 outgoingWindow.update(update); 662 } 663 664 void close(String msg) { 665 cancel(); 666 } 667 668 static class PushedStream extends Stream { 669 final PushGroup<?> pushGroup; 670 final private Stream parent; // used by server push streams 671 // push streams need the response CF allocated up front as it is 672 // given directly to user via the multi handler callback function. 673 final CompletableFuture<HttpResponseImpl> pushCF; 674 final HttpRequestImpl pushReq; 675 676 PushedStream(PushGroup<?> pushGroup, HttpClientImpl client, 677 Http2Connection connection, Stream parent, 678 HttpRequestImpl pushReq) { 679 super(client, connection, pushReq); 680 this.pushGroup = pushGroup; 681 this.pushReq = pushReq; 682 this.pushCF = new CompletableFuture<>(); 683 this.parent = parent; 684 } 685 686 // Following methods call the super class but in case of 687 // error record it in the PushGroup. The error method is called 688 // with a null value when no error occurred (is a no-op) 689 @Override 690 CompletableFuture<Void> sendBodyAsync() { 691 return super.sendBodyAsync() 692 .whenComplete((v, t) -> pushGroup.pushError(t)); 693 } 694 695 @Override 696 CompletableFuture<Void> sendHeadersAsync() { 697 return super.sendHeadersAsync() 698 .whenComplete((v, t) -> pushGroup.pushError(t)); 699 } 700 701 @Override 702 CompletableFuture<Void> sendRequestAsync() { 703 return super.sendRequestAsync() 704 .whenComplete((v, t) -> pushGroup.pushError(t)); 705 } 706 707 @Override 708 CompletableFuture<HttpResponseImpl> getResponseAsync(Void vo) { 709 return pushCF.whenComplete((v, t) -> pushGroup.pushError(t)); 710 } 711 712 @Override 713 <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) { 714 return super.responseBodyAsync(processor) 715 .whenComplete((v, t) -> pushGroup.pushError(t)); 716 } 717 718 @Override 719 void completeResponse(HttpResponse r) { 720 HttpResponseImpl resp = (HttpResponseImpl)r; 721 Utils.logResponse(resp); 722 pushCF.complete(resp); 723 } 724 725 @Override 726 void completeResponseExceptionally(Throwable t) { 727 pushCF.completeExceptionally(t); 728 } 729 730 @Override 731 synchronized void responseReceived() { 732 super.responseReceived(); 733 pushGroup.pushCompleted(); 734 } 735 736 // create and return the PushResponseImpl 737 @Override 738 protected void handleResponse() { 739 HttpConnection c = connection.connection; // TODO: improve 740 long statusCode = responseHeaders 741 .firstValueAsLong(":status") 742 .orElse(-1L); 743 744 if (statusCode == -1L) 745 completeResponseExceptionally(new IOException("No status code")); 746 ImmutableHeaders h = new ImmutableHeaders(responseHeaders, Utils.ALL_HEADERS); 747 this.response = new HttpResponseImpl((int)statusCode, pushReq, h, this, 748 c.sslParameters()); 749 this.responseContentLen = responseHeaders 750 .firstValueAsLong("content-length") 751 .orElse(-1L); 752 // different implementations for normal streams and pushed streams 753 completeResponse(response); 754 } 755 } 756 757 /** 758 * One PushGroup object is associated with the parent Stream of 759 * the pushed Streams. This keeps track of all common state associated 760 * with the pushes. 761 */ 762 static class PushGroup<T> { 763 // the overall completion object, completed when all pushes are done. 764 final CompletableFuture<T> resultCF; 765 Throwable error; // any exception that occured during pushes 766 767 // CF for main response 768 final CompletableFuture<HttpResponse> mainResponse; 769 770 // user's processor object 771 final HttpResponse.MultiProcessor<T> multiProcessor; 772 773 // per push handler function provided by processor 774 final private BiFunction<HttpRequest, 775 CompletableFuture<HttpResponse>, 776 Boolean> pushHandler; 777 int numberOfPushes; 778 int remainingPushes; 779 boolean noMorePushes = false; 780 781 PushGroup(HttpResponse.MultiProcessor<T> multiProcessor, HttpRequestImpl req) { 782 this.resultCF = new CompletableFuture<>(); 783 this.mainResponse = new CompletableFuture<>(); 784 this.multiProcessor = multiProcessor; 785 this.pushHandler = multiProcessor.onStart(req, mainResponse); 786 } 787 788 CompletableFuture<T> groupResult() { 789 return resultCF; 790 } 791 792 CompletableFuture<HttpResponse> mainResponse() { 793 return mainResponse; 794 } 795 796 private BiFunction<HttpRequest, 797 CompletableFuture<HttpResponse>, Boolean> pushHandler() 798 { 799 return pushHandler; 800 } 801 802 synchronized void addPush() { 803 numberOfPushes++; 804 remainingPushes++; 805 } 806 807 synchronized int numberOfPushes() { 808 return numberOfPushes; 809 } 810 // This is called when the main body response completes because it means 811 // no more PUSH_PROMISEs are possible 812 synchronized void noMorePushes() { 813 noMorePushes = true; 814 checkIfCompleted(); 815 } 816 817 synchronized void pushCompleted() { 818 remainingPushes--; 819 checkIfCompleted(); 820 } 821 822 synchronized void checkIfCompleted() { 823 if (remainingPushes == 0 && error == null && noMorePushes) { 824 T overallResult = multiProcessor.onComplete(); 825 resultCF.complete(overallResult); 826 } 827 } 828 829 synchronized void pushError(Throwable t) { 830 if (t == null) 831 return; 832 this.error = t; 833 resultCF.completeExceptionally(t); 834 } 835 } 836 837 class StreamWindowUpdateSender extends WindowUpdateSender { 838 839 StreamWindowUpdateSender(Http2Connection connection) { 840 super(connection); 841 } 842 843 @Override 844 int getStreamId() { 845 return streamid; 846 } 847 } 848 }