1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.concurrent.locks.*;

  39 import java.util.*;
  40 
  41 /**
  42  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
  43  * the same ordering rules as class {@link PriorityQueue} and supplies
  44  * blocking retrieval operations.  While this queue is logically
  45  * unbounded, attempted additions may fail due to resource exhaustion
  46  * (causing {@code OutOfMemoryError}). This class does not permit
  47  * {@code null} elements.  A priority queue relying on {@linkplain
  48  * Comparable natural ordering} also does not permit insertion of
  49  * non-comparable objects (doing so results in
  50  * {@code ClassCastException}).
  51  *
  52  * <p>This class and its iterator implement all of the
  53  * <em>optional</em> methods of the {@link Collection} and {@link
  54  * Iterator} interfaces.  The Iterator provided in method {@link
  55  * #iterator()} is <em>not</em> guaranteed to traverse the elements of
  56  * the PriorityBlockingQueue in any particular order. If you need
  57  * ordered traversal, consider using
  58  * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo}
  59  * can be used to <em>remove</em> some or all elements in priority
  60  * order and place them in another collection.
  61  *
  62  * <p>Operations on this class make no guarantees about the ordering
  63  * of elements with equal priority. If you need to enforce an
  64  * ordering, you can define custom classes or comparators that use a
  65  * secondary key to break ties in primary priority values.  For
  66  * example, here is a class that applies first-in-first-out
  67  * tie-breaking to comparable elements. To use it, you would insert a
  68  * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
  69  *
  70  *  <pre> {@code
  71  * class FIFOEntry<E extends Comparable<? super E>>
  72  *     implements Comparable<FIFOEntry<E>> {
  73  *   static final AtomicLong seq = new AtomicLong(0);
  74  *   final long seqNum;
  75  *   final E entry;
  76  *   public FIFOEntry(E entry) {
  77  *     seqNum = seq.getAndIncrement();
  78  *     this.entry = entry;
  79  *   }
  80  *   public E getEntry() { return entry; }
  81  *   public int compareTo(FIFOEntry<E> other) {
  82  *     int res = entry.compareTo(other.entry);
  83  *     if (res == 0 && other.entry != this.entry)
  84  *       res = (seqNum < other.seqNum ? -1 : 1);
  85  *     return res;
  86  *   }
  87  * }}</pre>
  88  *
  89  * <p>This class is a member of the
  90  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  91  * Java Collections Framework</a>.
  92  *
  93  * @since 1.5
  94  * @author Doug Lea
  95  * @param <E> the type of elements held in this collection
  96  */
  97 @SuppressWarnings("unchecked")
  98 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
  99     implements BlockingQueue<E>, java.io.Serializable {
 100     private static final long serialVersionUID = 5595510919245408276L;
 101 
 102     /*
 103      * The implementation uses an array-based binary heap, with public
 104      * operations protected with a single lock. However, allocation
 105      * during resizing uses a simple spinlock (used only while not
 106      * holding main lock) in order to allow takes to operate
 107      * concurrently with allocation.  This avoids repeated
 108      * postponement of waiting consumers and consequent element
 109      * build-up. The need to back away from lock during allocation
 110      * makes it impossible to simply wrap delegated
 111      * java.util.PriorityQueue operations within a lock, as was done
 112      * in a previous version of this class. To maintain
 113      * interoperability, a plain PriorityQueue is still used during
 114      * serialization, which maintains compatibility at the espense of
 115      * transiently doubling overhead.
 116      */
 117 
 118     /**
 119      * Default array capacity.
 120      */
 121     private static final int DEFAULT_INITIAL_CAPACITY = 11;
 122 
 123     /**
 124      * The maximum size of array to allocate.
 125      * Some VMs reserve some header words in an array.
 126      * Attempts to allocate larger arrays may result in
 127      * OutOfMemoryError: Requested array size exceeds VM limit
 128      */
 129     private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 130 
 131     /**
 132      * Priority queue represented as a balanced binary heap: the two
 133      * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 134      * priority queue is ordered by comparator, or by the elements'
 135      * natural ordering, if comparator is null: For each node n in the
 136      * heap and each descendant d of n, n <= d.  The element with the
 137      * lowest value is in queue[0], assuming the queue is nonempty.
 138      */
 139     private transient Object[] queue;
 140 
 141     /**
 142      * The number of elements in the priority queue.
 143      */
 144     private transient int size;
 145 
 146     /**
 147      * The comparator, or null if priority queue uses elements'
 148      * natural ordering.
 149      */
 150     private transient Comparator<? super E> comparator;
 151 
 152     /**
 153      * Lock used for all public operations
 154      */
 155     private final ReentrantLock lock;
 156 
 157     /**
 158      * Condition for blocking when empty
 159      */
 160     private final Condition notEmpty;
 161 
 162     /**
 163      * Spinlock for allocation, acquired via CAS.
 164      */
 165     private transient volatile int allocationSpinLock;
 166 
 167     /**
 168      * A plain PriorityQueue used only for serialization,
 169      * to maintain compatibility with previous versions
 170      * of this class. Non-null only during serialization/deserialization.
 171      */
 172     private PriorityQueue<E> q;
 173 
 174     /**
 175      * Creates a {@code PriorityBlockingQueue} with the default
 176      * initial capacity (11) that orders its elements according to
 177      * their {@linkplain Comparable natural ordering}.
 178      */
 179     public PriorityBlockingQueue() {
 180         this(DEFAULT_INITIAL_CAPACITY, null);
 181     }
 182 
 183     /**
 184      * Creates a {@code PriorityBlockingQueue} with the specified
 185      * initial capacity that orders its elements according to their
 186      * {@linkplain Comparable natural ordering}.
 187      *
 188      * @param initialCapacity the initial capacity for this priority queue
 189      * @throws IllegalArgumentException if {@code initialCapacity} is less
 190      *         than 1
 191      */
 192     public PriorityBlockingQueue(int initialCapacity) {
 193         this(initialCapacity, null);
 194     }
 195 
 196     /**
 197      * Creates a {@code PriorityBlockingQueue} with the specified initial
 198      * capacity that orders its elements according to the specified
 199      * comparator.
 200      *
 201      * @param initialCapacity the initial capacity for this priority queue
 202      * @param  comparator the comparator that will be used to order this
 203      *         priority queue.  If {@code null}, the {@linkplain Comparable
 204      *         natural ordering} of the elements will be used.
 205      * @throws IllegalArgumentException if {@code initialCapacity} is less
 206      *         than 1
 207      */
 208     public PriorityBlockingQueue(int initialCapacity,
 209                                  Comparator<? super E> comparator) {
 210         if (initialCapacity < 1)
 211             throw new IllegalArgumentException();
 212         this.lock = new ReentrantLock();
 213         this.notEmpty = lock.newCondition();
 214         this.comparator = comparator;
 215         this.queue = new Object[initialCapacity];
 216     }
 217 
 218     /**
 219      * Creates a {@code PriorityBlockingQueue} containing the elements
 220      * in the specified collection.  If the specified collection is a
 221      * {@link SortedSet} or a {@link PriorityQueue},  this
 222      * priority queue will be ordered according to the same ordering.
 223      * Otherwise, this priority queue will be ordered according to the
 224      * {@linkplain Comparable natural ordering} of its elements.
 225      *
 226      * @param  c the collection whose elements are to be placed
 227      *         into this priority queue
 228      * @throws ClassCastException if elements of the specified collection
 229      *         cannot be compared to one another according to the priority
 230      *         queue's ordering
 231      * @throws NullPointerException if the specified collection or any
 232      *         of its elements are null
 233      */
 234     public PriorityBlockingQueue(Collection<? extends E> c) {
 235         this.lock = new ReentrantLock();
 236         this.notEmpty = lock.newCondition();
 237         boolean heapify = true; // true if not known to be in heap order
 238         boolean screen = true;  // true if must screen for nulls
 239         if (c instanceof SortedSet<?>) {
 240             SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
 241             this.comparator = (Comparator<? super E>) ss.comparator();
 242             heapify = false;
 243         }
 244         else if (c instanceof PriorityBlockingQueue<?>) {
 245             PriorityBlockingQueue<? extends E> pq =
 246                 (PriorityBlockingQueue<? extends E>) c;
 247             this.comparator = (Comparator<? super E>) pq.comparator();
 248             screen = false;
 249             if (pq.getClass() == PriorityBlockingQueue.class) // exact match
 250                 heapify = false;
 251         }
 252         Object[] a = c.toArray();
 253         int n = a.length;
 254         // If c.toArray incorrectly doesn't return Object[], copy it.
 255         if (a.getClass() != Object[].class)
 256             a = Arrays.copyOf(a, n, Object[].class);
 257         if (screen && (n == 1 || this.comparator != null)) {
 258             for (int i = 0; i < n; ++i)
 259                 if (a[i] == null)
 260                     throw new NullPointerException();
 261         }
 262         this.queue = a;
 263         this.size = n;
 264         if (heapify)
 265             heapify();
 266     }
 267 
 268     /**
 269      * Tries to grow array to accommodate at least one more element
 270      * (but normally expand by about 50%), giving up (allowing retry)
 271      * on contention (which we expect to be rare). Call only while
 272      * holding lock.
 273      *
 274      * @param array the heap array
 275      * @param oldCap the length of the array
 276      */
 277     private void tryGrow(Object[] array, int oldCap) {
 278         lock.unlock(); // must release and then re-acquire main lock
 279         Object[] newArray = null;
 280         if (allocationSpinLock == 0 &&
 281             UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
 282                                      0, 1)) {
 283             try {
 284                 int newCap = oldCap + ((oldCap < 64) ?
 285                                        (oldCap + 2) : // grow faster if small
 286                                        (oldCap >> 1));
 287                 if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
 288                     int minCap = oldCap + 1;
 289                     if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
 290                         throw new OutOfMemoryError();
 291                     newCap = MAX_ARRAY_SIZE;
 292                 }
 293                 if (newCap > oldCap && queue == array)
 294                     newArray = new Object[newCap];
 295             } finally {
 296                 allocationSpinLock = 0;
 297             }
 298         }
 299         if (newArray == null) // back off if another thread is allocating
 300             Thread.yield();
 301         lock.lock();
 302         if (newArray != null && queue == array) {
 303             queue = newArray;
 304             System.arraycopy(array, 0, newArray, 0, oldCap);
 305         }
 306     }
 307 
 308     /**
 309      * Mechanics for poll().  Call only while holding lock.
 310      */
 311     private E extract() {
 312         E result;
 313         int n = size - 1;
 314         if (n < 0)
 315             result = null;
 316         else {
 317             Object[] array = queue;
 318             result = (E) array[0];
 319             E x = (E) array[n];
 320             array[n] = null;
 321             Comparator<? super E> cmp = comparator;
 322             if (cmp == null)
 323                 siftDownComparable(0, x, array, n);
 324             else
 325                 siftDownUsingComparator(0, x, array, n, cmp);
 326             size = n;
 327         }
 328         return result;
 329     }

 330 
 331     /**
 332      * Inserts item x at position k, maintaining heap invariant by
 333      * promoting x up the tree until it is greater than or equal to
 334      * its parent, or is the root.
 335      *
 336      * To simplify and speed up coercions and comparisons. the
 337      * Comparable and Comparator versions are separated into different
 338      * methods that are otherwise identical. (Similarly for siftDown.)
 339      * These methods are static, with heap state as arguments, to
 340      * simplify use in light of possible comparator exceptions.
 341      *
 342      * @param k the position to fill
 343      * @param x the item to insert
 344      * @param array the heap array
 345      * @param n heap size
 346      */
 347     private static <T> void siftUpComparable(int k, T x, Object[] array) {
 348         Comparable<? super T> key = (Comparable<? super T>) x;
 349         while (k > 0) {
 350             int parent = (k - 1) >>> 1;
 351             Object e = array[parent];
 352             if (key.compareTo((T) e) >= 0)
 353                 break;
 354             array[k] = e;
 355             k = parent;
 356         }
 357         array[k] = key;
 358     }
 359 
 360     private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
 361                                        Comparator<? super T> cmp) {
 362         while (k > 0) {
 363             int parent = (k - 1) >>> 1;
 364             Object e = array[parent];
 365             if (cmp.compare(x, (T) e) >= 0)
 366                 break;
 367             array[k] = e;
 368             k = parent;
 369         }
 370         array[k] = x;
 371     }
 372 
 373     /**
 374      * Inserts item x at position k, maintaining heap invariant by
 375      * demoting x down the tree repeatedly until it is less than or
 376      * equal to its children or is a leaf.
 377      *
 378      * @param k the position to fill
 379      * @param x the item to insert
 380      * @param array the heap array
 381      * @param n heap size
 382      */
 383     private static <T> void siftDownComparable(int k, T x, Object[] array,
 384                                                int n) {

 385         Comparable<? super T> key = (Comparable<? super T>)x;
 386         int half = n >>> 1;           // loop while a non-leaf
 387         while (k < half) {
 388             int child = (k << 1) + 1; // assume left child is least
 389             Object c = array[child];
 390             int right = child + 1;
 391             if (right < n &&
 392                 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
 393                 c = array[child = right];
 394             if (key.compareTo((T) c) <= 0)
 395                 break;
 396             array[k] = c;
 397             k = child;
 398         }
 399         array[k] = key;
 400     }

 401 
 402     private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
 403                                                     int n,
 404                                                     Comparator<? super T> cmp) {

 405         int half = n >>> 1;
 406         while (k < half) {
 407             int child = (k << 1) + 1;
 408             Object c = array[child];
 409             int right = child + 1;
 410             if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
 411                 c = array[child = right];
 412             if (cmp.compare(x, (T) c) <= 0)
 413                 break;
 414             array[k] = c;
 415             k = child;
 416         }
 417         array[k] = x;
 418     }

 419 
 420     /**
 421      * Establishes the heap invariant (described above) in the entire tree,
 422      * assuming nothing about the order of the elements prior to the call.
 423      */
 424     private void heapify() {
 425         Object[] array = queue;
 426         int n = size;
 427         int half = (n >>> 1) - 1;
 428         Comparator<? super E> cmp = comparator;
 429         if (cmp == null) {
 430             for (int i = half; i >= 0; i--)
 431                 siftDownComparable(i, (E) array[i], array, n);
 432         }
 433         else {
 434             for (int i = half; i >= 0; i--)
 435                 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
 436         }
 437     }
 438 
 439     /**
 440      * Inserts the specified element into this priority queue.
 441      *
 442      * @param e the element to add
 443      * @return {@code true} (as specified by {@link Collection#add})
 444      * @throws ClassCastException if the specified element cannot be compared
 445      *         with elements currently in the priority queue according to the
 446      *         priority queue's ordering
 447      * @throws NullPointerException if the specified element is null
 448      */
 449     public boolean add(E e) {
 450         return offer(e);
 451     }
 452 
 453     /**
 454      * Inserts the specified element into this priority queue.
 455      * As the queue is unbounded, this method will never return {@code false}.
 456      *
 457      * @param e the element to add
 458      * @return {@code true} (as specified by {@link Queue#offer})
 459      * @throws ClassCastException if the specified element cannot be compared
 460      *         with elements currently in the priority queue according to the
 461      *         priority queue's ordering
 462      * @throws NullPointerException if the specified element is null
 463      */
 464     public boolean offer(E e) {
 465         if (e == null)
 466             throw new NullPointerException();
 467         final ReentrantLock lock = this.lock;
 468         lock.lock();
 469         int n, cap;
 470         Object[] array;
 471         while ((n = size) >= (cap = (array = queue).length))
 472             tryGrow(array, cap);
 473         try {
 474             Comparator<? super E> cmp = comparator;
 475             if (cmp == null)
 476                 siftUpComparable(n, e, array);
 477             else
 478                 siftUpUsingComparator(n, e, array, cmp);
 479             size = n + 1;
 480             notEmpty.signal();
 481         } finally {
 482             lock.unlock();
 483         }
 484         return true;
 485     }
 486 
 487     /**
 488      * Inserts the specified element into this priority queue.
 489      * As the queue is unbounded, this method will never block.
 490      *
 491      * @param e the element to add
 492      * @throws ClassCastException if the specified element cannot be compared
 493      *         with elements currently in the priority queue according to the
 494      *         priority queue's ordering
 495      * @throws NullPointerException if the specified element is null
 496      */
 497     public void put(E e) {
 498         offer(e); // never need to block
 499     }
 500 
 501     /**
 502      * Inserts the specified element into this priority queue.
 503      * As the queue is unbounded, this method will never block or
 504      * return {@code false}.
 505      *
 506      * @param e the element to add
 507      * @param timeout This parameter is ignored as the method never blocks
 508      * @param unit This parameter is ignored as the method never blocks
 509      * @return {@code true} (as specified by
 510      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
 511      * @throws ClassCastException if the specified element cannot be compared
 512      *         with elements currently in the priority queue according to the
 513      *         priority queue's ordering
 514      * @throws NullPointerException if the specified element is null
 515      */
 516     public boolean offer(E e, long timeout, TimeUnit unit) {
 517         return offer(e); // never need to block
 518     }
 519 
 520     public E poll() {
 521         final ReentrantLock lock = this.lock;
 522         lock.lock();
 523         E result;
 524         try {
 525             result = extract();
 526         } finally {
 527             lock.unlock();
 528         }
 529         return result;
 530     }
 531 
 532     public E take() throws InterruptedException {
 533         final ReentrantLock lock = this.lock;
 534         lock.lockInterruptibly();
 535         E result;
 536         try {
 537             while ( (result = extract()) == null)
 538                 notEmpty.await();
 539         } finally {
 540             lock.unlock();
 541         }
 542         return result;
 543     }
 544 
 545     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 546         long nanos = unit.toNanos(timeout);
 547         final ReentrantLock lock = this.lock;
 548         lock.lockInterruptibly();
 549         E result;
 550         try {
 551             while ( (result = extract()) == null && nanos > 0)
 552                 nanos = notEmpty.awaitNanos(nanos);
 553         } finally {
 554             lock.unlock();
 555         }
 556         return result;
 557     }
 558 
 559     public E peek() {
 560         final ReentrantLock lock = this.lock;
 561         lock.lock();
 562         E result;
 563         try {
 564             result = size > 0 ? (E) queue[0] : null;
 565         } finally {
 566             lock.unlock();
 567         }
 568         return result;
 569     }
 570 
 571     /**
 572      * Returns the comparator used to order the elements in this queue,
 573      * or {@code null} if this queue uses the {@linkplain Comparable
 574      * natural ordering} of its elements.
 575      *
 576      * @return the comparator used to order the elements in this queue,
 577      *         or {@code null} if this queue uses the natural
 578      *         ordering of its elements
 579      */
 580     public Comparator<? super E> comparator() {
 581         return comparator;
 582     }
 583 
 584     public int size() {
 585         final ReentrantLock lock = this.lock;
 586         lock.lock();
 587         try {
 588             return size;
 589         } finally {
 590             lock.unlock();
 591         }
 592     }
 593 
 594     /**
 595      * Always returns {@code Integer.MAX_VALUE} because
 596      * a {@code PriorityBlockingQueue} is not capacity constrained.
 597      * @return {@code Integer.MAX_VALUE} always
 598      */
 599     public int remainingCapacity() {
 600         return Integer.MAX_VALUE;
 601     }
 602 
 603     private int indexOf(Object o) {
 604         if (o != null) {
 605             Object[] array = queue;
 606             int n = size;
 607             for (int i = 0; i < n; i++)
 608                 if (o.equals(array[i]))
 609                     return i;
 610         }
 611         return -1;
 612     }
 613 
 614     /**
 615      * Removes the ith element from queue.
 616      */
 617     private void removeAt(int i) {
 618         Object[] array = queue;
 619         int n = size - 1;
 620         if (n == i) // removed last element
 621             array[i] = null;
 622         else {
 623             E moved = (E) array[n];
 624             array[n] = null;
 625             Comparator<? super E> cmp = comparator;
 626             if (cmp == null)
 627                 siftDownComparable(i, moved, array, n);
 628             else
 629                 siftDownUsingComparator(i, moved, array, n, cmp);
 630             if (array[i] == moved) {
 631                 if (cmp == null)
 632                     siftUpComparable(i, moved, array);
 633                 else
 634                     siftUpUsingComparator(i, moved, array, cmp);
 635             }
 636         }
 637         size = n;
 638     }
 639 
 640     /**
 641      * Removes a single instance of the specified element from this queue,
 642      * if it is present.  More formally, removes an element {@code e} such
 643      * that {@code o.equals(e)}, if this queue contains one or more such
 644      * elements.  Returns {@code true} if and only if this queue contained
 645      * the specified element (or equivalently, if this queue changed as a
 646      * result of the call).
 647      *
 648      * @param o element to be removed from this queue, if present
 649      * @return {@code true} if this queue changed as a result of the call
 650      */
 651     public boolean remove(Object o) {
 652         boolean removed = false;
 653         final ReentrantLock lock = this.lock;
 654         lock.lock();
 655         try {
 656             int i = indexOf(o);
 657             if (i != -1) {

 658                 removeAt(i);
 659                 removed = true;
 660             }
 661         } finally {
 662             lock.unlock();
 663         }
 664         return removed;
 665     }
 666 
 667 
 668     /**
 669      * Identity-based version for use in Itr.remove
 670      */
 671     private void removeEQ(Object o) {
 672         final ReentrantLock lock = this.lock;
 673         lock.lock();
 674         try {
 675             Object[] array = queue;
 676             int n = size;
 677             for (int i = 0; i < n; i++) {
 678                 if (o == array[i]) {
 679                     removeAt(i);
 680                     break;
 681                 }
 682             }
 683         } finally {
 684             lock.unlock();
 685         }
 686     }
 687 
 688     /**
 689      * Returns {@code true} if this queue contains the specified element.
 690      * More formally, returns {@code true} if and only if this queue contains
 691      * at least one element {@code e} such that {@code o.equals(e)}.
 692      *
 693      * @param o object to be checked for containment in this queue
 694      * @return {@code true} if this queue contains the specified element
 695      */
 696     public boolean contains(Object o) {
 697         int index;
 698         final ReentrantLock lock = this.lock;
 699         lock.lock();
 700         try {
 701             index = indexOf(o);
 702         } finally {
 703             lock.unlock();
 704         }
 705         return index != -1;
 706     }
 707 
 708     /**
 709      * Returns an array containing all of the elements in this queue.
 710      * The returned array elements are in no particular order.
 711      *
 712      * <p>The returned array will be "safe" in that no references to it are
 713      * maintained by this queue.  (In other words, this method must allocate
 714      * a new array).  The caller is thus free to modify the returned array.
 715      *
 716      * <p>This method acts as bridge between array-based and collection-based
 717      * APIs.
 718      *
 719      * @return an array containing all of the elements in this queue
 720      */
 721     public Object[] toArray() {
 722         final ReentrantLock lock = this.lock;
 723         lock.lock();
 724         try {
 725             return Arrays.copyOf(queue, size);
 726         } finally {
 727             lock.unlock();
 728         }
 729     }
 730 
 731 
 732     public String toString() {
 733         final ReentrantLock lock = this.lock;
 734         lock.lock();
 735         try {
 736             int n = size;
 737             if (n == 0)
 738                 return "[]";
 739             StringBuilder sb = new StringBuilder();
 740             sb.append('[');
 741             for (int i = 0; i < n; ++i) {
 742                 E e = (E)queue[i];
 743                 sb.append(e == this ? "(this Collection)" : e);
 744                 if (i != n - 1)
 745                     sb.append(',').append(' ');
 746             }
 747             return sb.append(']').toString();
 748         } finally {
 749             lock.unlock();
 750         }
 751     }
 752 
 753     /**
 754      * @throws UnsupportedOperationException {@inheritDoc}
 755      * @throws ClassCastException            {@inheritDoc}
 756      * @throws NullPointerException          {@inheritDoc}
 757      * @throws IllegalArgumentException      {@inheritDoc}
 758      */
 759     public int drainTo(Collection<? super E> c) {
 760         if (c == null)
 761             throw new NullPointerException();
 762         if (c == this)
 763             throw new IllegalArgumentException();
 764         final ReentrantLock lock = this.lock;
 765         lock.lock();
 766         try {
 767             int n = 0;
 768             E e;
 769             while ( (e = extract()) != null) {
 770                 c.add(e);
 771                 ++n;
 772             }
 773             return n;
 774         } finally {
 775             lock.unlock();
 776         }
 777     }
 778 
 779     /**
 780      * @throws UnsupportedOperationException {@inheritDoc}
 781      * @throws ClassCastException            {@inheritDoc}
 782      * @throws NullPointerException          {@inheritDoc}
 783      * @throws IllegalArgumentException      {@inheritDoc}
 784      */
 785     public int drainTo(Collection<? super E> c, int maxElements) {
 786         if (c == null)
 787             throw new NullPointerException();
 788         if (c == this)
 789             throw new IllegalArgumentException();
 790         if (maxElements <= 0)
 791             return 0;
 792         final ReentrantLock lock = this.lock;
 793         lock.lock();
 794         try {
 795             int n = 0;
 796             E e;
 797             while (n < maxElements && (e = extract()) != null) {
 798                 c.add(e);
 799                 ++n;
 800             }
 801             return n;
 802         } finally {
 803             lock.unlock();
 804         }
 805     }
 806 
 807     /**
 808      * Atomically removes all of the elements from this queue.
 809      * The queue will be empty after this call returns.
 810      */
 811     public void clear() {
 812         final ReentrantLock lock = this.lock;
 813         lock.lock();
 814         try {
 815             Object[] array = queue;
 816             int n = size;
 817             size = 0;
 818             for (int i = 0; i < n; i++)
 819                 array[i] = null;
 820         } finally {
 821             lock.unlock();
 822         }
 823     }
 824 
 825     /**
 826      * Returns an array containing all of the elements in this queue; the
 827      * runtime type of the returned array is that of the specified array.
 828      * The returned array elements are in no particular order.
 829      * If the queue fits in the specified array, it is returned therein.
 830      * Otherwise, a new array is allocated with the runtime type of the
 831      * specified array and the size of this queue.
 832      *
 833      * <p>If this queue fits in the specified array with room to spare
 834      * (i.e., the array has more elements than this queue), the element in
 835      * the array immediately following the end of the queue is set to
 836      * {@code null}.
 837      *
 838      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 839      * array-based and collection-based APIs.  Further, this method allows
 840      * precise control over the runtime type of the output array, and may,
 841      * under certain circumstances, be used to save allocation costs.
 842      *
 843      * <p>Suppose {@code x} is a queue known to contain only strings.
 844      * The following code can be used to dump the queue into a newly
 845      * allocated array of {@code String}:
 846      *
 847      * <pre>
 848      *     String[] y = x.toArray(new String[0]);</pre>
 849      *
 850      * Note that {@code toArray(new Object[0])} is identical in function to
 851      * {@code toArray()}.
 852      *
 853      * @param a the array into which the elements of the queue are to
 854      *          be stored, if it is big enough; otherwise, a new array of the
 855      *          same runtime type is allocated for this purpose
 856      * @return an array containing all of the elements in this queue
 857      * @throws ArrayStoreException if the runtime type of the specified array
 858      *         is not a supertype of the runtime type of every element in
 859      *         this queue
 860      * @throws NullPointerException if the specified array is null
 861      */
 862     public <T> T[] toArray(T[] a) {
 863         final ReentrantLock lock = this.lock;
 864         lock.lock();
 865         try {
 866             int n = size;
 867             if (a.length < n)
 868                 // Make a new array of a's runtime type, but my contents:
 869                 return (T[]) Arrays.copyOf(queue, size, a.getClass());
 870             System.arraycopy(queue, 0, a, 0, n);
 871             if (a.length > n)
 872                 a[n] = null;
 873             return a;
 874         } finally {
 875             lock.unlock();
 876         }
 877     }
 878 
 879     /**
 880      * Returns an iterator over the elements in this queue. The
 881      * iterator does not return the elements in any particular order.
 882      *
 883      * <p>The returned iterator is a "weakly consistent" iterator that
 884      * will never throw {@link java.util.ConcurrentModificationException
 885      * ConcurrentModificationException}, and guarantees to traverse
 886      * elements as they existed upon construction of the iterator, and
 887      * may (but is not guaranteed to) reflect any modifications
 888      * subsequent to construction.
 889      *
 890      * @return an iterator over the elements in this queue
 891      */
 892     public Iterator<E> iterator() {
 893         return new Itr(toArray());
 894     }
 895 
 896     /**
 897      * Snapshot iterator that works off copy of underlying q array.
 898      */
 899     final class Itr implements Iterator<E> {
 900         final Object[] array; // Array of all elements
 901         int cursor;           // index of next element to return;
 902         int lastRet;          // index of last element, or -1 if no such
 903 
 904         Itr(Object[] array) {
 905             lastRet = -1;
 906             this.array = array;
 907         }
 908 
 909         public boolean hasNext() {
 910             return cursor < array.length;
 911         }
 912 
 913         public E next() {
 914             if (cursor >= array.length)
 915                 throw new NoSuchElementException();
 916             lastRet = cursor;
 917             return (E)array[cursor++];
 918         }
 919 
 920         public void remove() {
 921             if (lastRet < 0)
 922                 throw new IllegalStateException();
 923             removeEQ(array[lastRet]);
 924             lastRet = -1;
 925         }
 926     }
 927 
 928     /**
 929      * Saves the state to a stream (that is, serializes it).  For
 930      * compatibility with previous version of this class,
 931      * elements are first copied to a java.util.PriorityQueue,
 932      * which is then serialized.

 933      */
 934     private void writeObject(java.io.ObjectOutputStream s)
 935         throws java.io.IOException {
 936         lock.lock();
 937         try {
 938             int n = size; // avoid zero capacity argument
 939             q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator);
 940             q.addAll(this);
 941             s.defaultWriteObject();
 942         } finally {
 943             q = null;
 944             lock.unlock();
 945         }
 946     }
 947 
 948     /**
 949      * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream
 950      * (that is, deserializes it).
 951      *
 952      * @param s the stream
 953      */
 954     private void readObject(java.io.ObjectInputStream s)
 955         throws java.io.IOException, ClassNotFoundException {
 956         try {
 957             s.defaultReadObject();
 958             this.queue = new Object[q.size()];
 959             comparator = q.comparator();
 960             addAll(q);
 961         } finally {
 962             q = null;
 963         }
 964     }
 965 
 966     // Unsafe mechanics
 967     private static final sun.misc.Unsafe UNSAFE;
 968     private static final long allocationSpinLockOffset;
 969     static {
 970         try {
 971             UNSAFE = sun.misc.Unsafe.getUnsafe();
 972             Class<?> k = PriorityBlockingQueue.class;
 973             allocationSpinLockOffset = UNSAFE.objectFieldOffset
 974                 (k.getDeclaredField("allocationSpinLock"));
 975         } catch (Exception e) {
 976             throw new Error(e);
 977         }
 978     }
 979 }
--- EOF ---