< prev index next >
src/java.base/share/classes/java/lang/ref/ReferenceQueue.java
Print this page
*** 44,98 ****
boolean enqueue(Reference<? extends S> r) {
return false;
}
}
! static ReferenceQueue<Object> NULL = new Null<>();
! static ReferenceQueue<Object> ENQUEUED = new Null<>();
! static private class Lock { };
! private Lock lock = new Lock();
! private volatile Reference<? extends T> head = null;
! private long queueLength = 0;
boolean enqueue(Reference<? extends T> r) { /* Called only by Reference class */
! synchronized (lock) {
! // Check that since getting the lock this reference hasn't already been
// enqueued (and even then removed)
! ReferenceQueue<?> queue = r.queue;
if ((queue == NULL) || (queue == ENQUEUED)) {
return false;
}
assert queue == this;
! r.queue = ENQUEUED;
! r.next = (head == null) ? r : head;
! head = r;
! queueLength++;
! if (r instanceof FinalReference) {
! sun.misc.VM.addFinalRefCount(1);
}
lock.notifyAll();
! return true;
}
}
! @SuppressWarnings("unchecked")
! private Reference<? extends T> reallyPoll() { /* Must hold lock */
! Reference<? extends T> r = head;
! if (r != null) {
! head = (r.next == r) ?
! null :
! r.next; // Unchecked due to the next field having a raw type in Reference
! r.queue = NULL;
r.next = r;
! queueLength--;
! if (r instanceof FinalReference) {
! sun.misc.VM.addFinalRefCount(-1);
}
- return r;
}
! return null;
}
/**
* Polls this queue to see if a reference object is available. If one is
* available without further delay then it is removed from the queue and
--- 44,117 ----
boolean enqueue(Reference<? extends S> r) {
return false;
}
}
! // Sentinel queues stored in Reference.queue. ENQUEUED is installed by
! // markEnqueued when a reference is put on a queue; NULL is installed by
! // reallyPoll when the reference is taken off again.
! static final ReferenceQueue<Object> NULL = new Null<>();
! static final ReferenceQueue<Object> ENQUEUED = new Null<>();
!
! // Monitor used only for blocking in reallyRemove and for waking those waiters.
! private static class Lock { }
! private final Lock lock = new Lock();
!
! // Number of threads currently inside reallyRemove's wait loop. Written only
! // while holding lock; read without the lock by addChunk.
! private volatile int waiters;
!
! // Head of the singly linked list of enqueued references; the last element
! // links to itself. Written through casHead, never by plain assignment.
! @SuppressWarnings("unused")
! private volatile Reference<? extends T> head; // we assign using Unsafe CAS
boolean enqueue(Reference<? extends T> r) { /* Called only by Reference class */
! // markEnqueued is atomic: if it fails, r has already been enqueued
! // (and possibly removed again), so there is nothing to add.
! if (!markEnqueued(r)) {
! return false;
! }
! // A single reference is a chunk whose head and tail coincide.
! addChunk(r, r);
! return true;
! }
!
! boolean markEnqueued(Reference<?> r) { // Claims r for enqueueing by switching r.queue to ENQUEUED; false if r was already claimed
! ReferenceQueue<?> queue;
! do {
! // Check that this reference hasn't already been
// enqueued (and even then removed)
! queue = r.queue;
if ((queue == NULL) || (queue == ENQUEUED)) {
return false;
}
+ } while (!r.casQueue(queue, ENQUEUED)); // casQueue lives in Reference (not shown); presumably a CAS on r.queue, so a failure means it changed and we re-read
assert queue == this;
! return true;
}
+
+ @SuppressWarnings("unchecked")
+ void addChunk(Reference<?> chunkHead, Reference<?> chunkTail) { // Pushes a chain onto the front of the queue; only chunkTail.next is set here, so chunkHead..chunkTail are assumed to be linked already
+ Reference<? extends T> h;
+ do {
+ h = head;
+ chunkTail.next = (h == null) ? chunkTail : h; // empty queue: tail links to itself, which reallyPoll treats as end of list
+ } while (!casHead(h, (Reference) chunkHead)); // retry if head moved between the read and the CAS
+ // notify waiters
+ if (waiters > 0) { // volatile read without the lock: skip the monitor when nobody is waiting
+ synchronized (lock) {
+ if (waiters > 0) { // re-check now that the lock is held
lock.notifyAll();
! }
! }
}
}
! private Reference<? extends T> reallyPoll() { // Removes and returns the head reference without taking the lock; null when the queue is empty
! Reference<? extends T> r;
! while ((r = head) != null) {
! @SuppressWarnings("unchecked") // due to cast to raw type
! Reference<? extends T> nh = (r.next == r)
! ? null : (Reference) r.next; // a self-linked r is the last element, so the new head is null
! if (casHead(r, nh)) { // on failure head has changed; the loop re-reads it
! r.lazySetQueue(NULL); // lazySetQueue lives in Reference (not shown); presumably an ordered store to r.queue
! UNSAFE.storeFence(); // orders the queue store above before the next store below
r.next = r;
! break;
}
}
! return r;
}
/**
* Polls this queue to see if a reference object is available. If one is
* available without further delay then it is removed from the queue and
*** 100,115 ****
*
* @return A reference object, if one was immediately available,
* otherwise <code>null</code>
*/
public Reference<? extends T> poll() {
- if (head == null)
- return null;
- synchronized (lock) {
return reallyPoll();
}
- }
/**
* Removes the next reference object in this queue, blocking until either
* one becomes available or the given timeout period expires.
*
--- 119,130 ----
*** 133,156 ****
throws IllegalArgumentException, InterruptedException
{
if (timeout < 0) {
throw new IllegalArgumentException("Negative timeout value");
}
- synchronized (lock) {
Reference<? extends T> r = reallyPoll();
if (r != null) return r;
! long start = (timeout == 0) ? 0 : System.nanoTime();
! for (;;) {
! lock.wait(timeout);
! r = reallyPoll();
if (r != null) return r;
! if (timeout != 0) {
! long end = System.nanoTime();
! timeout -= (end - start) / 1000_000;
! if (timeout <= 0) return null;
! start = end;
}
}
}
}
/**
--- 148,182 ----
throws IllegalArgumentException, InterruptedException
{
if (timeout < 0) {
throw new IllegalArgumentException("Negative timeout value");
}
Reference<? extends T> r = reallyPoll();
if (r != null) return r;
! return reallyRemove(timeout);
! }
!
! private Reference<? extends T> reallyRemove(long timeout) throws InterruptedException {
! long deadline = (timeout == 0)
! ? 0 : System.nanoTime() + timeout * 1000_000L;
!
! synchronized (lock) {
! ++waiters;
! try {
! for (; ; ) {
! Reference<? extends T> r = reallyPoll();
if (r != null) return r;
! if (timeout == 0) {
! lock.wait(0);
! } else {
! long timeoutNanos = deadline - System.nanoTime();
! if (timeoutNanos <= 0) return null;
! lock.wait(timeoutNanos / 1000_000L, (int) (timeoutNanos % 1000_000L));
! }
}
+ } finally {
+ --waiters;
}
}
}
/**
*** 162,167 ****
--- 188,211 ----
*/
public Reference<? extends T> remove() throws InterruptedException {
return remove(0); // a timeout of 0 selects the lock.wait(0) branch in reallyRemove, i.e. no deadline
}
+ // Unsafe machinery
+
+ private boolean casHead(Reference<?> cmp, Reference<? extends T> val) { // Atomically sets head to val if it currently equals cmp; returns whether the swap happened
+ return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
+ }
+
+ private static final sun.misc.Unsafe UNSAFE;
+ private static final long headOffset;
+
+ // Resolve the Unsafe instance and the offset of the "head" field once, at
+ // class-initialization time; casHead needs both.
+ static {
+ try {
+ UNSAFE = sun.misc.Unsafe.getUnsafe();
+ headOffset = UNSAFE.objectFieldOffset(
+ ReferenceQueue.class.getDeclaredField("head"));
+ } catch (Exception ex) {
+ // Any failure here leaves the class unusable, so abort initialization.
+ throw new Error(ex);
+ }
+ }
}
< prev index next >