1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.concurrent.locks.Condition;
  39 import java.util.concurrent.locks.ReentrantLock;
  40 import java.util.*;
  41 
  42 /**
  43  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
  44  * the same ordering rules as class {@link PriorityQueue} and supplies
  45  * blocking retrieval operations.  While this queue is logically
  46  * unbounded, attempted additions may fail due to resource exhaustion
  47  * (causing {@code OutOfMemoryError}). This class does not permit
  48  * {@code null} elements.  A priority queue relying on {@linkplain
  49  * Comparable natural ordering} also does not permit insertion of
  50  * non-comparable objects (doing so results in
  51  * {@code ClassCastException}).
  52  *
  53  * <p>This class and its iterator implement all of the
  54  * <em>optional</em> methods of the {@link Collection} and {@link
  55  * Iterator} interfaces.  The Iterator provided in method {@link
  56  * #iterator()} is <em>not</em> guaranteed to traverse the elements of
  57  * the PriorityBlockingQueue in any particular order. If you need
  58  * ordered traversal, consider using
  59  * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo}
  60  * can be used to <em>remove</em> some or all elements in priority
  61  * order and place them in another collection.
  62  *
  63  * <p>Operations on this class make no guarantees about the ordering
  64  * of elements with equal priority. If you need to enforce an
  65  * ordering, you can define custom classes or comparators that use a
  66  * secondary key to break ties in primary priority values.  For
  67  * example, here is a class that applies first-in-first-out
  68  * tie-breaking to comparable elements. To use it, you would insert a
  69  * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
  70  *
  71  *  <pre> {@code
  72  * class FIFOEntry<E extends Comparable<? super E>>
  73  *     implements Comparable<FIFOEntry<E>> {
  74  *   static final AtomicLong seq = new AtomicLong(0);
  75  *   final long seqNum;
  76  *   final E entry;
  77  *   public FIFOEntry(E entry) {
  78  *     seqNum = seq.getAndIncrement();
  79  *     this.entry = entry;
  80  *   }
  81  *   public E getEntry() { return entry; }
  82  *   public int compareTo(FIFOEntry<E> other) {
  83  *     int res = entry.compareTo(other.entry);
  84  *     if (res == 0 && other.entry != this.entry)
  85  *       res = (seqNum < other.seqNum ? -1 : 1);
  86  *     return res;
  87  *   }
  88  * }}</pre>
  89  *
  90  * <p>This class is a member of the
  91  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  92  * Java Collections Framework</a>.
  93  *
  94  * @since 1.5
  95  * @author Doug Lea
  96  * @param <E> the type of elements held in this collection
  97  */
  98 @SuppressWarnings("unchecked")
  99 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
 100     implements BlockingQueue<E>, java.io.Serializable {
 101     private static final long serialVersionUID = 5595510919245408276L;
 102 
 103     /*
 104      * The implementation uses an array-based binary heap, with public
 105      * operations protected with a single lock. However, allocation
 106      * during resizing uses a simple spinlock (used only while not
 107      * holding main lock) in order to allow takes to operate
 108      * concurrently with allocation.  This avoids repeated
 109      * postponement of waiting consumers and consequent element
 110      * build-up. The need to back away from lock during allocation
 111      * makes it impossible to simply wrap delegated
 112      * java.util.PriorityQueue operations within a lock, as was done
 113      * in a previous version of this class. To maintain
 114      * interoperability, a plain PriorityQueue is still used during
 115      * serialization, which maintains compatibility at the expense of
 116      * transiently doubling overhead.
 117      */
 118 
 119     /**
 120      * Default array capacity.
 121      */
 122     private static final int DEFAULT_INITIAL_CAPACITY = 11;
 123 
 124     /**
 125      * The maximum size of array to allocate.
 126      * Some VMs reserve some header words in an array.
 127      * Attempts to allocate larger arrays may result in
 128      * OutOfMemoryError: Requested array size exceeds VM limit
 129      */
 130     private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 131 
 132     /**
 133      * Priority queue represented as a balanced binary heap: the two
 134      * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 135      * priority queue is ordered by comparator, or by the elements'
 136      * natural ordering, if comparator is null: For each node n in the
 137      * heap and each descendant d of n, n <= d.  The element with the
 138      * lowest value is in queue[0], assuming the queue is nonempty.
 139      */
 140     private transient Object[] queue;
 141 
 142     /**
 143      * The number of elements in the priority queue.
 144      */
 145     private transient int size;
 146 
 147     /**
 148      * The comparator, or null if priority queue uses elements'
 149      * natural ordering.
 150      */
 151     private transient Comparator<? super E> comparator;
 152 
 153     /**
 154      * Lock used for all public operations
 155      */
 156     private final ReentrantLock lock;
 157 
 158     /**
 159      * Condition for blocking when empty
 160      */
 161     private final Condition notEmpty;
 162 
 163     /**
 164      * Spinlock for allocation, acquired via CAS.
 165      */
 166     private transient volatile int allocationSpinLock;
 167 
 168     /**
 169      * A plain PriorityQueue used only for serialization,
 170      * to maintain compatibility with previous versions
 171      * of this class. Non-null only during serialization/deserialization.
 172      */
 173     private PriorityQueue<E> q;
 174 
 175     /**
 176      * Creates a {@code PriorityBlockingQueue} with the default
 177      * initial capacity (11) that orders its elements according to
 178      * their {@linkplain Comparable natural ordering}.
 179      */
 180     public PriorityBlockingQueue() {
 181         this(DEFAULT_INITIAL_CAPACITY, null);
 182     }
 183 
 184     /**
 185      * Creates a {@code PriorityBlockingQueue} with the specified
 186      * initial capacity that orders its elements according to their
 187      * {@linkplain Comparable natural ordering}.
 188      *
 189      * @param initialCapacity the initial capacity for this priority queue
 190      * @throws IllegalArgumentException if {@code initialCapacity} is less
 191      *         than 1
 192      */
 193     public PriorityBlockingQueue(int initialCapacity) {
 194         this(initialCapacity, null);
 195     }
 196 
 197     /**
 198      * Creates a {@code PriorityBlockingQueue} with the specified initial
 199      * capacity that orders its elements according to the specified
 200      * comparator.
 201      *
 202      * @param initialCapacity the initial capacity for this priority queue
 203      * @param  comparator the comparator that will be used to order this
 204      *         priority queue.  If {@code null}, the {@linkplain Comparable
 205      *         natural ordering} of the elements will be used.
 206      * @throws IllegalArgumentException if {@code initialCapacity} is less
 207      *         than 1
 208      */
 209     public PriorityBlockingQueue(int initialCapacity,
 210                                  Comparator<? super E> comparator) {
 211         if (initialCapacity < 1)
 212             throw new IllegalArgumentException();
 213         this.lock = new ReentrantLock();
 214         this.notEmpty = lock.newCondition();
 215         this.comparator = comparator;
 216         this.queue = new Object[initialCapacity];
 217     }
 218 
 219     /**
 220      * Creates a {@code PriorityBlockingQueue} containing the elements
 221      * in the specified collection.  If the specified collection is a
 222      * {@link SortedSet} or a {@link PriorityQueue},  this
 223      * priority queue will be ordered according to the same ordering.
 224      * Otherwise, this priority queue will be ordered according to the
 225      * {@linkplain Comparable natural ordering} of its elements.
 226      *
 227      * @param  c the collection whose elements are to be placed
 228      *         into this priority queue
 229      * @throws ClassCastException if elements of the specified collection
 230      *         cannot be compared to one another according to the priority
 231      *         queue's ordering
 232      * @throws NullPointerException if the specified collection or any
 233      *         of its elements are null
 234      */
 235     public PriorityBlockingQueue(Collection<? extends E> c) {
 236         this.lock = new ReentrantLock();
 237         this.notEmpty = lock.newCondition();
 238         boolean heapify = true; // true if not known to be in heap order
 239         boolean screen = true;  // true if must screen for nulls
 240         if (c instanceof SortedSet<?>) {
 241             SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
 242             this.comparator = (Comparator<? super E>) ss.comparator();
 243             heapify = false;
 244         }
 245         else if (c instanceof PriorityBlockingQueue<?>) {
 246             PriorityBlockingQueue<? extends E> pq =
 247                 (PriorityBlockingQueue<? extends E>) c;
 248             this.comparator = (Comparator<? super E>) pq.comparator();
 249             screen = false;
 250             if (pq.getClass() == PriorityBlockingQueue.class) // exact match
 251                 heapify = false;
 252         }
 253         Object[] a = c.toArray();
 254         int n = a.length;
 255         // If c.toArray incorrectly doesn't return Object[], copy it.
 256         if (a.getClass() != Object[].class)
 257             a = Arrays.copyOf(a, n, Object[].class);
 258         if (screen && (n == 1 || this.comparator != null)) {
 259             for (int i = 0; i < n; ++i)
 260                 if (a[i] == null)
 261                     throw new NullPointerException();
 262         }
 263         this.queue = a;
 264         this.size = n;
 265         if (heapify)
 266             heapify();
 267     }
 268 
 269     /**
 270      * Tries to grow array to accommodate at least one more element
 271      * (but normally expand by about 50%), giving up (allowing retry)
 272      * on contention (which we expect to be rare). Call only while
 273      * holding lock.
 274      *
 275      * @param array the heap array
 276      * @param oldCap the length of the array
 277      */
 278     private void tryGrow(Object[] array, int oldCap) {
 279         lock.unlock(); // must release and then re-acquire main lock
 280         Object[] newArray = null;
 281         if (allocationSpinLock == 0 &&
 282             UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
 283                                      0, 1)) {
 284             try {
 285                 int newCap = oldCap + ((oldCap < 64) ?
 286                                        (oldCap + 2) : // grow faster if small
 287                                        (oldCap >> 1));
 288                 if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
 289                     int minCap = oldCap + 1;
 290                     if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
 291                         throw new OutOfMemoryError();
 292                     newCap = MAX_ARRAY_SIZE;
 293                 }
 294                 if (newCap > oldCap && queue == array)
 295                     newArray = new Object[newCap];
 296             } finally {
 297                 allocationSpinLock = 0;
 298             }
 299         }
 300         if (newArray == null) // back off if another thread is allocating
 301             Thread.yield();
 302         lock.lock();
 303         if (newArray != null && queue == array) {
 304             queue = newArray;
 305             System.arraycopy(array, 0, newArray, 0, oldCap);
 306         }
 307     }
 308 
 309     /**
 310      * Mechanics for poll().  Call only while holding lock.
 311      */
 312     private E dequeue() {
 313         int n = size - 1;
 314         if (n < 0)
 315             return null;
 316         else {
 317             Object[] array = queue;
 318             E result = (E) array[0];
 319             E x = (E) array[n];
 320             array[n] = null;
 321             Comparator<? super E> cmp = comparator;
 322             if (cmp == null)
 323                 siftDownComparable(0, x, array, n);
 324             else
 325                 siftDownUsingComparator(0, x, array, n, cmp);
 326             size = n;
 327             return result;
 328         }
 329     }
 330 
 331     /**
 332      * Inserts item x at position k, maintaining heap invariant by
 333      * promoting x up the tree until it is greater than or equal to
 334      * its parent, or is the root.
 335      *
 336      * To simplify and speed up coercions and comparisons. the
 337      * Comparable and Comparator versions are separated into different
 338      * methods that are otherwise identical. (Similarly for siftDown.)
 339      * These methods are static, with heap state as arguments, to
 340      * simplify use in light of possible comparator exceptions.
 341      *
 342      * @param k the position to fill
 343      * @param x the item to insert
 344      * @param array the heap array
 345      * @param n heap size
 346      */
 347     private static <T> void siftUpComparable(int k, T x, Object[] array) {
 348         Comparable<? super T> key = (Comparable<? super T>) x;
 349         while (k > 0) {
 350             int parent = (k - 1) >>> 1;
 351             Object e = array[parent];
 352             if (key.compareTo((T) e) >= 0)
 353                 break;
 354             array[k] = e;
 355             k = parent;
 356         }
 357         array[k] = key;
 358     }
 359 
 360     private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
 361                                        Comparator<? super T> cmp) {
 362         while (k > 0) {
 363             int parent = (k - 1) >>> 1;
 364             Object e = array[parent];
 365             if (cmp.compare(x, (T) e) >= 0)
 366                 break;
 367             array[k] = e;
 368             k = parent;
 369         }
 370         array[k] = x;
 371     }
 372 
 373     /**
 374      * Inserts item x at position k, maintaining heap invariant by
 375      * demoting x down the tree repeatedly until it is less than or
 376      * equal to its children or is a leaf.
 377      *
 378      * @param k the position to fill
 379      * @param x the item to insert
 380      * @param array the heap array
 381      * @param n heap size
 382      */
 383     private static <T> void siftDownComparable(int k, T x, Object[] array,
 384                                                int n) {
 385         if (n > 0) {
 386             Comparable<? super T> key = (Comparable<? super T>)x;
 387             int half = n >>> 1;           // loop while a non-leaf
 388             while (k < half) {
 389                 int child = (k << 1) + 1; // assume left child is least
 390                 Object c = array[child];
 391                 int right = child + 1;
 392                 if (right < n &&
 393                     ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
 394                     c = array[child = right];
 395                 if (key.compareTo((T) c) <= 0)
 396                     break;
 397                 array[k] = c;
 398                 k = child;
 399             }
 400             array[k] = key;
 401         }
 402     }
 403 
 404     private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
 405                                                     int n,
 406                                                     Comparator<? super T> cmp) {
 407         if (n > 0) {
 408             int half = n >>> 1;
 409             while (k < half) {
 410                 int child = (k << 1) + 1;
 411                 Object c = array[child];
 412                 int right = child + 1;
 413                 if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
 414                     c = array[child = right];
 415                 if (cmp.compare(x, (T) c) <= 0)
 416                     break;
 417                 array[k] = c;
 418                 k = child;
 419             }
 420             array[k] = x;
 421         }
 422     }
 423 
 424     /**
 425      * Establishes the heap invariant (described above) in the entire tree,
 426      * assuming nothing about the order of the elements prior to the call.
 427      */
 428     private void heapify() {
 429         Object[] array = queue;
 430         int n = size;
 431         int half = (n >>> 1) - 1;
 432         Comparator<? super E> cmp = comparator;
 433         if (cmp == null) {
 434             for (int i = half; i >= 0; i--)
 435                 siftDownComparable(i, (E) array[i], array, n);
 436         }
 437         else {
 438             for (int i = half; i >= 0; i--)
 439                 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
 440         }
 441     }
 442 
 443     /**
 444      * Inserts the specified element into this priority queue.
 445      *
 446      * @param e the element to add
 447      * @return {@code true} (as specified by {@link Collection#add})
 448      * @throws ClassCastException if the specified element cannot be compared
 449      *         with elements currently in the priority queue according to the
 450      *         priority queue's ordering
 451      * @throws NullPointerException if the specified element is null
 452      */
 453     public boolean add(E e) {
 454         return offer(e);
 455     }
 456 
 457     /**
 458      * Inserts the specified element into this priority queue.
 459      * As the queue is unbounded, this method will never return {@code false}.
 460      *
 461      * @param e the element to add
 462      * @return {@code true} (as specified by {@link Queue#offer})
 463      * @throws ClassCastException if the specified element cannot be compared
 464      *         with elements currently in the priority queue according to the
 465      *         priority queue's ordering
 466      * @throws NullPointerException if the specified element is null
 467      */
 468     public boolean offer(E e) {
 469         if (e == null)
 470             throw new NullPointerException();
 471         final ReentrantLock lock = this.lock;
 472         lock.lock();
 473         int n, cap;
 474         Object[] array;
 475         while ((n = size) >= (cap = (array = queue).length))
 476             tryGrow(array, cap);
 477         try {
 478             Comparator<? super E> cmp = comparator;
 479             if (cmp == null)
 480                 siftUpComparable(n, e, array);
 481             else
 482                 siftUpUsingComparator(n, e, array, cmp);
 483             size = n + 1;
 484             notEmpty.signal();
 485         } finally {
 486             lock.unlock();
 487         }
 488         return true;
 489     }
 490 
 491     /**
 492      * Inserts the specified element into this priority queue.
 493      * As the queue is unbounded, this method will never block.
 494      *
 495      * @param e the element to add
 496      * @throws ClassCastException if the specified element cannot be compared
 497      *         with elements currently in the priority queue according to the
 498      *         priority queue's ordering
 499      * @throws NullPointerException if the specified element is null
 500      */
 501     public void put(E e) {
 502         offer(e); // never need to block
 503     }
 504 
 505     /**
 506      * Inserts the specified element into this priority queue.
 507      * As the queue is unbounded, this method will never block or
 508      * return {@code false}.
 509      *
 510      * @param e the element to add
 511      * @param timeout This parameter is ignored as the method never blocks
 512      * @param unit This parameter is ignored as the method never blocks
 513      * @return {@code true} (as specified by
 514      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
 515      * @throws ClassCastException if the specified element cannot be compared
 516      *         with elements currently in the priority queue according to the
 517      *         priority queue's ordering
 518      * @throws NullPointerException if the specified element is null
 519      */
 520     public boolean offer(E e, long timeout, TimeUnit unit) {
 521         return offer(e); // never need to block
 522     }
 523 
 524     public E poll() {
 525         final ReentrantLock lock = this.lock;
 526         lock.lock();
 527         try {
 528             return dequeue();
 529         } finally {
 530             lock.unlock();
 531         }
 532     }
 533 
 534     public E take() throws InterruptedException {
 535         final ReentrantLock lock = this.lock;
 536         lock.lockInterruptibly();
 537         E result;
 538         try {
 539             while ( (result = dequeue()) == null)
 540                 notEmpty.await();
 541         } finally {
 542             lock.unlock();
 543         }
 544         return result;
 545     }
 546 
 547     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 548         long nanos = unit.toNanos(timeout);
 549         final ReentrantLock lock = this.lock;
 550         lock.lockInterruptibly();
 551         E result;
 552         try {
 553             while ( (result = dequeue()) == null && nanos > 0)
 554                 nanos = notEmpty.awaitNanos(nanos);
 555         } finally {
 556             lock.unlock();
 557         }
 558         return result;
 559     }
 560 
 561     public E peek() {
 562         final ReentrantLock lock = this.lock;
 563         lock.lock();
 564         try {
 565             return (size == 0) ? null : (E) queue[0];
 566         } finally {
 567             lock.unlock();
 568         }
 569     }
 570 
 571     /**
 572      * Returns the comparator used to order the elements in this queue,
 573      * or {@code null} if this queue uses the {@linkplain Comparable
 574      * natural ordering} of its elements.
 575      *
 576      * @return the comparator used to order the elements in this queue,
 577      *         or {@code null} if this queue uses the natural
 578      *         ordering of its elements
 579      */
 580     public Comparator<? super E> comparator() {
 581         return comparator;
 582     }
 583 
 584     public int size() {
 585         final ReentrantLock lock = this.lock;
 586         lock.lock();
 587         try {
 588             return size;
 589         } finally {
 590             lock.unlock();
 591         }
 592     }
 593 
 594     /**
 595      * Always returns {@code Integer.MAX_VALUE} because
 596      * a {@code PriorityBlockingQueue} is not capacity constrained.
 597      * @return {@code Integer.MAX_VALUE} always
 598      */
 599     public int remainingCapacity() {
 600         return Integer.MAX_VALUE;
 601     }
 602 
 603     private int indexOf(Object o) {
 604         if (o != null) {
 605             Object[] array = queue;
 606             int n = size;
 607             for (int i = 0; i < n; i++)
 608                 if (o.equals(array[i]))
 609                     return i;
 610         }
 611         return -1;
 612     }
 613 
 614     /**
 615      * Removes the ith element from queue.
 616      */
 617     private void removeAt(int i) {
 618         Object[] array = queue;
 619         int n = size - 1;
 620         if (n == i) // removed last element
 621             array[i] = null;
 622         else {
 623             E moved = (E) array[n];
 624             array[n] = null;
 625             Comparator<? super E> cmp = comparator;
 626             if (cmp == null)
 627                 siftDownComparable(i, moved, array, n);
 628             else
 629                 siftDownUsingComparator(i, moved, array, n, cmp);
 630             if (array[i] == moved) {
 631                 if (cmp == null)
 632                     siftUpComparable(i, moved, array);
 633                 else
 634                     siftUpUsingComparator(i, moved, array, cmp);
 635             }
 636         }
 637         size = n;
 638     }
 639 
 640     /**
 641      * Removes a single instance of the specified element from this queue,
 642      * if it is present.  More formally, removes an element {@code e} such
 643      * that {@code o.equals(e)}, if this queue contains one or more such
 644      * elements.  Returns {@code true} if and only if this queue contained
 645      * the specified element (or equivalently, if this queue changed as a
 646      * result of the call).
 647      *
 648      * @param o element to be removed from this queue, if present
 649      * @return {@code true} if this queue changed as a result of the call
 650      */
 651     public boolean remove(Object o) {
 652         final ReentrantLock lock = this.lock;
 653         lock.lock();
 654         try {
 655             int i = indexOf(o);
 656             if (i == -1)
 657                 return false;
 658             removeAt(i);
 659             return true;
 660         } finally {
 661             lock.unlock();
 662         }
 663     }
 664 
 665     /**
 666      * Identity-based version for use in Itr.remove
 667      */
 668     void removeEQ(Object o) {
 669         final ReentrantLock lock = this.lock;
 670         lock.lock();
 671         try {
 672             Object[] array = queue;
 673             for (int i = 0, n = size; i < n; i++) {
 674                 if (o == array[i]) {
 675                     removeAt(i);
 676                     break;
 677                 }
 678             }
 679         } finally {
 680             lock.unlock();
 681         }
 682     }
 683 
 684     /**
 685      * Returns {@code true} if this queue contains the specified element.
 686      * More formally, returns {@code true} if and only if this queue contains
 687      * at least one element {@code e} such that {@code o.equals(e)}.
 688      *
 689      * @param o object to be checked for containment in this queue
 690      * @return {@code true} if this queue contains the specified element
 691      */
 692     public boolean contains(Object o) {
 693         final ReentrantLock lock = this.lock;
 694         lock.lock();
 695         try {
 696             return indexOf(o) != -1;
 697         } finally {
 698             lock.unlock();
 699         }
 700     }
 701 
 702     /**
 703      * Returns an array containing all of the elements in this queue.
 704      * The returned array elements are in no particular order.
 705      *
 706      * <p>The returned array will be "safe" in that no references to it are
 707      * maintained by this queue.  (In other words, this method must allocate
 708      * a new array).  The caller is thus free to modify the returned array.
 709      *
 710      * <p>This method acts as bridge between array-based and collection-based
 711      * APIs.
 712      *
 713      * @return an array containing all of the elements in this queue
 714      */
 715     public Object[] toArray() {
 716         final ReentrantLock lock = this.lock;
 717         lock.lock();
 718         try {
 719             return Arrays.copyOf(queue, size);
 720         } finally {
 721             lock.unlock();
 722         }
 723     }
 724 
 725     public String toString() {
 726         final ReentrantLock lock = this.lock;
 727         lock.lock();
 728         try {
 729             int n = size;
 730             if (n == 0)
 731                 return "[]";
 732             StringBuilder sb = new StringBuilder();
 733             sb.append('[');
 734             for (int i = 0; i < n; ++i) {
 735                 Object e = queue[i];
 736                 sb.append(e == this ? "(this Collection)" : e);
 737                 if (i != n - 1)
 738                     sb.append(',').append(' ');
 739             }
 740             return sb.append(']').toString();
 741         } finally {
 742             lock.unlock();
 743         }
 744     }
 745 
 746     /**
 747      * @throws UnsupportedOperationException {@inheritDoc}
 748      * @throws ClassCastException            {@inheritDoc}
 749      * @throws NullPointerException          {@inheritDoc}
 750      * @throws IllegalArgumentException      {@inheritDoc}
 751      */
 752     public int drainTo(Collection<? super E> c) {
 753         return drainTo(c, Integer.MAX_VALUE);
 754     }
 755 
 756     /**
 757      * @throws UnsupportedOperationException {@inheritDoc}
 758      * @throws ClassCastException            {@inheritDoc}
 759      * @throws NullPointerException          {@inheritDoc}
 760      * @throws IllegalArgumentException      {@inheritDoc}
 761      */
 762     public int drainTo(Collection<? super E> c, int maxElements) {
 763         if (c == null)
 764             throw new NullPointerException();
 765         if (c == this)
 766             throw new IllegalArgumentException();
 767         if (maxElements <= 0)
 768             return 0;
 769         final ReentrantLock lock = this.lock;
 770         lock.lock();
 771         try {
 772             int n = Math.min(size, maxElements);
 773             for (int i = 0; i < n; i++) {
 774                 c.add((E) queue[0]); // In this order, in case add() throws.
 775                 dequeue();
 776             }
 777             return n;
 778         } finally {
 779             lock.unlock();
 780         }
 781     }
 782 
 783     /**
 784      * Atomically removes all of the elements from this queue.
 785      * The queue will be empty after this call returns.
 786      */
 787     public void clear() {
 788         final ReentrantLock lock = this.lock;
 789         lock.lock();
 790         try {
 791             Object[] array = queue;
 792             int n = size;
 793             size = 0;
 794             for (int i = 0; i < n; i++)
 795                 array[i] = null;
 796         } finally {
 797             lock.unlock();
 798         }
 799     }
 800 
 801     /**
 802      * Returns an array containing all of the elements in this queue; the
 803      * runtime type of the returned array is that of the specified array.
 804      * The returned array elements are in no particular order.
 805      * If the queue fits in the specified array, it is returned therein.
 806      * Otherwise, a new array is allocated with the runtime type of the
 807      * specified array and the size of this queue.
 808      *
 809      * <p>If this queue fits in the specified array with room to spare
 810      * (i.e., the array has more elements than this queue), the element in
 811      * the array immediately following the end of the queue is set to
 812      * {@code null}.
 813      *
 814      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 815      * array-based and collection-based APIs.  Further, this method allows
 816      * precise control over the runtime type of the output array, and may,
 817      * under certain circumstances, be used to save allocation costs.
 818      *
 819      * <p>Suppose {@code x} is a queue known to contain only strings.
 820      * The following code can be used to dump the queue into a newly
 821      * allocated array of {@code String}:
 822      *
 823      *  <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
 824      *
 825      * Note that {@code toArray(new Object[0])} is identical in function to
 826      * {@code toArray()}.
 827      *
 828      * @param a the array into which the elements of the queue are to
 829      *          be stored, if it is big enough; otherwise, a new array of the
 830      *          same runtime type is allocated for this purpose
 831      * @return an array containing all of the elements in this queue
 832      * @throws ArrayStoreException if the runtime type of the specified array
 833      *         is not a supertype of the runtime type of every element in
 834      *         this queue
 835      * @throws NullPointerException if the specified array is null
 836      */
 837     public <T> T[] toArray(T[] a) {
 838         final ReentrantLock lock = this.lock;
 839         lock.lock();
 840         try {
 841             int n = size;
 842             if (a.length < n)
 843                 // Make a new array of a's runtime type, but my contents:
 844                 return (T[]) Arrays.copyOf(queue, size, a.getClass());
 845             System.arraycopy(queue, 0, a, 0, n);
 846             if (a.length > n)
 847                 a[n] = null;
 848             return a;
 849         } finally {
 850             lock.unlock();
 851         }
 852     }
 853 
 854     /**
 855      * Returns an iterator over the elements in this queue. The
 856      * iterator does not return the elements in any particular order.
 857      *
 858      * <p>The returned iterator is a "weakly consistent" iterator that
 859      * will never throw {@link java.util.ConcurrentModificationException
 860      * ConcurrentModificationException}, and guarantees to traverse
 861      * elements as they existed upon construction of the iterator, and
 862      * may (but is not guaranteed to) reflect any modifications
 863      * subsequent to construction.
 864      *
 865      * @return an iterator over the elements in this queue
 866      */
 867     public Iterator<E> iterator() {
 868         return new Itr(toArray());
 869     }
 870 
 871     /**
 872      * Snapshot iterator that works off copy of underlying q array.
 873      */
 874     final class Itr implements Iterator<E> {
 875         final Object[] array; // Array of all elements
 876         int cursor;           // index of next element to return
 877         int lastRet;          // index of last element, or -1 if no such
 878 
 879         Itr(Object[] array) {
 880             lastRet = -1;
 881             this.array = array;
 882         }
 883 
 884         public boolean hasNext() {
 885             return cursor < array.length;
 886         }
 887 
 888         public E next() {
 889             if (cursor >= array.length)
 890                 throw new NoSuchElementException();
 891             lastRet = cursor;
 892             return (E)array[cursor++];
 893         }
 894 
 895         public void remove() {
 896             if (lastRet < 0)
 897                 throw new IllegalStateException();
 898             removeEQ(array[lastRet]);
 899             lastRet = -1;
 900         }
 901     }
 902 
 903     /**
 904      * Saves this queue to a stream (that is, serializes it).
 905      *
 906      * For compatibility with previous version of this class, elements
 907      * are first copied to a java.util.PriorityQueue, which is then
 908      * serialized.
 909      */
 910     private void writeObject(java.io.ObjectOutputStream s)
 911         throws java.io.IOException {
 912         lock.lock();
 913         try {
 914             // avoid zero capacity argument
 915             q = new PriorityQueue<E>(Math.max(size, 1), comparator);
 916             q.addAll(this);
 917             s.defaultWriteObject();
 918         } finally {
 919             q = null;
 920             lock.unlock();
 921         }
 922     }
 923 
 924     /**
 925      * Reconstitutes this queue from a stream (that is, deserializes it).
 926      */
 927     private void readObject(java.io.ObjectInputStream s)
 928         throws java.io.IOException, ClassNotFoundException {
 929         try {
 930             s.defaultReadObject();
 931             this.queue = new Object[q.size()];
 932             comparator = q.comparator();
 933             addAll(q);
 934         } finally {
 935             q = null;
 936         }
 937     }
 938 
 939     // Unsafe mechanics
 940     private static final sun.misc.Unsafe UNSAFE;
 941     private static final long allocationSpinLockOffset;
 942     static {
 943         try {
 944             UNSAFE = sun.misc.Unsafe.getUnsafe();
 945             Class<?> k = PriorityBlockingQueue.class;
 946             allocationSpinLockOffset = UNSAFE.objectFieldOffset
 947                 (k.getDeclaredField("allocationSpinLock"));
 948         } catch (Exception e) {
 949             throw new Error(e);
 950         }
 951     }
 952 }