1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 */ 24 25 package java.net.http; 26 27 import sun.net.httpclient.hpack.DecodingCallback; 28 29 import java.io.IOException; 30 import java.net.URI; 31 import java.nio.ByteBuffer; 32 import java.util.LinkedList; 33 import java.util.ArrayList; 34 import java.util.List; 35 import java.util.concurrent.CompletableFuture; 36 import java.util.concurrent.CompletionException; 37 import java.util.concurrent.ExecutionException; 38 import java.util.concurrent.TimeUnit; 39 import java.util.concurrent.TimeoutException; 40 import java.util.function.BiFunction; 41 import java.util.function.LongConsumer; 42 43 /** 44 * Http/2 Stream handling. 
 *
 * REQUESTS
 *
 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
 *
 * sendRequest() -- sendHeadersOnly() + sendBody()
 *
 * sendBody() -- in calling thread: obeys all flow control (so may block)
 *               obtains data from request body processor and places on connection
 *               outbound Q.
 *
 * sendBodyAsync() -- calls sendBody() in an executor thread.
 *
 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
 *
 * sendRequestAsync() -- calls sendRequest() in an executor thread
 *
 * RESPONSES
 *
 * Multiple responses can be received per request. Responses are queued up on
 * a List of CF<HttpResponse> and the first one on the list is completed
 * with the next response
 *
 * getResponseAsync() -- queries list of response CFs and returns first one
 *               if one exists. Otherwise, creates one and adds it to list
 *               and returns it. Completion is achieved through the
 *               incoming() upcall from connection reader thread.
 *
 * getResponse() -- calls getResponseAsync() and waits for CF to complete
 *
 * responseBody() -- in calling thread: blocks for incoming DATA frames on
 *               stream inputQ. Obeys remote and local flow control so may block.
 *               Calls user response body processor with data buffers.
 *
 * responseBodyAsync() -- calls responseBody() in an executor thread.
 *
 * incoming() -- entry point called from connection reader thread. Frames are
 *               either handled immediately without blocking or for data frames
 *               placed on the stream's inputQ which is consumed by the stream's
 *               reader thread.
 *
 * PushedStream sub class
 * ======================
 * Sending side methods are not used because the request comes from a PUSH_PROMISE
 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
 * is created. PushedStream does not use responseCF list as there can be only
 * one response.
The CF is created when the object created and when the response 92 * HEADERS frame is received the object is completed. 93 */ 94 class Stream extends ExchangeImpl { 95 96 final Queue<Http2Frame> inputQ; 97 98 volatile int streamid; 99 100 long responseContentLen = -1; 101 long responseBytesProcessed = 0; 102 long requestContentLen; 103 104 Http2Connection connection; 105 HttpClientImpl client; 106 final HttpRequestImpl request; 107 final DecodingCallback rspHeadersConsumer; 108 HttpHeadersImpl responseHeaders; 109 final HttpHeadersImpl requestHeaders; 110 final HttpHeadersImpl requestPseudoHeaders; 111 HttpResponse.BodyProcessor<?> responseProcessor; 112 final HttpRequest.BodyProcessor requestProcessor; 113 HttpResponse response; 114 115 // state flags 116 boolean requestSent, responseReceived; 117 118 final FlowController userRequestFlowController = 119 new FlowController(); 120 final FlowController remoteRequestFlowController = 121 new FlowController(); 122 final FlowController responseFlowController = 123 new FlowController(); 124 125 final ExecutorWrapper executor; 126 127 @Override 128 @SuppressWarnings("unchecked") 129 <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) { 130 this.responseProcessor = processor; 131 CompletableFuture<T> cf; 132 try { 133 T body = processor.onResponseBodyStart( 134 responseContentLen, responseHeaders, 135 responseFlowController); // TODO: filter headers 136 if (body != null) { 137 cf = CompletableFuture.completedFuture(body); 138 receiveDataAsync(processor); 139 } else 140 cf = receiveDataAsync(processor); 141 } catch (IOException e) { 142 cf = CompletableFuture.failedFuture(e); 143 } 144 PushGroup<?> pg = request.pushGroup(); 145 if (pg != null) { 146 // if an error occurs make sure it is recorded in the PushGroup 147 cf = cf.whenComplete((t,e) -> pg.pushError(e)); 148 } 149 return cf; 150 } 151 152 @Override 153 public String toString() { 154 StringBuilder sb = new StringBuilder(); 155 
sb.append("streamid: ") 156 .append(streamid); 157 return sb.toString(); 158 } 159 160 // pushes entire response body into response processor 161 // blocking when required by local or remote flow control 162 void receiveData() throws IOException { 163 Http2Frame frame; 164 DataFrame df = null; 165 try { 166 do { 167 frame = inputQ.take(); 168 if (!(frame instanceof DataFrame)) { 169 assert false; 170 continue; 171 } 172 df = (DataFrame) frame; 173 int len = df.getDataLength(); 174 ByteBuffer[] buffers = df.getData(); 175 for (ByteBuffer b : buffers) { 176 responseFlowController.take(); 177 responseProcessor.onResponseBodyChunk(b); 178 } 179 sendWindowUpdate(len); 180 } while (!df.getFlag(DataFrame.END_STREAM)); 181 } catch (InterruptedException e) { 182 throw new IOException(e); 183 } 184 } 185 186 private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) { 187 CompletableFuture<T> cf = new CompletableFuture<>(); 188 executor.execute(() -> { 189 try { 190 receiveData(); 191 T body = processor.onResponseComplete(); 192 cf.complete(body); 193 responseReceived(); 194 } catch (Throwable t) { 195 cf.completeExceptionally(t); 196 } 197 }, null); 198 return cf; 199 } 200 201 private void sendWindowUpdate(int increment) 202 throws IOException, InterruptedException { 203 if (increment == 0) 204 return; 205 LinkedList<Http2Frame> list = new LinkedList<>(); 206 WindowUpdateFrame frame = new WindowUpdateFrame(); 207 frame.streamid(streamid); 208 frame.setUpdate(increment); 209 list.add(frame); 210 frame = new WindowUpdateFrame(); 211 frame.streamid(0); 212 frame.setUpdate(increment); 213 list.add(frame); 214 connection.sendFrames(list); 215 } 216 217 @Override 218 CompletableFuture<Void> sendBodyAsync() { 219 final CompletableFuture<Void> cf = new CompletableFuture<>(); 220 executor.execute(() -> { 221 try { 222 sendBodyImpl(); 223 cf.complete(null); 224 } catch (IOException | InterruptedException e) { 225 cf.completeExceptionally(e); 226 } 227 
}, null); 228 return cf; 229 } 230 231 @SuppressWarnings("unchecked") 232 Stream(HttpClientImpl client, Http2Connection connection, Exchange e) { 233 super(e); 234 this.client = client; 235 this.connection = connection; 236 this.request = e.request(); 237 this.requestProcessor = request.requestProcessor(); 238 responseHeaders = new HttpHeadersImpl(); 239 requestHeaders = new HttpHeadersImpl(); 240 rspHeadersConsumer = (name, value) -> { 241 responseHeaders.addHeader(name.toString(), value.toString()); 242 }; 243 this.executor = client.executorWrapper(); 244 //this.response_cf = new CompletableFuture<HttpResponseImpl>(); 245 this.requestPseudoHeaders = new HttpHeadersImpl(); 246 // NEW 247 this.inputQ = new Queue<>(); 248 } 249 250 @SuppressWarnings("unchecked") 251 Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) { 252 super(null); 253 this.client = client; 254 this.connection = connection; 255 this.request = req; 256 this.requestProcessor = null; 257 responseHeaders = new HttpHeadersImpl(); 258 requestHeaders = new HttpHeadersImpl(); 259 rspHeadersConsumer = (name, value) -> { 260 responseHeaders.addHeader(name.toString(), value.toString()); 261 }; 262 this.executor = client.executorWrapper(); 263 //this.response_cf = new CompletableFuture<HttpResponseImpl>(); 264 this.requestPseudoHeaders = new HttpHeadersImpl(); 265 // NEW 266 this.inputQ = new Queue<>(); 267 } 268 269 /** 270 * Entry point from Http2Connection reader thread. 271 * 272 * Data frames will be removed by response body thread. 273 * 274 * @param frame 275 * @throws IOException 276 */ 277 void incoming(Http2Frame frame) throws IOException, InterruptedException { 278 if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) { 279 // Complete headers accumulated. handle response. 280 // It's okay if there are multiple HeaderFrames. 
281 handleResponse(); 282 } else if (frame instanceof DataFrame) { 283 inputQ.put(frame); 284 } else { 285 otherFrame(frame); 286 } 287 } 288 289 void otherFrame(Http2Frame frame) throws IOException { 290 switch (frame.type()) { 291 case WindowUpdateFrame.TYPE: 292 incoming_windowUpdate((WindowUpdateFrame) frame); 293 break; 294 case ResetFrame.TYPE: 295 incoming_reset((ResetFrame) frame); 296 break; 297 case PriorityFrame.TYPE: 298 incoming_priority((PriorityFrame) frame); 299 break; 300 default: 301 String msg = "Unexpected frame: " + frame.toString(); 302 throw new IOException(msg); 303 } 304 } 305 306 // The Hpack decoder decodes into one of these consumers of name,value pairs 307 308 DecodingCallback rspHeadersConsumer() { 309 return rspHeadersConsumer; 310 } 311 312 // create and return the HttpResponseImpl 313 protected void handleResponse() throws IOException { 314 HttpConnection c = connection.connection; // TODO: improve 315 long statusCode = responseHeaders 316 .firstValueAsLong(":status") 317 .orElseThrow(() -> new IOException("no statuscode in response")); 318 319 this.response = new HttpResponseImpl((int)statusCode, exchange, responseHeaders, null, 320 c.sslParameters(), HttpClient.Version.HTTP_2, c); 321 this.responseContentLen = responseHeaders 322 .firstValueAsLong("content-length") 323 .orElse(-1L); 324 // different implementations for normal streams and pushed streams 325 completeResponse(response); 326 } 327 328 void incoming_reset(ResetFrame frame) { 329 // TODO: implement reset 330 int error = frame.getErrorCode(); 331 IOException e = new IOException(ErrorFrame.stringForCode(error)); 332 completeResponseExceptionally(e); 333 throw new UnsupportedOperationException("Not implemented"); 334 } 335 336 void incoming_priority(PriorityFrame frame) { 337 // TODO: implement priority 338 throw new UnsupportedOperationException("Not implemented"); 339 } 340 341 void incoming_windowUpdate(WindowUpdateFrame frame) { 342 int amount = frame.getUpdate(); 343 
if (amount > 0) 344 remoteRequestFlowController.accept(amount); 345 } 346 347 void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException { 348 if (Log.requests()) { 349 Log.logRequest("PUSH_PROMISE: " + pushReq.toString()); 350 } 351 PushGroup<?> pushGroup = request.pushGroup(); 352 if (pushGroup == null) { 353 cancelImpl(new IllegalStateException("unexpected push promise")); 354 } 355 // get the handler and call it. 356 BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph = 357 pushGroup.pushHandler(); 358 359 CompletableFuture<HttpResponse> pushCF = pushStream 360 .getResponseAsync(null) 361 .thenApply(r -> (HttpResponse)r); 362 boolean accept = ph.apply(pushReq, pushCF); 363 if (!accept) { 364 IOException ex = new IOException("Stream cancelled by user"); 365 cancelImpl(ex); 366 pushCF.completeExceptionally(ex); 367 } else { 368 pushStream.requestSent(); 369 pushGroup.addPush(); 370 } 371 } 372 373 private OutgoingHeaders headerFrame(long contentLength) { 374 HttpHeadersImpl h = request.getSystemHeaders(); 375 if (contentLength > 0) { 376 h.setHeader("content-length", Long.toString(contentLength)); 377 } 378 setPseudoHeaderFields(); 379 OutgoingHeaders f = new OutgoingHeaders(h, request.getUserHeaders(), this); 380 if (contentLength == 0) { 381 f.setFlag(HeadersFrame.END_STREAM); 382 } 383 return f; 384 } 385 386 private void setPseudoHeaderFields() { 387 HttpHeadersImpl hdrs = requestPseudoHeaders; 388 String method = request.method(); 389 hdrs.setHeader(":method", method); 390 URI uri = request.uri(); 391 hdrs.setHeader(":scheme", uri.getScheme()); 392 // TODO: userinfo deprecated. 
Needs to be removed 393 hdrs.setHeader(":authority", uri.getAuthority()); 394 // TODO: ensure header names beginning with : not in user headers 395 String query = uri.getQuery(); 396 String path = uri.getPath(); 397 if (path == null) { 398 if (method.equalsIgnoreCase("OPTIONS")) { 399 path = "*"; 400 } else { 401 path = "/"; 402 } 403 } 404 if (query != null) { 405 path += "?" + query; 406 } 407 hdrs.setHeader(":path", path); 408 } 409 410 HttpHeadersImpl getRequestPseudoHeaders() { 411 return requestPseudoHeaders; 412 } 413 414 @Override 415 HttpResponseImpl getResponse() throws IOException { 416 try { 417 if (request.timeval() > 0) { 418 return getResponseAsync(null).get( 419 request.timeval(), TimeUnit.MILLISECONDS); 420 } else { 421 return getResponseAsync(null).join(); 422 } 423 } catch (TimeoutException e) { 424 throw new HttpTimeoutException("Response timed out"); 425 } catch (InterruptedException | ExecutionException | CompletionException e) { 426 Throwable t = e.getCause(); 427 if (t instanceof IOException) { 428 throw (IOException)t; 429 } 430 throw new IOException(e); 431 } 432 } 433 434 @Override 435 void sendRequest() throws IOException, InterruptedException { 436 sendHeadersOnly(); 437 sendBody(); 438 } 439 440 /** 441 * A simple general purpose blocking flow controller 442 */ 443 class FlowController implements LongConsumer { 444 int permits; 445 446 FlowController() { 447 this.permits = 0; 448 } 449 450 @Override 451 public synchronized void accept(long n) { 452 if (n < 1) { 453 throw new InternalError("FlowController.accept called with " + n); 454 } 455 if (permits == 0) { 456 permits += n; 457 notifyAll(); 458 } else { 459 permits += n; 460 } 461 } 462 463 public synchronized void take() throws InterruptedException { 464 take(1); 465 } 466 467 public synchronized void take(int amount) throws InterruptedException { 468 assert permits >= 0; 469 while (permits < amount) { 470 int n = Math.min(amount, permits); 471 permits -= n; 472 amount -= n; 473 
if (amount > 0) 474 wait(); 475 } 476 } 477 } 478 479 @Override 480 void sendHeadersOnly() throws IOException, InterruptedException { 481 if (Log.requests() && request != null) { 482 Log.logRequest(request.toString()); 483 } 484 requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController); 485 OutgoingHeaders f = headerFrame(requestContentLen); 486 connection.sendFrame(f); 487 } 488 489 @Override 490 void sendBody() throws IOException, InterruptedException { 491 sendBodyImpl(); 492 } 493 494 void registerStream(int id) { 495 this.streamid = id; 496 connection.putStream(this, streamid); 497 } 498 499 DataFrame getDataFrame() throws IOException, InterruptedException { 500 userRequestFlowController.take(); 501 int maxpayloadLen = connection.getMaxSendFrameSize() - 9; 502 ByteBuffer buffer = connection.getBuffer(); 503 buffer.limit(maxpayloadLen); 504 boolean complete = requestProcessor.onRequestBodyChunk(buffer); 505 buffer.flip(); 506 int amount = buffer.remaining(); 507 // wait for flow control if necessary. Following method will block 508 // until after headers frame is sent, so correct streamid is set. 509 remoteRequestFlowController.take(amount); 510 connection.obtainSendWindow(amount); 511 512 DataFrame df = new DataFrame(); 513 df.streamid(streamid); 514 if (complete) { 515 df.setFlag(DataFrame.END_STREAM); 516 } 517 df.setData(buffer); 518 df.computeLength(); 519 return df; 520 } 521 522 523 @Override 524 CompletableFuture<Void> sendHeadersAsync() { 525 try { 526 sendHeadersOnly(); 527 return CompletableFuture.completedFuture(null); 528 } catch (IOException | InterruptedException ex) { 529 return CompletableFuture.failedFuture(ex); 530 } 531 } 532 533 /** 534 * A List of responses relating to this stream. Normally there is only 535 * one response, but intermediate responses like 100 are allowed 536 * and must be passed up to higher level before continuing. 
Deals with races 537 * such as if responses are returned before the CFs get created by 538 * getResponseAsync() 539 */ 540 541 final List<CompletableFuture<HttpResponseImpl>> response_cfs = new ArrayList<>(5); 542 543 @Override 544 CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) { 545 CompletableFuture<HttpResponseImpl> cf; 546 synchronized (response_cfs) { 547 if (!response_cfs.isEmpty()) { 548 cf = response_cfs.remove(0); 549 } else { 550 cf = new CompletableFuture<>(); 551 response_cfs.add(cf); 552 } 553 } 554 PushGroup<?> pg = request.pushGroup(); 555 if (pg != null) { 556 // if an error occurs make sure it is recorded in the PushGroup 557 cf = cf.whenComplete((t,e) -> pg.pushError(e)); 558 } 559 return cf; 560 } 561 562 /** 563 * Completes the first uncompleted CF on list, and removes it. If there is no 564 * uncompleted CF then creates one (completes it) and adds to list 565 */ 566 void completeResponse(HttpResponse r) { 567 HttpResponseImpl resp = (HttpResponseImpl)r; 568 synchronized (response_cfs) { 569 int cfs_len = response_cfs.size(); 570 for (int i=0; i<cfs_len; i++) { 571 CompletableFuture<HttpResponseImpl> cf = response_cfs.get(i); 572 if (!cf.isDone()) { 573 cf.complete(resp); 574 response_cfs.remove(cf); 575 return; 576 } 577 } 578 response_cfs.add(CompletableFuture.completedFuture(resp)); 579 } 580 } 581 582 // methods to update state and remove stream when finished 583 584 synchronized void requestSent() { 585 requestSent = true; 586 if (responseReceived) 587 connection.deleteStream(this); 588 } 589 590 synchronized void responseReceived() { 591 responseReceived = true; 592 if (requestSent) 593 connection.deleteStream(this); 594 PushGroup<?> pg = request.pushGroup(); 595 if (pg != null) 596 pg.noMorePushes(); 597 } 598 599 /** 600 * same as above but for errors 601 * 602 * @param t 603 */ 604 void completeResponseExceptionally(Throwable t) { 605 synchronized (response_cfs) { 606 for (CompletableFuture<HttpResponseImpl> cf : 
response_cfs) { 607 if (!cf.isDone()) { 608 cf.completeExceptionally(t); 609 response_cfs.remove(cf); 610 return; 611 } 612 } 613 response_cfs.add(CompletableFuture.failedFuture(t)); 614 } 615 } 616 617 void sendBodyImpl() throws IOException, InterruptedException { 618 if (requestContentLen == 0) { 619 // no body 620 requestSent(); 621 return; 622 } 623 DataFrame df; 624 do { 625 df = getDataFrame(); 626 // TODO: check accumulated content length (if not checked below) 627 connection.sendFrame(df); 628 } while (!df.getFlag(DataFrame.END_STREAM)); 629 requestSent(); 630 } 631 632 @Override 633 void cancel() { 634 cancelImpl(new Exception("Cancelled")); 635 } 636 637 638 void cancelImpl(Throwable e) { 639 Log.logTrace("cancelling stream: {0}\n", e.toString()); 640 inputQ.close(); 641 completeResponseExceptionally(e); 642 try { 643 connection.resetStream(streamid, ResetFrame.CANCEL); 644 } catch (IOException | InterruptedException ex) { 645 Log.logError(ex); 646 } 647 } 648 649 @Override 650 CompletableFuture<Void> sendRequestAsync() { 651 CompletableFuture<Void> cf = new CompletableFuture<>(); 652 executor.execute(() -> { 653 try { 654 sendRequest(); 655 cf.complete(null); 656 } catch (IOException |InterruptedException e) { 657 cf.completeExceptionally(e); 658 } 659 }, null); 660 return cf; 661 } 662 663 @Override 664 <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException { 665 this.responseProcessor = processor; 666 T body = processor.onResponseBodyStart( 667 responseContentLen, responseHeaders, 668 responseFlowController); // TODO: filter headers 669 if (body == null) { 670 receiveData(); 671 body = processor.onResponseComplete(); 672 } else 673 receiveDataAsync(processor); 674 responseReceived(); 675 return body; 676 } 677 678 // called from Http2Connection reader thread 679 synchronized void updateOutgoingWindow(int update) { 680 remoteRequestFlowController.accept(update); 681 } 682 683 void close(String msg) { 684 cancel(); 685 } 686 687 
static class PushedStream extends Stream { 688 final PushGroup<?> pushGroup; 689 final private Stream parent; // used by server push streams 690 // push streams need the response CF allocated up front as it is 691 // given directly to user via the multi handler callback function. 692 final CompletableFuture<HttpResponseImpl> pushCF; 693 final HttpRequestImpl pushReq; 694 695 PushedStream(PushGroup<?> pushGroup, HttpClientImpl client, 696 Http2Connection connection, Stream parent, 697 HttpRequestImpl pushReq) { 698 super(client, connection, pushReq); 699 this.pushGroup = pushGroup; 700 this.pushReq = pushReq; 701 this.pushCF = new CompletableFuture<>(); 702 this.parent = parent; 703 } 704 705 // Following methods call the super class but in case of 706 // error record it in the PushGroup. The error method is called 707 // with a null value when no error occurred (is a no-op) 708 @Override 709 CompletableFuture<Void> sendBodyAsync() { 710 return super.sendBodyAsync() 711 .whenComplete((v, t) -> pushGroup.pushError(t)); 712 } 713 714 @Override 715 CompletableFuture<Void> sendHeadersAsync() { 716 return super.sendHeadersAsync() 717 .whenComplete((v, t) -> pushGroup.pushError(t)); 718 } 719 720 @Override 721 CompletableFuture<Void> sendRequestAsync() { 722 return super.sendRequestAsync() 723 .whenComplete((v, t) -> pushGroup.pushError(t)); 724 } 725 726 @Override 727 CompletableFuture<HttpResponseImpl> getResponseAsync(Void vo) { 728 return pushCF.whenComplete((v, t) -> pushGroup.pushError(t)); 729 } 730 731 @Override 732 <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) { 733 return super.responseBodyAsync(processor) 734 .whenComplete((v, t) -> pushGroup.pushError(t)); 735 } 736 737 @Override 738 void completeResponse(HttpResponse r) { 739 HttpResponseImpl resp = (HttpResponseImpl)r; 740 Utils.logResponse(resp); 741 pushCF.complete(resp); 742 } 743 744 @Override 745 void completeResponseExceptionally(Throwable t) { 746 
pushCF.completeExceptionally(t); 747 } 748 749 @Override 750 synchronized void responseReceived() { 751 super.responseReceived(); 752 pushGroup.pushCompleted(); 753 } 754 755 // create and return the PushResponseImpl 756 @Override 757 protected void handleResponse() { 758 HttpConnection c = connection.connection; // TODO: improve 759 long statusCode = responseHeaders 760 .firstValueAsLong(":status") 761 .orElse(-1L); 762 763 if (statusCode == -1L) 764 completeResponseExceptionally(new IOException("No status code")); 765 ImmutableHeaders h = new ImmutableHeaders(responseHeaders, Utils.ALL_HEADERS); 766 this.response = new HttpResponseImpl((int)statusCode, pushReq, h, this, 767 c.sslParameters()); 768 this.responseContentLen = responseHeaders 769 .firstValueAsLong("content-length") 770 .orElse(-1L); 771 // different implementations for normal streams and pushed streams 772 completeResponse(response); 773 } 774 } 775 776 /** 777 * One PushGroup object is associated with the parent Stream of 778 * the pushed Streams. This keeps track of all common state associated 779 * with the pushes. 780 */ 781 static class PushGroup<T> { 782 // the overall completion object, completed when all pushes are done. 
783 final CompletableFuture<T> resultCF; 784 Throwable error; // any exception that occured during pushes 785 786 // CF for main response 787 final CompletableFuture<HttpResponse> mainResponse; 788 789 // user's processor object 790 final HttpResponse.MultiProcessor<T> multiProcessor; 791 792 // per push handler function provided by processor 793 final private BiFunction<HttpRequest, 794 CompletableFuture<HttpResponse>, 795 Boolean> pushHandler; 796 int numberOfPushes; 797 int remainingPushes; 798 boolean noMorePushes = false; 799 800 PushGroup(HttpResponse.MultiProcessor<T> multiProcessor, HttpRequestImpl req) { 801 this.resultCF = new CompletableFuture<>(); 802 this.mainResponse = new CompletableFuture<>(); 803 this.multiProcessor = multiProcessor; 804 this.pushHandler = multiProcessor.onStart(req, mainResponse); 805 } 806 807 CompletableFuture<T> groupResult() { 808 return resultCF; 809 } 810 811 CompletableFuture<HttpResponse> mainResponse() { 812 return mainResponse; 813 } 814 815 private BiFunction<HttpRequest, 816 CompletableFuture<HttpResponse>, Boolean> pushHandler() 817 { 818 return pushHandler; 819 } 820 821 synchronized void addPush() { 822 numberOfPushes++; 823 remainingPushes++; 824 } 825 826 synchronized int numberOfPushes() { 827 return numberOfPushes; 828 } 829 // This is called when the main body response completes because it means 830 // no more PUSH_PROMISEs are possible 831 synchronized void noMorePushes() { 832 noMorePushes = true; 833 checkIfCompleted(); 834 } 835 836 synchronized void pushCompleted() { 837 remainingPushes--; 838 checkIfCompleted(); 839 } 840 841 synchronized void checkIfCompleted() { 842 if (remainingPushes == 0 && error == null && noMorePushes) { 843 T overallResult = multiProcessor.onComplete(); 844 resultCF.complete(overallResult); 845 } 846 } 847 848 synchronized void pushError(Throwable t) { 849 if (t == null) 850 return; 851 this.error = t; 852 resultCF.completeExceptionally(t); 853 } 854 } 855 }