1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.lang.invoke.MethodHandles;
  39 import java.lang.invoke.VarHandle;
  40 import java.util.AbstractQueue;
  41 import java.util.Arrays;
  42 import java.util.Collection;
  43 import java.util.Comparator;
  44 import java.util.Iterator;
  45 import java.util.NoSuchElementException;
  46 import java.util.Objects;
  47 import java.util.PriorityQueue;
  48 import java.util.Queue;
  49 import java.util.SortedSet;
  50 import java.util.Spliterator;
  51 import java.util.concurrent.locks.Condition;
  52 import java.util.concurrent.locks.ReentrantLock;
  53 import java.util.function.Consumer;
  54 import java.util.function.Predicate;
  55 import jdk.internal.misc.SharedSecrets;
  56 
  57 /**
  58  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
  59  * the same ordering rules as class {@link PriorityQueue} and supplies
  60  * blocking retrieval operations.  While this queue is logically
  61  * unbounded, attempted additions may fail due to resource exhaustion
  62  * (causing {@code OutOfMemoryError}). This class does not permit
  63  * {@code null} elements.  A priority queue relying on {@linkplain
  64  * Comparable natural ordering} also does not permit insertion of
  65  * non-comparable objects (doing so results in
  66  * {@code ClassCastException}).
  67  *
  68  * <p>This class and its iterator implement all of the <em>optional</em>
  69  * methods of the {@link Collection} and {@link Iterator} interfaces.
  70  * The Iterator provided in method {@link #iterator()} and the
  71  * Spliterator provided in method {@link #spliterator()} are <em>not</em>
  72  * guaranteed to traverse the elements of the PriorityBlockingQueue in
  73  * any particular order. If you need ordered traversal, consider using
  74  * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo} can
  75  * be used to <em>remove</em> some or all elements in priority order and
  76  * place them in another collection.
  77  *
  78  * <p>Operations on this class make no guarantees about the ordering
  79  * of elements with equal priority. If you need to enforce an
  80  * ordering, you can define custom classes or comparators that use a
  81  * secondary key to break ties in primary priority values.  For
  82  * example, here is a class that applies first-in-first-out
  83  * tie-breaking to comparable elements. To use it, you would insert a
  84  * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
  85  *
  86  * <pre> {@code
  87  * class FIFOEntry<E extends Comparable<? super E>>
  88  *     implements Comparable<FIFOEntry<E>> {
  89  *   static final AtomicLong seq = new AtomicLong(0);
  90  *   final long seqNum;
  91  *   final E entry;
  92  *   public FIFOEntry(E entry) {
  93  *     seqNum = seq.getAndIncrement();
  94  *     this.entry = entry;
  95  *   }
  96  *   public E getEntry() { return entry; }
  97  *   public int compareTo(FIFOEntry<E> other) {
  98  *     int res = entry.compareTo(other.entry);
  99  *     if (res == 0 && other.entry != this.entry)
 100  *       res = (seqNum < other.seqNum ? -1 : 1);
 101  *     return res;
 102  *   }
 103  * }}</pre>
 104  *
 105  * <p>This class is a member of the
 106  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
 107  * Java Collections Framework</a>.
 108  *
 109  * @since 1.5
 110  * @author Doug Lea
 111  * @param <E> the type of elements held in this queue
 112  */
 113 @SuppressWarnings("unchecked")
 114 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
 115     implements BlockingQueue<E>, java.io.Serializable {
 116     private static final long serialVersionUID = 5595510919245408276L;
 117 
 118     /*
 119      * The implementation uses an array-based binary heap, with public
 120      * operations protected with a single lock. However, allocation
 121      * during resizing uses a simple spinlock (used only while not
 122      * holding main lock) in order to allow takes to operate
 123      * concurrently with allocation.  This avoids repeated
 124      * postponement of waiting consumers and consequent element
 125      * build-up. The need to back away from lock during allocation
 126      * makes it impossible to simply wrap delegated
 127      * java.util.PriorityQueue operations within a lock, as was done
 128      * in a previous version of this class. To maintain
 129      * interoperability, a plain PriorityQueue is still used during
 130      * serialization, which maintains compatibility at the expense of
 131      * transiently doubling overhead.
 132      */
 133 
 134     /**
 135      * Default array capacity.
 136      */
 137     private static final int DEFAULT_INITIAL_CAPACITY = 11;
 138 
 139     /**
 140      * The maximum size of array to allocate.
 141      * Some VMs reserve some header words in an array.
 142      * Attempts to allocate larger arrays may result in
 143      * OutOfMemoryError: Requested array size exceeds VM limit
 144      */
 145     private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 146 
 147     /**
 148      * Priority queue represented as a balanced binary heap: the two
 149      * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 150      * priority queue is ordered by comparator, or by the elements'
 151      * natural ordering, if comparator is null: For each node n in the
 152      * heap and each descendant d of n, n <= d.  The element with the
 153      * lowest value is in queue[0], assuming the queue is nonempty.
 154      */
 155     private transient Object[] queue;
 156 
 157     /**
 158      * The number of elements in the priority queue.
 159      */
 160     private transient int size;
 161 
 162     /**
 163      * The comparator, or null if priority queue uses elements'
 164      * natural ordering.
 165      */
 166     private transient Comparator<? super E> comparator;
 167 
 168     /**
 169      * Lock used for all public operations.
 170      */
 171     private final ReentrantLock lock = new ReentrantLock();
 172 
 173     /**
 174      * Condition for blocking when empty.
 175      */
 176     private final Condition notEmpty = lock.newCondition();
 177 
 178     /**
 179      * Spinlock for allocation, acquired via CAS.
 180      */
 181     private transient volatile int allocationSpinLock;
 182 
 183     /**
 184      * A plain PriorityQueue used only for serialization,
 185      * to maintain compatibility with previous versions
 186      * of this class. Non-null only during serialization/deserialization.
 187      */
 188     private PriorityQueue<E> q;
 189 
 190     /**
 191      * Creates a {@code PriorityBlockingQueue} with the default
 192      * initial capacity (11) that orders its elements according to
 193      * their {@linkplain Comparable natural ordering}.
 194      */
 195     public PriorityBlockingQueue() {
 196         this(DEFAULT_INITIAL_CAPACITY, null);
 197     }
 198 
 199     /**
 200      * Creates a {@code PriorityBlockingQueue} with the specified
 201      * initial capacity that orders its elements according to their
 202      * {@linkplain Comparable natural ordering}.
 203      *
 204      * @param initialCapacity the initial capacity for this priority queue
 205      * @throws IllegalArgumentException if {@code initialCapacity} is less
 206      *         than 1
 207      */
 208     public PriorityBlockingQueue(int initialCapacity) {
 209         this(initialCapacity, null);
 210     }
 211 
 212     /**
 213      * Creates a {@code PriorityBlockingQueue} with the specified initial
 214      * capacity that orders its elements according to the specified
 215      * comparator.
 216      *
 217      * @param initialCapacity the initial capacity for this priority queue
 218      * @param  comparator the comparator that will be used to order this
 219      *         priority queue.  If {@code null}, the {@linkplain Comparable
 220      *         natural ordering} of the elements will be used.
 221      * @throws IllegalArgumentException if {@code initialCapacity} is less
 222      *         than 1
 223      */
 224     public PriorityBlockingQueue(int initialCapacity,
 225                                  Comparator<? super E> comparator) {
 226         if (initialCapacity < 1)
 227             throw new IllegalArgumentException();
 228         this.comparator = comparator;
 229         this.queue = new Object[Math.max(1, initialCapacity)];
 230     }
 231 
 232     /**
 233      * Creates a {@code PriorityBlockingQueue} containing the elements
 234      * in the specified collection.  If the specified collection is a
 235      * {@link SortedSet} or a {@link PriorityQueue}, this
 236      * priority queue will be ordered according to the same ordering.
 237      * Otherwise, this priority queue will be ordered according to the
 238      * {@linkplain Comparable natural ordering} of its elements.
 239      *
 240      * @param  c the collection whose elements are to be placed
 241      *         into this priority queue
 242      * @throws ClassCastException if elements of the specified collection
 243      *         cannot be compared to one another according to the priority
 244      *         queue's ordering
 245      * @throws NullPointerException if the specified collection or any
 246      *         of its elements are null
 247      */
 248     public PriorityBlockingQueue(Collection<? extends E> c) {
 249         boolean heapify = true; // true if not known to be in heap order
 250         boolean screen = true;  // true if must screen for nulls
 251         if (c instanceof SortedSet<?>) {
 252             SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
 253             this.comparator = (Comparator<? super E>) ss.comparator();
 254             heapify = false;
 255         }
 256         else if (c instanceof PriorityBlockingQueue<?>) {
 257             PriorityBlockingQueue<? extends E> pq =
 258                 (PriorityBlockingQueue<? extends E>) c;
 259             this.comparator = (Comparator<? super E>) pq.comparator();
 260             screen = false;
 261             if (pq.getClass() == PriorityBlockingQueue.class) // exact match
 262                 heapify = false;
 263         }
 264         Object[] es = c.toArray();
 265         int n = es.length;
 266         // If c.toArray incorrectly doesn't return Object[], copy it.
 267         if (es.getClass() != Object[].class)
 268             es = Arrays.copyOf(es, n, Object[].class);
 269         if (screen && (n == 1 || this.comparator != null)) {
 270             for (Object e : es)
 271                 if (e == null)
 272                     throw new NullPointerException();
 273         }
 274         this.queue = ensureNonEmpty(es);
 275         this.size = n;
 276         if (heapify)
 277             heapify();
 278     }
 279 
 280     /** Ensures that queue[0] exists, helping peek() and poll(). */
 281     private static Object[] ensureNonEmpty(Object[] es) {
 282         return (es.length > 0) ? es : new Object[1];
 283     }
 284 
 285     /**
 286      * Tries to grow array to accommodate at least one more element
 287      * (but normally expand by about 50%), giving up (allowing retry)
 288      * on contention (which we expect to be rare). Call only while
 289      * holding lock.
 290      *
 291      * @param array the heap array
 292      * @param oldCap the length of the array
 293      */
 294     private void tryGrow(Object[] array, int oldCap) {
 295         lock.unlock(); // must release and then re-acquire main lock
 296         Object[] newArray = null;
 297         if (allocationSpinLock == 0 &&
 298             ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
 299             try {
 300                 int newCap = oldCap + ((oldCap < 64) ?
 301                                        (oldCap + 2) : // grow faster if small
 302                                        (oldCap >> 1));
 303                 if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
 304                     int minCap = oldCap + 1;
 305                     if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
 306                         throw new OutOfMemoryError();
 307                     newCap = MAX_ARRAY_SIZE;
 308                 }
 309                 if (newCap > oldCap && queue == array)
 310                     newArray = new Object[newCap];
 311             } finally {
 312                 allocationSpinLock = 0;
 313             }
 314         }
 315         if (newArray == null) // back off if another thread is allocating
 316             Thread.yield();
 317         lock.lock();
 318         if (newArray != null && queue == array) {
 319             queue = newArray;
 320             System.arraycopy(array, 0, newArray, 0, oldCap);
 321         }
 322     }
 323 
 324     /**
 325      * Mechanics for poll().  Call only while holding lock.
 326      */
 327     private E dequeue() {
 328         // assert lock.isHeldByCurrentThread();
 329         final Object[] es;
 330         final E result;
 331 
 332         if ((result = (E) ((es = queue)[0])) != null) {
 333             final int n;
 334             final E x = (E) es[(n = --size)];
 335             es[n] = null;
 336             if (n > 0) {
 337                 final Comparator<? super E> cmp;
 338                 if ((cmp = comparator) == null)
 339                     siftDownComparable(0, x, es, n);
 340                 else
 341                     siftDownUsingComparator(0, x, es, n, cmp);
 342             }
 343         }
 344         return result;
 345     }
 346 
 347     /**
 348      * Inserts item x at position k, maintaining heap invariant by
 349      * promoting x up the tree until it is greater than or equal to
 350      * its parent, or is the root.
 351      *
 352      * To simplify and speed up coercions and comparisons, the
 353      * Comparable and Comparator versions are separated into different
 354      * methods that are otherwise identical. (Similarly for siftDown.)
 355      *
 356      * @param k the position to fill
 357      * @param x the item to insert
 358      * @param es the heap array
 359      */
 360     private static <T> void siftUpComparable(int k, T x, Object[] es) {
 361         Comparable<? super T> key = (Comparable<? super T>) x;
 362         while (k > 0) {
 363             int parent = (k - 1) >>> 1;
 364             Object e = es[parent];
 365             if (key.compareTo((T) e) >= 0)
 366                 break;
 367             es[k] = e;
 368             k = parent;
 369         }
 370         es[k] = key;
 371     }
 372 
 373     private static <T> void siftUpUsingComparator(
 374         int k, T x, Object[] es, Comparator<? super T> cmp) {
 375         while (k > 0) {
 376             int parent = (k - 1) >>> 1;
 377             Object e = es[parent];
 378             if (cmp.compare(x, (T) e) >= 0)
 379                 break;
 380             es[k] = e;
 381             k = parent;
 382         }
 383         es[k] = x;
 384     }
 385 
 386     /**
 387      * Inserts item x at position k, maintaining heap invariant by
 388      * demoting x down the tree repeatedly until it is less than or
 389      * equal to its children or is a leaf.
 390      *
 391      * @param k the position to fill
 392      * @param x the item to insert
 393      * @param es the heap array
 394      * @param n heap size
 395      */
 396     private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
 397         // assert n > 0;
 398         Comparable<? super T> key = (Comparable<? super T>)x;
 399         int half = n >>> 1;           // loop while a non-leaf
 400         while (k < half) {
 401             int child = (k << 1) + 1; // assume left child is least
 402             Object c = es[child];
 403             int right = child + 1;
 404             if (right < n &&
 405                 ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
 406                 c = es[child = right];
 407             if (key.compareTo((T) c) <= 0)
 408                 break;
 409             es[k] = c;
 410             k = child;
 411         }
 412         es[k] = key;
 413     }
 414 
 415     private static <T> void siftDownUsingComparator(
 416         int k, T x, Object[] es, int n, Comparator<? super T> cmp) {
 417         // assert n > 0;
 418         int half = n >>> 1;
 419         while (k < half) {
 420             int child = (k << 1) + 1;
 421             Object c = es[child];
 422             int right = child + 1;
 423             if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
 424                 c = es[child = right];
 425             if (cmp.compare(x, (T) c) <= 0)
 426                 break;
 427             es[k] = c;
 428             k = child;
 429         }
 430         es[k] = x;
 431     }
 432 
 433     /**
 434      * Establishes the heap invariant (described above) in the entire tree,
 435      * assuming nothing about the order of the elements prior to the call.
 436      * This classic algorithm due to Floyd (1964) is known to be O(size).
 437      */
 438     private void heapify() {
 439         final Object[] es = queue;
 440         int n = size, i = (n >>> 1) - 1;
 441         final Comparator<? super E> cmp;
 442         if ((cmp = comparator) == null)
 443             for (; i >= 0; i--)
 444                 siftDownComparable(i, (E) es[i], es, n);
 445         else
 446             for (; i >= 0; i--)
 447                 siftDownUsingComparator(i, (E) es[i], es, n, cmp);
 448     }
 449 
 450     /**
 451      * Inserts the specified element into this priority queue.
 452      *
 453      * @param e the element to add
 454      * @return {@code true} (as specified by {@link Collection#add})
 455      * @throws ClassCastException if the specified element cannot be compared
 456      *         with elements currently in the priority queue according to the
 457      *         priority queue's ordering
 458      * @throws NullPointerException if the specified element is null
 459      */
 460     public boolean add(E e) {
 461         return offer(e);
 462     }
 463 
 464     /**
 465      * Inserts the specified element into this priority queue.
 466      * As the queue is unbounded, this method will never return {@code false}.
 467      *
 468      * @param e the element to add
 469      * @return {@code true} (as specified by {@link Queue#offer})
 470      * @throws ClassCastException if the specified element cannot be compared
 471      *         with elements currently in the priority queue according to the
 472      *         priority queue's ordering
 473      * @throws NullPointerException if the specified element is null
 474      */
 475     public boolean offer(E e) {
 476         if (e == null)
 477             throw new NullPointerException();
 478         final ReentrantLock lock = this.lock;
 479         lock.lock();
 480         int n, cap;
 481         Object[] es;
 482         while ((n = size) >= (cap = (es = queue).length))
 483             tryGrow(es, cap);
 484         try {
 485             final Comparator<? super E> cmp;
 486             if ((cmp = comparator) == null)
 487                 siftUpComparable(n, e, es);
 488             else
 489                 siftUpUsingComparator(n, e, es, cmp);
 490             size = n + 1;
 491             notEmpty.signal();
 492         } finally {
 493             lock.unlock();
 494         }
 495         return true;
 496     }
 497 
 498     /**
 499      * Inserts the specified element into this priority queue.
 500      * As the queue is unbounded, this method will never block.
 501      *
 502      * @param e the element to add
 503      * @throws ClassCastException if the specified element cannot be compared
 504      *         with elements currently in the priority queue according to the
 505      *         priority queue's ordering
 506      * @throws NullPointerException if the specified element is null
 507      */
 508     public void put(E e) {
 509         offer(e); // never need to block
 510     }
 511 
 512     /**
 513      * Inserts the specified element into this priority queue.
 514      * As the queue is unbounded, this method will never block or
 515      * return {@code false}.
 516      *
 517      * @param e the element to add
 518      * @param timeout This parameter is ignored as the method never blocks
 519      * @param unit This parameter is ignored as the method never blocks
 520      * @return {@code true} (as specified by
 521      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
 522      * @throws ClassCastException if the specified element cannot be compared
 523      *         with elements currently in the priority queue according to the
 524      *         priority queue's ordering
 525      * @throws NullPointerException if the specified element is null
 526      */
 527     public boolean offer(E e, long timeout, TimeUnit unit) {
 528         return offer(e); // never need to block
 529     }
 530 
 531     public E poll() {
 532         final ReentrantLock lock = this.lock;
 533         lock.lock();
 534         try {
 535             return dequeue();
 536         } finally {
 537             lock.unlock();
 538         }
 539     }
 540 
 541     public E take() throws InterruptedException {
 542         final ReentrantLock lock = this.lock;
 543         lock.lockInterruptibly();
 544         E result;
 545         try {
 546             while ( (result = dequeue()) == null)
 547                 notEmpty.await();
 548         } finally {
 549             lock.unlock();
 550         }
 551         return result;
 552     }
 553 
 554     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 555         long nanos = unit.toNanos(timeout);
 556         final ReentrantLock lock = this.lock;
 557         lock.lockInterruptibly();
 558         E result;
 559         try {
 560             while ( (result = dequeue()) == null && nanos > 0)
 561                 nanos = notEmpty.awaitNanos(nanos);
 562         } finally {
 563             lock.unlock();
 564         }
 565         return result;
 566     }
 567 
 568     public E peek() {
 569         final ReentrantLock lock = this.lock;
 570         lock.lock();
 571         try {
 572             return (E) queue[0];
 573         } finally {
 574             lock.unlock();
 575         }
 576     }
 577 
 578     /**
 579      * Returns the comparator used to order the elements in this queue,
 580      * or {@code null} if this queue uses the {@linkplain Comparable
 581      * natural ordering} of its elements.
 582      *
 583      * @return the comparator used to order the elements in this queue,
 584      *         or {@code null} if this queue uses the natural
 585      *         ordering of its elements
 586      */
 587     public Comparator<? super E> comparator() {
 588         return comparator;
 589     }
 590 
 591     public int size() {
 592         final ReentrantLock lock = this.lock;
 593         lock.lock();
 594         try {
 595             return size;
 596         } finally {
 597             lock.unlock();
 598         }
 599     }
 600 
 601     /**
 602      * Always returns {@code Integer.MAX_VALUE} because
 603      * a {@code PriorityBlockingQueue} is not capacity constrained.
 604      * @return {@code Integer.MAX_VALUE} always
 605      */
 606     public int remainingCapacity() {
 607         return Integer.MAX_VALUE;
 608     }
 609 
 610     private int indexOf(Object o) {
 611         if (o != null) {
 612             final Object[] es = queue;
 613             for (int i = 0, n = size; i < n; i++)
 614                 if (o.equals(es[i]))
 615                     return i;
 616         }
 617         return -1;
 618     }
 619 
 620     /**
 621      * Removes the ith element from queue.
 622      */
 623     private void removeAt(int i) {
 624         final Object[] es = queue;
 625         final int n = size - 1;
 626         if (n == i) // removed last element
 627             es[i] = null;
 628         else {
 629             E moved = (E) es[n];
 630             es[n] = null;
 631             final Comparator<? super E> cmp;
 632             if ((cmp = comparator) == null)
 633                 siftDownComparable(i, moved, es, n);
 634             else
 635                 siftDownUsingComparator(i, moved, es, n, cmp);
 636             if (es[i] == moved) {
 637                 if (cmp == null)
 638                     siftUpComparable(i, moved, es);
 639                 else
 640                     siftUpUsingComparator(i, moved, es, cmp);
 641             }
 642         }
 643         size = n;
 644     }
 645 
 646     /**
 647      * Removes a single instance of the specified element from this queue,
 648      * if it is present.  More formally, removes an element {@code e} such
 649      * that {@code o.equals(e)}, if this queue contains one or more such
 650      * elements.  Returns {@code true} if and only if this queue contained
 651      * the specified element (or equivalently, if this queue changed as a
 652      * result of the call).
 653      *
 654      * @param o element to be removed from this queue, if present
 655      * @return {@code true} if this queue changed as a result of the call
 656      */
 657     public boolean remove(Object o) {
 658         final ReentrantLock lock = this.lock;
 659         lock.lock();
 660         try {
 661             int i = indexOf(o);
 662             if (i == -1)
 663                 return false;
 664             removeAt(i);
 665             return true;
 666         } finally {
 667             lock.unlock();
 668         }
 669     }
 670 
 671     /**
 672      * Identity-based version for use in Itr.remove.
 673      *
 674      * @param o element to be removed from this queue, if present
 675      */
 676     void removeEq(Object o) {
 677         final ReentrantLock lock = this.lock;
 678         lock.lock();
 679         try {
 680             final Object[] es = queue;
 681             for (int i = 0, n = size; i < n; i++) {
 682                 if (o == es[i]) {
 683                     removeAt(i);
 684                     break;
 685                 }
 686             }
 687         } finally {
 688             lock.unlock();
 689         }
 690     }
 691 
 692     /**
 693      * Returns {@code true} if this queue contains the specified element.
 694      * More formally, returns {@code true} if and only if this queue contains
 695      * at least one element {@code e} such that {@code o.equals(e)}.
 696      *
 697      * @param o object to be checked for containment in this queue
 698      * @return {@code true} if this queue contains the specified element
 699      */
 700     public boolean contains(Object o) {
 701         final ReentrantLock lock = this.lock;
 702         lock.lock();
 703         try {
 704             return indexOf(o) != -1;
 705         } finally {
 706             lock.unlock();
 707         }
 708     }
 709 
 710     public String toString() {
 711         return Helpers.collectionToString(this);
 712     }
 713 
 714     /**
 715      * @throws UnsupportedOperationException {@inheritDoc}
 716      * @throws ClassCastException            {@inheritDoc}
 717      * @throws NullPointerException          {@inheritDoc}
 718      * @throws IllegalArgumentException      {@inheritDoc}
 719      */
 720     public int drainTo(Collection<? super E> c) {
 721         return drainTo(c, Integer.MAX_VALUE);
 722     }
 723 
 724     /**
 725      * @throws UnsupportedOperationException {@inheritDoc}
 726      * @throws ClassCastException            {@inheritDoc}
 727      * @throws NullPointerException          {@inheritDoc}
 728      * @throws IllegalArgumentException      {@inheritDoc}
 729      */
 730     public int drainTo(Collection<? super E> c, int maxElements) {
 731         Objects.requireNonNull(c);
 732         if (c == this)
 733             throw new IllegalArgumentException();
 734         if (maxElements <= 0)
 735             return 0;
 736         final ReentrantLock lock = this.lock;
 737         lock.lock();
 738         try {
 739             int n = Math.min(size, maxElements);
 740             for (int i = 0; i < n; i++) {
 741                 c.add((E) queue[0]); // In this order, in case add() throws.
 742                 dequeue();
 743             }
 744             return n;
 745         } finally {
 746             lock.unlock();
 747         }
 748     }
 749 
 750     /**
 751      * Atomically removes all of the elements from this queue.
 752      * The queue will be empty after this call returns.
 753      */
 754     public void clear() {
 755         final ReentrantLock lock = this.lock;
 756         lock.lock();
 757         try {
 758             final Object[] es = queue;
 759             for (int i = 0, n = size; i < n; i++)
 760                 es[i] = null;
 761             size = 0;
 762         } finally {
 763             lock.unlock();
 764         }
 765     }
 766 
 767     /**
 768      * Returns an array containing all of the elements in this queue.
 769      * The returned array elements are in no particular order.
 770      *
 771      * <p>The returned array will be "safe" in that no references to it are
 772      * maintained by this queue.  (In other words, this method must allocate
 773      * a new array).  The caller is thus free to modify the returned array.
 774      *
 775      * <p>This method acts as bridge between array-based and collection-based
 776      * APIs.
 777      *
 778      * @return an array containing all of the elements in this queue
 779      */
 780     public Object[] toArray() {
 781         final ReentrantLock lock = this.lock;
 782         lock.lock();
 783         try {
 784             return Arrays.copyOf(queue, size);
 785         } finally {
 786             lock.unlock();
 787         }
 788     }
 789 
 790     /**
 791      * Returns an array containing all of the elements in this queue; the
 792      * runtime type of the returned array is that of the specified array.
 793      * The returned array elements are in no particular order.
 794      * If the queue fits in the specified array, it is returned therein.
 795      * Otherwise, a new array is allocated with the runtime type of the
 796      * specified array and the size of this queue.
 797      *
 798      * <p>If this queue fits in the specified array with room to spare
 799      * (i.e., the array has more elements than this queue), the element in
 800      * the array immediately following the end of the queue is set to
 801      * {@code null}.
 802      *
 803      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 804      * array-based and collection-based APIs.  Further, this method allows
 805      * precise control over the runtime type of the output array, and may,
 806      * under certain circumstances, be used to save allocation costs.
 807      *
 808      * <p>Suppose {@code x} is a queue known to contain only strings.
 809      * The following code can be used to dump the queue into a newly
 810      * allocated array of {@code String}:
 811      *
 812      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
 813      *
 814      * Note that {@code toArray(new Object[0])} is identical in function to
 815      * {@code toArray()}.
 816      *
 817      * @param a the array into which the elements of the queue are to
 818      *          be stored, if it is big enough; otherwise, a new array of the
 819      *          same runtime type is allocated for this purpose
 820      * @return an array containing all of the elements in this queue
 821      * @throws ArrayStoreException if the runtime type of the specified array
 822      *         is not a supertype of the runtime type of every element in
 823      *         this queue
 824      * @throws NullPointerException if the specified array is null
 825      */
 826     public <T> T[] toArray(T[] a) {
 827         final ReentrantLock lock = this.lock;
 828         lock.lock();
 829         try {
 830             int n = size;
 831             if (a.length < n)
 832                 // Make a new array of a's runtime type, but my contents:
 833                 return (T[]) Arrays.copyOf(queue, size, a.getClass());
 834             System.arraycopy(queue, 0, a, 0, n);
 835             if (a.length > n)
 836                 a[n] = null;
 837             return a;
 838         } finally {
 839             lock.unlock();
 840         }
 841     }
 842 
 843     /**
 844      * Returns an iterator over the elements in this queue. The
 845      * iterator does not return the elements in any particular order.
 846      *
 847      * <p>The returned iterator is
 848      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
 849      *
 850      * @return an iterator over the elements in this queue
 851      */
 852     public Iterator<E> iterator() {
 853         return new Itr(toArray());
 854     }
 855 
 856     /**
 857      * Snapshot iterator that works off copy of underlying q array.
 858      */
 859     final class Itr implements Iterator<E> {
 860         final Object[] array; // Array of all elements
 861         int cursor;           // index of next element to return
 862         int lastRet = -1;     // index of last element, or -1 if no such
 863 
 864         Itr(Object[] array) {
 865             this.array = array;
 866         }
 867 
 868         public boolean hasNext() {
 869             return cursor < array.length;
 870         }
 871 
 872         public E next() {
 873             if (cursor >= array.length)
 874                 throw new NoSuchElementException();
 875             return (E)array[lastRet = cursor++];
 876         }
 877 
 878         public void remove() {
 879             if (lastRet < 0)
 880                 throw new IllegalStateException();
 881             removeEq(array[lastRet]);
 882             lastRet = -1;
 883         }
 884 
 885         public void forEachRemaining(Consumer<? super E> action) {
 886             Objects.requireNonNull(action);
 887             final Object[] es = array;
 888             int i;
 889             if ((i = cursor) < es.length) {
 890                 lastRet = -1;
 891                 cursor = es.length;
 892                 for (; i < es.length; i++)
 893                     action.accept((E) es[i]);
 894                 lastRet = es.length - 1;
 895             }
 896         }
 897     }
 898 
 899     /**
 900      * Saves this queue to a stream (that is, serializes it).
 901      *
 902      * For compatibility with previous version of this class, elements
 903      * are first copied to a java.util.PriorityQueue, which is then
 904      * serialized.
 905      *
 906      * @param s the stream
 907      * @throws java.io.IOException if an I/O error occurs
 908      */
 909     private void writeObject(java.io.ObjectOutputStream s)
 910         throws java.io.IOException {
 911         lock.lock();
 912         try {
 913             // avoid zero capacity argument
 914             q = new PriorityQueue<E>(Math.max(size, 1), comparator);
 915             q.addAll(this);
 916             s.defaultWriteObject();
 917         } finally {
 918             q = null;
 919             lock.unlock();
 920         }
 921     }
 922 
 923     /**
 924      * Reconstitutes this queue from a stream (that is, deserializes it).
 925      * @param s the stream
 926      * @throws ClassNotFoundException if the class of a serialized object
 927      *         could not be found
 928      * @throws java.io.IOException if an I/O error occurs
 929      */
 930     private void readObject(java.io.ObjectInputStream s)
 931         throws java.io.IOException, ClassNotFoundException {
 932         try {
 933             s.defaultReadObject();
 934             int sz = q.size();
 935             SharedSecrets.getJavaObjectInputStreamAccess().checkArray(s, Object[].class, sz);
 936             this.queue = new Object[Math.max(1, sz)];
 937             comparator = q.comparator();
 938             addAll(q);
 939         } finally {
 940             q = null;
 941         }
 942     }
 943 
 944     /**
 945      * Immutable snapshot spliterator that binds to elements "late".
 946      */
 947     final class PBQSpliterator implements Spliterator<E> {
 948         Object[] array;        // null until late-bound-initialized
 949         int index;
 950         int fence;
 951 
 952         PBQSpliterator() {}
 953 
 954         PBQSpliterator(Object[] array, int index, int fence) {
 955             this.array = array;
 956             this.index = index;
 957             this.fence = fence;
 958         }
 959 
 960         private int getFence() {
 961             if (array == null)
 962                 fence = (array = toArray()).length;
 963             return fence;
 964         }
 965 
 966         public PBQSpliterator trySplit() {
 967             int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
 968             return (lo >= mid) ? null :
 969                 new PBQSpliterator(array, lo, index = mid);
 970         }
 971 
 972         public void forEachRemaining(Consumer<? super E> action) {
 973             Objects.requireNonNull(action);
 974             final int hi = getFence(), lo = index;
 975             final Object[] es = array;
 976             index = hi;                 // ensure exhaustion
 977             for (int i = lo; i < hi; i++)
 978                 action.accept((E) es[i]);
 979         }
 980 
 981         public boolean tryAdvance(Consumer<? super E> action) {
 982             Objects.requireNonNull(action);
 983             if (getFence() > index && index >= 0) {
 984                 action.accept((E) array[index++]);
 985                 return true;
 986             }
 987             return false;
 988         }
 989 
 990         public long estimateSize() { return getFence() - index; }
 991 
 992         public int characteristics() {
 993             return (Spliterator.NONNULL |
 994                     Spliterator.SIZED |
 995                     Spliterator.SUBSIZED);
 996         }
 997     }
 998 
 999     /**
1000      * Returns a {@link Spliterator} over the elements in this queue.
1001      * The spliterator does not traverse elements in any particular order
1002      * (the {@link Spliterator#ORDERED ORDERED} characteristic is not reported).
1003      *
1004      * <p>The returned spliterator is
1005      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1006      *
1007      * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
1008      * {@link Spliterator#NONNULL}.
1009      *
1010      * @implNote
1011      * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
1012      *
1013      * @return a {@code Spliterator} over the elements in this queue
1014      * @since 1.8
1015      */
1016     public Spliterator<E> spliterator() {
1017         return new PBQSpliterator();
1018     }
1019 
1020     /**
1021      * @throws NullPointerException {@inheritDoc}
1022      */
1023     public boolean removeIf(Predicate<? super E> filter) {
1024         Objects.requireNonNull(filter);
1025         return bulkRemove(filter);
1026     }
1027 
1028     /**
1029      * @throws NullPointerException {@inheritDoc}
1030      */
1031     public boolean removeAll(Collection<?> c) {
1032         Objects.requireNonNull(c);
1033         return bulkRemove(e -> c.contains(e));
1034     }
1035 
1036     /**
1037      * @throws NullPointerException {@inheritDoc}
1038      */
1039     public boolean retainAll(Collection<?> c) {
1040         Objects.requireNonNull(c);
1041         return bulkRemove(e -> !c.contains(e));
1042     }
1043 
1044     // A tiny bit set implementation
1045 
1046     private static long[] nBits(int n) {
1047         return new long[((n - 1) >> 6) + 1];
1048     }
1049     private static void setBit(long[] bits, int i) {
1050         bits[i >> 6] |= 1L << i;
1051     }
1052     private static boolean isClear(long[] bits, int i) {
1053         return (bits[i >> 6] & (1L << i)) == 0;
1054     }
1055 
1056     /** Implementation of bulk remove methods. */
1057     private boolean bulkRemove(Predicate<? super E> filter) {
1058         final ReentrantLock lock = this.lock;
1059         lock.lock();
1060         try {
1061             final Object[] es = queue;
1062             final int end = size;
1063             int i;
1064             // Optimize for initial run of survivors
1065             for (i = 0; i < end && !filter.test((E) es[i]); i++)
1066                 ;
1067             if (i >= end)
1068                 return false;
1069             // Tolerate predicates that reentrantly access the
1070             // collection for read, so traverse once to find elements
1071             // to delete, a second pass to physically expunge.
1072             final int beg = i;
1073             final long[] deathRow = nBits(end - beg);
1074             deathRow[0] = 1L;   // set bit 0
1075             for (i = beg + 1; i < end; i++)
1076                 if (filter.test((E) es[i]))
1077                     setBit(deathRow, i - beg);
1078             int w = beg;
1079             for (i = beg; i < end; i++)
1080                 if (isClear(deathRow, i - beg))
1081                     es[w++] = es[i];
1082             for (i = size = w; i < end; i++)
1083                 es[i] = null;
1084             heapify();
1085             return true;
1086         } finally {
1087             lock.unlock();
1088         }
1089     }
1090 
1091     /**
1092      * @throws NullPointerException {@inheritDoc}
1093      */
1094     public void forEach(Consumer<? super E> action) {
1095         Objects.requireNonNull(action);
1096         final ReentrantLock lock = this.lock;
1097         lock.lock();
1098         try {
1099             final Object[] es = queue;
1100             for (int i = 0, n = size; i < n; i++)
1101                 action.accept((E) es[i]);
1102         } finally {
1103             lock.unlock();
1104         }
1105     }
1106 
1107     // VarHandle mechanics
1108     private static final VarHandle ALLOCATIONSPINLOCK;
1109     static {
1110         try {
1111             MethodHandles.Lookup l = MethodHandles.lookup();
1112             ALLOCATIONSPINLOCK = l.findVarHandle(PriorityBlockingQueue.class,
1113                                                  "allocationSpinLock",
1114                                                  int.class);
1115         } catch (ReflectiveOperationException e) {
1116             throw new ExceptionInInitializerError(e);
1117         }
1118     }
1119 }