Print this page
Split |
Close |
Expand all |
Collapse all |
--- old/src/share/classes/java/util/concurrent/DelayQueue.java
+++ new/src/share/classes/java/util/concurrent/DelayQueue.java
1 1 /*
2 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 3 *
4 4 * This code is free software; you can redistribute it and/or modify it
5 5 * under the terms of the GNU General Public License version 2 only, as
6 6 * published by the Free Software Foundation. Oracle designates this
7 7 * particular file as subject to the "Classpath" exception as provided
8 8 * by Oracle in the LICENSE file that accompanied this code.
9 9 *
10 10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 13 * version 2 for more details (a copy is included in the LICENSE file that
14 14 * accompanied this code).
15 15 *
16 16 * You should have received a copy of the GNU General Public License version
17 17 * 2 along with this work; if not, write to the Free Software Foundation,
18 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 19 *
20 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 21 * or visit www.oracle.com if you need additional information or have any
22 22 * questions.
23 23 */
24 24
25 25 /*
26 26 * This file is available under and governed by the GNU General Public
27 27 * License version 2 only, as published by the Free Software Foundation.
28 28 * However, the following notice accompanied the original version of this
29 29 * file:
30 30 *
31 31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 32 * Expert Group and released to the public domain, as explained at
33 33 * http://creativecommons.org/licenses/publicdomain
34 34 */
35 35
36 36
37 37 package java.util.concurrent;
38 38 import java.util.concurrent.locks.*;
39 39 import java.util.*;
40 40
41 41 /**
42 42 * An unbounded {@linkplain BlockingQueue blocking queue} of
43 43 * <tt>Delayed</tt> elements, in which an element can only be taken
44 44 * when its delay has expired. The <em>head</em> of the queue is that
45 45 * <tt>Delayed</tt> element whose delay expired furthest in the
46 46 * past. If no delay has expired there is no head and <tt>poll</tt>
47 47 * will return <tt>null</tt>. Expiration occurs when an element's
48 48 * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
49 49 * than or equal to zero. Even though unexpired elements cannot be
50 50 * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise
51 51 * treated as normal elements. For example, the <tt>size</tt> method
52 52 * returns the count of both expired and unexpired elements.
53 53 * This queue does not permit null elements.
54 54 *
55 55 * <p>This class and its iterator implement all of the
56 56 * <em>optional</em> methods of the {@link Collection} and {@link
57 57 * Iterator} interfaces.
58 58 *
59 59 * <p>This class is a member of the
60 60 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
61 61 * Java Collections Framework</a>.
62 62 *
63 63 * @since 1.5
64 64 * @author Doug Lea
65 65 * @param <E> the type of elements held in this collection
66 66 */
67 67
68 68 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
69 69 implements BlockingQueue<E> {
70 70
71 71 private transient final ReentrantLock lock = new ReentrantLock();
72 72 private final PriorityQueue<E> q = new PriorityQueue<E>();
73 73
74 74 /**
75 75 * Thread designated to wait for the element at the head of
76 76 * the queue. This variant of the Leader-Follower pattern
77 77 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
78 78 * minimize unnecessary timed waiting. When a thread becomes
79 79 * the leader, it waits only for the next delay to elapse, but
80 80 * other threads await indefinitely. The leader thread must
81 81 * signal some other thread before returning from take() or
82 82 * poll(...), unless some other thread becomes leader in the
83 83 * interim. Whenever the head of the queue is replaced with
84 84 * an element with an earlier expiration time, the leader
85 85 * field is invalidated by being reset to null, and some
86 86 * waiting thread, but not necessarily the current leader, is
87 87 * signalled. So waiting threads must be prepared to acquire
88 88 * and lose leadership while waiting.
89 89 */
90 90 private Thread leader = null;
91 91
92 92 /**
93 93 * Condition signalled when a newer element becomes available
94 94 * at the head of the queue or a new thread may need to
95 95 * become leader.
96 96 */
97 97 private final Condition available = lock.newCondition();
98 98
99 99 /**
100 100 * Creates a new <tt>DelayQueue</tt> that is initially empty.
101 101 */
102 102 public DelayQueue() {}
103 103
104 104 /**
105 105 * Creates a <tt>DelayQueue</tt> initially containing the elements of the
106 106 * given collection of {@link Delayed} instances.
107 107 *
108 108 * @param c the collection of elements to initially contain
109 109 * @throws NullPointerException if the specified collection or any
110 110 * of its elements are null
111 111 */
112 112 public DelayQueue(Collection<? extends E> c) {
113 113 this.addAll(c);
114 114 }
115 115
116 116 /**
117 117 * Inserts the specified element into this delay queue.
118 118 *
119 119 * @param e the element to add
120 120 * @return <tt>true</tt> (as specified by {@link Collection#add})
121 121 * @throws NullPointerException if the specified element is null
122 122 */
123 123 public boolean add(E e) {
124 124 return offer(e);
125 125 }
126 126
127 127 /**
128 128 * Inserts the specified element into this delay queue.
129 129 *
130 130 * @param e the element to add
131 131 * @return <tt>true</tt>
132 132 * @throws NullPointerException if the specified element is null
133 133 */
134 134 public boolean offer(E e) {
135 135 final ReentrantLock lock = this.lock;
136 136 lock.lock();
137 137 try {
138 138 q.offer(e);
139 139 if (q.peek() == e) {
140 140 leader = null;
141 141 available.signal();
142 142 }
143 143 return true;
144 144 } finally {
145 145 lock.unlock();
146 146 }
147 147 }
148 148
149 149 /**
150 150 * Inserts the specified element into this delay queue. As the queue is
151 151 * unbounded this method will never block.
152 152 *
153 153 * @param e the element to add
154 154 * @throws NullPointerException {@inheritDoc}
155 155 */
156 156 public void put(E e) {
157 157 offer(e);
158 158 }
159 159
160 160 /**
161 161 * Inserts the specified element into this delay queue. As the queue is
162 162 * unbounded this method will never block.
163 163 *
164 164 * @param e the element to add
165 165 * @param timeout This parameter is ignored as the method never blocks
166 166 * @param unit This parameter is ignored as the method never blocks
167 167 * @return <tt>true</tt>
168 168 * @throws NullPointerException {@inheritDoc}
169 169 */
170 170 public boolean offer(E e, long timeout, TimeUnit unit) {
171 171 return offer(e);
172 172 }
173 173
174 174 /**
175 175 * Retrieves and removes the head of this queue, or returns <tt>null</tt>
176 176 * if this queue has no elements with an expired delay.
177 177 *
178 178 * @return the head of this queue, or <tt>null</tt> if this
179 179 * queue has no elements with an expired delay
180 180 */
181 181 public E poll() {
182 182 final ReentrantLock lock = this.lock;
183 183 lock.lock();
184 184 try {
185 185 E first = q.peek();
186 186 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
187 187 return null;
188 188 else
189 189 return q.poll();
190 190 } finally {
191 191 lock.unlock();
192 192 }
193 193 }
194 194
195 195 /**
196 196 * Retrieves and removes the head of this queue, waiting if necessary
197 197 * until an element with an expired delay is available on this queue.
198 198 *
199 199 * @return the head of this queue
200 200 * @throws InterruptedException {@inheritDoc}
201 201 */
202 202 public E take() throws InterruptedException {
203 203 final ReentrantLock lock = this.lock;
204 204 lock.lockInterruptibly();
205 205 try {
206 206 for (;;) {
207 207 E first = q.peek();
208 208 if (first == null)
209 209 available.await();
210 210 else {
211 211 long delay = first.getDelay(TimeUnit.NANOSECONDS);
212 212 if (delay <= 0)
213 213 return q.poll();
214 214 else if (leader != null)
215 215 available.await();
216 216 else {
217 217 Thread thisThread = Thread.currentThread();
218 218 leader = thisThread;
219 219 try {
220 220 available.awaitNanos(delay);
221 221 } finally {
222 222 if (leader == thisThread)
223 223 leader = null;
224 224 }
225 225 }
226 226 }
227 227 }
228 228 } finally {
229 229 if (leader == null && q.peek() != null)
230 230 available.signal();
231 231 lock.unlock();
232 232 }
233 233 }
234 234
235 235 /**
236 236 * Retrieves and removes the head of this queue, waiting if necessary
237 237 * until an element with an expired delay is available on this queue,
238 238 * or the specified wait time expires.
239 239 *
240 240 * @return the head of this queue, or <tt>null</tt> if the
241 241 * specified waiting time elapses before an element with
242 242 * an expired delay becomes available
243 243 * @throws InterruptedException {@inheritDoc}
244 244 */
245 245 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
246 246 long nanos = unit.toNanos(timeout);
247 247 final ReentrantLock lock = this.lock;
248 248 lock.lockInterruptibly();
249 249 try {
250 250 for (;;) {
251 251 E first = q.peek();
252 252 if (first == null) {
253 253 if (nanos <= 0)
254 254 return null;
255 255 else
256 256 nanos = available.awaitNanos(nanos);
257 257 } else {
258 258 long delay = first.getDelay(TimeUnit.NANOSECONDS);
259 259 if (delay <= 0)
260 260 return q.poll();
261 261 if (nanos <= 0)
262 262 return null;
263 263 if (nanos < delay || leader != null)
264 264 nanos = available.awaitNanos(nanos);
265 265 else {
266 266 Thread thisThread = Thread.currentThread();
267 267 leader = thisThread;
268 268 try {
269 269 long timeLeft = available.awaitNanos(delay);
270 270 nanos -= delay - timeLeft;
271 271 } finally {
272 272 if (leader == thisThread)
273 273 leader = null;
274 274 }
275 275 }
276 276 }
277 277 }
278 278 } finally {
279 279 if (leader == null && q.peek() != null)
280 280 available.signal();
281 281 lock.unlock();
282 282 }
283 283 }
284 284
285 285 /**
286 286 * Retrieves, but does not remove, the head of this queue, or
287 287 * returns <tt>null</tt> if this queue is empty. Unlike
288 288 * <tt>poll</tt>, if no expired elements are available in the queue,
289 289 * this method returns the element that will expire next,
290 290 * if one exists.
291 291 *
292 292 * @return the head of this queue, or <tt>null</tt> if this
293 293 * queue is empty.
294 294 */
295 295 public E peek() {
296 296 final ReentrantLock lock = this.lock;
297 297 lock.lock();
298 298 try {
299 299 return q.peek();
300 300 } finally {
301 301 lock.unlock();
302 302 }
303 303 }
304 304
305 305 public int size() {
306 306 final ReentrantLock lock = this.lock;
307 307 lock.lock();
308 308 try {
309 309 return q.size();
310 310 } finally {
311 311 lock.unlock();
312 312 }
313 313 }
314 314
315 315 /**
316 316 * @throws UnsupportedOperationException {@inheritDoc}
317 317 * @throws ClassCastException {@inheritDoc}
318 318 * @throws NullPointerException {@inheritDoc}
319 319 * @throws IllegalArgumentException {@inheritDoc}
320 320 */
321 321 public int drainTo(Collection<? super E> c) {
322 322 if (c == null)
323 323 throw new NullPointerException();
324 324 if (c == this)
325 325 throw new IllegalArgumentException();
326 326 final ReentrantLock lock = this.lock;
327 327 lock.lock();
328 328 try {
329 329 int n = 0;
330 330 for (;;) {
331 331 E first = q.peek();
332 332 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
333 333 break;
334 334 c.add(q.poll());
335 335 ++n;
336 336 }
337 337 return n;
338 338 } finally {
339 339 lock.unlock();
340 340 }
341 341 }
342 342
343 343 /**
344 344 * @throws UnsupportedOperationException {@inheritDoc}
345 345 * @throws ClassCastException {@inheritDoc}
346 346 * @throws NullPointerException {@inheritDoc}
347 347 * @throws IllegalArgumentException {@inheritDoc}
348 348 */
349 349 public int drainTo(Collection<? super E> c, int maxElements) {
350 350 if (c == null)
351 351 throw new NullPointerException();
352 352 if (c == this)
353 353 throw new IllegalArgumentException();
354 354 if (maxElements <= 0)
355 355 return 0;
356 356 final ReentrantLock lock = this.lock;
357 357 lock.lock();
358 358 try {
359 359 int n = 0;
360 360 while (n < maxElements) {
361 361 E first = q.peek();
362 362 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
363 363 break;
364 364 c.add(q.poll());
365 365 ++n;
366 366 }
367 367 return n;
368 368 } finally {
369 369 lock.unlock();
370 370 }
371 371 }
372 372
373 373 /**
374 374 * Atomically removes all of the elements from this delay queue.
375 375 * The queue will be empty after this call returns.
376 376 * Elements with an unexpired delay are not waited for; they are
377 377 * simply discarded from the queue.
378 378 */
379 379 public void clear() {
380 380 final ReentrantLock lock = this.lock;
381 381 lock.lock();
382 382 try {
383 383 q.clear();
384 384 } finally {
385 385 lock.unlock();
386 386 }
387 387 }
388 388
389 389 /**
390 390 * Always returns <tt>Integer.MAX_VALUE</tt> because
391 391 * a <tt>DelayQueue</tt> is not capacity constrained.
392 392 *
393 393 * @return <tt>Integer.MAX_VALUE</tt>
394 394 */
395 395 public int remainingCapacity() {
396 396 return Integer.MAX_VALUE;
397 397 }
398 398
399 399 /**
400 400 * Returns an array containing all of the elements in this queue.
401 401 * The returned array elements are in no particular order.
402 402 *
403 403 * <p>The returned array will be "safe" in that no references to it are
404 404 * maintained by this queue. (In other words, this method must allocate
405 405 * a new array). The caller is thus free to modify the returned array.
406 406 *
407 407 * <p>This method acts as bridge between array-based and collection-based
408 408 * APIs.
409 409 *
410 410 * @return an array containing all of the elements in this queue
411 411 */
412 412 public Object[] toArray() {
413 413 final ReentrantLock lock = this.lock;
414 414 lock.lock();
415 415 try {
416 416 return q.toArray();
417 417 } finally {
418 418 lock.unlock();
419 419 }
420 420 }
421 421
422 422 /**
423 423 * Returns an array containing all of the elements in this queue; the
424 424 * runtime type of the returned array is that of the specified array.
425 425 * The returned array elements are in no particular order.
426 426 * If the queue fits in the specified array, it is returned therein.
427 427 * Otherwise, a new array is allocated with the runtime type of the
428 428 * specified array and the size of this queue.
429 429 *
430 430 * <p>If this queue fits in the specified array with room to spare
431 431 * (i.e., the array has more elements than this queue), the element in
432 432 * the array immediately following the end of the queue is set to
433 433 * <tt>null</tt>.
434 434 *
435 435 * <p>Like the {@link #toArray()} method, this method acts as bridge between
436 436 * array-based and collection-based APIs. Further, this method allows
437 437 * precise control over the runtime type of the output array, and may,
438 438 * under certain circumstances, be used to save allocation costs.
439 439 *
440 440 * <p>The following code can be used to dump a delay queue into a newly
441 441 * allocated array of <tt>Delayed</tt>:
442 442 *
443 443 * <pre>
444 444 * Delayed[] a = q.toArray(new Delayed[0]);</pre>
445 445 *
446 446 * Note that <tt>toArray(new Object[0])</tt> is identical in function to
447 447 * <tt>toArray()</tt>.
448 448 *
449 449 * @param a the array into which the elements of the queue are to
450 450 * be stored, if it is big enough; otherwise, a new array of the
451 451 * same runtime type is allocated for this purpose
452 452 * @return an array containing all of the elements in this queue
453 453 * @throws ArrayStoreException if the runtime type of the specified array
454 454 * is not a supertype of the runtime type of every element in
455 455 * this queue
456 456 * @throws NullPointerException if the specified array is null
457 457 */
458 458 public <T> T[] toArray(T[] a) {
459 459 final ReentrantLock lock = this.lock;
460 460 lock.lock();
461 461 try {
462 462 return q.toArray(a);
463 463 } finally {
464 464 lock.unlock();
465 465 }
466 466 }
467 467
468 468 /**
469 469 * Removes a single instance of the specified element from this
470 470 * queue, if it is present, whether or not it has expired.
471 471 */
472 472 public boolean remove(Object o) {
473 473 final ReentrantLock lock = this.lock;
474 474 lock.lock();
↓ open down ↓ |
474 lines elided |
↑ open up ↑ |
475 475 try {
476 476 return q.remove(o);
477 477 } finally {
478 478 lock.unlock();
479 479 }
480 480 }
481 481
482 482 /**
483 483 * Returns an iterator over all the elements (both expired and
484 484 * unexpired) in this queue. The iterator does not return the
485 - * elements in any particular order. The returned
486 - * <tt>Iterator</tt> is a "weakly consistent" iterator that will
487 - * never throw {@link ConcurrentModificationException}, and
488 - * guarantees to traverse elements as they existed upon
489 - * construction of the iterator, and may (but is not guaranteed
490 - * to) reflect any modifications subsequent to construction.
485 + * elements in any particular order.
491 486 *
487 + * <p>The returned iterator is a "weakly consistent" iterator that
488 + * will never throw {@link java.util.ConcurrentModificationException
489 + * ConcurrentModificationException}, and guarantees to traverse
490 + * elements as they existed upon construction of the iterator, and
491 + * may (but is not guaranteed to) reflect any modifications
492 + * subsequent to construction.
493 + *
492 494 * @return an iterator over the elements in this queue
493 495 */
494 496 public Iterator<E> iterator() {
495 497 return new Itr(toArray());
496 498 }
497 499
498 500 /**
499 501 * Snapshot iterator that works off copy of underlying q array.
500 502 */
501 503 private class Itr implements Iterator<E> {
502 504 final Object[] array; // Array of all elements
503 505 int cursor; // index of next element to return;
504 506 int lastRet; // index of last element, or -1 if no such
505 507
506 508 Itr(Object[] array) {
507 509 lastRet = -1;
508 510 this.array = array;
509 511 }
510 512
511 513 public boolean hasNext() {
512 514 return cursor < array.length;
513 515 }
514 516
515 517 @SuppressWarnings("unchecked")
516 518 public E next() {
517 519 if (cursor >= array.length)
518 520 throw new NoSuchElementException();
519 521 lastRet = cursor;
520 522 return (E)array[cursor++];
521 523 }
522 524
523 525 public void remove() {
524 526 if (lastRet < 0)
525 527 throw new IllegalStateException();
526 528 Object x = array[lastRet];
527 529 lastRet = -1;
528 530 // Traverse underlying queue to find == element,
529 531 // not just a .equals element.
530 532 lock.lock();
531 533 try {
532 534 for (Iterator it = q.iterator(); it.hasNext(); ) {
533 535 if (it.next() == x) {
534 536 it.remove();
535 537 return;
536 538 }
537 539 }
538 540 } finally {
539 541 lock.unlock();
540 542 }
541 543 }
542 544 }
543 545
544 546 }
↓ open down ↓ |
43 lines elided |
↑ open up ↑ |
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX