src/share/classes/java/util/concurrent/SynchronousQueue.java


*** 161,171 ****
       */
  
      /**
       * Shared internal API for dual stacks and queues.
       */
!     static abstract class Transferer {
          /**
           * Performs a put or take.
           *
           * @param e if non-null, the item to be handed to a consumer;
           *          if null, requests that transfer return an item
--- 161,171 ----
       */
  
      /**
       * Shared internal API for dual stacks and queues.
       */
!     abstract static class Transferer {
          /**
           * Performs a put or take.
           *
           * @param e if non-null, the item to be handed to a consumer;
           *          if null, requests that transfer return an item
*** 188,198 ****
       * The value is empirically derived -- it works well across a
       * variety of processors and OSes. Empirically, the best value
       * seems not to vary with number of CPUs (beyond 2) so is just
       * a constant.
       */
!     static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
  
      /**
       * The number of times to spin before blocking in untimed waits.
       * This is greater than timed value because untimed waits spin
       * faster since they don't need to check times on each spin.
--- 188,198 ----
       * The value is empirically derived -- it works well across a
       * variety of processors and OSes. Empirically, the best value
       * seems not to vary with number of CPUs (beyond 2) so is just
       * a constant.
       */
!     static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
  
      /**
       * The number of times to spin before blocking in untimed waits.
       * This is greater than timed value because untimed waits spin
       * faster since they don't need to check times on each spin.
*** 239,261 ****
  
              SNode(Object item) {
                  this.item = item;
              }
  
-             static final AtomicReferenceFieldUpdater<SNode, SNode>
-                 nextUpdater = AtomicReferenceFieldUpdater.newUpdater
-                 (SNode.class, SNode.class, "next");
- 
              boolean casNext(SNode cmp, SNode val) {
!                 return (cmp == next &&
!                         nextUpdater.compareAndSet(this, cmp, val));
              }
  
-             static final AtomicReferenceFieldUpdater<SNode, SNode>
-                 matchUpdater = AtomicReferenceFieldUpdater.newUpdater
-                 (SNode.class, SNode.class, "match");
- 
              /**
               * Tries to match node s to this node, if so, waking up thread.
               * Fulfillers call tryMatch to identify their waiters.
               * Waiters block until they have been matched.
               *
--- 239,253 ----
  
              SNode(Object item) {
                  this.item = item;
              }
  
              boolean casNext(SNode cmp, SNode val) {
!                 return cmp == next &&
!                     UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
              }
  
              /**
               * Tries to match node s to this node, if so, waking up thread.
               * Fulfillers call tryMatch to identify their waiters.
               * Waiters block until they have been matched.
               *
*** 262,272 ****
               * @param s the node to match
               * @return true if successfully matched to s
               */
              boolean tryMatch(SNode s) {
                  if (match == null &&
!                     matchUpdater.compareAndSet(this, null, s)) {
                      Thread w = waiter;
                      if (w != null) {    // waiters need at most one unpark
                          waiter = null;
                          LockSupport.unpark(w);
                      }
--- 254,264 ----
               * @param s the node to match
               * @return true if successfully matched to s
               */
              boolean tryMatch(SNode s) {
                  if (match == null &&
!                     UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                      Thread w = waiter;
                      if (w != null) {    // waiters need at most one unpark
                          waiter = null;
                          LockSupport.unpark(w);
                      }
*** 277,303 ****
  
              /**
               * Tries to cancel a wait by matching node to itself.
               */
              void tryCancel() {
!                 matchUpdater.compareAndSet(this, null, this);
              }
  
              boolean isCancelled() {
                  return match == this;
              }
          }
  
          /** The head (top) of the stack */
          volatile SNode head;
  
-         static final AtomicReferenceFieldUpdater<TransferStack, SNode>
-             headUpdater = AtomicReferenceFieldUpdater.newUpdater
-             (TransferStack.class, SNode.class, "head");
- 
          boolean casHead(SNode h, SNode nh) {
!             return h == head && headUpdater.compareAndSet(this, h, nh);
          }
  
          /**
           * Creates or resets fields of a node. Called only from transfer
           * where the node to push on stack is lazily created and
--- 269,300 ----
  
              /**
               * Tries to cancel a wait by matching node to itself.
               */
              void tryCancel() {
!                 UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
              }
  
              boolean isCancelled() {
                  return match == this;
              }
+ 
+             // Unsafe mechanics
+             private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+             private static final long nextOffset =
+                 objectFieldOffset(UNSAFE, "next", SNode.class);
+             private static final long matchOffset =
+                 objectFieldOffset(UNSAFE, "match", SNode.class);
+ 
          }
  
          /** The head (top) of the stack */
          volatile SNode head;
  
          boolean casHead(SNode h, SNode nh) {
!             return h == head &&
!                 UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
          }
  
          /**
           * Creates or resets fields of a node. Called only from transfer
           * where the node to push on stack is lazily created and
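The hunks above replace per-field AtomicReferenceFieldUpdater instances with raw compare-and-swap calls through sun.misc.Unsafe on cached field offsets. A minimal sketch of that idiom on a hypothetical Node class (not part of this change) is shown below; unlike java.util.concurrent, ordinary application code cannot call sun.misc.Unsafe.getUnsafe() directly, so the sketch obtains the instance reflectively.

    import java.lang.reflect.Field;
    import sun.misc.Unsafe;

    // Hypothetical stand-in for SNode/QNode, illustrating the CAS-via-offset idiom.
    class Node {
        volatile Node next;

        boolean casNext(Node cmp, Node val) {
            // Same shape as SNode.casNext above: a plain read check, then a
            // compare-and-swap against the cached offset of the "next" field.
            return next == cmp &&
                UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe mechanics: java.util.concurrent may call Unsafe.getUnsafe()
        // because it is loaded by the boot class loader; code outside the
        // boot class path typically has to read the singleton reflectively.
        private static final Unsafe UNSAFE;
        private static final long nextOffset;
        static {
            try {
                Field f = Unsafe.class.getDeclaredField("theUnsafe");
                f.setAccessible(true);
                UNSAFE = (Unsafe) f.get(null);
                nextOffset = UNSAFE.objectFieldOffset(
                    Node.class.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }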
*** 336,346 ****
               *    is essentially the same as for fulfilling, except
               *    that it doesn't return the item.
               */
  
              SNode s = null; // constructed/reused as needed
!             int mode = (e == null)? REQUEST : DATA;
  
              for (;;) {
                  SNode h = head;
                  if (h == null || h.mode == mode) {  // empty or same-mode
                      if (timed && nanos <= 0) {      // can't wait
--- 333,343 ----
               *    is essentially the same as for fulfilling, except
               *    that it doesn't return the item.
               */
  
              SNode s = null; // constructed/reused as needed
!             int mode = (e == null) ? REQUEST : DATA;
  
              for (;;) {
                  SNode h = head;
                  if (h == null || h.mode == mode) {  // empty or same-mode
                      if (timed && nanos <= 0) {      // can't wait
*** 354,364 ****
                              clean(s);
                              return null;
                          }
                          if ((h = head) != null && h.next == s)
                              casHead(h, s.next);     // help s's fulfiller
!                         return mode == REQUEST? m.item : s.item;
                      }
                  } else if (!isFulfilling(h.mode)) { // try to fulfill
                      if (h.isCancelled())            // already cancelled
                          casHead(h, h.next);         // pop and retry
                      else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
--- 351,361 ----
                              clean(s);
                              return null;
                          }
                          if ((h = head) != null && h.next == s)
                              casHead(h, s.next);     // help s's fulfiller
!                         return (mode == REQUEST) ? m.item : s.item;
                      }
                  } else if (!isFulfilling(h.mode)) { // try to fulfill
                      if (h.isCancelled())            // already cancelled
                          casHead(h, h.next);         // pop and retry
                      else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
*** 370,380 ****
                              break;              // restart main loop
                          }
                          SNode mn = m.next;
                          if (m.tryMatch(s)) {
                              casHead(s, mn);     // pop both s and m
!                             return (mode == REQUEST)? m.item : s.item;
                          } else                  // lost match
                              s.casNext(m, mn);   // help unlink
                      }
                  }
              } else {                            // help a fulfiller
--- 367,377 ----
                              break;              // restart main loop
                          }
                          SNode mn = m.next;
                          if (m.tryMatch(s)) {
                              casHead(s, mn);     // pop both s and m
!                             return (mode == REQUEST) ? m.item : s.item;
                          } else                  // lost match
                              s.casNext(m, mn);   // help unlink
                      }
                  }
              } else {                            // help a fulfiller
*** 421,435 ****
               * done before giving up.) Except that calls from untimed
               * SynchronousQueue.{poll/offer} don't check interrupts
               * and don't wait at all, so are trapped in transfer
               * method rather than calling awaitFulfill.
               */
!             long lastTime = (timed)? System.nanoTime() : 0;
              Thread w = Thread.currentThread();
              SNode h = head;
!             int spins = (shouldSpin(s)?
!                 (timed? maxTimedSpins : maxUntimedSpins) : 0);
              for (;;) {
                  if (w.isInterrupted())
                      s.tryCancel();
                  SNode m = s.match;
                  if (m != null)
--- 418,432 ----
               * done before giving up.) Except that calls from untimed
               * SynchronousQueue.{poll/offer} don't check interrupts
               * and don't wait at all, so are trapped in transfer
               * method rather than calling awaitFulfill.
               */
!             long lastTime = timed ? System.nanoTime() : 0;
              Thread w = Thread.currentThread();
              SNode h = head;
!             int spins = (shouldSpin(s) ?
!                          (timed ? maxTimedSpins : maxUntimedSpins) : 0);
              for (;;) {
                  if (w.isInterrupted())
                      s.tryCancel();
                  SNode m = s.match;
                  if (m != null)
*** 442,452 ****
                          s.tryCancel();
                          continue;
                      }
                  }
                  if (spins > 0)
!                     spins = shouldSpin(s)? (spins-1) : 0;
                  else if (s.waiter == null)
                      s.waiter = w; // establish waiter so can park next iter
                  else if (!timed)
                      LockSupport.park(this);
                  else if (nanos > spinForTimeoutThreshold)
--- 439,449 ----
                          s.tryCancel();
                          continue;
                      }
                  }
                  if (spins > 0)
!                     spins = shouldSpin(s) ? (spins-1) : 0;
                  else if (s.waiter == null)
                      s.waiter = w; // establish waiter so can park next iter
                  else if (!timed)
                      LockSupport.park(this);
                  else if (nanos > spinForTimeoutThreshold)
*** 497,506 ****
--- 494,509 ----
                      p.casNext(n, n.next);
                  else
                      p = n;
              }
          }
+ 
+         // Unsafe mechanics
+         private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+         private static final long headOffset =
+             objectFieldOffset(UNSAFE, "head", TransferStack.class);
+ 
      }
  
      /** Dual Queue */
      static final class TransferQueue extends Transferer {
          /*
*** 522,554 ****
              QNode(Object item, boolean isData) {
                  this.item = item;
                  this.isData = isData;
              }
  
-             static final AtomicReferenceFieldUpdater<QNode, QNode>
-                 nextUpdater = AtomicReferenceFieldUpdater.newUpdater
-                 (QNode.class, QNode.class, "next");
- 
              boolean casNext(QNode cmp, QNode val) {
!                 return (next == cmp &&
!                         nextUpdater.compareAndSet(this, cmp, val));
              }
  
-             static final AtomicReferenceFieldUpdater<QNode, Object>
-                 itemUpdater = AtomicReferenceFieldUpdater.newUpdater
-                 (QNode.class, Object.class, "item");
- 
              boolean casItem(Object cmp, Object val) {
!                 return (item == cmp &&
!                         itemUpdater.compareAndSet(this, cmp, val));
              }
  
              /**
               * Tries to cancel by CAS'ing ref to this as item.
               */
              void tryCancel(Object cmp) {
!                 itemUpdater.compareAndSet(this, cmp, this);
              }
  
              boolean isCancelled() {
                  return item == this;
              }
--- 525,549 ----
              QNode(Object item, boolean isData) {
                  this.item = item;
                  this.isData = isData;
              }
  
              boolean casNext(QNode cmp, QNode val) {
!                 return next == cmp &&
!                     UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
              }
  
              boolean casItem(Object cmp, Object val) {
!                 return item == cmp &&
!                     UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
              }
  
              /**
               * Tries to cancel by CAS'ing ref to this as item.
               */
              void tryCancel(Object cmp) {
!                 UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
              }
  
              boolean isCancelled() {
                  return item == this;
              }
*** 559,568 ****
--- 554,570 ----
               * an advanceHead operation.
               */
              boolean isOffList() {
                  return next == this;
              }
+ 
+             // Unsafe mechanics
+             private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+             private static final long nextOffset =
+                 objectFieldOffset(UNSAFE, "next", QNode.class);
+             private static final long itemOffset =
+                 objectFieldOffset(UNSAFE, "item", QNode.class);
          }
  
          /** Head of queue */
          transient volatile QNode head;
          /** Tail of queue */
*** 578,622 ****
              QNode h = new QNode(null, false); // initialize to dummy node.
              head = h;
              tail = h;
          }
  
-         static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
-             headUpdater = AtomicReferenceFieldUpdater.newUpdater
-             (TransferQueue.class, QNode.class, "head");
- 
          /**
           * Tries to cas nh as new head; if successful, unlink
           * old head's next node to avoid garbage retention.
           */
          void advanceHead(QNode h, QNode nh) {
!             if (h == head && headUpdater.compareAndSet(this, h, nh))
                  h.next = h; // forget old next
          }
  
-         static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
-             tailUpdater = AtomicReferenceFieldUpdater.newUpdater
-             (TransferQueue.class, QNode.class, "tail");
- 
          /**
           * Tries to cas nt as new tail.
           */
          void advanceTail(QNode t, QNode nt) {
              if (tail == t)
!                 tailUpdater.compareAndSet(this, t, nt);
          }
  
-         static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
-             cleanMeUpdater = AtomicReferenceFieldUpdater.newUpdater
-             (TransferQueue.class, QNode.class, "cleanMe");
- 
          /**
           * Tries to CAS cleanMe slot.
           */
          boolean casCleanMe(QNode cmp, QNode val) {
!             return (cleanMe == cmp &&
!                     cleanMeUpdater.compareAndSet(this, cmp, val));
          }
  
          /**
           * Puts or takes an item.
           */
--- 580,613 ----
              QNode h = new QNode(null, false); // initialize to dummy node.
              head = h;
              tail = h;
          }
  
          /**
           * Tries to cas nh as new head; if successful, unlink
           * old head's next node to avoid garbage retention.
           */
          void advanceHead(QNode h, QNode nh) {
!             if (h == head &&
!                 UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
                  h.next = h; // forget old next
          }
  
          /**
           * Tries to cas nt as new tail.
           */
          void advanceTail(QNode t, QNode nt) {
              if (tail == t)
!                 UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
          }
  
          /**
           * Tries to CAS cleanMe slot.
           */
          boolean casCleanMe(QNode cmp, QNode val) {
!             return cleanMe == cmp &&
!                 UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
          }
  
          /**
           * Puts or takes an item.
           */
*** 681,691 ****
                      advanceHead(t, s);          // unlink if head
                      if (x != null)              // and forget fields
                          s.item = s;
                      s.waiter = null;
                  }
!                 return (x != null)? x : e;
  
              } else {                            // complementary-mode
                  QNode m = h.next;               // node to fulfill
                  if (t != tail || m == null || h != head)
                      continue;                   // inconsistent read
--- 672,682 ----
                      advanceHead(t, s);          // unlink if head
                      if (x != null)              // and forget fields
                          s.item = s;
                      s.waiter = null;
                  }
!                 return (x != null) ? x : e;
  
              } else {                            // complementary-mode
                  QNode m = h.next;               // node to fulfill
                  if (t != tail || m == null || h != head)
                      continue;                   // inconsistent read
*** 698,708 ****
                      continue;
                  }
  
                  advanceHead(h, m);              // successfully fulfilled
                  LockSupport.unpark(m.waiter);
!                 return (x != null)? x : e;
              }
          }
      }
  
      /**
--- 689,699 ----
                      continue;
                  }
  
                  advanceHead(h, m);              // successfully fulfilled
                  LockSupport.unpark(m.waiter);
!                 return (x != null) ? x : e;
              }
          }
      }
  
      /**
*** 714,727 ****
           * @param nanos timeout value
           * @return matched item, or s if cancelled
           */
          Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
              /* Same idea as TransferStack.awaitFulfill */
!             long lastTime = (timed)? System.nanoTime() : 0;
              Thread w = Thread.currentThread();
              int spins = ((head.next == s) ?
!                 (timed? maxTimedSpins : maxUntimedSpins) : 0);
              for (;;) {
                  if (w.isInterrupted())
                      s.tryCancel(e);
                  Object x = s.item;
                  if (x != e)
--- 705,718 ----
           * @param nanos timeout value
           * @return matched item, or s if cancelled
           */
          Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
              /* Same idea as TransferStack.awaitFulfill */
!             long lastTime = timed ? System.nanoTime() : 0;
              Thread w = Thread.currentThread();
              int spins = ((head.next == s) ?
!                          (timed ? maxTimedSpins : maxUntimedSpins) : 0);
              for (;;) {
                  if (w.isInterrupted())
                      s.tryCancel(e);
                  Object x = s.item;
                  if (x != e)
*** 797,806 ****
--- 788,807 ----
                          return;      // s is already saved node
                  } else if (casCleanMe(null, pred))
                      return;          // Postpone cleaning s
              }
          }
+ 
+         // unsafe mechanics
+         private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+         private static final long headOffset =
+             objectFieldOffset(UNSAFE, "head", TransferQueue.class);
+         private static final long tailOffset =
+             objectFieldOffset(UNSAFE, "tail", TransferQueue.class);
+         private static final long cleanMeOffset =
+             objectFieldOffset(UNSAFE, "cleanMe", TransferQueue.class);
+ 
      }
  
      /**
       * The transferer. Set only in constructor, but cannot be declared
       * as final without further complicating serialization. Since
*** 822,832 ****
       *
       * @param fair if true, waiting threads contend in FIFO order for
       *        access; otherwise the order is unspecified.
       */
      public SynchronousQueue(boolean fair) {
!         transferer = (fair)? new TransferQueue() : new TransferStack();
      }
  
      /**
       * Adds the specified element to this queue, waiting if necessary for
       * another thread to receive it.
--- 823,833 ----
       *
       * @param fair if true, waiting threads contend in FIFO order for
       *        access; otherwise the order is unspecified.
       */
      public SynchronousQueue(boolean fair) {
!         transferer = fair ? new TransferQueue() : new TransferStack();
      }
  
      /**
       * Adds the specified element to this queue, waiting if necessary for
       * another thread to receive it.
*** 1139,1144 ****
--- 1140,1158 ----
              transferer = new TransferQueue();
          else
              transferer = new TransferStack();
      }
  
+     // Unsafe mechanics
+     static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
+                                   String field, Class<?> klazz) {
+         try {
+             return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+         } catch (NoSuchFieldException e) {
+             // Convert Exception to corresponding Error
+             NoSuchFieldError error = new NoSuchFieldError(field);
+             error.initCause(e);
+             throw error;
+         }
+     }
+ 
  }
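For comparison, the updater-based code removed throughout this change can be written entirely against the public AtomicReferenceFieldUpdater API; a minimal sketch on a hypothetical Node class follows. The shared objectFieldOffset helper added in the last hunk converts the checked NoSuchFieldException from getDeclaredField into a NoSuchFieldError, so a missing field simply fails class initialization.

    import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

    // Hypothetical node showing the updater-based idiom this change removes.
    class Node {
        volatile Node next;

        // One updater per CAS'ed field, analogous to the removed
        // nextUpdater/matchUpdater/itemUpdater declarations.
        private static final AtomicReferenceFieldUpdater<Node, Node> nextUpdater =
            AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");

        boolean casNext(Node cmp, Node val) {
            // Same read-check-then-CAS shape as the Unsafe-based replacement.
            return next == cmp && nextUpdater.compareAndSet(this, cmp, val);
        }
    }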