1 /* 2 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.util.concurrent.CompletableFuture; 29 import jdk.incubator.http.internal.common.MinimalFuture; 30 import jdk.incubator.http.internal.common.Log; 31 32 /** 33 * One PushGroup object is associated with the parent Stream of the pushed 34 * Streams. This keeps track of all common state associated with the pushes. 35 */ 36 class PushGroup<U,T> { 37 // the overall completion object, completed when all pushes are done. 38 final CompletableFuture<Void> resultCF; 39 final CompletableFuture<Void> noMorePushesCF; 40 41 volatile Throwable error; // any exception that occured during pushes 42 43 // CF for main response 44 final CompletableFuture<HttpResponse<T>> mainResponse; 45 46 // user's processor object 47 final HttpResponse.MultiProcessor<U, T> multiProcessor; 48 49 final HttpResponse.BodyHandler<T> mainBodyHandler; 50 51 int numberOfPushes; 52 int remainingPushes; 53 boolean noMorePushes = false; 54 55 PushGroup(HttpResponse.MultiProcessor<U, T> multiProcessor, HttpRequestImpl req) { 56 this(multiProcessor, req, new MinimalFuture<>()); 57 } 58 59 // Check mainBodyHandler before calling nested constructor. 60 private PushGroup(HttpResponse.MultiProcessor<U, T> multiProcessor, 61 HttpRequestImpl req, 62 CompletableFuture<HttpResponse<T>> mainResponse) { 63 this(multiProcessor, mainResponse, 64 multiProcessor.onRequest(req).orElseThrow( 65 () -> new IllegalArgumentException( 66 "A valid body processor for the main response is required"))); 67 } 68 69 // This private constructor is called after all parameters have been 70 // checked. 71 private PushGroup(HttpResponse.MultiProcessor<U, T> multiProcessor, 72 CompletableFuture<HttpResponse<T>> mainResponse, 73 HttpResponse.BodyHandler<T> mainBodyHandler) { 74 75 assert mainResponse != null; // A new instance is created above 76 assert mainBodyHandler != null; // should have been checked above 77 78 this.resultCF = new MinimalFuture<>(); 79 this.noMorePushesCF = new MinimalFuture<>(); 80 this.multiProcessor = multiProcessor; 81 this.mainResponse = mainResponse.thenApply(r -> { 82 multiProcessor.onResponse(r); 83 return r; 84 }); 85 this.mainBodyHandler = mainBodyHandler; 86 } 87 88 CompletableFuture<Void> groupResult() { 89 return resultCF; 90 } 91 92 HttpResponse.MultiProcessor<U, T> processor() { 93 return multiProcessor; 94 } 95 96 HttpResponse.BodyHandler<T> mainResponseHandler() { 97 return mainBodyHandler; 98 } 99 100 synchronized void setMainResponse(CompletableFuture<HttpResponse<T>> r) { 101 r.whenComplete((HttpResponse<T> response, Throwable t) -> { 102 if (t != null) 103 mainResponse.completeExceptionally(t); 104 else 105 mainResponse.complete(response); 106 }); 107 } 108 109 synchronized CompletableFuture<HttpResponse<T>> mainResponse() { 110 return mainResponse; 111 } 112 113 synchronized void addPush() { 114 numberOfPushes++; 115 remainingPushes++; 116 } 117 118 synchronized int numberOfPushes() { 119 return numberOfPushes; 120 } 121 // This is called when the main body response completes because it means 122 // no more PUSH_PROMISEs are possible 123 124 synchronized void noMorePushes(boolean noMore) { 125 noMorePushes = noMore; 126 checkIfCompleted(); 127 noMorePushesCF.complete(null); 128 } 129 130 CompletableFuture<Void> pushesCF() { 131 return noMorePushesCF; 132 } 133 134 synchronized boolean noMorePushes() { 135 return noMorePushes; 136 } 137 138 synchronized void pushCompleted() { 139 remainingPushes--; 140 checkIfCompleted(); 141 } 142 143 synchronized void checkIfCompleted() { 144 if (Log.trace()) { 145 Log.logTrace("PushGroup remainingPushes={0} error={1} noMorePushes={2}", 146 remainingPushes, 147 (error==null)?error:error.getClass().getSimpleName(), 148 noMorePushes); 149 } 150 if (remainingPushes == 0 && error == null && noMorePushes) { 151 if (Log.trace()) { 152 Log.logTrace("push completed"); 153 } 154 resultCF.complete(null); 155 } 156 } 157 158 synchronized void pushError(Throwable t) { 159 if (t == null) { 160 return; 161 } 162 this.error = t; 163 resultCF.completeExceptionally(t); 164 } 165 }