1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package java.net.http; 27 28 import sun.net.httpclient.hpack.DecodingCallback; 29 30 import java.io.IOException; 31 import java.net.URI; 32 import java.nio.ByteBuffer; 33 import java.util.LinkedList; 34 import java.util.ArrayList; 35 import java.util.List; 36 import java.util.concurrent.CompletableFuture; 37 import java.util.concurrent.CompletionException; 38 import java.util.concurrent.ExecutionException; 39 import java.util.concurrent.TimeUnit; 40 import java.util.concurrent.TimeoutException; 41 import java.util.function.BiFunction; 42 import java.util.function.LongConsumer; 43 44 /** 45 * Http/2 Stream handling. 46 * 47 * REQUESTS 48 * 49 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q 50 * 51 * sendRequest() -- sendHeadersOnly() + sendBody() 52 * 53 * sendBody() -- in calling thread: obeys all flow control (so may block) 54 * obtains data from request body processor and places on connection 55 * outbound Q. 56 * 57 * sendBodyAsync() -- calls sendBody() in an executor thread. 58 * 59 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block 60 * 61 * sendRequestAsync() -- calls sendRequest() in an executor thread 62 * 63 * RESPONSES 64 * 65 * Multiple responses can be received per request. Responses are queued up on 66 * a LinkedList of CF<HttpResponse> and the the first one on the list is completed 67 * with the next response 68 * 69 * getResponseAsync() -- queries list of response CFs and returns first one 70 * if one exists. Otherwise, creates one and adds it to list 71 * and returns it. Completion is achieved through the 72 * incoming() upcall from connection reader thread. 73 * 74 * getResponse() -- calls getResponseAsync() and waits for CF to complete 75 * 76 * responseBody() -- in calling thread: blocks for incoming DATA frames on 77 * stream inputQ. Obeys remote and local flow control so may block. 78 * Calls user response body processor with data buffers. 79 * 80 * responseBodyAsync() -- calls responseBody() in an executor thread. 81 * 82 * incoming() -- entry point called from connection reader thread. Frames are 83 * either handled immediately without blocking or for data frames 84 * placed on the stream's inputQ which is consumed by the stream's 85 * reader thread. 86 * 87 * PushedStream sub class 88 * ====================== 89 * Sending side methods are not used because the request comes from a PUSH_PROMISE 90 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream 91 * is created. PushedStream does not use responseCF list as there can be only 92 * one response. The CF is created when the object created and when the response 93 * HEADERS frame is received the object is completed. 94 */ 95 class Stream extends ExchangeImpl { 96 97 final Queue<Http2Frame> inputQ; 98 99 volatile int streamid; 100 101 long responseContentLen = -1; 102 long responseBytesProcessed = 0; 103 long requestContentLen; 104 105 Http2Connection connection; 106 HttpClientImpl client; 107 final HttpRequestImpl request; 108 final DecodingCallback rspHeadersConsumer; 109 HttpHeadersImpl responseHeaders; 110 final HttpHeadersImpl requestHeaders; 111 final HttpHeadersImpl requestPseudoHeaders; 112 HttpResponse.BodyProcessor<?> responseProcessor; 113 final HttpRequest.BodyProcessor requestProcessor; 114 HttpResponse response; 115 116 // state flags 117 boolean requestSent, responseReceived; 118 119 final FlowController userRequestFlowController = 120 new FlowController(); 121 final FlowController responseFlowController = 122 new FlowController(); 123 final WindowControl outgoingWindow = new WindowControl(); 124 125 final ExecutorWrapper executor; 126 127 @Override 128 @SuppressWarnings("unchecked") 129 <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) { 130 this.responseProcessor = processor; 131 CompletableFuture<T> cf; 132 try { 133 T body = processor.onResponseBodyStart( 134 responseContentLen, responseHeaders, 135 responseFlowController); // TODO: filter headers 136 if (body != null) { 137 cf = CompletableFuture.completedFuture(body); 138 receiveDataAsync(processor); 139 } else 140 cf = receiveDataAsync(processor); 141 } catch (IOException e) { 142 cf = CompletableFuture.failedFuture(e); 143 } 144 PushGroup<?> pg = request.pushGroup(); 145 if (pg != null) { 146 // if an error occurs make sure it is recorded in the PushGroup 147 cf = cf.whenComplete((t,e) -> pg.pushError(e)); 148 } 149 return cf; 150 } 151 152 @Override 153 public String toString() { 154 StringBuilder sb = new StringBuilder(); 155 sb.append("streamid: ") 156 .append(streamid); 157 return sb.toString(); 158 } 159 160 // pushes entire response body into response processor 161 // blocking when required by local or remote flow control 162 void receiveData() throws IOException { 163 Http2Frame frame; 164 DataFrame df = null; 165 try { 166 do { 167 frame = inputQ.take(); 168 if (!(frame instanceof DataFrame)) { 169 assert false; 170 continue; 171 } 172 df = (DataFrame) frame; 173 int len = df.getDataLength(); 174 ByteBuffer[] buffers = df.getData(); 175 for (ByteBuffer b : buffers) { 176 responseFlowController.take(); 177 responseProcessor.onResponseBodyChunk(b); 178 } 179 sendWindowUpdate(len); 180 } while (!df.getFlag(DataFrame.END_STREAM)); 181 } catch (InterruptedException e) { 182 throw new IOException(e); 183 } 184 } 185 186 private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) { 187 CompletableFuture<T> cf = new CompletableFuture<>(); 188 executor.execute(() -> { 189 try { 190 receiveData(); 191 T body = processor.onResponseComplete(); 192 cf.complete(body); 193 responseReceived(); 194 } catch (Throwable t) { 195 cf.completeExceptionally(t); 196 } 197 }, null); 198 return cf; 199 } 200 201 private void sendWindowUpdate(int increment) 202 throws IOException, InterruptedException { 203 if (increment == 0) 204 return; 205 LinkedList<Http2Frame> list = new LinkedList<>(); 206 WindowUpdateFrame frame = new WindowUpdateFrame(); 207 frame.streamid(streamid); 208 frame.setUpdate(increment); 209 list.add(frame); 210 frame = new WindowUpdateFrame(); 211 frame.streamid(0); 212 frame.setUpdate(increment); 213 list.add(frame); 214 connection.sendFrames(list); 215 } 216 217 @Override 218 CompletableFuture<Void> sendBodyAsync() { 219 final CompletableFuture<Void> cf = new CompletableFuture<>(); 220 executor.execute(() -> { 221 try { 222 sendBodyImpl(); 223 cf.complete(null); 224 } catch (IOException | InterruptedException e) { 225 cf.completeExceptionally(e); 226 } 227 }, null); 228 return cf; 229 } 230 231 @SuppressWarnings("unchecked") 232 Stream(HttpClientImpl client, Http2Connection connection, Exchange e) { 233 super(e); 234 this.client = client; 235 this.connection = connection; 236 this.request = e.request(); 237 this.requestProcessor = request.requestProcessor(); 238 responseHeaders = new HttpHeadersImpl(); 239 requestHeaders = new HttpHeadersImpl(); 240 rspHeadersConsumer = (name, value) -> { 241 responseHeaders.addHeader(name.toString(), value.toString()); 242 }; 243 this.executor = client.executorWrapper(); 244 //this.response_cf = new CompletableFuture<HttpResponseImpl>(); 245 this.requestPseudoHeaders = new HttpHeadersImpl(); 246 // NEW 247 this.inputQ = new Queue<>(); 248 } 249 250 @SuppressWarnings("unchecked") 251 Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) { 252 super(null); 253 this.client = client; 254 this.connection = connection; 255 this.request = req; 256 this.requestProcessor = null; 257 responseHeaders = new HttpHeadersImpl(); 258 requestHeaders = new HttpHeadersImpl(); 259 rspHeadersConsumer = (name, value) -> { 260 responseHeaders.addHeader(name.toString(), value.toString()); 261 }; 262 this.executor = client.executorWrapper(); 263 //this.response_cf = new CompletableFuture<HttpResponseImpl>(); 264 this.requestPseudoHeaders = new HttpHeadersImpl(); 265 // NEW 266 this.inputQ = new Queue<>(); 267 } 268 269 /** 270 * Entry point from Http2Connection reader thread. 271 * 272 * Data frames will be removed by response body thread. 273 * 274 * @param frame 275 * @throws IOException 276 */ 277 void incoming(Http2Frame frame) throws IOException, InterruptedException { 278 if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) { 279 // Complete headers accumulated. handle response. 280 // It's okay if there are multiple HeaderFrames. 281 handleResponse(); 282 } else if (frame instanceof DataFrame) { 283 inputQ.put(frame); 284 } else { 285 otherFrame(frame); 286 } 287 } 288 289 void otherFrame(Http2Frame frame) throws IOException { 290 switch (frame.type()) { 291 case WindowUpdateFrame.TYPE: 292 incoming_windowUpdate((WindowUpdateFrame) frame); 293 break; 294 case ResetFrame.TYPE: 295 incoming_reset((ResetFrame) frame); 296 break; 297 case PriorityFrame.TYPE: 298 incoming_priority((PriorityFrame) frame); 299 break; 300 default: 301 String msg = "Unexpected frame: " + frame.toString(); 302 throw new IOException(msg); 303 } 304 } 305 306 // The Hpack decoder decodes into one of these consumers of name,value pairs 307 308 DecodingCallback rspHeadersConsumer() { 309 return rspHeadersConsumer; 310 } 311 312 // create and return the HttpResponseImpl 313 protected void handleResponse() throws IOException { 314 HttpConnection c = connection.connection; // TODO: improve 315 long statusCode = responseHeaders 316 .firstValueAsLong(":status") 317 .orElseThrow(() -> new IOException("no statuscode in response")); 318 319 this.response = new HttpResponseImpl((int)statusCode, exchange, responseHeaders, null, 320 c.sslParameters(), HttpClient.Version.HTTP_2, c); 321 this.responseContentLen = responseHeaders 322 .firstValueAsLong("content-length") 323 .orElse(-1L); 324 // different implementations for normal streams and pushed streams 325 completeResponse(response); 326 } 327 328 void incoming_reset(ResetFrame frame) { 329 // TODO: implement reset 330 int error = frame.getErrorCode(); 331 IOException e = new IOException(ErrorFrame.stringForCode(error)); 332 completeResponseExceptionally(e); 333 throw new UnsupportedOperationException("Not implemented"); 334 } 335 336 void incoming_priority(PriorityFrame frame) { 337 // TODO: implement priority 338 throw new UnsupportedOperationException("Not implemented"); 339 } 340 341 void incoming_windowUpdate(WindowUpdateFrame frame) { 342 outgoingWindow.update(frame.getUpdate()); 343 } 344 345 void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException { 346 if (Log.requests()) { 347 Log.logRequest("PUSH_PROMISE: " + pushReq.toString()); 348 } 349 PushGroup<?> pushGroup = request.pushGroup(); 350 if (pushGroup == null) { 351 cancelImpl(new IllegalStateException("unexpected push promise")); 352 } 353 // get the handler and call it. 354 BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph = 355 pushGroup.pushHandler(); 356 357 CompletableFuture<HttpResponse> pushCF = pushStream 358 .getResponseAsync(null) 359 .thenApply(r -> (HttpResponse)r); 360 boolean accept = ph.apply(pushReq, pushCF); 361 if (!accept) { 362 IOException ex = new IOException("Stream cancelled by user"); 363 cancelImpl(ex); 364 pushCF.completeExceptionally(ex); 365 } else { 366 pushStream.requestSent(); 367 pushGroup.addPush(); 368 } 369 } 370 371 private OutgoingHeaders headerFrame(long contentLength) { 372 HttpHeadersImpl h = request.getSystemHeaders(); 373 if (contentLength > 0) { 374 h.setHeader("content-length", Long.toString(contentLength)); 375 } 376 setPseudoHeaderFields(); 377 OutgoingHeaders f = new OutgoingHeaders(h, request.getUserHeaders(), this); 378 if (contentLength == 0) { 379 f.setFlag(HeadersFrame.END_STREAM); 380 } 381 return f; 382 } 383 384 private void setPseudoHeaderFields() { 385 HttpHeadersImpl hdrs = requestPseudoHeaders; 386 String method = request.method(); 387 hdrs.setHeader(":method", method); 388 URI uri = request.uri(); 389 hdrs.setHeader(":scheme", uri.getScheme()); 390 // TODO: userinfo deprecated. Needs to be removed 391 hdrs.setHeader(":authority", uri.getAuthority()); 392 // TODO: ensure header names beginning with : not in user headers 393 String query = uri.getQuery(); 394 String path = uri.getPath(); 395 if (path == null) { 396 if (method.equalsIgnoreCase("OPTIONS")) { 397 path = "*"; 398 } else { 399 path = "/"; 400 } 401 } 402 if (query != null) { 403 path += "?" + query; 404 } 405 hdrs.setHeader(":path", path); 406 } 407 408 HttpHeadersImpl getRequestPseudoHeaders() { 409 return requestPseudoHeaders; 410 } 411 412 @Override 413 HttpResponseImpl getResponse() throws IOException { 414 try { 415 if (request.timeval() > 0) { 416 return getResponseAsync(null).get( 417 request.timeval(), TimeUnit.MILLISECONDS); 418 } else { 419 return getResponseAsync(null).join(); 420 } 421 } catch (TimeoutException e) { 422 throw new HttpTimeoutException("Response timed out"); 423 } catch (InterruptedException | ExecutionException | CompletionException e) { 424 Throwable t = e.getCause(); 425 if (t instanceof IOException) { 426 throw (IOException)t; 427 } 428 throw new IOException(e); 429 } 430 } 431 432 @Override 433 void sendRequest() throws IOException, InterruptedException { 434 sendHeadersOnly(); 435 sendBody(); 436 } 437 438 /** 439 * A simple general purpose blocking flow controller 440 */ 441 class FlowController implements LongConsumer { 442 int permits; 443 444 FlowController() { 445 this.permits = 0; 446 } 447 448 @Override 449 public synchronized void accept(long n) { 450 if (n < 1) { 451 throw new InternalError("FlowController.accept called with " + n); 452 } 453 if (permits == 0) { 454 permits += n; 455 notifyAll(); 456 } else { 457 permits += n; 458 } 459 } 460 461 public synchronized void take() throws InterruptedException { 462 take(1); 463 } 464 465 public synchronized void take(int amount) throws InterruptedException { 466 assert permits >= 0; 467 while (amount > 0) { 468 int n = Math.min(amount, permits); 469 permits -= n; 470 amount -= n; 471 if (amount > 0) 472 wait(); 473 } 474 } 475 } 476 477 @Override 478 void sendHeadersOnly() throws IOException, InterruptedException { 479 if (Log.requests() && request != null) { 480 Log.logRequest(request.toString()); 481 } 482 requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController); 483 OutgoingHeaders f = headerFrame(requestContentLen); 484 connection.sendFrame(f); 485 } 486 487 @Override 488 void sendBody() throws IOException, InterruptedException { 489 sendBodyImpl(); 490 } 491 492 void registerStream(int id) { 493 this.streamid = id; 494 connection.putStream(this, streamid); 495 } 496 497 DataFrame getDataFrame() throws IOException, InterruptedException { 498 userRequestFlowController.take(); 499 int maxpayloadLen = connection.getMaxSendFrameSize(); 500 ByteBuffer buffer = connection.getBuffer(); 501 buffer.limit(maxpayloadLen); 502 boolean complete = requestProcessor.onRequestBodyChunk(buffer); 503 buffer.flip(); 504 int amount = buffer.remaining(); 505 // wait for flow control if necessary. Following method will block 506 // until after headers frame is sent, so correct streamid is set. 507 outgoingWindow.acquire(amount); 508 connection.connectionSendWindow.acquire(amount); 509 DataFrame df = new DataFrame(); 510 df.streamid(streamid); 511 if (complete) { 512 df.setFlag(DataFrame.END_STREAM); 513 } 514 df.setData(buffer); 515 df.computeLength(); 516 return df; 517 } 518 519 520 @Override 521 CompletableFuture<Void> sendHeadersAsync() { 522 try { 523 sendHeadersOnly(); 524 return CompletableFuture.completedFuture(null); 525 } catch (IOException | InterruptedException ex) { 526 return CompletableFuture.failedFuture(ex); 527 } 528 } 529 530 /** 531 * A List of responses relating to this stream. Normally there is only 532 * one response, but intermediate responses like 100 are allowed 533 * and must be passed up to higher level before continuing. Deals with races 534 * such as if responses are returned before the CFs get created by 535 * getResponseAsync() 536 */ 537 538 final List<CompletableFuture<HttpResponseImpl>> response_cfs = new ArrayList<>(5); 539 540 @Override 541 CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) { 542 CompletableFuture<HttpResponseImpl> cf; 543 synchronized (response_cfs) { 544 if (!response_cfs.isEmpty()) { 545 cf = response_cfs.remove(0); 546 } else { 547 cf = new CompletableFuture<>(); 548 response_cfs.add(cf); 549 } 550 } 551 PushGroup<?> pg = request.pushGroup(); 552 if (pg != null) { 553 // if an error occurs make sure it is recorded in the PushGroup 554 cf = cf.whenComplete((t,e) -> pg.pushError(e)); 555 } 556 return cf; 557 } 558 559 /** 560 * Completes the first uncompleted CF on list, and removes it. If there is no 561 * uncompleted CF then creates one (completes it) and adds to list 562 */ 563 void completeResponse(HttpResponse r) { 564 HttpResponseImpl resp = (HttpResponseImpl)r; 565 synchronized (response_cfs) { 566 int cfs_len = response_cfs.size(); 567 for (int i=0; i<cfs_len; i++) { 568 CompletableFuture<HttpResponseImpl> cf = response_cfs.get(i); 569 if (!cf.isDone()) { 570 cf.complete(resp); 571 response_cfs.remove(cf); 572 return; 573 } 574 } 575 response_cfs.add(CompletableFuture.completedFuture(resp)); 576 } 577 } 578 579 // methods to update state and remove stream when finished 580 581 synchronized void requestSent() { 582 requestSent = true; 583 if (responseReceived) 584 connection.deleteStream(this); 585 } 586 587 synchronized void responseReceived() { 588 responseReceived = true; 589 if (requestSent) 590 connection.deleteStream(this); 591 PushGroup<?> pg = request.pushGroup(); 592 if (pg != null) 593 pg.noMorePushes(); 594 } 595 596 /** 597 * same as above but for errors 598 * 599 * @param t 600 */ 601 void completeResponseExceptionally(Throwable t) { 602 synchronized (response_cfs) { 603 for (CompletableFuture<HttpResponseImpl> cf : response_cfs) { 604 if (!cf.isDone()) { 605 cf.completeExceptionally(t); 606 response_cfs.remove(cf); 607 return; 608 } 609 } 610 response_cfs.add(CompletableFuture.failedFuture(t)); 611 } 612 } 613 614 void sendBodyImpl() throws IOException, InterruptedException { 615 if (requestContentLen == 0) { 616 // no body 617 requestSent(); 618 return; 619 } 620 DataFrame df; 621 do { 622 df = getDataFrame(); 623 // TODO: check accumulated content length (if not checked below) 624 connection.sendFrame(df); 625 } while (!df.getFlag(DataFrame.END_STREAM)); 626 requestSent(); 627 } 628 629 @Override 630 void cancel() { 631 cancelImpl(new Exception("Cancelled")); 632 } 633 634 635 void cancelImpl(Throwable e) { 636 Log.logTrace("cancelling stream: {0}\n", e.toString()); 637 inputQ.close(); 638 completeResponseExceptionally(e); 639 try { 640 connection.resetStream(streamid, ResetFrame.CANCEL); 641 } catch (IOException | InterruptedException ex) { 642 Log.logError(ex); 643 } 644 } 645 646 @Override 647 CompletableFuture<Void> sendRequestAsync() { 648 CompletableFuture<Void> cf = new CompletableFuture<>(); 649 executor.execute(() -> { 650 try { 651 sendRequest(); 652 cf.complete(null); 653 } catch (IOException |InterruptedException e) { 654 cf.completeExceptionally(e); 655 } 656 }, null); 657 return cf; 658 } 659 660 @Override 661 <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException { 662 this.responseProcessor = processor; 663 T body = processor.onResponseBodyStart( 664 responseContentLen, responseHeaders, 665 responseFlowController); // TODO: filter headers 666 if (body == null) { 667 receiveData(); 668 body = processor.onResponseComplete(); 669 } else 670 receiveDataAsync(processor); 671 responseReceived(); 672 return body; 673 } 674 675 // called from Http2Connection reader thread 676 void updateOutgoingWindow(int update) { 677 outgoingWindow.update(update); 678 } 679 680 void close(String msg) { 681 cancel(); 682 } 683 684 static class PushedStream extends Stream { 685 final PushGroup<?> pushGroup; 686 final private Stream parent; // used by server push streams 687 // push streams need the response CF allocated up front as it is 688 // given directly to user via the multi handler callback function. 689 final CompletableFuture<HttpResponseImpl> pushCF; 690 final HttpRequestImpl pushReq; 691 692 PushedStream(PushGroup<?> pushGroup, HttpClientImpl client, 693 Http2Connection connection, Stream parent, 694 HttpRequestImpl pushReq) { 695 super(client, connection, pushReq); 696 this.pushGroup = pushGroup; 697 this.pushReq = pushReq; 698 this.pushCF = new CompletableFuture<>(); 699 this.parent = parent; 700 } 701 702 // Following methods call the super class but in case of 703 // error record it in the PushGroup. The error method is called 704 // with a null value when no error occurred (is a no-op) 705 @Override 706 CompletableFuture<Void> sendBodyAsync() { 707 return super.sendBodyAsync() 708 .whenComplete((v, t) -> pushGroup.pushError(t)); 709 } 710 711 @Override 712 CompletableFuture<Void> sendHeadersAsync() { 713 return super.sendHeadersAsync() 714 .whenComplete((v, t) -> pushGroup.pushError(t)); 715 } 716 717 @Override 718 CompletableFuture<Void> sendRequestAsync() { 719 return super.sendRequestAsync() 720 .whenComplete((v, t) -> pushGroup.pushError(t)); 721 } 722 723 @Override 724 CompletableFuture<HttpResponseImpl> getResponseAsync(Void vo) { 725 return pushCF.whenComplete((v, t) -> pushGroup.pushError(t)); 726 } 727 728 @Override 729 <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) { 730 return super.responseBodyAsync(processor) 731 .whenComplete((v, t) -> pushGroup.pushError(t)); 732 } 733 734 @Override 735 void completeResponse(HttpResponse r) { 736 HttpResponseImpl resp = (HttpResponseImpl)r; 737 Utils.logResponse(resp); 738 pushCF.complete(resp); 739 } 740 741 @Override 742 void completeResponseExceptionally(Throwable t) { 743 pushCF.completeExceptionally(t); 744 } 745 746 @Override 747 synchronized void responseReceived() { 748 super.responseReceived(); 749 pushGroup.pushCompleted(); 750 } 751 752 // create and return the PushResponseImpl 753 @Override 754 protected void handleResponse() { 755 HttpConnection c = connection.connection; // TODO: improve 756 long statusCode = responseHeaders 757 .firstValueAsLong(":status") 758 .orElse(-1L); 759 760 if (statusCode == -1L) 761 completeResponseExceptionally(new IOException("No status code")); 762 ImmutableHeaders h = new ImmutableHeaders(responseHeaders, Utils.ALL_HEADERS); 763 this.response = new HttpResponseImpl((int)statusCode, pushReq, h, this, 764 c.sslParameters()); 765 this.responseContentLen = responseHeaders 766 .firstValueAsLong("content-length") 767 .orElse(-1L); 768 // different implementations for normal streams and pushed streams 769 completeResponse(response); 770 } 771 } 772 773 /** 774 * One PushGroup object is associated with the parent Stream of 775 * the pushed Streams. This keeps track of all common state associated 776 * with the pushes. 777 */ 778 static class PushGroup<T> { 779 // the overall completion object, completed when all pushes are done. 780 final CompletableFuture<T> resultCF; 781 Throwable error; // any exception that occured during pushes 782 783 // CF for main response 784 final CompletableFuture<HttpResponse> mainResponse; 785 786 // user's processor object 787 final HttpResponse.MultiProcessor<T> multiProcessor; 788 789 // per push handler function provided by processor 790 final private BiFunction<HttpRequest, 791 CompletableFuture<HttpResponse>, 792 Boolean> pushHandler; 793 int numberOfPushes; 794 int remainingPushes; 795 boolean noMorePushes = false; 796 797 PushGroup(HttpResponse.MultiProcessor<T> multiProcessor, HttpRequestImpl req) { 798 this.resultCF = new CompletableFuture<>(); 799 this.mainResponse = new CompletableFuture<>(); 800 this.multiProcessor = multiProcessor; 801 this.pushHandler = multiProcessor.onStart(req, mainResponse); 802 } 803 804 CompletableFuture<T> groupResult() { 805 return resultCF; 806 } 807 808 CompletableFuture<HttpResponse> mainResponse() { 809 return mainResponse; 810 } 811 812 private BiFunction<HttpRequest, 813 CompletableFuture<HttpResponse>, Boolean> pushHandler() 814 { 815 return pushHandler; 816 } 817 818 synchronized void addPush() { 819 numberOfPushes++; 820 remainingPushes++; 821 } 822 823 synchronized int numberOfPushes() { 824 return numberOfPushes; 825 } 826 // This is called when the main body response completes because it means 827 // no more PUSH_PROMISEs are possible 828 synchronized void noMorePushes() { 829 noMorePushes = true; 830 checkIfCompleted(); 831 } 832 833 synchronized void pushCompleted() { 834 remainingPushes--; 835 checkIfCompleted(); 836 } 837 838 synchronized void checkIfCompleted() { 839 if (remainingPushes == 0 && error == null && noMorePushes) { 840 T overallResult = multiProcessor.onComplete(); 841 resultCF.complete(overallResult); 842 } 843 } 844 845 synchronized void pushError(Throwable t) { 846 if (t == null) 847 return; 848 this.error = t; 849 resultCF.completeExceptionally(t); 850 } 851 } 852 }