--- old/src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java 2021-01-09 11:35:36.433091086 -0800 +++ new/src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java 2021-01-09 11:35:36.113093606 -0800 @@ -41,6 +41,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.BiPredicate; import java.util.function.Consumer; @@ -175,11 +176,11 @@ /* * Most mechanics are handled by BufferedSubscription. This class * mainly tracks subscribers and ensures sequentiality, by using - * built-in synchronization locks across public methods. Using - * built-in locks works well in the most typical case in which - * only one thread submits items. We extend this idea in - * submission methods by detecting single-ownership to reduce - * producer-consumer synchronization strength. + * locks across public methods, to ensure thread-safety in the + * presence of multiple sources and maintain acquire-release + * ordering around user operations. However, we also track whether + * there is only a single source, and if so streamline some buffer + * operations by avoiding some atomics. */ /** The largest possible power of two array size. */ @@ -234,6 +235,8 @@ */ BufferedSubscription clients; + /** Lock for exclusion across multiple sources */ + final ReentrantLock lock; /** Run status, updated only within locks */ volatile boolean closed; /** Set true on first call to subscribe, to initialize possible owner */ @@ -274,6 +277,7 @@ throw new NullPointerException(); if (maxBufferCapacity <= 0) throw new IllegalArgumentException("capacity must be positive"); + this.lock = new ReentrantLock(); this.executor = executor; this.onNextHandler = handler; this.maxBufferCapacity = roundCapacity(maxBufferCapacity); @@ -337,13 +341,15 @@ */ public void subscribe(Subscriber subscriber) { if (subscriber == null) throw new NullPointerException(); + ReentrantLock lock = this.lock; int max = maxBufferCapacity; // allocate initial array Object[] array = new Object[max < INITIAL_CAPACITY ? max : INITIAL_CAPACITY]; BufferedSubscription subscription = new BufferedSubscription(subscriber, executor, onNextHandler, array, max); - synchronized (this) { + lock.lock(); + try { if (!subscribed) { subscribed = true; owner = Thread.currentThread(); @@ -378,6 +384,8 @@ pred = b; b = next; } + } finally { + lock.unlock(); } } @@ -390,7 +398,9 @@ if (item == null) throw new NullPointerException(); int lag = 0; boolean complete, unowned; - synchronized (this) { + ReentrantLock lock = this.lock; + lock.lock(); + try { Thread t = Thread.currentThread(), o; BufferedSubscription b = clients; if ((unowned = ((o = owner) != t)) && o != null) @@ -421,6 +431,8 @@ if (retries != null || cleanMe) lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe); } + } finally { + lock.unlock(); } if (complete) throw new IllegalStateException("Closed"); @@ -609,14 +621,18 @@ * subscribers have yet completed. */ public void close() { + ReentrantLock lock = this.lock; if (!closed) { BufferedSubscription b; - synchronized (this) { + lock.lock(); + try { // no need to re-check closed here b = clients; clients = null; owner = null; closed = true; + } finally { + lock.unlock(); } while (b != null) { BufferedSubscription next = b.next; @@ -641,9 +657,11 @@ public void closeExceptionally(Throwable error) { if (error == null) throw new NullPointerException(); + ReentrantLock lock = this.lock; if (!closed) { BufferedSubscription b; - synchronized (this) { + lock.lock(); + try { b = clients; if (!closed) { // don't clobber racing close closedException = error; @@ -651,6 +669,8 @@ owner = null; closed = true; } + } finally { + lock.unlock(); } while (b != null) { BufferedSubscription next = b.next; @@ -688,7 +708,9 @@ */ public boolean hasSubscribers() { boolean nonEmpty = false; - synchronized (this) { + ReentrantLock lock = this.lock; + lock.lock(); + try { for (BufferedSubscription b = clients; b != null;) { BufferedSubscription next = b.next; if (b.isClosed()) { @@ -700,6 +722,8 @@ break; } } + } finally { + lock.unlock(); } return nonEmpty; } @@ -710,9 +734,15 @@ * @return the number of current subscribers */ public int getNumberOfSubscribers() { - synchronized (this) { - return cleanAndCount(); + int n; + ReentrantLock lock = this.lock; + lock.lock(); + try { + n = cleanAndCount(); + } finally { + lock.unlock(); } + return n; } /** @@ -742,7 +772,9 @@ */ public List> getSubscribers() { ArrayList> subs = new ArrayList<>(); - synchronized (this) { + ReentrantLock lock = this.lock; + lock.lock(); + try { BufferedSubscription pred = null, next; for (BufferedSubscription b = clients; b != null; b = next) { next = b.next; @@ -758,6 +790,8 @@ pred = b; } } + } finally { + lock.unlock(); } return subs; } @@ -771,8 +805,11 @@ */ public boolean isSubscribed(Subscriber subscriber) { if (subscriber == null) throw new NullPointerException(); + boolean subscribed = false; + ReentrantLock lock = this.lock; if (!closed) { - synchronized (this) { + lock.lock(); + try { BufferedSubscription pred = null, next; for (BufferedSubscription b = clients; b != null; b = next) { next = b.next; @@ -783,14 +820,16 @@ else pred.next = next; } - else if (subscriber.equals(b.subscriber)) - return true; + else if (subscribed = subscriber.equals(b.subscriber)) + break; else pred = b; } + } finally { + lock.unlock(); } } - return false; + return subscribed; } /** @@ -803,7 +842,9 @@ public long estimateMinimumDemand() { long min = Long.MAX_VALUE; boolean nonEmpty = false; - synchronized (this) { + ReentrantLock lock = this.lock; + lock.lock(); + try { BufferedSubscription pred = null, next; for (BufferedSubscription b = clients; b != null; b = next) { int n; long d; @@ -822,6 +863,8 @@ pred = b; } } + } finally { + lock.unlock(); } return nonEmpty ? min : 0; } @@ -834,7 +877,9 @@ */ public int estimateMaximumLag() { int max = 0; - synchronized (this) { + ReentrantLock lock = this.lock; + lock.lock(); + try { BufferedSubscription pred = null, next; for (BufferedSubscription b = clients; b != null; b = next) { int n; @@ -852,6 +897,8 @@ pred = b; } } + } finally { + lock.unlock(); } return max; }