29 * Reference queues, to which registered reference objects are appended by the 30 * garbage collector after the appropriate reachability changes are detected. 31 * 32 * @author Mark Reinhold 33 * @since 1.2 34 */ 35 36 public class ReferenceQueue<T> { 37 38 /** 39 * Constructs a new reference-object queue. 40 */ 41 public ReferenceQueue() { } 42 43 private static class Null<S> extends ReferenceQueue<S> { 44 boolean enqueue(Reference<? extends S> r) { 45 return false; 46 } 47 } 48 49 static ReferenceQueue<Object> NULL = new Null<>(); 50 static ReferenceQueue<Object> ENQUEUED = new Null<>(); 51 52 static private class Lock { }; 53 private Lock lock = new Lock(); 54 private volatile Reference<? extends T> head = null; 55 private long queueLength = 0; 56 57 boolean enqueue(Reference<? extends T> r) { /* Called only by Reference class */ 58 synchronized (lock) { 59 // Check that since getting the lock this reference hasn't already been 60 // enqueued (and even then removed) 61 ReferenceQueue<?> queue = r.queue; 62 if ((queue == NULL) || (queue == ENQUEUED)) { 63 return false; 64 } 65 assert queue == this; 66 r.queue = ENQUEUED; 67 r.next = (head == null) ? r : head; 68 head = r; 69 queueLength++; 70 if (r instanceof FinalReference) { 71 sun.misc.VM.addFinalRefCount(1); 72 } 73 lock.notifyAll(); 74 return true; 75 } 76 } 77 78 @SuppressWarnings("unchecked") 79 private Reference<? extends T> reallyPoll() { /* Must hold lock */ 80 Reference<? extends T> r = head; 81 if (r != null) { 82 head = (r.next == r) ? 83 null : 84 r.next; // Unchecked due to the next field having a raw type in Reference 85 r.queue = NULL; 86 r.next = r; 87 queueLength--; 88 if (r instanceof FinalReference) { 89 sun.misc.VM.addFinalRefCount(-1); 90 } 91 return r; 92 } 93 return null; 94 } 95 96 /** 97 * Polls this queue to see if a reference object is available. If one is 98 * available without further delay then it is removed from the queue and 99 * returned. Otherwise this method immediately returns <tt>null</tt>. 100 * 101 * @return A reference object, if one was immediately available, 102 * otherwise <code>null</code> 103 */ 104 public Reference<? extends T> poll() { 105 if (head == null) 106 return null; 107 synchronized (lock) { 108 return reallyPoll(); 109 } 110 } 111 112 /** 113 * Removes the next reference object in this queue, blocking until either 114 * one becomes available or the given timeout period expires. 115 * 116 * <p> This method does not offer real-time guarantees: It schedules the 117 * timeout as if by invoking the {@link Object#wait(long)} method. 118 * 119 * @param timeout If positive, block for up to <code>timeout</code> 120 * milliseconds while waiting for a reference to be 121 * added to this queue. If zero, block indefinitely. 122 * 123 * @return A reference object, if one was available within the specified 124 * timeout period, otherwise <code>null</code> 125 * 126 * @throws IllegalArgumentException 127 * If the value of the timeout argument is negative 128 * 129 * @throws InterruptedException 130 * If the timeout wait is interrupted 131 */ 132 public Reference<? extends T> remove(long timeout) 133 throws IllegalArgumentException, InterruptedException 134 { 135 if (timeout < 0) { 136 throw new IllegalArgumentException("Negative timeout value"); 137 } 138 synchronized (lock) { 139 Reference<? extends T> r = reallyPoll(); 140 if (r != null) return r; 141 long start = (timeout == 0) ? 0 : System.nanoTime(); 142 for (;;) { 143 lock.wait(timeout); 144 r = reallyPoll(); 145 if (r != null) return r; 146 if (timeout != 0) { 147 long end = System.nanoTime(); 148 timeout -= (end - start) / 1000_000; 149 if (timeout <= 0) return null; 150 start = end; 151 } 152 } 153 } 154 } 155 156 /** 157 * Removes the next reference object in this queue, blocking until one 158 * becomes available. 159 * 160 * @return A reference object, blocking until one becomes available 161 * @throws InterruptedException If the wait is interrupted 162 */ 163 public Reference<? extends T> remove() throws InterruptedException { 164 return remove(0); 165 } 166 167 } | 29 * Reference queues, to which registered reference objects are appended by the 30 * garbage collector after the appropriate reachability changes are detected. 31 * 32 * @author Mark Reinhold 33 * @since 1.2 34 */ 35 36 public class ReferenceQueue<T> { 37 38 /** 39 * Constructs a new reference-object queue. 40 */ 41 public ReferenceQueue() { } 42 43 private static class Null<S> extends ReferenceQueue<S> { 44 boolean enqueue(Reference<? extends S> r) { 45 return false; 46 } 47 } 48 49 static final ReferenceQueue<Object> NULL = new Null<>(); 50 static final ReferenceQueue<Object> ENQUEUED = new Null<>(); 51 52 static private class Lock { } 53 private final Lock lock = new Lock(); 54 private volatile int waiters; 55 56 @SuppressWarnings("unused") 57 private volatile Reference<? extends T> head; // we assign using Unsafe CAS 58 59 boolean enqueue(Reference<? extends T> r) { /* Called only by Reference class */ 60 if (markEnqueued(r)) { // markEnqueued is atomic 61 addChunk(r, r); 62 return true; 63 } else { 64 return false; 65 } 66 } 67 68 boolean markEnqueued(Reference<?> r) { 69 ReferenceQueue<?> queue; 70 do { 71 // Check that this reference hasn't already been 72 // enqueued (and even then removed) 73 queue = r.queue; 74 if ((queue == NULL) || (queue == ENQUEUED)) { 75 return false; 76 } 77 } while (!r.casQueue(queue, ENQUEUED)); 78 assert queue == this; 79 return true; 80 } 81 82 @SuppressWarnings("unchecked") 83 void addChunk(Reference<?> chunkHead, Reference<?> chunkTail) { 84 Reference<? extends T> h; 85 do { 86 h = head; 87 chunkTail.next = (h == null) ? chunkTail : h; 88 } while (!casHead(h, (Reference) chunkHead)); 89 // notify waiters 90 if (waiters > 0) { 91 synchronized (lock) { 92 if (waiters > 0) { 93 lock.notifyAll(); 94 } 95 } 96 } 97 } 98 99 private Reference<? extends T> reallyPoll() { 100 Reference<? extends T> r; 101 while ((r = head) != null) { 102 @SuppressWarnings("unchecked") // due to cast to raw type 103 Reference<? extends T> nh = (r.next == r) 104 ? null : (Reference) r.next; 105 if (casHead(r, nh)) { 106 r.lazySetQueue(NULL); 107 UNSAFE.storeFence(); 108 r.next = r; 109 break; 110 } 111 } 112 return r; 113 } 114 115 /** 116 * Polls this queue to see if a reference object is available. If one is 117 * available without further delay then it is removed from the queue and 118 * returned. Otherwise this method immediately returns <tt>null</tt>. 119 * 120 * @return A reference object, if one was immediately available, 121 * otherwise <code>null</code> 122 */ 123 public Reference<? extends T> poll() { 124 return reallyPoll(); 125 } 126 127 /** 128 * Removes the next reference object in this queue, blocking until either 129 * one becomes available or the given timeout period expires. 130 * 131 * <p> This method does not offer real-time guarantees: It schedules the 132 * timeout as if by invoking the {@link Object#wait(long)} method. 133 * 134 * @param timeout If positive, block for up to <code>timeout</code> 135 * milliseconds while waiting for a reference to be 136 * added to this queue. If zero, block indefinitely. 137 * 138 * @return A reference object, if one was available within the specified 139 * timeout period, otherwise <code>null</code> 140 * 141 * @throws IllegalArgumentException 142 * If the value of the timeout argument is negative 143 * 144 * @throws InterruptedException 145 * If the timeout wait is interrupted 146 */ 147 public Reference<? extends T> remove(long timeout) 148 throws IllegalArgumentException, InterruptedException 149 { 150 if (timeout < 0) { 151 throw new IllegalArgumentException("Negative timeout value"); 152 } 153 Reference<? extends T> r = reallyPoll(); 154 if (r != null) return r; 155 return reallyRemove(timeout); 156 } 157 158 private Reference<? extends T> reallyRemove(long timeout) throws InterruptedException { 159 long deadline = (timeout == 0) 160 ? 0 : System.nanoTime() + timeout * 1000_000L; 161 162 synchronized (lock) { 163 ++waiters; 164 try { 165 for (; ; ) { 166 Reference<? extends T> r = reallyPoll(); 167 if (r != null) return r; 168 if (timeout == 0) { 169 lock.wait(0); 170 } else { 171 long timeoutNanos = deadline - System.nanoTime(); 172 if (timeoutNanos <= 0) return null; 173 lock.wait(timeoutNanos / 1000_000L, (int) (timeoutNanos % 1000_000L)); 174 } 175 } 176 } finally { 177 --waiters; 178 } 179 } 180 } 181 182 /** 183 * Removes the next reference object in this queue, blocking until one 184 * becomes available. 185 * 186 * @return A reference object, blocking until one becomes available 187 * @throws InterruptedException If the wait is interrupted 188 */ 189 public Reference<? extends T> remove() throws InterruptedException { 190 return remove(0); 191 } 192 193 // Unsafe machinery 194 195 private boolean casHead(Reference<?> cmp, Reference<? extends T> val) { 196 return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); 197 } 198 199 private static final sun.misc.Unsafe UNSAFE; 200 private static final long headOffset; 201 202 static { 203 try { 204 UNSAFE = sun.misc.Unsafe.getUnsafe(); 205 Class<ReferenceQueue> rqc = ReferenceQueue.class; 206 headOffset = UNSAFE.objectFieldOffset(rqc.getDeclaredField("head")); 207 } catch (Exception e) { 208 throw new Error(e); 209 } 210 } 211 } |