1 /* 2 * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.security.AccessControlContext; 29 import java.util.Optional; 30 import java.util.concurrent.CompletableFuture; 31 import jdk.incubator.http.HttpResponse.BodyHandler; 32 import jdk.incubator.http.HttpResponse.UntrustedBodyHandler; 33 import jdk.incubator.http.internal.common.MinimalFuture; 34 import jdk.incubator.http.internal.common.Log; 35 36 /** 37 * One PushGroup object is associated with the parent Stream of the pushed 38 * Streams. This keeps track of all common state associated with the pushes. 39 */ 40 class PushGroup<U,T> { 41 // the overall completion object, completed when all pushes are done. 42 final CompletableFuture<Void> resultCF; 43 final CompletableFuture<Void> noMorePushesCF; 44 45 volatile Throwable error; // any exception that occurred during pushes 46 47 // CF for main response 48 final CompletableFuture<HttpResponse<T>> mainResponse; 49 50 // user's subscriber object 51 final HttpResponse.MultiSubscriber<U, T> multiSubscriber; 52 53 final HttpResponse.BodyHandler<T> mainBodyHandler; 54 55 private final AccessControlContext acc; 56 57 int numberOfPushes; 58 int remainingPushes; 59 boolean noMorePushes = false; 60 61 PushGroup(HttpResponse.MultiSubscriber<U, T> multiSubscriber, 62 HttpRequestImpl req, 63 AccessControlContext acc) { 64 this(multiSubscriber, req, new MinimalFuture<>(), acc); 65 } 66 67 // Check mainBodyHandler before calling nested constructor. 68 private PushGroup(HttpResponse.MultiSubscriber<U, T> multiSubscriber, 69 HttpRequestImpl req, 70 CompletableFuture<HttpResponse<T>> mainResponse, 71 AccessControlContext acc) { 72 this(multiSubscriber, 73 mainResponse, 74 multiSubscriber.onRequest(req), 75 acc); 76 } 77 78 // This private constructor is called after all parameters have been checked. 79 private PushGroup(HttpResponse.MultiSubscriber<U, T> multiSubscriber, 80 CompletableFuture<HttpResponse<T>> mainResponse, 81 HttpResponse.BodyHandler<T> mainBodyHandler, 82 AccessControlContext acc) { 83 84 assert mainResponse != null; // A new instance is created above 85 assert mainBodyHandler != null; // should have been checked above 86 87 this.resultCF = new MinimalFuture<>(); 88 this.noMorePushesCF = new MinimalFuture<>(); 89 this.multiSubscriber = multiSubscriber; 90 this.mainResponse = mainResponse.thenApply(r -> { 91 multiSubscriber.onResponse(r); 92 return r; 93 }); 94 this.mainBodyHandler = mainBodyHandler; 95 if (acc != null) { 96 // Restricts the file publisher with the senders ACC, if any 97 if (mainBodyHandler instanceof UntrustedBodyHandler) 98 ((UntrustedBodyHandler)this.mainBodyHandler).setAccessControlContext(acc); 99 } 100 this.acc = acc; 101 } 102 103 CompletableFuture<Void> groupResult() { 104 return resultCF; 105 } 106 107 HttpResponse.MultiSubscriber<U, T> subscriber() { 108 return multiSubscriber; 109 } 110 111 Optional<BodyHandler<T>> handlerForPushRequest(HttpRequest ppRequest) { 112 Optional<BodyHandler<T>> bh = multiSubscriber.onPushPromise(ppRequest); 113 if (acc != null && bh.isPresent()) { 114 // Restricts the file publisher with the senders ACC, if any 115 BodyHandler<T> x = bh.get(); 116 if (x instanceof UntrustedBodyHandler) 117 ((UntrustedBodyHandler)x).setAccessControlContext(acc); 118 bh = Optional.of(x); 119 } 120 return bh; 121 } 122 123 HttpResponse.BodyHandler<T> mainResponseHandler() { 124 return mainBodyHandler; 125 } 126 127 synchronized void setMainResponse(CompletableFuture<HttpResponse<T>> r) { 128 r.whenComplete((HttpResponse<T> response, Throwable t) -> { 129 if (t != null) 130 mainResponse.completeExceptionally(t); 131 else 132 mainResponse.complete(response); 133 }); 134 } 135 136 synchronized void addPush() { 137 numberOfPushes++; 138 remainingPushes++; 139 } 140 141 // This is called when the main body response completes because it means 142 // no more PUSH_PROMISEs are possible 143 144 synchronized void noMorePushes(boolean noMore) { 145 noMorePushes = noMore; 146 checkIfCompleted(); 147 noMorePushesCF.complete(null); 148 } 149 150 CompletableFuture<Void> pushesCF() { 151 return noMorePushesCF; 152 } 153 154 synchronized boolean noMorePushes() { 155 return noMorePushes; 156 } 157 158 synchronized void pushCompleted() { 159 remainingPushes--; 160 checkIfCompleted(); 161 } 162 163 synchronized void checkIfCompleted() { 164 if (Log.trace()) { 165 Log.logTrace("PushGroup remainingPushes={0} error={1} noMorePushes={2}", 166 remainingPushes, 167 (error==null)?error:error.getClass().getSimpleName(), 168 noMorePushes); 169 } 170 if (remainingPushes == 0 && error == null && noMorePushes) { 171 if (Log.trace()) { 172 Log.logTrace("push completed"); 173 } 174 resultCF.complete(null); 175 } 176 } 177 178 synchronized void pushError(Throwable t) { 179 if (t == null) { 180 return; 181 } 182 this.error = t; 183 resultCF.completeExceptionally(t); 184 } 185 }