9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.net.URI; 30 import java.nio.ByteBuffer; 31 import java.util.ArrayList; 32 import java.util.List; 33 import java.util.Optional; 34 import java.util.concurrent.CompletableFuture; 35 import java.util.concurrent.CompletionException; 36 import java.util.concurrent.ExecutionException; 37 import java.util.concurrent.Executor; 38 import java.util.concurrent.Flow; 39 import java.util.concurrent.Flow.Subscription; 40 import java.util.concurrent.TimeUnit; 41 import java.util.concurrent.TimeoutException; 42 import java.util.function.Consumer; 43 44 import jdk.incubator.http.internal.common.*; 45 import jdk.incubator.http.internal.frame.*; 46 import jdk.incubator.http.internal.hpack.DecodingCallback; 47 48 /** 49 * Http/2 Stream handling. 50 * 51 * REQUESTS 52 * 53 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q 54 * 55 * sendRequest() -- sendHeadersOnly() + sendBody() 56 * 57 * sendBody() -- in calling thread: obeys all flow control (so may block) 58 * obtains data from request body processor and places on connection 59 * outbound Q. 60 * 61 * sendBodyAsync() -- calls sendBody() in an executor thread. 62 * 63 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block 64 * 65 * sendRequestAsync() -- calls sendRequest() in an executor thread 66 * 67 * RESPONSES 68 * 69 * Multiple responses can be received per request. Responses are queued up on 70 * a LinkedList of CF<HttpResponse> and the the first one on the list is completed 71 * with the next response 72 * 73 * getResponseAsync() -- queries list of response CFs and returns first one 74 * if one exists. Otherwise, creates one and adds it to list 75 * and returns it. Completion is achieved through the 76 * incoming() upcall from connection reader thread. 77 * 78 * getResponse() -- calls getResponseAsync() and waits for CF to complete 79 * 80 * responseBody() -- in calling thread: blocks for incoming DATA frames on 81 * stream inputQ. Obeys remote and local flow control so may block. 82 * Calls user response body processor with data buffers. 83 * 84 * responseBodyAsync() -- calls responseBody() in an executor thread. 85 * 86 * incoming() -- entry point called from connection reader thread. Frames are 87 * either handled immediately without blocking or for data frames 88 * placed on the stream's inputQ which is consumed by the stream's 89 * reader thread. 90 * 91 * PushedStream sub class 92 * ====================== 93 * Sending side methods are not used because the request comes from a PUSH_PROMISE 94 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream 95 * is created. PushedStream does not use responseCF list as there can be only 96 * one response. The CF is created when the object created and when the response 97 * HEADERS frame is received the object is completed. 98 */ 99 class Stream<T> extends ExchangeImpl<T> { 100 101 final AsyncDataReadQueue inputQ = new AsyncDataReadQueue(); 102 103 /** 104 * This stream's identifier. Assigned lazily by the HTTP2Connection before 105 * the stream's first frame is sent. 106 */ 107 protected volatile int streamid; 108 109 long responseContentLen = -1; 110 long responseBytesProcessed = 0; 111 long requestContentLen; 112 113 final Http2Connection connection; 114 HttpClientImpl client; 115 final HttpRequestImpl request; 116 final DecodingCallback rspHeadersConsumer; 117 HttpHeadersImpl responseHeaders; 118 final HttpHeadersImpl requestHeaders; 119 final HttpHeadersImpl requestPseudoHeaders; 120 HttpResponse.BodyProcessor<T> responseProcessor; 121 final HttpRequest.BodyProcessor requestProcessor; 122 volatile int responseCode; 123 volatile Response response; 124 volatile CompletableFuture<Response> responseCF; 125 final AbstractPushPublisher<ByteBuffer> publisher; 126 final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>(); 127 128 /** True if END_STREAM has been seen in a frame received on this stream. */ 129 private volatile boolean remotelyClosed; 130 private volatile boolean closed; 131 private volatile boolean endStreamSent; 132 133 // state flags 134 boolean requestSent, responseReceived, responseHeadersReceived; 135 136 /** 137 * A reference to this Stream's connection Send Window controller. The 138 * stream MUST acquire the appropriate amount of Send Window before 139 * sending any data. Will be null for PushStreams, as they cannot send data. 140 */ 141 private final WindowController windowController; 142 private final WindowUpdateSender windowUpdater; 143 144 @Override 145 HttpConnection connection() { 146 return connection.connection; 147 } 148 149 @Override 150 CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler, 151 boolean returnConnectionToPool, 152 Executor executor) 153 { 154 Log.logTrace("Reading body on stream {0}", streamid); 155 responseProcessor = handler.apply(responseCode, responseHeaders); 156 publisher.subscribe(responseProcessor); 157 CompletableFuture<T> cf = receiveData(executor); 158 159 PushGroup<?,?> pg = exchange.getPushGroup(); 160 if (pg != null) { 161 // if an error occurs make sure it is recorded in the PushGroup 162 cf = cf.whenComplete((t,e) -> pg.pushError(e)); 163 } 164 return cf; 165 } 166 167 @Override 168 T readBody(HttpResponse.BodyHandler<T> handler, boolean returnConnectionToPool) 169 throws IOException 170 { 171 CompletableFuture<T> cf = readBodyAsync(handler, 172 returnConnectionToPool, 173 null); 174 try { 175 return cf.join(); 176 } catch (CompletionException e) { 177 throw Utils.getIOException(e); 178 } 179 } 180 181 @Override 182 public String toString() { 183 StringBuilder sb = new StringBuilder(); 184 sb.append("streamid: ") 185 .append(streamid); 186 return sb.toString(); 187 } 188 189 private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException { 190 if (frame instanceof ResetFrame) { 191 handleReset((ResetFrame) frame); 192 return true; 193 } else if (!(frame instanceof DataFrame)) { 194 assert false; 195 return true; 196 } 197 DataFrame df = (DataFrame) frame; 198 // RFC 7540 6.1: 199 // The entire DATA frame payload is included in flow control, 200 // including the Pad Length and Padding fields if present 201 int len = df.payloadLength(); 202 ByteBufferReference[] buffers = df.getData(); 203 for (ByteBufferReference b : buffers) { 204 ByteBuffer buf = b.get(); 205 if (buf.hasRemaining()) { 206 publisher.acceptData(Optional.of(buf)); 207 } 208 } 209 connection.windowUpdater.update(len); 210 if (df.getFlag(DataFrame.END_STREAM)) { 211 setEndStreamReceived(); 212 publisher.acceptData(Optional.empty()); 213 return false; 214 } 215 // Don't send window update on a stream which is 216 // closed or half closed. 217 windowUpdater.update(len); 218 return true; 219 } 220 221 // pushes entire response body into response processor 222 // blocking when required by local or remote flow control 223 CompletableFuture<T> receiveData(Executor executor) { 224 CompletableFuture<T> cf = responseProcessor 225 .getBody() 226 .toCompletableFuture(); 227 Consumer<Throwable> onError = e -> { 228 Log.logTrace("receiveData: {0}", e.toString()); 229 e.printStackTrace(); 230 cf.completeExceptionally(e); 231 publisher.acceptError(e); 232 }; 233 if (executor == null) { 234 inputQ.blockingReceive(this::receiveDataFrame, onError); 235 } else { 236 inputQ.asyncReceive(executor, this::receiveDataFrame, onError); 237 } 238 return cf; 239 } 240 241 @Override 242 void sendBody() throws IOException { 243 try { 244 sendBodyImpl().join(); 245 } catch (CompletionException e) { 246 throw Utils.getIOException(e); 247 } 248 } 249 250 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { 251 return sendBodyImpl().thenApply( v -> this); 252 } 253 254 @SuppressWarnings("unchecked") 255 Stream(HttpClientImpl client, 256 Http2Connection connection, 257 Exchange<T> e, 258 WindowController windowController) 259 { 260 super(e); 261 this.client = client; 262 this.connection = connection; 263 this.windowController = windowController; 264 this.request = e.request(); 265 this.requestProcessor = request.requestProcessor; 266 responseHeaders = new HttpHeadersImpl(); 267 requestHeaders = new HttpHeadersImpl(); 268 rspHeadersConsumer = (name, value) -> { 269 responseHeaders.addHeader(name.toString(), value.toString()); 270 if (Log.headers() && Log.trace()) { 271 Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}", 272 streamid, name, value); 273 } 274 }; 275 this.requestPseudoHeaders = new HttpHeadersImpl(); 276 // NEW 277 this.publisher = new BlockingPushPublisher<>(); 278 this.windowUpdater = new StreamWindowUpdateSender(connection); 279 } 280 281 /** 282 * Entry point from Http2Connection reader thread. 283 * 284 * Data frames will be removed by response body thread. 285 */ 286 void incoming(Http2Frame frame) throws IOException { 287 if ((frame instanceof HeaderFrame)) { 288 HeaderFrame hframe = (HeaderFrame)frame; 289 if (hframe.endHeaders()) { 290 Log.logTrace("handling response (streamid={0})", streamid); 291 handleResponse(); 292 if (hframe.getFlag(HeaderFrame.END_STREAM)) { 293 inputQ.put(new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0])); 294 } 295 } 296 } else if (frame instanceof DataFrame) { 297 inputQ.put(frame); 298 } else { 299 otherFrame(frame); 300 } 301 } 302 303 void otherFrame(Http2Frame frame) throws IOException { 304 switch (frame.type()) { 305 case WindowUpdateFrame.TYPE: 306 incoming_windowUpdate((WindowUpdateFrame) frame); 307 break; 308 case ResetFrame.TYPE: 309 incoming_reset((ResetFrame) frame); 310 break; 311 case PriorityFrame.TYPE: 312 incoming_priority((PriorityFrame) frame); 313 break; 314 default: 315 String msg = "Unexpected frame: " + frame.toString(); 316 throw new IOException(msg); 317 } 318 } 319 320 // The Hpack decoder decodes into one of these consumers of name,value pairs 321 322 DecodingCallback rspHeadersConsumer() { 323 return rspHeadersConsumer; 324 } 325 326 protected void handleResponse() throws IOException { 327 synchronized(this) { 328 responseHeadersReceived = true; 329 } 330 HttpConnection c = connection.connection; // TODO: improve 331 responseCode = (int)responseHeaders 332 .firstValueAsLong(":status") 333 .orElseThrow(() -> new IOException("no statuscode in response")); 334 335 response = new Response( 336 request, exchange, responseHeaders, 337 responseCode, HttpClient.Version.HTTP_2); 338 339 this.responseContentLen = responseHeaders 340 .firstValueAsLong("content-length") 341 .orElse(-1L); 342 343 if (Log.headers()) { 344 StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); 345 Log.dumpHeaders(sb, " ", responseHeaders); 346 Log.logHeaders(sb.toString()); 347 } 348 349 completeResponse(response); 350 } 351 352 void incoming_reset(ResetFrame frame) throws IOException { 353 Log.logTrace("Received RST_STREAM on stream {0}", streamid); 354 if (endStreamReceived()) { 355 Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid); 356 } else if (closed) { 357 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); 358 } else { 359 boolean pushedToQueue = false; 360 synchronized(this) { 361 // if the response headers are not yet 362 // received, or the inputQueue is closed, handle reset directly. 363 // Otherwise, put it in the input queue in order to read all 364 // pending data frames first. Indeed, a server may send 365 // RST_STREAM after sending END_STREAM, in which case we should 366 // ignore it. However, we won't know if we have received END_STREAM 367 // or not until all pending data frames are read. 368 // Because the inputQ will not be read until the response 369 // headers are received, and because response headers won't be 370 // sent if the server sent RST_STREAM, then we must handle 371 // reset here directly unless responseHeadersReceived is true. 372 pushedToQueue = !closed && responseHeadersReceived && inputQ.tryPut(frame); 373 } 374 if (!pushedToQueue) { 375 // RST_STREAM was not pushed to the queue: handle it. 376 try { 377 handleReset(frame); 378 } catch (IOException io) { 379 completeResponseExceptionally(io); 380 } 381 } else { 382 // RST_STREAM was pushed to the queue. It will be handled by 383 // asyncReceive after all pending data frames have been 384 // processed. 385 Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid); 386 } 387 } 388 } 389 390 void handleReset(ResetFrame frame) throws IOException { 391 Log.logTrace("Handling RST_STREAM on stream {0}", streamid); 392 if (!closed) { 393 close(); 394 int error = frame.getErrorCode(); 395 throw new IOException(ErrorFrame.stringForCode(error)); 396 } else { 397 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); 398 } 399 } 400 401 void incoming_priority(PriorityFrame frame) { 402 // TODO: implement priority 403 throw new UnsupportedOperationException("Not implemented"); 404 } 405 406 private void incoming_windowUpdate(WindowUpdateFrame frame) 407 throws IOException 408 { 409 int amount = frame.getUpdate(); 410 if (amount <= 0) { 411 Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n", 412 streamid, streamid, amount); 413 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); 414 } else { 415 assert streamid != 0; 416 boolean success = windowController.increaseStreamWindow(amount, streamid); 417 if (!success) { // overflow 418 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); 419 } 420 } 421 } 422 423 void incoming_pushPromise(HttpRequestImpl pushReq, 424 PushedStream<?,T> pushStream) 425 throws IOException 426 { 427 if (Log.requests()) { 428 Log.logRequest("PUSH_PROMISE: " + pushReq.toString()); 429 } 430 PushGroup<?,T> pushGroup = exchange.getPushGroup(); 431 if (pushGroup == null || pushGroup.noMorePushes()) { 432 cancelImpl(new IllegalStateException("unexpected push promise" 433 + " on stream " + streamid)); 434 } 435 436 HttpResponse.MultiProcessor<?,T> proc = pushGroup.processor(); 437 438 CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF(); 439 440 Optional<HttpResponse.BodyHandler<T>> bpOpt = proc.onRequest( 441 pushReq); 442 443 if (!bpOpt.isPresent()) { 444 IOException ex = new IOException("Stream " 445 + streamid + " cancelled by user"); 446 if (Log.trace()) { 447 Log.logTrace("No body processor for {0}: {1}", pushReq, 448 ex.getMessage()); 449 } 450 pushStream.cancelImpl(ex); 451 cf.completeExceptionally(ex); 452 return; 453 } 454 455 pushGroup.addPush(); 456 pushStream.requestSent(); 457 pushStream.setPushHandler(bpOpt.get()); 458 // setup housekeeping for when the push is received 459 // TODO: deal with ignoring of CF anti-pattern 460 cf.whenComplete((HttpResponse<T> resp, Throwable t) -> { 461 if (Log.trace()) { 462 Log.logTrace("Push completed on stream {0} for {1}{2}", 463 pushStream.streamid, resp, 464 ((t==null) ? "": " with exception " + t)); 465 } 466 if (t != null) { 467 pushGroup.pushError(t); 468 proc.onError(pushReq, t); 469 } else { 470 proc.onResponse(resp); 471 } 472 pushGroup.pushCompleted(); 473 }); 474 475 } 476 477 private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) { 478 HttpHeadersImpl h = request.getSystemHeaders(); 479 if (contentLength > 0) { 480 h.setHeader("content-length", Long.toString(contentLength)); 499 // TODO: ensure header names beginning with : not in user headers 500 String query = uri.getQuery(); 501 String path = uri.getPath(); 502 if (path == null || path.isEmpty()) { 503 if (method.equalsIgnoreCase("OPTIONS")) { 504 path = "*"; 505 } else { 506 path = "/"; 507 } 508 } 509 if (query != null) { 510 path += "?" + query; 511 } 512 hdrs.setHeader(":path", path); 513 } 514 515 HttpHeadersImpl getRequestPseudoHeaders() { 516 return requestPseudoHeaders; 517 } 518 519 @Override 520 Response getResponse() throws IOException { 521 try { 522 if (request.duration() != null) { 523 Log.logTrace("Waiting for response (streamid={0}, timeout={1}ms)", 524 streamid, 525 request.duration().toMillis()); 526 return getResponseAsync(null).get( 527 request.duration().toMillis(), TimeUnit.MILLISECONDS); 528 } else { 529 Log.logTrace("Waiting for response (streamid={0})", streamid); 530 return getResponseAsync(null).join(); 531 } 532 } catch (TimeoutException e) { 533 Log.logTrace("Response timeout (streamid={0})", streamid); 534 throw new HttpTimeoutException("Response timed out"); 535 } catch (InterruptedException | ExecutionException | CompletionException e) { 536 Throwable t = e.getCause(); 537 Log.logTrace("Response failed (streamid={0}): {1}", streamid, t); 538 if (t instanceof IOException) { 539 throw (IOException)t; 540 } 541 throw new IOException(e); 542 } finally { 543 Log.logTrace("Got response or failed (streamid={0})", streamid); 544 } 545 } 546 547 /** Sets endStreamReceived. Should be called only once. */ 548 void setEndStreamReceived() { 549 assert remotelyClosed == false: "Unexpected endStream already set"; 550 remotelyClosed = true; 551 responseReceived(); 552 } 553 554 /** Tells whether, or not, the END_STREAM Flag has been seen in any frame 555 * received on this stream. */ 556 private boolean endStreamReceived() { 557 return remotelyClosed; 558 } 559 560 @Override 561 void sendHeadersOnly() throws IOException, InterruptedException { 562 if (Log.requests() && request != null) { 563 Log.logRequest(request.toString()); 564 } 565 requestContentLen = requestProcessor.contentLength(); 566 OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen); 567 connection.sendFrame(f); 568 } 569 570 void registerStream(int id) { 571 this.streamid = id; 572 connection.putStream(this, streamid); 573 } 574 575 class RequestSubscriber implements Flow.Subscriber<ByteBuffer> { 576 // can be < 0 if the actual length is not known. 577 private volatile long remainingContentLength; 578 private volatile Subscription subscription; 579 580 RequestSubscriber(long contentLen) { 581 this.remainingContentLength = contentLen; 582 } 583 584 @Override 585 public void onSubscribe(Flow.Subscription subscription) { 586 if (this.subscription != null) { 587 throw new IllegalStateException(); 588 } 589 this.subscription = subscription; 590 subscription.request(1); 591 } 592 593 @Override 594 public void onNext(ByteBuffer item) { 595 if (requestBodyCF.isDone()) { 596 throw new IllegalStateException(); 597 } 598 599 try { 600 while (item.hasRemaining()) { 601 assert !endStreamSent : "internal error, send data after END_STREAM flag"; 602 DataFrame df = getDataFrame(item); 603 if (remainingContentLength > 0) { 604 remainingContentLength -= df.getDataLength(); 605 assert remainingContentLength >= 0; 606 if (remainingContentLength == 0) { 607 df.setFlag(DataFrame.END_STREAM); 608 endStreamSent = true; 609 } 610 } 611 connection.sendDataFrame(df); 612 } 613 subscription.request(1); 614 } catch (InterruptedException ex) { 615 subscription.cancel(); 616 requestBodyCF.completeExceptionally(ex); 617 } 618 } 619 620 @Override 621 public void onError(Throwable throwable) { 622 if (requestBodyCF.isDone()) { 623 return; 624 } 625 subscription.cancel(); 626 requestBodyCF.completeExceptionally(throwable); 627 } 628 629 @Override 630 public void onComplete() { 631 assert endStreamSent || remainingContentLength < 0; 632 try { 633 if (!endStreamSent) { 634 endStreamSent = true; 635 connection.sendDataFrame(getEmptyEndStreamDataFrame()); 636 } 637 requestBodyCF.complete(null); 638 } catch (InterruptedException ex) { 639 requestBodyCF.completeExceptionally(ex); 640 } 641 } 642 } 643 644 DataFrame getDataFrame(ByteBuffer buffer) throws InterruptedException { 645 int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining()); 646 // blocks waiting for stream send window, if exhausted 647 int actualAmount = windowController.tryAcquire(requestAmount, streamid); 648 ByteBuffer outBuf = Utils.slice(buffer, actualAmount); 649 DataFrame df = new DataFrame(streamid, 0 , ByteBufferReference.of(outBuf)); 650 return df; 651 } 652 653 private DataFrame getEmptyEndStreamDataFrame() throws InterruptedException { 654 return new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]); 655 } 656 657 /** 658 * A List of responses relating to this stream. Normally there is only 659 * one response, but intermediate responses like 100 are allowed 660 * and must be passed up to higher level before continuing. Deals with races 661 * such as if responses are returned before the CFs get created by 662 * getResponseAsync() 663 */ 664 665 final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5); 666 667 @Override 668 CompletableFuture<Response> getResponseAsync(Executor executor) { 669 CompletableFuture<Response> cf = null; 670 // The code below deals with race condition that can be caused when 671 // completeResponse() is being called before getResponseAsync() 672 synchronized (response_cfs) { 673 if (!response_cfs.isEmpty()) { 674 // This CompletableFuture was created by completeResponse(). 675 // it will be already completed. 676 cf = response_cfs.remove(0); 677 // if we find a cf here it should be already completed. 678 // finding a non completed cf should not happen. just assert it. 679 assert cf.isDone() : "Removing uncompleted response: could cause code to hang!"; 680 } else { 681 // getResponseAsync() is called first. Create a CompletableFuture 682 // that will be completed by completeResponse() when 683 // completeResponse() is called. 684 cf = new MinimalFuture<>(); 685 response_cfs.add(cf); 686 } 687 } 688 if (executor != null && !cf.isDone()) { 689 // protect from executing later chain of CompletableFuture operations from SelectorManager thread 690 cf = cf.thenApplyAsync(r -> r, executor); 691 } 692 Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf); 693 PushGroup<?,?> pg = exchange.getPushGroup(); 694 if (pg != null) { 695 // if an error occurs make sure it is recorded in the PushGroup 696 cf = cf.whenComplete((t,e) -> pg.pushError(e)); 697 } 698 return cf; 699 } 700 701 /** 702 * Completes the first uncompleted CF on list, and removes it. If there is no 703 * uncompleted CF then creates one (completes it) and adds to list 704 */ 705 void completeResponse(Response resp) { 706 synchronized (response_cfs) { 707 CompletableFuture<Response> cf; 708 int cfs_len = response_cfs.size(); 709 for (int i=0; i<cfs_len; i++) { 710 cf = response_cfs.get(i); 711 if (!cf.isDone()) { 712 Log.logTrace("Completing response (streamid={0}): {1}", 713 streamid, cf); 714 cf.complete(resp); 715 response_cfs.remove(cf); 716 return; 717 } // else we found the previous response: just leave it alone. 718 } 719 cf = MinimalFuture.completedFuture(resp); 720 Log.logTrace("Created completed future (streamid={0}): {1}", 721 streamid, cf); 722 response_cfs.add(cf); 723 } 724 } 725 726 // methods to update state and remove stream when finished 727 728 synchronized void requestSent() { 729 requestSent = true; 730 if (responseReceived) { 731 close(); 732 } 733 } 734 735 final synchronized boolean isResponseReceived() { 736 return responseReceived; 737 } 738 739 synchronized void responseReceived() { 740 responseReceived = true; 741 if (requestSent) { 742 close(); 743 } 744 } 745 746 /** 747 * same as above but for errors 748 */ 749 void completeResponseExceptionally(Throwable t) { 750 synchronized (response_cfs) { 751 // use index to avoid ConcurrentModificationException 752 // caused by removing the CF from within the loop. 753 for (int i = 0; i < response_cfs.size(); i++) { 754 CompletableFuture<Response> cf = response_cfs.get(i); 755 if (!cf.isDone()) { 756 cf.completeExceptionally(t); 757 response_cfs.remove(i); 758 return; 759 } 760 } 761 response_cfs.add(MinimalFuture.failedFuture(t)); 762 } 763 } 764 765 CompletableFuture<Void> sendBodyImpl() { 766 RequestSubscriber subscriber = new RequestSubscriber(requestContentLen); 767 requestProcessor.subscribe(subscriber); 768 requestBodyCF.whenComplete((v,t) -> requestSent()); 769 return requestBodyCF; 770 } 771 772 @Override 773 void cancel() { 774 cancel(new IOException("Stream " + streamid + " cancelled")); 775 } 776 777 @Override 778 void cancel(IOException cause) { 779 cancelImpl(cause); 780 } 781 782 // This method sends a RST_STREAM frame 783 void cancelImpl(Throwable e) { 784 if (Log.trace()) { 785 Log.logTrace("cancelling stream {0}: {1}\n", streamid, e); 786 } 787 boolean closing; 788 if (closing = !closed) { // assigning closing to !closed 789 synchronized (this) { 790 if (closing = !closed) { // assigning closing to !closed 791 closed=true; 792 } 793 } 794 } 795 if (closing) { // true if the stream has not been closed yet 796 inputQ.close(); 797 } 798 completeResponseExceptionally(e); 799 try { 800 // will send a RST_STREAM frame 801 if (streamid != 0) { 802 connection.resetStream(streamid, ResetFrame.CANCEL); 803 } 804 } catch (IOException ex) { 805 Log.logError(ex); 806 } 807 } 808 809 // This method doesn't send any frame 810 void close() { 811 if (closed) return; 812 synchronized(this) { 813 if (closed) return; 814 closed = true; 815 } 816 Log.logTrace("Closing stream {0}", streamid); 817 inputQ.close(); 818 connection.closeStream(streamid); 819 Log.logTrace("Stream {0} closed", streamid); 820 } 821 822 static class PushedStream<U,T> extends Stream<T> { 823 final PushGroup<U,T> pushGroup; 824 private final Stream<T> parent; // used by server push streams 825 // push streams need the response CF allocated up front as it is 826 // given directly to user via the multi handler callback function. 827 final CompletableFuture<Response> pushCF; 828 final CompletableFuture<HttpResponse<T>> responseCF; 829 final HttpRequestImpl pushReq; 830 HttpResponse.BodyHandler<T> pushHandler; 831 832 PushedStream(PushGroup<U,T> pushGroup, HttpClientImpl client, 833 Http2Connection connection, Stream<T> parent, 834 Exchange<T> pushReq) { 835 // ## no request body possible, null window controller 836 super(client, connection, pushReq, null); 837 this.pushGroup = pushGroup; 838 this.pushReq = pushReq.request(); 839 this.pushCF = new MinimalFuture<>(); 840 this.responseCF = new MinimalFuture<>(); 841 this.parent = parent; 842 } 843 844 CompletableFuture<HttpResponse<T>> responseCF() { 845 return responseCF; 846 } 847 848 synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) { 849 this.pushHandler = pushHandler; 850 } 851 852 synchronized HttpResponse.BodyHandler<T> getPushHandler() { 853 // ignored parameters to function can be used as BodyHandler 854 return this.pushHandler; 855 } 856 857 // Following methods call the super class but in case of 858 // error record it in the PushGroup. The error method is called 859 // with a null value when no error occurred (is a no-op) 860 @Override 861 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { 862 return super.sendBodyAsync() 863 .whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t)); 864 } 865 866 @Override 867 CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { 868 return super.sendHeadersAsync() 869 .whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t)); 870 } 871 872 @Override 873 CompletableFuture<Response> getResponseAsync(Executor executor) { 874 CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t)); 875 if(executor!=null && !cf.isDone()) { 876 cf = cf.thenApplyAsync( r -> r, executor); 877 } 878 return cf; 879 } 880 881 @Override 882 CompletableFuture<T> readBodyAsync( 883 HttpResponse.BodyHandler<T> handler, 884 boolean returnConnectionToPool, 885 Executor executor) 886 { 887 return super.readBodyAsync(handler, returnConnectionToPool, executor) 888 .whenComplete((v, t) -> pushGroup.pushError(t)); 889 } 890 891 @Override 892 void completeResponse(Response r) { 893 HttpResponseImpl.logResponse(r); 894 pushCF.complete(r); // not strictly required for push API 895 // start reading the body using the obtained BodyProcessor 896 CompletableFuture<Void> start = new MinimalFuture<>(); 897 start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor())) 898 .whenComplete((T body, Throwable t) -> { 899 if (t != null) { 900 responseCF.completeExceptionally(t); 901 } else { 902 HttpResponseImpl<T> response = new HttpResponseImpl<>(r.request, r, body, getExchange()); 903 responseCF.complete(response); 904 } 905 }); 906 start.completeAsync(() -> null, getExchange().executor()); 907 } 908 909 @Override 910 void completeResponseExceptionally(Throwable t) { 911 pushCF.completeExceptionally(t); 912 } 913 914 @Override 915 synchronized void responseReceived() { 916 super.responseReceived(); 917 } 918 919 // create and return the PushResponseImpl 920 @Override 921 protected void handleResponse() { 922 HttpConnection c = connection.connection; // TODO: improve 923 responseCode = (int)responseHeaders 924 .firstValueAsLong(":status") 925 .orElse(-1); 926 927 if (responseCode == -1) { 928 completeResponseExceptionally(new IOException("No status code")); 929 } 930 931 this.response = new Response( 932 pushReq, exchange, responseHeaders, 933 responseCode, HttpClient.Version.HTTP_2); 934 935 this.responseContentLen = responseHeaders 936 .firstValueAsLong("content-length") 937 .orElse(-1L); 938 939 if (Log.headers()) { 940 StringBuilder sb = new StringBuilder("RESPONSE HEADERS"); 941 sb.append(" (streamid=").append(streamid).append("): "); 942 Log.dumpHeaders(sb, " ", responseHeaders); 943 Log.logHeaders(sb.toString()); 944 } 945 946 // different implementations for normal streams and pushed streams 947 completeResponse(response); 948 } 949 } 950 951 final class StreamWindowUpdateSender extends WindowUpdateSender { 952 953 StreamWindowUpdateSender(Http2Connection connection) { 954 super(connection); 955 } 956 957 @Override 958 int getStreamId() { 959 return streamid; 960 } 961 } 962 963 } | 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.lang.System.Logger.Level; 30 import java.net.URI; 31 import java.nio.ByteBuffer; 32 import java.util.ArrayList; 33 import java.util.Collections; 34 import java.util.List; 35 import java.util.Optional; 36 import java.util.concurrent.CompletableFuture; 37 import java.util.concurrent.ConcurrentLinkedDeque; 38 import java.util.concurrent.ConcurrentLinkedQueue; 39 import java.util.concurrent.Executor; 40 import java.util.concurrent.Flow; 41 import java.util.concurrent.Flow.Subscription; 42 import java.util.concurrent.atomic.AtomicReference; 43 import jdk.incubator.http.HttpResponse.BodySubscriber; 44 import jdk.incubator.http.internal.common.*; 45 import jdk.incubator.http.internal.frame.*; 46 import jdk.incubator.http.internal.hpack.DecodingCallback; 47 48 /** 49 * Http/2 Stream handling. 50 * 51 * REQUESTS 52 * 53 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q 54 * 55 * sendRequest() -- sendHeadersOnly() + sendBody() 56 * 57 * sendBodyAsync() -- calls sendBody() in an executor thread. 58 * 59 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block 60 * 61 * sendRequestAsync() -- calls sendRequest() in an executor thread 62 * 63 * RESPONSES 64 * 65 * Multiple responses can be received per request. Responses are queued up on 66 * a LinkedList of CF<HttpResponse> and the the first one on the list is completed 67 * with the next response 68 * 69 * getResponseAsync() -- queries list of response CFs and returns first one 70 * if one exists. Otherwise, creates one and adds it to list 71 * and returns it. Completion is achieved through the 72 * incoming() upcall from connection reader thread. 73 * 74 * getResponse() -- calls getResponseAsync() and waits for CF to complete 75 * 76 * responseBodyAsync() -- calls responseBody() in an executor thread. 77 * 78 * incoming() -- entry point called from connection reader thread. Frames are 79 * either handled immediately without blocking or for data frames 80 * placed on the stream's inputQ which is consumed by the stream's 81 * reader thread. 82 * 83 * PushedStream sub class 84 * ====================== 85 * Sending side methods are not used because the request comes from a PUSH_PROMISE 86 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream 87 * is created. PushedStream does not use responseCF list as there can be only 88 * one response. The CF is created when the object created and when the response 89 * HEADERS frame is received the object is completed. 90 */ 91 class Stream<T> extends ExchangeImpl<T> { 92 93 final static boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag 94 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); 95 96 final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>(); 97 final SequentialScheduler sched = 98 SequentialScheduler.synchronizedScheduler(this::schedule); 99 final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel); 100 101 /** 102 * This stream's identifier. Assigned lazily by the HTTP2Connection before 103 * the stream's first frame is sent. 104 */ 105 protected volatile int streamid; 106 107 long requestContentLen; 108 109 final Http2Connection connection; 110 final HttpRequestImpl request; 111 final DecodingCallback rspHeadersConsumer; 112 HttpHeadersImpl responseHeaders; 113 final HttpHeadersImpl requestPseudoHeaders; 114 volatile HttpResponse.BodySubscriber<T> responseSubscriber; 115 final HttpRequest.BodyPublisher requestPublisher; 116 volatile RequestSubscriber requestSubscriber; 117 volatile int responseCode; 118 volatile Response response; 119 volatile Throwable failed; // The exception with which this stream was canceled. 120 final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>(); 121 volatile CompletableFuture<T> responseBodyCF; 122 123 /** True if END_STREAM has been seen in a frame received on this stream. */ 124 private volatile boolean remotelyClosed; 125 private volatile boolean closed; 126 private volatile boolean endStreamSent; 127 128 // state flags 129 private boolean requestSent, responseReceived; 130 131 /** 132 * A reference to this Stream's connection Send Window controller. The 133 * stream MUST acquire the appropriate amount of Send Window before 134 * sending any data. Will be null for PushStreams, as they cannot send data. 135 */ 136 private final WindowController windowController; 137 private final WindowUpdateSender windowUpdater; 138 139 @Override 140 HttpConnection connection() { 141 return connection.connection; 142 } 143 144 /** 145 * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() } 146 * of after user subscription window has re-opened, from SubscriptionBase.request() 147 */ 148 private void schedule() { 149 if (responseSubscriber == null) 150 // can't process anything yet 151 return; 152 153 while (!inputQ.isEmpty()) { 154 Http2Frame frame = inputQ.peek(); 155 if (frame instanceof ResetFrame) { 156 inputQ.remove(); 157 handleReset((ResetFrame)frame); 158 return; 159 } 160 DataFrame df = (DataFrame)frame; 161 boolean finished = df.getFlag(DataFrame.END_STREAM); 162 163 List<ByteBuffer> buffers = df.getData(); 164 List<ByteBuffer> dsts = Collections.unmodifiableList(buffers); 165 int size = Utils.remaining(dsts, Integer.MAX_VALUE); 166 if (size == 0 && finished) { 167 inputQ.remove(); 168 Log.logTrace("responseSubscriber.onComplete"); 169 debug.log(Level.DEBUG, "incoming: onComplete"); 170 sched.stop(); 171 responseSubscriber.onComplete(); 172 setEndStreamReceived(); 173 return; 174 } else if (userSubscription.tryDecrement()) { 175 inputQ.remove(); 176 Log.logTrace("responseSubscriber.onNext {0}", size); 177 debug.log(Level.DEBUG, "incoming: onNext(%d)", size); 178 responseSubscriber.onNext(dsts); 179 if (consumed(df)) { 180 Log.logTrace("responseSubscriber.onComplete"); 181 debug.log(Level.DEBUG, "incoming: onComplete"); 182 sched.stop(); 183 responseSubscriber.onComplete(); 184 setEndStreamReceived(); 185 return; 186 } 187 } else { 188 return; 189 } 190 } 191 Throwable t = failed; 192 if (t != null) { 193 sched.stop(); 194 responseSubscriber.onError(t); 195 close(); 196 } 197 } 198 199 // Callback invoked after the Response BodySubscriber has consumed the 200 // buffers contained in a DataFrame. 201 // Returns true if END_STREAM is reached, false otherwise. 202 private boolean consumed(DataFrame df) { 203 // RFC 7540 6.1: 204 // The entire DATA frame payload is included in flow control, 205 // including the Pad Length and Padding fields if present 206 int len = df.payloadLength(); 207 connection.windowUpdater.update(len); 208 209 if (!df.getFlag(DataFrame.END_STREAM)) { 210 // Don't send window update on a stream which is 211 // closed or half closed. 212 windowUpdater.update(len); 213 return false; // more data coming 214 } 215 return true; // end of stream 216 } 217 218 @Override 219 CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler, 220 boolean returnConnectionToPool, 221 Executor executor) 222 { 223 Log.logTrace("Reading body on stream {0}", streamid); 224 BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders); 225 CompletableFuture<T> cf = receiveData(bodySubscriber); 226 227 PushGroup<?,?> pg = exchange.getPushGroup(); 228 if (pg != null) { 229 // if an error occurs make sure it is recorded in the PushGroup 230 cf = cf.whenComplete((t,e) -> pg.pushError(e)); 231 } 232 return cf; 233 } 234 235 @Override 236 public String toString() { 237 StringBuilder sb = new StringBuilder(); 238 sb.append("streamid: ") 239 .append(streamid); 240 return sb.toString(); 241 } 242 243 private void receiveDataFrame(DataFrame df) { 244 inputQ.add(df); 245 sched.runOrSchedule(); 246 } 247 248 /** Handles a RESET frame. RESET is always handled inline in the queue. */ 249 private void receiveResetFrame(ResetFrame frame) { 250 inputQ.add(frame); 251 sched.runOrSchedule(); 252 } 253 254 // pushes entire response body into response subscriber 255 // blocking when required by local or remote flow control 256 CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber) { 257 responseBodyCF = MinimalFuture.of(bodySubscriber.getBody()); 258 259 if (isCanceled()) { 260 Throwable t = getCancelCause(); 261 responseBodyCF.completeExceptionally(t); 262 } else { 263 bodySubscriber.onSubscribe(userSubscription); 264 } 265 // Set the responseSubscriber field now that onSubscribe has been called. 266 // This effectively allows the scheduler to start invoking the callbacks. 267 responseSubscriber = bodySubscriber; 268 sched.runOrSchedule(); // in case data waiting already to be processed 269 return responseBodyCF; 270 } 271 272 @Override 273 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { 274 return sendBodyImpl().thenApply( v -> this); 275 } 276 277 @SuppressWarnings("unchecked") 278 Stream(Http2Connection connection, 279 Exchange<T> e, 280 WindowController windowController) 281 { 282 super(e); 283 this.connection = connection; 284 this.windowController = windowController; 285 this.request = e.request(); 286 this.requestPublisher = request.requestPublisher; // may be null 287 responseHeaders = new HttpHeadersImpl(); 288 rspHeadersConsumer = (name, value) -> { 289 responseHeaders.addHeader(name.toString(), value.toString()); 290 if (Log.headers() && Log.trace()) { 291 Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}", 292 streamid, name, value); 293 } 294 }; 295 this.requestPseudoHeaders = new HttpHeadersImpl(); 296 // NEW 297 this.windowUpdater = new StreamWindowUpdateSender(connection); 298 } 299 300 /** 301 * Entry point from Http2Connection reader thread. 302 * 303 * Data frames will be removed by response body thread. 304 */ 305 void incoming(Http2Frame frame) throws IOException { 306 debug.log(Level.DEBUG, "incoming: %s", frame); 307 if ((frame instanceof HeaderFrame)) { 308 HeaderFrame hframe = (HeaderFrame)frame; 309 if (hframe.endHeaders()) { 310 Log.logTrace("handling response (streamid={0})", streamid); 311 handleResponse(); 312 if (hframe.getFlag(HeaderFrame.END_STREAM)) { 313 receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of())); 314 } 315 } 316 } else if (frame instanceof DataFrame) { 317 receiveDataFrame((DataFrame)frame); 318 } else { 319 otherFrame(frame); 320 } 321 } 322 323 void otherFrame(Http2Frame frame) throws IOException { 324 switch (frame.type()) { 325 case WindowUpdateFrame.TYPE: 326 incoming_windowUpdate((WindowUpdateFrame) frame); 327 break; 328 case ResetFrame.TYPE: 329 incoming_reset((ResetFrame) frame); 330 break; 331 case PriorityFrame.TYPE: 332 incoming_priority((PriorityFrame) frame); 333 break; 334 default: 335 String msg = "Unexpected frame: " + frame.toString(); 336 throw new IOException(msg); 337 } 338 } 339 340 // The Hpack decoder decodes into one of these consumers of name,value pairs 341 342 DecodingCallback rspHeadersConsumer() { 343 return rspHeadersConsumer; 344 } 345 346 protected void handleResponse() throws IOException { 347 responseCode = (int)responseHeaders 348 .firstValueAsLong(":status") 349 .orElseThrow(() -> new IOException("no statuscode in response")); 350 351 response = new Response( 352 request, exchange, responseHeaders, 353 responseCode, HttpClient.Version.HTTP_2); 354 355 /* TODO: review if needs to be removed 356 the value is not used, but in case `content-length` doesn't parse as 357 long, there will be NumberFormatException. If left as is, make sure 358 code up the stack handles NFE correctly. */ 359 responseHeaders.firstValueAsLong("content-length"); 360 361 if (Log.headers()) { 362 StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); 363 Log.dumpHeaders(sb, " ", responseHeaders); 364 Log.logHeaders(sb.toString()); 365 } 366 367 completeResponse(response); 368 } 369 370 void incoming_reset(ResetFrame frame) { 371 Log.logTrace("Received RST_STREAM on stream {0}", streamid); 372 if (endStreamReceived()) { 373 Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid); 374 } else if (closed) { 375 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); 376 } else { 377 // put it in the input queue in order to read all 378 // pending data frames first. Indeed, a server may send 379 // RST_STREAM after sending END_STREAM, in which case we should 380 // ignore it. However, we won't know if we have received END_STREAM 381 // or not until all pending data frames are read. 382 receiveResetFrame(frame); 383 // RST_STREAM was pushed to the queue. It will be handled by 384 // asyncReceive after all pending data frames have been 385 // processed. 386 Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid); 387 } 388 } 389 390 void handleReset(ResetFrame frame) { 391 Log.logTrace("Handling RST_STREAM on stream {0}", streamid); 392 if (!closed) { 393 close(); 394 int error = frame.getErrorCode(); 395 completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error))); 396 } else { 397 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); 398 } 399 } 400 401 void incoming_priority(PriorityFrame frame) { 402 // TODO: implement priority 403 throw new UnsupportedOperationException("Not implemented"); 404 } 405 406 private void incoming_windowUpdate(WindowUpdateFrame frame) 407 throws IOException 408 { 409 int amount = frame.getUpdate(); 410 if (amount <= 0) { 411 Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n", 412 streamid, streamid, amount); 413 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); 414 } else { 415 assert streamid != 0; 416 boolean success = windowController.increaseStreamWindow(amount, streamid); 417 if (!success) { // overflow 418 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); 419 } 420 } 421 } 422 423 void incoming_pushPromise(HttpRequestImpl pushReq, 424 PushedStream<?,T> pushStream) 425 throws IOException 426 { 427 if (Log.requests()) { 428 Log.logRequest("PUSH_PROMISE: " + pushReq.toString()); 429 } 430 PushGroup<?,T> pushGroup = exchange.getPushGroup(); 431 if (pushGroup == null || pushGroup.noMorePushes()) { 432 cancelImpl(new IllegalStateException("unexpected push promise" 433 + " on stream " + streamid)); 434 return; 435 } 436 437 HttpResponse.MultiSubscriber<?,T> proc = pushGroup.subscriber(); 438 439 CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF(); 440 441 Optional<HttpResponse.BodyHandler<T>> bpOpt = 442 pushGroup.handlerForPushRequest(pushReq); 443 444 if (!bpOpt.isPresent()) { 445 IOException ex = new IOException("Stream " 446 + streamid + " cancelled by user"); 447 if (Log.trace()) { 448 Log.logTrace("No body subscriber for {0}: {1}", pushReq, 449 ex.getMessage()); 450 } 451 pushStream.cancelImpl(ex); 452 cf.completeExceptionally(ex); 453 return; 454 } 455 456 pushGroup.addPush(); 457 pushStream.requestSent(); 458 pushStream.setPushHandler(bpOpt.get()); 459 // setup housekeeping for when the push is received 460 // TODO: deal with ignoring of CF anti-pattern 461 cf.whenComplete((HttpResponse<T> resp, Throwable t) -> { 462 t = Utils.getCompletionCause(t); 463 if (Log.trace()) { 464 Log.logTrace("Push completed on stream {0} for {1}{2}", 465 pushStream.streamid, resp, 466 ((t==null) ? "": " with exception " + t)); 467 } 468 if (t != null) { 469 pushGroup.pushError(t); 470 proc.onError(pushReq, t); 471 } else { 472 proc.onResponse(resp); 473 } 474 pushGroup.pushCompleted(); 475 }); 476 477 } 478 479 private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) { 480 HttpHeadersImpl h = request.getSystemHeaders(); 481 if (contentLength > 0) { 482 h.setHeader("content-length", Long.toString(contentLength)); 501 // TODO: ensure header names beginning with : not in user headers 502 String query = uri.getQuery(); 503 String path = uri.getPath(); 504 if (path == null || path.isEmpty()) { 505 if (method.equalsIgnoreCase("OPTIONS")) { 506 path = "*"; 507 } else { 508 path = "/"; 509 } 510 } 511 if (query != null) { 512 path += "?" + query; 513 } 514 hdrs.setHeader(":path", path); 515 } 516 517 HttpHeadersImpl getRequestPseudoHeaders() { 518 return requestPseudoHeaders; 519 } 520 521 /** Sets endStreamReceived. Should be called only once. */ 522 void setEndStreamReceived() { 523 assert remotelyClosed == false: "Unexpected endStream already set"; 524 remotelyClosed = true; 525 responseReceived(); 526 } 527 528 /** Tells whether, or not, the END_STREAM Flag has been seen in any frame 529 * received on this stream. */ 530 private boolean endStreamReceived() { 531 return remotelyClosed; 532 } 533 534 @Override 535 CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { 536 debug.log(Level.DEBUG, "sendHeadersOnly()"); 537 if (Log.requests() && request != null) { 538 Log.logRequest(request.toString()); 539 } 540 if (requestPublisher != null) { 541 requestContentLen = requestPublisher.contentLength(); 542 } else { 543 requestContentLen = 0; 544 } 545 OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen); 546 connection.sendFrame(f); 547 CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>(); 548 cf.complete(this); // #### good enough for now 549 return cf; 550 } 551 552 @Override 553 void released() { 554 if (streamid > 0) { 555 debug.log(Level.DEBUG, "Released stream %d", streamid); 556 // remove this stream from the Http2Connection map. 557 connection.closeStream(streamid); 558 } else { 559 debug.log(Level.DEBUG, "Can't release stream %d", streamid); 560 } 561 } 562 563 @Override 564 void completed() { 565 // There should be nothing to do here: the stream should have 566 // been already closed (or will be closed shortly after). 567 } 568 569 void registerStream(int id) { 570 this.streamid = id; 571 connection.putStream(this, streamid); 572 debug.log(Level.DEBUG, "Registered stream %d", id); 573 } 574 575 void signalWindowUpdate() { 576 RequestSubscriber subscriber = requestSubscriber; 577 assert subscriber != null; 578 debug.log(Level.DEBUG, "Signalling window update"); 579 subscriber.sendScheduler.runOrSchedule(); 580 } 581 582 static final ByteBuffer COMPLETED = ByteBuffer.allocate(0); 583 class RequestSubscriber implements Flow.Subscriber<ByteBuffer> { 584 // can be < 0 if the actual length is not known. 585 private final long contentLength; 586 private volatile long remainingContentLength; 587 private volatile Subscription subscription; 588 589 // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers. 590 // 1) The data that was published by the request body Publisher, and 591 // 2) the COMPLETED sentinel, since onComplete can be invoked without demand. 592 final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>(); 593 594 private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); 595 // A scheduler used to honor window updates. Writing must be paused 596 // when the window is exhausted, and resumed when the window acquires 597 // some space. The sendScheduler makes it possible to implement this 598 // behaviour in an asynchronous non-blocking way. 599 // See RequestSubscriber::trySend below. 600 final SequentialScheduler sendScheduler; 601 602 RequestSubscriber(long contentLen) { 603 this.contentLength = contentLen; 604 this.remainingContentLength = contentLen; 605 this.sendScheduler = 606 SequentialScheduler.synchronizedScheduler(this::trySend); 607 } 608 609 @Override 610 public void onSubscribe(Flow.Subscription subscription) { 611 if (this.subscription != null) { 612 throw new IllegalStateException("already subscribed"); 613 } 614 this.subscription = subscription; 615 debug.log(Level.DEBUG, "RequestSubscriber: onSubscribe, request 1"); 616 subscription.request(1); 617 } 618 619 @Override 620 public void onNext(ByteBuffer item) { 621 debug.log(Level.DEBUG, "RequestSubscriber: onNext(%d)", item.remaining()); 622 int size = outgoing.size(); 623 assert size == 0 : "non-zero size: " + size; 624 onNextImpl(item); 625 } 626 627 private void onNextImpl(ByteBuffer item) { 628 // Got some more request body bytes to send. 629 if (requestBodyCF.isDone()) { 630 // stream already cancelled, probably in timeout 631 sendScheduler.stop(); 632 subscription.cancel(); 633 return; 634 } 635 outgoing.add(item); 636 sendScheduler.runOrSchedule(); 637 } 638 639 @Override 640 public void onError(Throwable throwable) { 641 debug.log(Level.DEBUG, () -> "RequestSubscriber: onError: " + throwable); 642 // ensure that errors are handled within the flow. 643 if (errorRef.compareAndSet(null, throwable)) { 644 sendScheduler.runOrSchedule(); 645 } 646 } 647 648 @Override 649 public void onComplete() { 650 debug.log(Level.DEBUG, "RequestSubscriber: onComplete"); 651 int size = outgoing.size(); 652 assert size == 0 || size == 1 : "non-zero or one size: " + size; 653 // last byte of request body has been obtained. 654 // ensure that everything is completed within the flow. 655 onNextImpl(COMPLETED); 656 } 657 658 // Attempts to send the data, if any. 659 // Handles errors and completion state. 660 // Pause writing if the send window is exhausted, resume it if the 661 // send window has some bytes that can be acquired. 662 void trySend() { 663 try { 664 // handle errors raised by onError; 665 Throwable t = errorRef.get(); 666 if (t != null) { 667 sendScheduler.stop(); 668 if (requestBodyCF.isDone()) return; 669 subscription.cancel(); 670 requestBodyCF.completeExceptionally(t); 671 return; 672 } 673 674 do { 675 // handle COMPLETED; 676 ByteBuffer item = outgoing.peekFirst(); 677 if (item == null) return; 678 else if (item == COMPLETED) { 679 sendScheduler.stop(); 680 complete(); 681 return; 682 } 683 684 // handle bytes to send downstream 685 while (item.hasRemaining()) { 686 debug.log(Level.DEBUG, "trySend: %d", item.remaining()); 687 assert !endStreamSent : "internal error, send data after END_STREAM flag"; 688 DataFrame df = getDataFrame(item); 689 if (df == null) { 690 debug.log(Level.DEBUG, "trySend: can't send yet: %d", 691 item.remaining()); 692 return; // the send window is exhausted: come back later 693 } 694 695 if (contentLength > 0) { 696 remainingContentLength -= df.getDataLength(); 697 if (remainingContentLength < 0) { 698 String msg = connection().getConnectionFlow() 699 + " stream=" + streamid + " " 700 + "[" + Thread.currentThread().getName() + "] " 701 + "Too many bytes in request body. Expected: " 702 + contentLength + ", got: " 703 + (contentLength - remainingContentLength); 704 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR); 705 throw new IOException(msg); 706 } else if (remainingContentLength == 0) { 707 df.setFlag(DataFrame.END_STREAM); 708 endStreamSent = true; 709 } 710 } 711 debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength()); 712 connection.sendDataFrame(df); 713 } 714 assert !item.hasRemaining(); 715 ByteBuffer b = outgoing.removeFirst(); 716 assert b == item; 717 } while (outgoing.peekFirst() != null); 718 719 debug.log(Level.DEBUG, "trySend: request 1"); 720 subscription.request(1); 721 } catch (Throwable ex) { 722 debug.log(Level.DEBUG, "trySend: ", ex); 723 sendScheduler.stop(); 724 subscription.cancel(); 725 requestBodyCF.completeExceptionally(ex); 726 } 727 } 728 729 private void complete() throws IOException { 730 long remaining = remainingContentLength; 731 long written = contentLength - remaining; 732 if (remaining > 0) { 733 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR); 734 // let trySend() handle the exception 735 throw new IOException(connection().getConnectionFlow() 736 + " stream=" + streamid + " " 737 + "[" + Thread.currentThread().getName() +"] " 738 + "Too few bytes returned by the publisher (" 739 + written + "/" 740 + contentLength + ")"); 741 } 742 if (!endStreamSent) { 743 endStreamSent = true; 744 connection.sendDataFrame(getEmptyEndStreamDataFrame()); 745 } 746 requestBodyCF.complete(null); 747 } 748 } 749 750 /** 751 * Send a RESET frame to tell server to stop sending data on this stream 752 */ 753 @Override 754 public CompletableFuture<Void> ignoreBody() { 755 try { 756 connection.resetStream(streamid, ResetFrame.STREAM_CLOSED); 757 return MinimalFuture.completedFuture(null); 758 } catch (Throwable e) { 759 Log.logTrace("Error resetting stream {0}", e.toString()); 760 return MinimalFuture.failedFuture(e); 761 } 762 } 763 764 DataFrame getDataFrame(ByteBuffer buffer) { 765 int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining()); 766 // blocks waiting for stream send window, if exhausted 767 int actualAmount = windowController.tryAcquire(requestAmount, streamid, this); 768 if (actualAmount <= 0) return null; 769 ByteBuffer outBuf = Utils.slice(buffer, actualAmount); 770 DataFrame df = new DataFrame(streamid, 0 , outBuf); 771 return df; 772 } 773 774 private DataFrame getEmptyEndStreamDataFrame() { 775 return new DataFrame(streamid, DataFrame.END_STREAM, List.of()); 776 } 777 778 /** 779 * A List of responses relating to this stream. Normally there is only 780 * one response, but intermediate responses like 100 are allowed 781 * and must be passed up to higher level before continuing. Deals with races 782 * such as if responses are returned before the CFs get created by 783 * getResponseAsync() 784 */ 785 786 final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5); 787 788 @Override 789 CompletableFuture<Response> getResponseAsync(Executor executor) { 790 CompletableFuture<Response> cf; 791 // The code below deals with race condition that can be caused when 792 // completeResponse() is being called before getResponseAsync() 793 synchronized (response_cfs) { 794 if (!response_cfs.isEmpty()) { 795 // This CompletableFuture was created by completeResponse(). 796 // it will be already completed. 797 cf = response_cfs.remove(0); 798 // if we find a cf here it should be already completed. 799 // finding a non completed cf should not happen. just assert it. 800 assert cf.isDone() : "Removing uncompleted response: could cause code to hang!"; 801 } else { 802 // getResponseAsync() is called first. Create a CompletableFuture 803 // that will be completed by completeResponse() when 804 // completeResponse() is called. 805 cf = new MinimalFuture<>(); 806 response_cfs.add(cf); 807 } 808 } 809 if (executor != null && !cf.isDone()) { 810 // protect from executing later chain of CompletableFuture operations from SelectorManager thread 811 cf = cf.thenApplyAsync(r -> r, executor); 812 } 813 Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf); 814 PushGroup<?,?> pg = exchange.getPushGroup(); 815 if (pg != null) { 816 // if an error occurs make sure it is recorded in the PushGroup 817 cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e))); 818 } 819 return cf; 820 } 821 822 /** 823 * Completes the first uncompleted CF on list, and removes it. If there is no 824 * uncompleted CF then creates one (completes it) and adds to list 825 */ 826 void completeResponse(Response resp) { 827 synchronized (response_cfs) { 828 CompletableFuture<Response> cf; 829 int cfs_len = response_cfs.size(); 830 for (int i=0; i<cfs_len; i++) { 831 cf = response_cfs.get(i); 832 if (!cf.isDone()) { 833 Log.logTrace("Completing response (streamid={0}): {1}", 834 streamid, cf); 835 cf.complete(resp); 836 response_cfs.remove(cf); 837 return; 838 } // else we found the previous response: just leave it alone. 839 } 840 cf = MinimalFuture.completedFuture(resp); 841 Log.logTrace("Created completed future (streamid={0}): {1}", 842 streamid, cf); 843 response_cfs.add(cf); 844 } 845 } 846 847 // methods to update state and remove stream when finished 848 849 synchronized void requestSent() { 850 requestSent = true; 851 if (responseReceived) { 852 close(); 853 } 854 } 855 856 synchronized void responseReceived() { 857 responseReceived = true; 858 if (requestSent) { 859 close(); 860 } 861 } 862 863 /** 864 * same as above but for errors 865 */ 866 void completeResponseExceptionally(Throwable t) { 867 synchronized (response_cfs) { 868 // use index to avoid ConcurrentModificationException 869 // caused by removing the CF from within the loop. 870 for (int i = 0; i < response_cfs.size(); i++) { 871 CompletableFuture<Response> cf = response_cfs.get(i); 872 if (!cf.isDone()) { 873 cf.completeExceptionally(t); 874 response_cfs.remove(i); 875 return; 876 } 877 } 878 response_cfs.add(MinimalFuture.failedFuture(t)); 879 } 880 } 881 882 CompletableFuture<Void> sendBodyImpl() { 883 requestBodyCF.whenComplete((v, t) -> requestSent()); 884 if (requestPublisher != null) { 885 final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen); 886 requestPublisher.subscribe(requestSubscriber = subscriber); 887 } else { 888 // there is no request body, therefore the request is complete, 889 // END_STREAM has already sent with outgoing headers 890 requestBodyCF.complete(null); 891 } 892 return requestBodyCF; 893 } 894 895 @Override 896 void cancel() { 897 cancel(new IOException("Stream " + streamid + " cancelled")); 898 } 899 900 @Override 901 void cancel(IOException cause) { 902 cancelImpl(cause); 903 } 904 905 // This method sends a RST_STREAM frame 906 void cancelImpl(Throwable e) { 907 debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e); 908 if (Log.trace()) { 909 Log.logTrace("cancelling stream {0}: {1}\n", streamid, e); 910 } 911 boolean closing; 912 if (closing = !closed) { // assigning closing to !closed 913 synchronized (this) { 914 failed = e; 915 if (closing = !closed) { // assigning closing to !closed 916 closed=true; 917 } 918 } 919 } 920 if (closing) { // true if the stream has not been closed yet 921 if (responseSubscriber != null) 922 sched.runOrSchedule(); 923 } 924 completeResponseExceptionally(e); 925 if (!requestBodyCF.isDone()) { 926 requestBodyCF.completeExceptionally(e); // we may be sending the body.. 927 } 928 if (responseBodyCF != null) { 929 responseBodyCF.completeExceptionally(e); 930 } 931 try { 932 // will send a RST_STREAM frame 933 if (streamid != 0) { 934 connection.resetStream(streamid, ResetFrame.CANCEL); 935 } 936 } catch (IOException ex) { 937 Log.logError(ex); 938 } 939 } 940 941 // This method doesn't send any frame 942 void close() { 943 if (closed) return; 944 synchronized(this) { 945 if (closed) return; 946 closed = true; 947 } 948 Log.logTrace("Closing stream {0}", streamid); 949 connection.closeStream(streamid); 950 Log.logTrace("Stream {0} closed", streamid); 951 } 952 953 static class PushedStream<U,T> extends Stream<T> { 954 final PushGroup<U,T> pushGroup; 955 // push streams need the response CF allocated up front as it is 956 // given directly to user via the multi handler callback function. 957 final CompletableFuture<Response> pushCF; 958 final CompletableFuture<HttpResponse<T>> responseCF; 959 final HttpRequestImpl pushReq; 960 HttpResponse.BodyHandler<T> pushHandler; 961 962 PushedStream(PushGroup<U,T> pushGroup, 963 Http2Connection connection, 964 Exchange<T> pushReq) { 965 // ## no request body possible, null window controller 966 super(connection, pushReq, null); 967 this.pushGroup = pushGroup; 968 this.pushReq = pushReq.request(); 969 this.pushCF = new MinimalFuture<>(); 970 this.responseCF = new MinimalFuture<>(); 971 } 972 973 CompletableFuture<HttpResponse<T>> responseCF() { 974 return responseCF; 975 } 976 977 synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) { 978 this.pushHandler = pushHandler; 979 } 980 981 synchronized HttpResponse.BodyHandler<T> getPushHandler() { 982 // ignored parameters to function can be used as BodyHandler 983 return this.pushHandler; 984 } 985 986 // Following methods call the super class but in case of 987 // error record it in the PushGroup. The error method is called 988 // with a null value when no error occurred (is a no-op) 989 @Override 990 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { 991 return super.sendBodyAsync() 992 .whenComplete((ExchangeImpl<T> v, Throwable t) 993 -> pushGroup.pushError(Utils.getCompletionCause(t))); 994 } 995 996 @Override 997 CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { 998 return super.sendHeadersAsync() 999 .whenComplete((ExchangeImpl<T> ex, Throwable t) 1000 -> pushGroup.pushError(Utils.getCompletionCause(t))); 1001 } 1002 1003 @Override 1004 CompletableFuture<Response> getResponseAsync(Executor executor) { 1005 CompletableFuture<Response> cf = pushCF.whenComplete( 1006 (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t))); 1007 if(executor!=null && !cf.isDone()) { 1008 cf = cf.thenApplyAsync( r -> r, executor); 1009 } 1010 return cf; 1011 } 1012 1013 @Override 1014 CompletableFuture<T> readBodyAsync( 1015 HttpResponse.BodyHandler<T> handler, 1016 boolean returnConnectionToPool, 1017 Executor executor) 1018 { 1019 return super.readBodyAsync(handler, returnConnectionToPool, executor) 1020 .whenComplete((v, t) -> pushGroup.pushError(t)); 1021 } 1022 1023 @Override 1024 void completeResponse(Response r) { 1025 Log.logResponse(r::toString); 1026 pushCF.complete(r); // not strictly required for push API 1027 // start reading the body using the obtained BodySubscriber 1028 CompletableFuture<Void> start = new MinimalFuture<>(); 1029 start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor())) 1030 .whenComplete((T body, Throwable t) -> { 1031 if (t != null) { 1032 responseCF.completeExceptionally(t); 1033 } else { 1034 HttpResponseImpl<T> resp = 1035 new HttpResponseImpl<>(r.request, r, null, body, getExchange()); 1036 responseCF.complete(resp); 1037 } 1038 }); 1039 start.completeAsync(() -> null, getExchange().executor()); 1040 } 1041 1042 @Override 1043 void completeResponseExceptionally(Throwable t) { 1044 pushCF.completeExceptionally(t); 1045 } 1046 1047 // @Override 1048 // synchronized void responseReceived() { 1049 // super.responseReceived(); 1050 // } 1051 1052 // create and return the PushResponseImpl 1053 @Override 1054 protected void handleResponse() { 1055 responseCode = (int)responseHeaders 1056 .firstValueAsLong(":status") 1057 .orElse(-1); 1058 1059 if (responseCode == -1) { 1060 completeResponseExceptionally(new IOException("No status code")); 1061 } 1062 1063 this.response = new Response( 1064 pushReq, exchange, responseHeaders, 1065 responseCode, HttpClient.Version.HTTP_2); 1066 1067 /* TODO: review if needs to be removed 1068 the value is not used, but in case `content-length` doesn't parse 1069 as long, there will be NumberFormatException. If left as is, make 1070 sure code up the stack handles NFE correctly. */ 1071 responseHeaders.firstValueAsLong("content-length"); 1072 1073 if (Log.headers()) { 1074 StringBuilder sb = new StringBuilder("RESPONSE HEADERS"); 1075 sb.append(" (streamid=").append(streamid).append("): "); 1076 Log.dumpHeaders(sb, " ", responseHeaders); 1077 Log.logHeaders(sb.toString()); 1078 } 1079 1080 // different implementations for normal streams and pushed streams 1081 completeResponse(response); 1082 } 1083 } 1084 1085 final class StreamWindowUpdateSender extends WindowUpdateSender { 1086 1087 StreamWindowUpdateSender(Http2Connection connection) { 1088 super(connection); 1089 } 1090 1091 @Override 1092 int getStreamId() { 1093 return streamid; 1094 } 1095 } 1096 1097 /** 1098 * Returns true if this exchange was canceled. 1099 * @return true if this exchange was canceled. 1100 */ 1101 synchronized boolean isCanceled() { 1102 return failed != null; 1103 } 1104 1105 /** 1106 * Returns the cause for which this exchange was canceled, if available. 1107 * @return the cause for which this exchange was canceled, if available. 1108 */ 1109 synchronized Throwable getCancelCause() { 1110 return failed; 1111 } 1112 1113 final String dbgString() { 1114 return connection.dbgString() + "/Stream("+streamid+")"; 1115 } 1116 } |