24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37
38 import java.lang.invoke.MethodHandles;
39 import java.lang.invoke.VarHandle;
40 import java.util.ArrayList;
41 import java.util.Arrays;
42 import java.util.List;
43 import java.util.concurrent.locks.LockSupport;
44 import java.util.function.BiConsumer;
45 import java.util.function.BiPredicate;
46 import java.util.function.Consumer;
47 import static java.util.concurrent.Flow.Publisher;
48 import static java.util.concurrent.Flow.Subscriber;
49 import static java.util.concurrent.Flow.Subscription;
50
51 /**
52 * A {@link Flow.Publisher} that asynchronously issues submitted
53 * (non-null) items to current subscribers until it is closed. Each
54 * current subscriber receives newly submitted items in the same order
55 * unless drops or exceptions are encountered. Using a
56 * SubmissionPublisher allows item generators to act as compliant <a
57 * href="http://www.reactive-streams.org/"> reactive-streams</a>
58 * Publishers relying on drop handling and/or blocking for flow
59 * control.
60 *
61 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
62 * constructor for delivery to subscribers. The best choice of
63 * Executor depends on expected usage. If the generator(s) of
158 * public void onSubscribe(Flow.Subscription subscription) {
159 * (this.subscription = subscription).request(1);
160 * }
161 * public void onNext(S item) {
162 * subscription.request(1);
163 * submit(function.apply(item));
164 * }
165 * public void onError(Throwable ex) { closeExceptionally(ex); }
166 * public void onComplete() { close(); }
167 * }}</pre>
168 *
169 * @param <T> the published item type
170 * @author Doug Lea
171 * @since 9
172 */
173 public class SubmissionPublisher<T> implements Publisher<T>,
174 AutoCloseable {
175 /*
176 * Most mechanics are handled by BufferedSubscription. This class
177 * mainly tracks subscribers and ensures sequentiality, by using
178 * built-in synchronization locks across public methods. Using
179 * built-in locks works well in the most typical case in which
180 * only one thread submits items. We extend this idea in
181 * submission methods by detecting single-ownership to reduce
182 * producer-consumer synchronization strength.
183 */
184
185 /** The largest possible power of two array size. */
186 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
187
188 /**
189 * Initial buffer capacity used when maxBufferCapacity is
190 * greater. Must be a power of two.
191 */
192 static final int INITIAL_CAPACITY = 32;
193
194 /** Round capacity to power of 2, at most limit. */
195 static final int roundCapacity(int cap) {
196 int n = cap - 1;
197 n |= n >>> 1;
198 n |= n >>> 2;
199 n |= n >>> 4;
200 n |= n >>> 8;
201 n |= n >>> 16;
202 return (n <= 0) ? 1 : // at least 1
217 private static final class ThreadPerTaskExecutor implements Executor {
218 ThreadPerTaskExecutor() {} // prevent access constructor creation
219 public void execute(Runnable r) { new Thread(r).start(); }
220 }
221
222 /**
223 * Clients (BufferedSubscriptions) are maintained in a linked list
224 * (via their "next" fields). This works well for publish loops.
225 * It requires O(n) traversal to check for duplicate subscribers,
226 * but we expect that subscribing is much less common than
227 * publishing. Unsubscribing occurs only during traversal loops,
228 * when BufferedSubscription methods return negative values
229 * signifying that they have been closed. To reduce
230 * head-of-line blocking, submit and offer methods first call
231 * BufferedSubscription.offer on each subscriber, and place
232 * saturated ones in retries list (using nextRetry field), and
233 * retry, possibly blocking or dropping.
234 */
235 BufferedSubscription<T> clients;
236
237 /** Run status, updated only within locks */
238 volatile boolean closed;
239 /** Set true on first call to subscribe, to initialize possible owner */
240 boolean subscribed;
241 /** The first caller thread to subscribe, or null if thread ever changed */
242 Thread owner;
243 /** If non-null, the exception in closeExceptionally */
244 volatile Throwable closedException;
245
246 // Parameters for constructing BufferedSubscriptions
247 final Executor executor;
248 final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
249 final int maxBufferCapacity;
250
251 /**
252 * Creates a new SubmissionPublisher using the given Executor for
253 * async delivery to subscribers, with the given maximum buffer size
254 * for each subscriber, and, if non-null, the given handler invoked
255 * when any Subscriber throws an exception in method {@link
256 * Flow.Subscriber#onNext(Object) onNext}.
257 *
258 * @param executor the executor to use for async delivery,
259 * supporting creation of at least one independent thread
260 * @param maxBufferCapacity the maximum capacity for each
261 * subscriber's buffer (the enforced capacity may be rounded up to
262 * the nearest power of two and/or bounded by the largest value
263 * supported by this implementation; method {@link #getMaxBufferCapacity}
264 * returns the actual value)
265 * @param handler if non-null, procedure to invoke upon exception
266 * thrown in method {@code onNext}
267 * @throws NullPointerException if executor is null
268 * @throws IllegalArgumentException if maxBufferCapacity not
269 * positive
270 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity,
        BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
    // Argument checks keep their order: a null executor is reported
    // before a non-positive capacity.
    if (executor == null) {
        throw new NullPointerException();
    }
    if (maxBufferCapacity <= 0) {
        throw new IllegalArgumentException("capacity must be positive");
    }
    // The stored capacity is the rounded value, not the raw argument.
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
    this.onNextHandler = handler;   // null means "no handler"
    this.executor = executor;
}
281
282 /**
283 * Creates a new SubmissionPublisher using the given Executor for
284 * async delivery to subscribers, with the given maximum buffer size
285 * for each subscriber, and no handler for Subscriber exceptions in
286 * method {@link Flow.Subscriber#onNext(Object) onNext}.
287 *
288 * @param executor the executor to use for async delivery,
289 * supporting creation of at least one independent thread
290 * @param maxBufferCapacity the maximum capacity for each
291 * subscriber's buffer (the enforced capacity may be rounded up to
292 * the nearest power of two and/or bounded by the largest value
293 * supported by this implementation; method {@link #getMaxBufferCapacity}
294 * returns the actual value)
295 * @throws NullPointerException if executor is null
296 * @throws IllegalArgumentException if maxBufferCapacity not
320 * the existing subscription with an {@link IllegalStateException}.
321 * Otherwise, upon success, the Subscriber's {@link
322 * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
323 * asynchronously with a new {@link Flow.Subscription}. If {@link
324 * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
325 * subscription is cancelled. Otherwise, if this SubmissionPublisher
326 * was closed exceptionally, then the subscriber's {@link
327 * Flow.Subscriber#onError onError} method is invoked with the
328 * corresponding exception, or if closed without exception, the
329 * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
330 * method is invoked. Subscribers may enable receiving items by
331 * invoking the {@link Flow.Subscription#request(long) request}
332 * method of the new Subscription, and may unsubscribe by invoking
333 * its {@link Flow.Subscription#cancel() cancel} method.
334 *
335 * @param subscriber the subscriber
336 * @throws NullPointerException if subscriber is null
337 */
338 public void subscribe(Subscriber<? super T> subscriber) {
339 if (subscriber == null) throw new NullPointerException();
340 int max = maxBufferCapacity; // allocate initial array
341 Object[] array = new Object[max < INITIAL_CAPACITY ?
342 max : INITIAL_CAPACITY];
343 BufferedSubscription<T> subscription =
344 new BufferedSubscription<T>(subscriber, executor, onNextHandler,
345 array, max);
346 synchronized (this) {
347 if (!subscribed) {
348 subscribed = true;
349 owner = Thread.currentThread();
350 }
351 for (BufferedSubscription<T> b = clients, pred = null;;) {
352 if (b == null) {
353 Throwable ex;
354 subscription.onSubscribe();
355 if ((ex = closedException) != null)
356 subscription.onError(ex);
357 else if (closed)
358 subscription.onComplete();
359 else if (pred == null)
360 clients = subscription;
361 else
362 pred.next = subscription;
363 break;
364 }
365 BufferedSubscription<T> next = b.next;
366 if (b.isClosed()) { // remove
367 b.next = null; // detach
368 if (pred == null)
369 clients = next;
370 else
371 pred.next = next;
372 }
373 else if (subscriber.equals(b.subscriber)) {
374 b.onError(new IllegalStateException("Duplicate subscribe"));
375 break;
376 }
377 else
378 pred = b;
379 b = next;
380 }
381 }
382 }
383
384 /**
385 * Common implementation for all three forms of submit and offer.
386 * Acts as submit if nanos == Long.MAX_VALUE, else offer.
387 */
388 private int doOffer(T item, long nanos,
389 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
390 if (item == null) throw new NullPointerException();
391 int lag = 0;
392 boolean complete, unowned;
393 synchronized (this) {
394 Thread t = Thread.currentThread(), o;
395 BufferedSubscription<T> b = clients;
396 if ((unowned = ((o = owner) != t)) && o != null)
397 owner = null; // disable bias
398 if (b == null)
399 complete = closed;
400 else {
401 complete = false;
402 boolean cleanMe = false;
403 BufferedSubscription<T> retries = null, rtail = null, next;
404 do {
405 next = b.next;
406 int stat = b.offer(item, unowned);
407 if (stat == 0) { // saturated; add to retry list
408 b.nextRetry = null; // avoid garbage on exceptions
409 if (rtail == null)
410 retries = b;
411 else
412 rtail.nextRetry = b;
413 rtail = b;
414 }
415 else if (stat < 0) // closed
416 cleanMe = true; // remove later
417 else if (stat > lag)
418 lag = stat;
419 } while ((b = next) != null);
420
421 if (retries != null || cleanMe)
422 lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
423 }
424 }
425 if (complete)
426 throw new IllegalStateException("Closed");
427 else
428 return lag;
429 }
430
431 /**
432 * Helps, (timed) waits for, and/or drops buffers on list; returns
433 * lag or negative drops (for use in offer).
434 */
435 private int retryOffer(T item, long nanos,
436 BiPredicate<Subscriber<? super T>, ? super T> onDrop,
437 BufferedSubscription<T> retries, int lag,
438 boolean cleanMe) {
439 for (BufferedSubscription<T> r = retries; r != null;) {
440 BufferedSubscription<T> nextRetry = r.nextRetry;
441 r.nextRetry = null;
442 if (nanos > 0L)
443 r.awaitSpace(nanos);
592 * @throws IllegalStateException if closed
593 * @throws NullPointerException if item is null
594 * @throws RejectedExecutionException if thrown by Executor
595 */
596 public int offer(T item, long timeout, TimeUnit unit,
597 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
598 long nanos = unit.toNanos(timeout);
599 // distinguishes from untimed (only wrt interrupt policy)
600 if (nanos == Long.MAX_VALUE) --nanos;
601 return doOffer(item, nanos, onDrop);
602 }
603
604 /**
605 * Unless already closed, issues {@link
606 * Flow.Subscriber#onComplete() onComplete} signals to current
607 * subscribers, and disallows subsequent attempts to publish.
608 * Upon return, this method does <em>NOT</em> guarantee that all
609 * subscribers have yet completed.
610 */
611 public void close() {
612 if (!closed) {
613 BufferedSubscription<T> b;
614 synchronized (this) {
615 // no need to re-check closed here
616 b = clients;
617 clients = null;
618 owner = null;
619 closed = true;
620 }
621 while (b != null) {
622 BufferedSubscription<T> next = b.next;
623 b.next = null;
624 b.onComplete();
625 b = next;
626 }
627 }
628 }
629
630 /**
631 * Unless already closed, issues {@link
632 * Flow.Subscriber#onError(Throwable) onError} signals to current
633 * subscribers with the given error, and disallows subsequent
634 * attempts to publish. Future subscribers also receive the given
635 * error. Upon return, this method does <em>NOT</em> guarantee
636 * that all subscribers have yet completed.
637 *
638 * @param error the {@code onError} argument sent to subscribers
639 * @throws NullPointerException if error is null
640 */
641 public void closeExceptionally(Throwable error) {
642 if (error == null)
643 throw new NullPointerException();
644 if (!closed) {
645 BufferedSubscription<T> b;
646 synchronized (this) {
647 b = clients;
648 if (!closed) { // don't clobber racing close
649 closedException = error;
650 clients = null;
651 owner = null;
652 closed = true;
653 }
654 }
655 while (b != null) {
656 BufferedSubscription<T> next = b.next;
657 b.next = null;
658 b.onError(error);
659 b = next;
660 }
661 }
662 }
663
664 /**
665 * Returns true if this publisher is not accepting submissions.
666 *
667 * @return true if closed
668 */
669 public boolean isClosed() {
670 return closed;
671 }
672
673 /**
674 * Returns the exception associated with {@link
675 * #closeExceptionally(Throwable) closeExceptionally}, or null if
676 * not closed or if closed normally.
677 *
678 * @return the exception, or null if none
679 */
680 public Throwable getClosedException() {
681 return closedException;
682 }
683
684 /**
685 * Returns true if this publisher has any subscribers.
686 *
687 * @return true if this publisher has any subscribers
688 */
689 public boolean hasSubscribers() {
690 boolean nonEmpty = false;
691 synchronized (this) {
692 for (BufferedSubscription<T> b = clients; b != null;) {
693 BufferedSubscription<T> next = b.next;
694 if (b.isClosed()) {
695 b.next = null;
696 b = clients = next;
697 }
698 else {
699 nonEmpty = true;
700 break;
701 }
702 }
703 }
704 return nonEmpty;
705 }
706
707 /**
708 * Returns the number of current subscribers.
709 *
710 * @return the number of current subscribers
711 */
712 public int getNumberOfSubscribers() {
713 synchronized (this) {
714 return cleanAndCount();
715 }
716 }
717
718 /**
719 * Returns the Executor used for asynchronous delivery.
720 *
721 * @return the Executor used for asynchronous delivery
722 */
723 public Executor getExecutor() {
724 return executor;
725 }
726
727 /**
728 * Returns the maximum per-subscriber buffer capacity.
729 *
730 * @return the maximum per-subscriber buffer capacity
731 */
732 public int getMaxBufferCapacity() {
733 return maxBufferCapacity;
734 }
735
736 /**
737 * Returns a list of current subscribers for monitoring and
738 * tracking purposes, not for invoking {@link Flow.Subscriber}
739 * methods on the subscribers.
740 *
741 * @return list of current subscribers
742 */
743 public List<Subscriber<? super T>> getSubscribers() {
744 ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
745 synchronized (this) {
746 BufferedSubscription<T> pred = null, next;
747 for (BufferedSubscription<T> b = clients; b != null; b = next) {
748 next = b.next;
749 if (b.isClosed()) {
750 b.next = null;
751 if (pred == null)
752 clients = next;
753 else
754 pred.next = next;
755 }
756 else {
757 subs.add(b.subscriber);
758 pred = b;
759 }
760 }
761 }
762 return subs;
763 }
764
765 /**
766 * Returns true if the given Subscriber is currently subscribed.
767 *
768 * @param subscriber the subscriber
769 * @return true if currently subscribed
770 * @throws NullPointerException if subscriber is null
771 */
772 public boolean isSubscribed(Subscriber<? super T> subscriber) {
773 if (subscriber == null) throw new NullPointerException();
774 if (!closed) {
775 synchronized (this) {
776 BufferedSubscription<T> pred = null, next;
777 for (BufferedSubscription<T> b = clients; b != null; b = next) {
778 next = b.next;
779 if (b.isClosed()) {
780 b.next = null;
781 if (pred == null)
782 clients = next;
783 else
784 pred.next = next;
785 }
786 else if (subscriber.equals(b.subscriber))
787 return true;
788 else
789 pred = b;
790 }
791 }
792 }
793 return false;
794 }
795
796 /**
797 * Returns an estimate of the minimum number of items requested
798 * (via {@link Flow.Subscription#request(long) request}) but not
799 * yet produced, among all current subscribers.
800 *
801 * @return the estimate, or zero if no subscribers
802 */
803 public long estimateMinimumDemand() {
804 long min = Long.MAX_VALUE;
805 boolean nonEmpty = false;
806 synchronized (this) {
807 BufferedSubscription<T> pred = null, next;
808 for (BufferedSubscription<T> b = clients; b != null; b = next) {
809 int n; long d;
810 next = b.next;
811 if ((n = b.estimateLag()) < 0) {
812 b.next = null;
813 if (pred == null)
814 clients = next;
815 else
816 pred.next = next;
817 }
818 else {
819 if ((d = b.demand - n) < min)
820 min = d;
821 nonEmpty = true;
822 pred = b;
823 }
824 }
825 }
826 return nonEmpty ? min : 0;
827 }
828
829 /**
830 * Returns an estimate of the maximum number of items produced but
831 * not yet consumed among all current subscribers.
832 *
833 * @return the estimate
834 */
835 public int estimateMaximumLag() {
836 int max = 0;
837 synchronized (this) {
838 BufferedSubscription<T> pred = null, next;
839 for (BufferedSubscription<T> b = clients; b != null; b = next) {
840 int n;
841 next = b.next;
842 if ((n = b.estimateLag()) < 0) {
843 b.next = null;
844 if (pred == null)
845 clients = next;
846 else
847 pred.next = next;
848 }
849 else {
850 if (n > max)
851 max = n;
852 pred = b;
853 }
854 }
855 }
856 return max;
857 }
858
859 /**
860 * Processes all published items using the given Consumer function.
861 * Returns a CompletableFuture that is completed normally when this
862 * publisher signals {@link Flow.Subscriber#onComplete()
863 * onComplete}, or completed exceptionally upon any error, or an
864 * exception is thrown by the Consumer, or the returned
865 * CompletableFuture is cancelled, in which case no further items
866 * are processed.
867 *
868 * @param consumer the function applied to each onNext item
869 * @return a CompletableFuture that is completed normally
870 * when the publisher signals onComplete, and exceptionally
871 * upon any error or cancellation
872 * @throws NullPointerException if consumer is null
873 */
874 public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
|
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37
38 import java.lang.invoke.MethodHandles;
39 import java.lang.invoke.VarHandle;
40 import java.util.ArrayList;
41 import java.util.Arrays;
42 import java.util.List;
43 import java.util.concurrent.locks.LockSupport;
44 import java.util.concurrent.locks.ReentrantLock;
45 import java.util.function.BiConsumer;
46 import java.util.function.BiPredicate;
47 import java.util.function.Consumer;
48 import static java.util.concurrent.Flow.Publisher;
49 import static java.util.concurrent.Flow.Subscriber;
50 import static java.util.concurrent.Flow.Subscription;
51
52 /**
53 * A {@link Flow.Publisher} that asynchronously issues submitted
54 * (non-null) items to current subscribers until it is closed. Each
55 * current subscriber receives newly submitted items in the same order
56 * unless drops or exceptions are encountered. Using a
57 * SubmissionPublisher allows item generators to act as compliant <a
58 * href="http://www.reactive-streams.org/"> reactive-streams</a>
59 * Publishers relying on drop handling and/or blocking for flow
60 * control.
61 *
62 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
63 * constructor for delivery to subscribers. The best choice of
64 * Executor depends on expected usage. If the generator(s) of
159 * public void onSubscribe(Flow.Subscription subscription) {
160 * (this.subscription = subscription).request(1);
161 * }
162 * public void onNext(S item) {
163 * subscription.request(1);
164 * submit(function.apply(item));
165 * }
166 * public void onError(Throwable ex) { closeExceptionally(ex); }
167 * public void onComplete() { close(); }
168 * }}</pre>
169 *
170 * @param <T> the published item type
171 * @author Doug Lea
172 * @since 9
173 */
174 public class SubmissionPublisher<T> implements Publisher<T>,
175 AutoCloseable {
176 /*
177 * Most mechanics are handled by BufferedSubscription. This class
178 * mainly tracks subscribers and ensures sequentiality, by using
179 * locks across public methods, to ensure thread-safety in the
180 * presence of multiple sources and maintain acquire-release
181 * ordering around user operations. However, we also track whether
182 * there is only a single source, and if so streamline some buffer
183 * operations by avoiding some atomics.
184 */
185
186 /** The largest possible power of two array size. */
187 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
188
189 /**
190 * Initial buffer capacity used when maxBufferCapacity is
191 * greater. Must be a power of two.
192 */
193 static final int INITIAL_CAPACITY = 32;
194
195 /** Round capacity to power of 2, at most limit. */
196 static final int roundCapacity(int cap) {
197 int n = cap - 1;
198 n |= n >>> 1;
199 n |= n >>> 2;
200 n |= n >>> 4;
201 n |= n >>> 8;
202 n |= n >>> 16;
203 return (n <= 0) ? 1 : // at least 1
218 private static final class ThreadPerTaskExecutor implements Executor {
219 ThreadPerTaskExecutor() {} // prevent access constructor creation
220 public void execute(Runnable r) { new Thread(r).start(); }
221 }
222
223 /**
224 * Clients (BufferedSubscriptions) are maintained in a linked list
225 * (via their "next" fields). This works well for publish loops.
226 * It requires O(n) traversal to check for duplicate subscribers,
227 * but we expect that subscribing is much less common than
228 * publishing. Unsubscribing occurs only during traversal loops,
229 * when BufferedSubscription methods return negative values
230 * signifying that they have been closed. To reduce
231 * head-of-line blocking, submit and offer methods first call
232 * BufferedSubscription.offer on each subscriber, and place
233 * saturated ones in retries list (using nextRetry field), and
234 * retry, possibly blocking or dropping.
235 */
236 BufferedSubscription<T> clients;
237
238 /** Lock for exclusion across multiple sources */
239 final ReentrantLock lock;
240 /** Run status, updated only within locks */
241 volatile boolean closed;
242 /** Set true on first call to subscribe, to initialize possible owner */
243 boolean subscribed;
244 /** The first caller thread to subscribe, or null if thread ever changed */
245 Thread owner;
246 /** If non-null, the exception in closeExceptionally */
247 volatile Throwable closedException;
248
249 // Parameters for constructing BufferedSubscriptions
250 final Executor executor;
251 final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
252 final int maxBufferCapacity;
253
254 /**
255 * Creates a new SubmissionPublisher using the given Executor for
256 * async delivery to subscribers, with the given maximum buffer size
257 * for each subscriber, and, if non-null, the given handler invoked
258 * when any Subscriber throws an exception in method {@link
259 * Flow.Subscriber#onNext(Object) onNext}.
260 *
261 * @param executor the executor to use for async delivery,
262 * supporting creation of at least one independent thread
263 * @param maxBufferCapacity the maximum capacity for each
264 * subscriber's buffer (the enforced capacity may be rounded up to
265 * the nearest power of two and/or bounded by the largest value
266 * supported by this implementation; method {@link #getMaxBufferCapacity}
267 * returns the actual value)
268 * @param handler if non-null, procedure to invoke upon exception
269 * thrown in method {@code onNext}
270 * @throws NullPointerException if executor is null
271 * @throws IllegalArgumentException if maxBufferCapacity not
272 * positive
273 */
public SubmissionPublisher(Executor executor, int maxBufferCapacity,
        BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
    // Argument checks keep their order: a null executor is reported
    // before a non-positive capacity.
    if (executor == null) {
        throw new NullPointerException();
    }
    if (maxBufferCapacity <= 0) {
        throw new IllegalArgumentException("capacity must be positive");
    }
    // The stored capacity is the rounded value, not the raw argument.
    this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
    this.onNextHandler = handler;   // null means "no handler"
    this.executor = executor;
    this.lock = new ReentrantLock();
}
285
286 /**
287 * Creates a new SubmissionPublisher using the given Executor for
288 * async delivery to subscribers, with the given maximum buffer size
289 * for each subscriber, and no handler for Subscriber exceptions in
290 * method {@link Flow.Subscriber#onNext(Object) onNext}.
291 *
292 * @param executor the executor to use for async delivery,
293 * supporting creation of at least one independent thread
294 * @param maxBufferCapacity the maximum capacity for each
295 * subscriber's buffer (the enforced capacity may be rounded up to
296 * the nearest power of two and/or bounded by the largest value
297 * supported by this implementation; method {@link #getMaxBufferCapacity}
298 * returns the actual value)
299 * @throws NullPointerException if executor is null
300 * @throws IllegalArgumentException if maxBufferCapacity not
324 * the existing subscription with an {@link IllegalStateException}.
325 * Otherwise, upon success, the Subscriber's {@link
326 * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
327 * asynchronously with a new {@link Flow.Subscription}. If {@link
328 * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
329 * subscription is cancelled. Otherwise, if this SubmissionPublisher
330 * was closed exceptionally, then the subscriber's {@link
331 * Flow.Subscriber#onError onError} method is invoked with the
332 * corresponding exception, or if closed without exception, the
333 * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
334 * method is invoked. Subscribers may enable receiving items by
335 * invoking the {@link Flow.Subscription#request(long) request}
336 * method of the new Subscription, and may unsubscribe by invoking
337 * its {@link Flow.Subscription#cancel() cancel} method.
338 *
339 * @param subscriber the subscriber
340 * @throws NullPointerException if subscriber is null
341 */
342 public void subscribe(Subscriber<? super T> subscriber) {
343 if (subscriber == null) throw new NullPointerException();
344 ReentrantLock lock = this.lock;
345 int max = maxBufferCapacity; // allocate initial array
346 Object[] array = new Object[max < INITIAL_CAPACITY ?
347 max : INITIAL_CAPACITY];
348 BufferedSubscription<T> subscription =
349 new BufferedSubscription<T>(subscriber, executor, onNextHandler,
350 array, max);
351 lock.lock();
352 try {
353 if (!subscribed) {
354 subscribed = true;
355 owner = Thread.currentThread();
356 }
357 for (BufferedSubscription<T> b = clients, pred = null;;) {
358 if (b == null) {
359 Throwable ex;
360 subscription.onSubscribe();
361 if ((ex = closedException) != null)
362 subscription.onError(ex);
363 else if (closed)
364 subscription.onComplete();
365 else if (pred == null)
366 clients = subscription;
367 else
368 pred.next = subscription;
369 break;
370 }
371 BufferedSubscription<T> next = b.next;
372 if (b.isClosed()) { // remove
373 b.next = null; // detach
374 if (pred == null)
375 clients = next;
376 else
377 pred.next = next;
378 }
379 else if (subscriber.equals(b.subscriber)) {
380 b.onError(new IllegalStateException("Duplicate subscribe"));
381 break;
382 }
383 else
384 pred = b;
385 b = next;
386 }
387 } finally {
388 lock.unlock();
389 }
390 }
391
392 /**
393 * Common implementation for all three forms of submit and offer.
394 * Acts as submit if nanos == Long.MAX_VALUE, else offer.
395 */
396 private int doOffer(T item, long nanos,
397 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
398 if (item == null) throw new NullPointerException();
399 int lag = 0;
400 boolean complete, unowned;
401 ReentrantLock lock = this.lock;
402 lock.lock();
403 try {
404 Thread t = Thread.currentThread(), o;
405 BufferedSubscription<T> b = clients;
406 if ((unowned = ((o = owner) != t)) && o != null)
407 owner = null; // disable bias
408 if (b == null)
409 complete = closed;
410 else {
411 complete = false;
412 boolean cleanMe = false;
413 BufferedSubscription<T> retries = null, rtail = null, next;
414 do {
415 next = b.next;
416 int stat = b.offer(item, unowned);
417 if (stat == 0) { // saturated; add to retry list
418 b.nextRetry = null; // avoid garbage on exceptions
419 if (rtail == null)
420 retries = b;
421 else
422 rtail.nextRetry = b;
423 rtail = b;
424 }
425 else if (stat < 0) // closed
426 cleanMe = true; // remove later
427 else if (stat > lag)
428 lag = stat;
429 } while ((b = next) != null);
430
431 if (retries != null || cleanMe)
432 lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
433 }
434 } finally {
435 lock.unlock();
436 }
437 if (complete)
438 throw new IllegalStateException("Closed");
439 else
440 return lag;
441 }
442
443 /**
444 * Helps, (timed) waits for, and/or drops buffers on list; returns
445 * lag or negative drops (for use in offer).
446 */
447 private int retryOffer(T item, long nanos,
448 BiPredicate<Subscriber<? super T>, ? super T> onDrop,
449 BufferedSubscription<T> retries, int lag,
450 boolean cleanMe) {
451 for (BufferedSubscription<T> r = retries; r != null;) {
452 BufferedSubscription<T> nextRetry = r.nextRetry;
453 r.nextRetry = null;
454 if (nanos > 0L)
455 r.awaitSpace(nanos);
604 * @throws IllegalStateException if closed
605 * @throws NullPointerException if item is null
606 * @throws RejectedExecutionException if thrown by Executor
607 */
608 public int offer(T item, long timeout, TimeUnit unit,
609 BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
610 long nanos = unit.toNanos(timeout);
611 // distinguishes from untimed (only wrt interrupt policy)
612 if (nanos == Long.MAX_VALUE) --nanos;
613 return doOffer(item, nanos, onDrop);
614 }
615
616 /**
617 * Unless already closed, issues {@link
618 * Flow.Subscriber#onComplete() onComplete} signals to current
619 * subscribers, and disallows subsequent attempts to publish.
620 * Upon return, this method does <em>NOT</em> guarantee that all
621 * subscribers have yet completed.
622 */
623 public void close() {
624 ReentrantLock lock = this.lock;
625 if (!closed) {
626 BufferedSubscription<T> b;
627 lock.lock();
628 try {
629 // no need to re-check closed here
630 b = clients;
631 clients = null;
632 owner = null;
633 closed = true;
634 } finally {
635 lock.unlock();
636 }
637 while (b != null) {
638 BufferedSubscription<T> next = b.next;
639 b.next = null;
640 b.onComplete();
641 b = next;
642 }
643 }
644 }
645
646 /**
647 * Unless already closed, issues {@link
648 * Flow.Subscriber#onError(Throwable) onError} signals to current
649 * subscribers with the given error, and disallows subsequent
650 * attempts to publish. Future subscribers also receive the given
651 * error. Upon return, this method does <em>NOT</em> guarantee
652 * that all subscribers have yet completed.
653 *
654 * @param error the {@code onError} argument sent to subscribers
655 * @throws NullPointerException if error is null
656 */
657 public void closeExceptionally(Throwable error) {
658 if (error == null)
659 throw new NullPointerException();
660 ReentrantLock lock = this.lock;
661 if (!closed) {
662 BufferedSubscription<T> b;
663 lock.lock();
664 try {
665 b = clients;
666 if (!closed) { // don't clobber racing close
667 closedException = error;
668 clients = null;
669 owner = null;
670 closed = true;
671 }
672 } finally {
673 lock.unlock();
674 }
675 while (b != null) {
676 BufferedSubscription<T> next = b.next;
677 b.next = null;
678 b.onError(error);
679 b = next;
680 }
681 }
682 }
683
684 /**
685 * Returns true if this publisher is not accepting submissions.
686 *
687 * @return true if closed
688 */
689 public boolean isClosed() {
690 return closed;
691 }
692
693 /**
694 * Returns the exception associated with {@link
695 * #closeExceptionally(Throwable) closeExceptionally}, or null if
696 * not closed or if closed normally.
697 *
698 * @return the exception, or null if none
699 */
700 public Throwable getClosedException() {
701 return closedException;
702 }
703
704 /**
705 * Returns true if this publisher has any subscribers.
706 *
707 * @return true if this publisher has any subscribers
708 */
709 public boolean hasSubscribers() {
710 boolean nonEmpty = false;
711 ReentrantLock lock = this.lock;
712 lock.lock();
713 try {
714 for (BufferedSubscription<T> b = clients; b != null;) {
715 BufferedSubscription<T> next = b.next;
716 if (b.isClosed()) {
717 b.next = null;
718 b = clients = next;
719 }
720 else {
721 nonEmpty = true;
722 break;
723 }
724 }
725 } finally {
726 lock.unlock();
727 }
728 return nonEmpty;
729 }
730
731 /**
732 * Returns the number of current subscribers.
733 *
734 * @return the number of current subscribers
735 */
736 public int getNumberOfSubscribers() {
737 int n;
738 ReentrantLock lock = this.lock;
739 lock.lock();
740 try {
741 n = cleanAndCount();
742 } finally {
743 lock.unlock();
744 }
745 return n;
746 }
747
748 /**
749 * Returns the Executor used for asynchronous delivery.
750 *
751 * @return the Executor used for asynchronous delivery
752 */
753 public Executor getExecutor() {
754 return executor;
755 }
756
757 /**
758 * Returns the maximum per-subscriber buffer capacity.
759 *
760 * @return the maximum per-subscriber buffer capacity
761 */
762 public int getMaxBufferCapacity() {
763 return maxBufferCapacity;
764 }
765
766 /**
767 * Returns a list of current subscribers for monitoring and
768 * tracking purposes, not for invoking {@link Flow.Subscriber}
769 * methods on the subscribers.
770 *
771 * @return list of current subscribers
772 */
773 public List<Subscriber<? super T>> getSubscribers() {
774 ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
775 ReentrantLock lock = this.lock;
776 lock.lock();
777 try {
778 BufferedSubscription<T> pred = null, next;
779 for (BufferedSubscription<T> b = clients; b != null; b = next) {
780 next = b.next;
781 if (b.isClosed()) {
782 b.next = null;
783 if (pred == null)
784 clients = next;
785 else
786 pred.next = next;
787 }
788 else {
789 subs.add(b.subscriber);
790 pred = b;
791 }
792 }
793 } finally {
794 lock.unlock();
795 }
796 return subs;
797 }
798
799 /**
800 * Returns true if the given Subscriber is currently subscribed.
801 *
802 * @param subscriber the subscriber
803 * @return true if currently subscribed
804 * @throws NullPointerException if subscriber is null
805 */
806 public boolean isSubscribed(Subscriber<? super T> subscriber) {
807 if (subscriber == null) throw new NullPointerException();
808 boolean subscribed = false;
809 ReentrantLock lock = this.lock;
810 if (!closed) {
811 lock.lock();
812 try {
813 BufferedSubscription<T> pred = null, next;
814 for (BufferedSubscription<T> b = clients; b != null; b = next) {
815 next = b.next;
816 if (b.isClosed()) {
817 b.next = null;
818 if (pred == null)
819 clients = next;
820 else
821 pred.next = next;
822 }
823 else if (subscribed = subscriber.equals(b.subscriber))
824 break;
825 else
826 pred = b;
827 }
828 } finally {
829 lock.unlock();
830 }
831 }
832 return subscribed;
833 }
834
835 /**
836 * Returns an estimate of the minimum number of items requested
837 * (via {@link Flow.Subscription#request(long) request}) but not
838 * yet produced, among all current subscribers.
839 *
840 * @return the estimate, or zero if no subscribers
841 */
842 public long estimateMinimumDemand() {
843 long min = Long.MAX_VALUE;
844 boolean nonEmpty = false;
845 ReentrantLock lock = this.lock;
846 lock.lock();
847 try {
848 BufferedSubscription<T> pred = null, next;
849 for (BufferedSubscription<T> b = clients; b != null; b = next) {
850 int n; long d;
851 next = b.next;
852 if ((n = b.estimateLag()) < 0) {
853 b.next = null;
854 if (pred == null)
855 clients = next;
856 else
857 pred.next = next;
858 }
859 else {
860 if ((d = b.demand - n) < min)
861 min = d;
862 nonEmpty = true;
863 pred = b;
864 }
865 }
866 } finally {
867 lock.unlock();
868 }
869 return nonEmpty ? min : 0;
870 }
871
872 /**
873 * Returns an estimate of the maximum number of items produced but
874 * not yet consumed among all current subscribers.
875 *
876 * @return the estimate
877 */
878 public int estimateMaximumLag() {
879 int max = 0;
880 ReentrantLock lock = this.lock;
881 lock.lock();
882 try {
883 BufferedSubscription<T> pred = null, next;
884 for (BufferedSubscription<T> b = clients; b != null; b = next) {
885 int n;
886 next = b.next;
887 if ((n = b.estimateLag()) < 0) {
888 b.next = null;
889 if (pred == null)
890 clients = next;
891 else
892 pred.next = next;
893 }
894 else {
895 if (n > max)
896 max = n;
897 pred = b;
898 }
899 }
900 } finally {
901 lock.unlock();
902 }
903 return max;
904 }
905
906 /**
907 * Processes all published items using the given Consumer function.
908 * Returns a CompletableFuture that is completed normally when this
909 * publisher signals {@link Flow.Subscriber#onComplete()
910 * onComplete}, or completed exceptionally upon any error, or an
911 * exception is thrown by the Consumer, or the returned
912 * CompletableFuture is cancelled, in which case no further items
913 * are processed.
914 *
915 * @param consumer the function applied to each onNext item
916 * @return a CompletableFuture that is completed normally
917 * when the publisher signals onComplete, and exceptionally
918 * upon any error or cancellation
919 * @throws NullPointerException if consumer is null
920 */
921 public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
|