1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.util.concurrent.locks.*; 39 import java.util.*; 40 41 /** 42 * An unbounded {@linkplain BlockingQueue blocking queue} that uses 43 * the same ordering rules as class {@link PriorityQueue} and supplies 44 * blocking retrieval operations. While this queue is logically 45 * unbounded, attempted additions may fail due to resource exhaustion 46 * (causing {@code OutOfMemoryError}). This class does not permit 47 * {@code null} elements. A priority queue relying on {@linkplain 48 * Comparable natural ordering} also does not permit insertion of 49 * non-comparable objects (doing so results in 50 * {@code ClassCastException}). 51 * 52 * <p>This class and its iterator implement all of the 53 * <em>optional</em> methods of the {@link Collection} and {@link 54 * Iterator} interfaces. The Iterator provided in method {@link 55 * #iterator()} is <em>not</em> guaranteed to traverse the elements of 56 * the PriorityBlockingQueue in any particular order. If you need 57 * ordered traversal, consider using 58 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo} 59 * can be used to <em>remove</em> some or all elements in priority 60 * order and place them in another collection. 61 * 62 * <p>Operations on this class make no guarantees about the ordering 63 * of elements with equal priority. If you need to enforce an 64 * ordering, you can define custom classes or comparators that use a 65 * secondary key to break ties in primary priority values. For 66 * example, here is a class that applies first-in-first-out 67 * tie-breaking to comparable elements. To use it, you would insert a 68 * {@code new FIFOEntry(anEntry)} instead of a plain entry object. 69 * 70 * <pre> {@code 71 * class FIFOEntry<E extends Comparable<? super E>> 72 * implements Comparable<FIFOEntry<E>> { 73 * static final AtomicLong seq = new AtomicLong(0); 74 * final long seqNum; 75 * final E entry; 76 * public FIFOEntry(E entry) { 77 * seqNum = seq.getAndIncrement(); 78 * this.entry = entry; 79 * } 80 * public E getEntry() { return entry; } 81 * public int compareTo(FIFOEntry<E> other) { 82 * int res = entry.compareTo(other.entry); 83 * if (res == 0 && other.entry != this.entry) 84 * res = (seqNum < other.seqNum ? -1 : 1); 85 * return res; 86 * } 87 * }}</pre> 88 * 89 * <p>This class is a member of the 90 * <a href="{@docRoot}/../technotes/guides/collections/index.html"> 91 * Java Collections Framework</a>. 92 * 93 * @since 1.5 94 * @author Doug Lea 95 * @param <E> the type of elements held in this collection 96 */ 97 public class PriorityBlockingQueue<E> extends AbstractQueue<E> 98 implements BlockingQueue<E>, java.io.Serializable { 99 private static final long serialVersionUID = 5595510919245408276L; 100 101 /* 102 * The implementation uses an array-based binary heap, with public 103 * operations protected with a single lock. However, allocation 104 * during resizing uses a simple spinlock (used only while not 105 * holding main lock) in order to allow takes to operate 106 * concurrently with allocation. This avoids repeated 107 * postponement of waiting consumers and consequent element 108 * build-up. The need to back away from lock during allocation 109 * makes it impossible to simply wrap delegated 110 * java.util.PriorityQueue operations within a lock, as was done 111 * in a previous version of this class. To maintain 112 * interoperability, a plain PriorityQueue is still used during 113 * serialization, which maintains compatibility at the espense of 114 * transiently doubling overhead. 115 */ 116 117 /** 118 * Default array capacity. 119 */ 120 private static final int DEFAULT_INITIAL_CAPACITY = 11; 121 122 /** 123 * The maximum size of array to allocate. 124 * Some VMs reserve some header words in an array. 125 * Attempts to allocate larger arrays may result in 126 * OutOfMemoryError: Requested array size exceeds VM limit 127 */ 128 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; 129 130 /** 131 * Priority queue represented as a balanced binary heap: the two 132 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The 133 * priority queue is ordered by comparator, or by the elements' 134 * natural ordering, if comparator is null: For each node n in the 135 * heap and each descendant d of n, n <= d. The element with the 136 * lowest value is in queue[0], assuming the queue is nonempty. 137 */ 138 private transient Object[] queue; 139 140 /** 141 * The number of elements in the priority queue. 142 */ 143 private transient int size; 144 145 /** 146 * The comparator, or null if priority queue uses elements' 147 * natural ordering. 148 */ 149 private transient Comparator<? super E> comparator; 150 151 /** 152 * Lock used for all public operations 153 */ 154 private final ReentrantLock lock; 155 156 /** 157 * Condition for blocking when empty 158 */ 159 private final Condition notEmpty; 160 161 /** 162 * Spinlock for allocation, acquired via CAS. 163 */ 164 private transient volatile int allocationSpinLock; 165 166 /** 167 * A plain PriorityQueue used only for serialization, 168 * to maintain compatibility with previous versions 169 * of this class. Non-null only during serialization/deserialization. 170 */ 171 private PriorityQueue q; 172 173 /** 174 * Creates a {@code PriorityBlockingQueue} with the default 175 * initial capacity (11) that orders its elements according to 176 * their {@linkplain Comparable natural ordering}. 177 */ 178 public PriorityBlockingQueue() { 179 this(DEFAULT_INITIAL_CAPACITY, null); 180 } 181 182 /** 183 * Creates a {@code PriorityBlockingQueue} with the specified 184 * initial capacity that orders its elements according to their 185 * {@linkplain Comparable natural ordering}. 186 * 187 * @param initialCapacity the initial capacity for this priority queue 188 * @throws IllegalArgumentException if {@code initialCapacity} is less 189 * than 1 190 */ 191 public PriorityBlockingQueue(int initialCapacity) { 192 this(initialCapacity, null); 193 } 194 195 /** 196 * Creates a {@code PriorityBlockingQueue} with the specified initial 197 * capacity that orders its elements according to the specified 198 * comparator. 199 * 200 * @param initialCapacity the initial capacity for this priority queue 201 * @param comparator the comparator that will be used to order this 202 * priority queue. If {@code null}, the {@linkplain Comparable 203 * natural ordering} of the elements will be used. 204 * @throws IllegalArgumentException if {@code initialCapacity} is less 205 * than 1 206 */ 207 public PriorityBlockingQueue(int initialCapacity, 208 Comparator<? super E> comparator) { 209 if (initialCapacity < 1) 210 throw new IllegalArgumentException(); 211 this.lock = new ReentrantLock(); 212 this.notEmpty = lock.newCondition(); 213 this.comparator = comparator; 214 this.queue = new Object[initialCapacity]; 215 } 216 217 /** 218 * Creates a {@code PriorityBlockingQueue} containing the elements 219 * in the specified collection. If the specified collection is a 220 * {@link SortedSet} or a {@link PriorityQueue}, this 221 * priority queue will be ordered according to the same ordering. 222 * Otherwise, this priority queue will be ordered according to the 223 * {@linkplain Comparable natural ordering} of its elements. 224 * 225 * @param c the collection whose elements are to be placed 226 * into this priority queue 227 * @throws ClassCastException if elements of the specified collection 228 * cannot be compared to one another according to the priority 229 * queue's ordering 230 * @throws NullPointerException if the specified collection or any 231 * of its elements are null 232 */ 233 public PriorityBlockingQueue(Collection<? extends E> c) { 234 this.lock = new ReentrantLock(); 235 this.notEmpty = lock.newCondition(); 236 boolean heapify = true; // true if not known to be in heap order 237 boolean screen = true; // true if must screen for nulls 238 if (c instanceof SortedSet<?>) { 239 SortedSet<? extends E> ss = (SortedSet<? extends E>) c; 240 this.comparator = (Comparator<? super E>) ss.comparator(); 241 heapify = false; 242 } 243 else if (c instanceof PriorityBlockingQueue<?>) { 244 PriorityBlockingQueue<? extends E> pq = 245 (PriorityBlockingQueue<? extends E>) c; 246 this.comparator = (Comparator<? super E>) pq.comparator(); 247 screen = false; 248 if (pq.getClass() == PriorityBlockingQueue.class) // exact match 249 heapify = false; 250 } 251 Object[] a = c.toArray(); 252 int n = a.length; 253 // If c.toArray incorrectly doesn't return Object[], copy it. 254 if (a.getClass() != Object[].class) 255 a = Arrays.copyOf(a, n, Object[].class); 256 if (screen && (n == 1 || this.comparator != null)) { 257 for (int i = 0; i < n; ++i) 258 if (a[i] == null) 259 throw new NullPointerException(); 260 } 261 this.queue = a; 262 this.size = n; 263 if (heapify) 264 heapify(); 265 } 266 267 /** 268 * Tries to grow array to accommodate at least one more element 269 * (but normally expand by about 50%), giving up (allowing retry) 270 * on contention (which we expect to be rare). Call only while 271 * holding lock. 272 * 273 * @param array the heap array 274 * @param oldCap the length of the array 275 */ 276 private void tryGrow(Object[] array, int oldCap) { 277 lock.unlock(); // must release and then re-acquire main lock 278 Object[] newArray = null; 279 if (allocationSpinLock == 0 && 280 UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 281 0, 1)) { 282 try { 283 int newCap = oldCap + ((oldCap < 64) ? 284 (oldCap + 2) : // grow faster if small 285 (oldCap >> 1)); 286 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow 287 int minCap = oldCap + 1; 288 if (minCap < 0 || minCap > MAX_ARRAY_SIZE) 289 throw new OutOfMemoryError(); 290 newCap = MAX_ARRAY_SIZE; 291 } 292 if (newCap > oldCap && queue == array) 293 newArray = new Object[newCap]; 294 } finally { 295 allocationSpinLock = 0; 296 } 297 } 298 if (newArray == null) // back off if another thread is allocating 299 Thread.yield(); 300 lock.lock(); 301 if (newArray != null && queue == array) { 302 queue = newArray; 303 System.arraycopy(array, 0, newArray, 0, oldCap); 304 } 305 } 306 307 /** 308 * Mechanics for poll(). Call only while holding lock. 309 */ 310 private E extract() { 311 E result; 312 int n = size - 1; 313 if (n < 0) 314 result = null; 315 else { 316 Object[] array = queue; 317 result = (E) array[0]; 318 E x = (E) array[n]; 319 array[n] = null; 320 Comparator<? super E> cmp = comparator; 321 if (cmp == null) 322 siftDownComparable(0, x, array, n); 323 else 324 siftDownUsingComparator(0, x, array, n, cmp); 325 size = n; 326 } 327 return result; 328 } 329 330 /** 331 * Inserts item x at position k, maintaining heap invariant by 332 * promoting x up the tree until it is greater than or equal to 333 * its parent, or is the root. 334 * 335 * To simplify and speed up coercions and comparisons. the 336 * Comparable and Comparator versions are separated into different 337 * methods that are otherwise identical. (Similarly for siftDown.) 338 * These methods are static, with heap state as arguments, to 339 * simplify use in light of possible comparator exceptions. 340 * 341 * @param k the position to fill 342 * @param x the item to insert 343 * @param array the heap array 344 * @param n heap size 345 */ 346 private static <T> void siftUpComparable(int k, T x, Object[] array) { 347 Comparable<? super T> key = (Comparable<? super T>) x; 348 while (k > 0) { 349 int parent = (k - 1) >>> 1; 350 Object e = array[parent]; 351 if (key.compareTo((T) e) >= 0) 352 break; 353 array[k] = e; 354 k = parent; 355 } 356 array[k] = key; 357 } 358 359 private static <T> void siftUpUsingComparator(int k, T x, Object[] array, 360 Comparator<? super T> cmp) { 361 while (k > 0) { 362 int parent = (k - 1) >>> 1; 363 Object e = array[parent]; 364 if (cmp.compare(x, (T) e) >= 0) 365 break; 366 array[k] = e; 367 k = parent; 368 } 369 array[k] = x; 370 } 371 372 /** 373 * Inserts item x at position k, maintaining heap invariant by 374 * demoting x down the tree repeatedly until it is less than or 375 * equal to its children or is a leaf. 376 * 377 * @param k the position to fill 378 * @param x the item to insert 379 * @param array the heap array 380 * @param n heap size 381 */ 382 private static <T> void siftDownComparable(int k, T x, Object[] array, 383 int n) { 384 Comparable<? super T> key = (Comparable<? super T>)x; 385 int half = n >>> 1; // loop while a non-leaf 386 while (k < half) { 387 int child = (k << 1) + 1; // assume left child is least 388 Object c = array[child]; 389 int right = child + 1; 390 if (right < n && 391 ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) 392 c = array[child = right]; 393 if (key.compareTo((T) c) <= 0) 394 break; 395 array[k] = c; 396 k = child; 397 } 398 array[k] = key; 399 } 400 401 private static <T> void siftDownUsingComparator(int k, T x, Object[] array, 402 int n, 403 Comparator<? super T> cmp) { 404 int half = n >>> 1; 405 while (k < half) { 406 int child = (k << 1) + 1; 407 Object c = array[child]; 408 int right = child + 1; 409 if (right < n && cmp.compare((T) c, (T) array[right]) > 0) 410 c = array[child = right]; 411 if (cmp.compare(x, (T) c) <= 0) 412 break; 413 array[k] = c; 414 k = child; 415 } 416 array[k] = x; 417 } 418 419 /** 420 * Establishes the heap invariant (described above) in the entire tree, 421 * assuming nothing about the order of the elements prior to the call. 422 */ 423 private void heapify() { 424 Object[] array = queue; 425 int n = size; 426 int half = (n >>> 1) - 1; 427 Comparator<? super E> cmp = comparator; 428 if (cmp == null) { 429 for (int i = half; i >= 0; i--) 430 siftDownComparable(i, (E) array[i], array, n); 431 } 432 else { 433 for (int i = half; i >= 0; i--) 434 siftDownUsingComparator(i, (E) array[i], array, n, cmp); 435 } 436 } 437 438 /** 439 * Inserts the specified element into this priority queue. 440 * 441 * @param e the element to add 442 * @return {@code true} (as specified by {@link Collection#add}) 443 * @throws ClassCastException if the specified element cannot be compared 444 * with elements currently in the priority queue according to the 445 * priority queue's ordering 446 * @throws NullPointerException if the specified element is null 447 */ 448 public boolean add(E e) { 449 return offer(e); 450 } 451 452 /** 453 * Inserts the specified element into this priority queue. 454 * As the queue is unbounded, this method will never return {@code false}. 455 * 456 * @param e the element to add 457 * @return {@code true} (as specified by {@link Queue#offer}) 458 * @throws ClassCastException if the specified element cannot be compared 459 * with elements currently in the priority queue according to the 460 * priority queue's ordering 461 * @throws NullPointerException if the specified element is null 462 */ 463 public boolean offer(E e) { 464 if (e == null) 465 throw new NullPointerException(); 466 final ReentrantLock lock = this.lock; 467 lock.lock(); 468 int n, cap; 469 Object[] array; 470 while ((n = size) >= (cap = (array = queue).length)) 471 tryGrow(array, cap); 472 try { 473 Comparator<? super E> cmp = comparator; 474 if (cmp == null) 475 siftUpComparable(n, e, array); 476 else 477 siftUpUsingComparator(n, e, array, cmp); 478 size = n + 1; 479 notEmpty.signal(); 480 } finally { 481 lock.unlock(); 482 } 483 return true; 484 } 485 486 /** 487 * Inserts the specified element into this priority queue. 488 * As the queue is unbounded, this method will never block. 489 * 490 * @param e the element to add 491 * @throws ClassCastException if the specified element cannot be compared 492 * with elements currently in the priority queue according to the 493 * priority queue's ordering 494 * @throws NullPointerException if the specified element is null 495 */ 496 public void put(E e) { 497 offer(e); // never need to block 498 } 499 500 /** 501 * Inserts the specified element into this priority queue. 502 * As the queue is unbounded, this method will never block or 503 * return {@code false}. 504 * 505 * @param e the element to add 506 * @param timeout This parameter is ignored as the method never blocks 507 * @param unit This parameter is ignored as the method never blocks 508 * @return {@code true} (as specified by 509 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) 510 * @throws ClassCastException if the specified element cannot be compared 511 * with elements currently in the priority queue according to the 512 * priority queue's ordering 513 * @throws NullPointerException if the specified element is null 514 */ 515 public boolean offer(E e, long timeout, TimeUnit unit) { 516 return offer(e); // never need to block 517 } 518 519 public E poll() { 520 final ReentrantLock lock = this.lock; 521 lock.lock(); 522 E result; 523 try { 524 result = extract(); 525 } finally { 526 lock.unlock(); 527 } 528 return result; 529 } 530 531 public E take() throws InterruptedException { 532 final ReentrantLock lock = this.lock; 533 lock.lockInterruptibly(); 534 E result; 535 try { 536 while ( (result = extract()) == null) 537 notEmpty.await(); 538 } finally { 539 lock.unlock(); 540 } 541 return result; 542 } 543 544 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 545 long nanos = unit.toNanos(timeout); 546 final ReentrantLock lock = this.lock; 547 lock.lockInterruptibly(); 548 E result; 549 try { 550 while ( (result = extract()) == null && nanos > 0) 551 nanos = notEmpty.awaitNanos(nanos); 552 } finally { 553 lock.unlock(); 554 } 555 return result; 556 } 557 558 public E peek() { 559 final ReentrantLock lock = this.lock; 560 lock.lock(); 561 E result; 562 try { 563 result = size > 0 ? (E) queue[0] : null; 564 } finally { 565 lock.unlock(); 566 } 567 return result; 568 } 569 570 /** 571 * Returns the comparator used to order the elements in this queue, 572 * or {@code null} if this queue uses the {@linkplain Comparable 573 * natural ordering} of its elements. 574 * 575 * @return the comparator used to order the elements in this queue, 576 * or {@code null} if this queue uses the natural 577 * ordering of its elements 578 */ 579 public Comparator<? super E> comparator() { 580 return comparator; 581 } 582 583 public int size() { 584 final ReentrantLock lock = this.lock; 585 lock.lock(); 586 try { 587 return size; 588 } finally { 589 lock.unlock(); 590 } 591 } 592 593 /** 594 * Always returns {@code Integer.MAX_VALUE} because 595 * a {@code PriorityBlockingQueue} is not capacity constrained. 596 * @return {@code Integer.MAX_VALUE} always 597 */ 598 public int remainingCapacity() { 599 return Integer.MAX_VALUE; 600 } 601 602 private int indexOf(Object o) { 603 if (o != null) { 604 Object[] array = queue; 605 int n = size; 606 for (int i = 0; i < n; i++) 607 if (o.equals(array[i])) 608 return i; 609 } 610 return -1; 611 } 612 613 /** 614 * Removes the ith element from queue. 615 */ 616 private void removeAt(int i) { 617 Object[] array = queue; 618 int n = size - 1; 619 if (n == i) // removed last element 620 array[i] = null; 621 else { 622 E moved = (E) array[n]; 623 array[n] = null; 624 Comparator<? super E> cmp = comparator; 625 if (cmp == null) 626 siftDownComparable(i, moved, array, n); 627 else 628 siftDownUsingComparator(i, moved, array, n, cmp); 629 if (array[i] == moved) { 630 if (cmp == null) 631 siftUpComparable(i, moved, array); 632 else 633 siftUpUsingComparator(i, moved, array, cmp); 634 } 635 } 636 size = n; 637 } 638 639 /** 640 * Removes a single instance of the specified element from this queue, 641 * if it is present. More formally, removes an element {@code e} such 642 * that {@code o.equals(e)}, if this queue contains one or more such 643 * elements. Returns {@code true} if and only if this queue contained 644 * the specified element (or equivalently, if this queue changed as a 645 * result of the call). 646 * 647 * @param o element to be removed from this queue, if present 648 * @return {@code true} if this queue changed as a result of the call 649 */ 650 public boolean remove(Object o) { 651 boolean removed = false; 652 final ReentrantLock lock = this.lock; 653 lock.lock(); 654 try { 655 int i = indexOf(o); 656 if (i != -1) { 657 removeAt(i); 658 removed = true; 659 } 660 } finally { 661 lock.unlock(); 662 } 663 return removed; 664 } 665 666 667 /** 668 * Identity-based version for use in Itr.remove 669 */ 670 private void removeEQ(Object o) { 671 final ReentrantLock lock = this.lock; 672 lock.lock(); 673 try { 674 Object[] array = queue; 675 int n = size; 676 for (int i = 0; i < n; i++) { 677 if (o == array[i]) { 678 removeAt(i); 679 break; 680 } 681 } 682 } finally { 683 lock.unlock(); 684 } 685 } 686 687 /** 688 * Returns {@code true} if this queue contains the specified element. 689 * More formally, returns {@code true} if and only if this queue contains 690 * at least one element {@code e} such that {@code o.equals(e)}. 691 * 692 * @param o object to be checked for containment in this queue 693 * @return {@code true} if this queue contains the specified element 694 */ 695 public boolean contains(Object o) { 696 int index; 697 final ReentrantLock lock = this.lock; 698 lock.lock(); 699 try { 700 index = indexOf(o); 701 } finally { 702 lock.unlock(); 703 } 704 return index != -1; 705 } 706 707 /** 708 * Returns an array containing all of the elements in this queue. 709 * The returned array elements are in no particular order. 710 * 711 * <p>The returned array will be "safe" in that no references to it are 712 * maintained by this queue. (In other words, this method must allocate 713 * a new array). The caller is thus free to modify the returned array. 714 * 715 * <p>This method acts as bridge between array-based and collection-based 716 * APIs. 717 * 718 * @return an array containing all of the elements in this queue 719 */ 720 public Object[] toArray() { 721 final ReentrantLock lock = this.lock; 722 lock.lock(); 723 try { 724 return Arrays.copyOf(queue, size); 725 } finally { 726 lock.unlock(); 727 } 728 } 729 730 731 public String toString() { 732 final ReentrantLock lock = this.lock; 733 lock.lock(); 734 try { 735 int n = size; 736 if (n == 0) 737 return "[]"; 738 StringBuilder sb = new StringBuilder(); 739 sb.append('['); 740 for (int i = 0; i < n; ++i) { 741 E e = (E)queue[i]; 742 sb.append(e == this ? "(this Collection)" : e); 743 if (i != n - 1) 744 sb.append(',').append(' '); 745 } 746 return sb.append(']').toString(); 747 } finally { 748 lock.unlock(); 749 } 750 } 751 752 /** 753 * @throws UnsupportedOperationException {@inheritDoc} 754 * @throws ClassCastException {@inheritDoc} 755 * @throws NullPointerException {@inheritDoc} 756 * @throws IllegalArgumentException {@inheritDoc} 757 */ 758 public int drainTo(Collection<? super E> c) { 759 if (c == null) 760 throw new NullPointerException(); 761 if (c == this) 762 throw new IllegalArgumentException(); 763 final ReentrantLock lock = this.lock; 764 lock.lock(); 765 try { 766 int n = 0; 767 E e; 768 while ( (e = extract()) != null) { 769 c.add(e); 770 ++n; 771 } 772 return n; 773 } finally { 774 lock.unlock(); 775 } 776 } 777 778 /** 779 * @throws UnsupportedOperationException {@inheritDoc} 780 * @throws ClassCastException {@inheritDoc} 781 * @throws NullPointerException {@inheritDoc} 782 * @throws IllegalArgumentException {@inheritDoc} 783 */ 784 public int drainTo(Collection<? super E> c, int maxElements) { 785 if (c == null) 786 throw new NullPointerException(); 787 if (c == this) 788 throw new IllegalArgumentException(); 789 if (maxElements <= 0) 790 return 0; 791 final ReentrantLock lock = this.lock; 792 lock.lock(); 793 try { 794 int n = 0; 795 E e; 796 while (n < maxElements && (e = extract()) != null) { 797 c.add(e); 798 ++n; 799 } 800 return n; 801 } finally { 802 lock.unlock(); 803 } 804 } 805 806 /** 807 * Atomically removes all of the elements from this queue. 808 * The queue will be empty after this call returns. 809 */ 810 public void clear() { 811 final ReentrantLock lock = this.lock; 812 lock.lock(); 813 try { 814 Object[] array = queue; 815 int n = size; 816 size = 0; 817 for (int i = 0; i < n; i++) 818 array[i] = null; 819 } finally { 820 lock.unlock(); 821 } 822 } 823 824 /** 825 * Returns an array containing all of the elements in this queue; the 826 * runtime type of the returned array is that of the specified array. 827 * The returned array elements are in no particular order. 828 * If the queue fits in the specified array, it is returned therein. 829 * Otherwise, a new array is allocated with the runtime type of the 830 * specified array and the size of this queue. 831 * 832 * <p>If this queue fits in the specified array with room to spare 833 * (i.e., the array has more elements than this queue), the element in 834 * the array immediately following the end of the queue is set to 835 * {@code null}. 836 * 837 * <p>Like the {@link #toArray()} method, this method acts as bridge between 838 * array-based and collection-based APIs. Further, this method allows 839 * precise control over the runtime type of the output array, and may, 840 * under certain circumstances, be used to save allocation costs. 841 * 842 * <p>Suppose {@code x} is a queue known to contain only strings. 843 * The following code can be used to dump the queue into a newly 844 * allocated array of {@code String}: 845 * 846 * <pre> 847 * String[] y = x.toArray(new String[0]);</pre> 848 * 849 * Note that {@code toArray(new Object[0])} is identical in function to 850 * {@code toArray()}. 851 * 852 * @param a the array into which the elements of the queue are to 853 * be stored, if it is big enough; otherwise, a new array of the 854 * same runtime type is allocated for this purpose 855 * @return an array containing all of the elements in this queue 856 * @throws ArrayStoreException if the runtime type of the specified array 857 * is not a supertype of the runtime type of every element in 858 * this queue 859 * @throws NullPointerException if the specified array is null 860 */ 861 public <T> T[] toArray(T[] a) { 862 final ReentrantLock lock = this.lock; 863 lock.lock(); 864 try { 865 int n = size; 866 if (a.length < n) 867 // Make a new array of a's runtime type, but my contents: 868 return (T[]) Arrays.copyOf(queue, size, a.getClass()); 869 System.arraycopy(queue, 0, a, 0, n); 870 if (a.length > n) 871 a[n] = null; 872 return a; 873 } finally { 874 lock.unlock(); 875 } 876 } 877 878 /** 879 * Returns an iterator over the elements in this queue. The 880 * iterator does not return the elements in any particular order. 881 * 882 * <p>The returned iterator is a "weakly consistent" iterator that 883 * will never throw {@link java.util.ConcurrentModificationException 884 * ConcurrentModificationException}, and guarantees to traverse 885 * elements as they existed upon construction of the iterator, and 886 * may (but is not guaranteed to) reflect any modifications 887 * subsequent to construction. 888 * 889 * @return an iterator over the elements in this queue 890 */ 891 public Iterator<E> iterator() { 892 return new Itr(toArray()); 893 } 894 895 /** 896 * Snapshot iterator that works off copy of underlying q array. 897 */ 898 final class Itr implements Iterator<E> { 899 final Object[] array; // Array of all elements 900 int cursor; // index of next element to return; 901 int lastRet; // index of last element, or -1 if no such 902 903 Itr(Object[] array) { 904 lastRet = -1; 905 this.array = array; 906 } 907 908 public boolean hasNext() { 909 return cursor < array.length; 910 } 911 912 public E next() { 913 if (cursor >= array.length) 914 throw new NoSuchElementException(); 915 lastRet = cursor; 916 return (E)array[cursor++]; 917 } 918 919 public void remove() { 920 if (lastRet < 0) 921 throw new IllegalStateException(); 922 removeEQ(array[lastRet]); 923 lastRet = -1; 924 } 925 } 926 927 /** 928 * Saves the state to a stream (that is, serializes it). For 929 * compatibility with previous version of this class, 930 * elements are first copied to a java.util.PriorityQueue, 931 * which is then serialized. 932 */ 933 private void writeObject(java.io.ObjectOutputStream s) 934 throws java.io.IOException { 935 lock.lock(); 936 try { 937 int n = size; // avoid zero capacity argument 938 q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator); 939 q.addAll(this); 940 s.defaultWriteObject(); 941 } finally { 942 q = null; 943 lock.unlock(); 944 } 945 } 946 947 /** 948 * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream 949 * (that is, deserializes it). 950 * 951 * @param s the stream 952 */ 953 private void readObject(java.io.ObjectInputStream s) 954 throws java.io.IOException, ClassNotFoundException { 955 try { 956 s.defaultReadObject(); 957 this.queue = new Object[q.size()]; 958 comparator = q.comparator(); 959 addAll(q); 960 } finally { 961 q = null; 962 } 963 } 964 965 // Unsafe mechanics 966 private static final sun.misc.Unsafe UNSAFE; 967 private static final long allocationSpinLockOffset; 968 static { 969 try { 970 UNSAFE = sun.misc.Unsafe.getUnsafe(); 971 Class k = PriorityBlockingQueue.class; 972 allocationSpinLockOffset = UNSAFE.objectFieldOffset 973 (k.getDeclaredField("allocationSpinLock")); 974 } catch (Exception e) { 975 throw new Error(e); 976 } 977 } 978 }