1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.lang.invoke.MethodHandles; 39 import java.lang.invoke.VarHandle; 40 import java.util.AbstractQueue; 41 import java.util.Arrays; 42 import java.util.Collection; 43 import java.util.Comparator; 44 import java.util.Iterator; 45 import java.util.NoSuchElementException; 46 import java.util.Objects; 47 import java.util.PriorityQueue; 48 import java.util.Queue; 49 import java.util.SortedSet; 50 import java.util.Spliterator; 51 import java.util.concurrent.locks.Condition; 52 import java.util.concurrent.locks.ReentrantLock; 53 import java.util.function.Consumer; 54 import java.util.function.Predicate; 55 import jdk.internal.access.SharedSecrets; 56 import jdk.internal.util.ArraysSupport; 57 58 /** 59 * An unbounded {@linkplain BlockingQueue blocking queue} that uses 60 * the same ordering rules as class {@link PriorityQueue} and supplies 61 * blocking retrieval operations. While this queue is logically 62 * unbounded, attempted additions may fail due to resource exhaustion 63 * (causing {@code OutOfMemoryError}). This class does not permit 64 * {@code null} elements. A priority queue relying on {@linkplain 65 * Comparable natural ordering} also does not permit insertion of 66 * non-comparable objects (doing so results in 67 * {@code ClassCastException}). 68 * 69 * <p>This class and its iterator implement all of the <em>optional</em> 70 * methods of the {@link Collection} and {@link Iterator} interfaces. 71 * The Iterator provided in method {@link #iterator()} and the 72 * Spliterator provided in method {@link #spliterator()} are <em>not</em> 73 * guaranteed to traverse the elements of the PriorityBlockingQueue in 74 * any particular order. If you need ordered traversal, consider using 75 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo} can 76 * be used to <em>remove</em> some or all elements in priority order and 77 * place them in another collection. 78 * 79 * <p>Operations on this class make no guarantees about the ordering 80 * of elements with equal priority. If you need to enforce an 81 * ordering, you can define custom classes or comparators that use a 82 * secondary key to break ties in primary priority values. For 83 * example, here is a class that applies first-in-first-out 84 * tie-breaking to comparable elements. To use it, you would insert a 85 * {@code new FIFOEntry(anEntry)} instead of a plain entry object. 86 * 87 * <pre> {@code 88 * class FIFOEntry<E extends Comparable<? super E>> 89 * implements Comparable<FIFOEntry<E>> { 90 * static final AtomicLong seq = new AtomicLong(0); 91 * final long seqNum; 92 * final E entry; 93 * public FIFOEntry(E entry) { 94 * seqNum = seq.getAndIncrement(); 95 * this.entry = entry; 96 * } 97 * public E getEntry() { return entry; } 98 * public int compareTo(FIFOEntry<E> other) { 99 * int res = entry.compareTo(other.entry); 100 * if (res == 0 && other.entry != this.entry) 101 * res = (seqNum < other.seqNum ? -1 : 1); 102 * return res; 103 * } 104 * }}</pre> 105 * 106 * <p>This class is a member of the 107 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework"> 108 * Java Collections Framework</a>. 109 * 110 * @since 1.5 111 * @author Doug Lea 112 * @param <E> the type of elements held in this queue 113 */ 114 @SuppressWarnings("unchecked") 115 public class PriorityBlockingQueue<E> extends AbstractQueue<E> 116 implements BlockingQueue<E>, java.io.Serializable { 117 private static final long serialVersionUID = 5595510919245408276L; 118 119 /* 120 * The implementation uses an array-based binary heap, with public 121 * operations protected with a single lock. However, allocation 122 * during resizing uses a simple spinlock (used only while not 123 * holding main lock) in order to allow takes to operate 124 * concurrently with allocation. This avoids repeated 125 * postponement of waiting consumers and consequent element 126 * build-up. The need to back away from lock during allocation 127 * makes it impossible to simply wrap delegated 128 * java.util.PriorityQueue operations within a lock, as was done 129 * in a previous version of this class. To maintain 130 * interoperability, a plain PriorityQueue is still used during 131 * serialization, which maintains compatibility at the expense of 132 * transiently doubling overhead. 133 */ 134 135 /** 136 * Default array capacity. 137 */ 138 private static final int DEFAULT_INITIAL_CAPACITY = 11; 139 140 /** 141 * Priority queue represented as a balanced binary heap: the two 142 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The 143 * priority queue is ordered by comparator, or by the elements' 144 * natural ordering, if comparator is null: For each node n in the 145 * heap and each descendant d of n, n <= d. The element with the 146 * lowest value is in queue[0], assuming the queue is nonempty. 147 */ 148 private transient Object[] queue; 149 150 /** 151 * The number of elements in the priority queue. 152 */ 153 private transient int size; 154 155 /** 156 * The comparator, or null if priority queue uses elements' 157 * natural ordering. 158 */ 159 private transient Comparator<? super E> comparator; 160 161 /** 162 * Lock used for all public operations. 163 */ 164 private final ReentrantLock lock = new ReentrantLock(); 165 166 /** 167 * Condition for blocking when empty. 168 */ 169 @SuppressWarnings("serial") // Classes implementing Condition may be serializable. 170 private final Condition notEmpty = lock.newCondition(); 171 172 /** 173 * Spinlock for allocation, acquired via CAS. 174 */ 175 private transient volatile int allocationSpinLock; 176 177 /** 178 * A plain PriorityQueue used only for serialization, 179 * to maintain compatibility with previous versions 180 * of this class. Non-null only during serialization/deserialization. 181 */ 182 private PriorityQueue<E> q; 183 184 /** 185 * Creates a {@code PriorityBlockingQueue} with the default 186 * initial capacity (11) that orders its elements according to 187 * their {@linkplain Comparable natural ordering}. 188 */ 189 public PriorityBlockingQueue() { 190 this(DEFAULT_INITIAL_CAPACITY, null); 191 } 192 193 /** 194 * Creates a {@code PriorityBlockingQueue} with the specified 195 * initial capacity that orders its elements according to their 196 * {@linkplain Comparable natural ordering}. 197 * 198 * @param initialCapacity the initial capacity for this priority queue 199 * @throws IllegalArgumentException if {@code initialCapacity} is less 200 * than 1 201 */ 202 public PriorityBlockingQueue(int initialCapacity) { 203 this(initialCapacity, null); 204 } 205 206 /** 207 * Creates a {@code PriorityBlockingQueue} with the specified initial 208 * capacity that orders its elements according to the specified 209 * comparator. 210 * 211 * @param initialCapacity the initial capacity for this priority queue 212 * @param comparator the comparator that will be used to order this 213 * priority queue. If {@code null}, the {@linkplain Comparable 214 * natural ordering} of the elements will be used. 215 * @throws IllegalArgumentException if {@code initialCapacity} is less 216 * than 1 217 */ 218 public PriorityBlockingQueue(int initialCapacity, 219 Comparator<? super E> comparator) { 220 if (initialCapacity < 1) 221 throw new IllegalArgumentException(); 222 this.comparator = comparator; 223 this.queue = new Object[Math.max(1, initialCapacity)]; 224 } 225 226 /** 227 * Creates a {@code PriorityBlockingQueue} containing the elements 228 * in the specified collection. If the specified collection is a 229 * {@link SortedSet} or a {@link PriorityQueue}, this 230 * priority queue will be ordered according to the same ordering. 231 * Otherwise, this priority queue will be ordered according to the 232 * {@linkplain Comparable natural ordering} of its elements. 233 * 234 * @param c the collection whose elements are to be placed 235 * into this priority queue 236 * @throws ClassCastException if elements of the specified collection 237 * cannot be compared to one another according to the priority 238 * queue's ordering 239 * @throws NullPointerException if the specified collection or any 240 * of its elements are null 241 */ 242 public PriorityBlockingQueue(Collection<? extends E> c) { 243 boolean heapify = true; // true if not known to be in heap order 244 boolean screen = true; // true if must screen for nulls 245 if (c instanceof SortedSet<?>) { 246 SortedSet<? extends E> ss = (SortedSet<? extends E>) c; 247 this.comparator = (Comparator<? super E>) ss.comparator(); 248 heapify = false; 249 } 250 else if (c instanceof PriorityBlockingQueue<?>) { 251 PriorityBlockingQueue<? extends E> pq = 252 (PriorityBlockingQueue<? extends E>) c; 253 this.comparator = (Comparator<? super E>) pq.comparator(); 254 screen = false; 255 if (pq.getClass() == PriorityBlockingQueue.class) // exact match 256 heapify = false; 257 } 258 Object[] es = c.toArray(); 259 int n = es.length; 260 if (c.getClass() != java.util.ArrayList.class) 261 es = Arrays.copyOf(es, n, Object[].class); 262 if (screen && (n == 1 || this.comparator != null)) { 263 for (Object e : es) 264 if (e == null) 265 throw new NullPointerException(); 266 } 267 this.queue = ensureNonEmpty(es); 268 this.size = n; 269 if (heapify) 270 heapify(); 271 } 272 273 /** Ensures that queue[0] exists, helping peek() and poll(). */ 274 private static Object[] ensureNonEmpty(Object[] es) { 275 return (es.length > 0) ? es : new Object[1]; 276 } 277 278 /** 279 * Tries to grow array to accommodate at least one more element 280 * (but normally expand by about 50%), giving up (allowing retry) 281 * on contention (which we expect to be rare). Call only while 282 * holding lock. 283 * 284 * @param array the heap array 285 * @param oldCap the length of the array 286 */ 287 private void tryGrow(Object[] array, int oldCap) { 288 lock.unlock(); // must release and then re-acquire main lock 289 Object[] newArray = null; 290 if (allocationSpinLock == 0 && 291 ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) { 292 try { 293 int growth = oldCap < 64 ? oldCap + 2 : oldCap >> 1; 294 int newCap = ArraysSupport.newLength(oldCap, 1, growth); 295 if (queue == array) 296 newArray = new Object[newCap]; 297 } finally { 298 allocationSpinLock = 0; 299 } 300 } 301 if (newArray == null) // back off if another thread is allocating 302 Thread.yield(); 303 lock.lock(); 304 if (newArray != null && queue == array) { 305 queue = newArray; 306 System.arraycopy(array, 0, newArray, 0, oldCap); 307 } 308 } 309 310 /** 311 * Mechanics for poll(). Call only while holding lock. 312 */ 313 private E dequeue() { 314 // assert lock.isHeldByCurrentThread(); 315 final Object[] es; 316 final E result; 317 318 if ((result = (E) ((es = queue)[0])) != null) { 319 final int n; 320 final E x = (E) es[(n = --size)]; 321 es[n] = null; 322 if (n > 0) { 323 final Comparator<? super E> cmp; 324 if ((cmp = comparator) == null) 325 siftDownComparable(0, x, es, n); 326 else 327 siftDownUsingComparator(0, x, es, n, cmp); 328 } 329 } 330 return result; 331 } 332 333 /** 334 * Inserts item x at position k, maintaining heap invariant by 335 * promoting x up the tree until it is greater than or equal to 336 * its parent, or is the root. 337 * 338 * To simplify and speed up coercions and comparisons, the 339 * Comparable and Comparator versions are separated into different 340 * methods that are otherwise identical. (Similarly for siftDown.) 341 * 342 * @param k the position to fill 343 * @param x the item to insert 344 * @param es the heap array 345 */ 346 private static <T> void siftUpComparable(int k, T x, Object[] es) { 347 Comparable<? super T> key = (Comparable<? super T>) x; 348 while (k > 0) { 349 int parent = (k - 1) >>> 1; 350 Object e = es[parent]; 351 if (key.compareTo((T) e) >= 0) 352 break; 353 es[k] = e; 354 k = parent; 355 } 356 es[k] = key; 357 } 358 359 private static <T> void siftUpUsingComparator( 360 int k, T x, Object[] es, Comparator<? super T> cmp) { 361 while (k > 0) { 362 int parent = (k - 1) >>> 1; 363 Object e = es[parent]; 364 if (cmp.compare(x, (T) e) >= 0) 365 break; 366 es[k] = e; 367 k = parent; 368 } 369 es[k] = x; 370 } 371 372 /** 373 * Inserts item x at position k, maintaining heap invariant by 374 * demoting x down the tree repeatedly until it is less than or 375 * equal to its children or is a leaf. 376 * 377 * @param k the position to fill 378 * @param x the item to insert 379 * @param es the heap array 380 * @param n heap size 381 */ 382 private static <T> void siftDownComparable(int k, T x, Object[] es, int n) { 383 // assert n > 0; 384 Comparable<? super T> key = (Comparable<? super T>)x; 385 int half = n >>> 1; // loop while a non-leaf 386 while (k < half) { 387 int child = (k << 1) + 1; // assume left child is least 388 Object c = es[child]; 389 int right = child + 1; 390 if (right < n && 391 ((Comparable<? super T>) c).compareTo((T) es[right]) > 0) 392 c = es[child = right]; 393 if (key.compareTo((T) c) <= 0) 394 break; 395 es[k] = c; 396 k = child; 397 } 398 es[k] = key; 399 } 400 401 private static <T> void siftDownUsingComparator( 402 int k, T x, Object[] es, int n, Comparator<? super T> cmp) { 403 // assert n > 0; 404 int half = n >>> 1; 405 while (k < half) { 406 int child = (k << 1) + 1; 407 Object c = es[child]; 408 int right = child + 1; 409 if (right < n && cmp.compare((T) c, (T) es[right]) > 0) 410 c = es[child = right]; 411 if (cmp.compare(x, (T) c) <= 0) 412 break; 413 es[k] = c; 414 k = child; 415 } 416 es[k] = x; 417 } 418 419 /** 420 * Establishes the heap invariant (described above) in the entire tree, 421 * assuming nothing about the order of the elements prior to the call. 422 * This classic algorithm due to Floyd (1964) is known to be O(size). 423 */ 424 private void heapify() { 425 final Object[] es = queue; 426 int n = size, i = (n >>> 1) - 1; 427 final Comparator<? super E> cmp; 428 if ((cmp = comparator) == null) 429 for (; i >= 0; i--) 430 siftDownComparable(i, (E) es[i], es, n); 431 else 432 for (; i >= 0; i--) 433 siftDownUsingComparator(i, (E) es[i], es, n, cmp); 434 } 435 436 /** 437 * Inserts the specified element into this priority queue. 438 * 439 * @param e the element to add 440 * @return {@code true} (as specified by {@link Collection#add}) 441 * @throws ClassCastException if the specified element cannot be compared 442 * with elements currently in the priority queue according to the 443 * priority queue's ordering 444 * @throws NullPointerException if the specified element is null 445 */ 446 public boolean add(E e) { 447 return offer(e); 448 } 449 450 /** 451 * Inserts the specified element into this priority queue. 452 * As the queue is unbounded, this method will never return {@code false}. 453 * 454 * @param e the element to add 455 * @return {@code true} (as specified by {@link Queue#offer}) 456 * @throws ClassCastException if the specified element cannot be compared 457 * with elements currently in the priority queue according to the 458 * priority queue's ordering 459 * @throws NullPointerException if the specified element is null 460 */ 461 public boolean offer(E e) { 462 if (e == null) 463 throw new NullPointerException(); 464 final ReentrantLock lock = this.lock; 465 lock.lock(); 466 int n, cap; 467 Object[] es; 468 while ((n = size) >= (cap = (es = queue).length)) 469 tryGrow(es, cap); 470 try { 471 final Comparator<? super E> cmp; 472 if ((cmp = comparator) == null) 473 siftUpComparable(n, e, es); 474 else 475 siftUpUsingComparator(n, e, es, cmp); 476 size = n + 1; 477 notEmpty.signal(); 478 } finally { 479 lock.unlock(); 480 } 481 return true; 482 } 483 484 /** 485 * Inserts the specified element into this priority queue. 486 * As the queue is unbounded, this method will never block. 487 * 488 * @param e the element to add 489 * @throws ClassCastException if the specified element cannot be compared 490 * with elements currently in the priority queue according to the 491 * priority queue's ordering 492 * @throws NullPointerException if the specified element is null 493 */ 494 public void put(E e) { 495 offer(e); // never need to block 496 } 497 498 /** 499 * Inserts the specified element into this priority queue. 500 * As the queue is unbounded, this method will never block or 501 * return {@code false}. 502 * 503 * @param e the element to add 504 * @param timeout This parameter is ignored as the method never blocks 505 * @param unit This parameter is ignored as the method never blocks 506 * @return {@code true} (as specified by 507 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) 508 * @throws ClassCastException if the specified element cannot be compared 509 * with elements currently in the priority queue according to the 510 * priority queue's ordering 511 * @throws NullPointerException if the specified element is null 512 */ 513 public boolean offer(E e, long timeout, TimeUnit unit) { 514 return offer(e); // never need to block 515 } 516 517 public E poll() { 518 final ReentrantLock lock = this.lock; 519 lock.lock(); 520 try { 521 return dequeue(); 522 } finally { 523 lock.unlock(); 524 } 525 } 526 527 public E take() throws InterruptedException { 528 final ReentrantLock lock = this.lock; 529 lock.lockInterruptibly(); 530 E result; 531 try { 532 while ( (result = dequeue()) == null) 533 notEmpty.await(); 534 } finally { 535 lock.unlock(); 536 } 537 return result; 538 } 539 540 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 541 long nanos = unit.toNanos(timeout); 542 final ReentrantLock lock = this.lock; 543 lock.lockInterruptibly(); 544 E result; 545 try { 546 while ( (result = dequeue()) == null && nanos > 0) 547 nanos = notEmpty.awaitNanos(nanos); 548 } finally { 549 lock.unlock(); 550 } 551 return result; 552 } 553 554 public E peek() { 555 final ReentrantLock lock = this.lock; 556 lock.lock(); 557 try { 558 return (E) queue[0]; 559 } finally { 560 lock.unlock(); 561 } 562 } 563 564 /** 565 * Returns the comparator used to order the elements in this queue, 566 * or {@code null} if this queue uses the {@linkplain Comparable 567 * natural ordering} of its elements. 568 * 569 * @return the comparator used to order the elements in this queue, 570 * or {@code null} if this queue uses the natural 571 * ordering of its elements 572 */ 573 public Comparator<? super E> comparator() { 574 return comparator; 575 } 576 577 public int size() { 578 final ReentrantLock lock = this.lock; 579 lock.lock(); 580 try { 581 return size; 582 } finally { 583 lock.unlock(); 584 } 585 } 586 587 /** 588 * Always returns {@code Integer.MAX_VALUE} because 589 * a {@code PriorityBlockingQueue} is not capacity constrained. 590 * @return {@code Integer.MAX_VALUE} always 591 */ 592 public int remainingCapacity() { 593 return Integer.MAX_VALUE; 594 } 595 596 private int indexOf(Object o) { 597 if (o != null) { 598 final Object[] es = queue; 599 for (int i = 0, n = size; i < n; i++) 600 if (o.equals(es[i])) 601 return i; 602 } 603 return -1; 604 } 605 606 /** 607 * Removes the ith element from queue. 608 */ 609 private void removeAt(int i) { 610 final Object[] es = queue; 611 final int n = size - 1; 612 if (n == i) // removed last element 613 es[i] = null; 614 else { 615 E moved = (E) es[n]; 616 es[n] = null; 617 final Comparator<? super E> cmp; 618 if ((cmp = comparator) == null) 619 siftDownComparable(i, moved, es, n); 620 else 621 siftDownUsingComparator(i, moved, es, n, cmp); 622 if (es[i] == moved) { 623 if (cmp == null) 624 siftUpComparable(i, moved, es); 625 else 626 siftUpUsingComparator(i, moved, es, cmp); 627 } 628 } 629 size = n; 630 } 631 632 /** 633 * Removes a single instance of the specified element from this queue, 634 * if it is present. More formally, removes an element {@code e} such 635 * that {@code o.equals(e)}, if this queue contains one or more such 636 * elements. Returns {@code true} if and only if this queue contained 637 * the specified element (or equivalently, if this queue changed as a 638 * result of the call). 639 * 640 * @param o element to be removed from this queue, if present 641 * @return {@code true} if this queue changed as a result of the call 642 */ 643 public boolean remove(Object o) { 644 final ReentrantLock lock = this.lock; 645 lock.lock(); 646 try { 647 int i = indexOf(o); 648 if (i == -1) 649 return false; 650 removeAt(i); 651 return true; 652 } finally { 653 lock.unlock(); 654 } 655 } 656 657 /** 658 * Identity-based version for use in Itr.remove. 659 * 660 * @param o element to be removed from this queue, if present 661 */ 662 void removeEq(Object o) { 663 final ReentrantLock lock = this.lock; 664 lock.lock(); 665 try { 666 final Object[] es = queue; 667 for (int i = 0, n = size; i < n; i++) { 668 if (o == es[i]) { 669 removeAt(i); 670 break; 671 } 672 } 673 } finally { 674 lock.unlock(); 675 } 676 } 677 678 /** 679 * Returns {@code true} if this queue contains the specified element. 680 * More formally, returns {@code true} if and only if this queue contains 681 * at least one element {@code e} such that {@code o.equals(e)}. 682 * 683 * @param o object to be checked for containment in this queue 684 * @return {@code true} if this queue contains the specified element 685 */ 686 public boolean contains(Object o) { 687 final ReentrantLock lock = this.lock; 688 lock.lock(); 689 try { 690 return indexOf(o) != -1; 691 } finally { 692 lock.unlock(); 693 } 694 } 695 696 public String toString() { 697 return Helpers.collectionToString(this); 698 } 699 700 /** 701 * @throws UnsupportedOperationException {@inheritDoc} 702 * @throws ClassCastException {@inheritDoc} 703 * @throws NullPointerException {@inheritDoc} 704 * @throws IllegalArgumentException {@inheritDoc} 705 */ 706 public int drainTo(Collection<? super E> c) { 707 return drainTo(c, Integer.MAX_VALUE); 708 } 709 710 /** 711 * @throws UnsupportedOperationException {@inheritDoc} 712 * @throws ClassCastException {@inheritDoc} 713 * @throws NullPointerException {@inheritDoc} 714 * @throws IllegalArgumentException {@inheritDoc} 715 */ 716 public int drainTo(Collection<? super E> c, int maxElements) { 717 Objects.requireNonNull(c); 718 if (c == this) 719 throw new IllegalArgumentException(); 720 if (maxElements <= 0) 721 return 0; 722 final ReentrantLock lock = this.lock; 723 lock.lock(); 724 try { 725 int n = Math.min(size, maxElements); 726 for (int i = 0; i < n; i++) { 727 c.add((E) queue[0]); // In this order, in case add() throws. 728 dequeue(); 729 } 730 return n; 731 } finally { 732 lock.unlock(); 733 } 734 } 735 736 /** 737 * Atomically removes all of the elements from this queue. 738 * The queue will be empty after this call returns. 739 */ 740 public void clear() { 741 final ReentrantLock lock = this.lock; 742 lock.lock(); 743 try { 744 final Object[] es = queue; 745 for (int i = 0, n = size; i < n; i++) 746 es[i] = null; 747 size = 0; 748 } finally { 749 lock.unlock(); 750 } 751 } 752 753 /** 754 * Returns an array containing all of the elements in this queue. 755 * The returned array elements are in no particular order. 756 * 757 * <p>The returned array will be "safe" in that no references to it are 758 * maintained by this queue. (In other words, this method must allocate 759 * a new array). The caller is thus free to modify the returned array. 760 * 761 * <p>This method acts as bridge between array-based and collection-based 762 * APIs. 763 * 764 * @return an array containing all of the elements in this queue 765 */ 766 public Object[] toArray() { 767 final ReentrantLock lock = this.lock; 768 lock.lock(); 769 try { 770 return Arrays.copyOf(queue, size); 771 } finally { 772 lock.unlock(); 773 } 774 } 775 776 /** 777 * Returns an array containing all of the elements in this queue; the 778 * runtime type of the returned array is that of the specified array. 779 * The returned array elements are in no particular order. 780 * If the queue fits in the specified array, it is returned therein. 781 * Otherwise, a new array is allocated with the runtime type of the 782 * specified array and the size of this queue. 783 * 784 * <p>If this queue fits in the specified array with room to spare 785 * (i.e., the array has more elements than this queue), the element in 786 * the array immediately following the end of the queue is set to 787 * {@code null}. 788 * 789 * <p>Like the {@link #toArray()} method, this method acts as bridge between 790 * array-based and collection-based APIs. Further, this method allows 791 * precise control over the runtime type of the output array, and may, 792 * under certain circumstances, be used to save allocation costs. 793 * 794 * <p>Suppose {@code x} is a queue known to contain only strings. 795 * The following code can be used to dump the queue into a newly 796 * allocated array of {@code String}: 797 * 798 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> 799 * 800 * Note that {@code toArray(new Object[0])} is identical in function to 801 * {@code toArray()}. 802 * 803 * @param a the array into which the elements of the queue are to 804 * be stored, if it is big enough; otherwise, a new array of the 805 * same runtime type is allocated for this purpose 806 * @return an array containing all of the elements in this queue 807 * @throws ArrayStoreException if the runtime type of the specified array 808 * is not a supertype of the runtime type of every element in 809 * this queue 810 * @throws NullPointerException if the specified array is null 811 */ 812 public <T> T[] toArray(T[] a) { 813 final ReentrantLock lock = this.lock; 814 lock.lock(); 815 try { 816 int n = size; 817 if (a.length < n) 818 // Make a new array of a's runtime type, but my contents: 819 return (T[]) Arrays.copyOf(queue, size, a.getClass()); 820 System.arraycopy(queue, 0, a, 0, n); 821 if (a.length > n) 822 a[n] = null; 823 return a; 824 } finally { 825 lock.unlock(); 826 } 827 } 828 829 /** 830 * Returns an iterator over the elements in this queue. The 831 * iterator does not return the elements in any particular order. 832 * 833 * <p>The returned iterator is 834 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 835 * 836 * @return an iterator over the elements in this queue 837 */ 838 public Iterator<E> iterator() { 839 return new Itr(toArray()); 840 } 841 842 /** 843 * Snapshot iterator that works off copy of underlying q array. 844 */ 845 final class Itr implements Iterator<E> { 846 final Object[] array; // Array of all elements 847 int cursor; // index of next element to return 848 int lastRet = -1; // index of last element, or -1 if no such 849 850 Itr(Object[] array) { 851 this.array = array; 852 } 853 854 public boolean hasNext() { 855 return cursor < array.length; 856 } 857 858 public E next() { 859 if (cursor >= array.length) 860 throw new NoSuchElementException(); 861 return (E)array[lastRet = cursor++]; 862 } 863 864 public void remove() { 865 if (lastRet < 0) 866 throw new IllegalStateException(); 867 removeEq(array[lastRet]); 868 lastRet = -1; 869 } 870 871 public void forEachRemaining(Consumer<? super E> action) { 872 Objects.requireNonNull(action); 873 final Object[] es = array; 874 int i; 875 if ((i = cursor) < es.length) { 876 lastRet = -1; 877 cursor = es.length; 878 for (; i < es.length; i++) 879 action.accept((E) es[i]); 880 lastRet = es.length - 1; 881 } 882 } 883 } 884 885 /** 886 * Saves this queue to a stream (that is, serializes it). 887 * 888 * For compatibility with previous version of this class, elements 889 * are first copied to a java.util.PriorityQueue, which is then 890 * serialized. 891 * 892 * @param s the stream 893 * @throws java.io.IOException if an I/O error occurs 894 */ 895 private void writeObject(java.io.ObjectOutputStream s) 896 throws java.io.IOException { 897 lock.lock(); 898 try { 899 // avoid zero capacity argument 900 q = new PriorityQueue<E>(Math.max(size, 1), comparator); 901 q.addAll(this); 902 s.defaultWriteObject(); 903 } finally { 904 q = null; 905 lock.unlock(); 906 } 907 } 908 909 /** 910 * Reconstitutes this queue from a stream (that is, deserializes it). 911 * @param s the stream 912 * @throws ClassNotFoundException if the class of a serialized object 913 * could not be found 914 * @throws java.io.IOException if an I/O error occurs 915 */ 916 private void readObject(java.io.ObjectInputStream s) 917 throws java.io.IOException, ClassNotFoundException { 918 try { 919 s.defaultReadObject(); 920 int sz = q.size(); 921 SharedSecrets.getJavaObjectInputStreamAccess().checkArray(s, Object[].class, sz); 922 this.queue = new Object[Math.max(1, sz)]; 923 comparator = q.comparator(); 924 addAll(q); 925 } finally { 926 q = null; 927 } 928 } 929 930 /** 931 * Immutable snapshot spliterator that binds to elements "late". 932 */ 933 final class PBQSpliterator implements Spliterator<E> { 934 Object[] array; // null until late-bound-initialized 935 int index; 936 int fence; 937 938 PBQSpliterator() {} 939 940 PBQSpliterator(Object[] array, int index, int fence) { 941 this.array = array; 942 this.index = index; 943 this.fence = fence; 944 } 945 946 private int getFence() { 947 if (array == null) 948 fence = (array = toArray()).length; 949 return fence; 950 } 951 952 public PBQSpliterator trySplit() { 953 int hi = getFence(), lo = index, mid = (lo + hi) >>> 1; 954 return (lo >= mid) ? null : 955 new PBQSpliterator(array, lo, index = mid); 956 } 957 958 public void forEachRemaining(Consumer<? super E> action) { 959 Objects.requireNonNull(action); 960 final int hi = getFence(), lo = index; 961 final Object[] es = array; 962 index = hi; // ensure exhaustion 963 for (int i = lo; i < hi; i++) 964 action.accept((E) es[i]); 965 } 966 967 public boolean tryAdvance(Consumer<? super E> action) { 968 Objects.requireNonNull(action); 969 if (getFence() > index && index >= 0) { 970 action.accept((E) array[index++]); 971 return true; 972 } 973 return false; 974 } 975 976 public long estimateSize() { return getFence() - index; } 977 978 public int characteristics() { 979 return (Spliterator.NONNULL | 980 Spliterator.SIZED | 981 Spliterator.SUBSIZED); 982 } 983 } 984 985 /** 986 * Returns a {@link Spliterator} over the elements in this queue. 987 * The spliterator does not traverse elements in any particular order 988 * (the {@link Spliterator#ORDERED ORDERED} characteristic is not reported). 989 * 990 * <p>The returned spliterator is 991 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 992 * 993 * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and 994 * {@link Spliterator#NONNULL}. 995 * 996 * @implNote 997 * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}. 998 * 999 * @return a {@code Spliterator} over the elements in this queue 1000 * @since 1.8 1001 */ 1002 public Spliterator<E> spliterator() { 1003 return new PBQSpliterator(); 1004 } 1005 1006 /** 1007 * @throws NullPointerException {@inheritDoc} 1008 */ 1009 public boolean removeIf(Predicate<? super E> filter) { 1010 Objects.requireNonNull(filter); 1011 return bulkRemove(filter); 1012 } 1013 1014 /** 1015 * @throws NullPointerException {@inheritDoc} 1016 */ 1017 public boolean removeAll(Collection<?> c) { 1018 Objects.requireNonNull(c); 1019 return bulkRemove(e -> c.contains(e)); 1020 } 1021 1022 /** 1023 * @throws NullPointerException {@inheritDoc} 1024 */ 1025 public boolean retainAll(Collection<?> c) { 1026 Objects.requireNonNull(c); 1027 return bulkRemove(e -> !c.contains(e)); 1028 } 1029 1030 // A tiny bit set implementation 1031 1032 private static long[] nBits(int n) { 1033 return new long[((n - 1) >> 6) + 1]; 1034 } 1035 private static void setBit(long[] bits, int i) { 1036 bits[i >> 6] |= 1L << i; 1037 } 1038 private static boolean isClear(long[] bits, int i) { 1039 return (bits[i >> 6] & (1L << i)) == 0; 1040 } 1041 1042 /** Implementation of bulk remove methods. */ 1043 private boolean bulkRemove(Predicate<? super E> filter) { 1044 final ReentrantLock lock = this.lock; 1045 lock.lock(); 1046 try { 1047 final Object[] es = queue; 1048 final int end = size; 1049 int i; 1050 // Optimize for initial run of survivors 1051 for (i = 0; i < end && !filter.test((E) es[i]); i++) 1052 ; 1053 if (i >= end) 1054 return false; 1055 // Tolerate predicates that reentrantly access the 1056 // collection for read, so traverse once to find elements 1057 // to delete, a second pass to physically expunge. 1058 final int beg = i; 1059 final long[] deathRow = nBits(end - beg); 1060 deathRow[0] = 1L; // set bit 0 1061 for (i = beg + 1; i < end; i++) 1062 if (filter.test((E) es[i])) 1063 setBit(deathRow, i - beg); 1064 int w = beg; 1065 for (i = beg; i < end; i++) 1066 if (isClear(deathRow, i - beg)) 1067 es[w++] = es[i]; 1068 for (i = size = w; i < end; i++) 1069 es[i] = null; 1070 heapify(); 1071 return true; 1072 } finally { 1073 lock.unlock(); 1074 } 1075 } 1076 1077 /** 1078 * @throws NullPointerException {@inheritDoc} 1079 */ 1080 public void forEach(Consumer<? super E> action) { 1081 Objects.requireNonNull(action); 1082 final ReentrantLock lock = this.lock; 1083 lock.lock(); 1084 try { 1085 final Object[] es = queue; 1086 for (int i = 0, n = size; i < n; i++) 1087 action.accept((E) es[i]); 1088 } finally { 1089 lock.unlock(); 1090 } 1091 } 1092 1093 // VarHandle mechanics 1094 private static final VarHandle ALLOCATIONSPINLOCK; 1095 static { 1096 try { 1097 MethodHandles.Lookup l = MethodHandles.lookup(); 1098 ALLOCATIONSPINLOCK = l.findVarHandle(PriorityBlockingQueue.class, 1099 "allocationSpinLock", 1100 int.class); 1101 } catch (ReflectiveOperationException e) { 1102 throw new ExceptionInInitializerError(e); 1103 } 1104 } 1105 }