1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package java.net.http; 27 28 import sun.net.httpclient.hpack.DecodingCallback; 29 30 import java.io.IOException; 31 import java.net.URI; 32 import java.nio.ByteBuffer; 33 import java.util.LinkedList; 34 import java.util.ArrayList; 35 import java.util.List; 36 import java.util.concurrent.CompletableFuture; 37 import java.util.concurrent.CompletionException; 38 import java.util.concurrent.ExecutionException; 39 import java.util.concurrent.TimeUnit; 40 import java.util.concurrent.TimeoutException; 41 import java.util.function.BiFunction; 42 import java.util.function.LongConsumer; 43 44 /** 45 * Http/2 Stream handling. 46 * 47 * REQUESTS 48 * 49 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q 50 * 51 * sendRequest() -- sendHeadersOnly() + sendBody() 52 * 53 * sendBody() -- in calling thread: obeys all flow control (so may block) 54 * obtains data from request body processor and places on connection 55 * outbound Q. 56 * 57 * sendBodyAsync() -- calls sendBody() in an executor thread. 58 * 59 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block 60 * 61 * sendRequestAsync() -- calls sendRequest() in an executor thread 62 * 63 * RESPONSES 64 * 65 * Multiple responses can be received per request. Responses are queued up on 66 * a LinkedList of CF<HttpResponse> and the the first one on the list is completed 67 * with the next response 68 * 69 * getResponseAsync() -- queries list of response CFs and returns first one 70 * if one exists. Otherwise, creates one and adds it to list 71 * and returns it. Completion is achieved through the 72 * incoming() upcall from connection reader thread. 73 * 74 * getResponse() -- calls getResponseAsync() and waits for CF to complete 75 * 76 * responseBody() -- in calling thread: blocks for incoming DATA frames on 77 * stream inputQ. Obeys remote and local flow control so may block. 78 * Calls user response body processor with data buffers. 79 * 80 * responseBodyAsync() -- calls responseBody() in an executor thread. 81 * 82 * incoming() -- entry point called from connection reader thread. Frames are 83 * either handled immediately without blocking or for data frames 84 * placed on the stream's inputQ which is consumed by the stream's 85 * reader thread. 86 * 87 * PushedStream sub class 88 * ====================== 89 * Sending side methods are not used because the request comes from a PUSH_PROMISE 90 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream 91 * is created. PushedStream does not use responseCF list as there can be only 92 * one response. The CF is created when the object created and when the response 93 * HEADERS frame is received the object is completed. 94 */ 95 class Stream extends ExchangeImpl { 96 97 final Queue<Http2Frame> inputQ; 98 99 volatile int streamid; 100 101 long responseContentLen = -1; 102 long responseBytesProcessed = 0; 103 long requestContentLen; 104 105 Http2Connection connection; 106 HttpClientImpl client; 107 final HttpRequestImpl request; 108 final DecodingCallback rspHeadersConsumer; 109 HttpHeadersImpl responseHeaders; 110 final HttpHeadersImpl requestHeaders; 111 final HttpHeadersImpl requestPseudoHeaders; 112 HttpResponse.BodyProcessor<?> responseProcessor; 113 final HttpRequest.BodyProcessor requestProcessor; 114 HttpResponse response; 115 116 // state flags 117 boolean requestSent, responseReceived; 118 119 final FlowController userRequestFlowController = 120 new FlowController(); 121 final FlowController remoteRequestFlowController = 122 new FlowController(); 123 final FlowController responseFlowController = 124 new FlowController(); 125 126 final ExecutorWrapper executor; 127 128 @Override 129 @SuppressWarnings("unchecked") 130 <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) { 131 this.responseProcessor = processor; 132 CompletableFuture<T> cf; 133 try { 134 T body = processor.onResponseBodyStart( 135 responseContentLen, responseHeaders, 136 responseFlowController); // TODO: filter headers 137 if (body != null) { 138 cf = CompletableFuture.completedFuture(body); 139 receiveDataAsync(processor); 140 } else 141 cf = receiveDataAsync(processor); 142 } catch (IOException e) { 143 cf = CompletableFuture.failedFuture(e); 144 } 145 PushGroup<?> pg = request.pushGroup(); 146 if (pg != null) { 147 // if an error occurs make sure it is recorded in the PushGroup 148 cf = cf.whenComplete((t,e) -> pg.pushError(e)); 149 } 150 return cf; 151 } 152 153 @Override 154 public String toString() { 155 StringBuilder sb = new StringBuilder(); 156 sb.append("streamid: ") 157 .append(streamid); 158 return sb.toString(); 159 } 160 161 // pushes entire response body into response processor 162 // blocking when required by local or remote flow control 163 void receiveData() throws IOException { 164 Http2Frame frame; 165 DataFrame df = null; 166 try { 167 do { 168 frame = inputQ.take(); 169 if (!(frame instanceof DataFrame)) { 170 assert false; 171 continue; 172 } 173 df = (DataFrame) frame; 174 int len = df.getDataLength(); 175 ByteBuffer[] buffers = df.getData(); 176 for (ByteBuffer b : buffers) { 177 responseFlowController.take(); 178 responseProcessor.onResponseBodyChunk(b); 179 } 180 sendWindowUpdate(len); 181 } while (!df.getFlag(DataFrame.END_STREAM)); 182 } catch (InterruptedException e) { 183 throw new IOException(e); 184 } 185 } 186 187 private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) { 188 CompletableFuture<T> cf = new CompletableFuture<>(); 189 executor.execute(() -> { 190 try { 191 receiveData(); 192 T body = processor.onResponseComplete(); 193 cf.complete(body); 194 responseReceived(); 195 } catch (Throwable t) { 196 cf.completeExceptionally(t); 197 } 198 }, null); 199 return cf; 200 } 201 202 private void sendWindowUpdate(int increment) 203 throws IOException, InterruptedException { 204 if (increment == 0) 205 return; 206 LinkedList<Http2Frame> list = new LinkedList<>(); 207 WindowUpdateFrame frame = new WindowUpdateFrame(); 208 frame.streamid(streamid); 209 frame.setUpdate(increment); 210 list.add(frame); 211 frame = new WindowUpdateFrame(); 212 frame.streamid(0); 213 frame.setUpdate(increment); 214 list.add(frame); 215 connection.sendFrames(list); 216 } 217 218 @Override 219 CompletableFuture<Void> sendBodyAsync() { 220 final CompletableFuture<Void> cf = new CompletableFuture<>(); 221 executor.execute(() -> { 222 try { 223 sendBodyImpl(); 224 cf.complete(null); 225 } catch (IOException | InterruptedException e) { 226 cf.completeExceptionally(e); 227 } 228 }, null); 229 return cf; 230 } 231 232 @SuppressWarnings("unchecked") 233 Stream(HttpClientImpl client, Http2Connection connection, Exchange e) { 234 super(e); 235 this.client = client; 236 this.connection = connection; 237 this.request = e.request(); 238 this.requestProcessor = request.requestProcessor(); 239 responseHeaders = new HttpHeadersImpl(); 240 requestHeaders = new HttpHeadersImpl(); 241 rspHeadersConsumer = (name, value) -> { 242 responseHeaders.addHeader(name.toString(), value.toString()); 243 }; 244 this.executor = client.executorWrapper(); 245 //this.response_cf = new CompletableFuture<HttpResponseImpl>(); 246 this.requestPseudoHeaders = new HttpHeadersImpl(); 247 // NEW 248 this.inputQ = new Queue<>(); 249 } 250 251 @SuppressWarnings("unchecked") 252 Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) { 253 super(null); 254 this.client = client; 255 this.connection = connection; 256 this.request = req; 257 this.requestProcessor = null; 258 responseHeaders = new HttpHeadersImpl(); 259 requestHeaders = new HttpHeadersImpl(); 260 rspHeadersConsumer = (name, value) -> { 261 responseHeaders.addHeader(name.toString(), value.toString()); 262 }; 263 this.executor = client.executorWrapper(); 264 //this.response_cf = new CompletableFuture<HttpResponseImpl>(); 265 this.requestPseudoHeaders = new HttpHeadersImpl(); 266 // NEW 267 this.inputQ = new Queue<>(); 268 } 269 270 /** 271 * Entry point from Http2Connection reader thread. 272 * 273 * Data frames will be removed by response body thread. 274 * 275 * @param frame 276 * @throws IOException 277 */ 278 void incoming(Http2Frame frame) throws IOException, InterruptedException { 279 if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) { 280 // Complete headers accumulated. handle response. 281 // It's okay if there are multiple HeaderFrames. 282 handleResponse(); 283 } else if (frame instanceof DataFrame) { 284 inputQ.put(frame); 285 } else { 286 otherFrame(frame); 287 } 288 } 289 290 void otherFrame(Http2Frame frame) throws IOException { 291 switch (frame.type()) { 292 case WindowUpdateFrame.TYPE: 293 incoming_windowUpdate((WindowUpdateFrame) frame); 294 break; 295 case ResetFrame.TYPE: 296 incoming_reset((ResetFrame) frame); 297 break; 298 case PriorityFrame.TYPE: 299 incoming_priority((PriorityFrame) frame); 300 break; 301 default: 302 String msg = "Unexpected frame: " + frame.toString(); 303 throw new IOException(msg); 304 } 305 } 306 307 // The Hpack decoder decodes into one of these consumers of name,value pairs 308 309 DecodingCallback rspHeadersConsumer() { 310 return rspHeadersConsumer; 311 } 312 313 // create and return the HttpResponseImpl 314 protected void handleResponse() throws IOException { 315 HttpConnection c = connection.connection; // TODO: improve 316 long statusCode = responseHeaders 317 .firstValueAsLong(":status") 318 .orElseThrow(() -> new IOException("no statuscode in response")); 319 320 this.response = new HttpResponseImpl((int)statusCode, exchange, responseHeaders, null, 321 c.sslParameters(), HttpClient.Version.HTTP_2, c); 322 this.responseContentLen = responseHeaders 323 .firstValueAsLong("content-length") 324 .orElse(-1L); 325 // different implementations for normal streams and pushed streams 326 completeResponse(response); 327 } 328 329 void incoming_reset(ResetFrame frame) { 330 // TODO: implement reset 331 int error = frame.getErrorCode(); 332 IOException e = new IOException(ErrorFrame.stringForCode(error)); 333 completeResponseExceptionally(e); 334 throw new UnsupportedOperationException("Not implemented"); 335 } 336 337 void incoming_priority(PriorityFrame frame) { 338 // TODO: implement priority 339 throw new UnsupportedOperationException("Not implemented"); 340 } 341 342 void incoming_windowUpdate(WindowUpdateFrame frame) { 343 int amount = frame.getUpdate(); 344 if (amount > 0) 345 remoteRequestFlowController.accept(amount); 346 } 347 348 void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException { 349 if (Log.requests()) { 350 Log.logRequest("PUSH_PROMISE: " + pushReq.toString()); 351 } 352 PushGroup<?> pushGroup = request.pushGroup(); 353 if (pushGroup == null) { 354 cancelImpl(new IllegalStateException("unexpected push promise")); 355 } 356 // get the handler and call it. 357 BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph = 358 pushGroup.pushHandler(); 359 360 CompletableFuture<HttpResponse> pushCF = pushStream 361 .getResponseAsync(null) 362 .thenApply(r -> (HttpResponse)r); 363 boolean accept = ph.apply(pushReq, pushCF); 364 if (!accept) { 365 IOException ex = new IOException("Stream cancelled by user"); 366 cancelImpl(ex); 367 pushCF.completeExceptionally(ex); 368 } else { 369 pushStream.requestSent(); 370 pushGroup.addPush(); 371 } 372 } 373 374 private OutgoingHeaders headerFrame(long contentLength) { 375 HttpHeadersImpl h = request.getSystemHeaders(); 376 if (contentLength > 0) { 377 h.setHeader("content-length", Long.toString(contentLength)); 378 } 379 setPseudoHeaderFields(); 380 OutgoingHeaders f = new OutgoingHeaders(h, request.getUserHeaders(), this); 381 if (contentLength == 0) { 382 f.setFlag(HeadersFrame.END_STREAM); 383 } 384 return f; 385 } 386 387 private void setPseudoHeaderFields() { 388 HttpHeadersImpl hdrs = requestPseudoHeaders; 389 String method = request.method(); 390 hdrs.setHeader(":method", method); 391 URI uri = request.uri(); 392 hdrs.setHeader(":scheme", uri.getScheme()); 393 // TODO: userinfo deprecated. Needs to be removed 394 hdrs.setHeader(":authority", uri.getAuthority()); 395 // TODO: ensure header names beginning with : not in user headers 396 String query = uri.getQuery(); 397 String path = uri.getPath(); 398 if (path == null) { 399 if (method.equalsIgnoreCase("OPTIONS")) { 400 path = "*"; 401 } else { 402 path = "/"; 403 } 404 } 405 if (query != null) { 406 path += "?" + query; 407 } 408 hdrs.setHeader(":path", path); 409 } 410 411 HttpHeadersImpl getRequestPseudoHeaders() { 412 return requestPseudoHeaders; 413 } 414 415 @Override 416 HttpResponseImpl getResponse() throws IOException { 417 try { 418 if (request.timeval() > 0) { 419 return getResponseAsync(null).get( 420 request.timeval(), TimeUnit.MILLISECONDS); 421 } else { 422 return getResponseAsync(null).join(); 423 } 424 } catch (TimeoutException e) { 425 throw new HttpTimeoutException("Response timed out"); 426 } catch (InterruptedException | ExecutionException | CompletionException e) { 427 Throwable t = e.getCause(); 428 if (t instanceof IOException) { 429 throw (IOException)t; 430 } 431 throw new IOException(e); 432 } 433 } 434 435 @Override 436 void sendRequest() throws IOException, InterruptedException { 437 sendHeadersOnly(); 438 sendBody(); 439 } 440 441 /** 442 * A simple general purpose blocking flow controller 443 */ 444 class FlowController implements LongConsumer { 445 int permits; 446 447 FlowController() { 448 this.permits = 0; 449 } 450 451 @Override 452 public synchronized void accept(long n) { 453 if (n < 1) { 454 throw new InternalError("FlowController.accept called with " + n); 455 } 456 if (permits == 0) { 457 permits += n; 458 notifyAll(); 459 } else { 460 permits += n; 461 } 462 } 463 464 public synchronized void take() throws InterruptedException { 465 take(1); 466 } 467 468 public synchronized void take(int amount) throws InterruptedException { 469 assert permits >= 0; 470 while (permits < amount) { 471 int n = Math.min(amount, permits); 472 permits -= n; 473 amount -= n; 474 if (amount > 0) 475 wait(); 476 } 477 } 478 } 479 480 @Override 481 void sendHeadersOnly() throws IOException, InterruptedException { 482 if (Log.requests() && request != null) { 483 Log.logRequest(request.toString()); 484 } 485 requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController); 486 OutgoingHeaders f = headerFrame(requestContentLen); 487 connection.sendFrame(f); 488 } 489 490 @Override 491 void sendBody() throws IOException, InterruptedException { 492 sendBodyImpl(); 493 } 494 495 void registerStream(int id) { 496 this.streamid = id; 497 connection.putStream(this, streamid); 498 } 499 500 DataFrame getDataFrame() throws IOException, InterruptedException { 501 userRequestFlowController.take(); 502 int maxpayloadLen = connection.getMaxSendFrameSize() - 9; 503 ByteBuffer buffer = connection.getBuffer(); 504 buffer.limit(maxpayloadLen); 505 boolean complete = requestProcessor.onRequestBodyChunk(buffer); 506 buffer.flip(); 507 int amount = buffer.remaining(); 508 // wait for flow control if necessary. Following method will block 509 // until after headers frame is sent, so correct streamid is set. 510 remoteRequestFlowController.take(amount); 511 connection.obtainSendWindow(amount); 512 513 DataFrame df = new DataFrame(); 514 df.streamid(streamid); 515 if (complete) { 516 df.setFlag(DataFrame.END_STREAM); 517 } 518 df.setData(buffer); 519 df.computeLength(); 520 return df; 521 } 522 523 524 @Override 525 CompletableFuture<Void> sendHeadersAsync() { 526 try { 527 sendHeadersOnly(); 528 return CompletableFuture.completedFuture(null); 529 } catch (IOException | InterruptedException ex) { 530 return CompletableFuture.failedFuture(ex); 531 } 532 } 533 534 /** 535 * A List of responses relating to this stream. Normally there is only 536 * one response, but intermediate responses like 100 are allowed 537 * and must be passed up to higher level before continuing. Deals with races 538 * such as if responses are returned before the CFs get created by 539 * getResponseAsync() 540 */ 541 542 final List<CompletableFuture<HttpResponseImpl>> response_cfs = new ArrayList<>(5); 543 544 @Override 545 CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) { 546 CompletableFuture<HttpResponseImpl> cf; 547 synchronized (response_cfs) { 548 if (!response_cfs.isEmpty()) { 549 cf = response_cfs.remove(0); 550 } else { 551 cf = new CompletableFuture<>(); 552 response_cfs.add(cf); 553 } 554 } 555 PushGroup<?> pg = request.pushGroup(); 556 if (pg != null) { 557 // if an error occurs make sure it is recorded in the PushGroup 558 cf = cf.whenComplete((t,e) -> pg.pushError(e)); 559 } 560 return cf; 561 } 562 563 /** 564 * Completes the first uncompleted CF on list, and removes it. If there is no 565 * uncompleted CF then creates one (completes it) and adds to list 566 */ 567 void completeResponse(HttpResponse r) { 568 HttpResponseImpl resp = (HttpResponseImpl)r; 569 synchronized (response_cfs) { 570 int cfs_len = response_cfs.size(); 571 for (int i=0; i<cfs_len; i++) { 572 CompletableFuture<HttpResponseImpl> cf = response_cfs.get(i); 573 if (!cf.isDone()) { 574 cf.complete(resp); 575 response_cfs.remove(cf); 576 return; 577 } 578 } 579 response_cfs.add(CompletableFuture.completedFuture(resp)); 580 } 581 } 582 583 // methods to update state and remove stream when finished 584 585 synchronized void requestSent() { 586 requestSent = true; 587 if (responseReceived) 588 connection.deleteStream(this); 589 } 590 591 synchronized void responseReceived() { 592 responseReceived = true; 593 if (requestSent) 594 connection.deleteStream(this); 595 PushGroup<?> pg = request.pushGroup(); 596 if (pg != null) 597 pg.noMorePushes(); 598 } 599 600 /** 601 * same as above but for errors 602 * 603 * @param t 604 */ 605 void completeResponseExceptionally(Throwable t) { 606 synchronized (response_cfs) { 607 for (CompletableFuture<HttpResponseImpl> cf : response_cfs) { 608 if (!cf.isDone()) { 609 cf.completeExceptionally(t); 610 response_cfs.remove(cf); 611 return; 612 } 613 } 614 response_cfs.add(CompletableFuture.failedFuture(t)); 615 } 616 } 617 618 void sendBodyImpl() throws IOException, InterruptedException { 619 if (requestContentLen == 0) { 620 // no body 621 requestSent(); 622 return; 623 } 624 DataFrame df; 625 do { 626 df = getDataFrame(); 627 // TODO: check accumulated content length (if not checked below) 628 connection.sendFrame(df); 629 } while (!df.getFlag(DataFrame.END_STREAM)); 630 requestSent(); 631 } 632 633 @Override 634 void cancel() { 635 cancelImpl(new Exception("Cancelled")); 636 } 637 638 639 void cancelImpl(Throwable e) { 640 Log.logTrace("cancelling stream: {0}\n", e.toString()); 641 inputQ.close(); 642 completeResponseExceptionally(e); 643 try { 644 connection.resetStream(streamid, ResetFrame.CANCEL); 645 } catch (IOException | InterruptedException ex) { 646 Log.logError(ex); 647 } 648 } 649 650 @Override 651 CompletableFuture<Void> sendRequestAsync() { 652 CompletableFuture<Void> cf = new CompletableFuture<>(); 653 executor.execute(() -> { 654 try { 655 sendRequest(); 656 cf.complete(null); 657 } catch (IOException |InterruptedException e) { 658 cf.completeExceptionally(e); 659 } 660 }, null); 661 return cf; 662 } 663 664 @Override 665 <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException { 666 this.responseProcessor = processor; 667 T body = processor.onResponseBodyStart( 668 responseContentLen, responseHeaders, 669 responseFlowController); // TODO: filter headers 670 if (body == null) { 671 receiveData(); 672 body = processor.onResponseComplete(); 673 } else 674 receiveDataAsync(processor); 675 responseReceived(); 676 return body; 677 } 678 679 // called from Http2Connection reader thread 680 synchronized void updateOutgoingWindow(int update) { 681 remoteRequestFlowController.accept(update); 682 } 683 684 void close(String msg) { 685 cancel(); 686 } 687 688 static class PushedStream extends Stream { 689 final PushGroup<?> pushGroup; 690 final private Stream parent; // used by server push streams 691 // push streams need the response CF allocated up front as it is 692 // given directly to user via the multi handler callback function. 693 final CompletableFuture<HttpResponseImpl> pushCF; 694 final HttpRequestImpl pushReq; 695 696 PushedStream(PushGroup<?> pushGroup, HttpClientImpl client, 697 Http2Connection connection, Stream parent, 698 HttpRequestImpl pushReq) { 699 super(client, connection, pushReq); 700 this.pushGroup = pushGroup; 701 this.pushReq = pushReq; 702 this.pushCF = new CompletableFuture<>(); 703 this.parent = parent; 704 } 705 706 // Following methods call the super class but in case of 707 // error record it in the PushGroup. The error method is called 708 // with a null value when no error occurred (is a no-op) 709 @Override 710 CompletableFuture<Void> sendBodyAsync() { 711 return super.sendBodyAsync() 712 .whenComplete((v, t) -> pushGroup.pushError(t)); 713 } 714 715 @Override 716 CompletableFuture<Void> sendHeadersAsync() { 717 return super.sendHeadersAsync() 718 .whenComplete((v, t) -> pushGroup.pushError(t)); 719 } 720 721 @Override 722 CompletableFuture<Void> sendRequestAsync() { 723 return super.sendRequestAsync() 724 .whenComplete((v, t) -> pushGroup.pushError(t)); 725 } 726 727 @Override 728 CompletableFuture<HttpResponseImpl> getResponseAsync(Void vo) { 729 return pushCF.whenComplete((v, t) -> pushGroup.pushError(t)); 730 } 731 732 @Override 733 <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) { 734 return super.responseBodyAsync(processor) 735 .whenComplete((v, t) -> pushGroup.pushError(t)); 736 } 737 738 @Override 739 void completeResponse(HttpResponse r) { 740 HttpResponseImpl resp = (HttpResponseImpl)r; 741 Utils.logResponse(resp); 742 pushCF.complete(resp); 743 } 744 745 @Override 746 void completeResponseExceptionally(Throwable t) { 747 pushCF.completeExceptionally(t); 748 } 749 750 @Override 751 synchronized void responseReceived() { 752 super.responseReceived(); 753 pushGroup.pushCompleted(); 754 } 755 756 // create and return the PushResponseImpl 757 @Override 758 protected void handleResponse() { 759 HttpConnection c = connection.connection; // TODO: improve 760 long statusCode = responseHeaders 761 .firstValueAsLong(":status") 762 .orElse(-1L); 763 764 if (statusCode == -1L) 765 completeResponseExceptionally(new IOException("No status code")); 766 ImmutableHeaders h = new ImmutableHeaders(responseHeaders, Utils.ALL_HEADERS); 767 this.response = new HttpResponseImpl((int)statusCode, pushReq, h, this, 768 c.sslParameters()); 769 this.responseContentLen = responseHeaders 770 .firstValueAsLong("content-length") 771 .orElse(-1L); 772 // different implementations for normal streams and pushed streams 773 completeResponse(response); 774 } 775 } 776 777 /** 778 * One PushGroup object is associated with the parent Stream of 779 * the pushed Streams. This keeps track of all common state associated 780 * with the pushes. 781 */ 782 static class PushGroup<T> { 783 // the overall completion object, completed when all pushes are done. 784 final CompletableFuture<T> resultCF; 785 Throwable error; // any exception that occured during pushes 786 787 // CF for main response 788 final CompletableFuture<HttpResponse> mainResponse; 789 790 // user's processor object 791 final HttpResponse.MultiProcessor<T> multiProcessor; 792 793 // per push handler function provided by processor 794 final private BiFunction<HttpRequest, 795 CompletableFuture<HttpResponse>, 796 Boolean> pushHandler; 797 int numberOfPushes; 798 int remainingPushes; 799 boolean noMorePushes = false; 800 801 PushGroup(HttpResponse.MultiProcessor<T> multiProcessor, HttpRequestImpl req) { 802 this.resultCF = new CompletableFuture<>(); 803 this.mainResponse = new CompletableFuture<>(); 804 this.multiProcessor = multiProcessor; 805 this.pushHandler = multiProcessor.onStart(req, mainResponse); 806 } 807 808 CompletableFuture<T> groupResult() { 809 return resultCF; 810 } 811 812 CompletableFuture<HttpResponse> mainResponse() { 813 return mainResponse; 814 } 815 816 private BiFunction<HttpRequest, 817 CompletableFuture<HttpResponse>, Boolean> pushHandler() 818 { 819 return pushHandler; 820 } 821 822 synchronized void addPush() { 823 numberOfPushes++; 824 remainingPushes++; 825 } 826 827 synchronized int numberOfPushes() { 828 return numberOfPushes; 829 } 830 // This is called when the main body response completes because it means 831 // no more PUSH_PROMISEs are possible 832 synchronized void noMorePushes() { 833 noMorePushes = true; 834 checkIfCompleted(); 835 } 836 837 synchronized void pushCompleted() { 838 remainingPushes--; 839 checkIfCompleted(); 840 } 841 842 synchronized void checkIfCompleted() { 843 if (remainingPushes == 0 && error == null && noMorePushes) { 844 T overallResult = multiProcessor.onComplete(); 845 resultCF.complete(overallResult); 846 } 847 } 848 849 synchronized void pushError(Throwable t) { 850 if (t == null) 851 return; 852 this.error = t; 853 resultCF.completeExceptionally(t); 854 } 855 } 856 }