< prev index next >

src/java.base/share/classes/java/lang/ref/ReferenceQueue.java

Print this page

        

@@ -44,55 +44,74 @@
         boolean enqueue(Reference<? extends S> r) {
             return false;
         }
     }
 
-    static ReferenceQueue<Object> NULL = new Null<>();
-    static ReferenceQueue<Object> ENQUEUED = new Null<>();
+    static final ReferenceQueue<Object> NULL = new Null<>(); // r.queue value once r has been polled (see reallyPoll)
+    static final ReferenceQueue<Object> ENQUEUED = new Null<>(); // r.queue value while r is enqueued (see markEnqueued)
 
-    static private class Lock { };
-    private Lock lock = new Lock();
-    private volatile Reference<? extends T> head = null;
-    private long queueLength = 0;
+    static private class Lock { }
+    private final Lock lock = new Lock(); // monitor for wait()/notifyAll() in reallyRemove() and addChunk()
+    private volatile int waiters; // threads currently inside reallyRemove(); written only while holding lock
+
+    @SuppressWarnings("unused")
+    private volatile Reference<? extends T> head; // we assign using Unsafe CAS
 
     boolean enqueue(Reference<? extends T> r) { /* Called only by Reference class */
-        synchronized (lock) {
-            // Check that since getting the lock this reference hasn't already been
+        if (markEnqueued(r)) { // false when r was already enqueued or removed
+            addChunk(r, r); // push r as a one-element chunk (head and tail are the same node)
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    boolean markEnqueued(Reference<?> r) { // flips r.queue to ENQUEUED; returns true only if this call did the flip
+        ReferenceQueue<?> queue;
+        do {
+            // Check that this reference hasn't already been
             // enqueued (and even then removed)
-            ReferenceQueue<?> queue = r.queue;
+            queue = r.queue;
             if ((queue == NULL) || (queue == ENQUEUED)) {
                 return false;
             }
+        } while (!r.casQueue(queue, ENQUEUED)); // retry on failure; casQueue is defined in Reference, presumably a CAS on r.queue
             assert queue == this;
-            r.queue = ENQUEUED;
-            r.next = (head == null) ? r : head;
-            head = r;
-            queueLength++;
-            if (r instanceof FinalReference) {
-                sun.misc.VM.addFinalRefCount(1);
+        return true;
             }
+
+    @SuppressWarnings("unchecked")
+    void addChunk(Reference<?> chunkHead, Reference<?> chunkTail) { // pushes chunkHead..chunkTail in front of head; chain presumably pre-linked via next by the caller
+        Reference<? extends T> h;
+        do {
+            h = head;
+            chunkTail.next = (h == null) ? chunkTail : h; // self-link marks the last element (reallyPoll tests r.next == r)
+        } while (!casHead(h, (Reference) chunkHead)); // head moved since it was read: re-read and retry
+        // notify waiters; the unlocked read is a cheap pre-check, repeated under lock
+        if (waiters > 0) {
+            synchronized (lock) {
+                if (waiters > 0) {
             lock.notifyAll();
-            return true;
+                }
+            }
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private Reference<? extends T> reallyPoll() {       /* Must hold lock */
-        Reference<? extends T> r = head;
-        if (r != null) {
-            head = (r.next == r) ?
-                null :
-                r.next; // Unchecked due to the next field having a raw type in Reference
-            r.queue = NULL;
+    private Reference<? extends T> reallyPoll() { // removes and returns the current head via CAS, or returns null if head is null
+        Reference<? extends T> r;
+        while ((r = head) != null) { // re-read head on every attempt
+            @SuppressWarnings("unchecked") // due to cast to raw type
+            Reference<? extends T> nh = (r.next == r) // a self-linked r is the last element
+                ? null : (Reference) r.next;
+            if (casHead(r, nh)) { // on failure head has changed, so loop and try again
+                r.lazySetQueue(NULL); // lazySetQueue is defined in Reference; presumably an ordered store to r.queue
+                UNSAFE.storeFence(); // NOTE(review): presumably keeps the queue store ahead of the next store below - confirm
             r.next = r;
-            queueLength--;
-            if (r instanceof FinalReference) {
-                sun.misc.VM.addFinalRefCount(-1);
+                break;
             }
-            return r;
         }
-        return null;
+        return r; // null when the loop ended because head was null
     }
 
     /**
      * Polls this queue to see if a reference object is available.  If one is
      * available without further delay then it is removed from the queue and

@@ -100,16 +119,12 @@
      *
      * @return  A reference object, if one was immediately available,
      *          otherwise <code>null</code>
      */
     public Reference<? extends T> poll() {
-        if (head == null)
-            return null;
-        synchronized (lock) {
             return reallyPoll();
         }
-    }
 
     /**
      * Removes the next reference object in this queue, blocking until either
      * one becomes available or the given timeout period expires.
      *

@@ -133,24 +148,36 @@
         throws IllegalArgumentException, InterruptedException
     {
         if (timeout < 0) {
             throw new IllegalArgumentException("Negative timeout value");
         }
-        synchronized (lock) {
             Reference<? extends T> r = reallyPoll();
             if (r != null) return r;
-            long start = (timeout == 0) ? 0 : System.nanoTime();
-            for (;;) {
-                lock.wait(timeout);
-                r = reallyPoll();
+        return reallyRemove(timeout);
+    }
+
+    /** Waits, counted in {@code waiters}, until a reference can be polled or {@code timeout} ms elapse (0 = no limit). */
+    private Reference<? extends T> reallyRemove(long timeout) throws InterruptedException {
+        long deadline = (timeout == 0) ? 0 // clamp so that the ms -> ns conversion cannot overflow
+                : System.nanoTime() + Math.min(timeout, Long.MAX_VALUE / 1000_000L) * 1000_000L;
+        synchronized (lock) {
+            // waiters is only written while holding lock, so ++ and -- are race-free
+            waiters++;
+            try {
+                for (; ; ) {
+                    Reference<? extends T> r = reallyPoll();
                 if (r != null) return r;
-                if (timeout != 0) {
-                    long end = System.nanoTime();
-                    timeout -= (end - start) / 1000_000;
-                    if (timeout <= 0) return null;
-                    start = end;
+                    if (timeout == 0) {
+                        lock.wait(0);
+                    } else {
+                        long timeoutNanos = deadline - System.nanoTime(); // wraparound-safe difference
+                        if (timeoutNanos <= 0) return null;
+                        lock.wait(timeoutNanos / 1000_000L, (int) (timeoutNanos % 1000_000L));
+                    }
                 }
+            } finally {
+                waiters--; // decrement, not restore: wait() released lock, so other threads may have registered
             }
         }
     }
 
     /**

@@ -162,6 +189,24 @@
      */
     public Reference<? extends T> remove() throws InterruptedException {
         return remove(0);
     }
 
+    // Unsafe machinery
+
+    private boolean casHead(Reference<?> cmp, Reference<? extends T> val) { // sets head to val only if head is currently cmp
+        return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
+    }
+
+    private static final sun.misc.Unsafe UNSAFE;
+    private static final long headOffset; // field offset of head, passed to compareAndSwapObject in casHead
+
+    static {
+        try {
+            UNSAFE = sun.misc.Unsafe.getUnsafe();
+            Class<ReferenceQueue> rqc = ReferenceQueue.class;
+            headOffset = UNSAFE.objectFieldOffset(rqc.getDeclaredField("head")); // looked up reflectively by field name
+        } catch (Exception e) {
+            throw new Error(e); // wrap any lookup failure, keeping the cause
+        }
+    }
 }
< prev index next >