1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.net.URI;
  30 import java.nio.ByteBuffer;
  31 import java.util.ArrayList;
  32 import java.util.Arrays;
  33 import java.util.List;
  34 import java.util.Optional;
  35 import java.util.concurrent.CompletableFuture;
  36 import java.util.concurrent.CompletionException;
  37 import java.util.concurrent.ExecutionException;
  38 import java.util.concurrent.Executor;
  39 import java.util.concurrent.Flow;
  40 import java.util.concurrent.Flow.Subscription;
  41 import java.util.concurrent.TimeUnit;
  42 import java.util.concurrent.TimeoutException;
  43 import java.util.function.Consumer;
  44 
  45 import jdk.incubator.http.internal.common.*;
  46 import jdk.incubator.http.internal.frame.*;
  47 import jdk.incubator.http.internal.hpack.DecodingCallback;
  48 import static java.util.stream.Collectors.toList;
  49 
  50 /**
  51  * Http/2 Stream handling.
  52  *
  53  * REQUESTS
  54  *
  55  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
  56  *
  57  * sendRequest() -- sendHeadersOnly() + sendBody()
  58  *
  59  * sendBody() -- in calling thread: obeys all flow control (so may block)
  60  *               obtains data from request body processor and places on connection
  61  *               outbound Q.
  62  *
  63  * sendBodyAsync() -- calls sendBody() in an executor thread.
  64  *
  65  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
  66  *
  67  * sendRequestAsync() -- calls sendRequest() in an executor thread
  68  *
  69  * RESPONSES
  70  *
  71  * Multiple responses can be received per request. Responses are queued up on
  72  * a LinkedList of CF<HttpResponse> and the the first one on the list is completed
  73  * with the next response
  74  *
  75  * getResponseAsync() -- queries list of response CFs and returns first one
  76  *               if one exists. Otherwise, creates one and adds it to list
  77  *               and returns it. Completion is achieved through the
  78  *               incoming() upcall from connection reader thread.
  79  *
  80  * getResponse() -- calls getResponseAsync() and waits for CF to complete
  81  *
  82  * responseBody() -- in calling thread: blocks for incoming DATA frames on
  83  *               stream inputQ. Obeys remote and local flow control so may block.
  84  *               Calls user response body processor with data buffers.
  85  *
  86  * responseBodyAsync() -- calls responseBody() in an executor thread.
  87  *
  88  * incoming() -- entry point called from connection reader thread. Frames are
  89  *               either handled immediately without blocking or for data frames
  90  *               placed on the stream's inputQ which is consumed by the stream's
  91  *               reader thread.
  92  *
  93  * PushedStream sub class
  94  * ======================
  95  * Sending side methods are not used because the request comes from a PUSH_PROMISE
  96  * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
  97  * is created. PushedStream does not use responseCF list as there can be only
  98  * one response. The CF is created when the object created and when the response
  99  * HEADERS frame is received the object is completed.
 100  */
 101 class Stream<T> extends ExchangeImpl<T> {
 102 
 103     final AsyncDataReadQueue inputQ = new AsyncDataReadQueue();
 104 
 105     /**
 106      * This stream's identifier. Assigned lazily by the HTTP2Connection before
 107      * the stream's first frame is sent.
 108      */
 109     protected volatile int streamid;
 110 
 111     long responseContentLen = -1;
 112     long responseBytesProcessed = 0;
 113     long requestContentLen;
 114 
 115     final Http2Connection connection;
 116     HttpClientImpl client;
 117     final HttpRequestImpl request;
 118     final DecodingCallback rspHeadersConsumer;
 119     HttpHeadersImpl responseHeaders;
 120     final HttpHeadersImpl requestHeaders;
 121     final HttpHeadersImpl requestPseudoHeaders;
 122     HttpResponse.BodyProcessor<T> responseProcessor;
 123     final HttpRequest.BodyProcessor requestProcessor;
 124     volatile int responseCode;
 125     volatile Response response;
 126     volatile CompletableFuture<Response> responseCF;
 127     final AbstractPushPublisher<List<ByteBuffer>> publisher;
 128     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
 129 
 130     /** True if END_STREAM has been seen in a frame received on this stream. */
 131     private volatile boolean remotelyClosed;
 132     private volatile boolean closed;
 133     private volatile boolean endStreamSent;
 134 
 135     // state flags
 136     boolean requestSent, responseReceived, responseHeadersReceived;
 137 
 138     /**
 139      * A reference to this Stream's connection Send Window controller. The
 140      * stream MUST acquire the appropriate amount of Send Window before
 141      * sending any data. Will be null for PushStreams, as they cannot send data.
 142      */
 143     private final WindowController windowController;
 144     private final WindowUpdateSender windowUpdater;
 145 
 146     @Override
 147     HttpConnection connection() {
 148         return connection.connection;
 149     }
 150 
 151     @Override
 152     CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
 153                                        boolean returnConnectionToPool,
 154                                        Executor executor)
 155     {
 156         Log.logTrace("Reading body on stream {0}", streamid);
 157         responseProcessor = handler.apply(responseCode, responseHeaders);
 158         publisher.subscribe(responseProcessor);
 159         CompletableFuture<T> cf = receiveData(executor);
 160 
 161         PushGroup<?,?> pg = exchange.getPushGroup();
 162         if (pg != null) {
 163             // if an error occurs make sure it is recorded in the PushGroup
 164             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 165         }
 166         return cf;
 167     }
 168 
 169     @Override
 170     T readBody(HttpResponse.BodyHandler<T> handler, boolean returnConnectionToPool)
 171         throws IOException
 172     {
 173         CompletableFuture<T> cf = readBodyAsync(handler,
 174                                                 returnConnectionToPool,
 175                                                 null);
 176         try {
 177             return cf.join();
 178         } catch (CompletionException e) {
 179             throw Utils.getIOException(e);
 180         }
 181     }
 182 
 183     @Override
 184     public String toString() {
 185         StringBuilder sb = new StringBuilder();
 186         sb.append("streamid: ")
 187                 .append(streamid);
 188         return sb.toString();
 189     }
 190 
 191     private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
 192         if (frame instanceof ResetFrame) {
 193             handleReset((ResetFrame) frame);
 194             return true;
 195         } else if (!(frame instanceof DataFrame)) {
 196             assert false;
 197             return true;
 198         }
 199         DataFrame df = (DataFrame) frame;
 200         // RFC 7540 6.1:
 201         // The entire DATA frame payload is included in flow control,
 202         // including the Pad Length and Padding fields if present
 203         int len = df.payloadLength();
 204         ByteBufferReference[] buffers = df.getData();
 205         List<ByteBuffer> dsts = Arrays.stream(buffers)
 206                 .map(ByteBufferReference::get)
 207                 .filter(ByteBuffer::hasRemaining)
 208                 .collect(toList());
 209         if (!dsts.isEmpty()) {
 210             publisher.acceptData(Optional.of(dsts));
 211         }
 212         connection.windowUpdater.update(len);
 213         if (df.getFlag(DataFrame.END_STREAM)) {
 214             setEndStreamReceived();
 215             publisher.acceptData(Optional.empty());
 216             return false;
 217         }
 218         // Don't send window update on a stream which is
 219         // closed or half closed.
 220         windowUpdater.update(len);
 221         return true;
 222     }
 223 
 224     // pushes entire response body into response processor
 225     // blocking when required by local or remote flow control
 226     CompletableFuture<T> receiveData(Executor executor) {
 227         CompletableFuture<T> cf = responseProcessor
 228                 .getBody()
 229                 .toCompletableFuture();
 230         Consumer<Throwable> onError = e -> {
 231             Log.logTrace("receiveData: {0}", e.toString());
 232             e.printStackTrace();
 233             cf.completeExceptionally(e);
 234             publisher.acceptError(e);
 235         };
 236         if (executor == null) {
 237             inputQ.blockingReceive(this::receiveDataFrame, onError);
 238         } else {
 239             inputQ.asyncReceive(executor, this::receiveDataFrame, onError);
 240         }
 241         return cf;
 242     }
 243 
 244     @Override
 245     void sendBody() throws IOException {
 246         try {
 247             sendBodyImpl().join();
 248         } catch (CompletionException e) {
 249             throw Utils.getIOException(e);
 250         }
 251     }
 252 
 253     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
 254         return sendBodyImpl().thenApply( v -> this);
 255     }
 256 
 257     @SuppressWarnings("unchecked")
 258     Stream(HttpClientImpl client,
 259            Http2Connection connection,
 260            Exchange<T> e,
 261            WindowController windowController)
 262     {
 263         super(e);
 264         this.client = client;
 265         this.connection = connection;
 266         this.windowController = windowController;
 267         this.request = e.request();
 268         this.requestProcessor = request.requestProcessor;
 269         responseHeaders = new HttpHeadersImpl();
 270         requestHeaders = new HttpHeadersImpl();
 271         rspHeadersConsumer = (name, value) -> {
 272             responseHeaders.addHeader(name.toString(), value.toString());
 273             if (Log.headers() && Log.trace()) {
 274                 Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
 275                              streamid, name, value);
 276             }
 277         };
 278         this.requestPseudoHeaders = new HttpHeadersImpl();
 279         // NEW
 280         this.publisher = new BlockingPushPublisher<>();
 281         this.windowUpdater = new StreamWindowUpdateSender(connection);
 282     }
 283 
 284     /**
 285      * Entry point from Http2Connection reader thread.
 286      *
 287      * Data frames will be removed by response body thread.
 288      */
 289     void incoming(Http2Frame frame) throws IOException {
 290         if ((frame instanceof HeaderFrame)) {
 291             HeaderFrame hframe = (HeaderFrame)frame;
 292             if (hframe.endHeaders()) {
 293                 Log.logTrace("handling response (streamid={0})", streamid);
 294                 handleResponse();
 295                 if (hframe.getFlag(HeaderFrame.END_STREAM)) {
 296                     inputQ.put(new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]));
 297                 }
 298             }
 299         } else if (frame instanceof DataFrame) {
 300             inputQ.put(frame);
 301         } else {
 302             otherFrame(frame);
 303         }
 304     }
 305 
 306     void otherFrame(Http2Frame frame) throws IOException {
 307         switch (frame.type()) {
 308             case WindowUpdateFrame.TYPE:
 309                 incoming_windowUpdate((WindowUpdateFrame) frame);
 310                 break;
 311             case ResetFrame.TYPE:
 312                 incoming_reset((ResetFrame) frame);
 313                 break;
 314             case PriorityFrame.TYPE:
 315                 incoming_priority((PriorityFrame) frame);
 316                 break;
 317             default:
 318                 String msg = "Unexpected frame: " + frame.toString();
 319                 throw new IOException(msg);
 320         }
 321     }
 322 
 323     // The Hpack decoder decodes into one of these consumers of name,value pairs
 324 
 325     DecodingCallback rspHeadersConsumer() {
 326         return rspHeadersConsumer;
 327     }
 328 
 329     protected void handleResponse() throws IOException {
 330         synchronized(this) {
 331             responseHeadersReceived = true;
 332         }
 333         HttpConnection c = connection.connection; // TODO: improve
 334         responseCode = (int)responseHeaders
 335                 .firstValueAsLong(":status")
 336                 .orElseThrow(() -> new IOException("no statuscode in response"));
 337 
 338         response = new Response(
 339                 request, exchange, responseHeaders,
 340                 responseCode, HttpClient.Version.HTTP_2);
 341 
 342         this.responseContentLen = responseHeaders
 343                 .firstValueAsLong("content-length")
 344                 .orElse(-1L);
 345 
 346         if (Log.headers()) {
 347             StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
 348             Log.dumpHeaders(sb, "    ", responseHeaders);
 349             Log.logHeaders(sb.toString());
 350         }
 351 
 352         completeResponse(response);
 353     }
 354 
 355     void incoming_reset(ResetFrame frame) throws IOException {
 356         Log.logTrace("Received RST_STREAM on stream {0}", streamid);
 357         if (endStreamReceived()) {
 358             Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
 359         } else if (closed) {
 360             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
 361         } else {
 362             boolean pushedToQueue = false;
 363             synchronized(this) {
 364                 // if the response headers are not yet
 365                 // received, or the inputQueue is closed, handle reset directly.
 366                 // Otherwise, put it in the input queue in order to read all
 367                 // pending data frames first. Indeed, a server may send
 368                 // RST_STREAM after sending END_STREAM, in which case we should
 369                 // ignore it. However, we won't know if we have received END_STREAM
 370                 // or not until all pending data frames are read.
 371                 // Because the inputQ will not be read until the response
 372                 // headers are received, and because response headers won't be
 373                 // sent if the server sent RST_STREAM, then we must handle
 374                 // reset here directly unless responseHeadersReceived is true.
 375                 pushedToQueue = !closed && responseHeadersReceived && inputQ.tryPut(frame);
 376             }
 377             if (!pushedToQueue) {
 378                 // RST_STREAM was not pushed to the queue: handle it.
 379                 try {
 380                     handleReset(frame);
 381                 } catch (IOException io) {
 382                     completeResponseExceptionally(io);
 383                 }
 384             } else {
 385                 // RST_STREAM was pushed to the queue. It will be handled by
 386                 // asyncReceive after all pending data frames have been
 387                 // processed.
 388                 Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
 389             }
 390         }
 391     }
 392 
 393     void handleReset(ResetFrame frame) throws IOException {
 394         Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
 395         if (!closed) {
 396             close();
 397             int error = frame.getErrorCode();
 398             throw new IOException(ErrorFrame.stringForCode(error));
 399         } else {
 400             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
 401         }
 402     }
 403 
 404     void incoming_priority(PriorityFrame frame) {
 405         // TODO: implement priority
 406         throw new UnsupportedOperationException("Not implemented");
 407     }
 408 
 409     private void incoming_windowUpdate(WindowUpdateFrame frame)
 410         throws IOException
 411     {
 412         int amount = frame.getUpdate();
 413         if (amount <= 0) {
 414             Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n",
 415                          streamid, streamid, amount);
 416             connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
 417         } else {
 418             assert streamid != 0;
 419             boolean success = windowController.increaseStreamWindow(amount, streamid);
 420             if (!success) {  // overflow
 421                 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
 422             }
 423         }
 424     }
 425 
 426     void incoming_pushPromise(HttpRequestImpl pushReq,
 427                               PushedStream<?,T> pushStream)
 428         throws IOException
 429     {
 430         if (Log.requests()) {
 431             Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
 432         }
 433         PushGroup<?,T> pushGroup = exchange.getPushGroup();
 434         if (pushGroup == null || pushGroup.noMorePushes()) {
 435             cancelImpl(new IllegalStateException("unexpected push promise"
 436                 + " on stream " + streamid));
 437         }
 438 
 439         HttpResponse.MultiProcessor<?,T> proc = pushGroup.processor();
 440 
 441         CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
 442 
 443         Optional<HttpResponse.BodyHandler<T>> bpOpt = proc.onRequest(
 444                 pushReq);
 445 
 446         if (!bpOpt.isPresent()) {
 447             IOException ex = new IOException("Stream "
 448                  + streamid + " cancelled by user");
 449             if (Log.trace()) {
 450                 Log.logTrace("No body processor for {0}: {1}", pushReq,
 451                             ex.getMessage());
 452             }
 453             pushStream.cancelImpl(ex);
 454             cf.completeExceptionally(ex);
 455             return;
 456         }
 457 
 458         pushGroup.addPush();
 459         pushStream.requestSent();
 460         pushStream.setPushHandler(bpOpt.get());
 461         // setup housekeeping for when the push is received
 462         // TODO: deal with ignoring of CF anti-pattern
 463         cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
 464             if (Log.trace()) {
 465                 Log.logTrace("Push completed on stream {0} for {1}{2}",
 466                              pushStream.streamid, resp,
 467                              ((t==null) ? "": " with exception " + t));
 468             }
 469             if (t != null) {
 470                 pushGroup.pushError(t);
 471                 proc.onError(pushReq, t);
 472             } else {
 473                 proc.onResponse(resp);
 474             }
 475             pushGroup.pushCompleted();
 476         });
 477 
 478     }
 479 
 480     private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
 481         HttpHeadersImpl h = request.getSystemHeaders();
 482         if (contentLength > 0) {
 483             h.setHeader("content-length", Long.toString(contentLength));
 484         }
 485         setPseudoHeaderFields();
 486         OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(h, request.getUserHeaders(), this);
 487         if (contentLength == 0) {
 488             f.setFlag(HeadersFrame.END_STREAM);
 489             endStreamSent = true;
 490         }
 491         return f;
 492     }
 493 
 494     private void setPseudoHeaderFields() {
 495         HttpHeadersImpl hdrs = requestPseudoHeaders;
 496         String method = request.method();
 497         hdrs.setHeader(":method", method);
 498         URI uri = request.uri();
 499         hdrs.setHeader(":scheme", uri.getScheme());
 500         // TODO: userinfo deprecated. Needs to be removed
 501         hdrs.setHeader(":authority", uri.getAuthority());
 502         // TODO: ensure header names beginning with : not in user headers
 503         String query = uri.getQuery();
 504         String path = uri.getPath();
 505         if (path == null || path.isEmpty()) {
 506             if (method.equalsIgnoreCase("OPTIONS")) {
 507                 path = "*";
 508             } else {
 509                 path = "/";
 510             }
 511         }
 512         if (query != null) {
 513             path += "?" + query;
 514         }
 515         hdrs.setHeader(":path", path);
 516     }
 517 
 518     HttpHeadersImpl getRequestPseudoHeaders() {
 519         return requestPseudoHeaders;
 520     }
 521 
 522     @Override
 523     Response getResponse() throws IOException {
 524         try {
 525             if (request.duration() != null) {
 526                 Log.logTrace("Waiting for response (streamid={0}, timeout={1}ms)",
 527                              streamid,
 528                              request.duration().toMillis());
 529                 return getResponseAsync(null).get(
 530                         request.duration().toMillis(), TimeUnit.MILLISECONDS);
 531             } else {
 532                 Log.logTrace("Waiting for response (streamid={0})", streamid);
 533                 return getResponseAsync(null).join();
 534             }
 535         } catch (TimeoutException e) {
 536             Log.logTrace("Response timeout (streamid={0})", streamid);
 537             throw new HttpTimeoutException("Response timed out");
 538         } catch (InterruptedException | ExecutionException | CompletionException e) {
 539             Throwable t = e.getCause();
 540             Log.logTrace("Response failed (streamid={0}): {1}", streamid, t);
 541             if (t instanceof IOException) {
 542                 throw (IOException)t;
 543             }
 544             throw new IOException(e);
 545         } finally {
 546             Log.logTrace("Got response or failed (streamid={0})", streamid);
 547         }
 548     }
 549 
 550     /** Sets endStreamReceived. Should be called only once. */
 551     void setEndStreamReceived() {
 552         assert remotelyClosed == false: "Unexpected endStream already set";
 553         remotelyClosed = true;
 554         responseReceived();
 555     }
 556 
 557     /** Tells whether, or not, the END_STREAM Flag has been seen in any frame
 558      *  received on this stream. */
 559     private boolean endStreamReceived() {
 560         return remotelyClosed;
 561     }
 562 
 563     @Override
 564     void sendHeadersOnly() throws IOException, InterruptedException {
 565         if (Log.requests() && request != null) {
 566             Log.logRequest(request.toString());
 567         }
 568         requestContentLen = requestProcessor.contentLength();
 569         OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
 570         connection.sendFrame(f);
 571     }
 572 
 573     void registerStream(int id) {
 574         this.streamid = id;
 575         connection.putStream(this, streamid);
 576     }
 577 
 578     class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
 579         // can be < 0 if the actual length is not known.
 580         private volatile long remainingContentLength;
 581         private volatile Subscription subscription;
 582 
 583         RequestSubscriber(long contentLen) {
 584             this.remainingContentLength = contentLen;
 585         }
 586 
 587         @Override
 588         public void onSubscribe(Flow.Subscription subscription) {
 589             if (this.subscription != null) {
 590                 throw new IllegalStateException();
 591             }
 592             this.subscription = subscription;
 593             subscription.request(1);
 594         }
 595 
 596         @Override
 597         public void onNext(ByteBuffer item) {
 598             if (requestBodyCF.isDone()) {
 599                 throw new IllegalStateException();
 600             }
 601 
 602             try {
 603                 while (item.hasRemaining()) {
 604                     assert !endStreamSent : "internal error, send data after END_STREAM flag";
 605                     DataFrame df = getDataFrame(item);
 606                     if (remainingContentLength > 0) {
 607                         remainingContentLength -= df.getDataLength();
 608                         assert remainingContentLength >= 0;
 609                         if (remainingContentLength == 0) {
 610                             df.setFlag(DataFrame.END_STREAM);
 611                             endStreamSent = true;
 612                         }
 613                     }
 614                     connection.sendDataFrame(df);
 615                 }
 616                 subscription.request(1);
 617             } catch (InterruptedException ex) {
 618                 subscription.cancel();
 619                 requestBodyCF.completeExceptionally(ex);
 620             }
 621         }
 622 
 623         @Override
 624         public void onError(Throwable throwable) {
 625             if (requestBodyCF.isDone()) {
 626                 return;
 627             }
 628             subscription.cancel();
 629             requestBodyCF.completeExceptionally(throwable);
 630         }
 631 
 632         @Override
 633         public void onComplete() {
 634             assert endStreamSent || remainingContentLength < 0;
 635             try {
 636                 if (!endStreamSent) {
 637                     endStreamSent = true;
 638                     connection.sendDataFrame(getEmptyEndStreamDataFrame());
 639                 }
 640                 requestBodyCF.complete(null);
 641             } catch (InterruptedException ex) {
 642                 requestBodyCF.completeExceptionally(ex);
 643             }
 644         }
 645     }
 646 
 647     DataFrame getDataFrame(ByteBuffer buffer) throws InterruptedException {
 648         int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
 649         // blocks waiting for stream send window, if exhausted
 650         int actualAmount = windowController.tryAcquire(requestAmount, streamid);
 651         ByteBuffer outBuf = Utils.slice(buffer,  actualAmount);
 652         DataFrame df = new DataFrame(streamid, 0 , ByteBufferReference.of(outBuf));
 653         return df;
 654     }
 655 
 656     private DataFrame getEmptyEndStreamDataFrame() throws InterruptedException {
 657         return new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]);
 658     }
 659 
 660     /**
 661      * A List of responses relating to this stream. Normally there is only
 662      * one response, but intermediate responses like 100 are allowed
 663      * and must be passed up to higher level before continuing. Deals with races
 664      * such as if responses are returned before the CFs get created by
 665      * getResponseAsync()
 666      */
 667 
 668     final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
 669 
 670     @Override
 671     CompletableFuture<Response> getResponseAsync(Executor executor) {
 672         CompletableFuture<Response> cf = null;
 673         // The code below deals with race condition that can be caused when
 674         // completeResponse() is being called before getResponseAsync()
 675         synchronized (response_cfs) {
 676             if (!response_cfs.isEmpty()) {
 677                 // This CompletableFuture was created by completeResponse().
 678                 // it will be already completed.
 679                 cf = response_cfs.remove(0);
 680                 // if we find a cf here it should be already completed.
 681                 // finding a non completed cf should not happen. just assert it.
 682                 assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
 683             } else {
 684                 // getResponseAsync() is called first. Create a CompletableFuture
 685                 // that will be completed by completeResponse() when
 686                 // completeResponse() is called.
 687                 cf = new MinimalFuture<>();
 688                 response_cfs.add(cf);
 689             }
 690         }
 691         if (executor != null && !cf.isDone()) {
 692             // protect from executing later chain of CompletableFuture operations from SelectorManager thread
 693             cf = cf.thenApplyAsync(r -> r, executor);
 694         }
 695         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
 696         PushGroup<?,?> pg = exchange.getPushGroup();
 697         if (pg != null) {
 698             // if an error occurs make sure it is recorded in the PushGroup
 699             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 700         }
 701         return cf;
 702     }
 703 
 704     /**
 705      * Completes the first uncompleted CF on list, and removes it. If there is no
 706      * uncompleted CF then creates one (completes it) and adds to list
 707      */
 708     void completeResponse(Response resp) {
 709         synchronized (response_cfs) {
 710             CompletableFuture<Response> cf;
 711             int cfs_len = response_cfs.size();
 712             for (int i=0; i<cfs_len; i++) {
 713                 cf = response_cfs.get(i);
 714                 if (!cf.isDone()) {
 715                     Log.logTrace("Completing response (streamid={0}): {1}",
 716                                  streamid, cf);
 717                     cf.complete(resp);
 718                     response_cfs.remove(cf);
 719                     return;
 720                 } // else we found the previous response: just leave it alone.
 721             }
 722             cf = MinimalFuture.completedFuture(resp);
 723             Log.logTrace("Created completed future (streamid={0}): {1}",
 724                          streamid, cf);
 725             response_cfs.add(cf);
 726         }
 727     }
 728 
 729     // methods to update state and remove stream when finished
 730 
 731     synchronized void requestSent() {
 732         requestSent = true;
 733         if (responseReceived) {
 734             close();
 735         }
 736     }
 737 
 738     final synchronized boolean isResponseReceived() {
 739         return responseReceived;
 740     }
 741 
 742     synchronized void responseReceived() {
 743         responseReceived = true;
 744         if (requestSent) {
 745             close();
 746         }
 747     }
 748 
 749     /**
 750      * same as above but for errors
 751      */
 752     void completeResponseExceptionally(Throwable t) {
 753         synchronized (response_cfs) {
 754             // use index to avoid ConcurrentModificationException
 755             // caused by removing the CF from within the loop.
 756             for (int i = 0; i < response_cfs.size(); i++) {
 757                 CompletableFuture<Response> cf = response_cfs.get(i);
 758                 if (!cf.isDone()) {
 759                     cf.completeExceptionally(t);
 760                     response_cfs.remove(i);
 761                     return;
 762                 }
 763             }
 764             response_cfs.add(MinimalFuture.failedFuture(t));
 765         }
 766     }
 767 
 768     CompletableFuture<Void> sendBodyImpl() {
 769         RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
 770         requestProcessor.subscribe(subscriber);
 771         requestBodyCF.whenComplete((v,t) -> requestSent());
 772         return requestBodyCF;
 773     }
 774 
 775     @Override
 776     void cancel() {
 777         cancel(new IOException("Stream " + streamid + " cancelled"));
 778     }
 779 
 780     @Override
 781     void cancel(IOException cause) {
 782         cancelImpl(cause);
 783     }
 784 
 785     // This method sends a RST_STREAM frame
 786     void cancelImpl(Throwable e) {
 787         if (Log.trace()) {
 788             Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
 789         }
 790         boolean closing;
 791         if (closing = !closed) { // assigning closing to !closed
 792             synchronized (this) {
 793                 if (closing = !closed) { // assigning closing to !closed
 794                     closed=true;
 795                 }
 796             }
 797         }
 798         if (closing) { // true if the stream has not been closed yet
 799             inputQ.close();
 800         }
 801         completeResponseExceptionally(e);
 802         try {
 803             // will send a RST_STREAM frame
 804             if (streamid != 0) {
 805                 connection.resetStream(streamid, ResetFrame.CANCEL);
 806             }
 807         } catch (IOException ex) {
 808             Log.logError(ex);
 809         }
 810     }
 811 
 812     // This method doesn't send any frame
 813     void close() {
 814         if (closed) return;
 815         synchronized(this) {
 816             if (closed) return;
 817             closed = true;
 818         }
 819         Log.logTrace("Closing stream {0}", streamid);
 820         inputQ.close();
 821         connection.closeStream(streamid);
 822         Log.logTrace("Stream {0} closed", streamid);
 823     }
 824 
 825     static class PushedStream<U,T> extends Stream<T> {
 826         final PushGroup<U,T> pushGroup;
 827         private final Stream<T> parent;      // used by server push streams
 828         // push streams need the response CF allocated up front as it is
 829         // given directly to user via the multi handler callback function.
 830         final CompletableFuture<Response> pushCF;
 831         final CompletableFuture<HttpResponse<T>> responseCF;
 832         final HttpRequestImpl pushReq;
 833         HttpResponse.BodyHandler<T> pushHandler;
 834 
 835         PushedStream(PushGroup<U,T> pushGroup, HttpClientImpl client,
 836                 Http2Connection connection, Stream<T> parent,
 837                 Exchange<T> pushReq) {
 838             // ## no request body possible, null window controller
 839             super(client, connection, pushReq, null);
 840             this.pushGroup = pushGroup;
 841             this.pushReq = pushReq.request();
 842             this.pushCF = new MinimalFuture<>();
 843             this.responseCF = new MinimalFuture<>();
 844             this.parent = parent;
 845         }
 846 
 847         CompletableFuture<HttpResponse<T>> responseCF() {
 848             return responseCF;
 849         }
 850 
 851         synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
 852             this.pushHandler = pushHandler;
 853         }
 854 
 855         synchronized HttpResponse.BodyHandler<T> getPushHandler() {
 856             // ignored parameters to function can be used as BodyHandler
 857             return this.pushHandler;
 858         }
 859 
 860         // Following methods call the super class but in case of
 861         // error record it in the PushGroup. The error method is called
 862         // with a null value when no error occurred (is a no-op)
 863         @Override
 864         CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
 865             return super.sendBodyAsync()
 866                         .whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t));
 867         }
 868 
 869         @Override
 870         CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
 871             return super.sendHeadersAsync()
 872                         .whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t));
 873         }
 874 
 875         @Override
 876         CompletableFuture<Response> getResponseAsync(Executor executor) {
 877             CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
 878             if(executor!=null && !cf.isDone()) {
 879                 cf  = cf.thenApplyAsync( r -> r, executor);
 880             }
 881             return cf;
 882         }
 883 
 884         @Override
 885         CompletableFuture<T> readBodyAsync(
 886                 HttpResponse.BodyHandler<T> handler,
 887                 boolean returnConnectionToPool,
 888                 Executor executor)
 889         {
 890             return super.readBodyAsync(handler, returnConnectionToPool, executor)
 891                         .whenComplete((v, t) -> pushGroup.pushError(t));
 892         }
 893 
 894         @Override
 895         void completeResponse(Response r) {
 896             HttpResponseImpl.logResponse(r);
 897             pushCF.complete(r); // not strictly required for push API
 898             // start reading the body using the obtained BodyProcessor
 899             CompletableFuture<Void> start = new MinimalFuture<>();
 900             start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
 901                 .whenComplete((T body, Throwable t) -> {
 902                     if (t != null) {
 903                         responseCF.completeExceptionally(t);
 904                     } else {
 905                         HttpResponseImpl<T> response = new HttpResponseImpl<>(r.request, r, body, getExchange());
 906                         responseCF.complete(response);
 907                     }
 908                 });
 909             start.completeAsync(() -> null, getExchange().executor());
 910         }
 911 
 912         @Override
 913         void completeResponseExceptionally(Throwable t) {
 914             pushCF.completeExceptionally(t);
 915         }
 916 
 917         @Override
 918         synchronized void responseReceived() {
 919             super.responseReceived();
 920         }
 921 
 922         // create and return the PushResponseImpl
 923         @Override
 924         protected void handleResponse() {
 925             HttpConnection c = connection.connection; // TODO: improve
 926             responseCode = (int)responseHeaders
 927                 .firstValueAsLong(":status")
 928                 .orElse(-1);
 929 
 930             if (responseCode == -1) {
 931                 completeResponseExceptionally(new IOException("No status code"));
 932             }
 933 
 934             this.response = new Response(
 935                 pushReq, exchange, responseHeaders,
 936                 responseCode, HttpClient.Version.HTTP_2);
 937 
 938             this.responseContentLen = responseHeaders
 939                 .firstValueAsLong("content-length")
 940                 .orElse(-1L);
 941 
 942             if (Log.headers()) {
 943                 StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
 944                 sb.append(" (streamid=").append(streamid).append("): ");
 945                 Log.dumpHeaders(sb, "    ", responseHeaders);
 946                 Log.logHeaders(sb.toString());
 947             }
 948 
 949             // different implementations for normal streams and pushed streams
 950             completeResponse(response);
 951         }
 952     }
 953 
 954     final class StreamWindowUpdateSender extends WindowUpdateSender {
 955 
 956         StreamWindowUpdateSender(Http2Connection connection) {
 957             super(connection);
 958         }
 959 
 960         @Override
 961         int getStreamId() {
 962             return streamid;
 963         }
 964     }
 965 
 966 }