src/share/classes/java/util/concurrent/SynchronousQueue.java
Print this page
@@ -161,11 +161,11 @@
*/
/**
* Shared internal API for dual stacks and queues.
*/
- static abstract class Transferer {
+ abstract static class Transferer {
/**
* Performs a put or take.
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
@@ -188,11 +188,11 @@
* The value is empirically derived -- it works well across a
* variety of processors and OSes. Empirically, the best value
* seems not to vary with number of CPUs (beyond 2) so is just
* a constant.
*/
- static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
+ static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
/**
* The number of times to spin before blocking in untimed waits.
* This is greater than timed value because untimed waits spin
* faster since they don't need to check times on each spin.
@@ -239,23 +239,15 @@
SNode(Object item) {
this.item = item;
}
- static final AtomicReferenceFieldUpdater<SNode, SNode>
- nextUpdater = AtomicReferenceFieldUpdater.newUpdater
- (SNode.class, SNode.class, "next");
-
boolean casNext(SNode cmp, SNode val) {
- return (cmp == next &&
- nextUpdater.compareAndSet(this, cmp, val));
+ return cmp == next && // short-circuit: the CAS below runs only if next currently equals cmp
+ UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); // nextOffset is the resolved offset of SNode's "next" field
}
- static final AtomicReferenceFieldUpdater<SNode, SNode>
- matchUpdater = AtomicReferenceFieldUpdater.newUpdater
- (SNode.class, SNode.class, "match");
-
/**
* Tries to match node s to this node, if so, waking up thread.
* Fulfillers call tryMatch to identify their waiters.
* Waiters block until they have been matched.
*
@@ -262,11 +254,11 @@
* @param s the node to match
* @return true if successfully matched to s
*/
boolean tryMatch(SNode s) {
if (match == null &&
- matchUpdater.compareAndSet(this, null, s)) {
+ UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
@@ -277,27 +269,32 @@
/**
* Tries to cancel a wait by matching node to itself.
*/
void tryCancel() {
- matchUpdater.compareAndSet(this, null, this);
+ UNSAFE.compareAndSwapObject(this, matchOffset, null, this); // CAS match from null to this node; the boolean result is ignored
}
boolean isCancelled() {
return match == this; // true once match refers to this node itself (the state tryCancel tries to install)
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long nextOffset =
+ objectFieldOffset(UNSAFE, "next", SNode.class);
+ private static final long matchOffset =
+ objectFieldOffset(UNSAFE, "match", SNode.class);
+
}
/** The head (top) of the stack */
volatile SNode head;
- static final AtomicReferenceFieldUpdater<TransferStack, SNode>
- headUpdater = AtomicReferenceFieldUpdater.newUpdater
- (TransferStack.class, SNode.class, "head");
-
boolean casHead(SNode h, SNode nh) {
- return h == head && headUpdater.compareAndSet(this, h, nh);
+ return h == head && // short-circuit: skip the CAS when head no longer equals h
+ UNSAFE.compareAndSwapObject(this, headOffset, h, nh); // headOffset is the resolved offset of TransferStack's "head" field
}
/**
* Creates or resets fields of a node. Called only from transfer
* where the node to push on stack is lazily created and
@@ -336,11 +333,11 @@
* is essentially the same as for fulfilling, except
* that it doesn't return the item.
*/
SNode s = null; // constructed/reused as needed
- int mode = (e == null)? REQUEST : DATA;
+ int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait
@@ -354,11 +351,11 @@
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
- return mode == REQUEST? m.item : s.item;
+ return (mode == REQUEST) ? m.item : s.item;
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
@@ -370,11 +367,11 @@
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
- return (mode == REQUEST)? m.item : s.item;
+ return (mode == REQUEST) ? m.item : s.item;
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller
@@ -421,15 +418,15 @@
* done before giving up.) Except that calls from untimed
* SynchronousQueue.{poll/offer} don't check interrupts
* and don't wait at all, so are trapped in transfer
* method rather than calling awaitFulfill.
*/
- long lastTime = (timed)? System.nanoTime() : 0;
+ long lastTime = timed ? System.nanoTime() : 0;
Thread w = Thread.currentThread();
SNode h = head;
- int spins = (shouldSpin(s)?
- (timed? maxTimedSpins : maxUntimedSpins) : 0);
+ int spins = (shouldSpin(s) ?
+ (timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel();
SNode m = s.match;
if (m != null)
@@ -442,11 +439,11 @@
s.tryCancel();
continue;
}
}
if (spins > 0)
- spins = shouldSpin(s)? (spins-1) : 0;
+ spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
@@ -497,10 +494,16 @@
p.casNext(n, n.next);
else
p = n;
}
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long headOffset =
+ objectFieldOffset(UNSAFE, "head", TransferStack.class);
+
}
/** Dual Queue */
static final class TransferQueue extends Transferer {
/*
@@ -522,33 +525,25 @@
QNode(Object item, boolean isData) { // plain field initialization; nothing else is set up here
this.item = item;
this.isData = isData;
}
- static final AtomicReferenceFieldUpdater<QNode, QNode>
- nextUpdater = AtomicReferenceFieldUpdater.newUpdater
- (QNode.class, QNode.class, "next");
-
boolean casNext(QNode cmp, QNode val) {
- return (next == cmp &&
- nextUpdater.compareAndSet(this, cmp, val));
+ return next == cmp && // short-circuit: the CAS below runs only if next currently equals cmp
+ UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); // nextOffset is the resolved offset of QNode's "next" field
}
- static final AtomicReferenceFieldUpdater<QNode, Object>
- itemUpdater = AtomicReferenceFieldUpdater.newUpdater
- (QNode.class, Object.class, "item");
-
boolean casItem(Object cmp, Object val) {
- return (item == cmp &&
- itemUpdater.compareAndSet(this, cmp, val));
+ return item == cmp && // short-circuit: the CAS below runs only if item currently equals cmp
+ UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); // itemOffset is the resolved offset of QNode's "item" field
}
/**
* Tries to cancel by CAS'ing ref to this as item.
*/
void tryCancel(Object cmp) {
- itemUpdater.compareAndSet(this, cmp, this);
+ UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); // CAS item from cmp to this node; the boolean result is ignored
}
boolean isCancelled() {
return item == this; // true once item refers to this node itself (the state tryCancel tries to install)
}
@@ -559,10 +554,17 @@
* an advanceHead operation.
*/
boolean isOffList() {
return next == this; // a self-linked next; advanceHead assigns h.next = h to the old head after a successful CAS
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long nextOffset =
+ objectFieldOffset(UNSAFE, "next", QNode.class);
+ private static final long itemOffset =
+ objectFieldOffset(UNSAFE, "item", QNode.class);
}
/** Head of queue */
transient volatile QNode head;
/** Tail of queue */
@@ -578,45 +580,34 @@
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
- static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
- headUpdater = AtomicReferenceFieldUpdater.newUpdater
- (TransferQueue.class, QNode.class, "head");
-
/**
* Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention.
*/
void advanceHead(QNode h, QNode nh) {
- if (h == head && headUpdater.compareAndSet(this, h, nh))
+ if (h == head && // short-circuit: skip the CAS when head no longer equals h
+ UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) // headOffset is the resolved offset of TransferQueue's "head" field
h.next = h; // forget old next
}
- static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
- tailUpdater = AtomicReferenceFieldUpdater.newUpdater
- (TransferQueue.class, QNode.class, "tail");
-
/**
* Tries to cas nt as new tail.
*/
void advanceTail(QNode t, QNode nt) {
if (tail == t)
- tailUpdater.compareAndSet(this, t, nt);
+ UNSAFE.compareAndSwapObject(this, tailOffset, t, nt); // tailOffset is the resolved offset of the "tail" field; the CAS result is ignored
}
- static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
- cleanMeUpdater = AtomicReferenceFieldUpdater.newUpdater
- (TransferQueue.class, QNode.class, "cleanMe");
-
/**
* Tries to CAS cleanMe slot.
*/
boolean casCleanMe(QNode cmp, QNode val) {
- return (cleanMe == cmp &&
- cleanMeUpdater.compareAndSet(this, cmp, val));
+ return cleanMe == cmp && // short-circuit: the CAS below runs only if cleanMe currently equals cmp
+ UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val); // cleanMeOffset is the resolved offset of the "cleanMe" field
}
/**
* Puts or takes an item.
*/
@@ -681,11 +672,11 @@
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
- return (x != null)? x : e;
+ return (x != null) ? x : e;
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
@@ -698,11 +689,11 @@
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
- return (x != null)? x : e;
+ return (x != null) ? x : e;
}
}
}
/**
@@ -714,14 +705,14 @@
* @param nanos timeout value
* @return matched item, or s if cancelled
*/
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
- long lastTime = (timed)? System.nanoTime() : 0;
+ long lastTime = timed ? System.nanoTime() : 0;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?
- (timed? maxTimedSpins : maxUntimedSpins) : 0);
+ (timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
if (x != e)
@@ -797,10 +788,20 @@
return; // s is already saved node
} else if (casCleanMe(null, pred))
return; // Postpone cleaning s
}
}
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+ private static final long headOffset =
+ objectFieldOffset(UNSAFE, "head", TransferQueue.class);
+ private static final long tailOffset =
+ objectFieldOffset(UNSAFE, "tail", TransferQueue.class);
+ private static final long cleanMeOffset =
+ objectFieldOffset(UNSAFE, "cleanMe", TransferQueue.class);
+
}
/**
* The transferer. Set only in constructor, but cannot be declared
* as final without further complicating serialization. Since
@@ -822,11 +823,11 @@
*
* @param fair if true, waiting threads contend in FIFO order for
* access; otherwise the order is unspecified.
*/
public SynchronousQueue(boolean fair) {
- transferer = (fair)? new TransferQueue() : new TransferStack();
+ transferer = fair ? new TransferQueue() : new TransferStack(); // fair selects TransferQueue; otherwise TransferStack
}
/**
* Adds the specified element to this queue, waiting if necessary for
* another thread to receive it.
@@ -1139,6 +1140,19 @@
transferer = new TransferQueue();
else
transferer = new TransferStack();
}
+ // Unsafe mechanics: resolves the offset of the named declared field of klazz, for use as the offset argument of the UNSAFE.compareAndSwapObject calls in the nested classes.
+ static long objectFieldOffset(sun.misc.Unsafe UNSAFE, // NOTE(review): parameter uses constant-style casing; a lower-case name would read better — confirm before renaming
+ String field, Class<?> klazz) {
+ try {
+ return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); // getDeclaredField throws NoSuchFieldException when the name does not match
+ } catch (NoSuchFieldException e) {
+ // Convert Exception to corresponding Error; the original exception is kept as the cause
+ NoSuchFieldError error = new NoSuchFieldError(field);
+ error.initCause(e);
+ throw error;
+ }
+ }
+
}