< prev index next >

src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java

Print this page
8234131: Miscellaneous changes imported from jsr166 CVS 2021-01
Reviewed-by: martin


  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.lang.invoke.MethodHandles;
  39 import java.lang.invoke.VarHandle;
  40 import java.util.ArrayList;
  41 import java.util.Arrays;
  42 import java.util.List;
  43 import java.util.concurrent.locks.LockSupport;

  44 import java.util.function.BiConsumer;
  45 import java.util.function.BiPredicate;
  46 import java.util.function.Consumer;
  47 import static java.util.concurrent.Flow.Publisher;
  48 import static java.util.concurrent.Flow.Subscriber;
  49 import static java.util.concurrent.Flow.Subscription;
  50 
  51 /**
  52  * A {@link Flow.Publisher} that asynchronously issues submitted
  53  * (non-null) items to current subscribers until it is closed.  Each
  54  * current subscriber receives newly submitted items in the same order
  55  * unless drops or exceptions are encountered.  Using a
  56  * SubmissionPublisher allows item generators to act as compliant <a
  57  * href="http://www.reactive-streams.org/"> reactive-streams</a>
  58  * Publishers relying on drop handling and/or blocking for flow
  59  * control.
  60  *
  61  * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
  62  * constructor for delivery to subscribers. The best choice of
  63  * Executor depends on expected usage. If the generator(s) of


 158  *   public void onSubscribe(Flow.Subscription subscription) {
 159  *     (this.subscription = subscription).request(1);
 160  *   }
 161  *   public void onNext(S item) {
 162  *     subscription.request(1);
 163  *     submit(function.apply(item));
 164  *   }
 165  *   public void onError(Throwable ex) { closeExceptionally(ex); }
 166  *   public void onComplete() { close(); }
 167  * }}</pre>
 168  *
 169  * @param <T> the published item type
 170  * @author Doug Lea
 171  * @since 9
 172  */
 173 public class SubmissionPublisher<T> implements Publisher<T>,
 174                                                AutoCloseable {
 175     /*
 176      * Most mechanics are handled by BufferedSubscription. This class
 177      * mainly tracks subscribers and ensures sequentiality, by using
 178      * built-in synchronization locks across public methods. Using
 179      * built-in locks works well in the most typical case in which
 180      * only one thread submits items. We extend this idea in
 181      * submission methods by detecting single-ownership to reduce
 182      * producer-consumer synchronization strength.
 183      */
 184 
 185     /** The largest possible power of two array size. */
 186     static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
 187 
 188     /**
 189      * Initial buffer capacity used when maxBufferCapacity is
 190      * greater. Must be a power of two.
 191      */
 192     static final int INITIAL_CAPACITY = 32;
 193 
 194     /** Round capacity to power of 2, at most limit. */
 195     static final int roundCapacity(int cap) {
 196         int n = cap - 1;
 197         n |= n >>> 1;
 198         n |= n >>> 2;
 199         n |= n >>> 4;
 200         n |= n >>> 8;
 201         n |= n >>> 16;
 202         return (n <= 0) ? 1 : // at least 1


 217     private static final class ThreadPerTaskExecutor implements Executor {
             // Executor implementation that never queues or reuses threads.
 218         ThreadPerTaskExecutor() {}      // prevent access constructor creation
             // Each call creates a new Thread for the task and starts it immediately.
 219         public void execute(Runnable r) { new Thread(r).start(); }
 220     }
 221 
 222     /**
 223      * Clients (BufferedSubscriptions) are maintained in a linked list
 224      * (via their "next" fields). This works well for publish loops.
 225      * It requires O(n) traversal to check for duplicate subscribers,
 226      * but we expect that subscribing is much less common than
 227      * publishing. Unsubscribing occurs only during traversal loops,
 228      * when BufferedSubscription methods return negative values
 229      * signifying that they have been closed.  To reduce
 230      * head-of-line blocking, submit and offer methods first call
 231      * BufferedSubscription.offer on each subscriber, and place
 232      * saturated ones in retries list (using nextRetry field), and
 233      * retry, possibly blocking or dropping.
 234      */
 235     BufferedSubscription<T> clients;
 236 


 237     /**
          * Run status, updated only within locks. Declared volatile because
          * it is also read outside the lock (isClosed, and the entry checks
          * of close, closeExceptionally and isSubscribed).
          */
 238     volatile boolean closed;
 239     /** Set true on first call to subscribe, to initialize possible owner */
 240     boolean subscribed;
 241     /**
          * The first caller thread to subscribe, or null if thread ever changed.
          * Assigned once by subscribe; reset to null by doOffer when a different
          * thread submits, and by the close methods.
          */
 242     Thread owner;
 243     /** If non-null, the exception in closeExceptionally */
 244     volatile Throwable closedException;
 245 
 246     // Parameters for constructing BufferedSubscriptions
         // (all final; assigned only in the constructor)
 247     final Executor executor;
 248     final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
 249     final int maxBufferCapacity;
 250 
 251     /**
 252      * Creates a new SubmissionPublisher using the given Executor for
 253      * async delivery to subscribers, with the given maximum buffer size
 254      * for each subscriber, and, if non-null, the given handler invoked
 255      * when any Subscriber throws an exception in method {@link
 256      * Flow.Subscriber#onNext(Object) onNext}.
 257      *
 258      * @param executor the executor to use for async delivery,
 259      * supporting creation of at least one independent thread
 260      * @param maxBufferCapacity the maximum capacity for each
 261      * subscriber's buffer (the enforced capacity may be rounded up to
 262      * the nearest power of two and/or bounded by the largest value
 263      * supported by this implementation; method {@link #getMaxBufferCapacity}
 264      * returns the actual value)
 265      * @param handler if non-null, procedure to invoke upon exception
 266      * thrown in method {@code onNext}
 267      * @throws NullPointerException if executor is null
 268      * @throws IllegalArgumentException if maxBufferCapacity not
 269      * positive
 270      */
 271     public SubmissionPublisher(Executor executor, int maxBufferCapacity,
 272                                BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
             // Validate arguments before any field is assigned.
 273         if (executor == null)
 274             throw new NullPointerException();
 275         if (maxBufferCapacity <= 0)
 276             throw new IllegalArgumentException("capacity must be positive");

 277         this.executor = executor;
             // handler is optional; it is stored as given, including null.
 278         this.onNextHandler = handler;
             // The stored capacity is the result of roundCapacity, not the raw argument.
 279         this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
 280     }
 281 
 282     /**
 283      * Creates a new SubmissionPublisher using the given Executor for
 284      * async delivery to subscribers, with the given maximum buffer size
 285      * for each subscriber, and no handler for Subscriber exceptions in
 286      * method {@link Flow.Subscriber#onNext(Object) onNext}.
 287      *
 288      * @param executor the executor to use for async delivery,
 289      * supporting creation of at least one independent thread
 290      * @param maxBufferCapacity the maximum capacity for each
 291      * subscriber's buffer (the enforced capacity may be rounded up to
 292      * the nearest power of two and/or bounded by the largest value
 293      * supported by this implementation; method {@link #getMaxBufferCapacity}
 294      * returns the actual value)
 295      * @throws NullPointerException if executor is null
 296      * @throws IllegalArgumentException if maxBufferCapacity not


 320      * the existing subscription with an {@link IllegalStateException}.
 321      * Otherwise, upon success, the Subscriber's {@link
 322      * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
 323      * asynchronously with a new {@link Flow.Subscription}.  If {@link
 324      * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
 325      * subscription is cancelled. Otherwise, if this SubmissionPublisher
 326      * was closed exceptionally, then the subscriber's {@link
 327      * Flow.Subscriber#onError onError} method is invoked with the
 328      * corresponding exception, or if closed without exception, the
 329      * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
 330      * method is invoked.  Subscribers may enable receiving items by
 331      * invoking the {@link Flow.Subscription#request(long) request}
 332      * method of the new Subscription, and may unsubscribe by invoking
 333      * its {@link Flow.Subscription#cancel() cancel} method.
 334      *
 335      * @param subscriber the subscriber
 336      * @throws NullPointerException if subscriber is null
 337      */
 338     public void subscribe(Subscriber<? super T> subscriber) {
 339         if (subscriber == null) throw new NullPointerException();

             // The subscription and its initial buffer are built before the lock
             // is taken. The buffer starts at the smaller of maxBufferCapacity
             // and INITIAL_CAPACITY.
 340         int max = maxBufferCapacity; // allocate initial array
 341         Object[] array = new Object[max < INITIAL_CAPACITY ?
 342                                     max : INITIAL_CAPACITY];
 343         BufferedSubscription<T> subscription =
 344             new BufferedSubscription<T>(subscriber, executor, onNextHandler,
 345                                         array, max);
 346         synchronized (this) {

                 // Only the very first subscribe call records the calling thread
                 // as the possible owner.
 347             if (!subscribed) {
 348                 subscribed = true;
 349                 owner = Thread.currentThread();
 350             }
                 // Walk the client list. pred is the last node kept so far, or
                 // null while no node has been kept.
 351             for (BufferedSubscription<T> b = clients, pred = null;;) {
 352                 if (b == null) {
                         // End of list with no duplicate found. onSubscribe is
                         // always signalled first. An already-closed publisher then
                         // relays its terminal signal and the subscription is NOT
                         // linked; otherwise it is appended at the tail.
 353                     Throwable ex;
 354                     subscription.onSubscribe();
 355                     if ((ex = closedException) != null)
 356                         subscription.onError(ex);
 357                     else if (closed)
 358                         subscription.onComplete();
 359                     else if (pred == null)
 360                         clients = subscription;
 361                     else
 362                         pred.next = subscription;
 363                     break;
 364                 }
 365                 BufferedSubscription<T> next = b.next;
 366                 if (b.isClosed()) {   // remove
 367                     b.next = null;    // detach
 368                     if (pred == null)
 369                         clients = next;
 370                     else
 371                         pred.next = next;
 372                 }
 373                 else if (subscriber.equals(b.subscriber)) {
                         // Duplicate: the error goes to the EXISTING subscription;
                         // the newly built one is dropped without being linked.
 374                     b.onError(new IllegalStateException("Duplicate subscribe"));
 375                     break;
 376                 }
 377                 else
 378                     pred = b;
 379                 b = next;
 380             }


 381         }
 382     }
 383 
 384     /**
 385      * Common implementation for all three forms of submit and offer.
 386      * Acts as submit if nanos == Long.MAX_VALUE, else offer.
 387      */
 388     private int doOffer(T item, long nanos,
 389                         BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
 390         if (item == null) throw new NullPointerException();
 391         int lag = 0;
 392         boolean complete, unowned;
 393         synchronized (this) {


 394             Thread t = Thread.currentThread(), o;
 395             BufferedSubscription<T> b = clients;
                 // unowned is true when the caller is not the recorded owner
                 // thread. If a different thread was recorded, the owner is
                 // cleared.
 396             if ((unowned = ((o = owner) != t)) && o != null)
 397                 owner = null;                     // disable bias
                 // With no subscribers the only outcome is whether the publisher
                 // is closed. complete can be true only on this path.
 398             if (b == null)
 399                 complete = closed;
 400             else {
 401                 complete = false;
 402                 boolean cleanMe = false;
 403                 BufferedSubscription<T> retries = null, rtail = null, next;
                     // First pass: offer once to every subscriber and classify
                     // by returned status. 0 goes on the retry list (kept in
                     // traversal order through nextRetry), a negative value
                     // flags cleanup, and the largest positive value is kept
                     // as lag.
 404                 do {
 405                     next = b.next;
 406                     int stat = b.offer(item, unowned);
 407                     if (stat == 0) {              // saturated; add to retry list
 408                         b.nextRetry = null;       // avoid garbage on exceptions
 409                         if (rtail == null)
 410                             retries = b;
 411                         else
 412                             rtail.nextRetry = b;
 413                         rtail = b;
 414                     }
 415                     else if (stat < 0)            // closed
 416                         cleanMe = true;           // remove later
 417                     else if (stat > lag)
 418                         lag = stat;
 419                 } while ((b = next) != null);
 420 
                     // Second phase runs only when some subscriber was saturated
                     // or closed. Its result replaces lag.
 421                 if (retries != null || cleanMe)
 422                     lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
 423             }


 424         }
             // The exception is thrown after the lock has been released.
 425         if (complete)
 426             throw new IllegalStateException("Closed");
 427         else
 428             return lag;
 429     }
 430 
 431     /**
 432      * Helps, (timed) waits for, and/or drops buffers on list; returns
 433      * lag or negative drops (for use in offer).
 434      */
 435     private int retryOffer(T item, long nanos,
 436                            BiPredicate<Subscriber<? super T>, ? super T> onDrop,
 437                            BufferedSubscription<T> retries, int lag,
 438                            boolean cleanMe) {
 439         for (BufferedSubscription<T> r = retries; r != null;) {
 440             BufferedSubscription<T> nextRetry = r.nextRetry;
 441             r.nextRetry = null;
 442             if (nanos > 0L)
 443                 r.awaitSpace(nanos);


 592      * @throws IllegalStateException if closed
 593      * @throws NullPointerException if item is null
 594      * @throws RejectedExecutionException if thrown by Executor
 595      */
 596     public int offer(T item, long timeout, TimeUnit unit,
 597                      BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
 598         long nanos = unit.toNanos(timeout);
 599         // distinguishes from untimed (only wrt interrupt policy)
             // doOffer is documented to act as submit when nanos is exactly
             // Long.MAX_VALUE, so a timed offer that converts to that value is
             // lowered by one nanosecond to stay on the offer path.
 600         if (nanos == Long.MAX_VALUE) --nanos;
 601         return doOffer(item, nanos, onDrop);
 602     }
 603 
 604     /**
 605      * Unless already closed, issues {@link
 606      * Flow.Subscriber#onComplete() onComplete} signals to current
 607      * subscribers, and disallows subsequent attempts to publish.
 608      * Upon return, this method does <em>NOT</em> guarantee that all
 609      * subscribers have yet completed.
 610      */
 611     public void close() {

             // Unlocked read of the volatile flag: nothing to do once closed.
 612         if (!closed) {
 613             BufferedSubscription<T> b;
 614             synchronized (this) {

 615                 // no need to re-check closed here
                     // Take the whole client list and reset publisher state.
 616                 b = clients;
 617                 clients = null;
 618                 owner = null;
 619                 closed = true;


 620             }
                 // Signalling happens after the lock is released. Each node is
                 // unlinked from its successor before onComplete is issued.
 621             while (b != null) {
 622                 BufferedSubscription<T> next = b.next;
 623                 b.next = null;
 624                 b.onComplete();
 625                 b = next;
 626             }
 627         }
 628     }
 629 
 630     /**
 631      * Unless already closed, issues {@link
 632      * Flow.Subscriber#onError(Throwable) onError} signals to current
 633      * subscribers with the given error, and disallows subsequent
 634      * attempts to publish.  Future subscribers also receive the given
 635      * error. Upon return, this method does <em>NOT</em> guarantee
 636      * that all subscribers have yet completed.
 637      *
 638      * @param error the {@code onError} argument sent to subscribers
 639      * @throws NullPointerException if error is null
 640      */
 641     public void closeExceptionally(Throwable error) {
 642         if (error == null)
 643             throw new NullPointerException();

             // Unlocked read of the volatile flag: nothing to do once closed.
 644         if (!closed) {
 645             BufferedSubscription<T> b;
 646             synchronized (this) {

                     // The list head is captured before the re-check. State is
                     // changed only if no other close got here first, so an
                     // earlier closedException is never overwritten.
 647                 b = clients;
 648                 if (!closed) {  // don't clobber racing close
 649                     closedException = error;
 650                     clients = null;
 651                     owner = null;
 652                     closed = true;
 653                 }


 654             }
                 // Signalling happens after the lock is released. Each node is
                 // unlinked from its successor before onError is issued.
 655             while (b != null) {
 656                 BufferedSubscription<T> next = b.next;
 657                 b.next = null;
 658                 b.onError(error);
 659                 b = next;
 660             }
 661         }
 662     }
 663 
 664     /**
 665      * Returns true if this publisher is not accepting submissions.
 666      *
 667      * @return true if closed
 668      */
 669     public boolean isClosed() {
             // Read of the volatile flag; no lock is taken.
 670         return closed;
 671     }
 672 
 673     /**
 674      * Returns the exception associated with {@link
 675      * #closeExceptionally(Throwable) closeExceptionally}, or null if
 676      * not closed or if closed normally.
 677      *
 678      * @return the exception, or null if none
 679      */
 680     public Throwable getClosedException() {
             // Read of the volatile field; it is assigned only in closeExceptionally.
 681         return closedException;
 682     }
 683 
 684     /**
 685      * Returns true if this publisher has any subscribers.
 686      *
 687      * @return true if this publisher has any subscribers
 688      */
 689     public boolean hasSubscribers() {
 690         boolean nonEmpty = false;
 691         synchronized (this) {


                 // Remove closed subscriptions from the head of the list and
                 // stop at the first one that is still open. Nodes after it are
                 // not examined.
 692             for (BufferedSubscription<T> b = clients; b != null;) {
 693                 BufferedSubscription<T> next = b.next;
 694                 if (b.isClosed()) {
                         // Detach the closed head and advance the list head past it.
 695                     b.next = null;
 696                     b = clients = next;
 697                 }
 698                 else {
 699                     nonEmpty = true;
 700                     break;
 701                 }
 702             }


 703         }
 704         return nonEmpty;
 705     }
 706 
 707     /**
 708      * Returns the number of current subscribers.
 709      *
 710      * @return the number of current subscribers
 711      */
 712     public int getNumberOfSubscribers() {
 713         synchronized (this) {
                 // The count comes entirely from cleanAndCount, called while
                 // holding the lock. NOTE(review): that helper is defined
                 // outside this excerpt; by its name it presumably unlinks
                 // closed subscriptions and counts the rest -- confirm.
 714             return cleanAndCount();





 715         }

 716     }
 717 
 718     /**
 719      * Returns the Executor used for asynchronous delivery.
 720      *
 721      * @return the Executor used for asynchronous delivery
 722      */
 723     public Executor getExecutor() {
             // Final field assigned in the constructor; never null, since the
             // constructor rejects a null executor.
 724         return executor;
 725     }
 726 
 727     /**
 728      * Returns the maximum per-subscriber buffer capacity.
 729      *
 730      * @return the maximum per-subscriber buffer capacity
 731      */
 732     public int getMaxBufferCapacity() {
             // This is the value produced by roundCapacity in the constructor,
             // which may differ from the capacity the caller requested.
 733         return maxBufferCapacity;
 734     }
 735 
 736     /**
 737      * Returns a list of current subscribers for monitoring and
 738      * tracking purposes, not for invoking {@link Flow.Subscriber}
 739      * methods on the subscribers.
 740      *
 741      * @return list of current subscribers
 742      */
 743     public List<Subscriber<? super T>> getSubscribers() {
             // A fresh list is filled under the lock and returned.
 744         ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
 745         synchronized (this) {


                 // One pass over the client list: closed subscriptions are
                 // unlinked, open ones contribute their subscriber. pred is the
                 // last node kept.
 746             BufferedSubscription<T> pred = null, next;
 747             for (BufferedSubscription<T> b = clients; b != null; b = next) {
 748                 next = b.next;
 749                 if (b.isClosed()) {
 750                     b.next = null;
 751                     if (pred == null)
 752                         clients = next;
 753                     else
 754                         pred.next = next;
 755                 }
 756                 else {
 757                     subs.add(b.subscriber);
 758                     pred = b;
 759                 }
 760             }


 761         }
 762         return subs;
 763     }
 764 
 765     /**
 766      * Returns true if the given Subscriber is currently subscribed.
 767      *
 768      * @param subscriber the subscriber
 769      * @return true if currently subscribed
 770      * @throws NullPointerException if subscriber is null
 771      */
 772     public boolean isSubscribed(Subscriber<? super T> subscriber) {
 773         if (subscriber == null) throw new NullPointerException();


             // A closed publisher answers false without scanning the list.
 774         if (!closed) {
 775             synchronized (this) {

                     // Scan for a subscription whose subscriber equals the
                     // argument, unlinking closed subscriptions met on the way.
                     // A closed subscription is removed before the equality
                     // test, so it never matches.
 776                 BufferedSubscription<T> pred = null, next;
 777                 for (BufferedSubscription<T> b = clients; b != null; b = next) {
 778                     next = b.next;
 779                     if (b.isClosed()) {
 780                         b.next = null;
 781                         if (pred == null)
 782                             clients = next;
 783                         else
 784                             pred.next = next;
 785                     }
 786                     else if (subscriber.equals(b.subscriber))
 787                         return true;
 788                     else
 789                         pred = b;
 790                 }


 791             }
 792         }
 793         return false;
 794     }
 795 
 796     /**
 797      * Returns an estimate of the minimum number of items requested
 798      * (via {@link Flow.Subscription#request(long) request}) but not
 799      * yet produced, among all current subscribers.
 800      *
 801      * @return the estimate, or zero if no subscribers
 802      */
 803     public long estimateMinimumDemand() {
 804         long min = Long.MAX_VALUE;
 805         boolean nonEmpty = false;
 806         synchronized (this) {


                 // One pass over the client list. A negative estimateLag result
                 // marks a subscription to unlink. For every other subscription
                 // the value (demand - lag) competes for the minimum.
 807             BufferedSubscription<T> pred = null, next;
 808             for (BufferedSubscription<T> b = clients; b != null; b = next) {
 809                 int n; long d;
 810                 next = b.next;
 811                 if ((n = b.estimateLag()) < 0) {
 812                     b.next = null;
 813                     if (pred == null)
 814                         clients = next;
 815                     else
 816                         pred.next = next;
 817                 }
 818                 else {
 819                     if ((d = b.demand - n) < min)
 820                         min = d;
 821                     nonEmpty = true;
 822                     pred = b;
 823                 }
 824             }


 825         }
             // With no retained subscription, min is still Long.MAX_VALUE, so
             // zero is reported instead.
 826         return nonEmpty ? min : 0;
 827     }
 828 
 829     /**
 830      * Returns an estimate of the maximum number of items produced but
 831      * not yet consumed among all current subscribers.
 832      *
 833      * @return the estimate
 834      */
 835     public int estimateMaximumLag() {
 836         int max = 0;
 837         synchronized (this) {


                 // One pass over the client list. A negative estimateLag result
                 // marks a subscription to unlink. Otherwise the largest lag
                 // seen is kept. The result stays 0 when nothing is retained.
 838             BufferedSubscription<T> pred = null, next;
 839             for (BufferedSubscription<T> b = clients; b != null; b = next) {
 840                 int n;
 841                 next = b.next;
 842                 if ((n = b.estimateLag()) < 0) {
 843                     b.next = null;
 844                     if (pred == null)
 845                         clients = next;
 846                     else
 847                         pred.next = next;
 848                 }
 849                 else {
 850                     if (n > max)
 851                         max = n;
 852                     pred = b;
 853                 }
 854             }


 855         }
 856         return max;
 857     }
 858 
 859     /**
 860      * Processes all published items using the given Consumer function.
 861      * Returns a CompletableFuture that is completed normally when this
 862      * publisher signals {@link Flow.Subscriber#onComplete()
 863      * onComplete}, or completed exceptionally upon any error, or an
 864      * exception is thrown by the Consumer, or the returned
 865      * CompletableFuture is cancelled, in which case no further items
 866      * are processed.
 867      *
 868      * @param consumer the function applied to each onNext item
 869      * @return a CompletableFuture that is completed normally
 870      * when the publisher signals onComplete, and exceptionally
 871      * upon any error or cancellation
 872      * @throws NullPointerException if consumer is null
 873      */
 874     public CompletableFuture<Void> consume(Consumer<? super T> consumer) {




  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.lang.invoke.MethodHandles;
  39 import java.lang.invoke.VarHandle;
  40 import java.util.ArrayList;
  41 import java.util.Arrays;
  42 import java.util.List;
  43 import java.util.concurrent.locks.LockSupport;
  44 import java.util.concurrent.locks.ReentrantLock;
  45 import java.util.function.BiConsumer;
  46 import java.util.function.BiPredicate;
  47 import java.util.function.Consumer;
  48 import static java.util.concurrent.Flow.Publisher;
  49 import static java.util.concurrent.Flow.Subscriber;
  50 import static java.util.concurrent.Flow.Subscription;
  51 
  52 /**
  53  * A {@link Flow.Publisher} that asynchronously issues submitted
  54  * (non-null) items to current subscribers until it is closed.  Each
  55  * current subscriber receives newly submitted items in the same order
  56  * unless drops or exceptions are encountered.  Using a
  57  * SubmissionPublisher allows item generators to act as compliant <a
  58  * href="http://www.reactive-streams.org/"> reactive-streams</a>
  59  * Publishers relying on drop handling and/or blocking for flow
  60  * control.
  61  *
  62  * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
  63  * constructor for delivery to subscribers. The best choice of
  64  * Executor depends on expected usage. If the generator(s) of


 159  *   public void onSubscribe(Flow.Subscription subscription) {
 160  *     (this.subscription = subscription).request(1);
 161  *   }
 162  *   public void onNext(S item) {
 163  *     subscription.request(1);
 164  *     submit(function.apply(item));
 165  *   }
 166  *   public void onError(Throwable ex) { closeExceptionally(ex); }
 167  *   public void onComplete() { close(); }
 168  * }}</pre>
 169  *
 170  * @param <T> the published item type
 171  * @author Doug Lea
 172  * @since 9
 173  */
 174 public class SubmissionPublisher<T> implements Publisher<T>,
 175                                                AutoCloseable {
 176     /*
 177      * Most mechanics are handled by BufferedSubscription. This class
 178      * mainly tracks subscribers and ensures sequentiality, by using
 179      * locks across public methods, to ensure thread-safety in the
 180      * presence of multiple sources and maintain acquire-release
 181      * ordering around user operations. However, we also track whether
 182      * there is only a single source, and if so streamline some buffer
 183      * operations by avoiding some atomics.
 184      */
 185 
 186     /** The largest possible power of two array size. */
 187     static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
 188 
 189     /**
 190      * Initial buffer capacity used when maxBufferCapacity is
 191      * greater. Must be a power of two.
 192      */
 193     static final int INITIAL_CAPACITY = 32;
 194 
 195     /** Round capacity to power of 2, at most limit. */
 196     static final int roundCapacity(int cap) {
 197         int n = cap - 1;
 198         n |= n >>> 1;
 199         n |= n >>> 2;
 200         n |= n >>> 4;
 201         n |= n >>> 8;
 202         n |= n >>> 16;
 203         return (n <= 0) ? 1 : // at least 1


 218     private static final class ThreadPerTaskExecutor implements Executor {
             // Executor implementation that never queues or reuses threads.
 219         ThreadPerTaskExecutor() {}      // prevent access constructor creation
             // Each call creates a new Thread for the task and starts it immediately.
 220         public void execute(Runnable r) { new Thread(r).start(); }
 221     }
 222 
 223     /**
 224      * Clients (BufferedSubscriptions) are maintained in a linked list
 225      * (via their "next" fields). This works well for publish loops.
 226      * It requires O(n) traversal to check for duplicate subscribers,
 227      * but we expect that subscribing is much less common than
 228      * publishing. Unsubscribing occurs only during traversal loops,
 229      * when BufferedSubscription methods return negative values
 230      * signifying that they have been closed.  To reduce
 231      * head-of-line blocking, submit and offer methods first call
 232      * BufferedSubscription.offer on each subscriber, and place
 233      * saturated ones in retries list (using nextRetry field), and
 234      * retry, possibly blocking or dropping.
 235      */
 236     BufferedSubscription<T> clients;
 237 
 238     /** Lock for exclusion across multiple sources; created once in the constructor */
 239     final ReentrantLock lock;
 240     /** Run status, updated only within locks */
 241     volatile boolean closed;
 242     /** Set true on first call to subscribe, to initialize possible owner */
 243     boolean subscribed;
 244     /** The first caller thread to subscribe, or null if thread ever changed */
 245     Thread owner;
 246     /** If non-null, the exception in closeExceptionally */
 247     volatile Throwable closedException;
 248 
 249     // Parameters for constructing BufferedSubscriptions
         // (all final; assigned only in the constructor)
 250     final Executor executor;
 251     final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
 252     final int maxBufferCapacity;
 253 
 254     /**
 255      * Creates a new SubmissionPublisher using the given Executor for
 256      * async delivery to subscribers, with the given maximum buffer size
 257      * for each subscriber, and, if non-null, the given handler invoked
 258      * when any Subscriber throws an exception in method {@link
 259      * Flow.Subscriber#onNext(Object) onNext}.
 260      *
 261      * @param executor the executor to use for async delivery,
 262      * supporting creation of at least one independent thread
 263      * @param maxBufferCapacity the maximum capacity for each
 264      * subscriber's buffer (the enforced capacity may be rounded up to
 265      * the nearest power of two and/or bounded by the largest value
 266      * supported by this implementation; method {@link #getMaxBufferCapacity}
 267      * returns the actual value)
 268      * @param handler if non-null, procedure to invoke upon exception
 269      * thrown in method {@code onNext}
 270      * @throws NullPointerException if executor is null
 271      * @throws IllegalArgumentException if maxBufferCapacity not
 272      * positive
 273      */
 274     public SubmissionPublisher(Executor executor, int maxBufferCapacity,
 275                                BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
             // Validate arguments before any field is assigned.
 276         if (executor == null)
 277             throw new NullPointerException();
 278         if (maxBufferCapacity <= 0)
 279             throw new IllegalArgumentException("capacity must be positive");
             // Each publisher gets its own lock instance.
 280         this.lock = new ReentrantLock();
 281         this.executor = executor;
             // handler is optional; it is stored as given, including null.
 282         this.onNextHandler = handler;
             // The stored capacity is the result of roundCapacity, not the raw argument.
 283         this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
 284     }
 285 
 286     /**
 287      * Creates a new SubmissionPublisher using the given Executor for
 288      * async delivery to subscribers, with the given maximum buffer size
 289      * for each subscriber, and no handler for Subscriber exceptions in
 290      * method {@link Flow.Subscriber#onNext(Object) onNext}.
 291      *
 292      * @param executor the executor to use for async delivery,
 293      * supporting creation of at least one independent thread
 294      * @param maxBufferCapacity the maximum capacity for each
 295      * subscriber's buffer (the enforced capacity may be rounded up to
 296      * the nearest power of two and/or bounded by the largest value
 297      * supported by this implementation; method {@link #getMaxBufferCapacity}
 298      * returns the actual value)
 299      * @throws NullPointerException if executor is null
 300      * @throws IllegalArgumentException if maxBufferCapacity not


 324      * the existing subscription with an {@link IllegalStateException}.
 325      * Otherwise, upon success, the Subscriber's {@link
 326      * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
 327      * asynchronously with a new {@link Flow.Subscription}.  If {@link
 328      * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
 329      * subscription is cancelled. Otherwise, if this SubmissionPublisher
 330      * was closed exceptionally, then the subscriber's {@link
 331      * Flow.Subscriber#onError onError} method is invoked with the
 332      * corresponding exception, or if closed without exception, the
 333      * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
 334      * method is invoked.  Subscribers may enable receiving items by
 335      * invoking the {@link Flow.Subscription#request(long) request}
 336      * method of the new Subscription, and may unsubscribe by invoking
 337      * its {@link Flow.Subscription#cancel() cancel} method.
 338      *
 339      * @param subscriber the subscriber
 340      * @throws NullPointerException if subscriber is null
 341      */
 342     public void subscribe(Subscriber<? super T> subscriber) {
 343         if (subscriber == null) throw new NullPointerException();
 344         ReentrantLock lock = this.lock;
 345         int max = maxBufferCapacity; // allocate initial array
 346         Object[] array = new Object[max < INITIAL_CAPACITY ?
 347                                     max : INITIAL_CAPACITY];
 348         BufferedSubscription<T> subscription =
 349             new BufferedSubscription<T>(subscriber, executor, onNextHandler,
 350                                         array, max);
 351         lock.lock();
 352         try {
 353             if (!subscribed) {
 354                 subscribed = true;
 355                 owner = Thread.currentThread();
 356             }
 357             for (BufferedSubscription<T> b = clients, pred = null;;) {
 358                 if (b == null) {
 359                     Throwable ex;
 360                     subscription.onSubscribe();
 361                     if ((ex = closedException) != null)
 362                         subscription.onError(ex);
 363                     else if (closed)
 364                         subscription.onComplete();
 365                     else if (pred == null)
 366                         clients = subscription;
 367                     else
 368                         pred.next = subscription;
 369                     break;
 370                 }
 371                 BufferedSubscription<T> next = b.next;
 372                 if (b.isClosed()) {   // remove
 373                     b.next = null;    // detach
 374                     if (pred == null)
 375                         clients = next;
 376                     else
 377                         pred.next = next;
 378                 }
 379                 else if (subscriber.equals(b.subscriber)) {
 380                     b.onError(new IllegalStateException("Duplicate subscribe"));
 381                     break;
 382                 }
 383                 else
 384                     pred = b;
 385                 b = next;
 386             }
 387         } finally {
 388             lock.unlock();
 389         }
 390     }
 391 
    /**
     * Common implementation for all three forms of submit and offer.
     * Acts as submit if nanos == Long.MAX_VALUE, else offer.
     *
     * <p>While holding the lock, offers the item to every subscription
     * on the clients list. Subscriptions whose offer reports zero are
     * chained through nextRetry and passed to retryOffer, as is the
     * fact that some subscription reported a negative (closed) status.
     *
     * @param item the (non-null) item to publish
     * @param nanos timeout value handed through to retryOffer
     * @param onDrop drop handler handed through to retryOffer
     * @return the largest positive status reported by a subscription's
     * offer, or the value returned by retryOffer when it is invoked
     * @throws NullPointerException if item is null
     * @throws IllegalStateException if there are no clients and this
     * publisher is closed
     */
    private int doOffer(T item, long nanos,
                        BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
        if (item == null) throw new NullPointerException();
        int lag = 0;
        boolean complete, unowned;
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Thread t = Thread.currentThread(), o;
            BufferedSubscription<T> b = clients;
            // unowned: caller is not the recorded owner thread. Once a
            // different thread is seen, the owner is cleared for good.
            if ((unowned = ((o = owner) != t)) && o != null)
                owner = null;                     // disable bias
            if (b == null)
                complete = closed;                // nothing to offer to
            else {
                complete = false;
                boolean cleanMe = false;
                BufferedSubscription<T> retries = null, rtail = null, next;
                do {
                    next = b.next;                // read before b may be relinked
                    int stat = b.offer(item, unowned);
                    if (stat == 0) {              // saturated; add to retry list
                        b.nextRetry = null;       // avoid garbage on exceptions
                        if (rtail == null)
                            retries = b;
                        else
                            rtail.nextRetry = b;
                        rtail = b;
                    }
                    else if (stat < 0)            // closed
                        cleanMe = true;           // remove later
                    else if (stat > lag)
                        lag = stat;               // track maximum positive status
                } while ((b = next) != null);

                // Second phase only if something was saturated or closed.
                if (retries != null || cleanMe)
                    lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
            }
        } finally {
            lock.unlock();
        }
        // Throw outside the lock.
        if (complete)
            throw new IllegalStateException("Closed");
        else
            return lag;
    }
 442 
 443     /**
 444      * Helps, (timed) waits for, and/or drops buffers on list; returns
 445      * lag or negative drops (for use in offer).
 446      */
 447     private int retryOffer(T item, long nanos,
 448                            BiPredicate<Subscriber<? super T>, ? super T> onDrop,
 449                            BufferedSubscription<T> retries, int lag,
 450                            boolean cleanMe) {
 451         for (BufferedSubscription<T> r = retries; r != null;) {
 452             BufferedSubscription<T> nextRetry = r.nextRetry;
 453             r.nextRetry = null;
 454             if (nanos > 0L)
 455                 r.awaitSpace(nanos);


 604      * @throws IllegalStateException if closed
 605      * @throws NullPointerException if item is null
 606      * @throws RejectedExecutionException if thrown by Executor
 607      */
 608     public int offer(T item, long timeout, TimeUnit unit,
 609                      BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
 610         long nanos = unit.toNanos(timeout);
 611         // distinguishes from untimed (only wrt interrupt policy)
 612         if (nanos == Long.MAX_VALUE) --nanos;
 613         return doOffer(item, nanos, onDrop);
 614     }
 615 
 616     /**
 617      * Unless already closed, issues {@link
 618      * Flow.Subscriber#onComplete() onComplete} signals to current
 619      * subscribers, and disallows subsequent attempts to publish.
 620      * Upon return, this method does <em>NOT</em> guarantee that all
 621      * subscribers have yet completed.
 622      */
 623     public void close() {
 624         ReentrantLock lock = this.lock;
 625         if (!closed) {
 626             BufferedSubscription<T> b;
 627             lock.lock();
 628             try {
 629                 // no need to re-check closed here
 630                 b = clients;
 631                 clients = null;
 632                 owner = null;
 633                 closed = true;
 634             } finally {
 635                 lock.unlock();
 636             }
 637             while (b != null) {
 638                 BufferedSubscription<T> next = b.next;
 639                 b.next = null;
 640                 b.onComplete();
 641                 b = next;
 642             }
 643         }
 644     }
 645 
 646     /**
 647      * Unless already closed, issues {@link
 648      * Flow.Subscriber#onError(Throwable) onError} signals to current
 649      * subscribers with the given error, and disallows subsequent
 650      * attempts to publish.  Future subscribers also receive the given
 651      * error. Upon return, this method does <em>NOT</em> guarantee
 652      * that all subscribers have yet completed.
 653      *
 654      * @param error the {@code onError} argument sent to subscribers
 655      * @throws NullPointerException if error is null
 656      */
 657     public void closeExceptionally(Throwable error) {
 658         if (error == null)
 659             throw new NullPointerException();
 660         ReentrantLock lock = this.lock;
 661         if (!closed) {
 662             BufferedSubscription<T> b;
 663             lock.lock();
 664             try {
 665                 b = clients;
 666                 if (!closed) {  // don't clobber racing close
 667                     closedException = error;
 668                     clients = null;
 669                     owner = null;
 670                     closed = true;
 671                 }
 672             } finally {
 673                 lock.unlock();
 674             }
 675             while (b != null) {
 676                 BufferedSubscription<T> next = b.next;
 677                 b.next = null;
 678                 b.onError(error);
 679                 b = next;
 680             }
 681         }
 682     }
 683 
 684     /**
 685      * Returns true if this publisher is not accepting submissions.
 686      *
 687      * @return true if closed
 688      */
 689     public boolean isClosed() {
 690         return closed;
 691     }
 692 
 693     /**
 694      * Returns the exception associated with {@link
 695      * #closeExceptionally(Throwable) closeExceptionally}, or null if
 696      * not closed or if closed normally.
 697      *
 698      * @return the exception, or null if none
 699      */
 700     public Throwable getClosedException() {
 701         return closedException;
 702     }
 703 
 704     /**
 705      * Returns true if this publisher has any subscribers.
 706      *
 707      * @return true if this publisher has any subscribers
 708      */
 709     public boolean hasSubscribers() {
 710         boolean nonEmpty = false;
 711         ReentrantLock lock = this.lock;
 712         lock.lock();
 713         try {
 714             for (BufferedSubscription<T> b = clients; b != null;) {
 715                 BufferedSubscription<T> next = b.next;
 716                 if (b.isClosed()) {
 717                     b.next = null;
 718                     b = clients = next;
 719                 }
 720                 else {
 721                     nonEmpty = true;
 722                     break;
 723                 }
 724             }
 725         } finally {
 726             lock.unlock();
 727         }
 728         return nonEmpty;
 729     }
 730 
 731     /**
 732      * Returns the number of current subscribers.
 733      *
 734      * @return the number of current subscribers
 735      */
 736     public int getNumberOfSubscribers() {
 737         int n;
 738         ReentrantLock lock = this.lock;
 739         lock.lock();
 740         try {
 741             n = cleanAndCount();
 742         } finally {
 743             lock.unlock();
 744         }
 745         return n;
 746     }
 747 
 748     /**
 749      * Returns the Executor used for asynchronous delivery.
 750      *
 751      * @return the Executor used for asynchronous delivery
 752      */
 753     public Executor getExecutor() {
 754         return executor;
 755     }
 756 
 757     /**
 758      * Returns the maximum per-subscriber buffer capacity.
 759      *
 760      * @return the maximum per-subscriber buffer capacity
 761      */
 762     public int getMaxBufferCapacity() {
 763         return maxBufferCapacity;
 764     }
 765 
 766     /**
 767      * Returns a list of current subscribers for monitoring and
 768      * tracking purposes, not for invoking {@link Flow.Subscriber}
 769      * methods on the subscribers.
 770      *
 771      * @return list of current subscribers
 772      */
 773     public List<Subscriber<? super T>> getSubscribers() {
 774         ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
 775         ReentrantLock lock = this.lock;
 776         lock.lock();
 777         try {
 778             BufferedSubscription<T> pred = null, next;
 779             for (BufferedSubscription<T> b = clients; b != null; b = next) {
 780                 next = b.next;
 781                 if (b.isClosed()) {
 782                     b.next = null;
 783                     if (pred == null)
 784                         clients = next;
 785                     else
 786                         pred.next = next;
 787                 }
 788                 else {
 789                     subs.add(b.subscriber);
 790                     pred = b;
 791                 }
 792             }
 793         } finally {
 794             lock.unlock();
 795         }
 796         return subs;
 797     }
 798 
 799     /**
 800      * Returns true if the given Subscriber is currently subscribed.
 801      *
 802      * @param subscriber the subscriber
 803      * @return true if currently subscribed
 804      * @throws NullPointerException if subscriber is null
 805      */
 806     public boolean isSubscribed(Subscriber<? super T> subscriber) {
 807         if (subscriber == null) throw new NullPointerException();
 808         boolean subscribed = false;
 809         ReentrantLock lock = this.lock;
 810         if (!closed) {
 811             lock.lock();
 812             try {
 813                 BufferedSubscription<T> pred = null, next;
 814                 for (BufferedSubscription<T> b = clients; b != null; b = next) {
 815                     next = b.next;
 816                     if (b.isClosed()) {
 817                         b.next = null;
 818                         if (pred == null)
 819                             clients = next;
 820                         else
 821                             pred.next = next;
 822                     }
 823                     else if (subscribed = subscriber.equals(b.subscriber))
 824                         break;
 825                     else
 826                         pred = b;
 827                 }
 828             } finally {
 829                 lock.unlock();
 830             }
 831         }
 832         return subscribed;
 833     }
 834 
 835     /**
 836      * Returns an estimate of the minimum number of items requested
 837      * (via {@link Flow.Subscription#request(long) request}) but not
 838      * yet produced, among all current subscribers.
 839      *
 840      * @return the estimate, or zero if no subscribers
 841      */
 842     public long estimateMinimumDemand() {
 843         long min = Long.MAX_VALUE;
 844         boolean nonEmpty = false;
 845         ReentrantLock lock = this.lock;
 846         lock.lock();
 847         try {
 848             BufferedSubscription<T> pred = null, next;
 849             for (BufferedSubscription<T> b = clients; b != null; b = next) {
 850                 int n; long d;
 851                 next = b.next;
 852                 if ((n = b.estimateLag()) < 0) {
 853                     b.next = null;
 854                     if (pred == null)
 855                         clients = next;
 856                     else
 857                         pred.next = next;
 858                 }
 859                 else {
 860                     if ((d = b.demand - n) < min)
 861                         min = d;
 862                     nonEmpty = true;
 863                     pred = b;
 864                 }
 865             }
 866         } finally {
 867             lock.unlock();
 868         }
 869         return nonEmpty ? min : 0;
 870     }
 871 
 872     /**
 873      * Returns an estimate of the maximum number of items produced but
 874      * not yet consumed among all current subscribers.
 875      *
 876      * @return the estimate
 877      */
 878     public int estimateMaximumLag() {
 879         int max = 0;
 880         ReentrantLock lock = this.lock;
 881         lock.lock();
 882         try {
 883             BufferedSubscription<T> pred = null, next;
 884             for (BufferedSubscription<T> b = clients; b != null; b = next) {
 885                 int n;
 886                 next = b.next;
 887                 if ((n = b.estimateLag()) < 0) {
 888                     b.next = null;
 889                     if (pred == null)
 890                         clients = next;
 891                     else
 892                         pred.next = next;
 893                 }
 894                 else {
 895                     if (n > max)
 896                         max = n;
 897                     pred = b;
 898                 }
 899             }
 900         } finally {
 901             lock.unlock();
 902         }
 903         return max;
 904     }
 905 
 906     /**
 907      * Processes all published items using the given Consumer function.
 908      * Returns a CompletableFuture that is completed normally when this
 909      * publisher signals {@link Flow.Subscriber#onComplete()
 910      * onComplete}, or completed exceptionally upon any error, or an
 911      * exception is thrown by the Consumer, or the returned
 912      * CompletableFuture is cancelled, in which case no further items
 913      * are processed.
 914      *
 915      * @param consumer the function applied to each onNext item
 916      * @return a CompletableFuture that is completed normally
 917      * when the publisher signals onComplete, and exceptionally
 918      * upon any error or cancellation
 919      * @throws NullPointerException if consumer is null
 920      */
 921     public CompletableFuture<Void> consume(Consumer<? super T> consumer) {


< prev index next >