< prev index next >
src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java
Print this page
8234131: Miscellaneous changes imported from jsr166 CVS 2021-01
Reviewed-by: martin
@@ -39,10 +39,11 @@
import java.lang.invoke.VarHandle;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import static java.util.concurrent.Flow.Publisher;
import static java.util.concurrent.Flow.Subscriber;
@@ -173,15 +174,15 @@
public class SubmissionPublisher<T> implements Publisher<T>,
AutoCloseable {
/*
* Most mechanics are handled by BufferedSubscription. This class
* mainly tracks subscribers and ensures sequentiality, by using
- * built-in synchronization locks across public methods. Using
- * built-in locks works well in the most typical case in which
- * only one thread submits items. We extend this idea in
- * submission methods by detecting single-ownership to reduce
- * producer-consumer synchronization strength.
+ * locks across public methods, to ensure thread-safety in the
+ * presence of multiple sources and maintain acquire-release
+ * ordering around user operations. However, we also track whether
+ * there is only a single source, and if so streamline some buffer
+ * operations by avoiding some atomics.
*/
/** The largest possible power of two array size. */
static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
@@ -232,10 +233,12 @@
* saturated ones in retries list (using nextRetry field), and
* retry, possibly blocking or dropping.
*/
BufferedSubscription<T> clients;
+ /** Lock for exclusion across multiple sources */
+ final ReentrantLock lock;
/** Run status, updated only within locks */
volatile boolean closed;
/** Set true on first call to subscribe, to initialize possible owner */
boolean subscribed;
/** The first caller thread to subscribe, or null if thread ever changed */
@@ -272,10 +275,11 @@
BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
if (executor == null)
throw new NullPointerException();
if (maxBufferCapacity <= 0)
throw new IllegalArgumentException("capacity must be positive");
+ this.lock = new ReentrantLock();
this.executor = executor;
this.onNextHandler = handler;
this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
}
@@ -335,17 +339,19 @@
* @param subscriber the subscriber
* @throws NullPointerException if subscriber is null
*/
public void subscribe(Subscriber<? super T> subscriber) {
if (subscriber == null) throw new NullPointerException();
+ ReentrantLock lock = this.lock;
int max = maxBufferCapacity; // allocate initial array
Object[] array = new Object[max < INITIAL_CAPACITY ?
max : INITIAL_CAPACITY];
BufferedSubscription<T> subscription =
new BufferedSubscription<T>(subscriber, executor, onNextHandler,
array, max);
- synchronized (this) {
+ lock.lock();
+ try {
if (!subscribed) {
subscribed = true;
owner = Thread.currentThread();
}
for (BufferedSubscription<T> b = clients, pred = null;;) {
@@ -376,10 +382,12 @@
}
else
pred = b;
b = next;
}
+ } finally {
+ lock.unlock();
}
}
/**
* Common implementation for all three forms of submit and offer.
@@ -388,11 +396,13 @@
private int doOffer(T item, long nanos,
BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
if (item == null) throw new NullPointerException();
int lag = 0;
boolean complete, unowned;
- synchronized (this) {
+ ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
Thread t = Thread.currentThread(), o;
BufferedSubscription<T> b = clients;
if ((unowned = ((o = owner) != t)) && o != null)
owner = null; // disable bias
if (b == null)
@@ -419,10 +429,12 @@
} while ((b = next) != null);
if (retries != null || cleanMe)
lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
}
+ } finally {
+ lock.unlock();
}
if (complete)
throw new IllegalStateException("Closed");
else
return lag;
@@ -607,18 +619,22 @@
* subscribers, and disallows subsequent attempts to publish.
* Upon return, this method does <em>NOT</em> guarantee that all
* subscribers have yet completed.
*/
public void close() {
+ ReentrantLock lock = this.lock;
if (!closed) {
BufferedSubscription<T> b;
- synchronized (this) {
+ lock.lock();
+ try {
// no need to re-check closed here
b = clients;
clients = null;
owner = null;
closed = true;
+ } finally {
+ lock.unlock();
}
while (b != null) {
BufferedSubscription<T> next = b.next;
b.next = null;
b.onComplete();
@@ -639,20 +655,24 @@
* @throws NullPointerException if error is null
*/
public void closeExceptionally(Throwable error) {
if (error == null)
throw new NullPointerException();
+ ReentrantLock lock = this.lock;
if (!closed) {
BufferedSubscription<T> b;
- synchronized (this) {
+ lock.lock();
+ try {
b = clients;
if (!closed) { // don't clobber racing close
closedException = error;
clients = null;
owner = null;
closed = true;
}
+ } finally {
+ lock.unlock();
}
while (b != null) {
BufferedSubscription<T> next = b.next;
b.next = null;
b.onError(error);
@@ -686,11 +706,13 @@
*
* @return true if this publisher has any subscribers
*/
public boolean hasSubscribers() {
boolean nonEmpty = false;
- synchronized (this) {
+ ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
for (BufferedSubscription<T> b = clients; b != null;) {
BufferedSubscription<T> next = b.next;
if (b.isClosed()) {
b.next = null;
b = clients = next;
@@ -698,23 +720,31 @@
else {
nonEmpty = true;
break;
}
}
+ } finally {
+ lock.unlock();
}
return nonEmpty;
}
/**
* Returns the number of current subscribers.
*
* @return the number of current subscribers
*/
public int getNumberOfSubscribers() {
// NOTE(review): this is a diff rendering. Lines prefixed with - are the
// previous body, which held the built-in monitor (synchronized on this)
// around cleanAndCount(); lines prefixed with + are the replacement that
// uses the explicit ReentrantLock field instead.
- synchronized (this) {
- return cleanAndCount();
// Holds the count obtained under the lock so it can be returned after unlock.
+ int n;
// Read the lock field once into a local before using it.
+ ReentrantLock lock = this.lock;
// Acquire before entering try, so the finally block only unlocks a held lock.
+ lock.lock();
+ try {
// Same helper call as before; it is defined elsewhere in this file and
// presumably prunes closed subscriptions while counting - TODO confirm.
+ n = cleanAndCount();
+ } finally {
// Always release, even if cleanAndCount() throws.
+ lock.unlock();
}
+ return n;
}
/**
* Returns the Executor used for asynchronous delivery.
*
@@ -740,11 +770,13 @@
*
* @return list of current subscribers
*/
public List<Subscriber<? super T>> getSubscribers() {
ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
- synchronized (this) {
+ ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
BufferedSubscription<T> pred = null, next;
for (BufferedSubscription<T> b = clients; b != null; b = next) {
next = b.next;
if (b.isClosed()) {
b.next = null;
@@ -756,10 +788,12 @@
else {
subs.add(b.subscriber);
pred = b;
}
}
+ } finally {
+ lock.unlock();
}
return subs;
}
/**
@@ -769,30 +803,35 @@
* @return true if currently subscribed
* @throws NullPointerException if subscriber is null
*/
public boolean isSubscribed(Subscriber<? super T> subscriber) {
// NOTE(review): this is a diff rendering. Lines prefixed with - are the
// previous synchronized (this) version; lines prefixed with + are the
// ReentrantLock replacement. Unprefixed lines are common to both.
if (subscriber == null) throw new NullPointerException();
// Result flag, false unless a matching live subscription is found below.
// NOTE(review): this local shadows the publisher's boolean field of the
// same name (set on first subscribe) - confirm the shadowing is acceptable.
+ boolean subscribed = false;
+ ReentrantLock lock = this.lock;
// closed is a volatile field read here without the lock; once it is true
// the list is not scanned and the method reports false.
if (!closed) {
- synchronized (this) {
// Acquire before entering try, so the finally block only unlocks a held lock.
+ lock.lock();
+ try {
// pred trails the most recent subscription that was kept in the list;
// next is captured up front because b.next may be cleared below.
BufferedSubscription<T> pred = null, next;
for (BufferedSubscription<T> b = clients; b != null; b = next) {
next = b.next;
if (b.isClosed()) {
// Unlink a closed subscription: from the head of the list when
// nothing has been kept yet, otherwise from after pred.
b.next = null;
if (pred == null)
clients = next;
else
pred.next = next;
}
- else if (subscriber.equals(b.subscriber))
- return true;
// Intentional assignment inside the condition: it records the equals()
// result and, on a match, leaves the loop so the method has a single
// return after the lock is released (the old code returned from inside
// the synchronized block).
+ else if (subscribed = subscriber.equals(b.subscriber))
+ break;
else
pred = b;
}
+ } finally {
+ lock.unlock();
}
}
- return false;
+ return subscribed;
}
/**
* Returns an estimate of the minimum number of items requested
* (via {@link Flow.Subscription#request(long) request}) but not
@@ -801,11 +840,13 @@
* @return the estimate, or zero if no subscribers
*/
public long estimateMinimumDemand() {
long min = Long.MAX_VALUE;
boolean nonEmpty = false;
- synchronized (this) {
+ ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
BufferedSubscription<T> pred = null, next;
for (BufferedSubscription<T> b = clients; b != null; b = next) {
int n; long d;
next = b.next;
if ((n = b.estimateLag()) < 0) {
@@ -820,10 +861,12 @@
min = d;
nonEmpty = true;
pred = b;
}
}
+ } finally {
+ lock.unlock();
}
return nonEmpty ? min : 0;
}
/**
@@ -832,11 +875,13 @@
*
* @return the estimate
*/
public int estimateMaximumLag() {
int max = 0;
- synchronized (this) {
+ ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
BufferedSubscription<T> pred = null, next;
for (BufferedSubscription<T> b = clients; b != null; b = next) {
int n;
next = b.next;
if ((n = b.estimateLag()) < 0) {
@@ -850,10 +895,12 @@
if (n > max)
max = n;
pred = b;
}
}
+ } finally {
+ lock.unlock();
}
return max;
}
/**
< prev index next >