1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.lang.invoke.MethodHandles;
  39 import java.lang.invoke.VarHandle;
  40 import java.util.AbstractQueue;
  41 import java.util.Arrays;
  42 import java.util.Collection;
  43 import java.util.Comparator;
  44 import java.util.Iterator;
  45 import java.util.NoSuchElementException;
  46 import java.util.Objects;
  47 import java.util.PriorityQueue;
  48 import java.util.Queue;
  49 import java.util.SortedSet;
  50 import java.util.Spliterator;
  51 import java.util.concurrent.locks.Condition;
  52 import java.util.concurrent.locks.ReentrantLock;
  53 import java.util.function.Consumer;
  54 
  55 /**
  56  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
  57  * the same ordering rules as class {@link PriorityQueue} and supplies
  58  * blocking retrieval operations.  While this queue is logically
  59  * unbounded, attempted additions may fail due to resource exhaustion
  60  * (causing {@code OutOfMemoryError}). This class does not permit
  61  * {@code null} elements.  A priority queue relying on {@linkplain
  62  * Comparable natural ordering} also does not permit insertion of
  63  * non-comparable objects (doing so results in
  64  * {@code ClassCastException}).
  65  *
  66  * <p>This class and its iterator implement all of the <em>optional</em>
  67  * methods of the {@link Collection} and {@link Iterator} interfaces.
  68  * The Iterator provided in method {@link #iterator()} and the
  69  * Spliterator provided in method {@link #spliterator()} are <em>not</em>
  70  * guaranteed to traverse the elements of the PriorityBlockingQueue in
  71  * any particular order. If you need ordered traversal, consider using
  72  * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo} can
  73  * be used to <em>remove</em> some or all elements in priority order and
  74  * place them in another collection.
  75  *
  76  * <p>Operations on this class make no guarantees about the ordering
  77  * of elements with equal priority. If you need to enforce an
  78  * ordering, you can define custom classes or comparators that use a
  79  * secondary key to break ties in primary priority values.  For
  80  * example, here is a class that applies first-in-first-out
  81  * tie-breaking to comparable elements. To use it, you would insert a
  82  * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
  83  *
  84  * <pre> {@code
  85  * class FIFOEntry<E extends Comparable<? super E>>
  86  *     implements Comparable<FIFOEntry<E>> {
  87  *   static final AtomicLong seq = new AtomicLong(0);
  88  *   final long seqNum;
  89  *   final E entry;
  90  *   public FIFOEntry(E entry) {
  91  *     seqNum = seq.getAndIncrement();
  92  *     this.entry = entry;
  93  *   }
  94  *   public E getEntry() { return entry; }
  95  *   public int compareTo(FIFOEntry<E> other) {
  96  *     int res = entry.compareTo(other.entry);
  97  *     if (res == 0 && other.entry != this.entry)
  98  *       res = (seqNum < other.seqNum ? -1 : 1);
  99  *     return res;
 100  *   }
 101  * }}</pre>
 102  *
 103  * <p>This class is a member of the
 104  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
 105  * Java Collections Framework</a>.
 106  *
 107  * @since 1.5
 108  * @author Doug Lea
 109  * @param <E> the type of elements held in this queue
 110  */
 111 @SuppressWarnings("unchecked")
 112 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
 113     implements BlockingQueue<E>, java.io.Serializable {
 114     private static final long serialVersionUID = 5595510919245408276L;
 115 
 116     /*
 117      * The implementation uses an array-based binary heap, with public
 118      * operations protected with a single lock. However, allocation
 119      * during resizing uses a simple spinlock (used only while not
 120      * holding main lock) in order to allow takes to operate
 121      * concurrently with allocation.  This avoids repeated
 122      * postponement of waiting consumers and consequent element
 123      * build-up. The need to back away from lock during allocation
 124      * makes it impossible to simply wrap delegated
 125      * java.util.PriorityQueue operations within a lock, as was done
 126      * in a previous version of this class. To maintain
 127      * interoperability, a plain PriorityQueue is still used during
 128      * serialization, which maintains compatibility at the expense of
 129      * transiently doubling overhead.
 130      */
 131 
 132     /**
 133      * Default array capacity.
 134      */
 135     private static final int DEFAULT_INITIAL_CAPACITY = 11;
 136 
 137     /**
 138      * The maximum size of array to allocate.
 139      * Some VMs reserve some header words in an array.
 140      * Attempts to allocate larger arrays may result in
 141      * OutOfMemoryError: Requested array size exceeds VM limit
 142      */
 143     private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 144 
 145     /**
 146      * Priority queue represented as a balanced binary heap: the two
 147      * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 148      * priority queue is ordered by comparator, or by the elements'
 149      * natural ordering, if comparator is null: For each node n in the
 150      * heap and each descendant d of n, n <= d.  The element with the
 151      * lowest value is in queue[0], assuming the queue is nonempty.
 152      */
 153     private transient Object[] queue;
 154 
 155     /**
 156      * The number of elements in the priority queue.
 157      */
 158     private transient int size;
 159 
 160     /**
 161      * The comparator, or null if priority queue uses elements'
 162      * natural ordering.
 163      */
 164     private transient Comparator<? super E> comparator;
 165 
 166     /**
 167      * Lock used for all public operations.
 168      */
 169     private final ReentrantLock lock;
 170 
 171     /**
 172      * Condition for blocking when empty.
 173      */
 174     private final Condition notEmpty;
 175 
 176     /**
 177      * Spinlock for allocation, acquired via CAS.
 178      */
 179     private transient volatile int allocationSpinLock;
 180 
 181     /**
 182      * A plain PriorityQueue used only for serialization,
 183      * to maintain compatibility with previous versions
 184      * of this class. Non-null only during serialization/deserialization.
 185      */
 186     private PriorityQueue<E> q;
 187 
 188     /**
 189      * Creates a {@code PriorityBlockingQueue} with the default
 190      * initial capacity (11) that orders its elements according to
 191      * their {@linkplain Comparable natural ordering}.
 192      */
 193     public PriorityBlockingQueue() {
 194         this(DEFAULT_INITIAL_CAPACITY, null);
 195     }
 196 
 197     /**
 198      * Creates a {@code PriorityBlockingQueue} with the specified
 199      * initial capacity that orders its elements according to their
 200      * {@linkplain Comparable natural ordering}.
 201      *
 202      * @param initialCapacity the initial capacity for this priority queue
 203      * @throws IllegalArgumentException if {@code initialCapacity} is less
 204      *         than 1
 205      */
 206     public PriorityBlockingQueue(int initialCapacity) {
 207         this(initialCapacity, null);
 208     }
 209 
 210     /**
 211      * Creates a {@code PriorityBlockingQueue} with the specified initial
 212      * capacity that orders its elements according to the specified
 213      * comparator.
 214      *
 215      * @param initialCapacity the initial capacity for this priority queue
 216      * @param  comparator the comparator that will be used to order this
 217      *         priority queue.  If {@code null}, the {@linkplain Comparable
 218      *         natural ordering} of the elements will be used.
 219      * @throws IllegalArgumentException if {@code initialCapacity} is less
 220      *         than 1
 221      */
 222     public PriorityBlockingQueue(int initialCapacity,
 223                                  Comparator<? super E> comparator) {
 224         if (initialCapacity < 1)
 225             throw new IllegalArgumentException();
 226         this.lock = new ReentrantLock();
 227         this.notEmpty = lock.newCondition();
 228         this.comparator = comparator;
 229         this.queue = new Object[initialCapacity];
 230     }
 231 
 232     /**
 233      * Creates a {@code PriorityBlockingQueue} containing the elements
 234      * in the specified collection.  If the specified collection is a
 235      * {@link SortedSet} or a {@link PriorityQueue}, this
 236      * priority queue will be ordered according to the same ordering.
 237      * Otherwise, this priority queue will be ordered according to the
 238      * {@linkplain Comparable natural ordering} of its elements.
 239      *
 240      * @param  c the collection whose elements are to be placed
 241      *         into this priority queue
 242      * @throws ClassCastException if elements of the specified collection
 243      *         cannot be compared to one another according to the priority
 244      *         queue's ordering
 245      * @throws NullPointerException if the specified collection or any
 246      *         of its elements are null
 247      */
 248     public PriorityBlockingQueue(Collection<? extends E> c) {
 249         this.lock = new ReentrantLock();
 250         this.notEmpty = lock.newCondition();
 251         boolean heapify = true; // true if not known to be in heap order
 252         boolean screen = true;  // true if must screen for nulls
 253         if (c instanceof SortedSet<?>) {
 254             SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
 255             this.comparator = (Comparator<? super E>) ss.comparator();
 256             heapify = false;
 257         }
 258         else if (c instanceof PriorityBlockingQueue<?>) {
 259             PriorityBlockingQueue<? extends E> pq =
 260                 (PriorityBlockingQueue<? extends E>) c;
 261             this.comparator = (Comparator<? super E>) pq.comparator();
 262             screen = false;
 263             if (pq.getClass() == PriorityBlockingQueue.class) // exact match
 264                 heapify = false;
 265         }
 266         Object[] a = c.toArray();
 267         int n = a.length;
 268         // If c.toArray incorrectly doesn't return Object[], copy it.
 269         if (a.getClass() != Object[].class)
 270             a = Arrays.copyOf(a, n, Object[].class);
 271         if (screen && (n == 1 || this.comparator != null)) {
 272             for (Object elt : a)
 273                 if (elt == null)
 274                     throw new NullPointerException();
 275         }
 276         this.queue = a;
 277         this.size = n;
 278         if (heapify)
 279             heapify();
 280     }
 281 
 282     /**
 283      * Tries to grow array to accommodate at least one more element
 284      * (but normally expand by about 50%), giving up (allowing retry)
 285      * on contention (which we expect to be rare). Call only while
 286      * holding lock.
 287      *
 288      * @param array the heap array
 289      * @param oldCap the length of the array
 290      */
 291     private void tryGrow(Object[] array, int oldCap) {
 292         lock.unlock(); // must release and then re-acquire main lock
 293         Object[] newArray = null;
 294         if (allocationSpinLock == 0 &&
 295             ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
 296             try {
 297                 int newCap = oldCap + ((oldCap < 64) ?
 298                                        (oldCap + 2) : // grow faster if small
 299                                        (oldCap >> 1));
 300                 if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
 301                     int minCap = oldCap + 1;
 302                     if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
 303                         throw new OutOfMemoryError();
 304                     newCap = MAX_ARRAY_SIZE;
 305                 }
 306                 if (newCap > oldCap && queue == array)
 307                     newArray = new Object[newCap];
 308             } finally {
 309                 allocationSpinLock = 0;
 310             }
 311         }
 312         if (newArray == null) // back off if another thread is allocating
 313             Thread.yield();
 314         lock.lock();
 315         if (newArray != null && queue == array) {
 316             queue = newArray;
 317             System.arraycopy(array, 0, newArray, 0, oldCap);
 318         }
 319     }
 320 
 321     /**
 322      * Mechanics for poll().  Call only while holding lock.
 323      */
 324     private E dequeue() {
 325         int n = size - 1;
 326         if (n < 0)
 327             return null;
 328         else {
 329             Object[] array = queue;
 330             E result = (E) array[0];
 331             E x = (E) array[n];
 332             array[n] = null;
 333             Comparator<? super E> cmp = comparator;
 334             if (cmp == null)
 335                 siftDownComparable(0, x, array, n);
 336             else
 337                 siftDownUsingComparator(0, x, array, n, cmp);
 338             size = n;
 339             return result;
 340         }
 341     }
 342 
 343     /**
 344      * Inserts item x at position k, maintaining heap invariant by
 345      * promoting x up the tree until it is greater than or equal to
 346      * its parent, or is the root.
 347      *
 348      * To simplify and speed up coercions and comparisons, the
 349      * Comparable and Comparator versions are separated into different
 350      * methods that are otherwise identical. (Similarly for siftDown.)
 351      *
 352      * @param k the position to fill
 353      * @param x the item to insert
 354      * @param array the heap array
 355      */
 356     private static <T> void siftUpComparable(int k, T x, Object[] array) {
 357         Comparable<? super T> key = (Comparable<? super T>) x;
 358         while (k > 0) {
 359             int parent = (k - 1) >>> 1;
 360             Object e = array[parent];
 361             if (key.compareTo((T) e) >= 0)
 362                 break;
 363             array[k] = e;
 364             k = parent;
 365         }
 366         array[k] = key;
 367     }
 368 
 369     private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
 370                                        Comparator<? super T> cmp) {
 371         while (k > 0) {
 372             int parent = (k - 1) >>> 1;
 373             Object e = array[parent];
 374             if (cmp.compare(x, (T) e) >= 0)
 375                 break;
 376             array[k] = e;
 377             k = parent;
 378         }
 379         array[k] = x;
 380     }
 381 
 382     /**
 383      * Inserts item x at position k, maintaining heap invariant by
 384      * demoting x down the tree repeatedly until it is less than or
 385      * equal to its children or is a leaf.
 386      *
 387      * @param k the position to fill
 388      * @param x the item to insert
 389      * @param array the heap array
 390      * @param n heap size
 391      */
 392     private static <T> void siftDownComparable(int k, T x, Object[] array,
 393                                                int n) {
 394         if (n > 0) {
 395             Comparable<? super T> key = (Comparable<? super T>)x;
 396             int half = n >>> 1;           // loop while a non-leaf
 397             while (k < half) {
 398                 int child = (k << 1) + 1; // assume left child is least
 399                 Object c = array[child];
 400                 int right = child + 1;
 401                 if (right < n &&
 402                     ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
 403                     c = array[child = right];
 404                 if (key.compareTo((T) c) <= 0)
 405                     break;
 406                 array[k] = c;
 407                 k = child;
 408             }
 409             array[k] = key;
 410         }
 411     }
 412 
 413     private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
 414                                                     int n,
 415                                                     Comparator<? super T> cmp) {
 416         if (n > 0) {
 417             int half = n >>> 1;
 418             while (k < half) {
 419                 int child = (k << 1) + 1;
 420                 Object c = array[child];
 421                 int right = child + 1;
 422                 if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
 423                     c = array[child = right];
 424                 if (cmp.compare(x, (T) c) <= 0)
 425                     break;
 426                 array[k] = c;
 427                 k = child;
 428             }
 429             array[k] = x;
 430         }
 431     }
 432 
 433     /**
 434      * Establishes the heap invariant (described above) in the entire tree,
 435      * assuming nothing about the order of the elements prior to the call.
 436      * This classic algorithm due to Floyd (1964) is known to be O(size).
 437      */
 438     private void heapify() {
 439         Object[] array = queue;
 440         int n = size, i = (n >>> 1) - 1;
 441         Comparator<? super E> cmp = comparator;
 442         if (cmp == null) {
 443             for (; i >= 0; i--)
 444                 siftDownComparable(i, (E) array[i], array, n);
 445         }
 446         else {
 447             for (; i >= 0; i--)
 448                 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
 449         }
 450     }
 451 
 452     /**
 453      * Inserts the specified element into this priority queue.
 454      *
 455      * @param e the element to add
 456      * @return {@code true} (as specified by {@link Collection#add})
 457      * @throws ClassCastException if the specified element cannot be compared
 458      *         with elements currently in the priority queue according to the
 459      *         priority queue's ordering
 460      * @throws NullPointerException if the specified element is null
 461      */
 462     public boolean add(E e) {
 463         return offer(e);
 464     }
 465 
 466     /**
 467      * Inserts the specified element into this priority queue.
 468      * As the queue is unbounded, this method will never return {@code false}.
 469      *
 470      * @param e the element to add
 471      * @return {@code true} (as specified by {@link Queue#offer})
 472      * @throws ClassCastException if the specified element cannot be compared
 473      *         with elements currently in the priority queue according to the
 474      *         priority queue's ordering
 475      * @throws NullPointerException if the specified element is null
 476      */
 477     public boolean offer(E e) {
 478         if (e == null)
 479             throw new NullPointerException();
 480         final ReentrantLock lock = this.lock;
 481         lock.lock();
 482         int n, cap;
 483         Object[] array;
 484         while ((n = size) >= (cap = (array = queue).length))
 485             tryGrow(array, cap);
 486         try {
 487             Comparator<? super E> cmp = comparator;
 488             if (cmp == null)
 489                 siftUpComparable(n, e, array);
 490             else
 491                 siftUpUsingComparator(n, e, array, cmp);
 492             size = n + 1;
 493             notEmpty.signal();
 494         } finally {
 495             lock.unlock();
 496         }
 497         return true;
 498     }
 499 
 500     /**
 501      * Inserts the specified element into this priority queue.
 502      * As the queue is unbounded, this method will never block.
 503      *
 504      * @param e the element to add
 505      * @throws ClassCastException if the specified element cannot be compared
 506      *         with elements currently in the priority queue according to the
 507      *         priority queue's ordering
 508      * @throws NullPointerException if the specified element is null
 509      */
 510     public void put(E e) {
 511         offer(e); // never need to block
 512     }
 513 
 514     /**
 515      * Inserts the specified element into this priority queue.
 516      * As the queue is unbounded, this method will never block or
 517      * return {@code false}.
 518      *
 519      * @param e the element to add
 520      * @param timeout This parameter is ignored as the method never blocks
 521      * @param unit This parameter is ignored as the method never blocks
 522      * @return {@code true} (as specified by
 523      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
 524      * @throws ClassCastException if the specified element cannot be compared
 525      *         with elements currently in the priority queue according to the
 526      *         priority queue's ordering
 527      * @throws NullPointerException if the specified element is null
 528      */
 529     public boolean offer(E e, long timeout, TimeUnit unit) {
 530         return offer(e); // never need to block
 531     }
 532 
 533     public E poll() {
 534         final ReentrantLock lock = this.lock;
 535         lock.lock();
 536         try {
 537             return dequeue();
 538         } finally {
 539             lock.unlock();
 540         }
 541     }
 542 
 543     public E take() throws InterruptedException {
 544         final ReentrantLock lock = this.lock;
 545         lock.lockInterruptibly();
 546         E result;
 547         try {
 548             while ( (result = dequeue()) == null)
 549                 notEmpty.await();
 550         } finally {
 551             lock.unlock();
 552         }
 553         return result;
 554     }
 555 
 556     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 557         long nanos = unit.toNanos(timeout);
 558         final ReentrantLock lock = this.lock;
 559         lock.lockInterruptibly();
 560         E result;
 561         try {
 562             while ( (result = dequeue()) == null && nanos > 0)
 563                 nanos = notEmpty.awaitNanos(nanos);
 564         } finally {
 565             lock.unlock();
 566         }
 567         return result;
 568     }
 569 
 570     public E peek() {
 571         final ReentrantLock lock = this.lock;
 572         lock.lock();
 573         try {
 574             return (size == 0) ? null : (E) queue[0];
 575         } finally {
 576             lock.unlock();
 577         }
 578     }
 579 
 580     /**
 581      * Returns the comparator used to order the elements in this queue,
 582      * or {@code null} if this queue uses the {@linkplain Comparable
 583      * natural ordering} of its elements.
 584      *
 585      * @return the comparator used to order the elements in this queue,
 586      *         or {@code null} if this queue uses the natural
 587      *         ordering of its elements
 588      */
 589     public Comparator<? super E> comparator() {
 590         return comparator;
 591     }
 592 
 593     public int size() {
 594         final ReentrantLock lock = this.lock;
 595         lock.lock();
 596         try {
 597             return size;
 598         } finally {
 599             lock.unlock();
 600         }
 601     }
 602 
 603     /**
 604      * Always returns {@code Integer.MAX_VALUE} because
 605      * a {@code PriorityBlockingQueue} is not capacity constrained.
 606      * @return {@code Integer.MAX_VALUE} always
 607      */
 608     public int remainingCapacity() {
 609         return Integer.MAX_VALUE;
 610     }
 611 
 612     private int indexOf(Object o) {
 613         if (o != null) {
 614             Object[] array = queue;
 615             int n = size;
 616             for (int i = 0; i < n; i++)
 617                 if (o.equals(array[i]))
 618                     return i;
 619         }
 620         return -1;
 621     }
 622 
 623     /**
 624      * Removes the ith element from queue.
 625      */
 626     private void removeAt(int i) {
 627         Object[] array = queue;
 628         int n = size - 1;
 629         if (n == i) // removed last element
 630             array[i] = null;
 631         else {
 632             E moved = (E) array[n];
 633             array[n] = null;
 634             Comparator<? super E> cmp = comparator;
 635             if (cmp == null)
 636                 siftDownComparable(i, moved, array, n);
 637             else
 638                 siftDownUsingComparator(i, moved, array, n, cmp);
 639             if (array[i] == moved) {
 640                 if (cmp == null)
 641                     siftUpComparable(i, moved, array);
 642                 else
 643                     siftUpUsingComparator(i, moved, array, cmp);
 644             }
 645         }
 646         size = n;
 647     }
 648 
 649     /**
 650      * Removes a single instance of the specified element from this queue,
 651      * if it is present.  More formally, removes an element {@code e} such
 652      * that {@code o.equals(e)}, if this queue contains one or more such
 653      * elements.  Returns {@code true} if and only if this queue contained
 654      * the specified element (or equivalently, if this queue changed as a
 655      * result of the call).
 656      *
 657      * @param o element to be removed from this queue, if present
 658      * @return {@code true} if this queue changed as a result of the call
 659      */
 660     public boolean remove(Object o) {
 661         final ReentrantLock lock = this.lock;
 662         lock.lock();
 663         try {
 664             int i = indexOf(o);
 665             if (i == -1)
 666                 return false;
 667             removeAt(i);
 668             return true;
 669         } finally {
 670             lock.unlock();
 671         }
 672     }
 673 
 674     /**
 675      * Identity-based version for use in Itr.remove.
 676      */
 677     void removeEQ(Object o) {
 678         final ReentrantLock lock = this.lock;
 679         lock.lock();
 680         try {
 681             Object[] array = queue;
 682             for (int i = 0, n = size; i < n; i++) {
 683                 if (o == array[i]) {
 684                     removeAt(i);
 685                     break;
 686                 }
 687             }
 688         } finally {
 689             lock.unlock();
 690         }
 691     }
 692 
 693     /**
 694      * Returns {@code true} if this queue contains the specified element.
 695      * More formally, returns {@code true} if and only if this queue contains
 696      * at least one element {@code e} such that {@code o.equals(e)}.
 697      *
 698      * @param o object to be checked for containment in this queue
 699      * @return {@code true} if this queue contains the specified element
 700      */
 701     public boolean contains(Object o) {
 702         final ReentrantLock lock = this.lock;
 703         lock.lock();
 704         try {
 705             return indexOf(o) != -1;
 706         } finally {
 707             lock.unlock();
 708         }
 709     }
 710 
 711     public String toString() {
 712         return Helpers.collectionToString(this);
 713     }
 714 
 715     /**
 716      * @throws UnsupportedOperationException {@inheritDoc}
 717      * @throws ClassCastException            {@inheritDoc}
 718      * @throws NullPointerException          {@inheritDoc}
 719      * @throws IllegalArgumentException      {@inheritDoc}
 720      */
 721     public int drainTo(Collection<? super E> c) {
 722         return drainTo(c, Integer.MAX_VALUE);
 723     }
 724 
 725     /**
 726      * @throws UnsupportedOperationException {@inheritDoc}
 727      * @throws ClassCastException            {@inheritDoc}
 728      * @throws NullPointerException          {@inheritDoc}
 729      * @throws IllegalArgumentException      {@inheritDoc}
 730      */
 731     public int drainTo(Collection<? super E> c, int maxElements) {
 732         Objects.requireNonNull(c);
 733         if (c == this)
 734             throw new IllegalArgumentException();
 735         if (maxElements <= 0)
 736             return 0;
 737         final ReentrantLock lock = this.lock;
 738         lock.lock();
 739         try {
 740             int n = Math.min(size, maxElements);
 741             for (int i = 0; i < n; i++) {
 742                 c.add((E) queue[0]); // In this order, in case add() throws.
 743                 dequeue();
 744             }
 745             return n;
 746         } finally {
 747             lock.unlock();
 748         }
 749     }
 750 
 751     /**
 752      * Atomically removes all of the elements from this queue.
 753      * The queue will be empty after this call returns.
 754      */
 755     public void clear() {
 756         final ReentrantLock lock = this.lock;
 757         lock.lock();
 758         try {
 759             Object[] array = queue;
 760             int n = size;
 761             size = 0;
 762             for (int i = 0; i < n; i++)
 763                 array[i] = null;
 764         } finally {
 765             lock.unlock();
 766         }
 767     }
 768 
 769     /**
 770      * Returns an array containing all of the elements in this queue.
 771      * The returned array elements are in no particular order.
 772      *
 773      * <p>The returned array will be "safe" in that no references to it are
 774      * maintained by this queue.  (In other words, this method must allocate
 775      * a new array).  The caller is thus free to modify the returned array.
 776      *
 777      * <p>This method acts as bridge between array-based and collection-based
 778      * APIs.
 779      *
 780      * @return an array containing all of the elements in this queue
 781      */
 782     public Object[] toArray() {
 783         final ReentrantLock lock = this.lock;
 784         lock.lock();
 785         try {
 786             return Arrays.copyOf(queue, size);
 787         } finally {
 788             lock.unlock();
 789         }
 790     }
 791 
 792     /**
 793      * Returns an array containing all of the elements in this queue; the
 794      * runtime type of the returned array is that of the specified array.
 795      * The returned array elements are in no particular order.
 796      * If the queue fits in the specified array, it is returned therein.
 797      * Otherwise, a new array is allocated with the runtime type of the
 798      * specified array and the size of this queue.
 799      *
 800      * <p>If this queue fits in the specified array with room to spare
 801      * (i.e., the array has more elements than this queue), the element in
 802      * the array immediately following the end of the queue is set to
 803      * {@code null}.
 804      *
 805      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 806      * array-based and collection-based APIs.  Further, this method allows
 807      * precise control over the runtime type of the output array, and may,
 808      * under certain circumstances, be used to save allocation costs.
 809      *
 810      * <p>Suppose {@code x} is a queue known to contain only strings.
 811      * The following code can be used to dump the queue into a newly
 812      * allocated array of {@code String}:
 813      *
 814      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
 815      *
 816      * Note that {@code toArray(new Object[0])} is identical in function to
 817      * {@code toArray()}.
 818      *
 819      * @param a the array into which the elements of the queue are to
 820      *          be stored, if it is big enough; otherwise, a new array of the
 821      *          same runtime type is allocated for this purpose
 822      * @return an array containing all of the elements in this queue
 823      * @throws ArrayStoreException if the runtime type of the specified array
 824      *         is not a supertype of the runtime type of every element in
 825      *         this queue
 826      * @throws NullPointerException if the specified array is null
 827      */
 828     public <T> T[] toArray(T[] a) {
 829         final ReentrantLock lock = this.lock;
 830         lock.lock();
 831         try {
 832             int n = size;
 833             if (a.length < n)
 834                 // Make a new array of a's runtime type, but my contents:
 835                 return (T[]) Arrays.copyOf(queue, size, a.getClass());
 836             System.arraycopy(queue, 0, a, 0, n);
 837             if (a.length > n)
 838                 a[n] = null;
 839             return a;
 840         } finally {
 841             lock.unlock();
 842         }
 843     }
 844 
 845     /**
 846      * Returns an iterator over the elements in this queue. The
 847      * iterator does not return the elements in any particular order.
 848      *
 849      * <p>The returned iterator is
 850      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
 851      *
 852      * @return an iterator over the elements in this queue
 853      */
 854     public Iterator<E> iterator() {
 855         return new Itr(toArray());
 856     }
 857 
 858     /**
 859      * Snapshot iterator that works off copy of underlying q array.
 860      */
 861     final class Itr implements Iterator<E> {
 862         final Object[] array; // Array of all elements
 863         int cursor;           // index of next element to return
 864         int lastRet;          // index of last element, or -1 if no such
 865 
 866         Itr(Object[] array) {
 867             lastRet = -1;
 868             this.array = array;
 869         }
 870 
 871         public boolean hasNext() {
 872             return cursor < array.length;
 873         }
 874 
 875         public E next() {
 876             if (cursor >= array.length)
 877                 throw new NoSuchElementException();
 878             return (E)array[lastRet = cursor++];
 879         }
 880 
 881         public void remove() {
 882             if (lastRet < 0)
 883                 throw new IllegalStateException();
 884             removeEQ(array[lastRet]);
 885             lastRet = -1;
 886         }
 887     }
 888 
 889     /**
 890      * Saves this queue to a stream (that is, serializes it).
 891      *
 892      * For compatibility with previous version of this class, elements
 893      * are first copied to a java.util.PriorityQueue, which is then
 894      * serialized.
 895      *
 896      * @param s the stream
 897      * @throws java.io.IOException if an I/O error occurs
 898      */
 899     private void writeObject(java.io.ObjectOutputStream s)
 900         throws java.io.IOException {
 901         lock.lock();
 902         try {
 903             // avoid zero capacity argument
 904             q = new PriorityQueue<E>(Math.max(size, 1), comparator);
 905             q.addAll(this);
 906             s.defaultWriteObject();
 907         } finally {
 908             q = null;
 909             lock.unlock();
 910         }
 911     }
 912 
 913     /**
 914      * Reconstitutes this queue from a stream (that is, deserializes it).
 915      * @param s the stream
 916      * @throws ClassNotFoundException if the class of a serialized object
 917      *         could not be found
 918      * @throws java.io.IOException if an I/O error occurs
 919      */
 920     private void readObject(java.io.ObjectInputStream s)
 921         throws java.io.IOException, ClassNotFoundException {
 922         try {
 923             s.defaultReadObject();
 924             this.queue = new Object[q.size()];
 925             comparator = q.comparator();
 926             addAll(q);
 927         } finally {
 928             q = null;
 929         }
 930     }
 931 
 932     /**
 933      * Immutable snapshot spliterator that binds to elements "late".
 934      */
 935     final class PBQSpliterator implements Spliterator<E> {
 936         Object[] array;        // null until late-bound-initialized
 937         int index;
 938         int fence;
 939 
 940         PBQSpliterator() {}
 941 
 942         PBQSpliterator(Object[] array, int index, int fence) {
 943             this.array = array;
 944             this.index = index;
 945             this.fence = fence;
 946         }
 947 
 948         private int getFence() {
 949             if (array == null)
 950                 fence = (array = toArray()).length;
 951             return fence;
 952         }
 953 
 954         public PBQSpliterator trySplit() {
 955             int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
 956             return (lo >= mid) ? null :
 957                 new PBQSpliterator(array, lo, index = mid);
 958         }
 959 
 960         public void forEachRemaining(Consumer<? super E> action) {
 961             Objects.requireNonNull(action);
 962             final int hi = getFence(), lo = index;
 963             final Object[] a = array;
 964             index = hi;                 // ensure exhaustion
 965             for (int i = lo; i < hi; i++)
 966                 action.accept((E) a[i]);
 967         }
 968 
 969         public boolean tryAdvance(Consumer<? super E> action) {
 970             Objects.requireNonNull(action);
 971             if (getFence() > index && index >= 0) {
 972                 action.accept((E) array[index++]);
 973                 return true;
 974             }
 975             return false;
 976         }
 977 
 978         public long estimateSize() { return getFence() - index; }
 979 
 980         public int characteristics() {
 981             return (Spliterator.NONNULL |
 982                     Spliterator.SIZED |
 983                     Spliterator.SUBSIZED);
 984         }
 985     }
 986 
 987     /**
 988      * Returns a {@link Spliterator} over the elements in this queue.
 989      * The spliterator does not traverse elements in any particular order
 990      * (the {@link Spliterator#ORDERED ORDERED} characteristic is not reported).
 991      *
 992      * <p>The returned spliterator is
 993      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
 994      *
 995      * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
 996      * {@link Spliterator#NONNULL}.
 997      *
 998      * @implNote
 999      * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
1000      *
1001      * @return a {@code Spliterator} over the elements in this queue
1002      * @since 1.8
1003      */
1004     public Spliterator<E> spliterator() {
1005         return new PBQSpliterator();
1006     }
1007 
1008     // VarHandle mechanics
1009     private static final VarHandle ALLOCATIONSPINLOCK;
1010     static {
1011         try {
1012             MethodHandles.Lookup l = MethodHandles.lookup();
1013             ALLOCATIONSPINLOCK = l.findVarHandle(PriorityBlockingQueue.class,
1014                                                  "allocationSpinLock",
1015                                                  int.class);
1016         } catch (ReflectiveOperationException e) {
1017             throw new ExceptionInInitializerError(e);
1018         }
1019     }
1020 }