1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.net.URI;
  31 import java.nio.ByteBuffer;
  32 import java.util.ArrayList;
  33 import java.util.Collections;
  34 import java.util.List;
  35 import java.util.Optional;
  36 import java.util.concurrent.CompletableFuture;
  37 import java.util.concurrent.ConcurrentLinkedDeque;
  38 import java.util.concurrent.ConcurrentLinkedQueue;
  39 import java.util.concurrent.Executor;
  40 import java.util.concurrent.Flow;
  41 import java.util.concurrent.Flow.Subscription;
  42 import java.util.concurrent.atomic.AtomicReference;
  43 import jdk.incubator.http.HttpResponse.BodySubscriber;
  44 import jdk.incubator.http.internal.common.*;
  45 import jdk.incubator.http.internal.frame.*;
  46 import jdk.incubator.http.internal.hpack.DecodingCallback;
  47 
  48 /**
  49  * Http/2 Stream handling.
  50  *
  51  * REQUESTS
  52  *
  53  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
  54  *
  55  * sendRequest() -- sendHeadersOnly() + sendBody()
  56  *
  57  * sendBodyAsync() -- calls sendBody() in an executor thread.
  58  *
  59  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
  60  *
  61  * sendRequestAsync() -- calls sendRequest() in an executor thread
  62  *
  63  * RESPONSES
  64  *
  65  * Multiple responses can be received per request. Responses are queued up on
  66  * a LinkedList of CF<HttpResponse> and the the first one on the list is completed
  67  * with the next response
  68  *
  69  * getResponseAsync() -- queries list of response CFs and returns first one
  70  *               if one exists. Otherwise, creates one and adds it to list
  71  *               and returns it. Completion is achieved through the
  72  *               incoming() upcall from connection reader thread.
  73  *
  74  * getResponse() -- calls getResponseAsync() and waits for CF to complete
  75  *
  76  * responseBodyAsync() -- calls responseBody() in an executor thread.
  77  *
  78  * incoming() -- entry point called from connection reader thread. Frames are
  79  *               either handled immediately without blocking or for data frames
  80  *               placed on the stream's inputQ which is consumed by the stream's
  81  *               reader thread.
  82  *
  83  * PushedStream sub class
  84  * ======================
  85  * Sending side methods are not used because the request comes from a PUSH_PROMISE
  86  * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
  87  * is created. PushedStream does not use responseCF list as there can be only
  88  * one response. The CF is created when the object created and when the response
  89  * HEADERS frame is received the object is completed.
  90  */
  91 class Stream<T> extends ExchangeImpl<T> {
  92 
  93     final static boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag
  94     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
  95 
  96     final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
  97     final SequentialScheduler sched =
  98             SequentialScheduler.synchronizedScheduler(this::schedule);
  99     final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel);
 100 
 101     /**
 102      * This stream's identifier. Assigned lazily by the HTTP2Connection before
 103      * the stream's first frame is sent.
 104      */
 105     protected volatile int streamid;
 106 
 107     long requestContentLen;
 108 
 109     final Http2Connection connection;
 110     final HttpRequestImpl request;
 111     final DecodingCallback rspHeadersConsumer;
 112     HttpHeadersImpl responseHeaders;
 113     final HttpHeadersImpl requestPseudoHeaders;
 114     volatile HttpResponse.BodySubscriber<T> responseSubscriber;
 115     final HttpRequest.BodyPublisher requestPublisher;
 116     volatile RequestSubscriber requestSubscriber;
 117     volatile int responseCode;
 118     volatile Response response;
 119     volatile Throwable failed; // The exception with which this stream was canceled.
 120     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
 121     volatile CompletableFuture<T> responseBodyCF;
 122 
 123     /** True if END_STREAM has been seen in a frame received on this stream. */
 124     private volatile boolean remotelyClosed;
 125     private volatile boolean closed;
 126     private volatile boolean endStreamSent;
 127 
 128     // state flags
 129     private boolean requestSent, responseReceived;
 130 
 131     /**
 132      * A reference to this Stream's connection Send Window controller. The
 133      * stream MUST acquire the appropriate amount of Send Window before
 134      * sending any data. Will be null for PushStreams, as they cannot send data.
 135      */
 136     private final WindowController windowController;
 137     private final WindowUpdateSender windowUpdater;
 138 
 139     @Override
 140     HttpConnection connection() {
 141         return connection.connection;
 142     }
 143 
 144     /**
 145      * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
 146      * of after user subscription window has re-opened, from SubscriptionBase.request()
 147      */
 148     private void schedule() {
 149         if (responseSubscriber == null)
 150             // can't process anything yet
 151             return;
 152 
 153         while (!inputQ.isEmpty()) {
 154             Http2Frame frame  = inputQ.peek();
 155             if (frame instanceof ResetFrame) {
 156                 inputQ.remove();
 157                 handleReset((ResetFrame)frame);
 158                 return;
 159             }
 160             DataFrame df = (DataFrame)frame;
 161             boolean finished = df.getFlag(DataFrame.END_STREAM);
 162 
 163             List<ByteBuffer> buffers = df.getData();
 164             List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
 165             int size = Utils.remaining(dsts, Integer.MAX_VALUE);
 166             if (size == 0 && finished) {
 167                 inputQ.remove();
 168                 Log.logTrace("responseSubscriber.onComplete");
 169                 debug.log(Level.DEBUG, "incoming: onComplete");
 170                 sched.stop();
 171                 responseSubscriber.onComplete();
 172                 setEndStreamReceived();
 173                 return;
 174             } else if (userSubscription.tryDecrement()) {
 175                 inputQ.remove();
 176                 Log.logTrace("responseSubscriber.onNext {0}", size);
 177                 debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
 178                 responseSubscriber.onNext(dsts);
 179                 if (consumed(df)) {
 180                     Log.logTrace("responseSubscriber.onComplete");
 181                     debug.log(Level.DEBUG, "incoming: onComplete");
 182                     sched.stop();
 183                     responseSubscriber.onComplete();
 184                     setEndStreamReceived();
 185                     return;
 186                 }
 187             } else {
 188                 return;
 189             }
 190         }
 191         Throwable t = failed;
 192         if (t != null) {
 193             sched.stop();
 194             responseSubscriber.onError(t);
 195             close();
 196         }
 197     }
 198 
 199     // Callback invoked after the Response BodySubscriber has consumed the
 200     // buffers contained in a DataFrame.
 201     // Returns true if END_STREAM is reached, false otherwise.
 202     private boolean consumed(DataFrame df) {
 203         // RFC 7540 6.1:
 204         // The entire DATA frame payload is included in flow control,
 205         // including the Pad Length and Padding fields if present
 206         int len = df.payloadLength();
 207         connection.windowUpdater.update(len);
 208 
 209         if (!df.getFlag(DataFrame.END_STREAM)) {
 210             // Don't send window update on a stream which is
 211             // closed or half closed.
 212             windowUpdater.update(len);
 213             return false; // more data coming
 214         }
 215         return true; // end of stream
 216     }
 217 
 218     @Override
 219     CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
 220                                        boolean returnConnectionToPool,
 221                                        Executor executor)
 222     {
 223         Log.logTrace("Reading body on stream {0}", streamid);
 224         BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
 225         CompletableFuture<T> cf = receiveData(bodySubscriber);
 226 
 227         PushGroup<?,?> pg = exchange.getPushGroup();
 228         if (pg != null) {
 229             // if an error occurs make sure it is recorded in the PushGroup
 230             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 231         }
 232         return cf;
 233     }
 234 
 235     @Override
 236     public String toString() {
 237         StringBuilder sb = new StringBuilder();
 238         sb.append("streamid: ")
 239                 .append(streamid);
 240         return sb.toString();
 241     }
 242 
 243     private void receiveDataFrame(DataFrame df) {
 244         inputQ.add(df);
 245         sched.runOrSchedule();
 246     }
 247 
 248     /** Handles a RESET frame. RESET is always handled inline in the queue. */
 249     private void receiveResetFrame(ResetFrame frame) {
 250         inputQ.add(frame);
 251         sched.runOrSchedule();
 252     }
 253 
 254     // pushes entire response body into response subscriber
 255     // blocking when required by local or remote flow control
 256     CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber) {
 257         responseBodyCF = MinimalFuture.of(bodySubscriber.getBody());
 258 
 259         if (isCanceled()) {
 260             Throwable t = getCancelCause();
 261             responseBodyCF.completeExceptionally(t);
 262         } else {
 263             bodySubscriber.onSubscribe(userSubscription);
 264         }
 265         // Set the responseSubscriber field now that onSubscribe has been called.
 266         // This effectively allows the scheduler to start invoking the callbacks.
 267         responseSubscriber = bodySubscriber;
 268         sched.runOrSchedule(); // in case data waiting already to be processed
 269         return responseBodyCF;
 270     }
 271 
 272     @Override
 273     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
 274         return sendBodyImpl().thenApply( v -> this);
 275     }
 276 
 277     @SuppressWarnings("unchecked")
 278     Stream(Http2Connection connection,
 279            Exchange<T> e,
 280            WindowController windowController)
 281     {
 282         super(e);
 283         this.connection = connection;
 284         this.windowController = windowController;
 285         this.request = e.request();
 286         this.requestPublisher = request.requestPublisher;  // may be null
 287         responseHeaders = new HttpHeadersImpl();
 288         rspHeadersConsumer = (name, value) -> {
 289             responseHeaders.addHeader(name.toString(), value.toString());
 290             if (Log.headers() && Log.trace()) {
 291                 Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
 292                              streamid, name, value);
 293             }
 294         };
 295         this.requestPseudoHeaders = new HttpHeadersImpl();
 296         // NEW
 297         this.windowUpdater = new StreamWindowUpdateSender(connection);
 298     }
 299 
 300     /**
 301      * Entry point from Http2Connection reader thread.
 302      *
 303      * Data frames will be removed by response body thread.
 304      */
 305     void incoming(Http2Frame frame) throws IOException {
 306         debug.log(Level.DEBUG, "incoming: %s", frame);
 307         if ((frame instanceof HeaderFrame)) {
 308             HeaderFrame hframe = (HeaderFrame)frame;
 309             if (hframe.endHeaders()) {
 310                 Log.logTrace("handling response (streamid={0})", streamid);
 311                 handleResponse();
 312                 if (hframe.getFlag(HeaderFrame.END_STREAM)) {
 313                     receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
 314                 }
 315             }
 316         } else if (frame instanceof DataFrame) {
 317             receiveDataFrame((DataFrame)frame);
 318         } else {
 319             otherFrame(frame);
 320         }
 321     }
 322 
 323     void otherFrame(Http2Frame frame) throws IOException {
 324         switch (frame.type()) {
 325             case WindowUpdateFrame.TYPE:
 326                 incoming_windowUpdate((WindowUpdateFrame) frame);
 327                 break;
 328             case ResetFrame.TYPE:
 329                 incoming_reset((ResetFrame) frame);
 330                 break;
 331             case PriorityFrame.TYPE:
 332                 incoming_priority((PriorityFrame) frame);
 333                 break;
 334             default:
 335                 String msg = "Unexpected frame: " + frame.toString();
 336                 throw new IOException(msg);
 337         }
 338     }
 339 
 340     // The Hpack decoder decodes into one of these consumers of name,value pairs
 341 
 342     DecodingCallback rspHeadersConsumer() {
 343         return rspHeadersConsumer;
 344     }
 345 
 346     protected void handleResponse() throws IOException {
 347         responseCode = (int)responseHeaders
 348                 .firstValueAsLong(":status")
 349                 .orElseThrow(() -> new IOException("no statuscode in response"));
 350 
 351         response = new Response(
 352                 request, exchange, responseHeaders,
 353                 responseCode, HttpClient.Version.HTTP_2);
 354 
 355         /* TODO: review if needs to be removed
 356            the value is not used, but in case `content-length` doesn't parse as
 357            long, there will be NumberFormatException. If left as is, make sure
 358            code up the stack handles NFE correctly. */
 359         responseHeaders.firstValueAsLong("content-length");
 360 
 361         if (Log.headers()) {
 362             StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
 363             Log.dumpHeaders(sb, "    ", responseHeaders);
 364             Log.logHeaders(sb.toString());
 365         }
 366 
 367         completeResponse(response);
 368     }
 369 
 370     void incoming_reset(ResetFrame frame) {
 371         Log.logTrace("Received RST_STREAM on stream {0}", streamid);
 372         if (endStreamReceived()) {
 373             Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
 374         } else if (closed) {
 375             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
 376         } else {
 377             // put it in the input queue in order to read all
 378             // pending data frames first. Indeed, a server may send
 379             // RST_STREAM after sending END_STREAM, in which case we should
 380             // ignore it. However, we won't know if we have received END_STREAM
 381             // or not until all pending data frames are read.
 382             receiveResetFrame(frame);
 383             // RST_STREAM was pushed to the queue. It will be handled by
 384             // asyncReceive after all pending data frames have been
 385             // processed.
 386             Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
 387         }
 388     }
 389 
 390     void handleReset(ResetFrame frame) {
 391         Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
 392         if (!closed) {
 393             close();
 394             int error = frame.getErrorCode();
 395             completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error)));
 396         } else {
 397             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
 398         }
 399     }
 400 
 401     void incoming_priority(PriorityFrame frame) {
 402         // TODO: implement priority
 403         throw new UnsupportedOperationException("Not implemented");
 404     }
 405 
 406     private void incoming_windowUpdate(WindowUpdateFrame frame)
 407         throws IOException
 408     {
 409         int amount = frame.getUpdate();
 410         if (amount <= 0) {
 411             Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n",
 412                          streamid, streamid, amount);
 413             connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
 414         } else {
 415             assert streamid != 0;
 416             boolean success = windowController.increaseStreamWindow(amount, streamid);
 417             if (!success) {  // overflow
 418                 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
 419             }
 420         }
 421     }
 422 
 423     void incoming_pushPromise(HttpRequestImpl pushReq,
 424                               PushedStream<?,T> pushStream)
 425         throws IOException
 426     {
 427         if (Log.requests()) {
 428             Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
 429         }
 430         PushGroup<?,T> pushGroup = exchange.getPushGroup();
 431         if (pushGroup == null || pushGroup.noMorePushes()) {
 432             cancelImpl(new IllegalStateException("unexpected push promise"
 433                 + " on stream " + streamid));
 434             return;
 435         }
 436 
 437         HttpResponse.MultiSubscriber<?,T> proc = pushGroup.subscriber();
 438 
 439         CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
 440 
 441         Optional<HttpResponse.BodyHandler<T>> bpOpt =
 442                 pushGroup.handlerForPushRequest(pushReq);
 443 
 444         if (!bpOpt.isPresent()) {
 445             IOException ex = new IOException("Stream "
 446                  + streamid + " cancelled by user");
 447             if (Log.trace()) {
 448                 Log.logTrace("No body subscriber for {0}: {1}", pushReq,
 449                             ex.getMessage());
 450             }
 451             pushStream.cancelImpl(ex);
 452             cf.completeExceptionally(ex);
 453             return;
 454         }
 455 
 456         pushGroup.addPush();
 457         pushStream.requestSent();
 458         pushStream.setPushHandler(bpOpt.get());
 459         // setup housekeeping for when the push is received
 460         // TODO: deal with ignoring of CF anti-pattern
 461         cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
 462             t = Utils.getCompletionCause(t);
 463             if (Log.trace()) {
 464                 Log.logTrace("Push completed on stream {0} for {1}{2}",
 465                              pushStream.streamid, resp,
 466                              ((t==null) ? "": " with exception " + t));
 467             }
 468             if (t != null) {
 469                 pushGroup.pushError(t);
 470                 proc.onError(pushReq, t);
 471             } else {
 472                 proc.onResponse(resp);
 473             }
 474             pushGroup.pushCompleted();
 475         });
 476 
 477     }
 478 
 479     private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
 480         HttpHeadersImpl h = request.getSystemHeaders();
 481         if (contentLength > 0) {
 482             h.setHeader("content-length", Long.toString(contentLength));
 483         }
 484         setPseudoHeaderFields();
 485         OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(h, request.getUserHeaders(), this);
 486         if (contentLength == 0) {
 487             f.setFlag(HeadersFrame.END_STREAM);
 488             endStreamSent = true;
 489         }
 490         return f;
 491     }
 492 
 493     private void setPseudoHeaderFields() {
 494         HttpHeadersImpl hdrs = requestPseudoHeaders;
 495         String method = request.method();
 496         hdrs.setHeader(":method", method);
 497         URI uri = request.uri();
 498         hdrs.setHeader(":scheme", uri.getScheme());
 499         // TODO: userinfo deprecated. Needs to be removed
 500         hdrs.setHeader(":authority", uri.getAuthority());
 501         // TODO: ensure header names beginning with : not in user headers
 502         String query = uri.getQuery();
 503         String path = uri.getPath();
 504         if (path == null || path.isEmpty()) {
 505             if (method.equalsIgnoreCase("OPTIONS")) {
 506                 path = "*";
 507             } else {
 508                 path = "/";
 509             }
 510         }
 511         if (query != null) {
 512             path += "?" + query;
 513         }
 514         hdrs.setHeader(":path", path);
 515     }
 516 
 517     HttpHeadersImpl getRequestPseudoHeaders() {
 518         return requestPseudoHeaders;
 519     }
 520 
 521     /** Sets endStreamReceived. Should be called only once. */
 522     void setEndStreamReceived() {
 523         assert remotelyClosed == false: "Unexpected endStream already set";
 524         remotelyClosed = true;
 525         responseReceived();
 526     }
 527 
 528     /** Tells whether, or not, the END_STREAM Flag has been seen in any frame
 529      *  received on this stream. */
 530     private boolean endStreamReceived() {
 531         return remotelyClosed;
 532     }
 533 
 534     @Override
 535     CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
 536         debug.log(Level.DEBUG, "sendHeadersOnly()");
 537         if (Log.requests() && request != null) {
 538             Log.logRequest(request.toString());
 539         }
 540         if (requestPublisher != null) {
 541             requestContentLen = requestPublisher.contentLength();
 542         } else {
 543             requestContentLen = 0;
 544         }
 545         OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
 546         connection.sendFrame(f);
 547         CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
 548         cf.complete(this);  // #### good enough for now
 549         return cf;
 550     }
 551 
 552     @Override
 553     void released() {
 554         if (streamid > 0) {
 555             debug.log(Level.DEBUG, "Released stream %d", streamid);
 556             // remove this stream from the Http2Connection map.
 557             connection.closeStream(streamid);
 558         } else {
 559             debug.log(Level.DEBUG, "Can't release stream %d", streamid);
 560         }
 561     }
 562 
 563     @Override
 564     void completed() {
 565         // There should be nothing to do here: the stream should have
 566         // been already closed (or will be closed shortly after).
 567     }
 568 
 569     void registerStream(int id) {
 570         this.streamid = id;
 571         connection.putStream(this, streamid);
 572         debug.log(Level.DEBUG, "Registered stream %d", id);
 573     }
 574 
 575     void signalWindowUpdate() {
 576         RequestSubscriber subscriber = requestSubscriber;
 577         assert subscriber != null;
 578         debug.log(Level.DEBUG, "Signalling window update");
 579         subscriber.sendScheduler.runOrSchedule();
 580     }
 581 
 582     static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);
 583     class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
 584         // can be < 0 if the actual length is not known.
 585         private final long contentLength;
 586         private volatile long remainingContentLength;
 587         private volatile Subscription subscription;
 588 
 589         // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
 590         //  1) The data that was published by the request body Publisher, and
 591         //  2) the COMPLETED sentinel, since onComplete can be invoked without demand.
 592         final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
 593 
 594         private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
 595         // A scheduler used to honor window updates. Writing must be paused
 596         // when the window is exhausted, and resumed when the window acquires
 597         // some space. The sendScheduler makes it possible to implement this
 598         // behaviour in an asynchronous non-blocking way.
 599         // See RequestSubscriber::trySend below.
 600         final SequentialScheduler sendScheduler;
 601 
 602         RequestSubscriber(long contentLen) {
 603             this.contentLength = contentLen;
 604             this.remainingContentLength = contentLen;
 605             this.sendScheduler =
 606                     SequentialScheduler.synchronizedScheduler(this::trySend);
 607         }
 608 
 609         @Override
 610         public void onSubscribe(Flow.Subscription subscription) {
 611             if (this.subscription != null) {
 612                 throw new IllegalStateException("already subscribed");
 613             }
 614             this.subscription = subscription;
 615             debug.log(Level.DEBUG, "RequestSubscriber: onSubscribe, request 1");
 616             subscription.request(1);
 617         }
 618 
 619         @Override
 620         public void onNext(ByteBuffer item) {
 621             debug.log(Level.DEBUG, "RequestSubscriber: onNext(%d)", item.remaining());
 622             int size = outgoing.size();
 623             assert size == 0 : "non-zero size: " + size;
 624             onNextImpl(item);
 625         }
 626 
 627         private void onNextImpl(ByteBuffer item) {
 628             // Got some more request body bytes to send.
 629             if (requestBodyCF.isDone()) {
 630                 // stream already cancelled, probably in timeout
 631                 sendScheduler.stop();
 632                 subscription.cancel();
 633                 return;
 634             }
 635             outgoing.add(item);
 636             sendScheduler.runOrSchedule();
 637         }
 638 
 639         @Override
 640         public void onError(Throwable throwable) {
 641             debug.log(Level.DEBUG, () -> "RequestSubscriber: onError: " + throwable);
 642             // ensure that errors are handled within the flow.
 643             if (errorRef.compareAndSet(null, throwable)) {
 644                 sendScheduler.runOrSchedule();
 645             }
 646         }
 647 
 648         @Override
 649         public void onComplete() {
 650             debug.log(Level.DEBUG, "RequestSubscriber: onComplete");
 651             int size = outgoing.size();
 652             assert size == 0 || size == 1 : "non-zero or one size: " + size;
 653             // last byte of request body has been obtained.
 654             // ensure that everything is completed within the flow.
 655             onNextImpl(COMPLETED);
 656         }
 657 
 658         // Attempts to send the data, if any.
 659         // Handles errors and completion state.
 660         // Pause writing if the send window is exhausted, resume it if the
 661         // send window has some bytes that can be acquired.
 662         void trySend() {
 663             try {
 664                 // handle errors raised by onError;
 665                 Throwable t = errorRef.get();
 666                 if (t != null) {
 667                     sendScheduler.stop();
 668                     if (requestBodyCF.isDone()) return;
 669                     subscription.cancel();
 670                     requestBodyCF.completeExceptionally(t);
 671                     return;
 672                 }
 673 
 674                 do {
 675                     // handle COMPLETED;
 676                     ByteBuffer item = outgoing.peekFirst();
 677                     if (item == null) return;
 678                     else if (item == COMPLETED) {
 679                         sendScheduler.stop();
 680                         complete();
 681                         return;
 682                     }
 683 
 684                     // handle bytes to send downstream
 685                     while (item.hasRemaining()) {
 686                         debug.log(Level.DEBUG, "trySend: %d", item.remaining());
 687                         assert !endStreamSent : "internal error, send data after END_STREAM flag";
 688                         DataFrame df = getDataFrame(item);
 689                         if (df == null) {
 690                             debug.log(Level.DEBUG, "trySend: can't send yet: %d",
 691                                     item.remaining());
 692                             return; // the send window is exhausted: come back later
 693                         }
 694 
 695                         if (contentLength > 0) {
 696                             remainingContentLength -= df.getDataLength();
 697                             if (remainingContentLength < 0) {
 698                                 String msg = connection().getConnectionFlow()
 699                                         + " stream=" + streamid + " "
 700                                         + "[" + Thread.currentThread().getName() + "] "
 701                                         + "Too many bytes in request body. Expected: "
 702                                         + contentLength + ", got: "
 703                                         + (contentLength - remainingContentLength);
 704                                 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
 705                                 throw new IOException(msg);
 706                             } else if (remainingContentLength == 0) {
 707                                 df.setFlag(DataFrame.END_STREAM);
 708                                 endStreamSent = true;
 709                             }
 710                         }
 711                         debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength());
 712                         connection.sendDataFrame(df);
 713                     }
 714                     assert !item.hasRemaining();
 715                     ByteBuffer b = outgoing.removeFirst();
 716                     assert b == item;
 717                 } while (outgoing.peekFirst() != null);
 718 
 719                 debug.log(Level.DEBUG, "trySend: request 1");
 720                 subscription.request(1);
 721             } catch (Throwable ex) {
 722                 debug.log(Level.DEBUG, "trySend: ", ex);
 723                 sendScheduler.stop();
 724                 subscription.cancel();
 725                 requestBodyCF.completeExceptionally(ex);
 726             }
 727         }
 728 
 729         private void complete() throws IOException {
 730             long remaining = remainingContentLength;
 731             long written = contentLength - remaining;
 732             if (remaining > 0) {
 733                 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
 734                 // let trySend() handle the exception
 735                 throw new IOException(connection().getConnectionFlow()
 736                                      + " stream=" + streamid + " "
 737                                      + "[" + Thread.currentThread().getName() +"] "
 738                                      + "Too few bytes returned by the publisher ("
 739                                               + written + "/"
 740                                               + contentLength + ")");
 741             }
 742             if (!endStreamSent) {
 743                 endStreamSent = true;
 744                 connection.sendDataFrame(getEmptyEndStreamDataFrame());
 745             }
 746             requestBodyCF.complete(null);
 747         }
 748     }
 749 
 750     /**
 751      * Send a RESET frame to tell server to stop sending data on this stream
 752      */
 753     @Override
 754     public CompletableFuture<Void> ignoreBody() {
 755         try {
 756             connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
 757             return MinimalFuture.completedFuture(null);
 758         } catch (Throwable e) {
 759             Log.logTrace("Error resetting stream {0}", e.toString());
 760             return MinimalFuture.failedFuture(e);
 761         }
 762     }
 763 
 764     DataFrame getDataFrame(ByteBuffer buffer) {
 765         int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
 766         // blocks waiting for stream send window, if exhausted
 767         int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
 768         if (actualAmount <= 0) return null;
 769         ByteBuffer outBuf = Utils.slice(buffer,  actualAmount);
 770         DataFrame df = new DataFrame(streamid, 0 , outBuf);
 771         return df;
 772     }
 773 
 774     private DataFrame getEmptyEndStreamDataFrame()  {
 775         return new DataFrame(streamid, DataFrame.END_STREAM, List.of());
 776     }
 777 
 778     /**
 779      * A List of responses relating to this stream. Normally there is only
 780      * one response, but intermediate responses like 100 are allowed
 781      * and must be passed up to higher level before continuing. Deals with races
 782      * such as if responses are returned before the CFs get created by
 783      * getResponseAsync()
 784      */
 785 
 786     final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
 787 
 788     @Override
 789     CompletableFuture<Response> getResponseAsync(Executor executor) {
 790         CompletableFuture<Response> cf;
 791         // The code below deals with race condition that can be caused when
 792         // completeResponse() is being called before getResponseAsync()
 793         synchronized (response_cfs) {
 794             if (!response_cfs.isEmpty()) {
 795                 // This CompletableFuture was created by completeResponse().
 796                 // it will be already completed.
 797                 cf = response_cfs.remove(0);
 798                 // if we find a cf here it should be already completed.
 799                 // finding a non completed cf should not happen. just assert it.
 800                 assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
 801             } else {
 802                 // getResponseAsync() is called first. Create a CompletableFuture
 803                 // that will be completed by completeResponse() when
 804                 // completeResponse() is called.
 805                 cf = new MinimalFuture<>();
 806                 response_cfs.add(cf);
 807             }
 808         }
 809         if (executor != null && !cf.isDone()) {
 810             // protect from executing later chain of CompletableFuture operations from SelectorManager thread
 811             cf = cf.thenApplyAsync(r -> r, executor);
 812         }
 813         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
 814         PushGroup<?,?> pg = exchange.getPushGroup();
 815         if (pg != null) {
 816             // if an error occurs make sure it is recorded in the PushGroup
 817             cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
 818         }
 819         return cf;
 820     }
 821 
 822     /**
 823      * Completes the first uncompleted CF on list, and removes it. If there is no
 824      * uncompleted CF then creates one (completes it) and adds to list
 825      */
 826     void completeResponse(Response resp) {
 827         synchronized (response_cfs) {
 828             CompletableFuture<Response> cf;
 829             int cfs_len = response_cfs.size();
 830             for (int i=0; i<cfs_len; i++) {
 831                 cf = response_cfs.get(i);
 832                 if (!cf.isDone()) {
 833                     Log.logTrace("Completing response (streamid={0}): {1}",
 834                                  streamid, cf);
 835                     cf.complete(resp);
 836                     response_cfs.remove(cf);
 837                     return;
 838                 } // else we found the previous response: just leave it alone.
 839             }
 840             cf = MinimalFuture.completedFuture(resp);
 841             Log.logTrace("Created completed future (streamid={0}): {1}",
 842                          streamid, cf);
 843             response_cfs.add(cf);
 844         }
 845     }
 846 
 847     // methods to update state and remove stream when finished
 848 
 849     synchronized void requestSent() {
 850         requestSent = true;
 851         if (responseReceived) {
 852             close();
 853         }
 854     }
 855 
 856     synchronized void responseReceived() {
 857         responseReceived = true;
 858         if (requestSent) {
 859             close();
 860         }
 861     }
 862 
 863     /**
 864      * same as above but for errors
 865      */
 866     void completeResponseExceptionally(Throwable t) {
 867         synchronized (response_cfs) {
 868             // use index to avoid ConcurrentModificationException
 869             // caused by removing the CF from within the loop.
 870             for (int i = 0; i < response_cfs.size(); i++) {
 871                 CompletableFuture<Response> cf = response_cfs.get(i);
 872                 if (!cf.isDone()) {
 873                     cf.completeExceptionally(t);
 874                     response_cfs.remove(i);
 875                     return;
 876                 }
 877             }
 878             response_cfs.add(MinimalFuture.failedFuture(t));
 879         }
 880     }
 881 
 882     CompletableFuture<Void> sendBodyImpl() {
 883         requestBodyCF.whenComplete((v, t) -> requestSent());
 884         if (requestPublisher != null) {
 885             final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
 886             requestPublisher.subscribe(requestSubscriber = subscriber);
 887         } else {
 888             // there is no request body, therefore the request is complete,
 889             // END_STREAM has already sent with outgoing headers
 890             requestBodyCF.complete(null);
 891         }
 892         return requestBodyCF;
 893     }
 894 
 895     @Override
 896     void cancel() {
 897         cancel(new IOException("Stream " + streamid + " cancelled"));
 898     }
 899 
 900     @Override
 901     void cancel(IOException cause) {
 902         cancelImpl(cause);
 903     }
 904 
 905     // This method sends a RST_STREAM frame
 906     void cancelImpl(Throwable e) {
 907         debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e);
 908         if (Log.trace()) {
 909             Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
 910         }
 911         boolean closing;
 912         if (closing = !closed) { // assigning closing to !closed
 913             synchronized (this) {
 914                 failed = e;
 915                 if (closing = !closed) { // assigning closing to !closed
 916                     closed=true;
 917                 }
 918             }
 919         }
 920         if (closing) { // true if the stream has not been closed yet
 921             if (responseSubscriber != null)
 922                 sched.runOrSchedule();
 923         }
 924         completeResponseExceptionally(e);
 925         if (!requestBodyCF.isDone()) {
 926             requestBodyCF.completeExceptionally(e); // we may be sending the body..
 927         }
 928         if (responseBodyCF != null) {
 929             responseBodyCF.completeExceptionally(e);
 930         }
 931         try {
 932             // will send a RST_STREAM frame
 933             if (streamid != 0) {
 934                 connection.resetStream(streamid, ResetFrame.CANCEL);
 935             }
 936         } catch (IOException ex) {
 937             Log.logError(ex);
 938         }
 939     }
 940 
 941     // This method doesn't send any frame
 942     void close() {
 943         if (closed) return;
 944         synchronized(this) {
 945             if (closed) return;
 946             closed = true;
 947         }
 948         Log.logTrace("Closing stream {0}", streamid);
 949         connection.closeStream(streamid);
 950         Log.logTrace("Stream {0} closed", streamid);
 951     }
 952 
 953     static class PushedStream<U,T> extends Stream<T> {
 954         final PushGroup<U,T> pushGroup;
 955         // push streams need the response CF allocated up front as it is
 956         // given directly to user via the multi handler callback function.
 957         final CompletableFuture<Response> pushCF;
 958         final CompletableFuture<HttpResponse<T>> responseCF;
 959         final HttpRequestImpl pushReq;
 960         HttpResponse.BodyHandler<T> pushHandler;
 961 
 962         PushedStream(PushGroup<U,T> pushGroup,
 963                      Http2Connection connection,
 964                      Exchange<T> pushReq) {
 965             // ## no request body possible, null window controller
 966             super(connection, pushReq, null);
 967             this.pushGroup = pushGroup;
 968             this.pushReq = pushReq.request();
 969             this.pushCF = new MinimalFuture<>();
 970             this.responseCF = new MinimalFuture<>();
 971         }
 972 
 973         CompletableFuture<HttpResponse<T>> responseCF() {
 974             return responseCF;
 975         }
 976 
 977         synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
 978             this.pushHandler = pushHandler;
 979         }
 980 
 981         synchronized HttpResponse.BodyHandler<T> getPushHandler() {
 982             // ignored parameters to function can be used as BodyHandler
 983             return this.pushHandler;
 984         }
 985 
 986         // Following methods call the super class but in case of
 987         // error record it in the PushGroup. The error method is called
 988         // with a null value when no error occurred (is a no-op)
 989         @Override
 990         CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
 991             return super.sendBodyAsync()
 992                         .whenComplete((ExchangeImpl<T> v, Throwable t)
 993                                 -> pushGroup.pushError(Utils.getCompletionCause(t)));
 994         }
 995 
 996         @Override
 997         CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
 998             return super.sendHeadersAsync()
 999                         .whenComplete((ExchangeImpl<T> ex, Throwable t)
1000                                 -> pushGroup.pushError(Utils.getCompletionCause(t)));
1001         }
1002 
1003         @Override
1004         CompletableFuture<Response> getResponseAsync(Executor executor) {
1005             CompletableFuture<Response> cf = pushCF.whenComplete(
1006                     (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
1007             if(executor!=null && !cf.isDone()) {
1008                 cf  = cf.thenApplyAsync( r -> r, executor);
1009             }
1010             return cf;
1011         }
1012 
1013         @Override
1014         CompletableFuture<T> readBodyAsync(
1015                 HttpResponse.BodyHandler<T> handler,
1016                 boolean returnConnectionToPool,
1017                 Executor executor)
1018         {
1019             return super.readBodyAsync(handler, returnConnectionToPool, executor)
1020                         .whenComplete((v, t) -> pushGroup.pushError(t));
1021         }
1022 
1023         @Override
1024         void completeResponse(Response r) {
1025             Log.logResponse(r::toString);
1026             pushCF.complete(r); // not strictly required for push API
1027             // start reading the body using the obtained BodySubscriber
1028             CompletableFuture<Void> start = new MinimalFuture<>();
1029             start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
1030                 .whenComplete((T body, Throwable t) -> {
1031                     if (t != null) {
1032                         responseCF.completeExceptionally(t);
1033                     } else {
1034                         HttpResponseImpl<T> resp =
1035                                 new HttpResponseImpl<>(r.request, r, null, body, getExchange());
1036                         responseCF.complete(resp);
1037                     }
1038                 });
1039             start.completeAsync(() -> null, getExchange().executor());
1040         }
1041 
1042         @Override
1043         void completeResponseExceptionally(Throwable t) {
1044             pushCF.completeExceptionally(t);
1045         }
1046 
1047 //        @Override
1048 //        synchronized void responseReceived() {
1049 //            super.responseReceived();
1050 //        }
1051 
1052         // create and return the PushResponseImpl
1053         @Override
1054         protected void handleResponse() {
1055             responseCode = (int)responseHeaders
1056                 .firstValueAsLong(":status")
1057                 .orElse(-1);
1058 
1059             if (responseCode == -1) {
1060                 completeResponseExceptionally(new IOException("No status code"));
1061             }
1062 
1063             this.response = new Response(
1064                 pushReq, exchange, responseHeaders,
1065                 responseCode, HttpClient.Version.HTTP_2);
1066 
1067             /* TODO: review if needs to be removed
1068                the value is not used, but in case `content-length` doesn't parse
1069                as long, there will be NumberFormatException. If left as is, make
1070                sure code up the stack handles NFE correctly. */
1071             responseHeaders.firstValueAsLong("content-length");
1072 
1073             if (Log.headers()) {
1074                 StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
1075                 sb.append(" (streamid=").append(streamid).append("): ");
1076                 Log.dumpHeaders(sb, "    ", responseHeaders);
1077                 Log.logHeaders(sb.toString());
1078             }
1079 
1080             // different implementations for normal streams and pushed streams
1081             completeResponse(response);
1082         }
1083     }
1084 
1085     final class StreamWindowUpdateSender extends WindowUpdateSender {
1086 
1087         StreamWindowUpdateSender(Http2Connection connection) {
1088             super(connection);
1089         }
1090 
1091         @Override
1092         int getStreamId() {
1093             return streamid;
1094         }
1095     }
1096 
1097     /**
1098      * Returns true if this exchange was canceled.
1099      * @return true if this exchange was canceled.
1100      */
1101     synchronized boolean isCanceled() {
1102         return failed != null;
1103     }
1104 
1105     /**
1106      * Returns the cause for which this exchange was canceled, if available.
1107      * @return the cause for which this exchange was canceled, if available.
1108      */
1109     synchronized Throwable getCancelCause() {
1110         return failed;
1111     }
1112 
1113     final String dbgString() {
1114         return connection.dbgString() + "/Stream("+streamid+")";
1115     }
1116 }