1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.util.ArrayList; 39 import java.util.List; 40 import java.util.concurrent.locks.LockSupport; 41 import java.util.function.BiConsumer; 42 import java.util.function.BiPredicate; 43 import java.util.function.Consumer; 44 45 /** 46 * A {@link Flow.Publisher} that asynchronously issues submitted 47 * (non-null) items to current subscribers until it is closed. Each 48 * current subscriber receives newly submitted items in the same order 49 * unless drops or exceptions are encountered. Using a 50 * SubmissionPublisher allows item generators to act as compliant <a 51 * href="http://www.reactive-streams.org/"> reactive-streams</a> 52 * Publishers relying on drop handling and/or blocking for flow 53 * control. 54 * 55 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its 56 * constructor for delivery to subscribers. The best choice of 57 * Executor depends on expected usage. If the generator(s) of 58 * submitted items run in separate threads, and the number of 59 * subscribers can be estimated, consider using a {@link 60 * Executors#newFixedThreadPool}. Otherwise consider using the 61 * default, normally the {@link ForkJoinPool#commonPool}. 62 * 63 * <p>Buffering allows producers and consumers to transiently operate 64 * at different rates. Each subscriber uses an independent buffer. 65 * Buffers are created upon first use and expanded as needed up to the 66 * given maximum. (The enforced capacity may be rounded up to the 67 * nearest power of two and/or bounded by the largest value supported 68 * by this implementation.) 
Invocations of {@link 69 * Flow.Subscription#request(long) request} do not directly result in 70 * buffer expansion, but risk saturation if unfilled requests exceed 71 * the maximum capacity. The default value of {@link 72 * Flow#defaultBufferSize()} may provide a useful starting point for 73 * choosing a capacity based on expected rates, resources, and usages. 74 * 75 * <p>Publication methods support different policies about what to do 76 * when buffers are saturated. Method {@link #submit(Object) submit} 77 * blocks until resources are available. This is simplest, but least 78 * responsive. The {@code offer} methods may drop items (either 79 * immediately or with bounded timeout), but provide an opportunity to 80 * interpose a handler and then retry. 81 * 82 * <p>If any Subscriber method throws an exception, its subscription 83 * is cancelled. If a handler is supplied as a constructor argument, 84 * it is invoked before cancellation upon an exception in method 85 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods 86 * {@link Flow.Subscriber#onSubscribe onSubscribe}, 87 * {@link Flow.Subscriber#onError(Throwable) onError} and 88 * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or 89 * handled before cancellation. If the supplied Executor throws 90 * {@link RejectedExecutionException} (or any other RuntimeException 91 * or Error) when attempting to execute a task, or a drop handler 92 * throws an exception when processing a dropped item, then the 93 * exception is rethrown. In these cases, not all subscribers will 94 * have been issued the published item. It is usually good practice to 95 * {@link #closeExceptionally closeExceptionally} in these cases. 96 * 97 * <p>Method {@link #consume(Consumer)} simplifies support for a 98 * common case in which the only action of a subscriber is to request 99 * and process all items using a supplied function. 
100 * 101 * <p>This class may also serve as a convenient base for subclasses 102 * that generate items, and use the methods in this class to publish 103 * them. For example here is a class that periodically publishes the 104 * items generated from a supplier. (In practice you might add methods 105 * to independently start and stop generation, to share Executors 106 * among publishers, and so on, or use a SubmissionPublisher as a 107 * component rather than a superclass.) 108 * 109 * <pre> {@code 110 * class PeriodicPublisher<T> extends SubmissionPublisher<T> { 111 * final ScheduledFuture<?> periodicTask; 112 * final ScheduledExecutorService scheduler; 113 * PeriodicPublisher(Executor executor, int maxBufferCapacity, 114 * Supplier<? extends T> supplier, 115 * long period, TimeUnit unit) { 116 * super(executor, maxBufferCapacity); 117 * scheduler = new ScheduledThreadPoolExecutor(1); 118 * periodicTask = scheduler.scheduleAtFixedRate( 119 * () -> submit(supplier.get()), 0, period, unit); 120 * } 121 * public void close() { 122 * periodicTask.cancel(false); 123 * scheduler.shutdown(); 124 * super.close(); 125 * } 126 * }}</pre> 127 * 128 * <p>Here is an example of a {@link Flow.Processor} implementation. 129 * It uses single-step requests to its publisher for simplicity of 130 * illustration. A more adaptive version could monitor flow using the 131 * lag estimate returned from {@code submit}, along with other utility 132 * methods. 133 * 134 * <pre> {@code 135 * class TransformProcessor<S,T> extends SubmissionPublisher<T> 136 * implements Flow.Processor<S,T> { 137 * final Function<? super S, ? extends T> function; 138 * Flow.Subscription subscription; 139 * TransformProcessor(Executor executor, int maxBufferCapacity, 140 * Function<? super S, ? 
extends T> function) { 141 * super(executor, maxBufferCapacity); 142 * this.function = function; 143 * } 144 * public void onSubscribe(Flow.Subscription subscription) { 145 * (this.subscription = subscription).request(1); 146 * } 147 * public void onNext(S item) { 148 * subscription.request(1); 149 * submit(function.apply(item)); 150 * } 151 * public void onError(Throwable ex) { closeExceptionally(ex); } 152 * public void onComplete() { close(); } 153 * }}</pre> 154 * 155 * @param <T> the published item type 156 * @author Doug Lea 157 * @since 1.9 158 */ 159 public class SubmissionPublisher<T> implements Flow.Publisher<T>, 160 AutoCloseable { 161 /* 162 * Most mechanics are handled by BufferedSubscription. This class 163 * mainly tracks subscribers and ensures sequentiality, by using 164 * built-in synchronization locks across public methods. (Using 165 * built-in locks works well in the most typical case in which 166 * only one thread submits items). 167 */ 168 169 /** The largest possible power of two array size. */ 170 static final int BUFFER_CAPACITY_LIMIT = 1 << 30; 171 172 /** Round capacity to power of 2, at most limit. */ 173 static final int roundCapacity(int cap) { 174 int n = cap - 1; 175 n |= n >>> 1; 176 n |= n >>> 2; 177 n |= n >>> 4; 178 n |= n >>> 8; 179 n |= n >>> 16; 180 return (n <= 0) ? 1 : // at least 1 181 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1; 182 } 183 184 // default Executor setup; nearly the same as CompletableFuture 185 186 /** 187 * Default executor -- ForkJoinPool.commonPool() unless it cannot 188 * support parallelism. 189 */ 190 private static final Executor ASYNC_POOL = 191 (ForkJoinPool.getCommonPoolParallelism() > 1) ? 
192 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 193 194 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ 195 private static final class ThreadPerTaskExecutor implements Executor { 196 public void execute(Runnable r) { new Thread(r).start(); } 197 } 198 199 /** 200 * Clients (BufferedSubscriptions) are maintained in a linked list 201 * (via their "next" fields). This works well for publish loops. 202 * It requires O(n) traversal to check for duplicate subscribers, 203 * but we expect that subscribing is much less common than 204 * publishing. Unsubscribing occurs only during traversal loops, 205 * when BufferedSubscription methods return negative values 206 * signifying that they have been disabled. To reduce 207 * head-of-line blocking, submit and offer methods first call 208 * BufferedSubscription.offer on each subscriber, and place 209 * saturated ones in retries list (using nextRetry field), and 210 * retry, possibly blocking or dropping. 211 */ 212 BufferedSubscription<T> clients; 213 214 /** Run status, updated only within locks */ 215 volatile boolean closed; 216 /** If non-null, the exception in closeExceptionally */ 217 volatile Throwable closedException; 218 219 // Parameters for constructing BufferedSubscriptions 220 final Executor executor; 221 final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler; 222 final int maxBufferCapacity; 223 224 /** 225 * Creates a new SubmissionPublisher using the given Executor for 226 * async delivery to subscribers, with the given maximum buffer size 227 * for each subscriber, and, if non-null, the given handler invoked 228 * when any Subscriber throws an exception in method {@link 229 * Flow.Subscriber#onNext(Object) onNext}. 
230 * 231 * @param executor the executor to use for async delivery, 232 * supporting creation of at least one independent thread 233 * @param maxBufferCapacity the maximum capacity for each 234 * subscriber's buffer (the enforced capacity may be rounded up to 235 * the nearest power of two and/or bounded by the largest value 236 * supported by this implementation; method {@link #getMaxBufferCapacity} 237 * returns the actual value) 238 * @param handler if non-null, procedure to invoke upon exception 239 * thrown in method {@code onNext} 240 * @throws NullPointerException if executor is null 241 * @throws IllegalArgumentException if maxBufferCapacity not 242 * positive 243 */ 244 public SubmissionPublisher(Executor executor, int maxBufferCapacity, 245 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) { 246 if (executor == null) 247 throw new NullPointerException(); 248 if (maxBufferCapacity <= 0) 249 throw new IllegalArgumentException("capacity must be positive"); 250 this.executor = executor; 251 this.onNextHandler = handler; 252 this.maxBufferCapacity = roundCapacity(maxBufferCapacity); 253 } 254 255 /** 256 * Creates a new SubmissionPublisher using the given Executor for 257 * async delivery to subscribers, with the given maximum buffer size 258 * for each subscriber, and no handler for Subscriber exceptions in 259 * method {@link Flow.Subscriber#onNext(Object) onNext}. 
*
     * @param executor the executor to use for async delivery,
     * supporting creation of at least one independent thread
     * @param maxBufferCapacity the maximum capacity for each
     * subscriber's buffer (the enforced capacity may be rounded up to
     * the nearest power of two and/or bounded by the largest value
     * supported by this implementation; method {@link #getMaxBufferCapacity}
     * returns the actual value)
     * @throws NullPointerException if executor is null
     * @throws IllegalArgumentException if maxBufferCapacity not
     * positive
     */
    public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
        // Delegates to the three-argument constructor with a null onNext handler.
        this(executor, maxBufferCapacity, null);
    }

    /**
     * Creates a new SubmissionPublisher using the {@link
     * ForkJoinPool#commonPool()} for async delivery to subscribers
     * (unless it does not support a parallelism level of at least two,
     * in which case, a new Thread is created to run each task), with
     * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
     * handler for Subscriber exceptions in method {@link
     * Flow.Subscriber#onNext(Object) onNext}.
     */
    public SubmissionPublisher() {
        // ASYNC_POOL already encodes the commonPool-or-thread-per-task choice.
        this(ASYNC_POOL, Flow.defaultBufferSize(), null);
    }

    /**
     * Adds the given Subscriber unless already subscribed.  If already
     * subscribed, the Subscriber's {@link
     * Flow.Subscriber#onError(Throwable) onError} method is invoked on
     * the existing subscription with an {@link IllegalStateException}.
     * Otherwise, upon success, the Subscriber's {@link
     * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
     * asynchronously with a new {@link Flow.Subscription}.  If {@link
     * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
     * subscription is cancelled.
Otherwise, if this SubmissionPublisher 299 * was closed exceptionally, then the subscriber's {@link 300 * Flow.Subscriber#onError onError} method is invoked with the 301 * corresponding exception, or if closed without exception, the 302 * subscriber's {@link Flow.Subscriber#onComplete() onComplete} 303 * method is invoked. Subscribers may enable receiving items by 304 * invoking the {@link Flow.Subscription#request(long) request} 305 * method of the new Subscription, and may unsubscribe by invoking 306 * its {@link Flow.Subscription#cancel() cancel} method. 307 * 308 * @param subscriber the subscriber 309 * @throws NullPointerException if subscriber is null 310 */ 311 public void subscribe(Flow.Subscriber<? super T> subscriber) { 312 if (subscriber == null) throw new NullPointerException(); 313 BufferedSubscription<T> subscription = 314 new BufferedSubscription<T>(subscriber, executor, 315 onNextHandler, maxBufferCapacity); 316 synchronized (this) { 317 for (BufferedSubscription<T> b = clients, pred = null;;) { 318 if (b == null) { 319 Throwable ex; 320 subscription.onSubscribe(); 321 if ((ex = closedException) != null) 322 subscription.onError(ex); 323 else if (closed) 324 subscription.onComplete(); 325 else if (pred == null) 326 clients = subscription; 327 else 328 pred.next = subscription; 329 break; 330 } 331 BufferedSubscription<T> next = b.next; 332 if (b.isDisabled()) { // remove 333 b.next = null; // detach 334 if (pred == null) 335 clients = next; 336 else 337 pred.next = next; 338 } 339 else if (subscriber.equals(b.subscriber)) { 340 b.onError(new IllegalStateException("Duplicate subscribe")); 341 break; 342 } 343 else 344 pred = b; 345 b = next; 346 } 347 } 348 } 349 350 /** 351 * Publishes the given item to each current subscriber by 352 * asynchronously invoking its {@link Flow.Subscriber#onNext(Object) 353 * onNext} method, blocking uninterruptibly while resources for any 354 * subscriber are unavailable. 
This method returns an estimate of 355 * the maximum lag (number of items submitted but not yet consumed) 356 * among all current subscribers. This value is at least one 357 * (accounting for this submitted item) if there are any 358 * subscribers, else zero. 359 * 360 * <p>If the Executor for this publisher throws a 361 * RejectedExecutionException (or any other RuntimeException or 362 * Error) when attempting to asynchronously notify subscribers, 363 * then this exception is rethrown, in which case not all 364 * subscribers will have been issued this item. 365 * 366 * @param item the (non-null) item to publish 367 * @return the estimated maximum lag among subscribers 368 * @throws IllegalStateException if closed 369 * @throws NullPointerException if item is null 370 * @throws RejectedExecutionException if thrown by Executor 371 */ 372 public int submit(T item) { 373 if (item == null) throw new NullPointerException(); 374 int lag = 0; 375 boolean complete; 376 synchronized (this) { 377 complete = closed; 378 BufferedSubscription<T> b = clients; 379 if (!complete) { 380 BufferedSubscription<T> pred = null, r = null, rtail = null; 381 while (b != null) { 382 BufferedSubscription<T> next = b.next; 383 int stat = b.offer(item); 384 if (stat < 0) { // disabled 385 b.next = null; 386 if (pred == null) 387 clients = next; 388 else 389 pred.next = next; 390 } 391 else { 392 if (stat > lag) 393 lag = stat; 394 else if (stat == 0) { // place on retry list 395 b.nextRetry = null; 396 if (rtail == null) 397 r = b; 398 else 399 rtail.nextRetry = b; 400 rtail = b; 401 } 402 pred = b; 403 } 404 b = next; 405 } 406 while (r != null) { 407 BufferedSubscription<T> nextRetry = r.nextRetry; 408 r.nextRetry = null; 409 int stat = r.submit(item); 410 if (stat > lag) 411 lag = stat; 412 else if (stat < 0 && clients == r) 413 clients = r.next; // postpone internal unsubscribes 414 r = nextRetry; 415 } 416 } 417 } 418 if (complete) 419 throw new IllegalStateException("Closed"); 420 else 
421 return lag; 422 } 423 424 /** 425 * Publishes the given item, if possible, to each current subscriber 426 * by asynchronously invoking its {@link 427 * Flow.Subscriber#onNext(Object) onNext} method. The item may be 428 * dropped by one or more subscribers if resource limits are 429 * exceeded, in which case the given handler (if non-null) is 430 * invoked, and if it returns true, retried once. Other calls to 431 * methods in this class by other threads are blocked while the 432 * handler is invoked. Unless recovery is assured, options are 433 * usually limited to logging the error and/or issuing an {@link 434 * Flow.Subscriber#onError(Throwable) onError} signal to the 435 * subscriber. 436 * 437 * <p>This method returns a status indicator: If negative, it 438 * represents the (negative) number of drops (failed attempts to 439 * issue the item to a subscriber). Otherwise it is an estimate of 440 * the maximum lag (number of items submitted but not yet 441 * consumed) among all current subscribers. This value is at least 442 * one (accounting for this submitted item) if there are any 443 * subscribers, else zero. 444 * 445 * <p>If the Executor for this publisher throws a 446 * RejectedExecutionException (or any other RuntimeException or 447 * Error) when attempting to asynchronously notify subscribers, or 448 * the drop handler throws an exception when processing a dropped 449 * item, then this exception is rethrown. 
*
     * @param item the (non-null) item to publish
     * @param onDrop if non-null, the handler invoked upon a drop to a
     * subscriber, with arguments of the subscriber and item; if it
     * returns true, an offer is re-attempted (once)
     * @return if negative, the (negative) number of drops; otherwise
     * an estimate of maximum lag
     * @throws IllegalStateException if closed
     * @throws NullPointerException if item is null
     * @throws RejectedExecutionException if thrown by Executor
     */
    public int offer(T item,
                     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
        // A zero timeout makes doOffer retry with a plain offer, not timedOffer.
        return doOffer(0L, item, onDrop);
    }

    /**
     * Publishes the given item, if possible, to each current subscriber
     * by asynchronously invoking its {@link
     * Flow.Subscriber#onNext(Object) onNext} method, blocking while
     * resources for any subscription are unavailable, up to the
     * specified timeout or until the caller thread is interrupted, at
     * which point the given handler (if non-null) is invoked, and if it
     * returns true, retried once. (The drop handler may distinguish
     * timeouts from interrupts by checking whether the current thread
     * is interrupted.)  Other calls to methods in this class by other
     * threads are blocked while the handler is invoked.  Unless
     * recovery is assured, options are usually limited to logging the
     * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
     * onError} signal to the subscriber.
     *
     * <p>This method returns a status indicator: If negative, it
     * represents the (negative) number of drops (failed attempts to
     * issue the item to a subscriber). Otherwise it is an estimate of
     * the maximum lag (number of items submitted but not yet
     * consumed) among all current subscribers. This value is at least
     * one (accounting for this submitted item) if there are any
     * subscribers, else zero.
488 * 489 * <p>If the Executor for this publisher throws a 490 * RejectedExecutionException (or any other RuntimeException or 491 * Error) when attempting to asynchronously notify subscribers, or 492 * the drop handler throws an exception when processing a dropped 493 * item, then this exception is rethrown. 494 * 495 * @param item the (non-null) item to publish 496 * @param timeout how long to wait for resources for any subscriber 497 * before giving up, in units of {@code unit} 498 * @param unit a {@code TimeUnit} determining how to interpret the 499 * {@code timeout} parameter 500 * @param onDrop if non-null, the handler invoked upon a drop to a 501 * subscriber, with arguments of the subscriber and item; if it 502 * returns true, an offer is re-attempted (once) 503 * @return if negative, the (negative) number of drops; otherwise 504 * an estimate of maximum lag 505 * @throws IllegalStateException if closed 506 * @throws NullPointerException if item is null 507 * @throws RejectedExecutionException if thrown by Executor 508 */ 509 public int offer(T item, long timeout, TimeUnit unit, 510 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) { 511 return doOffer(unit.toNanos(timeout), item, onDrop); 512 } 513 514 /** Common implementation for both forms of offer */ 515 final int doOffer(long nanos, T item, 516 BiPredicate<Flow.Subscriber<? super T>, ? 
super T> onDrop) { 517 if (item == null) throw new NullPointerException(); 518 int lag = 0, drops = 0; 519 boolean complete; 520 synchronized (this) { 521 complete = closed; 522 BufferedSubscription<T> b = clients; 523 if (!complete) { 524 BufferedSubscription<T> pred = null, r = null, rtail = null; 525 while (b != null) { 526 BufferedSubscription<T> next = b.next; 527 int stat = b.offer(item); 528 if (stat < 0) { 529 b.next = null; 530 if (pred == null) 531 clients = next; 532 else 533 pred.next = next; 534 } 535 else { 536 if (stat > lag) 537 lag = stat; 538 else if (stat == 0) { 539 b.nextRetry = null; 540 if (rtail == null) 541 r = b; 542 else 543 rtail.nextRetry = b; 544 rtail = b; 545 } 546 else if (stat > lag) 547 lag = stat; 548 pred = b; 549 } 550 b = next; 551 } 552 while (r != null) { 553 BufferedSubscription<T> nextRetry = r.nextRetry; 554 r.nextRetry = null; 555 int stat = (nanos > 0L) ? r.timedOffer(item, nanos) : 556 r.offer(item); 557 if (stat == 0 && onDrop != null && 558 onDrop.test(r.subscriber, item)) 559 stat = r.offer(item); 560 if (stat == 0) 561 ++drops; 562 else if (stat > lag) 563 lag = stat; 564 else if (stat < 0 && clients == r) 565 clients = r.next; 566 r = nextRetry; 567 } 568 } 569 } 570 if (complete) 571 throw new IllegalStateException("Closed"); 572 else 573 return (drops > 0) ? -drops : lag; 574 } 575 576 /** 577 * Unless already closed, issues {@link 578 * Flow.Subscriber#onComplete() onComplete} signals to current 579 * subscribers, and disallows subsequent attempts to publish. 580 * Upon return, this method does <em>NOT</em> guarantee that all 581 * subscribers have yet completed. 
582 */ 583 public void close() { 584 if (!closed) { 585 BufferedSubscription<T> b; 586 synchronized (this) { 587 b = clients; 588 clients = null; 589 closed = true; 590 } 591 while (b != null) { 592 BufferedSubscription<T> next = b.next; 593 b.next = null; 594 b.onComplete(); 595 b = next; 596 } 597 } 598 } 599 600 /** 601 * Unless already closed, issues {@link 602 * Flow.Subscriber#onError(Throwable) onError} signals to current 603 * subscribers with the given error, and disallows subsequent 604 * attempts to publish. Future subscribers also receive the given 605 * error. Upon return, this method does <em>NOT</em> guarantee 606 * that all subscribers have yet completed. 607 * 608 * @param error the {@code onError} argument sent to subscribers 609 * @throws NullPointerException if error is null 610 */ 611 public void closeExceptionally(Throwable error) { 612 if (error == null) 613 throw new NullPointerException(); 614 if (!closed) { 615 BufferedSubscription<T> b; 616 synchronized (this) { 617 b = clients; 618 clients = null; 619 closed = true; 620 closedException = error; 621 } 622 while (b != null) { 623 BufferedSubscription<T> next = b.next; 624 b.next = null; 625 b.onError(error); 626 b = next; 627 } 628 } 629 } 630 631 /** 632 * Returns true if this publisher is not accepting submissions. 633 * 634 * @return true if closed 635 */ 636 public boolean isClosed() { 637 return closed; 638 } 639 640 /** 641 * Returns the exception associated with {@link 642 * #closeExceptionally(Throwable) closeExceptionally}, or null if 643 * not closed or if closed normally. 644 * 645 * @return the exception, or null if none 646 */ 647 public Throwable getClosedException() { 648 return closedException; 649 } 650 651 /** 652 * Returns true if this publisher has any subscribers. 
653 * 654 * @return true if this publisher has any subscribers 655 */ 656 public boolean hasSubscribers() { 657 boolean nonEmpty = false; 658 if (!closed) { 659 synchronized (this) { 660 for (BufferedSubscription<T> b = clients; b != null;) { 661 BufferedSubscription<T> next = b.next; 662 if (b.isDisabled()) { 663 b.next = null; 664 b = clients = next; 665 } 666 else { 667 nonEmpty = true; 668 break; 669 } 670 } 671 } 672 } 673 return nonEmpty; 674 } 675 676 /** 677 * Returns the number of current subscribers. 678 * 679 * @return the number of current subscribers 680 */ 681 public int getNumberOfSubscribers() { 682 int count = 0; 683 if (!closed) { 684 synchronized (this) { 685 BufferedSubscription<T> pred = null, next; 686 for (BufferedSubscription<T> b = clients; b != null; b = next) { 687 next = b.next; 688 if (b.isDisabled()) { 689 b.next = null; 690 if (pred == null) 691 clients = next; 692 else 693 pred.next = next; 694 } 695 else { 696 pred = b; 697 ++count; 698 } 699 } 700 } 701 } 702 return count; 703 } 704 705 /** 706 * Returns the Executor used for asynchronous delivery. 707 * 708 * @return the Executor used for asynchronous delivery 709 */ 710 public Executor getExecutor() { 711 return executor; 712 } 713 714 /** 715 * Returns the maximum per-subscriber buffer capacity. 716 * 717 * @return the maximum per-subscriber buffer capacity 718 */ 719 public int getMaxBufferCapacity() { 720 return maxBufferCapacity; 721 } 722 723 /** 724 * Returns a list of current subscribers for monitoring and 725 * tracking purposes, not for invoking {@link Flow.Subscriber} 726 * methods on the subscribers. 727 * 728 * @return list of current subscribers 729 */ 730 public List<Flow.Subscriber<? super T>> getSubscribers() { 731 ArrayList<Flow.Subscriber<? 
super T>> subs = new ArrayList<>(); 732 synchronized (this) { 733 BufferedSubscription<T> pred = null, next; 734 for (BufferedSubscription<T> b = clients; b != null; b = next) { 735 next = b.next; 736 if (b.isDisabled()) { 737 b.next = null; 738 if (pred == null) 739 clients = next; 740 else 741 pred.next = next; 742 } 743 else 744 subs.add(b.subscriber); 745 } 746 } 747 return subs; 748 } 749 750 /** 751 * Returns true if the given Subscriber is currently subscribed. 752 * 753 * @param subscriber the subscriber 754 * @return true if currently subscribed 755 * @throws NullPointerException if subscriber is null 756 */ 757 public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) { 758 if (subscriber == null) throw new NullPointerException(); 759 if (!closed) { 760 synchronized (this) { 761 BufferedSubscription<T> pred = null, next; 762 for (BufferedSubscription<T> b = clients; b != null; b = next) { 763 next = b.next; 764 if (b.isDisabled()) { 765 b.next = null; 766 if (pred == null) 767 clients = next; 768 else 769 pred.next = next; 770 } 771 else if (subscriber.equals(b.subscriber)) 772 return true; 773 else 774 pred = b; 775 } 776 } 777 } 778 return false; 779 } 780 781 /** 782 * Returns an estimate of the minimum number of items requested 783 * (via {@link Flow.Subscription#request(long) request}) but not 784 * yet produced, among all current subscribers. 
785 * 786 * @return the estimate, or zero if no subscribers 787 */ 788 public long estimateMinimumDemand() { 789 long min = Long.MAX_VALUE; 790 boolean nonEmpty = false; 791 synchronized (this) { 792 BufferedSubscription<T> pred = null, next; 793 for (BufferedSubscription<T> b = clients; b != null; b = next) { 794 int n; long d; 795 next = b.next; 796 if ((n = b.estimateLag()) < 0) { 797 b.next = null; 798 if (pred == null) 799 clients = next; 800 else 801 pred.next = next; 802 } 803 else { 804 if ((d = b.demand - n) < min) 805 min = d; 806 nonEmpty = true; 807 pred = b; 808 } 809 } 810 } 811 return nonEmpty ? min : 0; 812 } 813 814 /** 815 * Returns an estimate of the maximum number of items produced but 816 * not yet consumed among all current subscribers. 817 * 818 * @return the estimate 819 */ 820 public int estimateMaximumLag() { 821 int max = 0; 822 synchronized (this) { 823 BufferedSubscription<T> pred = null, next; 824 for (BufferedSubscription<T> b = clients; b != null; b = next) { 825 int n; 826 next = b.next; 827 if ((n = b.estimateLag()) < 0) { 828 b.next = null; 829 if (pred == null) 830 clients = next; 831 else 832 pred.next = next; 833 } 834 else { 835 if (n > max) 836 max = n; 837 pred = b; 838 } 839 } 840 } 841 return max; 842 } 843 844 /** 845 * Processes all published items using the given Consumer function. 846 * Returns a CompletableFuture that is completed normally when this 847 * publisher signals {@link Flow.Subscriber#onComplete() 848 * onComplete}, or completed exceptionally upon any error, or an 849 * exception is thrown by the Consumer, or the returned 850 * CompletableFuture is cancelled, in which case no further items 851 * are processed. 
852 * 853 * @param consumer the function applied to each onNext item 854 * @return a CompletableFuture that is completed normally 855 * when the publisher signals onComplete, and exceptionally 856 * upon any error or cancellation 857 * @throws NullPointerException if consumer is null 858 */ 859 public CompletableFuture<Void> consume(Consumer<? super T> consumer) { 860 if (consumer == null) 861 throw new NullPointerException(); 862 CompletableFuture<Void> status = new CompletableFuture<>(); 863 subscribe(new ConsumerSubscriber<T>(status, consumer)); 864 return status; 865 } 866 867 /** Subscriber for method consume */ 868 private static final class ConsumerSubscriber<T> 869 implements Flow.Subscriber<T> { 870 final CompletableFuture<Void> status; 871 final Consumer<? super T> consumer; 872 Flow.Subscription subscription; 873 ConsumerSubscriber(CompletableFuture<Void> status, 874 Consumer<? super T> consumer) { 875 this.status = status; this.consumer = consumer; 876 } 877 public final void onSubscribe(Flow.Subscription subscription) { 878 this.subscription = subscription; 879 status.whenComplete((v, e) -> subscription.cancel()); 880 if (!status.isDone()) 881 subscription.request(Long.MAX_VALUE); 882 } 883 public final void onError(Throwable ex) { 884 status.completeExceptionally(ex); 885 } 886 public final void onComplete() { 887 status.complete(null); 888 } 889 public final void onNext(T item) { 890 try { 891 consumer.accept(item); 892 } catch (Throwable ex) { 893 subscription.cancel(); 894 status.completeExceptionally(ex); 895 } 896 } 897 } 898 899 /** 900 * A task for consuming buffer items and signals, created and 901 * executed whenever they become available. A task consumes as 902 * many items/signals as possible before terminating, at which 903 * point another task is created when needed. The dual Runnable 904 * and ForkJoinTask declaration saves overhead when executed by 905 * ForkJoinPools, without impacting other kinds of Executors. 
906 */ 907 @SuppressWarnings("serial") 908 static final class ConsumerTask<T> extends ForkJoinTask<Void> 909 implements Runnable { 910 final BufferedSubscription<T> consumer; 911 ConsumerTask(BufferedSubscription<T> consumer) { 912 this.consumer = consumer; 913 } 914 public final Void getRawResult() { return null; } 915 public final void setRawResult(Void v) {} 916 public final boolean exec() { consumer.consume(); return false; } 917 public final void run() { consumer.consume(); } 918 } 919 920 /** 921 * A bounded (ring) buffer with integrated control to start a 922 * consumer task whenever items are available. The buffer 923 * algorithm is similar to one used inside ForkJoinPool (see its 924 * internal documentation for details) specialized for the case of 925 * at most one concurrent producer and consumer, and power of two 926 * buffer sizes. This allows methods to operate without locks even 927 * while supporting resizing, blocking, task-triggering, and 928 * garbage-free buffers (nulling out elements when consumed), 929 * although supporting these does impose a bit of overhead 930 * compared to plain fixed-size ring buffers. 931 * 932 * The publisher guarantees a single producer via its lock. We 933 * ensure in this class that there is at most one consumer. The 934 * request and cancel methods must be fully thread-safe but are 935 * coded to exploit the most common case in which they are only 936 * called by consumers (usually within onNext). 937 * 938 * Execution control is managed using the ACTIVE ctl bit. We 939 * ensure that a task is active when consumable items (and 940 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and 941 * there is demand (unfilled requests). This is complicated on 942 * the creation side by the possibility of exceptions when trying 943 * to execute tasks. These eventually force DISABLED state, but 944 * sometimes not directly. 
On the task side, termination (clearing 945 * ACTIVE) that would otherwise race with producers or request() 946 * calls uses the CONSUME keep-alive bit to force a recheck. 947 * 948 * The ctl field also manages run state. When DISABLED, no further 949 * updates are possible. Disabling may be preceded by setting 950 * ERROR or COMPLETE (or both -- ERROR has precedence), in which 951 * case the associated Subscriber methods are invoked, possibly 952 * synchronously if there is no active consumer task (including 953 * cases where execute() failed). The cancel() method is supported 954 * by treating as ERROR but suppressing onError signal. 955 * 956 * Support for blocking also exploits the fact that there is only 957 * one possible waiter. ManagedBlocker-compatible control fields 958 * are placed in this class itself rather than in wait-nodes. 959 * Blocking control relies on the "waiter" field. Producers set 960 * the field before trying to block, but must then recheck (via 961 * offer) before parking. Signalling then just unparks and clears 962 * waiter field. If the producer and consumer are both in the same 963 * ForkJoinPool, or consumers are running in commonPool, the 964 * producer attempts to help run consumer tasks that it forked 965 * before blocking. To avoid potential cycles, only one level of 966 * helping is currently supported. 967 * 968 * This class uses @Contended and heuristic field declaration 969 * ordering to reduce false-sharing-based memory contention among 970 * instances of BufferedSubscription, but it does not currently 971 * attempt to avoid memory contention among buffers. This field 972 * and element packing can hurt performance especially when each 973 * publisher has only one client operating at a high rate. 974 * Addressing this may require allocating substantially more space 975 * than users expect. 
*/
    @SuppressWarnings("serial")
    @sun.misc.Contended
    private static final class BufferedSubscription<T>
        implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
        // Order-sensitive field declarations
        long timeout;                      // > 0 if timed wait
        volatile long demand;              // # unfilled requests
        int maxCapacity;                   // reduced on OOME
        int putStat;                       // offer result for ManagedBlocker
        int helpDepth;                     // nested helping depth (at most 1)
        volatile int ctl;                  // atomic run state flags
        volatile int head;                 // next position to take
        int tail;                          // next position to put
        Object[] array;                    // buffer: null if disabled
        Flow.Subscriber<? super T> subscriber; // null if disabled
        Executor executor;                 // null if disabled
        // invoked (if non-null) when subscriber.onNext throws
        BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
        volatile Throwable pendingError;   // holds until onError issued
        volatile Thread waiter;            // blocked producer thread
        T putItem;                         // for offer within ManagedBlocker
        BufferedSubscription<T> next;      // used only by publisher
        BufferedSubscription<T> nextRetry; // used only by publisher

        // ctl values
        static final int ACTIVE    = 0x01; // consumer task active
        static final int CONSUME   = 0x02; // keep-alive for consumer task
        static final int DISABLED  = 0x04; // final state
        static final int ERROR     = 0x08; // signal onError then disable
        static final int SUBSCRIBE = 0x10; // signal onSubscribe
        static final int COMPLETE  = 0x20; // signal onComplete when done

        static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel

        /**
         * Initial buffer capacity used when maxBufferCapacity is
         * greater.  Must be a power of two.
         */
        static final int DEFAULT_INITIAL_CAP = 32;

        /**
         * Creates a subscription with an initial buffer whose length is
         * maxBufferCapacity, raised to at least 2 and capped at
         * DEFAULT_INITIAL_CAP.
         *
         * @param subscriber the subscriber to deliver items and signals to
         * @param executor the executor used to run consumer tasks
         * @param onNextHandler handler invoked when onNext throws; may be null
         * @param maxBufferCapacity upper bound on buffer growth
         */
        BufferedSubscription(Flow.Subscriber<? super T> subscriber,
                             Executor executor,
                             BiConsumer<? super Flow.Subscriber<? super T>,
                             ? super Throwable> onNextHandler,
                             int maxBufferCapacity) {
            this.subscriber = subscriber;
            this.executor = executor;
            this.onNextHandler = onNextHandler;
            this.maxCapacity = maxBufferCapacity;
            // NOTE(review): slot indexing masks with (length - 1), so this
            // presumably expects maxBufferCapacity to already be a power of
            // two -- confirm against the caller.
            this.array = new Object[maxBufferCapacity < DEFAULT_INITIAL_CAP ?
                                    (maxBufferCapacity < 2 ? // at least 2 slots
                                     2 : maxBufferCapacity) :
                                    DEFAULT_INITIAL_CAP];
        }

        /**
         * Returns true if ctl equals DISABLED exactly (no other bits set).
         */
        final boolean isDisabled() {
            return ctl == DISABLED;
        }

        /**
         * Returns estimated number of buffered items, or -1 if
         * disabled.
         */
        final int estimateLag() {
            int n;
            // tail and head are read without synchronization, so a
            // transiently negative difference is clamped to zero
            return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
        }

        /**
         * Tries to add item and start consumer task if necessary.
         * @return -1 if disabled, 0 if dropped, else estimated lag
         */
        final int offer(T item) {
            int h = head, t = tail, cap, size, stat;
            Object[] a = array;
            // fast path: buffer exists and has room for one more element
            if (a != null && (cap = a.length) > 0 && cap >= (size = t + 1 - h)) {
                a[(cap - 1) & t] = item;    // relaxed writes OK
                tail = t + 1;
                stat = size;
            }
            else
                stat = growAndAdd(a, item); // apparently full or missing
            // after a successful add, make sure a consumer task is
            // running with its keep-alive bit set
            return (stat > 0 &&
                    (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
                startOnOffer(stat) : stat;
        }

        /**
         * Tries to create or expand buffer, then adds item if possible.
*/
        private int growAndAdd(Object[] a, T item) {
            boolean alloc;
            int cap, stat;
            if ((ctl & (ERROR | DISABLED)) != 0) {
                // terminated or terminating: reject the item
                cap = 0;
                stat = -1;
                alloc = false;
            }
            else if (a == null || (cap = a.length) <= 0) {
                // no usable buffer yet: allocate one and add as sole element
                cap = 0;
                stat = 1;
                alloc = true;
            }
            else {
                U.fullFence();                   // recheck
                int h = head, t = tail, size = t + 1 - h;
                if (cap >= size) {
                    // the consumer freed a slot in the meantime
                    a[(cap - 1) & t] = item;
                    tail = t + 1;
                    stat = size;
                    alloc = false;
                }
                else if (cap >= maxCapacity) {
                    stat = 0;                    // cannot grow
                    alloc = false;
                }
                else {
                    // buffer is full; reported value is cap + 1
                    stat = cap + 1;
                    alloc = true;
                }
            }
            if (alloc) {
                int newCap = (cap > 0) ? cap << 1 : 1;
                if (newCap <= cap)               // doubling overflowed int
                    stat = 0;
                else {
                    Object[] newArray = null;
                    try {
                        newArray = new Object[newCap];
                    } catch (Throwable ex) {     // try to cope with OOME
                    }
                    if (newArray == null) {
                        if (cap > 0)
                            maxCapacity = cap;   // avoid continuous failure
                        stat = 0;
                    }
                    else {
                        // publish the new array, then move over whatever
                        // the consumer has not already taken from the old one
                        array = newArray;
                        int t = tail;
                        int newMask = newCap - 1;
                        if (a != null && cap > 0) {
                            int mask = cap - 1;
                            for (int j = head; j != t; ++j) {
                                long k = ((long)(j & mask) << ASHIFT) + ABASE;
                                Object x = U.getObjectVolatile(a, k);
                                // the CAS to null claims the element; if it
                                // fails the slot is not copied
                                if (x != null && // races with consumer
                                    U.compareAndSwapObject(a, k, x, null))
                                    newArray[j & newMask] = x;
                            }
                        }
                        newArray[t & newMask] = item;
                        tail = t + 1;
                    }
                }
            }
            return stat;
        }

        /**
         * Spins/helps/blocks while offer returns 0.  Called only if
         * initial offer returns 0.
*/
        final int submit(T item) {
            int stat; Executor e; ForkJoinWorkerThread w;
            // Helping is attempted only when not already helping
            // (helpDepth == 0) and the executor is a ForkJoinPool.
            if ((stat = offer(item)) == 0 && helpDepth == 0 &&
                ((e = executor) instanceof ForkJoinPool)) {
                helpDepth = 1;
                Thread thread = Thread.currentThread();
                if ((thread instanceof ForkJoinWorkerThread) &&
                    ((w = (ForkJoinWorkerThread)thread)).getPool() == e)
                    // caller is a worker of the same pool: use its own queue
                    stat = internalHelpConsume(w.workQueue, item);
                else if (e == ForkJoinPool.commonPool())
                    // external caller submitting to the common pool
                    stat = externalHelpConsume
                        (ForkJoinPool.commonSubmitterQueue(), item);
                helpDepth = 0;
            }
            // Still no room after helping: retry once, then block through
            // the ManagedBlocker methods (isReleasable/block) of this object.
            if (stat == 0 && (stat = offer(item)) == 0) {
                putItem = item;
                timeout = 0L;                    // untimed wait
                try {
                    ForkJoinPool.managedBlock(this);
                } catch (InterruptedException ie) {
                    timeout = INTERRUPTED;
                }
                stat = putStat;                  // result of the last offer
                if (timeout < 0L)                // INTERRUPTED sentinel
                    Thread.currentThread().interrupt();
            }
            return stat;
        }

        /**
         * Tries helping for FJ submitter.
         * While the task returned by w.peek() is a ConsumerTask,
         * re-offers the item and, if that still returns 0 and the task
         * can be unpushed, runs that task's consume loop in this thread.
         *
         * @return the result of the last offer, or 0 if none was made
         */
        private int internalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
            int stat = 0;
            if (w != null) {
                ForkJoinTask<?> t;
                while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
                    // stop once the item is accepted (or the subscription
                    // is disabled), or if the task could not be unpushed
                    if ((stat = offer(item)) != 0 || !w.tryUnpush(t))
                        break;
                    ((ConsumerTask<?>)t).consumer.consume();
                }
            }
            return stat;
        }

        /**
         * Tries helping for non-FJ submitter.
         * Same loop as internalHelpConsume, but removes tasks with
         * trySharedUnpush.
         *
         * @return the result of the last offer, or 0 if none was made
         */
        private int externalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
            int stat = 0;
            if (w != null) {
                ForkJoinTask<?> t;
                while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
                    if ((stat = offer(item)) != 0 || !w.trySharedUnpush(t))
                        break;
                    ((ConsumerTask<?>)t).consumer.consume();
                }
            }
            return stat;
        }

        /**
         * Timeout version; similar to submit.
*/
        final int timedOffer(T item, long nanos) {
            int stat; Executor e;
            if ((stat = offer(item)) == 0 && helpDepth == 0 &&
                ((e = executor) instanceof ForkJoinPool)) {
                Thread thread = Thread.currentThread();
                // help only if running in the executor's pool, or if the
                // executor is the common pool
                if (((thread instanceof ForkJoinWorkerThread) &&
                     ((ForkJoinWorkerThread)thread).getPool() == e) ||
                    e == ForkJoinPool.commonPool()) {
                    helpDepth = 1;
                    ForkJoinTask<?> t;
                    long deadline = System.nanoTime() + nanos;
                    while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
                           (t instanceof ConsumerTask)) {
                        // stop when the item is accepted, the time budget
                        // is used up (nanos is updated to the remainder),
                        // or the task cannot be unforked
                        if ((stat = offer(item)) != 0 ||
                            (nanos = deadline - System.nanoTime()) <= 0L ||
                            !t.tryUnfork())
                            break;
                        ((ConsumerTask<?>)t).consumer.consume();
                    }
                    helpDepth = 0;
                }
            }
            // Block only if the item is still not accepted and time
            // remains; timeout carries the remaining nanos to block().
            if (stat == 0 && (stat = offer(item)) == 0 &&
                (timeout = nanos) > 0L) {
                putItem = item;
                try {
                    ForkJoinPool.managedBlock(this);
                } catch (InterruptedException ie) {
                    timeout = INTERRUPTED;
                }
                stat = putStat;                  // result of the last offer
                if (timeout < 0L)                // INTERRUPTED sentinel
                    Thread.currentThread().interrupt();
            }
            return stat;
        }

        /**
         * Tries to start consumer task after offer.
* @return -1 if now disabled, else argument
         */
        private int startOnOffer(int stat) {
            for (;;) {
                Executor e; int c;
                if ((c = ctl) == DISABLED || (e = executor) == null) {
                    stat = -1;
                    break;
                }
                else if ((c & ACTIVE) != 0) {  // ensure keep-alive
                    if ((c & CONSUME) != 0 ||
                        U.compareAndSwapInt(this, CTL, c,
                                            c | CONSUME))
                        break;
                }
                else if (demand == 0L || tail == head)
                    break;                     // no demand or nothing buffered
                else if (U.compareAndSwapInt(this, CTL, c,
                                             c | (ACTIVE | CONSUME))) {
                    try {
                        e.execute(new ConsumerTask<T>(this));
                        break;
                    } catch (RuntimeException | Error ex) { // back out
                        // clear ACTIVE again (unless DISABLED was set or
                        // ACTIVE is already clear), then rethrow
                        do {} while (((c = ctl) & DISABLED) == 0 &&
                                     (c & ACTIVE) != 0 &&
                                     !U.compareAndSwapInt(this, CTL, c,
                                                          c & ~ACTIVE));
                        throw ex;
                    }
                }
            }
            return stat;
        }

        /**
         * Clears the waiter field and unparks the given thread.
         */
        private void signalWaiter(Thread w) {
            waiter = null;
            LockSupport.unpark(w);    // release producer
        }

        /**
         * Nulls out most fields, mainly to avoid garbage retention
         * until publisher unsubscribes, but also to help cleanly stop
         * upon error by nulling required components.
         */
        private void detach() {
            Thread w = waiter;        // read before signalWaiter clears it
            executor = null;
            subscriber = null;
            pendingError = null;
            signalWaiter(w);
        }

        /**
         * Issues error signal, asynchronously if a task is running,
         * else synchronously.
         */
        final void onError(Throwable ex) {
            for (int c;;) {
                if (((c = ctl) & (ERROR | DISABLED)) != 0)
                    break;            // already failing or finished
                else if ((c & ACTIVE) != 0) {
                    // store the exception before setting ERROR
                    pendingError = ex;
                    if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
                        break; // cause consumer task to exit
                }
                else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
                    // no active task: signal in the calling thread
                    Flow.Subscriber<? super T> s = subscriber;
                    if (s != null && ex != null) {
                        try {
                            s.onError(ex);
                        } catch (Throwable ignore) {
                        }
                    }
                    detach();
                    break;
                }
            }
        }

        /**
         * Tries to start consumer task upon a signal or request;
         * disables on failure.
         */
        private void startOrDisable() {
            Executor e;
            if ((e = executor) != null) { // skip if already disabled
                try {
                    e.execute(new ConsumerTask<T>(this));
                } catch (Throwable ex) {  // back out and force signal
                    for (int c;;) {
                        if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
                            break;
                        // with ACTIVE cleared, onError can take its
                        // synchronous path
                        if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
                            onError(ex);
                            break;
                        }
                    }
                }
            }
        }

        /**
         * Sets the COMPLETE flag together with ACTIVE and CONSUME, and
         * starts a consumer task if none was active. No-op if ctl is
         * DISABLED.
         */
        final void onComplete() {
            for (int c;;) {
                if ((c = ctl) == DISABLED)
                    break;
                if (U.compareAndSwapInt(this, CTL, c,
                                        c | (ACTIVE | CONSUME | COMPLETE))) {
                    if ((c & ACTIVE) == 0)
                        startOrDisable();
                    break;
                }
            }
        }

        /**
         * Sets the SUBSCRIBE flag together with ACTIVE and CONSUME, and
         * starts a consumer task if none was active. No-op if ctl is
         * DISABLED.
         */
        final void onSubscribe() {
            for (int c;;) {
                if ((c = ctl) == DISABLED)
                    break;
                if (U.compareAndSwapInt(this, CTL, c,
                                        c | (ACTIVE | CONSUME | SUBSCRIBE))) {
                    if ((c & ACTIVE) == 0)
                        startOrDisable();
                    break;
                }
            }
        }

        /**
         * Causes consumer task to exit if active (without reporting
         * onError unless there is already a pending error), and
         * disables.
         */
        public void cancel() {
            for (int c;;) {
                if ((c = ctl) == DISABLED)
                    break;
                else if ((c & ACTIVE) != 0) {
                    // ERROR is set without assigning pendingError
                    if (U.compareAndSwapInt(this, CTL, c,
                                            c | (CONSUME | ERROR)))
                        break;
                }
                else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
                    detach();
                    break;
                }
            }
        }

        /**
         * Adds to demand and possibly starts task.
*/
        public void request(long n) {
            if (n > 0L) {
                for (;;) {
                    long prev = demand, d;
                    if ((d = prev + n) < prev) // saturate
                        d = Long.MAX_VALUE;
                    if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
                        // demand updated; now ensure a consumer task will
                        // see it
                        for (int c, h;;) {
                            if ((c = ctl) == DISABLED)
                                break;
                            else if ((c & ACTIVE) != 0) {
                                // task running: set its keep-alive bit
                                if ((c & CONSUME) != 0 ||
                                    U.compareAndSwapInt(this, CTL, c,
                                                        c | CONSUME))
                                    break;
                            }
                            else if ((h = head) != tail) {
                                // items buffered and no task: start one
                                if (U.compareAndSwapInt(this, CTL, c,
                                                        c | (ACTIVE|CONSUME))) {
                                    startOrDisable();
                                    break;
                                }
                            }
                            else if (head == h && tail == h)
                                break;          // else stale
                            // reached after a failed CAS or a stale read;
                            // stop retrying once demand has dropped to zero
                            if (demand == 0L)
                                break;
                        }
                        break;
                    }
                }
            }
            else if (n < 0L)
                onError(new IllegalArgumentException(
                            "negative subscription request"));
            // n == 0 falls through: no action
        }

        /**
         * Retries the pending offer of putItem, recording the result in
         * putStat.
         *
         * @return false if the offer returned 0, so the caller must
         * block; true if the item was accepted or rejected, or if no
         * item is pending
         */
        public final boolean isReleasable() { // for ManagedBlocker
            T item = putItem;
            if (item != null) {
                if ((putStat = offer(item)) == 0)
                    return false;
                putItem = null;
            }
            return true;
        }

        /**
         * Repeatedly offers the pending item until offer returns
         * nonzero, the timeout (if timeout > 0) elapses, or, for a
         * timed wait, the thread is interrupted. An interrupt is
         * recorded by setting timeout to INTERRUPTED; an untimed wait
         * keeps retrying after an interrupt.
         *
         * @return true always
         */
        public final boolean block() { // for ManagedBlocker
            T item = putItem;
            if (item != null) {
                putItem = null;
                long nanos = timeout;
                long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
                while ((putStat = offer(item)) == 0) {
                    if (Thread.interrupted()) {
                        timeout = INTERRUPTED;
                        if (nanos > 0L)
                            break;
                    }
                    else if (nanos > 0L &&
                             (nanos = deadline - System.nanoTime()) <= 0L)
                        break;
                    else if (waiter == null)
                        // register as waiter, then loop to re-offer
                        // before actually parking
                        waiter = Thread.currentThread();
                    else {
                        if (nanos > 0L)
                            LockSupport.parkNanos(this, nanos);
                        else
                            LockSupport.park(this);
                        waiter = null;
                    }
                }
            }
            waiter = null;
            return true;
        }

        /**
         * Consumer loop, called from ConsumerTask, or indirectly
         * when helping during submit.
         */
        final void consume() {
            Flow.Subscriber<? super T> s;
            int h = head;
            if ((s = subscriber) != null) {           // else disabled
                for (;;) {
                    long d = demand;
                    int c; Object[] a; int n; long i; Object x; Thread w;
                    // 1. control flags take precedence over items
                    if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
                        if (!checkControl(s, c))
                            break;
                    }
                    // 2. nothing available at slot h
                    else if ((a = array) == null || h == tail ||
                             (n = a.length) == 0 ||
                             (x = U.getObjectVolatile
                              (a, (i = ((long)((n - 1) & h) << ASHIFT) + ABASE)))
                             == null) {
                        if (!checkEmpty(s, c))
                            break;
                    }
                    // 3. item available but no demand
                    else if (d == 0L) {
                        if (!checkDemand(c))
                            break;
                    }
                    // 4. ensure CONSUME is set, then claim the slot by CAS
                    else if (((c & CONSUME) != 0 ||
                              U.compareAndSwapInt(this, CTL, c, c | CONSUME)) &&
                             U.compareAndSwapObject(a, i, x, null)) {
                        U.putOrderedInt(this, HEAD, ++h);
                        U.getAndAddLong(this, DEMAND, -1L);
                        if ((w = waiter) != null)
                            signalWaiter(w);          // a slot was freed
                        try {
                            @SuppressWarnings("unchecked") T y = (T) x;
                            s.onNext(y);
                        } catch (Throwable ex) {
                            handleOnNext(s, ex);
                        }
                    }
                }
            }
        }

        /**
         * Responds to control events in consume().
         *
         * @return false if the consume loop should exit
         */
        private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
            boolean stat = true;
            if ((c & ERROR) != 0) {
                Throwable ex = pendingError;
                ctl = DISABLED;           // no need for CAS
                if (ex != null) {         // null if errorless cancel
                    try {
                        if (s != null)
                            s.onError(ex);
                    } catch (Throwable ignore) {
                    }
                }
                // returns true; with ctl now DISABLED, the next call
                // takes the final else branch below
            }
            else if ((c & SUBSCRIBE) != 0) {
                // clear the flag first so onSubscribe is issued once
                if (U.compareAndSwapInt(this, CTL, c, c & ~SUBSCRIBE)) {
                    try {
                        if (s != null)
                            s.onSubscribe(this);
                    } catch (Throwable ex) {
                        onError(ex);
                    }
                }
            }
            else {
                // neither ERROR nor SUBSCRIBE is set
                detach();
                stat = false;
            }
            return stat;
        }

        /**
         * Responds to apparent emptiness in consume().
         *
         * @return false if the consume loop should exit
         */
        private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) {
            boolean stat = true;
            if (head == tail) {
                if ((c & CONSUME) != 0)
                    // drop the keep-alive bit and go around once more
                    U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
                else if ((c & COMPLETE) != 0) {
                    if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
                        try {
                            if (s != null)
                                s.onComplete();
                        } catch (Throwable ignore) {
                        }
                    }
                }
                else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
                    stat = false;         // ACTIVE cleared: task exits
            }
            return stat;
        }

        /**
         * Responds to apparent zero demand in consume().
         *
         * @return false if the consume loop should exit
         */
        private boolean checkDemand(int c) {
            boolean stat = true;
            if (demand == 0L) {
                if ((c & CONSUME) != 0)
                    // drop the keep-alive bit and go around once more
                    U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
                else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
                    stat = false;         // ACTIVE cleared: task exits
            }
            return stat;
        }

        /**
         * Processes exception in Subscriber.onNext.
         * Invokes onNextHandler if non-null, ignoring anything it
         * throws, then calls onError with the original exception.
         */
        private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) {
            BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h;
            if ((h = onNextHandler) != null) {
                try {
                    h.accept(s, ex);
                } catch (Throwable ignore) {
                }
            }
            onError(ex);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
        private static final long CTL;     // offset of field ctl
        private static final long TAIL;    // offset of field tail
        private static final long HEAD;    // offset of field head
        private static final long DEMAND;  // offset of field demand
        private static final int ABASE;    // base offset of Object[] elements
        private static final int ASHIFT;   // log2 of Object[] index scale

        static {
            try {
                CTL = U.objectFieldOffset
                    (BufferedSubscription.class.getDeclaredField("ctl"));
                TAIL = U.objectFieldOffset
                    (BufferedSubscription.class.getDeclaredField("tail"));
                HEAD = U.objectFieldOffset
                    (BufferedSubscription.class.getDeclaredField("head"));
                DEMAND = U.objectFieldOffset
                    (BufferedSubscription.class.getDeclaredField("demand"));

                ABASE = U.arrayBaseOffset(Object[].class);
                int scale = U.arrayIndexScale(Object[].class);
                // a power-of-two scale lets element offsets be computed
                // by shifting
                if ((scale & (scale - 1)) != 0)
                    throw new Error("data type scale not a power of two");
                ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }

            // Reduce the risk of rare disastrous classloading in first call to
            // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
            Class<?> ensureLoaded = LockSupport.class;
        }
    }
}