1 /* 2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.lang.System.Logger.Level; 30 import java.net.URI; 31 import java.nio.ByteBuffer; 32 import java.util.ArrayList; 33 import java.util.Collections; 34 import java.util.List; 35 import java.util.Optional; 36 import java.util.concurrent.CompletableFuture; 37 import java.util.concurrent.ConcurrentLinkedDeque; 38 import java.util.concurrent.ConcurrentLinkedQueue; 39 import java.util.concurrent.Executor; 40 import java.util.concurrent.Flow; 41 import java.util.concurrent.Flow.Subscription; 42 import java.util.concurrent.atomic.AtomicReference; 43 import jdk.incubator.http.HttpResponse.BodySubscriber; 44 import jdk.incubator.http.internal.common.*; 45 import jdk.incubator.http.internal.frame.*; 46 import jdk.incubator.http.internal.hpack.DecodingCallback; 47 48 /** 49 * Http/2 Stream handling. 50 * 51 * REQUESTS 52 * 53 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q 54 * 55 * sendRequest() -- sendHeadersOnly() + sendBody() 56 * 57 * sendBodyAsync() -- calls sendBody() in an executor thread. 58 * 59 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block 60 * 61 * sendRequestAsync() -- calls sendRequest() in an executor thread 62 * 63 * RESPONSES 64 * 65 * Multiple responses can be received per request. Responses are queued up on 66 * a LinkedList of CF<HttpResponse> and the the first one on the list is completed 67 * with the next response 68 * 69 * getResponseAsync() -- queries list of response CFs and returns first one 70 * if one exists. Otherwise, creates one and adds it to list 71 * and returns it. Completion is achieved through the 72 * incoming() upcall from connection reader thread. 73 * 74 * getResponse() -- calls getResponseAsync() and waits for CF to complete 75 * 76 * responseBodyAsync() -- calls responseBody() in an executor thread. 77 * 78 * incoming() -- entry point called from connection reader thread. Frames are 79 * either handled immediately without blocking or for data frames 80 * placed on the stream's inputQ which is consumed by the stream's 81 * reader thread. 82 * 83 * PushedStream sub class 84 * ====================== 85 * Sending side methods are not used because the request comes from a PUSH_PROMISE 86 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream 87 * is created. PushedStream does not use responseCF list as there can be only 88 * one response. The CF is created when the object created and when the response 89 * HEADERS frame is received the object is completed. 90 */ 91 class Stream<T> extends ExchangeImpl<T> { 92 93 final static boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag 94 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); 95 96 final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>(); 97 final SequentialScheduler sched = 98 SequentialScheduler.synchronizedScheduler(this::schedule); 99 final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel); 100 101 /** 102 * This stream's identifier. Assigned lazily by the HTTP2Connection before 103 * the stream's first frame is sent. 104 */ 105 protected volatile int streamid; 106 107 long requestContentLen; 108 109 final Http2Connection connection; 110 final HttpRequestImpl request; 111 final DecodingCallback rspHeadersConsumer; 112 HttpHeadersImpl responseHeaders; 113 final HttpHeadersImpl requestPseudoHeaders; 114 volatile HttpResponse.BodySubscriber<T> responseSubscriber; 115 final HttpRequest.BodyPublisher requestPublisher; 116 volatile RequestSubscriber requestSubscriber; 117 volatile int responseCode; 118 volatile Response response; 119 volatile Throwable failed; // The exception with which this stream was canceled. 120 final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>(); 121 volatile CompletableFuture<T> responseBodyCF; 122 123 /** True if END_STREAM has been seen in a frame received on this stream. */ 124 private volatile boolean remotelyClosed; 125 private volatile boolean closed; 126 private volatile boolean endStreamSent; 127 128 // state flags 129 private boolean requestSent, responseReceived; 130 131 /** 132 * A reference to this Stream's connection Send Window controller. The 133 * stream MUST acquire the appropriate amount of Send Window before 134 * sending any data. Will be null for PushStreams, as they cannot send data. 135 */ 136 private final WindowController windowController; 137 private final WindowUpdateSender windowUpdater; 138 139 @Override 140 HttpConnection connection() { 141 return connection.connection; 142 } 143 144 /** 145 * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() } 146 * of after user subscription window has re-opened, from SubscriptionBase.request() 147 */ 148 private void schedule() { 149 if (responseSubscriber == null) 150 // can't process anything yet 151 return; 152 153 while (!inputQ.isEmpty()) { 154 Http2Frame frame = inputQ.peek(); 155 if (frame instanceof ResetFrame) { 156 inputQ.remove(); 157 handleReset((ResetFrame)frame); 158 return; 159 } 160 DataFrame df = (DataFrame)frame; 161 boolean finished = df.getFlag(DataFrame.END_STREAM); 162 163 List<ByteBuffer> buffers = df.getData(); 164 List<ByteBuffer> dsts = Collections.unmodifiableList(buffers); 165 int size = Utils.remaining(dsts, Integer.MAX_VALUE); 166 if (size == 0 && finished) { 167 inputQ.remove(); 168 Log.logTrace("responseSubscriber.onComplete"); 169 debug.log(Level.DEBUG, "incoming: onComplete"); 170 sched.stop(); 171 responseSubscriber.onComplete(); 172 setEndStreamReceived(); 173 return; 174 } else if (userSubscription.tryDecrement()) { 175 inputQ.remove(); 176 Log.logTrace("responseSubscriber.onNext {0}", size); 177 debug.log(Level.DEBUG, "incoming: onNext(%d)", size); 178 responseSubscriber.onNext(dsts); 179 if (consumed(df)) { 180 Log.logTrace("responseSubscriber.onComplete"); 181 debug.log(Level.DEBUG, "incoming: onComplete"); 182 sched.stop(); 183 responseSubscriber.onComplete(); 184 setEndStreamReceived(); 185 return; 186 } 187 } else { 188 return; 189 } 190 } 191 Throwable t = failed; 192 if (t != null) { 193 sched.stop(); 194 responseSubscriber.onError(t); 195 close(); 196 } 197 } 198 199 // Callback invoked after the Response BodySubscriber has consumed the 200 // buffers contained in a DataFrame. 201 // Returns true if END_STREAM is reached, false otherwise. 202 private boolean consumed(DataFrame df) { 203 // RFC 7540 6.1: 204 // The entire DATA frame payload is included in flow control, 205 // including the Pad Length and Padding fields if present 206 int len = df.payloadLength(); 207 connection.windowUpdater.update(len); 208 209 if (!df.getFlag(DataFrame.END_STREAM)) { 210 // Don't send window update on a stream which is 211 // closed or half closed. 212 windowUpdater.update(len); 213 return false; // more data coming 214 } 215 return true; // end of stream 216 } 217 218 @Override 219 CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler, 220 boolean returnConnectionToPool, 221 Executor executor) 222 { 223 Log.logTrace("Reading body on stream {0}", streamid); 224 BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders); 225 CompletableFuture<T> cf = receiveData(bodySubscriber); 226 227 PushGroup<?,?> pg = exchange.getPushGroup(); 228 if (pg != null) { 229 // if an error occurs make sure it is recorded in the PushGroup 230 cf = cf.whenComplete((t,e) -> pg.pushError(e)); 231 } 232 return cf; 233 } 234 235 @Override 236 public String toString() { 237 StringBuilder sb = new StringBuilder(); 238 sb.append("streamid: ") 239 .append(streamid); 240 return sb.toString(); 241 } 242 243 private void receiveDataFrame(DataFrame df) { 244 inputQ.add(df); 245 sched.runOrSchedule(); 246 } 247 248 /** Handles a RESET frame. RESET is always handled inline in the queue. */ 249 private void receiveResetFrame(ResetFrame frame) { 250 inputQ.add(frame); 251 sched.runOrSchedule(); 252 } 253 254 // pushes entire response body into response subscriber 255 // blocking when required by local or remote flow control 256 CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber) { 257 responseBodyCF = MinimalFuture.of(bodySubscriber.getBody()); 258 259 if (isCanceled()) { 260 Throwable t = getCancelCause(); 261 responseBodyCF.completeExceptionally(t); 262 } else { 263 bodySubscriber.onSubscribe(userSubscription); 264 } 265 // Set the responseSubscriber field now that onSubscribe has been called. 266 // This effectively allows the scheduler to start invoking the callbacks. 267 responseSubscriber = bodySubscriber; 268 sched.runOrSchedule(); // in case data waiting already to be processed 269 return responseBodyCF; 270 } 271 272 @Override 273 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { 274 return sendBodyImpl().thenApply( v -> this); 275 } 276 277 @SuppressWarnings("unchecked") 278 Stream(Http2Connection connection, 279 Exchange<T> e, 280 WindowController windowController) 281 { 282 super(e); 283 this.connection = connection; 284 this.windowController = windowController; 285 this.request = e.request(); 286 this.requestPublisher = request.requestPublisher; // may be null 287 responseHeaders = new HttpHeadersImpl(); 288 rspHeadersConsumer = (name, value) -> { 289 responseHeaders.addHeader(name.toString(), value.toString()); 290 if (Log.headers() && Log.trace()) { 291 Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}", 292 streamid, name, value); 293 } 294 }; 295 this.requestPseudoHeaders = new HttpHeadersImpl(); 296 // NEW 297 this.windowUpdater = new StreamWindowUpdateSender(connection); 298 } 299 300 /** 301 * Entry point from Http2Connection reader thread. 302 * 303 * Data frames will be removed by response body thread. 304 */ 305 void incoming(Http2Frame frame) throws IOException { 306 debug.log(Level.DEBUG, "incoming: %s", frame); 307 if ((frame instanceof HeaderFrame)) { 308 HeaderFrame hframe = (HeaderFrame)frame; 309 if (hframe.endHeaders()) { 310 Log.logTrace("handling response (streamid={0})", streamid); 311 handleResponse(); 312 if (hframe.getFlag(HeaderFrame.END_STREAM)) { 313 receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of())); 314 } 315 } 316 } else if (frame instanceof DataFrame) { 317 receiveDataFrame((DataFrame)frame); 318 } else { 319 otherFrame(frame); 320 } 321 } 322 323 void otherFrame(Http2Frame frame) throws IOException { 324 switch (frame.type()) { 325 case WindowUpdateFrame.TYPE: 326 incoming_windowUpdate((WindowUpdateFrame) frame); 327 break; 328 case ResetFrame.TYPE: 329 incoming_reset((ResetFrame) frame); 330 break; 331 case PriorityFrame.TYPE: 332 incoming_priority((PriorityFrame) frame); 333 break; 334 default: 335 String msg = "Unexpected frame: " + frame.toString(); 336 throw new IOException(msg); 337 } 338 } 339 340 // The Hpack decoder decodes into one of these consumers of name,value pairs 341 342 DecodingCallback rspHeadersConsumer() { 343 return rspHeadersConsumer; 344 } 345 346 protected void handleResponse() throws IOException { 347 responseCode = (int)responseHeaders 348 .firstValueAsLong(":status") 349 .orElseThrow(() -> new IOException("no statuscode in response")); 350 351 response = new Response( 352 request, exchange, responseHeaders, 353 responseCode, HttpClient.Version.HTTP_2); 354 355 /* TODO: review if needs to be removed 356 the value is not used, but in case `content-length` doesn't parse as 357 long, there will be NumberFormatException. If left as is, make sure 358 code up the stack handles NFE correctly. */ 359 responseHeaders.firstValueAsLong("content-length"); 360 361 if (Log.headers()) { 362 StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); 363 Log.dumpHeaders(sb, " ", responseHeaders); 364 Log.logHeaders(sb.toString()); 365 } 366 367 completeResponse(response); 368 } 369 370 void incoming_reset(ResetFrame frame) { 371 Log.logTrace("Received RST_STREAM on stream {0}", streamid); 372 if (endStreamReceived()) { 373 Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid); 374 } else if (closed) { 375 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); 376 } else { 377 // put it in the input queue in order to read all 378 // pending data frames first. Indeed, a server may send 379 // RST_STREAM after sending END_STREAM, in which case we should 380 // ignore it. However, we won't know if we have received END_STREAM 381 // or not until all pending data frames are read. 382 receiveResetFrame(frame); 383 // RST_STREAM was pushed to the queue. It will be handled by 384 // asyncReceive after all pending data frames have been 385 // processed. 386 Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid); 387 } 388 } 389 390 void handleReset(ResetFrame frame) { 391 Log.logTrace("Handling RST_STREAM on stream {0}", streamid); 392 if (!closed) { 393 close(); 394 int error = frame.getErrorCode(); 395 completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error))); 396 } else { 397 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); 398 } 399 } 400 401 void incoming_priority(PriorityFrame frame) { 402 // TODO: implement priority 403 throw new UnsupportedOperationException("Not implemented"); 404 } 405 406 private void incoming_windowUpdate(WindowUpdateFrame frame) 407 throws IOException 408 { 409 int amount = frame.getUpdate(); 410 if (amount <= 0) { 411 Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n", 412 streamid, streamid, amount); 413 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); 414 } else { 415 assert streamid != 0; 416 boolean success = windowController.increaseStreamWindow(amount, streamid); 417 if (!success) { // overflow 418 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); 419 } 420 } 421 } 422 423 void incoming_pushPromise(HttpRequestImpl pushReq, 424 PushedStream<?,T> pushStream) 425 throws IOException 426 { 427 if (Log.requests()) { 428 Log.logRequest("PUSH_PROMISE: " + pushReq.toString()); 429 } 430 PushGroup<?,T> pushGroup = exchange.getPushGroup(); 431 if (pushGroup == null) { 432 Log.logTrace("Rejecting push promise stream " + streamid); 433 connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM); 434 pushStream.close(); 435 return; 436 } 437 438 HttpResponse.MultiSubscriber<?,T> proc = pushGroup.subscriber(); 439 440 CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF(); 441 442 Optional<HttpResponse.BodyHandler<T>> bpOpt = 443 pushGroup.handlerForPushRequest(pushReq); 444 445 if (!bpOpt.isPresent()) { 446 IOException ex = new IOException("Stream " 447 + streamid + " cancelled by user"); 448 if (Log.trace()) { 449 Log.logTrace("No body subscriber for {0}: {1}", pushReq, 450 ex.getMessage()); 451 } 452 pushStream.cancelImpl(ex); 453 cf.completeExceptionally(ex); 454 return; 455 } 456 457 pushGroup.addPush(); 458 pushStream.requestSent(); 459 pushStream.setPushHandler(bpOpt.get()); 460 // setup housekeeping for when the push is received 461 // TODO: deal with ignoring of CF anti-pattern 462 cf.whenComplete((HttpResponse<T> resp, Throwable t) -> { 463 t = Utils.getCompletionCause(t); 464 if (Log.trace()) { 465 Log.logTrace("Push completed on stream {0} for {1}{2}", 466 pushStream.streamid, resp, 467 ((t==null) ? "": " with exception " + t)); 468 } 469 if (t != null) { 470 pushGroup.pushError(t); 471 proc.onError(pushReq, t); 472 } else { 473 proc.onResponse(resp); 474 } 475 pushGroup.pushCompleted(); 476 }); 477 478 } 479 480 private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) { 481 HttpHeadersImpl h = request.getSystemHeaders(); 482 if (contentLength > 0) { 483 h.setHeader("content-length", Long.toString(contentLength)); 484 } 485 setPseudoHeaderFields(); 486 OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(h, request.getUserHeaders(), this); 487 if (contentLength == 0) { 488 f.setFlag(HeadersFrame.END_STREAM); 489 endStreamSent = true; 490 } 491 return f; 492 } 493 494 private void setPseudoHeaderFields() { 495 HttpHeadersImpl hdrs = requestPseudoHeaders; 496 String method = request.method(); 497 hdrs.setHeader(":method", method); 498 URI uri = request.uri(); 499 hdrs.setHeader(":scheme", uri.getScheme()); 500 // TODO: userinfo deprecated. Needs to be removed 501 hdrs.setHeader(":authority", uri.getAuthority()); 502 // TODO: ensure header names beginning with : not in user headers 503 String query = uri.getQuery(); 504 String path = uri.getPath(); 505 if (path == null || path.isEmpty()) { 506 if (method.equalsIgnoreCase("OPTIONS")) { 507 path = "*"; 508 } else { 509 path = "/"; 510 } 511 } 512 if (query != null) { 513 path += "?" + query; 514 } 515 hdrs.setHeader(":path", path); 516 } 517 518 HttpHeadersImpl getRequestPseudoHeaders() { 519 return requestPseudoHeaders; 520 } 521 522 /** Sets endStreamReceived. Should be called only once. */ 523 void setEndStreamReceived() { 524 assert remotelyClosed == false: "Unexpected endStream already set"; 525 remotelyClosed = true; 526 responseReceived(); 527 } 528 529 /** Tells whether, or not, the END_STREAM Flag has been seen in any frame 530 * received on this stream. */ 531 private boolean endStreamReceived() { 532 return remotelyClosed; 533 } 534 535 @Override 536 CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { 537 debug.log(Level.DEBUG, "sendHeadersOnly()"); 538 if (Log.requests() && request != null) { 539 Log.logRequest(request.toString()); 540 } 541 if (requestPublisher != null) { 542 requestContentLen = requestPublisher.contentLength(); 543 } else { 544 requestContentLen = 0; 545 } 546 OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen); 547 connection.sendFrame(f); 548 CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>(); 549 cf.complete(this); // #### good enough for now 550 return cf; 551 } 552 553 @Override 554 void released() { 555 if (streamid > 0) { 556 debug.log(Level.DEBUG, "Released stream %d", streamid); 557 // remove this stream from the Http2Connection map. 558 connection.closeStream(streamid); 559 } else { 560 debug.log(Level.DEBUG, "Can't release stream %d", streamid); 561 } 562 } 563 564 @Override 565 void completed() { 566 // There should be nothing to do here: the stream should have 567 // been already closed (or will be closed shortly after). 568 } 569 570 void registerStream(int id) { 571 this.streamid = id; 572 connection.putStream(this, streamid); 573 debug.log(Level.DEBUG, "Registered stream %d", id); 574 } 575 576 void signalWindowUpdate() { 577 RequestSubscriber subscriber = requestSubscriber; 578 assert subscriber != null; 579 debug.log(Level.DEBUG, "Signalling window update"); 580 subscriber.sendScheduler.runOrSchedule(); 581 } 582 583 static final ByteBuffer COMPLETED = ByteBuffer.allocate(0); 584 class RequestSubscriber implements Flow.Subscriber<ByteBuffer> { 585 // can be < 0 if the actual length is not known. 586 private final long contentLength; 587 private volatile long remainingContentLength; 588 private volatile Subscription subscription; 589 590 // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers. 591 // 1) The data that was published by the request body Publisher, and 592 // 2) the COMPLETED sentinel, since onComplete can be invoked without demand. 593 final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>(); 594 595 private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); 596 // A scheduler used to honor window updates. Writing must be paused 597 // when the window is exhausted, and resumed when the window acquires 598 // some space. The sendScheduler makes it possible to implement this 599 // behaviour in an asynchronous non-blocking way. 600 // See RequestSubscriber::trySend below. 601 final SequentialScheduler sendScheduler; 602 603 RequestSubscriber(long contentLen) { 604 this.contentLength = contentLen; 605 this.remainingContentLength = contentLen; 606 this.sendScheduler = 607 SequentialScheduler.synchronizedScheduler(this::trySend); 608 } 609 610 @Override 611 public void onSubscribe(Flow.Subscription subscription) { 612 if (this.subscription != null) { 613 throw new IllegalStateException("already subscribed"); 614 } 615 this.subscription = subscription; 616 debug.log(Level.DEBUG, "RequestSubscriber: onSubscribe, request 1"); 617 subscription.request(1); 618 } 619 620 @Override 621 public void onNext(ByteBuffer item) { 622 debug.log(Level.DEBUG, "RequestSubscriber: onNext(%d)", item.remaining()); 623 int size = outgoing.size(); 624 assert size == 0 : "non-zero size: " + size; 625 onNextImpl(item); 626 } 627 628 private void onNextImpl(ByteBuffer item) { 629 // Got some more request body bytes to send. 630 if (requestBodyCF.isDone()) { 631 // stream already cancelled, probably in timeout 632 sendScheduler.stop(); 633 subscription.cancel(); 634 return; 635 } 636 outgoing.add(item); 637 sendScheduler.runOrSchedule(); 638 } 639 640 @Override 641 public void onError(Throwable throwable) { 642 debug.log(Level.DEBUG, () -> "RequestSubscriber: onError: " + throwable); 643 // ensure that errors are handled within the flow. 644 if (errorRef.compareAndSet(null, throwable)) { 645 sendScheduler.runOrSchedule(); 646 } 647 } 648 649 @Override 650 public void onComplete() { 651 debug.log(Level.DEBUG, "RequestSubscriber: onComplete"); 652 int size = outgoing.size(); 653 assert size == 0 || size == 1 : "non-zero or one size: " + size; 654 // last byte of request body has been obtained. 655 // ensure that everything is completed within the flow. 656 onNextImpl(COMPLETED); 657 } 658 659 // Attempts to send the data, if any. 660 // Handles errors and completion state. 661 // Pause writing if the send window is exhausted, resume it if the 662 // send window has some bytes that can be acquired. 663 void trySend() { 664 try { 665 // handle errors raised by onError; 666 Throwable t = errorRef.get(); 667 if (t != null) { 668 sendScheduler.stop(); 669 if (requestBodyCF.isDone()) return; 670 subscription.cancel(); 671 requestBodyCF.completeExceptionally(t); 672 return; 673 } 674 675 do { 676 // handle COMPLETED; 677 ByteBuffer item = outgoing.peekFirst(); 678 if (item == null) return; 679 else if (item == COMPLETED) { 680 sendScheduler.stop(); 681 complete(); 682 return; 683 } 684 685 // handle bytes to send downstream 686 while (item.hasRemaining()) { 687 debug.log(Level.DEBUG, "trySend: %d", item.remaining()); 688 assert !endStreamSent : "internal error, send data after END_STREAM flag"; 689 DataFrame df = getDataFrame(item); 690 if (df == null) { 691 debug.log(Level.DEBUG, "trySend: can't send yet: %d", 692 item.remaining()); 693 return; // the send window is exhausted: come back later 694 } 695 696 if (contentLength > 0) { 697 remainingContentLength -= df.getDataLength(); 698 if (remainingContentLength < 0) { 699 String msg = connection().getConnectionFlow() 700 + " stream=" + streamid + " " 701 + "[" + Thread.currentThread().getName() + "] " 702 + "Too many bytes in request body. Expected: " 703 + contentLength + ", got: " 704 + (contentLength - remainingContentLength); 705 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR); 706 throw new IOException(msg); 707 } else if (remainingContentLength == 0) { 708 df.setFlag(DataFrame.END_STREAM); 709 endStreamSent = true; 710 } 711 } 712 debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength()); 713 connection.sendDataFrame(df); 714 } 715 assert !item.hasRemaining(); 716 ByteBuffer b = outgoing.removeFirst(); 717 assert b == item; 718 } while (outgoing.peekFirst() != null); 719 720 debug.log(Level.DEBUG, "trySend: request 1"); 721 subscription.request(1); 722 } catch (Throwable ex) { 723 debug.log(Level.DEBUG, "trySend: ", ex); 724 sendScheduler.stop(); 725 subscription.cancel(); 726 requestBodyCF.completeExceptionally(ex); 727 } 728 } 729 730 private void complete() throws IOException { 731 long remaining = remainingContentLength; 732 long written = contentLength - remaining; 733 if (remaining > 0) { 734 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR); 735 // let trySend() handle the exception 736 throw new IOException(connection().getConnectionFlow() 737 + " stream=" + streamid + " " 738 + "[" + Thread.currentThread().getName() +"] " 739 + "Too few bytes returned by the publisher (" 740 + written + "/" 741 + contentLength + ")"); 742 } 743 if (!endStreamSent) { 744 endStreamSent = true; 745 connection.sendDataFrame(getEmptyEndStreamDataFrame()); 746 } 747 requestBodyCF.complete(null); 748 } 749 } 750 751 /** 752 * Send a RESET frame to tell server to stop sending data on this stream 753 */ 754 @Override 755 public CompletableFuture<Void> ignoreBody() { 756 try { 757 connection.resetStream(streamid, ResetFrame.STREAM_CLOSED); 758 return MinimalFuture.completedFuture(null); 759 } catch (Throwable e) { 760 Log.logTrace("Error resetting stream {0}", e.toString()); 761 return MinimalFuture.failedFuture(e); 762 } 763 } 764 765 DataFrame getDataFrame(ByteBuffer buffer) { 766 int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining()); 767 // blocks waiting for stream send window, if exhausted 768 int actualAmount = windowController.tryAcquire(requestAmount, streamid, this); 769 if (actualAmount <= 0) return null; 770 ByteBuffer outBuf = Utils.slice(buffer, actualAmount); 771 DataFrame df = new DataFrame(streamid, 0 , outBuf); 772 return df; 773 } 774 775 private DataFrame getEmptyEndStreamDataFrame() { 776 return new DataFrame(streamid, DataFrame.END_STREAM, List.of()); 777 } 778 779 /** 780 * A List of responses relating to this stream. Normally there is only 781 * one response, but intermediate responses like 100 are allowed 782 * and must be passed up to higher level before continuing. Deals with races 783 * such as if responses are returned before the CFs get created by 784 * getResponseAsync() 785 */ 786 787 final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5); 788 789 @Override 790 CompletableFuture<Response> getResponseAsync(Executor executor) { 791 CompletableFuture<Response> cf; 792 // The code below deals with race condition that can be caused when 793 // completeResponse() is being called before getResponseAsync() 794 synchronized (response_cfs) { 795 if (!response_cfs.isEmpty()) { 796 // This CompletableFuture was created by completeResponse(). 797 // it will be already completed. 798 cf = response_cfs.remove(0); 799 // if we find a cf here it should be already completed. 800 // finding a non completed cf should not happen. just assert it. 801 assert cf.isDone() : "Removing uncompleted response: could cause code to hang!"; 802 } else { 803 // getResponseAsync() is called first. Create a CompletableFuture 804 // that will be completed by completeResponse() when 805 // completeResponse() is called. 806 cf = new MinimalFuture<>(); 807 response_cfs.add(cf); 808 } 809 } 810 if (executor != null && !cf.isDone()) { 811 // protect from executing later chain of CompletableFuture operations from SelectorManager thread 812 cf = cf.thenApplyAsync(r -> r, executor); 813 } 814 Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf); 815 PushGroup<?,?> pg = exchange.getPushGroup(); 816 if (pg != null) { 817 // if an error occurs make sure it is recorded in the PushGroup 818 cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e))); 819 } 820 return cf; 821 } 822 823 /** 824 * Completes the first uncompleted CF on list, and removes it. If there is no 825 * uncompleted CF then creates one (completes it) and adds to list 826 */ 827 void completeResponse(Response resp) { 828 synchronized (response_cfs) { 829 CompletableFuture<Response> cf; 830 int cfs_len = response_cfs.size(); 831 for (int i=0; i<cfs_len; i++) { 832 cf = response_cfs.get(i); 833 if (!cf.isDone()) { 834 Log.logTrace("Completing response (streamid={0}): {1}", 835 streamid, cf); 836 cf.complete(resp); 837 response_cfs.remove(cf); 838 return; 839 } // else we found the previous response: just leave it alone. 840 } 841 cf = MinimalFuture.completedFuture(resp); 842 Log.logTrace("Created completed future (streamid={0}): {1}", 843 streamid, cf); 844 response_cfs.add(cf); 845 } 846 } 847 848 // methods to update state and remove stream when finished 849 850 synchronized void requestSent() { 851 requestSent = true; 852 if (responseReceived) { 853 close(); 854 } 855 } 856 857 synchronized void responseReceived() { 858 responseReceived = true; 859 if (requestSent) { 860 close(); 861 } 862 } 863 864 /** 865 * same as above but for errors 866 */ 867 void completeResponseExceptionally(Throwable t) { 868 synchronized (response_cfs) { 869 // use index to avoid ConcurrentModificationException 870 // caused by removing the CF from within the loop. 871 for (int i = 0; i < response_cfs.size(); i++) { 872 CompletableFuture<Response> cf = response_cfs.get(i); 873 if (!cf.isDone()) { 874 cf.completeExceptionally(t); 875 response_cfs.remove(i); 876 return; 877 } 878 } 879 response_cfs.add(MinimalFuture.failedFuture(t)); 880 } 881 } 882 883 CompletableFuture<Void> sendBodyImpl() { 884 requestBodyCF.whenComplete((v, t) -> requestSent()); 885 if (requestPublisher != null) { 886 final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen); 887 requestPublisher.subscribe(requestSubscriber = subscriber); 888 } else { 889 // there is no request body, therefore the request is complete, 890 // END_STREAM has already sent with outgoing headers 891 requestBodyCF.complete(null); 892 } 893 return requestBodyCF; 894 } 895 896 @Override 897 void cancel() { 898 cancel(new IOException("Stream " + streamid + " cancelled")); 899 } 900 901 @Override 902 void cancel(IOException cause) { 903 cancelImpl(cause); 904 } 905 906 // This method sends a RST_STREAM frame 907 void cancelImpl(Throwable e) { 908 debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e); 909 if (Log.trace()) { 910 Log.logTrace("cancelling stream {0}: {1}\n", streamid, e); 911 } 912 boolean closing; 913 if (closing = !closed) { // assigning closing to !closed 914 synchronized (this) { 915 failed = e; 916 if (closing = !closed) { // assigning closing to !closed 917 closed=true; 918 } 919 } 920 } 921 if (closing) { // true if the stream has not been closed yet 922 if (responseSubscriber != null) 923 sched.runOrSchedule(); 924 } 925 completeResponseExceptionally(e); 926 if (!requestBodyCF.isDone()) { 927 requestBodyCF.completeExceptionally(e); // we may be sending the body.. 928 } 929 if (responseBodyCF != null) { 930 responseBodyCF.completeExceptionally(e); 931 } 932 try { 933 // will send a RST_STREAM frame 934 if (streamid != 0) { 935 connection.resetStream(streamid, ResetFrame.CANCEL); 936 } 937 } catch (IOException ex) { 938 Log.logError(ex); 939 } 940 } 941 942 // This method doesn't send any frame 943 void close() { 944 if (closed) return; 945 synchronized(this) { 946 if (closed) return; 947 closed = true; 948 } 949 Log.logTrace("Closing stream {0}", streamid); 950 connection.closeStream(streamid); 951 Log.logTrace("Stream {0} closed", streamid); 952 } 953 954 static class PushedStream<U,T> extends Stream<T> { 955 final PushGroup<U,T> pushGroup; 956 // push streams need the response CF allocated up front as it is 957 // given directly to user via the multi handler callback function. 958 final CompletableFuture<Response> pushCF; 959 final CompletableFuture<HttpResponse<T>> responseCF; 960 final HttpRequestImpl pushReq; 961 HttpResponse.BodyHandler<T> pushHandler; 962 963 PushedStream(PushGroup<U,T> pushGroup, 964 Http2Connection connection, 965 Exchange<T> pushReq) { 966 // ## no request body possible, null window controller 967 super(connection, pushReq, null); 968 this.pushGroup = pushGroup; 969 this.pushReq = pushReq.request(); 970 this.pushCF = new MinimalFuture<>(); 971 this.responseCF = new MinimalFuture<>(); 972 } 973 974 CompletableFuture<HttpResponse<T>> responseCF() { 975 return responseCF; 976 } 977 978 synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) { 979 this.pushHandler = pushHandler; 980 } 981 982 synchronized HttpResponse.BodyHandler<T> getPushHandler() { 983 // ignored parameters to function can be used as BodyHandler 984 return this.pushHandler; 985 } 986 987 // Following methods call the super class but in case of 988 // error record it in the PushGroup. The error method is called 989 // with a null value when no error occurred (is a no-op) 990 @Override 991 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { 992 return super.sendBodyAsync() 993 .whenComplete((ExchangeImpl<T> v, Throwable t) 994 -> pushGroup.pushError(Utils.getCompletionCause(t))); 995 } 996 997 @Override 998 CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { 999 return super.sendHeadersAsync() 1000 .whenComplete((ExchangeImpl<T> ex, Throwable t) 1001 -> pushGroup.pushError(Utils.getCompletionCause(t))); 1002 } 1003 1004 @Override 1005 CompletableFuture<Response> getResponseAsync(Executor executor) { 1006 CompletableFuture<Response> cf = pushCF.whenComplete( 1007 (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t))); 1008 if(executor!=null && !cf.isDone()) { 1009 cf = cf.thenApplyAsync( r -> r, executor); 1010 } 1011 return cf; 1012 } 1013 1014 @Override 1015 CompletableFuture<T> readBodyAsync( 1016 HttpResponse.BodyHandler<T> handler, 1017 boolean returnConnectionToPool, 1018 Executor executor) 1019 { 1020 return super.readBodyAsync(handler, returnConnectionToPool, executor) 1021 .whenComplete((v, t) -> pushGroup.pushError(t)); 1022 } 1023 1024 @Override 1025 void completeResponse(Response r) { 1026 Log.logResponse(r::toString); 1027 pushCF.complete(r); // not strictly required for push API 1028 // start reading the body using the obtained BodySubscriber 1029 CompletableFuture<Void> start = new MinimalFuture<>(); 1030 start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor())) 1031 .whenComplete((T body, Throwable t) -> { 1032 if (t != null) { 1033 responseCF.completeExceptionally(t); 1034 } else { 1035 HttpResponseImpl<T> resp = 1036 new HttpResponseImpl<>(r.request, r, null, body, getExchange()); 1037 responseCF.complete(resp); 1038 } 1039 }); 1040 start.completeAsync(() -> null, getExchange().executor()); 1041 } 1042 1043 @Override 1044 void completeResponseExceptionally(Throwable t) { 1045 pushCF.completeExceptionally(t); 1046 } 1047 1048 // @Override 1049 // synchronized void responseReceived() { 1050 // super.responseReceived(); 1051 // } 1052 1053 // create and return the PushResponseImpl 1054 @Override 1055 protected void handleResponse() { 1056 responseCode = (int)responseHeaders 1057 .firstValueAsLong(":status") 1058 .orElse(-1); 1059 1060 if (responseCode == -1) { 1061 completeResponseExceptionally(new IOException("No status code")); 1062 } 1063 1064 this.response = new Response( 1065 pushReq, exchange, responseHeaders, 1066 responseCode, HttpClient.Version.HTTP_2); 1067 1068 /* TODO: review if needs to be removed 1069 the value is not used, but in case `content-length` doesn't parse 1070 as long, there will be NumberFormatException. If left as is, make 1071 sure code up the stack handles NFE correctly. */ 1072 responseHeaders.firstValueAsLong("content-length"); 1073 1074 if (Log.headers()) { 1075 StringBuilder sb = new StringBuilder("RESPONSE HEADERS"); 1076 sb.append(" (streamid=").append(streamid).append("): "); 1077 Log.dumpHeaders(sb, " ", responseHeaders); 1078 Log.logHeaders(sb.toString()); 1079 } 1080 1081 // different implementations for normal streams and pushed streams 1082 completeResponse(response); 1083 } 1084 } 1085 1086 final class StreamWindowUpdateSender extends WindowUpdateSender { 1087 1088 StreamWindowUpdateSender(Http2Connection connection) { 1089 super(connection); 1090 } 1091 1092 @Override 1093 int getStreamId() { 1094 return streamid; 1095 } 1096 } 1097 1098 /** 1099 * Returns true if this exchange was canceled. 1100 * @return true if this exchange was canceled. 1101 */ 1102 synchronized boolean isCanceled() { 1103 return failed != null; 1104 } 1105 1106 /** 1107 * Returns the cause for which this exchange was canceled, if available. 1108 * @return the cause for which this exchange was canceled, if available. 1109 */ 1110 synchronized Throwable getCancelCause() { 1111 return failed; 1112 } 1113 1114 final String dbgString() { 1115 return connection.dbgString() + "/Stream("+streamid+")"; 1116 } 1117 }