< prev index next >
src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java
Print this page
8234131: Miscellaneous changes imported from jsr166 CVS 2020-12
Reviewed-by: martin
*** 39,48 ****
--- 39,49 ----
import java.lang.invoke.VarHandle;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
+ import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import static java.util.concurrent.Flow.Publisher;
import static java.util.concurrent.Flow.Subscriber;
*** 173,187 ****
public class SubmissionPublisher<T> implements Publisher<T>,
AutoCloseable {
/*
* Most mechanics are handled by BufferedSubscription. This class
* mainly tracks subscribers and ensures sequentiality, by using
! * built-in synchronization locks across public methods. Using
! * built-in locks works well in the most typical case in which
! * only one thread submits items. We extend this idea in
! * submission methods by detecting single-ownership to reduce
! * producer-consumer synchronization strength.
*/
/** The largest possible power of two array size. */
static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
--- 174,188 ----
public class SubmissionPublisher<T> implements Publisher<T>,
AutoCloseable {
/*
* Most mechanics are handled by BufferedSubscription. This class
* mainly tracks subscribers and ensures sequentiality, by using
! * locks across public methods, to ensure thread-safety in the
! * presence of multiple sources and maintain acquire-release
! * ordering around user operations. However, we also track whether
! * there is only a single source, and if so streamline some buffer
! * operations by avoiding some atomics.
*/
/** The largest possible power of two array size. */
static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
*** 232,241 ****
--- 233,244 ----
* saturated ones in retries list (using nextRetry field), and
* retry, possibly blocking or dropping.
*/
BufferedSubscription<T> clients;
+ /** Lock for exclusion across multiple sources */
+ final ReentrantLock lock;
/** Run status, updated only within locks */
volatile boolean closed;
/** Set true on first call to subscribe, to initialize possible owner */
boolean subscribed;
/** The first caller thread to subscribe, or null if thread ever changed */
*** 272,281 ****
--- 275,285 ----
BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
if (executor == null)
throw new NullPointerException();
if (maxBufferCapacity <= 0)
throw new IllegalArgumentException("capacity must be positive");
+ this.lock = new ReentrantLock();
this.executor = executor;
this.onNextHandler = handler;
this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
}
*** 335,351 ****
* @param subscriber the subscriber
* @throws NullPointerException if subscriber is null
*/
public void subscribe(Subscriber<? super T> subscriber) {
if (subscriber == null) throw new NullPointerException();
int max = maxBufferCapacity; // allocate initial array
Object[] array = new Object[max < INITIAL_CAPACITY ?
max : INITIAL_CAPACITY];
BufferedSubscription<T> subscription =
new BufferedSubscription<T>(subscriber, executor, onNextHandler,
array, max);
! synchronized (this) {
if (!subscribed) {
subscribed = true;
owner = Thread.currentThread();
}
for (BufferedSubscription<T> b = clients, pred = null;;) {
--- 339,357 ----
* @param subscriber the subscriber
* @throws NullPointerException if subscriber is null
*/
public void subscribe(Subscriber<? super T> subscriber) {
if (subscriber == null) throw new NullPointerException();
+ ReentrantLock lock = this.lock;
int max = maxBufferCapacity; // allocate initial array
Object[] array = new Object[max < INITIAL_CAPACITY ?
max : INITIAL_CAPACITY];
BufferedSubscription<T> subscription =
new BufferedSubscription<T>(subscriber, executor, onNextHandler,
array, max);
! lock.lock();
! try {
if (!subscribed) {
subscribed = true;
owner = Thread.currentThread();
}
for (BufferedSubscription<T> b = clients, pred = null;;) {
*** 376,385 ****
--- 382,393 ----
}
else
pred = b;
b = next;
}
+ } finally {
+ lock.unlock();
}
}
/**
* Common implementation for all three forms of submit and offer.
*** 388,398 ****
private int doOffer(T item, long nanos,
BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
if (item == null) throw new NullPointerException();
int lag = 0;
boolean complete, unowned;
! synchronized (this) {
Thread t = Thread.currentThread(), o;
BufferedSubscription<T> b = clients;
if ((unowned = ((o = owner) != t)) && o != null)
owner = null; // disable bias
if (b == null)
--- 396,408 ----
private int doOffer(T item, long nanos,
BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
if (item == null) throw new NullPointerException();
int lag = 0;
boolean complete, unowned;
! ReentrantLock lock = this.lock;
! lock.lock();
! try {
Thread t = Thread.currentThread(), o;
BufferedSubscription<T> b = clients;
if ((unowned = ((o = owner) != t)) && o != null)
owner = null; // disable bias
if (b == null)
*** 419,428 ****
--- 429,440 ----
} while ((b = next) != null);
if (retries != null || cleanMe)
lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
}
+ } finally {
+ lock.unlock();
}
if (complete)
throw new IllegalStateException("Closed");
else
return lag;
*** 607,624 ****
* subscribers, and disallows subsequent attempts to publish.
* Upon return, this method does <em>NOT</em> guarantee that all
* subscribers have yet completed.
*/
public void close() {
if (!closed) {
BufferedSubscription<T> b;
! synchronized (this) {
// no need to re-check closed here
b = clients;
clients = null;
owner = null;
closed = true;
}
while (b != null) {
BufferedSubscription<T> next = b.next;
b.next = null;
b.onComplete();
--- 619,640 ----
* subscribers, and disallows subsequent attempts to publish.
* Upon return, this method does <em>NOT</em> guarantee that all
* subscribers have yet completed.
*/
public void close() {
+ ReentrantLock lock = this.lock;
if (!closed) {
BufferedSubscription<T> b;
! lock.lock();
! try {
// no need to re-check closed here
b = clients;
clients = null;
owner = null;
closed = true;
+ } finally {
+ lock.unlock();
}
while (b != null) {
BufferedSubscription<T> next = b.next;
b.next = null;
b.onComplete();
*** 639,658 ****
* @throws NullPointerException if error is null
*/
public void closeExceptionally(Throwable error) {
if (error == null)
throw new NullPointerException();
if (!closed) {
BufferedSubscription<T> b;
! synchronized (this) {
b = clients;
if (!closed) { // don't clobber racing close
closedException = error;
clients = null;
owner = null;
closed = true;
}
}
while (b != null) {
BufferedSubscription<T> next = b.next;
b.next = null;
b.onError(error);
--- 655,678 ----
* @throws NullPointerException if error is null
*/
public void closeExceptionally(Throwable error) {
if (error == null)
throw new NullPointerException();
+ ReentrantLock lock = this.lock;
if (!closed) {
BufferedSubscription<T> b;
! lock.lock();
! try {
b = clients;
if (!closed) { // don't clobber racing close
closedException = error;
clients = null;
owner = null;
closed = true;
}
+ } finally {
+ lock.unlock();
}
while (b != null) {
BufferedSubscription<T> next = b.next;
b.next = null;
b.onError(error);
*** 686,696 ****
*
* @return true if this publisher has any subscribers
*/
public boolean hasSubscribers() {
boolean nonEmpty = false;
! synchronized (this) {
for (BufferedSubscription<T> b = clients; b != null;) {
BufferedSubscription<T> next = b.next;
if (b.isClosed()) {
b.next = null;
b = clients = next;
--- 706,718 ----
*
* @return true if this publisher has any subscribers
*/
public boolean hasSubscribers() {
boolean nonEmpty = false;
! ReentrantLock lock = this.lock;
! lock.lock();
! try {
for (BufferedSubscription<T> b = clients; b != null;) {
BufferedSubscription<T> next = b.next;
if (b.isClosed()) {
b.next = null;
b = clients = next;
*** 698,720 ****
else {
nonEmpty = true;
break;
}
}
}
return nonEmpty;
}
/**
* Returns the number of current subscribers.
*
* @return the number of current subscribers
*/
public int getNumberOfSubscribers() {
! synchronized (this) {
! return cleanAndCount();
}
}
/**
* Returns the Executor used for asynchronous delivery.
*
--- 720,750 ----
else {
nonEmpty = true;
break;
}
}
+ } finally {
+ lock.unlock();
}
return nonEmpty;
}
/**
* Returns the number of current subscribers.
*
* @return the number of current subscribers
*/
public int getNumberOfSubscribers() {
! int n;
! ReentrantLock lock = this.lock;
! lock.lock();
! try {
! n = cleanAndCount();
! } finally {
! lock.unlock();
}
+ return n;
}
/**
* Returns the Executor used for asynchronous delivery.
*
*** 740,750 ****
*
* @return list of current subscribers
*/
public List<Subscriber<? super T>> getSubscribers() {
ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
! synchronized (this) {
BufferedSubscription<T> pred = null, next;
for (BufferedSubscription<T> b = clients; b != null; b = next) {
next = b.next;
if (b.isClosed()) {
b.next = null;
--- 770,782 ----
*
* @return list of current subscribers
*/
public List<Subscriber<? super T>> getSubscribers() {
ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
! ReentrantLock lock = this.lock;
! lock.lock();
! try {
BufferedSubscription<T> pred = null, next;
for (BufferedSubscription<T> b = clients; b != null; b = next) {
next = b.next;
if (b.isClosed()) {
b.next = null;
*** 756,765 ****
--- 788,799 ----
else {
subs.add(b.subscriber);
pred = b;
}
}
+ } finally {
+ lock.unlock();
}
return subs;
}
/**
*** 769,798 ****
* @return true if currently subscribed
* @throws NullPointerException if subscriber is null
*/
public boolean isSubscribed(Subscriber<? super T> subscriber) {
if (subscriber == null) throw new NullPointerException();
if (!closed) {
! synchronized (this) {
BufferedSubscription<T> pred = null, next;
for (BufferedSubscription<T> b = clients; b != null; b = next) {
next = b.next;
if (b.isClosed()) {
b.next = null;
if (pred == null)
clients = next;
else
pred.next = next;
}
! else if (subscriber.equals(b.subscriber))
! return true;
else
pred = b;
}
}
}
! return false;
}
/**
* Returns an estimate of the minimum number of items requested
* (via {@link Flow.Subscription#request(long) request}) but not
--- 803,837 ----
* @return true if currently subscribed
* @throws NullPointerException if subscriber is null
*/
public boolean isSubscribed(Subscriber<? super T> subscriber) {
if (subscriber == null) throw new NullPointerException();
+ boolean subscribed = false;
+ ReentrantLock lock = this.lock;
if (!closed) {
! lock.lock();
! try {
BufferedSubscription<T> pred = null, next;
for (BufferedSubscription<T> b = clients; b != null; b = next) {
next = b.next;
if (b.isClosed()) {
b.next = null;
if (pred == null)
clients = next;
else
pred.next = next;
}
! else if (subscribed = subscriber.equals(b.subscriber))
! break;
else
pred = b;
}
+ } finally {
+ lock.unlock();
}
}
! return subscribed;
}
/**
* Returns an estimate of the minimum number of items requested
* (via {@link Flow.Subscription#request(long) request}) but not
*** 801,811 ****
* @return the estimate, or zero if no subscribers
*/
public long estimateMinimumDemand() {
long min = Long.MAX_VALUE;
boolean nonEmpty = false;
! synchronized (this) {
BufferedSubscription<T> pred = null, next;
for (BufferedSubscription<T> b = clients; b != null; b = next) {
int n; long d;
next = b.next;
if ((n = b.estimateLag()) < 0) {
--- 840,852 ----
* @return the estimate, or zero if no subscribers
*/
public long estimateMinimumDemand() {
long min = Long.MAX_VALUE;
boolean nonEmpty = false;
! ReentrantLock lock = this.lock;
! lock.lock();
! try {
BufferedSubscription<T> pred = null, next;
for (BufferedSubscription<T> b = clients; b != null; b = next) {
int n; long d;
next = b.next;
if ((n = b.estimateLag()) < 0) {
*** 820,829 ****
--- 861,872 ----
min = d;
nonEmpty = true;
pred = b;
}
}
+ } finally {
+ lock.unlock();
}
return nonEmpty ? min : 0;
}
/**
*** 832,842 ****
*
* @return the estimate
*/
public int estimateMaximumLag() {
int max = 0;
! synchronized (this) {
BufferedSubscription<T> pred = null, next;
for (BufferedSubscription<T> b = clients; b != null; b = next) {
int n;
next = b.next;
if ((n = b.estimateLag()) < 0) {
--- 875,887 ----
*
* @return the estimate
*/
public int estimateMaximumLag() {
int max = 0;
! ReentrantLock lock = this.lock;
! lock.lock();
! try {
BufferedSubscription<T> pred = null, next;
for (BufferedSubscription<T> b = clients; b != null; b = next) {
int n;
next = b.next;
if ((n = b.estimateLag()) < 0) {
*** 850,859 ****
--- 895,906 ----
if (n > max)
max = n;
pred = b;
}
}
+ } finally {
+ lock.unlock();
}
return max;
}
/**
< prev index next >