1 /*
   2  * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.util.concurrent.CompletableFuture;
  29 import jdk.incubator.http.internal.common.MinimalFuture;
  30 import jdk.incubator.http.internal.common.Log;
  31 
  32 /**
  33  * One PushGroup object is associated with the parent Stream of the pushed
  34  * Streams. This keeps track of all common state associated with the pushes.
  35  */
  36 class PushGroup<U,T> {
  37     // the overall completion object, completed when all pushes are done.
  38     final CompletableFuture<Void> resultCF;
  39     final CompletableFuture<Void> noMorePushesCF;
  40 
  41     volatile Throwable error; // any exception that occured during pushes
  42 
  43     // CF for main response
  44     final CompletableFuture<HttpResponse<T>> mainResponse;
  45 
  46     // user's processor object
  47     final HttpResponse.MultiProcessor<U, T> multiProcessor;
  48 
  49     final HttpResponse.BodyHandler<T> mainBodyHandler;
  50 
  51     int numberOfPushes;
  52     int remainingPushes;
  53     boolean noMorePushes = false;
  54 
  55     PushGroup(HttpResponse.MultiProcessor<U, T> multiProcessor, HttpRequestImpl req) {
  56         this(multiProcessor, req, new MinimalFuture<>());
  57     }
  58 
  59     // Check mainBodyHandler before calling nested constructor.
  60     private PushGroup(HttpResponse.MultiProcessor<U, T> multiProcessor,
  61             HttpRequestImpl req,
  62             CompletableFuture<HttpResponse<T>> mainResponse) {
  63         this(multiProcessor, mainResponse,
  64              multiProcessor.onRequest(req).orElseThrow(
  65                     () -> new IllegalArgumentException(
  66                      "A valid body processor for the main response is required")));
  67     }
  68 
  69     // This private constructor is called after all parameters have been
  70     // checked.
  71     private PushGroup(HttpResponse.MultiProcessor<U, T> multiProcessor,
  72                       CompletableFuture<HttpResponse<T>> mainResponse,
  73                       HttpResponse.BodyHandler<T> mainBodyHandler) {
  74 
  75         assert mainResponse != null; // A new instance is created above
  76         assert mainBodyHandler != null; // should have been checked above
  77 
  78         this.resultCF = new MinimalFuture<>();
  79         this.noMorePushesCF = new MinimalFuture<>();
  80         this.multiProcessor = multiProcessor;
  81         this.mainResponse = mainResponse.thenApply(r -> {
  82             multiProcessor.onResponse(r);
  83             return r;
  84         });
  85         this.mainBodyHandler = mainBodyHandler;
  86     }
  87 
  88     CompletableFuture<Void> groupResult() {
  89         return resultCF;
  90     }
  91 
  92     HttpResponse.MultiProcessor<U, T> processor() {
  93         return multiProcessor;
  94     }
  95 
  96     HttpResponse.BodyHandler<T> mainResponseHandler() {
  97         return mainBodyHandler;
  98     }
  99 
 100     synchronized void setMainResponse(CompletableFuture<HttpResponse<T>> r) {
 101         r.whenComplete((HttpResponse<T> response, Throwable t) -> {
 102             if (t != null)
 103                 mainResponse.completeExceptionally(t);
 104             else
 105                 mainResponse.complete(response);
 106         });
 107     }
 108 
 109     synchronized CompletableFuture<HttpResponse<T>> mainResponse() {
 110         return mainResponse;
 111     }
 112 
 113     synchronized void addPush() {
 114         numberOfPushes++;
 115         remainingPushes++;
 116     }
 117 
 118     synchronized int numberOfPushes() {
 119         return numberOfPushes;
 120     }
 121     // This is called when the main body response completes because it means
 122     // no more PUSH_PROMISEs are possible
 123 
 124     synchronized void noMorePushes(boolean noMore) {
 125         noMorePushes = noMore;
 126         checkIfCompleted();
 127         noMorePushesCF.complete(null);
 128     }
 129 
 130     CompletableFuture<Void> pushesCF() {
 131         return noMorePushesCF;
 132     }
 133 
 134     synchronized boolean noMorePushes() {
 135         return noMorePushes;
 136     }
 137 
 138     synchronized void pushCompleted() {
 139         remainingPushes--;
 140         checkIfCompleted();
 141     }
 142 
 143     synchronized void checkIfCompleted() {
 144         if (Log.trace()) {
 145             Log.logTrace("PushGroup remainingPushes={0} error={1} noMorePushes={2}",
 146                          remainingPushes,
 147                          (error==null)?error:error.getClass().getSimpleName(),
 148                          noMorePushes);
 149         }
 150         if (remainingPushes == 0 && error == null && noMorePushes) {
 151             if (Log.trace()) {
 152                 Log.logTrace("push completed");
 153             }
 154             resultCF.complete(null);
 155         }
 156     }
 157 
 158     synchronized void pushError(Throwable t) {
 159         if (t == null) {
 160             return;
 161         }
 162         this.error = t;
 163         resultCF.completeExceptionally(t);
 164     }
 165 }