1 /*
   2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.EOFException;
  29 import java.io.IOException;
  30 import java.io.UncheckedIOException;
  31 import java.net.URI;
  32 import java.nio.ByteBuffer;
  33 import java.util.ArrayList;
  34 import java.util.Collections;
  35 import java.util.List;
  36 import java.util.concurrent.CompletableFuture;
  37 import java.util.concurrent.ConcurrentLinkedDeque;
  38 import java.util.concurrent.ConcurrentLinkedQueue;
  39 import java.util.concurrent.Executor;
  40 import java.util.concurrent.Flow;
  41 import java.util.concurrent.Flow.Subscription;
  42 import java.util.concurrent.atomic.AtomicBoolean;
  43 import java.util.concurrent.atomic.AtomicReference;
  44 import java.util.function.BiPredicate;
  45 import java.net.http.HttpClient;
  46 import java.net.http.HttpHeaders;
  47 import java.net.http.HttpRequest;
  48 import java.net.http.HttpResponse;
  49 import java.net.http.HttpResponse.BodySubscriber;
  50 import jdk.internal.net.http.common.*;
  51 import jdk.internal.net.http.frame.*;
  52 import jdk.internal.net.http.hpack.DecodingCallback;
  53 
  54 /**
  55  * Http/2 Stream handling.
  56  *
  57  * REQUESTS
  58  *
  59  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
  60  *
  61  * sendRequest() -- sendHeadersOnly() + sendBody()
  62  *
  63  * sendBodyAsync() -- calls sendBody() in an executor thread.
  64  *
  65  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
  66  *
  67  * sendRequestAsync() -- calls sendRequest() in an executor thread
  68  *
  69  * RESPONSES
  70  *
  71  * Multiple responses can be received per request. Responses are queued up on
 * a LinkedList of CF<HttpResponse> and the first one on the list is completed
  73  * with the next response
  74  *
  75  * getResponseAsync() -- queries list of response CFs and returns first one
  76  *               if one exists. Otherwise, creates one and adds it to list
  77  *               and returns it. Completion is achieved through the
  78  *               incoming() upcall from connection reader thread.
  79  *
  80  * getResponse() -- calls getResponseAsync() and waits for CF to complete
  81  *
  82  * responseBodyAsync() -- calls responseBody() in an executor thread.
  83  *
  84  * incoming() -- entry point called from connection reader thread. Frames are
  85  *               either handled immediately without blocking or for data frames
  86  *               placed on the stream's inputQ which is consumed by the stream's
  87  *               reader thread.
  88  *
  89  * PushedStream sub class
  90  * ======================
  91  * Sending side methods are not used because the request comes from a PUSH_PROMISE
  92  * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
  93  * is created. PushedStream does not use responseCF list as there can be only
 * one response. The CF is created when the object is created and when the response
  95  * HEADERS frame is received the object is completed.
  96  */
  97 class Stream<T> extends ExchangeImpl<T> {
  98 
    // Debug logger; messages are tagged using dbgString().
    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

    // Frames queued by receiveDataFrame()/receiveResetFrame() and consumed
    // by the schedule() loop.
    final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
    // Scheduler that runs schedule().
    final SequentialScheduler sched =
            SequentialScheduler.synchronizedScheduler(this::schedule);
    // Subscription handed to the response body subscriber in schedule();
    // its tryDecrement() gates each onNext() delivery.
    final SubscriptionBase userSubscription =
            new SubscriptionBase(sched, this::cancel, this::onSubscriptionError);

    /**
     * This stream's identifier. Assigned lazily by the HTTP2Connection before
     * the stream's first frame is sent.
     */
    protected volatile int streamid;

    // Set in sendHeadersAsync() from requestPublisher.contentLength(),
    // or 0 when there is no request publisher.
    long requestContentLen;

    final Http2Connection connection;
    final HttpRequestImpl request;
    final HeadersConsumer rspHeadersConsumer;
    final HttpHeadersBuilder responseHeadersBuilder;
    final HttpHeaders requestPseudoHeaders;
    // The subscriber currently receiving the response body; installed by
    // schedule() from pendingResponseSubscriber.
    volatile HttpResponse.BodySubscriber<T> responseSubscriber;
    final HttpRequest.BodyPublisher requestPublisher;
    volatile RequestSubscriber requestSubscriber;
    // Status code and Response object, both set by handleResponse().
    volatile int responseCode;
    volatile Response response;
    // The exception with which this stream was canceled.
    private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
    final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
    // Created by receiveData(); null until then.
    volatile CompletableFuture<T> responseBodyCF;
    // Set by receiveData(); picked up by schedule() on its next run.
    volatile HttpResponse.BodySubscriber<T> pendingResponseSubscriber;
    // Checked by schedule() when the subscriber has no demand.
    // NOTE(review): the writer of this flag is outside this part of the file.
    volatile boolean stopRequested;

    /** True if END_STREAM has been seen in a frame received on this stream. */
    private volatile boolean remotelyClosed;
    // Set to true by handleReset().
    private volatile boolean closed;
    // Set to true by headerFrame() when the HEADERS frame carries END_STREAM.
    private volatile boolean endStreamSent;

    // Flipped from false to true by the first call to deRegister().
    final AtomicBoolean deRegistered = new AtomicBoolean(false);

    // state flags
    // NOTE(review): no reads or writes of these two flags are visible in this
    // part of the file - confirm how they are used.
    private boolean requestSent, responseReceived;

    /**
     * A reference to this Stream's connection Send Window controller. The
     * stream MUST acquire the appropriate amount of Send Window before
     * sending any data. Will be null for PushStreams, as they cannot send data.
     */
    private final WindowController windowController;
    // Used by consumed() to report consumed DATA payload for this stream.
    private final WindowUpdateSender windowUpdater;
 149 
 150     @Override
 151     HttpConnection connection() {
 152         return connection.connection;
 153     }
 154 
 155     /**
 156      * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
 157      * of after user subscription window has re-opened, from SubscriptionBase.request()
 158      */
 159     private void schedule() {
 160         boolean onCompleteCalled = false;
 161         HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
 162         try {
 163             if (subscriber == null) {
 164                 subscriber = responseSubscriber = pendingResponseSubscriber;
 165                 if (subscriber == null) {
 166                     // can't process anything yet
 167                     return;
 168                 } else {
 169                     if (debug.on()) debug.log("subscribing user subscriber");
 170                     subscriber.onSubscribe(userSubscription);
 171                 }
 172             }
 173             while (!inputQ.isEmpty()) {
 174                 Http2Frame frame = inputQ.peek();
 175                 if (frame instanceof ResetFrame) {
 176                     inputQ.remove();
 177                     handleReset((ResetFrame)frame, subscriber);
 178                     return;
 179                 }
 180                 DataFrame df = (DataFrame)frame;
 181                 boolean finished = df.getFlag(DataFrame.END_STREAM);
 182 
 183                 List<ByteBuffer> buffers = df.getData();
 184                 List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
 185                 int size = Utils.remaining(dsts, Integer.MAX_VALUE);
 186                 if (size == 0 && finished) {
 187                     inputQ.remove();
 188                     connection.ensureWindowUpdated(df); // must update connection window
 189                     Log.logTrace("responseSubscriber.onComplete");
 190                     if (debug.on()) debug.log("incoming: onComplete");
 191                     sched.stop();
 192                     connection.decrementStreamsCount(streamid);
 193                     subscriber.onComplete();
 194                     onCompleteCalled = true;
 195                     setEndStreamReceived();
 196                     return;
 197                 } else if (userSubscription.tryDecrement()) {
 198                     inputQ.remove();
 199                     Log.logTrace("responseSubscriber.onNext {0}", size);
 200                     if (debug.on()) debug.log("incoming: onNext(%d)", size);
 201                     try {
 202                         subscriber.onNext(dsts);
 203                     } catch (Throwable t) {
 204                         connection.dropDataFrame(df); // must update connection window
 205                         throw t;
 206                     }
 207                     if (consumed(df)) {
 208                         Log.logTrace("responseSubscriber.onComplete");
 209                         if (debug.on()) debug.log("incoming: onComplete");
 210                         sched.stop();
 211                         connection.decrementStreamsCount(streamid);
 212                         subscriber.onComplete();
 213                         onCompleteCalled = true;
 214                         setEndStreamReceived();
 215                         return;
 216                     }
 217                 } else {
 218                     if (stopRequested) break;
 219                     return;
 220                 }
 221             }
 222         } catch (Throwable throwable) {
 223             errorRef.compareAndSet(null, throwable);
 224         } finally {
 225             if (sched.isStopped()) drainInputQueue();
 226         }
 227 
 228         Throwable t = errorRef.get();
 229         if (t != null) {
 230             sched.stop();
 231             try {
 232                 if (!onCompleteCalled) {
 233                     if (debug.on())
 234                         debug.log("calling subscriber.onError: %s", (Object) t);
 235                     subscriber.onError(t);
 236                 } else {
 237                     if (debug.on())
 238                         debug.log("already completed: dropping error %s", (Object) t);
 239                 }
 240             } catch (Throwable x) {
 241                 Log.logError("Subscriber::onError threw exception: {0}", (Object) t);
 242             } finally {
 243                 cancelImpl(t);
 244                 drainInputQueue();
 245             }
 246         }
 247     }
 248 
 249     // must only be called from the scheduler schedule() loop.
 250     // ensure that all received data frames are accounted for
 251     // in the connection window flow control if the scheduler
 252     // is stopped before all the data is consumed.
 253     private void drainInputQueue() {
 254         Http2Frame frame;
 255         while ((frame = inputQ.poll()) != null) {
 256             if (frame instanceof DataFrame) {
 257                 connection.dropDataFrame((DataFrame)frame);
 258             }
 259         }
 260     }
 261 
 262 
 263     // Callback invoked after the Response BodySubscriber has consumed the
 264     // buffers contained in a DataFrame.
 265     // Returns true if END_STREAM is reached, false otherwise.
 266     private boolean consumed(DataFrame df) {
 267         // RFC 7540 6.1:
 268         // The entire DATA frame payload is included in flow control,
 269         // including the Pad Length and Padding fields if present
 270         int len = df.payloadLength();
 271         boolean endStream = df.getFlag(DataFrame.END_STREAM);
 272         if (len == 0) return endStream;
 273 
 274         connection.windowUpdater.update(len);
 275 
 276         if (!endStream) {
 277             // Don't send window update on a stream which is
 278             // closed or half closed.
 279             windowUpdater.update(len);
 280         }
 281 
 282         // true: end of stream; false: more data coming
 283         return endStream;
 284     }
 285 
 286     boolean deRegister() {
 287         return deRegistered.compareAndSet(false, true);
 288     }
 289 
 290     @Override
 291     CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
 292                                        boolean returnConnectionToPool,
 293                                        Executor executor)
 294     {
 295         try {
 296             Log.logTrace("Reading body on stream {0}", streamid);
 297             debug.log("Getting BodySubscriber for: " + response);
 298             BodySubscriber<T> bodySubscriber = handler.apply(new ResponseInfoImpl(response));
 299             CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
 300 
 301             PushGroup<?> pg = exchange.getPushGroup();
 302             if (pg != null) {
 303                 // if an error occurs make sure it is recorded in the PushGroup
 304                 cf = cf.whenComplete((t, e) -> pg.pushError(e));
 305             }
 306             return cf;
 307         } catch (Throwable t) {
 308             // may be thrown by handler.apply
 309             cancelImpl(t);
 310             return MinimalFuture.failedFuture(t);
 311         }
 312     }
 313 
 314     @Override
 315     public String toString() {
 316         StringBuilder sb = new StringBuilder();
 317         sb.append("streamid: ")
 318                 .append(streamid);
 319         return sb.toString();
 320     }
 321 
 322     private void receiveDataFrame(DataFrame df) {
 323         inputQ.add(df);
 324         sched.runOrSchedule();
 325     }
 326 
 327     /** Handles a RESET frame. RESET is always handled inline in the queue. */
 328     private void receiveResetFrame(ResetFrame frame) {
 329         inputQ.add(frame);
 330         sched.runOrSchedule();
 331     }
 332 
 333     // pushes entire response body into response subscriber
 334     // blocking when required by local or remote flow control
 335     CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
 336         responseBodyCF = new MinimalFuture<>();
 337         // We want to allow the subscriber's getBody() method to block so it
 338         // can work with InputStreams. So, we offload execution.
 339         executor.execute(() -> {
 340             try {
 341                 bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
 342                     if (t == null)
 343                         responseBodyCF.complete(body);
 344                     else
 345                         responseBodyCF.completeExceptionally(t);
 346                 });
 347             } catch(Throwable t) {
 348                 cancelImpl(t);
 349             }
 350         });
 351 
 352         if (isCanceled()) {
 353             Throwable t = getCancelCause();
 354             responseBodyCF.completeExceptionally(t);
 355         } else {
 356             pendingResponseSubscriber = bodySubscriber;
 357             sched.runOrSchedule(); // in case data waiting already to be processed
 358         }
 359         return responseBodyCF;
 360     }
 361 
 362     @Override
 363     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
 364         return sendBodyImpl().thenApply( v -> this);
 365     }
 366 
 367     @SuppressWarnings("unchecked")
 368     Stream(Http2Connection connection,
 369            Exchange<T> e,
 370            WindowController windowController)
 371     {
 372         super(e);
 373         this.connection = connection;
 374         this.windowController = windowController;
 375         this.request = e.request();
 376         this.requestPublisher = request.requestPublisher;  // may be null
 377         this.responseHeadersBuilder = new HttpHeadersBuilder();
 378         this.rspHeadersConsumer = new HeadersConsumer();
 379         this.requestPseudoHeaders = createPseudoHeaders(request);
 380         this.windowUpdater = new StreamWindowUpdateSender(connection);
 381     }
 382 
 383     /**
 384      * Entry point from Http2Connection reader thread.
 385      *
 386      * Data frames will be removed by response body thread.
 387      */
 388     void incoming(Http2Frame frame) throws IOException {
 389         if (debug.on()) debug.log("incoming: %s", frame);
 390         if ((frame instanceof HeaderFrame)) {
 391             HeaderFrame hframe = (HeaderFrame)frame;
 392             if (hframe.endHeaders()) {
 393                 Log.logTrace("handling response (streamid={0})", streamid);
 394                 handleResponse();
 395                 if (hframe.getFlag(HeaderFrame.END_STREAM)) {
 396                     receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
 397                 }
 398             }
 399         } else if (frame instanceof DataFrame) {
 400             receiveDataFrame((DataFrame)frame);
 401         } else {
 402             otherFrame(frame);
 403         }
 404     }
 405 
 406     void otherFrame(Http2Frame frame) throws IOException {
 407         switch (frame.type()) {
 408             case WindowUpdateFrame.TYPE:
 409                 incoming_windowUpdate((WindowUpdateFrame) frame);
 410                 break;
 411             case ResetFrame.TYPE:
 412                 incoming_reset((ResetFrame) frame);
 413                 break;
 414             case PriorityFrame.TYPE:
 415                 incoming_priority((PriorityFrame) frame);
 416                 break;
 417             default:
 418                 String msg = "Unexpected frame: " + frame.toString();
 419                 throw new IOException(msg);
 420         }
 421     }
 422 
 423     // The Hpack decoder decodes into one of these consumers of name,value pairs
 424 
 425     DecodingCallback rspHeadersConsumer() {
 426         return rspHeadersConsumer;
 427     }
 428 
 429     protected void handleResponse() throws IOException {
 430         HttpHeaders responseHeaders = responseHeadersBuilder.build();
 431         responseCode = (int)responseHeaders
 432                 .firstValueAsLong(":status")
 433                 .orElseThrow(() -> new IOException("no statuscode in response"));
 434 
 435         response = new Response(
 436                 request, exchange, responseHeaders, connection(),
 437                 responseCode, HttpClient.Version.HTTP_2);
 438 
 439         /* TODO: review if needs to be removed
 440            the value is not used, but in case `content-length` doesn't parse as
 441            long, there will be NumberFormatException. If left as is, make sure
 442            code up the stack handles NFE correctly. */
 443         responseHeaders.firstValueAsLong("content-length");
 444 
 445         if (Log.headers()) {
 446             StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
 447             Log.dumpHeaders(sb, "    ", responseHeaders);
 448             Log.logHeaders(sb.toString());
 449         }
 450 
 451         // this will clear the response headers
 452         rspHeadersConsumer.reset();
 453 
 454         completeResponse(response);
 455     }
 456 
 457     void incoming_reset(ResetFrame frame) {
 458         Log.logTrace("Received RST_STREAM on stream {0}", streamid);
 459         if (endStreamReceived()) {
 460             Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
 461         } else if (closed) {
 462             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
 463         } else {
 464             Flow.Subscriber<?> subscriber =
 465                     responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
 466             if (response == null && subscriber == null) {
 467                 // we haven't receive the headers yet, and won't receive any!
 468                 // handle reset now.
 469                 handleReset(frame, subscriber);
 470             } else {
 471                 // put it in the input queue in order to read all
 472                 // pending data frames first. Indeed, a server may send
 473                 // RST_STREAM after sending END_STREAM, in which case we should
 474                 // ignore it. However, we won't know if we have received END_STREAM
 475                 // or not until all pending data frames are read.
 476                 receiveResetFrame(frame);
 477                 // RST_STREAM was pushed to the queue. It will be handled by
 478                 // asyncReceive after all pending data frames have been
 479                 // processed.
 480                 Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
 481             }
 482         }
 483     }
 484 
 485     void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
 486         Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
 487         if (!closed) {
 488             synchronized (this) {
 489                 if (closed) {
 490                     if (debug.on()) debug.log("Stream already closed: ignoring RESET");
 491                     return;
 492                 }
 493                 closed = true;
 494             }
 495             try {
 496                 int error = frame.getErrorCode();
 497                 IOException e = new IOException("Received RST_STREAM: "
 498                         + ErrorFrame.stringForCode(error));
 499                 if (errorRef.compareAndSet(null, e)) {
 500                     if (subscriber != null) {
 501                         subscriber.onError(e);
 502                     }
 503                 }
 504                 completeResponseExceptionally(e);
 505                 if (!requestBodyCF.isDone()) {
 506                     requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
 507                 }
 508                 if (responseBodyCF != null) {
 509                     responseBodyCF.completeExceptionally(errorRef.get());
 510                 }
 511             } finally {
 512                 connection.decrementStreamsCount(streamid);
 513                 connection.closeStream(streamid);
 514             }
 515         } else {
 516             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
 517         }
 518     }
 519 
 520     void incoming_priority(PriorityFrame frame) {
 521         // TODO: implement priority
 522         throw new UnsupportedOperationException("Not implemented");
 523     }
 524 
 525     private void incoming_windowUpdate(WindowUpdateFrame frame)
 526         throws IOException
 527     {
 528         int amount = frame.getUpdate();
 529         if (amount <= 0) {
 530             Log.logTrace("Resetting stream: {0}, Window Update amount: {1}",
 531                          streamid, amount);
 532             connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
 533         } else {
 534             assert streamid != 0;
 535             boolean success = windowController.increaseStreamWindow(amount, streamid);
 536             if (!success) {  // overflow
 537                 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
 538             }
 539         }
 540     }
 541 
 542     void incoming_pushPromise(HttpRequestImpl pushRequest,
 543                               PushedStream<T> pushStream)
 544         throws IOException
 545     {
 546         if (Log.requests()) {
 547             Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
 548         }
 549         PushGroup<T> pushGroup = exchange.getPushGroup();
 550         if (pushGroup == null) {
 551             Log.logTrace("Rejecting push promise stream " + streamid);
 552             connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
 553             pushStream.close();
 554             return;
 555         }
 556 
 557         PushGroup.Acceptor<T> acceptor = null;
 558         boolean accepted = false;
 559         try {
 560             acceptor = pushGroup.acceptPushRequest(pushRequest);
 561             accepted = acceptor.accepted();
 562         } catch (Throwable t) {
 563             if (debug.on())
 564                 debug.log("PushPromiseHandler::applyPushPromise threw exception %s",
 565                           (Object)t);
 566         }
 567         if (!accepted) {
 568             // cancel / reject
 569             IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
 570             if (Log.trace()) {
 571                 Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
 572                         ex.getMessage());
 573             }
 574             pushStream.cancelImpl(ex);
 575             return;
 576         }
 577 
 578         assert accepted && acceptor != null;
 579         CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
 580         HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
 581         assert pushHandler != null;
 582 
 583         pushStream.requestSent();
 584         pushStream.setPushHandler(pushHandler);  // TODO: could wrap the handler to throw on acceptPushPromise ?
 585         // setup housekeeping for when the push is received
 586         // TODO: deal with ignoring of CF anti-pattern
 587         CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
 588         cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
 589             t = Utils.getCompletionCause(t);
 590             if (Log.trace()) {
 591                 Log.logTrace("Push completed on stream {0} for {1}{2}",
 592                              pushStream.streamid, resp,
 593                              ((t==null) ? "": " with exception " + t));
 594             }
 595             if (t != null) {
 596                 pushGroup.pushError(t);
 597                 pushResponseCF.completeExceptionally(t);
 598             } else {
 599                 pushResponseCF.complete(resp);
 600             }
 601             pushGroup.pushCompleted();
 602         });
 603 
 604     }
 605 
 606     private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
 607         HttpHeadersBuilder h = request.getSystemHeadersBuilder();
 608         if (contentLength > 0) {
 609             h.setHeader("content-length", Long.toString(contentLength));
 610         }
 611         HttpHeaders sysh = filterHeaders(h.build());
 612         HttpHeaders userh = filterHeaders(request.getUserHeaders());
 613         OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(sysh, userh, this);
 614         if (contentLength == 0) {
 615             f.setFlag(HeadersFrame.END_STREAM);
 616             endStreamSent = true;
 617         }
 618         return f;
 619     }
 620 
 621     private boolean hasProxyAuthorization(HttpHeaders headers) {
 622         return headers.firstValue("proxy-authorization")
 623                       .isPresent();
 624     }
 625 
 626     // Determines whether we need to build a new HttpHeader object.
 627     //
 628     // Ideally we should pass the filter to OutgoingHeaders refactor the
 629     // code that creates the HeaderFrame to honor the filter.
 630     // We're not there yet - so depending on the filter we need to
 631     // apply and the content of the header we will try to determine
 632     //  whether anything might need to be filtered.
 633     // If nothing needs filtering then we can just use the
 634     // original headers.
 635     private boolean needsFiltering(HttpHeaders headers,
 636                                    BiPredicate<String, String> filter) {
 637         if (filter == Utils.PROXY_TUNNEL_FILTER || filter == Utils.PROXY_FILTER) {
 638             // we're either connecting or proxying
 639             // slight optimization: we only need to filter out
 640             // disabled schemes, so if there are none just
 641             // pass through.
 642             return Utils.proxyHasDisabledSchemes(filter == Utils.PROXY_TUNNEL_FILTER)
 643                     && hasProxyAuthorization(headers);
 644         } else {
 645             // we're talking to a server, either directly or through
 646             // a tunnel.
 647             // Slight optimization: we only need to filter out
 648             // proxy authorization headers, so if there are none just
 649             // pass through.
 650             return hasProxyAuthorization(headers);
 651         }
 652     }
 653 
 654     private HttpHeaders filterHeaders(HttpHeaders headers) {
 655         HttpConnection conn = connection();
 656         BiPredicate<String, String> filter = conn.headerFilter(request);
 657         if (needsFiltering(headers, filter)) {
 658             return HttpHeaders.of(headers.map(), filter);
 659         }
 660         return headers;
 661     }
 662 
 663     private static HttpHeaders createPseudoHeaders(HttpRequest request) {
 664         HttpHeadersBuilder hdrs = new HttpHeadersBuilder();
 665         String method = request.method();
 666         hdrs.setHeader(":method", method);
 667         URI uri = request.uri();
 668         hdrs.setHeader(":scheme", uri.getScheme());
 669         // TODO: userinfo deprecated. Needs to be removed
 670         hdrs.setHeader(":authority", uri.getAuthority());
 671         // TODO: ensure header names beginning with : not in user headers
 672         String query = uri.getRawQuery();
 673         String path = uri.getRawPath();
 674         if (path == null || path.isEmpty()) {
 675             if (method.equalsIgnoreCase("OPTIONS")) {
 676                 path = "*";
 677             } else {
 678                 path = "/";
 679             }
 680         }
 681         if (query != null) {
 682             path += "?" + query;
 683         }
 684         hdrs.setHeader(":path", Utils.encode(path));
 685         return hdrs.build();
 686     }
 687 
 688     HttpHeaders getRequestPseudoHeaders() {
 689         return requestPseudoHeaders;
 690     }
 691 
 692     /** Sets endStreamReceived. Should be called only once. */
 693     void setEndStreamReceived() {
 694         assert remotelyClosed == false: "Unexpected endStream already set";
 695         remotelyClosed = true;
 696         responseReceived();
 697     }
 698 
 699     /** Tells whether, or not, the END_STREAM Flag has been seen in any frame
 700      *  received on this stream. */
 701     private boolean endStreamReceived() {
 702         return remotelyClosed;
 703     }
 704 
 705     @Override
         // Hands the request HEADERS frame to the connection and returns a future
         // that is already completed with this exchange.
 706     CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
 707         if (debug.on()) {
 708             debug.log("sendHeadersOnly()");
 709         }
 710         if (Log.requests() && request != null) {
 711             Log.logRequest(request.toString());
 712         }
 713         // Without a body publisher the request carries no body: length is 0.
 714         requestContentLen = (requestPublisher == null)
 715                 ? 0
 716                 : requestPublisher.contentLength();
 717         OutgoingHeaders<Stream<T>> headers = headerFrame(requestContentLen);
 718         connection.sendFrame(headers);
 719         // The future is completed immediately, right after the frame has
             // been handed to the connection.
 720         CompletableFuture<ExchangeImpl<T>> headersSent = new MinimalFuture<>();
             headersSent.complete(this);
             return headersSent;
 721     }
 722 
 723     @Override
         // Releases this stream. When a stream id has been assigned (id > 0) the
         // connection is told to decrement its stream count and close the stream;
         // otherwise the call is only logged.
 724     void released() {
 725         if (streamid <= 0) {
 726             if (debug.on()) debug.log("Can't release stream %d", streamid);
 727             return;
 728         }
 729         if (debug.on()) debug.log("Released stream %d", streamid);
 730         // remove this stream from the Http2Connection map.
 731         connection.decrementStreamsCount(streamid);
 732         connection.closeStream(streamid);
 733     }
 734 
 735     @Override
 736     void completed() {
 737         // Intentionally empty. There should be nothing to do here: the stream
 738         // should have been already closed (or will be closed shortly after).
 739     }
 740 
 741     void registerStream(int id) {
             // Adopt the given id, then make this stream known to the connection
             // under that id.
 742         streamid = id;
 743         connection.putStream(this, streamid);
 744         if (debug.on()) {
                 debug.log("Registered stream %d", id);
             }
 745     }
 746 
 747     void signalWindowUpdate() {
 748         RequestSubscriber subscriber = requestSubscriber;
 749         assert subscriber != null;
 750         if (debug.on()) debug.log("Signalling window update");
 751         subscriber.sendScheduler.runOrSchedule();
 752     }
 753 
 754     static final ByteBuffer COMPLETED = ByteBuffer.allocate(0); // end-of-body marker: queued by onComplete(), compared by identity in trySend()
         /**
          * Subscribes to the request body publisher and forwards the published
          * bytes to the connection as DATA frames. Published buffers are queued
          * in {@code outgoing} and drained by {@code trySend}, which is only run
          * through {@code sendScheduler}. Errors reported through
          * {@code onError} and completion (the {@code COMPLETED} sentinel) are
          * handled inside {@code trySend} as well.
          */
 755     class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
 756         // can be < 0 if the actual length is not known.
 757         private final long contentLength;
             // Number of bytes still expected from the publisher. It is only
             // decremented (in trySend) when contentLength > 0.
 758         private volatile long remainingContentLength;
 759         private volatile Subscription subscription;
 760 
 761         // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
 762         //  1) The data that was published by the request body Publisher, and
 763         //  2) the COMPLETED sentinel, since onComplete can be invoked without demand.
 764         final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
 765 
             // First error reported through onError, if any; consumed by trySend.
 766         private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
 767         // A scheduler used to honor window updates. Writing must be paused
 768         // when the window is exhausted, and resumed when the window acquires
 769         // some space. The sendScheduler makes it possible to implement this
 770         // behaviour in an asynchronous non-blocking way.
 771         // See RequestSubscriber::trySend below.
 772         final SequentialScheduler sendScheduler;
 773 
             // contentLen is the length announced for the request body; it is
             // used both as the expected total and as the initial remaining count.
 774         RequestSubscriber(long contentLen) {
 775             this.contentLength = contentLen;
 776             this.remainingContentLength = contentLen;
 777             this.sendScheduler =
 778                     SequentialScheduler.synchronizedScheduler(this::trySend);
 779         }
 780 
             // Accepts a single subscription (a second one is rejected with an
             // IllegalStateException) and requests the first buffer. Further
             // buffers are requested one at a time by trySend.
 781         @Override
 782         public void onSubscribe(Flow.Subscription subscription) {
 783             if (this.subscription != null) {
 784                 throw new IllegalStateException("already subscribed");
 785             }
 786             this.subscription = subscription;
 787             if (debug.on())
 788                 debug.log("RequestSubscriber: onSubscribe, request 1");
 789             subscription.request(1);
 790         }
 791 
             // Receives the next chunk of the request body. Since only one item
             // is requested at a time, the queue is expected to be empty here.
 792         @Override
 793         public void onNext(ByteBuffer item) {
 794             if (debug.on())
 795                 debug.log("RequestSubscriber: onNext(%d)", item.remaining());
 796             int size = outgoing.size();
 797             assert size == 0 : "non-zero size: " + size;
 798             onNextImpl(item);
 799         }
 800 
             // Queues the item (a data buffer or the COMPLETED sentinel) and
             // schedules trySend. If the request body future is already done,
             // the item is dropped, the scheduler is stopped and the
             // subscription is cancelled instead.
 801         private void onNextImpl(ByteBuffer item) {
 802             // Got some more request body bytes to send.
 803             if (requestBodyCF.isDone()) {
 804                 // stream already cancelled, probably in timeout
 805                 sendScheduler.stop();
 806                 subscription.cancel();
 807                 return;
 808             }
 809             outgoing.add(item);
 810             sendScheduler.runOrSchedule();
 811         }
 812 
             // Records the first reported error and schedules trySend, which is
             // where the error is actually acted upon. Later errors are ignored.
 813         @Override
 814         public void onError(Throwable throwable) {
 815             if (debug.on())
 816                 debug.log(() -> "RequestSubscriber: onError: " + throwable);
 817             // ensure that errors are handled within the flow.
 818             if (errorRef.compareAndSet(null, throwable)) {
 819                 sendScheduler.runOrSchedule();
 820             }
 821         }
 822 
             // Queues the COMPLETED sentinel behind any data that is still
             // waiting to be sent (hence a queue size of 0 or 1 at this point).
 823         @Override
 824         public void onComplete() {
 825             if (debug.on()) debug.log("RequestSubscriber: onComplete");
 826             int size = outgoing.size();
 827             assert size == 0 || size == 1 : "non-zero or one size: " + size;
 828             // last byte of request body has been obtained.
 829             // ensure that everything is completed within the flow.
 830             onNextImpl(COMPLETED);
 831         }
 832 
 833         // Attempts to send the data, if any.
 834         // Handles errors and completion state.
 835         // Pause writing if the send window is exhausted, resume it if the
 836         // send window has some bytes that can be acquired.
 837         void trySend() {
 838             try {
 839                 // handle errors raised by onError;
 840                 Throwable t = errorRef.get();
 841                 if (t != null) {
 842                     sendScheduler.stop();
 843                     if (requestBodyCF.isDone()) return;
 844                     subscription.cancel();
 845                     requestBodyCF.completeExceptionally(t);
 846                     cancelImpl(t);
 847                     return;
 848                 }
 849 
                     // Drain the queue. The head is only removed once it has
                     // been sent in full, so a buffer that could not be sent
                     // completely is found again on the next run.
 850                 do {
 851                     // handle COMPLETED;
 852                     ByteBuffer item = outgoing.peekFirst();
 853                     if (item == null) return;
 854                     else if (item == COMPLETED) {
 855                         sendScheduler.stop();
 856                         complete();
 857                         return;
 858                     }
 859 
 860                     // handle bytes to send downstream
 861                     while (item.hasRemaining()) {
 862                         if (debug.on()) debug.log("trySend: %d", item.remaining());
 863                         assert !endStreamSent : "internal error, send data after END_STREAM flag";
 864                         DataFrame df = getDataFrame(item);
 865                         if (df == null) {
 866                             if (debug.on())
 867                                 debug.log("trySend: can't send yet: %d", item.remaining());
 868                             return; // the send window is exhausted: come back later
 869                         }
 870 
                             // The byte count is only checked against the
                             // announced length when that length is positive.
 871                         if (contentLength > 0) {
 872                             remainingContentLength -= df.getDataLength();
 873                             if (remainingContentLength < 0) {
 874                                 String msg = connection().getConnectionFlow()
 875                                         + " stream=" + streamid + " "
 876                                         + "[" + Thread.currentThread().getName() + "] "
 877                                         + "Too many bytes in request body. Expected: "
 878                                         + contentLength + ", got: "
 879                                         + (contentLength - remainingContentLength);
 880                                 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
 881                                 throw new IOException(msg);
 882                             } else if (remainingContentLength == 0) {
                                     // this frame carries the last expected byte:
                                     // flag it instead of sending an empty
                                     // END_STREAM frame later on.
 883                                 df.setFlag(DataFrame.END_STREAM);
 884                                 endStreamSent = true;
 885                             }
 886                         }
 887                         if (debug.on())
 888                             debug.log("trySend: sending: %d", df.getDataLength());
 889                         connection.sendDataFrame(df);
 890                     }
 891                     assert !item.hasRemaining();
 892                     ByteBuffer b = outgoing.removeFirst();
 893                     assert b == item;
 894                 } while (outgoing.peekFirst() != null);
 895 
                     // Everything queued so far has been sent: ask the
                     // publisher for the next buffer.
 896                 if (debug.on()) debug.log("trySend: request 1");
 897                 subscription.request(1);
 898             } catch (Throwable ex) {
 899                 if (debug.on()) debug.log("trySend: ", ex);
 900                 sendScheduler.stop();
 901                 subscription.cancel();
 902                 requestBodyCF.completeExceptionally(ex);
 903                 // need to cancel the stream to 1. tell the server
 904                 // we don't want to receive any more data and
 905                 // 2. ensure that the operation ref count will be
 906                 // decremented on the HttpClient.
 907                 cancelImpl(ex);
 908             }
 909         }
 910 
             // Finishes the request body once COMPLETED has reached the head of
             // the queue. If bytes are still expected, the stream is reset and an
             // IOException is thrown. Otherwise an empty END_STREAM frame is sent,
             // unless END_STREAM has been sent already, and requestBodyCF is
             // completed.
 911         private void complete() throws IOException {
 912             long remaining = remainingContentLength;
 913             long written = contentLength - remaining;
 914             if (remaining > 0) {
 915                 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
 916                 // let trySend() handle the exception
 917                 throw new IOException(connection().getConnectionFlow()
 918                                      + " stream=" + streamid + " "
 919                                      + "[" + Thread.currentThread().getName() +"] "
 920                                      + "Too few bytes returned by the publisher ("
 921                                               + written + "/"
 922                                               + contentLength + ")");
 923             }
 924             if (!endStreamSent) {
 925                 endStreamSent = true;
 926                 connection.sendDataFrame(getEmptyEndStreamDataFrame());
 927             }
 928             requestBodyCF.complete(null);
 929         }
 930     }
 931 
 932     /**
 933      * Send a RESET frame to tell server to stop sending data on this stream
 934      */
 935     @Override
 936     public CompletableFuture<Void> ignoreBody() {
 937         try {
 938             connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
 939             return MinimalFuture.completedFuture(null);
 940         } catch (Throwable e) {
 941             Log.logTrace("Error resetting stream {0}", e.toString());
 942             return MinimalFuture.failedFuture(e);
 943         }
 944     }
 945 
 946     DataFrame getDataFrame(ByteBuffer buffer) {
 947         int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
 948         // blocks waiting for stream send window, if exhausted
 949         int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
 950         if (actualAmount <= 0) return null;
 951         ByteBuffer outBuf = Utils.sliceWithLimitedCapacity(buffer,  actualAmount);
 952         DataFrame df = new DataFrame(streamid, 0 , outBuf);
 953         return df;
 954     }
 955 
 956     private DataFrame getEmptyEndStreamDataFrame()  {
 957         return new DataFrame(streamid, DataFrame.END_STREAM, List.of());
 958     }
 959 
 960     /**
 961      * A List of responses relating to this stream. Normally there is only
 962      * one response, but intermediate responses like 100 are allowed
 963      * and must be passed up to higher level before continuing. Deals with races
 964      * such as if responses are returned before the CFs get created by
 965      * getResponseAsync()
          *
          * getResponseAsync(), completeResponse() and
          * completeResponseExceptionally() all access the list while
          * synchronized on the list itself.
 966      */
 967 
 968     final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
 969 
 970     @Override
 971     CompletableFuture<Response> getResponseAsync(Executor executor) {
 972         CompletableFuture<Response> cf;
 973         // The code below deals with race condition that can be caused when
 974         // completeResponse() is being called before getResponseAsync()
 975         synchronized (response_cfs) {
 976             if (!response_cfs.isEmpty()) {
 977                 // This CompletableFuture was created by completeResponse().
 978                 // it will be already completed.
 979                 cf = response_cfs.remove(0);
 980                 // if we find a cf here it should be already completed.
 981                 // finding a non completed cf should not happen. just assert it.
 982                 assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
 983             } else {
 984                 // getResponseAsync() is called first. Create a CompletableFuture
 985                 // that will be completed by completeResponse() when
 986                 // completeResponse() is called.
 987                 cf = new MinimalFuture<>();
 988                 response_cfs.add(cf);
 989             }
 990         }
 991         if (executor != null && !cf.isDone()) {
 992             // protect from executing later chain of CompletableFuture operations from SelectorManager thread
 993             cf = cf.thenApplyAsync(r -> r, executor);
 994         }
 995         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
 996         PushGroup<?> pg = exchange.getPushGroup();
 997         if (pg != null) {
 998             // if an error occurs make sure it is recorded in the PushGroup
 999             cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
1000         }
1001         return cf;
1002     }
1003 
1004     /**
1005      * Completes the first uncompleted CF on list, and removes it. If there is no
1006      * uncompleted CF then creates one (completes it) and adds to list
          *
          * @param resp the response to complete the future with
1007      */
1008     void completeResponse(Response resp) {
1009         synchronized (response_cfs) {
1010             CompletableFuture<Response> cf;
1011             int cfs_len = response_cfs.size();
1012             for (int i=0; i<cfs_len; i++) {
1013                 cf = response_cfs.get(i);
1014                 if (!cf.isDone()) {
1015                     Log.logTrace("Completing response (streamid={0}): {1}",
1016                                  streamid, cf);
1017                     if (debug.on())
1018                         debug.log("Completing responseCF(%d) with response headers", i);
                         // Remove by index, as completeResponseExceptionally()
                         // does: the position is already known, so there is no
                         // need to search the list for the future again.
1019                     response_cfs.remove(i);
1020                     cf.complete(resp);
1021                     return;
1022                 } // else we found the previous response: just leave it alone.
1023             }
                 // No uncompleted future was waiting: park an already completed
                 // one for a later getResponseAsync() call to pick up.
1024             cf = MinimalFuture.completedFuture(resp);
1025             Log.logTrace("Created completed future (streamid={0}): {1}",
1026                          streamid, cf);
1027             if (debug.on())
1028                 debug.log("Adding completed responseCF(0) with response headers");
1029             response_cfs.add(cf);
1030         }
1031     }
1032 
1033     // methods to update state and remove stream when finished
1034 
1035     synchronized void requestSent() {
             // Flags the request as sent. Once the response has been received
             // too, the stream is closed.
1036         requestSent = true;
1037         if (!responseReceived) {
1038             return;
1039         }
             close();
1040     }
1041 
1042     synchronized void responseReceived() {
             // Flags the response as received. Once the request has been sent
             // too, the stream is closed.
1043         responseReceived = true;
1044         if (!requestSent) {
1045             return;
1046         }
             close();
1047     }
1048 
1049     /**
1050      * same as above but for errors
1051      */
1052     void completeResponseExceptionally(Throwable t) {
1053         synchronized (response_cfs) {
1054             // use index to avoid ConcurrentModificationException
1055             // caused by removing the CF from within the loop.
1056             for (int i = 0; i < response_cfs.size(); i++) {
1057                 CompletableFuture<Response> cf = response_cfs.get(i);
1058                 if (!cf.isDone()) {
1059                     response_cfs.remove(i);
1060                     cf.completeExceptionally(t);
1061                     return;
1062                 }
1063             }
1064             response_cfs.add(MinimalFuture.failedFuture(t));
1065         }
1066     }
1067 
1068     CompletableFuture<Void> sendBodyImpl() {
             // Starts sending the request body, if there is one, and returns the
             // future that tracks it. requestSent() is invoked when that future
             // completes, whether normally or exceptionally.
1069         requestBodyCF.whenComplete((v, t) -> requestSent());
1070         try {
1071             if (requestPublisher == null) {
1072                 // there is no request body, therefore the request is complete;
1073                 // END_STREAM is expected to have been sent with the outgoing
1074                 // headers already
1075                 requestBodyCF.complete(null);
1076             } else {
1077                 final RequestSubscriber bodySubscriber = new RequestSubscriber(requestContentLen);
1078                 // store the subscriber in the field before subscribing
1079                 requestSubscriber = bodySubscriber;
1080                 requestPublisher.subscribe(bodySubscriber);
1081             }
1082         } catch (Throwable failure) {
1083             cancelImpl(failure);
                 requestBodyCF.completeExceptionally(failure);
             }
             return requestBodyCF;
1084     }
1085 
1086     @Override
1087     void cancel() {
1088         cancel(new IOException("Stream " + streamid + " cancelled"));
1089     }
1090 
1091     void onSubscriptionError(Throwable t) {
1092         // record the error, unless another one has been recorded already
1093         errorRef.compareAndSet(null, t);
1094         if (debug.on()) {
1095             debug.log("Got subscription error: %s", (Object)t);
1096         }
1097         // Special case: the subscriber has requested an illegal number of
1098         // items. The error then comes from downstream rather than from
1099         // upstream, and has to be handled without waiting for the inputQ
1100         // to be exhausted.
1101         stopRequested = true;
             sched.runOrSchedule();
1102     }
1103 
1104     @Override
1105     void cancel(IOException cause) {
             // Delegates to cancelImpl(), which records the cause and cancels
             // the stream.
1106         cancelImpl(cause);
1107     }
1108 
1109     void connectionClosing(Throwable cause) {
1110         Flow.Subscriber<?> subscriber =
1111                 responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
1112         errorRef.compareAndSet(null, cause);
1113         if (subscriber != null && !sched.isStopped() && !inputQ.isEmpty()) {
1114             sched.runOrSchedule();
1115         } else cancelImpl(cause);
1116     }
1117 
1118     // This method sends a RST_STREAM frame
1119     void cancelImpl(Throwable e) {
1120         errorRef.compareAndSet(null, e);
1121         if (debug.on()) debug.log("cancelling stream {0}: {1}", streamid, e);
1122         if (Log.trace()) {
1123             Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
1124         }
1125         boolean closing;
1126         if (closing = !closed) { // assigning closing to !closed
1127             synchronized (this) {
1128                 if (closing = !closed) { // assigning closing to !closed
1129                     closed=true;
1130                 }
1131             }
1132         }
1133         if (closing) { // true if the stream has not been closed yet
1134             if (responseSubscriber != null || pendingResponseSubscriber != null)
1135                 sched.runOrSchedule();
1136         }
1137         completeResponseExceptionally(e);
1138         if (!requestBodyCF.isDone()) {
1139             requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
1140         }
1141         if (responseBodyCF != null) {
1142             responseBodyCF.completeExceptionally(errorRef.get());
1143         }
1144         try {
1145             // will send a RST_STREAM frame
1146             if (streamid != 0) {
1147                 connection.decrementStreamsCount(streamid);
1148                 e = Utils.getCompletionCause(e);
1149                 if (e instanceof EOFException) {
1150                     // read EOF: no need to try & send reset
1151                     connection.closeStream(streamid);
1152                 } else {
1153                     connection.resetStream(streamid, ResetFrame.CANCEL);
1154                 }
1155             }
1156         } catch (Throwable ex) {
1157             Log.logError(ex);
1158         }
1159     }
1160 
1161     // This method doesn't send any frame
         // It flips 'closed' to true at most once and, for the caller that did
         // the flip, asks the connection to close the stream.
1162     void close() {
             // unsynchronized check first, then re-check under the lock
1163         if (closed) return;
1164         synchronized(this) {
1165             if (closed) return;
1166             closed = true;
1167         }
1168         Log.logTrace("Closing stream {0}", streamid);
1169         connection.closeStream(streamid);
1170         Log.logTrace("Stream {0} closed", streamid);
1171     }
1172 
1173     static class PushedStream<T> extends Stream<T> {
             // A Stream used for a pushed response. It reuses the superclass
             // behaviour, records errors in its PushGroup, and exposes two
             // futures: pushCF (the response headers) and responseCF (the
             // complete HttpResponse, once the body has been read).
1174         final PushGroup<T> pushGroup;
1175         // push streams need the response CF allocated up front as it is
1176         // given directly to user via the multi handler callback function.
1177         final CompletableFuture<Response> pushCF;
1178         CompletableFuture<HttpResponse<T>> responseCF;
1179         final HttpRequestImpl pushReq;
             // body handler used by completeResponse(); set through setPushHandler()
1180         HttpResponse.BodyHandler<T> pushHandler;
1181 
1182         PushedStream(PushGroup<T> pushGroup,
1183                      Http2Connection connection,
1184                      Exchange<T> pushReq) {
1185             // ## no request body possible, null window controller
1186             super(connection, pushReq, null);
1187             this.pushGroup = pushGroup;
1188             this.pushReq = pushReq.request();
1189             this.pushCF = new MinimalFuture<>();
1190             this.responseCF = new MinimalFuture<>();
1191 
1192         }
1193 
             // The future completed by completeResponse() with the full response,
             // or exceptionally if reading the body fails.
1194         CompletableFuture<HttpResponse<T>> responseCF() {
1195             return responseCF;
1196         }
1197 
1198         synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
1199             this.pushHandler = pushHandler;
1200         }
1201 
1202         synchronized HttpResponse.BodyHandler<T> getPushHandler() {
1203             // ignored parameters to function can be used as BodyHandler
1204             return this.pushHandler;
1205         }
1206 
1207         // Following methods call the super class but in case of
1208         // error record it in the PushGroup. The error method is called
1209         // with a null value when no error occurred (is a no-op)
1210         @Override
1211         CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
1212             return super.sendBodyAsync()
1213                         .whenComplete((ExchangeImpl<T> v, Throwable t)
1214                                 -> pushGroup.pushError(Utils.getCompletionCause(t)));
1215         }
1216 
1217         @Override
1218         CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
1219             return super.sendHeadersAsync()
1220                         .whenComplete((ExchangeImpl<T> ex, Throwable t)
1221                                 -> pushGroup.pushError(Utils.getCompletionCause(t)));
1222         }
1223 
             // Unlike the superclass, this always hands out (a dependent of)
             // pushCF; the response_cfs list is not involved.
1224         @Override
1225         CompletableFuture<Response> getResponseAsync(Executor executor) {
1226             CompletableFuture<Response> cf = pushCF.whenComplete(
1227                     (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
1228             if(executor!=null && !cf.isDone()) {
1229                 cf  = cf.thenApplyAsync( r -> r, executor);
1230             }
1231             return cf;
1232         }
1233 
             // NOTE(review): here the throwable is passed to pushError() as is,
             // whereas the other overrides unwrap it with
             // Utils.getCompletionCause() first. Confirm this is intended.
1234         @Override
1235         CompletableFuture<T> readBodyAsync(
1236                 HttpResponse.BodyHandler<T> handler,
1237                 boolean returnConnectionToPool,
1238                 Executor executor)
1239         {
1240             return super.readBodyAsync(handler, returnConnectionToPool, executor)
1241                         .whenComplete((v, t) -> pushGroup.pushError(t));
1242         }
1243 
             // Completes pushCF with the response, then starts reading the body
             // with the push handler. The read is triggered by completing 'start'
             // asynchronously on the exchange's executor; its outcome completes
             // responseCF, either with an HttpResponseImpl or exceptionally.
1244         @Override
1245         void completeResponse(Response r) {
1246             Log.logResponse(r::toString);
1247             pushCF.complete(r); // not strictly required for push API
1248             // start reading the body using the obtained BodySubscriber
1249             CompletableFuture<Void> start = new MinimalFuture<>();
1250             start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
1251                 .whenComplete((T body, Throwable t) -> {
1252                     if (t != null) {
1253                         responseCF.completeExceptionally(t);
1254                     } else {
1255                         HttpResponseImpl<T> resp =
1256                                 new HttpResponseImpl<>(r.request, r, null, body, getExchange());
1257                         responseCF.complete(resp);
1258                     }
1259                 });
1260             start.completeAsync(() -> null, getExchange().executor());
1261         }
1262 
             // Only pushCF is failed here; responseCF is left untouched.
1263         @Override
1264         void completeResponseExceptionally(Throwable t) {
1265             pushCF.completeExceptionally(t);
1266         }
1267 
1268 //        @Override
1269 //        synchronized void responseReceived() {
1270 //            super.responseReceived();
1271 //        }
1272 
1273         // create and return the PushResponseImpl
1274         @Override
1275         protected void handleResponse() {
1276             HttpHeaders responseHeaders = responseHeadersBuilder.build();
1277             responseCode = (int)responseHeaders
1278                 .firstValueAsLong(":status")
1279                 .orElse(-1);
1280 
                 // NOTE(review): there is no return after reporting the missing
                 // status code, so the code below still builds a Response and
                 // calls completeResponse(). Confirm this is intended.
1281             if (responseCode == -1) {
1282                 completeResponseExceptionally(new IOException("No status code"));
1283             }
1284 
1285             this.response = new Response(
1286                 pushReq, exchange, responseHeaders, connection(),
1287                 responseCode, HttpClient.Version.HTTP_2);
1288 
1289             /* TODO: review if needs to be removed
1290                the value is not used, but in case `content-length` doesn't parse
1291                as long, there will be NumberFormatException. If left as is, make
1292                sure code up the stack handles NFE correctly. */
1293             responseHeaders.firstValueAsLong("content-length");
1294 
1295             if (Log.headers()) {
1296                 StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
1297                 sb.append(" (streamid=").append(streamid).append("):\n");
1298                 Log.dumpHeaders(sb, "    ", responseHeaders);
1299                 Log.logHeaders(sb.toString());
1300             }
1301 
                 // clear the headers consumer before the response is handed over
1302             rspHeadersConsumer.reset();
1303 
1304             // different implementations for normal streams and pushed streams
1305             completeResponse(response);
1306         }
1307     }
1308 
1309     final class StreamWindowUpdateSender extends WindowUpdateSender {
             // WindowUpdateSender bound to the enclosing stream: it reports the
             // stream's id and labels its debug output with it.
1310 
1311         StreamWindowUpdateSender(Http2Connection connection) {
1312             super(connection);
1313         }
1314 
1315         @Override
1316         int getStreamId() {
1317             final int id = streamid;
                 return id;
1318         }
1319 
1320         @Override
1321         String dbgString() {
1322             // The label is cached once the stream id is known (non-zero);
1323             // until then a placeholder label is rebuilt on every call.
1324             final String cached = dbgString;
1325             if (cached != null) {
1326                 return cached;
1327             }
1328             if (streamid == 0) {
1329                 return connection.dbgString() + ":WindowUpdateSender(stream: ?)";
1330             }
                 final String label =
                         connection.dbgString() + ":WindowUpdateSender(stream: " + streamid + ")";
                 dbgString = label;
                 return label;
             }
1331     }
1332 
1333     /**
1334      * Returns true if this exchange was canceled.
1335      * @return true if this exchange was canceled.
1336      */
1337     synchronized boolean isCanceled() {
1338         return errorRef.get() != null;
1339     }
1340 
1341     /**
1342      * Returns the cause for which this exchange was canceled, if available.
1343      * @return the cause for which this exchange was canceled, if available.
1344      */
1345     synchronized Throwable getCancelCause() {
1346         return errorRef.get();
1347     }
1348 
1349     final String dbgString() {
             // Label used in debug traces: the connection's own label followed
             // by this stream's id.
1350         final String connectionLabel = connection.dbgString();
             return connectionLabel + "/Stream(" + streamid + ")";
1351     }
1352 
1353     private class HeadersConsumer extends Http2Connection.ValidatingHeadersConsumer {
             // Receives the decoded response header fields of this stream: each
             // field is passed to the superclass first and then added to
             // responseHeadersBuilder.
1354 
             // Resets the superclass state and clears the headers collected so
             // far, so that a new set of headers can be received.
1355         @Override
1356         void reset() {
1357             super.reset();
1358             responseHeadersBuilder.clear();
                 // guarded like the other debug.log calls
                 if (debug.on()) {
                     debug.log("Response builder cleared, ready to receive new headers.");
                 }
1359         }
1360 
1361         @Override
1362         public void onDecoded(CharSequence name, CharSequence value)
1363             throws UncheckedIOException
1364         {
1365             String n = name.toString();
1366             String v = value.toString();
                 // super.onDecoded() runs before the field is recorded, so a
                 // field it rejects by throwing is never added to the builder
1367             super.onDecoded(n, v);
1368             responseHeadersBuilder.addHeader(n, v);
1369             if (Log.headers() && Log.trace()) {
1370                 Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
1371                              streamid, n, v);
1372             }
1373         }
1374     }
1375 }