1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
  23  */
  24 
  25 package java.net.http;
  26 
  27 import sun.net.httpclient.hpack.DecodingCallback;
  28 
  29 import java.io.IOException;
  30 import java.net.URI;
  31 import java.nio.ByteBuffer;
  32 import java.util.LinkedList;
  33 import java.util.ArrayList;
  34 import java.util.List;
  35 import java.util.concurrent.CompletableFuture;
  36 import java.util.concurrent.CompletionException;
  37 import java.util.concurrent.ExecutionException;
  38 import java.util.concurrent.TimeUnit;
  39 import java.util.concurrent.TimeoutException;
  40 import java.util.function.BiFunction;
  41 import java.util.function.LongConsumer;
  42 
  43 /**
  44  * Http/2 Stream handling.
  45  *
  46  * REQUESTS
  47  *
  48  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
  49  *
  50  * sendRequest() -- sendHeadersOnly() + sendBody()
  51  *
  52  * sendBody() -- in calling thread: obeys all flow control (so may block)
  53  *               obtains data from request body processor and places on connection
  54  *               outbound Q.
  55  *
  56  * sendBodyAsync() -- calls sendBody() in an executor thread.
  57  *
  58  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
  59  *
  60  * sendRequestAsync() -- calls sendRequest() in an executor thread
  61  *
  62  * RESPONSES
  63  *
  64  * Multiple responses can be received per request. Responses are queued up on
 * a LinkedList of CF<HttpResponse> and the first one on the list is completed
  66  * with the next response
  67  *
  68  * getResponseAsync() -- queries list of response CFs and returns first one
  69  *               if one exists. Otherwise, creates one and adds it to list
  70  *               and returns it. Completion is achieved through the
  71  *               incoming() upcall from connection reader thread.
  72  *
  73  * getResponse() -- calls getResponseAsync() and waits for CF to complete
  74  *
  75  * responseBody() -- in calling thread: blocks for incoming DATA frames on
  76  *               stream inputQ. Obeys remote and local flow control so may block.
  77  *               Calls user response body processor with data buffers.
  78  *
  79  * responseBodyAsync() -- calls responseBody() in an executor thread.
  80  *
  81  * incoming() -- entry point called from connection reader thread. Frames are
  82  *               either handled immediately without blocking or for data frames
  83  *               placed on the stream's inputQ which is consumed by the stream's
  84  *               reader thread.
  85  *
  86  * PushedStream sub class
  87  * ======================
  88  * Sending side methods are not used because the request comes from a PUSH_PROMISE
  89  * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
  90  * is created. PushedStream does not use responseCF list as there can be only
 * one response. The CF is created when the object is created and when the response
  92  * HEADERS frame is received the object is completed.
  93  */
  94 class Stream extends ExchangeImpl {
  95 
  96     final Queue<Http2Frame> inputQ;
  97 
  98     volatile int streamid;
  99 
 100     long responseContentLen = -1;
 101     long responseBytesProcessed = 0;
 102     long requestContentLen;
 103 
 104     Http2Connection connection;
 105     HttpClientImpl client;
 106     final HttpRequestImpl request;
 107     final DecodingCallback rspHeadersConsumer;
 108     HttpHeadersImpl responseHeaders;
 109     final HttpHeadersImpl requestHeaders;
 110     final HttpHeadersImpl requestPseudoHeaders;
 111     HttpResponse.BodyProcessor<?> responseProcessor;
 112     final HttpRequest.BodyProcessor requestProcessor;
 113     HttpResponse response;
 114 
 115     // state flags
 116     boolean requestSent, responseReceived;
 117 
 118     final FlowController userRequestFlowController =
 119             new FlowController();
 120     final FlowController remoteRequestFlowController =
 121             new FlowController();
 122     final FlowController responseFlowController =
 123             new FlowController();
 124 
 125     final ExecutorWrapper executor;
 126 
 127     @Override
 128     @SuppressWarnings("unchecked")
 129     <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
 130         this.responseProcessor = processor;
 131         CompletableFuture<T> cf;
 132         try {
 133             T body = processor.onResponseBodyStart(
 134                     responseContentLen, responseHeaders,
 135                     responseFlowController); // TODO: filter headers
 136             if (body != null) {
 137                 cf = CompletableFuture.completedFuture(body);
 138                 receiveDataAsync(processor);
 139             } else
 140                 cf = receiveDataAsync(processor);
 141         } catch (IOException e) {
 142             cf = CompletableFuture.failedFuture(e);
 143         }
 144         PushGroup<?> pg = request.pushGroup();
 145         if (pg != null) {
 146             // if an error occurs make sure it is recorded in the PushGroup
 147             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 148         }
 149         return cf;
 150     }
 151 
 152     @Override
 153     public String toString() {
 154         StringBuilder sb = new StringBuilder();
 155         sb.append("streamid: ")
 156                 .append(streamid);
 157         return sb.toString();
 158     }
 159 
 160     // pushes entire response body into response processor
 161     // blocking when required by local or remote flow control
 162     void receiveData() throws IOException {
 163         Http2Frame frame;
 164         DataFrame df = null;
 165         try {
 166             do {
 167                 frame = inputQ.take();
 168                 if (!(frame instanceof DataFrame)) {
 169                     assert false;
 170                     continue;
 171                 }
 172                 df = (DataFrame) frame;
 173                 int len = df.getDataLength();
 174                 ByteBuffer[] buffers = df.getData();
 175                 for (ByteBuffer b : buffers) {
 176                     responseFlowController.take();
 177                     responseProcessor.onResponseBodyChunk(b);
 178                 }
 179                 sendWindowUpdate(len);
 180             } while (!df.getFlag(DataFrame.END_STREAM));
 181         } catch (InterruptedException e) {
 182             throw new IOException(e);
 183         }
 184     }
 185 
 186     private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) {
 187         CompletableFuture<T> cf = new CompletableFuture<>();
 188         executor.execute(() -> {
 189             try {
 190                 receiveData();
 191                 T body = processor.onResponseComplete();
 192                 cf.complete(body);
 193                 responseReceived();
 194             } catch (Throwable t) {
 195                 cf.completeExceptionally(t);
 196             }
 197         }, null);
 198         return cf;
 199     }
 200 
 201     private void sendWindowUpdate(int increment)
 202             throws IOException, InterruptedException {
 203         if (increment == 0)
 204             return;
 205         LinkedList<Http2Frame> list = new LinkedList<>();
 206         WindowUpdateFrame frame = new WindowUpdateFrame();
 207         frame.streamid(streamid);
 208         frame.setUpdate(increment);
 209         list.add(frame);
 210         frame = new WindowUpdateFrame();
 211         frame.streamid(0);
 212         frame.setUpdate(increment);
 213         list.add(frame);
 214         connection.sendFrames(list);
 215     }
 216 
 217     @Override
 218     CompletableFuture<Void> sendBodyAsync() {
 219         final CompletableFuture<Void> cf = new CompletableFuture<>();
 220         executor.execute(() -> {
 221             try {
 222                 sendBodyImpl();
 223                 cf.complete(null);
 224             } catch (IOException | InterruptedException e) {
 225                 cf.completeExceptionally(e);
 226             }
 227         }, null);
 228         return cf;
 229     }
 230 
 231     @SuppressWarnings("unchecked")
 232     Stream(HttpClientImpl client, Http2Connection connection, Exchange e) {
 233         super(e);
 234         this.client = client;
 235         this.connection = connection;
 236         this.request = e.request();
 237         this.requestProcessor = request.requestProcessor();
 238         responseHeaders = new HttpHeadersImpl();
 239         requestHeaders = new HttpHeadersImpl();
 240         rspHeadersConsumer = (name, value) -> {
 241             responseHeaders.addHeader(name.toString(), value.toString());
 242         };
 243         this.executor = client.executorWrapper();
 244         //this.response_cf = new CompletableFuture<HttpResponseImpl>();
 245         this.requestPseudoHeaders = new HttpHeadersImpl();
 246         // NEW
 247         this.inputQ = new Queue<>();
 248     }
 249 
 250     @SuppressWarnings("unchecked")
 251     Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) {
 252         super(null);
 253         this.client = client;
 254         this.connection = connection;
 255         this.request = req;
 256         this.requestProcessor = null;
 257         responseHeaders = new HttpHeadersImpl();
 258         requestHeaders = new HttpHeadersImpl();
 259         rspHeadersConsumer = (name, value) -> {
 260             responseHeaders.addHeader(name.toString(), value.toString());
 261         };
 262         this.executor = client.executorWrapper();
 263         //this.response_cf = new CompletableFuture<HttpResponseImpl>();
 264         this.requestPseudoHeaders = new HttpHeadersImpl();
 265         // NEW
 266         this.inputQ = new Queue<>();
 267     }
 268 
 269     /**
 270      * Entry point from Http2Connection reader thread.
 271      *
 272      * Data frames will be removed by response body thread.
 273      *
 274      * @param frame
 275      * @throws IOException
 276      */
 277     void incoming(Http2Frame frame) throws IOException, InterruptedException {
 278         if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) {
 279             // Complete headers accumulated. handle response.
 280             // It's okay if there are multiple HeaderFrames.
 281             handleResponse();
 282         } else if (frame instanceof DataFrame) {
 283             inputQ.put(frame);
 284         } else {
 285             otherFrame(frame);
 286         }
 287     }
 288 
 289     void otherFrame(Http2Frame frame) throws IOException {
 290         switch (frame.type()) {
 291             case WindowUpdateFrame.TYPE:
 292                 incoming_windowUpdate((WindowUpdateFrame) frame);
 293                 break;
 294             case ResetFrame.TYPE:
 295                 incoming_reset((ResetFrame) frame);
 296                 break;
 297             case PriorityFrame.TYPE:
 298                 incoming_priority((PriorityFrame) frame);
 299                 break;
 300             default:
 301                 String msg = "Unexpected frame: " + frame.toString();
 302                 throw new IOException(msg);
 303         }
 304     }
 305 
 306     // The Hpack decoder decodes into one of these consumers of name,value pairs
 307 
 308     DecodingCallback rspHeadersConsumer() {
 309         return rspHeadersConsumer;
 310     }
 311 
 312     // create and return the HttpResponseImpl
 313     protected void handleResponse() throws IOException {
 314         HttpConnection c = connection.connection; // TODO: improve
 315         long statusCode = responseHeaders
 316                 .firstValueAsLong(":status")
 317                 .orElseThrow(() -> new IOException("no statuscode in response"));
 318 
 319         this.response = new HttpResponseImpl((int)statusCode, exchange, responseHeaders, null,
 320                 c.sslParameters(), HttpClient.Version.HTTP_2, c);
 321         this.responseContentLen = responseHeaders
 322                 .firstValueAsLong("content-length")
 323                 .orElse(-1L);
 324         // different implementations for normal streams and pushed streams
 325         completeResponse(response);
 326     }
 327 
 328     void incoming_reset(ResetFrame frame) {
 329         // TODO: implement reset
 330         int error = frame.getErrorCode();
 331         IOException e = new IOException(ErrorFrame.stringForCode(error));
 332         completeResponseExceptionally(e);
 333         throw new UnsupportedOperationException("Not implemented");
 334     }
 335 
 336     void incoming_priority(PriorityFrame frame) {
 337         // TODO: implement priority
 338         throw new UnsupportedOperationException("Not implemented");
 339     }
 340 
 341     void incoming_windowUpdate(WindowUpdateFrame frame) {
 342         int amount = frame.getUpdate();
 343         if (amount > 0)
 344             remoteRequestFlowController.accept(amount);
 345     }
 346 
 347     void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException {
 348         if (Log.requests()) {
 349             Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
 350         }
 351         PushGroup<?> pushGroup = request.pushGroup();
 352         if (pushGroup == null) {
 353             cancelImpl(new IllegalStateException("unexpected push promise"));
 354         }
 355         // get the handler and call it.
 356         BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph =
 357             pushGroup.pushHandler();
 358 
 359         CompletableFuture<HttpResponse> pushCF = pushStream
 360                 .getResponseAsync(null)
 361                 .thenApply(r -> (HttpResponse)r);
 362         boolean accept = ph.apply(pushReq, pushCF);
 363         if (!accept) {
 364             IOException ex = new IOException("Stream cancelled by user");
 365             cancelImpl(ex);
 366             pushCF.completeExceptionally(ex);
 367         } else {
 368             pushStream.requestSent();
 369             pushGroup.addPush();
 370         }
 371     }
 372 
 373     private OutgoingHeaders headerFrame(long contentLength) {
 374         HttpHeadersImpl h = request.getSystemHeaders();
 375         if (contentLength > 0) {
 376             h.setHeader("content-length", Long.toString(contentLength));
 377         }
 378         setPseudoHeaderFields();
 379         OutgoingHeaders f = new OutgoingHeaders(h, request.getUserHeaders(), this);
 380         if (contentLength == 0) {
 381             f.setFlag(HeadersFrame.END_STREAM);
 382         }
 383         return f;
 384     }
 385 
 386     private void setPseudoHeaderFields() {
 387         HttpHeadersImpl hdrs = requestPseudoHeaders;
 388         String method = request.method();
 389         hdrs.setHeader(":method", method);
 390         URI uri = request.uri();
 391         hdrs.setHeader(":scheme", uri.getScheme());
 392         // TODO: userinfo deprecated. Needs to be removed
 393         hdrs.setHeader(":authority", uri.getAuthority());
 394         // TODO: ensure header names beginning with : not in user headers
 395         String query = uri.getQuery();
 396         String path = uri.getPath();
 397         if (path == null) {
 398             if (method.equalsIgnoreCase("OPTIONS")) {
 399                 path = "*";
 400             } else {
 401                 path = "/";
 402             }
 403         }
 404         if (query != null) {
 405             path += "?" + query;
 406         }
 407         hdrs.setHeader(":path", path);
 408     }
 409 
 410     HttpHeadersImpl getRequestPseudoHeaders() {
 411         return requestPseudoHeaders;
 412     }
 413 
 414     @Override
 415     HttpResponseImpl getResponse() throws IOException {
 416         try {
 417             if (request.timeval() > 0) {
 418                 return getResponseAsync(null).get(
 419                         request.timeval(), TimeUnit.MILLISECONDS);
 420             } else {
 421                 return getResponseAsync(null).join();
 422             }
 423         } catch (TimeoutException e) {
 424             throw new HttpTimeoutException("Response timed out");
 425         } catch (InterruptedException | ExecutionException | CompletionException e) {
 426             Throwable t = e.getCause();
 427             if (t instanceof IOException) {
 428                 throw (IOException)t;
 429             }
 430             throw new IOException(e);
 431         }
 432     }
 433 
 434     @Override
 435     void sendRequest() throws IOException, InterruptedException {
 436         sendHeadersOnly();
 437         sendBody();
 438     }
 439 
 440     /**
 441      * A simple general purpose blocking flow controller
 442      */
 443     class FlowController implements LongConsumer {
 444         int permits;
 445 
 446         FlowController() {
 447             this.permits = 0;
 448         }
 449 
 450         @Override
 451         public synchronized void accept(long n) {
 452             if (n < 1) {
 453                 throw new InternalError("FlowController.accept called with " + n);
 454             }
 455             if (permits == 0) {
 456                 permits += n;
 457                 notifyAll();
 458             } else {
 459                 permits += n;
 460             }
 461         }
 462 
 463         public synchronized void take() throws InterruptedException {
 464             take(1);
 465         }
 466 
 467         public synchronized void take(int amount) throws InterruptedException {
 468             assert permits >= 0;
 469             while (permits < amount) {
 470                 int n = Math.min(amount, permits);
 471                 permits -= n;
 472                 amount -= n;
 473                 if (amount > 0)
 474                     wait();
 475             }
 476         }
 477     }
 478 
 479     @Override
 480     void sendHeadersOnly() throws IOException, InterruptedException {
 481         if (Log.requests() && request != null) {
 482             Log.logRequest(request.toString());
 483         }
 484         requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController);
 485         OutgoingHeaders f = headerFrame(requestContentLen);
 486         connection.sendFrame(f);
 487     }
 488 
 489     @Override
 490     void sendBody() throws IOException, InterruptedException {
 491         sendBodyImpl();
 492     }
 493 
 494     void registerStream(int id) {
 495         this.streamid = id;
 496         connection.putStream(this, streamid);
 497     }
 498 
 499     DataFrame getDataFrame() throws IOException, InterruptedException {
 500         userRequestFlowController.take();
 501         int maxpayloadLen = connection.getMaxSendFrameSize() - 9;
 502         ByteBuffer buffer = connection.getBuffer();
 503         buffer.limit(maxpayloadLen);
 504         boolean complete = requestProcessor.onRequestBodyChunk(buffer);
 505         buffer.flip();
 506         int amount = buffer.remaining();
 507         // wait for flow control if necessary. Following method will block
 508         // until after headers frame is sent, so correct streamid is set.
 509         remoteRequestFlowController.take(amount);
 510         connection.obtainSendWindow(amount);
 511 
 512         DataFrame df = new DataFrame();
 513         df.streamid(streamid);
 514         if (complete) {
 515             df.setFlag(DataFrame.END_STREAM);
 516         }
 517         df.setData(buffer);
 518         df.computeLength();
 519         return df;
 520     }
 521 
 522 
 523     @Override
 524     CompletableFuture<Void> sendHeadersAsync() {
 525         try {
 526             sendHeadersOnly();
 527             return CompletableFuture.completedFuture(null);
 528         } catch (IOException | InterruptedException ex) {
 529             return CompletableFuture.failedFuture(ex);
 530         }
 531     }
 532 
 533     /**
 534      * A List of responses relating to this stream. Normally there is only
 535      * one response, but intermediate responses like 100 are allowed
 536      * and must be passed up to higher level before continuing. Deals with races
 537      * such as if responses are returned before the CFs get created by
 538      * getResponseAsync()
 539      */
 540 
 541     final List<CompletableFuture<HttpResponseImpl>> response_cfs = new ArrayList<>(5);
 542 
 543     @Override
 544     CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) {
 545         CompletableFuture<HttpResponseImpl> cf;
 546         synchronized (response_cfs) {
 547             if (!response_cfs.isEmpty()) {
 548                 cf = response_cfs.remove(0);
 549             } else {
 550                 cf = new CompletableFuture<>();
 551                 response_cfs.add(cf);
 552             }
 553         }
 554         PushGroup<?> pg = request.pushGroup();
 555         if (pg != null) {
 556             // if an error occurs make sure it is recorded in the PushGroup
 557             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 558         }
 559         return cf;
 560     }
 561 
 562     /**
 563      * Completes the first uncompleted CF on list, and removes it. If there is no
 564      * uncompleted CF then creates one (completes it) and adds to list
 565      */
 566     void completeResponse(HttpResponse r) {
 567         HttpResponseImpl resp = (HttpResponseImpl)r;
 568         synchronized (response_cfs) {
 569             int cfs_len = response_cfs.size();
 570             for (int i=0; i<cfs_len; i++) {
 571                 CompletableFuture<HttpResponseImpl> cf = response_cfs.get(i);
 572                 if (!cf.isDone()) {
 573                     cf.complete(resp);
 574                     response_cfs.remove(cf);
 575                     return;
 576                 }
 577             }
 578             response_cfs.add(CompletableFuture.completedFuture(resp));
 579         }
 580     }
 581 
 582     // methods to update state and remove stream when finished
 583 
 584     synchronized void requestSent() {
 585         requestSent = true;
 586         if (responseReceived)
 587             connection.deleteStream(this);
 588     }
 589 
 590     synchronized void responseReceived() {
 591         responseReceived = true;
 592         if (requestSent)
 593             connection.deleteStream(this);
 594         PushGroup<?> pg = request.pushGroup();
 595         if (pg != null)
 596             pg.noMorePushes();
 597     }
 598 
 599     /**
 600      * same as above but for errors
 601      *
 602      * @param t
 603      */
 604     void completeResponseExceptionally(Throwable t) {
 605         synchronized (response_cfs) {
 606             for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
 607                 if (!cf.isDone()) {
 608                     cf.completeExceptionally(t);
 609                     response_cfs.remove(cf);
 610                     return;
 611                 }
 612             }
 613             response_cfs.add(CompletableFuture.failedFuture(t));
 614         }
 615     }
 616 
 617     void sendBodyImpl() throws IOException, InterruptedException {
 618         if (requestContentLen == 0) {
 619             // no body
 620             requestSent();
 621             return;
 622         }
 623         DataFrame df;
 624         do {
 625             df = getDataFrame();
 626             // TODO: check accumulated content length (if not checked below)
 627             connection.sendFrame(df);
 628         } while (!df.getFlag(DataFrame.END_STREAM));
 629         requestSent();
 630     }
 631 
 632     @Override
 633     void cancel() {
 634         cancelImpl(new Exception("Cancelled"));
 635     }
 636 
 637 
 638     void cancelImpl(Throwable e) {
 639         Log.logTrace("cancelling stream: {0}\n", e.toString());
 640         inputQ.close();
 641         completeResponseExceptionally(e);
 642         try {
 643             connection.resetStream(streamid, ResetFrame.CANCEL);
 644         } catch (IOException | InterruptedException ex) {
 645             Log.logError(ex);
 646         }
 647     }
 648 
 649     @Override
 650     CompletableFuture<Void> sendRequestAsync() {
 651         CompletableFuture<Void> cf = new CompletableFuture<>();
 652         executor.execute(() -> {
 653            try {
 654                sendRequest();
 655                cf.complete(null);
 656            } catch (IOException |InterruptedException e) {
 657                cf.completeExceptionally(e);
 658            }
 659         }, null);
 660         return cf;
 661     }
 662 
 663     @Override
 664     <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
 665         this.responseProcessor = processor;
 666         T body = processor.onResponseBodyStart(
 667                     responseContentLen, responseHeaders,
 668                     responseFlowController); // TODO: filter headers
 669         if (body == null) {
 670             receiveData();
 671             body = processor.onResponseComplete();
 672         } else
 673             receiveDataAsync(processor);
 674         responseReceived();
 675         return body;
 676     }
 677 
 678     // called from Http2Connection reader thread
 679     synchronized void updateOutgoingWindow(int update) {
 680         remoteRequestFlowController.accept(update);
 681     }
 682 
 683     void close(String msg) {
 684         cancel();
 685     }
 686 
 687     static class PushedStream extends Stream {
 688         final PushGroup<?> pushGroup;
 689         final private Stream parent;      // used by server push streams
 690         // push streams need the response CF allocated up front as it is
 691         // given directly to user via the multi handler callback function.
 692         final CompletableFuture<HttpResponseImpl> pushCF;
 693         final HttpRequestImpl pushReq;
 694 
 695         PushedStream(PushGroup<?> pushGroup, HttpClientImpl client,
 696                 Http2Connection connection, Stream parent,
 697                 HttpRequestImpl pushReq) {
 698             super(client, connection, pushReq);
 699             this.pushGroup = pushGroup;
 700             this.pushReq = pushReq;
 701             this.pushCF = new CompletableFuture<>();
 702             this.parent = parent;
 703         }
 704 
 705         // Following methods call the super class but in case of
 706         // error record it in the PushGroup. The error method is called
 707         // with a null value when no error occurred (is a no-op)
 708         @Override
 709         CompletableFuture<Void> sendBodyAsync() {
 710             return super.sendBodyAsync()
 711                         .whenComplete((v, t) -> pushGroup.pushError(t));
 712         }
 713 
 714         @Override
 715         CompletableFuture<Void> sendHeadersAsync() {
 716             return super.sendHeadersAsync()
 717                         .whenComplete((v, t) -> pushGroup.pushError(t));
 718         }
 719 
 720         @Override
 721         CompletableFuture<Void> sendRequestAsync() {
 722             return super.sendRequestAsync()
 723                         .whenComplete((v, t) -> pushGroup.pushError(t));
 724         }
 725 
 726         @Override
 727         CompletableFuture<HttpResponseImpl> getResponseAsync(Void vo) {
 728             return pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
 729         }
 730 
 731         @Override
 732         <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
 733             return super.responseBodyAsync(processor)
 734                         .whenComplete((v, t) -> pushGroup.pushError(t));
 735         }
 736 
 737         @Override
 738         void completeResponse(HttpResponse r) {
 739             HttpResponseImpl resp = (HttpResponseImpl)r;
 740             Utils.logResponse(resp);
 741             pushCF.complete(resp);
 742         }
 743 
 744         @Override
 745         void completeResponseExceptionally(Throwable t) {
 746             pushCF.completeExceptionally(t);
 747         }
 748 
 749         @Override
 750         synchronized void responseReceived() {
 751             super.responseReceived();
 752             pushGroup.pushCompleted();
 753         }
 754 
 755         // create and return the PushResponseImpl
 756         @Override
 757         protected void handleResponse() {
 758             HttpConnection c = connection.connection; // TODO: improve
 759             long statusCode = responseHeaders
 760                 .firstValueAsLong(":status")
 761                 .orElse(-1L);
 762 
 763             if (statusCode == -1L)
 764                 completeResponseExceptionally(new IOException("No status code"));
 765             ImmutableHeaders h = new ImmutableHeaders(responseHeaders, Utils.ALL_HEADERS);
 766             this.response = new HttpResponseImpl((int)statusCode, pushReq, h, this,
 767                 c.sslParameters());
 768             this.responseContentLen = responseHeaders
 769                 .firstValueAsLong("content-length")
 770                 .orElse(-1L);
 771             // different implementations for normal streams and pushed streams
 772             completeResponse(response);
 773         }
 774     }
 775 
 776     /**
 777      * One PushGroup object is associated with the parent Stream of
 778      * the pushed Streams. This keeps track of all common state associated
 779      * with the pushes.
 780      */
 781     static class PushGroup<T> {
 782         // the overall completion object, completed when all pushes are done.
 783         final CompletableFuture<T> resultCF;
 784         Throwable error; // any exception that occured during pushes
 785 
 786         // CF for main response
 787         final CompletableFuture<HttpResponse> mainResponse;
 788 
 789         // user's processor object
 790         final HttpResponse.MultiProcessor<T> multiProcessor;
 791 
 792         // per push handler function provided by processor
 793         final private BiFunction<HttpRequest,
 794                            CompletableFuture<HttpResponse>,
 795                            Boolean> pushHandler;
 796         int numberOfPushes;
 797         int remainingPushes;
 798         boolean noMorePushes = false;
 799 
 800         PushGroup(HttpResponse.MultiProcessor<T> multiProcessor, HttpRequestImpl req) {
 801             this.resultCF = new CompletableFuture<>();
 802             this.mainResponse = new CompletableFuture<>();
 803             this.multiProcessor = multiProcessor;
 804             this.pushHandler = multiProcessor.onStart(req, mainResponse);
 805         }
 806 
 807         CompletableFuture<T> groupResult() {
 808             return resultCF;
 809         }
 810 
 811         CompletableFuture<HttpResponse> mainResponse() {
 812             return mainResponse;
 813         }
 814 
 815         private BiFunction<HttpRequest,
 816             CompletableFuture<HttpResponse>, Boolean> pushHandler()
 817         {
 818                 return pushHandler;
 819         }
 820 
 821         synchronized void addPush() {
 822             numberOfPushes++;
 823             remainingPushes++;
 824         }
 825 
 826         synchronized int numberOfPushes() {
 827             return numberOfPushes;
 828         }
 829         // This is called when the main body response completes because it means
 830         // no more PUSH_PROMISEs are possible
 831         synchronized void noMorePushes() {
 832             noMorePushes = true;
 833             checkIfCompleted();
 834         }
 835 
 836         synchronized void pushCompleted() {
 837             remainingPushes--;
 838             checkIfCompleted();
 839         }
 840 
 841         synchronized void checkIfCompleted() {
 842             if (remainingPushes == 0 && error == null && noMorePushes) {
 843                 T overallResult = multiProcessor.onComplete();
 844                 resultCF.complete(overallResult);
 845             }
 846         }
 847 
 848         synchronized void pushError(Throwable t) {
 849             if (t == null)
 850                 return;
 851             this.error = t;
 852             resultCF.completeExceptionally(t);
 853         }
 854     }
 855 }