1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.net.URI; 30 import java.nio.ByteBuffer; 31 import java.util.ArrayList; 32 import java.util.List; 33 import java.util.Optional; 34 import java.util.concurrent.CompletableFuture; 35 import java.util.concurrent.CompletionException; 36 import java.util.concurrent.ExecutionException; 37 import java.util.concurrent.Executor; 38 import java.util.concurrent.Flow; 39 import java.util.concurrent.Flow.Subscription; 40 import java.util.concurrent.TimeUnit; 41 import java.util.concurrent.TimeoutException; 42 import java.util.function.Consumer; 43 44 import jdk.incubator.http.internal.common.*; 45 import jdk.incubator.http.internal.frame.*; 46 import jdk.incubator.http.internal.hpack.DecodingCallback; 47 48 /** 49 * Http/2 Stream handling. 50 * 51 * REQUESTS 52 * 53 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q 54 * 55 * sendRequest() -- sendHeadersOnly() + sendBody() 56 * 57 * sendBody() -- in calling thread: obeys all flow control (so may block) 58 * obtains data from request body processor and places on connection 59 * outbound Q. 60 * 61 * sendBodyAsync() -- calls sendBody() in an executor thread. 62 * 63 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block 64 * 65 * sendRequestAsync() -- calls sendRequest() in an executor thread 66 * 67 * RESPONSES 68 * 69 * Multiple responses can be received per request. Responses are queued up on 70 * a LinkedList of CF<HttpResponse> and the the first one on the list is completed 71 * with the next response 72 * 73 * getResponseAsync() -- queries list of response CFs and returns first one 74 * if one exists. Otherwise, creates one and adds it to list 75 * and returns it. Completion is achieved through the 76 * incoming() upcall from connection reader thread. 77 * 78 * getResponse() -- calls getResponseAsync() and waits for CF to complete 79 * 80 * responseBody() -- in calling thread: blocks for incoming DATA frames on 81 * stream inputQ. Obeys remote and local flow control so may block. 82 * Calls user response body processor with data buffers. 83 * 84 * responseBodyAsync() -- calls responseBody() in an executor thread. 85 * 86 * incoming() -- entry point called from connection reader thread. Frames are 87 * either handled immediately without blocking or for data frames 88 * placed on the stream's inputQ which is consumed by the stream's 89 * reader thread. 90 * 91 * PushedStream sub class 92 * ====================== 93 * Sending side methods are not used because the request comes from a PUSH_PROMISE 94 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream 95 * is created. PushedStream does not use responseCF list as there can be only 96 * one response. The CF is created when the object created and when the response 97 * HEADERS frame is received the object is completed. 98 */ 99 class Stream<T> extends ExchangeImpl<T> { 100 101 final AsyncDataReadQueue inputQ = new AsyncDataReadQueue(); 102 103 /** 104 * This stream's identifier. Assigned lazily by the HTTP2Connection before 105 * the stream's first frame is sent. 106 */ 107 protected volatile int streamid; 108 109 long responseContentLen = -1; 110 long responseBytesProcessed = 0; 111 long requestContentLen; 112 113 final Http2Connection connection; 114 HttpClientImpl client; 115 final HttpRequestImpl request; 116 final DecodingCallback rspHeadersConsumer; 117 HttpHeadersImpl responseHeaders; 118 final HttpHeadersImpl requestHeaders; 119 final HttpHeadersImpl requestPseudoHeaders; 120 HttpResponse.BodyProcessor<T> responseProcessor; 121 final HttpRequest.BodyProcessor requestProcessor; 122 volatile int responseCode; 123 volatile Response response; 124 volatile CompletableFuture<Response> responseCF; 125 final AbstractPushPublisher<ByteBuffer> publisher; 126 final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>(); 127 128 /** True if END_STREAM has been seen in a frame received on this stream. */ 129 private volatile boolean remotelyClosed; 130 private volatile boolean closed; 131 private volatile boolean endStreamSent; 132 133 // state flags 134 boolean requestSent, responseReceived, responseHeadersReceived; 135 136 /** 137 * A reference to this Stream's connection Send Window controller. The 138 * stream MUST acquire the appropriate amount of Send Window before 139 * sending any data. Will be null for PushStreams, as they cannot send data. 140 */ 141 private final WindowController windowController; 142 private final WindowUpdateSender windowUpdater; 143 144 @Override 145 HttpConnection connection() { 146 return connection.connection; 147 } 148 149 @Override 150 CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler, 151 boolean returnConnectionToPool, 152 Executor executor) 153 { 154 Log.logTrace("Reading body on stream {0}", streamid); 155 responseProcessor = handler.apply(responseCode, responseHeaders); 156 publisher.subscribe(responseProcessor); 157 CompletableFuture<T> cf = receiveData(executor); 158 159 PushGroup<?,?> pg = exchange.getPushGroup(); 160 if (pg != null) { 161 // if an error occurs make sure it is recorded in the PushGroup 162 cf = cf.whenComplete((t,e) -> pg.pushError(e)); 163 } 164 return cf; 165 } 166 167 @Override 168 T readBody(HttpResponse.BodyHandler<T> handler, boolean returnConnectionToPool) 169 throws IOException 170 { 171 CompletableFuture<T> cf = readBodyAsync(handler, 172 returnConnectionToPool, 173 null); 174 try { 175 return cf.join(); 176 } catch (CompletionException e) { 177 throw Utils.getIOException(e); 178 } 179 } 180 181 @Override 182 public String toString() { 183 StringBuilder sb = new StringBuilder(); 184 sb.append("streamid: ") 185 .append(streamid); 186 return sb.toString(); 187 } 188 189 private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException { 190 if (frame instanceof ResetFrame) { 191 handleReset((ResetFrame) frame); 192 return true; 193 } else if (!(frame instanceof DataFrame)) { 194 assert false; 195 return true; 196 } 197 DataFrame df = (DataFrame) frame; 198 // RFC 7540 6.1: 199 // The entire DATA frame payload is included in flow control, 200 // including the Pad Length and Padding fields if present 201 int len = df.payloadLength(); 202 ByteBufferReference[] buffers = df.getData(); 203 for (ByteBufferReference b : buffers) { 204 ByteBuffer buf = b.get(); 205 if (buf.hasRemaining()) { 206 publisher.acceptData(Optional.of(buf)); 207 } 208 } 209 connection.windowUpdater.update(len); 210 if (df.getFlag(DataFrame.END_STREAM)) { 211 setEndStreamReceived(); 212 publisher.acceptData(Optional.empty()); 213 return false; 214 } 215 // Don't send window update on a stream which is 216 // closed or half closed. 217 windowUpdater.update(len); 218 return true; 219 } 220 221 // pushes entire response body into response processor 222 // blocking when required by local or remote flow control 223 CompletableFuture<T> receiveData(Executor executor) { 224 CompletableFuture<T> cf = responseProcessor 225 .getBody() 226 .toCompletableFuture(); 227 Consumer<Throwable> onError = e -> { 228 Log.logTrace("receiveData: {0}", e.toString()); 229 e.printStackTrace(); 230 cf.completeExceptionally(e); 231 publisher.acceptError(e); 232 }; 233 if (executor == null) { 234 inputQ.blockingReceive(this::receiveDataFrame, onError); 235 } else { 236 inputQ.asyncReceive(executor, this::receiveDataFrame, onError); 237 } 238 return cf; 239 } 240 241 @Override 242 void sendBody() throws IOException { 243 try { 244 sendBodyImpl().join(); 245 } catch (CompletionException e) { 246 throw Utils.getIOException(e); 247 } 248 } 249 250 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { 251 return sendBodyImpl().thenApply( v -> this); 252 } 253 254 @SuppressWarnings("unchecked") 255 Stream(HttpClientImpl client, 256 Http2Connection connection, 257 Exchange<T> e, 258 WindowController windowController) 259 { 260 super(e); 261 this.client = client; 262 this.connection = connection; 263 this.windowController = windowController; 264 this.request = e.request(); 265 this.requestProcessor = request.requestProcessor; 266 responseHeaders = new HttpHeadersImpl(); 267 requestHeaders = new HttpHeadersImpl(); 268 rspHeadersConsumer = (name, value) -> { 269 responseHeaders.addHeader(name.toString(), value.toString()); 270 if (Log.headers() && Log.trace()) { 271 Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}", 272 streamid, name, value); 273 } 274 }; 275 this.requestPseudoHeaders = new HttpHeadersImpl(); 276 // NEW 277 this.publisher = new BlockingPushPublisher<>(); 278 this.windowUpdater = new StreamWindowUpdateSender(connection); 279 } 280 281 /** 282 * Entry point from Http2Connection reader thread. 283 * 284 * Data frames will be removed by response body thread. 285 */ 286 void incoming(Http2Frame frame) throws IOException { 287 if ((frame instanceof HeaderFrame)) { 288 HeaderFrame hframe = (HeaderFrame)frame; 289 if (hframe.endHeaders()) { 290 Log.logTrace("handling response (streamid={0})", streamid); 291 handleResponse(); 292 if (hframe.getFlag(HeaderFrame.END_STREAM)) { 293 inputQ.put(new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0])); 294 } 295 } 296 } else if (frame instanceof DataFrame) { 297 inputQ.put(frame); 298 } else { 299 otherFrame(frame); 300 } 301 } 302 303 void otherFrame(Http2Frame frame) throws IOException { 304 switch (frame.type()) { 305 case WindowUpdateFrame.TYPE: 306 incoming_windowUpdate((WindowUpdateFrame) frame); 307 break; 308 case ResetFrame.TYPE: 309 incoming_reset((ResetFrame) frame); 310 break; 311 case PriorityFrame.TYPE: 312 incoming_priority((PriorityFrame) frame); 313 break; 314 default: 315 String msg = "Unexpected frame: " + frame.toString(); 316 throw new IOException(msg); 317 } 318 } 319 320 // The Hpack decoder decodes into one of these consumers of name,value pairs 321 322 DecodingCallback rspHeadersConsumer() { 323 return rspHeadersConsumer; 324 } 325 326 protected void handleResponse() throws IOException { 327 synchronized(this) { 328 responseHeadersReceived = true; 329 } 330 HttpConnection c = connection.connection; // TODO: improve 331 responseCode = (int)responseHeaders 332 .firstValueAsLong(":status") 333 .orElseThrow(() -> new IOException("no statuscode in response")); 334 335 response = new Response( 336 request, exchange, responseHeaders, 337 responseCode, HttpClient.Version.HTTP_2); 338 339 this.responseContentLen = responseHeaders 340 .firstValueAsLong("content-length") 341 .orElse(-1L); 342 343 if (Log.headers()) { 344 StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); 345 Log.dumpHeaders(sb, " ", responseHeaders); 346 Log.logHeaders(sb.toString()); 347 } 348 349 completeResponse(response); 350 } 351 352 void incoming_reset(ResetFrame frame) throws IOException { 353 Log.logTrace("Received RST_STREAM on stream {0}", streamid); 354 if (endStreamReceived()) { 355 Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid); 356 } else if (closed) { 357 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); 358 } else { 359 boolean pushedToQueue = false; 360 synchronized(this) { 361 // if the response headers are not yet 362 // received, or the inputQueue is closed, handle reset directly. 363 // Otherwise, put it in the input queue in order to read all 364 // pending data frames first. Indeed, a server may send 365 // RST_STREAM after sending END_STREAM, in which case we should 366 // ignore it. However, we won't know if we have received END_STREAM 367 // or not until all pending data frames are read. 368 // Because the inputQ will not be read until the response 369 // headers are received, and because response headers won't be 370 // sent if the server sent RST_STREAM, then we must handle 371 // reset here directly unless responseHeadersReceived is true. 372 pushedToQueue = !closed && responseHeadersReceived && inputQ.tryPut(frame); 373 } 374 if (!pushedToQueue) { 375 // RST_STREAM was not pushed to the queue: handle it. 376 try { 377 handleReset(frame); 378 } catch (IOException io) { 379 completeResponseExceptionally(io); 380 } 381 } else { 382 // RST_STREAM was pushed to the queue. It will be handled by 383 // asyncReceive after all pending data frames have been 384 // processed. 385 Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid); 386 } 387 } 388 } 389 390 void handleReset(ResetFrame frame) throws IOException { 391 Log.logTrace("Handling RST_STREAM on stream {0}", streamid); 392 if (!closed) { 393 close(); 394 int error = frame.getErrorCode(); 395 throw new IOException(ErrorFrame.stringForCode(error)); 396 } else { 397 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); 398 } 399 } 400 401 void incoming_priority(PriorityFrame frame) { 402 // TODO: implement priority 403 throw new UnsupportedOperationException("Not implemented"); 404 } 405 406 private void incoming_windowUpdate(WindowUpdateFrame frame) 407 throws IOException 408 { 409 int amount = frame.getUpdate(); 410 if (amount <= 0) { 411 Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n", 412 streamid, streamid, amount); 413 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); 414 } else { 415 assert streamid != 0; 416 boolean success = windowController.increaseStreamWindow(amount, streamid); 417 if (!success) { // overflow 418 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); 419 } 420 } 421 } 422 423 void incoming_pushPromise(HttpRequestImpl pushReq, 424 PushedStream<?,T> pushStream) 425 throws IOException 426 { 427 if (Log.requests()) { 428 Log.logRequest("PUSH_PROMISE: " + pushReq.toString()); 429 } 430 PushGroup<?,T> pushGroup = exchange.getPushGroup(); 431 if (pushGroup == null || pushGroup.noMorePushes()) { 432 cancelImpl(new IllegalStateException("unexpected push promise" 433 + " on stream " + streamid)); 434 } 435 436 HttpResponse.MultiProcessor<?,T> proc = pushGroup.processor(); 437 438 CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF(); 439 440 Optional<HttpResponse.BodyHandler<T>> bpOpt = proc.onRequest( 441 pushReq); 442 443 if (!bpOpt.isPresent()) { 444 IOException ex = new IOException("Stream " 445 + streamid + " cancelled by user"); 446 if (Log.trace()) { 447 Log.logTrace("No body processor for {0}: {1}", pushReq, 448 ex.getMessage()); 449 } 450 pushStream.cancelImpl(ex); 451 cf.completeExceptionally(ex); 452 return; 453 } 454 455 pushGroup.addPush(); 456 pushStream.requestSent(); 457 pushStream.setPushHandler(bpOpt.get()); 458 // setup housekeeping for when the push is received 459 // TODO: deal with ignoring of CF anti-pattern 460 cf.whenComplete((HttpResponse<T> resp, Throwable t) -> { 461 if (Log.trace()) { 462 Log.logTrace("Push completed on stream {0} for {1}{2}", 463 pushStream.streamid, resp, 464 ((t==null) ? "": " with exception " + t)); 465 } 466 if (t != null) { 467 pushGroup.pushError(t); 468 proc.onError(pushReq, t); 469 } else { 470 proc.onResponse(resp); 471 } 472 pushGroup.pushCompleted(); 473 }); 474 475 } 476 477 private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) { 478 HttpHeadersImpl h = request.getSystemHeaders(); 479 if (contentLength > 0) { 480 h.setHeader("content-length", Long.toString(contentLength)); 481 } 482 setPseudoHeaderFields(); 483 OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(h, request.getUserHeaders(), this); 484 if (contentLength == 0) { 485 f.setFlag(HeadersFrame.END_STREAM); 486 endStreamSent = true; 487 } 488 return f; 489 } 490 491 private void setPseudoHeaderFields() { 492 HttpHeadersImpl hdrs = requestPseudoHeaders; 493 String method = request.method(); 494 hdrs.setHeader(":method", method); 495 URI uri = request.uri(); 496 hdrs.setHeader(":scheme", uri.getScheme()); 497 // TODO: userinfo deprecated. Needs to be removed 498 hdrs.setHeader(":authority", uri.getAuthority()); 499 // TODO: ensure header names beginning with : not in user headers 500 String query = uri.getQuery(); 501 String path = uri.getPath(); 502 if (path == null || path.isEmpty()) { 503 if (method.equalsIgnoreCase("OPTIONS")) { 504 path = "*"; 505 } else { 506 path = "/"; 507 } 508 } 509 if (query != null) { 510 path += "?" + query; 511 } 512 hdrs.setHeader(":path", path); 513 } 514 515 HttpHeadersImpl getRequestPseudoHeaders() { 516 return requestPseudoHeaders; 517 } 518 519 @Override 520 Response getResponse() throws IOException { 521 try { 522 if (request.duration() != null) { 523 Log.logTrace("Waiting for response (streamid={0}, timeout={1}ms)", 524 streamid, 525 request.duration().toMillis()); 526 return getResponseAsync(null).get( 527 request.duration().toMillis(), TimeUnit.MILLISECONDS); 528 } else { 529 Log.logTrace("Waiting for response (streamid={0})", streamid); 530 return getResponseAsync(null).join(); 531 } 532 } catch (TimeoutException e) { 533 Log.logTrace("Response timeout (streamid={0})", streamid); 534 throw new HttpTimeoutException("Response timed out"); 535 } catch (InterruptedException | ExecutionException | CompletionException e) { 536 Throwable t = e.getCause(); 537 Log.logTrace("Response failed (streamid={0}): {1}", streamid, t); 538 if (t instanceof IOException) { 539 throw (IOException)t; 540 } 541 throw new IOException(e); 542 } finally { 543 Log.logTrace("Got response or failed (streamid={0})", streamid); 544 } 545 } 546 547 /** Sets endStreamReceived. Should be called only once. */ 548 void setEndStreamReceived() { 549 assert remotelyClosed == false: "Unexpected endStream already set"; 550 remotelyClosed = true; 551 responseReceived(); 552 } 553 554 /** Tells whether, or not, the END_STREAM Flag has been seen in any frame 555 * received on this stream. */ 556 private boolean endStreamReceived() { 557 return remotelyClosed; 558 } 559 560 @Override 561 void sendHeadersOnly() throws IOException, InterruptedException { 562 if (Log.requests() && request != null) { 563 Log.logRequest(request.toString()); 564 } 565 requestContentLen = requestProcessor.contentLength(); 566 OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen); 567 connection.sendFrame(f); 568 } 569 570 void registerStream(int id) { 571 this.streamid = id; 572 connection.putStream(this, streamid); 573 } 574 575 class RequestSubscriber implements Flow.Subscriber<ByteBuffer> { 576 // can be < 0 if the actual length is not known. 577 private volatile long remainingContentLength; 578 private volatile Subscription subscription; 579 580 RequestSubscriber(long contentLen) { 581 this.remainingContentLength = contentLen; 582 } 583 584 @Override 585 public void onSubscribe(Flow.Subscription subscription) { 586 if (this.subscription != null) { 587 throw new IllegalStateException(); 588 } 589 this.subscription = subscription; 590 subscription.request(1); 591 } 592 593 @Override 594 public void onNext(ByteBuffer item) { 595 if (requestBodyCF.isDone()) { 596 throw new IllegalStateException(); 597 } 598 599 try { 600 while (item.hasRemaining()) { 601 assert !endStreamSent : "internal error, send data after END_STREAM flag"; 602 DataFrame df = getDataFrame(item); 603 if (remainingContentLength > 0) { 604 remainingContentLength -= df.getDataLength(); 605 assert remainingContentLength >= 0; 606 if (remainingContentLength == 0) { 607 df.setFlag(DataFrame.END_STREAM); 608 endStreamSent = true; 609 } 610 } 611 connection.sendDataFrame(df); 612 } 613 subscription.request(1); 614 } catch (InterruptedException ex) { 615 subscription.cancel(); 616 requestBodyCF.completeExceptionally(ex); 617 } 618 } 619 620 @Override 621 public void onError(Throwable throwable) { 622 if (requestBodyCF.isDone()) { 623 return; 624 } 625 subscription.cancel(); 626 requestBodyCF.completeExceptionally(throwable); 627 } 628 629 @Override 630 public void onComplete() { 631 assert endStreamSent || remainingContentLength < 0; 632 try { 633 if (!endStreamSent) { 634 endStreamSent = true; 635 connection.sendDataFrame(getEmptyEndStreamDataFrame()); 636 } 637 requestBodyCF.complete(null); 638 } catch (InterruptedException ex) { 639 requestBodyCF.completeExceptionally(ex); 640 } 641 } 642 } 643 644 DataFrame getDataFrame(ByteBuffer buffer) throws InterruptedException { 645 int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining()); 646 // blocks waiting for stream send window, if exhausted 647 int actualAmount = windowController.tryAcquire(requestAmount, streamid); 648 ByteBuffer outBuf = Utils.slice(buffer, actualAmount); 649 DataFrame df = new DataFrame(streamid, 0 , ByteBufferReference.of(outBuf)); 650 return df; 651 } 652 653 private DataFrame getEmptyEndStreamDataFrame() throws InterruptedException { 654 return new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]); 655 } 656 657 /** 658 * A List of responses relating to this stream. Normally there is only 659 * one response, but intermediate responses like 100 are allowed 660 * and must be passed up to higher level before continuing. Deals with races 661 * such as if responses are returned before the CFs get created by 662 * getResponseAsync() 663 */ 664 665 final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5); 666 667 @Override 668 CompletableFuture<Response> getResponseAsync(Executor executor) { 669 CompletableFuture<Response> cf = null; 670 // The code below deals with race condition that can be caused when 671 // completeResponse() is being called before getResponseAsync() 672 synchronized (response_cfs) { 673 if (!response_cfs.isEmpty()) { 674 // This CompletableFuture was created by completeResponse(). 675 // it will be already completed. 676 cf = response_cfs.remove(0); 677 // if we find a cf here it should be already completed. 678 // finding a non completed cf should not happen. just assert it. 679 assert cf.isDone() : "Removing uncompleted response: could cause code to hang!"; 680 } else { 681 // getResponseAsync() is called first. Create a CompletableFuture 682 // that will be completed by completeResponse() when 683 // completeResponse() is called. 684 cf = new MinimalFuture<>(); 685 response_cfs.add(cf); 686 } 687 } 688 if (executor != null && !cf.isDone()) { 689 // protect from executing later chain of CompletableFuture operations from SelectorManager thread 690 cf = cf.thenApplyAsync(r -> r, executor); 691 } 692 Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf); 693 PushGroup<?,?> pg = exchange.getPushGroup(); 694 if (pg != null) { 695 // if an error occurs make sure it is recorded in the PushGroup 696 cf = cf.whenComplete((t,e) -> pg.pushError(e)); 697 } 698 return cf; 699 } 700 701 /** 702 * Completes the first uncompleted CF on list, and removes it. If there is no 703 * uncompleted CF then creates one (completes it) and adds to list 704 */ 705 void completeResponse(Response resp) { 706 synchronized (response_cfs) { 707 CompletableFuture<Response> cf; 708 int cfs_len = response_cfs.size(); 709 for (int i=0; i<cfs_len; i++) { 710 cf = response_cfs.get(i); 711 if (!cf.isDone()) { 712 Log.logTrace("Completing response (streamid={0}): {1}", 713 streamid, cf); 714 cf.complete(resp); 715 response_cfs.remove(cf); 716 return; 717 } // else we found the previous response: just leave it alone. 718 } 719 cf = MinimalFuture.completedFuture(resp); 720 Log.logTrace("Created completed future (streamid={0}): {1}", 721 streamid, cf); 722 response_cfs.add(cf); 723 } 724 } 725 726 // methods to update state and remove stream when finished 727 728 synchronized void requestSent() { 729 requestSent = true; 730 if (responseReceived) { 731 close(); 732 } 733 } 734 735 final synchronized boolean isResponseReceived() { 736 return responseReceived; 737 } 738 739 synchronized void responseReceived() { 740 responseReceived = true; 741 if (requestSent) { 742 close(); 743 } 744 } 745 746 /** 747 * same as above but for errors 748 */ 749 void completeResponseExceptionally(Throwable t) { 750 synchronized (response_cfs) { 751 // use index to avoid ConcurrentModificationException 752 // caused by removing the CF from within the loop. 753 for (int i = 0; i < response_cfs.size(); i++) { 754 CompletableFuture<Response> cf = response_cfs.get(i); 755 if (!cf.isDone()) { 756 cf.completeExceptionally(t); 757 response_cfs.remove(i); 758 return; 759 } 760 } 761 response_cfs.add(MinimalFuture.failedFuture(t)); 762 } 763 } 764 765 CompletableFuture<Void> sendBodyImpl() { 766 RequestSubscriber subscriber = new RequestSubscriber(requestContentLen); 767 requestProcessor.subscribe(subscriber); 768 requestBodyCF.whenComplete((v,t) -> requestSent()); 769 return requestBodyCF; 770 } 771 772 @Override 773 void cancel() { 774 cancel(new IOException("Stream " + streamid + " cancelled")); 775 } 776 777 @Override 778 void cancel(IOException cause) { 779 cancelImpl(cause); 780 } 781 782 // This method sends a RST_STREAM frame 783 void cancelImpl(Throwable e) { 784 if (Log.trace()) { 785 Log.logTrace("cancelling stream {0}: {1}\n", streamid, e); 786 } 787 boolean closing; 788 if (closing = !closed) { // assigning closing to !closed 789 synchronized (this) { 790 if (closing = !closed) { // assigning closing to !closed 791 closed=true; 792 } 793 } 794 } 795 if (closing) { // true if the stream has not been closed yet 796 inputQ.close(); 797 } 798 completeResponseExceptionally(e); 799 try { 800 // will send a RST_STREAM frame 801 if (streamid != 0) { 802 connection.resetStream(streamid, ResetFrame.CANCEL); 803 } 804 } catch (IOException ex) { 805 Log.logError(ex); 806 } 807 } 808 809 // This method doesn't send any frame 810 void close() { 811 if (closed) return; 812 synchronized(this) { 813 if (closed) return; 814 closed = true; 815 } 816 Log.logTrace("Closing stream {0}", streamid); 817 inputQ.close(); 818 connection.closeStream(streamid); 819 Log.logTrace("Stream {0} closed", streamid); 820 } 821 822 static class PushedStream<U,T> extends Stream<T> { 823 final PushGroup<U,T> pushGroup; 824 private final Stream<T> parent; // used by server push streams 825 // push streams need the response CF allocated up front as it is 826 // given directly to user via the multi handler callback function. 827 final CompletableFuture<Response> pushCF; 828 final CompletableFuture<HttpResponse<T>> responseCF; 829 final HttpRequestImpl pushReq; 830 HttpResponse.BodyHandler<T> pushHandler; 831 832 PushedStream(PushGroup<U,T> pushGroup, HttpClientImpl client, 833 Http2Connection connection, Stream<T> parent, 834 Exchange<T> pushReq) { 835 // ## no request body possible, null window controller 836 super(client, connection, pushReq, null); 837 this.pushGroup = pushGroup; 838 this.pushReq = pushReq.request(); 839 this.pushCF = new MinimalFuture<>(); 840 this.responseCF = new MinimalFuture<>(); 841 this.parent = parent; 842 } 843 844 CompletableFuture<HttpResponse<T>> responseCF() { 845 return responseCF; 846 } 847 848 synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) { 849 this.pushHandler = pushHandler; 850 } 851 852 synchronized HttpResponse.BodyHandler<T> getPushHandler() { 853 // ignored parameters to function can be used as BodyHandler 854 return this.pushHandler; 855 } 856 857 // Following methods call the super class but in case of 858 // error record it in the PushGroup. The error method is called 859 // with a null value when no error occurred (is a no-op) 860 @Override 861 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { 862 return super.sendBodyAsync() 863 .whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t)); 864 } 865 866 @Override 867 CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { 868 return super.sendHeadersAsync() 869 .whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t)); 870 } 871 872 @Override 873 CompletableFuture<Response> getResponseAsync(Executor executor) { 874 CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t)); 875 if(executor!=null && !cf.isDone()) { 876 cf = cf.thenApplyAsync( r -> r, executor); 877 } 878 return cf; 879 } 880 881 @Override 882 CompletableFuture<T> readBodyAsync( 883 HttpResponse.BodyHandler<T> handler, 884 boolean returnConnectionToPool, 885 Executor executor) 886 { 887 return super.readBodyAsync(handler, returnConnectionToPool, executor) 888 .whenComplete((v, t) -> pushGroup.pushError(t)); 889 } 890 891 @Override 892 void completeResponse(Response r) { 893 HttpResponseImpl.logResponse(r); 894 pushCF.complete(r); // not strictly required for push API 895 // start reading the body using the obtained BodyProcessor 896 CompletableFuture<Void> start = new MinimalFuture<>(); 897 start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor())) 898 .whenComplete((T body, Throwable t) -> { 899 if (t != null) { 900 responseCF.completeExceptionally(t); 901 } else { 902 HttpResponseImpl<T> response = new HttpResponseImpl<>(r.request, r, body, getExchange()); 903 responseCF.complete(response); 904 } 905 }); 906 start.completeAsync(() -> null, getExchange().executor()); 907 } 908 909 @Override 910 void completeResponseExceptionally(Throwable t) { 911 pushCF.completeExceptionally(t); 912 } 913 914 @Override 915 synchronized void responseReceived() { 916 super.responseReceived(); 917 } 918 919 // create and return the PushResponseImpl 920 @Override 921 protected void handleResponse() { 922 HttpConnection c = connection.connection; // TODO: improve 923 responseCode = (int)responseHeaders 924 .firstValueAsLong(":status") 925 .orElse(-1); 926 927 if (responseCode == -1) { 928 completeResponseExceptionally(new IOException("No status code")); 929 } 930 931 this.response = new Response( 932 pushReq, exchange, responseHeaders, 933 responseCode, HttpClient.Version.HTTP_2); 934 935 this.responseContentLen = responseHeaders 936 .firstValueAsLong("content-length") 937 .orElse(-1L); 938 939 if (Log.headers()) { 940 StringBuilder sb = new StringBuilder("RESPONSE HEADERS"); 941 sb.append(" (streamid=").append(streamid).append("): "); 942 Log.dumpHeaders(sb, " ", responseHeaders); 943 Log.logHeaders(sb.toString()); 944 } 945 946 // different implementations for normal streams and pushed streams 947 completeResponse(response); 948 } 949 } 950 951 final class StreamWindowUpdateSender extends WindowUpdateSender { 952 953 StreamWindowUpdateSender(Http2Connection connection) { 954 super(connection); 955 } 956 957 @Override 958 int getStreamId() { 959 return streamid; 960 } 961 } 962 963 }