src/share/classes/java/util/concurrent/PriorityBlockingQueue.java
Index Unified diffs Context diffs Sdiffs Wdiffs Patch New Old Previous File Next File
*** old/src/share/classes/java/util/concurrent/PriorityBlockingQueue.java	Mon Jun 25 00:05:16 2012
--- new/src/share/classes/java/util/concurrent/PriorityBlockingQueue.java	Mon Jun 25 00:05:14 2012

*** 33,43 **** --- 33,44 ---- * http://creativecommons.org/publicdomain/zero/1.0/ */ package java.util.concurrent; ! import java.util.concurrent.locks.*; ! import java.util.concurrent.locks.Condition; + import java.util.concurrent.locks.ReentrantLock; import java.util.*; /** * An unbounded {@linkplain BlockingQueue blocking queue} that uses * the same ordering rules as class {@link PriorityQueue} and supplies
*** 109,119 **** --- 110,120 ---- * build-up. The need to back away from lock during allocation * makes it impossible to simply wrap delegated * java.util.PriorityQueue operations within a lock, as was done * in a previous version of this class. To maintain * interoperability, a plain PriorityQueue is still used during ! * serialization, which maintains compatibility at the espense of ! * serialization, which maintains compatibility at the expense of * transiently doubling overhead. */ /** * Default array capacity.
*** 306,334 **** --- 307,334 ---- } /** * Mechanics for poll(). Call only while holding lock. */ ! private E extract() { E result; ! private E dequeue() { int n = size - 1; if (n < 0) ! result = null; ! return null; else { Object[] array = queue; ! E result = (E) array[0]; E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; } return result; } + } /** * Inserts item x at position k, maintaining heap invariant by * promoting x up the tree until it is greater than or equal to * its parent, or is the root.
*** 380,389 **** --- 380,390 ---- * @param array the heap array * @param n heap size */ private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { + if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1; // loop while a non-leaf while (k < half) { int child = (k << 1) + 1; // assume left child is least Object c = array[child];
*** 396,409 **** --- 397,412 ---- array[k] = c; k = child; } array[k] = key; } + } private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) { + if (n > 0) { int half = n >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1;
*** 414,423 **** --- 417,427 ---- array[k] = c; k = child; } array[k] = x; } + } /** * Establishes the heap invariant (described above) in the entire tree, * assuming nothing about the order of the elements prior to the call. */
*** 518,542 **** --- 522,544 ---- } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); E result; try { ! result = extract(); ! return dequeue(); } finally { lock.unlock(); } return result; } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { ! while ( (result = extract()) == null) ! while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result;
*** 546,573 **** --- 548,573 ---- long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { ! while ( (result = extract()) == null && nanos > 0) ! while ( (result = dequeue()) == null && nanos > 0) nanos = notEmpty.awaitNanos(nanos); } finally { lock.unlock(); } return result; } public E peek() { final ReentrantLock lock = this.lock; lock.lock(); E result; try { ! result = size > 0 ? (E) queue[0] : null; ! return (size == 0) ? null : (E) queue[0]; } finally { lock.unlock(); } return result; } /** * Returns the comparator used to order the elements in this queue, * or {@code null} if this queue uses the {@linkplain Comparable
*** 647,682 **** --- 647,678 ---- * * @param o element to be removed from this queue, if present * @return {@code true} if this queue changed as a result of the call */ public boolean remove(Object o) { boolean removed = false; final ReentrantLock lock = this.lock; lock.lock(); try { int i = indexOf(o); ! if (i != -1) { ! if (i == -1) + return false; removeAt(i); ! removed = true; } ! return true; } finally { lock.unlock(); } return removed; } /** * Identity-based version for use in Itr.remove */ - private void removeEQ(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] array = queue; int n = size; for (int i = 0; i < n; i++) { + for (int i = 0, n = size; i < n; i++) { if (o == array[i]) { removeAt(i); break; } }
*** 692,710 **** --- 688,704 ---- * * @param o object to be checked for containment in this queue * @return {@code true} if this queue contains the specified element */ public boolean contains(Object o) { int index; final ReentrantLock lock = this.lock; lock.lock(); try { ! index = indexOf(o); ! return indexOf(o) != -1; } finally { lock.unlock(); } return index != -1; } /** * Returns an array containing all of the elements in this queue. * The returned array elements are in no particular order.
*** 726,747 **** --- 720,740 ---- } finally { lock.unlock(); } } public String toString() { final ReentrantLock lock = this.lock; lock.lock(); try { int n = size; if (n == 0) return "[]"; StringBuilder sb = new StringBuilder(); sb.append('['); for (int i = 0; i < n; ++i) { ! E e = (E)queue[i]; ! Object e = queue[i]; sb.append(e == this ? "(this Collection)" : e); if (i != n - 1) sb.append(',').append(' '); } return sb.append(']').toString();
*** 755,781 **** --- 748,758 ---- * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; E e; while ( (e = extract()) != null) { c.add(e); ++n; } return n; } finally { lock.unlock(); } + return drainTo(c, Integer.MAX_VALUE); } /** * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException {@inheritDoc}
*** 790,804 **** --- 767,780 ---- if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { ! int n = 0; E e; while (n < maxElements && (e = extract()) != null) { ! c.add(e); ++n; ! int n = Math.min(size, maxElements); + for (int i = 0; i < n; i++) { + c.add((E) queue[0]); // In this order, in case add() throws. ! dequeue(); } return n; } finally { lock.unlock(); }
*** 842,853 **** --- 818,828 ---- * * <p>Suppose {@code x} is a queue known to contain only strings. * The following code can be used to dump the queue into a newly * allocated array of {@code String}: * ! * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> * String[] y = x.toArray(new String[0]);</pre> * * Note that {@code toArray(new Object[0])} is identical in function to * {@code toArray()}. * * @param a the array into which the elements of the queue are to
*** 896,906 **** --- 871,881 ---- /** * Snapshot iterator that works off copy of underlying q array. */ final class Itr implements Iterator<E> { final Object[] array; // Array of all elements - int cursor; // index of next element to return; int lastRet; // index of last element, or -1 if no such Itr(Object[] array) { lastRet = -1; this.array = array;
*** 924,957 **** --- 899,930 ---- lastRet = -1; } } /** ! * Saves the state to a stream (that is, serializes it). For * compatibility with previous version of this class, * elements are first copied to a java.util.PriorityQueue, * which is then serialized. ! * Saves this queue to a stream (that is, serializes it). + * + * For compatibility with previous version of this class, elements + * are first copied to a java.util.PriorityQueue, which is then + * serialized. */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { lock.lock(); try { - int n = size; // avoid zero capacity argument ! q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator); ! q = new PriorityQueue<E>(Math.max(size, 1), comparator); q.addAll(this); s.defaultWriteObject(); } finally { q = null; lock.unlock(); } } /** ! * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream * (that is, deserializes it). * * @param s the stream ! * Reconstitutes this queue from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { try { s.defaultReadObject();

src/share/classes/java/util/concurrent/PriorityBlockingQueue.java
Index Unified diffs Context diffs Sdiffs Wdiffs Patch New Old Previous File Next File