Print this page
Split |
Close |
Expand all |
Collapse all |
--- old/src/share/classes/java/util/concurrent/PriorityBlockingQueue.java
+++ new/src/share/classes/java/util/concurrent/PriorityBlockingQueue.java
1 1 /*
2 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 3 *
4 4 * This code is free software; you can redistribute it and/or modify it
5 5 * under the terms of the GNU General Public License version 2 only, as
6 6 * published by the Free Software Foundation. Oracle designates this
7 7 * particular file as subject to the "Classpath" exception as provided
8 8 * by Oracle in the LICENSE file that accompanied this code.
9 9 *
10 10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 13 * version 2 for more details (a copy is included in the LICENSE file that
14 14 * accompanied this code).
15 15 *
16 16 * You should have received a copy of the GNU General Public License version
17 17 * 2 along with this work; if not, write to the Free Software Foundation,
18 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 19 *
20 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 21 * or visit www.oracle.com if you need additional information or have any
22 22 * questions.
23 23 */
24 24
25 25 /*
26 26 * This file is available under and governed by the GNU General Public
27 27 * License version 2 only, as published by the Free Software Foundation.
↓ open down ↓ |
27 lines elided |
↑ open up ↑ |
28 28 * However, the following notice accompanied the original version of this
29 29 * file:
30 30 *
31 31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 32 * Expert Group and released to the public domain, as explained at
33 33 * http://creativecommons.org/publicdomain/zero/1.0/
34 34 */
35 35
36 36 package java.util.concurrent;
37 37
38 -import java.util.concurrent.locks.*;
38 +import java.util.concurrent.locks.Condition;
39 +import java.util.concurrent.locks.ReentrantLock;
39 40 import java.util.*;
40 41
41 42 /**
42 43 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
43 44 * the same ordering rules as class {@link PriorityQueue} and supplies
44 45 * blocking retrieval operations. While this queue is logically
45 46 * unbounded, attempted additions may fail due to resource exhaustion
46 47 * (causing {@code OutOfMemoryError}). This class does not permit
47 48 * {@code null} elements. A priority queue relying on {@linkplain
48 49 * Comparable natural ordering} also does not permit insertion of
49 50 * non-comparable objects (doing so results in
50 51 * {@code ClassCastException}).
51 52 *
52 53 * <p>This class and its iterator implement all of the
53 54 * <em>optional</em> methods of the {@link Collection} and {@link
54 55 * Iterator} interfaces. The Iterator provided in method {@link
55 56 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
56 57 * the PriorityBlockingQueue in any particular order. If you need
57 58 * ordered traversal, consider using
58 59 * {@code Arrays.sort(pq.toArray())}. Also, method {@code drainTo}
59 60 * can be used to <em>remove</em> some or all elements in priority
60 61 * order and place them in another collection.
61 62 *
62 63 * <p>Operations on this class make no guarantees about the ordering
63 64 * of elements with equal priority. If you need to enforce an
64 65 * ordering, you can define custom classes or comparators that use a
65 66 * secondary key to break ties in primary priority values. For
66 67 * example, here is a class that applies first-in-first-out
67 68 * tie-breaking to comparable elements. To use it, you would insert a
68 69 * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
69 70 *
70 71 * <pre> {@code
71 72 * class FIFOEntry<E extends Comparable<? super E>>
72 73 * implements Comparable<FIFOEntry<E>> {
73 74 * static final AtomicLong seq = new AtomicLong(0);
74 75 * final long seqNum;
75 76 * final E entry;
76 77 * public FIFOEntry(E entry) {
77 78 * seqNum = seq.getAndIncrement();
78 79 * this.entry = entry;
79 80 * }
80 81 * public E getEntry() { return entry; }
81 82 * public int compareTo(FIFOEntry<E> other) {
82 83 * int res = entry.compareTo(other.entry);
83 84 * if (res == 0 && other.entry != this.entry)
84 85 * res = (seqNum < other.seqNum ? -1 : 1);
85 86 * return res;
86 87 * }
87 88 * }}</pre>
88 89 *
89 90 * <p>This class is a member of the
90 91 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
91 92 * Java Collections Framework</a>.
92 93 *
93 94 * @since 1.5
94 95 * @author Doug Lea
95 96 * @param <E> the type of elements held in this collection
96 97 */
97 98 @SuppressWarnings("unchecked")
98 99 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
99 100 implements BlockingQueue<E>, java.io.Serializable {
100 101 private static final long serialVersionUID = 5595510919245408276L;
101 102
102 103 /*
103 104 * The implementation uses an array-based binary heap, with public
↓ open down ↓ |
55 lines elided |
↑ open up ↑ |
104 105 * operations protected with a single lock. However, allocation
105 106 * during resizing uses a simple spinlock (used only while not
106 107 * holding main lock) in order to allow takes to operate
107 108 * concurrently with allocation. This avoids repeated
108 109 * postponement of waiting consumers and consequent element
109 110 * build-up. The need to back away from lock during allocation
110 111 * makes it impossible to simply wrap delegated
111 112 * java.util.PriorityQueue operations within a lock, as was done
112 113 * in a previous version of this class. To maintain
113 114 * interoperability, a plain PriorityQueue is still used during
114 - * serialization, which maintains compatibility at the espense of
115 + * serialization, which maintains compatibility at the expense of
115 116 * transiently doubling overhead.
116 117 */
117 118
118 119 /**
119 120 * Default array capacity.
120 121 */
121 122 private static final int DEFAULT_INITIAL_CAPACITY = 11;
122 123
123 124 /**
124 125 * The maximum size of array to allocate.
125 126 * Some VMs reserve some header words in an array.
126 127 * Attempts to allocate larger arrays may result in
127 128 * OutOfMemoryError: Requested array size exceeds VM limit
128 129 */
129 130 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
130 131
131 132 /**
132 133 * Priority queue represented as a balanced binary heap: the two
133 134 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
134 135 * priority queue is ordered by comparator, or by the elements'
135 136 * natural ordering, if comparator is null: For each node n in the
136 137 * heap and each descendant d of n, n <= d. The element with the
137 138 * lowest value is in queue[0], assuming the queue is nonempty.
138 139 */
139 140 private transient Object[] queue;
140 141
141 142 /**
142 143 * The number of elements in the priority queue.
143 144 */
144 145 private transient int size;
145 146
146 147 /**
147 148 * The comparator, or null if priority queue uses elements'
148 149 * natural ordering.
149 150 */
150 151 private transient Comparator<? super E> comparator;
151 152
152 153 /**
153 154 * Lock used for all public operations
154 155 */
155 156 private final ReentrantLock lock;
156 157
157 158 /**
158 159 * Condition for blocking when empty
159 160 */
160 161 private final Condition notEmpty;
161 162
162 163 /**
163 164 * Spinlock for allocation, acquired via CAS.
164 165 */
165 166 private transient volatile int allocationSpinLock;
166 167
167 168 /**
168 169 * A plain PriorityQueue used only for serialization,
169 170 * to maintain compatibility with previous versions
170 171 * of this class. Non-null only during serialization/deserialization.
171 172 */
172 173 private PriorityQueue<E> q;
173 174
174 175 /**
175 176 * Creates a {@code PriorityBlockingQueue} with the default
176 177 * initial capacity (11) that orders its elements according to
177 178 * their {@linkplain Comparable natural ordering}.
178 179 */
179 180 public PriorityBlockingQueue() {
180 181 this(DEFAULT_INITIAL_CAPACITY, null);
181 182 }
182 183
183 184 /**
184 185 * Creates a {@code PriorityBlockingQueue} with the specified
185 186 * initial capacity that orders its elements according to their
186 187 * {@linkplain Comparable natural ordering}.
187 188 *
188 189 * @param initialCapacity the initial capacity for this priority queue
189 190 * @throws IllegalArgumentException if {@code initialCapacity} is less
190 191 * than 1
191 192 */
192 193 public PriorityBlockingQueue(int initialCapacity) {
193 194 this(initialCapacity, null);
194 195 }
195 196
196 197 /**
197 198 * Creates a {@code PriorityBlockingQueue} with the specified initial
198 199 * capacity that orders its elements according to the specified
199 200 * comparator.
200 201 *
201 202 * @param initialCapacity the initial capacity for this priority queue
202 203 * @param comparator the comparator that will be used to order this
203 204 * priority queue. If {@code null}, the {@linkplain Comparable
204 205 * natural ordering} of the elements will be used.
205 206 * @throws IllegalArgumentException if {@code initialCapacity} is less
206 207 * than 1
207 208 */
208 209 public PriorityBlockingQueue(int initialCapacity,
209 210 Comparator<? super E> comparator) {
210 211 if (initialCapacity < 1)
211 212 throw new IllegalArgumentException();
212 213 this.lock = new ReentrantLock();
213 214 this.notEmpty = lock.newCondition();
214 215 this.comparator = comparator;
215 216 this.queue = new Object[initialCapacity];
216 217 }
217 218
218 219 /**
219 220 * Creates a {@code PriorityBlockingQueue} containing the elements
220 221 * in the specified collection. If the specified collection is a
221 222 * {@link SortedSet} or a {@link PriorityQueue}, this
222 223 * priority queue will be ordered according to the same ordering.
223 224 * Otherwise, this priority queue will be ordered according to the
224 225 * {@linkplain Comparable natural ordering} of its elements.
225 226 *
226 227 * @param c the collection whose elements are to be placed
227 228 * into this priority queue
228 229 * @throws ClassCastException if elements of the specified collection
229 230 * cannot be compared to one another according to the priority
230 231 * queue's ordering
231 232 * @throws NullPointerException if the specified collection or any
232 233 * of its elements are null
233 234 */
234 235 public PriorityBlockingQueue(Collection<? extends E> c) {
235 236 this.lock = new ReentrantLock();
236 237 this.notEmpty = lock.newCondition();
237 238 boolean heapify = true; // true if not known to be in heap order
238 239 boolean screen = true; // true if must screen for nulls
239 240 if (c instanceof SortedSet<?>) {
240 241 SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
241 242 this.comparator = (Comparator<? super E>) ss.comparator();
242 243 heapify = false;
243 244 }
244 245 else if (c instanceof PriorityBlockingQueue<?>) {
245 246 PriorityBlockingQueue<? extends E> pq =
246 247 (PriorityBlockingQueue<? extends E>) c;
247 248 this.comparator = (Comparator<? super E>) pq.comparator();
248 249 screen = false;
249 250 if (pq.getClass() == PriorityBlockingQueue.class) // exact match
250 251 heapify = false;
251 252 }
252 253 Object[] a = c.toArray();
253 254 int n = a.length;
254 255 // If c.toArray incorrectly doesn't return Object[], copy it.
255 256 if (a.getClass() != Object[].class)
256 257 a = Arrays.copyOf(a, n, Object[].class);
257 258 if (screen && (n == 1 || this.comparator != null)) {
258 259 for (int i = 0; i < n; ++i)
259 260 if (a[i] == null)
260 261 throw new NullPointerException();
261 262 }
262 263 this.queue = a;
263 264 this.size = n;
264 265 if (heapify)
265 266 heapify();
266 267 }
267 268
268 269 /**
269 270 * Tries to grow array to accommodate at least one more element
270 271 * (but normally expand by about 50%), giving up (allowing retry)
271 272 * on contention (which we expect to be rare). Call only while
272 273 * holding lock.
273 274 *
274 275 * @param array the heap array
275 276 * @param oldCap the length of the array
276 277 */
277 278 private void tryGrow(Object[] array, int oldCap) {
278 279 lock.unlock(); // must release and then re-acquire main lock
279 280 Object[] newArray = null;
280 281 if (allocationSpinLock == 0 &&
281 282 UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
282 283 0, 1)) {
283 284 try {
284 285 int newCap = oldCap + ((oldCap < 64) ?
285 286 (oldCap + 2) : // grow faster if small
286 287 (oldCap >> 1));
287 288 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
288 289 int minCap = oldCap + 1;
289 290 if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
290 291 throw new OutOfMemoryError();
291 292 newCap = MAX_ARRAY_SIZE;
292 293 }
293 294 if (newCap > oldCap && queue == array)
294 295 newArray = new Object[newCap];
295 296 } finally {
296 297 allocationSpinLock = 0;
297 298 }
298 299 }
299 300 if (newArray == null) // back off if another thread is allocating
300 301 Thread.yield();
↓ open down ↓ |
176 lines elided |
↑ open up ↑ |
301 302 lock.lock();
302 303 if (newArray != null && queue == array) {
303 304 queue = newArray;
304 305 System.arraycopy(array, 0, newArray, 0, oldCap);
305 306 }
306 307 }
307 308
308 309 /**
309 310 * Mechanics for poll(). Call only while holding lock.
310 311 */
311 - private E extract() {
312 - E result;
312 + private E dequeue() {
313 313 int n = size - 1;
314 314 if (n < 0)
315 - result = null;
315 + return null;
316 316 else {
317 317 Object[] array = queue;
318 - result = (E) array[0];
318 + E result = (E) array[0];
319 319 E x = (E) array[n];
320 320 array[n] = null;
321 321 Comparator<? super E> cmp = comparator;
322 322 if (cmp == null)
323 323 siftDownComparable(0, x, array, n);
324 324 else
325 325 siftDownUsingComparator(0, x, array, n, cmp);
326 326 size = n;
327 + return result;
327 328 }
328 - return result;
329 329 }
330 330
331 331 /**
332 332 * Inserts item x at position k, maintaining heap invariant by
333 333 * promoting x up the tree until it is greater than or equal to
334 334 * its parent, or is the root.
335 335 *
336 336 * To simplify and speed up coercions and comparisons. the
337 337 * Comparable and Comparator versions are separated into different
338 338 * methods that are otherwise identical. (Similarly for siftDown.)
339 339 * These methods are static, with heap state as arguments, to
340 340 * simplify use in light of possible comparator exceptions.
341 341 *
342 342 * @param k the position to fill
343 343 * @param x the item to insert
344 344 * @param array the heap array
345 345 * @param n heap size
346 346 */
347 347 private static <T> void siftUpComparable(int k, T x, Object[] array) {
348 348 Comparable<? super T> key = (Comparable<? super T>) x;
349 349 while (k > 0) {
350 350 int parent = (k - 1) >>> 1;
351 351 Object e = array[parent];
352 352 if (key.compareTo((T) e) >= 0)
353 353 break;
354 354 array[k] = e;
355 355 k = parent;
356 356 }
357 357 array[k] = key;
358 358 }
359 359
360 360 private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
361 361 Comparator<? super T> cmp) {
362 362 while (k > 0) {
363 363 int parent = (k - 1) >>> 1;
364 364 Object e = array[parent];
365 365 if (cmp.compare(x, (T) e) >= 0)
366 366 break;
367 367 array[k] = e;
368 368 k = parent;
369 369 }
370 370 array[k] = x;
371 371 }
372 372
373 373 /**
374 374 * Inserts item x at position k, maintaining heap invariant by
↓ open down ↓ |
36 lines elided |
↑ open up ↑ |
375 375 * demoting x down the tree repeatedly until it is less than or
376 376 * equal to its children or is a leaf.
377 377 *
378 378 * @param k the position to fill
379 379 * @param x the item to insert
380 380 * @param array the heap array
381 381 * @param n heap size
382 382 */
383 383 private static <T> void siftDownComparable(int k, T x, Object[] array,
384 384 int n) {
385 - Comparable<? super T> key = (Comparable<? super T>)x;
386 - int half = n >>> 1; // loop while a non-leaf
387 - while (k < half) {
388 - int child = (k << 1) + 1; // assume left child is least
389 - Object c = array[child];
390 - int right = child + 1;
391 - if (right < n &&
392 - ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
393 - c = array[child = right];
394 - if (key.compareTo((T) c) <= 0)
395 - break;
396 - array[k] = c;
397 - k = child;
385 + if (n > 0) {
386 + Comparable<? super T> key = (Comparable<? super T>)x;
387 + int half = n >>> 1; // loop while a non-leaf
388 + while (k < half) {
389 + int child = (k << 1) + 1; // assume left child is least
390 + Object c = array[child];
391 + int right = child + 1;
392 + if (right < n &&
393 + ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
394 + c = array[child = right];
395 + if (key.compareTo((T) c) <= 0)
396 + break;
397 + array[k] = c;
398 + k = child;
399 + }
400 + array[k] = key;
398 401 }
399 - array[k] = key;
400 402 }
401 403
402 404 private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
403 405 int n,
404 406 Comparator<? super T> cmp) {
405 - int half = n >>> 1;
406 - while (k < half) {
407 - int child = (k << 1) + 1;
408 - Object c = array[child];
409 - int right = child + 1;
410 - if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
411 - c = array[child = right];
412 - if (cmp.compare(x, (T) c) <= 0)
413 - break;
414 - array[k] = c;
415 - k = child;
407 + if (n > 0) {
408 + int half = n >>> 1;
409 + while (k < half) {
410 + int child = (k << 1) + 1;
411 + Object c = array[child];
412 + int right = child + 1;
413 + if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
414 + c = array[child = right];
415 + if (cmp.compare(x, (T) c) <= 0)
416 + break;
417 + array[k] = c;
418 + k = child;
419 + }
420 + array[k] = x;
416 421 }
417 - array[k] = x;
418 422 }
419 423
420 424 /**
421 425 * Establishes the heap invariant (described above) in the entire tree,
422 426 * assuming nothing about the order of the elements prior to the call.
423 427 */
424 428 private void heapify() {
425 429 Object[] array = queue;
426 430 int n = size;
427 431 int half = (n >>> 1) - 1;
428 432 Comparator<? super E> cmp = comparator;
429 433 if (cmp == null) {
430 434 for (int i = half; i >= 0; i--)
431 435 siftDownComparable(i, (E) array[i], array, n);
432 436 }
433 437 else {
434 438 for (int i = half; i >= 0; i--)
435 439 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
436 440 }
437 441 }
438 442
439 443 /**
440 444 * Inserts the specified element into this priority queue.
441 445 *
442 446 * @param e the element to add
443 447 * @return {@code true} (as specified by {@link Collection#add})
444 448 * @throws ClassCastException if the specified element cannot be compared
445 449 * with elements currently in the priority queue according to the
446 450 * priority queue's ordering
447 451 * @throws NullPointerException if the specified element is null
448 452 */
449 453 public boolean add(E e) {
450 454 return offer(e);
451 455 }
452 456
453 457 /**
454 458 * Inserts the specified element into this priority queue.
455 459 * As the queue is unbounded, this method will never return {@code false}.
456 460 *
457 461 * @param e the element to add
458 462 * @return {@code true} (as specified by {@link Queue#offer})
459 463 * @throws ClassCastException if the specified element cannot be compared
460 464 * with elements currently in the priority queue according to the
461 465 * priority queue's ordering
462 466 * @throws NullPointerException if the specified element is null
463 467 */
464 468 public boolean offer(E e) {
465 469 if (e == null)
466 470 throw new NullPointerException();
467 471 final ReentrantLock lock = this.lock;
468 472 lock.lock();
469 473 int n, cap;
470 474 Object[] array;
471 475 while ((n = size) >= (cap = (array = queue).length))
472 476 tryGrow(array, cap);
473 477 try {
474 478 Comparator<? super E> cmp = comparator;
475 479 if (cmp == null)
476 480 siftUpComparable(n, e, array);
477 481 else
478 482 siftUpUsingComparator(n, e, array, cmp);
479 483 size = n + 1;
480 484 notEmpty.signal();
481 485 } finally {
482 486 lock.unlock();
483 487 }
484 488 return true;
485 489 }
486 490
487 491 /**
488 492 * Inserts the specified element into this priority queue.
489 493 * As the queue is unbounded, this method will never block.
490 494 *
491 495 * @param e the element to add
492 496 * @throws ClassCastException if the specified element cannot be compared
493 497 * with elements currently in the priority queue according to the
494 498 * priority queue's ordering
495 499 * @throws NullPointerException if the specified element is null
496 500 */
497 501 public void put(E e) {
498 502 offer(e); // never need to block
499 503 }
500 504
501 505 /**
502 506 * Inserts the specified element into this priority queue.
503 507 * As the queue is unbounded, this method will never block or
504 508 * return {@code false}.
505 509 *
506 510 * @param e the element to add
507 511 * @param timeout This parameter is ignored as the method never blocks
508 512 * @param unit This parameter is ignored as the method never blocks
509 513 * @return {@code true} (as specified by
510 514 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
511 515 * @throws ClassCastException if the specified element cannot be compared
512 516 * with elements currently in the priority queue according to the
↓ open down ↓ |
85 lines elided |
↑ open up ↑ |
513 517 * priority queue's ordering
514 518 * @throws NullPointerException if the specified element is null
515 519 */
516 520 public boolean offer(E e, long timeout, TimeUnit unit) {
517 521 return offer(e); // never need to block
518 522 }
519 523
520 524 public E poll() {
521 525 final ReentrantLock lock = this.lock;
522 526 lock.lock();
523 - E result;
524 527 try {
525 - result = extract();
528 + return dequeue();
526 529 } finally {
527 530 lock.unlock();
528 531 }
529 - return result;
530 532 }
531 533
532 534 public E take() throws InterruptedException {
533 535 final ReentrantLock lock = this.lock;
534 536 lock.lockInterruptibly();
535 537 E result;
536 538 try {
537 - while ( (result = extract()) == null)
539 + while ( (result = dequeue()) == null)
538 540 notEmpty.await();
539 541 } finally {
540 542 lock.unlock();
541 543 }
542 544 return result;
543 545 }
544 546
545 547 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
546 548 long nanos = unit.toNanos(timeout);
547 549 final ReentrantLock lock = this.lock;
548 550 lock.lockInterruptibly();
549 551 E result;
550 552 try {
551 - while ( (result = extract()) == null && nanos > 0)
553 + while ( (result = dequeue()) == null && nanos > 0)
552 554 nanos = notEmpty.awaitNanos(nanos);
553 555 } finally {
554 556 lock.unlock();
555 557 }
556 558 return result;
557 559 }
558 560
559 561 public E peek() {
560 562 final ReentrantLock lock = this.lock;
561 563 lock.lock();
562 - E result;
563 564 try {
564 - result = size > 0 ? (E) queue[0] : null;
565 + return (size == 0) ? null : (E) queue[0];
565 566 } finally {
566 567 lock.unlock();
567 568 }
568 - return result;
569 569 }
570 570
571 571 /**
572 572 * Returns the comparator used to order the elements in this queue,
573 573 * or {@code null} if this queue uses the {@linkplain Comparable
574 574 * natural ordering} of its elements.
575 575 *
576 576 * @return the comparator used to order the elements in this queue,
577 577 * or {@code null} if this queue uses the natural
578 578 * ordering of its elements
579 579 */
580 580 public Comparator<? super E> comparator() {
581 581 return comparator;
582 582 }
583 583
584 584 public int size() {
585 585 final ReentrantLock lock = this.lock;
586 586 lock.lock();
587 587 try {
588 588 return size;
589 589 } finally {
590 590 lock.unlock();
591 591 }
592 592 }
593 593
594 594 /**
595 595 * Always returns {@code Integer.MAX_VALUE} because
596 596 * a {@code PriorityBlockingQueue} is not capacity constrained.
597 597 * @return {@code Integer.MAX_VALUE} always
598 598 */
599 599 public int remainingCapacity() {
600 600 return Integer.MAX_VALUE;
601 601 }
602 602
603 603 private int indexOf(Object o) {
604 604 if (o != null) {
605 605 Object[] array = queue;
606 606 int n = size;
607 607 for (int i = 0; i < n; i++)
608 608 if (o.equals(array[i]))
609 609 return i;
610 610 }
611 611 return -1;
612 612 }
613 613
614 614 /**
615 615 * Removes the ith element from queue.
616 616 */
617 617 private void removeAt(int i) {
618 618 Object[] array = queue;
619 619 int n = size - 1;
620 620 if (n == i) // removed last element
621 621 array[i] = null;
622 622 else {
623 623 E moved = (E) array[n];
624 624 array[n] = null;
625 625 Comparator<? super E> cmp = comparator;
626 626 if (cmp == null)
627 627 siftDownComparable(i, moved, array, n);
628 628 else
629 629 siftDownUsingComparator(i, moved, array, n, cmp);
630 630 if (array[i] == moved) {
631 631 if (cmp == null)
632 632 siftUpComparable(i, moved, array);
633 633 else
634 634 siftUpUsingComparator(i, moved, array, cmp);
635 635 }
636 636 }
637 637 size = n;
638 638 }
639 639
640 640 /**
641 641 * Removes a single instance of the specified element from this queue,
↓ open down ↓ |
63 lines elided |
↑ open up ↑ |
642 642 * if it is present. More formally, removes an element {@code e} such
643 643 * that {@code o.equals(e)}, if this queue contains one or more such
644 644 * elements. Returns {@code true} if and only if this queue contained
645 645 * the specified element (or equivalently, if this queue changed as a
646 646 * result of the call).
647 647 *
648 648 * @param o element to be removed from this queue, if present
649 649 * @return {@code true} if this queue changed as a result of the call
650 650 */
651 651 public boolean remove(Object o) {
652 - boolean removed = false;
653 652 final ReentrantLock lock = this.lock;
654 653 lock.lock();
655 654 try {
656 655 int i = indexOf(o);
657 - if (i != -1) {
658 - removeAt(i);
659 - removed = true;
660 - }
656 + if (i == -1)
657 + return false;
658 + removeAt(i);
659 + return true;
661 660 } finally {
662 661 lock.unlock();
663 662 }
664 - return removed;
665 663 }
666 664
667 -
668 665 /**
669 666 * Identity-based version for use in Itr.remove
670 667 */
671 - private void removeEQ(Object o) {
668 + void removeEQ(Object o) {
672 669 final ReentrantLock lock = this.lock;
673 670 lock.lock();
674 671 try {
675 672 Object[] array = queue;
676 - int n = size;
677 - for (int i = 0; i < n; i++) {
673 + for (int i = 0, n = size; i < n; i++) {
678 674 if (o == array[i]) {
679 675 removeAt(i);
680 676 break;
681 677 }
682 678 }
683 679 } finally {
684 680 lock.unlock();
685 681 }
686 682 }
687 683
688 684 /**
689 685 * Returns {@code true} if this queue contains the specified element.
690 686 * More formally, returns {@code true} if and only if this queue contains
691 687 * at least one element {@code e} such that {@code o.equals(e)}.
692 688 *
693 689 * @param o object to be checked for containment in this queue
694 690 * @return {@code true} if this queue contains the specified element
695 691 */
696 692 public boolean contains(Object o) {
697 - int index;
698 693 final ReentrantLock lock = this.lock;
699 694 lock.lock();
700 695 try {
701 - index = indexOf(o);
696 + return indexOf(o) != -1;
702 697 } finally {
703 698 lock.unlock();
704 699 }
705 - return index != -1;
706 700 }
707 701
708 702 /**
709 703 * Returns an array containing all of the elements in this queue.
710 704 * The returned array elements are in no particular order.
711 705 *
712 706 * <p>The returned array will be "safe" in that no references to it are
713 707 * maintained by this queue. (In other words, this method must allocate
714 708 * a new array). The caller is thus free to modify the returned array.
715 709 *
716 710 * <p>This method acts as bridge between array-based and collection-based
717 711 * APIs.
718 712 *
719 713 * @return an array containing all of the elements in this queue
720 714 */
↓ open down ↓ |
5 lines elided |
↑ open up ↑ |
721 715 public Object[] toArray() {
722 716 final ReentrantLock lock = this.lock;
723 717 lock.lock();
724 718 try {
725 719 return Arrays.copyOf(queue, size);
726 720 } finally {
727 721 lock.unlock();
728 722 }
729 723 }
730 724
731 -
732 725 public String toString() {
733 726 final ReentrantLock lock = this.lock;
734 727 lock.lock();
735 728 try {
736 729 int n = size;
737 730 if (n == 0)
738 731 return "[]";
739 732 StringBuilder sb = new StringBuilder();
740 733 sb.append('[');
741 734 for (int i = 0; i < n; ++i) {
742 - E e = (E)queue[i];
735 + Object e = queue[i];
743 736 sb.append(e == this ? "(this Collection)" : e);
744 737 if (i != n - 1)
745 738 sb.append(',').append(' ');
746 739 }
747 740 return sb.append(']').toString();
748 741 } finally {
749 742 lock.unlock();
750 743 }
751 744 }
752 745
753 746 /**
754 747 * @throws UnsupportedOperationException {@inheritDoc}
755 748 * @throws ClassCastException {@inheritDoc}
756 749 * @throws NullPointerException {@inheritDoc}
757 750 * @throws IllegalArgumentException {@inheritDoc}
758 751 */
759 752 public int drainTo(Collection<? super E> c) {
760 - if (c == null)
761 - throw new NullPointerException();
762 - if (c == this)
763 - throw new IllegalArgumentException();
764 - final ReentrantLock lock = this.lock;
765 - lock.lock();
766 - try {
767 - int n = 0;
768 - E e;
769 - while ( (e = extract()) != null) {
770 - c.add(e);
771 - ++n;
772 - }
773 - return n;
774 - } finally {
775 - lock.unlock();
776 - }
753 + return drainTo(c, Integer.MAX_VALUE);
777 754 }
778 755
779 756 /**
780 757 * @throws UnsupportedOperationException {@inheritDoc}
781 758 * @throws ClassCastException {@inheritDoc}
782 759 * @throws NullPointerException {@inheritDoc}
783 760 * @throws IllegalArgumentException {@inheritDoc}
784 761 */
785 762 public int drainTo(Collection<? super E> c, int maxElements) {
786 763 if (c == null)
787 764 throw new NullPointerException();
788 765 if (c == this)
789 766 throw new IllegalArgumentException();
790 767 if (maxElements <= 0)
791 768 return 0;
792 769 final ReentrantLock lock = this.lock;
793 770 lock.lock();
794 771 try {
795 - int n = 0;
796 - E e;
797 - while (n < maxElements && (e = extract()) != null) {
798 - c.add(e);
799 - ++n;
772 + int n = Math.min(size, maxElements);
773 + for (int i = 0; i < n; i++) {
774 + c.add((E) queue[0]); // In this order, in case add() throws.
775 + dequeue();
800 776 }
801 777 return n;
802 778 } finally {
803 779 lock.unlock();
804 780 }
805 781 }
806 782
807 783 /**
808 784 * Atomically removes all of the elements from this queue.
809 785 * The queue will be empty after this call returns.
810 786 */
811 787 public void clear() {
812 788 final ReentrantLock lock = this.lock;
813 789 lock.lock();
814 790 try {
815 791 Object[] array = queue;
816 792 int n = size;
817 793 size = 0;
818 794 for (int i = 0; i < n; i++)
819 795 array[i] = null;
820 796 } finally {
821 797 lock.unlock();
822 798 }
823 799 }
824 800
825 801 /**
826 802 * Returns an array containing all of the elements in this queue; the
827 803 * runtime type of the returned array is that of the specified array.
828 804 * The returned array elements are in no particular order.
829 805 * If the queue fits in the specified array, it is returned therein.
830 806 * Otherwise, a new array is allocated with the runtime type of the
831 807 * specified array and the size of this queue.
832 808 *
833 809 * <p>If this queue fits in the specified array with room to spare
834 810 * (i.e., the array has more elements than this queue), the element in
835 811 * the array immediately following the end of the queue is set to
836 812 * {@code null}.
↓ open down ↓ |
27 lines elided |
↑ open up ↑ |
837 813 *
838 814 * <p>Like the {@link #toArray()} method, this method acts as bridge between
839 815 * array-based and collection-based APIs. Further, this method allows
840 816 * precise control over the runtime type of the output array, and may,
841 817 * under certain circumstances, be used to save allocation costs.
842 818 *
843 819 * <p>Suppose {@code x} is a queue known to contain only strings.
844 820 * The following code can be used to dump the queue into a newly
845 821 * allocated array of {@code String}:
846 822 *
847 - * <pre>
848 - * String[] y = x.toArray(new String[0]);</pre>
823 + * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
849 824 *
850 825 * Note that {@code toArray(new Object[0])} is identical in function to
851 826 * {@code toArray()}.
852 827 *
853 828 * @param a the array into which the elements of the queue are to
854 829 * be stored, if it is big enough; otherwise, a new array of the
855 830 * same runtime type is allocated for this purpose
856 831 * @return an array containing all of the elements in this queue
857 832 * @throws ArrayStoreException if the runtime type of the specified array
858 833 * is not a supertype of the runtime type of every element in
859 834 * this queue
860 835 * @throws NullPointerException if the specified array is null
861 836 */
862 837 public <T> T[] toArray(T[] a) {
863 838 final ReentrantLock lock = this.lock;
864 839 lock.lock();
865 840 try {
866 841 int n = size;
867 842 if (a.length < n)
868 843 // Make a new array of a's runtime type, but my contents:
869 844 return (T[]) Arrays.copyOf(queue, size, a.getClass());
870 845 System.arraycopy(queue, 0, a, 0, n);
871 846 if (a.length > n)
872 847 a[n] = null;
873 848 return a;
874 849 } finally {
875 850 lock.unlock();
876 851 }
877 852 }
878 853
879 854 /**
880 855 * Returns an iterator over the elements in this queue. The
881 856 * iterator does not return the elements in any particular order.
882 857 *
883 858 * <p>The returned iterator is a "weakly consistent" iterator that
884 859 * will never throw {@link java.util.ConcurrentModificationException
885 860 * ConcurrentModificationException}, and guarantees to traverse
886 861 * elements as they existed upon construction of the iterator, and
887 862 * may (but is not guaranteed to) reflect any modifications
888 863 * subsequent to construction.
889 864 *
890 865 * @return an iterator over the elements in this queue
↓ open down ↓ |
32 lines elided |
↑ open up ↑ |
891 866 */
892 867 public Iterator<E> iterator() {
893 868 return new Itr(toArray());
894 869 }
895 870
896 871 /**
897 872 * Snapshot iterator that works off copy of underlying q array.
898 873 */
899 874 final class Itr implements Iterator<E> {
900 875 final Object[] array; // Array of all elements
901 - int cursor; // index of next element to return;
876 + int cursor; // index of next element to return
902 877 int lastRet; // index of last element, or -1 if no such
903 878
904 879 Itr(Object[] array) {
905 880 lastRet = -1;
906 881 this.array = array;
907 882 }
908 883
909 884 public boolean hasNext() {
910 885 return cursor < array.length;
911 886 }
912 887
913 888 public E next() {
914 889 if (cursor >= array.length)
915 890 throw new NoSuchElementException();
916 891 lastRet = cursor;
917 892 return (E)array[cursor++];
918 893 }
↓ open down ↓ |
7 lines elided |
↑ open up ↑ |
919 894
920 895 public void remove() {
921 896 if (lastRet < 0)
922 897 throw new IllegalStateException();
923 898 removeEQ(array[lastRet]);
924 899 lastRet = -1;
925 900 }
926 901 }
927 902
928 903 /**
929 - * Saves the state to a stream (that is, serializes it). For
930 - * compatibility with previous version of this class,
931 - * elements are first copied to a java.util.PriorityQueue,
932 - * which is then serialized.
904 + * Saves this queue to a stream (that is, serializes it).
905 + *
906 + * For compatibility with previous version of this class, elements
907 + * are first copied to a java.util.PriorityQueue, which is then
908 + * serialized.
933 909 */
934 910 private void writeObject(java.io.ObjectOutputStream s)
935 911 throws java.io.IOException {
936 912 lock.lock();
937 913 try {
938 - int n = size; // avoid zero capacity argument
939 - q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator);
914 + // avoid zero capacity argument
915 + q = new PriorityQueue<E>(Math.max(size, 1), comparator);
940 916 q.addAll(this);
941 917 s.defaultWriteObject();
942 918 } finally {
943 919 q = null;
944 920 lock.unlock();
945 921 }
946 922 }
947 923
948 924 /**
949 - * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream
950 - * (that is, deserializes it).
951 - *
952 - * @param s the stream
925 + * Reconstitutes this queue from a stream (that is, deserializes it).
953 926 */
954 927 private void readObject(java.io.ObjectInputStream s)
955 928 throws java.io.IOException, ClassNotFoundException {
956 929 try {
957 930 s.defaultReadObject();
958 931 this.queue = new Object[q.size()];
959 932 comparator = q.comparator();
960 933 addAll(q);
961 934 } finally {
962 935 q = null;
963 936 }
964 937 }
965 938
966 939 // Unsafe mechanics
967 940 private static final sun.misc.Unsafe UNSAFE;
968 941 private static final long allocationSpinLockOffset;
969 942 static {
970 943 try {
971 944 UNSAFE = sun.misc.Unsafe.getUnsafe();
972 945 Class<?> k = PriorityBlockingQueue.class;
973 946 allocationSpinLockOffset = UNSAFE.objectFieldOffset
974 947 (k.getDeclaredField("allocationSpinLock"));
975 948 } catch (Exception e) {
976 949 throw new Error(e);
977 950 }
978 951 }
979 952 }
↓ open down ↓ |
17 lines elided |
↑ open up ↑ |
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX