1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.util.concurrent.locks.*; 39 import java.util.*; 40 41 /** 42 * An unbounded {@linkplain BlockingQueue blocking queue} that uses 43 * the same ordering rules as class {@link PriorityQueue} and supplies 44 * blocking retrieval operations. While this queue is logically 45 * unbounded, attempted additions may fail due to resource exhaustion 46 * (causing {@code OutOfMemoryError}). This class does not permit 47 * {@code null} elements. A priority queue relying on {@linkplain 48 * Comparable natural ordering} also does not permit insertion of 49 * non-comparable objects (doing so results in 50 * {@code ClassCastException}). 51 * 52 * <p>This class and its iterator implement all of the 53 * <em>optional</em> methods of the {@link Collection} and {@link 54 * Iterator} interfaces. The Iterator provided in method {@link 55 * #iterator()} is <em>not</em> guaranteed to traverse the elements of 56 * the PriorityBlockingQueue in any particular order. If you need 57 * ordered traversal, consider using 58 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo} 59 * can be used to <em>remove</em> some or all elements in priority 60 * order and place them in another collection. 61 * 62 * <p>Operations on this class make no guarantees about the ordering 63 * of elements with equal priority. If you need to enforce an 64 * ordering, you can define custom classes or comparators that use a 65 * secondary key to break ties in primary priority values. For 66 * example, here is a class that applies first-in-first-out 67 * tie-breaking to comparable elements. To use it, you would insert a 68 * {@code new FIFOEntry(anEntry)} instead of a plain entry object. 69 * 70 * <pre> {@code 71 * class FIFOEntry<E extends Comparable<? super E>> 72 * implements Comparable<FIFOEntry<E>> { 73 * static final AtomicLong seq = new AtomicLong(0); 74 * final long seqNum; 75 * final E entry; 76 * public FIFOEntry(E entry) { 77 * seqNum = seq.getAndIncrement(); 78 * this.entry = entry; 79 * } 80 * public E getEntry() { return entry; } 81 * public int compareTo(FIFOEntry<E> other) { 82 * int res = entry.compareTo(other.entry); 83 * if (res == 0 && other.entry != this.entry) 84 * res = (seqNum < other.seqNum ? -1 : 1); 85 * return res; 86 * } 87 * }}</pre> 88 * 89 * <p>This class is a member of the 90 * <a href="{@docRoot}/../technotes/guides/collections/index.html"> 91 * Java Collections Framework</a>. 92 * 93 * @since 1.5 94 * @author Doug Lea 95 * @param <E> the type of elements held in this collection 96 */ 97 @SuppressWarnings("unchecked") 98 public class PriorityBlockingQueue<E> extends AbstractQueue<E> 99 implements BlockingQueue<E>, java.io.Serializable { 100 private static final long serialVersionUID = 5595510919245408276L; 101 102 /* 103 * The implementation uses an array-based binary heap, with public 104 * operations protected with a single lock. However, allocation 105 * during resizing uses a simple spinlock (used only while not 106 * holding main lock) in order to allow takes to operate 107 * concurrently with allocation. This avoids repeated 108 * postponement of waiting consumers and consequent element 109 * build-up. The need to back away from lock during allocation 110 * makes it impossible to simply wrap delegated 111 * java.util.PriorityQueue operations within a lock, as was done 112 * in a previous version of this class. To maintain 113 * interoperability, a plain PriorityQueue is still used during 114 * serialization, which maintains compatibility at the espense of 115 * transiently doubling overhead. 116 */ 117 118 /** 119 * Default array capacity. 120 */ 121 private static final int DEFAULT_INITIAL_CAPACITY = 11; 122 123 /** 124 * The maximum size of array to allocate. 125 * Some VMs reserve some header words in an array. 126 * Attempts to allocate larger arrays may result in 127 * OutOfMemoryError: Requested array size exceeds VM limit 128 */ 129 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; 130 131 /** 132 * Priority queue represented as a balanced binary heap: the two 133 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The 134 * priority queue is ordered by comparator, or by the elements' 135 * natural ordering, if comparator is null: For each node n in the 136 * heap and each descendant d of n, n <= d. The element with the 137 * lowest value is in queue[0], assuming the queue is nonempty. 138 */ 139 private transient Object[] queue; 140 141 /** 142 * The number of elements in the priority queue. 143 */ 144 private transient int size; 145 146 /** 147 * The comparator, or null if priority queue uses elements' 148 * natural ordering. 149 */ 150 private transient Comparator<? super E> comparator; 151 152 /** 153 * Lock used for all public operations 154 */ 155 private final ReentrantLock lock; 156 157 /** 158 * Condition for blocking when empty 159 */ 160 private final Condition notEmpty; 161 162 /** 163 * Spinlock for allocation, acquired via CAS. 164 */ 165 private transient volatile int allocationSpinLock; 166 167 /** 168 * A plain PriorityQueue used only for serialization, 169 * to maintain compatibility with previous versions 170 * of this class. Non-null only during serialization/deserialization. 171 */ 172 private PriorityQueue<E> q; 173 174 /** 175 * Creates a {@code PriorityBlockingQueue} with the default 176 * initial capacity (11) that orders its elements according to 177 * their {@linkplain Comparable natural ordering}. 178 */ 179 public PriorityBlockingQueue() { 180 this(DEFAULT_INITIAL_CAPACITY, null); 181 } 182 183 /** 184 * Creates a {@code PriorityBlockingQueue} with the specified 185 * initial capacity that orders its elements according to their 186 * {@linkplain Comparable natural ordering}. 187 * 188 * @param initialCapacity the initial capacity for this priority queue 189 * @throws IllegalArgumentException if {@code initialCapacity} is less 190 * than 1 191 */ 192 public PriorityBlockingQueue(int initialCapacity) { 193 this(initialCapacity, null); 194 } 195 196 /** 197 * Creates a {@code PriorityBlockingQueue} with the specified initial 198 * capacity that orders its elements according to the specified 199 * comparator. 200 * 201 * @param initialCapacity the initial capacity for this priority queue 202 * @param comparator the comparator that will be used to order this 203 * priority queue. If {@code null}, the {@linkplain Comparable 204 * natural ordering} of the elements will be used. 205 * @throws IllegalArgumentException if {@code initialCapacity} is less 206 * than 1 207 */ 208 public PriorityBlockingQueue(int initialCapacity, 209 Comparator<? super E> comparator) { 210 if (initialCapacity < 1) 211 throw new IllegalArgumentException(); 212 this.lock = new ReentrantLock(); 213 this.notEmpty = lock.newCondition(); 214 this.comparator = comparator; 215 this.queue = new Object[initialCapacity]; 216 } 217 218 /** 219 * Creates a {@code PriorityBlockingQueue} containing the elements 220 * in the specified collection. If the specified collection is a 221 * {@link SortedSet} or a {@link PriorityQueue}, this 222 * priority queue will be ordered according to the same ordering. 223 * Otherwise, this priority queue will be ordered according to the 224 * {@linkplain Comparable natural ordering} of its elements. 225 * 226 * @param c the collection whose elements are to be placed 227 * into this priority queue 228 * @throws ClassCastException if elements of the specified collection 229 * cannot be compared to one another according to the priority 230 * queue's ordering 231 * @throws NullPointerException if the specified collection or any 232 * of its elements are null 233 */ 234 public PriorityBlockingQueue(Collection<? extends E> c) { 235 this.lock = new ReentrantLock(); 236 this.notEmpty = lock.newCondition(); 237 boolean heapify = true; // true if not known to be in heap order 238 boolean screen = true; // true if must screen for nulls 239 if (c instanceof SortedSet<?>) { 240 SortedSet<? extends E> ss = (SortedSet<? extends E>) c; 241 this.comparator = (Comparator<? super E>) ss.comparator(); 242 heapify = false; 243 } 244 else if (c instanceof PriorityBlockingQueue<?>) { 245 PriorityBlockingQueue<? extends E> pq = 246 (PriorityBlockingQueue<? extends E>) c; 247 this.comparator = (Comparator<? super E>) pq.comparator(); 248 screen = false; 249 if (pq.getClass() == PriorityBlockingQueue.class) // exact match 250 heapify = false; 251 } 252 Object[] a = c.toArray(); 253 int n = a.length; 254 // If c.toArray incorrectly doesn't return Object[], copy it. 255 if (a.getClass() != Object[].class) 256 a = Arrays.copyOf(a, n, Object[].class); 257 if (screen && (n == 1 || this.comparator != null)) { 258 for (int i = 0; i < n; ++i) 259 if (a[i] == null) 260 throw new NullPointerException(); 261 } 262 this.queue = a; 263 this.size = n; 264 if (heapify) 265 heapify(); 266 } 267 268 /** 269 * Tries to grow array to accommodate at least one more element 270 * (but normally expand by about 50%), giving up (allowing retry) 271 * on contention (which we expect to be rare). Call only while 272 * holding lock. 273 * 274 * @param array the heap array 275 * @param oldCap the length of the array 276 */ 277 private void tryGrow(Object[] array, int oldCap) { 278 lock.unlock(); // must release and then re-acquire main lock 279 Object[] newArray = null; 280 if (allocationSpinLock == 0 && 281 UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 282 0, 1)) { 283 try { 284 int newCap = oldCap + ((oldCap < 64) ? 285 (oldCap + 2) : // grow faster if small 286 (oldCap >> 1)); 287 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow 288 int minCap = oldCap + 1; 289 if (minCap < 0 || minCap > MAX_ARRAY_SIZE) 290 throw new OutOfMemoryError(); 291 newCap = MAX_ARRAY_SIZE; 292 } 293 if (newCap > oldCap && queue == array) 294 newArray = new Object[newCap]; 295 } finally { 296 allocationSpinLock = 0; 297 } 298 } 299 if (newArray == null) // back off if another thread is allocating 300 Thread.yield(); 301 lock.lock(); 302 if (newArray != null && queue == array) { 303 queue = newArray; 304 System.arraycopy(array, 0, newArray, 0, oldCap); 305 } 306 } 307 308 /** 309 * Mechanics for poll(). Call only while holding lock. 310 */ 311 private E extract() { 312 E result; 313 int n = size - 1; 314 if (n < 0) 315 result = null; 316 else { 317 Object[] array = queue; 318 result = (E) array[0]; 319 E x = (E) array[n]; 320 array[n] = null; 321 Comparator<? super E> cmp = comparator; 322 if (cmp == null) 323 siftDownComparable(0, x, array, n); 324 else 325 siftDownUsingComparator(0, x, array, n, cmp); 326 size = n; 327 } 328 return result; 329 } 330 331 /** 332 * Inserts item x at position k, maintaining heap invariant by 333 * promoting x up the tree until it is greater than or equal to 334 * its parent, or is the root. 335 * 336 * To simplify and speed up coercions and comparisons. the 337 * Comparable and Comparator versions are separated into different 338 * methods that are otherwise identical. (Similarly for siftDown.) 339 * These methods are static, with heap state as arguments, to 340 * simplify use in light of possible comparator exceptions. 341 * 342 * @param k the position to fill 343 * @param x the item to insert 344 * @param array the heap array 345 * @param n heap size 346 */ 347 private static <T> void siftUpComparable(int k, T x, Object[] array) { 348 Comparable<? super T> key = (Comparable<? super T>) x; 349 while (k > 0) { 350 int parent = (k - 1) >>> 1; 351 Object e = array[parent]; 352 if (key.compareTo((T) e) >= 0) 353 break; 354 array[k] = e; 355 k = parent; 356 } 357 array[k] = key; 358 } 359 360 private static <T> void siftUpUsingComparator(int k, T x, Object[] array, 361 Comparator<? super T> cmp) { 362 while (k > 0) { 363 int parent = (k - 1) >>> 1; 364 Object e = array[parent]; 365 if (cmp.compare(x, (T) e) >= 0) 366 break; 367 array[k] = e; 368 k = parent; 369 } 370 array[k] = x; 371 } 372 373 /** 374 * Inserts item x at position k, maintaining heap invariant by 375 * demoting x down the tree repeatedly until it is less than or 376 * equal to its children or is a leaf. 377 * 378 * @param k the position to fill 379 * @param x the item to insert 380 * @param array the heap array 381 * @param n heap size 382 */ 383 private static <T> void siftDownComparable(int k, T x, Object[] array, 384 int n) { 385 Comparable<? super T> key = (Comparable<? super T>)x; 386 int half = n >>> 1; // loop while a non-leaf 387 while (k < half) { 388 int child = (k << 1) + 1; // assume left child is least 389 Object c = array[child]; 390 int right = child + 1; 391 if (right < n && 392 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) 393 c = array[child = right]; 394 if (key.compareTo((T) c) <= 0) 395 break; 396 array[k] = c; 397 k = child; 398 } 399 array[k] = key; 400 } 401 402 private static <T> void siftDownUsingComparator(int k, T x, Object[] array, 403 int n, 404 Comparator<? super T> cmp) { 405 int half = n >>> 1; 406 while (k < half) { 407 int child = (k << 1) + 1; 408 Object c = array[child]; 409 int right = child + 1; 410 if (right < n && cmp.compare((T) c, (T) array[right]) > 0) 411 c = array[child = right]; 412 if (cmp.compare(x, (T) c) <= 0) 413 break; 414 array[k] = c; 415 k = child; 416 } 417 array[k] = x; 418 } 419 420 /** 421 * Establishes the heap invariant (described above) in the entire tree, 422 * assuming nothing about the order of the elements prior to the call. 423 */ 424 private void heapify() { 425 Object[] array = queue; 426 int n = size; 427 int half = (n >>> 1) - 1; 428 Comparator<? super E> cmp = comparator; 429 if (cmp == null) { 430 for (int i = half; i >= 0; i--) 431 siftDownComparable(i, (E) array[i], array, n); 432 } 433 else { 434 for (int i = half; i >= 0; i--) 435 siftDownUsingComparator(i, (E) array[i], array, n, cmp); 436 } 437 } 438 439 /** 440 * Inserts the specified element into this priority queue. 441 * 442 * @param e the element to add 443 * @return {@code true} (as specified by {@link Collection#add}) 444 * @throws ClassCastException if the specified element cannot be compared 445 * with elements currently in the priority queue according to the 446 * priority queue's ordering 447 * @throws NullPointerException if the specified element is null 448 */ 449 public boolean add(E e) { 450 return offer(e); 451 } 452 453 /** 454 * Inserts the specified element into this priority queue. 455 * As the queue is unbounded, this method will never return {@code false}. 456 * 457 * @param e the element to add 458 * @return {@code true} (as specified by {@link Queue#offer}) 459 * @throws ClassCastException if the specified element cannot be compared 460 * with elements currently in the priority queue according to the 461 * priority queue's ordering 462 * @throws NullPointerException if the specified element is null 463 */ 464 public boolean offer(E e) { 465 if (e == null) 466 throw new NullPointerException(); 467 final ReentrantLock lock = this.lock; 468 lock.lock(); 469 int n, cap; 470 Object[] array; 471 while ((n = size) >= (cap = (array = queue).length)) 472 tryGrow(array, cap); 473 try { 474 Comparator<? super E> cmp = comparator; 475 if (cmp == null) 476 siftUpComparable(n, e, array); 477 else 478 siftUpUsingComparator(n, e, array, cmp); 479 size = n + 1; 480 notEmpty.signal(); 481 } finally { 482 lock.unlock(); 483 } 484 return true; 485 } 486 487 /** 488 * Inserts the specified element into this priority queue. 489 * As the queue is unbounded, this method will never block. 490 * 491 * @param e the element to add 492 * @throws ClassCastException if the specified element cannot be compared 493 * with elements currently in the priority queue according to the 494 * priority queue's ordering 495 * @throws NullPointerException if the specified element is null 496 */ 497 public void put(E e) { 498 offer(e); // never need to block 499 } 500 501 /** 502 * Inserts the specified element into this priority queue. 503 * As the queue is unbounded, this method will never block or 504 * return {@code false}. 505 * 506 * @param e the element to add 507 * @param timeout This parameter is ignored as the method never blocks 508 * @param unit This parameter is ignored as the method never blocks 509 * @return {@code true} (as specified by 510 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) 511 * @throws ClassCastException if the specified element cannot be compared 512 * with elements currently in the priority queue according to the 513 * priority queue's ordering 514 * @throws NullPointerException if the specified element is null 515 */ 516 public boolean offer(E e, long timeout, TimeUnit unit) { 517 return offer(e); // never need to block 518 } 519 520 public E poll() { 521 final ReentrantLock lock = this.lock; 522 lock.lock(); 523 E result; 524 try { 525 result = extract(); 526 } finally { 527 lock.unlock(); 528 } 529 return result; 530 } 531 532 public E take() throws InterruptedException { 533 final ReentrantLock lock = this.lock; 534 lock.lockInterruptibly(); 535 E result; 536 try { 537 while ( (result = extract()) == null) 538 notEmpty.await(); 539 } finally { 540 lock.unlock(); 541 } 542 return result; 543 } 544 545 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 546 long nanos = unit.toNanos(timeout); 547 final ReentrantLock lock = this.lock; 548 lock.lockInterruptibly(); 549 E result; 550 try { 551 while ( (result = extract()) == null && nanos > 0) 552 nanos = notEmpty.awaitNanos(nanos); 553 } finally { 554 lock.unlock(); 555 } 556 return result; 557 } 558 559 public E peek() { 560 final ReentrantLock lock = this.lock; 561 lock.lock(); 562 E result; 563 try { 564 result = size > 0 ? (E) queue[0] : null; 565 } finally { 566 lock.unlock(); 567 } 568 return result; 569 } 570 571 /** 572 * Returns the comparator used to order the elements in this queue, 573 * or {@code null} if this queue uses the {@linkplain Comparable 574 * natural ordering} of its elements. 575 * 576 * @return the comparator used to order the elements in this queue, 577 * or {@code null} if this queue uses the natural 578 * ordering of its elements 579 */ 580 public Comparator<? super E> comparator() { 581 return comparator; 582 } 583 584 public int size() { 585 final ReentrantLock lock = this.lock; 586 lock.lock(); 587 try { 588 return size; 589 } finally { 590 lock.unlock(); 591 } 592 } 593 594 /** 595 * Always returns {@code Integer.MAX_VALUE} because 596 * a {@code PriorityBlockingQueue} is not capacity constrained. 597 * @return {@code Integer.MAX_VALUE} always 598 */ 599 public int remainingCapacity() { 600 return Integer.MAX_VALUE; 601 } 602 603 private int indexOf(Object o) { 604 if (o != null) { 605 Object[] array = queue; 606 int n = size; 607 for (int i = 0; i < n; i++) 608 if (o.equals(array[i])) 609 return i; 610 } 611 return -1; 612 } 613 614 /** 615 * Removes the ith element from queue. 616 */ 617 private void removeAt(int i) { 618 Object[] array = queue; 619 int n = size - 1; 620 if (n == i) // removed last element 621 array[i] = null; 622 else { 623 E moved = (E) array[n]; 624 array[n] = null; 625 Comparator<? super E> cmp = comparator; 626 if (cmp == null) 627 siftDownComparable(i, moved, array, n); 628 else 629 siftDownUsingComparator(i, moved, array, n, cmp); 630 if (array[i] == moved) { 631 if (cmp == null) 632 siftUpComparable(i, moved, array); 633 else 634 siftUpUsingComparator(i, moved, array, cmp); 635 } 636 } 637 size = n; 638 } 639 640 /** 641 * Removes a single instance of the specified element from this queue, 642 * if it is present. More formally, removes an element {@code e} such 643 * that {@code o.equals(e)}, if this queue contains one or more such 644 * elements. Returns {@code true} if and only if this queue contained 645 * the specified element (or equivalently, if this queue changed as a 646 * result of the call). 647 * 648 * @param o element to be removed from this queue, if present 649 * @return {@code true} if this queue changed as a result of the call 650 */ 651 public boolean remove(Object o) { 652 boolean removed = false; 653 final ReentrantLock lock = this.lock; 654 lock.lock(); 655 try { 656 int i = indexOf(o); 657 if (i != -1) { 658 removeAt(i); 659 removed = true; 660 } 661 } finally { 662 lock.unlock(); 663 } 664 return removed; 665 } 666 667 668 /** 669 * Identity-based version for use in Itr.remove 670 */ 671 private void removeEQ(Object o) { 672 final ReentrantLock lock = this.lock; 673 lock.lock(); 674 try { 675 Object[] array = queue; 676 int n = size; 677 for (int i = 0; i < n; i++) { 678 if (o == array[i]) { 679 removeAt(i); 680 break; 681 } 682 } 683 } finally { 684 lock.unlock(); 685 } 686 } 687 688 /** 689 * Returns {@code true} if this queue contains the specified element. 690 * More formally, returns {@code true} if and only if this queue contains 691 * at least one element {@code e} such that {@code o.equals(e)}. 692 * 693 * @param o object to be checked for containment in this queue 694 * @return {@code true} if this queue contains the specified element 695 */ 696 public boolean contains(Object o) { 697 int index; 698 final ReentrantLock lock = this.lock; 699 lock.lock(); 700 try { 701 index = indexOf(o); 702 } finally { 703 lock.unlock(); 704 } 705 return index != -1; 706 } 707 708 /** 709 * Returns an array containing all of the elements in this queue. 710 * The returned array elements are in no particular order. 711 * 712 * <p>The returned array will be "safe" in that no references to it are 713 * maintained by this queue. (In other words, this method must allocate 714 * a new array). The caller is thus free to modify the returned array. 715 * 716 * <p>This method acts as bridge between array-based and collection-based 717 * APIs. 718 * 719 * @return an array containing all of the elements in this queue 720 */ 721 public Object[] toArray() { 722 final ReentrantLock lock = this.lock; 723 lock.lock(); 724 try { 725 return Arrays.copyOf(queue, size); 726 } finally { 727 lock.unlock(); 728 } 729 } 730 731 732 public String toString() { 733 final ReentrantLock lock = this.lock; 734 lock.lock(); 735 try { 736 int n = size; 737 if (n == 0) 738 return "[]"; 739 StringBuilder sb = new StringBuilder(); 740 sb.append('['); 741 for (int i = 0; i < n; ++i) { 742 E e = (E)queue[i]; 743 sb.append(e == this ? "(this Collection)" : e); 744 if (i != n - 1) 745 sb.append(',').append(' '); 746 } 747 return sb.append(']').toString(); 748 } finally { 749 lock.unlock(); 750 } 751 } 752 753 /** 754 * @throws UnsupportedOperationException {@inheritDoc} 755 * @throws ClassCastException {@inheritDoc} 756 * @throws NullPointerException {@inheritDoc} 757 * @throws IllegalArgumentException {@inheritDoc} 758 */ 759 public int drainTo(Collection<? super E> c) { 760 if (c == null) 761 throw new NullPointerException(); 762 if (c == this) 763 throw new IllegalArgumentException(); 764 final ReentrantLock lock = this.lock; 765 lock.lock(); 766 try { 767 int n = 0; 768 E e; 769 while ( (e = extract()) != null) { 770 c.add(e); 771 ++n; 772 } 773 return n; 774 } finally { 775 lock.unlock(); 776 } 777 } 778 779 /** 780 * @throws UnsupportedOperationException {@inheritDoc} 781 * @throws ClassCastException {@inheritDoc} 782 * @throws NullPointerException {@inheritDoc} 783 * @throws IllegalArgumentException {@inheritDoc} 784 */ 785 public int drainTo(Collection<? super E> c, int maxElements) { 786 if (c == null) 787 throw new NullPointerException(); 788 if (c == this) 789 throw new IllegalArgumentException(); 790 if (maxElements <= 0) 791 return 0; 792 final ReentrantLock lock = this.lock; 793 lock.lock(); 794 try { 795 int n = 0; 796 E e; 797 while (n < maxElements && (e = extract()) != null) { 798 c.add(e); 799 ++n; 800 } 801 return n; 802 } finally { 803 lock.unlock(); 804 } 805 } 806 807 /** 808 * Atomically removes all of the elements from this queue. 809 * The queue will be empty after this call returns. 810 */ 811 public void clear() { 812 final ReentrantLock lock = this.lock; 813 lock.lock(); 814 try { 815 Object[] array = queue; 816 int n = size; 817 size = 0; 818 for (int i = 0; i < n; i++) 819 array[i] = null; 820 } finally { 821 lock.unlock(); 822 } 823 } 824 825 /** 826 * Returns an array containing all of the elements in this queue; the 827 * runtime type of the returned array is that of the specified array. 828 * The returned array elements are in no particular order. 829 * If the queue fits in the specified array, it is returned therein. 830 * Otherwise, a new array is allocated with the runtime type of the 831 * specified array and the size of this queue. 832 * 833 * <p>If this queue fits in the specified array with room to spare 834 * (i.e., the array has more elements than this queue), the element in 835 * the array immediately following the end of the queue is set to 836 * {@code null}. 837 * 838 * <p>Like the {@link #toArray()} method, this method acts as bridge between 839 * array-based and collection-based APIs. Further, this method allows 840 * precise control over the runtime type of the output array, and may, 841 * under certain circumstances, be used to save allocation costs. 842 * 843 * <p>Suppose {@code x} is a queue known to contain only strings. 844 * The following code can be used to dump the queue into a newly 845 * allocated array of {@code String}: 846 * 847 * <pre> 848 * String[] y = x.toArray(new String[0]);</pre> 849 * 850 * Note that {@code toArray(new Object[0])} is identical in function to 851 * {@code toArray()}. 852 * 853 * @param a the array into which the elements of the queue are to 854 * be stored, if it is big enough; otherwise, a new array of the 855 * same runtime type is allocated for this purpose 856 * @return an array containing all of the elements in this queue 857 * @throws ArrayStoreException if the runtime type of the specified array 858 * is not a supertype of the runtime type of every element in 859 * this queue 860 * @throws NullPointerException if the specified array is null 861 */ 862 public <T> T[] toArray(T[] a) { 863 final ReentrantLock lock = this.lock; 864 lock.lock(); 865 try { 866 int n = size; 867 if (a.length < n) 868 // Make a new array of a's runtime type, but my contents: 869 return (T[]) Arrays.copyOf(queue, size, a.getClass()); 870 System.arraycopy(queue, 0, a, 0, n); 871 if (a.length > n) 872 a[n] = null; 873 return a; 874 } finally { 875 lock.unlock(); 876 } 877 } 878 879 /** 880 * Returns an iterator over the elements in this queue. The 881 * iterator does not return the elements in any particular order. 882 * 883 * <p>The returned iterator is a "weakly consistent" iterator that 884 * will never throw {@link java.util.ConcurrentModificationException 885 * ConcurrentModificationException}, and guarantees to traverse 886 * elements as they existed upon construction of the iterator, and 887 * may (but is not guaranteed to) reflect any modifications 888 * subsequent to construction. 889 * 890 * @return an iterator over the elements in this queue 891 */ 892 public Iterator<E> iterator() { 893 return new Itr(toArray()); 894 } 895 896 /** 897 * Snapshot iterator that works off copy of underlying q array. 898 */ 899 final class Itr implements Iterator<E> { 900 final Object[] array; // Array of all elements 901 int cursor; // index of next element to return; 902 int lastRet; // index of last element, or -1 if no such 903 904 Itr(Object[] array) { 905 lastRet = -1; 906 this.array = array; 907 } 908 909 public boolean hasNext() { 910 return cursor < array.length; 911 } 912 913 public E next() { 914 if (cursor >= array.length) 915 throw new NoSuchElementException(); 916 lastRet = cursor; 917 return (E)array[cursor++]; 918 } 919 920 public void remove() { 921 if (lastRet < 0) 922 throw new IllegalStateException(); 923 removeEQ(array[lastRet]); 924 lastRet = -1; 925 } 926 } 927 928 /** 929 * Saves the state to a stream (that is, serializes it). For 930 * compatibility with previous version of this class, 931 * elements are first copied to a java.util.PriorityQueue, 932 * which is then serialized. 933 */ 934 private void writeObject(java.io.ObjectOutputStream s) 935 throws java.io.IOException { 936 lock.lock(); 937 try { 938 int n = size; // avoid zero capacity argument 939 q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator); 940 q.addAll(this); 941 s.defaultWriteObject(); 942 } finally { 943 q = null; 944 lock.unlock(); 945 } 946 } 947 948 /** 949 * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream 950 * (that is, deserializes it). 951 * 952 * @param s the stream 953 */ 954 private void readObject(java.io.ObjectInputStream s) 955 throws java.io.IOException, ClassNotFoundException { 956 try { 957 s.defaultReadObject(); 958 this.queue = new Object[q.size()]; 959 comparator = q.comparator(); 960 addAll(q); 961 } finally { 962 q = null; 963 } 964 } 965 966 // Unsafe mechanics 967 private static final sun.misc.Unsafe UNSAFE; 968 private static final long allocationSpinLockOffset; 969 static { 970 try { 971 UNSAFE = sun.misc.Unsafe.getUnsafe(); 972 Class<?> k = PriorityBlockingQueue.class; 973 allocationSpinLockOffset = UNSAFE.objectFieldOffset 974 (k.getDeclaredField("allocationSpinLock")); 975 } catch (Exception e) { 976 throw new Error(e); 977 } 978 } 979 }