1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package java.net.http;
  27 
  28 import sun.net.httpclient.hpack.DecodingCallback;
  29 
  30 import java.io.IOException;
  31 import java.net.URI;
  32 import java.nio.ByteBuffer;
  33 import java.util.LinkedList;
  34 import java.util.ArrayList;
  35 import java.util.List;
  36 import java.util.concurrent.CompletableFuture;
  37 import java.util.concurrent.CompletionException;
  38 import java.util.concurrent.ExecutionException;
  39 import java.util.concurrent.TimeUnit;
  40 import java.util.concurrent.TimeoutException;
  41 import java.util.function.BiFunction;
  42 import java.util.function.LongConsumer;
  43 
  44 /**
  45  * Http/2 Stream handling.
  46  *
  47  * REQUESTS
  48  *
  49  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
  50  *
  51  * sendRequest() -- sendHeadersOnly() + sendBody()
  52  *
  53  * sendBody() -- in calling thread: obeys all flow control (so may block)
  54  *               obtains data from request body processor and places on connection
  55  *               outbound Q.
  56  *
  57  * sendBodyAsync() -- calls sendBody() in an executor thread.
  58  *
  59  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
  60  *
  61  * sendRequestAsync() -- calls sendRequest() in an executor thread
  62  *
  63  * RESPONSES
  64  *
  65  * Multiple responses can be received per request. Responses are queued up on
  66  * a LinkedList of CF<HttpResponse> and the the first one on the list is completed
  67  * with the next response
  68  *
  69  * getResponseAsync() -- queries list of response CFs and returns first one
  70  *               if one exists. Otherwise, creates one and adds it to list
  71  *               and returns it. Completion is achieved through the
  72  *               incoming() upcall from connection reader thread.
  73  *
  74  * getResponse() -- calls getResponseAsync() and waits for CF to complete
  75  *
  76  * responseBody() -- in calling thread: blocks for incoming DATA frames on
  77  *               stream inputQ. Obeys remote and local flow control so may block.
  78  *               Calls user response body processor with data buffers.
  79  *
  80  * responseBodyAsync() -- calls responseBody() in an executor thread.
  81  *
  82  * incoming() -- entry point called from connection reader thread. Frames are
  83  *               either handled immediately without blocking or for data frames
  84  *               placed on the stream's inputQ which is consumed by the stream's
  85  *               reader thread.
  86  *
  87  * PushedStream sub class
  88  * ======================
  89  * Sending side methods are not used because the request comes from a PUSH_PROMISE
  90  * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
  91  * is created. PushedStream does not use responseCF list as there can be only
  92  * one response. The CF is created when the object created and when the response
  93  * HEADERS frame is received the object is completed.
  94  */
  95 class Stream extends ExchangeImpl {
  96 
  97     final ClosableBlockingQueue<DataFrame> inputQ = new ClosableBlockingQueue<>();
  98 
  99     volatile int streamid;
 100 
 101     long responseContentLen = -1;
 102     long responseBytesProcessed = 0;
 103     long requestContentLen;
 104 
 105     Http2Connection connection;
 106     HttpClientImpl client;
 107     final HttpRequestImpl request;
 108     final DecodingCallback rspHeadersConsumer;
 109     HttpHeadersImpl responseHeaders;
 110     final HttpHeadersImpl requestHeaders;
 111     final HttpHeadersImpl requestPseudoHeaders;
 112     HttpResponse.BodyProcessor<?> responseProcessor;
 113     final HttpRequest.BodyProcessor requestProcessor;
 114     HttpResponse response;
 115     final WindowUpdateSender windowUpdater;
 116     // state flags
 117     boolean requestSent, responseReceived;
 118 
 119     final FlowController userRequestFlowController =
 120             new FlowController();
 121     final FlowController responseFlowController =
 122             new FlowController();
 123     final WindowControl outgoingWindow =  new WindowControl();
 124 
 125     final ExecutorWrapper executor;
 126 
 127     @Override
 128     @SuppressWarnings("unchecked")
 129     <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
 130         this.responseProcessor = processor;
 131         CompletableFuture<T> cf;
 132         try {
 133             T body = processor.onResponseBodyStart(
 134                     responseContentLen, responseHeaders,
 135                     responseFlowController); // TODO: filter headers
 136             if (body != null) {
 137                 cf = CompletableFuture.completedFuture(body);
 138                 receiveDataAsync(processor);
 139             } else
 140                 cf = receiveDataAsync(processor);
 141         } catch (IOException e) {
 142             cf = CompletableFuture.failedFuture(e);
 143         }
 144         PushGroup<?> pg = request.pushGroup();
 145         if (pg != null) {
 146             // if an error occurs make sure it is recorded in the PushGroup
 147             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 148         }
 149         return cf;
 150     }
 151 
 152     @Override
 153     public String toString() {
 154         StringBuilder sb = new StringBuilder();
 155         sb.append("streamid: ")
 156                 .append(streamid);
 157         return sb.toString();
 158     }
 159 
 160     // pushes entire response body into response processor
 161     // blocking when required by local or remote flow control
 162     void receiveData() throws IOException {
 163         DataFrame df = null;
 164         try {
 165             boolean endOfStream;
 166             do {
 167                 df = inputQ.take();
 168                 endOfStream = df.getFlag(DataFrame.END_STREAM);
 169                 int len = df.getDataLength();
 170                 ByteBuffer[] buffers = df.getData();
 171                 for (ByteBuffer b : buffers) {
 172                     responseFlowController.take();
 173                     responseProcessor.onResponseBodyChunk(b);
 174                 }
 175                 connection.windowUpdater.update(len);
 176                 if(!endOfStream) {
 177                     // if we got the last frame in the stream we shouldn't send WindowUpdate
 178                     windowUpdater.update(len);
 179                 }
 180             } while (!endOfStream);
 181         } catch (InterruptedException e) {
 182             throw new IOException(e);
 183         }
 184     }
 185 
 186     private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) {
 187         CompletableFuture<T> cf = new CompletableFuture<>();
 188         executor.execute(() -> {
 189             try {
 190                 receiveData();
 191                 T body = processor.onResponseComplete();
 192                 cf.complete(body);
 193                 responseReceived();
 194             } catch (Throwable t) {
 195                 cf.completeExceptionally(t);
 196             }
 197         }, null);
 198         return cf;
 199     }
 200 
 201     @Override
 202     CompletableFuture<Void> sendBodyAsync() {
 203         final CompletableFuture<Void> cf = new CompletableFuture<>();
 204         executor.execute(() -> {
 205             try {
 206                 sendBodyImpl();
 207                 cf.complete(null);
 208             } catch (IOException | InterruptedException e) {
 209                 cf.completeExceptionally(e);
 210             }
 211         }, null);
 212         return cf;
 213     }
 214 
 215     @SuppressWarnings("unchecked")
 216     Stream(HttpClientImpl client, Http2Connection connection, Exchange e) {
 217         super(e);
 218         this.client = client;
 219         this.connection = connection;
 220         this.request = e.request();
 221         this.requestProcessor = request.requestProcessor();
 222         responseHeaders = new HttpHeadersImpl();
 223         requestHeaders = new HttpHeadersImpl();
 224         rspHeadersConsumer = (name, value) -> {
 225             responseHeaders.addHeader(name.toString(), value.toString());
 226         };
 227         this.executor = client.executorWrapper();
 228         //this.response_cf = new CompletableFuture<HttpResponseImpl>();
 229         this.requestPseudoHeaders = new HttpHeadersImpl();
 230         // NEW
 231         this.windowUpdater = new StreamWindowUpdateSender(connection);
 232     }
 233 
 234     @SuppressWarnings("unchecked")
 235     Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) {
 236         super(null);
 237         this.client = client;
 238         this.connection = connection;
 239         this.request = req;
 240         this.requestProcessor = null;
 241         responseHeaders = new HttpHeadersImpl();
 242         requestHeaders = new HttpHeadersImpl();
 243         rspHeadersConsumer = (name, value) -> {
 244             responseHeaders.addHeader(name.toString(), value.toString());
 245         };
 246         this.executor = client.executorWrapper();
 247         //this.response_cf = new CompletableFuture<HttpResponseImpl>();
 248         this.requestPseudoHeaders = new HttpHeadersImpl();
 249         // NEW
 250         this.windowUpdater = new StreamWindowUpdateSender(connection);
 251     }
 252 
 253     /**
 254      * Entry point from Http2Connection reader thread.
 255      *
 256      * Data frames will be removed by response body thread.
 257      *
 258      * @param frame
 259      * @throws IOException
 260      */
 261     void incoming(Http2Frame frame) throws IOException, InterruptedException {
 262         if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) {
 263             // Complete headers accumulated. handle response.
 264             // It's okay if there are multiple HeaderFrames.
 265             handleResponse();
 266         } else if (frame instanceof DataFrame) {
 267             inputQ.put((DataFrame)frame);
 268         } else {
 269             otherFrame(frame);
 270         }
 271     }
 272 
 273     void otherFrame(Http2Frame frame) throws IOException {
 274         switch (frame.type()) {
 275             case WindowUpdateFrame.TYPE:
 276                 incoming_windowUpdate((WindowUpdateFrame) frame);
 277                 break;
 278             case ResetFrame.TYPE:
 279                 incoming_reset((ResetFrame) frame);
 280                 break;
 281             case PriorityFrame.TYPE:
 282                 incoming_priority((PriorityFrame) frame);
 283                 break;
 284             default:
 285                 String msg = "Unexpected frame: " + frame.toString();
 286                 throw new IOException(msg);
 287         }
 288     }
 289 
 290     // The Hpack decoder decodes into one of these consumers of name,value pairs
 291 
 292     DecodingCallback rspHeadersConsumer() {
 293         return rspHeadersConsumer;
 294     }
 295 
 296     // create and return the HttpResponseImpl
 297     protected void handleResponse() throws IOException {
 298         HttpConnection c = connection.connection; // TODO: improve
 299         long statusCode = responseHeaders
 300                 .firstValueAsLong(":status")
 301                 .orElseThrow(() -> new IOException("no statuscode in response"));
 302 
 303         this.response = new HttpResponseImpl((int)statusCode, exchange, responseHeaders, null,
 304                 c.sslParameters(), HttpClient.Version.HTTP_2, c);
 305         this.responseContentLen = responseHeaders
 306                 .firstValueAsLong("content-length")
 307                 .orElse(-1L);
 308         // different implementations for normal streams and pushed streams
 309         completeResponse(response);
 310     }
 311 
 312     void incoming_reset(ResetFrame frame) {
 313         // TODO: implement reset
 314         int error = frame.getErrorCode();
 315         IOException e = new IOException(ErrorFrame.stringForCode(error));
 316         completeResponseExceptionally(e);
 317         throw new UnsupportedOperationException("Not implemented");
 318     }
 319 
 320     void incoming_priority(PriorityFrame frame) {
 321         // TODO: implement priority
 322         throw new UnsupportedOperationException("Not implemented");
 323     }
 324 
 325     void incoming_windowUpdate(WindowUpdateFrame frame) {
 326         outgoingWindow.update(frame.getUpdate());
 327     }
 328 
 329     void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException {
 330         if (Log.requests()) {
 331             Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
 332         }
 333         PushGroup<?> pushGroup = request.pushGroup();
 334         if (pushGroup == null) {
 335             cancelImpl(new IllegalStateException("unexpected push promise"));
 336         }
 337         // get the handler and call it.
 338         BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph =
 339             pushGroup.pushHandler();
 340 
 341         CompletableFuture<HttpResponse> pushCF = pushStream
 342                 .getResponseAsync(null)
 343                 .thenApply(r -> (HttpResponse)r);
 344         boolean accept = ph.apply(pushReq, pushCF);
 345         if (!accept) {
 346             IOException ex = new IOException("Stream cancelled by user");
 347             cancelImpl(ex);
 348             pushCF.completeExceptionally(ex);
 349         } else {
 350             pushStream.requestSent();
 351             pushGroup.addPush();
 352         }
 353     }
 354 
 355     private OutgoingHeaders headerFrame(long contentLength) {
 356         HttpHeadersImpl h = request.getSystemHeaders();
 357         if (contentLength > 0) {
 358             h.setHeader("content-length", Long.toString(contentLength));
 359         }
 360         setPseudoHeaderFields();
 361         OutgoingHeaders f = new OutgoingHeaders(h, request.getUserHeaders(), this);
 362         if (contentLength == 0) {
 363             f.setFlag(HeadersFrame.END_STREAM);
 364         }
 365         return f;
 366     }
 367 
 368     private void setPseudoHeaderFields() {
 369         HttpHeadersImpl hdrs = requestPseudoHeaders;
 370         String method = request.method();
 371         hdrs.setHeader(":method", method);
 372         URI uri = request.uri();
 373         hdrs.setHeader(":scheme", uri.getScheme());
 374         // TODO: userinfo deprecated. Needs to be removed
 375         hdrs.setHeader(":authority", uri.getAuthority());
 376         // TODO: ensure header names beginning with : not in user headers
 377         String query = uri.getQuery();
 378         String path = uri.getPath();
 379         if (path == null) {
 380             if (method.equalsIgnoreCase("OPTIONS")) {
 381                 path = "*";
 382             } else {
 383                 path = "/";
 384             }
 385         }
 386         if (query != null) {
 387             path += "?" + query;
 388         }
 389         hdrs.setHeader(":path", path);
 390     }
 391 
 392     HttpHeadersImpl getRequestPseudoHeaders() {
 393         return requestPseudoHeaders;
 394     }
 395 
 396     @Override
 397     HttpResponseImpl getResponse() throws IOException {
 398         try {
 399             if (request.timeval() > 0) {
 400                 return getResponseAsync(null).get(
 401                         request.timeval(), TimeUnit.MILLISECONDS);
 402             } else {
 403                 return getResponseAsync(null).join();
 404             }
 405         } catch (TimeoutException e) {
 406             throw new HttpTimeoutException("Response timed out");
 407         } catch (InterruptedException | ExecutionException | CompletionException e) {
 408             Throwable t = e.getCause();
 409             if (t instanceof IOException) {
 410                 throw (IOException)t;
 411             }
 412             throw new IOException(e);
 413         }
 414     }
 415 
 416     @Override
 417     void sendRequest() throws IOException, InterruptedException {
 418         sendHeadersOnly();
 419         sendBody();
 420     }
 421 
 422     /**
 423      * A simple general purpose blocking flow controller
 424      */
 425     class FlowController implements LongConsumer {
 426         int permits;
 427 
 428         FlowController() {
 429             this.permits = 0;
 430         }
 431 
 432         @Override
 433         public synchronized void accept(long n) {
 434             if (n < 1) {
 435                 throw new InternalError("FlowController.accept called with " + n);
 436             }
 437             if (permits == 0) {
 438                 permits += n;
 439                 notifyAll();
 440             } else {
 441                 permits += n;
 442             }
 443         }
 444 
 445         public synchronized void take() throws InterruptedException {
 446             take(1);
 447         }
 448 
 449         public synchronized void take(int amount) throws InterruptedException {
 450             assert permits >= 0;
 451             while (amount > 0) {
 452                 int n = Math.min(amount, permits);
 453                 permits -= n;
 454                 amount -= n;
 455                 if (amount > 0)
 456                     wait();
 457             }
 458         }
 459     }
 460 
 461     @Override
 462     void sendHeadersOnly() throws IOException, InterruptedException {
 463         if (Log.requests() && request != null) {
 464             Log.logRequest(request.toString());
 465         }
 466         requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController);
 467         OutgoingHeaders f = headerFrame(requestContentLen);
 468         connection.sendFrame(f);
 469     }
 470 
 471     @Override
 472     void sendBody() throws IOException, InterruptedException {
 473         sendBodyImpl();
 474     }
 475 
 476     void registerStream(int id) {
 477         this.streamid = id;
 478         connection.putStream(this, streamid);
 479     }
 480 
 481     DataFrame getDataFrame() throws IOException, InterruptedException {
 482         userRequestFlowController.take();
 483         int maxpayloadLen = connection.getMaxSendFrameSize();
 484         ByteBuffer buffer = connection.getBuffer();
 485         buffer.limit(maxpayloadLen);
 486         boolean complete = requestProcessor.onRequestBodyChunk(buffer);
 487         buffer.flip();
 488         int amount = buffer.remaining();
 489         // wait for flow control if necessary. Following method will block
 490         // until after headers frame is sent, so correct streamid is set.
 491         outgoingWindow.acquire(amount);
 492         connection.connectionSendWindow.acquire(amount);
 493         DataFrame df = new DataFrame();
 494         df.streamid(streamid);
 495         if (complete) {
 496             df.setFlag(DataFrame.END_STREAM);
 497         }
 498         df.setData(buffer);
 499         df.computeLength();
 500         return df;
 501     }
 502 
 503 
 504     @Override
 505     CompletableFuture<Void> sendHeadersAsync() {
 506         try {
 507             sendHeadersOnly();
 508             return CompletableFuture.completedFuture(null);
 509         } catch (IOException | InterruptedException ex) {
 510             return CompletableFuture.failedFuture(ex);
 511         }
 512     }
 513 
 514     /**
 515      * A List of responses relating to this stream. Normally there is only
 516      * one response, but intermediate responses like 100 are allowed
 517      * and must be passed up to higher level before continuing. Deals with races
 518      * such as if responses are returned before the CFs get created by
 519      * getResponseAsync()
 520      */
 521 
 522     final List<CompletableFuture<HttpResponseImpl>> response_cfs = new ArrayList<>(5);
 523 
 524     @Override
 525     CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) {
 526         CompletableFuture<HttpResponseImpl> cf;
 527         synchronized (response_cfs) {
 528             if (!response_cfs.isEmpty()) {
 529                 cf = response_cfs.remove(0);
 530             } else {
 531                 cf = new CompletableFuture<>();
 532                 response_cfs.add(cf);
 533             }
 534         }
 535         PushGroup<?> pg = request.pushGroup();
 536         if (pg != null) {
 537             // if an error occurs make sure it is recorded in the PushGroup
 538             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 539         }
 540         return cf;
 541     }
 542 
 543     /**
 544      * Completes the first uncompleted CF on list, and removes it. If there is no
 545      * uncompleted CF then creates one (completes it) and adds to list
 546      */
 547     void completeResponse(HttpResponse r) {
 548         HttpResponseImpl resp = (HttpResponseImpl)r;
 549         synchronized (response_cfs) {
 550             int cfs_len = response_cfs.size();
 551             for (int i=0; i<cfs_len; i++) {
 552                 CompletableFuture<HttpResponseImpl> cf = response_cfs.get(i);
 553                 if (!cf.isDone()) {
 554                     cf.complete(resp);
 555                     response_cfs.remove(cf);
 556                     return;
 557                 }
 558             }
 559             response_cfs.add(CompletableFuture.completedFuture(resp));
 560         }
 561     }
 562 
 563     // methods to update state and remove stream when finished
 564 
 565     synchronized void requestSent() {
 566         requestSent = true;
 567         if (responseReceived)
 568             connection.deleteStream(this);
 569     }
 570 
 571     synchronized void responseReceived() {
 572         responseReceived = true;
 573         if (requestSent)
 574             connection.deleteStream(this);
 575         PushGroup<?> pg = request.pushGroup();
 576         if (pg != null)
 577             pg.noMorePushes();
 578     }
 579 
 580     /**
 581      * same as above but for errors
 582      *
 583      * @param t
 584      */
 585     void completeResponseExceptionally(Throwable t) {
 586         synchronized (response_cfs) {
 587             for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
 588                 if (!cf.isDone()) {
 589                     cf.completeExceptionally(t);
 590                     response_cfs.remove(cf);
 591                     return;
 592                 }
 593             }
 594             response_cfs.add(CompletableFuture.failedFuture(t));
 595         }
 596     }
 597 
 598     void sendBodyImpl() throws IOException, InterruptedException {
 599         if (requestContentLen == 0) {
 600             // no body
 601             requestSent();
 602             return;
 603         }
 604         DataFrame df;
 605         do {
 606             df = getDataFrame();
 607             // TODO: check accumulated content length (if not checked below)
 608             connection.sendDataFrame(df);
 609         } while (!df.getFlag(DataFrame.END_STREAM));
 610         requestSent();
 611     }
 612 
 613     @Override
 614     void cancel() {
 615         cancelImpl(new Exception("Cancelled"));
 616     }
 617 
 618 
 619     void cancelImpl(Throwable e) {
 620         Log.logTrace("cancelling stream: {0}\n", e.toString());
 621         inputQ.close();
 622         completeResponseExceptionally(e);
 623         try {
 624             connection.resetStream(streamid, ResetFrame.CANCEL);
 625         } catch (IOException | InterruptedException ex) {
 626             Log.logError(ex);
 627         }
 628     }
 629 
 630     @Override
 631     CompletableFuture<Void> sendRequestAsync() {
 632         CompletableFuture<Void> cf = new CompletableFuture<>();
 633         executor.execute(() -> {
 634            try {
 635                sendRequest();
 636                cf.complete(null);
 637            } catch (IOException |InterruptedException e) {
 638                cf.completeExceptionally(e);
 639            }
 640         }, null);
 641         return cf;
 642     }
 643 
 644     @Override
 645     <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
 646         this.responseProcessor = processor;
 647         T body = processor.onResponseBodyStart(
 648                     responseContentLen, responseHeaders,
 649                     responseFlowController); // TODO: filter headers
 650         if (body == null) {
 651             receiveData();
 652             body = processor.onResponseComplete();
 653         } else
 654             receiveDataAsync(processor);
 655         responseReceived();
 656         return body;
 657     }
 658 
 659     // called from Http2Connection reader thread
 660     void updateOutgoingWindow(int update) {
 661         outgoingWindow.update(update);
 662     }
 663 
 664     void close(String msg) {
 665         cancel();
 666     }
 667 
 668     static class PushedStream extends Stream {
 669         final PushGroup<?> pushGroup;
 670         final private Stream parent;      // used by server push streams
 671         // push streams need the response CF allocated up front as it is
 672         // given directly to user via the multi handler callback function.
 673         final CompletableFuture<HttpResponseImpl> pushCF;
 674         final HttpRequestImpl pushReq;
 675 
 676         PushedStream(PushGroup<?> pushGroup, HttpClientImpl client,
 677                 Http2Connection connection, Stream parent,
 678                 HttpRequestImpl pushReq) {
 679             super(client, connection, pushReq);
 680             this.pushGroup = pushGroup;
 681             this.pushReq = pushReq;
 682             this.pushCF = new CompletableFuture<>();
 683             this.parent = parent;
 684         }
 685 
 686         // Following methods call the super class but in case of
 687         // error record it in the PushGroup. The error method is called
 688         // with a null value when no error occurred (is a no-op)
 689         @Override
 690         CompletableFuture<Void> sendBodyAsync() {
 691             return super.sendBodyAsync()
 692                         .whenComplete((v, t) -> pushGroup.pushError(t));
 693         }
 694 
 695         @Override
 696         CompletableFuture<Void> sendHeadersAsync() {
 697             return super.sendHeadersAsync()
 698                         .whenComplete((v, t) -> pushGroup.pushError(t));
 699         }
 700 
 701         @Override
 702         CompletableFuture<Void> sendRequestAsync() {
 703             return super.sendRequestAsync()
 704                         .whenComplete((v, t) -> pushGroup.pushError(t));
 705         }
 706 
 707         @Override
 708         CompletableFuture<HttpResponseImpl> getResponseAsync(Void vo) {
 709             return pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
 710         }
 711 
 712         @Override
 713         <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
 714             return super.responseBodyAsync(processor)
 715                         .whenComplete((v, t) -> pushGroup.pushError(t));
 716         }
 717 
 718         @Override
 719         void completeResponse(HttpResponse r) {
 720             HttpResponseImpl resp = (HttpResponseImpl)r;
 721             Utils.logResponse(resp);
 722             pushCF.complete(resp);
 723         }
 724 
 725         @Override
 726         void completeResponseExceptionally(Throwable t) {
 727             pushCF.completeExceptionally(t);
 728         }
 729 
 730         @Override
 731         synchronized void responseReceived() {
 732             super.responseReceived();
 733             pushGroup.pushCompleted();
 734         }
 735 
 736         // create and return the PushResponseImpl
 737         @Override
 738         protected void handleResponse() {
 739             HttpConnection c = connection.connection; // TODO: improve
 740             long statusCode = responseHeaders
 741                 .firstValueAsLong(":status")
 742                 .orElse(-1L);
 743 
 744             if (statusCode == -1L)
 745                 completeResponseExceptionally(new IOException("No status code"));
 746             ImmutableHeaders h = new ImmutableHeaders(responseHeaders, Utils.ALL_HEADERS);
 747             this.response = new HttpResponseImpl((int)statusCode, pushReq, h, this,
 748                 c.sslParameters());
 749             this.responseContentLen = responseHeaders
 750                 .firstValueAsLong("content-length")
 751                 .orElse(-1L);
 752             // different implementations for normal streams and pushed streams
 753             completeResponse(response);
 754         }
 755     }
 756 
 757     /**
 758      * One PushGroup object is associated with the parent Stream of
 759      * the pushed Streams. This keeps track of all common state associated
 760      * with the pushes.
 761      */
 762     static class PushGroup<T> {
 763         // the overall completion object, completed when all pushes are done.
 764         final CompletableFuture<T> resultCF;
 765         Throwable error; // any exception that occured during pushes
 766 
 767         // CF for main response
 768         final CompletableFuture<HttpResponse> mainResponse;
 769 
 770         // user's processor object
 771         final HttpResponse.MultiProcessor<T> multiProcessor;
 772 
 773         // per push handler function provided by processor
 774         final private BiFunction<HttpRequest,
 775                            CompletableFuture<HttpResponse>,
 776                            Boolean> pushHandler;
 777         int numberOfPushes;
 778         int remainingPushes;
 779         boolean noMorePushes = false;
 780 
 781         PushGroup(HttpResponse.MultiProcessor<T> multiProcessor, HttpRequestImpl req) {
 782             this.resultCF = new CompletableFuture<>();
 783             this.mainResponse = new CompletableFuture<>();
 784             this.multiProcessor = multiProcessor;
 785             this.pushHandler = multiProcessor.onStart(req, mainResponse);
 786         }
 787 
 788         CompletableFuture<T> groupResult() {
 789             return resultCF;
 790         }
 791 
 792         CompletableFuture<HttpResponse> mainResponse() {
 793             return mainResponse;
 794         }
 795 
 796         private BiFunction<HttpRequest,
 797             CompletableFuture<HttpResponse>, Boolean> pushHandler()
 798         {
 799                 return pushHandler;
 800         }
 801 
 802         synchronized void addPush() {
 803             numberOfPushes++;
 804             remainingPushes++;
 805         }
 806 
 807         synchronized int numberOfPushes() {
 808             return numberOfPushes;
 809         }
 810         // This is called when the main body response completes because it means
 811         // no more PUSH_PROMISEs are possible
 812         synchronized void noMorePushes() {
 813             noMorePushes = true;
 814             checkIfCompleted();
 815         }
 816 
 817         synchronized void pushCompleted() {
 818             remainingPushes--;
 819             checkIfCompleted();
 820         }
 821 
 822         synchronized void checkIfCompleted() {
 823             if (remainingPushes == 0 && error == null && noMorePushes) {
 824                 T overallResult = multiProcessor.onComplete();
 825                 resultCF.complete(overallResult);
 826             }
 827         }
 828 
 829         synchronized void pushError(Throwable t) {
 830             if (t == null)
 831                 return;
 832             this.error = t;
 833             resultCF.completeExceptionally(t);
 834         }
 835     }
 836 
 837     class StreamWindowUpdateSender extends WindowUpdateSender {
 838 
 839         StreamWindowUpdateSender(Http2Connection connection) {
 840             super(connection);
 841         }
 842 
 843         @Override
 844         int getStreamId() {
 845             return streamid;
 846         }
 847     }
 848 }