/*
 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http.internal.common;

import java.io.Closeable;
import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
 * A wrapper for a Flow.Subscriber. This wrapper delivers data to the wrapped
 * Subscriber which is supplied to the constructor. This class takes care of
 * downstream flow control automatically and upstream flow control automatically
 * by default.
 * <p>
 * Processing is done by implementing the {@link #incoming(List, boolean)} method
 * which supplies buffers from upstream. This method (or any other method)
 * can then call the outgoing() method to deliver processed buffers downstream.
 * <p>
 * Upstream error signals are delivered downstream directly. Cancellation from
 * downstream is also propagated upstream immediately.
 * <p>
 * Each SubscriberWrapper has a {@link java.util.concurrent.CompletableFuture}{@code <Void>}
 * which propagates completion/errors from downstream to upstream. Normal completion
 * can only occur after onComplete() is called, but errors can be propagated upwards
 * at any time.
 */
public abstract class SubscriberWrapper
    implements FlowTube.TubeSubscriber, Closeable, Flow.Processor<List<ByteBuffer>,List<ByteBuffer>>
    // TODO: SSLTube Subscriber will never change? Does this really need to be a TS?
{
    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    final System.Logger logger =
            Utils.getDebugLogger(this::dbgString, DEBUG);

    /**
     * Result of {@link #enterScheduling()}, interpreted by the downstream
     * pusher: {@code CONTINUE} proceeds with the push, {@code RETURN} leaves
     * the pusher immediately, and {@code RESCHEDULE} asks the scheduler to run
     * the pusher again and then leaves.
     */
    public enum SchedulingAction { CONTINUE, RETURN, RESCHEDULE }

    volatile Flow.Subscription upstreamSubscription;
    final SubscriptionBase downstreamSubscription;
    volatile boolean upstreamCompleted;
    volatile boolean downstreamCompleted;
    volatile boolean completionAcknowledged;
    private volatile Subscriber<? super List<ByteBuffer>> downstreamSubscriber;
    // processed bytes to send to the downstream subscriber.
    private final ConcurrentLinkedQueue<List<ByteBuffer>> outputQ;
    private final CompletableFuture<Void> cf;
    private final SequentialScheduler pushScheduler;
    private final AtomicReference<Throwable> errorRef = new AtomicReference<>();

    /**
     * Creates a wrapper that has no downstream subscriber yet. A downstream
     * subscriber must be attached with {@link #subscribe(Subscriber)} before
     * {@link #onSubscribe(Flow.Subscription)} is delivered, because that
     * method hands the downstream subscription to the downstream subscriber.
     *
     * The wrapper owns a {@code CompletableFuture}, available through
     * {@link #completion()}, which is completed when the wrapper terminates.
     */
    public SubscriberWrapper()
    {
        this.outputQ = new ConcurrentLinkedQueue<>();
        this.cf = new MinimalFuture<>();
        this.pushScheduler =
                SequentialScheduler.synchronizedScheduler(new DownstreamPusher());
        this.downstreamSubscription = new SubscriptionBase(pushScheduler,
                                                           this::downstreamCompletion);
    }

    /**
     * Sets the subscriber that receives the buffers passed to
     * {@link #outgoing(List, boolean)}.
     *
     * @param downstreamSubscriber the downstream destination, must not be null
     * @throws NullPointerException if {@code downstreamSubscriber} is null
     */
    @Override
    public final void subscribe(Subscriber<? super List<ByteBuffer>> downstreamSubscriber) {
        Objects.requireNonNull(downstreamSubscriber);
        this.downstreamSubscriber = downstreamSubscriber;
    }

    /**
     * Wraps the given downstream subscriber in this. For each call to
     * {@link #onNext(List)} the incoming() method is called.
     *
     * NOTE(review): earlier documentation said a {@code downstreamCF} from the
     * downstream wrapper is linked to this wrapper's notifier; this class
     * contains no such linkage.
     *
     * @param downstreamWrapper downstream destination
     */
    public SubscriberWrapper(Subscriber<? super List<ByteBuffer>> downstreamWrapper)
    {
        this();
        subscribe(downstreamWrapper);
    }

    /**
     * Delivers data to be processed by this wrapper. Generated data to be sent
     * downstream, must be provided to the {@link #outgoing(List, boolean)}
     * method.
     *
     * @param buffers a List of ByteBuffers.
     * @param complete if true then no more data will be added to the list
     */
    protected abstract void incoming(List<ByteBuffer> buffers, boolean complete);

    /**
     * This method is called to determine the window size to use at any time. The
     * current window is supplied together with the current downstream queue size.
     * {@code 0} should be returned if no change is
     * required or a positive integer which will be added to the current window.
     * The default implementation returns {@code 1} only when the current window
     * is {@code 0} and no more than 5 lists are waiting in the downstream
     * queue; otherwise it returns {@code 0}. The method can be overridden if
     * required.
     * <p>
     * {@link #onSubscribe(Flow.Subscription)} calls this method with
     * {@code (0, 0)} to obtain the initial window.
     *
     * @param currentWindow the current upstream subscription window
     * @param downstreamQsize the current number of buffers waiting to be sent
     *                        downstream
     *
     * @return value to add to currentWindow
     */
    protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
        if (downstreamQsize > 5) {
            return 0;
        }

        if (currentWindow == 0) {
            return 1;
        } else {
            return 0;
        }
    }

    /**
     * Override this if anything needs to be done after the upstream subscriber
     * has subscribed
     */
    protected void onSubscribe() {
    }

    /**
     * Override this if anything needs to be done before checking for error
     * and processing the input queue.
     *
     * @return the action the downstream pusher should take; the default is
     *         {@link SchedulingAction#CONTINUE}
     */
    protected SchedulingAction enterScheduling() {
        return SchedulingAction.CONTINUE;
    }

    /**
     * Asks the push scheduler to run the downstream pusher, unless downstream
     * has already completed or the scheduler has been stopped.
     *
     * @return {@code true} if the scheduler was signalled, {@code false} if
     *         nothing was done
     */
    protected boolean signalScheduling() {
        if (downstreamCompleted || pushScheduler.isStopped()) {
            return false;
        }
        pushScheduler.runOrSchedule();
        return true;
    }

    /**
     * Delivers buffers of data downstream. After incoming()
     * has been called complete == true signifying completion of the upstream
     * subscription, data may continue to be delivered, up to when outgoing() is
     * called complete == true, after which, the downstream subscription is
     * completed.
     *
     * It's an error to call outgoing() with complete = true if incoming() has
     * not previously been called with it.
     *
     * @param buffer the buffer to send; must be empty when {@code complete}
     *               is true (checked by assertion only)
     * @param complete true to acknowledge completion
     * @throws NullPointerException if {@code buffer} is null
     */
    public void outgoing(ByteBuffer buffer, boolean complete) {
        Objects.requireNonNull(buffer);
        assert !complete || !buffer.hasRemaining();
        outgoing(List.of(buffer), complete);
    }

    /**
     * Sometimes it might be necessary to complete the downstream subscriber
     * before the upstream completes. For instance, when an SSL server
     * sends a notify_close. In that case we should let the outgoing
     * complete before upstream is completed.
     * <p>
     * When this method returns {@code true},
     * {@link #outgoing(List, boolean) outgoing(buffers, true)} is accepted even
     * though upstream has not completed yet.
     *
     * @return {@code false} by default; may be overridden by subclasses.
     */
    public boolean closing() {
        return false;
    }

    /**
     * Queues a list of buffers for delivery downstream, or acknowledges
     * completion, and then signals the push scheduler.
     * <p>
     * When {@code complete} is true the list is not queued; it is expected to
     * carry no remaining bytes (checked by assertion only).
     *
     * @param buffers the buffers to deliver
     * @param complete true to acknowledge completion
     * @throws NullPointerException if {@code buffers} is null
     * @throws IllegalStateException if {@code complete} is true while upstream
     *         has not completed and {@link #closing()} returns false
     */
    public void outgoing(List<ByteBuffer> buffers, boolean complete) {
        Objects.requireNonNull(buffers);
        if (complete) {
            assert Utils.remaining(buffers) == 0;
            logger.log(Level.DEBUG, "completionAcknowledged");
            if (!upstreamCompleted && !closing())
                throw new IllegalStateException("upstream not completed");
            completionAcknowledged = true;
        } else {
            logger.log(Level.DEBUG, () -> "Adding "
                                   + Utils.remaining(buffers)
                                   + " to outputQ queue");
            outputQ.add(buffers);
        }
        logger.log(Level.DEBUG, () -> "pushScheduler "
                   + (pushScheduler.isStopped() ? " is stopped!" : " is alive"));
        pushScheduler.runOrSchedule();
    }

    /**
     * Returns a CompletableFuture which completes when this wrapper completes.
     * Normal completion happens with the following steps (in order):
     * 1. onComplete() is called
     * 2. incoming() called with complete = true
     * 3. outgoing() may continue to be called normally
     * 4. outgoing called with complete = true
     * 5. downstream subscriber is called onComplete()
     *
     * The CompletableFuture completes exceptionally when an error is recorded
     * by {@link #errorCommon(Throwable)}, which is called from
     * {@link #onError(Throwable)}, from {@link #close()}, and when
     * {@link #incoming(List, boolean)} or the downstream pusher throws.
     * If the downstream subscription is cancelled, the upstream subscription
     * is cancelled and the CompletableFuture completes normally.
     */
    public CompletableFuture<Void> completion() {
        return cf;
    }

    /**
     * Invoked whenever it 'may' be possible to push buffers downstream.
     */
    class DownstreamPusher implements Runnable {
        @Override
        public void run() {
            try {
                run1();
            } catch (Throwable t) {
                // Anything thrown while pushing is recorded as the wrapper's error.
                errorCommon(t);
            }
        }

        private void run1() {
            if (downstreamCompleted) {
                logger.log(Level.DEBUG, "DownstreamPusher: downstream is already completed");
                return;
            }
            switch (enterScheduling()) {
                case CONTINUE: break;
                case RESCHEDULE: pushScheduler.runOrSchedule(); return;
                case RETURN: return;
                default:
                    errorRef.compareAndSet(null,
                            new InternalError("unknown scheduling command"));
                    break;
            }
            // If there was an error, send it downstream.
            final Throwable error = errorRef.get();
            if (error != null) {
                // Only one caller may flip downstreamCompleted and forward the error.
                synchronized(this) {
                    if (downstreamCompleted) return;
                    downstreamCompleted = true;
                }
                logger.log(Level.DEBUG,
                        () -> "DownstreamPusher: forwarding error downstream: " + error);
                pushScheduler.stop();
                outputQ.clear();
                downstreamSubscriber.onError(error);
                return;
            }

            // OK - no error, let's proceed
            if (!outputQ.isEmpty()) {
                logger.log(Level.DEBUG,
                    "DownstreamPusher: queue not empty, downstreamSubscription: %s",
                     downstreamSubscription);
            } else {
                logger.log(Level.DEBUG,
                       "DownstreamPusher: queue empty, downstreamSubscription: %s",
                       downstreamSubscription);
            }

            final boolean dbgOn = logger.isLoggable(Level.DEBUG);
            // Push one queued list per unit of downstream demand.
            while (!outputQ.isEmpty() && downstreamSubscription.tryDecrement()) {
                List<ByteBuffer> b = outputQ.poll();
                if (dbgOn) logger.log(Level.DEBUG,
                                            "DownstreamPusher: Pushing "
                                            + Utils.remaining(b)
                                            + " bytes downstream");
                downstreamSubscriber.onNext(b);
            }
            upstreamWindowUpdate();
            checkCompletion();
        }
    }

    // Number of onNext() calls still allowed by the requests made upstream.
    AtomicLong upstreamWindow = new AtomicLong(0);

    /**
     * Asks {@link #upstreamWindowUpdate(long, long)} how much to add to the
     * upstream window and requests that many items if the answer is positive.
     */
    void upstreamWindowUpdate() {
        long downstreamQueueSize = outputQ.size();
        long n = upstreamWindowUpdate(upstreamWindow.get(), downstreamQueueSize);
        if (n > 0)
            upstreamRequest(n);
    }

    /**
     * Records the upstream subscription, requests the initial window, passes
     * the downstream subscription to the downstream subscriber and finally
     * invokes the {@link #onSubscribe()} hook.
     *
     * @throws IllegalStateException if an upstream subscription is already set
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        if (upstreamSubscription != null) {
            throw new IllegalStateException("Single shot publisher");
        }
        this.upstreamSubscription = subscription;
        // upstreamWindowUpdate() may legitimately return 0 ("no change"), but
        // Flow.Subscription.request(n) requires n > 0. Only request a positive
        // initial window, as upstreamWindowUpdate() (no-arg) already does.
        long initialWindow = upstreamWindowUpdate(0, 0);
        if (initialWindow > 0) {
            upstreamRequest(initialWindow);
        }
        logger.log(Level.DEBUG,
               "calling downstreamSubscriber::onSubscribe on %s",
               downstreamSubscriber);
        downstreamSubscriber.onSubscribe(downstreamSubscription);
        onSubscribe();
    }

    /**
     * Consumes one unit of the upstream window, hands the item to
     * {@link #incoming(List, boolean)} and re-evaluates the upstream window.
     *
     * @throws IllegalStateException if the upstream window was not positive
     */
    @Override
    public void onNext(List<ByteBuffer> item) {
        logger.log(Level.DEBUG, "onNext");
        long prev = upstreamWindow.getAndDecrement();
        if (prev <= 0)
            throw new IllegalStateException("invalid onNext call");
        incomingCaller(item, false);
        upstreamWindowUpdate();
    }

    // Adds n to the window and requests n more items from upstream.
    // Callers are expected to pass n > 0.
    private void upstreamRequest(long n) {
        logger.log(Level.DEBUG, "requesting %d", n);
        upstreamWindow.getAndAdd(n);
        upstreamSubscription.request(n);
    }

    /**
     * Returns the current value of the upstream window counter.
     */
    public long upstreamWindow() {
        return upstreamWindow.get();
    }

    /**
     * Records the upstream error; see {@link #errorCommon(Throwable)}.
     *
     * @throws NullPointerException if {@code throwable} is null
     */
    @Override
    public void onError(Throwable throwable) {
        logger.log(Level.DEBUG, () -> "onError: " + throwable);
        errorCommon(Objects.requireNonNull(throwable));
    }

    /**
     * Records the first error seen by this wrapper. If no error was recorded
     * before, the push scheduler is signalled, upstream is marked completed
     * and the completion future is completed exceptionally.
     *
     * @param throwable the error, must not be null
     * @return {@code true} if this call recorded the error, {@code false} if
     *         an earlier error had already been recorded
     */
    protected boolean errorCommon(Throwable throwable) {
        assert throwable != null;
        if (errorRef.compareAndSet(null, throwable)) {
            logger.log(Level.DEBUG, "error", throwable);
            pushScheduler.runOrSchedule();
            upstreamCompleted = true;
            cf.completeExceptionally(throwable);
            return true;
        }
        return false;
    }

    /**
     * Terminates the wrapper by recording a "wrapper closed" error.
     */
    @Override
    public void close() {
        errorCommon(new RuntimeException("wrapper closed"));
    }

    // Calls incoming() and converts anything it throws into a recorded error.
    private void incomingCaller(List<ByteBuffer> l, boolean complete) {
        try {
            incoming(l, complete);
        } catch(Throwable t) {
            errorCommon(t);
        }
    }

    /**
     * Marks upstream as completed, calls {@link #incoming(List, boolean)} with
     * an empty list and {@code complete == true}, then signals the push
     * scheduler.
     */
    @Override
    public void onComplete() {
        logger.log(Level.DEBUG, () -> "upstream completed: " + toString());
        upstreamCompleted = true;
        incomingCaller(Utils.EMPTY_BB_LIST, true);
        // pushScheduler will call checkCompletion()
        pushScheduler.runOrSchedule();
    }

    /**
     * Passes a single buffer directly to {@link #incoming(List, boolean)}
     * with {@code complete == false}. The upstream window counter is not
     * touched.
     *
     * @param l the buffer to process
     * @throws IllegalStateException if no upstream subscription has been set
     */
    public void addData(ByteBuffer l) {
        if (upstreamSubscription == null) {
            throw new IllegalStateException("can't add data before upstream subscriber subscribes");
        }
        incomingCaller(List.of(l), false);
    }

    /**
     * Completes the downstream subscriber and the completion future once
     * upstream has completed, the output queue is empty, no error is recorded
     * and completion has been acknowledged through outgoing(). If an error is
     * recorded, the push scheduler is signalled instead.
     */
    void checkCompletion() {
        if (downstreamCompleted || !upstreamCompleted) {
            return;
        }
        if (!outputQ.isEmpty()) {
            return;
        }
        if (errorRef.get() != null) {
            pushScheduler.runOrSchedule();
            return;
        }
        if (completionAcknowledged) {
            downstreamSubscriber.onComplete();
            // Fix me subscriber.onComplete.run();
            downstreamCompleted = true;
            cf.complete(null);
        }
    }

    // called from the downstream Subscription.cancel()
    void downstreamCompletion() {
        upstreamSubscription.cancel();
        cf.complete(null);
    }

    /**
     * Resets the demand counter of the downstream subscription.
     */
    public void resetDownstreamDemand() {
        downstreamSubscription.demand.reset();
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("SubscriberWrapper:")
          .append(" upstreamCompleted: ").append(upstreamCompleted)
          .append(" upstreamWindow: ").append(upstreamWindow)
          .append(" downstreamCompleted: ").append(downstreamCompleted)
          .append(" completionAcknowledged: ").append(completionAcknowledged)
          .append(" outputQ size: ").append(outputQ.size())
          //.append(" outputQ: ").append(outputQ.toString())
          .append(" cf: ").append(cf)
          .append(" downstreamSubscription: ").append(downstreamSubscription);

        return sb.toString();
    }

    /**
     * Returns the tag supplied to the debug logger for this wrapper.
     */
    public String dbgString() {
        return "SubscriberWrapper";
    }
}