1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import static java.util.concurrent.TimeUnit.NANOSECONDS;
  39 
  40 import java.util.AbstractQueue;
  41 import java.util.Collection;
  42 import java.util.Iterator;
  43 import java.util.NoSuchElementException;
  44 import java.util.Objects;
  45 import java.util.PriorityQueue;
  46 import java.util.concurrent.locks.Condition;
  47 import java.util.concurrent.locks.ReentrantLock;
  48 
  49 /**
  50  * An unbounded {@linkplain BlockingQueue blocking queue} of
  51  * {@code Delayed} elements, in which an element can only be taken
  52  * when its delay has expired.  The <em>head</em> of the queue is that
  53  * {@code Delayed} element whose delay expired furthest in the
  54  * past.  If no delay has expired there is no head and {@code poll}
  55  * will return {@code null}. Expiration occurs when an element's
  56  * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
  57  * than or equal to zero.  Even though unexpired elements cannot be
  58  * removed using {@code take} or {@code poll}, they are otherwise
  59  * treated as normal elements. For example, the {@code size} method
  60  * returns the count of both expired and unexpired elements.
  61  * This queue does not permit null elements.
  62  *
  63  * <p>This class and its iterator implement all of the <em>optional</em>
  64  * methods of the {@link Collection} and {@link Iterator} interfaces.
  65  * The Iterator provided in method {@link #iterator()} is <em>not</em>
  66  * guaranteed to traverse the elements of the DelayQueue in any
  67  * particular order.
  68  *
  69  * <p>This class is a member of the
  70  * <a href="{@docRoot}/java/util/package-summary.html#CollectionsFramework">
  71  * Java Collections Framework</a>.
  72  *
  73  * @since 1.5
  74  * @author Doug Lea
  75  * @param <E> the type of elements held in this queue
  76  */
  77 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
  78     implements BlockingQueue<E> {
  79 
  80     private final transient ReentrantLock lock = new ReentrantLock();
  81     private final PriorityQueue<E> q = new PriorityQueue<E>();
  82 
  83     /**
  84      * Thread designated to wait for the element at the head of
  85      * the queue.  This variant of the Leader-Follower pattern
  86      * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
  87      * minimize unnecessary timed waiting.  When a thread becomes
  88      * the leader, it waits only for the next delay to elapse, but
  89      * other threads await indefinitely.  The leader thread must
  90      * signal some other thread before returning from take() or
  91      * poll(...), unless some other thread becomes leader in the
  92      * interim.  Whenever the head of the queue is replaced with
  93      * an element with an earlier expiration time, the leader
  94      * field is invalidated by being reset to null, and some
  95      * waiting thread, but not necessarily the current leader, is
  96      * signalled.  So waiting threads must be prepared to acquire
  97      * and lose leadership while waiting.
  98      */
  99     private Thread leader;
 100 
 101     /**
 102      * Condition signalled when a newer element becomes available
 103      * at the head of the queue or a new thread may need to
 104      * become leader.
 105      */
 106     private final Condition available = lock.newCondition();
 107 
 108     /**
 109      * Creates a new {@code DelayQueue} that is initially empty.
 110      */
 111     public DelayQueue() {}
 112 
 113     /**
 114      * Creates a {@code DelayQueue} initially containing the elements of the
 115      * given collection of {@link Delayed} instances.
 116      *
 117      * @param c the collection of elements to initially contain
 118      * @throws NullPointerException if the specified collection or any
 119      *         of its elements are null
 120      */
 121     public DelayQueue(Collection<? extends E> c) {
 122         this.addAll(c);
 123     }
 124 
 125     /**
 126      * Inserts the specified element into this delay queue.
 127      *
 128      * @param e the element to add
 129      * @return {@code true} (as specified by {@link Collection#add})
 130      * @throws NullPointerException if the specified element is null
 131      */
 132     public boolean add(E e) {
 133         return offer(e);
 134     }
 135 
 136     /**
 137      * Inserts the specified element into this delay queue.
 138      *
 139      * @param e the element to add
 140      * @return {@code true}
 141      * @throws NullPointerException if the specified element is null
 142      */
 143     public boolean offer(E e) {
 144         final ReentrantLock lock = this.lock;
 145         lock.lock();
 146         try {
 147             q.offer(e);
 148             if (q.peek() == e) {
 149                 leader = null;
 150                 available.signal();
 151             }
 152             return true;
 153         } finally {
 154             lock.unlock();
 155         }
 156     }
 157 
 158     /**
 159      * Inserts the specified element into this delay queue. As the queue is
 160      * unbounded this method will never block.
 161      *
 162      * @param e the element to add
 163      * @throws NullPointerException {@inheritDoc}
 164      */
 165     public void put(E e) {
 166         offer(e);
 167     }
 168 
 169     /**
 170      * Inserts the specified element into this delay queue. As the queue is
 171      * unbounded this method will never block.
 172      *
 173      * @param e the element to add
 174      * @param timeout This parameter is ignored as the method never blocks
 175      * @param unit This parameter is ignored as the method never blocks
 176      * @return {@code true}
 177      * @throws NullPointerException {@inheritDoc}
 178      */
 179     public boolean offer(E e, long timeout, TimeUnit unit) {
 180         return offer(e);
 181     }
 182 
 183     /**
 184      * Retrieves and removes the head of this queue, or returns {@code null}
 185      * if this queue has no elements with an expired delay.
 186      *
 187      * @return the head of this queue, or {@code null} if this
 188      *         queue has no elements with an expired delay
 189      */
 190     public E poll() {
 191         final ReentrantLock lock = this.lock;
 192         lock.lock();
 193         try {
 194             E first = q.peek();
 195             return (first == null || first.getDelay(NANOSECONDS) > 0)
 196                 ? null
 197                 : q.poll();
 198         } finally {
 199             lock.unlock();
 200         }
 201     }
 202 
 203     /**
 204      * Retrieves and removes the head of this queue, waiting if necessary
 205      * until an element with an expired delay is available on this queue.
 206      *
 207      * @return the head of this queue
 208      * @throws InterruptedException {@inheritDoc}
 209      */
 210     public E take() throws InterruptedException {
 211         final ReentrantLock lock = this.lock;
 212         lock.lockInterruptibly();
 213         try {
 214             for (;;) {
 215                 E first = q.peek();
 216                 if (first == null)
 217                     available.await();
 218                 else {
 219                     long delay = first.getDelay(NANOSECONDS);
 220                     if (delay <= 0L)
 221                         return q.poll();
 222                     first = null; // don't retain ref while waiting
 223                     if (leader != null)
 224                         available.await();
 225                     else {
 226                         Thread thisThread = Thread.currentThread();
 227                         leader = thisThread;
 228                         try {
 229                             available.awaitNanos(delay);
 230                         } finally {
 231                             if (leader == thisThread)
 232                                 leader = null;
 233                         }
 234                     }
 235                 }
 236             }
 237         } finally {
 238             if (leader == null && q.peek() != null)
 239                 available.signal();
 240             lock.unlock();
 241         }
 242     }
 243 
 244     /**
 245      * Retrieves and removes the head of this queue, waiting if necessary
 246      * until an element with an expired delay is available on this queue,
 247      * or the specified wait time expires.
 248      *
 249      * @return the head of this queue, or {@code null} if the
 250      *         specified waiting time elapses before an element with
 251      *         an expired delay becomes available
 252      * @throws InterruptedException {@inheritDoc}
 253      */
 254     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 255         long nanos = unit.toNanos(timeout);
 256         final ReentrantLock lock = this.lock;
 257         lock.lockInterruptibly();
 258         try {
 259             for (;;) {
 260                 E first = q.peek();
 261                 if (first == null) {
 262                     if (nanos <= 0L)
 263                         return null;
 264                     else
 265                         nanos = available.awaitNanos(nanos);
 266                 } else {
 267                     long delay = first.getDelay(NANOSECONDS);
 268                     if (delay <= 0L)
 269                         return q.poll();
 270                     if (nanos <= 0L)
 271                         return null;
 272                     first = null; // don't retain ref while waiting
 273                     if (nanos < delay || leader != null)
 274                         nanos = available.awaitNanos(nanos);
 275                     else {
 276                         Thread thisThread = Thread.currentThread();
 277                         leader = thisThread;
 278                         try {
 279                             long timeLeft = available.awaitNanos(delay);
 280                             nanos -= delay - timeLeft;
 281                         } finally {
 282                             if (leader == thisThread)
 283                                 leader = null;
 284                         }
 285                     }
 286                 }
 287             }
 288         } finally {
 289             if (leader == null && q.peek() != null)
 290                 available.signal();
 291             lock.unlock();
 292         }
 293     }
 294 
 295     /**
 296      * Retrieves, but does not remove, the head of this queue, or
 297      * returns {@code null} if this queue is empty.  Unlike
 298      * {@code poll}, if no expired elements are available in the queue,
 299      * this method returns the element that will expire next,
 300      * if one exists.
 301      *
 302      * @return the head of this queue, or {@code null} if this
 303      *         queue is empty
 304      */
 305     public E peek() {
 306         final ReentrantLock lock = this.lock;
 307         lock.lock();
 308         try {
 309             return q.peek();
 310         } finally {
 311             lock.unlock();
 312         }
 313     }
 314 
 315     public int size() {
 316         final ReentrantLock lock = this.lock;
 317         lock.lock();
 318         try {
 319             return q.size();
 320         } finally {
 321             lock.unlock();
 322         }
 323     }
 324 
 325     /**
 326      * @throws UnsupportedOperationException {@inheritDoc}
 327      * @throws ClassCastException            {@inheritDoc}
 328      * @throws NullPointerException          {@inheritDoc}
 329      * @throws IllegalArgumentException      {@inheritDoc}
 330      */
 331     public int drainTo(Collection<? super E> c) {
 332         return drainTo(c, Integer.MAX_VALUE);
 333     }
 334 
 335     /**
 336      * @throws UnsupportedOperationException {@inheritDoc}
 337      * @throws ClassCastException            {@inheritDoc}
 338      * @throws NullPointerException          {@inheritDoc}
 339      * @throws IllegalArgumentException      {@inheritDoc}
 340      */
 341     public int drainTo(Collection<? super E> c, int maxElements) {
 342         Objects.requireNonNull(c);
 343         if (c == this)
 344             throw new IllegalArgumentException();
 345         if (maxElements <= 0)
 346             return 0;
 347         final ReentrantLock lock = this.lock;
 348         lock.lock();
 349         try {
 350             int n = 0;
 351             for (E first;
 352                  n < maxElements
 353                      && (first = q.peek()) != null
 354                      && first.getDelay(NANOSECONDS) <= 0;) {
 355                 c.add(first);   // In this order, in case add() throws.
 356                 q.poll();
 357                 ++n;
 358             }
 359             return n;
 360         } finally {
 361             lock.unlock();
 362         }
 363     }
 364 
 365     /**
 366      * Atomically removes all of the elements from this delay queue.
 367      * The queue will be empty after this call returns.
 368      * Elements with an unexpired delay are not waited for; they are
 369      * simply discarded from the queue.
 370      */
 371     public void clear() {
 372         final ReentrantLock lock = this.lock;
 373         lock.lock();
 374         try {
 375             q.clear();
 376         } finally {
 377             lock.unlock();
 378         }
 379     }
 380 
 381     /**
 382      * Always returns {@code Integer.MAX_VALUE} because
 383      * a {@code DelayQueue} is not capacity constrained.
 384      *
 385      * @return {@code Integer.MAX_VALUE}
 386      */
 387     public int remainingCapacity() {
 388         return Integer.MAX_VALUE;
 389     }
 390 
 391     /**
 392      * Returns an array containing all of the elements in this queue.
 393      * The returned array elements are in no particular order.
 394      *
 395      * <p>The returned array will be "safe" in that no references to it are
 396      * maintained by this queue.  (In other words, this method must allocate
 397      * a new array).  The caller is thus free to modify the returned array.
 398      *
 399      * <p>This method acts as bridge between array-based and collection-based
 400      * APIs.
 401      *
 402      * @return an array containing all of the elements in this queue
 403      */
 404     public Object[] toArray() {
 405         final ReentrantLock lock = this.lock;
 406         lock.lock();
 407         try {
 408             return q.toArray();
 409         } finally {
 410             lock.unlock();
 411         }
 412     }
 413 
 414     /**
 415      * Returns an array containing all of the elements in this queue; the
 416      * runtime type of the returned array is that of the specified array.
 417      * The returned array elements are in no particular order.
 418      * If the queue fits in the specified array, it is returned therein.
 419      * Otherwise, a new array is allocated with the runtime type of the
 420      * specified array and the size of this queue.
 421      *
 422      * <p>If this queue fits in the specified array with room to spare
 423      * (i.e., the array has more elements than this queue), the element in
 424      * the array immediately following the end of the queue is set to
 425      * {@code null}.
 426      *
 427      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 428      * array-based and collection-based APIs.  Further, this method allows
 429      * precise control over the runtime type of the output array, and may,
 430      * under certain circumstances, be used to save allocation costs.
 431      *
 432      * <p>The following code can be used to dump a delay queue into a newly
 433      * allocated array of {@code Delayed}:
 434      *
 435      * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
 436      *
 437      * Note that {@code toArray(new Object[0])} is identical in function to
 438      * {@code toArray()}.
 439      *
 440      * @param a the array into which the elements of the queue are to
 441      *          be stored, if it is big enough; otherwise, a new array of the
 442      *          same runtime type is allocated for this purpose
 443      * @return an array containing all of the elements in this queue
 444      * @throws ArrayStoreException if the runtime type of the specified array
 445      *         is not a supertype of the runtime type of every element in
 446      *         this queue
 447      * @throws NullPointerException if the specified array is null
 448      */
 449     public <T> T[] toArray(T[] a) {
 450         final ReentrantLock lock = this.lock;
 451         lock.lock();
 452         try {
 453             return q.toArray(a);
 454         } finally {
 455             lock.unlock();
 456         }
 457     }
 458 
 459     /**
 460      * Removes a single instance of the specified element from this
 461      * queue, if it is present, whether or not it has expired.
 462      */
 463     public boolean remove(Object o) {
 464         final ReentrantLock lock = this.lock;
 465         lock.lock();
 466         try {
 467             return q.remove(o);
 468         } finally {
 469             lock.unlock();
 470         }
 471     }
 472 
 473     /**
 474      * Identity-based version for use in Itr.remove.
 475      */
 476     void removeEQ(Object o) {
 477         final ReentrantLock lock = this.lock;
 478         lock.lock();
 479         try {
 480             for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
 481                 if (o == it.next()) {
 482                     it.remove();
 483                     break;
 484                 }
 485             }
 486         } finally {
 487             lock.unlock();
 488         }
 489     }
 490 
 491     /**
 492      * Returns an iterator over all the elements (both expired and
 493      * unexpired) in this queue. The iterator does not return the
 494      * elements in any particular order.
 495      *
 496      * <p>The returned iterator is
 497      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
 498      *
 499      * @return an iterator over the elements in this queue
 500      */
 501     public Iterator<E> iterator() {
 502         return new Itr(toArray());
 503     }
 504 
 505     /**
 506      * Snapshot iterator that works off copy of underlying q array.
 507      */
 508     private class Itr implements Iterator<E> {
 509         final Object[] array; // Array of all elements
 510         int cursor;           // index of next element to return
 511         int lastRet;          // index of last element, or -1 if no such
 512 
 513         Itr(Object[] array) {
 514             lastRet = -1;
 515             this.array = array;
 516         }
 517 
 518         public boolean hasNext() {
 519             return cursor < array.length;
 520         }
 521 
 522         @SuppressWarnings("unchecked")
 523         public E next() {
 524             if (cursor >= array.length)
 525                 throw new NoSuchElementException();
 526             return (E)array[lastRet = cursor++];
 527         }
 528 
 529         public void remove() {
 530             if (lastRet < 0)
 531                 throw new IllegalStateException();
 532             removeEQ(array[lastRet]);
 533             lastRet = -1;
 534         }
 535     }
 536 
 537 }