1 /*
   2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.net.URI;
  31 import java.nio.ByteBuffer;
  32 import java.util.ArrayList;
  33 import java.util.Collections;
  34 import java.util.List;
  35 import java.util.Optional;
  36 import java.util.concurrent.CompletableFuture;
  37 import java.util.concurrent.ConcurrentLinkedDeque;
  38 import java.util.concurrent.ConcurrentLinkedQueue;
  39 import java.util.concurrent.Executor;
  40 import java.util.concurrent.Flow;
  41 import java.util.concurrent.Flow.Subscription;
  42 import java.util.concurrent.atomic.AtomicReference;
  43 import jdk.incubator.http.HttpResponse.BodySubscriber;
  44 import jdk.incubator.http.internal.common.*;
  45 import jdk.incubator.http.internal.frame.*;
  46 import jdk.incubator.http.internal.hpack.DecodingCallback;
  47 
  48 /**
  49  * Http/2 Stream handling.
  50  *
  51  * REQUESTS
  52  *
  53  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
  54  *
  55  * sendRequest() -- sendHeadersOnly() + sendBody()
  56  *
  57  * sendBodyAsync() -- calls sendBody() in an executor thread.
  58  *
  59  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
  60  *
  61  * sendRequestAsync() -- calls sendRequest() in an executor thread
  62  *
  63  * RESPONSES
  64  *
 * Multiple responses can be received per request. Responses are queued up on
 * a LinkedList of CF<HttpResponse> and the first one on the list is completed
 * with the next response
  68  *
  69  * getResponseAsync() -- queries list of response CFs and returns first one
  70  *               if one exists. Otherwise, creates one and adds it to list
  71  *               and returns it. Completion is achieved through the
  72  *               incoming() upcall from connection reader thread.
  73  *
  74  * getResponse() -- calls getResponseAsync() and waits for CF to complete
  75  *
  76  * responseBodyAsync() -- calls responseBody() in an executor thread.
  77  *
  78  * incoming() -- entry point called from connection reader thread. Frames are
  79  *               either handled immediately without blocking or for data frames
  80  *               placed on the stream's inputQ which is consumed by the stream's
  81  *               reader thread.
  82  *
  83  * PushedStream sub class
  84  * ======================
  85  * Sending side methods are not used because the request comes from a PUSH_PROMISE
  86  * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
 * is created. PushedStream does not use responseCF list as there can be only
 * one response. The CF is created when the object is created and when the
 * response HEADERS frame is received the object is completed.
  90  */
  91 class Stream<T> extends ExchangeImpl<T> {
  92 
  93     final static boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag
  94     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
  95 
  96     final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
  97     final SequentialScheduler sched =
  98             SequentialScheduler.synchronizedScheduler(this::schedule);
  99     final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel);
 100 
 101     /**
 102      * This stream's identifier. Assigned lazily by the HTTP2Connection before
 103      * the stream's first frame is sent.
 104      */
 105     protected volatile int streamid;
 106 
 107     long requestContentLen;
 108 
 109     final Http2Connection connection;
 110     final HttpRequestImpl request;
 111     final DecodingCallback rspHeadersConsumer;
 112     HttpHeadersImpl responseHeaders;
 113     final HttpHeadersImpl requestPseudoHeaders;
 114     volatile HttpResponse.BodySubscriber<T> responseSubscriber;
 115     final HttpRequest.BodyPublisher requestPublisher;
 116     volatile RequestSubscriber requestSubscriber;
 117     volatile int responseCode;
 118     volatile Response response;
 119     volatile Throwable failed; // The exception with which this stream was canceled.
 120     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
 121     volatile CompletableFuture<T> responseBodyCF;
 122 
 123     /** True if END_STREAM has been seen in a frame received on this stream. */
 124     private volatile boolean remotelyClosed;
 125     private volatile boolean closed;
 126     private volatile boolean endStreamSent;
 127 
 128     // state flags
 129     private boolean requestSent, responseReceived;
 130 
 131     /**
 132      * A reference to this Stream's connection Send Window controller. The
 133      * stream MUST acquire the appropriate amount of Send Window before
 134      * sending any data. Will be null for PushStreams, as they cannot send data.
 135      */
 136     private final WindowController windowController;
 137     private final WindowUpdateSender windowUpdater;
 138 
 139     @Override
 140     HttpConnection connection() {
 141         return connection.connection;
 142     }
 143 
 144     /**
 145      * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
 146      * of after user subscription window has re-opened, from SubscriptionBase.request()
 147      */
 148     private void schedule() {
 149         if (responseSubscriber == null)
 150             // can't process anything yet
 151             return;
 152 
 153         while (!inputQ.isEmpty()) {
 154             Http2Frame frame  = inputQ.peek();
 155             if (frame instanceof ResetFrame) {
 156                 inputQ.remove();
 157                 handleReset((ResetFrame)frame);
 158                 return;
 159             }
 160             DataFrame df = (DataFrame)frame;
 161             boolean finished = df.getFlag(DataFrame.END_STREAM);
 162 
 163             List<ByteBuffer> buffers = df.getData();
 164             List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
 165             int size = Utils.remaining(dsts, Integer.MAX_VALUE);
 166             if (size == 0 && finished) {
 167                 inputQ.remove();
 168                 Log.logTrace("responseSubscriber.onComplete");
 169                 debug.log(Level.DEBUG, "incoming: onComplete");
 170                 sched.stop();
 171                 responseSubscriber.onComplete();
 172                 setEndStreamReceived();
 173                 return;
 174             } else if (userSubscription.tryDecrement()) {
 175                 inputQ.remove();
 176                 Log.logTrace("responseSubscriber.onNext {0}", size);
 177                 debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
 178                 responseSubscriber.onNext(dsts);
 179                 if (consumed(df)) {
 180                     Log.logTrace("responseSubscriber.onComplete");
 181                     debug.log(Level.DEBUG, "incoming: onComplete");
 182                     sched.stop();
 183                     responseSubscriber.onComplete();
 184                     setEndStreamReceived();
 185                     return;
 186                 }
 187             } else {
 188                 return;
 189             }
 190         }
 191         Throwable t = failed;
 192         if (t != null) {
 193             sched.stop();
 194             responseSubscriber.onError(t);
 195             close();
 196         }
 197     }
 198 
 199     // Callback invoked after the Response BodySubscriber has consumed the
 200     // buffers contained in a DataFrame.
 201     // Returns true if END_STREAM is reached, false otherwise.
 202     private boolean consumed(DataFrame df) {
 203         // RFC 7540 6.1:
 204         // The entire DATA frame payload is included in flow control,
 205         // including the Pad Length and Padding fields if present
 206         int len = df.payloadLength();
 207         connection.windowUpdater.update(len);
 208 
 209         if (!df.getFlag(DataFrame.END_STREAM)) {
 210             // Don't send window update on a stream which is
 211             // closed or half closed.
 212             windowUpdater.update(len);
 213             return false; // more data coming
 214         }
 215         return true; // end of stream
 216     }
 217 
 218     @Override
 219     CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
 220                                        boolean returnConnectionToPool,
 221                                        Executor executor)
 222     {
 223         Log.logTrace("Reading body on stream {0}", streamid);
 224         BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
 225         CompletableFuture<T> cf = receiveData(bodySubscriber);
 226 
 227         PushGroup<?,?> pg = exchange.getPushGroup();
 228         if (pg != null) {
 229             // if an error occurs make sure it is recorded in the PushGroup
 230             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 231         }
 232         return cf;
 233     }
 234 
 235     @Override
 236     public String toString() {
 237         StringBuilder sb = new StringBuilder();
 238         sb.append("streamid: ")
 239                 .append(streamid);
 240         return sb.toString();
 241     }
 242 
 243     private void receiveDataFrame(DataFrame df) {
 244         inputQ.add(df);
 245         sched.runOrSchedule();
 246     }
 247 
 248     /** Handles a RESET frame. RESET is always handled inline in the queue. */
 249     private void receiveResetFrame(ResetFrame frame) {
 250         inputQ.add(frame);
 251         sched.runOrSchedule();
 252     }
 253 
 254     // pushes entire response body into response subscriber
 255     // blocking when required by local or remote flow control
 256     CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber) {
 257         responseBodyCF = MinimalFuture.of(bodySubscriber.getBody());
 258 
 259         if (isCanceled()) {
 260             Throwable t = getCancelCause();
 261             responseBodyCF.completeExceptionally(t);
 262         } else {
 263             bodySubscriber.onSubscribe(userSubscription);
 264         }
 265         // Set the responseSubscriber field now that onSubscribe has been called.
 266         // This effectively allows the scheduler to start invoking the callbacks.
 267         responseSubscriber = bodySubscriber;
 268         sched.runOrSchedule(); // in case data waiting already to be processed
 269         return responseBodyCF;
 270     }
 271 
 272     @Override
 273     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
 274         return sendBodyImpl().thenApply( v -> this);
 275     }
 276 
 277     @SuppressWarnings("unchecked")
 278     Stream(Http2Connection connection,
 279            Exchange<T> e,
 280            WindowController windowController)
 281     {
 282         super(e);
 283         this.connection = connection;
 284         this.windowController = windowController;
 285         this.request = e.request();
 286         this.requestPublisher = request.requestPublisher;  // may be null
 287         responseHeaders = new HttpHeadersImpl();
 288         rspHeadersConsumer = (name, value) -> {
 289             responseHeaders.addHeader(name.toString(), value.toString());
 290             if (Log.headers() && Log.trace()) {
 291                 Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
 292                              streamid, name, value);
 293             }
 294         };
 295         this.requestPseudoHeaders = new HttpHeadersImpl();
 296         // NEW
 297         this.windowUpdater = new StreamWindowUpdateSender(connection);
 298     }
 299 
 300     /**
 301      * Entry point from Http2Connection reader thread.
 302      *
 303      * Data frames will be removed by response body thread.
 304      */
 305     void incoming(Http2Frame frame) throws IOException {
 306         debug.log(Level.DEBUG, "incoming: %s", frame);
 307         if ((frame instanceof HeaderFrame)) {
 308             HeaderFrame hframe = (HeaderFrame)frame;
 309             if (hframe.endHeaders()) {
 310                 Log.logTrace("handling response (streamid={0})", streamid);
 311                 handleResponse();
 312                 if (hframe.getFlag(HeaderFrame.END_STREAM)) {
 313                     receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
 314                 }
 315             }
 316         } else if (frame instanceof DataFrame) {
 317             receiveDataFrame((DataFrame)frame);
 318         } else {
 319             otherFrame(frame);
 320         }
 321     }
 322 
 323     void otherFrame(Http2Frame frame) throws IOException {
 324         switch (frame.type()) {
 325             case WindowUpdateFrame.TYPE:
 326                 incoming_windowUpdate((WindowUpdateFrame) frame);
 327                 break;
 328             case ResetFrame.TYPE:
 329                 incoming_reset((ResetFrame) frame);
 330                 break;
 331             case PriorityFrame.TYPE:
 332                 incoming_priority((PriorityFrame) frame);
 333                 break;
 334             default:
 335                 String msg = "Unexpected frame: " + frame.toString();
 336                 throw new IOException(msg);
 337         }
 338     }
 339 
 340     // The Hpack decoder decodes into one of these consumers of name,value pairs
 341 
 342     DecodingCallback rspHeadersConsumer() {
 343         return rspHeadersConsumer;
 344     }
 345 
 346     protected void handleResponse() throws IOException {
 347         responseCode = (int)responseHeaders
 348                 .firstValueAsLong(":status")
 349                 .orElseThrow(() -> new IOException("no statuscode in response"));
 350 
 351         response = new Response(
 352                 request, exchange, responseHeaders,
 353                 responseCode, HttpClient.Version.HTTP_2);
 354 
 355         /* TODO: review if needs to be removed
 356            the value is not used, but in case `content-length` doesn't parse as
 357            long, there will be NumberFormatException. If left as is, make sure
 358            code up the stack handles NFE correctly. */
 359         responseHeaders.firstValueAsLong("content-length");
 360 
 361         if (Log.headers()) {
 362             StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
 363             Log.dumpHeaders(sb, "    ", responseHeaders);
 364             Log.logHeaders(sb.toString());
 365         }
 366 
 367         completeResponse(response);
 368     }
 369 
 370     void incoming_reset(ResetFrame frame) {
 371         Log.logTrace("Received RST_STREAM on stream {0}", streamid);
 372         if (endStreamReceived()) {
 373             Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
 374         } else if (closed) {
 375             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
 376         } else {
 377             // put it in the input queue in order to read all
 378             // pending data frames first. Indeed, a server may send
 379             // RST_STREAM after sending END_STREAM, in which case we should
 380             // ignore it. However, we won't know if we have received END_STREAM
 381             // or not until all pending data frames are read.
 382             receiveResetFrame(frame);
 383             // RST_STREAM was pushed to the queue. It will be handled by
 384             // asyncReceive after all pending data frames have been
 385             // processed.
 386             Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
 387         }
 388     }
 389 
 390     void handleReset(ResetFrame frame) {
 391         Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
 392         if (!closed) {
 393             close();
 394             int error = frame.getErrorCode();
 395             completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error)));
 396         } else {
 397             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
 398         }
 399     }
 400 
 401     void incoming_priority(PriorityFrame frame) {
 402         // TODO: implement priority
 403         throw new UnsupportedOperationException("Not implemented");
 404     }
 405 
 406     private void incoming_windowUpdate(WindowUpdateFrame frame)
 407         throws IOException
 408     {
 409         int amount = frame.getUpdate();
 410         if (amount <= 0) {
 411             Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n",
 412                          streamid, streamid, amount);
 413             connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
 414         } else {
 415             assert streamid != 0;
 416             boolean success = windowController.increaseStreamWindow(amount, streamid);
 417             if (!success) {  // overflow
 418                 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
 419             }
 420         }
 421     }
 422 
 423     void incoming_pushPromise(HttpRequestImpl pushReq,
 424                               PushedStream<?,T> pushStream)
 425         throws IOException
 426     {
 427         if (Log.requests()) {
 428             Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
 429         }
 430         PushGroup<?,T> pushGroup = exchange.getPushGroup();
 431         if (pushGroup == null) {
 432             Log.logTrace("Rejecting push promise stream " + streamid);
 433             connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
 434             pushStream.close();
 435             return;
 436         }
 437 
 438         HttpResponse.MultiSubscriber<?,T> proc = pushGroup.subscriber();
 439 
 440         CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
 441 
 442         Optional<HttpResponse.BodyHandler<T>> bpOpt =
 443                 pushGroup.handlerForPushRequest(pushReq);
 444 
 445         if (!bpOpt.isPresent()) {
 446             IOException ex = new IOException("Stream "
 447                  + streamid + " cancelled by user");
 448             if (Log.trace()) {
 449                 Log.logTrace("No body subscriber for {0}: {1}", pushReq,
 450                             ex.getMessage());
 451             }
 452             pushStream.cancelImpl(ex);
 453             cf.completeExceptionally(ex);
 454             return;
 455         }
 456 
 457         pushGroup.addPush();
 458         pushStream.requestSent();
 459         pushStream.setPushHandler(bpOpt.get());
 460         // setup housekeeping for when the push is received
 461         // TODO: deal with ignoring of CF anti-pattern
 462         cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
 463             t = Utils.getCompletionCause(t);
 464             if (Log.trace()) {
 465                 Log.logTrace("Push completed on stream {0} for {1}{2}",
 466                              pushStream.streamid, resp,
 467                              ((t==null) ? "": " with exception " + t));
 468             }
 469             if (t != null) {
 470                 pushGroup.pushError(t);
 471                 proc.onError(pushReq, t);
 472             } else {
 473                 proc.onResponse(resp);
 474             }
 475             pushGroup.pushCompleted();
 476         });
 477 
 478     }
 479 
 480     private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
 481         HttpHeadersImpl h = request.getSystemHeaders();
 482         if (contentLength > 0) {
 483             h.setHeader("content-length", Long.toString(contentLength));
 484         }
 485         setPseudoHeaderFields();
 486         OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(h, request.getUserHeaders(), this);
 487         if (contentLength == 0) {
 488             f.setFlag(HeadersFrame.END_STREAM);
 489             endStreamSent = true;
 490         }
 491         return f;
 492     }
 493 
 494     private void setPseudoHeaderFields() {
 495         HttpHeadersImpl hdrs = requestPseudoHeaders;
 496         String method = request.method();
 497         hdrs.setHeader(":method", method);
 498         URI uri = request.uri();
 499         hdrs.setHeader(":scheme", uri.getScheme());
 500         // TODO: userinfo deprecated. Needs to be removed
 501         hdrs.setHeader(":authority", uri.getAuthority());
 502         // TODO: ensure header names beginning with : not in user headers
 503         String query = uri.getQuery();
 504         String path = uri.getPath();
 505         if (path == null || path.isEmpty()) {
 506             if (method.equalsIgnoreCase("OPTIONS")) {
 507                 path = "*";
 508             } else {
 509                 path = "/";
 510             }
 511         }
 512         if (query != null) {
 513             path += "?" + query;
 514         }
 515         hdrs.setHeader(":path", path);
 516     }
 517 
 518     HttpHeadersImpl getRequestPseudoHeaders() {
 519         return requestPseudoHeaders;
 520     }
 521 
 522     /** Sets endStreamReceived. Should be called only once. */
 523     void setEndStreamReceived() {
 524         assert remotelyClosed == false: "Unexpected endStream already set";
 525         remotelyClosed = true;
 526         responseReceived();
 527     }
 528 
 529     /** Tells whether, or not, the END_STREAM Flag has been seen in any frame
 530      *  received on this stream. */
 531     private boolean endStreamReceived() {
 532         return remotelyClosed;
 533     }
 534 
 535     @Override
 536     CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
 537         debug.log(Level.DEBUG, "sendHeadersOnly()");
 538         if (Log.requests() && request != null) {
 539             Log.logRequest(request.toString());
 540         }
 541         if (requestPublisher != null) {
 542             requestContentLen = requestPublisher.contentLength();
 543         } else {
 544             requestContentLen = 0;
 545         }
 546         OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
 547         connection.sendFrame(f);
 548         CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
 549         cf.complete(this);  // #### good enough for now
 550         return cf;
 551     }
 552 
 553     @Override
 554     void released() {
 555         if (streamid > 0) {
 556             debug.log(Level.DEBUG, "Released stream %d", streamid);
 557             // remove this stream from the Http2Connection map.
 558             connection.closeStream(streamid);
 559         } else {
 560             debug.log(Level.DEBUG, "Can't release stream %d", streamid);
 561         }
 562     }
 563 
    /**
     * Overrides the ExchangeImpl callback of the same name. Intentionally a
     * no-op for HTTP/2 streams.
     */
    @Override
    void completed() {
        // There should be nothing to do here: the stream should have
        // been already closed (or will be closed shortly after).
    }
 569 
 570     void registerStream(int id) {
 571         this.streamid = id;
 572         connection.putStream(this, streamid);
 573         debug.log(Level.DEBUG, "Registered stream %d", id);
 574     }
 575 
 576     void signalWindowUpdate() {
 577         RequestSubscriber subscriber = requestSubscriber;
 578         assert subscriber != null;
 579         debug.log(Level.DEBUG, "Signalling window update");
 580         subscriber.sendScheduler.runOrSchedule();
 581     }
 582 
 583     static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);
 584     class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
 585         // can be < 0 if the actual length is not known.
 586         private final long contentLength;
 587         private volatile long remainingContentLength;
 588         private volatile Subscription subscription;
 589 
 590         // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
 591         //  1) The data that was published by the request body Publisher, and
 592         //  2) the COMPLETED sentinel, since onComplete can be invoked without demand.
 593         final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
 594 
 595         private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
 596         // A scheduler used to honor window updates. Writing must be paused
 597         // when the window is exhausted, and resumed when the window acquires
 598         // some space. The sendScheduler makes it possible to implement this
 599         // behaviour in an asynchronous non-blocking way.
 600         // See RequestSubscriber::trySend below.
 601         final SequentialScheduler sendScheduler;
 602 
 603         RequestSubscriber(long contentLen) {
 604             this.contentLength = contentLen;
 605             this.remainingContentLength = contentLen;
 606             this.sendScheduler =
 607                     SequentialScheduler.synchronizedScheduler(this::trySend);
 608         }
 609 
 610         @Override
 611         public void onSubscribe(Flow.Subscription subscription) {
 612             if (this.subscription != null) {
 613                 throw new IllegalStateException("already subscribed");
 614             }
 615             this.subscription = subscription;
 616             debug.log(Level.DEBUG, "RequestSubscriber: onSubscribe, request 1");
 617             subscription.request(1);
 618         }
 619 
 620         @Override
 621         public void onNext(ByteBuffer item) {
 622             debug.log(Level.DEBUG, "RequestSubscriber: onNext(%d)", item.remaining());
 623             int size = outgoing.size();
 624             assert size == 0 : "non-zero size: " + size;
 625             onNextImpl(item);
 626         }
 627 
 628         private void onNextImpl(ByteBuffer item) {
 629             // Got some more request body bytes to send.
 630             if (requestBodyCF.isDone()) {
 631                 // stream already cancelled, probably in timeout
 632                 sendScheduler.stop();
 633                 subscription.cancel();
 634                 return;
 635             }
 636             outgoing.add(item);
 637             sendScheduler.runOrSchedule();
 638         }
 639 
 640         @Override
 641         public void onError(Throwable throwable) {
 642             debug.log(Level.DEBUG, () -> "RequestSubscriber: onError: " + throwable);
 643             // ensure that errors are handled within the flow.
 644             if (errorRef.compareAndSet(null, throwable)) {
 645                 sendScheduler.runOrSchedule();
 646             }
 647         }
 648 
 649         @Override
 650         public void onComplete() {
 651             debug.log(Level.DEBUG, "RequestSubscriber: onComplete");
 652             int size = outgoing.size();
 653             assert size == 0 || size == 1 : "non-zero or one size: " + size;
 654             // last byte of request body has been obtained.
 655             // ensure that everything is completed within the flow.
 656             onNextImpl(COMPLETED);
 657         }
 658 
 659         // Attempts to send the data, if any.
 660         // Handles errors and completion state.
 661         // Pause writing if the send window is exhausted, resume it if the
 662         // send window has some bytes that can be acquired.
 663         void trySend() {
 664             try {
 665                 // handle errors raised by onError;
 666                 Throwable t = errorRef.get();
 667                 if (t != null) {
 668                     sendScheduler.stop();
 669                     if (requestBodyCF.isDone()) return;
 670                     subscription.cancel();
 671                     requestBodyCF.completeExceptionally(t);
 672                     return;
 673                 }
 674 
 675                 do {
 676                     // handle COMPLETED;
 677                     ByteBuffer item = outgoing.peekFirst();
 678                     if (item == null) return;
 679                     else if (item == COMPLETED) {
 680                         sendScheduler.stop();
 681                         complete();
 682                         return;
 683                     }
 684 
 685                     // handle bytes to send downstream
 686                     while (item.hasRemaining()) {
 687                         debug.log(Level.DEBUG, "trySend: %d", item.remaining());
 688                         assert !endStreamSent : "internal error, send data after END_STREAM flag";
 689                         DataFrame df = getDataFrame(item);
 690                         if (df == null) {
 691                             debug.log(Level.DEBUG, "trySend: can't send yet: %d",
 692                                     item.remaining());
 693                             return; // the send window is exhausted: come back later
 694                         }
 695 
 696                         if (contentLength > 0) {
 697                             remainingContentLength -= df.getDataLength();
 698                             if (remainingContentLength < 0) {
 699                                 String msg = connection().getConnectionFlow()
 700                                         + " stream=" + streamid + " "
 701                                         + "[" + Thread.currentThread().getName() + "] "
 702                                         + "Too many bytes in request body. Expected: "
 703                                         + contentLength + ", got: "
 704                                         + (contentLength - remainingContentLength);
 705                                 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
 706                                 throw new IOException(msg);
 707                             } else if (remainingContentLength == 0) {
 708                                 df.setFlag(DataFrame.END_STREAM);
 709                                 endStreamSent = true;
 710                             }
 711                         }
 712                         debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength());
 713                         connection.sendDataFrame(df);
 714                     }
 715                     assert !item.hasRemaining();
 716                     ByteBuffer b = outgoing.removeFirst();
 717                     assert b == item;
 718                 } while (outgoing.peekFirst() != null);
 719 
 720                 debug.log(Level.DEBUG, "trySend: request 1");
 721                 subscription.request(1);
 722             } catch (Throwable ex) {
 723                 debug.log(Level.DEBUG, "trySend: ", ex);
 724                 sendScheduler.stop();
 725                 subscription.cancel();
 726                 requestBodyCF.completeExceptionally(ex);
 727             }
 728         }
 729 
 730         private void complete() throws IOException {
 731             long remaining = remainingContentLength;
 732             long written = contentLength - remaining;
 733             if (remaining > 0) {
 734                 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
 735                 // let trySend() handle the exception
 736                 throw new IOException(connection().getConnectionFlow()
 737                                      + " stream=" + streamid + " "
 738                                      + "[" + Thread.currentThread().getName() +"] "
 739                                      + "Too few bytes returned by the publisher ("
 740                                               + written + "/"
 741                                               + contentLength + ")");
 742             }
 743             if (!endStreamSent) {
 744                 endStreamSent = true;
 745                 connection.sendDataFrame(getEmptyEndStreamDataFrame());
 746             }
 747             requestBodyCF.complete(null);
 748         }
 749     }
 750 
 751     /**
 752      * Send a RESET frame to tell server to stop sending data on this stream
 753      */
 754     @Override
 755     public CompletableFuture<Void> ignoreBody() {
 756         try {
 757             connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
 758             return MinimalFuture.completedFuture(null);
 759         } catch (Throwable e) {
 760             Log.logTrace("Error resetting stream {0}", e.toString());
 761             return MinimalFuture.failedFuture(e);
 762         }
 763     }
 764 
 765     DataFrame getDataFrame(ByteBuffer buffer) {
 766         int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
 767         // blocks waiting for stream send window, if exhausted
 768         int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
 769         if (actualAmount <= 0) return null;
 770         ByteBuffer outBuf = Utils.slice(buffer,  actualAmount);
 771         DataFrame df = new DataFrame(streamid, 0 , outBuf);
 772         return df;
 773     }
 774 
 775     private DataFrame getEmptyEndStreamDataFrame()  {
 776         return new DataFrame(streamid, DataFrame.END_STREAM, List.of());
 777     }
 778 
    /**
     * A List of response futures relating to this stream. Normally there is
     * only one response, but intermediate responses like 100 are allowed
     * and must be passed up to the higher level before continuing. The list
     * deals with races such as responses being returned before the CFs get
     * created by getResponseAsync(): whichever side arrives first adds the
     * future, and the other side picks it up.
     *
     * getResponseAsync(), completeResponse() and
     * completeResponseExceptionally() access the list while synchronized
     * on the list itself.
     */

    final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
 788 
 789     @Override
 790     CompletableFuture<Response> getResponseAsync(Executor executor) {
 791         CompletableFuture<Response> cf;
 792         // The code below deals with race condition that can be caused when
 793         // completeResponse() is being called before getResponseAsync()
 794         synchronized (response_cfs) {
 795             if (!response_cfs.isEmpty()) {
 796                 // This CompletableFuture was created by completeResponse().
 797                 // it will be already completed.
 798                 cf = response_cfs.remove(0);
 799                 // if we find a cf here it should be already completed.
 800                 // finding a non completed cf should not happen. just assert it.
 801                 assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
 802             } else {
 803                 // getResponseAsync() is called first. Create a CompletableFuture
 804                 // that will be completed by completeResponse() when
 805                 // completeResponse() is called.
 806                 cf = new MinimalFuture<>();
 807                 response_cfs.add(cf);
 808             }
 809         }
 810         if (executor != null && !cf.isDone()) {
 811             // protect from executing later chain of CompletableFuture operations from SelectorManager thread
 812             cf = cf.thenApplyAsync(r -> r, executor);
 813         }
 814         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
 815         PushGroup<?,?> pg = exchange.getPushGroup();
 816         if (pg != null) {
 817             // if an error occurs make sure it is recorded in the PushGroup
 818             cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
 819         }
 820         return cf;
 821     }
 822 
 823     /**
 824      * Completes the first uncompleted CF on list, and removes it. If there is no
 825      * uncompleted CF then creates one (completes it) and adds to list
 826      */
 827     void completeResponse(Response resp) {
 828         synchronized (response_cfs) {
 829             CompletableFuture<Response> cf;
 830             int cfs_len = response_cfs.size();
 831             for (int i=0; i<cfs_len; i++) {
 832                 cf = response_cfs.get(i);
 833                 if (!cf.isDone()) {
 834                     Log.logTrace("Completing response (streamid={0}): {1}",
 835                                  streamid, cf);
 836                     cf.complete(resp);
 837                     response_cfs.remove(cf);
 838                     return;
 839                 } // else we found the previous response: just leave it alone.
 840             }
 841             cf = MinimalFuture.completedFuture(resp);
 842             Log.logTrace("Created completed future (streamid={0}): {1}",
 843                          streamid, cf);
 844             response_cfs.add(cf);
 845         }
 846     }
 847 
 848     // methods to update state and remove stream when finished
 849 
 850     synchronized void requestSent() {
 851         requestSent = true;
 852         if (responseReceived) {
 853             close();
 854         }
 855     }
 856 
 857     synchronized void responseReceived() {
 858         responseReceived = true;
 859         if (requestSent) {
 860             close();
 861         }
 862     }
 863 
 864     /**
 865      * same as above but for errors
 866      */
 867     void completeResponseExceptionally(Throwable t) {
 868         synchronized (response_cfs) {
 869             // use index to avoid ConcurrentModificationException
 870             // caused by removing the CF from within the loop.
 871             for (int i = 0; i < response_cfs.size(); i++) {
 872                 CompletableFuture<Response> cf = response_cfs.get(i);
 873                 if (!cf.isDone()) {
 874                     cf.completeExceptionally(t);
 875                     response_cfs.remove(i);
 876                     return;
 877                 }
 878             }
 879             response_cfs.add(MinimalFuture.failedFuture(t));
 880         }
 881     }
 882 
 883     CompletableFuture<Void> sendBodyImpl() {
 884         requestBodyCF.whenComplete((v, t) -> requestSent());
 885         if (requestPublisher != null) {
 886             final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
 887             requestPublisher.subscribe(requestSubscriber = subscriber);
 888         } else {
 889             // there is no request body, therefore the request is complete,
 890             // END_STREAM has already sent with outgoing headers
 891             requestBodyCF.complete(null);
 892         }
 893         return requestBodyCF;
 894     }
 895 
 896     @Override
 897     void cancel() {
 898         cancel(new IOException("Stream " + streamid + " cancelled"));
 899     }
 900 
 901     @Override
 902     void cancel(IOException cause) {
 903         cancelImpl(cause);
 904     }
 905 
 906     // This method sends a RST_STREAM frame
 907     void cancelImpl(Throwable e) {
 908         debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e);
 909         if (Log.trace()) {
 910             Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
 911         }
 912         boolean closing;
 913         if (closing = !closed) { // assigning closing to !closed
 914             synchronized (this) {
 915                 failed = e;
 916                 if (closing = !closed) { // assigning closing to !closed
 917                     closed=true;
 918                 }
 919             }
 920         }
 921         if (closing) { // true if the stream has not been closed yet
 922             if (responseSubscriber != null)
 923                 sched.runOrSchedule();
 924         }
 925         completeResponseExceptionally(e);
 926         if (!requestBodyCF.isDone()) {
 927             requestBodyCF.completeExceptionally(e); // we may be sending the body..
 928         }
 929         if (responseBodyCF != null) {
 930             responseBodyCF.completeExceptionally(e);
 931         }
 932         try {
 933             // will send a RST_STREAM frame
 934             if (streamid != 0) {
 935                 connection.resetStream(streamid, ResetFrame.CANCEL);
 936             }
 937         } catch (IOException ex) {
 938             Log.logError(ex);
 939         }
 940     }
 941 
 942     // This method doesn't send any frame
 943     void close() {
 944         if (closed) return;
 945         synchronized(this) {
 946             if (closed) return;
 947             closed = true;
 948         }
 949         Log.logTrace("Closing stream {0}", streamid);
 950         connection.closeStream(streamid);
 951         Log.logTrace("Stream {0} closed", streamid);
 952     }
 953 
 954     static class PushedStream<U,T> extends Stream<T> {
 955         final PushGroup<U,T> pushGroup;
 956         // push streams need the response CF allocated up front as it is
 957         // given directly to user via the multi handler callback function.
 958         final CompletableFuture<Response> pushCF;
 959         final CompletableFuture<HttpResponse<T>> responseCF;
 960         final HttpRequestImpl pushReq;
 961         HttpResponse.BodyHandler<T> pushHandler;
 962 
 963         PushedStream(PushGroup<U,T> pushGroup,
 964                      Http2Connection connection,
 965                      Exchange<T> pushReq) {
 966             // ## no request body possible, null window controller
 967             super(connection, pushReq, null);
 968             this.pushGroup = pushGroup;
 969             this.pushReq = pushReq.request();
 970             this.pushCF = new MinimalFuture<>();
 971             this.responseCF = new MinimalFuture<>();
 972         }
 973 
 974         CompletableFuture<HttpResponse<T>> responseCF() {
 975             return responseCF;
 976         }
 977 
 978         synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
 979             this.pushHandler = pushHandler;
 980         }
 981 
 982         synchronized HttpResponse.BodyHandler<T> getPushHandler() {
 983             // ignored parameters to function can be used as BodyHandler
 984             return this.pushHandler;
 985         }
 986 
 987         // Following methods call the super class but in case of
 988         // error record it in the PushGroup. The error method is called
 989         // with a null value when no error occurred (is a no-op)
 990         @Override
 991         CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
 992             return super.sendBodyAsync()
 993                         .whenComplete((ExchangeImpl<T> v, Throwable t)
 994                                 -> pushGroup.pushError(Utils.getCompletionCause(t)));
 995         }
 996 
 997         @Override
 998         CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
 999             return super.sendHeadersAsync()
1000                         .whenComplete((ExchangeImpl<T> ex, Throwable t)
1001                                 -> pushGroup.pushError(Utils.getCompletionCause(t)));
1002         }
1003 
1004         @Override
1005         CompletableFuture<Response> getResponseAsync(Executor executor) {
1006             CompletableFuture<Response> cf = pushCF.whenComplete(
1007                     (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
1008             if(executor!=null && !cf.isDone()) {
1009                 cf  = cf.thenApplyAsync( r -> r, executor);
1010             }
1011             return cf;
1012         }
1013 
1014         @Override
1015         CompletableFuture<T> readBodyAsync(
1016                 HttpResponse.BodyHandler<T> handler,
1017                 boolean returnConnectionToPool,
1018                 Executor executor)
1019         {
1020             return super.readBodyAsync(handler, returnConnectionToPool, executor)
1021                         .whenComplete((v, t) -> pushGroup.pushError(t));
1022         }
1023 
1024         @Override
1025         void completeResponse(Response r) {
1026             Log.logResponse(r::toString);
1027             pushCF.complete(r); // not strictly required for push API
1028             // start reading the body using the obtained BodySubscriber
1029             CompletableFuture<Void> start = new MinimalFuture<>();
1030             start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
1031                 .whenComplete((T body, Throwable t) -> {
1032                     if (t != null) {
1033                         responseCF.completeExceptionally(t);
1034                     } else {
1035                         HttpResponseImpl<T> resp =
1036                                 new HttpResponseImpl<>(r.request, r, null, body, getExchange());
1037                         responseCF.complete(resp);
1038                     }
1039                 });
1040             start.completeAsync(() -> null, getExchange().executor());
1041         }
1042 
1043         @Override
1044         void completeResponseExceptionally(Throwable t) {
1045             pushCF.completeExceptionally(t);
1046         }
1047 
1048 //        @Override
1049 //        synchronized void responseReceived() {
1050 //            super.responseReceived();
1051 //        }
1052 
1053         // create and return the PushResponseImpl
1054         @Override
1055         protected void handleResponse() {
1056             responseCode = (int)responseHeaders
1057                 .firstValueAsLong(":status")
1058                 .orElse(-1);
1059 
1060             if (responseCode == -1) {
1061                 completeResponseExceptionally(new IOException("No status code"));
1062             }
1063 
1064             this.response = new Response(
1065                 pushReq, exchange, responseHeaders,
1066                 responseCode, HttpClient.Version.HTTP_2);
1067 
1068             /* TODO: review if needs to be removed
1069                the value is not used, but in case `content-length` doesn't parse
1070                as long, there will be NumberFormatException. If left as is, make
1071                sure code up the stack handles NFE correctly. */
1072             responseHeaders.firstValueAsLong("content-length");
1073 
1074             if (Log.headers()) {
1075                 StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
1076                 sb.append(" (streamid=").append(streamid).append("): ");
1077                 Log.dumpHeaders(sb, "    ", responseHeaders);
1078                 Log.logHeaders(sb.toString());
1079             }
1080 
1081             // different implementations for normal streams and pushed streams
1082             completeResponse(response);
1083         }
1084     }
1085 
    /**
     * The WindowUpdateSender used for this stream: it supplies
     * {@code streamid} as the stream id to the base class.
     */
    final class StreamWindowUpdateSender extends WindowUpdateSender {

        /**
         * @param connection the connection passed on to WindowUpdateSender
         */
        StreamWindowUpdateSender(Http2Connection connection) {
            super(connection);
        }

        /** Returns {@code streamid}. */
        @Override
        int getStreamId() {
            return streamid;
        }
    }
1097 
1098     /**
1099      * Returns true if this exchange was canceled.
1100      * @return true if this exchange was canceled.
1101      */
1102     synchronized boolean isCanceled() {
1103         return failed != null;
1104     }
1105 
1106     /**
1107      * Returns the cause for which this exchange was canceled, if available.
1108      * @return the cause for which this exchange was canceled, if available.
1109      */
1110     synchronized Throwable getCancelCause() {
1111         return failed;
1112     }
1113 
1114     final String dbgString() {
1115         return connection.dbgString() + "/Stream("+streamid+")";
1116     }
1117 }