/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.AbstractQueue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.SortedSet;
import java.util.Spliterator;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import jdk.internal.access.SharedSecrets;
import jdk.internal.util.ArraysSupport;

/**
 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
 * the same ordering rules as class {@link PriorityQueue} and supplies
 * blocking retrieval operations. While this queue is logically
 * unbounded, attempted additions may fail due to resource exhaustion
 * (causing {@code OutOfMemoryError}). This class does not permit
 * {@code null} elements. A priority queue relying on {@linkplain
 * Comparable natural ordering} also does not permit insertion of
 * non-comparable objects (doing so results in
 * {@code ClassCastException}).
 *
 * <p>This class and its iterator implement all of the <em>optional</em>
 * methods of the {@link Collection} and {@link Iterator} interfaces.
 * The Iterator provided in method {@link #iterator()} and the
 * Spliterator provided in method {@link #spliterator()} are <em>not</em>
 * guaranteed to traverse the elements of the PriorityBlockingQueue in
 * any particular order. If you need ordered traversal, consider using
 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo} can
 * be used to <em>remove</em> some or all elements in priority order and
 * place them in another collection.
 *
 * <p>Operations on this class make no guarantees about the ordering
 * of elements with equal priority. If you need to enforce an
 * ordering, you can define custom classes or comparators that use a
 * secondary key to break ties in primary priority values. For
 * example, here is a class that applies first-in-first-out
 * tie-breaking to comparable elements. To use it, you would insert a
 * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
 *
 * <pre> {@code
 * class FIFOEntry<E extends Comparable<? super E>>
 *     implements Comparable<FIFOEntry<E>> {
 *   static final AtomicLong seq = new AtomicLong();
 *   final long seqNum;
 *   final E entry;
 *   public FIFOEntry(E entry) {
 *     seqNum = seq.getAndIncrement();
 *     this.entry = entry;
 *   }
 *   public E getEntry() { return entry; }
 *   public int compareTo(FIFOEntry<E> other) {
 *     int res = entry.compareTo(other.entry);
 *     if (res == 0 && other.entry != this.entry)
 *       res = (seqNum < other.seqNum ? -1 : 1);
 *     return res;
 *   }
 * }}</pre>
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this queue
 */
@SuppressWarnings("unchecked")
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;

    /*
     * The implementation uses an array-based binary heap, with public
     * operations protected with a single lock. However, allocation
     * during resizing uses a simple spinlock (used only while not
     * holding main lock) in order to allow takes to operate
     * concurrently with allocation. This avoids repeated
     * postponement of waiting consumers and consequent element
     * build-up. The need to back away from lock during allocation
     * makes it impossible to simply wrap delegated
     * java.util.PriorityQueue operations within a lock, as was done
     * in a previous version of this class. To maintain
     * interoperability, a plain PriorityQueue is still used during
     * serialization, which maintains compatibility at the expense of
     * transiently doubling overhead.
     */

    /**
     * Default array capacity.
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d. The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     */
    private transient Object[] queue;

    /**
     * The number of elements in the priority queue.
     */
    private transient int size;

    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     */
    private transient Comparator<? super E> comparator;

    /**
     * Lock used for all public operations.
     */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Condition for blocking when empty.
     */
    @SuppressWarnings("serial") // Classes implementing Condition may be serializable.
    private final Condition notEmpty = lock.newCondition();

    /**
     * Spinlock for allocation, acquired via CAS.
     */
    private transient volatile int allocationSpinLock;

    /**
     * A plain PriorityQueue used only for serialization,
     * to maintain compatibility with previous versions
     * of this class. Non-null only during serialization/deserialization.
     */
    private PriorityQueue<E> q;

    /**
     * Creates a {@code PriorityBlockingQueue} with the default
     * initial capacity (11) that orders its elements according to
     * their {@linkplain Comparable natural ordering}.
     */
    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    /**
     * Creates a {@code PriorityBlockingQueue} with the specified
     * initial capacity that orders its elements according to their
     * {@linkplain Comparable natural ordering}.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @throws IllegalArgumentException if {@code initialCapacity} is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    /**
     * Creates a {@code PriorityBlockingQueue} with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @param  comparator the comparator that will be used to order this
     *         priority queue. If {@code null}, the {@linkplain Comparable
     *         natural ordering} of the elements will be used.
     * @throws IllegalArgumentException if {@code initialCapacity} is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.comparator = comparator;
        this.queue = new Object[Math.max(1, initialCapacity)];
    }

    /**
     * Creates a {@code PriorityBlockingQueue} containing the elements
     * in the specified collection. If the specified collection is a
     * {@link SortedSet} or a {@link PriorityBlockingQueue}, this
     * priority queue will be ordered according to the same ordering.
     * Otherwise, this priority queue will be ordered according to the
     * {@linkplain Comparable natural ordering} of its elements.
     *
     * @param  c the collection whose elements are to be placed
     *         into this priority queue
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        boolean heapify = true; // true if not known to be in heap order
        boolean screen = true;  // true if must screen for nulls
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;
        }
        else if (c instanceof PriorityBlockingQueue<?>) {
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;
        }
        Object[] es = c.toArray();
        int n = es.length;
        // Unless c is exactly an ArrayList, copy into a fresh Object[]
        // rather than adopting the array that c.toArray() handed back.
        if (c.getClass() != java.util.ArrayList.class)
            es = Arrays.copyOf(es, n, Object[].class);
        if (screen && (n == 1 || this.comparator != null)) {
            for (Object e : es)
                if (e == null)
                    throw new NullPointerException();
        }
        this.queue = ensureNonEmpty(es);
        this.size = n;
        if (heapify)
            heapify();
    }

    /** Ensures that queue[0] exists, helping peek() and poll(). */
    private static Object[] ensureNonEmpty(Object[] es) {
        return (es.length > 0) ? es : new Object[1];
    }

    /**
     * Tries to grow array to accommodate at least one more element
     * (but normally expand by about 50%), giving up (allowing retry)
     * on contention (which we expect to be rare). Call only while
     * holding lock.
     *
     * <p>The main lock is released on entry and re-acquired before
     * returning, so {@code queue} and {@code size} may have changed by
     * the time this method returns; callers must re-read them.
     *
     * @param array the heap array
     * @param oldCap the length of the array
     */
    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
            try {
                int growth = (oldCap < 64)
                    ? (oldCap + 2) // grow faster if small
                    : (oldCap >> 1);
                int newCap = ArraysSupport.newLength(oldCap, 1, growth);
                // Skip the allocation if the array has already been replaced.
                if (queue == array)
                    newArray = new Object[newCap];
            } finally {
                // Reset in finally so the spin lock is released even if
                // the length computation or the allocation throws.
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        // Install only if the array was not replaced while the main lock
        // was released; otherwise newArray is simply dropped.
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

    /**
     * Mechanics for poll(). Call only while holding lock.
     *
     * @return the element that was at queue[0], or {@code null} if
     *         queue[0] was null (the queue was empty)
     */
    private E dequeue() {
        // assert lock.isHeldByCurrentThread();
        final Object[] es;
        final E result;

        if ((result = (E) ((es = queue)[0])) != null) {
            final int n;
            // Detach the last element and re-insert it starting from the
            // root, which is the slot being vacated.
            final E x = (E) es[(n = --size)];
            es[n] = null;
            if (n > 0) {
                final Comparator<? super E> cmp;
                if ((cmp = comparator) == null)
                    siftDownComparable(0, x, es, n);
                else
                    siftDownUsingComparator(0, x, es, n, cmp);
            }
        }
        return result;
    }

    /**
     * Inserts item x at position k, maintaining heap invariant by
     * promoting x up the tree until it is greater than or equal to
     * its parent, or is the root.
     *
     * To simplify and speed up coercions and comparisons, the
     * Comparable and Comparator versions are separated into different
     * methods that are otherwise identical. (Similarly for siftDown.)
     *
     * @param k the position to fill
     * @param x the item to insert
     * @param es the heap array
     */
    private static <T> void siftUpComparable(int k, T x, Object[] es) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = es[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            es[k] = e;
            k = parent;
        }
        es[k] = key;
    }

    /**
     * Comparator-based counterpart of {@code siftUpComparable}: moves
     * parents down into position k until x is not less than its parent
     * according to {@code cmp}, or k reaches the root, then stores x.
     *
     * @param k the position to fill
     * @param x the item to insert
     * @param es the heap array
     * @param cmp the comparator used to order elements
     */
    private static <T> void siftUpUsingComparator(
        int k, T x, Object[] es, Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = es[parent];
            if (cmp.compare(x, (T) e) >= 0)
                break;
            es[k] = e;
            k = parent;
        }
        es[k] = x;
    }

    /**
     * Inserts item x at position k, maintaining heap invariant by
     * demoting x down the tree repeatedly until it is less than or
     * equal to its children or is a leaf.
     *
     * @param k the position to fill
     * @param x the item to insert
     * @param es the heap array
     * @param n heap size
     */
    private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
        // assert n > 0;
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = es[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
                c = es[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            es[k] = c;
            k = child;
        }
        es[k] = key;
    }

    /**
     * Comparator-based counterpart of {@code siftDownComparable}: moves
     * the smaller child up into position k until x is not greater than
     * both children according to {@code cmp}, or k is a leaf, then
     * stores x.
     *
     * @param k the position to fill
     * @param x the item to insert
     * @param es the heap array
     * @param n heap size
     * @param cmp the comparator used to order elements
     */
    private static <T> void siftDownUsingComparator(
        int k, T x, Object[] es, int n, Comparator<? super T> cmp) {
        // assert n > 0;
        int half = n >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = es[child];
            int right = child + 1;
            if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
                c = es[child = right];
            if (cmp.compare(x, (T) c) <= 0)
                break;
            es[k] = c;
            k = child;
        }
        es[k] = x;
    }

    /**
     * Establishes the heap invariant (described above) in the entire tree,
     * assuming nothing about the order of the elements prior to the call.
     * This classic algorithm due to Floyd (1964) is known to be O(size).
     */
    private void heapify() {
        final Object[] es = queue;
        // Start at the last non-leaf node and sift each node down in turn.
        int n = size, i = (n >>> 1) - 1;
        final Comparator<? super E> cmp;
        if ((cmp = comparator) == null)
            for (; i >= 0; i--)
                siftDownComparable(i, (E) es[i], es, n);
        else
            for (; i >= 0; i--)
                siftDownUsingComparator(i, (E) es[i], es, n, cmp);
    }

    /**
     * Inserts the specified element into this priority queue.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] es;
        // tryGrow releases and re-acquires the lock, so size and queue are
        // re-read on every iteration until there is room for one more.
        while ((n = size) >= (cap = (es = queue).length))
            tryGrow(es, cap);
        try {
            final Comparator<? super E> cmp;
            if ((cmp = comparator) == null)
                siftUpComparable(n, e, es);
            else
                siftUpUsingComparator(n, e, es, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

    /**
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never block.
     *
     * @param e the element to add
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        offer(e); // never need to block
    }

    /**
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never block or
     * return {@code false}.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true} (as specified by
     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * {@code null} if this queue is empty. Does not wait.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting on the
     * {@code notEmpty} condition for as long as the queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified time for an element to become available. With a
     * non-positive timeout a single attempt is made without waiting.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit the time unit of the {@code timeout} argument
     * @return the head of this queue, or {@code null} if the wait time
     *         elapsed while the queue was still empty
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            // awaitNanos returns the remaining wait time, so the total
            // wait across repeated wake-ups stays bounded by the timeout.
            while ( (result = dequeue()) == null && nanos > 0)
                nanos = notEmpty.awaitNanos(nanos);
        } finally {
            lock.unlock();
        }
        return result;
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (E) queue[0];
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the comparator used to order the elements in this queue,
     * or {@code null} if this queue uses the {@linkplain Comparable
     * natural ordering} of its elements.
     *
     * @return the comparator used to order the elements in this queue,
     *         or {@code null} if this queue uses the natural
     *         ordering of its elements
     */
    public Comparator<? super E> comparator() {
        return comparator;
    }

    /**
     * Returns the number of elements in this queue, read while holding
     * the lock.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return size;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because
     * a {@code PriorityBlockingQueue} is not capacity constrained.
     * @return {@code Integer.MAX_VALUE} always
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns the index in the heap array of the first element
     * {@code e} for which {@code o.equals(e)}, scanning in array order.
     * Does no locking itself; the callers in this class
     * ({@code remove} and {@code contains}) hold the lock.
     *
     * @param o the object to search for
     * @return the index of a matching element, or -1 if {@code o} is
     *         null or no element matches
     */
    private int indexOf(Object o) {
        if (o != null) {
            final Object[] es = queue;
            for (int i = 0, n = size; i < n; i++)
                if (o.equals(es[i]))
                    return i;
        }
        return -1;
    }

    /**
     * Removes the ith element from queue.
     */
    private void removeAt(int i) {
        final Object[] es = queue;
        final int n = size - 1;
        if (n == i) // removed last element
            es[i] = null;
        else {
            // Fill the hole at i with the last element, then restore the
            // heap invariant around position i.
            E moved = (E) es[n];
            es[n] = null;
            final Comparator<? super E> cmp;
            if ((cmp = comparator) == null)
                siftDownComparable(i, moved, es, n);
            else
                siftDownUsingComparator(i, moved, es, n, cmp);
            // If sifting down left moved at i, it may still be smaller
            // than its parent, so try sifting it up as well.
            if (es[i] == moved) {
                if (cmp == null)
                    siftUpComparable(i, moved, es);
                else
                    siftUpUsingComparator(i, moved, es, cmp);
            }
        }
        size = n;
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present. More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements. Returns {@code true} if and only if this queue contained
     * the specified element (or equivalently, if this queue changed as a
     * result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = indexOf(o);
            if (i == -1)
                return false;
            removeAt(i);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Identity-based version for use in Itr.remove.
     *
     * @param o element to be removed from this queue, if present
     */
    void removeEq(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Object[] es = queue;
            for (int i = 0, n = size; i < n; i++) {
                if (o == es[i]) {
                    removeAt(i);
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    public boolean contains(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return indexOf(o) != -1;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the string produced by
     * {@code Helpers.collectionToString(this)}.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        return Helpers.collectionToString(this);
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        Objects.requireNonNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = Math.min(size, maxElements);
            for (int i = 0; i < n; i++) {
                c.add((E) queue[0]); // In this order, in case add() throws.
                dequeue();
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Object[] es = queue;
            for (int i = 0, n = size; i < n; i++)
                es[i] = null;
            size = 0;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue. (In other words, this method must allocate
     * a new array). The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return Arrays.copyOf(queue, size);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs. Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = size;
            if (a.length < n)
                // Make a new array of a's runtime type, but my contents:
                return (T[]) Arrays.copyOf(queue, size, a.getClass());
            System.arraycopy(queue, 0, a, 0, n);
            if (a.length > n)
                a[n] = null;
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The
     * iterator does not return the elements in any particular order.
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    final class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet = -1;     // index of last element, or -1 if no such

        /**
         * Creates an iterator over the given array, which is used as-is
         * (the caller, {@code iterator()}, passes a fresh copy).
         */
        Itr(Object[] array) {
            this.array = array;
        }

        /** Returns {@code true} while the cursor is inside the snapshot. */
        public boolean hasNext() {
            return cursor < array.length;
        }

        /**
         * Returns the next element of the snapshot and records its index
         * for a subsequent {@code remove}.
         *
         * @throws NoSuchElementException if the snapshot is exhausted
         */
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            return (E)array[lastRet = cursor++];
        }

        /**
         * Removes from the enclosing queue, by identity, the element most
         * recently returned by this iterator. The snapshot array itself
         * is left unchanged.
         *
         * @throws IllegalStateException if there is no such element
         *         ({@code lastRet} is negative)
         */
        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            removeEq(array[lastRet]);
            lastRet = -1;
        }

        /**
         * Passes every remaining snapshot element to {@code action}.
         * The cursor is moved to the end before the first call to
         * {@code action}; {@code lastRet} is set to the final index only
         * after all calls have returned.
         *
         * @throws NullPointerException if {@code action} is null
         */
        public void forEachRemaining(Consumer<? super E> action) {
            Objects.requireNonNull(action);
            final Object[] es = array;
            int i;
            if ((i = cursor) < es.length) {
                lastRet = -1;
                cursor = es.length;
                for (; i < es.length; i++)
                    action.accept((E) es[i]);
                lastRet = es.length - 1;
            }
        }
    }

    /**
     * Saves this queue to a stream (that is, serializes it).
     *
     * For compatibility with previous version of this class, elements
     * are first copied to a java.util.PriorityQueue, which is then
     * serialized.
     *
     * @param s the stream
     * @throws java.io.IOException if an I/O error occurs
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            // avoid zero capacity argument
            q = new PriorityQueue<E>(Math.max(size, 1), comparator);
            q.addAll(this);
            s.defaultWriteObject();
        } finally {
            // Drop the temporary copy whether or not writing succeeded.
            q = null;
            lock.unlock();
        }
    }

    /**
     * Reconstitutes this queue from a stream (that is, deserializes it).
     * @param s the stream
     * @throws ClassNotFoundException if the class of a serialized object
     *         could not be found
     * @throws java.io.IOException if an I/O error occurs
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        try {
            s.defaultReadObject();
            int sz = q.size();
            SharedSecrets.getJavaObjectInputStreamAccess().checkArray(s, Object[].class, sz);
            // The transient heap array and comparator are rebuilt from
            // the deserialized PriorityQueue.
            this.queue = new Object[Math.max(1, sz)];
            comparator = q.comparator();
            addAll(q);
        } finally {
            q = null;
        }
    }

    /**
     * Immutable snapshot spliterator that binds to elements "late".
     */
    final class PBQSpliterator implements Spliterator<E> {
        Object[] array;        // null until late-bound-initialized
        int index;
        int fence;

        /** Creates an unbound spliterator; the snapshot is taken on first use. */
        PBQSpliterator() {}

        /** Creates a spliterator over {@code array[index..fence)}. */
        PBQSpliterator(Object[] array, int index, int fence) {
            this.array = array;
            this.index = index;
            this.fence = fence;
        }

        /**
         * Returns the fence, first taking the snapshot via the enclosing
         * queue's {@code toArray()} if this spliterator is still unbound.
         */
        private int getFence() {
            if (array == null)
                fence = (array = toArray()).length;
            return fence;
        }

        /**
         * Splits off the lower half of the remaining range into a new
         * spliterator; this one keeps the upper half. Returns
         * {@code null} when the range is too small to split.
         */
        public PBQSpliterator trySplit() {
            int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
            return (lo >= mid) ? null :
                new PBQSpliterator(array, lo, index = mid);
        }

        /**
         * Passes every remaining element to {@code action}.
         *
         * @throws NullPointerException if {@code action} is null
         */
        public void forEachRemaining(Consumer<? super E> action) {
            Objects.requireNonNull(action);
            final int hi = getFence(), lo = index;
            final Object[] es = array;
            index = hi;                 // ensure exhaustion
            for (int i = lo; i < hi; i++)
                action.accept((E) es[i]);
        }

        /**
         * Passes the next element, if any, to {@code action}.
         *
         * @return {@code true} if an element was consumed
         * @throws NullPointerException if {@code action} is null
         */
        public boolean tryAdvance(Consumer<? super E> action) {
            Objects.requireNonNull(action);
            if (getFence() > index && index >= 0) {
                action.accept((E) array[index++]);
                return true;
            }
            return false;
        }

        /** Returns the number of elements left in this spliterator's range. */
        public long estimateSize() { return getFence() - index; }

        /** Reports {@code NONNULL}, {@code SIZED} and {@code SUBSIZED}. */
        public int characteristics() {
            return (Spliterator.NONNULL |
                    Spliterator.SIZED |
                    Spliterator.SUBSIZED);
        }
    }

    /**
     * Returns a {@link Spliterator} over the elements in this queue.
     * The spliterator does not traverse elements in any particular order
     * (the {@link Spliterator#ORDERED ORDERED} characteristic is not reported).
     *
     * <p>The returned spliterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
     * {@link Spliterator#NONNULL}.
     *
     * @implNote
     * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
     *
     * @return a {@code Spliterator} over the elements in this queue
     * @since 1.8
     */
    public Spliterator<E> spliterator() {
        return new PBQSpliterator();
    }

    /**
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean removeIf(Predicate<? super E> filter) {
        Objects.requireNonNull(filter);
        return bulkRemove(filter);
    }

    /**
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean removeAll(Collection<?> c) {
        Objects.requireNonNull(c);
        return bulkRemove(e -> c.contains(e));
    }

    /**
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean retainAll(Collection<?> c) {
        Objects.requireNonNull(c);
        return bulkRemove(e -> !c.contains(e));
    }

    // A tiny bit set implementation

    /**
     * Allocates a zeroed {@code long[]} with enough 64-bit words to hold
     * n bits. The only call site in this class passes n >= 1.
     */
    private static long[] nBits(int n) {
        return new long[((n - 1) >> 6) + 1];
    }
    /**
     * Sets bit i. Only the low six bits of a {@code long} shift distance
     * are used, so {@code 1L << i} selects bit {@code i % 64} of word
     * {@code i >> 6}.
     */
    private static void setBit(long[] bits, int i) {
        bits[i >> 6] |= 1L << i;
    }
    /** Returns {@code true} if bit i is not set. */
    private static boolean isClear(long[] bits, int i) {
        return (bits[i >> 6] & (1L << i)) == 0;
    }

    /** Implementation of bulk remove methods. */
    private boolean bulkRemove(Predicate<? super E> filter) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Object[] es = queue;
            final int end = size;
            int i;
            // Optimize for initial run of survivors
            for (i = 0; i < end && !filter.test((E) es[i]); i++)
                ;
            if (i >= end)
                return false;
            // Tolerate predicates that reentrantly access the
            // collection for read, so traverse once to find elements
            // to delete, a second pass to physically expunge.
            final int beg = i;
            final long[] deathRow = nBits(end - beg);
            // es[beg] already matched the filter in the loop above, so it
            // is marked directly instead of being tested a second time.
            deathRow[0] = 1L;   // set bit 0
            for (i = beg + 1; i < end; i++)
                if (filter.test((E) es[i]))
                    setBit(deathRow, i - beg);
            // Compact the survivors leftwards, null out the vacated tail,
            // then rebuild the heap order over what remains.
            int w = beg;
            for (i = beg; i < end; i++)
                if (isClear(deathRow, i - beg))
                    es[w++] = es[i];
            for (i = size = w; i < end; i++)
                es[i] = null;
            heapify();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws NullPointerException {@inheritDoc}
     */
    public void forEach(Consumer<? super E> action) {
        Objects.requireNonNull(action);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Object[] es = queue;
            for (int i = 0, n = size; i < n; i++)
                action.accept((E) es[i]);
        } finally {
            lock.unlock();
        }
    }

    // VarHandle mechanics
    private static final VarHandle ALLOCATIONSPINLOCK;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            ALLOCATIONSPINLOCK = l.findVarHandle(PriorityBlockingQueue.class,
                                                 "allocationSpinLock",
                                                 int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}