1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package java.net.http;
  27 
  28 import sun.net.httpclient.hpack.DecodingCallback;
  29 
  30 import java.io.IOException;
  31 import java.net.URI;
  32 import java.nio.ByteBuffer;
  33 import java.util.LinkedList;
  34 import java.util.ArrayList;
  35 import java.util.List;
  36 import java.util.concurrent.CompletableFuture;
  37 import java.util.concurrent.CompletionException;
  38 import java.util.concurrent.ExecutionException;
  39 import java.util.concurrent.TimeUnit;
  40 import java.util.concurrent.TimeoutException;
  41 import java.util.function.BiFunction;
  42 import java.util.function.LongConsumer;
  43 
  44 /**
  45  * Http/2 Stream handling.
  46  *
  47  * REQUESTS
  48  *
  49  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
  50  *
  51  * sendRequest() -- sendHeadersOnly() + sendBody()
  52  *
  53  * sendBody() -- in calling thread: obeys all flow control (so may block)
  54  *               obtains data from request body processor and places on connection
  55  *               outbound Q.
  56  *
  57  * sendBodyAsync() -- calls sendBody() in an executor thread.
  58  *
  59  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
  60  *
  61  * sendRequestAsync() -- calls sendRequest() in an executor thread
  62  *
  63  * RESPONSES
  64  *
  65  * Multiple responses can be received per request. Responses are queued up on
  66  * a LinkedList of CF<HttpResponse> and the the first one on the list is completed
  67  * with the next response
  68  *
  69  * getResponseAsync() -- queries list of response CFs and returns first one
  70  *               if one exists. Otherwise, creates one and adds it to list
  71  *               and returns it. Completion is achieved through the
  72  *               incoming() upcall from connection reader thread.
  73  *
  74  * getResponse() -- calls getResponseAsync() and waits for CF to complete
  75  *
  76  * responseBody() -- in calling thread: blocks for incoming DATA frames on
  77  *               stream inputQ. Obeys remote and local flow control so may block.
  78  *               Calls user response body processor with data buffers.
  79  *
  80  * responseBodyAsync() -- calls responseBody() in an executor thread.
  81  *
  82  * incoming() -- entry point called from connection reader thread. Frames are
  83  *               either handled immediately without blocking or for data frames
  84  *               placed on the stream's inputQ which is consumed by the stream's
  85  *               reader thread.
  86  *
  87  * PushedStream sub class
  88  * ======================
  89  * Sending side methods are not used because the request comes from a PUSH_PROMISE
  90  * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
  91  * is created. PushedStream does not use responseCF list as there can be only
  92  * one response. The CF is created when the object created and when the response
  93  * HEADERS frame is received the object is completed.
  94  */
  95 class Stream extends ExchangeImpl {
  96 
  97     final Queue<Http2Frame> inputQ;
  98 
  99     volatile int streamid;
 100 
 101     long responseContentLen = -1;
 102     long responseBytesProcessed = 0;
 103     long requestContentLen;
 104 
 105     Http2Connection connection;
 106     HttpClientImpl client;
 107     final HttpRequestImpl request;
 108     final DecodingCallback rspHeadersConsumer;
 109     HttpHeadersImpl responseHeaders;
 110     final HttpHeadersImpl requestHeaders;
 111     final HttpHeadersImpl requestPseudoHeaders;
 112     HttpResponse.BodyProcessor<?> responseProcessor;
 113     final HttpRequest.BodyProcessor requestProcessor;
 114     HttpResponse response;
 115 
 116     // state flags
 117     boolean requestSent, responseReceived;
 118 
 119     final FlowController userRequestFlowController =
 120             new FlowController();
 121     final FlowController remoteRequestFlowController =
 122             new FlowController();
 123     final FlowController responseFlowController =
 124             new FlowController();
 125 
 126     final ExecutorWrapper executor;
 127 
 128     @Override
 129     @SuppressWarnings("unchecked")
 130     <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
 131         this.responseProcessor = processor;
 132         CompletableFuture<T> cf;
 133         try {
 134             T body = processor.onResponseBodyStart(
 135                     responseContentLen, responseHeaders,
 136                     responseFlowController); // TODO: filter headers
 137             if (body != null) {
 138                 cf = CompletableFuture.completedFuture(body);
 139                 receiveDataAsync(processor);
 140             } else
 141                 cf = receiveDataAsync(processor);
 142         } catch (IOException e) {
 143             cf = CompletableFuture.failedFuture(e);
 144         }
 145         PushGroup<?> pg = request.pushGroup();
 146         if (pg != null) {
 147             // if an error occurs make sure it is recorded in the PushGroup
 148             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 149         }
 150         return cf;
 151     }
 152 
 153     @Override
 154     public String toString() {
 155         StringBuilder sb = new StringBuilder();
 156         sb.append("streamid: ")
 157                 .append(streamid);
 158         return sb.toString();
 159     }
 160 
 161     // pushes entire response body into response processor
 162     // blocking when required by local or remote flow control
 163     void receiveData() throws IOException {
 164         Http2Frame frame;
 165         DataFrame df = null;
 166         try {
 167             do {
 168                 frame = inputQ.take();
 169                 if (!(frame instanceof DataFrame)) {
 170                     assert false;
 171                     continue;
 172                 }
 173                 df = (DataFrame) frame;
 174                 int len = df.getDataLength();
 175                 ByteBuffer[] buffers = df.getData();
 176                 for (ByteBuffer b : buffers) {
 177                     responseFlowController.take();
 178                     responseProcessor.onResponseBodyChunk(b);
 179                 }
 180                 sendWindowUpdate(len);
 181             } while (!df.getFlag(DataFrame.END_STREAM));
 182         } catch (InterruptedException e) {
 183             throw new IOException(e);
 184         }
 185     }
 186 
 187     private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) {
 188         CompletableFuture<T> cf = new CompletableFuture<>();
 189         executor.execute(() -> {
 190             try {
 191                 receiveData();
 192                 T body = processor.onResponseComplete();
 193                 cf.complete(body);
 194                 responseReceived();
 195             } catch (Throwable t) {
 196                 cf.completeExceptionally(t);
 197             }
 198         }, null);
 199         return cf;
 200     }
 201 
 202     private void sendWindowUpdate(int increment)
 203             throws IOException, InterruptedException {
 204         if (increment == 0)
 205             return;
 206         LinkedList<Http2Frame> list = new LinkedList<>();
 207         WindowUpdateFrame frame = new WindowUpdateFrame();
 208         frame.streamid(streamid);
 209         frame.setUpdate(increment);
 210         list.add(frame);
 211         frame = new WindowUpdateFrame();
 212         frame.streamid(0);
 213         frame.setUpdate(increment);
 214         list.add(frame);
 215         connection.sendFrames(list);
 216     }
 217 
 218     @Override
 219     CompletableFuture<Void> sendBodyAsync() {
 220         final CompletableFuture<Void> cf = new CompletableFuture<>();
 221         executor.execute(() -> {
 222             try {
 223                 sendBodyImpl();
 224                 cf.complete(null);
 225             } catch (IOException | InterruptedException e) {
 226                 cf.completeExceptionally(e);
 227             }
 228         }, null);
 229         return cf;
 230     }
 231 
 232     @SuppressWarnings("unchecked")
 233     Stream(HttpClientImpl client, Http2Connection connection, Exchange e) {
 234         super(e);
 235         this.client = client;
 236         this.connection = connection;
 237         this.request = e.request();
 238         this.requestProcessor = request.requestProcessor();
 239         responseHeaders = new HttpHeadersImpl();
 240         requestHeaders = new HttpHeadersImpl();
 241         rspHeadersConsumer = (name, value) -> {
 242             responseHeaders.addHeader(name.toString(), value.toString());
 243         };
 244         this.executor = client.executorWrapper();
 245         //this.response_cf = new CompletableFuture<HttpResponseImpl>();
 246         this.requestPseudoHeaders = new HttpHeadersImpl();
 247         // NEW
 248         this.inputQ = new Queue<>();
 249     }
 250 
 251     @SuppressWarnings("unchecked")
 252     Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) {
 253         super(null);
 254         this.client = client;
 255         this.connection = connection;
 256         this.request = req;
 257         this.requestProcessor = null;
 258         responseHeaders = new HttpHeadersImpl();
 259         requestHeaders = new HttpHeadersImpl();
 260         rspHeadersConsumer = (name, value) -> {
 261             responseHeaders.addHeader(name.toString(), value.toString());
 262         };
 263         this.executor = client.executorWrapper();
 264         //this.response_cf = new CompletableFuture<HttpResponseImpl>();
 265         this.requestPseudoHeaders = new HttpHeadersImpl();
 266         // NEW
 267         this.inputQ = new Queue<>();
 268     }
 269 
 270     /**
 271      * Entry point from Http2Connection reader thread.
 272      *
 273      * Data frames will be removed by response body thread.
 274      *
 275      * @param frame
 276      * @throws IOException
 277      */
 278     void incoming(Http2Frame frame) throws IOException, InterruptedException {
 279         if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) {
 280             // Complete headers accumulated. handle response.
 281             // It's okay if there are multiple HeaderFrames.
 282             handleResponse();
 283         } else if (frame instanceof DataFrame) {
 284             inputQ.put(frame);
 285         } else {
 286             otherFrame(frame);
 287         }
 288     }
 289 
 290     void otherFrame(Http2Frame frame) throws IOException {
 291         switch (frame.type()) {
 292             case WindowUpdateFrame.TYPE:
 293                 incoming_windowUpdate((WindowUpdateFrame) frame);
 294                 break;
 295             case ResetFrame.TYPE:
 296                 incoming_reset((ResetFrame) frame);
 297                 break;
 298             case PriorityFrame.TYPE:
 299                 incoming_priority((PriorityFrame) frame);
 300                 break;
 301             default:
 302                 String msg = "Unexpected frame: " + frame.toString();
 303                 throw new IOException(msg);
 304         }
 305     }
 306 
 307     // The Hpack decoder decodes into one of these consumers of name,value pairs
 308 
 309     DecodingCallback rspHeadersConsumer() {
 310         return rspHeadersConsumer;
 311     }
 312 
 313     // create and return the HttpResponseImpl
 314     protected void handleResponse() throws IOException {
 315         HttpConnection c = connection.connection; // TODO: improve
 316         long statusCode = responseHeaders
 317                 .firstValueAsLong(":status")
 318                 .orElseThrow(() -> new IOException("no statuscode in response"));
 319 
 320         this.response = new HttpResponseImpl((int)statusCode, exchange, responseHeaders, null,
 321                 c.sslParameters(), HttpClient.Version.HTTP_2, c);
 322         this.responseContentLen = responseHeaders
 323                 .firstValueAsLong("content-length")
 324                 .orElse(-1L);
 325         // different implementations for normal streams and pushed streams
 326         completeResponse(response);
 327     }
 328 
 329     void incoming_reset(ResetFrame frame) {
 330         // TODO: implement reset
 331         int error = frame.getErrorCode();
 332         IOException e = new IOException(ErrorFrame.stringForCode(error));
 333         completeResponseExceptionally(e);
 334         throw new UnsupportedOperationException("Not implemented");
 335     }
 336 
 337     void incoming_priority(PriorityFrame frame) {
 338         // TODO: implement priority
 339         throw new UnsupportedOperationException("Not implemented");
 340     }
 341 
 342     void incoming_windowUpdate(WindowUpdateFrame frame) {
 343         int amount = frame.getUpdate();
 344         if (amount > 0)
 345             remoteRequestFlowController.accept(amount);
 346     }
 347 
 348     void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException {
 349         if (Log.requests()) {
 350             Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
 351         }
 352         PushGroup<?> pushGroup = request.pushGroup();
 353         if (pushGroup == null) {
 354             cancelImpl(new IllegalStateException("unexpected push promise"));
 355         }
 356         // get the handler and call it.
 357         BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph =
 358             pushGroup.pushHandler();
 359 
 360         CompletableFuture<HttpResponse> pushCF = pushStream
 361                 .getResponseAsync(null)
 362                 .thenApply(r -> (HttpResponse)r);
 363         boolean accept = ph.apply(pushReq, pushCF);
 364         if (!accept) {
 365             IOException ex = new IOException("Stream cancelled by user");
 366             cancelImpl(ex);
 367             pushCF.completeExceptionally(ex);
 368         } else {
 369             pushStream.requestSent();
 370             pushGroup.addPush();
 371         }
 372     }
 373 
 374     private OutgoingHeaders headerFrame(long contentLength) {
 375         HttpHeadersImpl h = request.getSystemHeaders();
 376         if (contentLength > 0) {
 377             h.setHeader("content-length", Long.toString(contentLength));
 378         }
 379         setPseudoHeaderFields();
 380         OutgoingHeaders f = new OutgoingHeaders(h, request.getUserHeaders(), this);
 381         if (contentLength == 0) {
 382             f.setFlag(HeadersFrame.END_STREAM);
 383         }
 384         return f;
 385     }
 386 
 387     private void setPseudoHeaderFields() {
 388         HttpHeadersImpl hdrs = requestPseudoHeaders;
 389         String method = request.method();
 390         hdrs.setHeader(":method", method);
 391         URI uri = request.uri();
 392         hdrs.setHeader(":scheme", uri.getScheme());
 393         // TODO: userinfo deprecated. Needs to be removed
 394         hdrs.setHeader(":authority", uri.getAuthority());
 395         // TODO: ensure header names beginning with : not in user headers
 396         String query = uri.getQuery();
 397         String path = uri.getPath();
 398         if (path == null) {
 399             if (method.equalsIgnoreCase("OPTIONS")) {
 400                 path = "*";
 401             } else {
 402                 path = "/";
 403             }
 404         }
 405         if (query != null) {
 406             path += "?" + query;
 407         }
 408         hdrs.setHeader(":path", path);
 409     }
 410 
 411     HttpHeadersImpl getRequestPseudoHeaders() {
 412         return requestPseudoHeaders;
 413     }
 414 
 415     @Override
 416     HttpResponseImpl getResponse() throws IOException {
 417         try {
 418             if (request.timeval() > 0) {
 419                 return getResponseAsync(null).get(
 420                         request.timeval(), TimeUnit.MILLISECONDS);
 421             } else {
 422                 return getResponseAsync(null).join();
 423             }
 424         } catch (TimeoutException e) {
 425             throw new HttpTimeoutException("Response timed out");
 426         } catch (InterruptedException | ExecutionException | CompletionException e) {
 427             Throwable t = e.getCause();
 428             if (t instanceof IOException) {
 429                 throw (IOException)t;
 430             }
 431             throw new IOException(e);
 432         }
 433     }
 434 
 435     @Override
 436     void sendRequest() throws IOException, InterruptedException {
 437         sendHeadersOnly();
 438         sendBody();
 439     }
 440 
 441     /**
 442      * A simple general purpose blocking flow controller
 443      */
 444     class FlowController implements LongConsumer {
 445         int permits;
 446 
 447         FlowController() {
 448             this.permits = 0;
 449         }
 450 
 451         @Override
 452         public synchronized void accept(long n) {
 453             if (n < 1) {
 454                 throw new InternalError("FlowController.accept called with " + n);
 455             }
 456             if (permits == 0) {
 457                 permits += n;
 458                 notifyAll();
 459             } else {
 460                 permits += n;
 461             }
 462         }
 463 
 464         public synchronized void take() throws InterruptedException {
 465             take(1);
 466         }
 467 
 468         public synchronized void take(int amount) throws InterruptedException {
 469             assert permits >= 0;
 470             while (permits < amount) {
 471                 int n = Math.min(amount, permits);
 472                 permits -= n;
 473                 amount -= n;
 474                 if (amount > 0)
 475                     wait();
 476             }
 477         }
 478     }
 479 
 480     @Override
 481     void sendHeadersOnly() throws IOException, InterruptedException {
 482         if (Log.requests() && request != null) {
 483             Log.logRequest(request.toString());
 484         }
 485         requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController);
 486         OutgoingHeaders f = headerFrame(requestContentLen);
 487         connection.sendFrame(f);
 488     }
 489 
 490     @Override
 491     void sendBody() throws IOException, InterruptedException {
 492         sendBodyImpl();
 493     }
 494 
 495     void registerStream(int id) {
 496         this.streamid = id;
 497         connection.putStream(this, streamid);
 498     }
 499 
 500     DataFrame getDataFrame() throws IOException, InterruptedException {
 501         userRequestFlowController.take();
 502         int maxpayloadLen = connection.getMaxSendFrameSize() - 9;
 503         ByteBuffer buffer = connection.getBuffer();
 504         buffer.limit(maxpayloadLen);
 505         boolean complete = requestProcessor.onRequestBodyChunk(buffer);
 506         buffer.flip();
 507         int amount = buffer.remaining();
 508         // wait for flow control if necessary. Following method will block
 509         // until after headers frame is sent, so correct streamid is set.
 510         remoteRequestFlowController.take(amount);
 511         connection.obtainSendWindow(amount);
 512 
 513         DataFrame df = new DataFrame();
 514         df.streamid(streamid);
 515         if (complete) {
 516             df.setFlag(DataFrame.END_STREAM);
 517         }
 518         df.setData(buffer);
 519         df.computeLength();
 520         return df;
 521     }
 522 
 523 
 524     @Override
 525     CompletableFuture<Void> sendHeadersAsync() {
 526         try {
 527             sendHeadersOnly();
 528             return CompletableFuture.completedFuture(null);
 529         } catch (IOException | InterruptedException ex) {
 530             return CompletableFuture.failedFuture(ex);
 531         }
 532     }
 533 
 534     /**
 535      * A List of responses relating to this stream. Normally there is only
 536      * one response, but intermediate responses like 100 are allowed
 537      * and must be passed up to higher level before continuing. Deals with races
 538      * such as if responses are returned before the CFs get created by
 539      * getResponseAsync()
 540      */
 541 
 542     final List<CompletableFuture<HttpResponseImpl>> response_cfs = new ArrayList<>(5);
 543 
 544     @Override
 545     CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) {
 546         CompletableFuture<HttpResponseImpl> cf;
 547         synchronized (response_cfs) {
 548             if (!response_cfs.isEmpty()) {
 549                 cf = response_cfs.remove(0);
 550             } else {
 551                 cf = new CompletableFuture<>();
 552                 response_cfs.add(cf);
 553             }
 554         }
 555         PushGroup<?> pg = request.pushGroup();
 556         if (pg != null) {
 557             // if an error occurs make sure it is recorded in the PushGroup
 558             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 559         }
 560         return cf;
 561     }
 562 
 563     /**
 564      * Completes the first uncompleted CF on list, and removes it. If there is no
 565      * uncompleted CF then creates one (completes it) and adds to list
 566      */
 567     void completeResponse(HttpResponse r) {
 568         HttpResponseImpl resp = (HttpResponseImpl)r;
 569         synchronized (response_cfs) {
 570             int cfs_len = response_cfs.size();
 571             for (int i=0; i<cfs_len; i++) {
 572                 CompletableFuture<HttpResponseImpl> cf = response_cfs.get(i);
 573                 if (!cf.isDone()) {
 574                     cf.complete(resp);
 575                     response_cfs.remove(cf);
 576                     return;
 577                 }
 578             }
 579             response_cfs.add(CompletableFuture.completedFuture(resp));
 580         }
 581     }
 582 
 583     // methods to update state and remove stream when finished
 584 
 585     synchronized void requestSent() {
 586         requestSent = true;
 587         if (responseReceived)
 588             connection.deleteStream(this);
 589     }
 590 
 591     synchronized void responseReceived() {
 592         responseReceived = true;
 593         if (requestSent)
 594             connection.deleteStream(this);
 595         PushGroup<?> pg = request.pushGroup();
 596         if (pg != null)
 597             pg.noMorePushes();
 598     }
 599 
 600     /**
 601      * same as above but for errors
 602      *
 603      * @param t
 604      */
 605     void completeResponseExceptionally(Throwable t) {
 606         synchronized (response_cfs) {
 607             for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
 608                 if (!cf.isDone()) {
 609                     cf.completeExceptionally(t);
 610                     response_cfs.remove(cf);
 611                     return;
 612                 }
 613             }
 614             response_cfs.add(CompletableFuture.failedFuture(t));
 615         }
 616     }
 617 
 618     void sendBodyImpl() throws IOException, InterruptedException {
 619         if (requestContentLen == 0) {
 620             // no body
 621             requestSent();
 622             return;
 623         }
 624         DataFrame df;
 625         do {
 626             df = getDataFrame();
 627             // TODO: check accumulated content length (if not checked below)
 628             connection.sendFrame(df);
 629         } while (!df.getFlag(DataFrame.END_STREAM));
 630         requestSent();
 631     }
 632 
 633     @Override
 634     void cancel() {
 635         cancelImpl(new Exception("Cancelled"));
 636     }
 637 
 638 
 639     void cancelImpl(Throwable e) {
 640         Log.logTrace("cancelling stream: {0}\n", e.toString());
 641         inputQ.close();
 642         completeResponseExceptionally(e);
 643         try {
 644             connection.resetStream(streamid, ResetFrame.CANCEL);
 645         } catch (IOException | InterruptedException ex) {
 646             Log.logError(ex);
 647         }
 648     }
 649 
 650     @Override
 651     CompletableFuture<Void> sendRequestAsync() {
 652         CompletableFuture<Void> cf = new CompletableFuture<>();
 653         executor.execute(() -> {
 654            try {
 655                sendRequest();
 656                cf.complete(null);
 657            } catch (IOException |InterruptedException e) {
 658                cf.completeExceptionally(e);
 659            }
 660         }, null);
 661         return cf;
 662     }
 663 
 664     @Override
 665     <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
 666         this.responseProcessor = processor;
 667         T body = processor.onResponseBodyStart(
 668                     responseContentLen, responseHeaders,
 669                     responseFlowController); // TODO: filter headers
 670         if (body == null) {
 671             receiveData();
 672             body = processor.onResponseComplete();
 673         } else
 674             receiveDataAsync(processor);
 675         responseReceived();
 676         return body;
 677     }
 678 
 679     // called from Http2Connection reader thread
 680     synchronized void updateOutgoingWindow(int update) {
 681         remoteRequestFlowController.accept(update);
 682     }
 683 
 684     void close(String msg) {
 685         cancel();
 686     }
 687 
 688     static class PushedStream extends Stream {
 689         final PushGroup<?> pushGroup;
 690         final private Stream parent;      // used by server push streams
 691         // push streams need the response CF allocated up front as it is
 692         // given directly to user via the multi handler callback function.
 693         final CompletableFuture<HttpResponseImpl> pushCF;
 694         final HttpRequestImpl pushReq;
 695 
 696         PushedStream(PushGroup<?> pushGroup, HttpClientImpl client,
 697                 Http2Connection connection, Stream parent,
 698                 HttpRequestImpl pushReq) {
 699             super(client, connection, pushReq);
 700             this.pushGroup = pushGroup;
 701             this.pushReq = pushReq;
 702             this.pushCF = new CompletableFuture<>();
 703             this.parent = parent;
 704         }
 705 
 706         // Following methods call the super class but in case of
 707         // error record it in the PushGroup. The error method is called
 708         // with a null value when no error occurred (is a no-op)
 709         @Override
 710         CompletableFuture<Void> sendBodyAsync() {
 711             return super.sendBodyAsync()
 712                         .whenComplete((v, t) -> pushGroup.pushError(t));
 713         }
 714 
 715         @Override
 716         CompletableFuture<Void> sendHeadersAsync() {
 717             return super.sendHeadersAsync()
 718                         .whenComplete((v, t) -> pushGroup.pushError(t));
 719         }
 720 
 721         @Override
 722         CompletableFuture<Void> sendRequestAsync() {
 723             return super.sendRequestAsync()
 724                         .whenComplete((v, t) -> pushGroup.pushError(t));
 725         }
 726 
 727         @Override
 728         CompletableFuture<HttpResponseImpl> getResponseAsync(Void vo) {
 729             return pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
 730         }
 731 
 732         @Override
 733         <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
 734             return super.responseBodyAsync(processor)
 735                         .whenComplete((v, t) -> pushGroup.pushError(t));
 736         }
 737 
 738         @Override
 739         void completeResponse(HttpResponse r) {
 740             HttpResponseImpl resp = (HttpResponseImpl)r;
 741             Utils.logResponse(resp);
 742             pushCF.complete(resp);
 743         }
 744 
 745         @Override
 746         void completeResponseExceptionally(Throwable t) {
 747             pushCF.completeExceptionally(t);
 748         }
 749 
 750         @Override
 751         synchronized void responseReceived() {
 752             super.responseReceived();
 753             pushGroup.pushCompleted();
 754         }
 755 
 756         // create and return the PushResponseImpl
 757         @Override
 758         protected void handleResponse() {
 759             HttpConnection c = connection.connection; // TODO: improve
 760             long statusCode = responseHeaders
 761                 .firstValueAsLong(":status")
 762                 .orElse(-1L);
 763 
 764             if (statusCode == -1L)
 765                 completeResponseExceptionally(new IOException("No status code"));
 766             ImmutableHeaders h = new ImmutableHeaders(responseHeaders, Utils.ALL_HEADERS);
 767             this.response = new HttpResponseImpl((int)statusCode, pushReq, h, this,
 768                 c.sslParameters());
 769             this.responseContentLen = responseHeaders
 770                 .firstValueAsLong("content-length")
 771                 .orElse(-1L);
 772             // different implementations for normal streams and pushed streams
 773             completeResponse(response);
 774         }
 775     }
 776 
 777     /**
 778      * One PushGroup object is associated with the parent Stream of
 779      * the pushed Streams. This keeps track of all common state associated
 780      * with the pushes.
 781      */
 782     static class PushGroup<T> {
 783         // the overall completion object, completed when all pushes are done.
 784         final CompletableFuture<T> resultCF;
 785         Throwable error; // any exception that occured during pushes
 786 
 787         // CF for main response
 788         final CompletableFuture<HttpResponse> mainResponse;
 789 
 790         // user's processor object
 791         final HttpResponse.MultiProcessor<T> multiProcessor;
 792 
 793         // per push handler function provided by processor
 794         final private BiFunction<HttpRequest,
 795                            CompletableFuture<HttpResponse>,
 796                            Boolean> pushHandler;
 797         int numberOfPushes;
 798         int remainingPushes;
 799         boolean noMorePushes = false;
 800 
 801         PushGroup(HttpResponse.MultiProcessor<T> multiProcessor, HttpRequestImpl req) {
 802             this.resultCF = new CompletableFuture<>();
 803             this.mainResponse = new CompletableFuture<>();
 804             this.multiProcessor = multiProcessor;
 805             this.pushHandler = multiProcessor.onStart(req, mainResponse);
 806         }
 807 
 808         CompletableFuture<T> groupResult() {
 809             return resultCF;
 810         }
 811 
 812         CompletableFuture<HttpResponse> mainResponse() {
 813             return mainResponse;
 814         }
 815 
 816         private BiFunction<HttpRequest,
 817             CompletableFuture<HttpResponse>, Boolean> pushHandler()
 818         {
 819                 return pushHandler;
 820         }
 821 
 822         synchronized void addPush() {
 823             numberOfPushes++;
 824             remainingPushes++;
 825         }
 826 
 827         synchronized int numberOfPushes() {
 828             return numberOfPushes;
 829         }
 830         // This is called when the main body response completes because it means
 831         // no more PUSH_PROMISEs are possible
 832         synchronized void noMorePushes() {
 833             noMorePushes = true;
 834             checkIfCompleted();
 835         }
 836 
 837         synchronized void pushCompleted() {
 838             remainingPushes--;
 839             checkIfCompleted();
 840         }
 841 
 842         synchronized void checkIfCompleted() {
 843             if (remainingPushes == 0 && error == null && noMorePushes) {
 844                 T overallResult = multiProcessor.onComplete();
 845                 resultCF.complete(overallResult);
 846             }
 847         }
 848 
 849         synchronized void pushError(Throwable t) {
 850             if (t == null)
 851                 return;
 852             this.error = t;
 853             resultCF.completeExceptionally(t);
 854         }
 855     }
 856 }