1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.lang.invoke.MethodHandles;
  39 import java.lang.invoke.VarHandle;
  40 import java.util.ArrayList;
  41 import java.util.Arrays;
  42 import java.util.List;
  43 import java.util.concurrent.locks.LockSupport;
  44 import java.util.function.BiConsumer;
  45 import java.util.function.BiPredicate;
  46 import java.util.function.Consumer;
  47 import static java.util.concurrent.Flow.Publisher;
  48 import static java.util.concurrent.Flow.Subscriber;
  49 import static java.util.concurrent.Flow.Subscription;
  50 
  51 /**
  52  * A {@link Flow.Publisher} that asynchronously issues submitted
  53  * (non-null) items to current subscribers until it is closed.  Each
  54  * current subscriber receives newly submitted items in the same order
  55  * unless drops or exceptions are encountered.  Using a
  56  * SubmissionPublisher allows item generators to act as compliant <a
  57  * href="http://www.reactive-streams.org/"> reactive-streams</a>
  58  * Publishers relying on drop handling and/or blocking for flow
  59  * control.
  60  *
  61  * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
  62  * constructor for delivery to subscribers. The best choice of
  63  * Executor depends on expected usage. If the generator(s) of
  64  * submitted items run in separate threads, and the number of
  65  * subscribers can be estimated, consider using a {@link
  66  * Executors#newFixedThreadPool}. Otherwise consider using the
  67  * default, normally the {@link ForkJoinPool#commonPool}.
  68  *
  69  * <p>Buffering allows producers and consumers to transiently operate
  70  * at different rates.  Each subscriber uses an independent buffer.
  71  * Buffers are created upon first use and expanded as needed up to the
  72  * given maximum. (The enforced capacity may be rounded up to the
  73  * nearest power of two and/or bounded by the largest value supported
  74  * by this implementation.)  Invocations of {@link
  75  * Flow.Subscription#request(long) request} do not directly result in
  76  * buffer expansion, but risk saturation if unfilled requests exceed
  77  * the maximum capacity.  The default value of {@link
  78  * Flow#defaultBufferSize()} may provide a useful starting point for
  79  * choosing a capacity based on expected rates, resources, and usages.
  80  *
  81  * <p>A single SubmissionPublisher may be shared among multiple
  82  * sources. Actions in a source thread prior to publishing an item or
  83  * issuing a signal <a href="package-summary.html#MemoryVisibility">
  84  * <i>happen-before</i></a> actions subsequent to the corresponding
  85  * access by each subscriber. But reported estimates of lag and demand
  86  * are designed for use in monitoring, not for synchronization
  87  * control, and may reflect stale or inaccurate views of progress.
  88  *
  89  * <p>Publication methods support different policies about what to do
  90  * when buffers are saturated. Method {@link #submit(Object) submit}
  91  * blocks until resources are available. This is simplest, but least
  92  * responsive.  The {@code offer} methods may drop items (either
  93  * immediately or with bounded timeout), but provide an opportunity to
  94  * interpose a handler and then retry.
  95  *
  96  * <p>If any Subscriber method throws an exception, its subscription
  97  * is cancelled.  If a handler is supplied as a constructor argument,
  98  * it is invoked before cancellation upon an exception in method
  99  * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
 100  * {@link Flow.Subscriber#onSubscribe onSubscribe},
 101  * {@link Flow.Subscriber#onError(Throwable) onError} and
 102  * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
 103  * handled before cancellation.  If the supplied Executor throws
 104  * {@link RejectedExecutionException} (or any other RuntimeException
 105  * or Error) when attempting to execute a task, or a drop handler
 106  * throws an exception when processing a dropped item, then the
 107  * exception is rethrown. In these cases, not all subscribers will
 108  * have been issued the published item. It is usually good practice to
 109  * {@link #closeExceptionally closeExceptionally} in these cases.
 110  *
 111  * <p>Method {@link #consume(Consumer)} simplifies support for a
 112  * common case in which the only action of a subscriber is to request
 113  * and process all items using a supplied function.
 114  *
 115  * <p>This class may also serve as a convenient base for subclasses
 116  * that generate items, and use the methods in this class to publish
 117  * them.  For example here is a class that periodically publishes the
 118  * items generated from a supplier. (In practice you might add methods
 119  * to independently start and stop generation, to share Executors
 120  * among publishers, and so on, or use a SubmissionPublisher as a
 121  * component rather than a superclass.)
 122  *
 123  * <pre> {@code
 124  * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
 125  *   final ScheduledFuture<?> periodicTask;
 126  *   final ScheduledExecutorService scheduler;
 127  *   PeriodicPublisher(Executor executor, int maxBufferCapacity,
 128  *                     Supplier<? extends T> supplier,
 129  *                     long period, TimeUnit unit) {
 130  *     super(executor, maxBufferCapacity);
 131  *     scheduler = new ScheduledThreadPoolExecutor(1);
 132  *     periodicTask = scheduler.scheduleAtFixedRate(
 133  *       () -> submit(supplier.get()), 0, period, unit);
 134  *   }
 135  *   public void close() {
 136  *     periodicTask.cancel(false);
 137  *     scheduler.shutdown();
 138  *     super.close();
 139  *   }
 140  * }}</pre>
 141  *
 142  * <p>Here is an example of a {@link Flow.Processor} implementation.
 143  * It uses single-step requests to its publisher for simplicity of
 144  * illustration. A more adaptive version could monitor flow using the
 145  * lag estimate returned from {@code submit}, along with other utility
 146  * methods.
 147  *
 148  * <pre> {@code
 149  * class TransformProcessor<S,T> extends SubmissionPublisher<T>
 150  *   implements Flow.Processor<S,T> {
 151  *   final Function<? super S, ? extends T> function;
 152  *   Flow.Subscription subscription;
 153  *   TransformProcessor(Executor executor, int maxBufferCapacity,
 154  *                      Function<? super S, ? extends T> function) {
 155  *     super(executor, maxBufferCapacity);
 156  *     this.function = function;
 157  *   }
 158  *   public void onSubscribe(Flow.Subscription subscription) {
 159  *     (this.subscription = subscription).request(1);
 160  *   }
 161  *   public void onNext(S item) {
 162  *     subscription.request(1);
 163  *     submit(function.apply(item));
 164  *   }
 165  *   public void onError(Throwable ex) { closeExceptionally(ex); }
 166  *   public void onComplete() { close(); }
 167  * }}</pre>
 168  *
 169  * @param <T> the published item type
 170  * @author Doug Lea
 171  * @since 9
 172  */
 173 public class SubmissionPublisher<T> implements Publisher<T>,
 174                                                AutoCloseable {
 175     /*
 176      * Most mechanics are handled by BufferedSubscription. This class
 177      * mainly tracks subscribers and ensures sequentiality, by using
 178      * built-in synchronization locks across public methods. Using
 179      * built-in locks works well in the most typical case in which
 180      * only one thread submits items. We extend this idea in
 181      * submission methods by detecting single-ownership to reduce
 182      * producer-consumer synchronization strength.
 183      */
 184 
 185     /** The largest possible power of two array size. */
 186     static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
 187 
 188     /**
 189      * Initial buffer capacity used when maxBufferCapacity is
 190      * greater. Must be a power of two.
 191      */
 192     static final int INITIAL_CAPACITY = 32;
 193 
 194     /** Round capacity to power of 2, at most limit. */
 195     static final int roundCapacity(int cap) {
 196         int n = cap - 1;
 197         n |= n >>> 1;
 198         n |= n >>> 2;
 199         n |= n >>> 4;
 200         n |= n >>> 8;
 201         n |= n >>> 16;
 202         return (n <= 0) ? 1 : // at least 1
 203             (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
 204     }
 205 
 206     // default Executor setup; nearly the same as CompletableFuture
 207 
 208     /**
 209      * Default executor -- ForkJoinPool.commonPool() unless it cannot
 210      * support parallelism.
 211      */
 212     private static final Executor ASYNC_POOL =
 213         (ForkJoinPool.getCommonPoolParallelism() > 1) ?
 214         ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
 215 
 216     /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
 217     private static final class ThreadPerTaskExecutor implements Executor {
 218         ThreadPerTaskExecutor() {}      // prevent access constructor creation
 219         public void execute(Runnable r) { new Thread(r).start(); }
 220     }
 221 
 222     /**
 223      * Clients (BufferedSubscriptions) are maintained in a linked list
 224      * (via their "next" fields). This works well for publish loops.
 225      * It requires O(n) traversal to check for duplicate subscribers,
 226      * but we expect that subscribing is much less common than
 227      * publishing. Unsubscribing occurs only during traversal loops,
 228      * when BufferedSubscription methods return negative values
 229      * signifying that they have been closed.  To reduce
 230      * head-of-line blocking, submit and offer methods first call
 231      * BufferedSubscription.offer on each subscriber, and place
 232      * saturated ones in retries list (using nextRetry field), and
 233      * retry, possibly blocking or dropping.
 234      */
 235     BufferedSubscription<T> clients;
 236 
 237     /** Run status, updated only within locks */
 238     volatile boolean closed;
 239     /** Set true on first call to subscribe, to initialize possible owner */
 240     boolean subscribed;
 241     /** The first caller thread to subscribe, or null if thread ever changed */
 242     Thread owner;
 243     /** If non-null, the exception in closeExceptionally */
 244     volatile Throwable closedException;
 245 
 246     // Parameters for constructing BufferedSubscriptions
 247     final Executor executor;
 248     final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
 249     final int maxBufferCapacity;
 250 
 251     /**
 252      * Creates a new SubmissionPublisher using the given Executor for
 253      * async delivery to subscribers, with the given maximum buffer size
 254      * for each subscriber, and, if non-null, the given handler invoked
 255      * when any Subscriber throws an exception in method {@link
 256      * Flow.Subscriber#onNext(Object) onNext}.
 257      *
 258      * @param executor the executor to use for async delivery,
 259      * supporting creation of at least one independent thread
 260      * @param maxBufferCapacity the maximum capacity for each
 261      * subscriber's buffer (the enforced capacity may be rounded up to
 262      * the nearest power of two and/or bounded by the largest value
 263      * supported by this implementation; method {@link #getMaxBufferCapacity}
 264      * returns the actual value)
 265      * @param handler if non-null, procedure to invoke upon exception
 266      * thrown in method {@code onNext}
 267      * @throws NullPointerException if executor is null
 268      * @throws IllegalArgumentException if maxBufferCapacity not
 269      * positive
 270      */
 271     public SubmissionPublisher(Executor executor, int maxBufferCapacity,
 272                                BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
 273         if (executor == null)
 274             throw new NullPointerException();
 275         if (maxBufferCapacity <= 0)
 276             throw new IllegalArgumentException("capacity must be positive");
 277         this.executor = executor;
 278         this.onNextHandler = handler;
 279         this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
 280     }
 281 
 282     /**
 283      * Creates a new SubmissionPublisher using the given Executor for
 284      * async delivery to subscribers, with the given maximum buffer size
 285      * for each subscriber, and no handler for Subscriber exceptions in
 286      * method {@link Flow.Subscriber#onNext(Object) onNext}.
 287      *
 288      * @param executor the executor to use for async delivery,
 289      * supporting creation of at least one independent thread
 290      * @param maxBufferCapacity the maximum capacity for each
 291      * subscriber's buffer (the enforced capacity may be rounded up to
 292      * the nearest power of two and/or bounded by the largest value
 293      * supported by this implementation; method {@link #getMaxBufferCapacity}
 294      * returns the actual value)
 295      * @throws NullPointerException if executor is null
 296      * @throws IllegalArgumentException if maxBufferCapacity not
 297      * positive
 298      */
 299     public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
 300         this(executor, maxBufferCapacity, null);
 301     }
 302 
 303     /**
 304      * Creates a new SubmissionPublisher using the {@link
 305      * ForkJoinPool#commonPool()} for async delivery to subscribers
 306      * (unless it does not support a parallelism level of at least two,
 307      * in which case, a new Thread is created to run each task), with
 308      * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
 309      * handler for Subscriber exceptions in method {@link
 310      * Flow.Subscriber#onNext(Object) onNext}.
 311      */
 312     public SubmissionPublisher() {
 313         this(ASYNC_POOL, Flow.defaultBufferSize(), null);
 314     }
 315 
 316     /**
 317      * Adds the given Subscriber unless already subscribed.  If already
 318      * subscribed, the Subscriber's {@link
 319      * Flow.Subscriber#onError(Throwable) onError} method is invoked on
 320      * the existing subscription with an {@link IllegalStateException}.
 321      * Otherwise, upon success, the Subscriber's {@link
 322      * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
 323      * asynchronously with a new {@link Flow.Subscription}.  If {@link
 324      * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
 325      * subscription is cancelled. Otherwise, if this SubmissionPublisher
 326      * was closed exceptionally, then the subscriber's {@link
 327      * Flow.Subscriber#onError onError} method is invoked with the
 328      * corresponding exception, or if closed without exception, the
 329      * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
 330      * method is invoked.  Subscribers may enable receiving items by
 331      * invoking the {@link Flow.Subscription#request(long) request}
 332      * method of the new Subscription, and may unsubscribe by invoking
 333      * its {@link Flow.Subscription#cancel() cancel} method.
 334      *
 335      * @param subscriber the subscriber
 336      * @throws NullPointerException if subscriber is null
 337      */
 338     public void subscribe(Subscriber<? super T> subscriber) {
 339         if (subscriber == null) throw new NullPointerException();
 340         int max = maxBufferCapacity; // allocate initial array
 341         Object[] array = new Object[max < INITIAL_CAPACITY ?
 342                                     max : INITIAL_CAPACITY];
 343         BufferedSubscription<T> subscription =
 344             new BufferedSubscription<T>(subscriber, executor, onNextHandler,
 345                                         array, max);
 346         synchronized (this) {
 347             if (!subscribed) {
 348                 subscribed = true;
 349                 owner = Thread.currentThread();
 350             }
 351             for (BufferedSubscription<T> b = clients, pred = null;;) {
 352                 if (b == null) {
 353                     Throwable ex;
 354                     subscription.onSubscribe();
 355                     if ((ex = closedException) != null)
 356                         subscription.onError(ex);
 357                     else if (closed)
 358                         subscription.onComplete();
 359                     else if (pred == null)
 360                         clients = subscription;
 361                     else
 362                         pred.next = subscription;
 363                     break;
 364                 }
 365                 BufferedSubscription<T> next = b.next;
 366                 if (b.isClosed()) {   // remove
 367                     b.next = null;    // detach
 368                     if (pred == null)
 369                         clients = next;
 370                     else
 371                         pred.next = next;
 372                 }
 373                 else if (subscriber.equals(b.subscriber)) {
 374                     b.onError(new IllegalStateException("Duplicate subscribe"));
 375                     break;
 376                 }
 377                 else
 378                     pred = b;
 379                 b = next;
 380             }
 381         }
 382     }
 383 
 384     /**
 385      * Common implementation for all three forms of submit and offer.
 386      * Acts as submit if nanos == Long.MAX_VALUE, else offer.
 387      */
 388     private int doOffer(T item, long nanos,
 389                         BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
 390         if (item == null) throw new NullPointerException();
 391         int lag = 0;
 392         boolean complete, unowned;
 393         synchronized (this) {
 394             Thread t = Thread.currentThread(), o;
 395             BufferedSubscription<T> b = clients;
 396             if ((unowned = ((o = owner) != t)) && o != null)
 397                 owner = null;                     // disable bias
 398             if (b == null)
 399                 complete = closed;
 400             else {
 401                 complete = false;
 402                 boolean cleanMe = false;
 403                 BufferedSubscription<T> retries = null, rtail = null, next;
 404                 do {
 405                     next = b.next;
 406                     int stat = b.offer(item, unowned);
 407                     if (stat == 0) {              // saturated; add to retry list
 408                         b.nextRetry = null;       // avoid garbage on exceptions
 409                         if (rtail == null)
 410                             retries = b;
 411                         else
 412                             rtail.nextRetry = b;
 413                         rtail = b;
 414                     }
 415                     else if (stat < 0)            // closed
 416                         cleanMe = true;           // remove later
 417                     else if (stat > lag)
 418                         lag = stat;
 419                 } while ((b = next) != null);
 420 
 421                 if (retries != null || cleanMe)
 422                     lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
 423             }
 424         }
 425         if (complete)
 426             throw new IllegalStateException("Closed");
 427         else
 428             return lag;
 429     }
 430 
 431     /**
 432      * Helps, (timed) waits for, and/or drops buffers on list; returns
 433      * lag or negative drops (for use in offer).
 434      */
 435     private int retryOffer(T item, long nanos,
 436                            BiPredicate<Subscriber<? super T>, ? super T> onDrop,
 437                            BufferedSubscription<T> retries, int lag,
 438                            boolean cleanMe) {
 439         for (BufferedSubscription<T> r = retries; r != null;) {
 440             BufferedSubscription<T> nextRetry = r.nextRetry;
 441             r.nextRetry = null;
 442             if (nanos > 0L)
 443                 r.awaitSpace(nanos);
 444             int stat = r.retryOffer(item);
 445             if (stat == 0 && onDrop != null && onDrop.test(r.subscriber, item))
 446                 stat = r.retryOffer(item);
 447             if (stat == 0)
 448                 lag = (lag >= 0) ? -1 : lag - 1;
 449             else if (stat < 0)
 450                 cleanMe = true;
 451             else if (lag >= 0 && stat > lag)
 452                 lag = stat;
 453             r = nextRetry;
 454         }
 455         if (cleanMe)
 456             cleanAndCount();
 457         return lag;
 458     }
 459 
 460     /**
 461      * Returns current list count after removing closed subscribers.
 462      * Call only while holding lock.  Used mainly by retryOffer for
 463      * cleanup.
 464      */
 465     private int cleanAndCount() {
 466         int count = 0;
 467         BufferedSubscription<T> pred = null, next;
 468         for (BufferedSubscription<T> b = clients; b != null; b = next) {
 469             next = b.next;
 470             if (b.isClosed()) {
 471                 b.next = null;
 472                 if (pred == null)
 473                     clients = next;
 474                 else
 475                     pred.next = next;
 476             }
 477             else {
 478                 pred = b;
 479                 ++count;
 480             }
 481         }
 482         return count;
 483     }
 484 
 485     /**
 486      * Publishes the given item to each current subscriber by
 487      * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
 488      * onNext} method, blocking uninterruptibly while resources for any
 489      * subscriber are unavailable. This method returns an estimate of
 490      * the maximum lag (number of items submitted but not yet consumed)
 491      * among all current subscribers. This value is at least one
 492      * (accounting for this submitted item) if there are any
 493      * subscribers, else zero.
 494      *
 495      * <p>If the Executor for this publisher throws a
 496      * RejectedExecutionException (or any other RuntimeException or
 497      * Error) when attempting to asynchronously notify subscribers,
 498      * then this exception is rethrown, in which case not all
 499      * subscribers will have been issued this item.
 500      *
 501      * @param item the (non-null) item to publish
 502      * @return the estimated maximum lag among subscribers
 503      * @throws IllegalStateException if closed
 504      * @throws NullPointerException if item is null
 505      * @throws RejectedExecutionException if thrown by Executor
 506      */
 507     public int submit(T item) {
 508         return doOffer(item, Long.MAX_VALUE, null);
 509     }
 510 
 511     /**
 512      * Publishes the given item, if possible, to each current subscriber
 513      * by asynchronously invoking its {@link
 514      * Flow.Subscriber#onNext(Object) onNext} method. The item may be
 515      * dropped by one or more subscribers if resource limits are
 516      * exceeded, in which case the given handler (if non-null) is
 517      * invoked, and if it returns true, retried once.  Other calls to
 518      * methods in this class by other threads are blocked while the
 519      * handler is invoked.  Unless recovery is assured, options are
 520      * usually limited to logging the error and/or issuing an {@link
 521      * Flow.Subscriber#onError(Throwable) onError} signal to the
 522      * subscriber.
 523      *
 524      * <p>This method returns a status indicator: If negative, it
 525      * represents the (negative) number of drops (failed attempts to
 526      * issue the item to a subscriber). Otherwise it is an estimate of
 527      * the maximum lag (number of items submitted but not yet
 528      * consumed) among all current subscribers. This value is at least
 529      * one (accounting for this submitted item) if there are any
 530      * subscribers, else zero.
 531      *
 532      * <p>If the Executor for this publisher throws a
 533      * RejectedExecutionException (or any other RuntimeException or
 534      * Error) when attempting to asynchronously notify subscribers, or
 535      * the drop handler throws an exception when processing a dropped
 536      * item, then this exception is rethrown.
 537      *
 538      * @param item the (non-null) item to publish
 539      * @param onDrop if non-null, the handler invoked upon a drop to a
 540      * subscriber, with arguments of the subscriber and item; if it
 541      * returns true, an offer is re-attempted (once)
 542      * @return if negative, the (negative) number of drops; otherwise
 543      * an estimate of maximum lag
 544      * @throws IllegalStateException if closed
 545      * @throws NullPointerException if item is null
 546      * @throws RejectedExecutionException if thrown by Executor
 547      */
 548     public int offer(T item,
 549                      BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
 550         return doOffer(item, 0L, onDrop);
 551     }
 552 
 553     /**
 554      * Publishes the given item, if possible, to each current subscriber
 555      * by asynchronously invoking its {@link
 556      * Flow.Subscriber#onNext(Object) onNext} method, blocking while
 557      * resources for any subscription are unavailable, up to the
 558      * specified timeout or until the caller thread is interrupted, at
 559      * which point the given handler (if non-null) is invoked, and if it
 560      * returns true, retried once. (The drop handler may distinguish
 561      * timeouts from interrupts by checking whether the current thread
 562      * is interrupted.)  Other calls to methods in this class by other
 563      * threads are blocked while the handler is invoked.  Unless
 564      * recovery is assured, options are usually limited to logging the
 565      * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
 566      * onError} signal to the subscriber.
 567      *
 568      * <p>This method returns a status indicator: If negative, it
 569      * represents the (negative) number of drops (failed attempts to
 570      * issue the item to a subscriber). Otherwise it is an estimate of
 571      * the maximum lag (number of items submitted but not yet
 572      * consumed) among all current subscribers. This value is at least
 573      * one (accounting for this submitted item) if there are any
 574      * subscribers, else zero.
 575      *
 576      * <p>If the Executor for this publisher throws a
 577      * RejectedExecutionException (or any other RuntimeException or
 578      * Error) when attempting to asynchronously notify subscribers, or
 579      * the drop handler throws an exception when processing a dropped
 580      * item, then this exception is rethrown.
 581      *
 582      * @param item the (non-null) item to publish
 583      * @param timeout how long to wait for resources for any subscriber
 584      * before giving up, in units of {@code unit}
 585      * @param unit a {@code TimeUnit} determining how to interpret the
 586      * {@code timeout} parameter
 587      * @param onDrop if non-null, the handler invoked upon a drop to a
 588      * subscriber, with arguments of the subscriber and item; if it
 589      * returns true, an offer is re-attempted (once)
 590      * @return if negative, the (negative) number of drops; otherwise
 591      * an estimate of maximum lag
 592      * @throws IllegalStateException if closed
 593      * @throws NullPointerException if item is null
 594      * @throws RejectedExecutionException if thrown by Executor
 595      */
 596     public int offer(T item, long timeout, TimeUnit unit,
 597                      BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
 598         long nanos = unit.toNanos(timeout);
 599         // distinguishes from untimed (only wrt interrupt policy)
 600         if (nanos == Long.MAX_VALUE) --nanos;
 601         return doOffer(item, nanos, onDrop);
 602     }
 603 
 604     /**
 605      * Unless already closed, issues {@link
 606      * Flow.Subscriber#onComplete() onComplete} signals to current
 607      * subscribers, and disallows subsequent attempts to publish.
 608      * Upon return, this method does <em>NOT</em> guarantee that all
 609      * subscribers have yet completed.
 610      */
 611     public void close() {
 612         if (!closed) {
 613             BufferedSubscription<T> b;
 614             synchronized (this) {
 615                 // no need to re-check closed here
 616                 b = clients;
 617                 clients = null;
 618                 owner = null;
 619                 closed = true;
 620             }
 621             while (b != null) {
 622                 BufferedSubscription<T> next = b.next;
 623                 b.next = null;
 624                 b.onComplete();
 625                 b = next;
 626             }
 627         }
 628     }
 629 
 630     /**
 631      * Unless already closed, issues {@link
 632      * Flow.Subscriber#onError(Throwable) onError} signals to current
 633      * subscribers with the given error, and disallows subsequent
 634      * attempts to publish.  Future subscribers also receive the given
 635      * error. Upon return, this method does <em>NOT</em> guarantee
 636      * that all subscribers have yet completed.
 637      *
 638      * @param error the {@code onError} argument sent to subscribers
 639      * @throws NullPointerException if error is null
 640      */
 641     public void closeExceptionally(Throwable error) {
 642         if (error == null)
 643             throw new NullPointerException();
 644         if (!closed) {
 645             BufferedSubscription<T> b;
 646             synchronized (this) {
 647                 b = clients;
 648                 if (!closed) {  // don't clobber racing close
 649                     closedException = error;
 650                     clients = null;
 651                     owner = null;
 652                     closed = true;
 653                 }
 654             }
 655             while (b != null) {
 656                 BufferedSubscription<T> next = b.next;
 657                 b.next = null;
 658                 b.onError(error);
 659                 b = next;
 660             }
 661         }
 662     }
 663 
 664     /**
 665      * Returns true if this publisher is not accepting submissions.
 666      *
 667      * @return true if closed
 668      */
 669     public boolean isClosed() {
 670         return closed;
 671     }
 672 
 673     /**
 674      * Returns the exception associated with {@link
 675      * #closeExceptionally(Throwable) closeExceptionally}, or null if
 676      * not closed or if closed normally.
 677      *
 678      * @return the exception, or null if none
 679      */
 680     public Throwable getClosedException() {
 681         return closedException;
 682     }
 683 
 684     /**
 685      * Returns true if this publisher has any subscribers.
 686      *
 687      * @return true if this publisher has any subscribers
 688      */
 689     public boolean hasSubscribers() {
 690         boolean nonEmpty = false;
 691         synchronized (this) {
 692             for (BufferedSubscription<T> b = clients; b != null;) {
 693                 BufferedSubscription<T> next = b.next;
 694                 if (b.isClosed()) {
 695                     b.next = null;
 696                     b = clients = next;
 697                 }
 698                 else {
 699                     nonEmpty = true;
 700                     break;
 701                 }
 702             }
 703         }
 704         return nonEmpty;
 705     }
 706 
 707     /**
 708      * Returns the number of current subscribers.
 709      *
 710      * @return the number of current subscribers
 711      */
 712     public int getNumberOfSubscribers() {
 713         synchronized (this) {
 714             return cleanAndCount();
 715         }
 716     }
 717 
 718     /**
 719      * Returns the Executor used for asynchronous delivery.
 720      *
 721      * @return the Executor used for asynchronous delivery
 722      */
 723     public Executor getExecutor() {
 724         return executor;
 725     }
 726 
 727     /**
 728      * Returns the maximum per-subscriber buffer capacity.
 729      *
 730      * @return the maximum per-subscriber buffer capacity
 731      */
 732     public int getMaxBufferCapacity() {
 733         return maxBufferCapacity;
 734     }
 735 
 736     /**
 737      * Returns a list of current subscribers for monitoring and
 738      * tracking purposes, not for invoking {@link Flow.Subscriber}
 739      * methods on the subscribers.
 740      *
 741      * @return list of current subscribers
 742      */
 743     public List<Subscriber<? super T>> getSubscribers() {
 744         ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
 745         synchronized (this) {
 746             BufferedSubscription<T> pred = null, next;
 747             for (BufferedSubscription<T> b = clients; b != null; b = next) {
 748                 next = b.next;
 749                 if (b.isClosed()) {
 750                     b.next = null;
 751                     if (pred == null)
 752                         clients = next;
 753                     else
 754                         pred.next = next;
 755                 }
 756                 else {
 757                     subs.add(b.subscriber);
 758                     pred = b;
 759                 }
 760             }
 761         }
 762         return subs;
 763     }
 764 
 765     /**
 766      * Returns true if the given Subscriber is currently subscribed.
 767      *
 768      * @param subscriber the subscriber
 769      * @return true if currently subscribed
 770      * @throws NullPointerException if subscriber is null
 771      */
 772     public boolean isSubscribed(Subscriber<? super T> subscriber) {
 773         if (subscriber == null) throw new NullPointerException();
 774         if (!closed) {
 775             synchronized (this) {
 776                 BufferedSubscription<T> pred = null, next;
 777                 for (BufferedSubscription<T> b = clients; b != null; b = next) {
 778                     next = b.next;
 779                     if (b.isClosed()) {
 780                         b.next = null;
 781                         if (pred == null)
 782                             clients = next;
 783                         else
 784                             pred.next = next;
 785                     }
 786                     else if (subscriber.equals(b.subscriber))
 787                         return true;
 788                     else
 789                         pred = b;
 790                 }
 791             }
 792         }
 793         return false;
 794     }
 795 
 796     /**
 797      * Returns an estimate of the minimum number of items requested
 798      * (via {@link Flow.Subscription#request(long) request}) but not
 799      * yet produced, among all current subscribers.
 800      *
 801      * @return the estimate, or zero if no subscribers
 802      */
 803     public long estimateMinimumDemand() {
 804         long min = Long.MAX_VALUE;
 805         boolean nonEmpty = false;
 806         synchronized (this) {
 807             BufferedSubscription<T> pred = null, next;
 808             for (BufferedSubscription<T> b = clients; b != null; b = next) {
 809                 int n; long d;
 810                 next = b.next;
 811                 if ((n = b.estimateLag()) < 0) {
 812                     b.next = null;
 813                     if (pred == null)
 814                         clients = next;
 815                     else
 816                         pred.next = next;
 817                 }
 818                 else {
 819                     if ((d = b.demand - n) < min)
 820                         min = d;
 821                     nonEmpty = true;
 822                     pred = b;
 823                 }
 824             }
 825         }
 826         return nonEmpty ? min : 0;
 827     }
 828 
 829     /**
 830      * Returns an estimate of the maximum number of items produced but
 831      * not yet consumed among all current subscribers.
 832      *
 833      * @return the estimate
 834      */
 835     public int estimateMaximumLag() {
 836         int max = 0;
 837         synchronized (this) {
 838             BufferedSubscription<T> pred = null, next;
 839             for (BufferedSubscription<T> b = clients; b != null; b = next) {
 840                 int n;
 841                 next = b.next;
 842                 if ((n = b.estimateLag()) < 0) {
 843                     b.next = null;
 844                     if (pred == null)
 845                         clients = next;
 846                     else
 847                         pred.next = next;
 848                 }
 849                 else {
 850                     if (n > max)
 851                         max = n;
 852                     pred = b;
 853                 }
 854             }
 855         }
 856         return max;
 857     }
 858 
 859     /**
 860      * Processes all published items using the given Consumer function.
 861      * Returns a CompletableFuture that is completed normally when this
 862      * publisher signals {@link Flow.Subscriber#onComplete()
 863      * onComplete}, or completed exceptionally upon any error, or an
 864      * exception is thrown by the Consumer, or the returned
 865      * CompletableFuture is cancelled, in which case no further items
 866      * are processed.
 867      *
 868      * @param consumer the function applied to each onNext item
 869      * @return a CompletableFuture that is completed normally
 870      * when the publisher signals onComplete, and exceptionally
 871      * upon any error or cancellation
 872      * @throws NullPointerException if consumer is null
 873      */
 874     public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
 875         if (consumer == null)
 876             throw new NullPointerException();
 877         CompletableFuture<Void> status = new CompletableFuture<>();
 878         subscribe(new ConsumerSubscriber<T>(status, consumer));
 879         return status;
 880     }
 881 
 882     /** Subscriber for method consume */
 883     static final class ConsumerSubscriber<T> implements Subscriber<T> {
 884         final CompletableFuture<Void> status;
 885         final Consumer<? super T> consumer;
 886         Subscription subscription;
 887         ConsumerSubscriber(CompletableFuture<Void> status,
 888                            Consumer<? super T> consumer) {
 889             this.status = status; this.consumer = consumer;
 890         }
 891         public final void onSubscribe(Subscription subscription) {
 892             this.subscription = subscription;
 893             status.whenComplete((v, e) -> subscription.cancel());
 894             if (!status.isDone())
 895                 subscription.request(Long.MAX_VALUE);
 896         }
 897         public final void onError(Throwable ex) {
 898             status.completeExceptionally(ex);
 899         }
 900         public final void onComplete() {
 901             status.complete(null);
 902         }
 903         public final void onNext(T item) {
 904             try {
 905                 consumer.accept(item);
 906             } catch (Throwable ex) {
 907                 subscription.cancel();
 908                 status.completeExceptionally(ex);
 909             }
 910         }
 911     }
 912 
 913     /**
 914      * A task for consuming buffer items and signals, created and
 915      * executed whenever they become available. A task consumes as
 916      * many items/signals as possible before terminating, at which
 917      * point another task is created when needed. The dual Runnable
 918      * and ForkJoinTask declaration saves overhead when executed by
 919      * ForkJoinPools, without impacting other kinds of Executors.
 920      */
 921     @SuppressWarnings("serial")
 922     static final class ConsumerTask<T> extends ForkJoinTask<Void>
 923         implements Runnable, CompletableFuture.AsynchronousCompletionTask {
 924         final BufferedSubscription<T> consumer;
 925         ConsumerTask(BufferedSubscription<T> consumer) {
 926             this.consumer = consumer;
 927         }
 928         public final Void getRawResult() { return null; }
 929         public final void setRawResult(Void v) {}
 930         public final boolean exec() { consumer.consume(); return false; }
 931         public final void run() { consumer.consume(); }
 932     }
 933 
 934     /**
 935      * A resizable array-based ring buffer with integrated control to
 936      * start a consumer task whenever items are available.  The buffer
 937      * algorithm is specialized for the case of at most one concurrent
 938      * producer and consumer, and power of two buffer sizes. It relies
 939      * primarily on atomic operations (CAS or getAndSet) at the next
 940      * array slot to put or take an element, at the "tail" and "head"
 941      * indices written only by the producer and consumer respectively.
 942      *
 943      * We ensure internally that there is at most one active consumer
 944      * task at any given time. The publisher guarantees a single
 945      * producer via its lock. Sync among producers and consumers
 946      * relies on volatile fields "ctl", "demand", and "waiting" (along
 947      * with element access). Other variables are accessed in plain
 948      * mode, relying on outer ordering and exclusion, and/or enclosing
 949      * them within other volatile accesses. Some atomic operations are
 950      * avoided by tracking single threaded ownership by producers (in
 951      * the style of biased locking).
 952      *
 953      * Execution control and protocol state are managed using field
 954      * "ctl".  Methods to subscribe, close, request, and cancel set
 955      * ctl bits (mostly using atomic boolean method getAndBitwiseOr),
 956      * and ensure that a task is running. (The corresponding consumer
 957      * side actions are in method consume.)  To avoid starting a new
 958      * task on each action, ctl also includes a keep-alive bit
 959      * (ACTIVE) that is refreshed if needed on producer actions.
 960      * (Maintaining agreement about keep-alives requires most atomic
 961      * updates to be full SC/Volatile strength, which is still much
 962      * cheaper than using one task per item.)  Error signals
 963      * additionally null out items and/or fields to reduce termination
 964      * latency.  The cancel() method is supported by treating as ERROR
 965      * but suppressing onError signal.
 966      *
 967      * Support for blocking also exploits the fact that there is only
 968      * one possible waiter. ManagedBlocker-compatible control fields
 969      * are placed in this class itself rather than in wait-nodes.
 970      * Blocking control relies on the "waiting" and "waiter"
 971      * fields. Producers set them before trying to block. Signalling
 972      * unparks and clears fields. If the producer and/or consumer are
 973      * using a ForkJoinPool, the producer attempts to help run
 974      * consumer tasks via ForkJoinPool.helpAsyncBlocker before
 975      * blocking.
 976      *
 977      * Usages of this class may encounter any of several forms of
 978      * memory contention. We try to ameliorate across them without
 979      * unduly impacting footprints in low-contention usages where it
 980      * isn't needed. Buffer arrays start out small and grow only as
 981      * needed.  The class uses @Contended and heuristic field
 982      * declaration ordering to reduce false-sharing memory contention
 983      * across instances of BufferedSubscription (as in, multiple
 984      * subscribers per publisher).  We additionally segregate some
 985      * fields that would otherwise nearly always encounter cache line
 986      * contention among producers and consumers. To reduce contention
 987      * across time (vs space), consumers only periodically update
 988      * other fields (see method takeItems), at the expense of possibly
 989      * staler reporting of lags and demand (bounded at 12.5% == 1/8
 990      * capacity) and possibly more atomic operations.
 991      *
 992      * Other forms of imbalance and slowdowns can occur during startup
 993      * when producer and consumer methods are compiled and/or memory
 994      * is allocated at different rates.  This is ameliorated by
 995      * artificially subdividing some consumer methods, including
 996      * isolation of all subscriber callbacks.  This code also includes
 997      * typical power-of-two array screening idioms to avoid compilers
 998      * generating traps, along with the usual SSA-based inline
 999      * assignment coding style. Also, all methods and fields have
1000      * default visibility to simplify usage by callers.
1001      */
1002     @SuppressWarnings("serial")
1003     @jdk.internal.vm.annotation.Contended
1004     static final class BufferedSubscription<T>
1005         implements Subscription, ForkJoinPool.ManagedBlocker {
1006         long timeout;                      // Long.MAX_VALUE if untimed wait
1007         int head;                          // next position to take
1008         int tail;                          // next position to put
1009         final int maxCapacity;             // max buffer size
1010         volatile int ctl;                  // atomic run state flags
1011         Object[] array;                    // buffer
1012         final Subscriber<? super T> subscriber;
1013         final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
1014         Executor executor;                 // null on error
1015         Thread waiter;                     // blocked producer thread
1016         Throwable pendingError;            // holds until onError issued
1017         BufferedSubscription<T> next;      // used only by publisher
1018         BufferedSubscription<T> nextRetry; // used only by publisher
1019 
1020         @jdk.internal.vm.annotation.Contended("c") // segregate
1021         volatile long demand;              // # unfilled requests
1022         @jdk.internal.vm.annotation.Contended("c")
1023         volatile int waiting;              // nonzero if producer blocked
1024 
1025         // ctl bit values
1026         static final int CLOSED   = 0x01;  // if set, other bits ignored
1027         static final int ACTIVE   = 0x02;  // keep-alive for consumer task
1028         static final int REQS     = 0x04;  // (possibly) nonzero demand
1029         static final int ERROR    = 0x08;  // issues onError when noticed
1030         static final int COMPLETE = 0x10;  // issues onComplete when done
1031         static final int RUN      = 0x20;  // task is or will be running
1032         static final int OPEN     = 0x40;  // true after subscribe
1033 
1034         static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
1035 
1036         BufferedSubscription(Subscriber<? super T> subscriber,
1037                              Executor executor,
1038                              BiConsumer<? super Subscriber<? super T>,
1039                              ? super Throwable> onNextHandler,
1040                              Object[] array,
1041                              int maxBufferCapacity) {
1042             this.subscriber = subscriber;
1043             this.executor = executor;
1044             this.onNextHandler = onNextHandler;
1045             this.array = array;
1046             this.maxCapacity = maxBufferCapacity;
1047         }
1048 
1049         // Wrappers for some VarHandle methods
1050 
1051         final boolean weakCasCtl(int cmp, int val) {
1052             return CTL.weakCompareAndSet(this, cmp, val);
1053         }
1054 
1055         final int getAndBitwiseOrCtl(int bits) {
1056             return (int)CTL.getAndBitwiseOr(this, bits);
1057         }
1058 
1059         final long subtractDemand(int k) {
1060             long n = (long)(-k);
1061             return n + (long)DEMAND.getAndAdd(this, n);
1062         }
1063 
1064         final boolean casDemand(long cmp, long val) {
1065             return DEMAND.compareAndSet(this, cmp, val);
1066         }
1067 
1068         // Utilities used by SubmissionPublisher
1069 
1070         /**
1071          * Returns true if closed (consumer task may still be running).
1072          */
1073         final boolean isClosed() {
1074             return (ctl & CLOSED) != 0;
1075         }
1076 
1077         /**
1078          * Returns estimated number of buffered items, or negative if
1079          * closed.
1080          */
1081         final int estimateLag() {
1082             int c = ctl, n = tail - head;
1083             return ((c & CLOSED) != 0) ? -1 : (n < 0) ? 0 : n;
1084         }
1085 
1086         // Methods for submitting items
1087 
1088         /**
1089          * Tries to add item and start consumer task if necessary.
1090          * @return negative if closed, 0 if saturated, else estimated lag
1091          */
1092         final int offer(T item, boolean unowned) {
1093             Object[] a;
1094             int stat = 0, cap = ((a = array) == null) ? 0 : a.length;
1095             int t = tail, i = t & (cap - 1), n = t + 1 - head;
1096             if (cap > 0) {
1097                 boolean added;
1098                 if (n >= cap && cap < maxCapacity) // resize
1099                     added = growAndOffer(item, a, t);
1100                 else if (n >= cap || unowned)      // need volatile CAS
1101                     added = QA.compareAndSet(a, i, null, item);
1102                 else {                             // can use release mode
1103                     QA.setRelease(a, i, item);
1104                     added = true;
1105                 }
1106                 if (added) {
1107                     tail = t + 1;
1108                     stat = n;
1109                 }
1110             }
1111             return startOnOffer(stat);
1112         }
1113 
1114         /**
1115          * Tries to expand buffer and add item, returning true on
1116          * success. Currently fails only if out of memory.
1117          */
1118         final boolean growAndOffer(T item, Object[] a, int t) {
1119             int cap = 0, newCap = 0;
1120             Object[] newArray = null;
1121             if (a != null && (cap = a.length) > 0 && (newCap = cap << 1) > 0) {
1122                 try {
1123                     newArray = new Object[newCap];
1124                 } catch (OutOfMemoryError ex) {
1125                 }
1126             }
1127             if (newArray == null)
1128                 return false;
1129             else {                                // take and move items
1130                 int newMask = newCap - 1;
1131                 newArray[t-- & newMask] = item;
1132                 for (int mask = cap - 1, k = mask; k >= 0; --k) {
1133                     Object x = QA.getAndSet(a, t & mask, null);
1134                     if (x == null)
1135                         break;                    // already consumed
1136                     else
1137                         newArray[t-- & newMask] = x;
1138                 }
1139                 array = newArray;
1140                 VarHandle.releaseFence();         // release array and slots
1141                 return true;
1142             }
1143         }
1144 
1145         /**
1146          * Version of offer for retries (no resize or bias)
1147          */
1148         final int retryOffer(T item) {
1149             Object[] a;
1150             int stat = 0, t = tail, h = head, cap;
1151             if ((a = array) != null && (cap = a.length) > 0 &&
1152                 QA.compareAndSet(a, (cap - 1) & t, null, item))
1153                 stat = (tail = t + 1) - h;
1154             return startOnOffer(stat);
1155         }
1156 
1157         /**
1158          * Tries to start consumer task after offer.
1159          * @return negative if now closed, else argument
1160          */
1161         final int startOnOffer(int stat) {
1162             int c; // start or keep alive if requests exist and not active
1163             if (((c = ctl) & (REQS | ACTIVE)) == REQS &&
1164                 ((c = getAndBitwiseOrCtl(RUN | ACTIVE)) & (RUN | CLOSED)) == 0)
1165                 tryStart();
1166             else if ((c & CLOSED) != 0)
1167                 stat = -1;
1168             return stat;
1169         }
1170 
1171         /**
1172          * Tries to start consumer task. Sets error state on failure.
1173          */
1174         final void tryStart() {
1175             try {
1176                 Executor e;
1177                 ConsumerTask<T> task = new ConsumerTask<T>(this);
1178                 if ((e = executor) != null)   // skip if disabled on error
1179                     e.execute(task);
1180             } catch (RuntimeException | Error ex) {
1181                 getAndBitwiseOrCtl(ERROR | CLOSED);
1182                 throw ex;
1183             }
1184         }
1185 
1186         // Signals to consumer tasks
1187 
1188         /**
1189          * Sets the given control bits, starting task if not running or closed.
1190          * @param bits state bits, assumed to include RUN but not CLOSED
1191          */
1192         final void startOnSignal(int bits) {
1193             if ((ctl & bits) != bits &&
1194                 (getAndBitwiseOrCtl(bits) & (RUN | CLOSED)) == 0)
1195                 tryStart();
1196         }
1197 
1198         final void onSubscribe() {
1199             startOnSignal(RUN | ACTIVE);
1200         }
1201 
1202         final void onComplete() {
1203             startOnSignal(RUN | ACTIVE | COMPLETE);
1204         }
1205 
1206         final void onError(Throwable ex) {
1207             int c; Object[] a;      // to null out buffer on async error
1208             if (ex != null)
1209                 pendingError = ex;  // races are OK
1210             if (((c = getAndBitwiseOrCtl(ERROR | RUN | ACTIVE)) & CLOSED) == 0) {
1211                 if ((c & RUN) == 0)
1212                     tryStart();
1213                 else if ((a = array) != null)
1214                     Arrays.fill(a, null);
1215             }
1216         }
1217 
1218         public final void cancel() {
1219             onError(null);
1220         }
1221 
1222         public final void request(long n) {
1223             if (n > 0L) {
1224                 for (;;) {
1225                     long p = demand, d = p + n;  // saturate
1226                     if (casDemand(p, d < p ? Long.MAX_VALUE : d))
1227                         break;
1228                 }
1229                 startOnSignal(RUN | ACTIVE | REQS);
1230             }
1231             else
1232                 onError(new IllegalArgumentException(
1233                             "non-positive subscription request"));
1234         }
1235 
1236         // Consumer task actions
1237 
1238         /**
1239          * Consumer loop, called from ConsumerTask, or indirectly when
1240          * helping during submit.
1241          */
1242         final void consume() {
1243             Subscriber<? super T> s;
1244             if ((s = subscriber) != null) {          // hoist checks
1245                 subscribeOnOpen(s);
1246                 long d = demand;
1247                 for (int h = head, t = tail;;) {
1248                     int c, taken; boolean empty;
1249                     if (((c = ctl) & ERROR) != 0) {
1250                         closeOnError(s, null);
1251                         break;
1252                     }
1253                     else if ((taken = takeItems(s, d, h)) > 0) {
1254                         head = h += taken;
1255                         d = subtractDemand(taken);
1256                     }
1257                     else if ((d = demand) == 0L && (c & REQS) != 0)
1258                         weakCasCtl(c, c & ~REQS);    // exhausted demand
1259                     else if (d != 0L && (c & REQS) == 0)
1260                         weakCasCtl(c, c | REQS);     // new demand
1261                     else if (t == (t = tail)) {      // stability check
1262                         if ((empty = (t == h)) && (c & COMPLETE) != 0) {
1263                             closeOnComplete(s);      // end of stream
1264                             break;
1265                         }
1266                         else if (empty || d == 0L) {
1267                             int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
1268                             if (weakCasCtl(c, c & ~bit) && bit == RUN)
1269                                 break;               // un-keep-alive or exit
1270                         }
1271                     }
1272                 }
1273             }
1274         }
1275 
1276         /**
1277          * Consumes some items until unavailable or bound or error.
1278          *
1279          * @param s subscriber
1280          * @param d current demand
1281          * @param h current head
1282          * @return number taken
1283          */
1284         final int takeItems(Subscriber<? super T> s, long d, int h) {
1285             Object[] a;
1286             int k = 0, cap;
1287             if ((a = array) != null && (cap = a.length) > 0) {
1288                 int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8)
1289                 int n = (d < (long)b) ? (int)d : b;
1290                 for (; k < n; ++h, ++k) {
1291                     Object x = QA.getAndSet(a, h & m, null);
1292                     if (waiting != 0)
1293                         signalWaiter();
1294                     if (x == null)
1295                         break;
1296                     else if (!consumeNext(s, x))
1297                         break;
1298                 }
1299             }
1300             return k;
1301         }
1302 
1303         final boolean consumeNext(Subscriber<? super T> s, Object x) {
1304             try {
1305                 @SuppressWarnings("unchecked") T y = (T) x;
1306                 if (s != null)
1307                     s.onNext(y);
1308                 return true;
1309             } catch (Throwable ex) {
1310                 handleOnNext(s, ex);
1311                 return false;
1312             }
1313         }
1314 
1315         /**
1316          * Processes exception in Subscriber.onNext.
1317          */
1318         final void handleOnNext(Subscriber<? super T> s, Throwable ex) {
1319             BiConsumer<? super Subscriber<? super T>, ? super Throwable> h;
1320             try {
1321                 if ((h = onNextHandler) != null)
1322                     h.accept(s, ex);
1323             } catch (Throwable ignore) {
1324             }
1325             closeOnError(s, ex);
1326         }
1327 
1328         /**
1329          * Issues subscriber.onSubscribe if this is first signal.
1330          */
1331         final void subscribeOnOpen(Subscriber<? super T> s) {
1332             if ((ctl & OPEN) == 0 && (getAndBitwiseOrCtl(OPEN) & OPEN) == 0)
1333                 consumeSubscribe(s);
1334         }
1335 
1336         final void consumeSubscribe(Subscriber<? super T> s) {
1337             try {
1338                 if (s != null) // ignore if disabled
1339                     s.onSubscribe(this);
1340             } catch (Throwable ex) {
1341                 closeOnError(s, ex);
1342             }
1343         }
1344 
1345         /**
1346          * Issues subscriber.onComplete unless already closed.
1347          */
1348         final void closeOnComplete(Subscriber<? super T> s) {
1349             if ((getAndBitwiseOrCtl(CLOSED) & CLOSED) == 0)
1350                 consumeComplete(s);
1351         }
1352 
1353         final void consumeComplete(Subscriber<? super T> s) {
1354             try {
1355                 if (s != null)
1356                     s.onComplete();
1357             } catch (Throwable ignore) {
1358             }
1359         }
1360 
1361         /**
1362          * Issues subscriber.onError, and unblocks producer if needed.
1363          */
1364         final void closeOnError(Subscriber<? super T> s, Throwable ex) {
1365             if ((getAndBitwiseOrCtl(ERROR | CLOSED) & CLOSED) == 0) {
1366                 if (ex == null)
1367                     ex = pendingError;
1368                 pendingError = null;  // detach
1369                 executor = null;      // suppress racing start calls
1370                 signalWaiter();
1371                 consumeError(s, ex);
1372             }
1373         }
1374 
1375         final void consumeError(Subscriber<? super T> s, Throwable ex) {
1376             try {
1377                 if (ex != null && s != null)
1378                     s.onError(ex);
1379             } catch (Throwable ignore) {
1380             }
1381         }
1382 
1383         // Blocking support
1384 
1385         /**
1386          * Unblocks waiting producer.
1387          */
1388         final void signalWaiter() {
1389             Thread w;
1390             waiting = 0;
1391             if ((w = waiter) != null)
1392                 LockSupport.unpark(w);
1393         }
1394 
1395         /**
1396          * Returns true if closed or space available.
1397          * For ManagedBlocker.
1398          */
1399         public final boolean isReleasable() {
1400             Object[] a; int cap;
1401             return ((ctl & CLOSED) != 0 ||
1402                     ((a = array) != null && (cap = a.length) > 0 &&
1403                      QA.getAcquire(a, (cap - 1) & tail) == null));
1404         }
1405 
1406         /**
1407          * Helps or blocks until timeout, closed, or space available.
1408          */
1409         final void awaitSpace(long nanos) {
1410             if (!isReleasable()) {
1411                 ForkJoinPool.helpAsyncBlocker(executor, this);
1412                 if (!isReleasable()) {
1413                     timeout = nanos;
1414                     try {
1415                         ForkJoinPool.managedBlock(this);
1416                     } catch (InterruptedException ie) {
1417                         timeout = INTERRUPTED;
1418                     }
1419                     if (timeout == INTERRUPTED)
1420                         Thread.currentThread().interrupt();
1421                 }
1422             }
1423         }
1424 
1425         /**
1426          * Blocks until closed, space available or timeout.
1427          * For ManagedBlocker.
1428          */
1429         public final boolean block() {
1430             long nanos = timeout;
1431             boolean timed = (nanos < Long.MAX_VALUE);
1432             long deadline = timed ? System.nanoTime() + nanos : 0L;
1433             while (!isReleasable()) {
1434                 if (Thread.interrupted()) {
1435                     timeout = INTERRUPTED;
1436                     if (timed)
1437                         break;
1438                 }
1439                 else if (timed && (nanos = deadline - System.nanoTime()) <= 0L)
1440                     break;
1441                 else if (waiter == null)
1442                     waiter = Thread.currentThread();
1443                 else if (waiting == 0)
1444                     waiting = 1;
1445                 else if (timed)
1446                     LockSupport.parkNanos(this, nanos);
1447                 else
1448                     LockSupport.park(this);
1449             }
1450             waiter = null;
1451             waiting = 0;
1452             return true;
1453         }
1454 
1455         // VarHandle mechanics
1456         static final VarHandle CTL;
1457         static final VarHandle DEMAND;
1458         static final VarHandle QA;
1459 
1460         static {
1461             try {
1462                 MethodHandles.Lookup l = MethodHandles.lookup();
1463                 CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
1464                                       int.class);
1465                 DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
1466                                          long.class);
1467                 QA = MethodHandles.arrayElementVarHandle(Object[].class);
1468             } catch (ReflectiveOperationException e) {
1469                 throw new ExceptionInInitializerError(e);
1470             }
1471 
1472             // Reduce the risk of rare disastrous classloading in first call to
1473             // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1474             Class<?> ensureLoaded = LockSupport.class;
1475         }
1476     }
1477 }