1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.concurrent.locks.*;
  39 import java.util.*;
  40 
  41 /**
  42  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
  43  * the same ordering rules as class {@link PriorityQueue} and supplies
  44  * blocking retrieval operations.  While this queue is logically
  45  * unbounded, attempted additions may fail due to resource exhaustion
  46  * (causing {@code OutOfMemoryError}). This class does not permit
  47  * {@code null} elements.  A priority queue relying on {@linkplain
  48  * Comparable natural ordering} also does not permit insertion of
  49  * non-comparable objects (doing so results in
  50  * {@code ClassCastException}).
  51  *
  52  * <p>This class and its iterator implement all of the
  53  * <em>optional</em> methods of the {@link Collection} and {@link
  54  * Iterator} interfaces.  The Iterator provided in method {@link
  55  * #iterator()} is <em>not</em> guaranteed to traverse the elements of
  56  * the PriorityBlockingQueue in any particular order. If you need
  57  * ordered traversal, consider using
  58  * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo}
  59  * can be used to <em>remove</em> some or all elements in priority
  60  * order and place them in another collection.
  61  *
  62  * <p>Operations on this class make no guarantees about the ordering
  63  * of elements with equal priority. If you need to enforce an
  64  * ordering, you can define custom classes or comparators that use a
  65  * secondary key to break ties in primary priority values.  For
  66  * example, here is a class that applies first-in-first-out
  67  * tie-breaking to comparable elements. To use it, you would insert a
  68  * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
  69  *
  70  *  <pre> {@code
  71  * class FIFOEntry<E extends Comparable<? super E>>
  72  *     implements Comparable<FIFOEntry<E>> {
  73  *   static final AtomicLong seq = new AtomicLong(0);
  74  *   final long seqNum;
  75  *   final E entry;
  76  *   public FIFOEntry(E entry) {
  77  *     seqNum = seq.getAndIncrement();
  78  *     this.entry = entry;
  79  *   }
  80  *   public E getEntry() { return entry; }
  81  *   public int compareTo(FIFOEntry<E> other) {
  82  *     int res = entry.compareTo(other.entry);
  83  *     if (res == 0 && other.entry != this.entry)
  84  *       res = (seqNum < other.seqNum ? -1 : 1);
  85  *     return res;
  86  *   }
  87  * }}</pre>
  88  *
  89  * <p>This class is a member of the
  90  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  91  * Java Collections Framework</a>.
  92  *
  93  * @since 1.5
  94  * @author Doug Lea
  95  * @param <E> the type of elements held in this collection
  96  */
  97 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
  98     implements BlockingQueue<E>, java.io.Serializable {
  99     private static final long serialVersionUID = 5595510919245408276L;
 100 
 101     /*
 102      * The implementation uses an array-based binary heap, with public
 103      * operations protected with a single lock. However, allocation
 104      * during resizing uses a simple spinlock (used only while not
 105      * holding main lock) in order to allow takes to operate
 106      * concurrently with allocation.  This avoids repeated
 107      * postponement of waiting consumers and consequent element
 108      * build-up. The need to back away from lock during allocation
 109      * makes it impossible to simply wrap delegated
 110      * java.util.PriorityQueue operations within a lock, as was done
 111      * in a previous version of this class. To maintain
 112      * interoperability, a plain PriorityQueue is still used during
 113      * serialization, which maintains compatibility at the expense of
 114      * transiently doubling overhead.
 115      */
 116 
    /**
     * Default array capacity, used by the no-argument constructor.
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit.
     * Used by tryGrow as the upper bound on the grown capacity.
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     * Slots at index size and beyond hold no live elements.
     */
    private transient Object[] queue;

    /**
     * The number of elements in the priority queue.
     * Read and written by the public operations while holding lock.
     */
    private transient int size;

    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     */
    private transient Comparator<? super E> comparator;

    /**
     * Lock used for all public operations.
     */
    private final ReentrantLock lock;

    /**
     * Condition for blocking when empty; created from lock and
     * signalled by offer after an element has been inserted.
     */
    private final Condition notEmpty;

    /**
     * Spinlock for allocation, acquired via CAS.
     * 0 means free and 1 means held: tryGrow swaps it from 0 to 1
     * before allocating and resets it to 0 afterwards.
     */
    private transient volatile int allocationSpinLock;

    /**
     * A plain PriorityQueue used only for serialization,
     * to maintain compatibility with previous versions
     * of this class. Non-null only during serialization/deserialization.
     * NOTE(review): declared as a raw type; confirm against the
     * serialization methods before parameterizing it.
     */
    private PriorityQueue q;
 172 
 173     /**
 174      * Creates a {@code PriorityBlockingQueue} with the default
 175      * initial capacity (11) that orders its elements according to
 176      * their {@linkplain Comparable natural ordering}.
 177      */
 178     public PriorityBlockingQueue() {
 179         this(DEFAULT_INITIAL_CAPACITY, null);
 180     }
 181 
 182     /**
 183      * Creates a {@code PriorityBlockingQueue} with the specified
 184      * initial capacity that orders its elements according to their
 185      * {@linkplain Comparable natural ordering}.
 186      *
 187      * @param initialCapacity the initial capacity for this priority queue
 188      * @throws IllegalArgumentException if {@code initialCapacity} is less
 189      *         than 1
 190      */
 191     public PriorityBlockingQueue(int initialCapacity) {
 192         this(initialCapacity, null);
 193     }
 194 
 195     /**
 196      * Creates a {@code PriorityBlockingQueue} with the specified initial
 197      * capacity that orders its elements according to the specified
 198      * comparator.
 199      *
 200      * @param initialCapacity the initial capacity for this priority queue
 201      * @param  comparator the comparator that will be used to order this
 202      *         priority queue.  If {@code null}, the {@linkplain Comparable
 203      *         natural ordering} of the elements will be used.
 204      * @throws IllegalArgumentException if {@code initialCapacity} is less
 205      *         than 1
 206      */
 207     public PriorityBlockingQueue(int initialCapacity,
 208                                  Comparator<? super E> comparator) {
 209         if (initialCapacity < 1)
 210             throw new IllegalArgumentException();
 211         this.lock = new ReentrantLock();
 212         this.notEmpty = lock.newCondition();
 213         this.comparator = comparator;
 214         this.queue = new Object[initialCapacity];
 215     }
 216 
 217     /**
 218      * Creates a {@code PriorityBlockingQueue} containing the elements
 219      * in the specified collection.  If the specified collection is a
 220      * {@link SortedSet} or a {@link PriorityBlockingQueue}, this
 221      * priority queue will be ordered according to the same ordering.
 222      * Otherwise, this priority queue will be ordered according to the
 223      * {@linkplain Comparable natural ordering} of its elements.
 224      *
 225      * @param  c the collection whose elements are to be placed
 226      *         into this priority queue
 227      * @throws ClassCastException if elements of the specified collection
 228      *         cannot be compared to one another according to the priority
 229      *         queue's ordering
 230      * @throws NullPointerException if the specified collection or any
 231      *         of its elements are null
 232      */
 233     public PriorityBlockingQueue(Collection<? extends E> c) {
 234         this.lock = new ReentrantLock();
 235         this.notEmpty = lock.newCondition();
 236         boolean heapify = true; // true if not known to be in heap order
 237         boolean screen = true;  // true if must screen for nulls
 238         if (c instanceof SortedSet<?>) {
 239             SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
 240             this.comparator = (Comparator<? super E>) ss.comparator();
 241             heapify = false;
 242         }
 243         else if (c instanceof PriorityBlockingQueue<?>) {
 244             PriorityBlockingQueue<? extends E> pq =
 245                 (PriorityBlockingQueue<? extends E>) c;
 246             this.comparator = (Comparator<? super E>) pq.comparator();
 247             screen = false;
 248             if (pq.getClass() == PriorityBlockingQueue.class) // exact match
 249                 heapify = false;
 250         }
 251         Object[] a = c.toArray();
 252         int n = a.length;
 253         // If c.toArray incorrectly doesn't return Object[], copy it.
 254         if (a.getClass() != Object[].class)
 255             a = Arrays.copyOf(a, n, Object[].class);
 256         if (screen && (n == 1 || this.comparator != null)) {
 257             for (int i = 0; i < n; ++i)
 258                 if (a[i] == null)
 259                     throw new NullPointerException();
 260         }
 261         this.queue = a;
 262         this.size = n;
 263         if (heapify)
 264             heapify();
 265     }
 266 
 267     /**
 268      * Tries to grow array to accommodate at least one more element
 269      * (but normally expand by about 50%), giving up (allowing retry)
 270      * on contention (which we expect to be rare). Call only while
 271      * holding lock.
 272      *
 273      * @param array the heap array
 274      * @param oldCap the length of the array
 275      */
 276     private void tryGrow(Object[] array, int oldCap) {
 277         lock.unlock(); // must release and then re-acquire main lock
 278         Object[] newArray = null;
 279         if (allocationSpinLock == 0 &&
 280             UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
 281                                      0, 1)) {
 282             try {
 283                 int newCap = oldCap + ((oldCap < 64) ?
 284                                        (oldCap + 2) : // grow faster if small
 285                                        (oldCap >> 1));
 286                 if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
 287                     int minCap = oldCap + 1;
 288                     if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
 289                         throw new OutOfMemoryError();
 290                     newCap = MAX_ARRAY_SIZE;
 291                 }
 292                 if (newCap > oldCap && queue == array)
 293                     newArray = new Object[newCap];
 294             } finally {
 295                 allocationSpinLock = 0;
 296             }
 297         }
 298         if (newArray == null) // back off if another thread is allocating
 299             Thread.yield();
 300         lock.lock();
 301         if (newArray != null && queue == array) {
 302             queue = newArray;
 303             System.arraycopy(array, 0, newArray, 0, oldCap);
 304         }
 305     }
 306 
 307     /**
 308      * Mechanics for poll().  Call only while holding lock.
 309      */
 310     private E extract() {
 311         E result;
 312         int n = size - 1;
 313         if (n < 0)
 314             result = null;
 315         else {
 316             Object[] array = queue;
 317             result = (E) array[0];
 318             E x = (E) array[n];
 319             array[n] = null;
 320             Comparator<? super E> cmp = comparator;
 321             if (cmp == null)
 322                 siftDownComparable(0, x, array, n);
 323             else
 324                 siftDownUsingComparator(0, x, array, n, cmp);
 325             size = n;
 326         }
 327         return result;
 328     }
 329 
 330     /**
 331      * Inserts item x at position k, maintaining heap invariant by
 332      * promoting x up the tree until it is greater than or equal to
 333      * its parent, or is the root.
 334      *
 335      * To simplify and speed up coercions and comparisons, the
 336      * Comparable and Comparator versions are separated into different
 337      * methods that are otherwise identical. (Similarly for siftDown.)
 338      * These methods are static, with heap state as arguments, to
 339      * simplify use in light of possible comparator exceptions.
 340      *
 341      * @param k the position to fill
 342      * @param x the item to insert
 343      * @param array the heap array
 345      */
 346     private static <T> void siftUpComparable(int k, T x, Object[] array) {
 347         Comparable<? super T> key = (Comparable<? super T>) x;
 348         while (k > 0) {
 349             int parent = (k - 1) >>> 1;
 350             Object e = array[parent];
 351             if (key.compareTo((T) e) >= 0)
 352                 break;
 353             array[k] = e;
 354             k = parent;
 355         }
 356         array[k] = key;
 357     }
 358 
 359     private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
 360                                        Comparator<? super T> cmp) {
 361         while (k > 0) {
 362             int parent = (k - 1) >>> 1;
 363             Object e = array[parent];
 364             if (cmp.compare(x, (T) e) >= 0)
 365                 break;
 366             array[k] = e;
 367             k = parent;
 368         }
 369         array[k] = x;
 370     }
 371 
 372     /**
 373      * Inserts item x at position k, maintaining heap invariant by
 374      * demoting x down the tree repeatedly until it is less than or
 375      * equal to its children or is a leaf.
 376      *
 377      * @param k the position to fill
 378      * @param x the item to insert
 379      * @param array the heap array
 380      * @param n heap size
 381      */
 382     private static <T> void siftDownComparable(int k, T x, Object[] array,
 383                                                int n) {
 384         Comparable<? super T> key = (Comparable<? super T>)x;
 385         int half = n >>> 1;           // loop while a non-leaf
 386         while (k < half) {
 387             int child = (k << 1) + 1; // assume left child is least
 388             Object c = array[child];
 389             int right = child + 1;
 390             if (right < n &&
 391                 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
 392                 c = array[child = right];
 393             if (key.compareTo((T) c) <= 0)
 394                 break;
 395             array[k] = c;
 396             k = child;
 397         }
 398         array[k] = key;
 399     }
 400 
 401     private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
 402                                                     int n,
 403                                                     Comparator<? super T> cmp) {
 404         int half = n >>> 1;
 405         while (k < half) {
 406             int child = (k << 1) + 1;
 407             Object c = array[child];
 408             int right = child + 1;
 409             if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
 410                 c = array[child = right];
 411             if (cmp.compare(x, (T) c) <= 0)
 412                 break;
 413             array[k] = c;
 414             k = child;
 415         }
 416         array[k] = x;
 417     }
 418 
 419     /**
 420      * Establishes the heap invariant (described above) in the entire tree,
 421      * assuming nothing about the order of the elements prior to the call.
 422      */
 423     private void heapify() {
 424         Object[] array = queue;
 425         int n = size;
 426         int half = (n >>> 1) - 1;
 427         Comparator<? super E> cmp = comparator;
 428         if (cmp == null) {
 429             for (int i = half; i >= 0; i--)
 430                 siftDownComparable(i, (E) array[i], array, n);
 431         }
 432         else {
 433             for (int i = half; i >= 0; i--)
 434                 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
 435         }
 436     }
 437 
 438     /**
 439      * Inserts the specified element into this priority queue.
 440      *
 441      * @param e the element to add
 442      * @return {@code true} (as specified by {@link Collection#add})
 443      * @throws ClassCastException if the specified element cannot be compared
 444      *         with elements currently in the priority queue according to the
 445      *         priority queue's ordering
 446      * @throws NullPointerException if the specified element is null
 447      */
 448     public boolean add(E e) {
 449         return offer(e);
 450     }
 451 
 452     /**
 453      * Inserts the specified element into this priority queue.
 454      * As the queue is unbounded, this method will never return {@code false}.
 455      *
 456      * @param e the element to add
 457      * @return {@code true} (as specified by {@link Queue#offer})
 458      * @throws ClassCastException if the specified element cannot be compared
 459      *         with elements currently in the priority queue according to the
 460      *         priority queue's ordering
 461      * @throws NullPointerException if the specified element is null
 462      */
 463     public boolean offer(E e) {
 464         if (e == null)
 465             throw new NullPointerException();
 466         final ReentrantLock lock = this.lock;
 467         lock.lock();
 468         int n, cap;
 469         Object[] array;
 470         while ((n = size) >= (cap = (array = queue).length))
 471             tryGrow(array, cap);
 472         try {
 473             Comparator<? super E> cmp = comparator;
 474             if (cmp == null)
 475                 siftUpComparable(n, e, array);
 476             else
 477                 siftUpUsingComparator(n, e, array, cmp);
 478             size = n + 1;
 479             notEmpty.signal();
 480         } finally {
 481             lock.unlock();
 482         }
 483         return true;
 484     }
 485 
 486     /**
 487      * Inserts the specified element into this priority queue.
 488      * As the queue is unbounded, this method will never block.
 489      *
 490      * @param e the element to add
 491      * @throws ClassCastException if the specified element cannot be compared
 492      *         with elements currently in the priority queue according to the
 493      *         priority queue's ordering
 494      * @throws NullPointerException if the specified element is null
 495      */
 496     public void put(E e) {
 497         offer(e); // never need to block
 498     }
 499 
 500     /**
 501      * Inserts the specified element into this priority queue.
 502      * As the queue is unbounded, this method will never block or
 503      * return {@code false}.
 504      *
 505      * @param e the element to add
 506      * @param timeout This parameter is ignored as the method never blocks
 507      * @param unit This parameter is ignored as the method never blocks
 508      * @return {@code true} (as specified by
 509      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
 510      * @throws ClassCastException if the specified element cannot be compared
 511      *         with elements currently in the priority queue according to the
 512      *         priority queue's ordering
 513      * @throws NullPointerException if the specified element is null
 514      */
 515     public boolean offer(E e, long timeout, TimeUnit unit) {
 516         return offer(e); // never need to block
 517     }
 518 
 519     public E poll() {
 520         final ReentrantLock lock = this.lock;
 521         lock.lock();
 522         E result;
 523         try {
 524             result = extract();
 525         } finally {
 526             lock.unlock();
 527         }
 528         return result;
 529     }
 530 
 531     public E take() throws InterruptedException {
 532         final ReentrantLock lock = this.lock;
 533         lock.lockInterruptibly();
 534         E result;
 535         try {
 536             while ( (result = extract()) == null)
 537                 notEmpty.await();
 538         } finally {
 539             lock.unlock();
 540         }
 541         return result;
 542     }
 543 
 544     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 545         long nanos = unit.toNanos(timeout);
 546         final ReentrantLock lock = this.lock;
 547         lock.lockInterruptibly();
 548         E result;
 549         try {
 550             while ( (result = extract()) == null && nanos > 0)
 551                 nanos = notEmpty.awaitNanos(nanos);
 552         } finally {
 553             lock.unlock();
 554         }
 555         return result;
 556     }
 557 
 558     public E peek() {
 559         final ReentrantLock lock = this.lock;
 560         lock.lock();
 561         E result;
 562         try {
 563             result = size > 0 ? (E) queue[0] : null;
 564         } finally {
 565             lock.unlock();
 566         }
 567         return result;
 568     }
 569 
 570     /**
 571      * Returns the comparator used to order the elements in this queue,
 572      * or {@code null} if this queue uses the {@linkplain Comparable
 573      * natural ordering} of its elements.
 574      *
 575      * @return the comparator used to order the elements in this queue,
 576      *         or {@code null} if this queue uses the natural
 577      *         ordering of its elements
 578      */
 579     public Comparator<? super E> comparator() {
 580         return comparator;
 581     }
 582 
 583     public int size() {
 584         final ReentrantLock lock = this.lock;
 585         lock.lock();
 586         try {
 587             return size;
 588         } finally {
 589             lock.unlock();
 590         }
 591     }
 592 
 593     /**
 594      * Always returns {@code Integer.MAX_VALUE} because
 595      * a {@code PriorityBlockingQueue} is not capacity constrained.
 596      * @return {@code Integer.MAX_VALUE} always
 597      */
 598     public int remainingCapacity() {
 599         return Integer.MAX_VALUE;
 600     }
 601 
 602     private int indexOf(Object o) {
 603         if (o != null) {
 604             Object[] array = queue;
 605             int n = size;
 606             for (int i = 0; i < n; i++)
 607                 if (o.equals(array[i]))
 608                     return i;
 609         }
 610         return -1;
 611     }
 612 
 613     /**
 614      * Removes the ith element from queue.
 615      */
 616     private void removeAt(int i) {
 617         Object[] array = queue;
 618         int n = size - 1;
 619         if (n == i) // removed last element
 620             array[i] = null;
 621         else {
 622             E moved = (E) array[n];
 623             array[n] = null;
 624             Comparator<? super E> cmp = comparator;
 625             if (cmp == null)
 626                 siftDownComparable(i, moved, array, n);
 627             else
 628                 siftDownUsingComparator(i, moved, array, n, cmp);
 629             if (array[i] == moved) {
 630                 if (cmp == null)
 631                     siftUpComparable(i, moved, array);
 632                 else
 633                     siftUpUsingComparator(i, moved, array, cmp);
 634             }
 635         }
 636         size = n;
 637     }
 638 
 639     /**
 640      * Removes a single instance of the specified element from this queue,
 641      * if it is present.  More formally, removes an element {@code e} such
 642      * that {@code o.equals(e)}, if this queue contains one or more such
 643      * elements.  Returns {@code true} if and only if this queue contained
 644      * the specified element (or equivalently, if this queue changed as a
 645      * result of the call).
 646      *
 647      * @param o element to be removed from this queue, if present
 648      * @return {@code true} if this queue changed as a result of the call
 649      */
 650     public boolean remove(Object o) {
 651         boolean removed = false;
 652         final ReentrantLock lock = this.lock;
 653         lock.lock();
 654         try {
 655             int i = indexOf(o);
 656             if (i != -1) {
 657                 removeAt(i);
 658                 removed = true;
 659             }
 660         } finally {
 661             lock.unlock();
 662         }
 663         return removed;
 664     }
 665 
 666 
 667     /**
 668      * Identity-based version for use in Itr.remove
 669      */
 670     private void removeEQ(Object o) {
 671         final ReentrantLock lock = this.lock;
 672         lock.lock();
 673         try {
 674             Object[] array = queue;
 675             int n = size;
 676             for (int i = 0; i < n; i++) {
 677                 if (o == array[i]) {
 678                     removeAt(i);
 679                     break;
 680                 }
 681             }
 682         } finally {
 683             lock.unlock();
 684         }
 685     }
 686 
 687     /**
 688      * Returns {@code true} if this queue contains the specified element.
 689      * More formally, returns {@code true} if and only if this queue contains
 690      * at least one element {@code e} such that {@code o.equals(e)}.
 691      *
 692      * @param o object to be checked for containment in this queue
 693      * @return {@code true} if this queue contains the specified element
 694      */
 695     public boolean contains(Object o) {
 696         int index;
 697         final ReentrantLock lock = this.lock;
 698         lock.lock();
 699         try {
 700             index = indexOf(o);
 701         } finally {
 702             lock.unlock();
 703         }
 704         return index != -1;
 705     }
 706 
 707     /**
 708      * Returns an array containing all of the elements in this queue.
 709      * The returned array elements are in no particular order.
 710      *
 711      * <p>The returned array will be "safe" in that no references to it are
 712      * maintained by this queue.  (In other words, this method must allocate
 713      * a new array).  The caller is thus free to modify the returned array.
 714      *
 715      * <p>This method acts as bridge between array-based and collection-based
 716      * APIs.
 717      *
 718      * @return an array containing all of the elements in this queue
 719      */
 720     public Object[] toArray() {
 721         final ReentrantLock lock = this.lock;
 722         lock.lock();
 723         try {
 724             return Arrays.copyOf(queue, size);
 725         } finally {
 726             lock.unlock();
 727         }
 728     }
 729 
 730 
 731     public String toString() {
 732         final ReentrantLock lock = this.lock;
 733         lock.lock();
 734         try {
 735             int n = size;
 736             if (n == 0)
 737                 return "[]";
 738             StringBuilder sb = new StringBuilder();
 739             sb.append('[');
 740             for (int i = 0; i < n; ++i) {
 741                 E e = (E)queue[i];
 742                 sb.append(e == this ? "(this Collection)" : e);
 743                 if (i != n - 1)
 744                     sb.append(',').append(' ');
 745             }
 746             return sb.append(']').toString();
 747         } finally {
 748             lock.unlock();
 749         }
 750     }
 751 
 752     /**
 753      * @throws UnsupportedOperationException {@inheritDoc}
 754      * @throws ClassCastException            {@inheritDoc}
 755      * @throws NullPointerException          {@inheritDoc}
 756      * @throws IllegalArgumentException      {@inheritDoc}
 757      */
 758     public int drainTo(Collection<? super E> c) {
 759         if (c == null)
 760             throw new NullPointerException();
 761         if (c == this)
 762             throw new IllegalArgumentException();
 763         final ReentrantLock lock = this.lock;
 764         lock.lock();
 765         try {
 766             int n = 0;
 767             E e;
 768             while ( (e = extract()) != null) {
 769                 c.add(e);
 770                 ++n;
 771             }
 772             return n;
 773         } finally {
 774             lock.unlock();
 775         }
 776     }
 777 
 778     /**
 779      * @throws UnsupportedOperationException {@inheritDoc}
 780      * @throws ClassCastException            {@inheritDoc}
 781      * @throws NullPointerException          {@inheritDoc}
 782      * @throws IllegalArgumentException      {@inheritDoc}
 783      */
 784     public int drainTo(Collection<? super E> c, int maxElements) {
 785         if (c == null)
 786             throw new NullPointerException();
 787         if (c == this)
 788             throw new IllegalArgumentException();
 789         if (maxElements <= 0)
 790             return 0;
 791         final ReentrantLock lock = this.lock;
 792         lock.lock();
 793         try {
 794             int n = 0;
 795             E e;
 796             while (n < maxElements && (e = extract()) != null) {
 797                 c.add(e);
 798                 ++n;
 799             }
 800             return n;
 801         } finally {
 802             lock.unlock();
 803         }
 804     }
 805 
 806     /**
 807      * Atomically removes all of the elements from this queue.
 808      * The queue will be empty after this call returns.
 809      */
 810     public void clear() {
 811         final ReentrantLock lock = this.lock;
 812         lock.lock();
 813         try {
 814             Object[] array = queue;
 815             int n = size;
 816             size = 0;
 817             for (int i = 0; i < n; i++)
 818                 array[i] = null;
 819         } finally {
 820             lock.unlock();
 821         }
 822     }
 823 
 824     /**
 825      * Returns an array containing all of the elements in this queue; the
 826      * runtime type of the returned array is that of the specified array.
 827      * The returned array elements are in no particular order.
 828      * If the queue fits in the specified array, it is returned therein.
 829      * Otherwise, a new array is allocated with the runtime type of the
 830      * specified array and the size of this queue.
 831      *
 832      * <p>If this queue fits in the specified array with room to spare
 833      * (i.e., the array has more elements than this queue), the element in
 834      * the array immediately following the end of the queue is set to
 835      * {@code null}.
 836      *
 837      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 838      * array-based and collection-based APIs.  Further, this method allows
 839      * precise control over the runtime type of the output array, and may,
 840      * under certain circumstances, be used to save allocation costs.
 841      *
 842      * <p>Suppose {@code x} is a queue known to contain only strings.
 843      * The following code can be used to dump the queue into a newly
 844      * allocated array of {@code String}:
 845      *
 846      * <pre>
 847      *     String[] y = x.toArray(new String[0]);</pre>
 848      *
 849      * Note that {@code toArray(new Object[0])} is identical in function to
 850      * {@code toArray()}.
 851      *
 852      * @param a the array into which the elements of the queue are to
 853      *          be stored, if it is big enough; otherwise, a new array of the
 854      *          same runtime type is allocated for this purpose
 855      * @return an array containing all of the elements in this queue
 856      * @throws ArrayStoreException if the runtime type of the specified array
 857      *         is not a supertype of the runtime type of every element in
 858      *         this queue
 859      * @throws NullPointerException if the specified array is null
 860      */
 861     public <T> T[] toArray(T[] a) {
 862         final ReentrantLock lock = this.lock;
 863         lock.lock();
 864         try {
 865             int n = size;
 866             if (a.length < n)
 867                 // Make a new array of a's runtime type, but my contents:
 868                 return (T[]) Arrays.copyOf(queue, size, a.getClass());
 869             System.arraycopy(queue, 0, a, 0, n);
 870             if (a.length > n)
 871                 a[n] = null;
 872             return a;
 873         } finally {
 874             lock.unlock();
 875         }
 876     }
 877 
 878     /**
 879      * Returns an iterator over the elements in this queue. The
 880      * iterator does not return the elements in any particular order.
 881      *
 882      * <p>The returned iterator is a "weakly consistent" iterator that
 883      * will never throw {@link java.util.ConcurrentModificationException
 884      * ConcurrentModificationException}, and guarantees to traverse
 885      * elements as they existed upon construction of the iterator, and
 886      * may (but is not guaranteed to) reflect any modifications
 887      * subsequent to construction.
 888      *
 889      * @return an iterator over the elements in this queue
 890      */
 891     public Iterator<E> iterator() {
 892         return new Itr(toArray());
 893     }
 894 
 895     /**
 896      * Snapshot iterator that works off copy of underlying q array.
 897      */
 898     final class Itr implements Iterator<E> {
 899         final Object[] array; // Array of all elements
 900         int cursor;           // index of next element to return;
 901         int lastRet;          // index of last element, or -1 if no such
 902 
 903         Itr(Object[] array) {
 904             lastRet = -1;
 905             this.array = array;
 906         }
 907 
 908         public boolean hasNext() {
 909             return cursor < array.length;
 910         }
 911 
 912         public E next() {
 913             if (cursor >= array.length)
 914                 throw new NoSuchElementException();
 915             lastRet = cursor;
 916             return (E)array[cursor++];
 917         }
 918 
 919         public void remove() {
 920             if (lastRet < 0)
 921                 throw new IllegalStateException();
 922             removeEQ(array[lastRet]);
 923             lastRet = -1;
 924         }
 925     }
 926 
 927     /**
 928      * Saves the state to a stream (that is, serializes it).  For
 929      * compatibility with previous version of this class,
 930      * elements are first copied to a java.util.PriorityQueue,
 931      * which is then serialized.
 932      */
 933     private void writeObject(java.io.ObjectOutputStream s)
 934         throws java.io.IOException {
 935         lock.lock();
 936         try {
 937             int n = size; // avoid zero capacity argument
 938             q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator);
 939             q.addAll(this);
 940             s.defaultWriteObject();
 941         } finally {
 942             q = null;
 943             lock.unlock();
 944         }
 945     }
 946 
 947     /**
 948      * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream
 949      * (that is, deserializes it).
 950      *
 951      * @param s the stream
 952      */
 953     private void readObject(java.io.ObjectInputStream s)
 954         throws java.io.IOException, ClassNotFoundException {
 955         try {
 956             s.defaultReadObject();
 957             this.queue = new Object[q.size()];
 958             comparator = q.comparator();
 959             addAll(q);
 960         } finally {
 961             q = null;
 962         }
 963     }
 964 
 965     // Unsafe mechanics
 966     private static final sun.misc.Unsafe UNSAFE;
 967     private static final long allocationSpinLockOffset;
 968     static {
 969         try {
 970             UNSAFE = sun.misc.Unsafe.getUnsafe();
 971             Class k = PriorityBlockingQueue.class;
 972             allocationSpinLockOffset = UNSAFE.objectFieldOffset
 973                 (k.getDeclaredField("allocationSpinLock"));
 974         } catch (Exception e) {
 975             throw new Error(e);
 976         }
 977     }
 978 }