1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.net.URI;
  30 import java.nio.ByteBuffer;
  31 import java.util.ArrayList;
  32 import java.util.List;
  33 import java.util.Optional;
  34 import java.util.concurrent.CompletableFuture;
  35 import java.util.concurrent.CompletionException;
  36 import java.util.concurrent.ExecutionException;
  37 import java.util.concurrent.Executor;
  38 import java.util.concurrent.Flow;
  39 import java.util.concurrent.Flow.Subscription;
  40 import java.util.concurrent.TimeUnit;
  41 import java.util.concurrent.TimeoutException;
  42 import java.util.function.Consumer;
  43 
  44 import jdk.incubator.http.internal.common.*;
  45 import jdk.incubator.http.internal.frame.*;
  46 import jdk.incubator.http.internal.hpack.DecodingCallback;
  47 
  48 /**
  49  * Http/2 Stream handling.
  50  *
  51  * REQUESTS
  52  *
  53  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
  54  *
  55  * sendRequest() -- sendHeadersOnly() + sendBody()
  56  *
  57  * sendBody() -- in calling thread: obeys all flow control (so may block)
  58  *               obtains data from request body processor and places on connection
  59  *               outbound Q.
  60  *
  61  * sendBodyAsync() -- calls sendBody() in an executor thread.
  62  *
  63  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
  64  *
  65  * sendRequestAsync() -- calls sendRequest() in an executor thread
  66  *
  67  * RESPONSES
  68  *
  69  * Multiple responses can be received per request. Responses are queued up on
 * a LinkedList of CF<HttpResponse> and the first one on the list is completed
  71  * with the next response
  72  *
  73  * getResponseAsync() -- queries list of response CFs and returns first one
  74  *               if one exists. Otherwise, creates one and adds it to list
  75  *               and returns it. Completion is achieved through the
  76  *               incoming() upcall from connection reader thread.
  77  *
  78  * getResponse() -- calls getResponseAsync() and waits for CF to complete
  79  *
  80  * responseBody() -- in calling thread: blocks for incoming DATA frames on
  81  *               stream inputQ. Obeys remote and local flow control so may block.
  82  *               Calls user response body processor with data buffers.
  83  *
  84  * responseBodyAsync() -- calls responseBody() in an executor thread.
  85  *
  86  * incoming() -- entry point called from connection reader thread. Frames are
  87  *               either handled immediately without blocking or for data frames
  88  *               placed on the stream's inputQ which is consumed by the stream's
  89  *               reader thread.
  90  *
  91  * PushedStream sub class
  92  * ======================
  93  * Sending side methods are not used because the request comes from a PUSH_PROMISE
  94  * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
  95  * is created. PushedStream does not use responseCF list as there can be only
 * one response. The CF is created when the object is created, and when the response
  97  * HEADERS frame is received the object is completed.
  98  */
  99 class Stream<T> extends ExchangeImpl<T> {
 100 
 101     final AsyncDataReadQueue inputQ = new AsyncDataReadQueue();
 102 
 103     /**
 104      * This stream's identifier. Assigned lazily by the HTTP2Connection before
 105      * the stream's first frame is sent.
 106      */
 107     protected volatile int streamid;
 108 
 109     long responseContentLen = -1;
 110     long responseBytesProcessed = 0;
 111     long requestContentLen;
 112 
 113     final Http2Connection connection;
 114     HttpClientImpl client;
 115     final HttpRequestImpl request;
 116     final DecodingCallback rspHeadersConsumer;
 117     HttpHeadersImpl responseHeaders;
 118     final HttpHeadersImpl requestHeaders;
 119     final HttpHeadersImpl requestPseudoHeaders;
 120     HttpResponse.BodyProcessor<T> responseProcessor;
 121     final HttpRequest.BodyProcessor requestProcessor;
 122     volatile int responseCode;
 123     volatile Response response;
 124     volatile CompletableFuture<Response> responseCF;
 125     final AbstractPushPublisher<ByteBuffer> publisher;
 126     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
 127 
 128     /** True if END_STREAM has been seen in a frame received on this stream. */
 129     private volatile boolean remotelyClosed;
 130     private volatile boolean closed;
 131     private volatile boolean endStreamSent;
 132 
 133     // state flags
 134     boolean requestSent, responseReceived, responseHeadersReceived;
 135 
 136     /**
 137      * A reference to this Stream's connection Send Window controller. The
 138      * stream MUST acquire the appropriate amount of Send Window before
 139      * sending any data. Will be null for PushStreams, as they cannot send data.
 140      */
 141     private final WindowController windowController;
 142     private final WindowUpdateSender windowUpdater;
 143 
 144     @Override
 145     HttpConnection connection() {
 146         return connection.connection;
 147     }
 148 
 149     @Override
 150     CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
 151                                        boolean returnConnectionToPool,
 152                                        Executor executor)
 153     {
 154         Log.logTrace("Reading body on stream {0}", streamid);
 155         responseProcessor = handler.apply(responseCode, responseHeaders);
 156         publisher.subscribe(responseProcessor);
 157         CompletableFuture<T> cf = receiveData(executor);
 158 
 159         PushGroup<?,?> pg = exchange.getPushGroup();
 160         if (pg != null) {
 161             // if an error occurs make sure it is recorded in the PushGroup
 162             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 163         }
 164         return cf;
 165     }
 166 
 167     @Override
 168     T readBody(HttpResponse.BodyHandler<T> handler, boolean returnConnectionToPool)
 169         throws IOException
 170     {
 171         CompletableFuture<T> cf = readBodyAsync(handler,
 172                                                 returnConnectionToPool,
 173                                                 null);
 174         try {
 175             return cf.join();
 176         } catch (CompletionException e) {
 177             throw Utils.getIOException(e);
 178         }
 179     }
 180 
 181     @Override
 182     public String toString() {
 183         StringBuilder sb = new StringBuilder();
 184         sb.append("streamid: ")
 185                 .append(streamid);
 186         return sb.toString();
 187     }
 188 
 189     private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
 190         if (frame instanceof ResetFrame) {
 191             handleReset((ResetFrame) frame);
 192             return true;
 193         } else if (!(frame instanceof DataFrame)) {
 194             assert false;
 195             return true;
 196         }
 197         DataFrame df = (DataFrame) frame;
 198         // RFC 7540 6.1:
 199         // The entire DATA frame payload is included in flow control,
 200         // including the Pad Length and Padding fields if present
 201         int len = df.payloadLength();
 202         ByteBufferReference[] buffers = df.getData();
 203         for (ByteBufferReference b : buffers) {
 204             ByteBuffer buf = b.get();
 205             if (buf.hasRemaining()) {
 206                 publisher.acceptData(Optional.of(buf));
 207             }
 208         }
 209         connection.windowUpdater.update(len);
 210         if (df.getFlag(DataFrame.END_STREAM)) {
 211             setEndStreamReceived();
 212             publisher.acceptData(Optional.empty());
 213             return false;
 214         }
 215         // Don't send window update on a stream which is
 216         // closed or half closed.
 217         windowUpdater.update(len);
 218         return true;
 219     }
 220 
 221     // pushes entire response body into response processor
 222     // blocking when required by local or remote flow control
 223     CompletableFuture<T> receiveData(Executor executor) {
 224         CompletableFuture<T> cf = responseProcessor
 225                 .getBody()
 226                 .toCompletableFuture();
 227         Consumer<Throwable> onError = e -> {
 228             Log.logTrace("receiveData: {0}", e.toString());
 229             e.printStackTrace();
 230             cf.completeExceptionally(e);
 231             publisher.acceptError(e);
 232         };
 233         if (executor == null) {
 234             inputQ.blockingReceive(this::receiveDataFrame, onError);
 235         } else {
 236             inputQ.asyncReceive(executor, this::receiveDataFrame, onError);
 237         }
 238         return cf;
 239     }
 240 
 241     @Override
 242     void sendBody() throws IOException {
 243         try {
 244             sendBodyImpl().join();
 245         } catch (CompletionException e) {
 246             throw Utils.getIOException(e);
 247         }
 248     }
 249 
 250     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
 251         return sendBodyImpl().thenApply( v -> this);
 252     }
 253 
 254     @SuppressWarnings("unchecked")
 255     Stream(HttpClientImpl client,
 256            Http2Connection connection,
 257            Exchange<T> e,
 258            WindowController windowController)
 259     {
 260         super(e);
 261         this.client = client;
 262         this.connection = connection;
 263         this.windowController = windowController;
 264         this.request = e.request();
 265         this.requestProcessor = request.requestProcessor;
 266         responseHeaders = new HttpHeadersImpl();
 267         requestHeaders = new HttpHeadersImpl();
 268         rspHeadersConsumer = (name, value) -> {
 269             responseHeaders.addHeader(name.toString(), value.toString());
 270             if (Log.headers() && Log.trace()) {
 271                 Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
 272                              streamid, name, value);
 273             }
 274         };
 275         this.requestPseudoHeaders = new HttpHeadersImpl();
 276         // NEW
 277         this.publisher = new BlockingPushPublisher<>();
 278         this.windowUpdater = new StreamWindowUpdateSender(connection);
 279     }
 280 
 281     /**
 282      * Entry point from Http2Connection reader thread.
 283      *
 284      * Data frames will be removed by response body thread.
 285      */
 286     void incoming(Http2Frame frame) throws IOException {
 287         if ((frame instanceof HeaderFrame)) {
 288             HeaderFrame hframe = (HeaderFrame)frame;
 289             if (hframe.endHeaders()) {
 290                 Log.logTrace("handling response (streamid={0})", streamid);
 291                 handleResponse();
 292                 if (hframe.getFlag(HeaderFrame.END_STREAM)) {
 293                     inputQ.put(new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]));
 294                 }
 295             }
 296         } else if (frame instanceof DataFrame) {
 297             inputQ.put(frame);
 298         } else {
 299             otherFrame(frame);
 300         }
 301     }
 302 
 303     void otherFrame(Http2Frame frame) throws IOException {
 304         switch (frame.type()) {
 305             case WindowUpdateFrame.TYPE:
 306                 incoming_windowUpdate((WindowUpdateFrame) frame);
 307                 break;
 308             case ResetFrame.TYPE:
 309                 incoming_reset((ResetFrame) frame);
 310                 break;
 311             case PriorityFrame.TYPE:
 312                 incoming_priority((PriorityFrame) frame);
 313                 break;
 314             default:
 315                 String msg = "Unexpected frame: " + frame.toString();
 316                 throw new IOException(msg);
 317         }
 318     }
 319 
 320     // The Hpack decoder decodes into one of these consumers of name,value pairs
 321 
 322     DecodingCallback rspHeadersConsumer() {
 323         return rspHeadersConsumer;
 324     }
 325 
 326     protected void handleResponse() throws IOException {
 327         synchronized(this) {
 328             responseHeadersReceived = true;
 329         }
 330         HttpConnection c = connection.connection; // TODO: improve
 331         responseCode = (int)responseHeaders
 332                 .firstValueAsLong(":status")
 333                 .orElseThrow(() -> new IOException("no statuscode in response"));
 334 
 335         response = new Response(
 336                 request, exchange, responseHeaders,
 337                 responseCode, HttpClient.Version.HTTP_2);
 338 
 339         this.responseContentLen = responseHeaders
 340                 .firstValueAsLong("content-length")
 341                 .orElse(-1L);
 342 
 343         if (Log.headers()) {
 344             StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
 345             Log.dumpHeaders(sb, "    ", responseHeaders);
 346             Log.logHeaders(sb.toString());
 347         }
 348 
 349         completeResponse(response);
 350     }
 351 
 352     void incoming_reset(ResetFrame frame) throws IOException {
 353         Log.logTrace("Received RST_STREAM on stream {0}", streamid);
 354         if (endStreamReceived()) {
 355             Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
 356         } else if (closed) {
 357             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
 358         } else {
 359             boolean pushedToQueue = false;
 360             synchronized(this) {
 361                 // if the response headers are not yet
 362                 // received, or the inputQueue is closed, handle reset directly.
 363                 // Otherwise, put it in the input queue in order to read all
 364                 // pending data frames first. Indeed, a server may send
 365                 // RST_STREAM after sending END_STREAM, in which case we should
 366                 // ignore it. However, we won't know if we have received END_STREAM
 367                 // or not until all pending data frames are read.
 368                 // Because the inputQ will not be read until the response
 369                 // headers are received, and because response headers won't be
 370                 // sent if the server sent RST_STREAM, then we must handle
 371                 // reset here directly unless responseHeadersReceived is true.
 372                 pushedToQueue = !closed && responseHeadersReceived && inputQ.tryPut(frame);
 373             }
 374             if (!pushedToQueue) {
 375                 // RST_STREAM was not pushed to the queue: handle it.
 376                 try {
 377                     handleReset(frame);
 378                 } catch (IOException io) {
 379                     completeResponseExceptionally(io);
 380                 }
 381             } else {
 382                 // RST_STREAM was pushed to the queue. It will be handled by
 383                 // asyncReceive after all pending data frames have been
 384                 // processed.
 385                 Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
 386             }
 387         }
 388     }
 389 
 390     void handleReset(ResetFrame frame) throws IOException {
 391         Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
 392         if (!closed) {
 393             close();
 394             int error = frame.getErrorCode();
 395             throw new IOException(ErrorFrame.stringForCode(error));
 396         } else {
 397             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
 398         }
 399     }
 400 
 401     void incoming_priority(PriorityFrame frame) {
 402         // TODO: implement priority
 403         throw new UnsupportedOperationException("Not implemented");
 404     }
 405 
 406     private void incoming_windowUpdate(WindowUpdateFrame frame)
 407         throws IOException
 408     {
 409         int amount = frame.getUpdate();
 410         if (amount <= 0) {
 411             Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n",
 412                          streamid, streamid, amount);
 413             connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
 414         } else {
 415             assert streamid != 0;
 416             boolean success = windowController.increaseStreamWindow(amount, streamid);
 417             if (!success) {  // overflow
 418                 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
 419             }
 420         }
 421     }
 422 
 423     void incoming_pushPromise(HttpRequestImpl pushReq,
 424                               PushedStream<?,T> pushStream)
 425         throws IOException
 426     {
 427         if (Log.requests()) {
 428             Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
 429         }
 430         PushGroup<?,T> pushGroup = exchange.getPushGroup();
 431         if (pushGroup == null || pushGroup.noMorePushes()) {
 432             cancelImpl(new IllegalStateException("unexpected push promise"
 433                 + " on stream " + streamid));
 434         }
 435 
 436         HttpResponse.MultiProcessor<?,T> proc = pushGroup.processor();
 437 
 438         CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
 439 
 440         Optional<HttpResponse.BodyHandler<T>> bpOpt = proc.onRequest(
 441                 pushReq);
 442 
 443         if (!bpOpt.isPresent()) {
 444             IOException ex = new IOException("Stream "
 445                  + streamid + " cancelled by user");
 446             if (Log.trace()) {
 447                 Log.logTrace("No body processor for {0}: {1}", pushReq,
 448                             ex.getMessage());
 449             }
 450             pushStream.cancelImpl(ex);
 451             cf.completeExceptionally(ex);
 452             return;
 453         }
 454 
 455         pushGroup.addPush();
 456         pushStream.requestSent();
 457         pushStream.setPushHandler(bpOpt.get());
 458         // setup housekeeping for when the push is received
 459         // TODO: deal with ignoring of CF anti-pattern
 460         cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
 461             if (Log.trace()) {
 462                 Log.logTrace("Push completed on stream {0} for {1}{2}",
 463                              pushStream.streamid, resp,
 464                              ((t==null) ? "": " with exception " + t));
 465             }
 466             if (t != null) {
 467                 pushGroup.pushError(t);
 468                 proc.onError(pushReq, t);
 469             } else {
 470                 proc.onResponse(resp);
 471             }
 472             pushGroup.pushCompleted();
 473         });
 474 
 475     }
 476 
 477     private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
 478         HttpHeadersImpl h = request.getSystemHeaders();
 479         if (contentLength > 0) {
 480             h.setHeader("content-length", Long.toString(contentLength));
 481         }
 482         setPseudoHeaderFields();
 483         OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(h, request.getUserHeaders(), this);
 484         if (contentLength == 0) {
 485             f.setFlag(HeadersFrame.END_STREAM);
 486             endStreamSent = true;
 487         }
 488         return f;
 489     }
 490 
 491     private void setPseudoHeaderFields() {
 492         HttpHeadersImpl hdrs = requestPseudoHeaders;
 493         String method = request.method();
 494         hdrs.setHeader(":method", method);
 495         URI uri = request.uri();
 496         hdrs.setHeader(":scheme", uri.getScheme());
 497         // TODO: userinfo deprecated. Needs to be removed
 498         hdrs.setHeader(":authority", uri.getAuthority());
 499         // TODO: ensure header names beginning with : not in user headers
 500         String query = uri.getQuery();
 501         String path = uri.getPath();
 502         if (path == null || path.isEmpty()) {
 503             if (method.equalsIgnoreCase("OPTIONS")) {
 504                 path = "*";
 505             } else {
 506                 path = "/";
 507             }
 508         }
 509         if (query != null) {
 510             path += "?" + query;
 511         }
 512         hdrs.setHeader(":path", path);
 513     }
 514 
 515     HttpHeadersImpl getRequestPseudoHeaders() {
 516         return requestPseudoHeaders;
 517     }
 518 
 519     @Override
 520     Response getResponse() throws IOException {
 521         try {
 522             if (request.duration() != null) {
 523                 Log.logTrace("Waiting for response (streamid={0}, timeout={1}ms)",
 524                              streamid,
 525                              request.duration().toMillis());
 526                 return getResponseAsync(null).get(
 527                         request.duration().toMillis(), TimeUnit.MILLISECONDS);
 528             } else {
 529                 Log.logTrace("Waiting for response (streamid={0})", streamid);
 530                 return getResponseAsync(null).join();
 531             }
 532         } catch (TimeoutException e) {
 533             Log.logTrace("Response timeout (streamid={0})", streamid);
 534             throw new HttpTimeoutException("Response timed out");
 535         } catch (InterruptedException | ExecutionException | CompletionException e) {
 536             Throwable t = e.getCause();
 537             Log.logTrace("Response failed (streamid={0}): {1}", streamid, t);
 538             if (t instanceof IOException) {
 539                 throw (IOException)t;
 540             }
 541             throw new IOException(e);
 542         } finally {
 543             Log.logTrace("Got response or failed (streamid={0})", streamid);
 544         }
 545     }
 546 
 547     /** Sets endStreamReceived. Should be called only once. */
 548     void setEndStreamReceived() {
 549         assert remotelyClosed == false: "Unexpected endStream already set";
 550         remotelyClosed = true;
 551         responseReceived();
 552     }
 553 
 554     /** Tells whether, or not, the END_STREAM Flag has been seen in any frame
 555      *  received on this stream. */
 556     private boolean endStreamReceived() {
 557         return remotelyClosed;
 558     }
 559 
 560     @Override
 561     void sendHeadersOnly() throws IOException, InterruptedException {
 562         if (Log.requests() && request != null) {
 563             Log.logRequest(request.toString());
 564         }
 565         requestContentLen = requestProcessor.contentLength();
 566         OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
 567         connection.sendFrame(f);
 568     }
 569 
 570     void registerStream(int id) {
 571         this.streamid = id;
 572         connection.putStream(this, streamid);
 573     }
 574 
 575     class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
 576         // can be < 0 if the actual length is not known.
 577         private volatile long remainingContentLength;
 578         private volatile Subscription subscription;
 579 
 580         RequestSubscriber(long contentLen) {
 581             this.remainingContentLength = contentLen;
 582         }
 583 
 584         @Override
 585         public void onSubscribe(Flow.Subscription subscription) {
 586             if (this.subscription != null) {
 587                 throw new IllegalStateException();
 588             }
 589             this.subscription = subscription;
 590             subscription.request(1);
 591         }
 592 
 593         @Override
 594         public void onNext(ByteBuffer item) {
 595             if (requestBodyCF.isDone()) {
 596                 throw new IllegalStateException();
 597             }
 598 
 599             try {
 600                 while (item.hasRemaining()) {
 601                     assert !endStreamSent : "internal error, send data after END_STREAM flag";
 602                     DataFrame df = getDataFrame(item);
 603                     if (remainingContentLength > 0) {
 604                         remainingContentLength -= df.getDataLength();
 605                         assert remainingContentLength >= 0;
 606                         if (remainingContentLength == 0) {
 607                             df.setFlag(DataFrame.END_STREAM);
 608                             endStreamSent = true;
 609                         }
 610                     }
 611                     connection.sendDataFrame(df);
 612                 }
 613                 subscription.request(1);
 614             } catch (InterruptedException ex) {
 615                 subscription.cancel();
 616                 requestBodyCF.completeExceptionally(ex);
 617             }
 618         }
 619 
 620         @Override
 621         public void onError(Throwable throwable) {
 622             if (requestBodyCF.isDone()) {
 623                 return;
 624             }
 625             subscription.cancel();
 626             requestBodyCF.completeExceptionally(throwable);
 627         }
 628 
 629         @Override
 630         public void onComplete() {
 631             assert endStreamSent || remainingContentLength < 0;
 632             try {
 633                 if (!endStreamSent) {
 634                     endStreamSent = true;
 635                     connection.sendDataFrame(getEmptyEndStreamDataFrame());
 636                 }
 637                 requestBodyCF.complete(null);
 638             } catch (InterruptedException ex) {
 639                 requestBodyCF.completeExceptionally(ex);
 640             }
 641         }
 642     }
 643 
 644     DataFrame getDataFrame(ByteBuffer buffer) throws InterruptedException {
 645         int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
 646         // blocks waiting for stream send window, if exhausted
 647         int actualAmount = windowController.tryAcquire(requestAmount, streamid);
 648         ByteBuffer outBuf = Utils.slice(buffer,  actualAmount);
 649         DataFrame df = new DataFrame(streamid, 0 , ByteBufferReference.of(outBuf));
 650         return df;
 651     }
 652 
 653     private DataFrame getEmptyEndStreamDataFrame() throws InterruptedException {
 654         return new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]);
 655     }
 656 
 657     /**
 658      * A List of responses relating to this stream. Normally there is only
 659      * one response, but intermediate responses like 100 are allowed
 660      * and must be passed up to higher level before continuing. Deals with races
 661      * such as if responses are returned before the CFs get created by
 662      * getResponseAsync()
 663      */
 664 
 665     final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
 666 
 667     @Override
 668     CompletableFuture<Response> getResponseAsync(Executor executor) {
 669         CompletableFuture<Response> cf = null;
 670         // The code below deals with race condition that can be caused when
 671         // completeResponse() is being called before getResponseAsync()
 672         synchronized (response_cfs) {
 673             if (!response_cfs.isEmpty()) {
 674                 // This CompletableFuture was created by completeResponse().
 675                 // it will be already completed.
 676                 cf = response_cfs.remove(0);
 677                 // if we find a cf here it should be already completed.
 678                 // finding a non completed cf should not happen. just assert it.
 679                 assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
 680             } else {
 681                 // getResponseAsync() is called first. Create a CompletableFuture
 682                 // that will be completed by completeResponse() when
 683                 // completeResponse() is called.
 684                 cf = new MinimalFuture<>();
 685                 response_cfs.add(cf);
 686             }
 687         }
 688         if (executor != null && !cf.isDone()) {
 689             // protect from executing later chain of CompletableFuture operations from SelectorManager thread
 690             cf = cf.thenApplyAsync(r -> r, executor);
 691         }
 692         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
 693         PushGroup<?,?> pg = exchange.getPushGroup();
 694         if (pg != null) {
 695             // if an error occurs make sure it is recorded in the PushGroup
 696             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 697         }
 698         return cf;
 699     }
 700 
 701     /**
 702      * Completes the first uncompleted CF on list, and removes it. If there is no
 703      * uncompleted CF then creates one (completes it) and adds to list
 704      */
 705     void completeResponse(Response resp) {
 706         synchronized (response_cfs) {
 707             CompletableFuture<Response> cf;
 708             int cfs_len = response_cfs.size();
 709             for (int i=0; i<cfs_len; i++) {
 710                 cf = response_cfs.get(i);
 711                 if (!cf.isDone()) {
 712                     Log.logTrace("Completing response (streamid={0}): {1}",
 713                                  streamid, cf);
 714                     cf.complete(resp);
 715                     response_cfs.remove(cf);
 716                     return;
 717                 } // else we found the previous response: just leave it alone.
 718             }
 719             cf = MinimalFuture.completedFuture(resp);
 720             Log.logTrace("Created completed future (streamid={0}): {1}",
 721                          streamid, cf);
 722             response_cfs.add(cf);
 723         }
 724     }
 725 
 726     // methods to update state and remove stream when finished
 727 
 728     synchronized void requestSent() {
 729         requestSent = true;
 730         if (responseReceived) {
 731             close();
 732         }
 733     }
 734 
 735     final synchronized boolean isResponseReceived() {
 736         return responseReceived;
 737     }
 738 
 739     synchronized void responseReceived() {
 740         responseReceived = true;
 741         if (requestSent) {
 742             close();
 743         }
 744     }
 745 
 746     /**
 747      * same as above but for errors
 748      */
 749     void completeResponseExceptionally(Throwable t) {
 750         synchronized (response_cfs) {
 751             // use index to avoid ConcurrentModificationException
 752             // caused by removing the CF from within the loop.
 753             for (int i = 0; i < response_cfs.size(); i++) {
 754                 CompletableFuture<Response> cf = response_cfs.get(i);
 755                 if (!cf.isDone()) {
 756                     cf.completeExceptionally(t);
 757                     response_cfs.remove(i);
 758                     return;
 759                 }
 760             }
 761             response_cfs.add(MinimalFuture.failedFuture(t));
 762         }
 763     }
 764 
 765     CompletableFuture<Void> sendBodyImpl() {
 766         RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
 767         requestProcessor.subscribe(subscriber);
 768         requestBodyCF.whenComplete((v,t) -> requestSent());
 769         return requestBodyCF;
 770     }
 771 
 772     @Override
 773     void cancel() {
 774         cancel(new IOException("Stream " + streamid + " cancelled"));
 775     }
 776 
    /**
     * Cancels this stream with the given cause by delegating to
     * {@code cancelImpl}.
     *
     * @param cause the reason for the cancellation
     */
    @Override
    void cancel(IOException cause) {
        cancelImpl(cause);
    }
 781 
 782     // This method sends a RST_STREAM frame
 783     void cancelImpl(Throwable e) {
 784         if (Log.trace()) {
 785             Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
 786         }
 787         boolean closing;
 788         if (closing = !closed) { // assigning closing to !closed
 789             synchronized (this) {
 790                 if (closing = !closed) { // assigning closing to !closed
 791                     closed=true;
 792                 }
 793             }
 794         }
 795         if (closing) { // true if the stream has not been closed yet
 796             inputQ.close();
 797         }
 798         completeResponseExceptionally(e);
 799         try {
 800             // will send a RST_STREAM frame
 801             if (streamid != 0) {
 802                 connection.resetStream(streamid, ResetFrame.CANCEL);
 803             }
 804         } catch (IOException ex) {
 805             Log.logError(ex);
 806         }
 807     }
 808 
 809     // This method doesn't send any frame
 810     void close() {
 811         if (closed) return;
 812         synchronized(this) {
 813             if (closed) return;
 814             closed = true;
 815         }
 816         Log.logTrace("Closing stream {0}", streamid);
 817         inputQ.close();
 818         connection.closeStream(streamid);
 819         Log.logTrace("Stream {0} closed", streamid);
 820     }
 821 
 822     static class PushedStream<U,T> extends Stream<T> {
 823         final PushGroup<U,T> pushGroup;
 824         private final Stream<T> parent;      // used by server push streams
 825         // push streams need the response CF allocated up front as it is
 826         // given directly to user via the multi handler callback function.
 827         final CompletableFuture<Response> pushCF;
 828         final CompletableFuture<HttpResponse<T>> responseCF;
 829         final HttpRequestImpl pushReq;
 830         HttpResponse.BodyHandler<T> pushHandler;
 831 
 832         PushedStream(PushGroup<U,T> pushGroup, HttpClientImpl client,
 833                 Http2Connection connection, Stream<T> parent,
 834                 Exchange<T> pushReq) {
 835             // ## no request body possible, null window controller
 836             super(client, connection, pushReq, null);
 837             this.pushGroup = pushGroup;
 838             this.pushReq = pushReq.request();
 839             this.pushCF = new MinimalFuture<>();
 840             this.responseCF = new MinimalFuture<>();
 841             this.parent = parent;
 842         }
 843 
 844         CompletableFuture<HttpResponse<T>> responseCF() {
 845             return responseCF;
 846         }
 847 
 848         synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
 849             this.pushHandler = pushHandler;
 850         }
 851 
 852         synchronized HttpResponse.BodyHandler<T> getPushHandler() {
 853             // ignored parameters to function can be used as BodyHandler
 854             return this.pushHandler;
 855         }
 856 
 857         // Following methods call the super class but in case of
 858         // error record it in the PushGroup. The error method is called
 859         // with a null value when no error occurred (is a no-op)
 860         @Override
 861         CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
 862             return super.sendBodyAsync()
 863                         .whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t));
 864         }
 865 
 866         @Override
 867         CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
 868             return super.sendHeadersAsync()
 869                         .whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t));
 870         }
 871 
 872         @Override
 873         CompletableFuture<Response> getResponseAsync(Executor executor) {
 874             CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
 875             if(executor!=null && !cf.isDone()) {
 876                 cf  = cf.thenApplyAsync( r -> r, executor);
 877             }
 878             return cf;
 879         }
 880 
 881         @Override
 882         CompletableFuture<T> readBodyAsync(
 883                 HttpResponse.BodyHandler<T> handler,
 884                 boolean returnConnectionToPool,
 885                 Executor executor)
 886         {
 887             return super.readBodyAsync(handler, returnConnectionToPool, executor)
 888                         .whenComplete((v, t) -> pushGroup.pushError(t));
 889         }
 890 
 891         @Override
 892         void completeResponse(Response r) {
 893             HttpResponseImpl.logResponse(r);
 894             pushCF.complete(r); // not strictly required for push API
 895             // start reading the body using the obtained BodyProcessor
 896             CompletableFuture<Void> start = new MinimalFuture<>();
 897             start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
 898                 .whenComplete((T body, Throwable t) -> {
 899                     if (t != null) {
 900                         responseCF.completeExceptionally(t);
 901                     } else {
 902                         HttpResponseImpl<T> response = new HttpResponseImpl<>(r.request, r, body, getExchange());
 903                         responseCF.complete(response);
 904                     }
 905                 });
 906             start.completeAsync(() -> null, getExchange().executor());
 907         }
 908 
 909         @Override
 910         void completeResponseExceptionally(Throwable t) {
 911             pushCF.completeExceptionally(t);
 912         }
 913 
 914         @Override
 915         synchronized void responseReceived() {
 916             super.responseReceived();
 917         }
 918 
 919         // create and return the PushResponseImpl
 920         @Override
 921         protected void handleResponse() {
 922             HttpConnection c = connection.connection; // TODO: improve
 923             responseCode = (int)responseHeaders
 924                 .firstValueAsLong(":status")
 925                 .orElse(-1);
 926 
 927             if (responseCode == -1) {
 928                 completeResponseExceptionally(new IOException("No status code"));
 929             }
 930 
 931             this.response = new Response(
 932                 pushReq, exchange, responseHeaders,
 933                 responseCode, HttpClient.Version.HTTP_2);
 934 
 935             this.responseContentLen = responseHeaders
 936                 .firstValueAsLong("content-length")
 937                 .orElse(-1L);
 938 
 939             if (Log.headers()) {
 940                 StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
 941                 sb.append(" (streamid=").append(streamid).append("): ");
 942                 Log.dumpHeaders(sb, "    ", responseHeaders);
 943                 Log.logHeaders(sb.toString());
 944             }
 945 
 946             // different implementations for normal streams and pushed streams
 947             completeResponse(response);
 948         }
 949     }
 950 
    /**
     * {@code WindowUpdateSender} tied to the enclosing stream: it reports
     * the enclosing stream's {@code streamid} as its stream id.
     */
    final class StreamWindowUpdateSender extends WindowUpdateSender {

        /**
         * @param connection passed through to the {@code WindowUpdateSender}
         *                   constructor
         */
        StreamWindowUpdateSender(Http2Connection connection) {
            super(connection);
        }

        /** @return the {@code streamid} of the enclosing stream */
        @Override
        int getStreamId() {
            return streamid;
        }
    }
 962 
 963 }