1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.lang.invoke.MethodHandles; 39 import java.lang.invoke.VarHandle; 40 import java.util.ArrayList; 41 import java.util.Arrays; 42 import java.util.List; 43 import java.util.concurrent.locks.LockSupport; 44 import java.util.function.BiConsumer; 45 import java.util.function.BiPredicate; 46 import java.util.function.Consumer; 47 import static java.util.concurrent.Flow.Publisher; 48 import static java.util.concurrent.Flow.Subscriber; 49 import static java.util.concurrent.Flow.Subscription; 50 51 /** 52 * A {@link Flow.Publisher} that asynchronously issues submitted 53 * (non-null) items to current subscribers until it is closed. Each 54 * current subscriber receives newly submitted items in the same order 55 * unless drops or exceptions are encountered. Using a 56 * SubmissionPublisher allows item generators to act as compliant <a 57 * href="http://www.reactive-streams.org/"> reactive-streams</a> 58 * Publishers relying on drop handling and/or blocking for flow 59 * control. 60 * 61 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its 62 * constructor for delivery to subscribers. The best choice of 63 * Executor depends on expected usage. If the generator(s) of 64 * submitted items run in separate threads, and the number of 65 * subscribers can be estimated, consider using a {@link 66 * Executors#newFixedThreadPool}. Otherwise consider using the 67 * default, normally the {@link ForkJoinPool#commonPool}. 68 * 69 * <p>Buffering allows producers and consumers to transiently operate 70 * at different rates. Each subscriber uses an independent buffer. 
71 * Buffers are created upon first use and expanded as needed up to the 72 * given maximum. (The enforced capacity may be rounded up to the 73 * nearest power of two and/or bounded by the largest value supported 74 * by this implementation.) Invocations of {@link 75 * Flow.Subscription#request(long) request} do not directly result in 76 * buffer expansion, but risk saturation if unfilled requests exceed 77 * the maximum capacity. The default value of {@link 78 * Flow#defaultBufferSize()} may provide a useful starting point for 79 * choosing a capacity based on expected rates, resources, and usages. 80 * 81 * <p>A single SubmissionPublisher may be shared among multiple 82 * sources. Actions in a source thread prior to publishing an item or 83 * issuing a signal <a href="package-summary.html#MemoryVisibility"> 84 * <i>happen-before</i></a> actions subsequent to the corresponding 85 * access by each subscriber. But reported estimates of lag and demand 86 * are designed for use in monitoring, not for synchronization 87 * control, and may reflect stale or inaccurate views of progress. 88 * 89 * <p>Publication methods support different policies about what to do 90 * when buffers are saturated. Method {@link #submit(Object) submit} 91 * blocks until resources are available. This is simplest, but least 92 * responsive. The {@code offer} methods may drop items (either 93 * immediately or with bounded timeout), but provide an opportunity to 94 * interpose a handler and then retry. 95 * 96 * <p>If any Subscriber method throws an exception, its subscription 97 * is cancelled. 
If a handler is supplied as a constructor argument, 98 * it is invoked before cancellation upon an exception in method 99 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods 100 * {@link Flow.Subscriber#onSubscribe onSubscribe}, 101 * {@link Flow.Subscriber#onError(Throwable) onError} and 102 * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or 103 * handled before cancellation. If the supplied Executor throws 104 * {@link RejectedExecutionException} (or any other RuntimeException 105 * or Error) when attempting to execute a task, or a drop handler 106 * throws an exception when processing a dropped item, then the 107 * exception is rethrown. In these cases, not all subscribers will 108 * have been issued the published item. It is usually good practice to 109 * {@link #closeExceptionally closeExceptionally} in these cases. 110 * 111 * <p>Method {@link #consume(Consumer)} simplifies support for a 112 * common case in which the only action of a subscriber is to request 113 * and process all items using a supplied function. 114 * 115 * <p>This class may also serve as a convenient base for subclasses 116 * that generate items, and use the methods in this class to publish 117 * them. For example here is a class that periodically publishes the 118 * items generated from a supplier. (In practice you might add methods 119 * to independently start and stop generation, to share Executors 120 * among publishers, and so on, or use a SubmissionPublisher as a 121 * component rather than a superclass.) 122 * 123 * <pre> {@code 124 * class PeriodicPublisher<T> extends SubmissionPublisher<T> { 125 * final ScheduledFuture<?> periodicTask; 126 * final ScheduledExecutorService scheduler; 127 * PeriodicPublisher(Executor executor, int maxBufferCapacity, 128 * Supplier<? 
extends T> supplier, 129 * long period, TimeUnit unit) { 130 * super(executor, maxBufferCapacity); 131 * scheduler = new ScheduledThreadPoolExecutor(1); 132 * periodicTask = scheduler.scheduleAtFixedRate( 133 * () -> submit(supplier.get()), 0, period, unit); 134 * } 135 * public void close() { 136 * periodicTask.cancel(false); 137 * scheduler.shutdown(); 138 * super.close(); 139 * } 140 * }}</pre> 141 * 142 * <p>Here is an example of a {@link Flow.Processor} implementation. 143 * It uses single-step requests to its publisher for simplicity of 144 * illustration. A more adaptive version could monitor flow using the 145 * lag estimate returned from {@code submit}, along with other utility 146 * methods. 147 * 148 * <pre> {@code 149 * class TransformProcessor<S,T> extends SubmissionPublisher<T> 150 * implements Flow.Processor<S,T> { 151 * final Function<? super S, ? extends T> function; 152 * Flow.Subscription subscription; 153 * TransformProcessor(Executor executor, int maxBufferCapacity, 154 * Function<? super S, ? extends T> function) { 155 * super(executor, maxBufferCapacity); 156 * this.function = function; 157 * } 158 * public void onSubscribe(Flow.Subscription subscription) { 159 * (this.subscription = subscription).request(1); 160 * } 161 * public void onNext(S item) { 162 * subscription.request(1); 163 * submit(function.apply(item)); 164 * } 165 * public void onError(Throwable ex) { closeExceptionally(ex); } 166 * public void onComplete() { close(); } 167 * }}</pre> 168 * 169 * @param <T> the published item type 170 * @author Doug Lea 171 * @since 9 172 */ 173 public class SubmissionPublisher<T> implements Publisher<T>, 174 AutoCloseable { 175 /* 176 * Most mechanics are handled by BufferedSubscription. This class 177 * mainly tracks subscribers and ensures sequentiality, by using 178 * built-in synchronization locks across public methods. Using 179 * built-in locks works well in the most typical case in which 180 * only one thread submits items. 
We extend this idea in 181 * submission methods by detecting single-ownership to reduce 182 * producer-consumer synchronization strength. 183 */ 184 185 /** The largest possible power of two array size. */ 186 static final int BUFFER_CAPACITY_LIMIT = 1 << 30; 187 188 /** 189 * Initial buffer capacity used when maxBufferCapacity is 190 * greater. Must be a power of two. 191 */ 192 static final int INITIAL_CAPACITY = 32; 193 194 /** Round capacity to power of 2, at most limit. */ 195 static final int roundCapacity(int cap) { 196 int n = cap - 1; 197 n |= n >>> 1; 198 n |= n >>> 2; 199 n |= n >>> 4; 200 n |= n >>> 8; 201 n |= n >>> 16; 202 return (n <= 0) ? 1 : // at least 1 203 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1; 204 } 205 206 // default Executor setup; nearly the same as CompletableFuture 207 208 /** 209 * Default executor -- ForkJoinPool.commonPool() unless it cannot 210 * support parallelism. 211 */ 212 private static final Executor ASYNC_POOL = 213 (ForkJoinPool.getCommonPoolParallelism() > 1) ? 214 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 215 216 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ 217 private static final class ThreadPerTaskExecutor implements Executor { 218 ThreadPerTaskExecutor() {} // prevent access constructor creation 219 public void execute(Runnable r) { new Thread(r).start(); } 220 } 221 222 /** 223 * Clients (BufferedSubscriptions) are maintained in a linked list 224 * (via their "next" fields). This works well for publish loops. 225 * It requires O(n) traversal to check for duplicate subscribers, 226 * but we expect that subscribing is much less common than 227 * publishing. Unsubscribing occurs only during traversal loops, 228 * when BufferedSubscription methods return negative values 229 * signifying that they have been closed. 
To reduce 230 * head-of-line blocking, submit and offer methods first call 231 * BufferedSubscription.offer on each subscriber, and place 232 * saturated ones in retries list (using nextRetry field), and 233 * retry, possibly blocking or dropping. 234 */ 235 BufferedSubscription<T> clients; 236 237 /** Run status, updated only within locks */ 238 volatile boolean closed; 239 /** Set true on first call to subscribe, to initialize possible owner */ 240 boolean subscribed; 241 /** The first caller thread to subscribe, or null if thread ever changed */ 242 Thread owner; 243 /** If non-null, the exception in closeExceptionally */ 244 volatile Throwable closedException; 245 246 // Parameters for constructing BufferedSubscriptions 247 final Executor executor; 248 final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler; 249 final int maxBufferCapacity; 250 251 /** 252 * Creates a new SubmissionPublisher using the given Executor for 253 * async delivery to subscribers, with the given maximum buffer size 254 * for each subscriber, and, if non-null, the given handler invoked 255 * when any Subscriber throws an exception in method {@link 256 * Flow.Subscriber#onNext(Object) onNext}. 257 * 258 * @param executor the executor to use for async delivery, 259 * supporting creation of at least one independent thread 260 * @param maxBufferCapacity the maximum capacity for each 261 * subscriber's buffer (the enforced capacity may be rounded up to 262 * the nearest power of two and/or bounded by the largest value 263 * supported by this implementation; method {@link #getMaxBufferCapacity} 264 * returns the actual value) 265 * @param handler if non-null, procedure to invoke upon exception 266 * thrown in method {@code onNext} 267 * @throws NullPointerException if executor is null 268 * @throws IllegalArgumentException if maxBufferCapacity not 269 * positive 270 */ 271 public SubmissionPublisher(Executor executor, int maxBufferCapacity, 272 BiConsumer<? 
super Subscriber<? super T>, ? super Throwable> handler) { 273 if (executor == null) 274 throw new NullPointerException(); 275 if (maxBufferCapacity <= 0) 276 throw new IllegalArgumentException("capacity must be positive"); 277 this.executor = executor; 278 this.onNextHandler = handler; 279 this.maxBufferCapacity = roundCapacity(maxBufferCapacity); 280 } 281 282 /** 283 * Creates a new SubmissionPublisher using the given Executor for 284 * async delivery to subscribers, with the given maximum buffer size 285 * for each subscriber, and no handler for Subscriber exceptions in 286 * method {@link Flow.Subscriber#onNext(Object) onNext}. 287 * 288 * @param executor the executor to use for async delivery, 289 * supporting creation of at least one independent thread 290 * @param maxBufferCapacity the maximum capacity for each 291 * subscriber's buffer (the enforced capacity may be rounded up to 292 * the nearest power of two and/or bounded by the largest value 293 * supported by this implementation; method {@link #getMaxBufferCapacity} 294 * returns the actual value) 295 * @throws NullPointerException if executor is null 296 * @throws IllegalArgumentException if maxBufferCapacity not 297 * positive 298 */ 299 public SubmissionPublisher(Executor executor, int maxBufferCapacity) { 300 this(executor, maxBufferCapacity, null); 301 } 302 303 /** 304 * Creates a new SubmissionPublisher using the {@link 305 * ForkJoinPool#commonPool()} for async delivery to subscribers 306 * (unless it does not support a parallelism level of at least two, 307 * in which case, a new Thread is created to run each task), with 308 * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no 309 * handler for Subscriber exceptions in method {@link 310 * Flow.Subscriber#onNext(Object) onNext}. 311 */ 312 public SubmissionPublisher() { 313 this(ASYNC_POOL, Flow.defaultBufferSize(), null); 314 } 315 316 /** 317 * Adds the given Subscriber unless already subscribed. 
If already 318 * subscribed, the Subscriber's {@link 319 * Flow.Subscriber#onError(Throwable) onError} method is invoked on 320 * the existing subscription with an {@link IllegalStateException}. 321 * Otherwise, upon success, the Subscriber's {@link 322 * Flow.Subscriber#onSubscribe onSubscribe} method is invoked 323 * asynchronously with a new {@link Flow.Subscription}. If {@link 324 * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the 325 * subscription is cancelled. Otherwise, if this SubmissionPublisher 326 * was closed exceptionally, then the subscriber's {@link 327 * Flow.Subscriber#onError onError} method is invoked with the 328 * corresponding exception, or if closed without exception, the 329 * subscriber's {@link Flow.Subscriber#onComplete() onComplete} 330 * method is invoked. Subscribers may enable receiving items by 331 * invoking the {@link Flow.Subscription#request(long) request} 332 * method of the new Subscription, and may unsubscribe by invoking 333 * its {@link Flow.Subscription#cancel() cancel} method. 334 * 335 * @param subscriber the subscriber 336 * @throws NullPointerException if subscriber is null 337 */ 338 public void subscribe(Subscriber<? super T> subscriber) { 339 if (subscriber == null) throw new NullPointerException(); 340 int max = maxBufferCapacity; // allocate initial array 341 Object[] array = new Object[max < INITIAL_CAPACITY ? 
342 max : INITIAL_CAPACITY]; 343 BufferedSubscription<T> subscription = 344 new BufferedSubscription<T>(subscriber, executor, onNextHandler, 345 array, max); 346 synchronized (this) { 347 if (!subscribed) { 348 subscribed = true; 349 owner = Thread.currentThread(); 350 } 351 for (BufferedSubscription<T> b = clients, pred = null;;) { 352 if (b == null) { 353 Throwable ex; 354 subscription.onSubscribe(); 355 if ((ex = closedException) != null) 356 subscription.onError(ex); 357 else if (closed) 358 subscription.onComplete(); 359 else if (pred == null) 360 clients = subscription; 361 else 362 pred.next = subscription; 363 break; 364 } 365 BufferedSubscription<T> next = b.next; 366 if (b.isClosed()) { // remove 367 b.next = null; // detach 368 if (pred == null) 369 clients = next; 370 else 371 pred.next = next; 372 } 373 else if (subscriber.equals(b.subscriber)) { 374 b.onError(new IllegalStateException("Duplicate subscribe")); 375 break; 376 } 377 else 378 pred = b; 379 b = next; 380 } 381 } 382 } 383 384 /** 385 * Common implementation for all three forms of submit and offer. 386 * Acts as submit if nanos == Long.MAX_VALUE, else offer. 387 */ 388 private int doOffer(T item, long nanos, 389 BiPredicate<Subscriber<? super T>, ? 
super T> onDrop) { 390 if (item == null) throw new NullPointerException(); 391 int lag = 0; 392 boolean complete, unowned; 393 synchronized (this) { 394 Thread t = Thread.currentThread(), o; 395 BufferedSubscription<T> b = clients; 396 if ((unowned = ((o = owner) != t)) && o != null) 397 owner = null; // disable bias 398 if (b == null) 399 complete = closed; 400 else { 401 complete = false; 402 boolean cleanMe = false; 403 BufferedSubscription<T> retries = null, rtail = null, next; 404 do { 405 next = b.next; 406 int stat = b.offer(item, unowned); 407 if (stat == 0) { // saturated; add to retry list 408 b.nextRetry = null; // avoid garbage on exceptions 409 if (rtail == null) 410 retries = b; 411 else 412 rtail.nextRetry = b; 413 rtail = b; 414 } 415 else if (stat < 0) // closed 416 cleanMe = true; // remove later 417 else if (stat > lag) 418 lag = stat; 419 } while ((b = next) != null); 420 421 if (retries != null || cleanMe) 422 lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe); 423 } 424 } 425 if (complete) 426 throw new IllegalStateException("Closed"); 427 else 428 return lag; 429 } 430 431 /** 432 * Helps, (timed) waits for, and/or drops buffers on list; returns 433 * lag or negative drops (for use in offer). 434 */ 435 private int retryOffer(T item, long nanos, 436 BiPredicate<Subscriber<? super T>, ? super T> onDrop, 437 BufferedSubscription<T> retries, int lag, 438 boolean cleanMe) { 439 for (BufferedSubscription<T> r = retries; r != null;) { 440 BufferedSubscription<T> nextRetry = r.nextRetry; 441 r.nextRetry = null; 442 if (nanos > 0L) 443 r.awaitSpace(nanos); 444 int stat = r.retryOffer(item); 445 if (stat == 0 && onDrop != null && onDrop.test(r.subscriber, item)) 446 stat = r.retryOffer(item); 447 if (stat == 0) 448 lag = (lag >= 0) ? 
-1 : lag - 1; 449 else if (stat < 0) 450 cleanMe = true; 451 else if (lag >= 0 && stat > lag) 452 lag = stat; 453 r = nextRetry; 454 } 455 if (cleanMe) 456 cleanAndCount(); 457 return lag; 458 } 459 460 /** 461 * Returns current list count after removing closed subscribers. 462 * Call only while holding lock. Used mainly by retryOffer for 463 * cleanup. 464 */ 465 private int cleanAndCount() { 466 int count = 0; 467 BufferedSubscription<T> pred = null, next; 468 for (BufferedSubscription<T> b = clients; b != null; b = next) { 469 next = b.next; 470 if (b.isClosed()) { 471 b.next = null; 472 if (pred == null) 473 clients = next; 474 else 475 pred.next = next; 476 } 477 else { 478 pred = b; 479 ++count; 480 } 481 } 482 return count; 483 } 484 485 /** 486 * Publishes the given item to each current subscriber by 487 * asynchronously invoking its {@link Flow.Subscriber#onNext(Object) 488 * onNext} method, blocking uninterruptibly while resources for any 489 * subscriber are unavailable. This method returns an estimate of 490 * the maximum lag (number of items submitted but not yet consumed) 491 * among all current subscribers. This value is at least one 492 * (accounting for this submitted item) if there are any 493 * subscribers, else zero. 494 * 495 * <p>If the Executor for this publisher throws a 496 * RejectedExecutionException (or any other RuntimeException or 497 * Error) when attempting to asynchronously notify subscribers, 498 * then this exception is rethrown, in which case not all 499 * subscribers will have been issued this item. 
500 * 501 * @param item the (non-null) item to publish 502 * @return the estimated maximum lag among subscribers 503 * @throws IllegalStateException if closed 504 * @throws NullPointerException if item is null 505 * @throws RejectedExecutionException if thrown by Executor 506 */ 507 public int submit(T item) { 508 return doOffer(item, Long.MAX_VALUE, null); 509 } 510 511 /** 512 * Publishes the given item, if possible, to each current subscriber 513 * by asynchronously invoking its {@link 514 * Flow.Subscriber#onNext(Object) onNext} method. The item may be 515 * dropped by one or more subscribers if resource limits are 516 * exceeded, in which case the given handler (if non-null) is 517 * invoked, and if it returns true, retried once. Other calls to 518 * methods in this class by other threads are blocked while the 519 * handler is invoked. Unless recovery is assured, options are 520 * usually limited to logging the error and/or issuing an {@link 521 * Flow.Subscriber#onError(Throwable) onError} signal to the 522 * subscriber. 523 * 524 * <p>This method returns a status indicator: If negative, it 525 * represents the (negative) number of drops (failed attempts to 526 * issue the item to a subscriber). Otherwise it is an estimate of 527 * the maximum lag (number of items submitted but not yet 528 * consumed) among all current subscribers. This value is at least 529 * one (accounting for this submitted item) if there are any 530 * subscribers, else zero. 531 * 532 * <p>If the Executor for this publisher throws a 533 * RejectedExecutionException (or any other RuntimeException or 534 * Error) when attempting to asynchronously notify subscribers, or 535 * the drop handler throws an exception when processing a dropped 536 * item, then this exception is rethrown. 
537 * 538 * @param item the (non-null) item to publish 539 * @param onDrop if non-null, the handler invoked upon a drop to a 540 * subscriber, with arguments of the subscriber and item; if it 541 * returns true, an offer is re-attempted (once) 542 * @return if negative, the (negative) number of drops; otherwise 543 * an estimate of maximum lag 544 * @throws IllegalStateException if closed 545 * @throws NullPointerException if item is null 546 * @throws RejectedExecutionException if thrown by Executor 547 */ 548 public int offer(T item, 549 BiPredicate<Subscriber<? super T>, ? super T> onDrop) { 550 return doOffer(item, 0L, onDrop); 551 } 552 553 /** 554 * Publishes the given item, if possible, to each current subscriber 555 * by asynchronously invoking its {@link 556 * Flow.Subscriber#onNext(Object) onNext} method, blocking while 557 * resources for any subscription are unavailable, up to the 558 * specified timeout or until the caller thread is interrupted, at 559 * which point the given handler (if non-null) is invoked, and if it 560 * returns true, retried once. (The drop handler may distinguish 561 * timeouts from interrupts by checking whether the current thread 562 * is interrupted.) Other calls to methods in this class by other 563 * threads are blocked while the handler is invoked. Unless 564 * recovery is assured, options are usually limited to logging the 565 * error and/or issuing an {@link Flow.Subscriber#onError(Throwable) 566 * onError} signal to the subscriber. 567 * 568 * <p>This method returns a status indicator: If negative, it 569 * represents the (negative) number of drops (failed attempts to 570 * issue the item to a subscriber). Otherwise it is an estimate of 571 * the maximum lag (number of items submitted but not yet 572 * consumed) among all current subscribers. This value is at least 573 * one (accounting for this submitted item) if there are any 574 * subscribers, else zero. 
575 * 576 * <p>If the Executor for this publisher throws a 577 * RejectedExecutionException (or any other RuntimeException or 578 * Error) when attempting to asynchronously notify subscribers, or 579 * the drop handler throws an exception when processing a dropped 580 * item, then this exception is rethrown. 581 * 582 * @param item the (non-null) item to publish 583 * @param timeout how long to wait for resources for any subscriber 584 * before giving up, in units of {@code unit} 585 * @param unit a {@code TimeUnit} determining how to interpret the 586 * {@code timeout} parameter 587 * @param onDrop if non-null, the handler invoked upon a drop to a 588 * subscriber, with arguments of the subscriber and item; if it 589 * returns true, an offer is re-attempted (once) 590 * @return if negative, the (negative) number of drops; otherwise 591 * an estimate of maximum lag 592 * @throws IllegalStateException if closed 593 * @throws NullPointerException if item is null 594 * @throws RejectedExecutionException if thrown by Executor 595 */ 596 public int offer(T item, long timeout, TimeUnit unit, 597 BiPredicate<Subscriber<? super T>, ? super T> onDrop) { 598 long nanos = unit.toNanos(timeout); 599 // distinguishes from untimed (only wrt interrupt policy) 600 if (nanos == Long.MAX_VALUE) --nanos; 601 return doOffer(item, nanos, onDrop); 602 } 603 604 /** 605 * Unless already closed, issues {@link 606 * Flow.Subscriber#onComplete() onComplete} signals to current 607 * subscribers, and disallows subsequent attempts to publish. 608 * Upon return, this method does <em>NOT</em> guarantee that all 609 * subscribers have yet completed. 
610 */ 611 public void close() { 612 if (!closed) { 613 BufferedSubscription<T> b; 614 synchronized (this) { 615 // no need to re-check closed here 616 b = clients; 617 clients = null; 618 owner = null; 619 closed = true; 620 } 621 while (b != null) { 622 BufferedSubscription<T> next = b.next; 623 b.next = null; 624 b.onComplete(); 625 b = next; 626 } 627 } 628 } 629 630 /** 631 * Unless already closed, issues {@link 632 * Flow.Subscriber#onError(Throwable) onError} signals to current 633 * subscribers with the given error, and disallows subsequent 634 * attempts to publish. Future subscribers also receive the given 635 * error. Upon return, this method does <em>NOT</em> guarantee 636 * that all subscribers have yet completed. 637 * 638 * @param error the {@code onError} argument sent to subscribers 639 * @throws NullPointerException if error is null 640 */ 641 public void closeExceptionally(Throwable error) { 642 if (error == null) 643 throw new NullPointerException(); 644 if (!closed) { 645 BufferedSubscription<T> b; 646 synchronized (this) { 647 b = clients; 648 if (!closed) { // don't clobber racing close 649 closedException = error; 650 clients = null; 651 owner = null; 652 closed = true; 653 } 654 } 655 while (b != null) { 656 BufferedSubscription<T> next = b.next; 657 b.next = null; 658 b.onError(error); 659 b = next; 660 } 661 } 662 } 663 664 /** 665 * Returns true if this publisher is not accepting submissions. 666 * 667 * @return true if closed 668 */ 669 public boolean isClosed() { 670 return closed; 671 } 672 673 /** 674 * Returns the exception associated with {@link 675 * #closeExceptionally(Throwable) closeExceptionally}, or null if 676 * not closed or if closed normally. 677 * 678 * @return the exception, or null if none 679 */ 680 public Throwable getClosedException() { 681 return closedException; 682 } 683 684 /** 685 * Returns true if this publisher has any subscribers. 
686 * 687 * @return true if this publisher has any subscribers 688 */ 689 public boolean hasSubscribers() { 690 boolean nonEmpty = false; 691 synchronized (this) { 692 for (BufferedSubscription<T> b = clients; b != null;) { 693 BufferedSubscription<T> next = b.next; 694 if (b.isClosed()) { 695 b.next = null; 696 b = clients = next; 697 } 698 else { 699 nonEmpty = true; 700 break; 701 } 702 } 703 } 704 return nonEmpty; 705 } 706 707 /** 708 * Returns the number of current subscribers. 709 * 710 * @return the number of current subscribers 711 */ 712 public int getNumberOfSubscribers() { 713 synchronized (this) { 714 return cleanAndCount(); 715 } 716 } 717 718 /** 719 * Returns the Executor used for asynchronous delivery. 720 * 721 * @return the Executor used for asynchronous delivery 722 */ 723 public Executor getExecutor() { 724 return executor; 725 } 726 727 /** 728 * Returns the maximum per-subscriber buffer capacity. 729 * 730 * @return the maximum per-subscriber buffer capacity 731 */ 732 public int getMaxBufferCapacity() { 733 return maxBufferCapacity; 734 } 735 736 /** 737 * Returns a list of current subscribers for monitoring and 738 * tracking purposes, not for invoking {@link Flow.Subscriber} 739 * methods on the subscribers. 740 * 741 * @return list of current subscribers 742 */ 743 public List<Subscriber<? super T>> getSubscribers() { 744 ArrayList<Subscriber<? super T>> subs = new ArrayList<>(); 745 synchronized (this) { 746 BufferedSubscription<T> pred = null, next; 747 for (BufferedSubscription<T> b = clients; b != null; b = next) { 748 next = b.next; 749 if (b.isClosed()) { 750 b.next = null; 751 if (pred == null) 752 clients = next; 753 else 754 pred.next = next; 755 } 756 else { 757 subs.add(b.subscriber); 758 pred = b; 759 } 760 } 761 } 762 return subs; 763 } 764 765 /** 766 * Returns true if the given Subscriber is currently subscribed. 
767 * 768 * @param subscriber the subscriber 769 * @return true if currently subscribed 770 * @throws NullPointerException if subscriber is null 771 */ 772 public boolean isSubscribed(Subscriber<? super T> subscriber) { 773 if (subscriber == null) throw new NullPointerException(); 774 if (!closed) { 775 synchronized (this) { 776 BufferedSubscription<T> pred = null, next; 777 for (BufferedSubscription<T> b = clients; b != null; b = next) { 778 next = b.next; 779 if (b.isClosed()) { 780 b.next = null; 781 if (pred == null) 782 clients = next; 783 else 784 pred.next = next; 785 } 786 else if (subscriber.equals(b.subscriber)) 787 return true; 788 else 789 pred = b; 790 } 791 } 792 } 793 return false; 794 } 795 796 /** 797 * Returns an estimate of the minimum number of items requested 798 * (via {@link Flow.Subscription#request(long) request}) but not 799 * yet produced, among all current subscribers. 800 * 801 * @return the estimate, or zero if no subscribers 802 */ 803 public long estimateMinimumDemand() { 804 long min = Long.MAX_VALUE; 805 boolean nonEmpty = false; 806 synchronized (this) { 807 BufferedSubscription<T> pred = null, next; 808 for (BufferedSubscription<T> b = clients; b != null; b = next) { 809 int n; long d; 810 next = b.next; 811 if ((n = b.estimateLag()) < 0) { 812 b.next = null; 813 if (pred == null) 814 clients = next; 815 else 816 pred.next = next; 817 } 818 else { 819 if ((d = b.demand - n) < min) 820 min = d; 821 nonEmpty = true; 822 pred = b; 823 } 824 } 825 } 826 return nonEmpty ? min : 0; 827 } 828 829 /** 830 * Returns an estimate of the maximum number of items produced but 831 * not yet consumed among all current subscribers. 
832 * 833 * @return the estimate 834 */ 835 public int estimateMaximumLag() { 836 int max = 0; 837 synchronized (this) { 838 BufferedSubscription<T> pred = null, next; 839 for (BufferedSubscription<T> b = clients; b != null; b = next) { 840 int n; 841 next = b.next; 842 if ((n = b.estimateLag()) < 0) { 843 b.next = null; 844 if (pred == null) 845 clients = next; 846 else 847 pred.next = next; 848 } 849 else { 850 if (n > max) 851 max = n; 852 pred = b; 853 } 854 } 855 } 856 return max; 857 } 858 859 /** 860 * Processes all published items using the given Consumer function. 861 * Returns a CompletableFuture that is completed normally when this 862 * publisher signals {@link Flow.Subscriber#onComplete() 863 * onComplete}, or completed exceptionally upon any error, or an 864 * exception is thrown by the Consumer, or the returned 865 * CompletableFuture is cancelled, in which case no further items 866 * are processed. 867 * 868 * @param consumer the function applied to each onNext item 869 * @return a CompletableFuture that is completed normally 870 * when the publisher signals onComplete, and exceptionally 871 * upon any error or cancellation 872 * @throws NullPointerException if consumer is null 873 */ 874 public CompletableFuture<Void> consume(Consumer<? super T> consumer) { 875 if (consumer == null) 876 throw new NullPointerException(); 877 CompletableFuture<Void> status = new CompletableFuture<>(); 878 subscribe(new ConsumerSubscriber<T>(status, consumer)); 879 return status; 880 } 881 882 /** Subscriber for method consume */ 883 static final class ConsumerSubscriber<T> implements Subscriber<T> { 884 final CompletableFuture<Void> status; 885 final Consumer<? super T> consumer; 886 Subscription subscription; 887 ConsumerSubscriber(CompletableFuture<Void> status, 888 Consumer<? 
super T> consumer) { 889 this.status = status; this.consumer = consumer; 890 } 891 public final void onSubscribe(Subscription subscription) { 892 this.subscription = subscription; 893 status.whenComplete((v, e) -> subscription.cancel()); 894 if (!status.isDone()) 895 subscription.request(Long.MAX_VALUE); 896 } 897 public final void onError(Throwable ex) { 898 status.completeExceptionally(ex); 899 } 900 public final void onComplete() { 901 status.complete(null); 902 } 903 public final void onNext(T item) { 904 try { 905 consumer.accept(item); 906 } catch (Throwable ex) { 907 subscription.cancel(); 908 status.completeExceptionally(ex); 909 } 910 } 911 } 912 913 /** 914 * A task for consuming buffer items and signals, created and 915 * executed whenever they become available. A task consumes as 916 * many items/signals as possible before terminating, at which 917 * point another task is created when needed. The dual Runnable 918 * and ForkJoinTask declaration saves overhead when executed by 919 * ForkJoinPools, without impacting other kinds of Executors. 920 */ 921 @SuppressWarnings("serial") 922 static final class ConsumerTask<T> extends ForkJoinTask<Void> 923 implements Runnable, CompletableFuture.AsynchronousCompletionTask { 924 final BufferedSubscription<T> consumer; 925 ConsumerTask(BufferedSubscription<T> consumer) { 926 this.consumer = consumer; 927 } 928 public final Void getRawResult() { return null; } 929 public final void setRawResult(Void v) {} 930 public final boolean exec() { consumer.consume(); return false; } 931 public final void run() { consumer.consume(); } 932 } 933 934 /** 935 * A resizable array-based ring buffer with integrated control to 936 * start a consumer task whenever items are available. The buffer 937 * algorithm is specialized for the case of at most one concurrent 938 * producer and consumer, and power of two buffer sizes. 
* It relies primarily on atomic operations (CAS or getAndSet) at the
     * next array slot to put or take an element, at the "tail" and
     * "head" indices written only by the producer and consumer
     * respectively.
     *
     * We ensure internally that there is at most one active consumer
     * task at any given time. The publisher guarantees a single
     * producer via its lock. Sync among producers and consumers
     * relies on volatile fields "ctl", "demand", and "waiting" (along
     * with element access). Other variables are accessed in plain
     * mode, relying on outer ordering and exclusion, and/or enclosing
     * them within other volatile accesses. Some atomic operations are
     * avoided by tracking single threaded ownership by producers (in
     * the style of biased locking).
     *
     * Execution control and protocol state are managed using field
     * "ctl".  Methods to subscribe, close, request, and cancel set
     * ctl bits (mostly using atomic boolean method getAndBitwiseOr),
     * and ensure that a task is running. (The corresponding consumer
     * side actions are in method consume.)  To avoid starting a new
     * task on each action, ctl also includes a keep-alive bit
     * (ACTIVE) that is refreshed if needed on producer actions.
     * (Maintaining agreement about keep-alives requires most atomic
     * updates to be full SC/Volatile strength, which is still much
     * cheaper than using one task per item.)  Error signals
     * additionally null out items and/or fields to reduce termination
     * latency.  The cancel() method is supported by treating as ERROR
     * but suppressing onError signal.
     *
     * Support for blocking also exploits the fact that there is only
     * one possible waiter. ManagedBlocker-compatible control fields
     * are placed in this class itself rather than in wait-nodes.
     * Blocking control relies on the "waiting" and "waiter"
     * fields. Producers set them before trying to block. Signalling
     * unparks and clears fields. If the producer and/or consumer are
     * using a ForkJoinPool, the producer attempts to help run
     * consumer tasks via ForkJoinPool.helpAsyncBlocker before
     * blocking.
     *
     * Usages of this class may encounter any of several forms of
     * memory contention. We try to ameliorate across them without
     * unduly impacting footprints in low-contention usages where it
     * isn't needed. Buffer arrays start out small and grow only as
     * needed.  The class uses @Contended and heuristic field
     * declaration ordering to reduce false-sharing memory contention
     * across instances of BufferedSubscription (as in, multiple
     * subscribers per publisher).  We additionally segregate some
     * fields that would otherwise nearly always encounter cache line
     * contention among producers and consumers. To reduce contention
     * across time (vs space), consumers only periodically update
     * other fields (see method takeItems), at the expense of possibly
     * staler reporting of lags and demand (bounded at 12.5% == 1/8
     * capacity) and possibly more atomic operations.
     *
     * Other forms of imbalance and slowdowns can occur during startup
     * when producer and consumer methods are compiled and/or memory
     * is allocated at different rates.  This is ameliorated by
     * artificially subdividing some consumer methods, including
     * isolation of all subscriber callbacks.  This code also includes
     * typical power-of-two array screening idioms to avoid compilers
     * generating traps, along with the usual SSA-based inline
     * assignment coding style. Also, all methods and fields have
     * default visibility to simplify usage by callers.
     */
    @SuppressWarnings("serial")
    @jdk.internal.vm.annotation.Contended
    static final class BufferedSubscription<T>
        implements Subscription, ForkJoinPool.ManagedBlocker {
        long timeout;                      // Long.MAX_VALUE if untimed wait
        int head;                          // next position to take
        int tail;                          // next position to put
        final int maxCapacity;             // max buffer size
        volatile int ctl;                  // atomic run state flags
        Object[] array;                    // buffer
        final Subscriber<? super T> subscriber;
        final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
        Executor executor;                 // null on error
        Thread waiter;                     // blocked producer thread
        Throwable pendingError;            // holds until onError issued
        BufferedSubscription<T> next;      // used only by publisher
        BufferedSubscription<T> nextRetry; // used only by publisher

        @jdk.internal.vm.annotation.Contended("c") // segregate
        volatile long demand;              // # unfilled requests
        @jdk.internal.vm.annotation.Contended("c")
        volatile int waiting;              // nonzero if producer blocked

        // ctl bit values
        static final int CLOSED   = 0x01;  // if set, other bits ignored
        static final int ACTIVE   = 0x02;  // keep-alive for consumer task
        static final int REQS     = 0x04;  // (possibly) nonzero demand
        static final int ERROR    = 0x08;  // issues onError when noticed
        static final int COMPLETE = 0x10;  // issues onComplete when done
        static final int RUN      = 0x20;  // task is or will be running
        static final int OPEN     = 0x40;  // true after subscribe

        static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel

        /**
         * Creates a subscription that stores the given arguments in
         * the corresponding fields; no other state is initialized
         * here.
         *
         * @param subscriber the subscriber receiving signals
         * @param executor the executor used to run consumer tasks
         * @param onNextHandler handler invoked when onNext throws
         * (skipped if null; see handleOnNext)
         * @param array the initial buffer
         * @param maxBufferCapacity the maximum buffer size
         */
        BufferedSubscription(Subscriber<? super T> subscriber,
                             Executor executor,
                             BiConsumer<? super Subscriber<? super T>,
                             ? super Throwable> onNextHandler,
                             Object[] array,
                             int maxBufferCapacity) {
            this.subscriber = subscriber;
            this.executor = executor;
            this.onNextHandler = onNextHandler;
            this.array = array;
            this.maxCapacity = maxBufferCapacity;
        }

        // Wrappers for some VarHandle methods

        /**
         * Weak CAS of ctl from cmp to val; may fail spuriously, so
         * callers either retry in a loop or tolerate failure.
         */
        final boolean weakCasCtl(int cmp, int val) {
            return CTL.weakCompareAndSet(this, cmp, val);
        }

        /**
         * Atomically ORs the given bits into ctl.
         * @return the value of ctl before the update
         */
        final int getAndBitwiseOrCtl(int bits) {
            return (int)CTL.getAndBitwiseOr(this, bits);
        }

        /**
         * Atomically subtracts k from demand.
         * @return the updated demand (previous value minus k)
         */
        final long subtractDemand(int k) {
            long n = (long)(-k);
            return n + (long)DEMAND.getAndAdd(this, n);
        }

        /**
         * CAS of demand from cmp to val.
         */
        final boolean casDemand(long cmp, long val) {
            return DEMAND.compareAndSet(this, cmp, val);
        }

        // Utilities used by SubmissionPublisher

        /**
         * Returns true if closed (consumer task may still be running).
         */
        final boolean isClosed() {
            return (ctl & CLOSED) != 0;
        }

        /**
         * Returns estimated number of buffered items, or negative if
         * closed.
         */
        final int estimateLag() {
            int c = ctl, n = tail - head;
            // a negative difference is reported as zero
            return ((c & CLOSED) != 0) ? -1 : (n < 0) ? 0 : n;
        }

        // Methods for submitting items

        /**
         * Tries to add item and start consumer task if necessary.
         * @param item the item to add
         * @param unowned if true, forces the CAS path for the slot
         * write instead of the release-mode store
         * @return negative if closed, 0 if saturated, else estimated lag
         */
        final int offer(T item, boolean unowned) {
            Object[] a;
            int stat = 0, cap = ((a = array) == null) ? 0 : a.length;
            // i is the slot index for tail; n is the lag counting this item.
            // i is only used when cap > 0.
            int t = tail, i = t & (cap - 1), n = t + 1 - head;
            if (cap > 0) {
                boolean added;
                if (n >= cap && cap < maxCapacity) // resize
                    added = growAndOffer(item, a, t);
                else if (n >= cap || unowned)      // need volatile CAS
                    added = QA.compareAndSet(a, i, null, item);
                else {                             // can use release mode
                    QA.setRelease(a, i, item);
                    added = true;
                }
                if (added) {
                    tail = t + 1;
                    stat = n;
                }
            }
            return startOnOffer(stat);
        }

        /**
         * Tries to expand buffer and add item, returning true on
         * success. Currently fails only if out of memory.
         *
         * @param item the item to add
         * @param a the current buffer
         * @param t the current tail index
         */
        final boolean growAndOffer(T item, Object[] a, int t) {
            int cap = 0, newCap = 0;
            Object[] newArray = null;
            // newCap is double the old capacity; the "> 0" test rejects
            // int overflow of the shift
            if (a != null && (cap = a.length) > 0 && (newCap = cap << 1) > 0) {
                try {
                    newArray = new Object[newCap];
                } catch (OutOfMemoryError ex) {
                    // leave newArray null; reported as failure below
                }
            }
            if (newArray == null)
                return false;
            else {                                // take and move items
                int newMask = newCap - 1;
                newArray[t-- & newMask] = item;
                // walk backwards from the slot before tail, moving each
                // item into the same logical position of the new array
                for (int mask = cap - 1, k = mask; k >= 0; --k) {
                    Object x = QA.getAndSet(a, t & mask, null);
                    if (x == null)
                        break;                    // already consumed
                    else
                        newArray[t-- & newMask] = x;
                }
                array = newArray;
                VarHandle.releaseFence();         // release array and slots
                return true;
            }
        }

        /**
         * Version of offer for retries (no resize or bias)
         *
         * @param item the item to add
         * @return negative if closed, 0 if the slot at tail was not
         * free, else estimated lag
         */
        final int retryOffer(T item) {
            Object[] a;
            int stat = 0, t = tail, h = head, cap;
            if ((a = array) != null && (cap = a.length) > 0 &&
                QA.compareAndSet(a, (cap - 1) & t, null, item))
                stat = (tail = t + 1) - h;
            return startOnOffer(stat);
        }

        /**
         * Tries to start consumer task after offer.
         * @param stat the status computed by the offer
         * @return negative if now closed, else argument
         */
        final int startOnOffer(int stat) {
            int c; // start or keep alive if requests exist and not active
            if (((c = ctl) & (REQS | ACTIVE)) == REQS &&
                ((c = getAndBitwiseOrCtl(RUN | ACTIVE)) & (RUN | CLOSED)) == 0)
                tryStart();
            else if ((c & CLOSED) != 0)
                stat = -1;
            return stat;
        }

        /**
         * Tries to start consumer task. Sets error state on failure.
         * If creating or submitting the task throws, the ERROR and
         * CLOSED bits are set and the exception is rethrown to the
         * caller.
         */
        final void tryStart() {
            try {
                Executor e;
                ConsumerTask<T> task = new ConsumerTask<T>(this);
                if ((e = executor) != null)   // skip if disabled on error
                    e.execute(task);
            } catch (RuntimeException | Error ex) {
                getAndBitwiseOrCtl(ERROR | CLOSED);
                throw ex;
            }
        }

        // Signals to consumer tasks

        /**
         * Sets the given control bits, starting task if not running or closed.
         * The atomic update is skipped when all bits are already set.
         * @param bits state bits, assumed to include RUN but not CLOSED
         */
        final void startOnSignal(int bits) {
            if ((ctl & bits) != bits &&
                (getAndBitwiseOrCtl(bits) & (RUN | CLOSED)) == 0)
                tryStart();
        }

        /**
         * Ensures a consumer task is running; the task issues
         * subscriber.onSubscribe on its first run (see subscribeOnOpen).
         */
        final void onSubscribe() {
            startOnSignal(RUN | ACTIVE);
        }

        /**
         * Sets the COMPLETE bit and ensures a consumer task is running
         * to notice it.
         */
        final void onComplete() {
            startOnSignal(RUN | ACTIVE | COMPLETE);
        }

        /**
         * Sets the ERROR bit, recording the exception if non-null. If
         * not already closed, starts a consumer task when none was
         * marked as running; otherwise nulls out the current buffer.
         *
         * @param ex the exception, or null (as used by cancel)
         */
        final void onError(Throwable ex) {
            int c; Object[] a;      // to null out buffer on async error
            if (ex != null)
                pendingError = ex;  // races are OK
            if (((c = getAndBitwiseOrCtl(ERROR | RUN | ACTIVE)) & CLOSED) == 0) {
                if ((c & RUN) == 0)
                    tryStart();
                else if ((a = array) != null)
                    Arrays.fill(a, null);
            }
        }

        /**
         * Cancels by signalling an error with no exception; consumeError
         * issues no onError callback when no exception is pending.
         */
        public final void cancel() {
            onError(null);
        }

        /**
         * Adds n to demand, saturating at Long.MAX_VALUE on overflow,
         * and ensures a consumer task is running. A non-positive n is
         * reported as an error carrying an IllegalArgumentException.
         *
         * @param n the number of additional items requested
         */
        public final void request(long n) {
            if (n > 0L) {
                for (;;) {
                    long p = demand, d = p + n;  // saturate
                    if (casDemand(p, d < p ? Long.MAX_VALUE : d))
                        break;
                }
                startOnSignal(RUN | ACTIVE | REQS);
            }
            else
                onError(new IllegalArgumentException(
                            "non-positive subscription request"));
        }

        // Consumer task actions

        /**
         * Consumer loop, called from ConsumerTask, or indirectly when
         * helping during submit. Each pass either closes on error,
         * takes items, brings the REQS bit in line with the current
         * demand, or (once tail is stable) closes on completion or
         * clears ACTIVE and then RUN before exiting.
         */
        final void consume() {
            Subscriber<? super T> s;
            if ((s = subscriber) != null) {          // hoist checks
                subscribeOnOpen(s);
                long d = demand;
                for (int h = head, t = tail;;) {
                    int c, taken; boolean empty;
                    if (((c = ctl) & ERROR) != 0) {
                        closeOnError(s, null);
                        break;
                    }
                    else if ((taken = takeItems(s, d, h)) > 0) {
                        head = h += taken;
                        d = subtractDemand(taken);
                    }
                    else if ((d = demand) == 0L && (c & REQS) != 0)
                        weakCasCtl(c, c & ~REQS);    // exhausted demand
                    else if (d != 0L && (c & REQS) == 0)
                        weakCasCtl(c, c | REQS);     // new demand
                    else if (t == (t = tail)) {      // stability check
                        if ((empty = (t == h)) && (c & COMPLETE) != 0) {
                            closeOnComplete(s);      // end of stream
                            break;
                        }
                        else if (empty || d == 0L) {
                            // first clear ACTIVE; on a later pass clear RUN
                            int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
                            if (weakCasCtl(c, c & ~bit) && bit == RUN)
                                break;               // un-keep-alive or exit
                        }
                    }
                }
            }
        }

        /**
         * Consumes some items until unavailable or bound or error.
         *
         * @param s subscriber
         * @param d current demand
         * @param h current head
         * @return number taken
         */
        final int takeItems(Subscriber<? super T> s, long d, int h) {
            Object[] a;
            int k = 0, cap;
            if ((a = array) != null && (cap = a.length) > 0) {
                int m = cap - 1, b = (m >>> 3) + 1; // max(1, cap/8) for power-of-two cap
                int n = (d < (long)b) ? (int)d : b;
                for (; k < n; ++h, ++k) {
                    Object x = QA.getAndSet(a, h & m, null);
                    if (waiting != 0)
                        signalWaiter();
                    if (x == null)
                        break;
                    else if (!consumeNext(s, x))
                        break;
                }
            }
            return k;
        }

        /**
         * Issues subscriber.onNext for the given item.
         * @return true if onNext returned normally; false if it threw,
         * in which case handleOnNext has been invoked
         */
        final boolean consumeNext(Subscriber<? super T> s, Object x) {
            try {
                @SuppressWarnings("unchecked") T y = (T) x;
                if (s != null)
                    s.onNext(y);
                return true;
            } catch (Throwable ex) {
                handleOnNext(s, ex);
                return false;
            }
        }

        /**
         * Processes exception in Subscriber.onNext. Invokes the
         * onNextHandler if present, ignoring anything it throws, and
         * then closes with the original exception.
         */
        final void handleOnNext(Subscriber<? super T> s, Throwable ex) {
            BiConsumer<? super Subscriber<? super T>, ? super Throwable> h;
            try {
                if ((h = onNextHandler) != null)
                    h.accept(s, ex);
            } catch (Throwable ignore) {
            }
            closeOnError(s, ex);
        }

        /**
         * Issues subscriber.onSubscribe if this is first signal.
         */
        final void subscribeOnOpen(Subscriber<? super T> s) {
            if ((ctl & OPEN) == 0 && (getAndBitwiseOrCtl(OPEN) & OPEN) == 0)
                consumeSubscribe(s);
        }

        /**
         * Invokes subscriber.onSubscribe, closing with the exception
         * if the callback throws.
         */
        final void consumeSubscribe(Subscriber<? super T> s) {
            try {
                if (s != null) // ignore if disabled
                    s.onSubscribe(this);
            } catch (Throwable ex) {
                closeOnError(s, ex);
            }
        }

        /**
         * Issues subscriber.onComplete unless already closed.
         */
        final void closeOnComplete(Subscriber<? super T> s) {
            if ((getAndBitwiseOrCtl(CLOSED) & CLOSED) == 0)
                consumeComplete(s);
        }

        /**
         * Invokes subscriber.onComplete, ignoring anything it throws.
         */
        final void consumeComplete(Subscriber<? super T> s) {
            try {
                if (s != null)
                    s.onComplete();
            } catch (Throwable ignore) {
            }
        }

        /**
         * Issues subscriber.onError, and unblocks producer if needed.
         * Does nothing if already closed. When ex is null, the
         * pending error (if any) is used instead.
         */
        final void closeOnError(Subscriber<? super T> s, Throwable ex) {
            if ((getAndBitwiseOrCtl(ERROR | CLOSED) & CLOSED) == 0) {
                if (ex == null)
                    ex = pendingError;
                pendingError = null;  // detach
                executor = null;      // suppress racing start calls
                signalWaiter();
                consumeError(s, ex);
            }
        }

        /**
         * Invokes subscriber.onError if there is an exception to
         * report, ignoring anything the callback throws.
         */
        final void consumeError(Subscriber<? super T> s, Throwable ex) {
            try {
                if (ex != null && s != null)
                    s.onError(ex);
            } catch (Throwable ignore) {
            }
        }

        // Blocking support

        /**
         * Unblocks waiting producer. Clears the waiting flag and
         * unparks the recorded waiter thread, if any.
         */
        final void signalWaiter() {
            Thread w;
            waiting = 0;
            if ((w = waiter) != null)
                LockSupport.unpark(w);
        }

        /**
         * Returns true if closed or space available.
         * For ManagedBlocker.
         */
        public final boolean isReleasable() {
            Object[] a; int cap;
            // space is available when the slot at tail is null
            return ((ctl & CLOSED) != 0 ||
                    ((a = array) != null && (cap = a.length) > 0 &&
                     QA.getAcquire(a, (cap - 1) & tail) == null));
        }

        /**
         * Helps or blocks until timeout, closed, or space available.
         * Re-asserts the caller's interrupt status if the wait was
         * interrupted.
         *
         * @param nanos the wait time; Long.MAX_VALUE if untimed
         */
        final void awaitSpace(long nanos) {
            if (!isReleasable()) {
                ForkJoinPool.helpAsyncBlocker(executor, this);
                if (!isReleasable()) {
                    timeout = nanos;
                    try {
                        ForkJoinPool.managedBlock(this);
                    } catch (InterruptedException ie) {
                        timeout = INTERRUPTED;
                    }
                    if (timeout == INTERRUPTED)
                        Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Blocks until closed, space available or timeout.
         * For ManagedBlocker. Each loop pass takes one step: note an
         * interrupt, check the deadline, record the waiter thread, set
         * the waiting flag, or park.
         *
         * @return true always
         */
        public final boolean block() {
            long nanos = timeout;
            boolean timed = (nanos < Long.MAX_VALUE);
            long deadline = timed ? System.nanoTime() + nanos : 0L;
            while (!isReleasable()) {
                if (Thread.interrupted()) {
                    // record the interrupt; only a timed wait stops here
                    timeout = INTERRUPTED;
                    if (timed)
                        break;
                }
                else if (timed && (nanos = deadline - System.nanoTime()) <= 0L)
                    break;
                else if (waiter == null)
                    waiter = Thread.currentThread();
                else if (waiting == 0)
                    waiting = 1;
                else if (timed)
                    LockSupport.parkNanos(this, nanos);
                else
                    LockSupport.park(this);
            }
            waiter = null;
            waiting = 0;
            return true;
        }

        // VarHandle mechanics
        static final VarHandle CTL;    // access to field ctl
        static final VarHandle DEMAND; // access to field demand
        static final VarHandle QA;     // access to Object[] elements

        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
                                      int.class);
                DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
                                         long.class);
                QA = MethodHandles.arrayElementVarHandle(Object[].class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }

            // Reduce the risk of rare disastrous classloading in first call to
            // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
            Class<?> ensureLoaded = LockSupport.class;
        }
    }
}