rev 12972 : 8140606: Update library code to use internal Unsafe
Reviewed-by: duke
1 /*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation. Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37
38 import java.util.ArrayList;
39 import java.util.List;
40 import java.util.concurrent.locks.LockSupport;
41 import java.util.function.BiConsumer;
42 import java.util.function.BiPredicate;
43 import java.util.function.Consumer;
44
45 /**
46 * A {@link Flow.Publisher} that asynchronously issues submitted
47 * (non-null) items to current subscribers until it is closed. Each
48 * current subscriber receives newly submitted items in the same order
49 * unless drops or exceptions are encountered. Using a
50 * SubmissionPublisher allows item generators to act as compliant <a
51 * href="http://www.reactive-streams.org/"> reactive-streams</a>
52 * Publishers relying on drop handling and/or blocking for flow
53 * control.
54 *
55 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
56 * constructor for delivery to subscribers. The best choice of
57 * Executor depends on expected usage. If the generator(s) of
58 * submitted items run in separate threads, and the number of
59 * subscribers can be estimated, consider using a {@link
60 * Executors#newFixedThreadPool}. Otherwise consider using the
61 * default, normally the {@link ForkJoinPool#commonPool}.
62 *
63 * <p>Buffering allows producers and consumers to transiently operate
64 * at different rates. Each subscriber uses an independent buffer.
65 * Buffers are created upon first use and expanded as needed up to the
66 * given maximum. (The enforced capacity may be rounded up to the
67 * nearest power of two and/or bounded by the largest value supported
68 * by this implementation.) Invocations of {@link
69 * Flow.Subscription#request(long) request} do not directly result in
70 * buffer expansion, but risk saturation if unfilled requests exceed
71 * the maximum capacity. The default value of {@link
72 * Flow#defaultBufferSize()} may provide a useful starting point for
73 * choosing a capacity based on expected rates, resources, and usages.
74 *
75 * <p>Publication methods support different policies about what to do
76 * when buffers are saturated. Method {@link #submit(Object) submit}
77 * blocks until resources are available. This is simplest, but least
78 * responsive. The {@code offer} methods may drop items (either
79 * immediately or with bounded timeout), but provide an opportunity to
80 * interpose a handler and then retry.
81 *
82 * <p>If any Subscriber method throws an exception, its subscription
83 * is cancelled. If a handler is supplied as a constructor argument,
84 * it is invoked before cancellation upon an exception in method
85 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
86 * {@link Flow.Subscriber#onSubscribe onSubscribe},
87 * {@link Flow.Subscriber#onError(Throwable) onError} and
88 * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
89 * handled before cancellation. If the supplied Executor throws
90 * {@link RejectedExecutionException} (or any other RuntimeException
91 * or Error) when attempting to execute a task, or a drop handler
92 * throws an exception when processing a dropped item, then the
93 * exception is rethrown. In these cases, not all subscribers will
94 * have been issued the published item. It is usually good practice to
95 * {@link #closeExceptionally closeExceptionally} in these cases.
96 *
97 * <p>Method {@link #consume(Consumer)} simplifies support for a
98 * common case in which the only action of a subscriber is to request
99 * and process all items using a supplied function.
100 *
101 * <p>This class may also serve as a convenient base for subclasses
102 * that generate items, and use the methods in this class to publish
103 * them. For example here is a class that periodically publishes the
104 * items generated from a supplier. (In practice you might add methods
105 * to independently start and stop generation, to share Executors
106 * among publishers, and so on, or use a SubmissionPublisher as a
107 * component rather than a superclass.)
108 *
109 * <pre> {@code
110 * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
111 * final ScheduledFuture<?> periodicTask;
112 * final ScheduledExecutorService scheduler;
113 * PeriodicPublisher(Executor executor, int maxBufferCapacity,
114 * Supplier<? extends T> supplier,
115 * long period, TimeUnit unit) {
116 * super(executor, maxBufferCapacity);
117 * scheduler = new ScheduledThreadPoolExecutor(1);
118 * periodicTask = scheduler.scheduleAtFixedRate(
119 * () -> submit(supplier.get()), 0, period, unit);
120 * }
121 * public void close() {
122 * periodicTask.cancel(false);
123 * scheduler.shutdown();
124 * super.close();
125 * }
126 * }}</pre>
127 *
128 * <p>Here is an example of a {@link Flow.Processor} implementation.
129 * It uses single-step requests to its publisher for simplicity of
130 * illustration. A more adaptive version could monitor flow using the
131 * lag estimate returned from {@code submit}, along with other utility
132 * methods.
133 *
134 * <pre> {@code
135 * class TransformProcessor<S,T> extends SubmissionPublisher<T>
136 * implements Flow.Processor<S,T> {
137 * final Function<? super S, ? extends T> function;
138 * Flow.Subscription subscription;
139 * TransformProcessor(Executor executor, int maxBufferCapacity,
140 * Function<? super S, ? extends T> function) {
141 * super(executor, maxBufferCapacity);
142 * this.function = function;
143 * }
144 * public void onSubscribe(Flow.Subscription subscription) {
145 * (this.subscription = subscription).request(1);
146 * }
147 * public void onNext(S item) {
148 * subscription.request(1);
149 * submit(function.apply(item));
150 * }
151 * public void onError(Throwable ex) { closeExceptionally(ex); }
152 * public void onComplete() { close(); }
153 * }}</pre>
154 *
155 * @param <T> the published item type
156 * @author Doug Lea
157 * @since 1.9
158 */
159 public class SubmissionPublisher<T> implements Flow.Publisher<T>,
160 AutoCloseable {
161 /*
162 * Most mechanics are handled by BufferedSubscription. This class
163 * mainly tracks subscribers and ensures sequentiality, by using
164 * built-in synchronization locks across public methods. (Using
165 * built-in locks works well in the most typical case in which
166 * only one thread submits items).
167 */
168
169 /** The largest possible power of two array size. */
170 static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
171
172 /** Round capacity to power of 2, at most limit. */
173 static final int roundCapacity(int cap) {
174 int n = cap - 1;
175 n |= n >>> 1;
176 n |= n >>> 2;
177 n |= n >>> 4;
178 n |= n >>> 8;
179 n |= n >>> 16;
180 return (n <= 0) ? 1 : // at least 1
181 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
182 }
183
184 // default Executor setup; nearly the same as CompletableFuture
185
186 /**
187 * Default executor -- ForkJoinPool.commonPool() unless it cannot
188 * support parallelism.
189 */
190 private static final Executor ASYNC_POOL =
191 (ForkJoinPool.getCommonPoolParallelism() > 1) ?
192 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
193
194 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
195 private static final class ThreadPerTaskExecutor implements Executor {
196 public void execute(Runnable r) { new Thread(r).start(); }
197 }
198
199 /**
200 * Clients (BufferedSubscriptions) are maintained in a linked list
201 * (via their "next" fields). This works well for publish loops.
202 * It requires O(n) traversal to check for duplicate subscribers,
203 * but we expect that subscribing is much less common than
204 * publishing. Unsubscribing occurs only during traversal loops,
205 * when BufferedSubscription methods return negative values
206 * signifying that they have been disabled. To reduce
207 * head-of-line blocking, submit and offer methods first call
208 * BufferedSubscription.offer on each subscriber, and place
209 * saturated ones in retries list (using nextRetry field), and
210 * retry, possibly blocking or dropping.
211 */
212 BufferedSubscription<T> clients;
213
214 /** Run status, updated only within locks */
215 volatile boolean closed;
216 /** If non-null, the exception in closeExceptionally */
217 volatile Throwable closedException;
218
219 // Parameters for constructing BufferedSubscriptions
220 final Executor executor;
221 final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
222 final int maxBufferCapacity;
223
224 /**
225 * Creates a new SubmissionPublisher using the given Executor for
226 * async delivery to subscribers, with the given maximum buffer size
227 * for each subscriber, and, if non-null, the given handler invoked
228 * when any Subscriber throws an exception in method {@link
229 * Flow.Subscriber#onNext(Object) onNext}.
230 *
231 * @param executor the executor to use for async delivery,
232 * supporting creation of at least one independent thread
233 * @param maxBufferCapacity the maximum capacity for each
234 * subscriber's buffer (the enforced capacity may be rounded up to
235 * the nearest power of two and/or bounded by the largest value
236 * supported by this implementation; method {@link #getMaxBufferCapacity}
237 * returns the actual value)
238 * @param handler if non-null, procedure to invoke upon exception
239 * thrown in method {@code onNext}
240 * @throws NullPointerException if executor is null
241 * @throws IllegalArgumentException if maxBufferCapacity not
242 * positive
243 */
244 public SubmissionPublisher(Executor executor, int maxBufferCapacity,
245 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) {
246 if (executor == null)
247 throw new NullPointerException();
248 if (maxBufferCapacity <= 0)
249 throw new IllegalArgumentException("capacity must be positive");
250 this.executor = executor;
251 this.onNextHandler = handler;
252 this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
253 }
254
255 /**
256 * Creates a new SubmissionPublisher using the given Executor for
257 * async delivery to subscribers, with the given maximum buffer size
258 * for each subscriber, and no handler for Subscriber exceptions in
259 * method {@link Flow.Subscriber#onNext(Object) onNext}.
260 *
261 * @param executor the executor to use for async delivery,
262 * supporting creation of at least one independent thread
263 * @param maxBufferCapacity the maximum capacity for each
264 * subscriber's buffer (the enforced capacity may be rounded up to
265 * the nearest power of two and/or bounded by the largest value
266 * supported by this implementation; method {@link #getMaxBufferCapacity}
267 * returns the actual value)
268 * @throws NullPointerException if executor is null
269 * @throws IllegalArgumentException if maxBufferCapacity not
270 * positive
271 */
272 public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
273 this(executor, maxBufferCapacity, null);
274 }
275
276 /**
277 * Creates a new SubmissionPublisher using the {@link
278 * ForkJoinPool#commonPool()} for async delivery to subscribers
279 * (unless it does not support a parallelism level of at least two,
280 * in which case, a new Thread is created to run each task), with
281 * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
282 * handler for Subscriber exceptions in method {@link
283 * Flow.Subscriber#onNext(Object) onNext}.
284 */
285 public SubmissionPublisher() {
286 this(ASYNC_POOL, Flow.defaultBufferSize(), null);
287 }
288
289 /**
290 * Adds the given Subscriber unless already subscribed. If already
291 * subscribed, the Subscriber's {@link
292 * Flow.Subscriber#onError(Throwable) onError} method is invoked on
293 * the existing subscription with an {@link IllegalStateException}.
294 * Otherwise, upon success, the Subscriber's {@link
295 * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
296 * asynchronously with a new {@link Flow.Subscription}. If {@link
297 * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
298 * subscription is cancelled. Otherwise, if this SubmissionPublisher
299 * was closed exceptionally, then the subscriber's {@link
300 * Flow.Subscriber#onError onError} method is invoked with the
301 * corresponding exception, or if closed without exception, the
302 * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
303 * method is invoked. Subscribers may enable receiving items by
304 * invoking the {@link Flow.Subscription#request(long) request}
305 * method of the new Subscription, and may unsubscribe by invoking
306 * its {@link Flow.Subscription#cancel() cancel} method.
307 *
308 * @param subscriber the subscriber
309 * @throws NullPointerException if subscriber is null
310 */
311 public void subscribe(Flow.Subscriber<? super T> subscriber) {
312 if (subscriber == null) throw new NullPointerException();
313 BufferedSubscription<T> subscription =
314 new BufferedSubscription<T>(subscriber, executor,
315 onNextHandler, maxBufferCapacity);
316 synchronized (this) {
317 for (BufferedSubscription<T> b = clients, pred = null;;) {
318 if (b == null) {
319 Throwable ex;
320 subscription.onSubscribe();
321 if ((ex = closedException) != null)
322 subscription.onError(ex);
323 else if (closed)
324 subscription.onComplete();
325 else if (pred == null)
326 clients = subscription;
327 else
328 pred.next = subscription;
329 break;
330 }
331 BufferedSubscription<T> next = b.next;
332 if (b.isDisabled()) { // remove
333 b.next = null; // detach
334 if (pred == null)
335 clients = next;
336 else
337 pred.next = next;
338 }
339 else if (subscriber.equals(b.subscriber)) {
340 b.onError(new IllegalStateException("Duplicate subscribe"));
341 break;
342 }
343 else
344 pred = b;
345 b = next;
346 }
347 }
348 }
349
350 /**
351 * Publishes the given item to each current subscriber by
352 * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
353 * onNext} method, blocking uninterruptibly while resources for any
354 * subscriber are unavailable. This method returns an estimate of
355 * the maximum lag (number of items submitted but not yet consumed)
356 * among all current subscribers. This value is at least one
357 * (accounting for this submitted item) if there are any
358 * subscribers, else zero.
359 *
360 * <p>If the Executor for this publisher throws a
361 * RejectedExecutionException (or any other RuntimeException or
362 * Error) when attempting to asynchronously notify subscribers,
363 * then this exception is rethrown, in which case not all
364 * subscribers will have been issued this item.
365 *
366 * @param item the (non-null) item to publish
367 * @return the estimated maximum lag among subscribers
368 * @throws IllegalStateException if closed
369 * @throws NullPointerException if item is null
370 * @throws RejectedExecutionException if thrown by Executor
371 */
372 public int submit(T item) {
373 if (item == null) throw new NullPointerException();
374 int lag = 0;
375 boolean complete;
376 synchronized (this) {
377 complete = closed;
378 BufferedSubscription<T> b = clients;
379 if (!complete) {
380 BufferedSubscription<T> pred = null, r = null, rtail = null;
381 while (b != null) {
382 BufferedSubscription<T> next = b.next;
383 int stat = b.offer(item);
384 if (stat < 0) { // disabled
385 b.next = null;
386 if (pred == null)
387 clients = next;
388 else
389 pred.next = next;
390 }
391 else {
392 if (stat > lag)
393 lag = stat;
394 else if (stat == 0) { // place on retry list
395 b.nextRetry = null;
396 if (rtail == null)
397 r = b;
398 else
399 rtail.nextRetry = b;
400 rtail = b;
401 }
402 pred = b;
403 }
404 b = next;
405 }
406 while (r != null) {
407 BufferedSubscription<T> nextRetry = r.nextRetry;
408 r.nextRetry = null;
409 int stat = r.submit(item);
410 if (stat > lag)
411 lag = stat;
412 else if (stat < 0 && clients == r)
413 clients = r.next; // postpone internal unsubscribes
414 r = nextRetry;
415 }
416 }
417 }
418 if (complete)
419 throw new IllegalStateException("Closed");
420 else
421 return lag;
422 }
423
424 /**
425 * Publishes the given item, if possible, to each current subscriber
426 * by asynchronously invoking its {@link
427 * Flow.Subscriber#onNext(Object) onNext} method. The item may be
428 * dropped by one or more subscribers if resource limits are
429 * exceeded, in which case the given handler (if non-null) is
430 * invoked, and if it returns true, retried once. Other calls to
431 * methods in this class by other threads are blocked while the
432 * handler is invoked. Unless recovery is assured, options are
433 * usually limited to logging the error and/or issuing an {@link
434 * Flow.Subscriber#onError(Throwable) onError} signal to the
435 * subscriber.
436 *
437 * <p>This method returns a status indicator: If negative, it
438 * represents the (negative) number of drops (failed attempts to
439 * issue the item to a subscriber). Otherwise it is an estimate of
440 * the maximum lag (number of items submitted but not yet
441 * consumed) among all current subscribers. This value is at least
442 * one (accounting for this submitted item) if there are any
443 * subscribers, else zero.
444 *
445 * <p>If the Executor for this publisher throws a
446 * RejectedExecutionException (or any other RuntimeException or
447 * Error) when attempting to asynchronously notify subscribers, or
448 * the drop handler throws an exception when processing a dropped
449 * item, then this exception is rethrown.
450 *
451 * @param item the (non-null) item to publish
452 * @param onDrop if non-null, the handler invoked upon a drop to a
453 * subscriber, with arguments of the subscriber and item; if it
454 * returns true, an offer is re-attempted (once)
455 * @return if negative, the (negative) number of drops; otherwise
456 * an estimate of maximum lag
457 * @throws IllegalStateException if closed
458 * @throws NullPointerException if item is null
459 * @throws RejectedExecutionException if thrown by Executor
460 */
461 public int offer(T item,
462 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
463 return doOffer(0L, item, onDrop);
464 }
465
466 /**
467 * Publishes the given item, if possible, to each current subscriber
468 * by asynchronously invoking its {@link
469 * Flow.Subscriber#onNext(Object) onNext} method, blocking while
470 * resources for any subscription are unavailable, up to the
471 * specified timeout or until the caller thread is interrupted, at
472 * which point the given handler (if non-null) is invoked, and if it
473 * returns true, retried once. (The drop handler may distinguish
474 * timeouts from interrupts by checking whether the current thread
475 * is interrupted.) Other calls to methods in this class by other
476 * threads are blocked while the handler is invoked. Unless
477 * recovery is assured, options are usually limited to logging the
478 * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
479 * onError} signal to the subscriber.
480 *
481 * <p>This method returns a status indicator: If negative, it
482 * represents the (negative) number of drops (failed attempts to
483 * issue the item to a subscriber). Otherwise it is an estimate of
484 * the maximum lag (number of items submitted but not yet
485 * consumed) among all current subscribers. This value is at least
486 * one (accounting for this submitted item) if there are any
487 * subscribers, else zero.
488 *
489 * <p>If the Executor for this publisher throws a
490 * RejectedExecutionException (or any other RuntimeException or
491 * Error) when attempting to asynchronously notify subscribers, or
492 * the drop handler throws an exception when processing a dropped
493 * item, then this exception is rethrown.
494 *
495 * @param item the (non-null) item to publish
496 * @param timeout how long to wait for resources for any subscriber
497 * before giving up, in units of {@code unit}
498 * @param unit a {@code TimeUnit} determining how to interpret the
499 * {@code timeout} parameter
500 * @param onDrop if non-null, the handler invoked upon a drop to a
501 * subscriber, with arguments of the subscriber and item; if it
502 * returns true, an offer is re-attempted (once)
503 * @return if negative, the (negative) number of drops; otherwise
504 * an estimate of maximum lag
505 * @throws IllegalStateException if closed
506 * @throws NullPointerException if item is null
507 * @throws RejectedExecutionException if thrown by Executor
508 */
509 public int offer(T item, long timeout, TimeUnit unit,
510 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
511 return doOffer(unit.toNanos(timeout), item, onDrop);
512 }
513
514 /** Common implementation for both forms of offer */
515 final int doOffer(long nanos, T item,
516 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
517 if (item == null) throw new NullPointerException();
518 int lag = 0, drops = 0;
519 boolean complete;
520 synchronized (this) {
521 complete = closed;
522 BufferedSubscription<T> b = clients;
523 if (!complete) {
524 BufferedSubscription<T> pred = null, r = null, rtail = null;
525 while (b != null) {
526 BufferedSubscription<T> next = b.next;
527 int stat = b.offer(item);
528 if (stat < 0) {
529 b.next = null;
530 if (pred == null)
531 clients = next;
532 else
533 pred.next = next;
534 }
535 else {
536 if (stat > lag)
537 lag = stat;
538 else if (stat == 0) {
539 b.nextRetry = null;
540 if (rtail == null)
541 r = b;
542 else
543 rtail.nextRetry = b;
544 rtail = b;
545 }
546 else if (stat > lag)
547 lag = stat;
548 pred = b;
549 }
550 b = next;
551 }
552 while (r != null) {
553 BufferedSubscription<T> nextRetry = r.nextRetry;
554 r.nextRetry = null;
555 int stat = (nanos > 0L) ? r.timedOffer(item, nanos) :
556 r.offer(item);
557 if (stat == 0 && onDrop != null &&
558 onDrop.test(r.subscriber, item))
559 stat = r.offer(item);
560 if (stat == 0)
561 ++drops;
562 else if (stat > lag)
563 lag = stat;
564 else if (stat < 0 && clients == r)
565 clients = r.next;
566 r = nextRetry;
567 }
568 }
569 }
570 if (complete)
571 throw new IllegalStateException("Closed");
572 else
573 return (drops > 0) ? -drops : lag;
574 }
575
576 /**
577 * Unless already closed, issues {@link
578 * Flow.Subscriber#onComplete() onComplete} signals to current
579 * subscribers, and disallows subsequent attempts to publish.
580 * Upon return, this method does <em>NOT</em> guarantee that all
581 * subscribers have yet completed.
582 */
583 public void close() {
584 if (!closed) {
585 BufferedSubscription<T> b;
586 synchronized (this) {
587 b = clients;
588 clients = null;
589 closed = true;
590 }
591 while (b != null) {
592 BufferedSubscription<T> next = b.next;
593 b.next = null;
594 b.onComplete();
595 b = next;
596 }
597 }
598 }
599
600 /**
601 * Unless already closed, issues {@link
602 * Flow.Subscriber#onError(Throwable) onError} signals to current
603 * subscribers with the given error, and disallows subsequent
604 * attempts to publish. Future subscribers also receive the given
605 * error. Upon return, this method does <em>NOT</em> guarantee
606 * that all subscribers have yet completed.
607 *
608 * @param error the {@code onError} argument sent to subscribers
609 * @throws NullPointerException if error is null
610 */
611 public void closeExceptionally(Throwable error) {
612 if (error == null)
613 throw new NullPointerException();
614 if (!closed) {
615 BufferedSubscription<T> b;
616 synchronized (this) {
617 b = clients;
618 clients = null;
619 closed = true;
620 closedException = error;
621 }
622 while (b != null) {
623 BufferedSubscription<T> next = b.next;
624 b.next = null;
625 b.onError(error);
626 b = next;
627 }
628 }
629 }
630
631 /**
632 * Returns true if this publisher is not accepting submissions.
633 *
634 * @return true if closed
635 */
636 public boolean isClosed() {
637 return closed;
638 }
639
640 /**
641 * Returns the exception associated with {@link
642 * #closeExceptionally(Throwable) closeExceptionally}, or null if
643 * not closed or if closed normally.
644 *
645 * @return the exception, or null if none
646 */
647 public Throwable getClosedException() {
648 return closedException;
649 }
650
651 /**
652 * Returns true if this publisher has any subscribers.
653 *
654 * @return true if this publisher has any subscribers
655 */
656 public boolean hasSubscribers() {
657 boolean nonEmpty = false;
658 if (!closed) {
659 synchronized (this) {
660 for (BufferedSubscription<T> b = clients; b != null;) {
661 BufferedSubscription<T> next = b.next;
662 if (b.isDisabled()) {
663 b.next = null;
664 b = clients = next;
665 }
666 else {
667 nonEmpty = true;
668 break;
669 }
670 }
671 }
672 }
673 return nonEmpty;
674 }
675
676 /**
677 * Returns the number of current subscribers.
678 *
679 * @return the number of current subscribers
680 */
681 public int getNumberOfSubscribers() {
682 int count = 0;
683 if (!closed) {
684 synchronized (this) {
685 BufferedSubscription<T> pred = null, next;
686 for (BufferedSubscription<T> b = clients; b != null; b = next) {
687 next = b.next;
688 if (b.isDisabled()) {
689 b.next = null;
690 if (pred == null)
691 clients = next;
692 else
693 pred.next = next;
694 }
695 else {
696 pred = b;
697 ++count;
698 }
699 }
700 }
701 }
702 return count;
703 }
704
705 /**
706 * Returns the Executor used for asynchronous delivery.
707 *
708 * @return the Executor used for asynchronous delivery
709 */
710 public Executor getExecutor() {
711 return executor;
712 }
713
714 /**
715 * Returns the maximum per-subscriber buffer capacity.
716 *
717 * @return the maximum per-subscriber buffer capacity
718 */
719 public int getMaxBufferCapacity() {
720 return maxBufferCapacity;
721 }
722
723 /**
724 * Returns a list of current subscribers for monitoring and
725 * tracking purposes, not for invoking {@link Flow.Subscriber}
726 * methods on the subscribers.
727 *
728 * @return list of current subscribers
729 */
730 public List<Flow.Subscriber<? super T>> getSubscribers() {
731 ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
732 synchronized (this) {
733 BufferedSubscription<T> pred = null, next;
734 for (BufferedSubscription<T> b = clients; b != null; b = next) {
735 next = b.next;
736 if (b.isDisabled()) {
737 b.next = null;
738 if (pred == null)
739 clients = next;
740 else
741 pred.next = next;
742 }
743 else
744 subs.add(b.subscriber);
745 }
746 }
747 return subs;
748 }
749
750 /**
751 * Returns true if the given Subscriber is currently subscribed.
752 *
753 * @param subscriber the subscriber
754 * @return true if currently subscribed
755 * @throws NullPointerException if subscriber is null
756 */
757 public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
758 if (subscriber == null) throw new NullPointerException();
759 if (!closed) {
760 synchronized (this) {
761 BufferedSubscription<T> pred = null, next;
762 for (BufferedSubscription<T> b = clients; b != null; b = next) {
763 next = b.next;
764 if (b.isDisabled()) {
765 b.next = null;
766 if (pred == null)
767 clients = next;
768 else
769 pred.next = next;
770 }
771 else if (subscriber.equals(b.subscriber))
772 return true;
773 else
774 pred = b;
775 }
776 }
777 }
778 return false;
779 }
780
781 /**
782 * Returns an estimate of the minimum number of items requested
783 * (via {@link Flow.Subscription#request(long) request}) but not
784 * yet produced, among all current subscribers.
785 *
786 * @return the estimate, or zero if no subscribers
787 */
788 public long estimateMinimumDemand() {
789 long min = Long.MAX_VALUE;
790 boolean nonEmpty = false;
791 synchronized (this) {
792 BufferedSubscription<T> pred = null, next;
793 for (BufferedSubscription<T> b = clients; b != null; b = next) {
794 int n; long d;
795 next = b.next;
796 if ((n = b.estimateLag()) < 0) {
797 b.next = null;
798 if (pred == null)
799 clients = next;
800 else
801 pred.next = next;
802 }
803 else {
804 if ((d = b.demand - n) < min)
805 min = d;
806 nonEmpty = true;
807 pred = b;
808 }
809 }
810 }
811 return nonEmpty ? min : 0;
812 }
813
814 /**
815 * Returns an estimate of the maximum number of items produced but
816 * not yet consumed among all current subscribers.
817 *
818 * @return the estimate
819 */
820 public int estimateMaximumLag() {
821 int max = 0;
822 synchronized (this) {
823 BufferedSubscription<T> pred = null, next;
824 for (BufferedSubscription<T> b = clients; b != null; b = next) {
825 int n;
826 next = b.next;
827 if ((n = b.estimateLag()) < 0) {
828 b.next = null;
829 if (pred == null)
830 clients = next;
831 else
832 pred.next = next;
833 }
834 else {
835 if (n > max)
836 max = n;
837 pred = b;
838 }
839 }
840 }
841 return max;
842 }
843
844 /**
845 * Processes all published items using the given Consumer function.
846 * Returns a CompletableFuture that is completed normally when this
847 * publisher signals {@link Flow.Subscriber#onComplete()
848 * onComplete}, or completed exceptionally upon any error, or an
849 * exception is thrown by the Consumer, or the returned
850 * CompletableFuture is cancelled, in which case no further items
851 * are processed.
852 *
853 * @param consumer the function applied to each onNext item
854 * @return a CompletableFuture that is completed normally
855 * when the publisher signals onComplete, and exceptionally
856 * upon any error or cancellation
857 * @throws NullPointerException if consumer is null
858 */
859 public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
860 if (consumer == null)
861 throw new NullPointerException();
862 CompletableFuture<Void> status = new CompletableFuture<>();
863 subscribe(new ConsumerSubscriber<T>(status, consumer));
864 return status;
865 }
866
867 /** Subscriber for method consume */
868 private static final class ConsumerSubscriber<T>
869 implements Flow.Subscriber<T> {
870 final CompletableFuture<Void> status;
871 final Consumer<? super T> consumer;
872 Flow.Subscription subscription;
873 ConsumerSubscriber(CompletableFuture<Void> status,
874 Consumer<? super T> consumer) {
875 this.status = status; this.consumer = consumer;
876 }
877 public final void onSubscribe(Flow.Subscription subscription) {
878 this.subscription = subscription;
879 status.whenComplete((v, e) -> subscription.cancel());
880 if (!status.isDone())
881 subscription.request(Long.MAX_VALUE);
882 }
883 public final void onError(Throwable ex) {
884 status.completeExceptionally(ex);
885 }
886 public final void onComplete() {
887 status.complete(null);
888 }
889 public final void onNext(T item) {
890 try {
891 consumer.accept(item);
892 } catch (Throwable ex) {
893 subscription.cancel();
894 status.completeExceptionally(ex);
895 }
896 }
897 }
898
899 /**
900 * A task for consuming buffer items and signals, created and
901 * executed whenever they become available. A task consumes as
902 * many items/signals as possible before terminating, at which
903 * point another task is created when needed. The dual Runnable
904 * and ForkJoinTask declaration saves overhead when executed by
905 * ForkJoinPools, without impacting other kinds of Executors.
906 */
907 @SuppressWarnings("serial")
908 static final class ConsumerTask<T> extends ForkJoinTask<Void>
909 implements Runnable {
910 final BufferedSubscription<T> consumer;
911 ConsumerTask(BufferedSubscription<T> consumer) {
912 this.consumer = consumer;
913 }
914 public final Void getRawResult() { return null; }
915 public final void setRawResult(Void v) {}
916 public final boolean exec() { consumer.consume(); return false; }
917 public final void run() { consumer.consume(); }
918 }
919
920 /**
921 * A bounded (ring) buffer with integrated control to start a
922 * consumer task whenever items are available. The buffer
923 * algorithm is similar to one used inside ForkJoinPool (see its
924 * internal documentation for details) specialized for the case of
925 * at most one concurrent producer and consumer, and power of two
926 * buffer sizes. This allows methods to operate without locks even
927 * while supporting resizing, blocking, task-triggering, and
928 * garbage-free buffers (nulling out elements when consumed),
929 * although supporting these does impose a bit of overhead
930 * compared to plain fixed-size ring buffers.
931 *
932 * The publisher guarantees a single producer via its lock. We
933 * ensure in this class that there is at most one consumer. The
934 * request and cancel methods must be fully thread-safe but are
935 * coded to exploit the most common case in which they are only
936 * called by consumers (usually within onNext).
937 *
938 * Execution control is managed using the ACTIVE ctl bit. We
939 * ensure that a task is active when consumable items (and
940 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
941 * there is demand (unfilled requests). This is complicated on
942 * the creation side by the possibility of exceptions when trying
943 * to execute tasks. These eventually force DISABLED state, but
944 * sometimes not directly. On the task side, termination (clearing
945 * ACTIVE) that would otherwise race with producers or request()
946 * calls uses the CONSUME keep-alive bit to force a recheck.
947 *
948 * The ctl field also manages run state. When DISABLED, no further
949 * updates are possible. Disabling may be preceded by setting
950 * ERROR or COMPLETE (or both -- ERROR has precedence), in which
951 * case the associated Subscriber methods are invoked, possibly
952 * synchronously if there is no active consumer task (including
953 * cases where execute() failed). The cancel() method is supported
954 * by treating as ERROR but suppressing onError signal.
955 *
956 * Support for blocking also exploits the fact that there is only
957 * one possible waiter. ManagedBlocker-compatible control fields
958 * are placed in this class itself rather than in wait-nodes.
959 * Blocking control relies on the "waiter" field. Producers set
960 * the field before trying to block, but must then recheck (via
961 * offer) before parking. Signalling then just unparks and clears
962 * waiter field. If the producer and consumer are both in the same
963 * ForkJoinPool, or consumers are running in commonPool, the
964 * producer attempts to help run consumer tasks that it forked
965 * before blocking. To avoid potential cycles, only one level of
966 * helping is currently supported.
967 *
968 * This class uses @Contended and heuristic field declaration
969 * ordering to reduce false-sharing-based memory contention among
970 * instances of BufferedSubscription, but it does not currently
971 * attempt to avoid memory contention among buffers. This field
972 * and element packing can hurt performance especially when each
973 * publisher has only one client operating at a high rate.
974 * Addressing this may require allocating substantially more space
975 * than users expect.
976 */
977 @SuppressWarnings("serial")
978 @sun.misc.Contended
979 private static final class BufferedSubscription<T>
980 implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
981 // Order-sensitive field declarations
982 long timeout; // > 0 if timed wait
983 volatile long demand; // # unfilled requests
984 int maxCapacity; // reduced on OOME
985 int putStat; // offer result for ManagedBlocker
986 int helpDepth; // nested helping depth (at most 1)
987 volatile int ctl; // atomic run state flags
988 volatile int head; // next position to take
989 int tail; // next position to put
990 Object[] array; // buffer: null if disabled
991 Flow.Subscriber<? super T> subscriber; // null if disabled
992 Executor executor; // null if disabled
993 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
994 volatile Throwable pendingError; // holds until onError issued
995 volatile Thread waiter; // blocked producer thread
996 T putItem; // for offer within ManagedBlocker
997 BufferedSubscription<T> next; // used only by publisher
998 BufferedSubscription<T> nextRetry; // used only by publisher
999
1000 // ctl values
1001 static final int ACTIVE = 0x01; // consumer task active
1002 static final int CONSUME = 0x02; // keep-alive for consumer task
1003 static final int DISABLED = 0x04; // final state
1004 static final int ERROR = 0x08; // signal onError then disable
1005 static final int SUBSCRIBE = 0x10; // signal onSubscribe
1006 static final int COMPLETE = 0x20; // signal onComplete when done
1007
1008 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
1009
1010 /**
1011 * Initial buffer capacity used when maxBufferCapacity is
1012 * greater. Must be a power of two.
1013 */
1014 static final int DEFAULT_INITIAL_CAP = 32;
1015
1016 BufferedSubscription(Flow.Subscriber<? super T> subscriber,
1017 Executor executor,
1018 BiConsumer<? super Flow.Subscriber<? super T>,
1019 ? super Throwable> onNextHandler,
1020 int maxBufferCapacity) {
1021 this.subscriber = subscriber;
1022 this.executor = executor;
1023 this.onNextHandler = onNextHandler;
1024 this.maxCapacity = maxBufferCapacity;
1025 this.array = new Object[maxBufferCapacity < DEFAULT_INITIAL_CAP ?
1026 (maxBufferCapacity < 2 ? // at least 2 slots
1027 2 : maxBufferCapacity) :
1028 DEFAULT_INITIAL_CAP];
1029 }
1030
1031 final boolean isDisabled() {
1032 return ctl == DISABLED;
1033 }
1034
1035 /**
1036 * Returns estimated number of buffered items, or -1 if
1037 * disabled.
1038 */
1039 final int estimateLag() {
1040 int n;
1041 return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
1042 }
1043
1044 /**
1045 * Tries to add item and start consumer task if necessary.
1046 * @return -1 if disabled, 0 if dropped, else estimated lag
1047 */
1048 final int offer(T item) {
1049 int h = head, t = tail, cap, size, stat;
1050 Object[] a = array;
1051 if (a != null && (cap = a.length) > 0 && cap >= (size = t + 1 - h)) {
1052 a[(cap - 1) & t] = item; // relaxed writes OK
1053 tail = t + 1;
1054 stat = size;
1055 }
1056 else
1057 stat = growAndAdd(a, item);
1058 return (stat > 0 &&
1059 (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
1060 startOnOffer(stat) : stat;
1061 }
1062
1063 /**
1064 * Tries to create or expand buffer, then adds item if possible.
1065 */
1066 private int growAndAdd(Object[] a, T item) {
1067 boolean alloc;
1068 int cap, stat;
1069 if ((ctl & (ERROR | DISABLED)) != 0) {
1070 cap = 0;
1071 stat = -1;
1072 alloc = false;
1073 }
1074 else if (a == null || (cap = a.length) <= 0) {
1075 cap = 0;
1076 stat = 1;
1077 alloc = true;
1078 }
1079 else {
1080 U.fullFence(); // recheck
1081 int h = head, t = tail, size = t + 1 - h;
1082 if (cap >= size) {
1083 a[(cap - 1) & t] = item;
1084 tail = t + 1;
1085 stat = size;
1086 alloc = false;
1087 }
1088 else if (cap >= maxCapacity) {
1089 stat = 0; // cannot grow
1090 alloc = false;
1091 }
1092 else {
1093 stat = cap + 1;
1094 alloc = true;
1095 }
1096 }
1097 if (alloc) {
1098 int newCap = (cap > 0) ? cap << 1 : 1;
1099 if (newCap <= cap)
1100 stat = 0;
1101 else {
1102 Object[] newArray = null;
1103 try {
1104 newArray = new Object[newCap];
1105 } catch (Throwable ex) { // try to cope with OOME
1106 }
1107 if (newArray == null) {
1108 if (cap > 0)
1109 maxCapacity = cap; // avoid continuous failure
1110 stat = 0;
1111 }
1112 else {
1113 array = newArray;
1114 int t = tail;
1115 int newMask = newCap - 1;
1116 if (a != null && cap > 0) {
1117 int mask = cap - 1;
1118 for (int j = head; j != t; ++j) {
1119 long k = ((long)(j & mask) << ASHIFT) + ABASE;
1120 Object x = U.getObjectVolatile(a, k);
1121 if (x != null && // races with consumer
1122 U.compareAndSwapObject(a, k, x, null))
1123 newArray[j & newMask] = x;
1124 }
1125 }
1126 newArray[t & newMask] = item;
1127 tail = t + 1;
1128 }
1129 }
1130 }
1131 return stat;
1132 }
1133
1134 /**
1135 * Spins/helps/blocks while offer returns 0. Called only if
1136 * initial offer return 0.
1137 */
1138 final int submit(T item) {
1139 int stat; Executor e; ForkJoinWorkerThread w;
1140 if ((stat = offer(item)) == 0 && helpDepth == 0 &&
1141 ((e = executor) instanceof ForkJoinPool)) {
1142 helpDepth = 1;
1143 Thread thread = Thread.currentThread();
1144 if ((thread instanceof ForkJoinWorkerThread) &&
1145 ((w = (ForkJoinWorkerThread)thread)).getPool() == e)
1146 stat = internalHelpConsume(w.workQueue, item);
1147 else if (e == ForkJoinPool.commonPool())
1148 stat = externalHelpConsume
1149 (ForkJoinPool.commonSubmitterQueue(), item);
1150 helpDepth = 0;
1151 }
1152 if (stat == 0 && (stat = offer(item)) == 0) {
1153 putItem = item;
1154 timeout = 0L;
1155 try {
1156 ForkJoinPool.managedBlock(this);
1157 } catch (InterruptedException ie) {
1158 timeout = INTERRUPTED;
1159 }
1160 stat = putStat;
1161 if (timeout < 0L)
1162 Thread.currentThread().interrupt();
1163 }
1164 return stat;
1165 }
1166
1167 /**
1168 * Tries helping for FJ submitter.
1169 */
1170 private int internalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
1171 int stat = 0;
1172 if (w != null) {
1173 ForkJoinTask<?> t;
1174 while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
1175 if ((stat = offer(item)) != 0 || !w.tryUnpush(t))
1176 break;
1177 ((ConsumerTask<?>)t).consumer.consume();
1178 }
1179 }
1180 return stat;
1181 }
1182
1183 /**
1184 * Tries helping for non-FJ submitter.
1185 */
1186 private int externalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
1187 int stat = 0;
1188 if (w != null) {
1189 ForkJoinTask<?> t;
1190 while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
1191 if ((stat = offer(item)) != 0 || !w.trySharedUnpush(t))
1192 break;
1193 ((ConsumerTask<?>)t).consumer.consume();
1194 }
1195 }
1196 return stat;
1197 }
1198
1199 /**
1200 * Timeout version; similar to submit.
1201 */
1202 final int timedOffer(T item, long nanos) {
1203 int stat; Executor e;
1204 if ((stat = offer(item)) == 0 && helpDepth == 0 &&
1205 ((e = executor) instanceof ForkJoinPool)) {
1206 Thread thread = Thread.currentThread();
1207 if (((thread instanceof ForkJoinWorkerThread) &&
1208 ((ForkJoinWorkerThread)thread).getPool() == e) ||
1209 e == ForkJoinPool.commonPool()) {
1210 helpDepth = 1;
1211 ForkJoinTask<?> t;
1212 long deadline = System.nanoTime() + nanos;
1213 while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
1214 (t instanceof ConsumerTask)) {
1215 if ((stat = offer(item)) != 0 ||
1216 (nanos = deadline - System.nanoTime()) <= 0L ||
1217 !t.tryUnfork())
1218 break;
1219 ((ConsumerTask<?>)t).consumer.consume();
1220 }
1221 helpDepth = 0;
1222 }
1223 }
1224 if (stat == 0 && (stat = offer(item)) == 0 &&
1225 (timeout = nanos) > 0L) {
1226 putItem = item;
1227 try {
1228 ForkJoinPool.managedBlock(this);
1229 } catch (InterruptedException ie) {
1230 timeout = INTERRUPTED;
1231 }
1232 stat = putStat;
1233 if (timeout < 0L)
1234 Thread.currentThread().interrupt();
1235 }
1236 return stat;
1237 }
1238
1239 /**
1240 * Tries to start consumer task after offer.
1241 * @return -1 if now disabled, else argument
1242 */
1243 private int startOnOffer(int stat) {
1244 for (;;) {
1245 Executor e; int c;
1246 if ((c = ctl) == DISABLED || (e = executor) == null) {
1247 stat = -1;
1248 break;
1249 }
1250 else if ((c & ACTIVE) != 0) { // ensure keep-alive
1251 if ((c & CONSUME) != 0 ||
1252 U.compareAndSwapInt(this, CTL, c,
1253 c | CONSUME))
1254 break;
1255 }
1256 else if (demand == 0L || tail == head)
1257 break;
1258 else if (U.compareAndSwapInt(this, CTL, c,
1259 c | (ACTIVE | CONSUME))) {
1260 try {
1261 e.execute(new ConsumerTask<T>(this));
1262 break;
1263 } catch (RuntimeException | Error ex) { // back out
1264 do {} while (((c = ctl) & DISABLED) == 0 &&
1265 (c & ACTIVE) != 0 &&
1266 !U.compareAndSwapInt(this, CTL, c,
1267 c & ~ACTIVE));
1268 throw ex;
1269 }
1270 }
1271 }
1272 return stat;
1273 }
1274
1275 private void signalWaiter(Thread w) {
1276 waiter = null;
1277 LockSupport.unpark(w); // release producer
1278 }
1279
1280 /**
1281 * Nulls out most fields, mainly to avoid garbage retention
1282 * until publisher unsubscribes, but also to help cleanly stop
1283 * upon error by nulling required components.
1284 */
1285 private void detach() {
1286 Thread w = waiter;
1287 executor = null;
1288 subscriber = null;
1289 pendingError = null;
1290 signalWaiter(w);
1291 }
1292
1293 /**
1294 * Issues error signal, asynchronously if a task is running,
1295 * else synchronously.
1296 */
1297 final void onError(Throwable ex) {
1298 for (int c;;) {
1299 if (((c = ctl) & (ERROR | DISABLED)) != 0)
1300 break;
1301 else if ((c & ACTIVE) != 0) {
1302 pendingError = ex;
1303 if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
1304 break; // cause consumer task to exit
1305 }
1306 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1307 Flow.Subscriber<? super T> s = subscriber;
1308 if (s != null && ex != null) {
1309 try {
1310 s.onError(ex);
1311 } catch (Throwable ignore) {
1312 }
1313 }
1314 detach();
1315 break;
1316 }
1317 }
1318 }
1319
1320 /**
1321 * Tries to start consumer task upon a signal or request;
1322 * disables on failure.
1323 */
1324 private void startOrDisable() {
1325 Executor e;
1326 if ((e = executor) != null) { // skip if already disabled
1327 try {
1328 e.execute(new ConsumerTask<T>(this));
1329 } catch (Throwable ex) { // back out and force signal
1330 for (int c;;) {
1331 if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
1332 break;
1333 if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
1334 onError(ex);
1335 break;
1336 }
1337 }
1338 }
1339 }
1340 }
1341
1342 final void onComplete() {
1343 for (int c;;) {
1344 if ((c = ctl) == DISABLED)
1345 break;
1346 if (U.compareAndSwapInt(this, CTL, c,
1347 c | (ACTIVE | CONSUME | COMPLETE))) {
1348 if ((c & ACTIVE) == 0)
1349 startOrDisable();
1350 break;
1351 }
1352 }
1353 }
1354
1355 final void onSubscribe() {
1356 for (int c;;) {
1357 if ((c = ctl) == DISABLED)
1358 break;
1359 if (U.compareAndSwapInt(this, CTL, c,
1360 c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1361 if ((c & ACTIVE) == 0)
1362 startOrDisable();
1363 break;
1364 }
1365 }
1366 }
1367
1368 /**
1369 * Causes consumer task to exit if active (without reporting
1370 * onError unless there is already a pending error), and
1371 * disables.
1372 */
1373 public void cancel() {
1374 for (int c;;) {
1375 if ((c = ctl) == DISABLED)
1376 break;
1377 else if ((c & ACTIVE) != 0) {
1378 if (U.compareAndSwapInt(this, CTL, c,
1379 c | (CONSUME | ERROR)))
1380 break;
1381 }
1382 else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1383 detach();
1384 break;
1385 }
1386 }
1387 }
1388
1389 /**
1390 * Adds to demand and possibly starts task.
1391 */
1392 public void request(long n) {
1393 if (n > 0L) {
1394 for (;;) {
1395 long prev = demand, d;
1396 if ((d = prev + n) < prev) // saturate
1397 d = Long.MAX_VALUE;
1398 if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
1399 for (int c, h;;) {
1400 if ((c = ctl) == DISABLED)
1401 break;
1402 else if ((c & ACTIVE) != 0) {
1403 if ((c & CONSUME) != 0 ||
1404 U.compareAndSwapInt(this, CTL, c,
1405 c | CONSUME))
1406 break;
1407 }
1408 else if ((h = head) != tail) {
1409 if (U.compareAndSwapInt(this, CTL, c,
1410 c | (ACTIVE|CONSUME))) {
1411 startOrDisable();
1412 break;
1413 }
1414 }
1415 else if (head == h && tail == h)
1416 break; // else stale
1417 if (demand == 0L)
1418 break;
1419 }
1420 break;
1421 }
1422 }
1423 }
1424 else if (n < 0L)
1425 onError(new IllegalArgumentException(
1426 "negative subscription request"));
1427 }
1428
1429 public final boolean isReleasable() { // for ManagedBlocker
1430 T item = putItem;
1431 if (item != null) {
1432 if ((putStat = offer(item)) == 0)
1433 return false;
1434 putItem = null;
1435 }
1436 return true;
1437 }
1438
1439 public final boolean block() { // for ManagedBlocker
1440 T item = putItem;
1441 if (item != null) {
1442 putItem = null;
1443 long nanos = timeout;
1444 long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
1445 while ((putStat = offer(item)) == 0) {
1446 if (Thread.interrupted()) {
1447 timeout = INTERRUPTED;
1448 if (nanos > 0L)
1449 break;
1450 }
1451 else if (nanos > 0L &&
1452 (nanos = deadline - System.nanoTime()) <= 0L)
1453 break;
1454 else if (waiter == null)
1455 waiter = Thread.currentThread();
1456 else {
1457 if (nanos > 0L)
1458 LockSupport.parkNanos(this, nanos);
1459 else
1460 LockSupport.park(this);
1461 waiter = null;
1462 }
1463 }
1464 }
1465 waiter = null;
1466 return true;
1467 }
1468
1469 /**
1470 * Consumer loop, called from ConsumerTask, or indirectly
1471 * when helping during submit.
1472 */
1473 final void consume() {
1474 Flow.Subscriber<? super T> s;
1475 int h = head;
1476 if ((s = subscriber) != null) { // else disabled
1477 for (;;) {
1478 long d = demand;
1479 int c; Object[] a; int n; long i; Object x; Thread w;
1480 if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
1481 if (!checkControl(s, c))
1482 break;
1483 }
1484 else if ((a = array) == null || h == tail ||
1485 (n = a.length) == 0 ||
1486 (x = U.getObjectVolatile
1487 (a, (i = ((long)((n - 1) & h) << ASHIFT) + ABASE)))
1488 == null) {
1489 if (!checkEmpty(s, c))
1490 break;
1491 }
1492 else if (d == 0L) {
1493 if (!checkDemand(c))
1494 break;
1495 }
1496 else if (((c & CONSUME) != 0 ||
1497 U.compareAndSwapInt(this, CTL, c, c | CONSUME)) &&
1498 U.compareAndSwapObject(a, i, x, null)) {
1499 U.putOrderedInt(this, HEAD, ++h);
1500 U.getAndAddLong(this, DEMAND, -1L);
1501 if ((w = waiter) != null)
1502 signalWaiter(w);
1503 try {
1504 @SuppressWarnings("unchecked") T y = (T) x;
1505 s.onNext(y);
1506 } catch (Throwable ex) {
1507 handleOnNext(s, ex);
1508 }
1509 }
1510 }
1511 }
1512 }
1513
1514 /**
1515 * Responds to control events in consume().
1516 */
1517 private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
1518 boolean stat = true;
1519 if ((c & ERROR) != 0) {
1520 Throwable ex = pendingError;
1521 ctl = DISABLED; // no need for CAS
1522 if (ex != null) { // null if errorless cancel
1523 try {
1524 if (s != null)
1525 s.onError(ex);
1526 } catch (Throwable ignore) {
1527 }
1528 }
1529 }
1530 else if ((c & SUBSCRIBE) != 0) {
1531 if (U.compareAndSwapInt(this, CTL, c, c & ~SUBSCRIBE)) {
1532 try {
1533 if (s != null)
1534 s.onSubscribe(this);
1535 } catch (Throwable ex) {
1536 onError(ex);
1537 }
1538 }
1539 }
1540 else {
1541 detach();
1542 stat = false;
1543 }
1544 return stat;
1545 }
1546
1547 /**
1548 * Responds to apparent emptiness in consume().
1549 */
1550 private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) {
1551 boolean stat = true;
1552 if (head == tail) {
1553 if ((c & CONSUME) != 0)
1554 U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1555 else if ((c & COMPLETE) != 0) {
1556 if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1557 try {
1558 if (s != null)
1559 s.onComplete();
1560 } catch (Throwable ignore) {
1561 }
1562 }
1563 }
1564 else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
1565 stat = false;
1566 }
1567 return stat;
1568 }
1569
1570 /**
1571 * Responds to apparent zero demand in consume().
1572 */
1573 private boolean checkDemand(int c) {
1574 boolean stat = true;
1575 if (demand == 0L) {
1576 if ((c & CONSUME) != 0)
1577 U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1578 else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
1579 stat = false;
1580 }
1581 return stat;
1582 }
1583
1584 /**
1585 * Processes exception in Subscriber.onNext.
1586 */
1587 private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) {
1588 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h;
1589 if ((h = onNextHandler) != null) {
1590 try {
1591 h.accept(s, ex);
1592 } catch (Throwable ignore) {
1593 }
1594 }
1595 onError(ex);
1596 }
1597
1598 // Unsafe mechanics
1599 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1600 private static final long CTL;
1601 private static final long TAIL;
1602 private static final long HEAD;
1603 private static final long DEMAND;
1604 private static final int ABASE;
1605 private static final int ASHIFT;
1606
1607 static {
1608 try {
1609 CTL = U.objectFieldOffset
1610 (BufferedSubscription.class.getDeclaredField("ctl"));
1611 TAIL = U.objectFieldOffset
1612 (BufferedSubscription.class.getDeclaredField("tail"));
1613 HEAD = U.objectFieldOffset
1614 (BufferedSubscription.class.getDeclaredField("head"));
1615 DEMAND = U.objectFieldOffset
1616 (BufferedSubscription.class.getDeclaredField("demand"));
1617
1618 ABASE = U.arrayBaseOffset(Object[].class);
1619 int scale = U.arrayIndexScale(Object[].class);
1620 if ((scale & (scale - 1)) != 0)
1621 throw new Error("data type scale not a power of two");
1622 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
1623 } catch (ReflectiveOperationException e) {
1624 throw new Error(e);
1625 }
1626
1627 // Reduce the risk of rare disastrous classloading in first call to
1628 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1629 Class<?> ensureLoaded = LockSupport.class;
1630 }
1631 }
1632 }
--- EOF ---