1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/licenses/publicdomain
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.concurrent.locks.*;
  39 import java.util.*;
  40 
  41 /**
  42  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
  43  * the same ordering rules as class {@link PriorityQueue} and supplies
  44  * blocking retrieval operations.  While this queue is logically
  45  * unbounded, attempted additions may fail due to resource exhaustion
  46  * (causing <tt>OutOfMemoryError</tt>). This class does not permit
  47  * <tt>null</tt> elements.  A priority queue relying on {@linkplain
  48  * Comparable natural ordering} also does not permit insertion of
  49  * non-comparable objects (doing so results in
  50  * <tt>ClassCastException</tt>).
  51  *
  52  * <p>This class and its iterator implement all of the
  53  * <em>optional</em> methods of the {@link Collection} and {@link
  54  * Iterator} interfaces.  The Iterator provided in method {@link
  55  * #iterator()} is <em>not</em> guaranteed to traverse the elements of
  56  * the PriorityBlockingQueue in any particular order. If you need
  57  * ordered traversal, consider using
  58  * <tt>Arrays.sort(pq.toArray())</tt>.  Also, method <tt>drainTo</tt>
  59  * can be used to <em>remove</em> some or all elements in priority
  60  * order and place them in another collection.
  61  *
  62  * <p>Operations on this class make no guarantees about the ordering
  63  * of elements with equal priority. If you need to enforce an
  64  * ordering, you can define custom classes or comparators that use a
  65  * secondary key to break ties in primary priority values.  For
  66  * example, here is a class that applies first-in-first-out
  67  * tie-breaking to comparable elements. To use it, you would insert a
  68  * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
  69  *
  70  * <pre>
  71  * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
  72  *     implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
  73  *   final static AtomicLong seq = new AtomicLong();
  74  *   final long seqNum;
  75  *   final E entry;
  76  *   public FIFOEntry(E entry) {
  77  *     seqNum = seq.getAndIncrement();
  78  *     this.entry = entry;
  79  *   }
  80  *   public E getEntry() { return entry; }
  81  *   public int compareTo(FIFOEntry&lt;E&gt; other) {
  82  *     int res = entry.compareTo(other.entry);
  83  *     if (res == 0 &amp;&amp; other.entry != this.entry)
  84  *       res = (seqNum &lt; other.seqNum ? -1 : 1);
  85  *     return res;
  86  *   }
  87  * }</pre>
  88  *
  89  * <p>This class is a member of the
  90  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  91  * Java Collections Framework</a>.
  92  *
  93  * @since 1.5
  94  * @author Doug Lea
  95  * @param <E> the type of elements held in this collection
  96  */
  97 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
  98     implements BlockingQueue<E>, java.io.Serializable {
  99     private static final long serialVersionUID = 5595510919245408276L;
 100 
 101     private final PriorityQueue<E> q;
 102     private final ReentrantLock lock = new ReentrantLock(true);
 103     private final Condition notEmpty = lock.newCondition();
 104 
 105     /**
 106      * Creates a <tt>PriorityBlockingQueue</tt> with the default
 107      * initial capacity (11) that orders its elements according to
 108      * their {@linkplain Comparable natural ordering}.
 109      */
 110     public PriorityBlockingQueue() {
 111         q = new PriorityQueue<E>();
 112     }
 113 
 114     /**
 115      * Creates a <tt>PriorityBlockingQueue</tt> with the specified
 116      * initial capacity that orders its elements according to their
 117      * {@linkplain Comparable natural ordering}.
 118      *
 119      * @param initialCapacity the initial capacity for this priority queue
 120      * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
 121      *         than 1
 122      */
 123     public PriorityBlockingQueue(int initialCapacity) {
 124         q = new PriorityQueue<E>(initialCapacity, null);
 125     }
 126 
 127     /**
 128      * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
 129      * capacity that orders its elements according to the specified
 130      * comparator.
 131      *
 132      * @param initialCapacity the initial capacity for this priority queue
 133      * @param  comparator the comparator that will be used to order this
 134      *         priority queue.  If {@code null}, the {@linkplain Comparable
 135      *         natural ordering} of the elements will be used.
 136      * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
 137      *         than 1
 138      */
 139     public PriorityBlockingQueue(int initialCapacity,
 140                                  Comparator<? super E> comparator) {
 141         q = new PriorityQueue<E>(initialCapacity, comparator);
 142     }
 143 
 144     /**
 145      * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
 146      * in the specified collection.  If the specified collection is a
 147      * {@link SortedSet} or a {@link PriorityQueue},  this
 148      * priority queue will be ordered according to the same ordering.
 149      * Otherwise, this priority queue will be ordered according to the
 150      * {@linkplain Comparable natural ordering} of its elements.
 151      *
 152      * @param  c the collection whose elements are to be placed
 153      *         into this priority queue
 154      * @throws ClassCastException if elements of the specified collection
 155      *         cannot be compared to one another according to the priority
 156      *         queue's ordering
 157      * @throws NullPointerException if the specified collection or any
 158      *         of its elements are null
 159      */
 160     public PriorityBlockingQueue(Collection<? extends E> c) {
 161         q = new PriorityQueue<E>(c);
 162     }
 163 
 164     /**
 165      * Inserts the specified element into this priority queue.
 166      *
 167      * @param e the element to add
 168      * @return <tt>true</tt> (as specified by {@link Collection#add})
 169      * @throws ClassCastException if the specified element cannot be compared
 170      *         with elements currently in the priority queue according to the
 171      *         priority queue's ordering
 172      * @throws NullPointerException if the specified element is null
 173      */
 174     public boolean add(E e) {
 175         return offer(e);
 176     }
 177 
 178     /**
 179      * Inserts the specified element into this priority queue.
 180      *
 181      * @param e the element to add
 182      * @return <tt>true</tt> (as specified by {@link Queue#offer})
 183      * @throws ClassCastException if the specified element cannot be compared
 184      *         with elements currently in the priority queue according to the
 185      *         priority queue's ordering
 186      * @throws NullPointerException if the specified element is null
 187      */
 188     public boolean offer(E e) {
 189         final ReentrantLock lock = this.lock;
 190         lock.lock();
 191         try {
 192             boolean ok = q.offer(e);
 193             assert ok;
 194             notEmpty.signal();
 195             return true;
 196         } finally {
 197             lock.unlock();
 198         }
 199     }
 200 
 201     /**
 202      * Inserts the specified element into this priority queue. As the queue is
 203      * unbounded this method will never block.
 204      *
 205      * @param e the element to add
 206      * @throws ClassCastException if the specified element cannot be compared
 207      *         with elements currently in the priority queue according to the
 208      *         priority queue's ordering
 209      * @throws NullPointerException if the specified element is null
 210      */
 211     public void put(E e) {
 212         offer(e); // never need to block
 213     }
 214 
 215     /**
 216      * Inserts the specified element into this priority queue. As the queue is
 217      * unbounded this method will never block.
 218      *
 219      * @param e the element to add
 220      * @param timeout This parameter is ignored as the method never blocks
 221      * @param unit This parameter is ignored as the method never blocks
 222      * @return <tt>true</tt>
 223      * @throws ClassCastException if the specified element cannot be compared
 224      *         with elements currently in the priority queue according to the
 225      *         priority queue's ordering
 226      * @throws NullPointerException if the specified element is null
 227      */
 228     public boolean offer(E e, long timeout, TimeUnit unit) {
 229         return offer(e); // never need to block
 230     }
 231 
 232     public E poll() {
 233         final ReentrantLock lock = this.lock;
 234         lock.lock();
 235         try {
 236             return q.poll();
 237         } finally {
 238             lock.unlock();
 239         }
 240     }
 241 
 242     public E take() throws InterruptedException {
 243         final ReentrantLock lock = this.lock;
 244         lock.lockInterruptibly();
 245         try {
 246             try {
 247                 while (q.size() == 0)
 248                     notEmpty.await();
 249             } catch (InterruptedException ie) {
 250                 notEmpty.signal(); // propagate to non-interrupted thread
 251                 throw ie;
 252             }
 253             E x = q.poll();
 254             assert x != null;
 255             return x;
 256         } finally {
 257             lock.unlock();
 258         }
 259     }
 260 
 261     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 262         long nanos = unit.toNanos(timeout);
 263         final ReentrantLock lock = this.lock;
 264         lock.lockInterruptibly();
 265         try {
 266             for (;;) {
 267                 E x = q.poll();
 268                 if (x != null)
 269                     return x;
 270                 if (nanos <= 0)
 271                     return null;
 272                 try {
 273                     nanos = notEmpty.awaitNanos(nanos);
 274                 } catch (InterruptedException ie) {
 275                     notEmpty.signal(); // propagate to non-interrupted thread
 276                     throw ie;
 277                 }
 278             }
 279         } finally {
 280             lock.unlock();
 281         }
 282     }
 283 
 284     public E peek() {
 285         final ReentrantLock lock = this.lock;
 286         lock.lock();
 287         try {
 288             return q.peek();
 289         } finally {
 290             lock.unlock();
 291         }
 292     }
 293 
 294     /**
 295      * Returns the comparator used to order the elements in this queue,
 296      * or <tt>null</tt> if this queue uses the {@linkplain Comparable
 297      * natural ordering} of its elements.
 298      *
 299      * @return the comparator used to order the elements in this queue,
 300      *         or <tt>null</tt> if this queue uses the natural
 301      *         ordering of its elements
 302      */
 303     public Comparator<? super E> comparator() {
 304         return q.comparator();
 305     }
 306 
 307     public int size() {
 308         final ReentrantLock lock = this.lock;
 309         lock.lock();
 310         try {
 311             return q.size();
 312         } finally {
 313             lock.unlock();
 314         }
 315     }
 316 
 317     /**
 318      * Always returns <tt>Integer.MAX_VALUE</tt> because
 319      * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
 320      * @return <tt>Integer.MAX_VALUE</tt>
 321      */
 322     public int remainingCapacity() {
 323         return Integer.MAX_VALUE;
 324     }
 325 
 326     /**
 327      * Removes a single instance of the specified element from this queue,
 328      * if it is present.  More formally, removes an element {@code e} such
 329      * that {@code o.equals(e)}, if this queue contains one or more such
 330      * elements.  Returns {@code true} if and only if this queue contained
 331      * the specified element (or equivalently, if this queue changed as a
 332      * result of the call).
 333      *
 334      * @param o element to be removed from this queue, if present
 335      * @return <tt>true</tt> if this queue changed as a result of the call
 336      */
 337     public boolean remove(Object o) {
 338         final ReentrantLock lock = this.lock;
 339         lock.lock();
 340         try {
 341             return q.remove(o);
 342         } finally {
 343             lock.unlock();
 344         }
 345     }
 346 
 347     /**
 348      * Returns {@code true} if this queue contains the specified element.
 349      * More formally, returns {@code true} if and only if this queue contains
 350      * at least one element {@code e} such that {@code o.equals(e)}.
 351      *
 352      * @param o object to be checked for containment in this queue
 353      * @return <tt>true</tt> if this queue contains the specified element
 354      */
 355     public boolean contains(Object o) {
 356         final ReentrantLock lock = this.lock;
 357         lock.lock();
 358         try {
 359             return q.contains(o);
 360         } finally {
 361             lock.unlock();
 362         }
 363     }
 364 
 365     /**
 366      * Returns an array containing all of the elements in this queue.
 367      * The returned array elements are in no particular order.
 368      *
 369      * <p>The returned array will be "safe" in that no references to it are
 370      * maintained by this queue.  (In other words, this method must allocate
 371      * a new array).  The caller is thus free to modify the returned array.
 372      *
 373      * <p>This method acts as bridge between array-based and collection-based
 374      * APIs.
 375      *
 376      * @return an array containing all of the elements in this queue
 377      */
 378     public Object[] toArray() {
 379         final ReentrantLock lock = this.lock;
 380         lock.lock();
 381         try {
 382             return q.toArray();
 383         } finally {
 384             lock.unlock();
 385         }
 386     }
 387 
 388 
 389     public String toString() {
 390         final ReentrantLock lock = this.lock;
 391         lock.lock();
 392         try {
 393             return q.toString();
 394         } finally {
 395             lock.unlock();
 396         }
 397     }
 398 
 399     /**
 400      * @throws UnsupportedOperationException {@inheritDoc}
 401      * @throws ClassCastException            {@inheritDoc}
 402      * @throws NullPointerException          {@inheritDoc}
 403      * @throws IllegalArgumentException      {@inheritDoc}
 404      */
 405     public int drainTo(Collection<? super E> c) {
 406         if (c == null)
 407             throw new NullPointerException();
 408         if (c == this)
 409             throw new IllegalArgumentException();
 410         final ReentrantLock lock = this.lock;
 411         lock.lock();
 412         try {
 413             int n = 0;
 414             E e;
 415             while ( (e = q.poll()) != null) {
 416                 c.add(e);
 417                 ++n;
 418             }
 419             return n;
 420         } finally {
 421             lock.unlock();
 422         }
 423     }
 424 
 425     /**
 426      * @throws UnsupportedOperationException {@inheritDoc}
 427      * @throws ClassCastException            {@inheritDoc}
 428      * @throws NullPointerException          {@inheritDoc}
 429      * @throws IllegalArgumentException      {@inheritDoc}
 430      */
 431     public int drainTo(Collection<? super E> c, int maxElements) {
 432         if (c == null)
 433             throw new NullPointerException();
 434         if (c == this)
 435             throw new IllegalArgumentException();
 436         if (maxElements <= 0)
 437             return 0;
 438         final ReentrantLock lock = this.lock;
 439         lock.lock();
 440         try {
 441             int n = 0;
 442             E e;
 443             while (n < maxElements && (e = q.poll()) != null) {
 444                 c.add(e);
 445                 ++n;
 446             }
 447             return n;
 448         } finally {
 449             lock.unlock();
 450         }
 451     }
 452 
 453     /**
 454      * Atomically removes all of the elements from this queue.
 455      * The queue will be empty after this call returns.
 456      */
 457     public void clear() {
 458         final ReentrantLock lock = this.lock;
 459         lock.lock();
 460         try {
 461             q.clear();
 462         } finally {
 463             lock.unlock();
 464         }
 465     }
 466 
 467     /**
 468      * Returns an array containing all of the elements in this queue; the
 469      * runtime type of the returned array is that of the specified array.
 470      * The returned array elements are in no particular order.
 471      * If the queue fits in the specified array, it is returned therein.
 472      * Otherwise, a new array is allocated with the runtime type of the
 473      * specified array and the size of this queue.
 474      *
 475      * <p>If this queue fits in the specified array with room to spare
 476      * (i.e., the array has more elements than this queue), the element in
 477      * the array immediately following the end of the queue is set to
 478      * <tt>null</tt>.
 479      *
 480      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 481      * array-based and collection-based APIs.  Further, this method allows
 482      * precise control over the runtime type of the output array, and may,
 483      * under certain circumstances, be used to save allocation costs.
 484      *
 485      * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
 486      * The following code can be used to dump the queue into a newly
 487      * allocated array of <tt>String</tt>:
 488      *
 489      * <pre>
 490      *     String[] y = x.toArray(new String[0]);</pre>
 491      *
 492      * Note that <tt>toArray(new Object[0])</tt> is identical in function to
 493      * <tt>toArray()</tt>.
 494      *
 495      * @param a the array into which the elements of the queue are to
 496      *          be stored, if it is big enough; otherwise, a new array of the
 497      *          same runtime type is allocated for this purpose
 498      * @return an array containing all of the elements in this queue
 499      * @throws ArrayStoreException if the runtime type of the specified array
 500      *         is not a supertype of the runtime type of every element in
 501      *         this queue
 502      * @throws NullPointerException if the specified array is null
 503      */
 504     public <T> T[] toArray(T[] a) {
 505         final ReentrantLock lock = this.lock;
 506         lock.lock();
 507         try {
 508             return q.toArray(a);
 509         } finally {
 510             lock.unlock();
 511         }
 512     }
 513 
 514     /**
 515      * Returns an iterator over the elements in this queue. The
 516      * iterator does not return the elements in any particular order.
 517      * The returned <tt>Iterator</tt> is a "weakly consistent"
 518      * iterator that will never throw {@link
 519      * ConcurrentModificationException}, and guarantees to traverse
 520      * elements as they existed upon construction of the iterator, and
 521      * may (but is not guaranteed to) reflect any modifications
 522      * subsequent to construction.
 523      *
 524      * @return an iterator over the elements in this queue
 525      */
 526     public Iterator<E> iterator() {
 527         return new Itr(toArray());
 528     }
 529 
 530     /**
 531      * Snapshot iterator that works off copy of underlying q array.
 532      */
 533     private class Itr implements Iterator<E> {
 534         final Object[] array; // Array of all elements
 535         int cursor;           // index of next element to return;
 536         int lastRet;          // index of last element, or -1 if no such
 537 
 538         Itr(Object[] array) {
 539             lastRet = -1;
 540             this.array = array;
 541         }
 542 
 543         public boolean hasNext() {
 544             return cursor < array.length;
 545         }
 546 
 547         public E next() {
 548             if (cursor >= array.length)
 549                 throw new NoSuchElementException();
 550             lastRet = cursor;
 551             return (E)array[cursor++];
 552         }
 553 
 554         public void remove() {
 555             if (lastRet < 0)
 556                 throw new IllegalStateException();
 557             Object x = array[lastRet];
 558             lastRet = -1;
 559             // Traverse underlying queue to find == element,
 560             // not just a .equals element.
 561             lock.lock();
 562             try {
 563                 for (Iterator it = q.iterator(); it.hasNext(); ) {
 564                     if (it.next() == x) {
 565                         it.remove();
 566                         return;
 567                     }
 568                 }
 569             } finally {
 570                 lock.unlock();
 571             }
 572         }
 573     }
 574 
 575     /**
 576      * Saves the state to a stream (that is, serializes it).  This
 577      * merely wraps default serialization within lock.  The
 578      * serialization strategy for items is left to underlying
 579      * Queue. Note that locking is not needed on deserialization, so
 580      * readObject is not defined, just relying on default.
 581      */
 582     private void writeObject(java.io.ObjectOutputStream s)
 583         throws java.io.IOException {
 584         lock.lock();
 585         try {
 586             s.defaultWriteObject();
 587         } finally {
 588             lock.unlock();
 589         }
 590     }
 591 
 592 }