< prev index next >

src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java

Print this page
8234131: Miscellaneous changes imported from jsr166 CVS 2021-01
Reviewed-by: martin

@@ -39,10 +39,11 @@
 import java.lang.invoke.VarHandle;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
 import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import static java.util.concurrent.Flow.Publisher;
 import static java.util.concurrent.Flow.Subscriber;

@@ -173,15 +174,15 @@
 public class SubmissionPublisher<T> implements Publisher<T>,
                                                AutoCloseable {
     /*
      * Most mechanics are handled by BufferedSubscription. This class
      * mainly tracks subscribers and ensures sequentiality, by using
-     * built-in synchronization locks across public methods. Using
-     * built-in locks works well in the most typical case in which
-     * only one thread submits items. We extend this idea in
-     * submission methods by detecting single-ownership to reduce
-     * producer-consumer synchronization strength.
+     * locks across public methods, to ensure thread-safety in the
+     * presence of multiple sources and maintain acquire-release
+     * ordering around user operations. However, we also track whether
+     * there is only a single source, and if so streamline some buffer
+     * operations by avoiding some atomics.
      */
 
     /** The largest possible power of two array size. */
     static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
 

@@ -232,10 +233,12 @@
      * saturated ones in retries list (using nextRetry field), and
      * retry, possibly blocking or dropping.
      */
     BufferedSubscription<T> clients;
 
+    /** Lock for exclusion across multiple sources */
+    final ReentrantLock lock;
     /** Run status, updated only within locks */
     volatile boolean closed;
     /** Set true on first call to subscribe, to initialize possible owner */
     boolean subscribed;
     /** The first caller thread to subscribe, or null if thread ever changed */

@@ -272,10 +275,11 @@
                                BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
         if (executor == null)
             throw new NullPointerException();
         if (maxBufferCapacity <= 0)
             throw new IllegalArgumentException("capacity must be positive");
+        this.lock = new ReentrantLock();
         this.executor = executor;
         this.onNextHandler = handler;
         this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
     }
 

@@ -335,17 +339,19 @@
      * @param subscriber the subscriber
      * @throws NullPointerException if subscriber is null
      */
     public void subscribe(Subscriber<? super T> subscriber) {
         if (subscriber == null) throw new NullPointerException();
+        ReentrantLock lock = this.lock;
         int max = maxBufferCapacity; // allocate initial array
         Object[] array = new Object[max < INITIAL_CAPACITY ?
                                     max : INITIAL_CAPACITY];
         BufferedSubscription<T> subscription =
             new BufferedSubscription<T>(subscriber, executor, onNextHandler,
                                         array, max);
-        synchronized (this) {
+        lock.lock();
+        try {
             if (!subscribed) {
                 subscribed = true;
                 owner = Thread.currentThread();
             }
             for (BufferedSubscription<T> b = clients, pred = null;;) {

@@ -376,10 +382,12 @@
                 }
                 else
                     pred = b;
                 b = next;
             }
+        } finally {
+            lock.unlock();
         }
     }
 
     /**
      * Common implementation for all three forms of submit and offer.

@@ -388,11 +396,13 @@
     private int doOffer(T item, long nanos,
                         BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
         if (item == null) throw new NullPointerException();
         int lag = 0;
         boolean complete, unowned;
-        synchronized (this) {
+        ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
             Thread t = Thread.currentThread(), o;
             BufferedSubscription<T> b = clients;
             if ((unowned = ((o = owner) != t)) && o != null)
                 owner = null;                     // disable bias
             if (b == null)

@@ -419,10 +429,12 @@
                 } while ((b = next) != null);
 
                 if (retries != null || cleanMe)
                     lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
             }
+        } finally {
+            lock.unlock();
         }
         if (complete)
             throw new IllegalStateException("Closed");
         else
             return lag;

@@ -607,18 +619,22 @@
      * subscribers, and disallows subsequent attempts to publish.
      * Upon return, this method does <em>NOT</em> guarantee that all
      * subscribers have yet completed.
      */
     public void close() {
+        ReentrantLock lock = this.lock;
         if (!closed) {
             BufferedSubscription<T> b;
-            synchronized (this) {
+            lock.lock();
+            try {
                 // no need to re-check closed here
                 b = clients;
                 clients = null;
                 owner = null;
                 closed = true;
+            } finally {
+                lock.unlock();
             }
             while (b != null) {
                 BufferedSubscription<T> next = b.next;
                 b.next = null;
                 b.onComplete();

@@ -639,20 +655,24 @@
      * @throws NullPointerException if error is null
      */
     public void closeExceptionally(Throwable error) {
         if (error == null)
             throw new NullPointerException();
+        ReentrantLock lock = this.lock;
         if (!closed) {
             BufferedSubscription<T> b;
-            synchronized (this) {
+            lock.lock();
+            try {
                 b = clients;
                 if (!closed) {  // don't clobber racing close
                     closedException = error;
                     clients = null;
                     owner = null;
                     closed = true;
                 }
+            } finally {
+                lock.unlock();
             }
             while (b != null) {
                 BufferedSubscription<T> next = b.next;
                 b.next = null;
                 b.onError(error);

@@ -686,11 +706,13 @@
      *
      * @return true if this publisher has any subscribers
      */
     public boolean hasSubscribers() {
         boolean nonEmpty = false;
-        synchronized (this) {
+        ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
             for (BufferedSubscription<T> b = clients; b != null;) {
                 BufferedSubscription<T> next = b.next;
                 if (b.isClosed()) {
                     b.next = null;
                     b = clients = next;

@@ -698,23 +720,31 @@
                 else {
                     nonEmpty = true;
                     break;
                 }
             }
+        } finally {
+            lock.unlock();
         }
         return nonEmpty;
     }
 
     /**
      * Returns the number of current subscribers.
      *
      * @return the number of current subscribers
      */
     public int getNumberOfSubscribers() {
-        synchronized (this) {
-            return cleanAndCount();
+        int n;
+        ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            n = cleanAndCount();
+        } finally {
+            lock.unlock();
         }
+        return n;
     }
 
     /**
      * Returns the Executor used for asynchronous delivery.
      *

@@ -740,11 +770,13 @@
      *
      * @return list of current subscribers
      */
     public List<Subscriber<? super T>> getSubscribers() {
         ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
-        synchronized (this) {
+        ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
             BufferedSubscription<T> pred = null, next;
             for (BufferedSubscription<T> b = clients; b != null; b = next) {
                 next = b.next;
                 if (b.isClosed()) {
                     b.next = null;

@@ -756,10 +788,12 @@
                 else {
                     subs.add(b.subscriber);
                     pred = b;
                 }
             }
+        } finally {
+            lock.unlock();
         }
         return subs;
     }
 
     /**

@@ -769,30 +803,35 @@
      * @return true if currently subscribed
      * @throws NullPointerException if subscriber is null
      */
     public boolean isSubscribed(Subscriber<? super T> subscriber) {
         if (subscriber == null) throw new NullPointerException();
+        boolean subscribed = false;
+        ReentrantLock lock = this.lock;
         if (!closed) {
-            synchronized (this) {
+            lock.lock();
+            try {
                 BufferedSubscription<T> pred = null, next;
                 for (BufferedSubscription<T> b = clients; b != null; b = next) {
                     next = b.next;
                     if (b.isClosed()) {
                         b.next = null;
                         if (pred == null)
                             clients = next;
                         else
                             pred.next = next;
                     }
-                    else if (subscriber.equals(b.subscriber))
-                        return true;
+                    else if (subscribed = subscriber.equals(b.subscriber))
+                        break;
                     else
                         pred = b;
                 }
+            } finally {
+                lock.unlock();
             }
         }
-        return false;
+        return subscribed;
     }
 
     /**
      * Returns an estimate of the minimum number of items requested
      * (via {@link Flow.Subscription#request(long) request}) but not

@@ -801,11 +840,13 @@
      * @return the estimate, or zero if no subscribers
      */
     public long estimateMinimumDemand() {
         long min = Long.MAX_VALUE;
         boolean nonEmpty = false;
-        synchronized (this) {
+        ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
             BufferedSubscription<T> pred = null, next;
             for (BufferedSubscription<T> b = clients; b != null; b = next) {
                 int n; long d;
                 next = b.next;
                 if ((n = b.estimateLag()) < 0) {

@@ -820,10 +861,12 @@
                         min = d;
                     nonEmpty = true;
                     pred = b;
                 }
             }
+        } finally {
+            lock.unlock();
         }
         return nonEmpty ? min : 0;
     }
 
     /**

@@ -832,11 +875,13 @@
      *
      * @return the estimate
      */
     public int estimateMaximumLag() {
         int max = 0;
-        synchronized (this) {
+        ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
             BufferedSubscription<T> pred = null, next;
             for (BufferedSubscription<T> b = clients; b != null; b = next) {
                 int n;
                 next = b.next;
                 if ((n = b.estimateLag()) < 0) {

@@ -850,10 +895,12 @@
                     if (n > max)
                         max = n;
                     pred = b;
                 }
             }
+        } finally {
+            lock.unlock();
         }
         return max;
     }
 
     /**
< prev index next >