< prev index next >
src/java.base/share/classes/java/util/concurrent/SynchronousQueue.java
Print this page
8246677: LinkedTransferQueue and SynchronousQueue synchronization updates
Reviewed-by: martin
*** 164,173 ****
--- 164,185 ----
* node's link to now point to the node itself. This doesn't arise
* much for Stack nodes (because blocked threads do not hang on to
* old head pointers), but references in Queue nodes must be
* aggressively forgotten to avoid reachability of everything any
* node has ever referred to since arrival.
+ *
+ * The above steps improve throughput when many threads produce
+ * and/or consume data. But they don't help much with
+ * single-source / single-sink usages in which one side or the
+ * other is always transiently blocked, and so throughput is
+ * mainly a function of thread scheduling. This is not usually
+ * noticeably improved with bounded short spin-waits. Instead both
+ * forms of transfer try Thread.yield if apparently the sole
+ * waiter. This works well when there are more tasks than cores,
+ * which is expected to be the main usage context of this mode. In
+ * other cases, waiters may help with some bookkeeping, then
+ * park/unpark.
*/
/**
* Shared internal API for dual stacks and queues.
*/
*** 187,217 ****
*/
abstract E transfer(E e, boolean timed, long nanos);
}
/**
- * The number of times to spin before blocking in timed waits.
- * The value is empirically derived -- it works well across a
- * variety of processors and OSes. Empirically, the best value
- * seems not to vary with number of CPUs (beyond 2) so is just
- * a constant.
- */
- static final int MAX_TIMED_SPINS =
- (Runtime.getRuntime().availableProcessors() < 2) ? 0 : 32;
-
- /**
- * The number of times to spin before blocking in untimed waits.
- * This is greater than timed value because untimed waits spin
- * faster since they don't need to check times on each spin.
- */
- static final int MAX_UNTIMED_SPINS = MAX_TIMED_SPINS * 16;
-
- /**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices.
*/
! static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
/** Dual stack */
static final class TransferStack<E> extends Transferer<E> {
/*
* This extends Scherer-Scott dual stack algorithm, differing,
--- 199,212 ----
*/
abstract E transfer(E e, boolean timed, long nanos);
}
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices.
*/
! static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;
/** Dual stack */
static final class TransferStack<E> extends Transferer<E> {
/*
* This extends Scherer-Scott dual stack algorithm, differing,
*** 231,241 ****
/** Returns true if m has fulfilling bit set. */
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
/** Node class for TransferStacks. */
! static final class SNode {
volatile SNode next; // next node in stack
volatile SNode match; // the node matched to this
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
--- 226,236 ----
/** Returns true if m has fulfilling bit set. */
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
/** Node class for TransferStacks. */
! static final class SNode implements ForkJoinPool.ManagedBlocker {
volatile SNode next; // next node in stack
volatile SNode match; // the node matched to this
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
*** 259,299 ****
*
* @param s the node to match
* @return true if successfully matched to s
*/
boolean tryMatch(SNode s) {
! if (match == null &&
! SMATCH.compareAndSet(this, null, s)) {
! Thread w = waiter;
! if (w != null) { // waiters need at most one unpark
! waiter = null;
LockSupport.unpark(w);
- }
return true;
}
! return match == s;
}
/**
* Tries to cancel a wait by matching node to itself.
*/
! void tryCancel() {
! SMATCH.compareAndSet(this, null, this);
}
boolean isCancelled() {
return match == this;
}
// VarHandle mechanics
private static final VarHandle SMATCH;
private static final VarHandle SNEXT;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
SMATCH = l.findVarHandle(SNode.class, "match", SNode.class);
SNEXT = l.findVarHandle(SNode.class, "next", SNode.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
}
--- 254,310 ----
*
* @param s the node to match
* @return true if successfully matched to s
*/
boolean tryMatch(SNode s) {
! SNode m; Thread w;
! if ((m = match) == null) {
! if (SMATCH.compareAndSet(this, null, s)) {
! if ((w = waiter) != null)
LockSupport.unpark(w);
return true;
}
! else
! m = match;
! }
! return m == s;
}
/**
* Tries to cancel a wait by matching node to itself.
*/
! boolean tryCancel() {
! return SMATCH.compareAndSet(this, null, this);
}
boolean isCancelled() {
return match == this;
}
+ public final boolean isReleasable() {
+ return match != null || Thread.currentThread().isInterrupted();
+ }
+
+ public final boolean block() {
+ while (!isReleasable()) LockSupport.park();
+ return true;
+ }
+
+ void forgetWaiter() {
+ SWAITER.setOpaque(this, null);
+ }
+
// VarHandle mechanics
private static final VarHandle SMATCH;
private static final VarHandle SNEXT;
+ private static final VarHandle SWAITER;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
SMATCH = l.findVarHandle(SNode.class, "match", SNode.class);
SNEXT = l.findVarHandle(SNode.class, "next", SNode.class);
+ SWAITER = l.findVarHandle(SNode.class, "waiter", Thread.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
}
*** 356,373 ****
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
! SNode m = awaitFulfill(s, timed, nanos);
! if (m == s) { // wait was cancelled
! clean(s);
return null;
}
! if ((h = head) != null && h.next == s)
! casHead(h, s.next); // help s's fulfiller
! return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
--- 367,413 ----
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
! long deadline = timed ? System.nanoTime() + nanos : 0L;
! Thread w = Thread.currentThread();
! int stat = -1; // -1: may yield, +1: park, else 0
! SNode m; // await fulfill or cancel
! while ((m = s.match) == null) {
! if ((timed &&
! (nanos = deadline - System.nanoTime()) <= 0) ||
! w.isInterrupted()) {
! if (s.tryCancel()) {
! clean(s); // wait cancelled
return null;
}
! } else if ((m = s.match) != null) {
! break; // recheck
! } else if (stat <= 0) {
! if (stat < 0 && h == null && head == s) {
! stat = 0; // yield once if was empty
! Thread.yield();
! } else {
! stat = 1;
! s.waiter = w; // enable signal
! }
! } else if (!timed) {
! LockSupport.setCurrentBlocker(this);
! try {
! ForkJoinPool.managedBlock(s);
! } catch (InterruptedException cannotHappen) { }
! LockSupport.setCurrentBlocker(null);
! } else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
! LockSupport.parkNanos(this, nanos);
! }
! if (stat == 1)
! s.forgetWaiter();
! Object result = (mode == REQUEST) ? m.item : s.item;
! if (h != null && h.next == s)
! casHead(h, s.next); // help fulfiller
! return (E) result;
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
*** 400,485 ****
}
}
}
/**
- * Spins/blocks until node s is matched by a fulfill operation.
- *
- * @param s the waiting node
- * @param timed true if timed wait
- * @param nanos timeout value
- * @return matched node, or s if cancelled
- */
- SNode awaitFulfill(SNode s, boolean timed, long nanos) {
- /*
- * When a node/thread is about to block, it sets its waiter
- * field and then rechecks state at least one more time
- * before actually parking, thus covering race vs
- * fulfiller noticing that waiter is non-null so should be
- * woken.
- *
- * When invoked by nodes that appear at the point of call
- * to be at the head of the stack, calls to park are
- * preceded by spins to avoid blocking when producers and
- * consumers are arriving very close in time. This can
- * happen enough to bother only on multiprocessors.
- *
- * The order of checks for returning out of main loop
- * reflects fact that interrupts have precedence over
- * normal returns, which have precedence over
- * timeouts. (So, on timeout, one last check for match is
- * done before giving up.) Except that calls from untimed
- * SynchronousQueue.{poll/offer} don't check interrupts
- * and don't wait at all, so are trapped in transfer
- * method rather than calling awaitFulfill.
- */
- final long deadline = timed ? System.nanoTime() + nanos : 0L;
- Thread w = Thread.currentThread();
- int spins = shouldSpin(s)
- ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
- : 0;
- for (;;) {
- if (w.isInterrupted())
- s.tryCancel();
- SNode m = s.match;
- if (m != null)
- return m;
- if (timed) {
- nanos = deadline - System.nanoTime();
- if (nanos <= 0L) {
- s.tryCancel();
- continue;
- }
- }
- if (spins > 0) {
- Thread.onSpinWait();
- spins = shouldSpin(s) ? (spins - 1) : 0;
- }
- else if (s.waiter == null)
- s.waiter = w; // establish waiter so can park next iter
- else if (!timed)
- LockSupport.park(this);
- else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
- LockSupport.parkNanos(this, nanos);
- }
- }
-
- /**
- * Returns true if node s is at head or there is an active
- * fulfiller.
- */
- boolean shouldSpin(SNode s) {
- SNode h = head;
- return (h == s || h == null || isFulfilling(h.mode));
- }
-
- /**
* Unlinks s from the stack.
*/
void clean(SNode s) {
s.item = null; // forget item
! s.waiter = null; // forget thread
/*
* At worst we may need to traverse entire stack to unlink
* s. If there are multiple concurrent calls to clean, we
* might not see s if another thread has already removed
--- 440,454 ----
}
}
}
/**
* Unlinks s from the stack.
*/
void clean(SNode s) {
s.item = null; // forget item
! s.forgetWaiter();
/*
* At worst we may need to traverse entire stack to unlink
* s. If there are multiple concurrent calls to clean, we
* might not see s if another thread has already removed
*** 531,541 ****
* nodes, and matching is done by CAS'ing QNode.item field
* from non-null to null (for put) or vice versa (for take).
*/
/** Node class for TransferQueue. */
! static final class QNode {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
--- 500,510 ----
* nodes, and matching is done by CAS'ing QNode.item field
* from non-null to null (for put) or vice versa (for take).
*/
/** Node class for TransferQueue. */
! static final class QNode implements ForkJoinPool.ManagedBlocker {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
*** 555,566 ****
}
/**
* Tries to cancel by CAS'ing ref to this as item.
*/
! void tryCancel(Object cmp) {
! QITEM.compareAndSet(this, cmp, this);
}
boolean isCancelled() {
return item == this;
}
--- 524,535 ----
}
/**
* Tries to cancel by CAS'ing ref to this as item.
*/
! boolean tryCancel(Object cmp) {
! return QITEM.compareAndSet(this, cmp, this);
}
boolean isCancelled() {
return item == this;
}
*** 572,589 ****
--- 541,580 ----
*/
boolean isOffList() {
return next == this;
}
+ void forgetWaiter() {
+ QWAITER.setOpaque(this, null);
+ }
+
+ boolean isFulfilled() {
+ Object x;
+ return isData == ((x = item) == null) || x == this;
+ }
+
+ public final boolean isReleasable() {
+ Object x;
+ return isData == ((x = item) == null) || x == this ||
+ Thread.currentThread().isInterrupted();
+ }
+
+ public final boolean block() {
+ while (!isReleasable()) LockSupport.park();
+ return true;
+ }
+
// VarHandle mechanics
private static final VarHandle QITEM;
private static final VarHandle QNEXT;
+ private static final VarHandle QWAITER;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
QITEM = l.findVarHandle(QNode.class, "item", Object.class);
QNEXT = l.findVarHandle(QNode.class, "next", QNode.class);
+ QWAITER = l.findVarHandle(QNode.class, "waiter", Thread.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
}
*** 661,774 ****
* than having them implicitly interspersed.
*/
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
-
for (;;) {
! QNode t = tail;
! QNode h = head;
! if (t == null || h == null) // saw uninitialized value
! continue; // spin
!
! if (h == t || t.isData == isData) { // empty or same-mode
! QNode tn = t.next;
! if (t != tail) // inconsistent read
! continue;
! if (tn != null) { // lagging tail
advanceTail(t, tn);
! continue;
! }
! if (timed && nanos <= 0L) // can't wait
return null;
! if (s == null)
! s = new QNode(e, isData);
! if (!t.casNext(null, s)) // failed to link in
! continue;
!
! advanceTail(t, s); // swing tail and wait
! Object x = awaitFulfill(s, e, timed, nanos);
! if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
!
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
! if (x != null) // and forget fields
s.item = s;
- s.waiter = null;
}
! return (x != null) ? (E)x : e;
!
! } else { // complementary-mode
! QNode m = h.next; // node to fulfill
! if (t != tail || m == null || h != head)
! continue; // inconsistent read
!
! Object x = m.item;
! if (isData == (x != null) || // m already fulfilled
! x == m || // m cancelled
! !m.casItem(x, e)) { // lost CAS
! advanceHead(h, m); // dequeue and retry
! continue;
}
! advanceHead(h, m); // successfully fulfilled
! LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
-
- /**
- * Spins/blocks until node s is fulfilled.
- *
- * @param s the waiting node
- * @param e the comparison value for checking match
- * @param timed true if timed wait
- * @param nanos timeout value
- * @return matched item, or s if cancelled
- */
- Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
- /* Same idea as TransferStack.awaitFulfill */
- final long deadline = timed ? System.nanoTime() + nanos : 0L;
- Thread w = Thread.currentThread();
- int spins = (head.next == s)
- ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
- : 0;
- for (;;) {
- if (w.isInterrupted())
- s.tryCancel(e);
- Object x = s.item;
- if (x != e)
- return x;
- if (timed) {
- nanos = deadline - System.nanoTime();
- if (nanos <= 0L) {
- s.tryCancel(e);
- continue;
- }
- }
- if (spins > 0) {
- --spins;
- Thread.onSpinWait();
- }
- else if (s.waiter == null)
- s.waiter = w;
- else if (!timed)
- LockSupport.park(this);
- else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
- LockSupport.parkNanos(this, nanos);
- }
}
/**
* Gets rid of cancelled node s with original predecessor pred.
*/
void clean(QNode pred, QNode s) {
! s.waiter = null; // forget thread
/*
* At any given time, exactly one node on list cannot be
* deleted -- the last inserted node. To accommodate this,
* if we cannot delete s, we save its predecessor as
* "cleanMe", deleting the previously saved version
--- 652,740 ----
* than having them implicitly interspersed.
*/
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
! QNode t = tail, h = head, m, tn; // m is node to fulfill
! if (t == null || h == null)
! ; // inconsistent
! else if (h == t || t.isData == isData) { // empty or same-mode
! if (t != tail) // inconsistent
! ;
! else if ((tn = t.next) != null) // lagging tail
advanceTail(t, tn);
! else if (timed && nanos <= 0L) // can't wait
return null;
! else if (t.casNext(null, (s != null) ? s :
! (s = new QNode(e, isData)))) {
! advanceTail(t, s);
! long deadline = timed ? System.nanoTime() + nanos : 0L;
! Thread w = Thread.currentThread();
! int stat = -1; // same idea as TransferStack
! Object item;
! while ((item = s.item) == e) {
! if ((timed &&
! (nanos = deadline - System.nanoTime()) <= 0) ||
! w.isInterrupted()) {
! if (s.tryCancel(e)) {
clean(t, s);
return null;
}
! } else if ((item = s.item) != e) {
! break; // recheck
! } else if (stat <= 0) {
! if (t.next == s) {
! if (stat < 0 && t.isFulfilled()) {
! stat = 0; // yield once if first
! Thread.yield();
! }
! else {
! stat = 1;
! s.waiter = w;
! }
! }
! } else if (!timed) {
! LockSupport.setCurrentBlocker(this);
! try {
! ForkJoinPool.managedBlock(s);
! } catch (InterruptedException cannotHappen) { }
! LockSupport.setCurrentBlocker(null);
! }
! else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
! LockSupport.parkNanos(this, nanos);
! }
! if (stat == 1)
! s.forgetWaiter();
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
! if (item != null) // and forget fields
s.item = s;
}
! return (item != null) ? (E)item : e;
}
! } else if ((m = h.next) != null && t == tail && h == head) {
! Thread waiter;
! Object x = m.item;
! boolean fulfilled = ((isData == (x == null)) &&
! x != m && m.casItem(x, e));
! advanceHead(h, m); // (help) dequeue
! if (fulfilled) {
! if ((waiter = m.waiter) != null)
! LockSupport.unpark(waiter);
return (x != null) ? (E)x : e;
}
}
}
}
/**
* Gets rid of cancelled node s with original predecessor pred.
*/
void clean(QNode pred, QNode s) {
! s.forgetWaiter();
/*
* At any given time, exactly one node on list cannot be
* deleted -- the last inserted node. To accommodate this,
* if we cannot delete s, we save its predecessor as
* "cleanMe", deleting the previously saved version
< prev index next >