/*
 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import jdk.incubator.http.internal.common.*;
import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.DecodingCallback;
import static java.util.stream.Collectors.toList;

/**
 * Http/2 Stream handling.
 *
 * REQUESTS
 *
 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
 *
 * sendRequest() -- sendHeadersOnly() + sendBody()
 *
 * sendBody() -- in calling thread: obeys all flow control (so may block)
 *               obtains data from request body processor and places on connection
 *               outbound Q.
 *
 * sendBodyAsync() -- calls sendBody() in an executor thread.
 *
 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
 *
 * sendRequestAsync() -- calls sendRequest() in an executor thread
 *
 * RESPONSES
 *
 * Multiple responses can be received per request. Responses are queued up on
 * a list of CF<Response> and the first one on the list is completed
 * with the next response
 *
 * getResponseAsync() -- queries list of response CFs and returns first one
 *               if one exists. Otherwise, creates one and adds it to list
 *               and returns it. Completion is achieved through the
 *               incoming() upcall from connection reader thread.
 *
 * getResponse() -- calls getResponseAsync() and waits for CF to complete
 *
 * responseBody() -- in calling thread: blocks for incoming DATA frames on
 *               stream inputQ. Obeys remote and local flow control so may block.
 *               Calls user response body processor with data buffers.
 *
 * responseBodyAsync() -- calls responseBody() in an executor thread.
 *
 * incoming() -- entry point called from connection reader thread. Frames are
 *               either handled immediately without blocking or for data frames
 *               placed on the stream's inputQ which is consumed by the stream's
 *               reader thread.
 *
 * PushedStream sub class
 * ======================
 * Sending side methods are not used because the request comes from a PUSH_PROMISE
 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
 * is created. PushedStream does not use responseCF list as there can be only
 * one response. The CF is created when the object is created and when the response
 * HEADERS frame is received the object is completed.
 */
class Stream<T> extends ExchangeImpl<T> {

    final AsyncDataReadQueue inputQ = new AsyncDataReadQueue();

    /**
     * This stream's identifier. Assigned lazily by the HTTP2Connection before
     * the stream's first frame is sent.
     */
    protected volatile int streamid;

    long responseContentLen = -1;
    long responseBytesProcessed = 0;
    long requestContentLen;

    final Http2Connection connection;
    HttpClientImpl client;
    final HttpRequestImpl request;
    final DecodingCallback rspHeadersConsumer;
    HttpHeadersImpl responseHeaders;
    final HttpHeadersImpl requestHeaders;
    final HttpHeadersImpl requestPseudoHeaders;
    HttpResponse.BodyProcessor<T> responseProcessor;
    final HttpRequest.BodyProcessor requestProcessor;
    volatile int responseCode;
    volatile Response response;
    volatile CompletableFuture<Response> responseCF;
    final AbstractPushPublisher<List<ByteBuffer>> publisher;
    final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();

    /** True if END_STREAM has been seen in a frame received on this stream. */
    private volatile boolean remotelyClosed;
    private volatile boolean closed;
    private volatile boolean endStreamSent;

    // state flags
    boolean requestSent, responseReceived, responseHeadersReceived;

    /**
     * A reference to this Stream's connection Send Window controller. The
     * stream MUST acquire the appropriate amount of Send Window before
     * sending any data. Will be null for PushStreams, as they cannot send data.
     */
    private final WindowController windowController;
    private final WindowUpdateSender windowUpdater;

    /** Returns the underlying connection of the HTTP/2 connection this stream belongs to. */
    @Override
    HttpConnection connection() {
        return connection.connection;
    }

    /**
     * Creates the response body processor from the given handler, subscribes it
     * to this stream's publisher and starts receiving DATA frames.
     *
     * @param handler produces the body processor from the response code and headers
     * @param returnConnectionToPool not used by this implementation
     * @param executor executor used to receive frames asynchronously, or
     *                 {@code null} to receive them in the calling thread
     * @return a future completed with the response body
     */
    @Override
    CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
                                       boolean returnConnectionToPool,
                                       Executor executor)
    {
        Log.logTrace("Reading body on stream {0}", streamid);
        responseProcessor = handler.apply(responseCode, responseHeaders);
        publisher.subscribe(responseProcessor);
        CompletableFuture<T> cf = receiveData(executor);

        PushGroup<?,?> pg = exchange.getPushGroup();
        if (pg != null) {
            // if an error occurs make sure it is recorded in the PushGroup
            cf = cf.whenComplete((t,e) -> pg.pushError(e));
        }
        return cf;
    }

    /**
     * Blocking variant of {@link #readBodyAsync}: passes a {@code null} executor
     * and waits for the returned future.
     *
     * @throws IOException if reading the body failed
     */
    @Override
    T readBody(HttpResponse.BodyHandler<T> handler, boolean returnConnectionToPool)
        throws IOException
    {
        CompletableFuture<T> cf = readBodyAsync(handler,
                                                returnConnectionToPool,
                                                null);
        try {
            return cf.join();
        } catch (CompletionException e) {
            throw Utils.getIOException(e);
        }
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("streamid: ")
                .append(streamid);
        return sb.toString();
    }

    /**
     * Handles one frame taken from {@code inputQ}.
     *
     * @return {@code false} once a DATA frame carrying END_STREAM has been
     *         processed, {@code true} otherwise
     * @throws IOException if the frame is a RST_STREAM received on a stream
     *         that is not yet closed (see {@link #handleReset})
     */
    private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
        if (frame instanceof ResetFrame) {
            handleReset((ResetFrame) frame);
            return true;
        } else if (!(frame instanceof DataFrame)) {
            assert false;
            return true;
        }
        DataFrame df = (DataFrame) frame;
        // RFC 7540 6.1:
        // The entire DATA frame payload is included in flow control,
        // including the Pad Length and Padding fields if present
        int len = df.payloadLength();
        ByteBufferReference[] buffers = df.getData();
        List<ByteBuffer> dsts = Arrays.stream(buffers)
                .map(ByteBufferReference::get)
                .filter(ByteBuffer::hasRemaining)
                .collect(toList());
        if (!dsts.isEmpty()) {
            publisher.acceptData(Optional.of(dsts));
        }
        // the connection-level window is always updated
        connection.windowUpdater.update(len);
        if (df.getFlag(DataFrame.END_STREAM)) {
            setEndStreamReceived();
            publisher.acceptData(Optional.empty());
            return false;
        }
        // Don't send window update on a stream which is
        // closed or half closed.
        windowUpdater.update(len);
        return true;
    }

    // pushes entire response body into response processor
    // blocking when required by local or remote flow control
    CompletableFuture<T> receiveData(Executor executor) {
        CompletableFuture<T> cf = responseProcessor
                .getBody()
                .toCompletableFuture();
        Consumer<Throwable> onError = e -> {
            Log.logTrace("receiveData: {0}", e.toString());
            e.printStackTrace();
            cf.completeExceptionally(e);
            publisher.acceptError(e);
        };
        if (executor == null) {
            inputQ.blockingReceive(this::receiveDataFrame, onError);
        } else {
            inputQ.asyncReceive(executor, this::receiveDataFrame, onError);
        }
        return cf;
    }

    /**
     * Sends the request body and waits for completion.
     *
     * @throws IOException if sending the body failed
     */
    @Override
    void sendBody() throws IOException {
        try {
            sendBodyImpl().join();
        } catch (CompletionException e) {
            throw Utils.getIOException(e);
        }
    }

    /** Sends the request body; the returned future completes with this stream. */
    CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
        return sendBodyImpl().thenApply( v -> this);
    }

    @SuppressWarnings("unchecked")
    Stream(HttpClientImpl client,
           Http2Connection connection,
           Exchange<T> e,
           WindowController windowController)
    {
        super(e);
        this.client = client;
        this.connection = connection;
        this.windowController = windowController;
        this.request = e.request();
        this.requestProcessor = request.requestProcessor;
        responseHeaders = new HttpHeadersImpl();
        requestHeaders = new HttpHeadersImpl();
        rspHeadersConsumer = (name, value) -> {
            responseHeaders.addHeader(name.toString(), value.toString());
            if (Log.headers() && Log.trace()) {
                Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
                             streamid, name, value);
            }
        };
        this.requestPseudoHeaders = new HttpHeadersImpl();
        // NEW
        this.publisher = new BlockingPushPublisher<>();
        this.windowUpdater = new StreamWindowUpdateSender(connection);
    }

    /**
     * Entry point from Http2Connection reader thread.
     *
     * Data frames will be removed by response body thread.
     */
    void incoming(Http2Frame frame) throws IOException {
        if ((frame instanceof HeaderFrame)) {
            HeaderFrame hframe = (HeaderFrame)frame;
            if (hframe.endHeaders()) {
                Log.logTrace("handling response (streamid={0})", streamid);
                handleResponse();
                if (hframe.getFlag(HeaderFrame.END_STREAM)) {
                    // no body will follow: queue an empty END_STREAM data frame
                    // so that the body reader terminates
                    inputQ.put(new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]));
                }
            }
        } else if (frame instanceof DataFrame) {
            inputQ.put(frame);
        } else {
            otherFrame(frame);
        }
    }

    /**
     * Dispatches WINDOW_UPDATE, RST_STREAM and PRIORITY frames.
     *
     * @throws IOException if the frame is of any other type
     */
    void otherFrame(Http2Frame frame) throws IOException {
        switch (frame.type()) {
            case WindowUpdateFrame.TYPE:
                incoming_windowUpdate((WindowUpdateFrame) frame);
                break;
            case ResetFrame.TYPE:
                incoming_reset((ResetFrame) frame);
                break;
            case PriorityFrame.TYPE:
                incoming_priority((PriorityFrame) frame);
                break;
            default:
                String msg = "Unexpected frame: " + frame.toString();
                throw new IOException(msg);
        }
    }

    // The Hpack decoder decodes into one of these consumers of name,value pairs

    DecodingCallback rspHeadersConsumer() {
        return rspHeadersConsumer;
    }

    /**
     * Called once a complete set of response headers has been received:
     * extracts the status code and content length, builds the
     * {@link Response} and completes a response future with it.
     *
     * @throws IOException if the headers contain no {@code :status} field
     */
    protected void handleResponse() throws IOException {
        synchronized(this) {
            responseHeadersReceived = true;
        }
        responseCode = (int)responseHeaders
                .firstValueAsLong(":status")
                .orElseThrow(() -> new IOException("no statuscode in response"));

        response = new Response(
                request, exchange, responseHeaders,
                responseCode, HttpClient.Version.HTTP_2);

        this.responseContentLen = responseHeaders
                .firstValueAsLong("content-length")
                .orElse(-1L);

        if (Log.headers()) {
            StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
            Log.dumpHeaders(sb, " ", responseHeaders);
            Log.logHeaders(sb.toString());
        }

        completeResponse(response);
    }

    /**
     * Handles a RST_STREAM frame arriving from the connection reader thread.
     * The frame is ignored on a closed or remotely closed stream; otherwise it
     * is either queued behind pending data frames or handled directly.
     */
    void incoming_reset(ResetFrame frame) throws IOException {
        Log.logTrace("Received RST_STREAM on stream {0}", streamid);
        if (endStreamReceived()) {
            Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
        } else if (closed) {
            Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
        } else {
            boolean pushedToQueue = false;
            synchronized(this) {
                // if the response headers are not yet
                // received, or the inputQueue is closed, handle reset directly.
                // Otherwise, put it in the input queue in order to read all
                // pending data frames first. Indeed, a server may send
                // RST_STREAM after sending END_STREAM, in which case we should
                // ignore it. However, we won't know if we have received END_STREAM
                // or not until all pending data frames are read.
                // Because the inputQ will not be read until the response
                // headers are received, and because response headers won't be
                // sent if the server sent RST_STREAM, then we must handle
                // reset here directly unless responseHeadersReceived is true.
                pushedToQueue = !closed && responseHeadersReceived && inputQ.tryPut(frame);
            }
            if (!pushedToQueue) {
                // RST_STREAM was not pushed to the queue: handle it.
                try {
                    handleReset(frame);
                } catch (IOException io) {
                    completeResponseExceptionally(io);
                }
            } else {
                // RST_STREAM was pushed to the queue. It will be handled by
                // asyncReceive after all pending data frames have been
                // processed.
                Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
            }
        }
    }

    /**
     * Closes the stream in response to a RST_STREAM frame.
     *
     * @throws IOException always, when the stream was not already closed; the
     *         message is the textual form of the frame's error code
     */
    void handleReset(ResetFrame frame) throws IOException {
        Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
        if (!closed) {
            close();
            int error = frame.getErrorCode();
            throw new IOException(ErrorFrame.stringForCode(error));
        } else {
            Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
        }
    }

    void incoming_priority(PriorityFrame frame) {
        // TODO: implement priority
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * Applies a stream-level WINDOW_UPDATE. A non-positive increment, or one
     * that the window controller reports as unsuccessful (overflow), resets
     * the stream with FLOW_CONTROL_ERROR.
     */
    private void incoming_windowUpdate(WindowUpdateFrame frame)
        throws IOException
    {
        int amount = frame.getUpdate();
        if (amount <= 0) {
            // Use the same {n} placeholder style as every other trace message
            // in this class (the previous message mixed in printf-style %d).
            Log.logTrace("Resetting stream: {0}, Window Update amount: {1}",
                         streamid, amount);
            connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
        } else {
            assert streamid != 0;
            // NOTE(review): windowController is documented as null for pushed
            // streams; confirm this path cannot be reached for them.
            boolean success = windowController.increaseStreamWindow(amount, streamid);
            if (!success) {  // overflow
                connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
            }
        }
    }

    /**
     * Handles a PUSH_PROMISE received on this stream.
     *
     * If this exchange has no push group, or the group accepts no more pushes,
     * this stream and the promised stream are cancelled and the promise is not
     * offered to the user. Otherwise the user's multi processor is asked for a
     * body handler; if it supplies none the promised stream is cancelled.
     *
     * @param pushReq the request synthesized from the PUSH_PROMISE
     * @param pushStream the stream on which the pushed response will arrive
     */
    void incoming_pushPromise(HttpRequestImpl pushReq,
                              PushedStream<?,T> pushStream)
        throws IOException
    {
        if (Log.requests()) {
            Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
        }
        PushGroup<?,T> pushGroup = exchange.getPushGroup();
        if (pushGroup == null || pushGroup.noMorePushes()) {
            IllegalStateException unexpected = new IllegalStateException(
                    "unexpected push promise" + " on stream " + streamid);
            cancelImpl(unexpected);
            // Refuse the promised stream as well and stop here: continuing
            // would dereference a null pushGroup, or hand an unexpected push
            // to the user's processor.
            pushStream.cancelImpl(unexpected);
            return;
        }

        HttpResponse.MultiProcessor<?,T> proc = pushGroup.processor();

        CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();

        Optional<HttpResponse.BodyHandler<T>> bpOpt = proc.onRequest(
                pushReq);

        if (!bpOpt.isPresent()) {
            IOException ex = new IOException("Stream "
                 + streamid + " cancelled by user");
            if (Log.trace()) {
                Log.logTrace("No body processor for {0}: {1}", pushReq,
                            ex.getMessage());
            }
            pushStream.cancelImpl(ex);
            cf.completeExceptionally(ex);
            return;
        }

        pushGroup.addPush();
        pushStream.requestSent();
        pushStream.setPushHandler(bpOpt.get());
        // setup housekeeping for when the push is received
        // TODO: deal with ignoring of CF anti-pattern
        cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
            if (Log.trace()) {
                Log.logTrace("Push completed on stream {0} for {1}{2}",
                             pushStream.streamid, resp,
                             ((t==null) ? "": " with exception " + t));
            }
            if (t != null) {
                pushGroup.pushError(t);
                proc.onError(pushReq, t);
            } else {
                proc.onResponse(resp);
            }
            pushGroup.pushCompleted();
        });

    }

    /**
     * Builds the outgoing HEADERS for the request. A positive content length
     * is advertised in a {@code content-length} header; a content length of
     * zero sets END_STREAM on the frame since no body will follow.
     */
    private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
        HttpHeadersImpl h = request.getSystemHeaders();
        if (contentLength > 0) {
            h.setHeader("content-length", Long.toString(contentLength));
        }
        setPseudoHeaderFields();
        OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(h, request.getUserHeaders(), this);
        if (contentLength == 0) {
            f.setFlag(HeadersFrame.END_STREAM);
            endStreamSent = true;
        }
        return f;
    }

    /**
     * Populates {@code :method}, {@code :scheme}, {@code :authority} and
     * {@code :path} from the request. An empty path becomes {@code *} for
     * OPTIONS and {@code /} otherwise; the query, if any, is appended.
     */
    private void setPseudoHeaderFields() {
        HttpHeadersImpl hdrs = requestPseudoHeaders;
        String method = request.method();
        hdrs.setHeader(":method", method);
        URI uri = request.uri();
        hdrs.setHeader(":scheme", uri.getScheme());
        // TODO: userinfo deprecated. Needs to be removed
        hdrs.setHeader(":authority", uri.getAuthority());
        // TODO: ensure header names beginning with : not in user headers
        String query = uri.getQuery();
        String path = uri.getPath();
        if (path == null || path.isEmpty()) {
            if (method.equalsIgnoreCase("OPTIONS")) {
                path = "*";
            } else {
                path = "/";
            }
        }
        if (query != null) {
            path += "?" + query;
        }
        hdrs.setHeader(":path", path);
    }

    HttpHeadersImpl getRequestPseudoHeaders() {
        return requestPseudoHeaders;
    }

    /**
     * Waits for the response, honouring the request's timeout if one is set.
     *
     * @throws HttpTimeoutException if the timeout elapsed
     * @throws IOException if the response failed or the wait was interrupted
     */
    @Override
    Response getResponse() throws IOException {
        try {
            if (request.duration() != null) {
                Log.logTrace("Waiting for response (streamid={0}, timeout={1}ms)",
                             streamid,
                             request.duration().toMillis());
                return getResponseAsync(null).get(
                        request.duration().toMillis(), TimeUnit.MILLISECONDS);
            } else {
                Log.logTrace("Waiting for response (streamid={0})", streamid);
                return getResponseAsync(null).join();
            }
        } catch (TimeoutException e) {
            Log.logTrace("Response timeout (streamid={0})", streamid);
            throw new HttpTimeoutException("Response timed out");
        } catch (InterruptedException | ExecutionException | CompletionException e) {
            if (e instanceof InterruptedException) {
                // preserve the interrupt for callers further up the stack
                Thread.currentThread().interrupt();
            }
            Throwable t = e.getCause();
            Log.logTrace("Response failed (streamid={0}): {1}", streamid, t);
            if (t instanceof IOException) {
                throw (IOException)t;
            }
            throw new IOException(e);
        } finally {
            Log.logTrace("Got response or failed (streamid={0})", streamid);
        }
    }

    /** Sets endStreamReceived. Should be called only once. */
    void setEndStreamReceived() {
        assert remotelyClosed == false: "Unexpected endStream already set";
        remotelyClosed = true;
        responseReceived();
    }

    /** Tells whether, or not, the END_STREAM Flag has been seen in any frame
     *  received on this stream.
     */
    private boolean endStreamReceived() {
        return remotelyClosed;
    }

    /** Builds the request HEADERS frame and hands it to the connection. */
    @Override
    void sendHeadersOnly() throws IOException, InterruptedException {
        if (Log.requests() && request != null) {
            Log.logRequest(request.toString());
        }
        requestContentLen = requestProcessor.contentLength();
        OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
        connection.sendFrame(f);
    }

    /** Assigns this stream's id and registers the stream with the connection. */
    void registerStream(int id) {
        this.streamid = id;
        connection.putStream(this, streamid);
    }

    /**
     * Subscriber to the request body processor: turns each published buffer
     * into one or more DATA frames and completes {@code requestBodyCF} when
     * the body has been sent or has failed.
     */
    class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
        // can be < 0 if the actual length is not known.
        private volatile long remainingContentLength;
        private volatile Subscription subscription;

        RequestSubscriber(long contentLen) {
            this.remainingContentLength = contentLen;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (this.subscription != null) {
                throw new IllegalStateException();
            }
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(ByteBuffer item) {
            if (requestBodyCF.isDone()) {
                throw new IllegalStateException();
            }

            try {
                while (item.hasRemaining()) {
                    assert !endStreamSent : "internal error, send data after END_STREAM flag";
                    DataFrame df = getDataFrame(item);
                    if (remainingContentLength > 0) {
                        remainingContentLength -= df.getDataLength();
                        assert remainingContentLength >= 0;
                        if (remainingContentLength == 0) {
                            // last frame of a body of known length
                            df.setFlag(DataFrame.END_STREAM);
                            endStreamSent = true;
                        }
                    }
                    connection.sendDataFrame(df);
                }
                subscription.request(1);
            } catch (InterruptedException ex) {
                subscription.cancel();
                requestBodyCF.completeExceptionally(ex);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            if (requestBodyCF.isDone()) {
                return;
            }
            subscription.cancel();
            requestBodyCF.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            assert endStreamSent || remainingContentLength < 0;
            try {
                if (!endStreamSent) {
                    // body of unknown length: terminate with an empty
                    // END_STREAM data frame
                    endStreamSent = true;
                    connection.sendDataFrame(getEmptyEndStreamDataFrame());
                }
                requestBodyCF.complete(null);
            } catch (InterruptedException ex) {
                requestBodyCF.completeExceptionally(ex);
            }
        }
    }

    /**
     * Slices the next DATA frame off {@code buffer}, limited by the
     * connection's maximum send frame size and by the amount of send window
     * granted by the window controller.
     */
    DataFrame getDataFrame(ByteBuffer buffer) throws InterruptedException {
        int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
        // blocks waiting for stream send window, if exhausted
        int actualAmount = windowController.tryAcquire(requestAmount, streamid);
        ByteBuffer outBuf = Utils.slice(buffer, actualAmount);
        DataFrame df = new DataFrame(streamid, 0 , ByteBufferReference.of(outBuf));
        return df;
    }

    private DataFrame getEmptyEndStreamDataFrame() throws InterruptedException {
        return new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]);
    }

    /**
     * A List of responses relating to this stream. Normally there is only
     * one response, but intermediate responses like 100 are allowed
     * and must be passed up to higher level before continuing. Deals with races
     * such as if responses are returned before the CFs get created by
     * getResponseAsync()
     */

    final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);

    @Override
    CompletableFuture<Response> getResponseAsync(Executor executor) {
        CompletableFuture<Response> cf = null;
        // The code below deals with race condition that can be caused when
        // completeResponse() is being called before getResponseAsync()
        synchronized (response_cfs) {
            if (!response_cfs.isEmpty()) {
                // This CompletableFuture was created by completeResponse().
                // it will be already completed.
                cf = response_cfs.remove(0);
                // if we find a cf here it should be already completed.
                // finding a non completed cf should not happen. just assert it.
                assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
            } else {
                // getResponseAsync() is called first. Create a CompletableFuture
                // that will be completed by completeResponse() when
                // completeResponse() is called.
                cf = new MinimalFuture<>();
                response_cfs.add(cf);
            }
        }
        if (executor != null && !cf.isDone()) {
            // protect from executing later chain of CompletableFuture operations from SelectorManager thread
            cf = cf.thenApplyAsync(r -> r, executor);
        }
        Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
        PushGroup<?,?> pg = exchange.getPushGroup();
        if (pg != null) {
            // if an error occurs make sure it is recorded in the PushGroup
            cf = cf.whenComplete((t,e) -> pg.pushError(e));
        }
        return cf;
    }

    /**
     * Completes the first uncompleted CF on list, and removes it. If there is no
     * uncompleted CF then creates one (completes it) and adds to list
     */
    void completeResponse(Response resp) {
        synchronized (response_cfs) {
            CompletableFuture<Response> cf;
            int cfs_len = response_cfs.size();
            for (int i=0; i<cfs_len; i++) {
                cf = response_cfs.get(i);
                if (!cf.isDone()) {
                    Log.logTrace("Completing response (streamid={0}): {1}",
                                 streamid, cf);
                    cf.complete(resp);
                    // remove by index, as completeResponseExceptionally does
                    response_cfs.remove(i);
                    return;
                } // else we found the previous response: just leave it alone.
            }
            cf = MinimalFuture.completedFuture(resp);
            Log.logTrace("Created completed future (streamid={0}): {1}",
                         streamid, cf);
            response_cfs.add(cf);
        }
    }

    // methods to update state and remove stream when finished

    synchronized void requestSent() {
        requestSent = true;
        if (responseReceived) {
            close();
        }
    }

    final synchronized boolean isResponseReceived() {
        return responseReceived;
    }

    synchronized void responseReceived() {
        responseReceived = true;
        if (requestSent) {
            close();
        }
    }

    /**
     * same as above but for errors
     */
    void completeResponseExceptionally(Throwable t) {
        synchronized (response_cfs) {
            // use index to avoid ConcurrentModificationException
            // caused by removing the CF from within the loop.
            for (int i = 0; i < response_cfs.size(); i++) {
                CompletableFuture<Response> cf = response_cfs.get(i);
                if (!cf.isDone()) {
                    cf.completeExceptionally(t);
                    response_cfs.remove(i);
                    return;
                }
            }
            response_cfs.add(MinimalFuture.failedFuture(t));
        }
    }

    /**
     * Subscribes a new {@link RequestSubscriber} to the request body processor.
     *
     * @return {@code requestBodyCF}, which the subscriber completes
     */
    CompletableFuture<Void> sendBodyImpl() {
        RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
        requestProcessor.subscribe(subscriber);
        requestBodyCF.whenComplete((v,t) -> requestSent());
        return requestBodyCF;
    }

    @Override
    void cancel() {
        cancel(new IOException("Stream " + streamid + " cancelled"));
    }

    @Override
    void cancel(IOException cause) {
        cancelImpl(cause);
    }

    // This method sends a RST_STREAM frame
    void cancelImpl(Throwable e) {
        if (Log.trace()) {
            Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
        }
        boolean closing;
        if (closing = !closed) { // assigning closing to !closed
            synchronized (this) {
                if (closing = !closed) { // assigning closing to !closed
                    closed=true;
                }
            }
        }
        if (closing) { // true if the stream has not been closed yet
            inputQ.close();
        }
        completeResponseExceptionally(e);
        try {
            // will send a RST_STREAM frame
            if (streamid != 0) {
                connection.resetStream(streamid, ResetFrame.CANCEL);
            }
        } catch (IOException ex) {
            Log.logError(ex);
        }
    }

    // This method doesn't send any frame
    void close() {
        if (closed) return;
        synchronized(this) {
            if (closed) return;
            closed = true;
        }
        Log.logTrace("Closing stream {0}", streamid);
        inputQ.close();
        connection.closeStream(streamid);
        Log.logTrace("Stream {0} closed", streamid);
    }

    /**
     * A stream created for a server push. Its response future is allocated up
     * front and errors on it are recorded in the owning {@link PushGroup}.
     */
    static class PushedStream<U,T> extends Stream<T> {
        final PushGroup<U,T> pushGroup;
        private final Stream<T> parent;      // used by server push streams
        // push streams need the response CF allocated up front as it is
        // given directly to user via the multi handler callback function.
        final CompletableFuture<Response> pushCF;
        final CompletableFuture<HttpResponse<T>> responseCF;
        final HttpRequestImpl pushReq;
        HttpResponse.BodyHandler<T> pushHandler;

        PushedStream(PushGroup<U,T> pushGroup, HttpClientImpl client,
                Http2Connection connection, Stream<T> parent,
                Exchange<T> pushReq) {
            // ## no request body possible, null window controller
            super(client, connection, pushReq, null);
            this.pushGroup = pushGroup;
            this.pushReq = pushReq.request();
            this.pushCF = new MinimalFuture<>();
            this.responseCF = new MinimalFuture<>();
            this.parent = parent;
        }

        CompletableFuture<HttpResponse<T>> responseCF() {
            return responseCF;
        }

        synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
            this.pushHandler = pushHandler;
        }

        synchronized HttpResponse.BodyHandler<T> getPushHandler() {
            // ignored parameters to function can be used as BodyHandler
            return this.pushHandler;
        }

        // Following methods call the super class but in case of
        // error record it in the PushGroup. The error method is called
        // with a null value when no error occurred (is a no-op)
        @Override
        CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
            return super.sendBodyAsync()
                        .whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t));
        }

        @Override
        CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
            return super.sendHeadersAsync()
                        .whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t));
        }

        @Override
        CompletableFuture<Response> getResponseAsync(Executor executor) {
            CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
            if(executor!=null && !cf.isDone()) {
                cf  = cf.thenApplyAsync( r -> r, executor);
            }
            return cf;
        }

        @Override
        CompletableFuture<T> readBodyAsync(
                HttpResponse.BodyHandler<T> handler,
                boolean returnConnectionToPool,
                Executor executor)
        {
            return super.readBodyAsync(handler, returnConnectionToPool, executor)
                        .whenComplete((v, t) -> pushGroup.pushError(t));
        }

        @Override
        void completeResponse(Response r) {
            HttpResponseImpl.logResponse(r);
            pushCF.complete(r); // not strictly required for push API
            // start reading the body using the obtained BodyProcessor
            CompletableFuture<Void> start = new MinimalFuture<>();
            start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
                .whenComplete((T body, Throwable t) -> {
                    if (t != null) {
                        responseCF.completeExceptionally(t);
                    } else {
                        HttpResponseImpl<T> response = new HttpResponseImpl<>(r.request, r, body, getExchange());
                        responseCF.complete(response);
                    }
                });
            start.completeAsync(() -> null, getExchange().executor());
        }

        @Override
        void completeResponseExceptionally(Throwable t) {
            pushCF.completeExceptionally(t);
        }

        @Override
        synchronized void responseReceived() {
            super.responseReceived();
        }

        // create and return the PushResponseImpl
        @Override
        protected void handleResponse() {
            responseCode = (int)responseHeaders
                .firstValueAsLong(":status")
                .orElse(-1);

            if (responseCode == -1) {
                completeResponseExceptionally(new IOException("No status code"));
                // Without a status code there is no valid response to build:
                // do not go on to complete the push and start reading a body.
                return;
            }

            this.response = new Response(
                pushReq, exchange, responseHeaders,
                responseCode, HttpClient.Version.HTTP_2);

            this.responseContentLen = responseHeaders
                .firstValueAsLong("content-length")
                .orElse(-1L);

            if (Log.headers()) {
                StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
                sb.append(" (streamid=").append(streamid).append("): ");
                Log.dumpHeaders(sb, " ", responseHeaders);
                Log.logHeaders(sb.toString());
            }

            // different implementations for normal streams and pushed streams
            completeResponse(response);
        }
    }

    /** Window update sender bound to this stream's id. */
    final class StreamWindowUpdateSender extends WindowUpdateSender {

        StreamWindowUpdateSender(Http2Connection connection) {
            super(connection);
        }

        @Override
        int getStreamId() {
            return streamid;
        }
    }

}