1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.lang.invoke.MethodHandles;
  39 import java.lang.invoke.VarHandle;
  40 import java.util.ArrayList;
  41 import java.util.Arrays;
  42 import java.util.List;
  43 import java.util.concurrent.locks.LockSupport;
  44 import java.util.concurrent.locks.ReentrantLock;
  45 import java.util.function.BiConsumer;
  46 import java.util.function.BiPredicate;
  47 import java.util.function.Consumer;
  48 import static java.util.concurrent.Flow.Publisher;
  49 import static java.util.concurrent.Flow.Subscriber;
  50 import static java.util.concurrent.Flow.Subscription;
  51 
  52 /**
  53  * A {@link Flow.Publisher} that asynchronously issues submitted
  54  * (non-null) items to current subscribers until it is closed.  Each
  55  * current subscriber receives newly submitted items in the same order
  56  * unless drops or exceptions are encountered.  Using a
  57  * SubmissionPublisher allows item generators to act as compliant <a
  58  * href="http://www.reactive-streams.org/"> reactive-streams</a>
  59  * Publishers relying on drop handling and/or blocking for flow
  60  * control.
  61  *
  62  * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
  63  * constructor for delivery to subscribers. The best choice of
  64  * Executor depends on expected usage. If the generator(s) of
  65  * submitted items run in separate threads, and the number of
  66  * subscribers can be estimated, consider using a {@link
  67  * Executors#newFixedThreadPool}. Otherwise consider using the
  68  * default, normally the {@link ForkJoinPool#commonPool}.
  69  *
  70  * <p>Buffering allows producers and consumers to transiently operate
  71  * at different rates.  Each subscriber uses an independent buffer.
  72  * Buffers are created upon first use and expanded as needed up to the
  73  * given maximum. (The enforced capacity may be rounded up to the
  74  * nearest power of two and/or bounded by the largest value supported
  75  * by this implementation.)  Invocations of {@link
  76  * Flow.Subscription#request(long) request} do not directly result in
  77  * buffer expansion, but risk saturation if unfilled requests exceed
  78  * the maximum capacity.  The default value of {@link
  79  * Flow#defaultBufferSize()} may provide a useful starting point for
  80  * choosing a capacity based on expected rates, resources, and usages.
  81  *
  82  * <p>A single SubmissionPublisher may be shared among multiple
  83  * sources. Actions in a source thread prior to publishing an item or
  84  * issuing a signal <a href="package-summary.html#MemoryVisibility">
  85  * <i>happen-before</i></a> actions subsequent to the corresponding
  86  * access by each subscriber. But reported estimates of lag and demand
  87  * are designed for use in monitoring, not for synchronization
  88  * control, and may reflect stale or inaccurate views of progress.
  89  *
  90  * <p>Publication methods support different policies about what to do
  91  * when buffers are saturated. Method {@link #submit(Object) submit}
  92  * blocks until resources are available. This is simplest, but least
  93  * responsive.  The {@code offer} methods may drop items (either
  94  * immediately or with bounded timeout), but provide an opportunity to
  95  * interpose a handler and then retry.
  96  *
  97  * <p>If any Subscriber method throws an exception, its subscription
  98  * is cancelled.  If a handler is supplied as a constructor argument,
  99  * it is invoked before cancellation upon an exception in method
 100  * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
 101  * {@link Flow.Subscriber#onSubscribe onSubscribe},
 102  * {@link Flow.Subscriber#onError(Throwable) onError} and
 103  * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
 104  * handled before cancellation.  If the supplied Executor throws
 105  * {@link RejectedExecutionException} (or any other RuntimeException
 106  * or Error) when attempting to execute a task, or a drop handler
 107  * throws an exception when processing a dropped item, then the
 108  * exception is rethrown. In these cases, not all subscribers will
 109  * have been issued the published item. It is usually good practice to
 110  * {@link #closeExceptionally closeExceptionally} in these cases.
 111  *
 112  * <p>Method {@link #consume(Consumer)} simplifies support for a
 113  * common case in which the only action of a subscriber is to request
 114  * and process all items using a supplied function.
 115  *
 116  * <p>This class may also serve as a convenient base for subclasses
 117  * that generate items, and use the methods in this class to publish
 118  * them.  For example here is a class that periodically publishes the
 119  * items generated from a supplier. (In practice you might add methods
 120  * to independently start and stop generation, to share Executors
 121  * among publishers, and so on, or use a SubmissionPublisher as a
 122  * component rather than a superclass.)
 123  *
 124  * <pre> {@code
 125  * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
 126  *   final ScheduledFuture<?> periodicTask;
 127  *   final ScheduledExecutorService scheduler;
 128  *   PeriodicPublisher(Executor executor, int maxBufferCapacity,
 129  *                     Supplier<? extends T> supplier,
 130  *                     long period, TimeUnit unit) {
 131  *     super(executor, maxBufferCapacity);
 132  *     scheduler = new ScheduledThreadPoolExecutor(1);
 133  *     periodicTask = scheduler.scheduleAtFixedRate(
 134  *       () -> submit(supplier.get()), 0, period, unit);
 135  *   }
 136  *   public void close() {
 137  *     periodicTask.cancel(false);
 138  *     scheduler.shutdown();
 139  *     super.close();
 140  *   }
 141  * }}</pre>
 142  *
 143  * <p>Here is an example of a {@link Flow.Processor} implementation.
 144  * It uses single-step requests to its publisher for simplicity of
 145  * illustration. A more adaptive version could monitor flow using the
 146  * lag estimate returned from {@code submit}, along with other utility
 147  * methods.
 148  *
 149  * <pre> {@code
 150  * class TransformProcessor<S,T> extends SubmissionPublisher<T>
 151  *   implements Flow.Processor<S,T> {
 152  *   final Function<? super S, ? extends T> function;
 153  *   Flow.Subscription subscription;
 154  *   TransformProcessor(Executor executor, int maxBufferCapacity,
 155  *                      Function<? super S, ? extends T> function) {
 156  *     super(executor, maxBufferCapacity);
 157  *     this.function = function;
 158  *   }
 159  *   public void onSubscribe(Flow.Subscription subscription) {
 160  *     (this.subscription = subscription).request(1);
 161  *   }
 162  *   public void onNext(S item) {
 163  *     subscription.request(1);
 164  *     submit(function.apply(item));
 165  *   }
 166  *   public void onError(Throwable ex) { closeExceptionally(ex); }
 167  *   public void onComplete() { close(); }
 168  * }}</pre>
 169  *
 170  * @param <T> the published item type
 171  * @author Doug Lea
 172  * @since 9
 173  */
 174 public class SubmissionPublisher<T> implements Publisher<T>,
 175                                                AutoCloseable {
 176     /*
 177      * Most mechanics are handled by BufferedSubscription. This class
 178      * mainly tracks subscribers and ensures sequentiality, by using
 179      * locks across public methods, to ensure thread-safety in the
 180      * presence of multiple sources and maintain acquire-release
 181      * ordering around user operations. However, we also track whether
 182      * there is only a single source, and if so streamline some buffer
 183      * operations by avoiding some atomics.
 184      */
 185 
 186     /** The largest possible power of two array size. */
 187     static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
 188 
 189     /**
 190      * Initial buffer capacity used when maxBufferCapacity is
 191      * greater. Must be a power of two.
 192      */
 193     static final int INITIAL_CAPACITY = 32;
 194 
 195     /** Round capacity to power of 2, at most limit. */
 196     static final int roundCapacity(int cap) {
 197         int n = cap - 1;
 198         n |= n >>> 1;
 199         n |= n >>> 2;
 200         n |= n >>> 4;
 201         n |= n >>> 8;
 202         n |= n >>> 16;
 203         return (n <= 0) ? 1 : // at least 1
 204             (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
 205     }
 206 
 207     // default Executor setup; nearly the same as CompletableFuture
 208 
 209     /**
 210      * Default executor -- ForkJoinPool.commonPool() unless it cannot
 211      * support parallelism.
 212      */
 213     private static final Executor ASYNC_POOL =
 214         (ForkJoinPool.getCommonPoolParallelism() > 1) ?
 215         ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
 216 
 217     /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
 218     private static final class ThreadPerTaskExecutor implements Executor {
 219         ThreadPerTaskExecutor() {}      // prevent access constructor creation
 220         public void execute(Runnable r) { new Thread(r).start(); }
 221     }
 222 
 223     /**
 224      * Clients (BufferedSubscriptions) are maintained in a linked list
 225      * (via their "next" fields). This works well for publish loops.
 226      * It requires O(n) traversal to check for duplicate subscribers,
 227      * but we expect that subscribing is much less common than
 228      * publishing. Unsubscribing occurs only during traversal loops,
 229      * when BufferedSubscription methods return negative values
 230      * signifying that they have been closed.  To reduce
 231      * head-of-line blocking, submit and offer methods first call
 232      * BufferedSubscription.offer on each subscriber, and place
 233      * saturated ones in retries list (using nextRetry field), and
 234      * retry, possibly blocking or dropping.
 235      */
 236     BufferedSubscription<T> clients;
 237 
 238     /** Lock for exclusion across multiple sources */
 239     final ReentrantLock lock;
 240     /** Run status, updated only within locks */
 241     volatile boolean closed;
 242     /** Set true on first call to subscribe, to initialize possible owner */
 243     boolean subscribed;
 244     /** The first caller thread to subscribe, or null if thread ever changed */
 245     Thread owner;
 246     /** If non-null, the exception in closeExceptionally */
 247     volatile Throwable closedException;
 248 
 249     // Parameters for constructing BufferedSubscriptions
 250     final Executor executor;
 251     final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
 252     final int maxBufferCapacity;
 253 
 254     /**
 255      * Creates a new SubmissionPublisher using the given Executor for
 256      * async delivery to subscribers, with the given maximum buffer size
 257      * for each subscriber, and, if non-null, the given handler invoked
 258      * when any Subscriber throws an exception in method {@link
 259      * Flow.Subscriber#onNext(Object) onNext}.
 260      *
 261      * @param executor the executor to use for async delivery,
 262      * supporting creation of at least one independent thread
 263      * @param maxBufferCapacity the maximum capacity for each
 264      * subscriber's buffer (the enforced capacity may be rounded up to
 265      * the nearest power of two and/or bounded by the largest value
 266      * supported by this implementation; method {@link #getMaxBufferCapacity}
 267      * returns the actual value)
 268      * @param handler if non-null, procedure to invoke upon exception
 269      * thrown in method {@code onNext}
 270      * @throws NullPointerException if executor is null
 271      * @throws IllegalArgumentException if maxBufferCapacity not
 272      * positive
 273      */
 274     public SubmissionPublisher(Executor executor, int maxBufferCapacity,
 275                                BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
 276         if (executor == null)
 277             throw new NullPointerException();
 278         if (maxBufferCapacity <= 0)
 279             throw new IllegalArgumentException("capacity must be positive");
 280         this.lock = new ReentrantLock();
 281         this.executor = executor;
 282         this.onNextHandler = handler;
 283         this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
 284     }
 285 
 286     /**
 287      * Creates a new SubmissionPublisher using the given Executor for
 288      * async delivery to subscribers, with the given maximum buffer size
 289      * for each subscriber, and no handler for Subscriber exceptions in
 290      * method {@link Flow.Subscriber#onNext(Object) onNext}.
 291      *
 292      * @param executor the executor to use for async delivery,
 293      * supporting creation of at least one independent thread
 294      * @param maxBufferCapacity the maximum capacity for each
 295      * subscriber's buffer (the enforced capacity may be rounded up to
 296      * the nearest power of two and/or bounded by the largest value
 297      * supported by this implementation; method {@link #getMaxBufferCapacity}
 298      * returns the actual value)
 299      * @throws NullPointerException if executor is null
 300      * @throws IllegalArgumentException if maxBufferCapacity not
 301      * positive
 302      */
 303     public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
             // Delegates to the three-argument constructor, which performs the
             // null and positivity checks; the null third argument means that
             // no handler is installed for exceptions thrown by onNext.
 304         this(executor, maxBufferCapacity, null);
 305     }
 306 
 307     /**
 308      * Creates a new SubmissionPublisher using the {@link
 309      * ForkJoinPool#commonPool()} for async delivery to subscribers
 310      * (unless it does not support a parallelism level of at least two,
 311      * in which case, a new Thread is created to run each task), with
 312      * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
 313      * handler for Subscriber exceptions in method {@link
 314      * Flow.Subscriber#onNext(Object) onNext}.
 315      */
 316     public SubmissionPublisher() {
             // ASYNC_POOL is the common pool when its parallelism is greater
             // than one, otherwise a thread-per-task fallback executor. The
             // null third argument means no onNext exception handler.
 317         this(ASYNC_POOL, Flow.defaultBufferSize(), null);
 318     }
 319 
 320     /**
 321      * Adds the given Subscriber unless already subscribed.  If already
 322      * subscribed, the Subscriber's {@link
 323      * Flow.Subscriber#onError(Throwable) onError} method is invoked on
 324      * the existing subscription with an {@link IllegalStateException}.
 325      * Otherwise, upon success, the Subscriber's {@link
 326      * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
 327      * asynchronously with a new {@link Flow.Subscription}.  If {@link
 328      * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
 329      * subscription is cancelled. Otherwise, if this SubmissionPublisher
 330      * was closed exceptionally, then the subscriber's {@link
 331      * Flow.Subscriber#onError onError} method is invoked with the
 332      * corresponding exception, or if closed without exception, the
 333      * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
 334      * method is invoked.  Subscribers may enable receiving items by
 335      * invoking the {@link Flow.Subscription#request(long) request}
 336      * method of the new Subscription, and may unsubscribe by invoking
 337      * its {@link Flow.Subscription#cancel() cancel} method.
 338      *
 339      * @param subscriber the subscriber
 340      * @throws NullPointerException if subscriber is null
 341      */
 342     public void subscribe(Subscriber<? super T> subscriber) {
 343         if (subscriber == null) throw new NullPointerException();
 344         ReentrantLock lock = this.lock;
 345         int max = maxBufferCapacity; // allocate initial array
 346         Object[] array = new Object[max < INITIAL_CAPACITY ?
 347                                     max : INITIAL_CAPACITY];
 348         BufferedSubscription<T> subscription =
 349             new BufferedSubscription<T>(subscriber, executor, onNextHandler,
 350                                         array, max);
 351         lock.lock();
 352         try {
 353             if (!subscribed) {
 354                 subscribed = true;
 355                 owner = Thread.currentThread();
 356             }
 357             for (BufferedSubscription<T> b = clients, pred = null;;) {
 358                 if (b == null) {
 359                     Throwable ex;
 360                     subscription.onSubscribe();
 361                     if ((ex = closedException) != null)
 362                         subscription.onError(ex);
 363                     else if (closed)
 364                         subscription.onComplete();
 365                     else if (pred == null)
 366                         clients = subscription;
 367                     else
 368                         pred.next = subscription;
 369                     break;
 370                 }
 371                 BufferedSubscription<T> next = b.next;
 372                 if (b.isClosed()) {   // remove
 373                     b.next = null;    // detach
 374                     if (pred == null)
 375                         clients = next;
 376                     else
 377                         pred.next = next;
 378                 }
 379                 else if (subscriber.equals(b.subscriber)) {
 380                     b.onError(new IllegalStateException("Duplicate subscribe"));
 381                     break;
 382                 }
 383                 else
 384                     pred = b;
 385                 b = next;
 386             }
 387         } finally {
 388             lock.unlock();
 389         }
 390     }
 391 
 392     /**
 393      * Common implementation for all three forms of submit and offer.
 394      * Acts as submit if nanos == Long.MAX_VALUE, else offer.
 395      */
 396     private int doOffer(T item, long nanos,
 397                         BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
 398         if (item == null) throw new NullPointerException();
 399         int lag = 0;
 400         boolean complete, unowned;
 401         ReentrantLock lock = this.lock;
 402         lock.lock();
 403         try {
 404             Thread t = Thread.currentThread(), o;
 405             BufferedSubscription<T> b = clients;
 406             if ((unowned = ((o = owner) != t)) && o != null)
 407                 owner = null;                     // disable bias
 408             if (b == null)
 409                 complete = closed;
 410             else {
 411                 complete = false;
 412                 boolean cleanMe = false;
 413                 BufferedSubscription<T> retries = null, rtail = null, next;
 414                 do {
 415                     next = b.next;
 416                     int stat = b.offer(item, unowned);
 417                     if (stat == 0) {              // saturated; add to retry list
 418                         b.nextRetry = null;       // avoid garbage on exceptions
 419                         if (rtail == null)
 420                             retries = b;
 421                         else
 422                             rtail.nextRetry = b;
 423                         rtail = b;
 424                     }
 425                     else if (stat < 0)            // closed
 426                         cleanMe = true;           // remove later
 427                     else if (stat > lag)
 428                         lag = stat;
 429                 } while ((b = next) != null);
 430 
 431                 if (retries != null || cleanMe)
 432                     lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
 433             }
 434         } finally {
 435             lock.unlock();
 436         }
 437         if (complete)
 438             throw new IllegalStateException("Closed");
 439         else
 440             return lag;
 441     }
 442 
 443     /**
 444      * Helps, (timed) waits for, and/or drops buffers on list; returns
 445      * lag or negative drops (for use in offer).
 446      */
 447     private int retryOffer(T item, long nanos,
 448                            BiPredicate<Subscriber<? super T>, ? super T> onDrop,
 449                            BufferedSubscription<T> retries, int lag,
 450                            boolean cleanMe) {
 451         for (BufferedSubscription<T> r = retries; r != null;) {
 452             BufferedSubscription<T> nextRetry = r.nextRetry;
 453             r.nextRetry = null;
 454             if (nanos > 0L)
 455                 r.awaitSpace(nanos);
 456             int stat = r.retryOffer(item);
 457             if (stat == 0 && onDrop != null && onDrop.test(r.subscriber, item))
 458                 stat = r.retryOffer(item);
 459             if (stat == 0)
 460                 lag = (lag >= 0) ? -1 : lag - 1;
 461             else if (stat < 0)
 462                 cleanMe = true;
 463             else if (lag >= 0 && stat > lag)
 464                 lag = stat;
 465             r = nextRetry;
 466         }
 467         if (cleanMe)
 468             cleanAndCount();
 469         return lag;
 470     }
 471 
 472     /**
 473      * Returns current list count after removing closed subscribers.
 474      * Call only while holding lock.  Used mainly by retryOffer for
 475      * cleanup.
 476      */
 477     private int cleanAndCount() {
 478         int count = 0;
 479         BufferedSubscription<T> pred = null, next;
 480         for (BufferedSubscription<T> b = clients; b != null; b = next) {
 481             next = b.next;
 482             if (b.isClosed()) {
 483                 b.next = null;
 484                 if (pred == null)
 485                     clients = next;
 486                 else
 487                     pred.next = next;
 488             }
 489             else {
 490                 pred = b;
 491                 ++count;
 492             }
 493         }
 494         return count;
 495     }
 496 
 497     /**
 498      * Publishes the given item to each current subscriber by
 499      * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
 500      * onNext} method, blocking uninterruptibly while resources for any
 501      * subscriber are unavailable. This method returns an estimate of
 502      * the maximum lag (number of items submitted but not yet consumed)
 503      * among all current subscribers. This value is at least one
 504      * (accounting for this submitted item) if there are any
 505      * subscribers, else zero.
 506      *
 507      * <p>If the Executor for this publisher throws a
 508      * RejectedExecutionException (or any other RuntimeException or
 509      * Error) when attempting to asynchronously notify subscribers,
 510      * then this exception is rethrown, in which case not all
 511      * subscribers will have been issued this item.
 512      *
 513      * @param item the (non-null) item to publish
 514      * @return the estimated maximum lag among subscribers
 515      * @throws IllegalStateException if closed
 516      * @throws NullPointerException if item is null
 517      * @throws RejectedExecutionException if thrown by Executor
 518      */
 519     public int submit(T item) {
 520         return doOffer(item, Long.MAX_VALUE, null);
 521     }
 522 
 523     /**
 524      * Publishes the given item, if possible, to each current subscriber
 525      * by asynchronously invoking its {@link
 526      * Flow.Subscriber#onNext(Object) onNext} method. The item may be
 527      * dropped by one or more subscribers if resource limits are
 528      * exceeded, in which case the given handler (if non-null) is
 529      * invoked, and if it returns true, retried once.  Other calls to
 530      * methods in this class by other threads are blocked while the
 531      * handler is invoked.  Unless recovery is assured, options are
 532      * usually limited to logging the error and/or issuing an {@link
 533      * Flow.Subscriber#onError(Throwable) onError} signal to the
 534      * subscriber.
 535      *
 536      * <p>This method returns a status indicator: If negative, it
 537      * represents the (negative) number of drops (failed attempts to
 538      * issue the item to a subscriber). Otherwise it is an estimate of
 539      * the maximum lag (number of items submitted but not yet
 540      * consumed) among all current subscribers. This value is at least
 541      * one (accounting for this submitted item) if there are any
 542      * subscribers, else zero.
 543      *
 544      * <p>If the Executor for this publisher throws a
 545      * RejectedExecutionException (or any other RuntimeException or
 546      * Error) when attempting to asynchronously notify subscribers, or
 547      * the drop handler throws an exception when processing a dropped
 548      * item, then this exception is rethrown.
 549      *
 550      * @param item the (non-null) item to publish
 551      * @param onDrop if non-null, the handler invoked upon a drop to a
 552      * subscriber, with arguments of the subscriber and item; if it
 553      * returns true, an offer is re-attempted (once)
 554      * @return if negative, the (negative) number of drops; otherwise
 555      * an estimate of maximum lag
 556      * @throws IllegalStateException if closed
 557      * @throws NullPointerException if item is null
 558      * @throws RejectedExecutionException if thrown by Executor
 559      */
 560     public int offer(T item,
 561                      BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
 562         return doOffer(item, 0L, onDrop);
 563     }
 564 
 565     /**
 566      * Publishes the given item, if possible, to each current subscriber
 567      * by asynchronously invoking its {@link
 568      * Flow.Subscriber#onNext(Object) onNext} method, blocking while
 569      * resources for any subscription are unavailable, up to the
 570      * specified timeout or until the caller thread is interrupted, at
 571      * which point the given handler (if non-null) is invoked, and if it
 572      * returns true, retried once. (The drop handler may distinguish
 573      * timeouts from interrupts by checking whether the current thread
 574      * is interrupted.)  Other calls to methods in this class by other
 575      * threads are blocked while the handler is invoked.  Unless
 576      * recovery is assured, options are usually limited to logging the
 577      * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
 578      * onError} signal to the subscriber.
 579      *
 580      * <p>This method returns a status indicator: If negative, it
 581      * represents the (negative) number of drops (failed attempts to
 582      * issue the item to a subscriber). Otherwise it is an estimate of
 583      * the maximum lag (number of items submitted but not yet
 584      * consumed) among all current subscribers. This value is at least
 585      * one (accounting for this submitted item) if there are any
 586      * subscribers, else zero.
 587      *
 588      * <p>If the Executor for this publisher throws a
 589      * RejectedExecutionException (or any other RuntimeException or
 590      * Error) when attempting to asynchronously notify subscribers, or
 591      * the drop handler throws an exception when processing a dropped
 592      * item, then this exception is rethrown.
 593      *
 594      * @param item the (non-null) item to publish
 595      * @param timeout how long to wait for resources for any subscriber
 596      * before giving up, in units of {@code unit}
 597      * @param unit a {@code TimeUnit} determining how to interpret the
 598      * {@code timeout} parameter
 599      * @param onDrop if non-null, the handler invoked upon a drop to a
 600      * subscriber, with arguments of the subscriber and item; if it
 601      * returns true, an offer is re-attempted (once)
 602      * @return if negative, the (negative) number of drops; otherwise
 603      * an estimate of maximum lag
 604      * @throws IllegalStateException if closed
 605      * @throws NullPointerException if item is null
 606      * @throws RejectedExecutionException if thrown by Executor
 607      */
 608     public int offer(T item, long timeout, TimeUnit unit,
 609                      BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
 610         long nanos = unit.toNanos(timeout);
 611         // distinguishes from untimed (only wrt interrupt policy)
 612         if (nanos == Long.MAX_VALUE) --nanos;
 613         return doOffer(item, nanos, onDrop);
 614     }
 615 
 616     /**
 617      * Unless already closed, issues {@link
 618      * Flow.Subscriber#onComplete() onComplete} signals to current
 619      * subscribers, and disallows subsequent attempts to publish.
 620      * Upon return, this method does <em>NOT</em> guarantee that all
 621      * subscribers have yet completed.
 622      */
 623     public void close() {
 624         ReentrantLock lock = this.lock;
 625         if (!closed) {
 626             BufferedSubscription<T> b;
 627             lock.lock();
 628             try {
 629                 // no need to re-check closed here
 630                 b = clients;
 631                 clients = null;
 632                 owner = null;
 633                 closed = true;
 634             } finally {
 635                 lock.unlock();
 636             }
 637             while (b != null) {
 638                 BufferedSubscription<T> next = b.next;
 639                 b.next = null;
 640                 b.onComplete();
 641                 b = next;
 642             }
 643         }
 644     }
 645 
 646     /**
 647      * Unless already closed, issues {@link
 648      * Flow.Subscriber#onError(Throwable) onError} signals to current
 649      * subscribers with the given error, and disallows subsequent
 650      * attempts to publish.  Future subscribers also receive the given
 651      * error. Upon return, this method does <em>NOT</em> guarantee
 652      * that all subscribers have yet completed.
 653      *
 654      * @param error the {@code onError} argument sent to subscribers
 655      * @throws NullPointerException if error is null
 656      */
 657     public void closeExceptionally(Throwable error) {
 658         if (error == null)
 659             throw new NullPointerException();
 660         ReentrantLock lock = this.lock;
 661         if (!closed) {
 662             BufferedSubscription<T> b;
 663             lock.lock();
 664             try {
 665                 b = clients;
 666                 if (!closed) {  // don't clobber racing close
 667                     closedException = error;
 668                     clients = null;
 669                     owner = null;
 670                     closed = true;
 671                 }
 672             } finally {
 673                 lock.unlock();
 674             }
 675             while (b != null) {
 676                 BufferedSubscription<T> next = b.next;
 677                 b.next = null;
 678                 b.onError(error);
 679                 b = next;
 680             }
 681         }
 682     }
 683 
 684     /**
 685      * Returns true if this publisher is not accepting submissions.
 686      *
 687      * @return true if closed
 688      */
 689     public boolean isClosed() {
 690         return closed;
 691     }
 692 
 693     /**
 694      * Returns the exception associated with {@link
 695      * #closeExceptionally(Throwable) closeExceptionally}, or null if
 696      * not closed or if closed normally.
 697      *
 698      * @return the exception, or null if none
 699      */
 700     public Throwable getClosedException() {
 701         return closedException;
 702     }
 703 
 704     /**
 705      * Returns true if this publisher has any subscribers.
 706      *
 707      * @return true if this publisher has any subscribers
 708      */
 709     public boolean hasSubscribers() {
 710         boolean nonEmpty = false;
 711         ReentrantLock lock = this.lock;
 712         lock.lock();
 713         try {
 714             for (BufferedSubscription<T> b = clients; b != null;) {
 715                 BufferedSubscription<T> next = b.next;
 716                 if (b.isClosed()) {
 717                     b.next = null;
 718                     b = clients = next;
 719                 }
 720                 else {
 721                     nonEmpty = true;
 722                     break;
 723                 }
 724             }
 725         } finally {
 726             lock.unlock();
 727         }
 728         return nonEmpty;
 729     }
 730 
 731     /**
 732      * Returns the number of current subscribers.
 733      *
 734      * @return the number of current subscribers
 735      */
 736     public int getNumberOfSubscribers() {
 737         int n;
 738         ReentrantLock lock = this.lock;
 739         lock.lock();
 740         try {
 741             n = cleanAndCount();
 742         } finally {
 743             lock.unlock();
 744         }
 745         return n;
 746     }
 747 
 748     /**
 749      * Returns the Executor used for asynchronous delivery.
 750      *
 751      * @return the Executor used for asynchronous delivery
 752      */
 753     public Executor getExecutor() {
 754         return executor;
 755     }
 756 
 757     /**
 758      * Returns the maximum per-subscriber buffer capacity.
 759      *
 760      * @return the maximum per-subscriber buffer capacity
 761      */
 762     public int getMaxBufferCapacity() {
 763         return maxBufferCapacity;
 764     }
 765 
 766     /**
 767      * Returns a list of current subscribers for monitoring and
 768      * tracking purposes, not for invoking {@link Flow.Subscriber}
 769      * methods on the subscribers.
 770      *
 771      * @return list of current subscribers
 772      */
 773     public List<Subscriber<? super T>> getSubscribers() {
 774         ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
 775         ReentrantLock lock = this.lock;
 776         lock.lock();
 777         try {
 778             BufferedSubscription<T> pred = null, next;
 779             for (BufferedSubscription<T> b = clients; b != null; b = next) {
 780                 next = b.next;
 781                 if (b.isClosed()) {
 782                     b.next = null;
 783                     if (pred == null)
 784                         clients = next;
 785                     else
 786                         pred.next = next;
 787                 }
 788                 else {
 789                     subs.add(b.subscriber);
 790                     pred = b;
 791                 }
 792             }
 793         } finally {
 794             lock.unlock();
 795         }
 796         return subs;
 797     }
 798 
 799     /**
 800      * Returns true if the given Subscriber is currently subscribed.
 801      *
 802      * @param subscriber the subscriber
 803      * @return true if currently subscribed
 804      * @throws NullPointerException if subscriber is null
 805      */
 806     public boolean isSubscribed(Subscriber<? super T> subscriber) {
 807         if (subscriber == null) throw new NullPointerException();
 808         boolean subscribed = false;
 809         ReentrantLock lock = this.lock;
 810         if (!closed) {
 811             lock.lock();
 812             try {
 813                 BufferedSubscription<T> pred = null, next;
 814                 for (BufferedSubscription<T> b = clients; b != null; b = next) {
 815                     next = b.next;
 816                     if (b.isClosed()) {
 817                         b.next = null;
 818                         if (pred == null)
 819                             clients = next;
 820                         else
 821                             pred.next = next;
 822                     }
 823                     else if (subscribed = subscriber.equals(b.subscriber))
 824                         break;
 825                     else
 826                         pred = b;
 827                 }
 828             } finally {
 829                 lock.unlock();
 830             }
 831         }
 832         return subscribed;
 833     }
 834 
 835     /**
 836      * Returns an estimate of the minimum number of items requested
 837      * (via {@link Flow.Subscription#request(long) request}) but not
 838      * yet produced, among all current subscribers.
 839      *
 840      * @return the estimate, or zero if no subscribers
 841      */
 842     public long estimateMinimumDemand() {
 843         long min = Long.MAX_VALUE;
 844         boolean nonEmpty = false;
 845         ReentrantLock lock = this.lock;
 846         lock.lock();
 847         try {
 848             BufferedSubscription<T> pred = null, next;
 849             for (BufferedSubscription<T> b = clients; b != null; b = next) {
 850                 int n; long d;
 851                 next = b.next;
 852                 if ((n = b.estimateLag()) < 0) {
 853                     b.next = null;
 854                     if (pred == null)
 855                         clients = next;
 856                     else
 857                         pred.next = next;
 858                 }
 859                 else {
 860                     if ((d = b.demand - n) < min)
 861                         min = d;
 862                     nonEmpty = true;
 863                     pred = b;
 864                 }
 865             }
 866         } finally {
 867             lock.unlock();
 868         }
 869         return nonEmpty ? min : 0;
 870     }
 871 
 872     /**
 873      * Returns an estimate of the maximum number of items produced but
 874      * not yet consumed among all current subscribers.
 875      *
 876      * @return the estimate
 877      */
 878     public int estimateMaximumLag() {
 879         int max = 0;
 880         ReentrantLock lock = this.lock;
 881         lock.lock();
 882         try {
 883             BufferedSubscription<T> pred = null, next;
 884             for (BufferedSubscription<T> b = clients; b != null; b = next) {
 885                 int n;
 886                 next = b.next;
 887                 if ((n = b.estimateLag()) < 0) {
 888                     b.next = null;
 889                     if (pred == null)
 890                         clients = next;
 891                     else
 892                         pred.next = next;
 893                 }
 894                 else {
 895                     if (n > max)
 896                         max = n;
 897                     pred = b;
 898                 }
 899             }
 900         } finally {
 901             lock.unlock();
 902         }
 903         return max;
 904     }
 905 
 906     /**
 907      * Processes all published items using the given Consumer function.
 908      * Returns a CompletableFuture that is completed normally when this
 909      * publisher signals {@link Flow.Subscriber#onComplete()
 910      * onComplete}, or completed exceptionally upon any error, or an
 911      * exception is thrown by the Consumer, or the returned
 912      * CompletableFuture is cancelled, in which case no further items
 913      * are processed.
 914      *
 915      * @param consumer the function applied to each onNext item
 916      * @return a CompletableFuture that is completed normally
 917      * when the publisher signals onComplete, and exceptionally
 918      * upon any error or cancellation
 919      * @throws NullPointerException if consumer is null
 920      */
 921     public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
 922         if (consumer == null)
 923             throw new NullPointerException();
 924         CompletableFuture<Void> status = new CompletableFuture<>();
 925         subscribe(new ConsumerSubscriber<T>(status, consumer));
 926         return status;
 927     }
 928 
 929     /** Subscriber for method consume */
 930     static final class ConsumerSubscriber<T> implements Subscriber<T> {
 931         final CompletableFuture<Void> status;
 932         final Consumer<? super T> consumer;
 933         Subscription subscription;
 934         ConsumerSubscriber(CompletableFuture<Void> status,
 935                            Consumer<? super T> consumer) {
 936             this.status = status; this.consumer = consumer;
 937         }
 938         public final void onSubscribe(Subscription subscription) {
 939             this.subscription = subscription;
 940             status.whenComplete((v, e) -> subscription.cancel());
 941             if (!status.isDone())
 942                 subscription.request(Long.MAX_VALUE);
 943         }
 944         public final void onError(Throwable ex) {
 945             status.completeExceptionally(ex);
 946         }
 947         public final void onComplete() {
 948             status.complete(null);
 949         }
 950         public final void onNext(T item) {
 951             try {
 952                 consumer.accept(item);
 953             } catch (Throwable ex) {
 954                 subscription.cancel();
 955                 status.completeExceptionally(ex);
 956             }
 957         }
 958     }
 959 
 960     /**
 961      * A task for consuming buffer items and signals, created and
 962      * executed whenever they become available. A task consumes as
 963      * many items/signals as possible before terminating, at which
 964      * point another task is created when needed. The dual Runnable
 965      * and ForkJoinTask declaration saves overhead when executed by
 966      * ForkJoinPools, without impacting other kinds of Executors.
 967      */
 968     @SuppressWarnings("serial")
 969     static final class ConsumerTask<T> extends ForkJoinTask<Void>
 970         implements Runnable, CompletableFuture.AsynchronousCompletionTask {
 971         final BufferedSubscription<T> consumer;
 972         ConsumerTask(BufferedSubscription<T> consumer) {
 973             this.consumer = consumer;
 974         }
 975         public final Void getRawResult() { return null; }
 976         public final void setRawResult(Void v) {}
 977         public final boolean exec() { consumer.consume(); return false; }
 978         public final void run() { consumer.consume(); }
 979     }
 980 
 981     /**
 982      * A resizable array-based ring buffer with integrated control to
 983      * start a consumer task whenever items are available.  The buffer
 984      * algorithm is specialized for the case of at most one concurrent
 985      * producer and consumer, and power of two buffer sizes. It relies
 986      * primarily on atomic operations (CAS or getAndSet) at the next
 987      * array slot to put or take an element, at the "tail" and "head"
 988      * indices written only by the producer and consumer respectively.
 989      *
 990      * We ensure internally that there is at most one active consumer
 991      * task at any given time. The publisher guarantees a single
 992      * producer via its lock. Sync among producers and consumers
 993      * relies on volatile fields "ctl", "demand", and "waiting" (along
 994      * with element access). Other variables are accessed in plain
 995      * mode, relying on outer ordering and exclusion, and/or enclosing
 996      * them within other volatile accesses. Some atomic operations are
 997      * avoided by tracking single threaded ownership by producers (in
 998      * the style of biased locking).
 999      *
1000      * Execution control and protocol state are managed using field
1001      * "ctl".  Methods to subscribe, close, request, and cancel set
1002      * ctl bits (mostly using atomic boolean method getAndBitwiseOr),
1003      * and ensure that a task is running. (The corresponding consumer
1004      * side actions are in method consume.)  To avoid starting a new
1005      * task on each action, ctl also includes a keep-alive bit
1006      * (ACTIVE) that is refreshed if needed on producer actions.
1007      * (Maintaining agreement about keep-alives requires most atomic
1008      * updates to be full SC/Volatile strength, which is still much
1009      * cheaper than using one task per item.)  Error signals
1010      * additionally null out items and/or fields to reduce termination
1011      * latency.  The cancel() method is supported by treating as ERROR
1012      * but suppressing onError signal.
1013      *
1014      * Support for blocking also exploits the fact that there is only
1015      * one possible waiter. ManagedBlocker-compatible control fields
1016      * are placed in this class itself rather than in wait-nodes.
1017      * Blocking control relies on the "waiting" and "waiter"
1018      * fields. Producers set them before trying to block. Signalling
1019      * unparks and clears fields. If the producer and/or consumer are
1020      * using a ForkJoinPool, the producer attempts to help run
1021      * consumer tasks via ForkJoinPool.helpAsyncBlocker before
1022      * blocking.
1023      *
1024      * Usages of this class may encounter any of several forms of
1025      * memory contention. We try to ameliorate across them without
1026      * unduly impacting footprints in low-contention usages where it
1027      * isn't needed. Buffer arrays start out small and grow only as
1028      * needed.  The class uses @Contended and heuristic field
1029      * declaration ordering to reduce false-sharing memory contention
1030      * across instances of BufferedSubscription (as in, multiple
1031      * subscribers per publisher).  We additionally segregate some
1032      * fields that would otherwise nearly always encounter cache line
1033      * contention among producers and consumers. To reduce contention
1034      * across time (vs space), consumers only periodically update
1035      * other fields (see method takeItems), at the expense of possibly
1036      * staler reporting of lags and demand (bounded at 12.5% == 1/8
1037      * capacity) and possibly more atomic operations.
1038      *
1039      * Other forms of imbalance and slowdowns can occur during startup
1040      * when producer and consumer methods are compiled and/or memory
1041      * is allocated at different rates.  This is ameliorated by
1042      * artificially subdividing some consumer methods, including
1043      * isolation of all subscriber callbacks.  This code also includes
1044      * typical power-of-two array screening idioms to avoid compilers
1045      * generating traps, along with the usual SSA-based inline
1046      * assignment coding style. Also, all methods and fields have
1047      * default visibility to simplify usage by callers.
1048      */
1049     @SuppressWarnings("serial")
1050     @jdk.internal.vm.annotation.Contended
1051     static final class BufferedSubscription<T>
1052         implements Subscription, ForkJoinPool.ManagedBlocker {
        // Declaration order here is deliberate: the class comment cites
        // heuristic field ordering (plus @Contended) as a means of
        // reducing false sharing, so do not reorder casually.
        long timeout;                      // Long.MAX_VALUE if untimed wait
        int head;                          // next position to take
        int tail;                          // next position to put
        final int maxCapacity;             // max buffer size
        volatile int ctl;                  // atomic run state flags
        Object[] array;                    // buffer
        // target of the onSubscribe/onNext/onError/onComplete callbacks
        final Subscriber<? super T> subscriber;
        // called with (subscriber, exception) when onNext throws; may be null
        final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
        Executor executor;                 // null on error
        Thread waiter;                     // blocked producer thread
        Throwable pendingError;            // holds until onError issued
        BufferedSubscription<T> next;      // used only by publisher
        BufferedSubscription<T> nextRetry; // used only by publisher

        // The two fields below share contention group "c", keeping them
        // apart from the fields above.
        @jdk.internal.vm.annotation.Contended("c") // segregate
        volatile long demand;              // # unfilled requests
        @jdk.internal.vm.annotation.Contended("c")
        volatile int waiting;              // nonzero if producer blocked

        // ctl bit values
        static final int CLOSED   = 0x01;  // if set, other bits ignored
        static final int ACTIVE   = 0x02;  // keep-alive for consumer task
        static final int REQS     = 0x04;  // (possibly) nonzero demand
        static final int ERROR    = 0x08;  // issues onError when noticed
        static final int COMPLETE = 0x10;  // issues onComplete when done
        static final int RUN      = 0x20;  // task is or will be running
        static final int OPEN     = 0x40;  // true after subscribe

        static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
1082 
1083         BufferedSubscription(Subscriber<? super T> subscriber,
1084                              Executor executor,
1085                              BiConsumer<? super Subscriber<? super T>,
1086                              ? super Throwable> onNextHandler,
1087                              Object[] array,
1088                              int maxBufferCapacity) {
1089             this.subscriber = subscriber;
1090             this.executor = executor;
1091             this.onNextHandler = onNextHandler;
1092             this.array = array;
1093             this.maxCapacity = maxBufferCapacity;
1094         }
1095 
1096         // Wrappers for some VarHandle methods
1097 
1098         final boolean weakCasCtl(int cmp, int val) {
1099             return CTL.weakCompareAndSet(this, cmp, val);
1100         }
1101 
1102         final int getAndBitwiseOrCtl(int bits) {
1103             return (int)CTL.getAndBitwiseOr(this, bits);
1104         }
1105 
1106         final long subtractDemand(int k) {
1107             long n = (long)(-k);
1108             return n + (long)DEMAND.getAndAdd(this, n);
1109         }
1110 
1111         final boolean casDemand(long cmp, long val) {
1112             return DEMAND.compareAndSet(this, cmp, val);
1113         }
1114 
1115         // Utilities used by SubmissionPublisher
1116 
1117         /**
1118          * Returns true if closed (consumer task may still be running).
1119          */
1120         final boolean isClosed() {
1121             return (ctl & CLOSED) != 0;
1122         }
1123 
1124         /**
1125          * Returns estimated number of buffered items, or negative if
1126          * closed.
1127          */
1128         final int estimateLag() {
1129             int c = ctl, n = tail - head;
1130             return ((c & CLOSED) != 0) ? -1 : (n < 0) ? 0 : n;
1131         }
1132 
1133         // Methods for submitting items
1134 
        /**
         * Tries to add item and start consumer task if necessary.
         *
         * @param item the item to add
         * @param unowned if true, the slot is written with a CAS rather
         * than a release-mode store (unless the buffer is grown instead)
         * @return negative if closed, 0 if saturated, else estimated lag
         */
        final int offer(T item, boolean unowned) {
            Object[] a;
            // stat stays 0 ("saturated") unless the item is actually added
            int stat = 0, cap = ((a = array) == null) ? 0 : a.length;
            // i: slot index for this item; n: lag counting this item
            int t = tail, i = t & (cap - 1), n = t + 1 - head;
            if (cap > 0) {
                boolean added;
                if (n >= cap && cap < maxCapacity) // resize
                    added = growAndOffer(item, a, t);
                else if (n >= cap || unowned)      // need volatile CAS
                    added = QA.compareAndSet(a, i, null, item);
                else {                             // can use release mode
                    QA.setRelease(a, i, item);
                    added = true;
                }
                if (added) {
                    tail = t + 1;
                    stat = n;
                }
            }
            // startOnOffer replaces stat with -1 if it observes CLOSED
            return startOnOffer(stat);
        }
1160 
        /**
         * Tries to expand buffer and add item, returning true on
         * success. Currently fails only if out of memory.
         *
         * @param item the item to add
         * @param a the current buffer
         * @param t the current tail index, where item is placed
         * @return true if the buffer was replaced and item added
         */
        final boolean growAndOffer(T item, Object[] a, int t) {
            int cap = 0, newCap = 0;
            Object[] newArray = null;
            // (cap << 1) > 0 also rejects a doubling that overflows int
            if (a != null && (cap = a.length) > 0 && (newCap = cap << 1) > 0) {
                try {
                    newArray = new Object[newCap];
                } catch (OutOfMemoryError ex) {
                    // newArray stays null and is reported as failure below
                }
            }
            if (newArray == null)
                return false;
            else {                                // take and move items
                int newMask = newCap - 1;
                // place the new item at t, then walk backwards from t - 1
                newArray[t-- & newMask] = item;
                // moves at most cap items, newest first
                for (int mask = cap - 1, k = mask; k >= 0; --k) {
                    Object x = QA.getAndSet(a, t & mask, null);
                    if (x == null)
                        break;                    // already consumed
                    else
                        newArray[t-- & newMask] = x;
                }
                array = newArray;
                VarHandle.releaseFence();         // release array and slots
                return true;
            }
        }
1191 
        /**
         * Version of offer for retries (no resize or bias)
         *
         * @param item the item to add
         * @return negative if closed, 0 if the slot at tail was still
         * occupied (or there is no buffer), else the lag after adding
         */
        final int retryOffer(T item) {
            Object[] a;
            int stat = 0, t = tail, h = head, cap;
            // always a CAS: succeeds only if the slot at tail is empty
            if ((a = array) != null && (cap = a.length) > 0 &&
                QA.compareAndSet(a, (cap - 1) & t, null, item))
                stat = (tail = t + 1) - h;
            return startOnOffer(stat);
        }
1203 
        /**
         * Tries to start consumer task after offer.
         *
         * @param stat the status to return unless closed
         * @return negative if now closed, else argument
         */
        final int startOnOffer(int stat) {
            int c; // start or keep alive if requests exist and not active
            // The atomic OR runs only when REQS is set and ACTIVE is clear.
            // It reassigns c, so the CLOSED test below sees the freshest
            // value that was read.
            if (((c = ctl) & (REQS | ACTIVE)) == REQS &&
                ((c = getAndBitwiseOrCtl(RUN | ACTIVE)) & (RUN | CLOSED)) == 0)
                tryStart();
            else if ((c & CLOSED) != 0)
                stat = -1;
            return stat;
        }
1217 
1218         /**
1219          * Tries to start consumer task. Sets error state on failure.
1220          */
1221         final void tryStart() {
1222             try {
1223                 Executor e;
1224                 ConsumerTask<T> task = new ConsumerTask<T>(this);
1225                 if ((e = executor) != null)   // skip if disabled on error
1226                     e.execute(task);
1227             } catch (RuntimeException | Error ex) {
1228                 getAndBitwiseOrCtl(ERROR | CLOSED);
1229                 throw ex;
1230             }
1231         }
1232 
1233         // Signals to consumer tasks
1234 
1235         /**
1236          * Sets the given control bits, starting task if not running or closed.
1237          * @param bits state bits, assumed to include RUN but not CLOSED
1238          */
1239         final void startOnSignal(int bits) {
1240             if ((ctl & bits) != bits &&
1241                 (getAndBitwiseOrCtl(bits) & (RUN | CLOSED)) == 0)
1242                 tryStart();
1243         }
1244 
1245         final void onSubscribe() {
1246             startOnSignal(RUN | ACTIVE);
1247         }
1248 
1249         final void onComplete() {
1250             startOnSignal(RUN | ACTIVE | COMPLETE);
1251         }
1252 
1253         final void onError(Throwable ex) {
1254             int c; Object[] a;      // to null out buffer on async error
1255             if (ex != null)
1256                 pendingError = ex;  // races are OK
1257             if (((c = getAndBitwiseOrCtl(ERROR | RUN | ACTIVE)) & CLOSED) == 0) {
1258                 if ((c & RUN) == 0)
1259                     tryStart();
1260                 else if ((a = array) != null)
1261                     Arrays.fill(a, null);
1262             }
1263         }
1264 
1265         public final void cancel() {
1266             onError(null);
1267         }
1268 
1269         public final void request(long n) {
1270             if (n > 0L) {
1271                 for (;;) {
1272                     long p = demand, d = p + n;  // saturate
1273                     if (casDemand(p, d < p ? Long.MAX_VALUE : d))
1274                         break;
1275                 }
1276                 startOnSignal(RUN | ACTIVE | REQS);
1277             }
1278             else
1279                 onError(new IllegalArgumentException(
1280                             "non-positive subscription request"));
1281         }
1282 
1283         // Consumer task actions
1284 
        /**
         * Consumer loop, called from ConsumerTask, or indirectly when
         * helping during submit.
         *
         * Each pass takes exactly one of these actions: close on ERROR,
         * take a batch of items, reconcile the REQS bit with the actual
         * demand, or (once tail is stable) complete, drop the keep-alive
         * bit, or exit.
         */
        final void consume() {
            Subscriber<? super T> s;
            if ((s = subscriber) != null) {          // hoist checks
                subscribeOnOpen(s);                  // onSubscribe on first run
                long d = demand;
                for (int h = head, t = tail;;) {
                    int c, taken; boolean empty;
                    if (((c = ctl) & ERROR) != 0) {
                        // ex == null: closeOnError falls back to pendingError
                        closeOnError(s, null);
                        break;
                    }
                    else if ((taken = takeItems(s, d, h)) > 0) {
                        // publish the new head; d becomes the remaining demand
                        head = h += taken;
                        d = subtractDemand(taken);
                    }
                    else if ((d = demand) == 0L && (c & REQS) != 0)
                        weakCasCtl(c, c & ~REQS);    // exhausted demand
                    else if (d != 0L && (c & REQS) == 0)
                        weakCasCtl(c, c | REQS);     // new demand
                    else if (t == (t = tail)) {      // stability check
                        if ((empty = (t == h)) && (c & COMPLETE) != 0) {
                            closeOnComplete(s);      // end of stream
                            break;
                        }
                        else if (empty || d == 0L) {
                            // Two-step shutdown: clear ACTIVE first, and
                            // clear RUN (then exit) only on a later pass
                            // that finds ACTIVE still clear. A failed
                            // weak CAS just loops again.
                            int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
                            if (weakCasCtl(c, c & ~bit) && bit == RUN)
                                break;               // un-keep-alive or exit
                        }
                    }
                }
            }
        }
1322 
        /**
         * Consumes some items until unavailable or bound or error.
         * At most {@code max(1, cap/8)} items are taken per call, and
         * never more than the current demand.
         *
         * @param s subscriber
         * @param d current demand
         * @param h current head
         * @return number taken
         */
        final int takeItems(Subscriber<? super T> s, long d, int h) {
            Object[] a;
            int k = 0, cap;
            if ((a = array) != null && (cap = a.length) > 0) {
                int m = cap - 1, b = (m >>> 3) + 1; // max(1, cap/8)
                int n = (d < (long)b) ? (int)d : b; // min(demand, batch bound)
                // k advances only in the loop update, so an empty slot or
                // an item whose onNext failed is not counted as taken
                for (; k < n; ++h, ++k) {
                    Object x = QA.getAndSet(a, h & m, null);
                    if (waiting != 0)
                        signalWaiter();             // wake a blocked producer
                    if (x == null)
                        break;
                    else if (!consumeNext(s, x))
                        break;
                }
            }
            return k;
        }
1349 
1350         final boolean consumeNext(Subscriber<? super T> s, Object x) {
1351             try {
1352                 @SuppressWarnings("unchecked") T y = (T) x;
1353                 if (s != null)
1354                     s.onNext(y);
1355                 return true;
1356             } catch (Throwable ex) {
1357                 handleOnNext(s, ex);
1358                 return false;
1359             }
1360         }
1361 
1362         /**
1363          * Processes exception in Subscriber.onNext.
1364          */
1365         final void handleOnNext(Subscriber<? super T> s, Throwable ex) {
1366             BiConsumer<? super Subscriber<? super T>, ? super Throwable> h;
1367             try {
1368                 if ((h = onNextHandler) != null)
1369                     h.accept(s, ex);
1370             } catch (Throwable ignore) {
1371             }
1372             closeOnError(s, ex);
1373         }
1374 
1375         /**
1376          * Issues subscriber.onSubscribe if this is first signal.
1377          */
1378         final void subscribeOnOpen(Subscriber<? super T> s) {
1379             if ((ctl & OPEN) == 0 && (getAndBitwiseOrCtl(OPEN) & OPEN) == 0)
1380                 consumeSubscribe(s);
1381         }
1382 
1383         final void consumeSubscribe(Subscriber<? super T> s) {
1384             try {
1385                 if (s != null) // ignore if disabled
1386                     s.onSubscribe(this);
1387             } catch (Throwable ex) {
1388                 closeOnError(s, ex);
1389             }
1390         }
1391 
1392         /**
1393          * Issues subscriber.onComplete unless already closed.
1394          */
1395         final void closeOnComplete(Subscriber<? super T> s) {
1396             if ((getAndBitwiseOrCtl(CLOSED) & CLOSED) == 0)
1397                 consumeComplete(s);
1398         }
1399 
1400         final void consumeComplete(Subscriber<? super T> s) {
1401             try {
1402                 if (s != null)
1403                     s.onComplete();
1404             } catch (Throwable ignore) {
1405             }
1406         }
1407 
1408         /**
1409          * Issues subscriber.onError, and unblocks producer if needed.
1410          */
1411         final void closeOnError(Subscriber<? super T> s, Throwable ex) {
1412             if ((getAndBitwiseOrCtl(ERROR | CLOSED) & CLOSED) == 0) {
1413                 if (ex == null)
1414                     ex = pendingError;
1415                 pendingError = null;  // detach
1416                 executor = null;      // suppress racing start calls
1417                 signalWaiter();
1418                 consumeError(s, ex);
1419             }
1420         }
1421 
1422         final void consumeError(Subscriber<? super T> s, Throwable ex) {
1423             try {
1424                 if (ex != null && s != null)
1425                     s.onError(ex);
1426             } catch (Throwable ignore) {
1427             }
1428         }
1429 
1430         // Blocking support
1431 
1432         /**
1433          * Unblocks waiting producer.
1434          */
1435         final void signalWaiter() {
1436             Thread w;
1437             waiting = 0;
1438             if ((w = waiter) != null)
1439                 LockSupport.unpark(w);
1440         }
1441 
1442         /**
1443          * Returns true if closed or space available.
1444          * For ManagedBlocker.
1445          */
1446         public final boolean isReleasable() {
1447             Object[] a; int cap;
1448             return ((ctl & CLOSED) != 0 ||
1449                     ((a = array) != null && (cap = a.length) > 0 &&
1450                      QA.getAcquire(a, (cap - 1) & tail) == null));
1451         }
1452 
1453         /**
1454          * Helps or blocks until timeout, closed, or space available.
1455          */
1456         final void awaitSpace(long nanos) {
1457             if (!isReleasable()) {
1458                 ForkJoinPool.helpAsyncBlocker(executor, this);
1459                 if (!isReleasable()) {
1460                     timeout = nanos;
1461                     try {
1462                         ForkJoinPool.managedBlock(this);
1463                     } catch (InterruptedException ie) {
1464                         timeout = INTERRUPTED;
1465                     }
1466                     if (timeout == INTERRUPTED)
1467                         Thread.currentThread().interrupt();
1468                 }
1469             }
1470         }
1471 
1472         /**
1473          * Blocks until closed, space available or timeout.
1474          * For ManagedBlocker.
1475          */
1476         public final boolean block() {
1477             long nanos = timeout;
1478             boolean timed = (nanos < Long.MAX_VALUE);
1479             long deadline = timed ? System.nanoTime() + nanos : 0L;
1480             while (!isReleasable()) {
1481                 if (Thread.interrupted()) {
1482                     timeout = INTERRUPTED;
1483                     if (timed)
1484                         break;
1485                 }
1486                 else if (timed && (nanos = deadline - System.nanoTime()) <= 0L)
1487                     break;
1488                 else if (waiter == null)
1489                     waiter = Thread.currentThread();
1490                 else if (waiting == 0)
1491                     waiting = 1;
1492                 else if (timed)
1493                     LockSupport.parkNanos(this, nanos);
1494                 else
1495                     LockSupport.park(this);
1496             }
1497             waiter = null;
1498             waiting = 0;
1499             return true;
1500         }
1501 
1502         // VarHandle mechanics
1503         static final VarHandle CTL;
1504         static final VarHandle DEMAND;
1505         static final VarHandle QA;
1506 
1507         static {
1508             try {
1509                 MethodHandles.Lookup l = MethodHandles.lookup();
1510                 CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
1511                                       int.class);
1512                 DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
1513                                          long.class);
1514                 QA = MethodHandles.arrayElementVarHandle(Object[].class);
1515             } catch (ReflectiveOperationException e) {
1516                 throw new ExceptionInInitializerError(e);
1517             }
1518 
1519             // Reduce the risk of rare disastrous classloading in first call to
1520             // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1521             Class<?> ensureLoaded = LockSupport.class;
1522         }
1523     }
1524 }