1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package java.net.http;
  27 
  28 import sun.net.httpclient.hpack.DecodingCallback;
  29 
  30 import java.io.IOException;
  31 import java.net.URI;
  32 import java.nio.ByteBuffer;
  33 import java.util.LinkedList;
  34 import java.util.ArrayList;
  35 import java.util.List;
  36 import java.util.concurrent.CompletableFuture;
  37 import java.util.concurrent.CompletionException;
  38 import java.util.concurrent.ExecutionException;
  39 import java.util.concurrent.TimeUnit;
  40 import java.util.concurrent.TimeoutException;
  41 import java.util.function.BiFunction;
  42 import java.util.function.LongConsumer;
  43 
  44 /**
  45  * Http/2 Stream handling.
  46  *
  47  * REQUESTS
  48  *
  49  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
  50  *
  51  * sendRequest() -- sendHeadersOnly() + sendBody()
  52  *
  53  * sendBody() -- in calling thread: obeys all flow control (so may block)
  54  *               obtains data from request body processor and places on connection
  55  *               outbound Q.
  56  *
  57  * sendBodyAsync() -- calls sendBody() in an executor thread.
  58  *
  59  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
  60  *
  61  * sendRequestAsync() -- calls sendRequest() in an executor thread
  62  *
  63  * RESPONSES
  64  *
 * Multiple responses can be received per request. Responses are queued up on
 * a list of CF<HttpResponse> and the first one on the list is completed
 * with the next response
  68  *
  69  * getResponseAsync() -- queries list of response CFs and returns first one
  70  *               if one exists. Otherwise, creates one and adds it to list
  71  *               and returns it. Completion is achieved through the
  72  *               incoming() upcall from connection reader thread.
  73  *
  74  * getResponse() -- calls getResponseAsync() and waits for CF to complete
  75  *
  76  * responseBody() -- in calling thread: blocks for incoming DATA frames on
  77  *               stream inputQ. Obeys remote and local flow control so may block.
  78  *               Calls user response body processor with data buffers.
  79  *
  80  * responseBodyAsync() -- calls responseBody() in an executor thread.
  81  *
  82  * incoming() -- entry point called from connection reader thread. Frames are
  83  *               either handled immediately without blocking or for data frames
  84  *               placed on the stream's inputQ which is consumed by the stream's
  85  *               reader thread.
  86  *
  87  * PushedStream sub class
  88  * ======================
  89  * Sending side methods are not used because the request comes from a PUSH_PROMISE
  90  * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
 * is created. PushedStream does not use responseCF list as there can be only
 * one response. The CF is created when the object is created and when the
 * response HEADERS frame is received the CF is completed.
  94  */
  95 class Stream extends ExchangeImpl {
  96 
  97     final Queue<Http2Frame> inputQ;
  98 
  99     volatile int streamid;
 100 
 101     long responseContentLen = -1;
 102     long responseBytesProcessed = 0;
 103     long requestContentLen;
 104 
 105     Http2Connection connection;
 106     HttpClientImpl client;
 107     final HttpRequestImpl request;
 108     final DecodingCallback rspHeadersConsumer;
 109     HttpHeadersImpl responseHeaders;
 110     final HttpHeadersImpl requestHeaders;
 111     final HttpHeadersImpl requestPseudoHeaders;
 112     HttpResponse.BodyProcessor<?> responseProcessor;
 113     final HttpRequest.BodyProcessor requestProcessor;
 114     HttpResponse response;
 115 
 116     // state flags
 117     boolean requestSent, responseReceived;
 118 
 119     final FlowController userRequestFlowController =
 120             new FlowController();
 121     final FlowController responseFlowController =
 122             new FlowController();
 123     final WindowControl outgoingWindow =  new WindowControl();
 124 
 125     final ExecutorWrapper executor;
 126 
 127     @Override
 128     @SuppressWarnings("unchecked")
 129     <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
 130         this.responseProcessor = processor;
 131         CompletableFuture<T> cf;
 132         try {
 133             T body = processor.onResponseBodyStart(
 134                     responseContentLen, responseHeaders,
 135                     responseFlowController); // TODO: filter headers
 136             if (body != null) {
 137                 cf = CompletableFuture.completedFuture(body);
 138                 receiveDataAsync(processor);
 139             } else
 140                 cf = receiveDataAsync(processor);
 141         } catch (IOException e) {
 142             cf = CompletableFuture.failedFuture(e);
 143         }
 144         PushGroup<?> pg = request.pushGroup();
 145         if (pg != null) {
 146             // if an error occurs make sure it is recorded in the PushGroup
 147             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 148         }
 149         return cf;
 150     }
 151 
 152     @Override
 153     public String toString() {
 154         StringBuilder sb = new StringBuilder();
 155         sb.append("streamid: ")
 156                 .append(streamid);
 157         return sb.toString();
 158     }
 159 
 160     // pushes entire response body into response processor
 161     // blocking when required by local or remote flow control
 162     void receiveData() throws IOException {
 163         Http2Frame frame;
 164         DataFrame df = null;
 165         try {
 166             do {
 167                 frame = inputQ.take();
 168                 if (!(frame instanceof DataFrame)) {
 169                     assert false;
 170                     continue;
 171                 }
 172                 df = (DataFrame) frame;
 173                 int len = df.getDataLength();
 174                 ByteBuffer[] buffers = df.getData();
 175                 for (ByteBuffer b : buffers) {
 176                     responseFlowController.take();
 177                     responseProcessor.onResponseBodyChunk(b);
 178                 }
 179                 sendWindowUpdate(len);
 180             } while (!df.getFlag(DataFrame.END_STREAM));
 181         } catch (InterruptedException e) {
 182             throw new IOException(e);
 183         }
 184     }
 185 
 186     private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) {
 187         CompletableFuture<T> cf = new CompletableFuture<>();
 188         executor.execute(() -> {
 189             try {
 190                 receiveData();
 191                 T body = processor.onResponseComplete();
 192                 cf.complete(body);
 193                 responseReceived();
 194             } catch (Throwable t) {
 195                 cf.completeExceptionally(t);
 196             }
 197         }, null);
 198         return cf;
 199     }
 200 
 201     private void sendWindowUpdate(int increment)
 202             throws IOException, InterruptedException {
 203         if (increment == 0)
 204             return;
 205         LinkedList<Http2Frame> list = new LinkedList<>();
 206         WindowUpdateFrame frame = new WindowUpdateFrame();
 207         frame.streamid(streamid);
 208         frame.setUpdate(increment);
 209         list.add(frame);
 210         frame = new WindowUpdateFrame();
 211         frame.streamid(0);
 212         frame.setUpdate(increment);
 213         list.add(frame);
 214         connection.sendFrames(list);
 215     }
 216 
 217     @Override
 218     CompletableFuture<Void> sendBodyAsync() {
 219         final CompletableFuture<Void> cf = new CompletableFuture<>();
 220         executor.execute(() -> {
 221             try {
 222                 sendBodyImpl();
 223                 cf.complete(null);
 224             } catch (IOException | InterruptedException e) {
 225                 cf.completeExceptionally(e);
 226             }
 227         }, null);
 228         return cf;
 229     }
 230 
 231     @SuppressWarnings("unchecked")
 232     Stream(HttpClientImpl client, Http2Connection connection, Exchange e) {
 233         super(e);
 234         this.client = client;
 235         this.connection = connection;
 236         this.request = e.request();
 237         this.requestProcessor = request.requestProcessor();
 238         responseHeaders = new HttpHeadersImpl();
 239         requestHeaders = new HttpHeadersImpl();
 240         rspHeadersConsumer = (name, value) -> {
 241             responseHeaders.addHeader(name.toString(), value.toString());
 242         };
 243         this.executor = client.executorWrapper();
 244         //this.response_cf = new CompletableFuture<HttpResponseImpl>();
 245         this.requestPseudoHeaders = new HttpHeadersImpl();
 246         // NEW
 247         this.inputQ = new Queue<>();
 248     }
 249 
 250     @SuppressWarnings("unchecked")
 251     Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) {
 252         super(null);
 253         this.client = client;
 254         this.connection = connection;
 255         this.request = req;
 256         this.requestProcessor = null;
 257         responseHeaders = new HttpHeadersImpl();
 258         requestHeaders = new HttpHeadersImpl();
 259         rspHeadersConsumer = (name, value) -> {
 260             responseHeaders.addHeader(name.toString(), value.toString());
 261         };
 262         this.executor = client.executorWrapper();
 263         //this.response_cf = new CompletableFuture<HttpResponseImpl>();
 264         this.requestPseudoHeaders = new HttpHeadersImpl();
 265         // NEW
 266         this.inputQ = new Queue<>();
 267     }
 268 
 269     /**
 270      * Entry point from Http2Connection reader thread.
 271      *
 272      * Data frames will be removed by response body thread.
 273      *
 274      * @param frame
 275      * @throws IOException
 276      */
 277     void incoming(Http2Frame frame) throws IOException, InterruptedException {
 278         if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) {
 279             // Complete headers accumulated. handle response.
 280             // It's okay if there are multiple HeaderFrames.
 281             handleResponse();
 282         } else if (frame instanceof DataFrame) {
 283             inputQ.put(frame);
 284         } else {
 285             otherFrame(frame);
 286         }
 287     }
 288 
 289     void otherFrame(Http2Frame frame) throws IOException {
 290         switch (frame.type()) {
 291             case WindowUpdateFrame.TYPE:
 292                 incoming_windowUpdate((WindowUpdateFrame) frame);
 293                 break;
 294             case ResetFrame.TYPE:
 295                 incoming_reset((ResetFrame) frame);
 296                 break;
 297             case PriorityFrame.TYPE:
 298                 incoming_priority((PriorityFrame) frame);
 299                 break;
 300             default:
 301                 String msg = "Unexpected frame: " + frame.toString();
 302                 throw new IOException(msg);
 303         }
 304     }
 305 
 306     // The Hpack decoder decodes into one of these consumers of name,value pairs
 307 
 308     DecodingCallback rspHeadersConsumer() {
 309         return rspHeadersConsumer;
 310     }
 311 
 312     // create and return the HttpResponseImpl
 313     protected void handleResponse() throws IOException {
 314         HttpConnection c = connection.connection; // TODO: improve
 315         long statusCode = responseHeaders
 316                 .firstValueAsLong(":status")
 317                 .orElseThrow(() -> new IOException("no statuscode in response"));
 318 
 319         this.response = new HttpResponseImpl((int)statusCode, exchange, responseHeaders, null,
 320                 c.sslParameters(), HttpClient.Version.HTTP_2, c);
 321         this.responseContentLen = responseHeaders
 322                 .firstValueAsLong("content-length")
 323                 .orElse(-1L);
 324         // different implementations for normal streams and pushed streams
 325         completeResponse(response);
 326     }
 327 
 328     void incoming_reset(ResetFrame frame) {
 329         // TODO: implement reset
 330         int error = frame.getErrorCode();
 331         IOException e = new IOException(ErrorFrame.stringForCode(error));
 332         completeResponseExceptionally(e);
 333         throw new UnsupportedOperationException("Not implemented");
 334     }
 335 
 336     void incoming_priority(PriorityFrame frame) {
 337         // TODO: implement priority
 338         throw new UnsupportedOperationException("Not implemented");
 339     }
 340 
 341     void incoming_windowUpdate(WindowUpdateFrame frame) {
 342         outgoingWindow.update(frame.getUpdate());
 343     }
 344 
 345     void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException {
 346         if (Log.requests()) {
 347             Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
 348         }
 349         PushGroup<?> pushGroup = request.pushGroup();
 350         if (pushGroup == null) {
 351             cancelImpl(new IllegalStateException("unexpected push promise"));
 352         }
 353         // get the handler and call it.
 354         BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph =
 355             pushGroup.pushHandler();
 356 
 357         CompletableFuture<HttpResponse> pushCF = pushStream
 358                 .getResponseAsync(null)
 359                 .thenApply(r -> (HttpResponse)r);
 360         boolean accept = ph.apply(pushReq, pushCF);
 361         if (!accept) {
 362             IOException ex = new IOException("Stream cancelled by user");
 363             cancelImpl(ex);
 364             pushCF.completeExceptionally(ex);
 365         } else {
 366             pushStream.requestSent();
 367             pushGroup.addPush();
 368         }
 369     }
 370 
 371     private OutgoingHeaders headerFrame(long contentLength) {
 372         HttpHeadersImpl h = request.getSystemHeaders();
 373         if (contentLength > 0) {
 374             h.setHeader("content-length", Long.toString(contentLength));
 375         }
 376         setPseudoHeaderFields();
 377         OutgoingHeaders f = new OutgoingHeaders(h, request.getUserHeaders(), this);
 378         if (contentLength == 0) {
 379             f.setFlag(HeadersFrame.END_STREAM);
 380         }
 381         return f;
 382     }
 383 
 384     private void setPseudoHeaderFields() {
 385         HttpHeadersImpl hdrs = requestPseudoHeaders;
 386         String method = request.method();
 387         hdrs.setHeader(":method", method);
 388         URI uri = request.uri();
 389         hdrs.setHeader(":scheme", uri.getScheme());
 390         // TODO: userinfo deprecated. Needs to be removed
 391         hdrs.setHeader(":authority", uri.getAuthority());
 392         // TODO: ensure header names beginning with : not in user headers
 393         String query = uri.getQuery();
 394         String path = uri.getPath();
 395         if (path == null) {
 396             if (method.equalsIgnoreCase("OPTIONS")) {
 397                 path = "*";
 398             } else {
 399                 path = "/";
 400             }
 401         }
 402         if (query != null) {
 403             path += "?" + query;
 404         }
 405         hdrs.setHeader(":path", path);
 406     }
 407 
 408     HttpHeadersImpl getRequestPseudoHeaders() {
 409         return requestPseudoHeaders;
 410     }
 411 
 412     @Override
 413     HttpResponseImpl getResponse() throws IOException {
 414         try {
 415             if (request.timeval() > 0) {
 416                 return getResponseAsync(null).get(
 417                         request.timeval(), TimeUnit.MILLISECONDS);
 418             } else {
 419                 return getResponseAsync(null).join();
 420             }
 421         } catch (TimeoutException e) {
 422             throw new HttpTimeoutException("Response timed out");
 423         } catch (InterruptedException | ExecutionException | CompletionException e) {
 424             Throwable t = e.getCause();
 425             if (t instanceof IOException) {
 426                 throw (IOException)t;
 427             }
 428             throw new IOException(e);
 429         }
 430     }
 431 
 432     @Override
 433     void sendRequest() throws IOException, InterruptedException {
 434         sendHeadersOnly();
 435         sendBody();
 436     }
 437 
 438     /**
 439      * A simple general purpose blocking flow controller
 440      */
 441     class FlowController implements LongConsumer {
 442         int permits;
 443 
 444         FlowController() {
 445             this.permits = 0;
 446         }
 447 
 448         @Override
 449         public synchronized void accept(long n) {
 450             if (n < 1) {
 451                 throw new InternalError("FlowController.accept called with " + n);
 452             }
 453             if (permits == 0) {
 454                 permits += n;
 455                 notifyAll();
 456             } else {
 457                 permits += n;
 458             }
 459         }
 460 
 461         public synchronized void take() throws InterruptedException {
 462             take(1);
 463         }
 464 
 465         public synchronized void take(int amount) throws InterruptedException {
 466             assert permits >= 0;
 467             while (amount > 0) {
 468                 int n = Math.min(amount, permits);
 469                 permits -= n;
 470                 amount -= n;
 471                 if (amount > 0)
 472                     wait();
 473             }
 474         }
 475     }
 476 
 477     @Override
 478     void sendHeadersOnly() throws IOException, InterruptedException {
 479         if (Log.requests() && request != null) {
 480             Log.logRequest(request.toString());
 481         }
 482         requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController);
 483         OutgoingHeaders f = headerFrame(requestContentLen);
 484         connection.sendFrame(f);
 485     }
 486 
 487     @Override
 488     void sendBody() throws IOException, InterruptedException {
 489         sendBodyImpl();
 490     }
 491 
 492     void registerStream(int id) {
 493         this.streamid = id;
 494         connection.putStream(this, streamid);
 495     }
 496 
 497     DataFrame getDataFrame() throws IOException, InterruptedException {
 498         userRequestFlowController.take();
 499         int maxpayloadLen = connection.getMaxSendFrameSize();
 500         ByteBuffer buffer = connection.getBuffer();
 501         buffer.limit(maxpayloadLen);
 502         boolean complete = requestProcessor.onRequestBodyChunk(buffer);
 503         buffer.flip();
 504         int amount = buffer.remaining();
 505         // wait for flow control if necessary. Following method will block
 506         // until after headers frame is sent, so correct streamid is set.
 507         outgoingWindow.acquire(amount);
 508         connection.connectionSendWindow.acquire(amount);
 509         DataFrame df = new DataFrame();
 510         df.streamid(streamid);
 511         if (complete) {
 512             df.setFlag(DataFrame.END_STREAM);
 513         }
 514         df.setData(buffer);
 515         df.computeLength();
 516         return df;
 517     }
 518 
 519 
 520     @Override
 521     CompletableFuture<Void> sendHeadersAsync() {
 522         try {
 523             sendHeadersOnly();
 524             return CompletableFuture.completedFuture(null);
 525         } catch (IOException | InterruptedException ex) {
 526             return CompletableFuture.failedFuture(ex);
 527         }
 528     }
 529 
 530     /**
 531      * A List of responses relating to this stream. Normally there is only
 532      * one response, but intermediate responses like 100 are allowed
 533      * and must be passed up to higher level before continuing. Deals with races
 534      * such as if responses are returned before the CFs get created by
 535      * getResponseAsync()
 536      */
 537 
 538     final List<CompletableFuture<HttpResponseImpl>> response_cfs = new ArrayList<>(5);
 539 
 540     @Override
 541     CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) {
 542         CompletableFuture<HttpResponseImpl> cf;
 543         synchronized (response_cfs) {
 544             if (!response_cfs.isEmpty()) {
 545                 cf = response_cfs.remove(0);
 546             } else {
 547                 cf = new CompletableFuture<>();
 548                 response_cfs.add(cf);
 549             }
 550         }
 551         PushGroup<?> pg = request.pushGroup();
 552         if (pg != null) {
 553             // if an error occurs make sure it is recorded in the PushGroup
 554             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 555         }
 556         return cf;
 557     }
 558 
 559     /**
 560      * Completes the first uncompleted CF on list, and removes it. If there is no
 561      * uncompleted CF then creates one (completes it) and adds to list
 562      */
 563     void completeResponse(HttpResponse r) {
 564         HttpResponseImpl resp = (HttpResponseImpl)r;
 565         synchronized (response_cfs) {
 566             int cfs_len = response_cfs.size();
 567             for (int i=0; i<cfs_len; i++) {
 568                 CompletableFuture<HttpResponseImpl> cf = response_cfs.get(i);
 569                 if (!cf.isDone()) {
 570                     cf.complete(resp);
 571                     response_cfs.remove(cf);
 572                     return;
 573                 }
 574             }
 575             response_cfs.add(CompletableFuture.completedFuture(resp));
 576         }
 577     }
 578 
 579     // methods to update state and remove stream when finished
 580 
 581     synchronized void requestSent() {
 582         requestSent = true;
 583         if (responseReceived)
 584             connection.deleteStream(this);
 585     }
 586 
 587     synchronized void responseReceived() {
 588         responseReceived = true;
 589         if (requestSent)
 590             connection.deleteStream(this);
 591         PushGroup<?> pg = request.pushGroup();
 592         if (pg != null)
 593             pg.noMorePushes();
 594     }
 595 
 596     /**
 597      * same as above but for errors
 598      *
 599      * @param t
 600      */
 601     void completeResponseExceptionally(Throwable t) {
 602         synchronized (response_cfs) {
 603             for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
 604                 if (!cf.isDone()) {
 605                     cf.completeExceptionally(t);
 606                     response_cfs.remove(cf);
 607                     return;
 608                 }
 609             }
 610             response_cfs.add(CompletableFuture.failedFuture(t));
 611         }
 612     }
 613 
 614     void sendBodyImpl() throws IOException, InterruptedException {
 615         if (requestContentLen == 0) {
 616             // no body
 617             requestSent();
 618             return;
 619         }
 620         DataFrame df;
 621         do {
 622             df = getDataFrame();
 623             // TODO: check accumulated content length (if not checked below)
 624             connection.sendFrame(df);
 625         } while (!df.getFlag(DataFrame.END_STREAM));
 626         requestSent();
 627     }
 628 
 629     @Override
 630     void cancel() {
 631         cancelImpl(new Exception("Cancelled"));
 632     }
 633 
 634 
 635     void cancelImpl(Throwable e) {
 636         Log.logTrace("cancelling stream: {0}\n", e.toString());
 637         inputQ.close();
 638         completeResponseExceptionally(e);
 639         try {
 640             connection.resetStream(streamid, ResetFrame.CANCEL);
 641         } catch (IOException | InterruptedException ex) {
 642             Log.logError(ex);
 643         }
 644     }
 645 
 646     @Override
 647     CompletableFuture<Void> sendRequestAsync() {
 648         CompletableFuture<Void> cf = new CompletableFuture<>();
 649         executor.execute(() -> {
 650            try {
 651                sendRequest();
 652                cf.complete(null);
 653            } catch (IOException |InterruptedException e) {
 654                cf.completeExceptionally(e);
 655            }
 656         }, null);
 657         return cf;
 658     }
 659 
 660     @Override
 661     <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
 662         this.responseProcessor = processor;
 663         T body = processor.onResponseBodyStart(
 664                     responseContentLen, responseHeaders,
 665                     responseFlowController); // TODO: filter headers
 666         if (body == null) {
 667             receiveData();
 668             body = processor.onResponseComplete();
 669         } else
 670             receiveDataAsync(processor);
 671         responseReceived();
 672         return body;
 673     }
 674 
 675     // called from Http2Connection reader thread
 676     void updateOutgoingWindow(int update) {
 677         outgoingWindow.update(update);
 678     }
 679 
 680     void close(String msg) {
 681         cancel();
 682     }
 683 
 684     static class PushedStream extends Stream {
 685         final PushGroup<?> pushGroup;
 686         final private Stream parent;      // used by server push streams
 687         // push streams need the response CF allocated up front as it is
 688         // given directly to user via the multi handler callback function.
 689         final CompletableFuture<HttpResponseImpl> pushCF;
 690         final HttpRequestImpl pushReq;
 691 
 692         PushedStream(PushGroup<?> pushGroup, HttpClientImpl client,
 693                 Http2Connection connection, Stream parent,
 694                 HttpRequestImpl pushReq) {
 695             super(client, connection, pushReq);
 696             this.pushGroup = pushGroup;
 697             this.pushReq = pushReq;
 698             this.pushCF = new CompletableFuture<>();
 699             this.parent = parent;
 700         }
 701 
 702         // Following methods call the super class but in case of
 703         // error record it in the PushGroup. The error method is called
 704         // with a null value when no error occurred (is a no-op)
 705         @Override
 706         CompletableFuture<Void> sendBodyAsync() {
 707             return super.sendBodyAsync()
 708                         .whenComplete((v, t) -> pushGroup.pushError(t));
 709         }
 710 
 711         @Override
 712         CompletableFuture<Void> sendHeadersAsync() {
 713             return super.sendHeadersAsync()
 714                         .whenComplete((v, t) -> pushGroup.pushError(t));
 715         }
 716 
 717         @Override
 718         CompletableFuture<Void> sendRequestAsync() {
 719             return super.sendRequestAsync()
 720                         .whenComplete((v, t) -> pushGroup.pushError(t));
 721         }
 722 
 723         @Override
 724         CompletableFuture<HttpResponseImpl> getResponseAsync(Void vo) {
 725             return pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
 726         }
 727 
 728         @Override
 729         <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
 730             return super.responseBodyAsync(processor)
 731                         .whenComplete((v, t) -> pushGroup.pushError(t));
 732         }
 733 
 734         @Override
 735         void completeResponse(HttpResponse r) {
 736             HttpResponseImpl resp = (HttpResponseImpl)r;
 737             Utils.logResponse(resp);
 738             pushCF.complete(resp);
 739         }
 740 
 741         @Override
 742         void completeResponseExceptionally(Throwable t) {
 743             pushCF.completeExceptionally(t);
 744         }
 745 
 746         @Override
 747         synchronized void responseReceived() {
 748             super.responseReceived();
 749             pushGroup.pushCompleted();
 750         }
 751 
 752         // create and return the PushResponseImpl
 753         @Override
 754         protected void handleResponse() {
 755             HttpConnection c = connection.connection; // TODO: improve
 756             long statusCode = responseHeaders
 757                 .firstValueAsLong(":status")
 758                 .orElse(-1L);
 759 
 760             if (statusCode == -1L)
 761                 completeResponseExceptionally(new IOException("No status code"));
 762             ImmutableHeaders h = new ImmutableHeaders(responseHeaders, Utils.ALL_HEADERS);
 763             this.response = new HttpResponseImpl((int)statusCode, pushReq, h, this,
 764                 c.sslParameters());
 765             this.responseContentLen = responseHeaders
 766                 .firstValueAsLong("content-length")
 767                 .orElse(-1L);
 768             // different implementations for normal streams and pushed streams
 769             completeResponse(response);
 770         }
 771     }
 772 
 773     /**
 774      * One PushGroup object is associated with the parent Stream of
 775      * the pushed Streams. This keeps track of all common state associated
 776      * with the pushes.
 777      */
 778     static class PushGroup<T> {
 779         // the overall completion object, completed when all pushes are done.
 780         final CompletableFuture<T> resultCF;
 781         Throwable error; // any exception that occured during pushes
 782 
 783         // CF for main response
 784         final CompletableFuture<HttpResponse> mainResponse;
 785 
 786         // user's processor object
 787         final HttpResponse.MultiProcessor<T> multiProcessor;
 788 
 789         // per push handler function provided by processor
 790         final private BiFunction<HttpRequest,
 791                            CompletableFuture<HttpResponse>,
 792                            Boolean> pushHandler;
 793         int numberOfPushes;
 794         int remainingPushes;
 795         boolean noMorePushes = false;
 796 
 797         PushGroup(HttpResponse.MultiProcessor<T> multiProcessor, HttpRequestImpl req) {
 798             this.resultCF = new CompletableFuture<>();
 799             this.mainResponse = new CompletableFuture<>();
 800             this.multiProcessor = multiProcessor;
 801             this.pushHandler = multiProcessor.onStart(req, mainResponse);
 802         }
 803 
 804         CompletableFuture<T> groupResult() {
 805             return resultCF;
 806         }
 807 
 808         CompletableFuture<HttpResponse> mainResponse() {
 809             return mainResponse;
 810         }
 811 
 812         private BiFunction<HttpRequest,
 813             CompletableFuture<HttpResponse>, Boolean> pushHandler()
 814         {
 815                 return pushHandler;
 816         }
 817 
 818         synchronized void addPush() {
 819             numberOfPushes++;
 820             remainingPushes++;
 821         }
 822 
 823         synchronized int numberOfPushes() {
 824             return numberOfPushes;
 825         }
 826         // This is called when the main body response completes because it means
 827         // no more PUSH_PROMISEs are possible
 828         synchronized void noMorePushes() {
 829             noMorePushes = true;
 830             checkIfCompleted();
 831         }
 832 
 833         synchronized void pushCompleted() {
 834             remainingPushes--;
 835             checkIfCompleted();
 836         }
 837 
 838         synchronized void checkIfCompleted() {
 839             if (remainingPushes == 0 && error == null && noMorePushes) {
 840                 T overallResult = multiProcessor.onComplete();
 841                 resultCF.complete(overallResult);
 842             }
 843         }
 844 
 845         synchronized void pushError(Throwable t) {
 846             if (t == null)
 847                 return;
 848             this.error = t;
 849             resultCF.completeExceptionally(t);
 850         }
 851     }
 852 }