1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.AbstractQueue;
  39 import java.util.Arrays;
  40 import java.util.Collection;
  41 import java.util.Comparator;
  42 import java.util.Iterator;
  43 import java.util.NoSuchElementException;
  44 import java.util.PriorityQueue;
  45 import java.util.Queue;
  46 import java.util.SortedSet;
  47 import java.util.Spliterator;
  48 import java.util.concurrent.locks.Condition;
  49 import java.util.concurrent.locks.ReentrantLock;
  50 import java.util.function.Consumer;
  51 
  52 /**
  53  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
  54  * the same ordering rules as class {@link PriorityQueue} and supplies
  55  * blocking retrieval operations.  While this queue is logically
  56  * unbounded, attempted additions may fail due to resource exhaustion
  57  * (causing {@code OutOfMemoryError}). This class does not permit
  58  * {@code null} elements.  A priority queue relying on {@linkplain
  59  * Comparable natural ordering} also does not permit insertion of
  60  * non-comparable objects (doing so results in
  61  * {@code ClassCastException}).
  62  *
  63  * <p>This class and its iterator implement all of the
  64  * <em>optional</em> methods of the {@link Collection} and {@link
  65  * Iterator} interfaces.  The Iterator provided in method {@link
  66  * #iterator()} is <em>not</em> guaranteed to traverse the elements of
  67  * the PriorityBlockingQueue in any particular order. If you need
  68  * ordered traversal, consider using
  69  * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo}
  70  * can be used to <em>remove</em> some or all elements in priority
  71  * order and place them in another collection.
  72  *
  73  * <p>Operations on this class make no guarantees about the ordering
  74  * of elements with equal priority. If you need to enforce an
  75  * ordering, you can define custom classes or comparators that use a
  76  * secondary key to break ties in primary priority values.  For
  77  * example, here is a class that applies first-in-first-out
  78  * tie-breaking to comparable elements. To use it, you would insert a
  79  * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
  80  *
  81  * <pre> {@code
  82  * class FIFOEntry<E extends Comparable<? super E>>
  83  *     implements Comparable<FIFOEntry<E>> {
  84  *   static final AtomicLong seq = new AtomicLong(0);
  85  *   final long seqNum;
  86  *   final E entry;
  87  *   public FIFOEntry(E entry) {
  88  *     seqNum = seq.getAndIncrement();
  89  *     this.entry = entry;
  90  *   }
  91  *   public E getEntry() { return entry; }
  92  *   public int compareTo(FIFOEntry<E> other) {
  93  *     int res = entry.compareTo(other.entry);
  94  *     if (res == 0 && other.entry != this.entry)
  95  *       res = (seqNum < other.seqNum ? -1 : 1);
  96  *     return res;
  97  *   }
  98  * }}</pre>
  99  *
 100  * <p>This class is a member of the
 101  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 102  * Java Collections Framework</a>.
 103  *
 104  * @since 1.5
 105  * @author Doug Lea
 106  * @param <E> the type of elements held in this queue
 107  */
 108 @SuppressWarnings("unchecked")
 109 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
 110     implements BlockingQueue<E>, java.io.Serializable {
    // Fixed serialization version identifier for this class.
    private static final long serialVersionUID = 5595510919245408276L;

    /*
     * The implementation uses an array-based binary heap, with public
     * operations protected with a single lock. However, allocation
     * during resizing uses a simple spinlock (used only while not
     * holding main lock) in order to allow takes to operate
     * concurrently with allocation.  This avoids repeated
     * postponement of waiting consumers and consequent element
     * build-up. The need to back away from lock during allocation
     * makes it impossible to simply wrap delegated
     * java.util.PriorityQueue operations within a lock, as was done
     * in a previous version of this class. To maintain
     * interoperability, a plain PriorityQueue is still used during
     * serialization, which maintains compatibility at the expense of
     * transiently doubling overhead.
     */

    /**
     * Default array capacity, used by the no-argument constructor.
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     */
    private transient Object[] queue;

    /**
     * The number of elements in the priority queue.
     */
    private transient int size;

    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     */
    private transient Comparator<? super E> comparator;

    /**
     * Lock used for all public operations.
     */
    private final ReentrantLock lock;

    /**
     * Condition for blocking when empty.
     */
    private final Condition notEmpty;

    /**
     * Spinlock for allocation, acquired via CAS.
     * Zero when free; set to 1 by the thread that allocates a larger
     * array in tryGrow, and reset to 0 once that attempt finishes.
     */
    private transient volatile int allocationSpinLock;

    /**
     * A plain PriorityQueue used only for serialization,
     * to maintain compatibility with previous versions
     * of this class. Non-null only during serialization/deserialization.
     */
    private PriorityQueue<E> q;
 184 
 185     /**
 186      * Creates a {@code PriorityBlockingQueue} with the default
 187      * initial capacity (11) that orders its elements according to
 188      * their {@linkplain Comparable natural ordering}.
 189      */
    public PriorityBlockingQueue() {
        // Delegates to the (capacity, comparator) constructor; the null
        // comparator selects the elements' natural ordering.
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
 193 
 194     /**
 195      * Creates a {@code PriorityBlockingQueue} with the specified
 196      * initial capacity that orders its elements according to their
 197      * {@linkplain Comparable natural ordering}.
 198      *
 199      * @param initialCapacity the initial capacity for this priority queue
 200      * @throws IllegalArgumentException if {@code initialCapacity} is less
 201      *         than 1
 202      */
    public PriorityBlockingQueue(int initialCapacity) {
        // Capacity validation happens in the two-argument constructor;
        // the null comparator selects the elements' natural ordering.
        this(initialCapacity, null);
    }
 206 
 207     /**
 208      * Creates a {@code PriorityBlockingQueue} with the specified initial
 209      * capacity that orders its elements according to the specified
 210      * comparator.
 211      *
 212      * @param initialCapacity the initial capacity for this priority queue
 213      * @param  comparator the comparator that will be used to order this
 214      *         priority queue.  If {@code null}, the {@linkplain Comparable
 215      *         natural ordering} of the elements will be used.
 216      * @throws IllegalArgumentException if {@code initialCapacity} is less
 217      *         than 1
 218      */
 219     public PriorityBlockingQueue(int initialCapacity,
 220                                  Comparator<? super E> comparator) {
 221         if (initialCapacity < 1)
 222             throw new IllegalArgumentException();
 223         this.lock = new ReentrantLock();
 224         this.notEmpty = lock.newCondition();
 225         this.comparator = comparator;
 226         this.queue = new Object[initialCapacity];
 227     }
 228 
 229     /**
 230      * Creates a {@code PriorityBlockingQueue} containing the elements
 231      * in the specified collection.  If the specified collection is a
     * {@link SortedSet} or a {@link PriorityBlockingQueue}, this
 233      * priority queue will be ordered according to the same ordering.
 234      * Otherwise, this priority queue will be ordered according to the
 235      * {@linkplain Comparable natural ordering} of its elements.
 236      *
 237      * @param  c the collection whose elements are to be placed
 238      *         into this priority queue
 239      * @throws ClassCastException if elements of the specified collection
 240      *         cannot be compared to one another according to the priority
 241      *         queue's ordering
 242      * @throws NullPointerException if the specified collection or any
 243      *         of its elements are null
 244      */
 245     public PriorityBlockingQueue(Collection<? extends E> c) {
 246         this.lock = new ReentrantLock();
 247         this.notEmpty = lock.newCondition();
 248         boolean heapify = true; // true if not known to be in heap order
 249         boolean screen = true;  // true if must screen for nulls
 250         if (c instanceof SortedSet<?>) {
 251             SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
 252             this.comparator = (Comparator<? super E>) ss.comparator();
 253             heapify = false;
 254         }
 255         else if (c instanceof PriorityBlockingQueue<?>) {
 256             PriorityBlockingQueue<? extends E> pq =
 257                 (PriorityBlockingQueue<? extends E>) c;
 258             this.comparator = (Comparator<? super E>) pq.comparator();
 259             screen = false;
 260             if (pq.getClass() == PriorityBlockingQueue.class) // exact match
 261                 heapify = false;
 262         }
 263         Object[] a = c.toArray();
 264         int n = a.length;
 265         // If c.toArray incorrectly doesn't return Object[], copy it.
 266         if (a.getClass() != Object[].class)
 267             a = Arrays.copyOf(a, n, Object[].class);
 268         if (screen && (n == 1 || this.comparator != null)) {
 269             for (int i = 0; i < n; ++i)
 270                 if (a[i] == null)
 271                     throw new NullPointerException();
 272         }
 273         this.queue = a;
 274         this.size = n;
 275         if (heapify)
 276             heapify();
 277     }
 278 
 279     /**
 280      * Tries to grow array to accommodate at least one more element
 281      * (but normally expand by about 50%), giving up (allowing retry)
 282      * on contention (which we expect to be rare). Call only while
 283      * holding lock.
 284      *
 285      * @param array the heap array
 286      * @param oldCap the length of the array
 287      */
    private void tryGrow(Object[] array, int oldCap) {
        // The caller holds the main lock on entry and again on return; it is
        // dropped in between so other threads can use the queue while the
        // new array is being allocated.
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        // At most one thread allocates at a time: claim the spinlock with a
        // 0 -> 1 CAS, and skip allocation if it is already taken.
        // (U and ALLOCATIONSPINLOCK are declared outside this view.)
        if (allocationSpinLock == 0 &&
            U.compareAndSwapInt(this, ALLOCATIONSPINLOCK, 0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                // The subtraction form also catches a newCap that wrapped
                // around to a negative value.
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                // Skip the allocation if the heap array was already replaced.
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0; // released even if allocation throws
            }
        }
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        // Install the new array only if the heap array is still the one this
        // call started with; otherwise the caller re-checks and may retry.
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
 317 
 318     /**
 319      * Mechanics for poll().  Call only while holding lock.
 320      */
 321     private E dequeue() {
 322         int n = size - 1;
 323         if (n < 0)
 324             return null;
 325         else {
 326             Object[] array = queue;
 327             E result = (E) array[0];
 328             E x = (E) array[n];
 329             array[n] = null;
 330             Comparator<? super E> cmp = comparator;
 331             if (cmp == null)
 332                 siftDownComparable(0, x, array, n);
 333             else
 334                 siftDownUsingComparator(0, x, array, n, cmp);
 335             size = n;
 336             return result;
 337         }
 338     }
 339 
 340     /**
 341      * Inserts item x at position k, maintaining heap invariant by
 342      * promoting x up the tree until it is greater than or equal to
 343      * its parent, or is the root.
 344      *
     * To simplify and speed up coercions and comparisons, the
 346      * Comparable and Comparator versions are separated into different
 347      * methods that are otherwise identical. (Similarly for siftDown.)
 348      * These methods are static, with heap state as arguments, to
 349      * simplify use in light of possible comparator exceptions.
 350      *
 351      * @param k the position to fill
 352      * @param x the item to insert
 353      * @param array the heap array
 354      */
 355     private static <T> void siftUpComparable(int k, T x, Object[] array) {
 356         Comparable<? super T> key = (Comparable<? super T>) x;
 357         while (k > 0) {
 358             int parent = (k - 1) >>> 1;
 359             Object e = array[parent];
 360             if (key.compareTo((T) e) >= 0)
 361                 break;
 362             array[k] = e;
 363             k = parent;
 364         }
 365         array[k] = key;
 366     }
 367 
 368     private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
 369                                        Comparator<? super T> cmp) {
 370         while (k > 0) {
 371             int parent = (k - 1) >>> 1;
 372             Object e = array[parent];
 373             if (cmp.compare(x, (T) e) >= 0)
 374                 break;
 375             array[k] = e;
 376             k = parent;
 377         }
 378         array[k] = x;
 379     }
 380 
 381     /**
 382      * Inserts item x at position k, maintaining heap invariant by
 383      * demoting x down the tree repeatedly until it is less than or
 384      * equal to its children or is a leaf.
 385      *
 386      * @param k the position to fill
 387      * @param x the item to insert
 388      * @param array the heap array
 389      * @param n heap size
 390      */
 391     private static <T> void siftDownComparable(int k, T x, Object[] array,
 392                                                int n) {
 393         if (n > 0) {
 394             Comparable<? super T> key = (Comparable<? super T>)x;
 395             int half = n >>> 1;           // loop while a non-leaf
 396             while (k < half) {
 397                 int child = (k << 1) + 1; // assume left child is least
 398                 Object c = array[child];
 399                 int right = child + 1;
 400                 if (right < n &&
 401                     ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
 402                     c = array[child = right];
 403                 if (key.compareTo((T) c) <= 0)
 404                     break;
 405                 array[k] = c;
 406                 k = child;
 407             }
 408             array[k] = key;
 409         }
 410     }
 411 
 412     private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
 413                                                     int n,
 414                                                     Comparator<? super T> cmp) {
 415         if (n > 0) {
 416             int half = n >>> 1;
 417             while (k < half) {
 418                 int child = (k << 1) + 1;
 419                 Object c = array[child];
 420                 int right = child + 1;
 421                 if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
 422                     c = array[child = right];
 423                 if (cmp.compare(x, (T) c) <= 0)
 424                     break;
 425                 array[k] = c;
 426                 k = child;
 427             }
 428             array[k] = x;
 429         }
 430     }
 431 
 432     /**
 433      * Establishes the heap invariant (described above) in the entire tree,
 434      * assuming nothing about the order of the elements prior to the call.
 435      */
 436     private void heapify() {
 437         Object[] array = queue;
 438         int n = size;
 439         int half = (n >>> 1) - 1;
 440         Comparator<? super E> cmp = comparator;
 441         if (cmp == null) {
 442             for (int i = half; i >= 0; i--)
 443                 siftDownComparable(i, (E) array[i], array, n);
 444         }
 445         else {
 446             for (int i = half; i >= 0; i--)
 447                 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
 448         }
 449     }
 450 
 451     /**
 452      * Inserts the specified element into this priority queue.
 453      *
 454      * @param e the element to add
 455      * @return {@code true} (as specified by {@link Collection#add})
 456      * @throws ClassCastException if the specified element cannot be compared
 457      *         with elements currently in the priority queue according to the
 458      *         priority queue's ordering
 459      * @throws NullPointerException if the specified element is null
 460      */
    public boolean add(E e) {
        // Same as offer(e), which in this class only ever returns true.
        return offer(e);
    }
 464 
 465     /**
 466      * Inserts the specified element into this priority queue.
 467      * As the queue is unbounded, this method will never return {@code false}.
 468      *
 469      * @param e the element to add
 470      * @return {@code true} (as specified by {@link Queue#offer})
 471      * @throws ClassCastException if the specified element cannot be compared
 472      *         with elements currently in the priority queue according to the
 473      *         priority queue's ordering
 474      * @throws NullPointerException if the specified element is null
 475      */
 476     public boolean offer(E e) {
 477         if (e == null)
 478             throw new NullPointerException();
 479         final ReentrantLock lock = this.lock;
 480         lock.lock();
 481         int n, cap;
 482         Object[] array;
 483         while ((n = size) >= (cap = (array = queue).length))
 484             tryGrow(array, cap);
 485         try {
 486             Comparator<? super E> cmp = comparator;
 487             if (cmp == null)
 488                 siftUpComparable(n, e, array);
 489             else
 490                 siftUpUsingComparator(n, e, array, cmp);
 491             size = n + 1;
 492             notEmpty.signal();
 493         } finally {
 494             lock.unlock();
 495         }
 496         return true;
 497     }
 498 
 499     /**
 500      * Inserts the specified element into this priority queue.
 501      * As the queue is unbounded, this method will never block.
 502      *
 503      * @param e the element to add
 504      * @throws ClassCastException if the specified element cannot be compared
 505      *         with elements currently in the priority queue according to the
 506      *         priority queue's ordering
 507      * @throws NullPointerException if the specified element is null
 508      */
    public void put(E e) {
        // The boolean result is dropped: offer(e) only ever returns true.
        offer(e); // never need to block
    }
 512 
 513     /**
 514      * Inserts the specified element into this priority queue.
 515      * As the queue is unbounded, this method will never block or
 516      * return {@code false}.
 517      *
 518      * @param e the element to add
 519      * @param timeout This parameter is ignored as the method never blocks
 520      * @param unit This parameter is ignored as the method never blocks
 521      * @return {@code true} (as specified by
 522      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
 523      * @throws ClassCastException if the specified element cannot be compared
 524      *         with elements currently in the priority queue according to the
 525      *         priority queue's ordering
 526      * @throws NullPointerException if the specified element is null
 527      */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        // timeout and unit are intentionally unused.
        return offer(e); // never need to block
    }
 531 
 532     public E poll() {
 533         final ReentrantLock lock = this.lock;
 534         lock.lock();
 535         try {
 536             return dequeue();
 537         } finally {
 538             lock.unlock();
 539         }
 540     }
 541 
 542     public E take() throws InterruptedException {
 543         final ReentrantLock lock = this.lock;
 544         lock.lockInterruptibly();
 545         E result;
 546         try {
 547             while ( (result = dequeue()) == null)
 548                 notEmpty.await();
 549         } finally {
 550             lock.unlock();
 551         }
 552         return result;
 553     }
 554 
 555     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 556         long nanos = unit.toNanos(timeout);
 557         final ReentrantLock lock = this.lock;
 558         lock.lockInterruptibly();
 559         E result;
 560         try {
 561             while ( (result = dequeue()) == null && nanos > 0)
 562                 nanos = notEmpty.awaitNanos(nanos);
 563         } finally {
 564             lock.unlock();
 565         }
 566         return result;
 567     }
 568 
 569     public E peek() {
 570         final ReentrantLock lock = this.lock;
 571         lock.lock();
 572         try {
 573             return (size == 0) ? null : (E) queue[0];
 574         } finally {
 575             lock.unlock();
 576         }
 577     }
 578 
 579     /**
 580      * Returns the comparator used to order the elements in this queue,
 581      * or {@code null} if this queue uses the {@linkplain Comparable
 582      * natural ordering} of its elements.
 583      *
 584      * @return the comparator used to order the elements in this queue,
 585      *         or {@code null} if this queue uses the natural
 586      *         ordering of its elements
 587      */
    public Comparator<? super E> comparator() {
        // Plain field read; the main lock is not taken here.
        return comparator;
    }
 591 
 592     public int size() {
 593         final ReentrantLock lock = this.lock;
 594         lock.lock();
 595         try {
 596             return size;
 597         } finally {
 598             lock.unlock();
 599         }
 600     }
 601 
 602     /**
 603      * Always returns {@code Integer.MAX_VALUE} because
 604      * a {@code PriorityBlockingQueue} is not capacity constrained.
 605      * @return {@code Integer.MAX_VALUE} always
 606      */
    public int remainingCapacity() {
        // Constant result; no queue state is consulted.
        return Integer.MAX_VALUE;
    }
 610 
 611     private int indexOf(Object o) {
 612         if (o != null) {
 613             Object[] array = queue;
 614             int n = size;
 615             for (int i = 0; i < n; i++)
 616                 if (o.equals(array[i]))
 617                     return i;
 618         }
 619         return -1;
 620     }
 621 
 622     /**
 623      * Removes the ith element from queue.
 624      */
 625     private void removeAt(int i) {
 626         Object[] array = queue;
 627         int n = size - 1;
 628         if (n == i) // removed last element
 629             array[i] = null;
 630         else {
 631             E moved = (E) array[n];
 632             array[n] = null;
 633             Comparator<? super E> cmp = comparator;
 634             if (cmp == null)
 635                 siftDownComparable(i, moved, array, n);
 636             else
 637                 siftDownUsingComparator(i, moved, array, n, cmp);
 638             if (array[i] == moved) {
 639                 if (cmp == null)
 640                     siftUpComparable(i, moved, array);
 641                 else
 642                     siftUpUsingComparator(i, moved, array, cmp);
 643             }
 644         }
 645         size = n;
 646     }
 647 
 648     /**
 649      * Removes a single instance of the specified element from this queue,
 650      * if it is present.  More formally, removes an element {@code e} such
 651      * that {@code o.equals(e)}, if this queue contains one or more such
 652      * elements.  Returns {@code true} if and only if this queue contained
 653      * the specified element (or equivalently, if this queue changed as a
 654      * result of the call).
 655      *
 656      * @param o element to be removed from this queue, if present
 657      * @return {@code true} if this queue changed as a result of the call
 658      */
 659     public boolean remove(Object o) {
 660         final ReentrantLock lock = this.lock;
 661         lock.lock();
 662         try {
 663             int i = indexOf(o);
 664             if (i == -1)
 665                 return false;
 666             removeAt(i);
 667             return true;
 668         } finally {
 669             lock.unlock();
 670         }
 671     }
 672 
 673     /**
 674      * Identity-based version for use in Itr.remove.
 675      */
 676     void removeEQ(Object o) {
 677         final ReentrantLock lock = this.lock;
 678         lock.lock();
 679         try {
 680             Object[] array = queue;
 681             for (int i = 0, n = size; i < n; i++) {
 682                 if (o == array[i]) {
 683                     removeAt(i);
 684                     break;
 685                 }
 686             }
 687         } finally {
 688             lock.unlock();
 689         }
 690     }
 691 
 692     /**
 693      * Returns {@code true} if this queue contains the specified element.
 694      * More formally, returns {@code true} if and only if this queue contains
 695      * at least one element {@code e} such that {@code o.equals(e)}.
 696      *
 697      * @param o object to be checked for containment in this queue
 698      * @return {@code true} if this queue contains the specified element
 699      */
 700     public boolean contains(Object o) {
 701         final ReentrantLock lock = this.lock;
 702         lock.lock();
 703         try {
 704             return indexOf(o) != -1;
 705         } finally {
 706             lock.unlock();
 707         }
 708     }
 709 
    public String toString() {
        // Formatting is delegated to Helpers.collectionToString, which is
        // declared outside the visible part of this file.
        return Helpers.collectionToString(this);
    }
 713 
 714     /**
 715      * @throws UnsupportedOperationException {@inheritDoc}
 716      * @throws ClassCastException            {@inheritDoc}
 717      * @throws NullPointerException          {@inheritDoc}
 718      * @throws IllegalArgumentException      {@inheritDoc}
 719      */
    public int drainTo(Collection<? super E> c) {
        // With Integer.MAX_VALUE as the limit, the two-argument overload
        // drains min(size, limit) elements, i.e. the whole queue.
        return drainTo(c, Integer.MAX_VALUE);
    }
 723 
 724     /**
 725      * @throws UnsupportedOperationException {@inheritDoc}
 726      * @throws ClassCastException            {@inheritDoc}
 727      * @throws NullPointerException          {@inheritDoc}
 728      * @throws IllegalArgumentException      {@inheritDoc}
 729      */
 730     public int drainTo(Collection<? super E> c, int maxElements) {
 731         if (c == null)
 732             throw new NullPointerException();
 733         if (c == this)
 734             throw new IllegalArgumentException();
 735         if (maxElements <= 0)
 736             return 0;
 737         final ReentrantLock lock = this.lock;
 738         lock.lock();
 739         try {
 740             int n = Math.min(size, maxElements);
 741             for (int i = 0; i < n; i++) {
 742                 c.add((E) queue[0]); // In this order, in case add() throws.
 743                 dequeue();
 744             }
 745             return n;
 746         } finally {
 747             lock.unlock();
 748         }
 749     }
 750 
 751     /**
 752      * Atomically removes all of the elements from this queue.
 753      * The queue will be empty after this call returns.
 754      */
 755     public void clear() {
 756         final ReentrantLock lock = this.lock;
 757         lock.lock();
 758         try {
 759             Object[] array = queue;
 760             int n = size;
 761             size = 0;
 762             for (int i = 0; i < n; i++)
 763                 array[i] = null;
 764         } finally {
 765             lock.unlock();
 766         }
 767     }
 768 
 769     /**
 770      * Returns an array containing all of the elements in this queue.
 771      * The returned array elements are in no particular order.
 772      *
 773      * <p>The returned array will be "safe" in that no references to it are
 774      * maintained by this queue.  (In other words, this method must allocate
 775      * a new array).  The caller is thus free to modify the returned array.
 776      *
 777      * <p>This method acts as bridge between array-based and collection-based
 778      * APIs.
 779      *
 780      * @return an array containing all of the elements in this queue
 781      */
 782     public Object[] toArray() {
 783         final ReentrantLock lock = this.lock;
 784         lock.lock();
 785         try {
 786             return Arrays.copyOf(queue, size);
 787         } finally {
 788             lock.unlock();
 789         }
 790     }
 791 
 792     /**
 793      * Returns an array containing all of the elements in this queue; the
 794      * runtime type of the returned array is that of the specified array.
 795      * The returned array elements are in no particular order.
 796      * If the queue fits in the specified array, it is returned therein.
 797      * Otherwise, a new array is allocated with the runtime type of the
 798      * specified array and the size of this queue.
 799      *
 800      * <p>If this queue fits in the specified array with room to spare
 801      * (i.e., the array has more elements than this queue), the element in
 802      * the array immediately following the end of the queue is set to
 803      * {@code null}.
 804      *
 805      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 806      * array-based and collection-based APIs.  Further, this method allows
 807      * precise control over the runtime type of the output array, and may,
 808      * under certain circumstances, be used to save allocation costs.
 809      *
 810      * <p>Suppose {@code x} is a queue known to contain only strings.
 811      * The following code can be used to dump the queue into a newly
 812      * allocated array of {@code String}:
 813      *
 814      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
 815      *
 816      * Note that {@code toArray(new Object[0])} is identical in function to
 817      * {@code toArray()}.
 818      *
 819      * @param a the array into which the elements of the queue are to
 820      *          be stored, if it is big enough; otherwise, a new array of the
 821      *          same runtime type is allocated for this purpose
 822      * @return an array containing all of the elements in this queue
 823      * @throws ArrayStoreException if the runtime type of the specified array
 824      *         is not a supertype of the runtime type of every element in
 825      *         this queue
 826      * @throws NullPointerException if the specified array is null
 827      */
 828     public <T> T[] toArray(T[] a) {
 829         final ReentrantLock lock = this.lock;
 830         lock.lock();
 831         try {
 832             int n = size;
 833             if (a.length < n)
 834                 // Make a new array of a's runtime type, but my contents:
 835                 return (T[]) Arrays.copyOf(queue, size, a.getClass());
 836             System.arraycopy(queue, 0, a, 0, n);
 837             if (a.length > n)
 838                 a[n] = null;
 839             return a;
 840         } finally {
 841             lock.unlock();
 842         }
 843     }
 844 
 845     /**
 846      * Returns an iterator over the elements in this queue. The
 847      * iterator does not return the elements in any particular order.
 848      *
 849      * <p>The returned iterator is
 850      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
 851      *
 852      * @return an iterator over the elements in this queue
 853      */
 854     public Iterator<E> iterator() {
 855         return new Itr(toArray());
 856     }
 857 
 858     /**
 859      * Snapshot iterator that works off copy of underlying q array.
 860      */
 861     final class Itr implements Iterator<E> {
 862         final Object[] array; // Array of all elements
 863         int cursor;           // index of next element to return
 864         int lastRet;          // index of last element, or -1 if no such
 865 
 866         Itr(Object[] array) {
 867             lastRet = -1;
 868             this.array = array;
 869         }
 870 
 871         public boolean hasNext() {
 872             return cursor < array.length;
 873         }
 874 
 875         public E next() {
 876             if (cursor >= array.length)
 877                 throw new NoSuchElementException();
 878             lastRet = cursor;
 879             return (E)array[cursor++];
 880         }
 881 
 882         public void remove() {
 883             if (lastRet < 0)
 884                 throw new IllegalStateException();
 885             removeEQ(array[lastRet]);
 886             lastRet = -1;
 887         }
 888     }
 889 
 890     /**
 891      * Saves this queue to a stream (that is, serializes it).
 892      *
 893      * For compatibility with previous version of this class, elements
 894      * are first copied to a java.util.PriorityQueue, which is then
 895      * serialized.
 896      *
 897      * @param s the stream
 898      * @throws java.io.IOException if an I/O error occurs
 899      */
 900     private void writeObject(java.io.ObjectOutputStream s)
 901         throws java.io.IOException {
 902         lock.lock();
 903         try {
 904             // avoid zero capacity argument
 905             q = new PriorityQueue<E>(Math.max(size, 1), comparator);
 906             q.addAll(this);
 907             s.defaultWriteObject();
 908         } finally {
 909             q = null;
 910             lock.unlock();
 911         }
 912     }
 913 
 914     /**
 915      * Reconstitutes this queue from a stream (that is, deserializes it).
 916      * @param s the stream
 917      * @throws ClassNotFoundException if the class of a serialized object
 918      *         could not be found
 919      * @throws java.io.IOException if an I/O error occurs
 920      */
 921     private void readObject(java.io.ObjectInputStream s)
 922         throws java.io.IOException, ClassNotFoundException {
 923         try {
 924             s.defaultReadObject();
 925             this.queue = new Object[q.size()];
 926             comparator = q.comparator();
 927             addAll(q);
 928         } finally {
 929             q = null;
 930         }
 931     }
 932 
 933     // Similar to Collections.ArraySnapshotSpliterator but avoids
 934     // commitment to toArray until needed
 935     static final class PBQSpliterator<E> implements Spliterator<E> {
 936         final PriorityBlockingQueue<E> queue;
 937         Object[] array;
 938         int index;
 939         int fence;
 940 
 941         PBQSpliterator(PriorityBlockingQueue<E> queue, Object[] array,
 942                        int index, int fence) {
 943             this.queue = queue;
 944             this.array = array;
 945             this.index = index;
 946             this.fence = fence;
 947         }
 948 
 949         final int getFence() {
 950             int hi;
 951             if ((hi = fence) < 0)
 952                 hi = fence = (array = queue.toArray()).length;
 953             return hi;
 954         }
 955 
 956         public PBQSpliterator<E> trySplit() {
 957             int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
 958             return (lo >= mid) ? null :
 959                 new PBQSpliterator<E>(queue, array, lo, index = mid);
 960         }
 961 
 962         @SuppressWarnings("unchecked")
 963         public void forEachRemaining(Consumer<? super E> action) {
 964             Object[] a; int i, hi; // hoist accesses and checks from loop
 965             if (action == null)
 966                 throw new NullPointerException();
 967             if ((a = array) == null)
 968                 fence = (a = queue.toArray()).length;
 969             if ((hi = fence) <= a.length &&
 970                 (i = index) >= 0 && i < (index = hi)) {
 971                 do { action.accept((E)a[i]); } while (++i < hi);
 972             }
 973         }
 974 
 975         public boolean tryAdvance(Consumer<? super E> action) {
 976             if (action == null)
 977                 throw new NullPointerException();
 978             if (getFence() > index && index >= 0) {
 979                 @SuppressWarnings("unchecked") E e = (E) array[index++];
 980                 action.accept(e);
 981                 return true;
 982             }
 983             return false;
 984         }
 985 
 986         public long estimateSize() { return (long)(getFence() - index); }
 987 
 988         public int characteristics() {
 989             return Spliterator.NONNULL | Spliterator.SIZED | Spliterator.SUBSIZED;
 990         }
 991     }
 992 
 993     /**
 994      * Returns a {@link Spliterator} over the elements in this queue.
 995      *
 996      * <p>The returned spliterator is
 997      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
 998      *
 999      * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
1000      * {@link Spliterator#NONNULL}.
1001      *
1002      * @implNote
1003      * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
1004      *
1005      * @return a {@code Spliterator} over the elements in this queue
1006      * @since 1.8
1007      */
1008     public Spliterator<E> spliterator() {
1009         return new PBQSpliterator<E>(this, null, 0, -1);
1010     }
1011 
1012     // Unsafe mechanics
1013     private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
1014     private static final long ALLOCATIONSPINLOCK;
1015     static {
1016         try {
1017             ALLOCATIONSPINLOCK = U.objectFieldOffset
1018                 (PriorityBlockingQueue.class.getDeclaredField("allocationSpinLock"));
1019         } catch (ReflectiveOperationException e) {
1020             throw new Error(e);
1021         }
1022     }
1023 }