1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/licenses/publicdomain 34 */ 35 36 package java.util.concurrent; 37 38 import java.util.concurrent.locks.*; 39 import java.util.*; 40 41 /** 42 * An unbounded {@linkplain BlockingQueue blocking queue} that uses 43 * the same ordering rules as class {@link PriorityQueue} and supplies 44 * blocking retrieval operations. While this queue is logically 45 * unbounded, attempted additions may fail due to resource exhaustion 46 * (causing <tt>OutOfMemoryError</tt>). This class does not permit 47 * <tt>null</tt> elements. A priority queue relying on {@linkplain 48 * Comparable natural ordering} also does not permit insertion of 49 * non-comparable objects (doing so results in 50 * <tt>ClassCastException</tt>). 51 * 52 * <p>This class and its iterator implement all of the 53 * <em>optional</em> methods of the {@link Collection} and {@link 54 * Iterator} interfaces. The Iterator provided in method {@link 55 * #iterator()} is <em>not</em> guaranteed to traverse the elements of 56 * the PriorityBlockingQueue in any particular order. If you need 57 * ordered traversal, consider using 58 * <tt>Arrays.sort(pq.toArray())</tt>. Also, method <tt>drainTo</tt> 59 * can be used to <em>remove</em> some or all elements in priority 60 * order and place them in another collection. 61 * 62 * <p>Operations on this class make no guarantees about the ordering 63 * of elements with equal priority. If you need to enforce an 64 * ordering, you can define custom classes or comparators that use a 65 * secondary key to break ties in primary priority values. For 66 * example, here is a class that applies first-in-first-out 67 * tie-breaking to comparable elements. To use it, you would insert a 68 * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object. 69 * 70 * <pre> 71 * class FIFOEntry<E extends Comparable<? super E>> 72 * implements Comparable<FIFOEntry<E>> { 73 * final static AtomicLong seq = new AtomicLong(); 74 * final long seqNum; 75 * final E entry; 76 * public FIFOEntry(E entry) { 77 * seqNum = seq.getAndIncrement(); 78 * this.entry = entry; 79 * } 80 * public E getEntry() { return entry; } 81 * public int compareTo(FIFOEntry<E> other) { 82 * int res = entry.compareTo(other.entry); 83 * if (res == 0 && other.entry != this.entry) 84 * res = (seqNum < other.seqNum ? -1 : 1); 85 * return res; 86 * } 87 * }</pre> 88 * 89 * <p>This class is a member of the 90 * <a href="{@docRoot}/../technotes/guides/collections/index.html"> 91 * Java Collections Framework</a>. 92 * 93 * @since 1.5 94 * @author Doug Lea 95 * @param <E> the type of elements held in this collection 96 */ 97 public class PriorityBlockingQueue<E> extends AbstractQueue<E> 98 implements BlockingQueue<E>, java.io.Serializable { 99 private static final long serialVersionUID = 5595510919245408276L; 100 101 private final PriorityQueue<E> q; 102 private final ReentrantLock lock = new ReentrantLock(true); 103 private final Condition notEmpty = lock.newCondition(); 104 105 /** 106 * Creates a <tt>PriorityBlockingQueue</tt> with the default 107 * initial capacity (11) that orders its elements according to 108 * their {@linkplain Comparable natural ordering}. 109 */ 110 public PriorityBlockingQueue() { 111 q = new PriorityQueue<E>(); 112 } 113 114 /** 115 * Creates a <tt>PriorityBlockingQueue</tt> with the specified 116 * initial capacity that orders its elements according to their 117 * {@linkplain Comparable natural ordering}. 118 * 119 * @param initialCapacity the initial capacity for this priority queue 120 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less 121 * than 1 122 */ 123 public PriorityBlockingQueue(int initialCapacity) { 124 q = new PriorityQueue<E>(initialCapacity, null); 125 } 126 127 /** 128 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial 129 * capacity that orders its elements according to the specified 130 * comparator. 131 * 132 * @param initialCapacity the initial capacity for this priority queue 133 * @param comparator the comparator that will be used to order this 134 * priority queue. If {@code null}, the {@linkplain Comparable 135 * natural ordering} of the elements will be used. 136 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less 137 * than 1 138 */ 139 public PriorityBlockingQueue(int initialCapacity, 140 Comparator<? super E> comparator) { 141 q = new PriorityQueue<E>(initialCapacity, comparator); 142 } 143 144 /** 145 * Creates a <tt>PriorityBlockingQueue</tt> containing the elements 146 * in the specified collection. If the specified collection is a 147 * {@link SortedSet} or a {@link PriorityQueue}, this 148 * priority queue will be ordered according to the same ordering. 149 * Otherwise, this priority queue will be ordered according to the 150 * {@linkplain Comparable natural ordering} of its elements. 151 * 152 * @param c the collection whose elements are to be placed 153 * into this priority queue 154 * @throws ClassCastException if elements of the specified collection 155 * cannot be compared to one another according to the priority 156 * queue's ordering 157 * @throws NullPointerException if the specified collection or any 158 * of its elements are null 159 */ 160 public PriorityBlockingQueue(Collection<? extends E> c) { 161 q = new PriorityQueue<E>(c); 162 } 163 164 /** 165 * Inserts the specified element into this priority queue. 166 * 167 * @param e the element to add 168 * @return <tt>true</tt> (as specified by {@link Collection#add}) 169 * @throws ClassCastException if the specified element cannot be compared 170 * with elements currently in the priority queue according to the 171 * priority queue's ordering 172 * @throws NullPointerException if the specified element is null 173 */ 174 public boolean add(E e) { 175 return offer(e); 176 } 177 178 /** 179 * Inserts the specified element into this priority queue. 180 * 181 * @param e the element to add 182 * @return <tt>true</tt> (as specified by {@link Queue#offer}) 183 * @throws ClassCastException if the specified element cannot be compared 184 * with elements currently in the priority queue according to the 185 * priority queue's ordering 186 * @throws NullPointerException if the specified element is null 187 */ 188 public boolean offer(E e) { 189 final ReentrantLock lock = this.lock; 190 lock.lock(); 191 try { 192 boolean ok = q.offer(e); 193 assert ok; 194 notEmpty.signal(); 195 return true; 196 } finally { 197 lock.unlock(); 198 } 199 } 200 201 /** 202 * Inserts the specified element into this priority queue. As the queue is 203 * unbounded this method will never block. 204 * 205 * @param e the element to add 206 * @throws ClassCastException if the specified element cannot be compared 207 * with elements currently in the priority queue according to the 208 * priority queue's ordering 209 * @throws NullPointerException if the specified element is null 210 */ 211 public void put(E e) { 212 offer(e); // never need to block 213 } 214 215 /** 216 * Inserts the specified element into this priority queue. As the queue is 217 * unbounded this method will never block. 218 * 219 * @param e the element to add 220 * @param timeout This parameter is ignored as the method never blocks 221 * @param unit This parameter is ignored as the method never blocks 222 * @return <tt>true</tt> 223 * @throws ClassCastException if the specified element cannot be compared 224 * with elements currently in the priority queue according to the 225 * priority queue's ordering 226 * @throws NullPointerException if the specified element is null 227 */ 228 public boolean offer(E e, long timeout, TimeUnit unit) { 229 return offer(e); // never need to block 230 } 231 232 public E poll() { 233 final ReentrantLock lock = this.lock; 234 lock.lock(); 235 try { 236 return q.poll(); 237 } finally { 238 lock.unlock(); 239 } 240 } 241 242 public E take() throws InterruptedException { 243 final ReentrantLock lock = this.lock; 244 lock.lockInterruptibly(); 245 try { 246 try { 247 while (q.size() == 0) 248 notEmpty.await(); 249 } catch (InterruptedException ie) { 250 notEmpty.signal(); // propagate to non-interrupted thread 251 throw ie; 252 } 253 E x = q.poll(); 254 assert x != null; 255 return x; 256 } finally { 257 lock.unlock(); 258 } 259 } 260 261 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 262 long nanos = unit.toNanos(timeout); 263 final ReentrantLock lock = this.lock; 264 lock.lockInterruptibly(); 265 try { 266 for (;;) { 267 E x = q.poll(); 268 if (x != null) 269 return x; 270 if (nanos <= 0) 271 return null; 272 try { 273 nanos = notEmpty.awaitNanos(nanos); 274 } catch (InterruptedException ie) { 275 notEmpty.signal(); // propagate to non-interrupted thread 276 throw ie; 277 } 278 } 279 } finally { 280 lock.unlock(); 281 } 282 } 283 284 public E peek() { 285 final ReentrantLock lock = this.lock; 286 lock.lock(); 287 try { 288 return q.peek(); 289 } finally { 290 lock.unlock(); 291 } 292 } 293 294 /** 295 * Returns the comparator used to order the elements in this queue, 296 * or <tt>null</tt> if this queue uses the {@linkplain Comparable 297 * natural ordering} of its elements. 298 * 299 * @return the comparator used to order the elements in this queue, 300 * or <tt>null</tt> if this queue uses the natural 301 * ordering of its elements 302 */ 303 public Comparator<? super E> comparator() { 304 return q.comparator(); 305 } 306 307 public int size() { 308 final ReentrantLock lock = this.lock; 309 lock.lock(); 310 try { 311 return q.size(); 312 } finally { 313 lock.unlock(); 314 } 315 } 316 317 /** 318 * Always returns <tt>Integer.MAX_VALUE</tt> because 319 * a <tt>PriorityBlockingQueue</tt> is not capacity constrained. 320 * @return <tt>Integer.MAX_VALUE</tt> 321 */ 322 public int remainingCapacity() { 323 return Integer.MAX_VALUE; 324 } 325 326 /** 327 * Removes a single instance of the specified element from this queue, 328 * if it is present. More formally, removes an element {@code e} such 329 * that {@code o.equals(e)}, if this queue contains one or more such 330 * elements. Returns {@code true} if and only if this queue contained 331 * the specified element (or equivalently, if this queue changed as a 332 * result of the call). 333 * 334 * @param o element to be removed from this queue, if present 335 * @return <tt>true</tt> if this queue changed as a result of the call 336 */ 337 public boolean remove(Object o) { 338 final ReentrantLock lock = this.lock; 339 lock.lock(); 340 try { 341 return q.remove(o); 342 } finally { 343 lock.unlock(); 344 } 345 } 346 347 /** 348 * Returns {@code true} if this queue contains the specified element. 349 * More formally, returns {@code true} if and only if this queue contains 350 * at least one element {@code e} such that {@code o.equals(e)}. 351 * 352 * @param o object to be checked for containment in this queue 353 * @return <tt>true</tt> if this queue contains the specified element 354 */ 355 public boolean contains(Object o) { 356 final ReentrantLock lock = this.lock; 357 lock.lock(); 358 try { 359 return q.contains(o); 360 } finally { 361 lock.unlock(); 362 } 363 } 364 365 /** 366 * Returns an array containing all of the elements in this queue. 367 * The returned array elements are in no particular order. 368 * 369 * <p>The returned array will be "safe" in that no references to it are 370 * maintained by this queue. (In other words, this method must allocate 371 * a new array). The caller is thus free to modify the returned array. 372 * 373 * <p>This method acts as bridge between array-based and collection-based 374 * APIs. 375 * 376 * @return an array containing all of the elements in this queue 377 */ 378 public Object[] toArray() { 379 final ReentrantLock lock = this.lock; 380 lock.lock(); 381 try { 382 return q.toArray(); 383 } finally { 384 lock.unlock(); 385 } 386 } 387 388 389 public String toString() { 390 final ReentrantLock lock = this.lock; 391 lock.lock(); 392 try { 393 return q.toString(); 394 } finally { 395 lock.unlock(); 396 } 397 } 398 399 /** 400 * @throws UnsupportedOperationException {@inheritDoc} 401 * @throws ClassCastException {@inheritDoc} 402 * @throws NullPointerException {@inheritDoc} 403 * @throws IllegalArgumentException {@inheritDoc} 404 */ 405 public int drainTo(Collection<? super E> c) { 406 if (c == null) 407 throw new NullPointerException(); 408 if (c == this) 409 throw new IllegalArgumentException(); 410 final ReentrantLock lock = this.lock; 411 lock.lock(); 412 try { 413 int n = 0; 414 E e; 415 while ( (e = q.poll()) != null) { 416 c.add(e); 417 ++n; 418 } 419 return n; 420 } finally { 421 lock.unlock(); 422 } 423 } 424 425 /** 426 * @throws UnsupportedOperationException {@inheritDoc} 427 * @throws ClassCastException {@inheritDoc} 428 * @throws NullPointerException {@inheritDoc} 429 * @throws IllegalArgumentException {@inheritDoc} 430 */ 431 public int drainTo(Collection<? super E> c, int maxElements) { 432 if (c == null) 433 throw new NullPointerException(); 434 if (c == this) 435 throw new IllegalArgumentException(); 436 if (maxElements <= 0) 437 return 0; 438 final ReentrantLock lock = this.lock; 439 lock.lock(); 440 try { 441 int n = 0; 442 E e; 443 while (n < maxElements && (e = q.poll()) != null) { 444 c.add(e); 445 ++n; 446 } 447 return n; 448 } finally { 449 lock.unlock(); 450 } 451 } 452 453 /** 454 * Atomically removes all of the elements from this queue. 455 * The queue will be empty after this call returns. 456 */ 457 public void clear() { 458 final ReentrantLock lock = this.lock; 459 lock.lock(); 460 try { 461 q.clear(); 462 } finally { 463 lock.unlock(); 464 } 465 } 466 467 /** 468 * Returns an array containing all of the elements in this queue; the 469 * runtime type of the returned array is that of the specified array. 470 * The returned array elements are in no particular order. 471 * If the queue fits in the specified array, it is returned therein. 472 * Otherwise, a new array is allocated with the runtime type of the 473 * specified array and the size of this queue. 474 * 475 * <p>If this queue fits in the specified array with room to spare 476 * (i.e., the array has more elements than this queue), the element in 477 * the array immediately following the end of the queue is set to 478 * <tt>null</tt>. 479 * 480 * <p>Like the {@link #toArray()} method, this method acts as bridge between 481 * array-based and collection-based APIs. Further, this method allows 482 * precise control over the runtime type of the output array, and may, 483 * under certain circumstances, be used to save allocation costs. 484 * 485 * <p>Suppose <tt>x</tt> is a queue known to contain only strings. 486 * The following code can be used to dump the queue into a newly 487 * allocated array of <tt>String</tt>: 488 * 489 * <pre> 490 * String[] y = x.toArray(new String[0]);</pre> 491 * 492 * Note that <tt>toArray(new Object[0])</tt> is identical in function to 493 * <tt>toArray()</tt>. 494 * 495 * @param a the array into which the elements of the queue are to 496 * be stored, if it is big enough; otherwise, a new array of the 497 * same runtime type is allocated for this purpose 498 * @return an array containing all of the elements in this queue 499 * @throws ArrayStoreException if the runtime type of the specified array 500 * is not a supertype of the runtime type of every element in 501 * this queue 502 * @throws NullPointerException if the specified array is null 503 */ 504 public <T> T[] toArray(T[] a) { 505 final ReentrantLock lock = this.lock; 506 lock.lock(); 507 try { 508 return q.toArray(a); 509 } finally { 510 lock.unlock(); 511 } 512 } 513 514 /** 515 * Returns an iterator over the elements in this queue. The 516 * iterator does not return the elements in any particular order. 517 * The returned <tt>Iterator</tt> is a "weakly consistent" 518 * iterator that will never throw {@link 519 * ConcurrentModificationException}, and guarantees to traverse 520 * elements as they existed upon construction of the iterator, and 521 * may (but is not guaranteed to) reflect any modifications 522 * subsequent to construction. 523 * 524 * @return an iterator over the elements in this queue 525 */ 526 public Iterator<E> iterator() { 527 return new Itr(toArray()); 528 } 529 530 /** 531 * Snapshot iterator that works off copy of underlying q array. 532 */ 533 private class Itr implements Iterator<E> { 534 final Object[] array; // Array of all elements 535 int cursor; // index of next element to return; 536 int lastRet; // index of last element, or -1 if no such 537 538 Itr(Object[] array) { 539 lastRet = -1; 540 this.array = array; 541 } 542 543 public boolean hasNext() { 544 return cursor < array.length; 545 } 546 547 public E next() { 548 if (cursor >= array.length) 549 throw new NoSuchElementException(); 550 lastRet = cursor; 551 return (E)array[cursor++]; 552 } 553 554 public void remove() { 555 if (lastRet < 0) 556 throw new IllegalStateException(); 557 Object x = array[lastRet]; 558 lastRet = -1; 559 // Traverse underlying queue to find == element, 560 // not just a .equals element. 561 lock.lock(); 562 try { 563 for (Iterator it = q.iterator(); it.hasNext(); ) { 564 if (it.next() == x) { 565 it.remove(); 566 return; 567 } 568 } 569 } finally { 570 lock.unlock(); 571 } 572 } 573 } 574 575 /** 576 * Saves the state to a stream (that is, serializes it). This 577 * merely wraps default serialization within lock. The 578 * serialization strategy for items is left to underlying 579 * Queue. Note that locking is not needed on deserialization, so 580 * readObject is not defined, just relying on default. 581 */ 582 private void writeObject(java.io.ObjectOutputStream s) 583 throws java.io.IOException { 584 lock.lock(); 585 try { 586 s.defaultWriteObject(); 587 } finally { 588 lock.unlock(); 589 } 590 } 591 592 }