< prev index next >

src/java.base/share/classes/java/lang/ref/ReferenceQueue.java

Print this page

        

*** 44,98 **** boolean enqueue(Reference<? extends S> r) { return false; } } ! static ReferenceQueue<Object> NULL = new Null<>(); ! static ReferenceQueue<Object> ENQUEUED = new Null<>(); ! static private class Lock { }; ! private Lock lock = new Lock(); ! private volatile Reference<? extends T> head = null; ! private long queueLength = 0; boolean enqueue(Reference<? extends T> r) { /* Called only by Reference class */ ! synchronized (lock) { ! // Check that since getting the lock this reference hasn't already been // enqueued (and even then removed) ! ReferenceQueue<?> queue = r.queue; if ((queue == NULL) || (queue == ENQUEUED)) { return false; } assert queue == this; ! r.queue = ENQUEUED; ! r.next = (head == null) ? r : head; ! head = r; ! queueLength++; ! if (r instanceof FinalReference) { ! sun.misc.VM.addFinalRefCount(1); } lock.notifyAll(); ! return true; } } ! @SuppressWarnings("unchecked") ! private Reference<? extends T> reallyPoll() { /* Must hold lock */ ! Reference<? extends T> r = head; ! if (r != null) { ! head = (r.next == r) ? ! null : ! r.next; // Unchecked due to the next field having a raw type in Reference ! r.queue = NULL; r.next = r; ! queueLength--; ! if (r instanceof FinalReference) { ! sun.misc.VM.addFinalRefCount(-1); } - return r; } ! return null; } /** * Polls this queue to see if a reference object is available. If one is * available without further delay then it is removed from the queue and --- 44,117 ---- boolean enqueue(Reference<? extends S> r) { return false; } } ! static final ReferenceQueue<Object> NULL = new Null<>(); ! static final ReferenceQueue<Object> ENQUEUED = new Null<>(); ! static private class Lock { } ! private final Lock lock = new Lock(); ! private volatile int waiters; ! ! @SuppressWarnings("unused") ! private volatile Reference<? extends T> head; // we assign using Unsafe CAS boolean enqueue(Reference<? extends T> r) { /* Called only by Reference class */ ! if (markEnqueued(r)) { // markEnqueued is atomic ! addChunk(r, r); ! return true; ! } else { ! return false; ! } ! } ! ! boolean markEnqueued(Reference<?> r) { ! ReferenceQueue<?> queue; ! do { ! // Check that this reference hasn't already been // enqueued (and even then removed) ! queue = r.queue; if ((queue == NULL) || (queue == ENQUEUED)) { return false; } + } while (!r.casQueue(queue, ENQUEUED)); assert queue == this; ! return true; } + + @SuppressWarnings("unchecked") + void addChunk(Reference<?> chunkHead, Reference<?> chunkTail) { + Reference<? extends T> h; + do { + h = head; + chunkTail.next = (h == null) ? chunkTail : h; + } while (!casHead(h, (Reference) chunkHead)); + // notify waiters + if (waiters > 0) { + synchronized (lock) { + if (waiters > 0) { lock.notifyAll(); ! } ! } } } ! private Reference<? extends T> reallyPoll() { ! Reference<? extends T> r; ! while ((r = head) != null) { ! @SuppressWarnings("unchecked") // due to cast to raw type ! Reference<? extends T> nh = (r.next == r) ! ? null : (Reference) r.next; ! if (casHead(r, nh)) { ! r.lazySetQueue(NULL); ! UNSAFE.storeFence(); r.next = r; ! break; } } ! return r; } /** * Polls this queue to see if a reference object is available. If one is * available without further delay then it is removed from the queue and
*** 100,115 **** * * @return A reference object, if one was immediately available, * otherwise <code>null</code> */ public Reference<? extends T> poll() { - if (head == null) - return null; - synchronized (lock) { return reallyPoll(); } - } /** * Removes the next reference object in this queue, blocking until either * one becomes available or the given timeout period expires. * --- 119,130 ----
*** 133,156 **** throws IllegalArgumentException, InterruptedException { if (timeout < 0) { throw new IllegalArgumentException("Negative timeout value"); } - synchronized (lock) { Reference<? extends T> r = reallyPoll(); if (r != null) return r; ! long start = (timeout == 0) ? 0 : System.nanoTime(); ! for (;;) { ! lock.wait(timeout); ! r = reallyPoll(); if (r != null) return r; ! if (timeout != 0) { ! long end = System.nanoTime(); ! timeout -= (end - start) / 1000_000; ! if (timeout <= 0) return null; ! start = end; } } } } /** --- 148,182 ---- throws IllegalArgumentException, InterruptedException { if (timeout < 0) { throw new IllegalArgumentException("Negative timeout value"); } Reference<? extends T> r = reallyPoll(); if (r != null) return r; ! return reallyRemove(timeout); ! } ! ! private Reference<? extends T> reallyRemove(long timeout) throws InterruptedException { ! long deadline = (timeout == 0) ! ? 0 : System.nanoTime() + timeout * 1000_000L; ! ! synchronized (lock) { ! ++waiters; ! try { ! for (; ; ) { ! Reference<? extends T> r = reallyPoll(); if (r != null) return r; ! if (timeout == 0) { ! lock.wait(0); ! } else { ! long timeoutNanos = deadline - System.nanoTime(); ! if (timeoutNanos <= 0) return null; ! lock.wait(timeoutNanos / 1000_000L, (int) (timeoutNanos % 1000_000L)); ! } } + } finally { + --waiters; } } } /**
*** 162,167 **** --- 188,211 ---- */ public Reference<? extends T> remove() throws InterruptedException { return remove(0); } + // Unsafe machinery + + private boolean casHead(Reference<?> cmp, Reference<? extends T> val) { + return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); + } + + private static final sun.misc.Unsafe UNSAFE; + private static final long headOffset; + + static { + try { + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class<ReferenceQueue> rqc = ReferenceQueue.class; + headOffset = UNSAFE.objectFieldOffset(rqc.getDeclaredField("head")); + } catch (Exception e) { + throw new Error(e); + } + } }
< prev index next >