1 /* 2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.EOFException; 29 import java.io.IOException; 30 import java.io.UncheckedIOException; 31 import java.net.URI; 32 import java.nio.ByteBuffer; 33 import java.util.ArrayList; 34 import java.util.Collections; 35 import java.util.List; 36 import java.util.concurrent.CompletableFuture; 37 import java.util.concurrent.ConcurrentLinkedDeque; 38 import java.util.concurrent.ConcurrentLinkedQueue; 39 import java.util.concurrent.Executor; 40 import java.util.concurrent.Flow; 41 import java.util.concurrent.Flow.Subscription; 42 import java.util.concurrent.atomic.AtomicBoolean; 43 import java.util.concurrent.atomic.AtomicReference; 44 import java.util.function.BiPredicate; 45 import java.net.http.HttpClient; 46 import java.net.http.HttpHeaders; 47 import java.net.http.HttpRequest; 48 import java.net.http.HttpResponse; 49 import java.net.http.HttpResponse.BodySubscriber; 50 import jdk.internal.net.http.common.*; 51 import jdk.internal.net.http.frame.*; 52 import jdk.internal.net.http.hpack.DecodingCallback; 53 54 /** 55 * Http/2 Stream handling. 56 * 57 * REQUESTS 58 * 59 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q 60 * 61 * sendRequest() -- sendHeadersOnly() + sendBody() 62 * 63 * sendBodyAsync() -- calls sendBody() in an executor thread. 64 * 65 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block 66 * 67 * sendRequestAsync() -- calls sendRequest() in an executor thread 68 * 69 * RESPONSES 70 * 71 * Multiple responses can be received per request. Responses are queued up on 72 * a LinkedList of CF<HttpResponse> and the first one on the list is completed 73 * with the next response 74 * 75 * getResponseAsync() -- queries list of response CFs and returns first one 76 * if one exists. Otherwise, creates one and adds it to list 77 * and returns it. Completion is achieved through the 78 * incoming() upcall from connection reader thread. 
79 * 80 * getResponse() -- calls getResponseAsync() and waits for CF to complete 81 * 82 * responseBodyAsync() -- calls responseBody() in an executor thread. 83 * 84 * incoming() -- entry point called from connection reader thread. Frames are 85 * either handled immediately without blocking or for data frames 86 * placed on the stream's inputQ which is consumed by the stream's 87 * reader thread. 88 * 89 * PushedStream sub class 90 * ====================== 91 * Sending side methods are not used because the request comes from a PUSH_PROMISE 92 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream 93 * is created. PushedStream does not use responseCF list as there can be only 94 * one response. The CF is created when the object created and when the response 95 * HEADERS frame is received the object is completed. 96 */ 97 class Stream<T> extends ExchangeImpl<T> { 98 99 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 100 101 final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>(); 102 final SequentialScheduler sched = 103 SequentialScheduler.synchronizedScheduler(this::schedule); 104 final SubscriptionBase userSubscription = 105 new SubscriptionBase(sched, this::cancel, this::onSubscriptionError); 106 107 /** 108 * This stream's identifier. Assigned lazily by the HTTP2Connection before 109 * the stream's first frame is sent. 110 */ 111 protected volatile int streamid; 112 113 long requestContentLen; 114 115 final Http2Connection connection; 116 final HttpRequestImpl request; 117 final HeadersConsumer rspHeadersConsumer; 118 final HttpHeadersBuilder responseHeadersBuilder; 119 final HttpHeaders requestPseudoHeaders; 120 volatile HttpResponse.BodySubscriber<T> responseSubscriber; 121 final HttpRequest.BodyPublisher requestPublisher; 122 volatile RequestSubscriber requestSubscriber; 123 volatile int responseCode; 124 volatile Response response; 125 // The exception with which this stream was canceled. 
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
volatile CompletableFuture<T> responseBodyCF;
volatile HttpResponse.BodySubscriber<T> pendingResponseSubscriber;
volatile boolean stopRequested;

/** True if END_STREAM has been seen in a frame received on this stream. */
private volatile boolean remotelyClosed;
private volatile boolean closed;
private volatile boolean endStreamSent;

final AtomicBoolean deRegistered = new AtomicBoolean(false);

// state flags
private boolean requestSent, responseReceived;

/**
 * A reference to this Stream's connection Send Window controller. The
 * stream MUST acquire the appropriate amount of Send Window before
 * sending any data. Will be null for PushStreams, as they cannot send data.
 */
private final WindowController windowController;
private final WindowUpdateSender windowUpdater;

/** Returns the underlying HttpConnection held by this stream's Http2Connection. */
@Override
HttpConnection connection() {
    return connection.connection;
}

/**
 * Delivers queued frames from {@code inputQ} to the response body subscriber.
 *
 * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
 * or after user subscription window has re-opened, from SubscriptionBase.request()
 *
 * Processing rules, in queue order:
 *  - if no subscriber is installed yet, the pending subscriber (if any) is
 *    adopted and subscribed; with no pending subscriber nothing is processed;
 *  - a ResetFrame is removed and handed to handleReset();
 *  - an empty DataFrame carrying END_STREAM completes the subscriber;
 *  - any other DataFrame is delivered through onNext() only if the user
 *    subscription has outstanding demand; otherwise the frame stays queued.
 *
 * Any Throwable raised while processing is recorded in {@code errorRef}
 * (first error wins). If an error is recorded, the scheduler is stopped,
 * the subscriber is notified through onError() unless onComplete() was
 * already called, the stream is cancelled and the input queue is drained.
 */
private void schedule() {
    boolean onCompleteCalled = false;
    HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
    try {
        if (subscriber == null) {
            subscriber = responseSubscriber = pendingResponseSubscriber;
            if (subscriber == null) {
                // can't process anything yet
                return;
            } else {
                if (debug.on()) debug.log("subscribing user subscriber");
                subscriber.onSubscribe(userSubscription);
            }
        }
        while (!inputQ.isEmpty()) {
            // peek rather than poll: the frame must stay queued when
            // there is no demand to deliver it yet.
            Http2Frame frame = inputQ.peek();
            if (frame instanceof ResetFrame) {
                inputQ.remove();
                handleReset((ResetFrame)frame, subscriber);
                return;
            }
            DataFrame df = (DataFrame)frame;
            boolean finished = df.getFlag(DataFrame.END_STREAM);

            List<ByteBuffer> buffers = df.getData();
            List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
            int size = Utils.remaining(dsts, Integer.MAX_VALUE);
            if (size == 0 && finished) {
                // empty END_STREAM frame: complete without consuming demand
                inputQ.remove();
                connection.ensureWindowUpdated(df); // must update connection window
                Log.logTrace("responseSubscriber.onComplete");
                if (debug.on()) debug.log("incoming: onComplete");
                sched.stop();
                connection.decrementStreamsCount(streamid);
                subscriber.onComplete();
                onCompleteCalled = true;
                setEndStreamReceived();
                return;
            } else if (userSubscription.tryDecrement()) {
                inputQ.remove();
                Log.logTrace("responseSubscriber.onNext {0}", size);
                if (debug.on()) debug.log("incoming: onNext(%d)", size);
                try {
                    subscriber.onNext(dsts);
                } catch (Throwable t) {
                    connection.dropDataFrame(df); // must update connection window
                    throw t;
                }
                if (consumed(df)) {
                    Log.logTrace("responseSubscriber.onComplete");
                    if (debug.on()) debug.log("incoming: onComplete");
                    sched.stop();
                    connection.decrementStreamsCount(streamid);
                    subscriber.onComplete();
                    onCompleteCalled = true;
                    setEndStreamReceived();
                    return;
                }
            } else {
                // no demand: leave the frame queued. Only fall through to
                // the error handling below if a stop was requested.
                if (stopRequested) break;
                return;
            }
        }
    } catch (Throwable throwable) {
        errorRef.compareAndSet(null, throwable);
    } finally {
        if (sched.isStopped()) drainInputQueue();
    }

    Throwable t = errorRef.get();
    if (t != null) {
        sched.stop();
        try {
            if (!onCompleteCalled) {
                if (debug.on())
                    debug.log("calling subscriber.onError: %s", (Object) t);
                subscriber.onError(t);
            } else {
                if (debug.on())
                    debug.log("already completed: dropping error %s", (Object) t);
            }
        } catch (Throwable x) {
            // report the exception thrown by onError itself (x), not the
            // original error (t) that was being delivered.
            Log.logError("Subscriber::onError threw exception: {0}", (Object) x);
        } finally {
            cancelImpl(t);
            drainInputQueue();
        }
    }
}

// must only be
called from the scheduler schedule() loop. 250 // ensure that all received data frames are accounted for 251 // in the connection window flow control if the scheduler 252 // is stopped before all the data is consumed. 253 private void drainInputQueue() { 254 Http2Frame frame; 255 while ((frame = inputQ.poll()) != null) { 256 if (frame instanceof DataFrame) { 257 connection.dropDataFrame((DataFrame)frame); 258 } 259 } 260 } 261 262 263 // Callback invoked after the Response BodySubscriber has consumed the 264 // buffers contained in a DataFrame. 265 // Returns true if END_STREAM is reached, false otherwise. 266 private boolean consumed(DataFrame df) { 267 // RFC 7540 6.1: 268 // The entire DATA frame payload is included in flow control, 269 // including the Pad Length and Padding fields if present 270 int len = df.payloadLength(); 271 boolean endStream = df.getFlag(DataFrame.END_STREAM); 272 if (len == 0) return endStream; 273 274 connection.windowUpdater.update(len); 275 276 if (!endStream) { 277 // Don't send window update on a stream which is 278 // closed or half closed. 
279 windowUpdater.update(len); 280 } 281 282 // true: end of stream; false: more data coming 283 return endStream; 284 } 285 286 boolean deRegister() { 287 return deRegistered.compareAndSet(false, true); 288 } 289 290 @Override 291 CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler, 292 boolean returnConnectionToPool, 293 Executor executor) 294 { 295 try { 296 Log.logTrace("Reading body on stream {0}", streamid); 297 debug.log("Getting BodySubscriber for: " + response); 298 BodySubscriber<T> bodySubscriber = handler.apply(new ResponseInfoImpl(response)); 299 CompletableFuture<T> cf = receiveData(bodySubscriber, executor); 300 301 PushGroup<?> pg = exchange.getPushGroup(); 302 if (pg != null) { 303 // if an error occurs make sure it is recorded in the PushGroup 304 cf = cf.whenComplete((t, e) -> pg.pushError(e)); 305 } 306 return cf; 307 } catch (Throwable t) { 308 // may be thrown by handler.apply 309 cancelImpl(t); 310 return MinimalFuture.failedFuture(t); 311 } 312 } 313 314 @Override 315 public String toString() { 316 StringBuilder sb = new StringBuilder(); 317 sb.append("streamid: ") 318 .append(streamid); 319 return sb.toString(); 320 } 321 322 private void receiveDataFrame(DataFrame df) { 323 inputQ.add(df); 324 sched.runOrSchedule(); 325 } 326 327 /** Handles a RESET frame. RESET is always handled inline in the queue. */ 328 private void receiveResetFrame(ResetFrame frame) { 329 inputQ.add(frame); 330 sched.runOrSchedule(); 331 } 332 333 // pushes entire response body into response subscriber 334 // blocking when required by local or remote flow control 335 CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) { 336 responseBodyCF = new MinimalFuture<>(); 337 // We want to allow the subscriber's getBody() method to block so it 338 // can work with InputStreams. So, we offload execution. 
339 executor.execute(() -> { 340 try { 341 bodySubscriber.getBody().whenComplete((T body, Throwable t) -> { 342 if (t == null) 343 responseBodyCF.complete(body); 344 else 345 responseBodyCF.completeExceptionally(t); 346 }); 347 } catch(Throwable t) { 348 cancelImpl(t); 349 } 350 }); 351 352 if (isCanceled()) { 353 Throwable t = getCancelCause(); 354 responseBodyCF.completeExceptionally(t); 355 } else { 356 pendingResponseSubscriber = bodySubscriber; 357 sched.runOrSchedule(); // in case data waiting already to be processed 358 } 359 return responseBodyCF; 360 } 361 362 @Override 363 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { 364 return sendBodyImpl().thenApply( v -> this); 365 } 366 367 @SuppressWarnings("unchecked") 368 Stream(Http2Connection connection, 369 Exchange<T> e, 370 WindowController windowController) 371 { 372 super(e); 373 this.connection = connection; 374 this.windowController = windowController; 375 this.request = e.request(); 376 this.requestPublisher = request.requestPublisher; // may be null 377 this.responseHeadersBuilder = new HttpHeadersBuilder(); 378 this.rspHeadersConsumer = new HeadersConsumer(); 379 this.requestPseudoHeaders = createPseudoHeaders(request); 380 this.windowUpdater = new StreamWindowUpdateSender(connection); 381 } 382 383 /** 384 * Entry point from Http2Connection reader thread. 385 * 386 * Data frames will be removed by response body thread. 
*/
void incoming(Http2Frame frame) throws IOException {
    if (debug.on()) debug.log("incoming: %s", frame);
    if ((frame instanceof HeaderFrame)) {
        HeaderFrame hframe = (HeaderFrame)frame;
        // the response is only handled once the header block is complete
        if (hframe.endHeaders()) {
            Log.logTrace("handling response (streamid={0})", streamid);
            handleResponse();
            if (hframe.getFlag(HeaderFrame.END_STREAM)) {
                // no body will follow: queue an empty END_STREAM data frame
                // so that completion goes through the regular data path.
                receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
            }
        }
    } else if (frame instanceof DataFrame) {
        receiveDataFrame((DataFrame)frame);
    } else {
        otherFrame(frame);
    }
}

/**
 * Dispatches a frame that is neither a header frame nor a data frame
 * to the matching incoming_xxx handler.
 *
 * @throws IOException if the frame type is not WINDOW_UPDATE, RST_STREAM
 *         or PRIORITY
 */
void otherFrame(Http2Frame frame) throws IOException {
    switch (frame.type()) {
        case WindowUpdateFrame.TYPE:
            incoming_windowUpdate((WindowUpdateFrame) frame);
            break;
        case ResetFrame.TYPE:
            incoming_reset((ResetFrame) frame);
            break;
        case PriorityFrame.TYPE:
            incoming_priority((PriorityFrame) frame);
            break;
        default:
            String msg = "Unexpected frame: " + frame.toString();
            throw new IOException(msg);
    }
}

// The Hpack decoder decodes into one of these consumers of name,value pairs

DecodingCallback rspHeadersConsumer() {
    return rspHeadersConsumer;
}

/**
 * Builds the Response from the headers accumulated so far, records the
 * status code, resets the headers consumer and completes the next
 * response future through completeResponse().
 *
 * @throws IOException if the response carries no {@code :status} header
 */
protected void handleResponse() throws IOException {
    HttpHeaders responseHeaders = responseHeadersBuilder.build();
    responseCode = (int)responseHeaders
            .firstValueAsLong(":status")
            .orElseThrow(() -> new IOException("no statuscode in response"));

    response = new Response(
            request, exchange, responseHeaders, connection(),
            responseCode, HttpClient.Version.HTTP_2);

    /* TODO: review if needs to be removed
       the value is not used, but in case `content-length` doesn't parse as
       long, there will be NumberFormatException. If left as is, make sure
       code up the stack handles NFE correctly. */
    responseHeaders.firstValueAsLong("content-length");

    if (Log.headers()) {
        StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
        Log.dumpHeaders(sb, " ", responseHeaders);
        Log.logHeaders(sb.toString());
    }

    // this will clear the response headers
    rspHeadersConsumer.reset();

    completeResponse(response);
}

/**
 * Handles an incoming RST_STREAM frame.
 *
 * The frame is ignored if END_STREAM was already received or the stream is
 * closed. It is handled immediately if neither a response nor a body
 * subscriber exists yet; otherwise it is queued behind pending data frames.
 */
void incoming_reset(ResetFrame frame) {
    Log.logTrace("Received RST_STREAM on stream {0}", streamid);
    if (endStreamReceived()) {
        Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
    } else if (closed) {
        Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
    } else {
        Flow.Subscriber<?> subscriber =
                responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
        if (response == null && subscriber == null) {
            // we haven't receive the headers yet, and won't receive any!
            // handle reset now.
            handleReset(frame, subscriber);
        } else {
            // put it in the input queue in order to read all
            // pending data frames first. Indeed, a server may send
            // RST_STREAM after sending END_STREAM, in which case we should
            // ignore it. However, we won't know if we have received END_STREAM
            // or not until all pending data frames are read.
            receiveResetFrame(frame);
            // RST_STREAM was pushed to the queue. It will be handled by
            // schedule() after all pending data frames have been
            // processed.
            Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
        }
    }
}

/**
 * Closes this stream in response to a RST_STREAM frame.
 *
 * Only the first caller to flip the {@code closed} flag proceeds. An
 * IOException describing the error code is created; the subscriber (if any)
 * is notified only when that exception is the first error recorded in
 * {@code errorRef}. Response futures, the request body future and the
 * response body future are then completed exceptionally, and the stream is
 * removed from the connection.
 *
 * @param frame the RST_STREAM frame
 * @param subscriber the response body subscriber to notify; may be null
 */
void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
    Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
    if (!closed) {
        synchronized (this) {
            // re-check under the lock: another thread may have closed the stream
            if (closed) {
                if (debug.on()) debug.log("Stream already closed: ignoring RESET");
                return;
            }
            closed = true;
        }
        try {
            int error = frame.getErrorCode();
            IOException e = new IOException("Received RST_STREAM: "
                                            + ErrorFrame.stringForCode(error));
            if (errorRef.compareAndSet(null, e)) {
                if (subscriber != null) {
                    subscriber.onError(e);
                }
            }
            completeResponseExceptionally(e);
            // errorRef.get() may be an earlier error rather than e
            if (!requestBodyCF.isDone()) {
                requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
            }
            if (responseBodyCF != null) {
                responseBodyCF.completeExceptionally(errorRef.get());
            }
        } finally {
            connection.decrementStreamsCount(streamid);
            connection.closeStream(streamid);
        }
    } else {
        Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
    }
}

/**
 * PRIORITY frames are not supported: always throws
 * UnsupportedOperationException.
 * NOTE(review): this unchecked exception propagates out of incoming() to
 * the caller - confirm the connection reader copes with it.
 */
void incoming_priority(PriorityFrame frame) {
    // TODO: implement priority
    throw new UnsupportedOperationException("Not implemented");
}

/**
 * Applies a WINDOW_UPDATE to this stream's send window. The stream is reset
 * with FLOW_CONTROL_ERROR if the increment is not positive or if the
 * window controller reports an overflow.
 */
private void incoming_windowUpdate(WindowUpdateFrame frame)
    throws IOException
{
    int amount = frame.getUpdate();
    if (amount <= 0) {
        Log.logTrace("Resetting stream: {0}, Window Update amount: {1}",
                     streamid, amount);
        connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
    } else {
        assert streamid != 0;
        boolean success = windowController.increaseStreamWindow(amount, streamid);
        if (!success) { // overflow
            connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
        }
    }
}

/**
 * Handles a push promise received on this stream.
 *
 * The pushed stream is refused (REFUSED_STREAM) when the exchange has no
 * PushGroup, and cancelled when the push group's acceptor does not accept
 * the request (or throws). Otherwise the push handler is installed on the
 * pushed stream and its response future is bridged to the acceptor's future.
 *
 * @param pushRequest the promised request
 * @param pushStream the stream created for the promised response
 */
void incoming_pushPromise(HttpRequestImpl pushRequest,
                          PushedStream<T> pushStream)
    throws IOException
{
    if (Log.requests()) {
        Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
    }
    PushGroup<T> pushGroup = exchange.getPushGroup();
    if (pushGroup == null) {
        // note: the id logged here is this (parent) stream's id
        Log.logTrace("Rejecting push promise stream " + streamid);
        connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
        pushStream.close();
        return;
    }

    PushGroup.Acceptor<T> acceptor = null;
    boolean accepted = false;
    try {
        acceptor = pushGroup.acceptPushRequest(pushRequest);
        accepted = acceptor.accepted();
    } catch (Throwable t) {
        // a failing handler is treated as a rejection: accepted stays false
        if (debug.on())
            debug.log("PushPromiseHandler::applyPushPromise threw exception %s",
                      (Object)t);
    }
    if (!accepted) {
        // cancel / reject
        IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
        if (Log.trace()) {
            Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
                         ex.getMessage());
        }
        pushStream.cancelImpl(ex);
        return;
    }

    assert accepted && acceptor != null;
    CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
    HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
    assert pushHandler != null;

    pushStream.requestSent();
    pushStream.setPushHandler(pushHandler); // TODO: could wrap the handler to throw on acceptPushPromise ?
    // setup housekeeping for when the push is received
    // TODO: deal with ignoring of CF anti-pattern
    CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
    cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
        t = Utils.getCompletionCause(t);
        if (Log.trace()) {
            Log.logTrace("Push completed on stream {0} for {1}{2}",
                         pushStream.streamid, resp,
                         ((t==null) ? "": " with exception " + t));
        }
        if (t != null) {
            pushGroup.pushError(t);
            pushResponseCF.completeExceptionally(t);
        } else {
            pushResponseCF.complete(resp);
        }
        pushGroup.pushCompleted();
    });

}

/**
 * Builds the outgoing HEADERS for the request.
 *
 * A content-length header is set only when contentLength is positive.
 * END_STREAM is set (and recorded in endStreamSent) only when
 * contentLength is exactly 0; a negative length gets neither.
 */
private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
    HttpHeadersBuilder h = request.getSystemHeadersBuilder();
    if (contentLength > 0) {
        h.setHeader("content-length", Long.toString(contentLength));
    }
    HttpHeaders sysh = filterHeaders(h.build());
    HttpHeaders userh = filterHeaders(request.getUserHeaders());
    OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(sysh, userh, this);
    if (contentLength == 0) {
        f.setFlag(HeadersFrame.END_STREAM);
        endStreamSent = true;
    }
    return f;
}

/** Tells whether the given headers contain a proxy-authorization header. */
private boolean hasProxyAuthorization(HttpHeaders headers) {
    return headers.firstValue("proxy-authorization")
                  .isPresent();
}

// Determines whether we need to build a new HttpHeader object.
//
// Ideally we should pass the filter to OutgoingHeaders refactor the
// code that creates the HeaderFrame to honor the filter.
// We're not there yet - so depending on the filter we need to
// apply and the content of the header we will try to determine
// whether anything might need to be filtered.
// If nothing needs filtering then we can just use the
// original headers.
private boolean needsFiltering(HttpHeaders headers,
                               BiPredicate<String, String> filter) {
    // the filters are compared by identity against the shared Utils constants
    if (filter == Utils.PROXY_TUNNEL_FILTER || filter == Utils.PROXY_FILTER) {
        // we're either connecting or proxying
        // slight optimization: we only need to filter out
        // disabled schemes, so if there are none just
        // pass through.
        return Utils.proxyHasDisabledSchemes(filter == Utils.PROXY_TUNNEL_FILTER)
                && hasProxyAuthorization(headers);
    } else {
        // we're talking to a server, either directly or through
        // a tunnel.
        // Slight optimization: we only need to filter out
        // proxy authorization headers, so if there are none just
        // pass through.
        return hasProxyAuthorization(headers);
    }
}

/**
 * Returns the given headers filtered with the connection's header filter
 * for this request, or the same instance when needsFiltering() says no
 * filtering is required.
 */
private HttpHeaders filterHeaders(HttpHeaders headers) {
    HttpConnection conn = connection();
    BiPredicate<String, String> filter = conn.headerFilter(request);
    if (needsFiltering(headers, filter)) {
        return HttpHeaders.of(headers.map(), filter);
    }
    return headers;
}

/**
 * Creates the :method, :scheme, :authority and :path pseudo headers for
 * the given request. An absent or empty path becomes "*" for OPTIONS and
 * "/" for any other method; the raw query, if any, is appended after "?".
 */
private static HttpHeaders createPseudoHeaders(HttpRequest request) {
    HttpHeadersBuilder hdrs = new HttpHeadersBuilder();
    String method = request.method();
    hdrs.setHeader(":method", method);
    URI uri = request.uri();
    hdrs.setHeader(":scheme", uri.getScheme());
    // TODO: userinfo deprecated. Needs to be removed
    hdrs.setHeader(":authority", uri.getAuthority());
    // TODO: ensure header names beginning with : not in user headers
    String query = uri.getRawQuery();
    String path = uri.getRawPath();
    if (path == null || path.isEmpty()) {
        if (method.equalsIgnoreCase("OPTIONS")) {
            path = "*";
        } else {
            path = "/";
        }
    }
    if (query != null) {
        path += "?" + query;
    }
    hdrs.setHeader(":path", Utils.encode(path));
    return hdrs.build();
}

/** Returns the pseudo headers created for this stream's request. */
HttpHeaders getRequestPseudoHeaders() {
    return requestPseudoHeaders;
}

/** Sets endStreamReceived. Should be called only once. */
void setEndStreamReceived() {
    assert remotelyClosed == false: "Unexpected endStream already set";
    remotelyClosed = true;
    responseReceived();
}

/** Tells whether, or not, the END_STREAM Flag has been seen in any frame
 *  received on this stream.
*/
private boolean endStreamReceived() {
    return remotelyClosed;
}

/**
 * Builds the request HEADERS frame and hands it to the connection.
 * The content length is taken from the request publisher, or is 0 when
 * there is no publisher. The returned future is already completed.
 */
@Override
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
    if (debug.on()) debug.log("sendHeadersOnly()");
    if (Log.requests() && request != null) {
        Log.logRequest(request.toString());
    }
    if (requestPublisher != null) {
        requestContentLen = requestPublisher.contentLength();
    } else {
        requestContentLen = 0;
    }
    OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
    connection.sendFrame(f);
    CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
    cf.complete(this); // #### good enough for now
    return cf;
}

/**
 * Removes this stream from its connection, provided a positive stream id
 * has been assigned; otherwise only logs.
 */
@Override
void released() {
    if (streamid > 0) {
        if (debug.on()) debug.log("Released stream %d", streamid);
        // remove this stream from the Http2Connection map.
        connection.decrementStreamsCount(streamid);
        connection.closeStream(streamid);
    } else {
        if (debug.on()) debug.log("Can't release stream %d", streamid);
    }
}

@Override
void completed() {
    // There should be nothing to do here: the stream should have
    // been already closed (or will be closed shortly after).
}

/** Assigns the stream id and registers this stream with the connection. */
void registerStream(int id) {
    this.streamid = id;
    connection.putStream(this, streamid);
    if (debug.on()) debug.log("Registered stream %d", id);
}

/**
 * Re-triggers the request subscriber's send scheduler so that trySend()
 * runs again. A request subscriber is expected to exist (asserted).
 */
void signalWindowUpdate() {
    RequestSubscriber subscriber = requestSubscriber;
    assert subscriber != null;
    if (debug.on()) debug.log("Signalling window update");
    subscriber.sendScheduler.runOrSchedule();
}

// Sentinel queued by RequestSubscriber.onComplete(); compared by identity in trySend().
static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);

/**
 * Subscriber to the request body publisher. Published buffers are queued
 * in {@code outgoing} and written as DATA frames by trySend(), which runs
 * under {@code sendScheduler}.
 */
class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
    // can be < 0 if the actual length is not known.
    private final long contentLength;
    private volatile long remainingContentLength;
    private volatile Subscription subscription;

    // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
    // 1) The data that was published by the request body Publisher, and
    // 2) the COMPLETED sentinel, since onComplete can be invoked without demand.
    final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();

    private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
    // A scheduler used to honor window updates. Writing must be paused
    // when the window is exhausted, and resumed when the window acquires
    // some space. The sendScheduler makes it possible to implement this
    // behaviour in an asynchronous non-blocking way.
    // See RequestSubscriber::trySend below.
    final SequentialScheduler sendScheduler;

    RequestSubscriber(long contentLen) {
        this.contentLength = contentLen;
        this.remainingContentLength = contentLen;
        this.sendScheduler =
                SequentialScheduler.synchronizedScheduler(this::trySend);
    }

    /**
     * Records the subscription and requests the first item.
     *
     * @throws IllegalStateException if a subscription was already set
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        if (this.subscription != null) {
            throw new IllegalStateException("already subscribed");
        }
        this.subscription = subscription;
        if (debug.on())
            debug.log("RequestSubscriber: onSubscribe, request 1");
        subscription.request(1);
    }

    @Override
    public void onNext(ByteBuffer item) {
        if (debug.on())
            debug.log("RequestSubscriber: onNext(%d)", item.remaining());
        // items are requested one at a time, so the queue should be empty here
        int size = outgoing.size();
        assert size == 0 : "non-zero size: " + size;
        onNextImpl(item);
    }

    // Queues the item (a data buffer or the COMPLETED sentinel) and
    // triggers trySend(), unless the request body future is already done.
    private void onNextImpl(ByteBuffer item) {
        // Got some more request body bytes to send.
        if (requestBodyCF.isDone()) {
            // stream already cancelled, probably in timeout
            sendScheduler.stop();
            subscription.cancel();
            return;
        }
        outgoing.add(item);
        sendScheduler.runOrSchedule();
    }

    @Override
    public void onError(Throwable throwable) {
        if (debug.on())
            debug.log(() -> "RequestSubscriber: onError: " + throwable);
        // ensure that errors are handled within the flow.
        // only the first error is recorded and triggers trySend()
        if (errorRef.compareAndSet(null, throwable)) {
            sendScheduler.runOrSchedule();
        }
    }

    @Override
    public void onComplete() {
        if (debug.on()) debug.log("RequestSubscriber: onComplete");
        int size = outgoing.size();
        assert size == 0 || size == 1 : "non-zero or one size: " + size;
        // last byte of request body has been obtained.
        // ensure that everything is completed within the flow.
        onNextImpl(COMPLETED);
    }

    // Attempts to send the data, if any.
    // Handles errors and completion state.
    // Pause writing if the send window is exhausted, resume it if the
    // send window has some bytes that can be acquired.
    void trySend() {
        try {
            // handle errors raised by onError;
            Throwable t = errorRef.get();
            if (t != null) {
                sendScheduler.stop();
                if (requestBodyCF.isDone()) return;
                subscription.cancel();
                requestBodyCF.completeExceptionally(t);
                cancelImpl(t);
                return;
            }

            do {
                // handle COMPLETED;
                ByteBuffer item = outgoing.peekFirst();
                if (item == null) return;
                else if (item == COMPLETED) {
                    sendScheduler.stop();
                    complete();
                    return;
                }

                // handle bytes to send downstream
                while (item.hasRemaining()) {
                    if (debug.on()) debug.log("trySend: %d", item.remaining());
                    assert !endStreamSent : "internal error, send data after END_STREAM flag";
                    DataFrame df = getDataFrame(item);
                    if (df == null) {
                        // the item stays at the head of the queue, so the
                        // next run resumes where this one stopped
                        if (debug.on())
                            debug.log("trySend: can't send yet: %d", item.remaining());
                        return; // the send window is exhausted: come back later
                    }

                    // length accounting only applies to a known, positive content length
                    if (contentLength > 0) {
                        remainingContentLength -= df.getDataLength();
                        if (remainingContentLength < 0) {
                            String msg = connection().getConnectionFlow()
                                    + " stream=" + streamid + " "
                                    + "[" + Thread.currentThread().getName() + "] "
                                    + "Too many bytes in request body. Expected: "
                                    + contentLength + ", got: "
                                    + (contentLength - remainingContentLength);
                            connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
                            throw new IOException(msg);
                        } else if (remainingContentLength == 0) {
                            // last expected byte: piggyback END_STREAM on this frame
                            df.setFlag(DataFrame.END_STREAM);
                            endStreamSent = true;
                        }
                    }
                    if (debug.on())
                        debug.log("trySend: sending: %d", df.getDataLength());
                    connection.sendDataFrame(df);
                }
                assert !item.hasRemaining();
                ByteBuffer b = outgoing.removeFirst();
                assert b == item;
            } while (outgoing.peekFirst() != null);

            // everything queued has been sent: ask the publisher for more
            if (debug.on()) debug.log("trySend: request 1");
            subscription.request(1);
        } catch (Throwable ex) {
            if (debug.on()) debug.log("trySend: ", ex);
            sendScheduler.stop();
            subscription.cancel();
            requestBodyCF.completeExceptionally(ex);
            // need to cancel the stream to 1. tell the server
            // we don't want to receive any more data and
            // 2. ensure that the operation ref count will be
            // decremented on the HttpClient.
            cancelImpl(ex);
        }
    }

    /**
     * Finishes the request body: resets the stream and throws if fewer
     * bytes than the declared content length were published, sends an
     * empty END_STREAM frame if END_STREAM was not sent yet, then
     * completes the request body future.
     *
     * @throws IOException if the publisher returned too few bytes
     */
    private void complete() throws IOException {
        long remaining = remainingContentLength;
        long written = contentLength - remaining;
        if (remaining > 0) {
            connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
            // let trySend() handle the exception
            throw new IOException(connection().getConnectionFlow()
                                  + " stream=" + streamid + " "
                                  + "[" + Thread.currentThread().getName() +"] "
                                  + "Too few bytes returned by the publisher ("
                                          + written + "/"
                                          + contentLength + ")");
        }
        if (!endStreamSent) {
            endStreamSent = true;
            connection.sendDataFrame(getEmptyEndStreamDataFrame());
        }
        requestBodyCF.complete(null);
    }
}

/**
 * Send a RESET frame to tell server to stop sending data on this stream
 */
@Override
public CompletableFuture<Void> ignoreBody() {
    try {
        connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
        return MinimalFuture.completedFuture(null);
    } catch (Throwable e) {
        Log.logTrace("Error resetting stream {0}", e.toString());
        return MinimalFuture.failedFuture(e);
    }
}

/**
 * Carves the next DATA frame out of the given buffer.
 *
 * @param buffer the request body bytes still to be sent
 * @return a DATA frame holding at most the connection's max send frame
 *         size and at most the acquired send window, or null if no send
 *         window could be acquired
 */
DataFrame getDataFrame(ByteBuffer buffer) {
    int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
    // try to acquire stream send window; a non-positive result means the
    // window is exhausted and the caller (trySend) must come back later.
    // NOTE(review): presumably passing `this` lets the controller call
    // signalWindowUpdate() once the window re-opens - confirm.
    int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
    if (actualAmount <= 0) return null;
    ByteBuffer outBuf = Utils.sliceWithLimitedCapacity(buffer, actualAmount);
    DataFrame df = new DataFrame(streamid, 0 , outBuf);
    return df;
}

/** Creates an empty DATA frame carrying the END_STREAM flag. */
private DataFrame getEmptyEndStreamDataFrame() {
    return new DataFrame(streamid, DataFrame.END_STREAM, List.of());
}

/**
 * A List of responses relating to this stream. Normally there is only
 * one response, but intermediate responses like 100 are allowed
 * and must be passed up to higher level before continuing.
Deals with races 964 * such as if responses are returned before the CFs get created by 965 * getResponseAsync() 966 */ 967 968 final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5); 969 970 @Override 971 CompletableFuture<Response> getResponseAsync(Executor executor) { 972 CompletableFuture<Response> cf; 973 // The code below deals with race condition that can be caused when 974 // completeResponse() is being called before getResponseAsync() 975 synchronized (response_cfs) { 976 if (!response_cfs.isEmpty()) { 977 // This CompletableFuture was created by completeResponse(). 978 // it will be already completed. 979 cf = response_cfs.remove(0); 980 // if we find a cf here it should be already completed. 981 // finding a non completed cf should not happen. just assert it. 982 assert cf.isDone() : "Removing uncompleted response: could cause code to hang!"; 983 } else { 984 // getResponseAsync() is called first. Create a CompletableFuture 985 // that will be completed by completeResponse() when 986 // completeResponse() is called. 987 cf = new MinimalFuture<>(); 988 response_cfs.add(cf); 989 } 990 } 991 if (executor != null && !cf.isDone()) { 992 // protect from executing later chain of CompletableFuture operations from SelectorManager thread 993 cf = cf.thenApplyAsync(r -> r, executor); 994 } 995 Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf); 996 PushGroup<?> pg = exchange.getPushGroup(); 997 if (pg != null) { 998 // if an error occurs make sure it is recorded in the PushGroup 999 cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e))); 1000 } 1001 return cf; 1002 } 1003 1004 /** 1005 * Completes the first uncompleted CF on list, and removes it. 
If there is no
     * uncompleted CF then creates one (completes it) and adds to list
     */
    void completeResponse(Response resp) {
        // The scan and the list update are both done while holding the
        // response_cfs monitor.
        synchronized (response_cfs) {
            CompletableFuture<Response> cf;
            int cfs_len = response_cfs.size();
            for (int i=0; i<cfs_len; i++) {
                cf = response_cfs.get(i);
                if (!cf.isDone()) {
                    Log.logTrace("Completing response (streamid={0}): {1}",
                                 streamid, cf);
                    if (debug.on())
                        debug.log("Completing responseCF(%d) with response headers", i);
                    // NOTE(review): removes by element rather than by index i, unlike
                    // completeResponseExceptionally() - presumably equivalent; confirm.
                    response_cfs.remove(cf);
                    cf.complete(resp);
                    return;
                } // else we found the previous response: just leave it alone.
            }
            // No pending future was found: queue an already-completed one so
            // that it can be picked up from the list later.
            cf = MinimalFuture.completedFuture(resp);
            Log.logTrace("Created completed future (streamid={0}): {1}",
                         streamid, cf);
            if (debug.on())
                debug.log("Adding completed responseCF(0) with response headers");
            response_cfs.add(cf);
        }
    }

    // methods to update state and remove stream when finished

    /**
     * Records that the request has been sent and closes the stream if the
     * response has already been flagged as received.
     */
    synchronized void requestSent() {
        requestSent = true;
        if (responseReceived) {
            close();
        }
    }

    /**
     * Records that the response has been received and closes the stream if the
     * request has already been flagged as sent.
     */
    synchronized void responseReceived() {
        responseReceived = true;
        if (requestSent) {
            close();
        }
    }

    /**
     * Error counterpart of completeResponse(): completes exceptionally the
     * first not-yet-done CF found in response_cfs and removes it from the list.
     * If every queued CF is already done (or the list is empty), an
     * already-failed future is appended instead.
     *
     * @param t the error to complete the response future with
     */
    void completeResponseExceptionally(Throwable t) {
        synchronized (response_cfs) {
            // use index to avoid ConcurrentModificationException
            // caused by removing the CF from within the loop.
            for (int i = 0; i < response_cfs.size(); i++) {
                CompletableFuture<Response> cf = response_cfs.get(i);
                if (!cf.isDone()) {
                    response_cfs.remove(i);
                    cf.completeExceptionally(t);
                    return;
                }
            }
            response_cfs.add(MinimalFuture.failedFuture(t));
        }
    }

    /**
     * Starts sending the request body, if there is one.
     *
     * requestSent() is registered to run when requestBodyCF completes, whether
     * normally or exceptionally. When a requestPublisher is present a new
     * RequestSubscriber is subscribed to it; otherwise requestBodyCF is
     * completed right away. Any Throwable raised while doing so cancels the
     * stream and fails requestBodyCF.
     *
     * @return requestBodyCF
     */
    CompletableFuture<Void> sendBodyImpl() {
        requestBodyCF.whenComplete((v, t) -> requestSent());
        try {
            if (requestPublisher != null) {
                final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
                // the subscriber is also stored in the requestSubscriber field
                requestPublisher.subscribe(requestSubscriber = subscriber);
            } else {
                // there is no request body, therefore the request is complete,
                // END_STREAM has already sent with outgoing headers
                requestBodyCF.complete(null);
            }
        } catch (Throwable t) {
            cancelImpl(t);
            requestBodyCF.completeExceptionally(t);
        }
        return requestBodyCF;
    }

    /**
     * Cancels this stream with a newly created IOException whose message
     * includes the stream id.
     */
    @Override
    void cancel() {
        cancel(new IOException("Stream " + streamid + " cancelled"));
    }

    /**
     * Records the given error in errorRef (unless one is already recorded),
     * sets stopRequested and runs the scheduler.
     *
     * @param t the error raised by the subscription
     */
    void onSubscriptionError(Throwable t) {
        errorRef.compareAndSet(null, t);
        if (debug.on()) debug.log("Got subscription error: %s", (Object)t);
        // This is the special case where the subscriber
        // has requested an illegal number of items.
        // In this case, the error doesn't come from
        // upstream, but from downstream, and we need to
        // handle the error without waiting for the inputQ
        // to be exhausted.
        stopRequested = true;
        sched.runOrSchedule();
    }

    /**
     * Cancels this stream with the given cause; delegates to cancelImpl().
     *
     * @param cause the reason for the cancellation
     */
    @Override
    void cancel(IOException cause) {
        cancelImpl(cause);
    }

    /**
     * Called with the reason the connection is closing. Records the cause in
     * errorRef (unless an error is already recorded), then either runs the
     * scheduler or cancels the stream.
     *
     * @param cause the reason the connection is closing
     */
    void connectionClosing(Throwable cause) {
        Flow.Subscriber<?> subscriber =
                responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
        errorRef.compareAndSet(null, cause);
        // The scheduler is run only when there is a (pending) subscriber, the
        // scheduler is not stopped and inputQ still holds items; in every other
        // case the stream is cancelled directly.
        // NOTE(review): presumably this lets already-queued data be delivered
        // before the error is reported - confirm.
        if (subscriber != null && !sched.isStopped() && !inputQ.isEmpty()) {
            sched.runOrSchedule();
        } else cancelImpl(cause);
    }

    // This method sends a RST_STREAM frame
    /**
     * Cancels this stream: records the error, marks the stream closed, fails
     * the outstanding response / request-body / response-body futures, and
     * finally resets the stream on the connection - or just closes it when the
     * cause is an EOFException. Nothing is sent or closed on the connection
     * when streamid is 0.
     *
     * @param e the reason for the cancellation
     */
    void cancelImpl(Throwable e) {
        // only the first recorded error is kept
        errorRef.compareAndSet(null, e);
        // NOTE(review): this debug.log() call uses {0}/{1} placeholders while
        // the other debug.log() calls in this file use %d/%s - confirm that the
        // debug logger formats both styles.
        if (debug.on()) debug.log("cancelling stream {0}: {1}", streamid, e);
        if (Log.trace()) {
            Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
        }
        // 'closed' is read once without the lock and then re-read under the
        // lock; 'closing' ends up true only for the caller that flips
        // 'closed' from false to true.
        boolean closing;
        if (closing = !closed) { // assigning closing to !closed
            synchronized (this) {
                if (closing = !closed) { // assigning closing to !closed
                    closed=true;
                }
            }
        }
        if (closing) { // true if the stream has not been closed yet
            if (responseSubscriber != null || pendingResponseSubscriber != null)
                sched.runOrSchedule();
        }
        completeResponseExceptionally(e);
        // The futures below are failed with the first recorded error
        // (errorRef), which is not necessarily 'e'.
        if (!requestBodyCF.isDone()) {
            requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
        }
        if (responseBodyCF != null) {
            responseBodyCF.completeExceptionally(errorRef.get());
        }
        try {
            // will send a RST_STREAM frame
            if (streamid != 0) {
                connection.decrementStreamsCount(streamid);
                // unwrap the completion cause before testing for EOF
                e = Utils.getCompletionCause(e);
                if (e instanceof EOFException) {
                    // read EOF: no need to try & send reset
                    connection.closeStream(streamid);
                } else {
                    connection.resetStream(streamid, ResetFrame.CANCEL);
                }
            }
        } catch (Throwable ex) {
            // failures while resetting/closing are logged and not rethrown
            Log.logError(ex);
        }
    }

    // This method doesn't send any frame
    /**
     * Marks this stream as closed and asks the connection to close it.
     * Does nothing if the stream is already marked closed.
     */
    void close() {
        // 'closed' is checked without the lock first, then again under it
        if (closed) return;
        synchronized(this) {
            if (closed) return;
            closed = true;
        }
        Log.logTrace("Closing stream {0}", streamid);
        connection.closeStream(streamid);
        Log.logTrace("Stream {0} closed", streamid);
    }

    /**
     * A Stream subclass used for pushed streams. Outcomes of the overridden
     * operations are reported to the associated PushGroup through
     * pushGroup.pushError().
     */
    static class PushedStream<T> extends Stream<T> {
        final PushGroup<T> pushGroup;
        // push streams need the response CF allocated up front as it is
        // given directly to user via the multi handler callback function.
        final CompletableFuture<Response> pushCF;
        // completed by completeResponse(Response) once the body has been read
        CompletableFuture<HttpResponse<T>> responseCF;
        final HttpRequestImpl pushReq;
        // accessed through the synchronized setPushHandler()/getPushHandler()
        HttpResponse.BodyHandler<T> pushHandler;

        /**
         * @param pushGroup  the group to which errors are reported
         * @param connection the connection, passed on to the Stream constructor
         * @param pushReq    the exchange, passed on to the Stream constructor;
         *                   its request() is kept as this stream's pushReq
         */
        PushedStream(PushGroup<T> pushGroup,
                     Http2Connection connection,
                     Exchange<T> pushReq) {
            // ## no request body possible, null window controller
            super(connection, pushReq, null);
            this.pushGroup = pushGroup;
            this.pushReq = pushReq.request();
            this.pushCF = new MinimalFuture<>();
            this.responseCF = new MinimalFuture<>();

        }

        /** Returns the future that will hold the complete pushed response. */
        CompletableFuture<HttpResponse<T>> responseCF() {
            return responseCF;
        }

        /** Sets the body handler used to read the pushed response body. */
        synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
            this.pushHandler = pushHandler;
        }

        /** Returns the body handler set with setPushHandler(), possibly null. */
        synchronized HttpResponse.BodyHandler<T> getPushHandler() {
            // ignored parameters to function can be used as BodyHandler
            return this.pushHandler;
        }

        // Following methods call the super class but in case of
        // error record it in the PushGroup. The error method is called
        // with a null value when no error occurred (is a no-op)
        @Override
        CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
            return super.sendBodyAsync()
                        .whenComplete((ExchangeImpl<T> v, Throwable t)
                                -> pushGroup.pushError(Utils.getCompletionCause(t)));
        }

        @Override
        CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
            return super.sendHeadersAsync()
                        .whenComplete((ExchangeImpl<T> ex, Throwable t)
                                -> pushGroup.pushError(Utils.getCompletionCause(t)));
        }

        /**
         * Returns a future derived from pushCF (rather than from the
         * response_cfs list used by the superclass) that also reports its
         * outcome to the PushGroup.
         */
        @Override
        CompletableFuture<Response> getResponseAsync(Executor executor) {
            CompletableFuture<Response> cf = pushCF.whenComplete(
                    (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
            // re-dispatch dependent stages through the executor, if one is given
            if(executor!=null && !cf.isDone()) {
                cf = cf.thenApplyAsync( r -> r, executor);
            }
            return cf;
        }

        @Override
        CompletableFuture<T> readBodyAsync(
                HttpResponse.BodyHandler<T> handler,
                boolean returnConnectionToPool,
                Executor executor)
        {
            // NOTE(review): unlike the overrides above, the throwable is passed
            // to pushError() without Utils.getCompletionCause() - confirm
            // whether that is intentional.
            return super.readBodyAsync(handler, returnConnectionToPool, executor)
                        .whenComplete((v, t) -> pushGroup.pushError(t));
        }

        /**
         * Completes pushCF with the given response, then reads the body with
         * the push handler and completes responseCF with the resulting
         * HttpResponseImpl, or exceptionally if reading the body fails.
         */
        @Override
        void completeResponse(Response r) {
            Log.logResponse(r::toString);
            pushCF.complete(r); // not strictly required for push API
            // start reading the body using the obtained BodySubscriber
            CompletableFuture<Void> start = new MinimalFuture<>();
            start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
                .whenComplete((T body, Throwable t) -> {
                    if (t != null) {
                        responseCF.completeExceptionally(t);
                    } else {
                        HttpResponseImpl<T> resp =
                                new HttpResponseImpl<>(r.request, r, null, body, getExchange());
                        responseCF.complete(resp);
                    }
                });
            // kick off the chain above by completing 'start' asynchronously
            // on the exchange's executor
            start.completeAsync(() -> null, getExchange().executor());
        }

        /** Fails pushCF with the given error; response_cfs is not involved. */
        @Override
        void completeResponseExceptionally(Throwable t) {
            pushCF.completeExceptionally(t);
        }

//        @Override
//        synchronized void responseReceived() {
//            super.responseReceived();
//        }

        // create and return the PushResponseImpl
        @Override
        protected void handleResponse() {
            HttpHeaders responseHeaders = responseHeadersBuilder.build();
            // -1 stands for a missing ":status" header
            responseCode = (int)responseHeaders
                .firstValueAsLong(":status")
                .orElse(-1);

            if (responseCode == -1) {
                // NOTE(review): there is no return here, so the code below
                // still builds a Response and calls completeResponse() after
                // the error has been reported - confirm this is intended.
                completeResponseExceptionally(new IOException("No status code"));
            }

            this.response = new Response(
                pushReq, exchange, responseHeaders, connection(),
                responseCode, HttpClient.Version.HTTP_2);

            /* TODO: review if needs to be removed
               the value is not used, but in case `content-length` doesn't parse
               as long, there will be NumberFormatException. If left as is, make
               sure code up the stack handles NFE correctly. */
            responseHeaders.firstValueAsLong("content-length");

            if (Log.headers()) {
                StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
                sb.append(" (streamid=").append(streamid).append("):\n");
                Log.dumpHeaders(sb, " ", responseHeaders);
                Log.logHeaders(sb.toString());
            }

            rspHeadersConsumer.reset();

            // different implementations for normal streams and pushed streams
            completeResponse(response);
        }
    }

    /**
     * WindowUpdateSender bound to this stream: reports this stream's id and
     * provides a debug string that includes it.
     */
    final class StreamWindowUpdateSender extends WindowUpdateSender {

        StreamWindowUpdateSender(Http2Connection connection) {
            super(connection);
        }

        @Override
        int getStreamId() {
            return streamid;
        }

        @Override
        String dbgString() {
            String dbg = dbgString;
            if (dbg != null) return dbg;
            if (streamid == 0) {
                // the string is not cached while streamid is still 0
                return connection.dbgString() + ":WindowUpdateSender(stream: ?)";
            } else {
                dbg = connection.dbgString() + ":WindowUpdateSender(stream: " + streamid + ")";
                // cache the string for subsequent calls
                return dbgString = dbg;
            }
        }
    }

/** 1334 * Returns true if this exchange was canceled. 1335 * @return true if this exchange was canceled. 1336 */ 1337 synchronized boolean isCanceled() { 1338 return errorRef.get() != null; 1339 } 1340 1341 /** 1342 * Returns the cause for which this exchange was canceled, if available. 1343 * @return the cause for which this exchange was canceled, if available. 1344 */ 1345 synchronized Throwable getCancelCause() { 1346 return errorRef.get(); 1347 } 1348 1349 final String dbgString() { 1350 return connection.dbgString() + "/Stream("+streamid+")"; 1351 } 1352 1353 private class HeadersConsumer extends Http2Connection.ValidatingHeadersConsumer { 1354 1355 void reset() { 1356 super.reset(); 1357 responseHeadersBuilder.clear(); 1358 debug.log("Response builder cleared, ready to receive new headers."); 1359 } 1360 1361 @Override 1362 public void onDecoded(CharSequence name, CharSequence value) 1363 throws UncheckedIOException 1364 { 1365 String n = name.toString(); 1366 String v = value.toString(); 1367 super.onDecoded(n, v); 1368 responseHeadersBuilder.addHeader(n, v); 1369 if (Log.headers() && Log.trace()) { 1370 Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}", 1371 streamid, n, v); 1372 } 1373 } 1374 } 1375 }