18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37
38 import java.util.concurrent.locks.*;
39 import java.util.*;
40
41 /**
42 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
43 * the same ordering rules as class {@link PriorityQueue} and supplies
44 * blocking retrieval operations. While this queue is logically
45 * unbounded, attempted additions may fail due to resource exhaustion
46 * (causing {@code OutOfMemoryError}). This class does not permit
47 * {@code null} elements. A priority queue relying on {@linkplain
48 * Comparable natural ordering} also does not permit insertion of
49 * non-comparable objects (doing so results in
50 * {@code ClassCastException}).
51 *
52 * <p>This class and its iterator implement all of the
53 * <em>optional</em> methods of the {@link Collection} and {@link
54 * Iterator} interfaces. The Iterator provided in method {@link
55 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
56 * the PriorityBlockingQueue in any particular order. If you need
57 * ordered traversal, consider using
58 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo}
94 * @author Doug Lea
95 * @param <E> the type of elements held in this collection
96 */
97 @SuppressWarnings("unchecked")
98 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
99 implements BlockingQueue<E>, java.io.Serializable {
100 private static final long serialVersionUID = 5595510919245408276L;
101
102 /*
103 * The implementation uses an array-based binary heap, with public
104 * operations protected with a single lock. However, allocation
105 * during resizing uses a simple spinlock (used only while not
106 * holding main lock) in order to allow takes to operate
107 * concurrently with allocation. This avoids repeated
108 * postponement of waiting consumers and consequent element
109 * build-up. The need to back away from lock during allocation
110 * makes it impossible to simply wrap delegated
111 * java.util.PriorityQueue operations within a lock, as was done
112 * in a previous version of this class. To maintain
113 * interoperability, a plain PriorityQueue is still used during
114 * serialization, which maintains compatibility at the espense of
115 * transiently doubling overhead.
116 */
117
118 /**
119 * Default array capacity.
120 */
121 private static final int DEFAULT_INITIAL_CAPACITY = 11;
122
123 /**
124 * The maximum size of array to allocate.
125 * Some VMs reserve some header words in an array.
126 * Attempts to allocate larger arrays may result in
127 * OutOfMemoryError: Requested array size exceeds VM limit
128 */
129 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
130
131 /**
132 * Priority queue represented as a balanced binary heap: the two
133 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
134 * priority queue is ordered by comparator, or by the elements'
291 newCap = MAX_ARRAY_SIZE;
292 }
293 if (newCap > oldCap && queue == array)
294 newArray = new Object[newCap];
295 } finally {
296 allocationSpinLock = 0;
297 }
298 }
299 if (newArray == null) // back off if another thread is allocating
300 Thread.yield();
301 lock.lock();
302 if (newArray != null && queue == array) {
303 queue = newArray;
304 System.arraycopy(array, 0, newArray, 0, oldCap);
305 }
306 }
307
308 /**
309 * Mechanics for poll(). Call only while holding lock.
310 */
311 private E extract() {
312 E result;
313 int n = size - 1;
314 if (n < 0)
315 result = null;
316 else {
317 Object[] array = queue;
318 result = (E) array[0];
319 E x = (E) array[n];
320 array[n] = null;
321 Comparator<? super E> cmp = comparator;
322 if (cmp == null)
323 siftDownComparable(0, x, array, n);
324 else
325 siftDownUsingComparator(0, x, array, n, cmp);
326 size = n;
327 }
328 return result;
329 }
330
331 /**
332 * Inserts item x at position k, maintaining heap invariant by
333 * promoting x up the tree until it is greater than or equal to
334 * its parent, or is the root.
335 *
336 * To simplify and speed up coercions and comparisons. the
337 * Comparable and Comparator versions are separated into different
338 * methods that are otherwise identical. (Similarly for siftDown.)
339 * These methods are static, with heap state as arguments, to
340 * simplify use in light of possible comparator exceptions.
341 *
342 * @param k the position to fill
343 * @param x the item to insert
344 * @param array the heap array
345 * @param n heap size
346 */
347 private static <T> void siftUpComparable(int k, T x, Object[] array) {
348 Comparable<? super T> key = (Comparable<? super T>) x;
349 while (k > 0) {
365 if (cmp.compare(x, (T) e) >= 0)
366 break;
367 array[k] = e;
368 k = parent;
369 }
370 array[k] = x;
371 }
372
373 /**
374 * Inserts item x at position k, maintaining heap invariant by
375 * demoting x down the tree repeatedly until it is less than or
376 * equal to its children or is a leaf.
377 *
378 * @param k the position to fill
379 * @param x the item to insert
380 * @param array the heap array
381 * @param n heap size
382 */
383 private static <T> void siftDownComparable(int k, T x, Object[] array,
384 int n) {
385 Comparable<? super T> key = (Comparable<? super T>)x;
386 int half = n >>> 1; // loop while a non-leaf
387 while (k < half) {
388 int child = (k << 1) + 1; // assume left child is least
389 Object c = array[child];
390 int right = child + 1;
391 if (right < n &&
392 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
393 c = array[child = right];
394 if (key.compareTo((T) c) <= 0)
395 break;
396 array[k] = c;
397 k = child;
398 }
399 array[k] = key;
400 }
401
402 private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
403 int n,
404 Comparator<? super T> cmp) {
405 int half = n >>> 1;
406 while (k < half) {
407 int child = (k << 1) + 1;
408 Object c = array[child];
409 int right = child + 1;
410 if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
411 c = array[child = right];
412 if (cmp.compare(x, (T) c) <= 0)
413 break;
414 array[k] = c;
415 k = child;
416 }
417 array[k] = x;
418 }
419
420 /**
421 * Establishes the heap invariant (described above) in the entire tree,
422 * assuming nothing about the order of the elements prior to the call.
423 */
424 private void heapify() {
425 Object[] array = queue;
426 int n = size;
427 int half = (n >>> 1) - 1;
428 Comparator<? super E> cmp = comparator;
429 if (cmp == null) {
430 for (int i = half; i >= 0; i--)
431 siftDownComparable(i, (E) array[i], array, n);
432 }
433 else {
434 for (int i = half; i >= 0; i--)
435 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
436 }
437 }
438
503 * As the queue is unbounded, this method will never block or
504 * return {@code false}.
505 *
506 * @param e the element to add
507 * @param timeout This parameter is ignored as the method never blocks
508 * @param unit This parameter is ignored as the method never blocks
509 * @return {@code true} (as specified by
510 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
511 * @throws ClassCastException if the specified element cannot be compared
512 * with elements currently in the priority queue according to the
513 * priority queue's ordering
514 * @throws NullPointerException if the specified element is null
515 */
516 public boolean offer(E e, long timeout, TimeUnit unit) {
517 return offer(e); // never need to block
518 }
519
520 public E poll() {
521 final ReentrantLock lock = this.lock;
522 lock.lock();
523 E result;
524 try {
525 result = extract();
526 } finally {
527 lock.unlock();
528 }
529 return result;
530 }
531
532 public E take() throws InterruptedException {
533 final ReentrantLock lock = this.lock;
534 lock.lockInterruptibly();
535 E result;
536 try {
537 while ( (result = extract()) == null)
538 notEmpty.await();
539 } finally {
540 lock.unlock();
541 }
542 return result;
543 }
544
545 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
546 long nanos = unit.toNanos(timeout);
547 final ReentrantLock lock = this.lock;
548 lock.lockInterruptibly();
549 E result;
550 try {
551 while ( (result = extract()) == null && nanos > 0)
552 nanos = notEmpty.awaitNanos(nanos);
553 } finally {
554 lock.unlock();
555 }
556 return result;
557 }
558
559 public E peek() {
560 final ReentrantLock lock = this.lock;
561 lock.lock();
562 E result;
563 try {
564 result = size > 0 ? (E) queue[0] : null;
565 } finally {
566 lock.unlock();
567 }
568 return result;
569 }
570
571 /**
572 * Returns the comparator used to order the elements in this queue,
573 * or {@code null} if this queue uses the {@linkplain Comparable
574 * natural ordering} of its elements.
575 *
576 * @return the comparator used to order the elements in this queue,
577 * or {@code null} if this queue uses the natural
578 * ordering of its elements
579 */
580 public Comparator<? super E> comparator() {
581 return comparator;
582 }
583
584 public int size() {
585 final ReentrantLock lock = this.lock;
586 lock.lock();
587 try {
588 return size;
632 siftUpComparable(i, moved, array);
633 else
634 siftUpUsingComparator(i, moved, array, cmp);
635 }
636 }
637 size = n;
638 }
639
640 /**
641 * Removes a single instance of the specified element from this queue,
642 * if it is present. More formally, removes an element {@code e} such
643 * that {@code o.equals(e)}, if this queue contains one or more such
644 * elements. Returns {@code true} if and only if this queue contained
645 * the specified element (or equivalently, if this queue changed as a
646 * result of the call).
647 *
648 * @param o element to be removed from this queue, if present
649 * @return {@code true} if this queue changed as a result of the call
650 */
651 public boolean remove(Object o) {
652 boolean removed = false;
653 final ReentrantLock lock = this.lock;
654 lock.lock();
655 try {
656 int i = indexOf(o);
657 if (i != -1) {
658 removeAt(i);
659 removed = true;
660 }
661 } finally {
662 lock.unlock();
663 }
664 return removed;
665 }
666
667
668 /**
669 * Identity-based version for use in Itr.remove
670 */
671 private void removeEQ(Object o) {
672 final ReentrantLock lock = this.lock;
673 lock.lock();
674 try {
675 Object[] array = queue;
676 int n = size;
677 for (int i = 0; i < n; i++) {
678 if (o == array[i]) {
679 removeAt(i);
680 break;
681 }
682 }
683 } finally {
684 lock.unlock();
685 }
686 }
687
688 /**
689 * Returns {@code true} if this queue contains the specified element.
690 * More formally, returns {@code true} if and only if this queue contains
691 * at least one element {@code e} such that {@code o.equals(e)}.
692 *
693 * @param o object to be checked for containment in this queue
694 * @return {@code true} if this queue contains the specified element
695 */
696 public boolean contains(Object o) {
697 int index;
698 final ReentrantLock lock = this.lock;
699 lock.lock();
700 try {
701 index = indexOf(o);
702 } finally {
703 lock.unlock();
704 }
705 return index != -1;
706 }
707
708 /**
709 * Returns an array containing all of the elements in this queue.
710 * The returned array elements are in no particular order.
711 *
712 * <p>The returned array will be "safe" in that no references to it are
713 * maintained by this queue. (In other words, this method must allocate
714 * a new array). The caller is thus free to modify the returned array.
715 *
716 * <p>This method acts as bridge between array-based and collection-based
717 * APIs.
718 *
719 * @return an array containing all of the elements in this queue
720 */
721 public Object[] toArray() {
722 final ReentrantLock lock = this.lock;
723 lock.lock();
724 try {
725 return Arrays.copyOf(queue, size);
726 } finally {
727 lock.unlock();
728 }
729 }
730
731
732 public String toString() {
733 final ReentrantLock lock = this.lock;
734 lock.lock();
735 try {
736 int n = size;
737 if (n == 0)
738 return "[]";
739 StringBuilder sb = new StringBuilder();
740 sb.append('[');
741 for (int i = 0; i < n; ++i) {
742 E e = (E)queue[i];
743 sb.append(e == this ? "(this Collection)" : e);
744 if (i != n - 1)
745 sb.append(',').append(' ');
746 }
747 return sb.append(']').toString();
748 } finally {
749 lock.unlock();
750 }
751 }
752
753 /**
754 * @throws UnsupportedOperationException {@inheritDoc}
755 * @throws ClassCastException {@inheritDoc}
756 * @throws NullPointerException {@inheritDoc}
757 * @throws IllegalArgumentException {@inheritDoc}
758 */
759 public int drainTo(Collection<? super E> c) {
760 if (c == null)
761 throw new NullPointerException();
762 if (c == this)
763 throw new IllegalArgumentException();
764 final ReentrantLock lock = this.lock;
765 lock.lock();
766 try {
767 int n = 0;
768 E e;
769 while ( (e = extract()) != null) {
770 c.add(e);
771 ++n;
772 }
773 return n;
774 } finally {
775 lock.unlock();
776 }
777 }
778
779 /**
780 * @throws UnsupportedOperationException {@inheritDoc}
781 * @throws ClassCastException {@inheritDoc}
782 * @throws NullPointerException {@inheritDoc}
783 * @throws IllegalArgumentException {@inheritDoc}
784 */
785 public int drainTo(Collection<? super E> c, int maxElements) {
786 if (c == null)
787 throw new NullPointerException();
788 if (c == this)
789 throw new IllegalArgumentException();
790 if (maxElements <= 0)
791 return 0;
792 final ReentrantLock lock = this.lock;
793 lock.lock();
794 try {
795 int n = 0;
796 E e;
797 while (n < maxElements && (e = extract()) != null) {
798 c.add(e);
799 ++n;
800 }
801 return n;
802 } finally {
803 lock.unlock();
804 }
805 }
806
807 /**
808 * Atomically removes all of the elements from this queue.
809 * The queue will be empty after this call returns.
810 */
811 public void clear() {
812 final ReentrantLock lock = this.lock;
813 lock.lock();
814 try {
815 Object[] array = queue;
816 int n = size;
817 size = 0;
818 for (int i = 0; i < n; i++)
819 array[i] = null;
827 * runtime type of the returned array is that of the specified array.
828 * The returned array elements are in no particular order.
829 * If the queue fits in the specified array, it is returned therein.
830 * Otherwise, a new array is allocated with the runtime type of the
831 * specified array and the size of this queue.
832 *
833 * <p>If this queue fits in the specified array with room to spare
834 * (i.e., the array has more elements than this queue), the element in
835 * the array immediately following the end of the queue is set to
836 * {@code null}.
837 *
838 * <p>Like the {@link #toArray()} method, this method acts as bridge between
839 * array-based and collection-based APIs. Further, this method allows
840 * precise control over the runtime type of the output array, and may,
841 * under certain circumstances, be used to save allocation costs.
842 *
843 * <p>Suppose {@code x} is a queue known to contain only strings.
844 * The following code can be used to dump the queue into a newly
845 * allocated array of {@code String}:
846 *
847 * <pre>
848 * String[] y = x.toArray(new String[0]);</pre>
849 *
850 * Note that {@code toArray(new Object[0])} is identical in function to
851 * {@code toArray()}.
852 *
853 * @param a the array into which the elements of the queue are to
854 * be stored, if it is big enough; otherwise, a new array of the
855 * same runtime type is allocated for this purpose
856 * @return an array containing all of the elements in this queue
857 * @throws ArrayStoreException if the runtime type of the specified array
858 * is not a supertype of the runtime type of every element in
859 * this queue
860 * @throws NullPointerException if the specified array is null
861 */
862 public <T> T[] toArray(T[] a) {
863 final ReentrantLock lock = this.lock;
864 lock.lock();
865 try {
866 int n = size;
867 if (a.length < n)
868 // Make a new array of a's runtime type, but my contents:
881 * iterator does not return the elements in any particular order.
882 *
883 * <p>The returned iterator is a "weakly consistent" iterator that
884 * will never throw {@link java.util.ConcurrentModificationException
885 * ConcurrentModificationException}, and guarantees to traverse
886 * elements as they existed upon construction of the iterator, and
887 * may (but is not guaranteed to) reflect any modifications
888 * subsequent to construction.
889 *
890 * @return an iterator over the elements in this queue
891 */
892 public Iterator<E> iterator() {
893 return new Itr(toArray());
894 }
895
896 /**
897 * Snapshot iterator that works off copy of underlying q array.
898 */
899 final class Itr implements Iterator<E> {
900 final Object[] array; // Array of all elements
901 int cursor; // index of next element to return;
902 int lastRet; // index of last element, or -1 if no such
903
904 Itr(Object[] array) {
905 lastRet = -1;
906 this.array = array;
907 }
908
909 public boolean hasNext() {
910 return cursor < array.length;
911 }
912
913 public E next() {
914 if (cursor >= array.length)
915 throw new NoSuchElementException();
916 lastRet = cursor;
917 return (E)array[cursor++];
918 }
919
920 public void remove() {
921 if (lastRet < 0)
922 throw new IllegalStateException();
923 removeEQ(array[lastRet]);
924 lastRet = -1;
925 }
926 }
927
928 /**
929 * Saves the state to a stream (that is, serializes it). For
930 * compatibility with previous version of this class,
931 * elements are first copied to a java.util.PriorityQueue,
932 * which is then serialized.
933 */
934 private void writeObject(java.io.ObjectOutputStream s)
935 throws java.io.IOException {
936 lock.lock();
937 try {
938 int n = size; // avoid zero capacity argument
939 q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator);
940 q.addAll(this);
941 s.defaultWriteObject();
942 } finally {
943 q = null;
944 lock.unlock();
945 }
946 }
947
948 /**
949 * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream
950 * (that is, deserializes it).
951 *
952 * @param s the stream
953 */
954 private void readObject(java.io.ObjectInputStream s)
955 throws java.io.IOException, ClassNotFoundException {
956 try {
957 s.defaultReadObject();
958 this.queue = new Object[q.size()];
959 comparator = q.comparator();
960 addAll(q);
961 } finally {
962 q = null;
963 }
964 }
965
966 // Unsafe mechanics
967 private static final sun.misc.Unsafe UNSAFE;
968 private static final long allocationSpinLockOffset;
969 static {
970 try {
971 UNSAFE = sun.misc.Unsafe.getUnsafe();
972 Class<?> k = PriorityBlockingQueue.class;
|
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37
38 import java.util.concurrent.locks.Condition;
39 import java.util.concurrent.locks.ReentrantLock;
40 import java.util.*;
41
42 /**
43 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
44 * the same ordering rules as class {@link PriorityQueue} and supplies
45 * blocking retrieval operations. While this queue is logically
46 * unbounded, attempted additions may fail due to resource exhaustion
47 * (causing {@code OutOfMemoryError}). This class does not permit
48 * {@code null} elements. A priority queue relying on {@linkplain
49 * Comparable natural ordering} also does not permit insertion of
50 * non-comparable objects (doing so results in
51 * {@code ClassCastException}).
52 *
53 * <p>This class and its iterator implement all of the
54 * <em>optional</em> methods of the {@link Collection} and {@link
55 * Iterator} interfaces. The Iterator provided in method {@link
56 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
57 * the PriorityBlockingQueue in any particular order. If you need
58 * ordered traversal, consider using
59 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo}
95 * @author Doug Lea
96 * @param <E> the type of elements held in this collection
97 */
98 @SuppressWarnings("unchecked")
99 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
100 implements BlockingQueue<E>, java.io.Serializable {
101 private static final long serialVersionUID = 5595510919245408276L;
102
103 /*
104 * The implementation uses an array-based binary heap, with public
105 * operations protected with a single lock. However, allocation
106 * during resizing uses a simple spinlock (used only while not
107 * holding main lock) in order to allow takes to operate
108 * concurrently with allocation. This avoids repeated
109 * postponement of waiting consumers and consequent element
110 * build-up. The need to back away from lock during allocation
111 * makes it impossible to simply wrap delegated
112 * java.util.PriorityQueue operations within a lock, as was done
113 * in a previous version of this class. To maintain
114 * interoperability, a plain PriorityQueue is still used during
115 * serialization, which maintains compatibility at the expense of
116 * transiently doubling overhead.
117 */
118
119 /**
120 * Default array capacity.
121 */
122 private static final int DEFAULT_INITIAL_CAPACITY = 11;
123
124 /**
125 * The maximum size of array to allocate.
126 * Some VMs reserve some header words in an array.
127 * Attempts to allocate larger arrays may result in
128 * OutOfMemoryError: Requested array size exceeds VM limit
129 */
130 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
131
132 /**
133 * Priority queue represented as a balanced binary heap: the two
134 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
135 * priority queue is ordered by comparator, or by the elements'
292 newCap = MAX_ARRAY_SIZE;
293 }
294 if (newCap > oldCap && queue == array)
295 newArray = new Object[newCap];
296 } finally {
297 allocationSpinLock = 0;
298 }
299 }
300 if (newArray == null) // back off if another thread is allocating
301 Thread.yield();
302 lock.lock();
303 if (newArray != null && queue == array) {
304 queue = newArray;
305 System.arraycopy(array, 0, newArray, 0, oldCap);
306 }
307 }
308
309 /**
310 * Mechanics for poll(). Call only while holding lock.
311 */
312 private E dequeue() {
313 int n = size - 1;
314 if (n < 0)
315 return null;
316 else {
317 Object[] array = queue;
318 E result = (E) array[0];
319 E x = (E) array[n];
320 array[n] = null;
321 Comparator<? super E> cmp = comparator;
322 if (cmp == null)
323 siftDownComparable(0, x, array, n);
324 else
325 siftDownUsingComparator(0, x, array, n, cmp);
326 size = n;
327 return result;
328 }
329 }
330
331 /**
332 * Inserts item x at position k, maintaining heap invariant by
333 * promoting x up the tree until it is greater than or equal to
334 * its parent, or is the root.
335 *
336 * To simplify and speed up coercions and comparisons. the
337 * Comparable and Comparator versions are separated into different
338 * methods that are otherwise identical. (Similarly for siftDown.)
339 * These methods are static, with heap state as arguments, to
340 * simplify use in light of possible comparator exceptions.
341 *
342 * @param k the position to fill
343 * @param x the item to insert
344 * @param array the heap array
345 * @param n heap size
346 */
347 private static <T> void siftUpComparable(int k, T x, Object[] array) {
348 Comparable<? super T> key = (Comparable<? super T>) x;
349 while (k > 0) {
365 if (cmp.compare(x, (T) e) >= 0)
366 break;
367 array[k] = e;
368 k = parent;
369 }
370 array[k] = x;
371 }
372
373 /**
374 * Inserts item x at position k, maintaining heap invariant by
375 * demoting x down the tree repeatedly until it is less than or
376 * equal to its children or is a leaf.
377 *
378 * @param k the position to fill
379 * @param x the item to insert
380 * @param array the heap array
381 * @param n heap size
382 */
383 private static <T> void siftDownComparable(int k, T x, Object[] array,
384 int n) {
385 if (n > 0) {
386 Comparable<? super T> key = (Comparable<? super T>)x;
387 int half = n >>> 1; // loop while a non-leaf
388 while (k < half) {
389 int child = (k << 1) + 1; // assume left child is least
390 Object c = array[child];
391 int right = child + 1;
392 if (right < n &&
393 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
394 c = array[child = right];
395 if (key.compareTo((T) c) <= 0)
396 break;
397 array[k] = c;
398 k = child;
399 }
400 array[k] = key;
401 }
402 }
403
404 private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
405 int n,
406 Comparator<? super T> cmp) {
407 if (n > 0) {
408 int half = n >>> 1;
409 while (k < half) {
410 int child = (k << 1) + 1;
411 Object c = array[child];
412 int right = child + 1;
413 if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
414 c = array[child = right];
415 if (cmp.compare(x, (T) c) <= 0)
416 break;
417 array[k] = c;
418 k = child;
419 }
420 array[k] = x;
421 }
422 }
423
424 /**
425 * Establishes the heap invariant (described above) in the entire tree,
426 * assuming nothing about the order of the elements prior to the call.
427 */
428 private void heapify() {
429 Object[] array = queue;
430 int n = size;
431 int half = (n >>> 1) - 1;
432 Comparator<? super E> cmp = comparator;
433 if (cmp == null) {
434 for (int i = half; i >= 0; i--)
435 siftDownComparable(i, (E) array[i], array, n);
436 }
437 else {
438 for (int i = half; i >= 0; i--)
439 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
440 }
441 }
442
507 * As the queue is unbounded, this method will never block or
508 * return {@code false}.
509 *
510 * @param e the element to add
511 * @param timeout This parameter is ignored as the method never blocks
512 * @param unit This parameter is ignored as the method never blocks
513 * @return {@code true} (as specified by
514 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
515 * @throws ClassCastException if the specified element cannot be compared
516 * with elements currently in the priority queue according to the
517 * priority queue's ordering
518 * @throws NullPointerException if the specified element is null
519 */
520 public boolean offer(E e, long timeout, TimeUnit unit) {
521 return offer(e); // never need to block
522 }
523
524 public E poll() {
525 final ReentrantLock lock = this.lock;
526 lock.lock();
527 try {
528 return dequeue();
529 } finally {
530 lock.unlock();
531 }
532 }
533
534 public E take() throws InterruptedException {
535 final ReentrantLock lock = this.lock;
536 lock.lockInterruptibly();
537 E result;
538 try {
539 while ( (result = dequeue()) == null)
540 notEmpty.await();
541 } finally {
542 lock.unlock();
543 }
544 return result;
545 }
546
547 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
548 long nanos = unit.toNanos(timeout);
549 final ReentrantLock lock = this.lock;
550 lock.lockInterruptibly();
551 E result;
552 try {
553 while ( (result = dequeue()) == null && nanos > 0)
554 nanos = notEmpty.awaitNanos(nanos);
555 } finally {
556 lock.unlock();
557 }
558 return result;
559 }
560
561 public E peek() {
562 final ReentrantLock lock = this.lock;
563 lock.lock();
564 try {
565 return (size == 0) ? null : (E) queue[0];
566 } finally {
567 lock.unlock();
568 }
569 }
570
571 /**
572 * Returns the comparator used to order the elements in this queue,
573 * or {@code null} if this queue uses the {@linkplain Comparable
574 * natural ordering} of its elements.
575 *
576 * @return the comparator used to order the elements in this queue,
577 * or {@code null} if this queue uses the natural
578 * ordering of its elements
579 */
580 public Comparator<? super E> comparator() {
581 return comparator;
582 }
583
584 public int size() {
585 final ReentrantLock lock = this.lock;
586 lock.lock();
587 try {
588 return size;
632 siftUpComparable(i, moved, array);
633 else
634 siftUpUsingComparator(i, moved, array, cmp);
635 }
636 }
637 size = n;
638 }
639
640 /**
641 * Removes a single instance of the specified element from this queue,
642 * if it is present. More formally, removes an element {@code e} such
643 * that {@code o.equals(e)}, if this queue contains one or more such
644 * elements. Returns {@code true} if and only if this queue contained
645 * the specified element (or equivalently, if this queue changed as a
646 * result of the call).
647 *
648 * @param o element to be removed from this queue, if present
649 * @return {@code true} if this queue changed as a result of the call
650 */
651 public boolean remove(Object o) {
652 final ReentrantLock lock = this.lock;
653 lock.lock();
654 try {
655 int i = indexOf(o);
656 if (i == -1)
657 return false;
658 removeAt(i);
659 return true;
660 } finally {
661 lock.unlock();
662 }
663 }
664
665 /**
666 * Identity-based version for use in Itr.remove
667 */
668 void removeEQ(Object o) {
669 final ReentrantLock lock = this.lock;
670 lock.lock();
671 try {
672 Object[] array = queue;
673 for (int i = 0, n = size; i < n; i++) {
674 if (o == array[i]) {
675 removeAt(i);
676 break;
677 }
678 }
679 } finally {
680 lock.unlock();
681 }
682 }
683
684 /**
685 * Returns {@code true} if this queue contains the specified element.
686 * More formally, returns {@code true} if and only if this queue contains
687 * at least one element {@code e} such that {@code o.equals(e)}.
688 *
689 * @param o object to be checked for containment in this queue
690 * @return {@code true} if this queue contains the specified element
691 */
692 public boolean contains(Object o) {
693 final ReentrantLock lock = this.lock;
694 lock.lock();
695 try {
696 return indexOf(o) != -1;
697 } finally {
698 lock.unlock();
699 }
700 }
701
702 /**
703 * Returns an array containing all of the elements in this queue.
704 * The returned array elements are in no particular order.
705 *
706 * <p>The returned array will be "safe" in that no references to it are
707 * maintained by this queue. (In other words, this method must allocate
708 * a new array). The caller is thus free to modify the returned array.
709 *
710 * <p>This method acts as bridge between array-based and collection-based
711 * APIs.
712 *
713 * @return an array containing all of the elements in this queue
714 */
715 public Object[] toArray() {
716 final ReentrantLock lock = this.lock;
717 lock.lock();
718 try {
719 return Arrays.copyOf(queue, size);
720 } finally {
721 lock.unlock();
722 }
723 }
724
725 public String toString() {
726 final ReentrantLock lock = this.lock;
727 lock.lock();
728 try {
729 int n = size;
730 if (n == 0)
731 return "[]";
732 StringBuilder sb = new StringBuilder();
733 sb.append('[');
734 for (int i = 0; i < n; ++i) {
735 Object e = queue[i];
736 sb.append(e == this ? "(this Collection)" : e);
737 if (i != n - 1)
738 sb.append(',').append(' ');
739 }
740 return sb.append(']').toString();
741 } finally {
742 lock.unlock();
743 }
744 }
745
746 /**
747 * @throws UnsupportedOperationException {@inheritDoc}
748 * @throws ClassCastException {@inheritDoc}
749 * @throws NullPointerException {@inheritDoc}
750 * @throws IllegalArgumentException {@inheritDoc}
751 */
752 public int drainTo(Collection<? super E> c) {
753 return drainTo(c, Integer.MAX_VALUE);
754 }
755
756 /**
757 * @throws UnsupportedOperationException {@inheritDoc}
758 * @throws ClassCastException {@inheritDoc}
759 * @throws NullPointerException {@inheritDoc}
760 * @throws IllegalArgumentException {@inheritDoc}
761 */
762 public int drainTo(Collection<? super E> c, int maxElements) {
763 if (c == null)
764 throw new NullPointerException();
765 if (c == this)
766 throw new IllegalArgumentException();
767 if (maxElements <= 0)
768 return 0;
769 final ReentrantLock lock = this.lock;
770 lock.lock();
771 try {
772 int n = Math.min(size, maxElements);
773 for (int i = 0; i < n; i++) {
774 c.add((E) queue[0]); // In this order, in case add() throws.
775 dequeue();
776 }
777 return n;
778 } finally {
779 lock.unlock();
780 }
781 }
782
783 /**
784 * Atomically removes all of the elements from this queue.
785 * The queue will be empty after this call returns.
786 */
787 public void clear() {
788 final ReentrantLock lock = this.lock;
789 lock.lock();
790 try {
791 Object[] array = queue;
792 int n = size;
793 size = 0;
794 for (int i = 0; i < n; i++)
795 array[i] = null;
803 * runtime type of the returned array is that of the specified array.
804 * The returned array elements are in no particular order.
805 * If the queue fits in the specified array, it is returned therein.
806 * Otherwise, a new array is allocated with the runtime type of the
807 * specified array and the size of this queue.
808 *
809 * <p>If this queue fits in the specified array with room to spare
810 * (i.e., the array has more elements than this queue), the element in
811 * the array immediately following the end of the queue is set to
812 * {@code null}.
813 *
814 * <p>Like the {@link #toArray()} method, this method acts as bridge between
815 * array-based and collection-based APIs. Further, this method allows
816 * precise control over the runtime type of the output array, and may,
817 * under certain circumstances, be used to save allocation costs.
818 *
819 * <p>Suppose {@code x} is a queue known to contain only strings.
820 * The following code can be used to dump the queue into a newly
821 * allocated array of {@code String}:
822 *
823 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
824 *
825 * Note that {@code toArray(new Object[0])} is identical in function to
826 * {@code toArray()}.
827 *
828 * @param a the array into which the elements of the queue are to
829 * be stored, if it is big enough; otherwise, a new array of the
830 * same runtime type is allocated for this purpose
831 * @return an array containing all of the elements in this queue
832 * @throws ArrayStoreException if the runtime type of the specified array
833 * is not a supertype of the runtime type of every element in
834 * this queue
835 * @throws NullPointerException if the specified array is null
836 */
837 public <T> T[] toArray(T[] a) {
838 final ReentrantLock lock = this.lock;
839 lock.lock();
840 try {
841 int n = size;
842 if (a.length < n)
843 // Make a new array of a's runtime type, but my contents:
856 * iterator does not return the elements in any particular order.
857 *
858 * <p>The returned iterator is a "weakly consistent" iterator that
859 * will never throw {@link java.util.ConcurrentModificationException
860 * ConcurrentModificationException}, and guarantees to traverse
861 * elements as they existed upon construction of the iterator, and
862 * may (but is not guaranteed to) reflect any modifications
863 * subsequent to construction.
864 *
865 * @return an iterator over the elements in this queue
866 */
867 public Iterator<E> iterator() {
868 return new Itr(toArray());
869 }
870
871 /**
872 * Snapshot iterator that works off copy of underlying q array.
873 */
874 final class Itr implements Iterator<E> {
875 final Object[] array; // Array of all elements
876 int cursor; // index of next element to return
877 int lastRet; // index of last element, or -1 if no such
878
879 Itr(Object[] array) {
880 lastRet = -1;
881 this.array = array;
882 }
883
884 public boolean hasNext() {
885 return cursor < array.length;
886 }
887
888 public E next() {
889 if (cursor >= array.length)
890 throw new NoSuchElementException();
891 lastRet = cursor;
892 return (E)array[cursor++];
893 }
894
895 public void remove() {
896 if (lastRet < 0)
897 throw new IllegalStateException();
898 removeEQ(array[lastRet]);
899 lastRet = -1;
900 }
901 }
902
903 /**
904 * Saves this queue to a stream (that is, serializes it).
905 *
906 * For compatibility with previous version of this class, elements
907 * are first copied to a java.util.PriorityQueue, which is then
908 * serialized.
909 */
910 private void writeObject(java.io.ObjectOutputStream s)
911 throws java.io.IOException {
912 lock.lock();
913 try {
914 // avoid zero capacity argument
915 q = new PriorityQueue<E>(Math.max(size, 1), comparator);
916 q.addAll(this);
917 s.defaultWriteObject();
918 } finally {
919 q = null;
920 lock.unlock();
921 }
922 }
923
924 /**
925 * Reconstitutes this queue from a stream (that is, deserializes it).
926 */
927 private void readObject(java.io.ObjectInputStream s)
928 throws java.io.IOException, ClassNotFoundException {
929 try {
930 s.defaultReadObject();
931 this.queue = new Object[q.size()];
932 comparator = q.comparator();
933 addAll(q);
934 } finally {
935 q = null;
936 }
937 }
938
939 // Unsafe mechanics
940 private static final sun.misc.Unsafe UNSAFE;
941 private static final long allocationSpinLockOffset;
942 static {
943 try {
944 UNSAFE = sun.misc.Unsafe.getUnsafe();
945 Class<?> k = PriorityBlockingQueue.class;
|