1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.ArrayList;
  39 import java.util.List;
  40 import java.util.concurrent.locks.LockSupport;
  41 import java.util.function.BiConsumer;
  42 import java.util.function.BiPredicate;
  43 import java.util.function.Consumer;
  44 
  45 /**
  46  * A {@link Flow.Publisher} that asynchronously issues submitted
  47  * (non-null) items to current subscribers until it is closed.  Each
  48  * current subscriber receives newly submitted items in the same order
  49  * unless drops or exceptions are encountered.  Using a
  50  * SubmissionPublisher allows item generators to act as compliant <a
  51  * href="http://www.reactive-streams.org/"> reactive-streams</a>
  52  * Publishers relying on drop handling and/or blocking for flow
  53  * control.
  54  *
  55  * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
  56  * constructor for delivery to subscribers. The best choice of
  57  * Executor depends on expected usage. If the generator(s) of
  58  * submitted items run in separate threads, and the number of
  59  * subscribers can be estimated, consider using a {@link
  60  * Executors#newFixedThreadPool}. Otherwise consider using the
  61  * default, normally the {@link ForkJoinPool#commonPool}.
  62  *
  63  * <p>Buffering allows producers and consumers to transiently operate
  64  * at different rates.  Each subscriber uses an independent buffer.
  65  * Buffers are created upon first use and expanded as needed up to the
  66  * given maximum. (The enforced capacity may be rounded up to the
  67  * nearest power of two and/or bounded by the largest value supported
  68  * by this implementation.)  Invocations of {@link
  69  * Flow.Subscription#request(long) request} do not directly result in
  70  * buffer expansion, but risk saturation if unfilled requests exceed
  71  * the maximum capacity.  The default value of {@link
  72  * Flow#defaultBufferSize()} may provide a useful starting point for
  73  * choosing a capacity based on expected rates, resources, and usages.
  74  *
  75  * <p>Publication methods support different policies about what to do
  76  * when buffers are saturated. Method {@link #submit(Object) submit}
  77  * blocks until resources are available. This is simplest, but least
  78  * responsive.  The {@code offer} methods may drop items (either
  79  * immediately or with bounded timeout), but provide an opportunity to
  80  * interpose a handler and then retry.
  81  *
  82  * <p>If any Subscriber method throws an exception, its subscription
  83  * is cancelled.  If a handler is supplied as a constructor argument,
  84  * it is invoked before cancellation upon an exception in method
  85  * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
  86  * {@link Flow.Subscriber#onSubscribe onSubscribe},
  87  * {@link Flow.Subscriber#onError(Throwable) onError} and
  88  * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
  89  * handled before cancellation.  If the supplied Executor throws
  90  * {@link RejectedExecutionException} (or any other RuntimeException
  91  * or Error) when attempting to execute a task, or a drop handler
  92  * throws an exception when processing a dropped item, then the
  93  * exception is rethrown. In these cases, not all subscribers will
  94  * have been issued the published item. It is usually good practice to
  95  * {@link #closeExceptionally closeExceptionally} in these cases.
  96  *
  97  * <p>Method {@link #consume(Consumer)} simplifies support for a
  98  * common case in which the only action of a subscriber is to request
  99  * and process all items using a supplied function.
 100  *
 101  * <p>This class may also serve as a convenient base for subclasses
 102  * that generate items, and use the methods in this class to publish
 103  * them.  For example here is a class that periodically publishes the
 104  * items generated from a supplier. (In practice you might add methods
 105  * to independently start and stop generation, to share Executors
 106  * among publishers, and so on, or use a SubmissionPublisher as a
 107  * component rather than a superclass.)
 108  *
 109  * <pre> {@code
 110  * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
 111  *   final ScheduledFuture<?> periodicTask;
 112  *   final ScheduledExecutorService scheduler;
 113  *   PeriodicPublisher(Executor executor, int maxBufferCapacity,
 114  *                     Supplier<? extends T> supplier,
 115  *                     long period, TimeUnit unit) {
 116  *     super(executor, maxBufferCapacity);
 117  *     scheduler = new ScheduledThreadPoolExecutor(1);
 118  *     periodicTask = scheduler.scheduleAtFixedRate(
 119  *       () -> submit(supplier.get()), 0, period, unit);
 120  *   }
 121  *   public void close() {
 122  *     periodicTask.cancel(false);
 123  *     scheduler.shutdown();
 124  *     super.close();
 125  *   }
 126  * }}</pre>
 127  *
 128  * <p>Here is an example of a {@link Flow.Processor} implementation.
 129  * It uses single-step requests to its publisher for simplicity of
 130  * illustration. A more adaptive version could monitor flow using the
 131  * lag estimate returned from {@code submit}, along with other utility
 132  * methods.
 133  *
 134  * <pre> {@code
 135  * class TransformProcessor<S,T> extends SubmissionPublisher<T>
 136  *   implements Flow.Processor<S,T> {
 137  *   final Function<? super S, ? extends T> function;
 138  *   Flow.Subscription subscription;
 139  *   TransformProcessor(Executor executor, int maxBufferCapacity,
 140  *                      Function<? super S, ? extends T> function) {
 141  *     super(executor, maxBufferCapacity);
 142  *     this.function = function;
 143  *   }
 144  *   public void onSubscribe(Flow.Subscription subscription) {
 145  *     (this.subscription = subscription).request(1);
 146  *   }
 147  *   public void onNext(S item) {
 148  *     subscription.request(1);
 149  *     submit(function.apply(item));
 150  *   }
 151  *   public void onError(Throwable ex) { closeExceptionally(ex); }
 152  *   public void onComplete() { close(); }
 153  * }}</pre>
 154  *
 155  * @param <T> the published item type
 156  * @author Doug Lea
 157  * @since 1.9
 158  */
 159 public class SubmissionPublisher<T> implements Flow.Publisher<T>,
 160                                                AutoCloseable {
 161     /*
 162      * Most mechanics are handled by BufferedSubscription. This class
 163      * mainly tracks subscribers and ensures sequentiality, by using
 164      * built-in synchronization locks across public methods. (Using
 165      * built-in locks works well in the most typical case in which
 166      * only one thread submits items).
 167      */
 168 
 169     /** The largest possible power of two array size. */
 170     static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
 171 
 172     /** Round capacity to power of 2, at most limit. */
 173     static final int roundCapacity(int cap) {
 174         int n = cap - 1;
 175         n |= n >>> 1;
 176         n |= n >>> 2;
 177         n |= n >>> 4;
 178         n |= n >>> 8;
 179         n |= n >>> 16;
 180         return (n <= 0) ? 1 : // at least 1
 181             (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
 182     }
 183 
 184     // default Executor setup; nearly the same as CompletableFuture
 185 
 186     /**
 187      * Default executor -- ForkJoinPool.commonPool() unless it cannot
 188      * support parallelism.
 189      */
 190     private static final Executor ASYNC_POOL =
 191         (ForkJoinPool.getCommonPoolParallelism() > 1) ?
 192         ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
 193 
 194     /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
 195     private static final class ThreadPerTaskExecutor implements Executor {
 196         public void execute(Runnable r) { new Thread(r).start(); }
 197     }
 198 
 199     /**
 200      * Clients (BufferedSubscriptions) are maintained in a linked list
 201      * (via their "next" fields). This works well for publish loops.
 202      * It requires O(n) traversal to check for duplicate subscribers,
 203      * but we expect that subscribing is much less common than
 204      * publishing. Unsubscribing occurs only during traversal loops,
 205      * when BufferedSubscription methods return negative values
 206      * signifying that they have been disabled.  To reduce
 207      * head-of-line blocking, submit and offer methods first call
 208      * BufferedSubscription.offer on each subscriber, and place
 209      * saturated ones in retries list (using nextRetry field), and
 210      * retry, possibly blocking or dropping.
 211      */
 212     BufferedSubscription<T> clients;
 213 
 214     /** Run status, updated only within locks */
 215     volatile boolean closed;
 216     /** If non-null, the exception in closeExceptionally */
 217     volatile Throwable closedException;
 218 
 219     // Parameters for constructing BufferedSubscriptions
 220     final Executor executor;
 221     final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
 222     final int maxBufferCapacity;
 223 
 224     /**
 225      * Creates a new SubmissionPublisher using the given Executor for
 226      * async delivery to subscribers, with the given maximum buffer size
 227      * for each subscriber, and, if non-null, the given handler invoked
 228      * when any Subscriber throws an exception in method {@link
 229      * Flow.Subscriber#onNext(Object) onNext}.
 230      *
 231      * @param executor the executor to use for async delivery,
 232      * supporting creation of at least one independent thread
 233      * @param maxBufferCapacity the maximum capacity for each
 234      * subscriber's buffer (the enforced capacity may be rounded up to
 235      * the nearest power of two and/or bounded by the largest value
 236      * supported by this implementation; method {@link #getMaxBufferCapacity}
 237      * returns the actual value)
 238      * @param handler if non-null, procedure to invoke upon exception
 239      * thrown in method {@code onNext}
 240      * @throws NullPointerException if executor is null
 241      * @throws IllegalArgumentException if maxBufferCapacity not
 242      * positive
 243      */
 244     public SubmissionPublisher(Executor executor, int maxBufferCapacity,
 245                                BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) {
 246         if (executor == null)
 247             throw new NullPointerException();
 248         if (maxBufferCapacity <= 0)
 249             throw new IllegalArgumentException("capacity must be positive");
 250         this.executor = executor;
 251         this.onNextHandler = handler;
 252         this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
 253     }
 254 
 255     /**
 256      * Creates a new SubmissionPublisher using the given Executor for
 257      * async delivery to subscribers, with the given maximum buffer size
 258      * for each subscriber, and no handler for Subscriber exceptions in
 259      * method {@link Flow.Subscriber#onNext(Object) onNext}.
 260      *
 261      * @param executor the executor to use for async delivery,
 262      * supporting creation of at least one independent thread
 263      * @param maxBufferCapacity the maximum capacity for each
 264      * subscriber's buffer (the enforced capacity may be rounded up to
 265      * the nearest power of two and/or bounded by the largest value
 266      * supported by this implementation; method {@link #getMaxBufferCapacity}
 267      * returns the actual value)
 268      * @throws NullPointerException if executor is null
 269      * @throws IllegalArgumentException if maxBufferCapacity not
 270      * positive
 271      */
 272     public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
 273         this(executor, maxBufferCapacity, null);
 274     }
 275 
 276     /**
 277      * Creates a new SubmissionPublisher using the {@link
 278      * ForkJoinPool#commonPool()} for async delivery to subscribers
 279      * (unless it does not support a parallelism level of at least two,
 280      * in which case, a new Thread is created to run each task), with
 281      * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
 282      * handler for Subscriber exceptions in method {@link
 283      * Flow.Subscriber#onNext(Object) onNext}.
 284      */
 285     public SubmissionPublisher() {
 286         this(ASYNC_POOL, Flow.defaultBufferSize(), null);
 287     }
 288 
 289     /**
 290      * Adds the given Subscriber unless already subscribed.  If already
 291      * subscribed, the Subscriber's {@link
 292      * Flow.Subscriber#onError(Throwable) onError} method is invoked on
 293      * the existing subscription with an {@link IllegalStateException}.
 294      * Otherwise, upon success, the Subscriber's {@link
 295      * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
 296      * asynchronously with a new {@link Flow.Subscription}.  If {@link
 297      * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
 298      * subscription is cancelled. Otherwise, if this SubmissionPublisher
 299      * was closed exceptionally, then the subscriber's {@link
 300      * Flow.Subscriber#onError onError} method is invoked with the
 301      * corresponding exception, or if closed without exception, the
 302      * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
 303      * method is invoked.  Subscribers may enable receiving items by
 304      * invoking the {@link Flow.Subscription#request(long) request}
 305      * method of the new Subscription, and may unsubscribe by invoking
 306      * its {@link Flow.Subscription#cancel() cancel} method.
 307      *
 308      * @param subscriber the subscriber
 309      * @throws NullPointerException if subscriber is null
 310      */
 311     public void subscribe(Flow.Subscriber<? super T> subscriber) {
 312         if (subscriber == null) throw new NullPointerException();
 313         BufferedSubscription<T> subscription =
 314             new BufferedSubscription<T>(subscriber, executor,
 315                                         onNextHandler, maxBufferCapacity);
 316         synchronized (this) {
 317             for (BufferedSubscription<T> b = clients, pred = null;;) {
 318                 if (b == null) {
 319                     Throwable ex;
 320                     subscription.onSubscribe();
 321                     if ((ex = closedException) != null)
 322                         subscription.onError(ex);
 323                     else if (closed)
 324                         subscription.onComplete();
 325                     else if (pred == null)
 326                         clients = subscription;
 327                     else
 328                         pred.next = subscription;
 329                     break;
 330                 }
 331                 BufferedSubscription<T> next = b.next;
 332                 if (b.isDisabled()) { // remove
 333                     b.next = null;    // detach
 334                     if (pred == null)
 335                         clients = next;
 336                     else
 337                         pred.next = next;
 338                 }
 339                 else if (subscriber.equals(b.subscriber)) {
 340                     b.onError(new IllegalStateException("Duplicate subscribe"));
 341                     break;
 342                 }
 343                 else
 344                     pred = b;
 345                 b = next;
 346             }
 347         }
 348     }
 349 
 350     /**
 351      * Publishes the given item to each current subscriber by
 352      * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
 353      * onNext} method, blocking uninterruptibly while resources for any
 354      * subscriber are unavailable. This method returns an estimate of
 355      * the maximum lag (number of items submitted but not yet consumed)
 356      * among all current subscribers. This value is at least one
 357      * (accounting for this submitted item) if there are any
 358      * subscribers, else zero.
 359      *
 360      * <p>If the Executor for this publisher throws a
 361      * RejectedExecutionException (or any other RuntimeException or
 362      * Error) when attempting to asynchronously notify subscribers,
 363      * then this exception is rethrown, in which case not all
 364      * subscribers will have been issued this item.
 365      *
 366      * @param item the (non-null) item to publish
 367      * @return the estimated maximum lag among subscribers
 368      * @throws IllegalStateException if closed
 369      * @throws NullPointerException if item is null
 370      * @throws RejectedExecutionException if thrown by Executor
 371      */
 372     public int submit(T item) {
 373         if (item == null) throw new NullPointerException();
 374         int lag = 0;
 375         boolean complete;
 376         synchronized (this) {
 377             complete = closed;
 378             BufferedSubscription<T> b = clients;
 379             if (!complete) {
 380                 BufferedSubscription<T> pred = null, r = null, rtail = null;
 381                 while (b != null) {
 382                     BufferedSubscription<T> next = b.next;
 383                     int stat = b.offer(item);
 384                     if (stat < 0) {           // disabled
 385                         b.next = null;
 386                         if (pred == null)
 387                             clients = next;
 388                         else
 389                             pred.next = next;
 390                     }
 391                     else {
 392                         if (stat > lag)
 393                             lag = stat;
 394                         else if (stat == 0) { // place on retry list
 395                             b.nextRetry = null;
 396                             if (rtail == null)
 397                                 r = b;
 398                             else
 399                                 rtail.nextRetry = b;
 400                             rtail = b;
 401                         }
 402                         pred = b;
 403                     }
 404                     b = next;
 405                 }
 406                 while (r != null) {
 407                     BufferedSubscription<T> nextRetry = r.nextRetry;
 408                     r.nextRetry = null;
 409                     int stat = r.submit(item);
 410                     if (stat > lag)
 411                         lag = stat;
 412                     else if (stat < 0 && clients == r)
 413                         clients = r.next; // postpone internal unsubscribes
 414                     r = nextRetry;
 415                 }
 416             }
 417         }
 418         if (complete)
 419             throw new IllegalStateException("Closed");
 420         else
 421             return lag;
 422     }
 423 
 424     /**
 425      * Publishes the given item, if possible, to each current subscriber
 426      * by asynchronously invoking its {@link
 427      * Flow.Subscriber#onNext(Object) onNext} method. The item may be
 428      * dropped by one or more subscribers if resource limits are
 429      * exceeded, in which case the given handler (if non-null) is
 430      * invoked, and if it returns true, retried once.  Other calls to
 431      * methods in this class by other threads are blocked while the
 432      * handler is invoked.  Unless recovery is assured, options are
 433      * usually limited to logging the error and/or issuing an {@link
 434      * Flow.Subscriber#onError(Throwable) onError} signal to the
 435      * subscriber.
 436      *
 437      * <p>This method returns a status indicator: If negative, it
 438      * represents the (negative) number of drops (failed attempts to
 439      * issue the item to a subscriber). Otherwise it is an estimate of
 440      * the maximum lag (number of items submitted but not yet
 441      * consumed) among all current subscribers. This value is at least
 442      * one (accounting for this submitted item) if there are any
 443      * subscribers, else zero.
 444      *
 445      * <p>If the Executor for this publisher throws a
 446      * RejectedExecutionException (or any other RuntimeException or
 447      * Error) when attempting to asynchronously notify subscribers, or
 448      * the drop handler throws an exception when processing a dropped
 449      * item, then this exception is rethrown.
 450      *
 451      * @param item the (non-null) item to publish
 452      * @param onDrop if non-null, the handler invoked upon a drop to a
 453      * subscriber, with arguments of the subscriber and item; if it
 454      * returns true, an offer is re-attempted (once)
 455      * @return if negative, the (negative) number of drops; otherwise
 456      * an estimate of maximum lag
 457      * @throws IllegalStateException if closed
 458      * @throws NullPointerException if item is null
 459      * @throws RejectedExecutionException if thrown by Executor
 460      */
 461     public int offer(T item,
 462                      BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
 463         return doOffer(0L, item, onDrop);
 464     }
 465 
 466     /**
 467      * Publishes the given item, if possible, to each current subscriber
 468      * by asynchronously invoking its {@link
 469      * Flow.Subscriber#onNext(Object) onNext} method, blocking while
 470      * resources for any subscription are unavailable, up to the
 471      * specified timeout or until the caller thread is interrupted, at
 472      * which point the given handler (if non-null) is invoked, and if it
 473      * returns true, retried once. (The drop handler may distinguish
 474      * timeouts from interrupts by checking whether the current thread
 475      * is interrupted.)  Other calls to methods in this class by other
 476      * threads are blocked while the handler is invoked.  Unless
 477      * recovery is assured, options are usually limited to logging the
 478      * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
 479      * onError} signal to the subscriber.
 480      *
 481      * <p>This method returns a status indicator: If negative, it
 482      * represents the (negative) number of drops (failed attempts to
 483      * issue the item to a subscriber). Otherwise it is an estimate of
 484      * the maximum lag (number of items submitted but not yet
 485      * consumed) among all current subscribers. This value is at least
 486      * one (accounting for this submitted item) if there are any
 487      * subscribers, else zero.
 488      *
 489      * <p>If the Executor for this publisher throws a
 490      * RejectedExecutionException (or any other RuntimeException or
 491      * Error) when attempting to asynchronously notify subscribers, or
 492      * the drop handler throws an exception when processing a dropped
 493      * item, then this exception is rethrown.
 494      *
 495      * @param item the (non-null) item to publish
 496      * @param timeout how long to wait for resources for any subscriber
 497      * before giving up, in units of {@code unit}
 498      * @param unit a {@code TimeUnit} determining how to interpret the
 499      * {@code timeout} parameter
 500      * @param onDrop if non-null, the handler invoked upon a drop to a
 501      * subscriber, with arguments of the subscriber and item; if it
 502      * returns true, an offer is re-attempted (once)
 503      * @return if negative, the (negative) number of drops; otherwise
 504      * an estimate of maximum lag
 505      * @throws IllegalStateException if closed
 506      * @throws NullPointerException if item is null
 507      * @throws RejectedExecutionException if thrown by Executor
 508      */
 509     public int offer(T item, long timeout, TimeUnit unit,
 510                      BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
 511         return doOffer(unit.toNanos(timeout), item, onDrop);
 512     }
 513 
 514     /** Common implementation for both forms of offer */
 515     final int doOffer(long nanos, T item,
 516                       BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
 517         if (item == null) throw new NullPointerException();
 518         int lag = 0, drops = 0;
 519         boolean complete;
 520         synchronized (this) {
 521             complete = closed;
 522             BufferedSubscription<T> b = clients;
 523             if (!complete) {
 524                 BufferedSubscription<T> pred = null, r = null, rtail = null;
 525                 while (b != null) {
 526                     BufferedSubscription<T> next = b.next;
 527                     int stat = b.offer(item);
 528                     if (stat < 0) {
 529                         b.next = null;
 530                         if (pred == null)
 531                             clients = next;
 532                         else
 533                             pred.next = next;
 534                     }
 535                     else {
 536                         if (stat > lag)
 537                             lag = stat;
 538                         else if (stat == 0) {
 539                             b.nextRetry = null;
 540                             if (rtail == null)
 541                                 r = b;
 542                             else
 543                                 rtail.nextRetry = b;
 544                             rtail = b;
 545                         }
 546                         else if (stat > lag)
 547                             lag = stat;
 548                         pred = b;
 549                     }
 550                     b = next;
 551                 }
 552                 while (r != null) {
 553                     BufferedSubscription<T> nextRetry = r.nextRetry;
 554                     r.nextRetry = null;
 555                     int stat = (nanos > 0L) ? r.timedOffer(item, nanos) :
 556                         r.offer(item);
 557                     if (stat == 0 && onDrop != null &&
 558                         onDrop.test(r.subscriber, item))
 559                         stat = r.offer(item);
 560                     if (stat == 0)
 561                         ++drops;
 562                     else if (stat > lag)
 563                         lag = stat;
 564                     else if (stat < 0 && clients == r)
 565                         clients = r.next;
 566                     r = nextRetry;
 567                 }
 568             }
 569         }
 570         if (complete)
 571             throw new IllegalStateException("Closed");
 572         else
 573             return (drops > 0) ? -drops : lag;
 574     }
 575 
 576     /**
 577      * Unless already closed, issues {@link
 578      * Flow.Subscriber#onComplete() onComplete} signals to current
 579      * subscribers, and disallows subsequent attempts to publish.
 580      * Upon return, this method does <em>NOT</em> guarantee that all
 581      * subscribers have yet completed.
 582      */
 583     public void close() {
 584         if (!closed) {
 585             BufferedSubscription<T> b;
 586             synchronized (this) {
 587                 b = clients;
 588                 clients = null;
 589                 closed = true;
 590             }
 591             while (b != null) {
 592                 BufferedSubscription<T> next = b.next;
 593                 b.next = null;
 594                 b.onComplete();
 595                 b = next;
 596             }
 597         }
 598     }
 599 
 600     /**
 601      * Unless already closed, issues {@link
 602      * Flow.Subscriber#onError(Throwable) onError} signals to current
 603      * subscribers with the given error, and disallows subsequent
 604      * attempts to publish.  Future subscribers also receive the given
 605      * error. Upon return, this method does <em>NOT</em> guarantee
 606      * that all subscribers have yet completed.
 607      *
 608      * @param error the {@code onError} argument sent to subscribers
 609      * @throws NullPointerException if error is null
 610      */
 611     public void closeExceptionally(Throwable error) {
 612         if (error == null)
 613             throw new NullPointerException();
 614         if (!closed) {
 615             BufferedSubscription<T> b;
 616             synchronized (this) {
 617                 b = clients;
 618                 clients = null;
 619                 closed = true;
 620                 closedException = error;
 621             }
 622             while (b != null) {
 623                 BufferedSubscription<T> next = b.next;
 624                 b.next = null;
 625                 b.onError(error);
 626                 b = next;
 627             }
 628         }
 629     }
 630 
 631     /**
 632      * Returns true if this publisher is not accepting submissions.
 633      *
 634      * @return true if closed
 635      */
 636     public boolean isClosed() {
 637         return closed;
 638     }
 639 
 640     /**
 641      * Returns the exception associated with {@link
 642      * #closeExceptionally(Throwable) closeExceptionally}, or null if
 643      * not closed or if closed normally.
 644      *
 645      * @return the exception, or null if none
 646      */
 647     public Throwable getClosedException() {
 648         return closedException;
 649     }
 650 
 651     /**
 652      * Returns true if this publisher has any subscribers.
 653      *
 654      * @return true if this publisher has any subscribers
 655      */
 656     public boolean hasSubscribers() {
 657         boolean nonEmpty = false;
 658         if (!closed) {
 659             synchronized (this) {
 660                 for (BufferedSubscription<T> b = clients; b != null;) {
 661                     BufferedSubscription<T> next = b.next;
 662                     if (b.isDisabled()) {
 663                         b.next = null;
 664                         b = clients = next;
 665                     }
 666                     else {
 667                         nonEmpty = true;
 668                         break;
 669                     }
 670                 }
 671             }
 672         }
 673         return nonEmpty;
 674     }
 675 
 676     /**
 677      * Returns the number of current subscribers.
 678      *
 679      * @return the number of current subscribers
 680      */
 681     public int getNumberOfSubscribers() {
 682         int count = 0;
 683         if (!closed) {
 684             synchronized (this) {
 685                 BufferedSubscription<T> pred = null, next;
 686                 for (BufferedSubscription<T> b = clients; b != null; b = next) {
 687                     next = b.next;
 688                     if (b.isDisabled()) {
 689                         b.next = null;
 690                         if (pred == null)
 691                             clients = next;
 692                         else
 693                             pred.next = next;
 694                     }
 695                     else {
 696                         pred = b;
 697                         ++count;
 698                     }
 699                 }
 700             }
 701         }
 702         return count;
 703     }
 704 
 705     /**
 706      * Returns the Executor used for asynchronous delivery.
 707      *
 708      * @return the Executor used for asynchronous delivery
 709      */
 710     public Executor getExecutor() {
 711         return executor;
 712     }
 713 
 714     /**
 715      * Returns the maximum per-subscriber buffer capacity.
 716      *
 717      * @return the maximum per-subscriber buffer capacity
 718      */
 719     public int getMaxBufferCapacity() {
 720         return maxBufferCapacity;
 721     }
 722 
 723     /**
 724      * Returns a list of current subscribers for monitoring and
 725      * tracking purposes, not for invoking {@link Flow.Subscriber}
 726      * methods on the subscribers.
 727      *
 728      * @return list of current subscribers
 729      */
 730     public List<Flow.Subscriber<? super T>> getSubscribers() {
 731         ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
 732         synchronized (this) {
 733             BufferedSubscription<T> pred = null, next;
 734             for (BufferedSubscription<T> b = clients; b != null; b = next) {
 735                 next = b.next;
 736                 if (b.isDisabled()) {
 737                     b.next = null;
 738                     if (pred == null)
 739                         clients = next;
 740                     else
 741                         pred.next = next;
 742                 }
 743                 else
 744                     subs.add(b.subscriber);
 745             }
 746         }
 747         return subs;
 748     }
 749 
 750     /**
 751      * Returns true if the given Subscriber is currently subscribed.
 752      *
 753      * @param subscriber the subscriber
 754      * @return true if currently subscribed
 755      * @throws NullPointerException if subscriber is null
 756      */
 757     public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
 758         if (subscriber == null) throw new NullPointerException();
 759         if (!closed) {
 760             synchronized (this) {
 761                 BufferedSubscription<T> pred = null, next;
 762                 for (BufferedSubscription<T> b = clients; b != null; b = next) {
 763                     next = b.next;
 764                     if (b.isDisabled()) {
 765                         b.next = null;
 766                         if (pred == null)
 767                             clients = next;
 768                         else
 769                             pred.next = next;
 770                     }
 771                     else if (subscriber.equals(b.subscriber))
 772                         return true;
 773                     else
 774                         pred = b;
 775                 }
 776             }
 777         }
 778         return false;
 779     }
 780 
 781     /**
 782      * Returns an estimate of the minimum number of items requested
 783      * (via {@link Flow.Subscription#request(long) request}) but not
 784      * yet produced, among all current subscribers.
 785      *
 786      * @return the estimate, or zero if no subscribers
 787      */
 788     public long estimateMinimumDemand() {
 789         long min = Long.MAX_VALUE;
 790         boolean nonEmpty = false;
 791         synchronized (this) {
 792             BufferedSubscription<T> pred = null, next;
 793             for (BufferedSubscription<T> b = clients; b != null; b = next) {
 794                 int n; long d;
 795                 next = b.next;
 796                 if ((n = b.estimateLag()) < 0) {
 797                     b.next = null;
 798                     if (pred == null)
 799                         clients = next;
 800                     else
 801                         pred.next = next;
 802                 }
 803                 else {
 804                     if ((d = b.demand - n) < min)
 805                         min = d;
 806                     nonEmpty = true;
 807                     pred = b;
 808                 }
 809             }
 810         }
 811         return nonEmpty ? min : 0;
 812     }
 813 
 814     /**
 815      * Returns an estimate of the maximum number of items produced but
 816      * not yet consumed among all current subscribers.
 817      *
 818      * @return the estimate
 819      */
 820     public int estimateMaximumLag() {
 821         int max = 0;
 822         synchronized (this) {
 823             BufferedSubscription<T> pred = null, next;
 824             for (BufferedSubscription<T> b = clients; b != null; b = next) {
 825                 int n;
 826                 next = b.next;
 827                 if ((n = b.estimateLag()) < 0) {
 828                     b.next = null;
 829                     if (pred == null)
 830                         clients = next;
 831                     else
 832                         pred.next = next;
 833                 }
 834                 else {
 835                     if (n > max)
 836                         max = n;
 837                     pred = b;
 838                 }
 839             }
 840         }
 841         return max;
 842     }
 843 
 844     /**
 845      * Processes all published items using the given Consumer function.
 846      * Returns a CompletableFuture that is completed normally when this
 847      * publisher signals {@link Flow.Subscriber#onComplete()
 848      * onComplete}, or completed exceptionally upon any error, or an
 849      * exception is thrown by the Consumer, or the returned
 850      * CompletableFuture is cancelled, in which case no further items
 851      * are processed.
 852      *
 853      * @param consumer the function applied to each onNext item
 854      * @return a CompletableFuture that is completed normally
 855      * when the publisher signals onComplete, and exceptionally
 856      * upon any error or cancellation
 857      * @throws NullPointerException if consumer is null
 858      */
 859     public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
 860         if (consumer == null)
 861             throw new NullPointerException();
 862         CompletableFuture<Void> status = new CompletableFuture<>();
 863         subscribe(new ConsumerSubscriber<T>(status, consumer));
 864         return status;
 865     }
 866 
 867     /** Subscriber for method consume */
 868     private static final class ConsumerSubscriber<T>
 869             implements Flow.Subscriber<T> {
 870         final CompletableFuture<Void> status;
 871         final Consumer<? super T> consumer;
 872         Flow.Subscription subscription;
 873         ConsumerSubscriber(CompletableFuture<Void> status,
 874                            Consumer<? super T> consumer) {
 875             this.status = status; this.consumer = consumer;
 876         }
 877         public final void onSubscribe(Flow.Subscription subscription) {
 878             this.subscription = subscription;
 879             status.whenComplete((v, e) -> subscription.cancel());
 880             if (!status.isDone())
 881                 subscription.request(Long.MAX_VALUE);
 882         }
 883         public final void onError(Throwable ex) {
 884             status.completeExceptionally(ex);
 885         }
 886         public final void onComplete() {
 887             status.complete(null);
 888         }
 889         public final void onNext(T item) {
 890             try {
 891                 consumer.accept(item);
 892             } catch (Throwable ex) {
 893                 subscription.cancel();
 894                 status.completeExceptionally(ex);
 895             }
 896         }
 897     }
 898 
 899     /**
 900      * A task for consuming buffer items and signals, created and
 901      * executed whenever they become available. A task consumes as
 902      * many items/signals as possible before terminating, at which
 903      * point another task is created when needed. The dual Runnable
 904      * and ForkJoinTask declaration saves overhead when executed by
 905      * ForkJoinPools, without impacting other kinds of Executors.
 906      */
 907     @SuppressWarnings("serial")
 908     static final class ConsumerTask<T> extends ForkJoinTask<Void>
 909         implements Runnable {
 910         final BufferedSubscription<T> consumer;
 911         ConsumerTask(BufferedSubscription<T> consumer) {
 912             this.consumer = consumer;
 913         }
 914         public final Void getRawResult() { return null; }
 915         public final void setRawResult(Void v) {}
 916         public final boolean exec() { consumer.consume(); return false; }
 917         public final void run() { consumer.consume(); }
 918     }
 919 
 920     /**
 921      * A bounded (ring) buffer with integrated control to start a
 922      * consumer task whenever items are available.  The buffer
 923      * algorithm is similar to one used inside ForkJoinPool (see its
 924      * internal documentation for details) specialized for the case of
 925      * at most one concurrent producer and consumer, and power of two
 926      * buffer sizes. This allows methods to operate without locks even
 927      * while supporting resizing, blocking, task-triggering, and
 928      * garbage-free buffers (nulling out elements when consumed),
 929      * although supporting these does impose a bit of overhead
 930      * compared to plain fixed-size ring buffers.
 931      *
 932      * The publisher guarantees a single producer via its lock.  We
 933      * ensure in this class that there is at most one consumer.  The
 934      * request and cancel methods must be fully thread-safe but are
 935      * coded to exploit the most common case in which they are only
 936      * called by consumers (usually within onNext).
 937      *
 938      * Execution control is managed using the ACTIVE ctl bit. We
 939      * ensure that a task is active when consumable items (and
 940      * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
 941      * there is demand (unfilled requests).  This is complicated on
 942      * the creation side by the possibility of exceptions when trying
 943      * to execute tasks. These eventually force DISABLED state, but
 944      * sometimes not directly. On the task side, termination (clearing
 945      * ACTIVE) that would otherwise race with producers or request()
 946      * calls uses the CONSUME keep-alive bit to force a recheck.
 947      *
 948      * The ctl field also manages run state. When DISABLED, no further
 949      * updates are possible. Disabling may be preceded by setting
 950      * ERROR or COMPLETE (or both -- ERROR has precedence), in which
 951      * case the associated Subscriber methods are invoked, possibly
 952      * synchronously if there is no active consumer task (including
 953      * cases where execute() failed). The cancel() method is supported
 954      * by treating as ERROR but suppressing onError signal.
 955      *
 956      * Support for blocking also exploits the fact that there is only
 957      * one possible waiter. ManagedBlocker-compatible control fields
 958      * are placed in this class itself rather than in wait-nodes.
 959      * Blocking control relies on the "waiter" field. Producers set
 960      * the field before trying to block, but must then recheck (via
 961      * offer) before parking. Signalling then just unparks and clears
 962      * waiter field. If the producer and consumer are both in the same
 963      * ForkJoinPool, or consumers are running in commonPool, the
 964      * producer attempts to help run consumer tasks that it forked
 965      * before blocking.  To avoid potential cycles, only one level of
 966      * helping is currently supported.
 967      *
 968      * This class uses @Contended and heuristic field declaration
 969      * ordering to reduce false-sharing-based memory contention among
 970      * instances of BufferedSubscription, but it does not currently
 971      * attempt to avoid memory contention among buffers. This field
 972      * and element packing can hurt performance especially when each
 973      * publisher has only one client operating at a high rate.
 974      * Addressing this may require allocating substantially more space
 975      * than users expect.
 976      */
 977     @SuppressWarnings("serial")
 978     @sun.misc.Contended
 979     private static final class BufferedSubscription<T>
 980         implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
 981         // Order-sensitive field declarations
 982         long timeout;                      // > 0 if timed wait
 983         volatile long demand;              // # unfilled requests
 984         int maxCapacity;                   // reduced on OOME
 985         int putStat;                       // offer result for ManagedBlocker
 986         int helpDepth;                     // nested helping depth (at most 1)
 987         volatile int ctl;                  // atomic run state flags
 988         volatile int head;                 // next position to take
 989         int tail;                          // next position to put
 990         Object[] array;                    // buffer: null if disabled
 991         Flow.Subscriber<? super T> subscriber; // null if disabled
 992         Executor executor;                 // null if disabled
 993         BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
 994         volatile Throwable pendingError;   // holds until onError issued
 995         volatile Thread waiter;            // blocked producer thread
 996         T putItem;                         // for offer within ManagedBlocker
 997         BufferedSubscription<T> next;      // used only by publisher
 998         BufferedSubscription<T> nextRetry; // used only by publisher
 999 
1000         // ctl values
1001         static final int ACTIVE    = 0x01; // consumer task active
1002         static final int CONSUME   = 0x02; // keep-alive for consumer task
1003         static final int DISABLED  = 0x04; // final state
1004         static final int ERROR     = 0x08; // signal onError then disable
1005         static final int SUBSCRIBE = 0x10; // signal onSubscribe
1006         static final int COMPLETE  = 0x20; // signal onComplete when done
1007 
1008         static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
1009 
1010         /**
1011          * Initial buffer capacity used when maxBufferCapacity is
1012          * greater. Must be a power of two.
1013          */
1014         static final int DEFAULT_INITIAL_CAP = 32;
1015 
1016         BufferedSubscription(Flow.Subscriber<? super T> subscriber,
1017                              Executor executor,
1018                              BiConsumer<? super Flow.Subscriber<? super T>,
1019                              ? super Throwable> onNextHandler,
1020                              int maxBufferCapacity) {
1021             this.subscriber = subscriber;
1022             this.executor = executor;
1023             this.onNextHandler = onNextHandler;
1024             this.maxCapacity = maxBufferCapacity;
1025             this.array = new Object[maxBufferCapacity < DEFAULT_INITIAL_CAP ?
1026                                     (maxBufferCapacity < 2 ? // at least 2 slots
1027                                      2 : maxBufferCapacity) :
1028                                     DEFAULT_INITIAL_CAP];
1029         }
1030 
1031         final boolean isDisabled() {
1032             return ctl == DISABLED;
1033         }
1034 
1035         /**
1036          * Returns estimated number of buffered items, or -1 if
1037          * disabled.
1038          */
1039         final int estimateLag() {
1040             int n;
1041             return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
1042         }
1043 
1044         /**
1045          * Tries to add item and start consumer task if necessary.
1046          * @return -1 if disabled, 0 if dropped, else estimated lag
1047          */
1048         final int offer(T item) {
1049             int h = head, t = tail, cap, size, stat;
1050             Object[] a = array;
1051             if (a != null && (cap = a.length) > 0 && cap >= (size = t + 1 - h)) {
1052                 a[(cap - 1) & t] = item;    // relaxed writes OK
1053                 tail = t + 1;
1054                 stat = size;
1055             }
1056             else
1057                 stat = growAndAdd(a, item);
1058             return (stat > 0 &&
1059                     (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
1060                 startOnOffer(stat) : stat;
1061         }
1062 
1063         /**
1064          * Tries to create or expand buffer, then adds item if possible.
1065          */
1066         private int growAndAdd(Object[] a, T item) {
1067             boolean alloc;
1068             int cap, stat;
1069             if ((ctl & (ERROR | DISABLED)) != 0) {
1070                 cap = 0;
1071                 stat = -1;
1072                 alloc = false;
1073             }
1074             else if (a == null || (cap = a.length) <= 0) {
1075                 cap = 0;
1076                 stat = 1;
1077                 alloc = true;
1078             }
1079             else {
1080                 U.fullFence();                   // recheck
1081                 int h = head, t = tail, size = t + 1 - h;
1082                 if (cap >= size) {
1083                     a[(cap - 1) & t] = item;
1084                     tail = t + 1;
1085                     stat = size;
1086                     alloc = false;
1087                 }
1088                 else if (cap >= maxCapacity) {
1089                     stat = 0;                    // cannot grow
1090                     alloc = false;
1091                 }
1092                 else {
1093                     stat = cap + 1;
1094                     alloc = true;
1095                 }
1096             }
1097             if (alloc) {
1098                 int newCap = (cap > 0) ? cap << 1 : 1;
1099                 if (newCap <= cap)
1100                     stat = 0;
1101                 else {
1102                     Object[] newArray = null;
1103                     try {
1104                         newArray = new Object[newCap];
1105                     } catch (Throwable ex) {     // try to cope with OOME
1106                     }
1107                     if (newArray == null) {
1108                         if (cap > 0)
1109                             maxCapacity = cap;   // avoid continuous failure
1110                         stat = 0;
1111                     }
1112                     else {
1113                         array = newArray;
1114                         int t = tail;
1115                         int newMask = newCap - 1;
1116                         if (a != null && cap > 0) {
1117                             int mask = cap - 1;
1118                             for (int j = head; j != t; ++j) {
1119                                 long k = ((long)(j & mask) << ASHIFT) + ABASE;
1120                                 Object x = U.getObjectVolatile(a, k);
1121                                 if (x != null && // races with consumer
1122                                     U.compareAndSwapObject(a, k, x, null))
1123                                     newArray[j & newMask] = x;
1124                             }
1125                         }
1126                         newArray[t & newMask] = item;
1127                         tail = t + 1;
1128                     }
1129                 }
1130             }
1131             return stat;
1132         }
1133 
1134         /**
1135          * Spins/helps/blocks while offer returns 0.  Called only if
1136          * initial offer return 0.
1137          */
1138         final int submit(T item) {
1139             int stat; Executor e; ForkJoinWorkerThread w;
1140             if ((stat = offer(item)) == 0 && helpDepth == 0 &&
1141                 ((e = executor) instanceof ForkJoinPool)) {
1142                 helpDepth = 1;
1143                 Thread thread = Thread.currentThread();
1144                 if ((thread instanceof ForkJoinWorkerThread) &&
1145                     ((w = (ForkJoinWorkerThread)thread)).getPool() == e)
1146                     stat = internalHelpConsume(w.workQueue, item);
1147                 else if (e == ForkJoinPool.commonPool())
1148                     stat = externalHelpConsume
1149                         (ForkJoinPool.commonSubmitterQueue(), item);
1150                 helpDepth = 0;
1151             }
1152             if (stat == 0 && (stat = offer(item)) == 0) {
1153                 putItem = item;
1154                 timeout = 0L;
1155                 try {
1156                     ForkJoinPool.managedBlock(this);
1157                 } catch (InterruptedException ie) {
1158                     timeout = INTERRUPTED;
1159                 }
1160                 stat = putStat;
1161                 if (timeout < 0L)
1162                     Thread.currentThread().interrupt();
1163             }
1164             return stat;
1165         }
1166 
1167         /**
1168          * Tries helping for FJ submitter.
1169          */
1170         private int internalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
1171             int stat = 0;
1172             if (w != null) {
1173                 ForkJoinTask<?> t;
1174                 while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
1175                     if ((stat = offer(item)) != 0 || !w.tryUnpush(t))
1176                         break;
1177                     ((ConsumerTask<?>)t).consumer.consume();
1178                 }
1179             }
1180             return stat;
1181         }
1182 
1183         /**
1184          * Tries helping for non-FJ submitter.
1185          */
1186         private int externalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
1187             int stat = 0;
1188             if (w != null) {
1189                 ForkJoinTask<?> t;
1190                 while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
1191                     if ((stat = offer(item)) != 0 || !w.trySharedUnpush(t))
1192                         break;
1193                     ((ConsumerTask<?>)t).consumer.consume();
1194                 }
1195             }
1196             return stat;
1197         }
1198 
1199         /**
1200          * Timeout version; similar to submit.
1201          */
1202         final int timedOffer(T item, long nanos) {
1203             int stat; Executor e;
1204             if ((stat = offer(item)) == 0 && helpDepth == 0 &&
1205                 ((e = executor) instanceof ForkJoinPool)) {
1206                 Thread thread = Thread.currentThread();
1207                 if (((thread instanceof ForkJoinWorkerThread) &&
1208                      ((ForkJoinWorkerThread)thread).getPool() == e) ||
1209                     e == ForkJoinPool.commonPool()) {
1210                     helpDepth = 1;
1211                     ForkJoinTask<?> t;
1212                     long deadline = System.nanoTime() + nanos;
1213                     while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
1214                            (t instanceof ConsumerTask)) {
1215                         if ((stat = offer(item)) != 0 ||
1216                             (nanos = deadline - System.nanoTime()) <= 0L ||
1217                             !t.tryUnfork())
1218                             break;
1219                         ((ConsumerTask<?>)t).consumer.consume();
1220                     }
1221                     helpDepth = 0;
1222                 }
1223             }
1224             if (stat == 0 && (stat = offer(item)) == 0 &&
1225                 (timeout = nanos) > 0L) {
1226                 putItem = item;
1227                 try {
1228                     ForkJoinPool.managedBlock(this);
1229                 } catch (InterruptedException ie) {
1230                     timeout = INTERRUPTED;
1231                 }
1232                 stat = putStat;
1233                 if (timeout < 0L)
1234                     Thread.currentThread().interrupt();
1235             }
1236             return stat;
1237         }
1238 
1239         /**
1240          * Tries to start consumer task after offer.
1241          * @return -1 if now disabled, else argument
1242          */
1243         private int startOnOffer(int stat) {
1244             for (;;) {
1245                 Executor e; int c;
1246                 if ((c = ctl) == DISABLED || (e = executor) == null) {
1247                     stat = -1;
1248                     break;
1249                 }
1250                 else if ((c & ACTIVE) != 0) { // ensure keep-alive
1251                     if ((c & CONSUME) != 0 ||
1252                         U.compareAndSwapInt(this, CTL, c,
1253                                             c | CONSUME))
1254                         break;
1255                 }
1256                 else if (demand == 0L || tail == head)
1257                     break;
1258                 else if (U.compareAndSwapInt(this, CTL, c,
1259                                              c | (ACTIVE | CONSUME))) {
1260                     try {
1261                         e.execute(new ConsumerTask<T>(this));
1262                         break;
1263                     } catch (RuntimeException | Error ex) { // back out
1264                         do {} while (((c = ctl) & DISABLED) == 0 &&
1265                                      (c & ACTIVE) != 0 &&
1266                                      !U.compareAndSwapInt(this, CTL, c,
1267                                                           c & ~ACTIVE));
1268                         throw ex;
1269                     }
1270                 }
1271             }
1272             return stat;
1273         }
1274 
1275         private void signalWaiter(Thread w) {
1276             waiter = null;
1277             LockSupport.unpark(w);    // release producer
1278         }
1279 
1280         /**
1281          * Nulls out most fields, mainly to avoid garbage retention
1282          * until publisher unsubscribes, but also to help cleanly stop
1283          * upon error by nulling required components.
1284          */
1285         private void detach() {
1286             Thread w = waiter;
1287             executor = null;
1288             subscriber = null;
1289             pendingError = null;
1290             signalWaiter(w);
1291         }
1292 
1293         /**
1294          * Issues error signal, asynchronously if a task is running,
1295          * else synchronously.
1296          */
1297         final void onError(Throwable ex) {
1298             for (int c;;) {
1299                 if (((c = ctl) & (ERROR | DISABLED)) != 0)
1300                     break;
1301                 else if ((c & ACTIVE) != 0) {
1302                     pendingError = ex;
1303                     if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
1304                         break; // cause consumer task to exit
1305                 }
1306                 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1307                     Flow.Subscriber<? super T> s = subscriber;
1308                     if (s != null && ex != null) {
1309                         try {
1310                             s.onError(ex);
1311                         } catch (Throwable ignore) {
1312                         }
1313                     }
1314                     detach();
1315                     break;
1316                 }
1317             }
1318         }
1319 
1320         /**
1321          * Tries to start consumer task upon a signal or request;
1322          * disables on failure.
1323          */
1324         private void startOrDisable() {
1325             Executor e;
1326             if ((e = executor) != null) { // skip if already disabled
1327                 try {
1328                     e.execute(new ConsumerTask<T>(this));
1329                 } catch (Throwable ex) {  // back out and force signal
1330                     for (int c;;) {
1331                         if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
1332                             break;
1333                         if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
1334                             onError(ex);
1335                             break;
1336                         }
1337                     }
1338                 }
1339             }
1340         }
1341 
1342         final void onComplete() {
1343             for (int c;;) {
1344                 if ((c = ctl) == DISABLED)
1345                     break;
1346                 if (U.compareAndSwapInt(this, CTL, c,
1347                                         c | (ACTIVE | CONSUME | COMPLETE))) {
1348                     if ((c & ACTIVE) == 0)
1349                         startOrDisable();
1350                     break;
1351                 }
1352             }
1353         }
1354 
1355         final void onSubscribe() {
1356             for (int c;;) {
1357                 if ((c = ctl) == DISABLED)
1358                     break;
1359                 if (U.compareAndSwapInt(this, CTL, c,
1360                                         c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1361                     if ((c & ACTIVE) == 0)
1362                         startOrDisable();
1363                     break;
1364                 }
1365             }
1366         }
1367 
1368         /**
1369          * Causes consumer task to exit if active (without reporting
1370          * onError unless there is already a pending error), and
1371          * disables.
1372          */
1373         public void cancel() {
1374             for (int c;;) {
1375                 if ((c = ctl) == DISABLED)
1376                     break;
1377                 else if ((c & ACTIVE) != 0) {
1378                     if (U.compareAndSwapInt(this, CTL, c,
1379                                             c | (CONSUME | ERROR)))
1380                         break;
1381                 }
1382                 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1383                     detach();
1384                     break;
1385                 }
1386             }
1387         }
1388 
1389         /**
1390          * Adds to demand and possibly starts task.
1391          */
1392         public void request(long n) {
1393             if (n > 0L) {
1394                 for (;;) {
1395                     long prev = demand, d;
1396                     if ((d = prev + n) < prev) // saturate
1397                         d = Long.MAX_VALUE;
1398                     if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
1399                         for (int c, h;;) {
1400                             if ((c = ctl) == DISABLED)
1401                                 break;
1402                             else if ((c & ACTIVE) != 0) {
1403                                 if ((c & CONSUME) != 0 ||
1404                                     U.compareAndSwapInt(this, CTL, c,
1405                                                         c | CONSUME))
1406                                     break;
1407                             }
1408                             else if ((h = head) != tail) {
1409                                 if (U.compareAndSwapInt(this, CTL, c,
1410                                                         c | (ACTIVE|CONSUME))) {
1411                                     startOrDisable();
1412                                     break;
1413                                 }
1414                             }
1415                             else if (head == h && tail == h)
1416                                 break;          // else stale
1417                             if (demand == 0L)
1418                                 break;
1419                         }
1420                         break;
1421                     }
1422                 }
1423             }
1424             else if (n < 0L)
1425                 onError(new IllegalArgumentException(
1426                             "negative subscription request"));
1427         }
1428 
1429         public final boolean isReleasable() { // for ManagedBlocker
1430             T item = putItem;
1431             if (item != null) {
1432                 if ((putStat = offer(item)) == 0)
1433                     return false;
1434                 putItem = null;
1435             }
1436             return true;
1437         }
1438 
1439         public final boolean block() { // for ManagedBlocker
1440             T item = putItem;
1441             if (item != null) {
1442                 putItem = null;
1443                 long nanos = timeout;
1444                 long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
1445                 while ((putStat = offer(item)) == 0) {
1446                     if (Thread.interrupted()) {
1447                         timeout = INTERRUPTED;
1448                         if (nanos > 0L)
1449                             break;
1450                     }
1451                     else if (nanos > 0L &&
1452                              (nanos = deadline - System.nanoTime()) <= 0L)
1453                         break;
1454                     else if (waiter == null)
1455                         waiter = Thread.currentThread();
1456                     else {
1457                         if (nanos > 0L)
1458                             LockSupport.parkNanos(this, nanos);
1459                         else
1460                             LockSupport.park(this);
1461                         waiter = null;
1462                     }
1463                 }
1464             }
1465             waiter = null;
1466             return true;
1467         }
1468 
1469         /**
1470          * Consumer loop, called from ConsumerTask, or indirectly
1471          * when helping during submit.
1472          */
1473         final void consume() {
1474             Flow.Subscriber<? super T> s;
1475             int h = head;
1476             if ((s = subscriber) != null) {           // else disabled
1477                 for (;;) {
1478                     long d = demand;
1479                     int c; Object[] a; int n; long i; Object x; Thread w;
1480                     if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
1481                         if (!checkControl(s, c))
1482                             break;
1483                     }
1484                     else if ((a = array) == null || h == tail ||
1485                              (n = a.length) == 0 ||
1486                              (x = U.getObjectVolatile
1487                               (a, (i = ((long)((n - 1) & h) << ASHIFT) + ABASE)))
1488                              == null) {
1489                         if (!checkEmpty(s, c))
1490                             break;
1491                     }
1492                     else if (d == 0L) {
1493                         if (!checkDemand(c))
1494                             break;
1495                     }
1496                     else if (((c & CONSUME) != 0 ||
1497                               U.compareAndSwapInt(this, CTL, c, c | CONSUME)) &&
1498                              U.compareAndSwapObject(a, i, x, null)) {
1499                         U.putOrderedInt(this, HEAD, ++h);
1500                         U.getAndAddLong(this, DEMAND, -1L);
1501                         if ((w = waiter) != null)
1502                             signalWaiter(w);
1503                         try {
1504                             @SuppressWarnings("unchecked") T y = (T) x;
1505                             s.onNext(y);
1506                         } catch (Throwable ex) {
1507                             handleOnNext(s, ex);
1508                         }
1509                     }
1510                 }
1511             }
1512         }
1513 
1514         /**
1515          * Responds to control events in consume().
1516          */
1517         private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
1518             boolean stat = true;
1519             if ((c & ERROR) != 0) {
1520                 Throwable ex = pendingError;
1521                 ctl = DISABLED;           // no need for CAS
1522                 if (ex != null) {         // null if errorless cancel
1523                     try {
1524                         if (s != null)
1525                             s.onError(ex);
1526                     } catch (Throwable ignore) {
1527                     }
1528                 }
1529             }
1530             else if ((c & SUBSCRIBE) != 0) {
1531                 if (U.compareAndSwapInt(this, CTL, c, c & ~SUBSCRIBE)) {
1532                     try {
1533                         if (s != null)
1534                             s.onSubscribe(this);
1535                     } catch (Throwable ex) {
1536                         onError(ex);
1537                     }
1538                 }
1539             }
1540             else {
1541                 detach();
1542                 stat = false;
1543             }
1544             return stat;
1545         }
1546 
1547         /**
1548          * Responds to apparent emptiness in consume().
1549          */
1550         private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) {
1551             boolean stat = true;
1552             if (head == tail) {
1553                 if ((c & CONSUME) != 0)
1554                     U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1555                 else if ((c & COMPLETE) != 0) {
1556                     if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1557                         try {
1558                             if (s != null)
1559                                 s.onComplete();
1560                         } catch (Throwable ignore) {
1561                         }
1562                     }
1563                 }
1564                 else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
1565                     stat = false;
1566             }
1567             return stat;
1568         }
1569 
1570         /**
1571          * Responds to apparent zero demand in consume().
1572          */
1573         private boolean checkDemand(int c) {
1574             boolean stat = true;
1575             if (demand == 0L) {
1576                 if ((c & CONSUME) != 0)
1577                     U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1578                 else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
1579                     stat = false;
1580             }
1581             return stat;
1582         }
1583 
1584         /**
1585          * Processes exception in Subscriber.onNext.
1586          */
1587         private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) {
1588             BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h;
1589             if ((h = onNextHandler) != null) {
1590                 try {
1591                     h.accept(s, ex);
1592                 } catch (Throwable ignore) {
1593                 }
1594             }
1595             onError(ex);
1596         }
1597 
1598         // Unsafe mechanics
1599         private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
1600         private static final long CTL;
1601         private static final long TAIL;
1602         private static final long HEAD;
1603         private static final long DEMAND;
1604         private static final int ABASE;
1605         private static final int ASHIFT;
1606 
1607         static {
1608             try {
1609                 CTL = U.objectFieldOffset
1610                     (BufferedSubscription.class.getDeclaredField("ctl"));
1611                 TAIL = U.objectFieldOffset
1612                     (BufferedSubscription.class.getDeclaredField("tail"));
1613                 HEAD = U.objectFieldOffset
1614                     (BufferedSubscription.class.getDeclaredField("head"));
1615                 DEMAND = U.objectFieldOffset
1616                     (BufferedSubscription.class.getDeclaredField("demand"));
1617 
1618                 ABASE = U.arrayBaseOffset(Object[].class);
1619                 int scale = U.arrayIndexScale(Object[].class);
1620                 if ((scale & (scale - 1)) != 0)
1621                     throw new Error("data type scale not a power of two");
1622                 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1623             } catch (ReflectiveOperationException e) {
1624                 throw new Error(e);
1625             }
1626 
1627             // Reduce the risk of rare disastrous classloading in first call to
1628             // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1629             Class<?> ensureLoaded = LockSupport.class;
1630         }
1631     }
1632 }