Print this page


Split Close
Expand all
Collapse all
          --- old/src/share/classes/java/util/concurrent/PriorityBlockingQueue.java
          +++ new/src/share/classes/java/util/concurrent/PriorityBlockingQueue.java
↓ open down ↓ 35 lines elided ↑ open up ↑
  36   36  package java.util.concurrent;
  37   37  
  38   38  import java.util.concurrent.locks.*;
  39   39  import java.util.*;
  40   40  
  41   41  /**
  42   42   * An unbounded {@linkplain BlockingQueue blocking queue} that uses
  43   43   * the same ordering rules as class {@link PriorityQueue} and supplies
  44   44   * blocking retrieval operations.  While this queue is logically
  45   45   * unbounded, attempted additions may fail due to resource exhaustion
  46      - * (causing <tt>OutOfMemoryError</tt>). This class does not permit
  47      - * <tt>null</tt> elements.  A priority queue relying on {@linkplain
       46 + * (causing {@code OutOfMemoryError}). This class does not permit
       47 + * {@code null} elements.  A priority queue relying on {@linkplain
  48   48   * Comparable natural ordering} also does not permit insertion of
  49   49   * non-comparable objects (doing so results in
  50      - * <tt>ClassCastException</tt>).
       50 + * {@code ClassCastException}).
  51   51   *
  52   52   * <p>This class and its iterator implement all of the
  53   53   * <em>optional</em> methods of the {@link Collection} and {@link
  54   54   * Iterator} interfaces.  The Iterator provided in method {@link
  55   55   * #iterator()} is <em>not</em> guaranteed to traverse the elements of
  56   56   * the PriorityBlockingQueue in any particular order. If you need
  57   57   * ordered traversal, consider using
  58      - * <tt>Arrays.sort(pq.toArray())</tt>.  Also, method <tt>drainTo</tt>
       58 + * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo}
  59   59   * can be used to <em>remove</em> some or all elements in priority
  60   60   * order and place them in another collection.
  61   61   *
  62   62   * <p>Operations on this class make no guarantees about the ordering
  63   63   * of elements with equal priority. If you need to enforce an
  64   64   * ordering, you can define custom classes or comparators that use a
  65   65   * secondary key to break ties in primary priority values.  For
  66   66   * example, here is a class that applies first-in-first-out
  67   67   * tie-breaking to comparable elements. To use it, you would insert a
  68      - * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
       68 + * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
  69   69   *
  70      - * <pre>
  71      - * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
  72      - *     implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
  73      - *   final static AtomicLong seq = new AtomicLong();
       70 + *  <pre> {@code
       71 + * class FIFOEntry<E extends Comparable<? super E>>
       72 + *     implements Comparable<FIFOEntry<E>> {
       73 + *   static final AtomicLong seq = new AtomicLong(0);
  74   74   *   final long seqNum;
  75   75   *   final E entry;
  76   76   *   public FIFOEntry(E entry) {
  77   77   *     seqNum = seq.getAndIncrement();
  78   78   *     this.entry = entry;
  79   79   *   }
  80   80   *   public E getEntry() { return entry; }
  81      - *   public int compareTo(FIFOEntry&lt;E&gt; other) {
       81 + *   public int compareTo(FIFOEntry<E> other) {
  82   82   *     int res = entry.compareTo(other.entry);
  83      - *     if (res == 0 &amp;&amp; other.entry != this.entry)
  84      - *       res = (seqNum &lt; other.seqNum ? -1 : 1);
       83 + *     if (res == 0 && other.entry != this.entry)
       84 + *       res = (seqNum < other.seqNum ? -1 : 1);
  85   85   *     return res;
  86   86   *   }
  87      - * }</pre>
       87 + * }}</pre>
  88   88   *
  89   89   * <p>This class is a member of the
  90   90   * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  91   91   * Java Collections Framework</a>.
  92   92   *
  93   93   * @since 1.5
  94   94   * @author Doug Lea
  95   95   * @param <E> the type of elements held in this collection
  96   96   */
  97   97  public class PriorityBlockingQueue<E> extends AbstractQueue<E>
  98   98      implements BlockingQueue<E>, java.io.Serializable {
  99   99      private static final long serialVersionUID = 5595510919245408276L;
 100  100  
 101      -    private final PriorityQueue<E> q;
 102      -    private final ReentrantLock lock = new ReentrantLock(true);
 103      -    private final Condition notEmpty = lock.newCondition();
      101 +    /*
      102 +     * The implementation uses an array-based binary heap, with public
      103 +     * operations protected with a single lock. However, allocation
      104 +     * during resizing uses a simple spinlock (used only while not
      105 +     * holding main lock) in order to allow takes to operate
      106 +     * concurrently with allocation.  This avoids repeated
      107 +     * postponement of waiting consumers and consequent element
      108 +     * build-up. The need to back away from lock during allocation
      109 +     * makes it impossible to simply wrap delegated
      110 +     * java.util.PriorityQueue operations within a lock, as was done
      111 +     * in a previous version of this class. To maintain
      112 +     * interoperability, a plain PriorityQueue is still used during
      113 +     * serialization, which maintains compatibility at the expense of
      114 +     * transiently doubling overhead.
      115 +     */
 104  116  
 105  117      /**
 106      -     * Creates a <tt>PriorityBlockingQueue</tt> with the default
      118 +     * Default array capacity.
      119 +     */
      120 +    private static final int DEFAULT_INITIAL_CAPACITY = 11;
      121 +
      122 +    /**
      123 +     * The maximum size of array to allocate.
      124 +     * Some VMs reserve some header words in an array.
      125 +     * Attempts to allocate larger arrays may result in
      126 +     * OutOfMemoryError: Requested array size exceeds VM limit
      127 +     */
      128 +    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
      129 +
      130 +    /**
      131 +     * Priority queue represented as a balanced binary heap: the two
      132 +     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
      133 +     * priority queue is ordered by comparator, or by the elements'
      134 +     * natural ordering, if comparator is null: For each node n in the
      135 +     * heap and each descendant d of n, n <= d.  The element with the
      136 +     * lowest value is in queue[0], assuming the queue is nonempty.
      137 +     */
      138 +    private transient Object[] queue;
      139 +
      140 +    /**
      141 +     * The number of elements in the priority queue.
      142 +     */
      143 +    private transient int size;
      144 +
      145 +    /**
      146 +     * The comparator, or null if priority queue uses elements'
      147 +     * natural ordering.
      148 +     */
      149 +    private transient Comparator<? super E> comparator;
      150 +
      151 +    /**
      152 +     * Lock used for all public operations
      153 +     */
      154 +    private final ReentrantLock lock;
      155 +
      156 +    /**
      157 +     * Condition for blocking when empty
      158 +     */
      159 +    private final Condition notEmpty;
      160 +
      161 +    /**
      162 +     * Spinlock for allocation, acquired via CAS.
      163 +     */
      164 +    private transient volatile int allocationSpinLock;
      165 +
      166 +    /**
      167 +     * A plain PriorityQueue used only for serialization,
      168 +     * to maintain compatibility with previous versions
      169 +     * of this class. Non-null only during serialization/deserialization.
      170 +     */
      171 +    private PriorityQueue q;
      172 +
      173 +    /**
      174 +     * Creates a {@code PriorityBlockingQueue} with the default
 107  175       * initial capacity (11) that orders its elements according to
 108  176       * their {@linkplain Comparable natural ordering}.
 109  177       */
 110  178      public PriorityBlockingQueue() {
 111      -        q = new PriorityQueue<E>();
      179 +        this(DEFAULT_INITIAL_CAPACITY, null);
 112  180      }
 113  181  
 114  182      /**
 115      -     * Creates a <tt>PriorityBlockingQueue</tt> with the specified
      183 +     * Creates a {@code PriorityBlockingQueue} with the specified
 116  184       * initial capacity that orders its elements according to their
 117  185       * {@linkplain Comparable natural ordering}.
 118  186       *
 119  187       * @param initialCapacity the initial capacity for this priority queue
 120      -     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
      188 +     * @throws IllegalArgumentException if {@code initialCapacity} is less
 121  189       *         than 1
 122  190       */
 123  191      public PriorityBlockingQueue(int initialCapacity) {
 124      -        q = new PriorityQueue<E>(initialCapacity, null);
      192 +        this(initialCapacity, null);
 125  193      }
 126  194  
 127  195      /**
 128      -     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
      196 +     * Creates a {@code PriorityBlockingQueue} with the specified initial
 129  197       * capacity that orders its elements according to the specified
 130  198       * comparator.
 131  199       *
 132  200       * @param initialCapacity the initial capacity for this priority queue
 133  201       * @param  comparator the comparator that will be used to order this
 134  202       *         priority queue.  If {@code null}, the {@linkplain Comparable
 135  203       *         natural ordering} of the elements will be used.
 136      -     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
      204 +     * @throws IllegalArgumentException if {@code initialCapacity} is less
 137  205       *         than 1
 138  206       */
 139  207      public PriorityBlockingQueue(int initialCapacity,
 140  208                                   Comparator<? super E> comparator) {
 141      -        q = new PriorityQueue<E>(initialCapacity, comparator);
      209 +        if (initialCapacity < 1)
      210 +            throw new IllegalArgumentException();
      211 +        this.lock = new ReentrantLock();
      212 +        this.notEmpty = lock.newCondition();
      213 +        this.comparator = comparator;
      214 +        this.queue = new Object[initialCapacity];
 142  215      }
 143  216  
 144  217      /**
 145      -     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
      218 +     * Creates a {@code PriorityBlockingQueue} containing the elements
 146  219       * in the specified collection.  If the specified collection is a
 147  220       * {@link SortedSet} or a {@link PriorityQueue},  this
 148  221       * priority queue will be ordered according to the same ordering.
 149  222       * Otherwise, this priority queue will be ordered according to the
 150  223       * {@linkplain Comparable natural ordering} of its elements.
 151  224       *
 152  225       * @param  c the collection whose elements are to be placed
 153  226       *         into this priority queue
 154  227       * @throws ClassCastException if elements of the specified collection
 155  228       *         cannot be compared to one another according to the priority
 156  229       *         queue's ordering
 157  230       * @throws NullPointerException if the specified collection or any
 158  231       *         of its elements are null
 159  232       */
 160  233      public PriorityBlockingQueue(Collection<? extends E> c) {
 161      -        q = new PriorityQueue<E>(c);
      234 +        this.lock = new ReentrantLock();
      235 +        this.notEmpty = lock.newCondition();
      236 +        boolean heapify = true; // true if not known to be in heap order
      237 +        boolean screen = true;  // true if must screen for nulls
      238 +        if (c instanceof SortedSet<?>) {
      239 +            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
      240 +            this.comparator = (Comparator<? super E>) ss.comparator();
      241 +            heapify = false;
      242 +        }
      243 +        else if (c instanceof PriorityBlockingQueue<?>) {
      244 +            PriorityBlockingQueue<? extends E> pq =
      245 +                (PriorityBlockingQueue<? extends E>) c;
      246 +            this.comparator = (Comparator<? super E>) pq.comparator();
      247 +            screen = false;
      248 +            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
      249 +                heapify = false;
      250 +        }
      251 +        Object[] a = c.toArray();
      252 +        int n = a.length;
      253 +        // If c.toArray incorrectly doesn't return Object[], copy it.
      254 +        if (a.getClass() != Object[].class)
      255 +            a = Arrays.copyOf(a, n, Object[].class);
      256 +        if (screen && (n == 1 || this.comparator != null)) {
      257 +            for (int i = 0; i < n; ++i)
      258 +                if (a[i] == null)
      259 +                    throw new NullPointerException();
      260 +        }
      261 +        this.queue = a;
      262 +        this.size = n;
      263 +        if (heapify)
      264 +            heapify();
 162  265      }
 163  266  
 164  267      /**
      268 +     * Tries to grow array to accommodate at least one more element
      269 +     * (but normally expand by about 50%), giving up (allowing retry)
      270 +     * on contention (which we expect to be rare). Call only while
      271 +     * holding lock.
      272 +     *
      273 +     * @param array the heap array
      274 +     * @param oldCap the length of the array
      275 +     */
      276 +    private void tryGrow(Object[] array, int oldCap) {
      277 +        lock.unlock(); // must release and then re-acquire main lock
      278 +        Object[] newArray = null;
      279 +        if (allocationSpinLock == 0 &&
      280 +            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
      281 +                                     0, 1)) {
      282 +            try {
      283 +                int newCap = oldCap + ((oldCap < 64) ?
      284 +                                       (oldCap + 2) : // grow faster if small
      285 +                                       (oldCap >> 1));
      286 +                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
      287 +                    int minCap = oldCap + 1;
      288 +                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
      289 +                        throw new OutOfMemoryError();
      290 +                    newCap = MAX_ARRAY_SIZE;
      291 +                }
      292 +                if (newCap > oldCap && queue == array)
      293 +                    newArray = new Object[newCap];
      294 +            } finally {
      295 +                allocationSpinLock = 0;
      296 +            }
      297 +        }
      298 +        if (newArray == null) // back off if another thread is allocating
      299 +            Thread.yield();
      300 +        lock.lock();
      301 +        if (newArray != null && queue == array) {
      302 +            queue = newArray;
      303 +            System.arraycopy(array, 0, newArray, 0, oldCap);
      304 +        }
      305 +    }
      306 +
      307 +    /**
      308 +     * Mechanics for poll().  Call only while holding lock.
      309 +     */
      310 +    private E extract() {
      311 +        E result;
      312 +        int n = size - 1;
      313 +        if (n < 0)
      314 +            result = null;
      315 +        else {
      316 +            Object[] array = queue;
      317 +            result = (E) array[0];
      318 +            E x = (E) array[n];
      319 +            array[n] = null;
      320 +            Comparator<? super E> cmp = comparator;
      321 +            if (cmp == null)
      322 +                siftDownComparable(0, x, array, n);
      323 +            else
      324 +                siftDownUsingComparator(0, x, array, n, cmp);
      325 +            size = n;
      326 +        }
      327 +        return result;
      328 +    }
      329 +
      330 +    /**
      331 +     * Inserts item x at position k, maintaining heap invariant by
      332 +     * promoting x up the tree until it is greater than or equal to
      333 +     * its parent, or is the root.
      334 +     *
      335 +     * To simplify and speed up coercions and comparisons, the
      336 +     * Comparable and Comparator versions are separated into different
      337 +     * methods that are otherwise identical. (Similarly for siftDown.)
      338 +     * These methods are static, with heap state as arguments, to
      339 +     * simplify use in light of possible comparator exceptions.
      340 +     *
      341 +     * @param k the position to fill
      342 +     * @param x the item to insert
      343 +     * @param array the heap array
      344 +     * @param n heap size
      345 +     */
      346 +    private static <T> void siftUpComparable(int k, T x, Object[] array) {
      347 +        Comparable<? super T> key = (Comparable<? super T>) x;
      348 +        while (k > 0) {
      349 +            int parent = (k - 1) >>> 1;
      350 +            Object e = array[parent];
      351 +            if (key.compareTo((T) e) >= 0)
      352 +                break;
      353 +            array[k] = e;
      354 +            k = parent;
      355 +        }
      356 +        array[k] = key;
      357 +    }
      358 +
      359 +    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
      360 +                                       Comparator<? super T> cmp) {
      361 +        while (k > 0) {
      362 +            int parent = (k - 1) >>> 1;
      363 +            Object e = array[parent];
      364 +            if (cmp.compare(x, (T) e) >= 0)
      365 +                break;
      366 +            array[k] = e;
      367 +            k = parent;
      368 +        }
      369 +        array[k] = x;
      370 +    }
      371 +
      372 +    /**
      373 +     * Inserts item x at position k, maintaining heap invariant by
      374 +     * demoting x down the tree repeatedly until it is less than or
      375 +     * equal to its children or is a leaf.
      376 +     *
      377 +     * @param k the position to fill
      378 +     * @param x the item to insert
      379 +     * @param array the heap array
      380 +     * @param n heap size
      381 +     */
      382 +    private static <T> void siftDownComparable(int k, T x, Object[] array,
      383 +                                               int n) {
      384 +        Comparable<? super T> key = (Comparable<? super T>)x;
      385 +        int half = n >>> 1;           // loop while a non-leaf
      386 +        while (k < half) {
      387 +            int child = (k << 1) + 1; // assume left child is least
      388 +            Object c = array[child];
      389 +            int right = child + 1;
      390 +            if (right < n &&
      391 +                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
      392 +                c = array[child = right];
      393 +            if (key.compareTo((T) c) <= 0)
      394 +                break;
      395 +            array[k] = c;
      396 +            k = child;
      397 +        }
      398 +        array[k] = key;
      399 +    }
      400 +
      401 +    private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
      402 +                                                    int n,
      403 +                                                    Comparator<? super T> cmp) {
      404 +        int half = n >>> 1;
      405 +        while (k < half) {
      406 +            int child = (k << 1) + 1;
      407 +            Object c = array[child];
      408 +            int right = child + 1;
      409 +            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
      410 +                c = array[child = right];
      411 +            if (cmp.compare(x, (T) c) <= 0)
      412 +                break;
      413 +            array[k] = c;
      414 +            k = child;
      415 +        }
      416 +        array[k] = x;
      417 +    }
      418 +
      419 +    /**
      420 +     * Establishes the heap invariant (described above) in the entire tree,
      421 +     * assuming nothing about the order of the elements prior to the call.
      422 +     */
      423 +    private void heapify() {
      424 +        Object[] array = queue;
      425 +        int n = size;
      426 +        int half = (n >>> 1) - 1;
      427 +        Comparator<? super E> cmp = comparator;
      428 +        if (cmp == null) {
      429 +            for (int i = half; i >= 0; i--)
      430 +                siftDownComparable(i, (E) array[i], array, n);
      431 +        }
      432 +        else {
      433 +            for (int i = half; i >= 0; i--)
      434 +                siftDownUsingComparator(i, (E) array[i], array, n, cmp);
      435 +        }
      436 +    }
      437 +
      438 +    /**
 165  439       * Inserts the specified element into this priority queue.
 166  440       *
 167  441       * @param e the element to add
 168      -     * @return <tt>true</tt> (as specified by {@link Collection#add})
      442 +     * @return {@code true} (as specified by {@link Collection#add})
 169  443       * @throws ClassCastException if the specified element cannot be compared
 170  444       *         with elements currently in the priority queue according to the
 171  445       *         priority queue's ordering
 172  446       * @throws NullPointerException if the specified element is null
 173  447       */
 174  448      public boolean add(E e) {
 175  449          return offer(e);
 176  450      }
 177  451  
 178  452      /**
 179  453       * Inserts the specified element into this priority queue.
      454 +     * As the queue is unbounded, this method will never return {@code false}.
 180  455       *
 181  456       * @param e the element to add
 182      -     * @return <tt>true</tt> (as specified by {@link Queue#offer})
      457 +     * @return {@code true} (as specified by {@link Queue#offer})
 183  458       * @throws ClassCastException if the specified element cannot be compared
 184  459       *         with elements currently in the priority queue according to the
 185  460       *         priority queue's ordering
 186  461       * @throws NullPointerException if the specified element is null
 187  462       */
 188  463      public boolean offer(E e) {
      464 +        if (e == null)
      465 +            throw new NullPointerException();
 189  466          final ReentrantLock lock = this.lock;
 190  467          lock.lock();
      468 +        int n, cap;
      469 +        Object[] array;
      470 +        while ((n = size) >= (cap = (array = queue).length))
      471 +            tryGrow(array, cap);
 191  472          try {
 192      -            boolean ok = q.offer(e);
 193      -            assert ok;
      473 +            Comparator<? super E> cmp = comparator;
      474 +            if (cmp == null)
      475 +                siftUpComparable(n, e, array);
      476 +            else
      477 +                siftUpUsingComparator(n, e, array, cmp);
      478 +            size = n + 1;
 194  479              notEmpty.signal();
 195      -            return true;
 196  480          } finally {
 197  481              lock.unlock();
 198  482          }
      483 +        return true;
 199  484      }
 200  485  
 201  486      /**
 202      -     * Inserts the specified element into this priority queue. As the queue is
 203      -     * unbounded this method will never block.
      487 +     * Inserts the specified element into this priority queue.
      488 +     * As the queue is unbounded, this method will never block.
 204  489       *
 205  490       * @param e the element to add
 206  491       * @throws ClassCastException if the specified element cannot be compared
 207  492       *         with elements currently in the priority queue according to the
 208  493       *         priority queue's ordering
 209  494       * @throws NullPointerException if the specified element is null
 210  495       */
 211  496      public void put(E e) {
 212  497          offer(e); // never need to block
 213  498      }
 214  499  
 215  500      /**
 216      -     * Inserts the specified element into this priority queue. As the queue is
 217      -     * unbounded this method will never block.
      501 +     * Inserts the specified element into this priority queue.
      502 +     * As the queue is unbounded, this method will never block or
      503 +     * return {@code false}.
 218  504       *
 219  505       * @param e the element to add
 220  506       * @param timeout This parameter is ignored as the method never blocks
 221  507       * @param unit This parameter is ignored as the method never blocks
 222      -     * @return <tt>true</tt>
      508 +     * @return {@code true} (as specified by
      509 +     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
 223  510       * @throws ClassCastException if the specified element cannot be compared
 224  511       *         with elements currently in the priority queue according to the
 225  512       *         priority queue's ordering
 226  513       * @throws NullPointerException if the specified element is null
 227  514       */
 228  515      public boolean offer(E e, long timeout, TimeUnit unit) {
 229  516          return offer(e); // never need to block
 230  517      }
 231  518  
 232  519      public E poll() {
 233  520          final ReentrantLock lock = this.lock;
 234  521          lock.lock();
      522 +        E result;
 235  523          try {
 236      -            return q.poll();
      524 +            result = extract();
 237  525          } finally {
 238  526              lock.unlock();
 239  527          }
      528 +        return result;
 240  529      }
 241  530  
 242  531      public E take() throws InterruptedException {
 243  532          final ReentrantLock lock = this.lock;
 244  533          lock.lockInterruptibly();
      534 +        E result;
 245  535          try {
 246      -            try {
 247      -                while (q.size() == 0)
 248      -                    notEmpty.await();
 249      -            } catch (InterruptedException ie) {
 250      -                notEmpty.signal(); // propagate to non-interrupted thread
 251      -                throw ie;
 252      -            }
 253      -            E x = q.poll();
 254      -            assert x != null;
 255      -            return x;
      536 +            while ( (result = extract()) == null)
      537 +                notEmpty.await();
 256  538          } finally {
 257  539              lock.unlock();
 258  540          }
      541 +        return result;
 259  542      }
 260  543  
 261  544      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 262  545          long nanos = unit.toNanos(timeout);
 263  546          final ReentrantLock lock = this.lock;
 264  547          lock.lockInterruptibly();
      548 +        E result;
 265  549          try {
 266      -            for (;;) {
 267      -                E x = q.poll();
 268      -                if (x != null)
 269      -                    return x;
 270      -                if (nanos <= 0)
 271      -                    return null;
 272      -                try {
 273      -                    nanos = notEmpty.awaitNanos(nanos);
 274      -                } catch (InterruptedException ie) {
 275      -                    notEmpty.signal(); // propagate to non-interrupted thread
 276      -                    throw ie;
 277      -                }
 278      -            }
      550 +            while ( (result = extract()) == null && nanos > 0)
      551 +                nanos = notEmpty.awaitNanos(nanos);
 279  552          } finally {
 280  553              lock.unlock();
 281  554          }
      555 +        return result;
 282  556      }
 283  557  
 284  558      public E peek() {
 285  559          final ReentrantLock lock = this.lock;
 286  560          lock.lock();
      561 +        E result;
 287  562          try {
 288      -            return q.peek();
      563 +            result = size > 0 ? (E) queue[0] : null;
 289  564          } finally {
 290  565              lock.unlock();
 291  566          }
      567 +        return result;
 292  568      }
 293  569  
 294  570      /**
 295  571       * Returns the comparator used to order the elements in this queue,
 296      -     * or <tt>null</tt> if this queue uses the {@linkplain Comparable
      572 +     * or {@code null} if this queue uses the {@linkplain Comparable
 297  573       * natural ordering} of its elements.
 298  574       *
 299  575       * @return the comparator used to order the elements in this queue,
 300      -     *         or <tt>null</tt> if this queue uses the natural
      576 +     *         or {@code null} if this queue uses the natural
 301  577       *         ordering of its elements
 302  578       */
 303  579      public Comparator<? super E> comparator() {
 304      -        return q.comparator();
      580 +        return comparator;
 305  581      }
 306  582  
 307  583      public int size() {
 308  584          final ReentrantLock lock = this.lock;
 309  585          lock.lock();
 310  586          try {
 311      -            return q.size();
      587 +            return size;
 312  588          } finally {
 313  589              lock.unlock();
 314  590          }
 315  591      }
 316  592  
 317  593      /**
 318      -     * Always returns <tt>Integer.MAX_VALUE</tt> because
 319      -     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
 320      -     * @return <tt>Integer.MAX_VALUE</tt>
      594 +     * Always returns {@code Integer.MAX_VALUE} because
      595 +     * a {@code PriorityBlockingQueue} is not capacity constrained.
      596 +     * @return {@code Integer.MAX_VALUE} always
 321  597       */
 322  598      public int remainingCapacity() {
 323  599          return Integer.MAX_VALUE;
 324  600      }
 325  601  
      602 +    private int indexOf(Object o) {
      603 +        if (o != null) {
      604 +            Object[] array = queue;
      605 +            int n = size;
      606 +            for (int i = 0; i < n; i++)
      607 +                if (o.equals(array[i]))
      608 +                    return i;
      609 +        }
      610 +        return -1;
      611 +    }
      612 +
 326  613      /**
      614 +     * Removes the ith element from queue.
      615 +     */
      616 +    private void removeAt(int i) {
      617 +        Object[] array = queue;
      618 +        int n = size - 1;
      619 +        if (n == i) // removed last element
      620 +            array[i] = null;
      621 +        else {
      622 +            E moved = (E) array[n];
      623 +            array[n] = null;
      624 +            Comparator<? super E> cmp = comparator;
      625 +            if (cmp == null)
      626 +                siftDownComparable(i, moved, array, n);
      627 +            else
      628 +                siftDownUsingComparator(i, moved, array, n, cmp);
      629 +            if (array[i] == moved) {
      630 +                if (cmp == null)
      631 +                    siftUpComparable(i, moved, array);
      632 +                else
      633 +                    siftUpUsingComparator(i, moved, array, cmp);
      634 +            }
      635 +        }
      636 +        size = n;
      637 +    }
      638 +
      639 +    /**
 327  640       * Removes a single instance of the specified element from this queue,
 328  641       * if it is present.  More formally, removes an element {@code e} such
 329  642       * that {@code o.equals(e)}, if this queue contains one or more such
 330  643       * elements.  Returns {@code true} if and only if this queue contained
 331  644       * the specified element (or equivalently, if this queue changed as a
 332  645       * result of the call).
 333  646       *
 334  647       * @param o element to be removed from this queue, if present
 335      -     * @return <tt>true</tt> if this queue changed as a result of the call
      648 +     * @return {@code true} if this queue changed as a result of the call
 336  649       */
 337  650      public boolean remove(Object o) {
      651 +        boolean removed = false;
 338  652          final ReentrantLock lock = this.lock;
 339  653          lock.lock();
 340  654          try {
 341      -            return q.remove(o);
      655 +            int i = indexOf(o);
      656 +            if (i != -1) {
      657 +                removeAt(i);
      658 +                removed = true;
      659 +            }
 342  660          } finally {
 343  661              lock.unlock();
 344  662          }
      663 +        return removed;
 345  664      }
 346  665  
      666 +
 347  667      /**
      668 +     * Identity-based version for use in Itr.remove
      669 +     */
      670 +    private void removeEQ(Object o) {
      671 +        final ReentrantLock lock = this.lock;
      672 +        lock.lock();
      673 +        try {
      674 +            Object[] array = queue;
      675 +            int n = size;
      676 +            for (int i = 0; i < n; i++) {
      677 +                if (o == array[i]) {
      678 +                    removeAt(i);
      679 +                    break;
      680 +                }
      681 +            }
      682 +        } finally {
      683 +            lock.unlock();
      684 +        }
      685 +    }
      686 +
      687 +    /**
 348  688       * Returns {@code true} if this queue contains the specified element.
 349  689       * More formally, returns {@code true} if and only if this queue contains
 350  690       * at least one element {@code e} such that {@code o.equals(e)}.
 351  691       *
 352  692       * @param o object to be checked for containment in this queue
 353      -     * @return <tt>true</tt> if this queue contains the specified element
      693 +     * @return {@code true} if this queue contains the specified element
 354  694       */
 355  695      public boolean contains(Object o) {
      696 +        int index;
 356  697          final ReentrantLock lock = this.lock;
 357  698          lock.lock();
 358  699          try {
 359      -            return q.contains(o);
      700 +            index = indexOf(o);
 360  701          } finally {
 361  702              lock.unlock();
 362  703          }
      704 +        return index != -1;
 363  705      }
 364  706  
 365  707      /**
 366  708       * Returns an array containing all of the elements in this queue.
 367  709       * The returned array elements are in no particular order.
 368  710       *
 369  711       * <p>The returned array will be "safe" in that no references to it are
 370  712       * maintained by this queue.  (In other words, this method must allocate
 371  713       * a new array).  The caller is thus free to modify the returned array.
 372  714       *
 373  715       * <p>This method acts as bridge between array-based and collection-based
 374  716       * APIs.
 375  717       *
 376  718       * @return an array containing all of the elements in this queue
 377  719       */
 378  720      public Object[] toArray() {
 379  721          final ReentrantLock lock = this.lock;
 380  722          lock.lock();
 381  723          try {
 382      -            return q.toArray();
      724 +            return Arrays.copyOf(queue, size);
 383  725          } finally {
 384  726              lock.unlock();
 385  727          }
 386  728      }
 387  729  
 388  730  
 389  731      public String toString() {
 390  732          final ReentrantLock lock = this.lock;
 391  733          lock.lock();
 392  734          try {
 393      -            return q.toString();
      735 +            int n = size;
      736 +            if (n == 0)
      737 +                return "[]";
      738 +            StringBuilder sb = new StringBuilder();
      739 +            sb.append('[');
      740 +            for (int i = 0; i < n; ++i) {
      741 +                E e = (E)queue[i];
      742 +                sb.append(e == this ? "(this Collection)" : e);
      743 +                if (i != n - 1)
      744 +                    sb.append(',').append(' ');
      745 +            }
      746 +            return sb.append(']').toString();
 394  747          } finally {
 395  748              lock.unlock();
 396  749          }
 397  750      }
 398  751  
 399  752      /**
 400  753       * @throws UnsupportedOperationException {@inheritDoc}
 401  754       * @throws ClassCastException            {@inheritDoc}
 402  755       * @throws NullPointerException          {@inheritDoc}
 403  756       * @throws IllegalArgumentException      {@inheritDoc}
↓ open down ↓ 1 lines elided ↑ open up ↑
 405  758      public int drainTo(Collection<? super E> c) {
 406  759          if (c == null)
 407  760              throw new NullPointerException();
 408  761          if (c == this)
 409  762              throw new IllegalArgumentException();
 410  763          final ReentrantLock lock = this.lock;
 411  764          lock.lock();
 412  765          try {
 413  766              int n = 0;
 414  767              E e;
 415      -            while ( (e = q.poll()) != null) {
      768 +            while ( (e = extract()) != null) {
 416  769                  c.add(e);
 417  770                  ++n;
 418  771              }
 419  772              return n;
 420  773          } finally {
 421  774              lock.unlock();
 422  775          }
 423  776      }
 424  777  
 425  778      /**
↓ open down ↓ 7 lines elided ↑ open up ↑
 433  786              throw new NullPointerException();
 434  787          if (c == this)
 435  788              throw new IllegalArgumentException();
 436  789          if (maxElements <= 0)
 437  790              return 0;
 438  791          final ReentrantLock lock = this.lock;
 439  792          lock.lock();
 440  793          try {
 441  794              int n = 0;
 442  795              E e;
 443      -            while (n < maxElements && (e = q.poll()) != null) {
      796 +            while (n < maxElements && (e = extract()) != null) {
 444  797                  c.add(e);
 445  798                  ++n;
 446  799              }
 447  800              return n;
 448  801          } finally {
 449  802              lock.unlock();
 450  803          }
 451  804      }
 452  805  
 453  806      /**
 454  807       * Atomically removes all of the elements from this queue.
 455  808       * The queue will be empty after this call returns.
 456  809       */
 457  810      public void clear() {
 458  811          final ReentrantLock lock = this.lock;
 459  812          lock.lock();
 460  813          try {
 461      -            q.clear();
      814 +            Object[] array = queue;
      815 +            int n = size;
      816 +            size = 0;
      817 +            for (int i = 0; i < n; i++)
      818 +                array[i] = null;
 462  819          } finally {
 463  820              lock.unlock();
 464  821          }
 465  822      }
 466  823  
 467  824      /**
 468  825       * Returns an array containing all of the elements in this queue; the
 469  826       * runtime type of the returned array is that of the specified array.
 470  827       * The returned array elements are in no particular order.
 471  828       * If the queue fits in the specified array, it is returned therein.
 472  829       * Otherwise, a new array is allocated with the runtime type of the
 473  830       * specified array and the size of this queue.
 474  831       *
 475  832       * <p>If this queue fits in the specified array with room to spare
 476  833       * (i.e., the array has more elements than this queue), the element in
 477  834       * the array immediately following the end of the queue is set to
 478      -     * <tt>null</tt>.
      835 +     * {@code null}.
 479  836       *
 480  837       * <p>Like the {@link #toArray()} method, this method acts as bridge between
 481  838       * array-based and collection-based APIs.  Further, this method allows
 482  839       * precise control over the runtime type of the output array, and may,
 483  840       * under certain circumstances, be used to save allocation costs.
 484  841       *
 485      -     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
      842 +     * <p>Suppose {@code x} is a queue known to contain only strings.
 486  843       * The following code can be used to dump the queue into a newly
 487      -     * allocated array of <tt>String</tt>:
      844 +     * allocated array of {@code String}:
 488  845       *
 489  846       * <pre>
 490  847       *     String[] y = x.toArray(new String[0]);</pre>
 491  848       *
 492      -     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
 493      -     * <tt>toArray()</tt>.
      849 +     * Note that {@code toArray(new Object[0])} is identical in function to
      850 +     * {@code toArray()}.
 494  851       *
 495  852       * @param a the array into which the elements of the queue are to
 496  853       *          be stored, if it is big enough; otherwise, a new array of the
 497  854       *          same runtime type is allocated for this purpose
 498  855       * @return an array containing all of the elements in this queue
 499  856       * @throws ArrayStoreException if the runtime type of the specified array
 500  857       *         is not a supertype of the runtime type of every element in
 501  858       *         this queue
 502  859       * @throws NullPointerException if the specified array is null
 503  860       */
 504  861      public <T> T[] toArray(T[] a) {
 505  862          final ReentrantLock lock = this.lock;
 506  863          lock.lock();
 507  864          try {
 508      -            return q.toArray(a);
      865 +            int n = size;
      866 +            if (a.length < n)
      867 +                // Make a new array of a's runtime type, but my contents:
      868 +                return (T[]) Arrays.copyOf(queue, size, a.getClass());
      869 +            System.arraycopy(queue, 0, a, 0, n);
      870 +            if (a.length > n)
      871 +                a[n] = null;
      872 +            return a;
 509  873          } finally {
 510  874              lock.unlock();
 511  875          }
 512  876      }
 513  877  
 514  878      /**
 515  879       * Returns an iterator over the elements in this queue. The
 516  880       * iterator does not return the elements in any particular order.
 517      -     * The returned <tt>Iterator</tt> is a "weakly consistent"
 518      -     * iterator that will never throw {@link
      881 +     *
      882 +     * <p>The returned iterator is a "weakly consistent" iterator that
      883 +     * will never throw {@link java.util.ConcurrentModificationException
 519  884       * ConcurrentModificationException}, and guarantees to traverse
 520  885       * elements as they existed upon construction of the iterator, and
 521  886       * may (but is not guaranteed to) reflect any modifications
 522  887       * subsequent to construction.
 523  888       *
 524  889       * @return an iterator over the elements in this queue
 525  890       */
 526  891      public Iterator<E> iterator() {
 527  892          return new Itr(toArray());
 528  893      }
 529  894  
 530  895      /**
 531  896       * Snapshot iterator that works off copy of underlying q array.
 532  897       */
 533      -    private class Itr implements Iterator<E> {
      898 +    final class Itr implements Iterator<E> {
 534  899          final Object[] array; // Array of all elements
 535  900          int cursor;           // index of next element to return;
 536  901          int lastRet;          // index of last element, or -1 if no such
 537  902  
 538  903          Itr(Object[] array) {
 539  904              lastRet = -1;
 540  905              this.array = array;
 541  906          }
 542  907  
 543  908          public boolean hasNext() {
↓ open down ↓ 3 lines elided ↑ open up ↑
 547  912          public E next() {
 548  913              if (cursor >= array.length)
 549  914                  throw new NoSuchElementException();
 550  915              lastRet = cursor;
 551  916              return (E)array[cursor++];
 552  917          }
 553  918  
 554  919          public void remove() {
 555  920              if (lastRet < 0)
 556  921                  throw new IllegalStateException();
 557      -            Object x = array[lastRet];
      922 +            removeEQ(array[lastRet]);
 558  923              lastRet = -1;
 559      -            // Traverse underlying queue to find == element,
 560      -            // not just a .equals element.
 561      -            lock.lock();
 562      -            try {
 563      -                for (Iterator it = q.iterator(); it.hasNext(); ) {
 564      -                    if (it.next() == x) {
 565      -                        it.remove();
 566      -                        return;
 567      -                    }
 568      -                }
 569      -            } finally {
 570      -                lock.unlock();
 571      -            }
 572  924          }
 573  925      }
 574  926  
 575  927      /**
 576      -     * Saves the state to a stream (that is, serializes it).  This
 577      -     * merely wraps default serialization within lock.  The
 578      -     * serialization strategy for items is left to underlying
 579      -     * Queue. Note that locking is not needed on deserialization, so
 580      -     * readObject is not defined, just relying on default.
      928 +     * Saves the state to a stream (that is, serializes it).  For
      929 +     * compatibility with previous versions of this class,
      930 +     * elements are first copied to a java.util.PriorityQueue,
      931 +     * which is then serialized.
 581  932       */
 582  933      private void writeObject(java.io.ObjectOutputStream s)
 583  934          throws java.io.IOException {
 584  935          lock.lock();
 585  936          try {
      937 +            int n = size; // avoid zero capacity argument
      938 +            q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator);
      939 +            q.addAll(this);
 586  940              s.defaultWriteObject();
 587  941          } finally {
      942 +            q = null;
 588  943              lock.unlock();
 589  944          }
 590  945      }
      946 +
      947 +    /**
      948 +     * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream
      949 +     * (that is, deserializes it).
      950 +     *
      951 +     * @param s the stream
      952 +     */
      953 +    private void readObject(java.io.ObjectInputStream s)
      954 +        throws java.io.IOException, ClassNotFoundException {
      955 +        try {
      956 +            s.defaultReadObject();
      957 +            this.queue = new Object[q.size()];
      958 +            comparator = q.comparator();
      959 +            addAll(q);
      960 +        } finally {
      961 +            q = null;
      962 +        }
      963 +    }
      964 +
      965 +    // Unsafe mechanics
      966 +    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
      967 +    private static final long allocationSpinLockOffset =
      968 +        objectFieldOffset(UNSAFE, "allocationSpinLock",
      969 +                          PriorityBlockingQueue.class);
      970 +
      971 +    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
      972 +                                  String field, Class<?> klazz) {
      973 +        try {
      974 +            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
      975 +        } catch (NoSuchFieldException e) {
      976 +            // Convert Exception to corresponding Error
      977 +            NoSuchFieldError error = new NoSuchFieldError(field);
      978 +            error.initCause(e);
      979 +            throw error;
      980 +        }
      981 +    }
 591  982  
 592  983  }
    
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX