src/share/classes/java/util/concurrent/SynchronousQueue.java

Print this page

        

@@ -161,11 +161,11 @@
      */
 
     /**
      * Shared internal API for dual stacks and queues.
      */
-    static abstract class Transferer {
+    abstract static class Transferer {
         /**
          * Performs a put or take.
          *
          * @param e if non-null, the item to be handed to a consumer;
          *          if null, requests that transfer return an item

@@ -188,11 +188,11 @@
      * The value is empirically derived -- it works well across a
      * variety of processors and OSes. Empirically, the best value
      * seems not to vary with number of CPUs (beyond 2) so is just
      * a constant.
      */
-    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
+    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
 
     /**
      * The number of times to spin before blocking in untimed waits.
      * This is greater than timed value because untimed waits spin
      * faster since they don't need to check times on each spin.

@@ -239,23 +239,15 @@
 
             SNode(Object item) {
                 this.item = item;
             }
 
-            static final AtomicReferenceFieldUpdater<SNode, SNode>
-                nextUpdater = AtomicReferenceFieldUpdater.newUpdater
-                (SNode.class, SNode.class, "next");
-
             boolean casNext(SNode cmp, SNode val) {
-                return (cmp == next &&
-                        nextUpdater.compareAndSet(this, cmp, val));
+                return cmp == next &&
+                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
             }
 
-            static final AtomicReferenceFieldUpdater<SNode, SNode>
-                matchUpdater = AtomicReferenceFieldUpdater.newUpdater
-                (SNode.class, SNode.class, "match");
-
             /**
              * Tries to match node s to this node, if so, waking up thread.
              * Fulfillers call tryMatch to identify their waiters.
              * Waiters block until they have been matched.
              *

@@ -262,11 +254,11 @@
              * @param s the node to match
              * @return true if successfully matched to s
              */
             boolean tryMatch(SNode s) {
                 if (match == null &&
-                    matchUpdater.compareAndSet(this, null, s)) {
+                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                     Thread w = waiter;
                     if (w != null) {    // waiters need at most one unpark
                         waiter = null;
                         LockSupport.unpark(w);
                     }

@@ -277,27 +269,32 @@
 
             /**
              * Tries to cancel a wait by matching node to itself.
              */
             void tryCancel() {
-                matchUpdater.compareAndSet(this, null, this);
+                UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
             }
 
             boolean isCancelled() {
                 return match == this;
             }
+
+            // Unsafe mechanics
+            private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+            private static final long nextOffset =
+                objectFieldOffset(UNSAFE, "next", SNode.class);
+            private static final long matchOffset =
+                objectFieldOffset(UNSAFE, "match", SNode.class);
+
         }
 
         /** The head (top) of the stack */
         volatile SNode head;
 
-        static final AtomicReferenceFieldUpdater<TransferStack, SNode>
-            headUpdater = AtomicReferenceFieldUpdater.newUpdater
-            (TransferStack.class,  SNode.class, "head");
-
         boolean casHead(SNode h, SNode nh) {
-            return h == head && headUpdater.compareAndSet(this, h, nh);
+            return h == head &&
+                UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
         }
 
         /**
          * Creates or resets fields of a node. Called only from transfer
          * where the node to push on stack is lazily created and

@@ -336,11 +333,11 @@
              *    is essentially the same as for fulfilling, except
              *    that it doesn't return the item.
              */
 
             SNode s = null; // constructed/reused as needed
-            int mode = (e == null)? REQUEST : DATA;
+            int mode = (e == null) ? REQUEST : DATA;
 
             for (;;) {
                 SNode h = head;
                 if (h == null || h.mode == mode) {  // empty or same-mode
                     if (timed && nanos <= 0) {      // can't wait

@@ -354,11 +351,11 @@
                             clean(s);
                             return null;
                         }
                         if ((h = head) != null && h.next == s)
                             casHead(h, s.next);     // help s's fulfiller
-                        return mode == REQUEST? m.item : s.item;
+                        return (mode == REQUEST) ? m.item : s.item;
                     }
                 } else if (!isFulfilling(h.mode)) { // try to fulfill
                     if (h.isCancelled())            // already cancelled
                         casHead(h, h.next);         // pop and retry
                     else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {

@@ -370,11 +367,11 @@
                                 break;              // restart main loop
                             }
                             SNode mn = m.next;
                             if (m.tryMatch(s)) {
                                 casHead(s, mn);     // pop both s and m
-                                return (mode == REQUEST)? m.item : s.item;
+                                return (mode == REQUEST) ? m.item : s.item;
                             } else                  // lost match
                                 s.casNext(m, mn);   // help unlink
                         }
                     }
                 } else {                            // help a fulfiller

@@ -421,15 +418,15 @@
              * done before giving up.) Except that calls from untimed
              * SynchronousQueue.{poll/offer} don't check interrupts
              * and don't wait at all, so are trapped in transfer
              * method rather than calling awaitFulfill.
              */
-            long lastTime = (timed)? System.nanoTime() : 0;
+            long lastTime = timed ? System.nanoTime() : 0;
             Thread w = Thread.currentThread();
             SNode h = head;
-            int spins = (shouldSpin(s)?
-                         (timed? maxTimedSpins : maxUntimedSpins) : 0);
+            int spins = (shouldSpin(s) ?
+                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
             for (;;) {
                 if (w.isInterrupted())
                     s.tryCancel();
                 SNode m = s.match;
                 if (m != null)

@@ -442,11 +439,11 @@
                         s.tryCancel();
                         continue;
                     }
                 }
                 if (spins > 0)
-                    spins = shouldSpin(s)? (spins-1) : 0;
+                    spins = shouldSpin(s) ? (spins-1) : 0;
                 else if (s.waiter == null)
                     s.waiter = w; // establish waiter so can park next iter
                 else if (!timed)
                     LockSupport.park(this);
                 else if (nanos > spinForTimeoutThreshold)

@@ -497,10 +494,16 @@
                     p.casNext(n, n.next);
                 else
                     p = n;
             }
         }
+
+        // Unsafe mechanics
+        private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+        private static final long headOffset =
+            objectFieldOffset(UNSAFE, "head", TransferStack.class);
+
     }
 
     /** Dual Queue */
     static final class TransferQueue extends Transferer {
         /*

@@ -522,33 +525,25 @@
             QNode(Object item, boolean isData) {
                 this.item = item;
                 this.isData = isData;
             }
 
-            static final AtomicReferenceFieldUpdater<QNode, QNode>
-                nextUpdater = AtomicReferenceFieldUpdater.newUpdater
-                (QNode.class, QNode.class, "next");
-
             boolean casNext(QNode cmp, QNode val) {
-                return (next == cmp &&
-                        nextUpdater.compareAndSet(this, cmp, val));
+                return next == cmp &&
+                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
             }
 
-            static final AtomicReferenceFieldUpdater<QNode, Object>
-                itemUpdater = AtomicReferenceFieldUpdater.newUpdater
-                (QNode.class, Object.class, "item");
-
             boolean casItem(Object cmp, Object val) {
-                return (item == cmp &&
-                        itemUpdater.compareAndSet(this, cmp, val));
+                return item == cmp &&
+                    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
             }
 
             /**
              * Tries to cancel by CAS'ing ref to this as item.
              */
             void tryCancel(Object cmp) {
-                itemUpdater.compareAndSet(this, cmp, this);
+                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
             }
 
             boolean isCancelled() {
                 return item == this;
             }

@@ -559,10 +554,17 @@
              * an advanceHead operation.
              */
             boolean isOffList() {
                 return next == this;
             }
+
+            // Unsafe mechanics
+            private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+            private static final long nextOffset =
+                objectFieldOffset(UNSAFE, "next", QNode.class);
+            private static final long itemOffset =
+                objectFieldOffset(UNSAFE, "item", QNode.class);
         }
 
         /** Head of queue */
         transient volatile QNode head;
         /** Tail of queue */

@@ -578,45 +580,34 @@
             QNode h = new QNode(null, false); // initialize to dummy node.
             head = h;
             tail = h;
         }
 
-        static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
-            headUpdater = AtomicReferenceFieldUpdater.newUpdater
-            (TransferQueue.class,  QNode.class, "head");
-
         /**
          * Tries to cas nh as new head; if successful, unlink
          * old head's next node to avoid garbage retention.
          */
         void advanceHead(QNode h, QNode nh) {
-            if (h == head && headUpdater.compareAndSet(this, h, nh))
+            if (h == head &&
+                UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
                 h.next = h; // forget old next
         }
 
-        static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
-            tailUpdater = AtomicReferenceFieldUpdater.newUpdater
-            (TransferQueue.class, QNode.class, "tail");
-
         /**
          * Tries to cas nt as new tail.
          */
         void advanceTail(QNode t, QNode nt) {
             if (tail == t)
-                tailUpdater.compareAndSet(this, t, nt);
+                UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
         }
 
-        static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
-            cleanMeUpdater = AtomicReferenceFieldUpdater.newUpdater
-            (TransferQueue.class, QNode.class, "cleanMe");
-
         /**
          * Tries to CAS cleanMe slot.
          */
         boolean casCleanMe(QNode cmp, QNode val) {
-            return (cleanMe == cmp &&
-                    cleanMeUpdater.compareAndSet(this, cmp, val));
+            return cleanMe == cmp &&
+                UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
         }
 
         /**
          * Puts or takes an item.
          */

@@ -681,11 +672,11 @@
                         advanceHead(t, s);          // unlink if head
                         if (x != null)              // and forget fields
                             s.item = s;
                         s.waiter = null;
                     }
-                    return (x != null)? x : e;
+                    return (x != null) ? x : e;
 
                 } else {                            // complementary-mode
                     QNode m = h.next;               // node to fulfill
                     if (t != tail || m == null || h != head)
                         continue;                   // inconsistent read

@@ -698,11 +689,11 @@
                         continue;
                     }
 
                     advanceHead(h, m);              // successfully fulfilled
                     LockSupport.unpark(m.waiter);
-                    return (x != null)? x : e;
+                    return (x != null) ? x : e;
                 }
             }
         }
 
         /**

@@ -714,14 +705,14 @@
          * @param nanos timeout value
          * @return matched item, or s if cancelled
          */
         Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
             /* Same idea as TransferStack.awaitFulfill */
-            long lastTime = (timed)? System.nanoTime() : 0;
+            long lastTime = timed ? System.nanoTime() : 0;
             Thread w = Thread.currentThread();
             int spins = ((head.next == s) ?
-                         (timed? maxTimedSpins : maxUntimedSpins) : 0);
+                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
             for (;;) {
                 if (w.isInterrupted())
                     s.tryCancel(e);
                 Object x = s.item;
                 if (x != e)

@@ -797,10 +788,20 @@
                         return;      // s is already saved node
                 } else if (casCleanMe(null, pred))
                     return;          // Postpone cleaning s
             }
         }
+
+        // unsafe mechanics
+        private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
+        private static final long headOffset =
+            objectFieldOffset(UNSAFE, "head", TransferQueue.class);
+        private static final long tailOffset =
+            objectFieldOffset(UNSAFE, "tail", TransferQueue.class);
+        private static final long cleanMeOffset =
+            objectFieldOffset(UNSAFE, "cleanMe", TransferQueue.class);
+
     }
 
     /**
      * The transferer. Set only in constructor, but cannot be declared
      * as final without further complicating serialization.  Since

@@ -822,11 +823,11 @@
      *
      * @param fair if true, waiting threads contend in FIFO order for
      *        access; otherwise the order is unspecified.
      */
     public SynchronousQueue(boolean fair) {
-        transferer = (fair)? new TransferQueue() : new TransferStack();
+        transferer = fair ? new TransferQueue() : new TransferStack();
     }
 
     /**
      * Adds the specified element to this queue, waiting if necessary for
      * another thread to receive it.

@@ -1139,6 +1140,19 @@
             transferer = new TransferQueue();
         else
             transferer = new TransferStack();
     }
 
+    // Unsafe mechanics
+    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
+                                  String field, Class<?> klazz) {
+        try {
+            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+        } catch (NoSuchFieldException e) {
+            // Convert Exception to corresponding Error
+            NoSuchFieldError error = new NoSuchFieldError(field);
+            error.initCause(e);
+            throw error;
+        }
+    }
+
 }