src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java
8246677: LinkedTransferQueue and SynchronousQueue synchronization updates
Reviewed-by: martin
*** 307,341 ****
* CAS-advance the tail pointer by at least two hops.
*
* 2. Await match or cancellation (method awaitMatch)
*
* Wait for another thread to match node; instead cancelling if
! * the current thread was interrupted or the wait timed out. On
! * multiprocessors, we use front-of-queue spinning: If a node
! * appears to be the first unmatched node in the queue, it
! * spins a bit before blocking. In either case, before blocking
! * it tries to unsplice any nodes between the current "head"
! * and the first unmatched node.
! *
! * Front-of-queue spinning vastly improves performance of
! * heavily contended queues. And so long as it is relatively
! * brief and "quiet", spinning does not much impact performance
! * of less-contended queues. During spins threads check their
! * interrupt status and generate a thread-local random number
! * to decide to occasionally perform a Thread.yield. While
! * yield has underdefined specs, we assume that it might help,
! * and will not hurt, in limiting impact of spinning on busy
! * systems. We also use smaller (1/2) spins for nodes that are
! * not known to be front but whose predecessors have not
! * blocked -- these "chained" spins avoid artifacts of
! * front-of-queue rules which otherwise lead to alternating
! * nodes spinning vs blocking. Further, front threads that
! * represent phase changes (from data to request node or vice
! * versa) compared to their predecessors receive additional
! * chained spins, reflecting longer paths typically required to
! * unblock threads during phase changes.
! *
*
* ** Unlinking removed interior nodes **
*
* In addition to minimizing garbage retention via self-linking
* described above, we also unlink removed interior nodes. These
--- 307,322 ----
* CAS-advance the tail pointer by at least two hops.
*
* 2. Await match or cancellation (method awaitMatch)
*
* Wait for another thread to match node; instead cancelling if
! * the current thread was interrupted or the wait timed out. To
! * improve performance in common single-source / single-sink
! * usages when there are more tasks than cores, an initial
! * Thread.yield is tried when there is apparently only one
! * waiter. In other cases, waiters may help with some
! * bookkeeping, then park/unpark.
*
* ** Unlinking removed interior nodes **
*
* In addition to minimizing garbage retention via self-linking
* described above, we also unlink removed interior nodes. These
*** 367,400 ****
* never fall off the list because of an untimed call to take() at
* the front of the queue.
*
* When these cases arise, rather than always retraversing the
* entire list to find an actual predecessor to unlink (which
! * won't help for case (1) anyway), we record a conservative
! * estimate of possible unsplice failures (in "sweepVotes").
! * We trigger a full sweep when the estimate exceeds a threshold
! * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
! * removal failures to tolerate before sweeping through, unlinking
! * cancelled nodes that were not unlinked upon initial removal.
! * We perform sweeps by the thread hitting threshold (rather than
! * background threads or by spreading work to other threads)
! * because in the main contexts in which removal occurs, the
! * caller is timed-out or cancelled, which are not time-critical
! * enough to warrant the overhead that alternatives would impose
! * on other threads.
! *
! * Because the sweepVotes estimate is conservative, and because
! * nodes become unlinked "naturally" as they fall off the head of
! * the queue, and because we allow votes to accumulate even while
! * sweeps are in progress, there are typically significantly fewer
! * such nodes than estimated. Choice of a threshold value
! * balances the likelihood of wasted effort and contention, versus
! * providing a worst-case bound on retention of interior nodes in
! * quiescent queues. The value defined below was chosen
! * empirically to balance these under various timeout scenarios.
! *
! * Because traversal operations on the linked list of nodes are a
* natural opportunity to sweep dead nodes, we generally do so,
* including all the operations that might remove elements as they
* traverse, such as removeIf and Iterator.remove. This largely
* eliminates long chains of dead interior nodes, except from
* cancelled or timed out blocking operations.
--- 348,360 ----
* never fall off the list because of an untimed call to take() at
* the front of the queue.
*
* When these cases arise, rather than always retraversing the
* entire list to find an actual predecessor to unlink (which
! * won't help for case (1) anyway), we record the need to sweep the
! * next time any thread would otherwise block in awaitMatch. Also,
! * because traversal operations on the linked list of nodes are a
* natural opportunity to sweep dead nodes, we generally do so,
* including all the operations that might remove elements as they
* traverse, such as removeIf and Iterator.remove. This largely
* eliminates long chains of dead interior nodes, except from
* cancelled or timed out blocking operations.
*** 403,434 ****
* sweeps. However, the associated garbage chains terminate when
* some successor ultimately falls off the head of the list and is
* self-linked.
*/
- /** True if on multiprocessor */
- private static final boolean MP =
- Runtime.getRuntime().availableProcessors() > 1;
-
- /**
- * The number of times to spin (with randomly interspersed calls
- * to Thread.yield) on multiprocessor before blocking when a node
- * is apparently the first waiter in the queue. See above for
- * explanation. Must be a power of two. The value is empirically
- * derived -- it works pretty well across a variety of processors,
- * numbers of CPUs, and OSes.
- */
- private static final int FRONT_SPINS = 1 << 7;
-
/**
! * The number of times to spin before blocking when a node is
! * preceded by another node that is apparently spinning. Also
! * serves as an increment to FRONT_SPINS on phase changes, and as
! * base average frequency for yielding during spins. Must be a
! * power of two.
*/
! private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
/**
* The maximum number of estimated removal failures (sweepVotes)
* to tolerate before sweeping through the queue unlinking
* cancelled nodes that were not unlinked upon initial
--- 363,378 ----
* sweeps. However, the associated garbage chains terminate when
* some successor ultimately falls off the head of the list and is
* self-linked.
*/
/**
! * The number of nanoseconds for which it is faster to spin
! * rather than to use timed park. A rough estimate suffices.
! * Using a power of two minus one simplifies some comparisons.
*/
! static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;
/**
* The maximum number of estimated removal failures (sweepVotes)
* to tolerate before sweeping through the queue unlinking
* cancelled nodes that were not unlinked upon initial
*** 440,450 ****
/**
* Queue nodes. Uses Object, not E, for items to allow forgetting
* them after use. Writes that are intrinsically ordered wrt
* other accesses or CASes use simple relaxed forms.
*/
! static final class Node {
final boolean isData; // false if this is a request node
volatile Object item; // initially non-null if isData; CASed to match
volatile Node next;
volatile Thread waiter; // null when not waiting for a match
--- 384,394 ----
/**
* Queue nodes. Uses Object, not E, for items to allow forgetting
* them after use. Writes that are intrinsically ordered wrt
* other accesses or CASes use simple relaxed forms.
*/
! static final class Node implements ForkJoinPool.ManagedBlocker {
final boolean isData; // false if this is a request node
volatile Object item; // initially non-null if isData; CASed to match
volatile Node next;
volatile Thread waiter; // null when not waiting for a match
*** 485,512 ****
}
final void appendRelaxed(Node next) {
// assert next != null;
// assert this.next == null;
! NEXT.set(this, next);
! }
!
! /**
! * Sets item (of a request node) to self and waiter to null,
! * to avoid garbage retention after matching or cancelling.
! * Uses relaxed writes because order is already constrained in
! * the only calling contexts: item is forgotten only after
! * volatile/atomic mechanics that extract items, and visitors
! * of request nodes only ever check whether item is null.
! * Similarly, clearing waiter follows either CAS or return
! * from park (if ever parked; else we don't care).
! */
! final void forgetContents() {
! // assert isMatched();
! if (!isData)
! ITEM.set(this, this);
! WAITER.set(this, null);
}
/**
* Returns true if this node has been matched, including the
* case of artificial matches due to cancellation.
--- 429,439 ----
}
final void appendRelaxed(Node next) {
// assert next != null;
// assert this.next == null;
! NEXT.setOpaque(this, next);
}
/**
* Returns true if this node has been matched, including the
* case of artificial matches due to cancellation.
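Note on the hunk above: VarHandle.set uses plain (non-volatile) access semantics, while setOpaque guarantees bitwise-atomic, per-variable coherent access without ordering against other variables. A minimal sketch of the difference in call shape, assuming a hypothetical OpaqueLink class that stands in for Node (it is not part of the patch):

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

/** Hypothetical stand-in for a queue node; illustrative only. */
final class OpaqueLink {
    volatile OpaqueLink next;

    static final VarHandle NEXT;
    static {
        try {
            NEXT = MethodHandles.lookup()
                .findVarHandle(OpaqueLink.class, "next", OpaqueLink.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Links a successor with an opaque-mode write, as the patched appendRelaxed does. */
    void appendRelaxed(OpaqueLink successor) {
        // Opaque mode: atomic and coherent for this field, weaker than a volatile write.
        NEXT.setOpaque(this, successor);
    }
}
```

Which mode is sufficient depends on how the field is published afterwards; the sketch only shows the API, not a correctness argument for the queue.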
*** 532,541 ****
--- 459,478 ----
final boolean cannotPrecede(boolean haveData) {
boolean d = isData;
return d != haveData && d != (item == null);
}
+ public final boolean isReleasable() {
+ return (isData == (item == null)) ||
+ Thread.currentThread().isInterrupted();
+ }
+
+ public final boolean block() {
+ while (!isReleasable()) LockSupport.park();
+ return true;
+ }
+
private static final long serialVersionUID = -3375979862319811754L;
}
/**
* A node from which the first live (non-matched) node (if any)
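Note on the hunk above: implementing ForkJoinPool.ManagedBlocker lets a waiting node be handed to ForkJoinPool.managedBlock, which may compensate with a spare worker when the caller is a pool thread. A minimal sketch of the same isReleasable/block pairing, assuming a hypothetical one-shot SlotBlocker class (its names and fields are illustrative and not taken from the patch):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical one-shot slot; not part of LinkedTransferQueue. */
final class SlotBlocker implements ForkJoinPool.ManagedBlocker {
    private final AtomicReference<Object> item = new AtomicReference<>();
    private volatile Thread waiter;

    /** Releasable once a value has arrived or the caller is interrupted. */
    @Override public boolean isReleasable() {
        return item.get() != null || Thread.currentThread().isInterrupted();
    }

    /** Parks until releasable; the loop absorbs spurious wakeups. */
    @Override public boolean block() {
        while (!isReleasable())
            LockSupport.park(this);
        return true;
    }

    /** Publishes a value, then wakes the waiter (unpark(null) is a no-op). */
    void fulfill(Object value) {
        item.set(value);
        LockSupport.unpark(waiter);
    }

    /** Waits for a value; returns null if interrupted first. */
    Object await() {
        waiter = Thread.currentThread();   // enable unpark before checking
        try {
            ForkJoinPool.managedBlock(this);
        } catch (InterruptedException cannotHappen) {
            // block() above never throws, mirroring the patch's catch clause
        }
        return item.get();
    }
}
```

Outside a ForkJoinPool, managedBlock simply alternates isReleasable and block on the calling thread, so the sketch behaves like a plain park loop there.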
*** 564,574 ****
* - tail.next may or may not be self-linked.
*/
private transient volatile Node tail;
/** The number of apparent failures to unsplice cancelled nodes */
! private transient volatile int sweepVotes;
private boolean casTail(Node cmp, Node val) {
// assert cmp != null;
// assert val != null;
return TAIL.compareAndSet(this, cmp, val);
--- 501,511 ----
* - tail.next may or may not be self-linked.
*/
private transient volatile Node tail;
/** The number of apparent failures to unsplice cancelled nodes */
! private transient volatile boolean needSweep;
private boolean casTail(Node cmp, Node val) {
// assert cmp != null;
// assert val != null;
return TAIL.compareAndSet(this, cmp, val);
*** 576,590 ****
private boolean casHead(Node cmp, Node val) {
return HEAD.compareAndSet(this, cmp, val);
}
- /** Atomic version of ++sweepVotes. */
- private int incSweepVotes() {
- return (int) SWEEPVOTES.getAndAdd(this, 1) + 1;
- }
-
/**
* Tries to CAS pred.next (or head, if pred is null) from c to p.
* Caller must ensure that we're not unlinking the trailing node.
*/
private boolean tryCasSuccessor(Node pred, Node c, Node p) {
--- 513,522 ----
*** 687,766 ****
}
}
}
/**
! * Spins/yields/blocks until node s is matched or caller gives up.
*
* @param s the waiting node
* @param pred the predecessor of s, or null if unknown (the null
* case does not occur in any current calls but may in possible
* future extensions)
* @param e the comparison value for checking match
* @param timed if true, wait only until timeout elapses
* @param nanos timeout in nanosecs, used only if timed is true
* @return matched item, or e if unmatched on interrupt or timeout
*/
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
! Thread w = Thread.currentThread();
! int spins = -1; // initialized after first item and cancel checks
! ThreadLocalRandom randomYields = null; // bound if needed
!
! for (;;) {
! final Object item;
! if ((item = s.item) != e) { // matched
! // assert item != s;
! s.forgetContents(); // avoid garbage
! @SuppressWarnings("unchecked") E itemE = (E) item;
! return itemE;
! }
! else if (w.isInterrupted() || (timed && nanos <= 0L)) {
! // try to cancel and unlink
! if (s.casItem(e, s.isData ? null : s)) {
! unsplice(pred, s);
return e;
}
- // return normally if lost CAS
}
! else if (spins < 0) { // establish spins at/near front
! if ((spins = spinsFor(pred, s.isData)) > 0)
! randomYields = ThreadLocalRandom.current();
}
! else if (spins > 0) { // spin
! --spins;
! if (randomYields.nextInt(CHAINED_SPINS) == 0)
! Thread.yield(); // occasionally yield
}
! else if (s.waiter == null) {
! s.waiter = w; // request unpark then recheck
}
! else if (timed) {
! nanos = deadline - System.nanoTime();
! if (nanos > 0L)
! LockSupport.parkNanos(this, nanos);
}
else {
! LockSupport.park(this);
! }
! }
}
-
- /**
- * Returns spin/yield value for a node with given predecessor and
- * data mode. See above for explanation.
- */
- private static int spinsFor(Node pred, boolean haveData) {
- if (MP && pred != null) {
- if (pred.isData != haveData) // phase change
- return FRONT_SPINS + CHAINED_SPINS;
- if (pred.isMatched()) // probably at front
- return FRONT_SPINS;
- if (pred.waiter == null) // pred apparently spinning
- return CHAINED_SPINS;
}
! return 0;
}
/* -------------- Traversal methods -------------- */
/**
--- 619,688 ----
}
}
}
/**
! * Possibly blocks until node s is matched or caller gives up.
*
* @param s the waiting node
* @param pred the predecessor of s, or null if unknown (the null
* case does not occur in any current calls but may in possible
* future extensions)
* @param e the comparison value for checking match
* @param timed if true, wait only until timeout elapses
* @param nanos timeout in nanosecs, used only if timed is true
* @return matched item, or e if unmatched on interrupt or timeout
*/
+ @SuppressWarnings("unchecked")
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
+ final boolean isData = s.isData;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
! final Thread w = Thread.currentThread();
! int stat = -1; // -1: may yield, +1: park, else 0
! Object item;
! while ((item = s.item) == e) {
! if (needSweep) // help clean
! sweep();
! else if ((timed && nanos <= 0L) || w.isInterrupted()) {
! if (s.casItem(e, (e == null) ? s : null)) {
! unsplice(pred, s); // cancelled
return e;
}
}
! else if (stat <= 0) {
! if (pred != null && pred.next == s) {
! if (stat < 0 &&
! (pred.isData != isData || pred.isMatched())) {
! stat = 0; // yield once if first
! Thread.yield();
}
! else {
! stat = 1;
! s.waiter = w; // enable unpark
}
! } // else signal in progress
}
! else if ((item = s.item) != e)
! break; // recheck
! else if (!timed) {
! LockSupport.setCurrentBlocker(this);
! try {
! ForkJoinPool.managedBlock(s);
! } catch (InterruptedException cannotHappen) { }
! LockSupport.setCurrentBlocker(null);
}
else {
! nanos = deadline - System.nanoTime();
! if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
! LockSupport.parkNanos(this, nanos);
}
}
! if (stat == 1)
! WAITER.set(s, null);
! if (!isData)
! ITEM.set(s, s); // self-link to avoid garbage
! return (E) item;
}
/* -------------- Traversal methods -------------- */
/**
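Note on the hunk above: the new awaitMatch replaces randomized spinning with a small state machine (yield once, publish the waiter, then park), and it skips timed parks that are shorter than SPIN_FOR_TIMEOUT_THRESHOLD. A simplified sketch of that control flow, assuming a hypothetical single-waiter HandoffCell class; it omits the predecessor checks, sweeping, and cancellation CAS of the real method:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical single-waiter handoff cell; names are illustrative only. */
final class HandoffCell<T> {
    static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L; // value used in the patch

    private final AtomicReference<T> item = new AtomicReference<>();
    private volatile Thread waiter;

    void put(T value) {
        item.set(value);
        LockSupport.unpark(waiter);        // no-op if waiter is null
    }

    /** Returns the value, or null on timeout or interrupt. */
    T poll(long nanos) {
        final long deadline = System.nanoTime() + nanos;
        final Thread w = Thread.currentThread();
        int stat = -1;                     // -1: may yield, +1: park enabled, else 0
        T v;
        while ((v = item.get()) == null) {
            if (nanos <= 0L || w.isInterrupted())
                break;                     // give up
            else if (stat < 0) {
                stat = 0;
                Thread.yield();            // yield once before committing to park
            }
            else if (stat == 0) {
                stat = 1;
                waiter = w;                // enable unpark, then recheck item
            }
            else {
                nanos = deadline - System.nanoTime();
                if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanos);
                // else: remaining wait is short, so loop instead of parking
            }
        }
        if (stat == 1)
            waiter = null;                 // disable further signals
        return v;
    }
}
```

Publishing the waiter before the final recheck is what makes a concurrent put safe: either the waiter sees the item on the next loop iteration or the producer sees the waiter and unparks it.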
*** 1179,1190 ****
s.waiter = null; // disable signals
/*
* See above for rationale. Briefly: if pred still points to
* s, try to unlink s. If s cannot be unlinked, because it is
* trailing node or pred might be unlinked, and neither pred
! * nor s are head or offlist, add to sweepVotes, and if enough
! * votes have accumulated, sweep.
*/
if (pred != null && pred.next == s) {
Node n = s.next;
if (n == null ||
(n != s && pred.casNext(s, n) && pred.isMatched())) {
--- 1101,1111 ----
s.waiter = null; // disable signals
/*
* See above for rationale. Briefly: if pred still points to
* s, try to unlink s. If s cannot be unlinked, because it is
* trailing node or pred might be unlinked, and neither pred
! * nor s are head or offlist, set needSweep.
*/
if (pred != null && pred.next == s) {
Node n = s.next;
if (n == null ||
(n != s && pred.casNext(s, n) && pred.isMatched())) {
*** 1198,1220 ****
if (hn == null)
return; // now empty
if (hn != h && casHead(h, hn))
h.selfLink(); // advance head
}
! // sweep every SWEEP_THRESHOLD votes
! if (pred.next != pred && s.next != s // recheck if offlist
! && (incSweepVotes() & (SWEEP_THRESHOLD - 1)) == 0)
! sweep();
}
}
}
/**
* Unlinks matched (typically cancelled) nodes encountered in a
* traversal from head.
*/
private void sweep() {
for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
if (!s.isMatched())
// Unmatched nodes are never self-linked
p = s;
else if ((n = s.next) == null) // trailing node is pinned
--- 1119,1140 ----
if (hn == null)
return; // now empty
if (hn != h && casHead(h, hn))
h.selfLink(); // advance head
}
! if (pred.next != pred && s.next != s)
! needSweep = true;
}
}
}
/**
* Unlinks matched (typically cancelled) nodes encountered in a
* traversal from head.
*/
private void sweep() {
+ needSweep = false;
for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
if (!s.isMatched())
// Unmatched nodes are never self-linked
p = s;
else if ((n = s.next) == null) // trailing node is pinned
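Note on the hunk above: the vote counter is replaced by a boolean that a failed unsplice sets and that sweep() clears before traversing. A minimal sketch of that flag protocol, assuming a hypothetical LazySweepList with single-threaded appends; the class, its cancel method, and helpClean are illustrative names, and clearing the flag first is read here as a way to let removals that fail during a sweep request another one:

```java
/** Hypothetical list with lazily unlinked cancelled nodes; illustrative only. */
final class LazySweepList {
    static final class Node {
        final int id;
        volatile boolean cancelled;
        volatile Node next;
        Node(int id) { this.id = id; }
    }

    private final Node head = new Node(-1);  // dummy header
    private Node tail = head;                // appends are single-threaded here
    private volatile boolean needSweep;

    Node add(int id) {
        Node n = new Node(id);
        tail.next = n;
        tail = n;
        return n;
    }

    /** Marks a node dead and defers unlinking to the next sweep. */
    void cancel(Node n) {
        n.cancelled = true;
        needSweep = true;
    }

    /** Called where a thread would otherwise block: clean up first if asked. */
    void helpClean() {
        if (needSweep)
            sweep();
    }

    private void sweep() {
        needSweep = false;                   // clear first; later cancels re-raise it
        for (Node p = head, s; (s = p.next) != null; ) {
            if (s.cancelled && s.next != null)
                p.next = s.next;             // unlink interior dead node
            else
                p = s;                       // live node, or pinned trailing node
        }
    }

    int size() {
        int n = 0;
        for (Node p = head.next; p != null; p = p.next)
            ++n;
        return n;
    }
}
```

Compared with counting votes against a threshold, a single flag trades a bounded number of tolerated dead nodes for simpler state, at the cost of sweeping whenever any waiter is about to block.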
*** 1263,1273 ****
* As the queue is unbounded, this method will never block.
*
* @throws NullPointerException if the specified element is null
*/
public void put(E e) {
! xfer(e, true, ASYNC, 0);
}
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never block or
--- 1183,1193 ----
* As the queue is unbounded, this method will never block.
*
* @throws NullPointerException if the specified element is null
*/
public void put(E e) {
! xfer(e, true, ASYNC, 0L);
}
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never block or
*** 1276,1286 ****
* @return {@code true} (as specified by
* {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e, long timeout, TimeUnit unit) {
! xfer(e, true, ASYNC, 0);
return true;
}
/**
* Inserts the specified element at the tail of this queue.
--- 1196,1206 ----
* @return {@code true} (as specified by
* {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e, long timeout, TimeUnit unit) {
! xfer(e, true, ASYNC, 0L);
return true;
}
/**
* Inserts the specified element at the tail of this queue.
*** 1288,1298 ****
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
! xfer(e, true, ASYNC, 0);
return true;
}
/**
* Inserts the specified element at the tail of this queue.
--- 1208,1218 ----
*
* @return {@code true} (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
! xfer(e, true, ASYNC, 0L);
return true;
}
/**
* Inserts the specified element at the tail of this queue.
*** 1301,1311 ****
*
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
! xfer(e, true, ASYNC, 0);
return true;
}
/**
* Transfers the element to a waiting consumer immediately, if possible.
--- 1221,1231 ----
*
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
! xfer(e, true, ASYNC, 0L);
return true;
}
/**
* Transfers the element to a waiting consumer immediately, if possible.
*** 1316,1326 ****
* otherwise returning {@code false} without enqueuing the element.
*
* @throws NullPointerException if the specified element is null
*/
public boolean tryTransfer(E e) {
! return xfer(e, true, NOW, 0) == null;
}
/**
* Transfers the element to a consumer, waiting if necessary to do so.
*
--- 1236,1246 ----
* otherwise returning {@code false} without enqueuing the element.
*
* @throws NullPointerException if the specified element is null
*/
public boolean tryTransfer(E e) {
! return xfer(e, true, NOW, 0L) == null;
}
/**
* Transfers the element to a consumer, waiting if necessary to do so.
*
*** 1331,1341 ****
* and waits until the element is received by a consumer.
*
* @throws NullPointerException if the specified element is null
*/
public void transfer(E e) throws InterruptedException {
! if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
--- 1251,1261 ----
* and waits until the element is received by a consumer.
*
* @throws NullPointerException if the specified element is null
*/
public void transfer(E e) throws InterruptedException {
! if (xfer(e, true, SYNC, 0L) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
*** 1361,1371 ****
return false;
throw new InterruptedException();
}
public E take() throws InterruptedException {
! E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
--- 1281,1291 ----
return false;
throw new InterruptedException();
}
public E take() throws InterruptedException {
! E e = xfer(null, false, SYNC, 0L);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
*** 1376,1386 ****
return e;
throw new InterruptedException();
}
public E poll() {
! return xfer(null, false, NOW, 0);
}
/**
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
--- 1296,1306 ----
return e;
throw new InterruptedException();
}
public E poll() {
! return xfer(null, false, NOW, 0L);
}
/**
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*** 1720,1742 ****
}
// VarHandle mechanics
private static final VarHandle HEAD;
private static final VarHandle TAIL;
- private static final VarHandle SWEEPVOTES;
static final VarHandle ITEM;
static final VarHandle NEXT;
static final VarHandle WAITER;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
HEAD = l.findVarHandle(LinkedTransferQueue.class, "head",
Node.class);
TAIL = l.findVarHandle(LinkedTransferQueue.class, "tail",
Node.class);
- SWEEPVOTES = l.findVarHandle(LinkedTransferQueue.class, "sweepVotes",
- int.class);
ITEM = l.findVarHandle(Node.class, "item", Object.class);
NEXT = l.findVarHandle(Node.class, "next", Node.class);
WAITER = l.findVarHandle(Node.class, "waiter", Thread.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
--- 1640,1659 ----