src/share/classes/java/util/concurrent/SynchronousQueue.java
Print this page
*** 161,171 ****
*/
/**
* Shared internal API for dual stacks and queues.
*/
! static abstract class Transferer {
/**
* Performs a put or take.
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
--- 161,171 ----
*/
/**
* Shared internal API for dual stacks and queues.
*/
! abstract static class Transferer {
/**
* Performs a put or take.
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
*** 188,198 ****
* The value is empirically derived -- it works well across a
* variety of processors and OSes. Empirically, the best value
* seems not to vary with number of CPUs (beyond 2) so is just
* a constant.
*/
! static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
/**
* The number of times to spin before blocking in untimed waits.
* This is greater than timed value because untimed waits spin
* faster since they don't need to check times on each spin.
--- 188,198 ----
* The value is empirically derived -- it works well across a
* variety of processors and OSes. Empirically, the best value
* seems not to vary with number of CPUs (beyond 2) so is just
* a constant.
*/
! static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
/**
* The number of times to spin before blocking in untimed waits.
* This is greater than timed value because untimed waits spin
* faster since they don't need to check times on each spin.
*** 239,261 ****
SNode(Object item) {
this.item = item;
}
- static final AtomicReferenceFieldUpdater<SNode, SNode>
- nextUpdater = AtomicReferenceFieldUpdater.newUpdater
- (SNode.class, SNode.class, "next");
-
boolean casNext(SNode cmp, SNode val) {
! return (cmp == next &&
! nextUpdater.compareAndSet(this, cmp, val));
}
- static final AtomicReferenceFieldUpdater<SNode, SNode>
- matchUpdater = AtomicReferenceFieldUpdater.newUpdater
- (SNode.class, SNode.class, "match");
-
/**
* Tries to match node s to this node, if so, waking up thread.
* Fulfillers call tryMatch to identify their waiters.
* Waiters block until they have been matched.
*
--- 239,253 ----
SNode(Object item) {
this.item = item;
}
boolean casNext(SNode cmp, SNode val) {
! return cmp == next &&
! UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
* Tries to match node s to this node, if so, waking up thread.
* Fulfillers call tryMatch to identify their waiters.
* Waiters block until they have been matched.
*
*** 262,272 ****
* @param s the node to match
* @return true if successfully matched to s
*/
boolean tryMatch(SNode s) {
if (match == null &&
! matchUpdater.compareAndSet(this, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
--- 254,264 ----
* @param s the node to match
* @return true if successfully matched to s
*/
boolean tryMatch(SNode s) {
if (match == null &&
! UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
*** 277,303 ****
/**
* Tries to cancel a wait by matching node to itself.
*/
void tryCancel() {
! matchUpdater.compareAndSet(this, null, this);
}
boolean isCancelled() {
return match == this;
}
}
/** The head (top) of the stack */
volatile SNode head;
- static final AtomicReferenceFieldUpdater<TransferStack, SNode>
- headUpdater = AtomicReferenceFieldUpdater.newUpdater
- (TransferStack.class, SNode.class, "head");
-
boolean casHead(SNode h, SNode nh) {
! return h == head && headUpdater.compareAndSet(this, h, nh);
}
/**
* Creates or resets fields of a node. Called only from transfer
* where the node to push on stack is lazily created and
--- 269,300 ----
/**
* Tries to cancel a wait by matching node to itself.
*/
void tryCancel() {
! UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
boolean isCancelled() {
return match == this;
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long nextOffset =
+ objectFieldOffset(UNSAFE, "next", SNode.class);
+ private static final long matchOffset =
+ objectFieldOffset(UNSAFE, "match", SNode.class);
+
}
/** The head (top) of the stack */
volatile SNode head;
boolean casHead(SNode h, SNode nh) {
! return h == head &&
! UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}
/**
* Creates or resets fields of a node. Called only from transfer
* where the node to push on stack is lazily created and
*** 336,346 ****
* is essentially the same as for fulfilling, except
* that it doesn't return the item.
*/
SNode s = null; // constructed/reused as needed
! int mode = (e == null)? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait
--- 333,343 ----
* is essentially the same as for fulfilling, except
* that it doesn't return the item.
*/
SNode s = null; // constructed/reused as needed
! int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait
*** 354,364 ****
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
! return mode == REQUEST? m.item : s.item;
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
--- 351,361 ----
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
! return (mode == REQUEST) ? m.item : s.item;
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
*** 370,380 ****
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
! return (mode == REQUEST)? m.item : s.item;
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller
--- 367,377 ----
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
! return (mode == REQUEST) ? m.item : s.item;
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller
*** 421,435 ****
* done before giving up.) Except that calls from untimed
* SynchronousQueue.{poll/offer} don't check interrupts
* and don't wait at all, so are trapped in transfer
* method rather than calling awaitFulfill.
*/
! long lastTime = (timed)? System.nanoTime() : 0;
Thread w = Thread.currentThread();
SNode h = head;
! int spins = (shouldSpin(s)?
! (timed? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel();
SNode m = s.match;
if (m != null)
--- 418,432 ----
* done before giving up.) Except that calls from untimed
* SynchronousQueue.{poll/offer} don't check interrupts
* and don't wait at all, so are trapped in transfer
* method rather than calling awaitFulfill.
*/
! long lastTime = timed ? System.nanoTime() : 0;
Thread w = Thread.currentThread();
SNode h = head;
! int spins = (shouldSpin(s) ?
! (timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel();
SNode m = s.match;
if (m != null)
*** 442,452 ****
s.tryCancel();
continue;
}
}
if (spins > 0)
! spins = shouldSpin(s)? (spins-1) : 0;
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
--- 439,449 ----
s.tryCancel();
continue;
}
}
if (spins > 0)
! spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
*** 497,506 ****
--- 494,509 ----
p.casNext(n, n.next);
else
p = n;
}
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long headOffset =
+ objectFieldOffset(UNSAFE, "head", TransferStack.class);
+
}
/** Dual Queue */
static final class TransferQueue extends Transferer {
/*
*** 522,554 ****
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
- static final AtomicReferenceFieldUpdater<QNode, QNode>
- nextUpdater = AtomicReferenceFieldUpdater.newUpdater
- (QNode.class, QNode.class, "next");
-
boolean casNext(QNode cmp, QNode val) {
! return (next == cmp &&
! nextUpdater.compareAndSet(this, cmp, val));
}
- static final AtomicReferenceFieldUpdater<QNode, Object>
- itemUpdater = AtomicReferenceFieldUpdater.newUpdater
- (QNode.class, Object.class, "item");
-
boolean casItem(Object cmp, Object val) {
! return (item == cmp &&
! itemUpdater.compareAndSet(this, cmp, val));
}
/**
* Tries to cancel by CAS'ing ref to this as item.
*/
void tryCancel(Object cmp) {
! itemUpdater.compareAndSet(this, cmp, this);
}
boolean isCancelled() {
return item == this;
}
--- 525,549 ----
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
boolean casNext(QNode cmp, QNode val) {
! return next == cmp &&
! UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
boolean casItem(Object cmp, Object val) {
! return item == cmp &&
! UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* Tries to cancel by CAS'ing ref to this as item.
*/
void tryCancel(Object cmp) {
! UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
boolean isCancelled() {
return item == this;
}
*** 559,568 ****
--- 554,570 ----
* an advanceHead operation.
*/
boolean isOffList() {
return next == this;
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long nextOffset =
+ objectFieldOffset(UNSAFE, "next", QNode.class);
+ private static final long itemOffset =
+ objectFieldOffset(UNSAFE, "item", QNode.class);
}
/** Head of queue */
transient volatile QNode head;
/** Tail of queue */
*** 578,622 ****
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
- static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
- headUpdater = AtomicReferenceFieldUpdater.newUpdater
- (TransferQueue.class, QNode.class, "head");
-
/**
* Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention.
*/
void advanceHead(QNode h, QNode nh) {
! if (h == head && headUpdater.compareAndSet(this, h, nh))
h.next = h; // forget old next
}
- static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
- tailUpdater = AtomicReferenceFieldUpdater.newUpdater
- (TransferQueue.class, QNode.class, "tail");
-
/**
* Tries to cas nt as new tail.
*/
void advanceTail(QNode t, QNode nt) {
if (tail == t)
! tailUpdater.compareAndSet(this, t, nt);
}
- static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
- cleanMeUpdater = AtomicReferenceFieldUpdater.newUpdater
- (TransferQueue.class, QNode.class, "cleanMe");
-
/**
* Tries to CAS cleanMe slot.
*/
boolean casCleanMe(QNode cmp, QNode val) {
! return (cleanMe == cmp &&
! cleanMeUpdater.compareAndSet(this, cmp, val));
}
/**
* Puts or takes an item.
*/
--- 580,613 ----
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
/**
* Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention.
*/
void advanceHead(QNode h, QNode nh) {
! if (h == head &&
! UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}
/**
* Tries to cas nt as new tail.
*/
void advanceTail(QNode t, QNode nt) {
if (tail == t)
! UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
/**
* Tries to CAS cleanMe slot.
*/
boolean casCleanMe(QNode cmp, QNode val) {
! return cleanMe == cmp &&
! UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
/**
* Puts or takes an item.
*/
*** 681,691 ****
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
! return (x != null)? x : e;
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
--- 672,682 ----
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
! return (x != null) ? x : e;
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
*** 698,708 ****
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
! return (x != null)? x : e;
}
}
}
/**
--- 689,699 ----
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
! return (x != null) ? x : e;
}
}
}
/**
*** 714,727 ****
* @param nanos timeout value
* @return matched item, or s if cancelled
*/
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
! long lastTime = (timed)? System.nanoTime() : 0;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?
! (timed? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
if (x != e)
--- 705,718 ----
* @param nanos timeout value
* @return matched item, or s if cancelled
*/
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
! long lastTime = timed ? System.nanoTime() : 0;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?
! (timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
if (x != e)
*** 797,806 ****
--- 788,807 ----
return; // s is already saved node
} else if (casCleanMe(null, pred))
return; // Postpone cleaning s
}
}
+
+ // unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long headOffset =
+ objectFieldOffset(UNSAFE, "head", TransferQueue.class);
+ private static final long tailOffset =
+ objectFieldOffset(UNSAFE, "tail", TransferQueue.class);
+ private static final long cleanMeOffset =
+ objectFieldOffset(UNSAFE, "cleanMe", TransferQueue.class);
+
}
/**
* The transferer. Set only in constructor, but cannot be declared
* as final without further complicating serialization. Since
*** 822,832 ****
*
* @param fair if true, waiting threads contend in FIFO order for
* access; otherwise the order is unspecified.
*/
public SynchronousQueue(boolean fair) {
! transferer = (fair)? new TransferQueue() : new TransferStack();
}
/**
* Adds the specified element to this queue, waiting if necessary for
* another thread to receive it.
--- 823,833 ----
*
* @param fair if true, waiting threads contend in FIFO order for
* access; otherwise the order is unspecified.
*/
public SynchronousQueue(boolean fair) {
! transferer = fair ? new TransferQueue() : new TransferStack();
}
/**
* Adds the specified element to this queue, waiting if necessary for
* another thread to receive it.
*** 1139,1144 ****
--- 1140,1158 ----
transferer = new TransferQueue();
else
transferer = new TransferStack();
}
+ // Unsafe mechanics
+ static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
+ String field, Class<?> klazz) {
+ try {
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
+ }
+ }
+
}