rev 12972 : 8140606: Update library code to use internal Unsafe
Reviewed-by: duke
1 /*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation. Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37
38 import java.util.AbstractQueue;
39 import java.util.Arrays;
40 import java.util.Collection;
41 import java.util.Comparator;
42 import java.util.Iterator;
43 import java.util.NoSuchElementException;
44 import java.util.PriorityQueue;
45 import java.util.Queue;
46 import java.util.SortedSet;
47 import java.util.Spliterator;
48 import java.util.concurrent.locks.Condition;
49 import java.util.concurrent.locks.ReentrantLock;
50 import java.util.function.Consumer;
51
52 /**
53 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
54 * the same ordering rules as class {@link PriorityQueue} and supplies
55 * blocking retrieval operations. While this queue is logically
56 * unbounded, attempted additions may fail due to resource exhaustion
57 * (causing {@code OutOfMemoryError}). This class does not permit
58 * {@code null} elements. A priority queue relying on {@linkplain
59 * Comparable natural ordering} also does not permit insertion of
60 * non-comparable objects (doing so results in
61 * {@code ClassCastException}).
62 *
63 * <p>This class and its iterator implement all of the
64 * <em>optional</em> methods of the {@link Collection} and {@link
65 * Iterator} interfaces. The Iterator provided in method {@link
66 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
67 * the PriorityBlockingQueue in any particular order. If you need
68 * ordered traversal, consider using
69 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo}
70 * can be used to <em>remove</em> some or all elements in priority
71 * order and place them in another collection.
72 *
73 * <p>Operations on this class make no guarantees about the ordering
74 * of elements with equal priority. If you need to enforce an
75 * ordering, you can define custom classes or comparators that use a
76 * secondary key to break ties in primary priority values. For
77 * example, here is a class that applies first-in-first-out
78 * tie-breaking to comparable elements. To use it, you would insert a
79 * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
80 *
81 * <pre> {@code
82 * class FIFOEntry<E extends Comparable<? super E>>
83 * implements Comparable<FIFOEntry<E>> {
84 * static final AtomicLong seq = new AtomicLong(0);
85 * final long seqNum;
86 * final E entry;
87 * public FIFOEntry(E entry) {
88 * seqNum = seq.getAndIncrement();
89 * this.entry = entry;
90 * }
91 * public E getEntry() { return entry; }
92 * public int compareTo(FIFOEntry<E> other) {
93 * int res = entry.compareTo(other.entry);
94 * if (res == 0 && other.entry != this.entry)
95 * res = (seqNum < other.seqNum ? -1 : 1);
96 * return res;
97 * }
98 * }}</pre>
99 *
100 * <p>This class is a member of the
101 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
102 * Java Collections Framework</a>.
103 *
104 * @since 1.5
105 * @author Doug Lea
106 * @param <E> the type of elements held in this queue
107 */
108 @SuppressWarnings("unchecked")
109 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
110 implements BlockingQueue<E>, java.io.Serializable {
111 private static final long serialVersionUID = 5595510919245408276L;
112
113 /*
114 * The implementation uses an array-based binary heap, with public
115 * operations protected with a single lock. However, allocation
116 * during resizing uses a simple spinlock (used only while not
117 * holding main lock) in order to allow takes to operate
118 * concurrently with allocation. This avoids repeated
119 * postponement of waiting consumers and consequent element
120 * build-up. The need to back away from lock during allocation
121 * makes it impossible to simply wrap delegated
122 * java.util.PriorityQueue operations within a lock, as was done
123 * in a previous version of this class. To maintain
124 * interoperability, a plain PriorityQueue is still used during
125 * serialization, which maintains compatibility at the expense of
126 * transiently doubling overhead.
127 */
128
129 /**
130 * Default array capacity.
131 */
132 private static final int DEFAULT_INITIAL_CAPACITY = 11;
133
134 /**
135 * The maximum size of array to allocate.
136 * Some VMs reserve some header words in an array.
137 * Attempts to allocate larger arrays may result in
138 * OutOfMemoryError: Requested array size exceeds VM limit
139 */
140 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
141
142 /**
143 * Priority queue represented as a balanced binary heap: the two
144 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
145 * priority queue is ordered by comparator, or by the elements'
146 * natural ordering, if comparator is null: For each node n in the
147 * heap and each descendant d of n, n <= d. The element with the
148 * lowest value is in queue[0], assuming the queue is nonempty.
149 */
150 private transient Object[] queue;
151
152 /**
153 * The number of elements in the priority queue.
154 */
155 private transient int size;
156
157 /**
158 * The comparator, or null if priority queue uses elements'
159 * natural ordering.
160 */
161 private transient Comparator<? super E> comparator;
162
163 /**
164 * Lock used for all public operations.
165 */
166 private final ReentrantLock lock;
167
168 /**
169 * Condition for blocking when empty.
170 */
171 private final Condition notEmpty;
172
173 /**
174 * Spinlock for allocation, acquired via CAS.
175 */
176 private transient volatile int allocationSpinLock;
177
178 /**
179 * A plain PriorityQueue used only for serialization,
180 * to maintain compatibility with previous versions
181 * of this class. Non-null only during serialization/deserialization.
182 */
183 private PriorityQueue<E> q;
184
185 /**
186 * Creates a {@code PriorityBlockingQueue} with the default
187 * initial capacity (11) that orders its elements according to
188 * their {@linkplain Comparable natural ordering}.
189 */
190 public PriorityBlockingQueue() {
191 this(DEFAULT_INITIAL_CAPACITY, null);
192 }
193
194 /**
195 * Creates a {@code PriorityBlockingQueue} with the specified
196 * initial capacity that orders its elements according to their
197 * {@linkplain Comparable natural ordering}.
198 *
199 * @param initialCapacity the initial capacity for this priority queue
200 * @throws IllegalArgumentException if {@code initialCapacity} is less
201 * than 1
202 */
203 public PriorityBlockingQueue(int initialCapacity) {
204 this(initialCapacity, null);
205 }
206
207 /**
208 * Creates a {@code PriorityBlockingQueue} with the specified initial
209 * capacity that orders its elements according to the specified
210 * comparator.
211 *
212 * @param initialCapacity the initial capacity for this priority queue
213 * @param comparator the comparator that will be used to order this
214 * priority queue. If {@code null}, the {@linkplain Comparable
215 * natural ordering} of the elements will be used.
216 * @throws IllegalArgumentException if {@code initialCapacity} is less
217 * than 1
218 */
219 public PriorityBlockingQueue(int initialCapacity,
220 Comparator<? super E> comparator) {
221 if (initialCapacity < 1)
222 throw new IllegalArgumentException();
223 this.lock = new ReentrantLock();
224 this.notEmpty = lock.newCondition();
225 this.comparator = comparator;
226 this.queue = new Object[initialCapacity];
227 }
228
229 /**
230 * Creates a {@code PriorityBlockingQueue} containing the elements
231 * in the specified collection. If the specified collection is a
232 * {@link SortedSet} or a {@link PriorityQueue}, this
233 * priority queue will be ordered according to the same ordering.
234 * Otherwise, this priority queue will be ordered according to the
235 * {@linkplain Comparable natural ordering} of its elements.
236 *
237 * @param c the collection whose elements are to be placed
238 * into this priority queue
239 * @throws ClassCastException if elements of the specified collection
240 * cannot be compared to one another according to the priority
241 * queue's ordering
242 * @throws NullPointerException if the specified collection or any
243 * of its elements are null
244 */
245 public PriorityBlockingQueue(Collection<? extends E> c) {
246 this.lock = new ReentrantLock();
247 this.notEmpty = lock.newCondition();
248 boolean heapify = true; // true if not known to be in heap order
249 boolean screen = true; // true if must screen for nulls
250 if (c instanceof SortedSet<?>) {
251 SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
252 this.comparator = (Comparator<? super E>) ss.comparator();
253 heapify = false;
254 }
255 else if (c instanceof PriorityBlockingQueue<?>) {
256 PriorityBlockingQueue<? extends E> pq =
257 (PriorityBlockingQueue<? extends E>) c;
258 this.comparator = (Comparator<? super E>) pq.comparator();
259 screen = false;
260 if (pq.getClass() == PriorityBlockingQueue.class) // exact match
261 heapify = false;
262 }
263 Object[] a = c.toArray();
264 int n = a.length;
265 // If c.toArray incorrectly doesn't return Object[], copy it.
266 if (a.getClass() != Object[].class)
267 a = Arrays.copyOf(a, n, Object[].class);
268 if (screen && (n == 1 || this.comparator != null)) {
269 for (int i = 0; i < n; ++i)
270 if (a[i] == null)
271 throw new NullPointerException();
272 }
273 this.queue = a;
274 this.size = n;
275 if (heapify)
276 heapify();
277 }
278
279 /**
280 * Tries to grow array to accommodate at least one more element
281 * (but normally expand by about 50%), giving up (allowing retry)
282 * on contention (which we expect to be rare). Call only while
283 * holding lock.
284 *
285 * @param array the heap array
286 * @param oldCap the length of the array
287 */
288 private void tryGrow(Object[] array, int oldCap) {
289 lock.unlock(); // must release and then re-acquire main lock
290 Object[] newArray = null;
291 if (allocationSpinLock == 0 &&
292 U.compareAndSwapInt(this, ALLOCATIONSPINLOCK, 0, 1)) {
293 try {
294 int newCap = oldCap + ((oldCap < 64) ?
295 (oldCap + 2) : // grow faster if small
296 (oldCap >> 1));
297 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
298 int minCap = oldCap + 1;
299 if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
300 throw new OutOfMemoryError();
301 newCap = MAX_ARRAY_SIZE;
302 }
303 if (newCap > oldCap && queue == array)
304 newArray = new Object[newCap];
305 } finally {
306 allocationSpinLock = 0;
307 }
308 }
309 if (newArray == null) // back off if another thread is allocating
310 Thread.yield();
311 lock.lock();
312 if (newArray != null && queue == array) {
313 queue = newArray;
314 System.arraycopy(array, 0, newArray, 0, oldCap);
315 }
316 }
317
318 /**
319 * Mechanics for poll(). Call only while holding lock.
320 */
321 private E dequeue() {
322 int n = size - 1;
323 if (n < 0)
324 return null;
325 else {
326 Object[] array = queue;
327 E result = (E) array[0];
328 E x = (E) array[n];
329 array[n] = null;
330 Comparator<? super E> cmp = comparator;
331 if (cmp == null)
332 siftDownComparable(0, x, array, n);
333 else
334 siftDownUsingComparator(0, x, array, n, cmp);
335 size = n;
336 return result;
337 }
338 }
339
340 /**
341 * Inserts item x at position k, maintaining heap invariant by
342 * promoting x up the tree until it is greater than or equal to
343 * its parent, or is the root.
344 *
345 * To simplify and speed up coercions and comparisons. the
346 * Comparable and Comparator versions are separated into different
347 * methods that are otherwise identical. (Similarly for siftDown.)
348 * These methods are static, with heap state as arguments, to
349 * simplify use in light of possible comparator exceptions.
350 *
351 * @param k the position to fill
352 * @param x the item to insert
353 * @param array the heap array
354 */
355 private static <T> void siftUpComparable(int k, T x, Object[] array) {
356 Comparable<? super T> key = (Comparable<? super T>) x;
357 while (k > 0) {
358 int parent = (k - 1) >>> 1;
359 Object e = array[parent];
360 if (key.compareTo((T) e) >= 0)
361 break;
362 array[k] = e;
363 k = parent;
364 }
365 array[k] = key;
366 }
367
368 private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
369 Comparator<? super T> cmp) {
370 while (k > 0) {
371 int parent = (k - 1) >>> 1;
372 Object e = array[parent];
373 if (cmp.compare(x, (T) e) >= 0)
374 break;
375 array[k] = e;
376 k = parent;
377 }
378 array[k] = x;
379 }
380
381 /**
382 * Inserts item x at position k, maintaining heap invariant by
383 * demoting x down the tree repeatedly until it is less than or
384 * equal to its children or is a leaf.
385 *
386 * @param k the position to fill
387 * @param x the item to insert
388 * @param array the heap array
389 * @param n heap size
390 */
391 private static <T> void siftDownComparable(int k, T x, Object[] array,
392 int n) {
393 if (n > 0) {
394 Comparable<? super T> key = (Comparable<? super T>)x;
395 int half = n >>> 1; // loop while a non-leaf
396 while (k < half) {
397 int child = (k << 1) + 1; // assume left child is least
398 Object c = array[child];
399 int right = child + 1;
400 if (right < n &&
401 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
402 c = array[child = right];
403 if (key.compareTo((T) c) <= 0)
404 break;
405 array[k] = c;
406 k = child;
407 }
408 array[k] = key;
409 }
410 }
411
412 private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
413 int n,
414 Comparator<? super T> cmp) {
415 if (n > 0) {
416 int half = n >>> 1;
417 while (k < half) {
418 int child = (k << 1) + 1;
419 Object c = array[child];
420 int right = child + 1;
421 if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
422 c = array[child = right];
423 if (cmp.compare(x, (T) c) <= 0)
424 break;
425 array[k] = c;
426 k = child;
427 }
428 array[k] = x;
429 }
430 }
431
432 /**
433 * Establishes the heap invariant (described above) in the entire tree,
434 * assuming nothing about the order of the elements prior to the call.
435 */
436 private void heapify() {
437 Object[] array = queue;
438 int n = size;
439 int half = (n >>> 1) - 1;
440 Comparator<? super E> cmp = comparator;
441 if (cmp == null) {
442 for (int i = half; i >= 0; i--)
443 siftDownComparable(i, (E) array[i], array, n);
444 }
445 else {
446 for (int i = half; i >= 0; i--)
447 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
448 }
449 }
450
451 /**
452 * Inserts the specified element into this priority queue.
453 *
454 * @param e the element to add
455 * @return {@code true} (as specified by {@link Collection#add})
456 * @throws ClassCastException if the specified element cannot be compared
457 * with elements currently in the priority queue according to the
458 * priority queue's ordering
459 * @throws NullPointerException if the specified element is null
460 */
461 public boolean add(E e) {
462 return offer(e);
463 }
464
465 /**
466 * Inserts the specified element into this priority queue.
467 * As the queue is unbounded, this method will never return {@code false}.
468 *
469 * @param e the element to add
470 * @return {@code true} (as specified by {@link Queue#offer})
471 * @throws ClassCastException if the specified element cannot be compared
472 * with elements currently in the priority queue according to the
473 * priority queue's ordering
474 * @throws NullPointerException if the specified element is null
475 */
476 public boolean offer(E e) {
477 if (e == null)
478 throw new NullPointerException();
479 final ReentrantLock lock = this.lock;
480 lock.lock();
481 int n, cap;
482 Object[] array;
483 while ((n = size) >= (cap = (array = queue).length))
484 tryGrow(array, cap);
485 try {
486 Comparator<? super E> cmp = comparator;
487 if (cmp == null)
488 siftUpComparable(n, e, array);
489 else
490 siftUpUsingComparator(n, e, array, cmp);
491 size = n + 1;
492 notEmpty.signal();
493 } finally {
494 lock.unlock();
495 }
496 return true;
497 }
498
499 /**
500 * Inserts the specified element into this priority queue.
501 * As the queue is unbounded, this method will never block.
502 *
503 * @param e the element to add
504 * @throws ClassCastException if the specified element cannot be compared
505 * with elements currently in the priority queue according to the
506 * priority queue's ordering
507 * @throws NullPointerException if the specified element is null
508 */
509 public void put(E e) {
510 offer(e); // never need to block
511 }
512
513 /**
514 * Inserts the specified element into this priority queue.
515 * As the queue is unbounded, this method will never block or
516 * return {@code false}.
517 *
518 * @param e the element to add
519 * @param timeout This parameter is ignored as the method never blocks
520 * @param unit This parameter is ignored as the method never blocks
521 * @return {@code true} (as specified by
522 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
523 * @throws ClassCastException if the specified element cannot be compared
524 * with elements currently in the priority queue according to the
525 * priority queue's ordering
526 * @throws NullPointerException if the specified element is null
527 */
528 public boolean offer(E e, long timeout, TimeUnit unit) {
529 return offer(e); // never need to block
530 }
531
532 public E poll() {
533 final ReentrantLock lock = this.lock;
534 lock.lock();
535 try {
536 return dequeue();
537 } finally {
538 lock.unlock();
539 }
540 }
541
542 public E take() throws InterruptedException {
543 final ReentrantLock lock = this.lock;
544 lock.lockInterruptibly();
545 E result;
546 try {
547 while ( (result = dequeue()) == null)
548 notEmpty.await();
549 } finally {
550 lock.unlock();
551 }
552 return result;
553 }
554
555 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
556 long nanos = unit.toNanos(timeout);
557 final ReentrantLock lock = this.lock;
558 lock.lockInterruptibly();
559 E result;
560 try {
561 while ( (result = dequeue()) == null && nanos > 0)
562 nanos = notEmpty.awaitNanos(nanos);
563 } finally {
564 lock.unlock();
565 }
566 return result;
567 }
568
569 public E peek() {
570 final ReentrantLock lock = this.lock;
571 lock.lock();
572 try {
573 return (size == 0) ? null : (E) queue[0];
574 } finally {
575 lock.unlock();
576 }
577 }
578
579 /**
580 * Returns the comparator used to order the elements in this queue,
581 * or {@code null} if this queue uses the {@linkplain Comparable
582 * natural ordering} of its elements.
583 *
584 * @return the comparator used to order the elements in this queue,
585 * or {@code null} if this queue uses the natural
586 * ordering of its elements
587 */
588 public Comparator<? super E> comparator() {
589 return comparator;
590 }
591
592 public int size() {
593 final ReentrantLock lock = this.lock;
594 lock.lock();
595 try {
596 return size;
597 } finally {
598 lock.unlock();
599 }
600 }
601
602 /**
603 * Always returns {@code Integer.MAX_VALUE} because
604 * a {@code PriorityBlockingQueue} is not capacity constrained.
605 * @return {@code Integer.MAX_VALUE} always
606 */
607 public int remainingCapacity() {
608 return Integer.MAX_VALUE;
609 }
610
611 private int indexOf(Object o) {
612 if (o != null) {
613 Object[] array = queue;
614 int n = size;
615 for (int i = 0; i < n; i++)
616 if (o.equals(array[i]))
617 return i;
618 }
619 return -1;
620 }
621
622 /**
623 * Removes the ith element from queue.
624 */
625 private void removeAt(int i) {
626 Object[] array = queue;
627 int n = size - 1;
628 if (n == i) // removed last element
629 array[i] = null;
630 else {
631 E moved = (E) array[n];
632 array[n] = null;
633 Comparator<? super E> cmp = comparator;
634 if (cmp == null)
635 siftDownComparable(i, moved, array, n);
636 else
637 siftDownUsingComparator(i, moved, array, n, cmp);
638 if (array[i] == moved) {
639 if (cmp == null)
640 siftUpComparable(i, moved, array);
641 else
642 siftUpUsingComparator(i, moved, array, cmp);
643 }
644 }
645 size = n;
646 }
647
648 /**
649 * Removes a single instance of the specified element from this queue,
650 * if it is present. More formally, removes an element {@code e} such
651 * that {@code o.equals(e)}, if this queue contains one or more such
652 * elements. Returns {@code true} if and only if this queue contained
653 * the specified element (or equivalently, if this queue changed as a
654 * result of the call).
655 *
656 * @param o element to be removed from this queue, if present
657 * @return {@code true} if this queue changed as a result of the call
658 */
659 public boolean remove(Object o) {
660 final ReentrantLock lock = this.lock;
661 lock.lock();
662 try {
663 int i = indexOf(o);
664 if (i == -1)
665 return false;
666 removeAt(i);
667 return true;
668 } finally {
669 lock.unlock();
670 }
671 }
672
673 /**
674 * Identity-based version for use in Itr.remove.
675 */
676 void removeEQ(Object o) {
677 final ReentrantLock lock = this.lock;
678 lock.lock();
679 try {
680 Object[] array = queue;
681 for (int i = 0, n = size; i < n; i++) {
682 if (o == array[i]) {
683 removeAt(i);
684 break;
685 }
686 }
687 } finally {
688 lock.unlock();
689 }
690 }
691
692 /**
693 * Returns {@code true} if this queue contains the specified element.
694 * More formally, returns {@code true} if and only if this queue contains
695 * at least one element {@code e} such that {@code o.equals(e)}.
696 *
697 * @param o object to be checked for containment in this queue
698 * @return {@code true} if this queue contains the specified element
699 */
700 public boolean contains(Object o) {
701 final ReentrantLock lock = this.lock;
702 lock.lock();
703 try {
704 return indexOf(o) != -1;
705 } finally {
706 lock.unlock();
707 }
708 }
709
710 public String toString() {
711 return Helpers.collectionToString(this);
712 }
713
714 /**
715 * @throws UnsupportedOperationException {@inheritDoc}
716 * @throws ClassCastException {@inheritDoc}
717 * @throws NullPointerException {@inheritDoc}
718 * @throws IllegalArgumentException {@inheritDoc}
719 */
720 public int drainTo(Collection<? super E> c) {
721 return drainTo(c, Integer.MAX_VALUE);
722 }
723
724 /**
725 * @throws UnsupportedOperationException {@inheritDoc}
726 * @throws ClassCastException {@inheritDoc}
727 * @throws NullPointerException {@inheritDoc}
728 * @throws IllegalArgumentException {@inheritDoc}
729 */
730 public int drainTo(Collection<? super E> c, int maxElements) {
731 if (c == null)
732 throw new NullPointerException();
733 if (c == this)
734 throw new IllegalArgumentException();
735 if (maxElements <= 0)
736 return 0;
737 final ReentrantLock lock = this.lock;
738 lock.lock();
739 try {
740 int n = Math.min(size, maxElements);
741 for (int i = 0; i < n; i++) {
742 c.add((E) queue[0]); // In this order, in case add() throws.
743 dequeue();
744 }
745 return n;
746 } finally {
747 lock.unlock();
748 }
749 }
750
751 /**
752 * Atomically removes all of the elements from this queue.
753 * The queue will be empty after this call returns.
754 */
755 public void clear() {
756 final ReentrantLock lock = this.lock;
757 lock.lock();
758 try {
759 Object[] array = queue;
760 int n = size;
761 size = 0;
762 for (int i = 0; i < n; i++)
763 array[i] = null;
764 } finally {
765 lock.unlock();
766 }
767 }
768
769 /**
770 * Returns an array containing all of the elements in this queue.
771 * The returned array elements are in no particular order.
772 *
773 * <p>The returned array will be "safe" in that no references to it are
774 * maintained by this queue. (In other words, this method must allocate
775 * a new array). The caller is thus free to modify the returned array.
776 *
777 * <p>This method acts as bridge between array-based and collection-based
778 * APIs.
779 *
780 * @return an array containing all of the elements in this queue
781 */
782 public Object[] toArray() {
783 final ReentrantLock lock = this.lock;
784 lock.lock();
785 try {
786 return Arrays.copyOf(queue, size);
787 } finally {
788 lock.unlock();
789 }
790 }
791
792 /**
793 * Returns an array containing all of the elements in this queue; the
794 * runtime type of the returned array is that of the specified array.
795 * The returned array elements are in no particular order.
796 * If the queue fits in the specified array, it is returned therein.
797 * Otherwise, a new array is allocated with the runtime type of the
798 * specified array and the size of this queue.
799 *
800 * <p>If this queue fits in the specified array with room to spare
801 * (i.e., the array has more elements than this queue), the element in
802 * the array immediately following the end of the queue is set to
803 * {@code null}.
804 *
805 * <p>Like the {@link #toArray()} method, this method acts as bridge between
806 * array-based and collection-based APIs. Further, this method allows
807 * precise control over the runtime type of the output array, and may,
808 * under certain circumstances, be used to save allocation costs.
809 *
810 * <p>Suppose {@code x} is a queue known to contain only strings.
811 * The following code can be used to dump the queue into a newly
812 * allocated array of {@code String}:
813 *
814 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
815 *
816 * Note that {@code toArray(new Object[0])} is identical in function to
817 * {@code toArray()}.
818 *
819 * @param a the array into which the elements of the queue are to
820 * be stored, if it is big enough; otherwise, a new array of the
821 * same runtime type is allocated for this purpose
822 * @return an array containing all of the elements in this queue
823 * @throws ArrayStoreException if the runtime type of the specified array
824 * is not a supertype of the runtime type of every element in
825 * this queue
826 * @throws NullPointerException if the specified array is null
827 */
828 public <T> T[] toArray(T[] a) {
829 final ReentrantLock lock = this.lock;
830 lock.lock();
831 try {
832 int n = size;
833 if (a.length < n)
834 // Make a new array of a's runtime type, but my contents:
835 return (T[]) Arrays.copyOf(queue, size, a.getClass());
836 System.arraycopy(queue, 0, a, 0, n);
837 if (a.length > n)
838 a[n] = null;
839 return a;
840 } finally {
841 lock.unlock();
842 }
843 }
844
845 /**
846 * Returns an iterator over the elements in this queue. The
847 * iterator does not return the elements in any particular order.
848 *
849 * <p>The returned iterator is
850 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
851 *
852 * @return an iterator over the elements in this queue
853 */
854 public Iterator<E> iterator() {
855 return new Itr(toArray());
856 }
857
858 /**
859 * Snapshot iterator that works off copy of underlying q array.
860 */
861 final class Itr implements Iterator<E> {
862 final Object[] array; // Array of all elements
863 int cursor; // index of next element to return
864 int lastRet; // index of last element, or -1 if no such
865
866 Itr(Object[] array) {
867 lastRet = -1;
868 this.array = array;
869 }
870
871 public boolean hasNext() {
872 return cursor < array.length;
873 }
874
875 public E next() {
876 if (cursor >= array.length)
877 throw new NoSuchElementException();
878 lastRet = cursor;
879 return (E)array[cursor++];
880 }
881
882 public void remove() {
883 if (lastRet < 0)
884 throw new IllegalStateException();
885 removeEQ(array[lastRet]);
886 lastRet = -1;
887 }
888 }
889
890 /**
891 * Saves this queue to a stream (that is, serializes it).
892 *
893 * For compatibility with previous version of this class, elements
894 * are first copied to a java.util.PriorityQueue, which is then
895 * serialized.
896 *
897 * @param s the stream
898 * @throws java.io.IOException if an I/O error occurs
899 */
900 private void writeObject(java.io.ObjectOutputStream s)
901 throws java.io.IOException {
902 lock.lock();
903 try {
904 // avoid zero capacity argument
905 q = new PriorityQueue<E>(Math.max(size, 1), comparator);
906 q.addAll(this);
907 s.defaultWriteObject();
908 } finally {
909 q = null;
910 lock.unlock();
911 }
912 }
913
914 /**
915 * Reconstitutes this queue from a stream (that is, deserializes it).
916 * @param s the stream
917 * @throws ClassNotFoundException if the class of a serialized object
918 * could not be found
919 * @throws java.io.IOException if an I/O error occurs
920 */
921 private void readObject(java.io.ObjectInputStream s)
922 throws java.io.IOException, ClassNotFoundException {
923 try {
924 s.defaultReadObject();
925 this.queue = new Object[q.size()];
926 comparator = q.comparator();
927 addAll(q);
928 } finally {
929 q = null;
930 }
931 }
932
933 // Similar to Collections.ArraySnapshotSpliterator but avoids
934 // commitment to toArray until needed
935 static final class PBQSpliterator<E> implements Spliterator<E> {
936 final PriorityBlockingQueue<E> queue;
937 Object[] array;
938 int index;
939 int fence;
940
941 PBQSpliterator(PriorityBlockingQueue<E> queue, Object[] array,
942 int index, int fence) {
943 this.queue = queue;
944 this.array = array;
945 this.index = index;
946 this.fence = fence;
947 }
948
949 final int getFence() {
950 int hi;
951 if ((hi = fence) < 0)
952 hi = fence = (array = queue.toArray()).length;
953 return hi;
954 }
955
956 public PBQSpliterator<E> trySplit() {
957 int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
958 return (lo >= mid) ? null :
959 new PBQSpliterator<E>(queue, array, lo, index = mid);
960 }
961
962 @SuppressWarnings("unchecked")
963 public void forEachRemaining(Consumer<? super E> action) {
964 Object[] a; int i, hi; // hoist accesses and checks from loop
965 if (action == null)
966 throw new NullPointerException();
967 if ((a = array) == null)
968 fence = (a = queue.toArray()).length;
969 if ((hi = fence) <= a.length &&
970 (i = index) >= 0 && i < (index = hi)) {
971 do { action.accept((E)a[i]); } while (++i < hi);
972 }
973 }
974
975 public boolean tryAdvance(Consumer<? super E> action) {
976 if (action == null)
977 throw new NullPointerException();
978 if (getFence() > index && index >= 0) {
979 @SuppressWarnings("unchecked") E e = (E) array[index++];
980 action.accept(e);
981 return true;
982 }
983 return false;
984 }
985
986 public long estimateSize() { return (long)(getFence() - index); }
987
988 public int characteristics() {
989 return Spliterator.NONNULL | Spliterator.SIZED | Spliterator.SUBSIZED;
990 }
991 }
992
993 /**
994 * Returns a {@link Spliterator} over the elements in this queue.
995 *
996 * <p>The returned spliterator is
997 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
998 *
999 * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
1000 * {@link Spliterator#NONNULL}.
1001 *
1002 * @implNote
1003 * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
1004 *
1005 * @return a {@code Spliterator} over the elements in this queue
1006 * @since 1.8
1007 */
1008 public Spliterator<E> spliterator() {
1009 return new PBQSpliterator<E>(this, null, 0, -1);
1010 }
1011
1012 // Unsafe mechanics
1013 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1014 private static final long ALLOCATIONSPINLOCK;
1015 static {
1016 try {
1017 ALLOCATIONSPINLOCK = U.objectFieldOffset
1018 (PriorityBlockingQueue.class.getDeclaredField("allocationSpinLock"));
1019 } catch (ReflectiveOperationException e) {
1020 throw new Error(e);
1021 }
1022 }
1023 }
--- EOF ---