src/share/classes/java/util/concurrent/PriorityBlockingQueue.java
Index Unified diffs Context diffs Sdiffs Wdiffs Patch New Old Previous File Next File jdk-7161229 Sdiff src/share/classes/java/util/concurrent

src/share/classes/java/util/concurrent/PriorityBlockingQueue.java

Print this page




  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.concurrent.locks.*;

  39 import java.util.*;
  40 
  41 /**
  42  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
  43  * the same ordering rules as class {@link PriorityQueue} and supplies
  44  * blocking retrieval operations.  While this queue is logically
  45  * unbounded, attempted additions may fail due to resource exhaustion
  46  * (causing {@code OutOfMemoryError}). This class does not permit
  47  * {@code null} elements.  A priority queue relying on {@linkplain
  48  * Comparable natural ordering} also does not permit insertion of
  49  * non-comparable objects (doing so results in
  50  * {@code ClassCastException}).
  51  *
  52  * <p>This class and its iterator implement all of the
  53  * <em>optional</em> methods of the {@link Collection} and {@link
  54  * Iterator} interfaces.  The Iterator provided in method {@link
  55  * #iterator()} is <em>not</em> guaranteed to traverse the elements of
  56  * the PriorityBlockingQueue in any particular order. If you need
  57  * ordered traversal, consider using
  58  * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo}


  94  * @author Doug Lea
  95  * @param <E> the type of elements held in this collection
  96  */
  97 @SuppressWarnings("unchecked")
  98 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
  99     implements BlockingQueue<E>, java.io.Serializable {
 100     private static final long serialVersionUID = 5595510919245408276L;
 101 
 102     /*
 103      * The implementation uses an array-based binary heap, with public
 104      * operations protected with a single lock. However, allocation
 105      * during resizing uses a simple spinlock (used only while not
 106      * holding main lock) in order to allow takes to operate
 107      * concurrently with allocation.  This avoids repeated
 108      * postponement of waiting consumers and consequent element
 109      * build-up. The need to back away from lock during allocation
 110      * makes it impossible to simply wrap delegated
 111      * java.util.PriorityQueue operations within a lock, as was done
 112      * in a previous version of this class. To maintain
 113      * interoperability, a plain PriorityQueue is still used during
 114      * serialization, which maintains compatibility at the espense of
 115      * transiently doubling overhead.
 116      */
 117 
 118     /**
 119      * Default array capacity.
 120      */
 121     private static final int DEFAULT_INITIAL_CAPACITY = 11;
 122 
 123     /**
 124      * The maximum size of array to allocate.
 125      * Some VMs reserve some header words in an array.
 126      * Attempts to allocate larger arrays may result in
 127      * OutOfMemoryError: Requested array size exceeds VM limit
 128      */
 129     private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 130 
 131     /**
 132      * Priority queue represented as a balanced binary heap: the two
 133      * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 134      * priority queue is ordered by comparator, or by the elements'


 291                     newCap = MAX_ARRAY_SIZE;
 292                 }
 293                 if (newCap > oldCap && queue == array)
 294                     newArray = new Object[newCap];
 295             } finally {
 296                 allocationSpinLock = 0;
 297             }
 298         }
 299         if (newArray == null) // back off if another thread is allocating
 300             Thread.yield();
 301         lock.lock();
 302         if (newArray != null && queue == array) {
 303             queue = newArray;
 304             System.arraycopy(array, 0, newArray, 0, oldCap);
 305         }
 306     }
 307 
 308     /**
 309      * Mechanics for poll().  Call only while holding lock.
 310      */
 311     private E extract() {
 312         E result;
 313         int n = size - 1;
 314         if (n < 0)
 315             result = null;
 316         else {
 317             Object[] array = queue;
 318             result = (E) array[0];
 319             E x = (E) array[n];
 320             array[n] = null;
 321             Comparator<? super E> cmp = comparator;
 322             if (cmp == null)
 323                 siftDownComparable(0, x, array, n);
 324             else
 325                 siftDownUsingComparator(0, x, array, n, cmp);
 326             size = n;
 327         }
 328         return result;
 329     }

 330 
 331     /**
 332      * Inserts item x at position k, maintaining heap invariant by
 333      * promoting x up the tree until it is greater than or equal to
 334      * its parent, or is the root.
 335      *
 336      * To simplify and speed up coercions and comparisons. the
 337      * Comparable and Comparator versions are separated into different
 338      * methods that are otherwise identical. (Similarly for siftDown.)
 339      * These methods are static, with heap state as arguments, to
 340      * simplify use in light of possible comparator exceptions.
 341      *
 342      * @param k the position to fill
 343      * @param x the item to insert
 344      * @param array the heap array
 345      * @param n heap size
 346      */
 347     private static <T> void siftUpComparable(int k, T x, Object[] array) {
 348         Comparable<? super T> key = (Comparable<? super T>) x;
 349         while (k > 0) {


 365             if (cmp.compare(x, (T) e) >= 0)
 366                 break;
 367             array[k] = e;
 368             k = parent;
 369         }
 370         array[k] = x;
 371     }
 372 
 373     /**
 374      * Inserts item x at position k, maintaining heap invariant by
 375      * demoting x down the tree repeatedly until it is less than or
 376      * equal to its children or is a leaf.
 377      *
 378      * @param k the position to fill
 379      * @param x the item to insert
 380      * @param array the heap array
 381      * @param n heap size
 382      */
 383     private static <T> void siftDownComparable(int k, T x, Object[] array,
 384                                                int n) {

 385         Comparable<? super T> key = (Comparable<? super T>)x;
 386         int half = n >>> 1;           // loop while a non-leaf
 387         while (k < half) {
 388             int child = (k << 1) + 1; // assume left child is least
 389             Object c = array[child];
 390             int right = child + 1;
 391             if (right < n &&
 392                 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
 393                 c = array[child = right];
 394             if (key.compareTo((T) c) <= 0)
 395                 break;
 396             array[k] = c;
 397             k = child;
 398         }
 399         array[k] = key;
 400     }

 401 
 402     private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
 403                                                     int n,
 404                                                     Comparator<? super T> cmp) {

 405         int half = n >>> 1;
 406         while (k < half) {
 407             int child = (k << 1) + 1;
 408             Object c = array[child];
 409             int right = child + 1;
 410             if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
 411                 c = array[child = right];
 412             if (cmp.compare(x, (T) c) <= 0)
 413                 break;
 414             array[k] = c;
 415             k = child;
 416         }
 417         array[k] = x;
 418     }

 419 
 420     /**
 421      * Establishes the heap invariant (described above) in the entire tree,
 422      * assuming nothing about the order of the elements prior to the call.
 423      */
 424     private void heapify() {
 425         Object[] array = queue;
 426         int n = size;
 427         int half = (n >>> 1) - 1;
 428         Comparator<? super E> cmp = comparator;
 429         if (cmp == null) {
 430             for (int i = half; i >= 0; i--)
 431                 siftDownComparable(i, (E) array[i], array, n);
 432         }
 433         else {
 434             for (int i = half; i >= 0; i--)
 435                 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
 436         }
 437     }
 438 


 503      * As the queue is unbounded, this method will never block or
 504      * return {@code false}.
 505      *
 506      * @param e the element to add
 507      * @param timeout This parameter is ignored as the method never blocks
 508      * @param unit This parameter is ignored as the method never blocks
 509      * @return {@code true} (as specified by
 510      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
 511      * @throws ClassCastException if the specified element cannot be compared
 512      *         with elements currently in the priority queue according to the
 513      *         priority queue's ordering
 514      * @throws NullPointerException if the specified element is null
 515      */
 516     public boolean offer(E e, long timeout, TimeUnit unit) {
 517         return offer(e); // never need to block
 518     }
 519 
 520     public E poll() {
 521         final ReentrantLock lock = this.lock;
 522         lock.lock();
 523         E result;
 524         try {
 525             result = extract();
 526         } finally {
 527             lock.unlock();
 528         }
 529         return result;
 530     }
 531 
 532     public E take() throws InterruptedException {
 533         final ReentrantLock lock = this.lock;
 534         lock.lockInterruptibly();
 535         E result;
 536         try {
 537             while ( (result = extract()) == null)
 538                 notEmpty.await();
 539         } finally {
 540             lock.unlock();
 541         }
 542         return result;
 543     }
 544 
 545     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 546         long nanos = unit.toNanos(timeout);
 547         final ReentrantLock lock = this.lock;
 548         lock.lockInterruptibly();
 549         E result;
 550         try {
 551             while ( (result = extract()) == null && nanos > 0)
 552                 nanos = notEmpty.awaitNanos(nanos);
 553         } finally {
 554             lock.unlock();
 555         }
 556         return result;
 557     }
 558 
 559     public E peek() {
 560         final ReentrantLock lock = this.lock;
 561         lock.lock();
 562         E result;
 563         try {
 564             result = size > 0 ? (E) queue[0] : null;
 565         } finally {
 566             lock.unlock();
 567         }
 568         return result;
 569     }
 570 
 571     /**
 572      * Returns the comparator used to order the elements in this queue,
 573      * or {@code null} if this queue uses the {@linkplain Comparable
 574      * natural ordering} of its elements.
 575      *
 576      * @return the comparator used to order the elements in this queue,
 577      *         or {@code null} if this queue uses the natural
 578      *         ordering of its elements
 579      */
 580     public Comparator<? super E> comparator() {
 581         return comparator;
 582     }
 583 
 584     public int size() {
 585         final ReentrantLock lock = this.lock;
 586         lock.lock();
 587         try {
 588             return size;


 632                     siftUpComparable(i, moved, array);
 633                 else
 634                     siftUpUsingComparator(i, moved, array, cmp);
 635             }
 636         }
 637         size = n;
 638     }
 639 
 640     /**
 641      * Removes a single instance of the specified element from this queue,
 642      * if it is present.  More formally, removes an element {@code e} such
 643      * that {@code o.equals(e)}, if this queue contains one or more such
 644      * elements.  Returns {@code true} if and only if this queue contained
 645      * the specified element (or equivalently, if this queue changed as a
 646      * result of the call).
 647      *
 648      * @param o element to be removed from this queue, if present
 649      * @return {@code true} if this queue changed as a result of the call
 650      */
 651     public boolean remove(Object o) {
 652         boolean removed = false;
 653         final ReentrantLock lock = this.lock;
 654         lock.lock();
 655         try {
 656             int i = indexOf(o);
 657             if (i != -1) {

 658                 removeAt(i);
 659                 removed = true;
 660             }
 661         } finally {
 662             lock.unlock();
 663         }
 664         return removed;
 665     }
 666 
 667 
 668     /**
 669      * Identity-based version for use in Itr.remove
 670      */
 671     private void removeEQ(Object o) {
 672         final ReentrantLock lock = this.lock;
 673         lock.lock();
 674         try {
 675             Object[] array = queue;
 676             int n = size;
 677             for (int i = 0; i < n; i++) {
 678                 if (o == array[i]) {
 679                     removeAt(i);
 680                     break;
 681                 }
 682             }
 683         } finally {
 684             lock.unlock();
 685         }
 686     }
 687 
 688     /**
 689      * Returns {@code true} if this queue contains the specified element.
 690      * More formally, returns {@code true} if and only if this queue contains
 691      * at least one element {@code e} such that {@code o.equals(e)}.
 692      *
 693      * @param o object to be checked for containment in this queue
 694      * @return {@code true} if this queue contains the specified element
 695      */
 696     public boolean contains(Object o) {
 697         int index;
 698         final ReentrantLock lock = this.lock;
 699         lock.lock();
 700         try {
 701             index = indexOf(o);
 702         } finally {
 703             lock.unlock();
 704         }
 705         return index != -1;
 706     }
 707 
 708     /**
 709      * Returns an array containing all of the elements in this queue.
 710      * The returned array elements are in no particular order.
 711      *
 712      * <p>The returned array will be "safe" in that no references to it are
 713      * maintained by this queue.  (In other words, this method must allocate
 714      * a new array).  The caller is thus free to modify the returned array.
 715      *
 716      * <p>This method acts as bridge between array-based and collection-based
 717      * APIs.
 718      *
 719      * @return an array containing all of the elements in this queue
 720      */
 721     public Object[] toArray() {
 722         final ReentrantLock lock = this.lock;
 723         lock.lock();
 724         try {
 725             return Arrays.copyOf(queue, size);
 726         } finally {
 727             lock.unlock();
 728         }
 729     }
 730 
 731 
 732     public String toString() {
 733         final ReentrantLock lock = this.lock;
 734         lock.lock();
 735         try {
 736             int n = size;
 737             if (n == 0)
 738                 return "[]";
 739             StringBuilder sb = new StringBuilder();
 740             sb.append('[');
 741             for (int i = 0; i < n; ++i) {
 742                 E e = (E)queue[i];
 743                 sb.append(e == this ? "(this Collection)" : e);
 744                 if (i != n - 1)
 745                     sb.append(',').append(' ');
 746             }
 747             return sb.append(']').toString();
 748         } finally {
 749             lock.unlock();
 750         }
 751     }
 752 
 753     /**
 754      * @throws UnsupportedOperationException {@inheritDoc}
 755      * @throws ClassCastException            {@inheritDoc}
 756      * @throws NullPointerException          {@inheritDoc}
 757      * @throws IllegalArgumentException      {@inheritDoc}
 758      */
 759     public int drainTo(Collection<? super E> c) {
 760         if (c == null)
 761             throw new NullPointerException();
 762         if (c == this)
 763             throw new IllegalArgumentException();
 764         final ReentrantLock lock = this.lock;
 765         lock.lock();
 766         try {
 767             int n = 0;
 768             E e;
 769             while ( (e = extract()) != null) {
 770                 c.add(e);
 771                 ++n;
 772             }
 773             return n;
 774         } finally {
 775             lock.unlock();
 776         }
 777     }
 778 
 779     /**
 780      * @throws UnsupportedOperationException {@inheritDoc}
 781      * @throws ClassCastException            {@inheritDoc}
 782      * @throws NullPointerException          {@inheritDoc}
 783      * @throws IllegalArgumentException      {@inheritDoc}
 784      */
 785     public int drainTo(Collection<? super E> c, int maxElements) {
 786         if (c == null)
 787             throw new NullPointerException();
 788         if (c == this)
 789             throw new IllegalArgumentException();
 790         if (maxElements <= 0)
 791             return 0;
 792         final ReentrantLock lock = this.lock;
 793         lock.lock();
 794         try {
 795             int n = 0;
 796             E e;
 797             while (n < maxElements && (e = extract()) != null) {
 798                 c.add(e);
 799                 ++n;
 800             }
 801             return n;
 802         } finally {
 803             lock.unlock();
 804         }
 805     }
 806 
 807     /**
 808      * Atomically removes all of the elements from this queue.
 809      * The queue will be empty after this call returns.
 810      */
 811     public void clear() {
 812         final ReentrantLock lock = this.lock;
 813         lock.lock();
 814         try {
 815             Object[] array = queue;
 816             int n = size;
 817             size = 0;
 818             for (int i = 0; i < n; i++)
 819                 array[i] = null;


 827      * runtime type of the returned array is that of the specified array.
 828      * The returned array elements are in no particular order.
 829      * If the queue fits in the specified array, it is returned therein.
 830      * Otherwise, a new array is allocated with the runtime type of the
 831      * specified array and the size of this queue.
 832      *
 833      * <p>If this queue fits in the specified array with room to spare
 834      * (i.e., the array has more elements than this queue), the element in
 835      * the array immediately following the end of the queue is set to
 836      * {@code null}.
 837      *
 838      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 839      * array-based and collection-based APIs.  Further, this method allows
 840      * precise control over the runtime type of the output array, and may,
 841      * under certain circumstances, be used to save allocation costs.
 842      *
 843      * <p>Suppose {@code x} is a queue known to contain only strings.
 844      * The following code can be used to dump the queue into a newly
 845      * allocated array of {@code String}:
 846      *
 847      * <pre>
 848      *     String[] y = x.toArray(new String[0]);</pre>
 849      *
 850      * Note that {@code toArray(new Object[0])} is identical in function to
 851      * {@code toArray()}.
 852      *
 853      * @param a the array into which the elements of the queue are to
 854      *          be stored, if it is big enough; otherwise, a new array of the
 855      *          same runtime type is allocated for this purpose
 856      * @return an array containing all of the elements in this queue
 857      * @throws ArrayStoreException if the runtime type of the specified array
 858      *         is not a supertype of the runtime type of every element in
 859      *         this queue
 860      * @throws NullPointerException if the specified array is null
 861      */
 862     public <T> T[] toArray(T[] a) {
 863         final ReentrantLock lock = this.lock;
 864         lock.lock();
 865         try {
 866             int n = size;
 867             if (a.length < n)
 868                 // Make a new array of a's runtime type, but my contents:


 881      * iterator does not return the elements in any particular order.
 882      *
 883      * <p>The returned iterator is a "weakly consistent" iterator that
 884      * will never throw {@link java.util.ConcurrentModificationException
 885      * ConcurrentModificationException}, and guarantees to traverse
 886      * elements as they existed upon construction of the iterator, and
 887      * may (but is not guaranteed to) reflect any modifications
 888      * subsequent to construction.
 889      *
 890      * @return an iterator over the elements in this queue
 891      */
 892     public Iterator<E> iterator() {
 893         return new Itr(toArray());
 894     }
 895 
 896     /**
 897      * Snapshot iterator that works off copy of underlying q array.
 898      */
 899     final class Itr implements Iterator<E> {
 900         final Object[] array; // Array of all elements
 901         int cursor;           // index of next element to return;
 902         int lastRet;          // index of last element, or -1 if no such
 903 
 904         Itr(Object[] array) {
 905             lastRet = -1;
 906             this.array = array;
 907         }
 908 
 909         public boolean hasNext() {
 910             return cursor < array.length;
 911         }
 912 
 913         public E next() {
 914             if (cursor >= array.length)
 915                 throw new NoSuchElementException();
 916             lastRet = cursor;
 917             return (E)array[cursor++];
 918         }
 919 
 920         public void remove() {
 921             if (lastRet < 0)
 922                 throw new IllegalStateException();
 923             removeEQ(array[lastRet]);
 924             lastRet = -1;
 925         }
 926     }
 927 
 928     /**
 929      * Saves the state to a stream (that is, serializes it).  For
 930      * compatibility with previous version of this class,
 931      * elements are first copied to a java.util.PriorityQueue,
 932      * which is then serialized.

 933      */
 934     private void writeObject(java.io.ObjectOutputStream s)
 935         throws java.io.IOException {
 936         lock.lock();
 937         try {
 938             int n = size; // avoid zero capacity argument
 939             q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator);
 940             q.addAll(this);
 941             s.defaultWriteObject();
 942         } finally {
 943             q = null;
 944             lock.unlock();
 945         }
 946     }
 947 
 948     /**
 949      * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream
 950      * (that is, deserializes it).
 951      *
 952      * @param s the stream
 953      */
 954     private void readObject(java.io.ObjectInputStream s)
 955         throws java.io.IOException, ClassNotFoundException {
 956         try {
 957             s.defaultReadObject();
 958             this.queue = new Object[q.size()];
 959             comparator = q.comparator();
 960             addAll(q);
 961         } finally {
 962             q = null;
 963         }
 964     }
 965 
 966     // Unsafe mechanics
 967     private static final sun.misc.Unsafe UNSAFE;
 968     private static final long allocationSpinLockOffset;
 969     static {
 970         try {
 971             UNSAFE = sun.misc.Unsafe.getUnsafe();
 972             Class<?> k = PriorityBlockingQueue.class;


  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.concurrent.locks.Condition;
  39 import java.util.concurrent.locks.ReentrantLock;
  40 import java.util.*;
  41 
  42 /**
  43  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
  44  * the same ordering rules as class {@link PriorityQueue} and supplies
  45  * blocking retrieval operations.  While this queue is logically
  46  * unbounded, attempted additions may fail due to resource exhaustion
  47  * (causing {@code OutOfMemoryError}). This class does not permit
  48  * {@code null} elements.  A priority queue relying on {@linkplain
  49  * Comparable natural ordering} also does not permit insertion of
  50  * non-comparable objects (doing so results in
  51  * {@code ClassCastException}).
  52  *
  53  * <p>This class and its iterator implement all of the
  54  * <em>optional</em> methods of the {@link Collection} and {@link
  55  * Iterator} interfaces.  The Iterator provided in method {@link
  56  * #iterator()} is <em>not</em> guaranteed to traverse the elements of
  57  * the PriorityBlockingQueue in any particular order. If you need
  58  * ordered traversal, consider using
  59  * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo}


  95  * @author Doug Lea
  96  * @param <E> the type of elements held in this collection
  97  */
  98 @SuppressWarnings("unchecked")
  99 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
 100     implements BlockingQueue<E>, java.io.Serializable {
 101     private static final long serialVersionUID = 5595510919245408276L;
 102 
 103     /*
 104      * The implementation uses an array-based binary heap, with public
 105      * operations protected with a single lock. However, allocation
 106      * during resizing uses a simple spinlock (used only while not
 107      * holding main lock) in order to allow takes to operate
 108      * concurrently with allocation.  This avoids repeated
 109      * postponement of waiting consumers and consequent element
 110      * build-up. The need to back away from lock during allocation
 111      * makes it impossible to simply wrap delegated
 112      * java.util.PriorityQueue operations within a lock, as was done
 113      * in a previous version of this class. To maintain
 114      * interoperability, a plain PriorityQueue is still used during
 115      * serialization, which maintains compatibility at the expense of
 116      * transiently doubling overhead.
 117      */
 118 
 119     /**
 120      * Default array capacity.
 121      */
 122     private static final int DEFAULT_INITIAL_CAPACITY = 11;
 123 
 124     /**
 125      * The maximum size of array to allocate.
 126      * Some VMs reserve some header words in an array.
 127      * Attempts to allocate larger arrays may result in
 128      * OutOfMemoryError: Requested array size exceeds VM limit
 129      */
 130     private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 131 
 132     /**
 133      * Priority queue represented as a balanced binary heap: the two
 134      * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 135      * priority queue is ordered by comparator, or by the elements'


 292                     newCap = MAX_ARRAY_SIZE;
 293                 }
 294                 if (newCap > oldCap && queue == array)
 295                     newArray = new Object[newCap];
 296             } finally {
 297                 allocationSpinLock = 0;
 298             }
 299         }
 300         if (newArray == null) // back off if another thread is allocating
 301             Thread.yield();
 302         lock.lock();
 303         if (newArray != null && queue == array) {
 304             queue = newArray;
 305             System.arraycopy(array, 0, newArray, 0, oldCap);
 306         }
 307     }
 308 
 309     /**
 310      * Mechanics for poll().  Call only while holding lock.
 311      */
 312     private E dequeue() {

 313         int n = size - 1;
 314         if (n < 0)
 315             return null;
 316         else {
 317             Object[] array = queue;
 318             E result = (E) array[0];
 319             E x = (E) array[n];
 320             array[n] = null;
 321             Comparator<? super E> cmp = comparator;
 322             if (cmp == null)
 323                 siftDownComparable(0, x, array, n);
 324             else
 325                 siftDownUsingComparator(0, x, array, n, cmp);
 326             size = n;

 327             return result;
 328         }
 329     }
 330 
 331     /**
 332      * Inserts item x at position k, maintaining heap invariant by
 333      * promoting x up the tree until it is greater than or equal to
 334      * its parent, or is the root.
 335      *
 336      * To simplify and speed up coercions and comparisons. the
 337      * Comparable and Comparator versions are separated into different
 338      * methods that are otherwise identical. (Similarly for siftDown.)
 339      * These methods are static, with heap state as arguments, to
 340      * simplify use in light of possible comparator exceptions.
 341      *
 342      * @param k the position to fill
 343      * @param x the item to insert
 344      * @param array the heap array
 345      * @param n heap size
 346      */
 347     private static <T> void siftUpComparable(int k, T x, Object[] array) {
 348         Comparable<? super T> key = (Comparable<? super T>) x;
 349         while (k > 0) {


 365             if (cmp.compare(x, (T) e) >= 0)
 366                 break;
 367             array[k] = e;
 368             k = parent;
 369         }
 370         array[k] = x;
 371     }
 372 
 373     /**
 374      * Inserts item x at position k, maintaining heap invariant by
 375      * demoting x down the tree repeatedly until it is less than or
 376      * equal to its children or is a leaf.
 377      *
 378      * @param k the position to fill
 379      * @param x the item to insert
 380      * @param array the heap array
 381      * @param n heap size
 382      */
 383     private static <T> void siftDownComparable(int k, T x, Object[] array,
 384                                                int n) {
 385         if (n > 0) {
 386             Comparable<? super T> key = (Comparable<? super T>)x;
 387             int half = n >>> 1;           // loop while a non-leaf
 388             while (k < half) {
 389                 int child = (k << 1) + 1; // assume left child is least
 390                 Object c = array[child];
 391                 int right = child + 1;
 392                 if (right < n &&
 393                     ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
 394                     c = array[child = right];
 395                 if (key.compareTo((T) c) <= 0)
 396                     break;
 397                 array[k] = c;
 398                 k = child;
 399             }
 400             array[k] = key;
 401         }
 402     }
 403 
 404     private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
 405                                                     int n,
 406                                                     Comparator<? super T> cmp) {
 407         if (n > 0) {
 408             int half = n >>> 1;
 409             while (k < half) {
 410                 int child = (k << 1) + 1;
 411                 Object c = array[child];
 412                 int right = child + 1;
 413                 if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
 414                     c = array[child = right];
 415                 if (cmp.compare(x, (T) c) <= 0)
 416                     break;
 417                 array[k] = c;
 418                 k = child;
 419             }
 420             array[k] = x;
 421         }
 422     }
 423 
 424     /**
 425      * Establishes the heap invariant (described above) in the entire tree,
 426      * assuming nothing about the order of the elements prior to the call.
 427      */
 428     private void heapify() {
 429         Object[] array = queue;
 430         int n = size;
 431         int half = (n >>> 1) - 1;
 432         Comparator<? super E> cmp = comparator;
 433         if (cmp == null) {
 434             for (int i = half; i >= 0; i--)
 435                 siftDownComparable(i, (E) array[i], array, n);
 436         }
 437         else {
 438             for (int i = half; i >= 0; i--)
 439                 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
 440         }
 441     }
 442 


 507      * As the queue is unbounded, this method will never block or
 508      * return {@code false}.
 509      *
 510      * @param e the element to add
 511      * @param timeout This parameter is ignored as the method never blocks
 512      * @param unit This parameter is ignored as the method never blocks
 513      * @return {@code true} (as specified by
 514      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
 515      * @throws ClassCastException if the specified element cannot be compared
 516      *         with elements currently in the priority queue according to the
 517      *         priority queue's ordering
 518      * @throws NullPointerException if the specified element is null
 519      */
 520     public boolean offer(E e, long timeout, TimeUnit unit) {
 521         return offer(e); // never need to block
 522     }
 523 
 524     public E poll() {
 525         final ReentrantLock lock = this.lock;
 526         lock.lock();

 527         try {
 528             return dequeue();
 529         } finally {
 530             lock.unlock();
 531         }

 532     }
 533 
 534     public E take() throws InterruptedException {
 535         final ReentrantLock lock = this.lock;
 536         lock.lockInterruptibly();
 537         E result;
 538         try {
 539             while ( (result = dequeue()) == null)
 540                 notEmpty.await();
 541         } finally {
 542             lock.unlock();
 543         }
 544         return result;
 545     }
 546 
 547     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 548         long nanos = unit.toNanos(timeout);
 549         final ReentrantLock lock = this.lock;
 550         lock.lockInterruptibly();
 551         E result;
 552         try {
 553             while ( (result = dequeue()) == null && nanos > 0)
 554                 nanos = notEmpty.awaitNanos(nanos);
 555         } finally {
 556             lock.unlock();
 557         }
 558         return result;
 559     }
 560 
 561     public E peek() {
 562         final ReentrantLock lock = this.lock;
 563         lock.lock();

 564         try {
 565             return (size == 0) ? null : (E) queue[0];
 566         } finally {
 567             lock.unlock();
 568         }

 569     }
 570 
 571     /**
 572      * Returns the comparator used to order the elements in this queue,
 573      * or {@code null} if this queue uses the {@linkplain Comparable
 574      * natural ordering} of its elements.
 575      *
 576      * @return the comparator used to order the elements in this queue,
 577      *         or {@code null} if this queue uses the natural
 578      *         ordering of its elements
 579      */
 580     public Comparator<? super E> comparator() {
 581         return comparator;
 582     }
 583 
 584     public int size() {
 585         final ReentrantLock lock = this.lock;
 586         lock.lock();
 587         try {
 588             return size;


 632                     siftUpComparable(i, moved, array);
 633                 else
 634                     siftUpUsingComparator(i, moved, array, cmp);
 635             }
 636         }
 637         size = n;
 638     }
 639 
 640     /**
 641      * Removes a single instance of the specified element from this queue,
 642      * if it is present.  More formally, removes an element {@code e} such
 643      * that {@code o.equals(e)}, if this queue contains one or more such
 644      * elements.  Returns {@code true} if and only if this queue contained
 645      * the specified element (or equivalently, if this queue changed as a
 646      * result of the call).
 647      *
 648      * @param o element to be removed from this queue, if present
 649      * @return {@code true} if this queue changed as a result of the call
 650      */
 651     public boolean remove(Object o) {

 652         final ReentrantLock lock = this.lock;
 653         lock.lock();
 654         try {
 655             int i = indexOf(o);
 656             if (i == -1)
 657                 return false;
 658             removeAt(i);
 659             return true;

 660         } finally {
 661             lock.unlock();
 662         }

 663     }
 664 

 665     /**
 666      * Identity-based version for use in Itr.remove
 667      */
 668     void removeEQ(Object o) {
 669         final ReentrantLock lock = this.lock;
 670         lock.lock();
 671         try {
 672             Object[] array = queue;
 673             for (int i = 0, n = size; i < n; i++) {

 674                 if (o == array[i]) {
 675                     removeAt(i);
 676                     break;
 677                 }
 678             }
 679         } finally {
 680             lock.unlock();
 681         }
 682     }
 683 
 684     /**
 685      * Returns {@code true} if this queue contains the specified element.
 686      * More formally, returns {@code true} if and only if this queue contains
 687      * at least one element {@code e} such that {@code o.equals(e)}.
 688      *
 689      * @param o object to be checked for containment in this queue
 690      * @return {@code true} if this queue contains the specified element
 691      */
 692     public boolean contains(Object o) {

 693         final ReentrantLock lock = this.lock;
 694         lock.lock();
 695         try {
 696             return indexOf(o) != -1;
 697         } finally {
 698             lock.unlock();
 699         }

 700     }
 701 
 702     /**
 703      * Returns an array containing all of the elements in this queue.
 704      * The returned array elements are in no particular order.
 705      *
 706      * <p>The returned array will be "safe" in that no references to it are
 707      * maintained by this queue.  (In other words, this method must allocate
 708      * a new array).  The caller is thus free to modify the returned array.
 709      *
 710      * <p>This method acts as bridge between array-based and collection-based
 711      * APIs.
 712      *
 713      * @return an array containing all of the elements in this queue
 714      */
 715     public Object[] toArray() {
 716         final ReentrantLock lock = this.lock;
 717         lock.lock();
 718         try {
 719             return Arrays.copyOf(queue, size);
 720         } finally {
 721             lock.unlock();
 722         }
 723     }
 724 

 725     public String toString() {
 726         final ReentrantLock lock = this.lock;
 727         lock.lock();
 728         try {
 729             int n = size;
 730             if (n == 0)
 731                 return "[]";
 732             StringBuilder sb = new StringBuilder();
 733             sb.append('[');
 734             for (int i = 0; i < n; ++i) {
 735                 Object e = queue[i];
 736                 sb.append(e == this ? "(this Collection)" : e);
 737                 if (i != n - 1)
 738                     sb.append(',').append(' ');
 739             }
 740             return sb.append(']').toString();
 741         } finally {
 742             lock.unlock();
 743         }
 744     }
 745 
 746     /**
 747      * @throws UnsupportedOperationException {@inheritDoc}
 748      * @throws ClassCastException            {@inheritDoc}
 749      * @throws NullPointerException          {@inheritDoc}
 750      * @throws IllegalArgumentException      {@inheritDoc}
 751      */
 752     public int drainTo(Collection<? super E> c) {
 753         return drainTo(c, Integer.MAX_VALUE);











 754     }





 755 
 756     /**
 757      * @throws UnsupportedOperationException {@inheritDoc}
 758      * @throws ClassCastException            {@inheritDoc}
 759      * @throws NullPointerException          {@inheritDoc}
 760      * @throws IllegalArgumentException      {@inheritDoc}
 761      */
 762     public int drainTo(Collection<? super E> c, int maxElements) {
 763         if (c == null)
 764             throw new NullPointerException();
 765         if (c == this)
 766             throw new IllegalArgumentException();
 767         if (maxElements <= 0)
 768             return 0;
 769         final ReentrantLock lock = this.lock;
 770         lock.lock();
 771         try {
 772             int n = Math.min(size, maxElements);
 773             for (int i = 0; i < n; i++) {
 774                 c.add((E) queue[0]); // In this order, in case add() throws.
 775                 dequeue();

 776             }
 777             return n;
 778         } finally {
 779             lock.unlock();
 780         }
 781     }
 782 
 783     /**
 784      * Atomically removes all of the elements from this queue.
 785      * The queue will be empty after this call returns.
 786      */
 787     public void clear() {
 788         final ReentrantLock lock = this.lock;
 789         lock.lock();
 790         try {
 791             Object[] array = queue;
 792             int n = size;
 793             size = 0;
 794             for (int i = 0; i < n; i++)
 795                 array[i] = null;


 803      * runtime type of the returned array is that of the specified array.
 804      * The returned array elements are in no particular order.
 805      * If the queue fits in the specified array, it is returned therein.
 806      * Otherwise, a new array is allocated with the runtime type of the
 807      * specified array and the size of this queue.
 808      *
 809      * <p>If this queue fits in the specified array with room to spare
 810      * (i.e., the array has more elements than this queue), the element in
 811      * the array immediately following the end of the queue is set to
 812      * {@code null}.
 813      *
 814      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 815      * array-based and collection-based APIs.  Further, this method allows
 816      * precise control over the runtime type of the output array, and may,
 817      * under certain circumstances, be used to save allocation costs.
 818      *
 819      * <p>Suppose {@code x} is a queue known to contain only strings.
 820      * The following code can be used to dump the queue into a newly
 821      * allocated array of {@code String}:
 822      *
 823      *  <pre> {@code String[] y = x.toArray(new String[0]);}</pre>

 824      *
 825      * Note that {@code toArray(new Object[0])} is identical in function to
 826      * {@code toArray()}.
 827      *
 828      * @param a the array into which the elements of the queue are to
 829      *          be stored, if it is big enough; otherwise, a new array of the
 830      *          same runtime type is allocated for this purpose
 831      * @return an array containing all of the elements in this queue
 832      * @throws ArrayStoreException if the runtime type of the specified array
 833      *         is not a supertype of the runtime type of every element in
 834      *         this queue
 835      * @throws NullPointerException if the specified array is null
 836      */
 837     public <T> T[] toArray(T[] a) {
 838         final ReentrantLock lock = this.lock;
 839         lock.lock();
 840         try {
 841             int n = size;
 842             if (a.length < n)
 843                 // Make a new array of a's runtime type, but my contents:


 856      * iterator does not return the elements in any particular order.
 857      *
 858      * <p>The returned iterator is a "weakly consistent" iterator that
 859      * will never throw {@link java.util.ConcurrentModificationException
 860      * ConcurrentModificationException}, and guarantees to traverse
 861      * elements as they existed upon construction of the iterator, and
 862      * may (but is not guaranteed to) reflect any modifications
 863      * subsequent to construction.
 864      *
 865      * @return an iterator over the elements in this queue
 866      */
 867     public Iterator<E> iterator() {
 868         return new Itr(toArray());
 869     }
 870 
 871     /**
 872      * Snapshot iterator that works off copy of underlying q array.
 873      */
 874     final class Itr implements Iterator<E> {
 875         final Object[] array; // Array of all elements
 876         int cursor;           // index of next element to return
 877         int lastRet;          // index of last element, or -1 if no such
 878 
 879         Itr(Object[] array) {
 880             lastRet = -1;
 881             this.array = array;
 882         }
 883 
 884         public boolean hasNext() {
 885             return cursor < array.length;
 886         }
 887 
 888         public E next() {
 889             if (cursor >= array.length)
 890                 throw new NoSuchElementException();
 891             lastRet = cursor;
 892             return (E)array[cursor++];
 893         }
 894 
 895         public void remove() {
 896             if (lastRet < 0)
 897                 throw new IllegalStateException();
 898             removeEQ(array[lastRet]);
 899             lastRet = -1;
 900         }
 901     }
 902 
 903     /**
 904      * Saves this queue to a stream (that is, serializes it).
 905      *
 906      * For compatibility with previous version of this class, elements
 907      * are first copied to a java.util.PriorityQueue, which is then
 908      * serialized.
 909      */
 910     private void writeObject(java.io.ObjectOutputStream s)
 911         throws java.io.IOException {
 912         lock.lock();
 913         try {
 914             // avoid zero capacity argument
 915             q = new PriorityQueue<E>(Math.max(size, 1), comparator);
 916             q.addAll(this);
 917             s.defaultWriteObject();
 918         } finally {
 919             q = null;
 920             lock.unlock();
 921         }
 922     }
 923 
 924     /**
 925      * Reconstitutes this queue from a stream (that is, deserializes it).



 926      */
 927     private void readObject(java.io.ObjectInputStream s)
 928         throws java.io.IOException, ClassNotFoundException {
 929         try {
 930             s.defaultReadObject();
 931             this.queue = new Object[q.size()];
 932             comparator = q.comparator();
 933             addAll(q);
 934         } finally {
 935             q = null;
 936         }
 937     }
 938 
 939     // Unsafe mechanics
 940     private static final sun.misc.Unsafe UNSAFE;
 941     private static final long allocationSpinLockOffset;
 942     static {
 943         try {
 944             UNSAFE = sun.misc.Unsafe.getUnsafe();
 945             Class<?> k = PriorityBlockingQueue.class;
src/share/classes/java/util/concurrent/PriorityBlockingQueue.java
Index Unified diffs Context diffs Sdiffs Wdiffs Patch New Old Previous File Next File