src/share/classes/java/util/concurrent/ArrayBlockingQueue.java

Print this page

        

*** 47,64 **** * operations obtain elements at the head of the queue. * * <p>This is a classic &quot;bounded buffer&quot;, in which a * fixed-sized array holds elements inserted by producers and * extracted by consumers. Once created, the capacity cannot be ! * increased. Attempts to <tt>put</tt> an element into a full queue ! * will result in the operation blocking; attempts to <tt>take</tt> an * element from an empty queue will similarly block. * ! * <p> This class supports an optional fairness policy for ordering * waiting producer and consumer threads. By default, this ordering * is not guaranteed. However, a queue constructed with fairness set ! * to <tt>true</tt> grants threads access in FIFO order. Fairness * generally decreases throughput but reduces variability and avoids * starvation. * * <p>This class and its iterator implement all of the * <em>optional</em> methods of the {@link Collection} and {@link --- 47,64 ---- * operations obtain elements at the head of the queue. * * <p>This is a classic &quot;bounded buffer&quot;, in which a * fixed-sized array holds elements inserted by producers and * extracted by consumers. Once created, the capacity cannot be ! * changed. Attempts to {@code put} an element into a full queue ! * will result in the operation blocking; attempts to {@code take} an * element from an empty queue will similarly block. * ! * <p>This class supports an optional fairness policy for ordering * waiting producer and consumer threads. By default, this ordering * is not guaranteed. However, a queue constructed with fairness set ! * to {@code true} grants threads access in FIFO order. Fairness * generally decreases throughput but reduces variability and avoids * starvation. * * <p>This class and its iterator implement all of the * <em>optional</em> methods of the {@link Collection} and {@link
*** 82,106 **** * necessary here. */ private static final long serialVersionUID = -817911632652898426L; /** The queued items */ ! private final E[] items; ! /** items index for next take, poll or remove */ ! private int takeIndex; ! /** items index for next put, offer, or add. */ ! private int putIndex; ! /** Number of items in the queue */ ! private int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ ! private final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; --- 82,109 ---- * necessary here. */ private static final long serialVersionUID = -817911632652898426L; /** The queued items */ ! final Object[] items; + /** items index for next take, poll, peek or remove */ + int takeIndex; + + /** items index for next put, offer, or add */ + int putIndex; + + /** Number of elements in the queue */ + int count; + /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ ! final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
*** 108,121 **** /** * Circularly increment i. */ final int inc(int i) { ! return (++i == items.length)? 0 : i; } /** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void insert(E x) { items[putIndex] = x; --- 111,153 ---- /** * Circularly increment i. */ final int inc(int i) { ! return (++i == items.length) ? 0 : i; } /** + * Circularly decrement i. + */ + final int dec(int i) { + return ((i == 0) ? items.length : i) - 1; + } + + @SuppressWarnings("unchecked") + static <E> E cast(Object item) { + return (E) item; + } + + /** + * Returns item at index i. + */ + final E itemAt(int i) { + return this.<E>cast(items[i]); + } + + /** + * Throws NullPointerException if argument is null. + * + * @param v the element + */ + private static void checkNotNull(Object v) { + if (v == null) + throw new NullPointerException(); + } + + /** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void insert(E x) { items[putIndex] = x;
*** 127,151 **** /** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E extract() { ! final E[] items = this.items; ! E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; } /** ! * Utility for remove and iterator.remove: Delete item at position i. * Call only when holding lock. */ void removeAt(int i) { ! final E[] items = this.items; // if removing front item, just advance if (i == takeIndex) { items[takeIndex] = null; takeIndex = inc(takeIndex); } else { --- 159,184 ---- /** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E extract() { ! final Object[] items = this.items; ! E x = this.<E>cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; } /** ! * Deletes item at position i. ! * Utility for remove and iterator.remove. * Call only when holding lock. */ void removeAt(int i) { ! final Object[] items = this.items; // if removing front item, just advance if (i == takeIndex) { items[takeIndex] = null; takeIndex = inc(takeIndex); } else {
*** 165,237 **** --count; notFull.signal(); } /** ! * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed) * capacity and default access policy. * * @param capacity the capacity of this queue ! * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** ! * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed) * capacity and the specified access policy. * * @param capacity the capacity of this queue ! * @param fair if <tt>true</tt> then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; ! * if <tt>false</tt> the access order is unspecified. ! * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1 */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); ! this.items = (E[]) new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /** ! * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed) * capacity, the specified access policy and initially containing the * elements of the given collection, * added in traversal order of the collection's iterator. * * @param capacity the capacity of this queue ! * @param fair if <tt>true</tt> then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; ! * if <tt>false</tt> the access order is unspecified. * @param c the collection of elements to initially contain ! * @throws IllegalArgumentException if <tt>capacity</tt> is less than ! * <tt>c.size()</tt>, or less than 1. * @throws NullPointerException if the specified collection or any * of its elements are null */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); - if (capacity < c.size()) - throw new IllegalArgumentException(); ! for (E e : c) ! add(e); } /** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue's capacity, ! * returning <tt>true</tt> upon success and throwing an ! * <tt>IllegalStateException</tt> if this queue is full. * * @param e the element to add ! * @return <tt>true</tt> (as specified by {@link Collection#add}) * @throws IllegalStateException if this queue is full * @throws NullPointerException if the specified element is null */ public boolean add(E e) { return super.add(e); --- 198,283 ---- --count; notFull.signal(); } /** ! * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and default access policy. * * @param capacity the capacity of this queue ! * @throws IllegalArgumentException if {@code capacity < 1} */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** ! * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and the specified access policy. * * @param capacity the capacity of this queue ! * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; ! * if {@code false} the access order is unspecified. ! * @throws IllegalArgumentException if {@code capacity < 1} */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); ! this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /** ! * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity, the specified access policy and initially containing the * elements of the given collection, * added in traversal order of the collection's iterator. * * @param capacity the capacity of this queue ! * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; ! * if {@code false} the access order is unspecified. * @param c the collection of elements to initially contain ! * @throws IllegalArgumentException if {@code capacity} is less than ! * {@code c.size()}, or less than 1. * @throws NullPointerException if the specified collection or any * of its elements are null */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); ! final ReentrantLock lock = this.lock; ! lock.lock(); // Lock only for visibility, not mutual exclusion ! try { ! int i = 0; ! try { ! for (E e : c) { ! checkNotNull(e); ! items[i++] = e; } + } catch (ArrayIndexOutOfBoundsException ex) { + throw new IllegalArgumentException(); + } + count = i; + putIndex = (i == capacity) ? 0 : i; + } finally { + lock.unlock(); + } + } /** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue's capacity, ! * returning {@code true} upon success and throwing an ! * {@code IllegalStateException} if this queue is full. * * @param e the element to add ! * @return {@code true} (as specified by {@link Collection#add}) * @throws IllegalStateException if this queue is full * @throws NullPointerException if the specified element is null */ public boolean add(E e) { return super.add(e);
*** 238,255 **** } /** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue's capacity, ! * returning <tt>true</tt> upon success and <tt>false</tt> if this queue * is full. This method is generally preferable to method {@link #add}, * which can fail to insert an element only by throwing an exception. * * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { ! if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; --- 284,301 ---- } /** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue's capacity, ! * returning {@code true} upon success and {@code false} if this queue * is full. This method is generally preferable to method {@link #add}, * which can fail to insert an element only by throwing an exception. * * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { ! checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false;
*** 268,289 **** * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { ! if (e == null) throw new NullPointerException(); ! final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - try { while (count == items.length) notFull.await(); - } catch (InterruptedException ie) { - notFull.signal(); // propagate to non-interrupted thread - throw ie; - } insert(e); } finally { lock.unlock(); } } --- 314,329 ---- * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { ! checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } }
*** 297,356 **** * @throws NullPointerException {@inheritDoc} */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { ! if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { ! for (;;) { ! if (count != items.length) { ! insert(e); ! return true; ! } if (nanos <= 0) return false; - try { nanos = notFull.awaitNanos(nanos); - } catch (InterruptedException ie) { - notFull.signal(); // propagate to non-interrupted thread - throw ie; } ! } } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { ! if (count == 0) ! return null; ! E x = extract(); ! return x; } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - try { while (count == 0) notEmpty.await(); ! } catch (InterruptedException ie) { ! notEmpty.signal(); // propagate to non-interrupted thread ! throw ie; ! } ! E x = extract(); ! return x; } finally { lock.unlock(); } } --- 337,380 ---- * @throws NullPointerException {@inheritDoc} */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { ! checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { ! while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } ! insert(e); ! return true; } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { ! return (count == 0) ? null : extract(); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); ! return extract(); } finally { lock.unlock(); } }
*** 357,391 **** public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { ! for (;;) { ! if (count != 0) { ! E x = extract(); ! return x; ! } if (nanos <= 0) return null; - try { nanos = notEmpty.awaitNanos(nanos); - } catch (InterruptedException ie) { - notEmpty.signal(); // propagate to non-interrupted thread - throw ie; } ! ! } } finally { lock.unlock(); } } public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { ! return (count == 0) ? null : items[takeIndex]; } finally { lock.unlock(); } } --- 381,406 ---- public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { ! while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } ! return extract(); } finally { lock.unlock(); } } public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { ! return (count == 0) ? null : itemAt(takeIndex); } finally { lock.unlock(); } }
*** 410,423 **** // without the reference to unlimited queues. /** * Returns the number of additional elements that this queue can ideally * (in the absence of memory or resource constraints) accept without * blocking. This is always equal to the initial capacity of this queue ! * less the current <tt>size</tt> of this queue. * * <p>Note that you <em>cannot</em> always tell if an attempt to insert ! * an element will succeed by inspecting <tt>remainingCapacity</tt> * because it may be the case that another thread is about to * insert or remove an element. */ public int remainingCapacity() { final ReentrantLock lock = this.lock; --- 425,438 ---- // without the reference to unlimited queues. /** * Returns the number of additional elements that this queue can ideally * (in the absence of memory or resource constraints) accept without * blocking. This is always equal to the initial capacity of this queue ! * less the current {@code size} of this queue. * * <p>Note that you <em>cannot</em> always tell if an attempt to insert ! * an element will succeed by inspecting {@code remainingCapacity} * because it may be the case that another thread is about to * insert or remove an element. */ public int remainingCapacity() { final ReentrantLock lock = this.lock;
*** 429,491 **** } } /** * Removes a single instance of the specified element from this queue, ! * if it is present. More formally, removes an element <tt>e</tt> such ! * that <tt>o.equals(e)</tt>, if this queue contains one or more such * elements. ! * Returns <tt>true</tt> if this queue contained the specified element * (or equivalently, if this queue changed as a result of the call). * * @param o element to be removed from this queue, if present ! * @return <tt>true</tt> if this queue changed as a result of the call */ public boolean remove(Object o) { if (o == null) return false; ! final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { ! int i = takeIndex; ! int k = 0; ! for (;;) { ! if (k++ >= count) ! return false; if (o.equals(items[i])) { removeAt(i); return true; } - i = inc(i); } ! } finally { lock.unlock(); } } /** ! * Returns <tt>true</tt> if this queue contains the specified element. ! * More formally, returns <tt>true</tt> if and only if this queue contains ! * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>. * * @param o object to be checked for containment in this queue ! * @return <tt>true</tt> if this queue contains the specified element */ public boolean contains(Object o) { if (o == null) return false; ! final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { ! int i = takeIndex; ! int k = 0; ! while (k++ < count) { if (o.equals(items[i])) return true; - i = inc(i); - } return false; } finally { lock.unlock(); } } --- 444,503 ---- } } /** * Removes a single instance of the specified element from this queue, ! * if it is present. More formally, removes an element {@code e} such ! * that {@code o.equals(e)}, if this queue contains one or more such * elements. ! * Returns {@code true} if this queue contained the specified element * (or equivalently, if this queue changed as a result of the call). * + * <p>Removal of interior elements in circular array based queues + * is an intrinsically slow and disruptive operation, so should + * be undertaken only in exceptional circumstances, ideally + * only when the queue is known not to be accessible by other + * threads. + * * @param o element to be removed from this queue, if present ! * @return {@code true} if this queue changed as a result of the call */ public boolean remove(Object o) { if (o == null) return false; ! final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { ! for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { if (o.equals(items[i])) { removeAt(i); return true; } } ! return false; } finally { lock.unlock(); } } /** ! * Returns {@code true} if this queue contains the specified element. ! * More formally, returns {@code true} if and only if this queue contains ! * at least one element {@code e} such that {@code o.equals(e)}. * * @param o object to be checked for containment in this queue ! * @return {@code true} if this queue contains the specified element */ public boolean contains(Object o) { if (o == null) return false; ! final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { ! for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) if (o.equals(items[i])) return true; return false; } finally { lock.unlock(); } }
*** 502,522 **** * APIs. * * @return an array containing all of the elements in this queue */ public Object[] toArray() { ! final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { Object[] a = new Object[count]; ! int k = 0; ! int i = takeIndex; ! while (k < count) { ! a[k++] = items[i]; ! i = inc(i); ! } return a; } finally { lock.unlock(); } } --- 514,531 ---- * APIs. * * @return an array containing all of the elements in this queue */ public Object[] toArray() { ! final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { + final int count = this.count; Object[] a = new Object[count]; ! for (int i = takeIndex, k = 0; k < count; i = inc(i), k++) ! a[k] = items[i]; return a; } finally { lock.unlock(); } }
*** 529,554 **** * runtime type of the specified array and the size of this queue. * * <p>If this queue fits in the specified array with room to spare * (i.e., the array has more elements than this queue), the element in * the array immediately following the end of the queue is set to ! * <tt>null</tt>. * * <p>Like the {@link #toArray()} method, this method acts as bridge between * array-based and collection-based APIs. Further, this method allows * precise control over the runtime type of the output array, and may, * under certain circumstances, be used to save allocation costs. * ! * <p>Suppose <tt>x</tt> is a queue known to contain only strings. * The following code can be used to dump the queue into a newly ! * allocated array of <tt>String</tt>: * * <pre> * String[] y = x.toArray(new String[0]);</pre> * ! * Note that <tt>toArray(new Object[0])</tt> is identical in function to ! * <tt>toArray()</tt>. * * @param a the array into which the elements of the queue are to * be stored, if it is big enough; otherwise, a new array of the * same runtime type is allocated for this purpose * @return an array containing all of the elements in this queue --- 538,563 ---- * runtime type of the specified array and the size of this queue. * * <p>If this queue fits in the specified array with room to spare * (i.e., the array has more elements than this queue), the element in * the array immediately following the end of the queue is set to ! * {@code null}. * * <p>Like the {@link #toArray()} method, this method acts as bridge between * array-based and collection-based APIs. Further, this method allows * precise control over the runtime type of the output array, and may, * under certain circumstances, be used to save allocation costs. * ! * <p>Suppose {@code x} is a queue known to contain only strings. * The following code can be used to dump the queue into a newly ! * allocated array of {@code String}: * * <pre> * String[] y = x.toArray(new String[0]);</pre> * ! * Note that {@code toArray(new Object[0])} is identical in function to ! * {@code toArray()}. * * @param a the array into which the elements of the queue are to * be stored, if it is big enough; otherwise, a new array of the * same runtime type is allocated for this purpose * @return an array containing all of the elements in this queue
*** 555,582 **** * @throws ArrayStoreException if the runtime type of the specified array * is not a supertype of the runtime type of every element in * this queue * @throws NullPointerException if the specified array is null */ public <T> T[] toArray(T[] a) { ! final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { ! if (a.length < count) a = (T[])java.lang.reflect.Array.newInstance( ! a.getClass().getComponentType(), ! count ! ); ! ! int k = 0; ! int i = takeIndex; ! while (k < count) { ! a[k++] = (T)items[i]; ! i = inc(i); ! } ! if (a.length > count) a[count] = null; return a; } finally { lock.unlock(); } --- 564,587 ---- * @throws ArrayStoreException if the runtime type of the specified array * is not a supertype of the runtime type of every element in * this queue * @throws NullPointerException if the specified array is null */ + @SuppressWarnings("unchecked") public <T> T[] toArray(T[] a) { ! final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { ! final int count = this.count; ! final int len = a.length; ! if (len < count) a = (T[])java.lang.reflect.Array.newInstance( ! a.getClass().getComponentType(), count); ! for (int i = takeIndex, k = 0; k < count; i = inc(i), k++) ! a[k] = (T) items[i]; ! if (len > count) a[count] = null; return a; } finally { lock.unlock(); }
*** 584,594 **** public String toString() { final ReentrantLock lock = this.lock; lock.lock(); try { ! return super.toString(); } finally { lock.unlock(); } } --- 589,611 ---- public String toString() { final ReentrantLock lock = this.lock; lock.lock(); try { ! int k = count; ! if (k == 0) ! return "[]"; ! ! StringBuilder sb = new StringBuilder(); ! sb.append('['); ! for (int i = takeIndex; ; i = inc(i)) { ! Object e = items[i]; ! sb.append(e == this ? "(this Collection)" : e); ! if (--k == 0) ! return sb.append(']').toString(); ! sb.append(',').append(' '); ! } } finally { lock.unlock(); } }
*** 595,614 **** /** * Atomically removes all of the elements from this queue. * The queue will be empty after this call returns. */ public void clear() { ! final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { ! int i = takeIndex; ! int k = count; ! while (k-- > 0) { items[i] = null; - i = inc(i); - } count = 0; putIndex = 0; takeIndex = 0; notFull.signalAll(); } finally { --- 612,627 ---- /** * Atomically removes all of the elements from this queue. * The queue will be empty after this call returns. */ public void clear() { ! final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { ! for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) items[i] = null; count = 0; putIndex = 0; takeIndex = 0; notFull.signalAll(); } finally {
*** 621,643 **** * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c) { ! if (c == null) ! throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); ! final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int i = takeIndex; int n = 0; int max = count; while (n < max) { ! c.add(items[i]); items[i] = null; i = inc(i); ++n; } if (n > 0) { --- 634,655 ---- * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c) { ! checkNotNull(c); if (c == this) throw new IllegalArgumentException(); ! final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int i = takeIndex; int n = 0; int max = count; while (n < max) { ! c.add(this.<E>cast(items[i])); items[i] = null; i = inc(i); ++n; } if (n > 0) {
*** 657,682 **** * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c, int maxElements) { ! if (c == null) ! throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; ! final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int i = takeIndex; int n = 0; ! int sz = count; ! int max = (maxElements < count)? maxElements : count; while (n < max) { ! c.add(items[i]); items[i] = null; i = inc(i); ++n; } if (n > 0) { --- 669,692 ---- * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c, int maxElements) { ! checkNotNull(c); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; ! final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int i = takeIndex; int n = 0; ! int max = (maxElements < count) ? maxElements : count; while (n < max) { ! c.add(this.<E>cast(items[i])); items[i] = null; i = inc(i); ++n; } if (n > 0) {
*** 688,786 **** } finally { lock.unlock(); } } - /** * Returns an iterator over the elements in this queue in proper sequence. ! * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that ! * will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) * reflect any modifications subsequent to construction. * * @return an iterator over the elements in this queue in proper sequence */ public Iterator<E> iterator() { - final ReentrantLock lock = this.lock; - lock.lock(); - try { return new Itr(); - } finally { - lock.unlock(); } - } /** ! * Iterator for ArrayBlockingQueue */ private class Itr implements Iterator<E> { ! /** ! * Index of element to be returned by next, ! * or a negative number if no such. ! */ ! private int nextIndex; - /** - * nextItem holds on to item fields because once we claim - * that an element exists in hasNext(), we must return it in - * the following next() call even if it was in the process of - * being removed when hasNext() was called. - */ - private E nextItem; - - /** - * Index of element returned by most recent call to next. - * Reset to -1 if this element is deleted by a call to remove. - */ - private int lastRet; - Itr() { lastRet = -1; ! if (count == 0) ! nextIndex = -1; ! else { ! nextIndex = takeIndex; ! nextItem = items[takeIndex]; } } public boolean hasNext() { ! /* ! * No sync. We can return true by mistake here ! * only if this iterator passed across threads, ! * which we don't support anyway. ! */ ! return nextIndex >= 0; } - /** - * Checks whether nextIndex is valid; if so setting nextItem. - * Stops iterator when either hits putIndex or sees null item. - */ - private void checkNext() { - if (nextIndex == putIndex) { - nextIndex = -1; - nextItem = null; - } else { - nextItem = items[nextIndex]; - if (nextItem == null) - nextIndex = -1; - } - } - public E next() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { ! if (nextIndex < 0) throw new NoSuchElementException(); lastRet = nextIndex; ! E x = nextItem; ! nextIndex = inc(nextIndex); ! checkNext(); return x; } finally { lock.unlock(); } } --- 698,780 ---- } finally { lock.unlock(); } } /** * Returns an iterator over the elements in this queue in proper sequence. ! * The elements will be returned in order from first (head) to last (tail). ! * ! * <p>The returned {@code Iterator} is a "weakly consistent" iterator that ! * will never throw {@link java.util.ConcurrentModificationException ! * ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) * reflect any modifications subsequent to construction. * * @return an iterator over the elements in this queue in proper sequence */ public Iterator<E> iterator() { return new Itr(); } /** ! * Iterator for ArrayBlockingQueue. To maintain weak consistency ! * with respect to puts and takes, we (1) read ahead one slot, so ! * as to not report hasNext true but then not have an element to ! * return -- however we later recheck this slot to use the most ! * current value; (2) ensure that each array slot is traversed at ! * most once (by tracking "remaining" elements); (3) skip over ! * null slots, which can occur if takes race ahead of iterators. ! * However, for circular array-based queues, we cannot rely on any ! * well established definition of what it means to be weakly ! * consistent with respect to interior removes since these may ! * require slot overwrites in the process of sliding elements to ! * cover gaps. So we settle for resiliency, operating on ! * established apparent nexts, which may miss some elements that ! * have moved between calls to next. */ private class Itr implements Iterator<E> { ! private int remaining; // Number of elements yet to be returned ! private int nextIndex; // Index of element to be returned by next ! private E nextItem; // Element to be returned by next call to next ! private E lastItem; // Element returned by last call to next ! private int lastRet; // Index of last element returned, or -1 if none Itr() { + final ReentrantLock lock = ArrayBlockingQueue.this.lock; + lock.lock(); + try { lastRet = -1; ! if ((remaining = count) > 0) ! nextItem = itemAt(nextIndex = takeIndex); ! } finally { ! lock.unlock(); } } public boolean hasNext() { ! return remaining > 0; } public E next() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { ! if (remaining <= 0) throw new NoSuchElementException(); lastRet = nextIndex; ! E x = itemAt(nextIndex); // check for fresher value ! if (x == null) { ! x = nextItem; // we are forced to report old value ! lastItem = null; // but ensure remove fails ! } ! else ! lastItem = x; ! while (--remaining > 0 && // skip over nulls ! (nextItem = itemAt(nextIndex = inc(nextIndex))) == null) ! ; return x; } finally { lock.unlock(); } }
*** 791,807 **** try { int i = lastRet; if (i == -1) throw new IllegalStateException(); lastRet = -1; ! ! int ti = takeIndex; removeAt(i); ! // back up cursor (reset to front if was first element) ! nextIndex = (i == ti) ? takeIndex : i; ! checkNext(); } finally { lock.unlock(); } } } } --- 785,805 ---- try { int i = lastRet; if (i == -1) throw new IllegalStateException(); lastRet = -1; ! E x = lastItem; ! lastItem = null; ! // only remove if item still at index ! if (x != null && x == items[i]) { ! boolean removingHead = (i == takeIndex); removeAt(i); ! if (!removingHead) ! nextIndex = dec(nextIndex); ! } } finally { lock.unlock(); } } } + }