< prev index next >

src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java

Print this page
8234131: Miscellaneous changes imported from jsr166 CVS 2020-12
Reviewed-by: martin

*** 39,48 **** --- 39,49 ---- import java.lang.invoke.VarHandle; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.locks.LockSupport; + import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.BiPredicate; import java.util.function.Consumer; import static java.util.concurrent.Flow.Publisher; import static java.util.concurrent.Flow.Subscriber;
*** 173,187 **** public class SubmissionPublisher<T> implements Publisher<T>, AutoCloseable { /* * Most mechanics are handled by BufferedSubscription. This class * mainly tracks subscribers and ensures sequentiality, by using ! * built-in synchronization locks across public methods. Using ! * built-in locks works well in the most typical case in which ! * only one thread submits items. We extend this idea in ! * submission methods by detecting single-ownership to reduce ! * producer-consumer synchronization strength. */ /** The largest possible power of two array size. */ static final int BUFFER_CAPACITY_LIMIT = 1 << 30; --- 174,188 ---- public class SubmissionPublisher<T> implements Publisher<T>, AutoCloseable { /* * Most mechanics are handled by BufferedSubscription. This class * mainly tracks subscribers and ensures sequentiality, by using ! * locks across public methods, to ensure thread-safety in the ! * presence of multiple sources and maintain acquire-release ! * ordering around user operations. However, we also track whether ! * there is only a single source, and if so streamline some buffer ! * operations by avoiding some atomics. */ /** The largest possible power of two array size. */ static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
*** 232,241 **** --- 233,244 ---- * saturated ones in retries list (using nextRetry field), and * retry, possibly blocking or dropping. */ BufferedSubscription<T> clients; + /** Lock for exclusion across multiple sources */ + final ReentrantLock lock; /** Run status, updated only within locks */ volatile boolean closed; /** Set true on first call to subscribe, to initialize possible owner */ boolean subscribed; /** The first caller thread to subscribe, or null if thread ever changed */
*** 272,281 **** --- 275,285 ---- BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) { if (executor == null) throw new NullPointerException(); if (maxBufferCapacity <= 0) throw new IllegalArgumentException("capacity must be positive"); + this.lock = new ReentrantLock(); this.executor = executor; this.onNextHandler = handler; this.maxBufferCapacity = roundCapacity(maxBufferCapacity); }
*** 335,351 **** * @param subscriber the subscriber * @throws NullPointerException if subscriber is null */ public void subscribe(Subscriber<? super T> subscriber) { if (subscriber == null) throw new NullPointerException(); int max = maxBufferCapacity; // allocate initial array Object[] array = new Object[max < INITIAL_CAPACITY ? max : INITIAL_CAPACITY]; BufferedSubscription<T> subscription = new BufferedSubscription<T>(subscriber, executor, onNextHandler, array, max); ! synchronized (this) { if (!subscribed) { subscribed = true; owner = Thread.currentThread(); } for (BufferedSubscription<T> b = clients, pred = null;;) { --- 339,357 ---- * @param subscriber the subscriber * @throws NullPointerException if subscriber is null */ public void subscribe(Subscriber<? super T> subscriber) { if (subscriber == null) throw new NullPointerException(); + ReentrantLock lock = this.lock; int max = maxBufferCapacity; // allocate initial array Object[] array = new Object[max < INITIAL_CAPACITY ? max : INITIAL_CAPACITY]; BufferedSubscription<T> subscription = new BufferedSubscription<T>(subscriber, executor, onNextHandler, array, max); ! lock.lock(); ! try { if (!subscribed) { subscribed = true; owner = Thread.currentThread(); } for (BufferedSubscription<T> b = clients, pred = null;;) {
*** 376,385 **** --- 382,393 ---- } else pred = b; b = next; } + } finally { + lock.unlock(); } } /** * Common implementation for all three forms of submit and offer.
*** 388,398 **** private int doOffer(T item, long nanos, BiPredicate<Subscriber<? super T>, ? super T> onDrop) { if (item == null) throw new NullPointerException(); int lag = 0; boolean complete, unowned; ! synchronized (this) { Thread t = Thread.currentThread(), o; BufferedSubscription<T> b = clients; if ((unowned = ((o = owner) != t)) && o != null) owner = null; // disable bias if (b == null) --- 396,408 ---- private int doOffer(T item, long nanos, BiPredicate<Subscriber<? super T>, ? super T> onDrop) { if (item == null) throw new NullPointerException(); int lag = 0; boolean complete, unowned; ! ReentrantLock lock = this.lock; ! lock.lock(); ! try { Thread t = Thread.currentThread(), o; BufferedSubscription<T> b = clients; if ((unowned = ((o = owner) != t)) && o != null) owner = null; // disable bias if (b == null)
*** 419,428 **** --- 429,440 ---- } while ((b = next) != null); if (retries != null || cleanMe) lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe); } + } finally { + lock.unlock(); } if (complete) throw new IllegalStateException("Closed"); else return lag;
*** 607,624 **** * subscribers, and disallows subsequent attempts to publish. * Upon return, this method does <em>NOT</em> guarantee that all * subscribers have yet completed. */ public void close() { if (!closed) { BufferedSubscription<T> b; ! synchronized (this) { // no need to re-check closed here b = clients; clients = null; owner = null; closed = true; } while (b != null) { BufferedSubscription<T> next = b.next; b.next = null; b.onComplete(); --- 619,640 ---- * subscribers, and disallows subsequent attempts to publish. * Upon return, this method does <em>NOT</em> guarantee that all * subscribers have yet completed. */ public void close() { + ReentrantLock lock = this.lock; if (!closed) { BufferedSubscription<T> b; ! lock.lock(); ! try { // no need to re-check closed here b = clients; clients = null; owner = null; closed = true; + } finally { + lock.unlock(); } while (b != null) { BufferedSubscription<T> next = b.next; b.next = null; b.onComplete();
*** 639,658 **** * @throws NullPointerException if error is null */ public void closeExceptionally(Throwable error) { if (error == null) throw new NullPointerException(); if (!closed) { BufferedSubscription<T> b; ! synchronized (this) { b = clients; if (!closed) { // don't clobber racing close closedException = error; clients = null; owner = null; closed = true; } } while (b != null) { BufferedSubscription<T> next = b.next; b.next = null; b.onError(error); --- 655,678 ---- * @throws NullPointerException if error is null */ public void closeExceptionally(Throwable error) { if (error == null) throw new NullPointerException(); + ReentrantLock lock = this.lock; if (!closed) { BufferedSubscription<T> b; ! lock.lock(); ! try { b = clients; if (!closed) { // don't clobber racing close closedException = error; clients = null; owner = null; closed = true; } + } finally { + lock.unlock(); } while (b != null) { BufferedSubscription<T> next = b.next; b.next = null; b.onError(error);
*** 686,696 **** * * @return true if this publisher has any subscribers */ public boolean hasSubscribers() { boolean nonEmpty = false; ! synchronized (this) { for (BufferedSubscription<T> b = clients; b != null;) { BufferedSubscription<T> next = b.next; if (b.isClosed()) { b.next = null; b = clients = next; --- 706,718 ---- * * @return true if this publisher has any subscribers */ public boolean hasSubscribers() { boolean nonEmpty = false; ! ReentrantLock lock = this.lock; ! lock.lock(); ! try { for (BufferedSubscription<T> b = clients; b != null;) { BufferedSubscription<T> next = b.next; if (b.isClosed()) { b.next = null; b = clients = next;
*** 698,720 **** else { nonEmpty = true; break; } } } return nonEmpty; } /** * Returns the number of current subscribers. * * @return the number of current subscribers */ public int getNumberOfSubscribers() { ! synchronized (this) { ! return cleanAndCount(); } } /** * Returns the Executor used for asynchronous delivery. * --- 720,750 ---- else { nonEmpty = true; break; } } + } finally { + lock.unlock(); } return nonEmpty; } /** * Returns the number of current subscribers. * * @return the number of current subscribers */ public int getNumberOfSubscribers() { ! int n; ! ReentrantLock lock = this.lock; ! lock.lock(); ! try { ! n = cleanAndCount(); ! } finally { ! lock.unlock(); } + return n; } /** * Returns the Executor used for asynchronous delivery. *
*** 740,750 **** * * @return list of current subscribers */ public List<Subscriber<? super T>> getSubscribers() { ArrayList<Subscriber<? super T>> subs = new ArrayList<>(); ! synchronized (this) { BufferedSubscription<T> pred = null, next; for (BufferedSubscription<T> b = clients; b != null; b = next) { next = b.next; if (b.isClosed()) { b.next = null; --- 770,782 ---- * * @return list of current subscribers */ public List<Subscriber<? super T>> getSubscribers() { ArrayList<Subscriber<? super T>> subs = new ArrayList<>(); ! ReentrantLock lock = this.lock; ! lock.lock(); ! try { BufferedSubscription<T> pred = null, next; for (BufferedSubscription<T> b = clients; b != null; b = next) { next = b.next; if (b.isClosed()) { b.next = null;
*** 756,765 **** --- 788,799 ---- else { subs.add(b.subscriber); pred = b; } } + } finally { + lock.unlock(); } return subs; } /**
*** 769,798 **** * @return true if currently subscribed * @throws NullPointerException if subscriber is null */ public boolean isSubscribed(Subscriber<? super T> subscriber) { if (subscriber == null) throw new NullPointerException(); if (!closed) { ! synchronized (this) { BufferedSubscription<T> pred = null, next; for (BufferedSubscription<T> b = clients; b != null; b = next) { next = b.next; if (b.isClosed()) { b.next = null; if (pred == null) clients = next; else pred.next = next; } ! else if (subscriber.equals(b.subscriber)) ! return true; else pred = b; } } } ! return false; } /** * Returns an estimate of the minimum number of items requested * (via {@link Flow.Subscription#request(long) request}) but not --- 803,837 ---- * @return true if currently subscribed * @throws NullPointerException if subscriber is null */ public boolean isSubscribed(Subscriber<? super T> subscriber) { if (subscriber == null) throw new NullPointerException(); + boolean subscribed = false; + ReentrantLock lock = this.lock; if (!closed) { ! lock.lock(); ! try { BufferedSubscription<T> pred = null, next; for (BufferedSubscription<T> b = clients; b != null; b = next) { next = b.next; if (b.isClosed()) { b.next = null; if (pred == null) clients = next; else pred.next = next; } ! else if (subscribed = subscriber.equals(b.subscriber)) ! break; else pred = b; } + } finally { + lock.unlock(); } } ! return subscribed; } /** * Returns an estimate of the minimum number of items requested * (via {@link Flow.Subscription#request(long) request}) but not
*** 801,811 **** * @return the estimate, or zero if no subscribers */ public long estimateMinimumDemand() { long min = Long.MAX_VALUE; boolean nonEmpty = false; ! synchronized (this) { BufferedSubscription<T> pred = null, next; for (BufferedSubscription<T> b = clients; b != null; b = next) { int n; long d; next = b.next; if ((n = b.estimateLag()) < 0) { --- 840,852 ---- * @return the estimate, or zero if no subscribers */ public long estimateMinimumDemand() { long min = Long.MAX_VALUE; boolean nonEmpty = false; ! ReentrantLock lock = this.lock; ! lock.lock(); ! try { BufferedSubscription<T> pred = null, next; for (BufferedSubscription<T> b = clients; b != null; b = next) { int n; long d; next = b.next; if ((n = b.estimateLag()) < 0) {
*** 820,829 **** --- 861,872 ---- min = d; nonEmpty = true; pred = b; } } + } finally { + lock.unlock(); } return nonEmpty ? min : 0; } /**
*** 832,842 **** * * @return the estimate */ public int estimateMaximumLag() { int max = 0; ! synchronized (this) { BufferedSubscription<T> pred = null, next; for (BufferedSubscription<T> b = clients; b != null; b = next) { int n; next = b.next; if ((n = b.estimateLag()) < 0) { --- 875,887 ---- * * @return the estimate */ public int estimateMaximumLag() { int max = 0; ! ReentrantLock lock = this.lock; ! lock.lock(); ! try { BufferedSubscription<T> pred = null, next; for (BufferedSubscription<T> b = clients; b != null; b = next) { int n; next = b.next; if ((n = b.estimateLag()) < 0) {
*** 850,859 **** --- 895,906 ---- if (n > max) max = n; pred = b; } } + } finally { + lock.unlock(); } return max; } /**
< prev index next >