1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.lang.ref.WeakReference;
  39 import java.util.AbstractQueue;
  40 import java.util.Arrays;
  41 import java.util.Collection;
  42 import java.util.Iterator;
  43 import java.util.NoSuchElementException;
  44 import java.util.Objects;
  45 import java.util.Spliterator;
  46 import java.util.Spliterators;
  47 import java.util.concurrent.locks.Condition;
  48 import java.util.concurrent.locks.ReentrantLock;
  49 import java.util.function.Consumer;
  50 import java.util.function.Predicate;
  51 
  52 /**
  53  * A bounded {@linkplain BlockingQueue blocking queue} backed by an
  54  * array.  This queue orders elements FIFO (first-in-first-out).  The
  55  * <em>head</em> of the queue is that element that has been on the
  56  * queue the longest time.  The <em>tail</em> of the queue is that
  57  * element that has been on the queue the shortest time. New elements
  58  * are inserted at the tail of the queue, and the queue retrieval
  59  * operations obtain elements at the head of the queue.
  60  *
  61  * <p>This is a classic &quot;bounded buffer&quot;, in which a
  62  * fixed-sized array holds elements inserted by producers and
  63  * extracted by consumers.  Once created, the capacity cannot be
  64  * changed.  Attempts to {@code put} an element into a full queue
  65  * will result in the operation blocking; attempts to {@code take} an
  66  * element from an empty queue will similarly block.
  67  *
  68  * <p>This class supports an optional fairness policy for ordering
  69  * waiting producer and consumer threads.  By default, this ordering
  70  * is not guaranteed. However, a queue constructed with fairness set
  71  * to {@code true} grants threads access in FIFO order. Fairness
  72  * generally decreases throughput but reduces variability and avoids
  73  * starvation.
  74  *
  75  * <p>This class and its iterator implement all of the <em>optional</em>
  76  * methods of the {@link Collection} and {@link Iterator} interfaces.
  77  *
  78  * <p>This class is a member of the
  79  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  80  * Java Collections Framework</a>.
  81  *
  82  * @since 1.5
  83  * @author Doug Lea
  84  * @param <E> the type of elements held in this queue
  85  */
  86 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  87         implements BlockingQueue<E>, java.io.Serializable {
  88 
  89     /*
  90      * Much of the implementation mechanics, especially the unusual
  91      * nested loops, are shared and co-maintained with ArrayDeque.
  92      */
  93 
    /**
     * Serialization ID. This class relies on default serialization
     * even for the items array, which is default-serialized, even if
     * it is empty. Otherwise it could not be declared final, which is
     * necessary here.
     */
    private static final long serialVersionUID = -817911632652898426L;

    /**
     * The queued items, held in a fixed-length array that is used
     * circularly: takeIndex and putIndex wrap back to 0 on reaching
     * the end.  Slots are reset to null when their element is removed.
     */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes; signalled when an element is enqueued */
    private final Condition notEmpty;

    /** Condition for waiting puts; signalled when a slot is freed */
    private final Condition notFull;

    /**
     * Shared state for currently active iterators, or null if there
     * are known not to be any.  Allows queue operations to update
     * iterator state.  Transient, so it is not part of the serialized
     * form.
     */
    transient Itrs itrs;
 134 
 135     // Internal helper methods
 136 
 137     /**
 138      * Increments i, mod modulus.
 139      * Precondition and postcondition: 0 <= i < modulus.
 140      */
 141     static final int inc(int i, int modulus) {
 142         if (++i >= modulus) i = 0;
 143         return i;
 144     }
 145 
 146     /**
 147      * Decrements i, mod modulus.
 148      * Precondition and postcondition: 0 <= i < modulus.
 149      */
 150     static final int dec(int i, int modulus) {
 151         if (--i < 0) i = modulus - 1;
 152         return i;
 153     }
 154 
 155     /**
 156      * Returns item at index i.
 157      */
 158     @SuppressWarnings("unchecked")
 159     final E itemAt(int i) {
 160         return (E) items[i];
 161     }
 162 
 163     /**
 164      * Returns element at array index i.
 165      * This is a slight abuse of generics, accepted by javac.
 166      */
 167     @SuppressWarnings("unchecked")
 168     static <E> E itemAt(Object[] items, int i) {
 169         return (E) items[i];
 170     }
 171 
 172     /**
 173      * Inserts element at current put position, advances, and signals.
 174      * Call only when holding lock.
 175      */
 176     private void enqueue(E e) {
 177         // assert lock.isHeldByCurrentThread();
 178         // assert lock.getHoldCount() == 1;
 179         // assert items[putIndex] == null;
 180         final Object[] items = this.items;
 181         items[putIndex] = e;
 182         if (++putIndex == items.length) putIndex = 0;
 183         count++;
 184         notEmpty.signal();
 185     }
 186 
 187     /**
 188      * Extracts element at current take position, advances, and signals.
 189      * Call only when holding lock.
 190      */
 191     private E dequeue() {
 192         // assert lock.isHeldByCurrentThread();
 193         // assert lock.getHoldCount() == 1;
 194         // assert items[takeIndex] != null;
 195         final Object[] items = this.items;
 196         @SuppressWarnings("unchecked")
 197         E e = (E) items[takeIndex];
 198         items[takeIndex] = null;
 199         if (++takeIndex == items.length) takeIndex = 0;
 200         count--;
 201         if (itrs != null)
 202             itrs.elementDequeued();
 203         notFull.signal();
 204         return e;
 205     }
 206 
 207     /**
 208      * Deletes item at array index removeIndex.
 209      * Utility for remove(Object) and iterator.remove.
 210      * Call only when holding lock.
 211      */
 212     void removeAt(final int removeIndex) {
 213         // assert lock.isHeldByCurrentThread();
 214         // assert lock.getHoldCount() == 1;
 215         // assert items[removeIndex] != null;
 216         // assert removeIndex >= 0 && removeIndex < items.length;
 217         final Object[] items = this.items;
 218         if (removeIndex == takeIndex) {
 219             // removing front item; just advance
 220             items[takeIndex] = null;
 221             if (++takeIndex == items.length) takeIndex = 0;
 222             count--;
 223             if (itrs != null)
 224                 itrs.elementDequeued();
 225         } else {
 226             // an "interior" remove
 227 
 228             // slide over all others up through putIndex.
 229             for (int i = removeIndex, putIndex = this.putIndex;;) {
 230                 int pred = i;
 231                 if (++i == items.length) i = 0;
 232                 if (i == putIndex) {
 233                     items[pred] = null;
 234                     this.putIndex = pred;
 235                     break;
 236                 }
 237                 items[pred] = items[i];
 238             }
 239             count--;
 240             if (itrs != null)
 241                 itrs.removedAt(removeIndex);
 242         }
 243         notFull.signal();
 244     }
 245 
    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and default access policy.
     *
     * <p>Equivalent to {@code new ArrayBlockingQueue(capacity, false)},
     * i.e. the lock is created without the fairness setting.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
 256 
 257     /**
 258      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 259      * capacity and the specified access policy.
 260      *
 261      * @param capacity the capacity of this queue
 262      * @param fair if {@code true} then queue accesses for threads blocked
 263      *        on insertion or removal, are processed in FIFO order;
 264      *        if {@code false} the access order is unspecified.
 265      * @throws IllegalArgumentException if {@code capacity < 1}
 266      */
 267     public ArrayBlockingQueue(int capacity, boolean fair) {
 268         if (capacity <= 0)
 269             throw new IllegalArgumentException();
 270         this.items = new Object[capacity];
 271         lock = new ReentrantLock(fair);
 272         notEmpty = lock.newCondition();
 273         notFull =  lock.newCondition();
 274     }
 275 
 276     /**
 277      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 278      * capacity, the specified access policy and initially containing the
 279      * elements of the given collection,
 280      * added in traversal order of the collection's iterator.
 281      *
 282      * @param capacity the capacity of this queue
 283      * @param fair if {@code true} then queue accesses for threads blocked
 284      *        on insertion or removal, are processed in FIFO order;
 285      *        if {@code false} the access order is unspecified.
 286      * @param c the collection of elements to initially contain
 287      * @throws IllegalArgumentException if {@code capacity} is less than
 288      *         {@code c.size()}, or less than 1.
 289      * @throws NullPointerException if the specified collection or any
 290      *         of its elements are null
 291      */
 292     public ArrayBlockingQueue(int capacity, boolean fair,
 293                               Collection<? extends E> c) {
 294         this(capacity, fair);
 295 
 296         final ReentrantLock lock = this.lock;
 297         lock.lock(); // Lock only for visibility, not mutual exclusion
 298         try {
 299             final Object[] items = this.items;
 300             int i = 0;
 301             try {
 302                 for (E e : c)
 303                     items[i++] = Objects.requireNonNull(e);
 304             } catch (ArrayIndexOutOfBoundsException ex) {
 305                 throw new IllegalArgumentException();
 306             }
 307             count = i;
 308             putIndex = (i == capacity) ? 0 : i;
 309         } finally {
 310             lock.unlock();
 311         }
 312     }
 313 
    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and throwing an
     * {@code IllegalStateException} if this queue is full.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws IllegalStateException if this queue is full
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        // Delegates unchanged to the superclass implementation.
        return super.add(e);
    }
 328 
 329     /**
 330      * Inserts the specified element at the tail of this queue if it is
 331      * possible to do so immediately without exceeding the queue's capacity,
 332      * returning {@code true} upon success and {@code false} if this queue
 333      * is full.  This method is generally preferable to method {@link #add},
 334      * which can fail to insert an element only by throwing an exception.
 335      *
 336      * @throws NullPointerException if the specified element is null
 337      */
 338     public boolean offer(E e) {
 339         Objects.requireNonNull(e);
 340         final ReentrantLock lock = this.lock;
 341         lock.lock();
 342         try {
 343             if (count == items.length)
 344                 return false;
 345             else {
 346                 enqueue(e);
 347                 return true;
 348             }
 349         } finally {
 350             lock.unlock();
 351         }
 352     }
 353 
 354     /**
 355      * Inserts the specified element at the tail of this queue, waiting
 356      * for space to become available if the queue is full.
 357      *
 358      * @throws InterruptedException {@inheritDoc}
 359      * @throws NullPointerException {@inheritDoc}
 360      */
 361     public void put(E e) throws InterruptedException {
 362         Objects.requireNonNull(e);
 363         final ReentrantLock lock = this.lock;
 364         lock.lockInterruptibly();
 365         try {
 366             while (count == items.length)
 367                 notFull.await();
 368             enqueue(e);
 369         } finally {
 370             lock.unlock();
 371         }
 372     }
 373 
 374     /**
 375      * Inserts the specified element at the tail of this queue, waiting
 376      * up to the specified wait time for space to become available if
 377      * the queue is full.
 378      *
 379      * @throws InterruptedException {@inheritDoc}
 380      * @throws NullPointerException {@inheritDoc}
 381      */
 382     public boolean offer(E e, long timeout, TimeUnit unit)
 383         throws InterruptedException {
 384 
 385         Objects.requireNonNull(e);
 386         long nanos = unit.toNanos(timeout);
 387         final ReentrantLock lock = this.lock;
 388         lock.lockInterruptibly();
 389         try {
 390             while (count == items.length) {
 391                 if (nanos <= 0L)
 392                     return false;
 393                 nanos = notFull.awaitNanos(nanos);
 394             }
 395             enqueue(e);
 396             return true;
 397         } finally {
 398             lock.unlock();
 399         }
 400     }
 401 
 402     public E poll() {
 403         final ReentrantLock lock = this.lock;
 404         lock.lock();
 405         try {
 406             return (count == 0) ? null : dequeue();
 407         } finally {
 408             lock.unlock();
 409         }
 410     }
 411 
 412     public E take() throws InterruptedException {
 413         final ReentrantLock lock = this.lock;
 414         lock.lockInterruptibly();
 415         try {
 416             while (count == 0)
 417                 notEmpty.await();
 418             return dequeue();
 419         } finally {
 420             lock.unlock();
 421         }
 422     }
 423 
 424     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 425         long nanos = unit.toNanos(timeout);
 426         final ReentrantLock lock = this.lock;
 427         lock.lockInterruptibly();
 428         try {
 429             while (count == 0) {
 430                 if (nanos <= 0L)
 431                     return null;
 432                 nanos = notEmpty.awaitNanos(nanos);
 433             }
 434             return dequeue();
 435         } finally {
 436             lock.unlock();
 437         }
 438     }
 439 
 440     public E peek() {
 441         final ReentrantLock lock = this.lock;
 442         lock.lock();
 443         try {
 444             return itemAt(takeIndex); // null when queue is empty
 445         } finally {
 446             lock.unlock();
 447         }
 448     }
 449 
 450     // this doc comment is overridden to remove the reference to collections
 451     // greater in size than Integer.MAX_VALUE
 452     /**
 453      * Returns the number of elements in this queue.
 454      *
 455      * @return the number of elements in this queue
 456      */
 457     public int size() {
 458         final ReentrantLock lock = this.lock;
 459         lock.lock();
 460         try {
 461             return count;
 462         } finally {
 463             lock.unlock();
 464         }
 465     }
 466 
 467     // this doc comment is a modified copy of the inherited doc comment,
 468     // without the reference to unlimited queues.
 469     /**
 470      * Returns the number of additional elements that this queue can ideally
 471      * (in the absence of memory or resource constraints) accept without
 472      * blocking. This is always equal to the initial capacity of this queue
 473      * less the current {@code size} of this queue.
 474      *
 475      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
 476      * an element will succeed by inspecting {@code remainingCapacity}
 477      * because it may be the case that another thread is about to
 478      * insert or remove an element.
 479      */
 480     public int remainingCapacity() {
 481         final ReentrantLock lock = this.lock;
 482         lock.lock();
 483         try {
 484             return items.length - count;
 485         } finally {
 486             lock.unlock();
 487         }
 488     }
 489 
 490     /**
 491      * Removes a single instance of the specified element from this queue,
 492      * if it is present.  More formally, removes an element {@code e} such
 493      * that {@code o.equals(e)}, if this queue contains one or more such
 494      * elements.
 495      * Returns {@code true} if this queue contained the specified element
 496      * (or equivalently, if this queue changed as a result of the call).
 497      *
 498      * <p>Removal of interior elements in circular array based queues
 499      * is an intrinsically slow and disruptive operation, so should
 500      * be undertaken only in exceptional circumstances, ideally
 501      * only when the queue is known not to be accessible by other
 502      * threads.
 503      *
 504      * @param o element to be removed from this queue, if present
 505      * @return {@code true} if this queue changed as a result of the call
 506      */
 507     public boolean remove(Object o) {
 508         if (o == null) return false;
 509         final ReentrantLock lock = this.lock;
 510         lock.lock();
 511         try {
 512             if (count > 0) {
 513                 final Object[] items = this.items;
 514                 for (int i = takeIndex, end = putIndex,
 515                          to = (i < end) ? end : items.length;
 516                      ; i = 0, to = end) {
 517                     for (; i < to; i++)
 518                         if (o.equals(items[i])) {
 519                             removeAt(i);
 520                             return true;
 521                         }
 522                     if (to == end) break;
 523                 }
 524             }
 525             return false;
 526         } finally {
 527             lock.unlock();
 528         }
 529     }
 530 
 531     /**
 532      * Returns {@code true} if this queue contains the specified element.
 533      * More formally, returns {@code true} if and only if this queue contains
 534      * at least one element {@code e} such that {@code o.equals(e)}.
 535      *
 536      * @param o object to be checked for containment in this queue
 537      * @return {@code true} if this queue contains the specified element
 538      */
 539     public boolean contains(Object o) {
 540         if (o == null) return false;
 541         final ReentrantLock lock = this.lock;
 542         lock.lock();
 543         try {
 544             if (count > 0) {
 545                 final Object[] items = this.items;
 546                 for (int i = takeIndex, end = putIndex,
 547                          to = (i < end) ? end : items.length;
 548                      ; i = 0, to = end) {
 549                     for (; i < to; i++)
 550                         if (o.equals(items[i]))
 551                             return true;
 552                     if (to == end) break;
 553                 }
 554             }
 555             return false;
 556         } finally {
 557             lock.unlock();
 558         }
 559     }
 560 
 561     /**
 562      * Returns an array containing all of the elements in this queue, in
 563      * proper sequence.
 564      *
 565      * <p>The returned array will be "safe" in that no references to it are
 566      * maintained by this queue.  (In other words, this method must allocate
 567      * a new array).  The caller is thus free to modify the returned array.
 568      *
 569      * <p>This method acts as bridge between array-based and collection-based
 570      * APIs.
 571      *
 572      * @return an array containing all of the elements in this queue
 573      */
 574     public Object[] toArray() {
 575         final ReentrantLock lock = this.lock;
 576         lock.lock();
 577         try {
 578             final Object[] items = this.items;
 579             final int end = takeIndex + count;
 580             final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
 581             if (end != putIndex)
 582                 System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
 583             return a;
 584         } finally {
 585             lock.unlock();
 586         }
 587     }
 588 
 589     /**
 590      * Returns an array containing all of the elements in this queue, in
 591      * proper sequence; the runtime type of the returned array is that of
 592      * the specified array.  If the queue fits in the specified array, it
 593      * is returned therein.  Otherwise, a new array is allocated with the
 594      * runtime type of the specified array and the size of this queue.
 595      *
 596      * <p>If this queue fits in the specified array with room to spare
 597      * (i.e., the array has more elements than this queue), the element in
 598      * the array immediately following the end of the queue is set to
 599      * {@code null}.
 600      *
 601      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 602      * array-based and collection-based APIs.  Further, this method allows
 603      * precise control over the runtime type of the output array, and may,
 604      * under certain circumstances, be used to save allocation costs.
 605      *
 606      * <p>Suppose {@code x} is a queue known to contain only strings.
 607      * The following code can be used to dump the queue into a newly
 608      * allocated array of {@code String}:
 609      *
 610      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
 611      *
 612      * Note that {@code toArray(new Object[0])} is identical in function to
 613      * {@code toArray()}.
 614      *
 615      * @param a the array into which the elements of the queue are to
 616      *          be stored, if it is big enough; otherwise, a new array of the
 617      *          same runtime type is allocated for this purpose
 618      * @return an array containing all of the elements in this queue
 619      * @throws ArrayStoreException if the runtime type of the specified array
 620      *         is not a supertype of the runtime type of every element in
 621      *         this queue
 622      * @throws NullPointerException if the specified array is null
 623      */
 624     @SuppressWarnings("unchecked")
 625     public <T> T[] toArray(T[] a) {
 626         final ReentrantLock lock = this.lock;
 627         lock.lock();
 628         try {
 629             final Object[] items = this.items;
 630             final int count = this.count;
 631             final int firstLeg = Math.min(items.length - takeIndex, count);
 632             if (a.length < count) {
 633                 a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
 634                                              a.getClass());
 635             } else {
 636                 System.arraycopy(items, takeIndex, a, 0, firstLeg);
 637                 if (a.length > count)
 638                     a[count] = null;
 639             }
 640             if (firstLeg < count)
 641                 System.arraycopy(items, 0, a, firstLeg, putIndex);
 642             return a;
 643         } finally {
 644             lock.unlock();
 645         }
 646     }
 647 
    /**
     * Returns a string representation of this queue, as produced by
     * {@code Helpers.collectionToString(this)}.
     *
     * @return the string built by the helper for this queue
     */
    public String toString() {
        return Helpers.collectionToString(this);
    }
 651 
 652     /**
 653      * Atomically removes all of the elements from this queue.
 654      * The queue will be empty after this call returns.
 655      */
 656     public void clear() {
 657         final ReentrantLock lock = this.lock;
 658         lock.lock();
 659         try {
 660             int k;
 661             if ((k = count) > 0) {
 662                 circularClear(items, takeIndex, putIndex);
 663                 takeIndex = putIndex;
 664                 count = 0;
 665                 if (itrs != null)
 666                     itrs.queueIsEmpty();
 667                 for (; k > 0 && lock.hasWaiters(notFull); k--)
 668                     notFull.signal();
 669             }
 670         } finally {
 671             lock.unlock();
 672         }
 673     }
 674 
 675     /**
 676      * Nulls out slots starting at array index i, upto index end.
 677      * Condition i == end means "full" - the entire array is cleared.
 678      */
 679     private static void circularClear(Object[] items, int i, int end) {
 680         // assert 0 <= i && i < items.length;
 681         // assert 0 <= end && end < items.length;
 682         for (int to = (i < end) ? end : items.length;
 683              ; i = 0, to = end) {
 684             for (; i < to; i++) items[i] = null;
 685             if (to == end) break;
 686         }
 687     }
 688 
    /**
     * Drains this queue into {@code c} with no explicit element limit,
     * by delegating to {@code drainTo(c, Integer.MAX_VALUE)}.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }
 698 
    /**
     * Moves up to {@code maxElements} elements from the head of this
     * queue into {@code c}, in queue order, and returns how many were
     * planned for transfer ({@code min(maxElements, count)}).  If
     * {@code c.add} throws part-way through, the elements already
     * added stay removed from this queue and the exception propagates.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        Objects.requireNonNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        // items is a final field, so it can be read before locking.
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = Math.min(maxElements, count);
            // take and i track progress locally; the fields are only
            // updated in the finally block below.
            int take = takeIndex;
            int i = 0;
            try {
                while (i < n) {
                    @SuppressWarnings("unchecked")
                    E e = (E) items[take];
                    // Add before clearing the slot: if add throws, this
                    // element is still in the queue.
                    c.add(e);
                    items[take] = null;
                    if (++take == items.length) take = 0;
                    i++;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    count -= i;
                    takeIndex = take;
                    if (itrs != null) {
                        if (count == 0)
                            itrs.queueIsEmpty();
                        // More elements were drained than the new
                        // takeIndex, so the index passed the array end
                        // and wrapped back to the front.
                        else if (i > take)
                            itrs.takeIndexWrapped();
                    }
                    // Signal at most one waiting producer per freed
                    // slot, stopping once nobody waits on notFull.
                    for (; i > 0 && lock.hasWaiters(notFull); i--)
                        notFull.signal();
                }
            }
        } finally {
            lock.unlock();
        }
    }
 747 
    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        // Every call constructs a fresh Itr instance.
        return new Itr();
    }
 760 
 761     /**
 762      * Shared data between iterators and their queue, allowing queue
 763      * modifications to update iterators when elements are removed.
 764      *
 765      * This adds a lot of complexity for the sake of correctly
 766      * handling some uncommon operations, but the combination of
 767      * circular-arrays and supporting interior removes (i.e., those
 768      * not at head) would cause iterators to sometimes lose their
 769      * places and/or (re)report elements they shouldn't.  To avoid
 770      * this, when a queue has one or more iterators, it keeps iterator
 771      * state consistent by:
 772      *
 773      * (1) keeping track of the number of "cycles", that is, the
 774      *     number of times takeIndex has wrapped around to 0.
 775      * (2) notifying all iterators via the callback removedAt whenever
 776      *     an interior element is removed (and thus other elements may
 777      *     be shifted).
 778      *
 779      * These suffice to eliminate iterator inconsistencies, but
 780      * unfortunately add the secondary responsibility of maintaining
 781      * the list of iterators.  We track all active iterators in a
 782      * simple linked list (accessed only when the queue's lock is
 783      * held) of weak references to Itr.  The list is cleaned up using
 784      * 3 different mechanisms:
 785      *
 786      * (1) Whenever a new iterator is created, do some O(1) checking for
 787      *     stale list elements.
 788      *
 789      * (2) Whenever takeIndex wraps around to 0, check for iterators
 790      *     that have been unused for more than one wrap-around cycle.
 791      *
 792      * (3) Whenever the queue becomes empty, all iterators are notified
 793      *     and this entire data structure is discarded.
 794      *
 795      * So in addition to the removedAt callback that is necessary for
 796      * correctness, iterators have the shutdown and takeIndexWrapped
 797      * callbacks that help remove stale iterators from the list.
 798      *
 799      * Whenever a list element is examined, it is expunged if either
 800      * the GC has determined that the iterator is discarded, or if the
 801      * iterator reports that it is "detached" (does not need any
 802      * further state updates).  Overhead is maximal when takeIndex
 803      * never advances, iterators are discarded before they are
 804      * exhausted, and all removals are interior removes, in which case
 805      * all stale iterators are discovered by the GC.  But even in this
 806      * case we don't increase the amortized complexity.
 807      *
 808      * Care must be taken to keep list sweeping methods from
 809      * reentrantly invoking another such method, causing subtle
 810      * corruption bugs.
 811      */
 812     class Itrs {
 813
 814         /**
 815          * Node in a linked list of weak iterator references.
 816          */
 817         private class Node extends WeakReference<Itr> {
                 /** Successor in the list, or null; reset to null when this node is unlinked */
 818             Node next;
 819
                 /** Creates a node weakly referencing {@code iterator} and linked in front of {@code next}. */
 820             Node(Itr iterator, Node next) {
 821                 super(iterator);
 822                 this.next = next;
 823             }
 824         }
 825
 826         /** Incremented whenever takeIndex wraps around to 0 */
 827         int cycles;
 828
 829         /** Linked list of weak iterator references */
 830         private Node head;
 831
 832         /** Used to expunge stale iterators */
             // doSomeSweeping resumes probing at sweeper.next; null means the
             // next sweep starts from head.
 833         private Node sweeper;
 834
             /** Probe budget for a sweep started without tryHarder */
 835         private static final int SHORT_SWEEP_PROBES = 4;
             /**
              * Probe budget for a sweep started with tryHarder; the remaining
              * budget is also reset to this value each time a stale node is found
              */
 836         private static final int LONG_SWEEP_PROBES = 16;
 837
             /** Creates the list with {@code initial} as its only registered iterator. */
 838         Itrs(Itr initial) {
 839             register(initial);
 840         }
 841
 842         /**
 843          * Sweeps itrs, looking for and expunging stale iterators.
 844          * If at least one was found, tries harder to find more.
 845          * Called only from iterating thread.
 846          *
 847          * @param tryHarder whether to start in try-harder mode, because
 848          * there is known to be at least one iterator to collect
 849          */
 850         void doSomeSweeping(boolean tryHarder) {
 851             // assert lock.isHeldByCurrentThread();
 852             // assert head != null;
 853             int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
                 // p is the node being probed; o is its predecessor (null when
                 // p is the head of the list).
 854             Node o, p;
 855             final Node sweeper = this.sweeper;
 856             boolean passedGo;   // to limit search to one full sweep
 857
                 // Resume just after the saved position, or start at head (which
                 // counts as having already passed the start) if none is saved.
 858             if (sweeper == null) {
 859                 o = null;
 860                 p = head;
 861                 passedGo = true;
 862             } else {
 863                 o = sweeper;
 864                 p = o.next;
 865                 passedGo = false;
 866             }
 867
 868             for (; probes > 0; probes--) {
 869                 if (p == null) {
                         // Fell off the tail: wrap around to head at most once.
 870                     if (passedGo)
 871                         break;
 872                     o = null;
 873                     p = head;
 874                     passedGo = true;
 875                 }
 876                 final Itr it = p.get();
 877                 final Node next = p.next;
 878                 if (it == null || it.isDetached()) {
 879                     // found a discarded/exhausted iterator
 880                     probes = LONG_SWEEP_PROBES; // "try harder"
 881                     // unlink p
 882                     p.clear();
 883                     p.next = null;
 884                     if (o == null) {
 885                         head = next;
 886                         if (next == null) {
 887                             // We've run out of iterators to track; retire
 888                             itrs = null;
 889                             return;
 890                         }
 891                     }
 892                     else
 893                         o.next = next;
 894                 } else {
 895                     o = p;
 896                 }
 897                 p = next;
 898             }
 899
                 // Save the predecessor of the next node to probe; null makes the
                 // next sweep start over from head.
 900             this.sweeper = (p == null) ? null : o;
 901         }
 902
 903         /**
 904          * Adds a new iterator to the linked list of tracked iterators.
 905          */
 906         void register(Itr itr) {
 907             // assert lock.isHeldByCurrentThread();
                 // The new node is pushed at the front of the list.
 908             head = new Node(itr, head);
 909         }
 910
 911         /**
 912          * Called whenever takeIndex wraps around to 0.
 913          *
 914          * Notifies all iterators, and expunges any that are now stale.
 915          */
 916         void takeIndexWrapped() {
 917             // assert lock.isHeldByCurrentThread();
 918             cycles++;
                 // o trails p as its predecessor so p can be unlinked in place.
 919             for (Node o = null, p = head; p != null;) {
 920                 final Itr it = p.get();
 921                 final Node next = p.next;
 922                 if (it == null || it.takeIndexWrapped()) {
 923                     // unlink p
 924                     // assert it == null || it.isDetached();
 925                     p.clear();
 926                     p.next = null;
 927                     if (o == null)
 928                         head = next;
 929                     else
 930                         o.next = next;
 931                 } else {
 932                     o = p;
 933                 }
 934                 p = next;
 935             }
 936             if (head == null)   // no more iterators to track
 937                 itrs = null;
 938         }
 939
 940         /**
 941          * Called whenever an interior remove (not at takeIndex) occurred.
 942          *
 943          * Notifies all iterators, and expunges any that are now stale.
              *
              * @param removedIndex array index of the element that was removed
 944          */
 945         void removedAt(int removedIndex) {
                 // o trails p as its predecessor so p can be unlinked in place.
 946             for (Node o = null, p = head; p != null;) {
 947                 final Itr it = p.get();
 948                 final Node next = p.next;
 949                 if (it == null || it.removedAt(removedIndex)) {
 950                     // unlink p
 951                     // assert it == null || it.isDetached();
 952                     p.clear();
 953                     p.next = null;
 954                     if (o == null)
 955                         head = next;
 956                     else
 957                         o.next = next;
 958                 } else {
 959                     o = p;
 960                 }
 961                 p = next;
 962             }
 963             if (head == null)   // no more iterators to track
 964                 itrs = null;
 965         }
 966
 967         /**
 968          * Called whenever the queue becomes empty.
 969          *
 970          * Notifies all active iterators that the queue is empty,
 971          * clears all weak refs, and unlinks the itrs datastructure.
 972          */
 973         void queueIsEmpty() {
 974             // assert lock.isHeldByCurrentThread();
 975             for (Node p = head; p != null; p = p.next) {
 976                 Itr it = p.get();
 977                 if (it != null) {
 978                     p.clear();
 979                     it.shutdown();
 980                 }
 981             }
                 // Nodes are not unlinked one by one; dropping head and the
                 // queue's itrs reference discards the whole list at once.
 982             head = null;
 983             itrs = null;
 984         }
 985
 986         /**
 987          * Called whenever an element has been dequeued (at takeIndex).
 988          */
 989         void elementDequeued() {
 990             // assert lock.isHeldByCurrentThread();
                 // count and takeIndex are the enclosing queue's fields, read
                 // after the dequeue has updated them.
 991             if (count == 0)
 992                 queueIsEmpty();
 993             else if (takeIndex == 0)
 994                 takeIndexWrapped();
 995         }
 996     }
 997 
 998     /**
 999      * Iterator for ArrayBlockingQueue.
1000      *
1001      * To maintain weak consistency with respect to puts and takes, we
1002      * read ahead one slot, so as to not report hasNext true but then
1003      * not have an element to return.
1004      *
1005      * We switch into "detached" mode (allowing prompt unlinking from
1006      * itrs without help from the GC) when all indices are negative, or
1007      * when hasNext returns false for the first time.  This allows the
1008      * iterator to track concurrent updates completely accurately,
1009      * except for the corner case of the user calling Iterator.remove()
1010      * after hasNext() returned false.  Even in this case, we ensure
1011      * that we don't remove the wrong element by keeping track of the
1012      * expected element to remove, in lastItem.  Yes, we may fail to
1013      * remove lastItem from the queue if it moved due to an interleaved
1014      * interior remove while in detached mode.
1015      *
1016      * Method forEachRemaining, added in Java 8, is treated similarly
1017      * to hasNext returning false, in that we switch to detached mode,
1018      * but we regard it as an even stronger request to "close" this
1019      * iteration, and don't bother supporting subsequent remove().
1020      */
1021     private class Itr implements Iterator<E> {
1022         /** Index to look for new nextItem; NONE at end */
1023         private int cursor;
1024
1025         /** Element to be returned by next call to next(); null if none */
1026         private E nextItem;
1027
1028         /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
1029         private int nextIndex;
1030
1031         /** Last element returned; null if none or not detached. */
1032         private E lastItem;
1033
1034         /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
1035         private int lastRet;
1036
1037         /** Previous value of takeIndex, or DETACHED when detached */
1038         private int prevTakeIndex;
1039
1040         /** Previous value of itrs.cycles */
1041         private int prevCycles;
1042
1043         /** Special index value indicating "not available" or "undefined" */
1044         private static final int NONE = -1;
1045
1046         /**
1047          * Special index value indicating "removed elsewhere", that is,
1048          * removed by some operation other than a call to this.remove().
1049          */
1050         private static final int REMOVED = -2;
1051
1052         /** Special value for prevTakeIndex indicating "detached mode" */
1053         private static final int DETACHED = -3;
1054
             /**
              * Creates an iterator positioned at the current head of the queue.
              * Under the queue lock: if the queue is empty the iterator starts
              * out detached with no next element; otherwise it reads ahead the
              * element at takeIndex and registers itself with itrs, creating
              * that structure if it does not exist yet.
              */
1055         Itr() {
1056             lastRet = NONE;
1057             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1058             lock.lock();
1059             try {
1060                 if (count == 0) {
1061                     // assert itrs == null;
1062                     cursor = NONE;
1063                     nextIndex = NONE;
1064                     prevTakeIndex = DETACHED;
1065                 } else {
1066                     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1067                     prevTakeIndex = takeIndex;
1068                     nextItem = itemAt(nextIndex = takeIndex);
1069                     cursor = incCursor(takeIndex);
1070                     if (itrs == null) {
1071                         itrs = new Itrs(this);
1072                     } else {
1073                         itrs.register(this); // in this order
1074                         itrs.doSomeSweeping(false);
1075                     }
1076                     prevCycles = itrs.cycles;
1077                     // assert takeIndex >= 0;
1078                     // assert prevTakeIndex == takeIndex;
1079                     // assert nextIndex >= 0;
1080                     // assert nextItem != null;
1081                 }
1082             } finally {
1083                 lock.unlock();
1084             }
1085         }
1086
             /**
              * Returns true if this iterator is in detached mode, that is, if
              * prevTakeIndex holds a negative sentinel rather than an index.
              */
1087         boolean isDetached() {
1088             // assert lock.isHeldByCurrentThread();
1089             return prevTakeIndex < 0;
1090         }
1091
             /**
              * Returns the circular successor of {@code index}, or NONE if that
              * successor equals putIndex (there is no further element).
              */
1092         private int incCursor(int index) {
1093             // assert lock.isHeldByCurrentThread();
1094             if (++index == items.length) index = 0;
1095             if (index == putIndex) index = NONE;
1096             return index;
1097         }
1098
1099         /**
1100          * Returns true if index is invalidated by the given number of
1101          * dequeues, starting from prevTakeIndex.
              *
              * @param index the index to check; negative sentinels are never
              *        considered invalidated
              * @param prevTakeIndex the takeIndex the distance is measured from
              * @param dequeues how far takeIndex has advanced since then
              * @param length the array length, used to wrap the distance
1102          */
1103         private boolean invalidated(int index, int prevTakeIndex,
1104                                     long dequeues, int length) {
1105             if (index < 0)
1106                 return false;
1107             int distance = index - prevTakeIndex;
1108             if (distance < 0)
1109                 distance += length;
1110             return dequeues > distance;
1111         }
1112
1113         /**
1114          * Adjusts indices to incorporate all dequeues since the last
1115          * operation on this iterator.  Call only from iterating thread.
1116          */
1117         private void incorporateDequeues() {
1118             // assert lock.isHeldByCurrentThread();
1119             // assert itrs != null;
1120             // assert !isDetached();
1121             // assert count > 0;
1122
1123             final int cycles = itrs.cycles;
1124             final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1125             final int prevCycles = this.prevCycles;
1126             final int prevTakeIndex = this.prevTakeIndex;
1127
1128             if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1129                 final int len = items.length;
1130                 // how far takeIndex has advanced since the previous
1131                 // operation of this iterator
                     // Widen before multiplying so the product is computed in
                     // long arithmetic instead of overflowing int first.
1132                 long dequeues = (long) (cycles - prevCycles) * len
1133                     + (takeIndex - prevTakeIndex);
1134
1135                 // Check indices for invalidation
1136                 if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1137                     lastRet = REMOVED;
1138                 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1139                     nextIndex = REMOVED;
1140                 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1141                     cursor = takeIndex;
1142
1143                 if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1144                     detach();
1145                 else {
1146                     this.prevCycles = cycles;
1147                     this.prevTakeIndex = takeIndex;
1148                 }
1149             }
1150         }
1151
1152         /**
1153          * Called when itrs should stop tracking this iterator, either
1154          * because there are no more indices to update (cursor < 0 &&
1155          * nextIndex < 0 && lastRet < 0) or as a special exception, when
1156          * lastRet >= 0, because hasNext() is about to return false for the
1157          * first time.  Call only from iterating thread.
1158          */
1159         private void detach() {
1160             // Switch to detached mode
1161             // assert lock.isHeldByCurrentThread();
1162             // assert cursor == NONE;
1163             // assert nextIndex < 0;
1164             // assert lastRet < 0 || nextItem == null;
1165             // assert lastRet < 0 ^ lastItem != null;
1166             if (prevTakeIndex >= 0) {
1167                 // assert itrs != null;
1168                 prevTakeIndex = DETACHED;
1169                 // try to unlink from itrs (but not too hard)
1170                 itrs.doSomeSweeping(true);
1171             }
1172         }
1173
1174         /**
1175          * For performance reasons, we would like not to acquire a lock in
1176          * hasNext in the common case.  To allow for this, we only access
1177          * fields (i.e. nextItem) that are not modified by update operations
1178          * triggered by queue modifications.
1179          */
1180         public boolean hasNext() {
1181             if (nextItem != null)
1182                 return true;
1183             noNext();
1184             return false;
1185         }
1186
             /**
              * Slow path of hasNext, taken when there is no read-ahead element.
              * Under the lock, and only if not already detached, incorporates
              * pending dequeues and, if lastRet is still a valid index, records
              * the element there in lastItem (so that a later remove() can
              * verify it) before detaching.
              */
1187         private void noNext() {
1188             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1189             lock.lock();
1190             try {
1191                 // assert cursor == NONE;
1192                 // assert nextIndex == NONE;
1193                 if (!isDetached()) {
1194                     // assert lastRet >= 0;
1195                     incorporateDequeues(); // might update lastRet
1196                     if (lastRet >= 0) {
1197                         lastItem = itemAt(lastRet);
1198                         // assert lastItem != null;
1199                         detach();
1200                     }
1201                 }
1202                 // assert isDetached();
1203                 // assert lastRet < 0 ^ lastItem != null;
1204             } finally {
1205                 lock.unlock();
1206             }
1207         }
1208
             /**
              * Returns the read-ahead element and, under the lock, reads ahead
              * the following one (if any) so that hasNext stays accurate.
              *
              * @throws NoSuchElementException if there is no read-ahead element
              */
1209         public E next() {
1210             final E e = nextItem;
1211             if (e == null)
1212                 throw new NoSuchElementException();
1213             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1214             lock.lock();
1215             try {
1216                 if (!isDetached())
1217                     incorporateDequeues();
1218                 // assert nextIndex != NONE;
1219                 // assert lastItem == null;
1220                 lastRet = nextIndex;
1221                 final int cursor = this.cursor;
1222                 if (cursor >= 0) {
1223                     nextItem = itemAt(nextIndex = cursor);
1224                     // assert nextItem != null;
1225                     this.cursor = incCursor(cursor);
1226                 } else {
1227                     nextIndex = NONE;
1228                     nextItem = null;
1229                     if (lastRet == REMOVED) detach();
1230                 }
1231             } finally {
1232                 lock.unlock();
1233             }
1234             return e;
1235         }
1236
             /**
              * Passes the read-ahead element and then every element from cursor
              * up to putIndex to {@code action}, all while holding the queue
              * lock, and then closes this iterator: all indices and items are
              * reset and the iterator is detached, even if the action throws.
              *
              * @throws NullPointerException if {@code action} is null
              */
1237         public void forEachRemaining(Consumer<? super E> action) {
1238             Objects.requireNonNull(action);
1239             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1240             lock.lock();
1241             try {
1242                 final E e = nextItem;
1243                 if (e == null) return;
1244                 if (!isDetached())
1245                     incorporateDequeues();
1246                 action.accept(e);
1247                 if (isDetached() || cursor < 0) return;
1248                 final Object[] items = ArrayBlockingQueue.this.items;
                     // Circular traversal in at most two legs: cursor up to the
                     // end of the array (or putIndex), then 0 up to putIndex.
1249                 for (int i = cursor, end = putIndex,
1250                          to = (i < end) ? end : items.length;
1251                      ; i = 0, to = end) {
1252                     for (; i < to; i++)
1253                         action.accept(itemAt(items, i));
1254                     if (to == end) break;
1255                 }
1256             } finally {
1257                 // Calling forEachRemaining is a strong hint that this
1258                 // iteration is surely over; supporting remove() after
1259                 // forEachRemaining() is more trouble than it's worth
1260                 cursor = nextIndex = lastRet = NONE;
1261                 nextItem = lastItem = null;
1262                 detach();
1263                 lock.unlock();
1264             }
1265         }
1266
             /**
              * Removes the element last returned by next(), if it is still in
              * the queue.  When this iterator is detached, the slot at lastRet
              * is removed only if it still holds the remembered lastItem.
              *
              * @throws IllegalStateException if there is no last-returned
              *         element (lastRet is NONE)
              */
1267         public void remove() {
1268             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1269             lock.lock();
1270             // assert lock.getHoldCount() == 1;
1271             try {
1272                 if (!isDetached())
1273                     incorporateDequeues(); // might update lastRet or detach
1274                 final int lastRet = this.lastRet;
1275                 this.lastRet = NONE;
1276                 if (lastRet >= 0) {
1277                     if (!isDetached())
1278                         removeAt(lastRet);
1279                     else {
1280                         final E lastItem = this.lastItem;
1281                         // assert lastItem != null;
1282                         this.lastItem = null;
1283                         if (itemAt(lastRet) == lastItem)
1284                             removeAt(lastRet);
1285                     }
1286                 } else if (lastRet == NONE)
1287                     throw new IllegalStateException();
1288                 // else lastRet == REMOVED and the last returned element was
1289                 // previously asynchronously removed via an operation other
1290                 // than this.remove(), so nothing to do.
1291
1292                 if (cursor < 0 && nextIndex < 0)
1293                     detach();
1294             } finally {
1295                 lock.unlock();
1296                 // assert lastRet == NONE;
1297                 // assert lastItem == null;
1298             }
1299         }
1300
1301         /**
1302          * Called to notify the iterator that the queue is empty, or that it
1303          * has fallen hopelessly behind, so that it should abandon any
1304          * further iteration, except possibly to return one more element
1305          * from next(), as promised by returning true from hasNext().
1306          */
1307         void shutdown() {
1308             // assert lock.isHeldByCurrentThread();
1309             cursor = NONE;
1310             if (nextIndex >= 0)
1311                 nextIndex = REMOVED;
1312             if (lastRet >= 0) {
1313                 lastRet = REMOVED;
1314                 lastItem = null;
1315             }
1316             prevTakeIndex = DETACHED;
1317             // Don't set nextItem to null because we must continue to be
1318             // able to return it on next().
1319             //
1320             // Caller will unlink from itrs when convenient.
1321         }
1322
             /**
              * Returns the circular distance from {@code prevTakeIndex} to
              * {@code index} in an array of the given length.
              */
1323         private int distance(int index, int prevTakeIndex, int length) {
1324             int distance = index - prevTakeIndex;
1325             if (distance < 0)
1326                 distance += length;
1327             return distance;
1328         }
1329
1330         /**
1331          * Called whenever an interior remove (not at takeIndex) occurred.
1332          *
1333          * @return true if this iterator should be unlinked from itrs
1334          */
1335         boolean removedAt(int removedIndex) {
1336             // assert lock.isHeldByCurrentThread();
1337             if (isDetached())
1338                 return true;
1339
1340             final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1341             final int prevTakeIndex = this.prevTakeIndex;
1342             final int len = items.length;
1343             // distance from prevTakeIndex to removedIndex
1344             final int removedDistance =
1345                 len * (itrs.cycles - this.prevCycles
1346                        + ((removedIndex < takeIndex) ? 1 : 0))
1347                 + (removedIndex - prevTakeIndex);
1348             // assert itrs.cycles - this.prevCycles >= 0;
1349             // assert itrs.cycles - this.prevCycles <= 1;
1350             // assert removedDistance > 0;
1351             // assert removedIndex != takeIndex;
                 // Each tracked index is compared by its distance from
                 // prevTakeIndex: an index equal to the removed position is
                 // marked gone (the cursor stays put unless it is now at
                 // putIndex), and an index beyond it moves back one slot.
1352             int cursor = this.cursor;
1353             if (cursor >= 0) {
1354                 int x = distance(cursor, prevTakeIndex, len);
1355                 if (x == removedDistance) {
1356                     if (cursor == putIndex)
1357                         this.cursor = cursor = NONE;
1358                 }
1359                 else if (x > removedDistance) {
1360                     // assert cursor != prevTakeIndex;
1361                     this.cursor = cursor = dec(cursor, len);
1362                 }
1363             }
1364             int lastRet = this.lastRet;
1365             if (lastRet >= 0) {
1366                 int x = distance(lastRet, prevTakeIndex, len);
1367                 if (x == removedDistance)
1368                     this.lastRet = lastRet = REMOVED;
1369                 else if (x > removedDistance)
1370                     this.lastRet = lastRet = dec(lastRet, len);
1371             }
1372             int nextIndex = this.nextIndex;
1373             if (nextIndex >= 0) {
1374                 int x = distance(nextIndex, prevTakeIndex, len);
1375                 if (x == removedDistance)
1376                     this.nextIndex = nextIndex = REMOVED;
1377                 else if (x > removedDistance)
1378                     this.nextIndex = nextIndex = dec(nextIndex, len);
1379             }
1380             if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1381                 this.prevTakeIndex = DETACHED;
1382                 return true;
1383             }
1384             return false;
1385         }
1386
1387         /**
1388          * Called whenever takeIndex wraps around to zero.
1389          *
1390          * @return true if this iterator should be unlinked from itrs
1391          */
1392         boolean takeIndexWrapped() {
1393             // assert lock.isHeldByCurrentThread();
1394             if (isDetached())
1395                 return true;
1396             if (itrs.cycles - prevCycles > 1) {
1397                 // All the elements that existed at the time of the last
1398                 // operation are gone, so abandon further iteration.
1399                 shutdown();
1400                 return true;
1401             }
1402             return false;
1403         }
1404
1405 //         /** Uncomment for debugging. */
1406 //         public String toString() {
1407 //             return ("cursor=" + cursor + " " +
1408 //                     "nextIndex=" + nextIndex + " " +
1409 //                     "lastRet=" + lastRet + " " +
1410 //                     "nextItem=" + nextItem + " " +
1411 //                     "lastItem=" + lastItem + " " +
1412 //                     "prevCycles=" + prevCycles + " " +
1413 //                     "prevTakeIndex=" + prevTakeIndex + " " +
1414 //                     "size()=" + size() + " " +
1415 //                     "remainingCapacity()=" + remainingCapacity());
1416 //         }
1417     }
1418 
1419     /**
1420      * Returns a {@link Spliterator} over the elements in this queue.
1421      *
1422      * <p>The returned spliterator is
1423      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1424      *
1425      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1426      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1427      *
1428      * @implNote
1429      * The {@code Spliterator} implements {@code trySplit} to permit limited
1430      * parallelism.
1431      *
1432      * @return a {@code Spliterator} over the elements in this queue
1433      * @since 1.8
1434      */
1435     public Spliterator<E> spliterator() {
1436         return Spliterators.spliterator
1437             (this, (Spliterator.ORDERED |
1438                     Spliterator.NONNULL |
1439                     Spliterator.CONCURRENT));
1440     }
1441 
1442     /**
1443      * @throws NullPointerException {@inheritDoc}

1444      */
1445     public void forEach(Consumer<? super E> action) {
1446         Objects.requireNonNull(action);
1447         final ReentrantLock lock = this.lock;
1448         lock.lock();
1449         try {
1450             if (count > 0) {
1451                 final Object[] items = this.items;
1452                 for (int i = takeIndex, end = putIndex,
1453                          to = (i < end) ? end : items.length;
1454                      ; i = 0, to = end) {
1455                     for (; i < to; i++)
1456                         action.accept(itemAt(items, i));
1457                     if (to == end) break;
1458                 }
1459             }
1460         } finally {
1461             lock.unlock();
1462         }
1463     }
1464 
1465     /**
1466      * @throws NullPointerException {@inheritDoc}

1467      */
1468     public boolean removeIf(Predicate<? super E> filter) {
1469         Objects.requireNonNull(filter);
1470         return bulkRemove(filter);
1471     }
1472 
1473     /**
1474      * @throws NullPointerException {@inheritDoc}

1475      */
1476     public boolean removeAll(Collection<?> c) {
1477         Objects.requireNonNull(c);
1478         return bulkRemove(e -> c.contains(e));
1479     }
1480 
1481     /**
1482      * @throws NullPointerException {@inheritDoc}

1483      */
1484     public boolean retainAll(Collection<?> c) {
1485         Objects.requireNonNull(c);
1486         return bulkRemove(e -> !c.contains(e));
1487     }
1488 
1489     /** Implementation of bulk remove methods. */
1490     private boolean bulkRemove(Predicate<? super E> filter) {
1491         final ReentrantLock lock = this.lock;
1492         lock.lock();
1493         try {
1494             if (itrs == null) { // check for active iterators
1495                 if (count > 0) {
1496                     final Object[] items = this.items;
1497                     // Optimize for initial run of survivors
1498                     for (int i = takeIndex, end = putIndex,
1499                              to = (i < end) ? end : items.length;
1500                          ; i = 0, to = end) {
1501                         for (; i < to; i++)
1502                             if (filter.test(itemAt(items, i)))
1503                                 return bulkRemoveModified(filter, i);
1504                         if (to == end) break;
1505                     }
1506                 }
1507                 return false;
1508             }
1509         } finally {
1510             lock.unlock();
1511         }
1512         // Active iterators are too hairy!
1513         // Punting (for now) to the slow n^2 algorithm ...
1514         return super.removeIf(filter);
1515     }
1516 
1517     // A tiny bit set implementation
1518 
1519     private static long[] nBits(int n) {
1520         return new long[((n - 1) >> 6) + 1];
1521     }
1522     private static void setBit(long[] bits, int i) {
1523         bits[i >> 6] |= 1L << i;
1524     }
1525     private static boolean isClear(long[] bits, int i) {
1526         return (bits[i >> 6] & (1L << i)) == 0;
1527     }
1528 
1529     /**
1530      * Returns circular distance from i to j, disambiguating i == j to
1531      * items.length; never returns 0.
1532      */
1533     private int distanceNonEmpty(int i, int j) {
1534         if ((j -= i) <= 0) j += items.length;
1535         return j;
1536     }
1537 
1538     /**
1539      * Helper for bulkRemove, in case of at least one deletion.
1540      * Tolerate predicates that reentrantly access the collection for
1541      * read (but not write), so traverse once to find elements to
1542      * delete, a second pass to physically expunge.
1543      *
          * @param filter predicate selecting the elements to delete
1544      * @param beg valid index of first element to be deleted
          * @return always true, since the element at {@code beg} is deleted
1545      */
1546     private boolean bulkRemoveModified(
1547         Predicate<? super E> filter, final int beg) {
1548         final Object[] es = items;
1549         final int capacity = items.length;
1550         final int end = putIndex;
             // One bit per slot from beg up to putIndex; bit (i - k) set means
             // the element at array index i is to be deleted.
1551         final long[] deathRow = nBits(distanceNonEmpty(beg, putIndex));
1552         deathRow[0] = 1L;   // set bit 0
             // Pass 1: only evaluate the filter and record the verdicts.
             // k is the array index that maps to bit 0; it is lowered by
             // capacity for the second leg so that i - k keeps counting up.
1553         for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1554              ; i = 0, to = end, k -= capacity) {
1555             for (; i < to; i++)
1556                 if (filter.test(itemAt(es, i)))
1557                     setBit(deathRow, i - k);
1558             if (to == end) break;
1559         }
             // Pass 2: compact the survivors toward beg.
1560         // a two-finger traversal, with hare i reading, tortoise w writing
1561         int w = beg;
1562         for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1563              ; w = 0) { // w rejoins i on second leg
1564             // In this loop, i and w are on the same leg, with i > w
1565             for (; i < to; i++)
1566                 if (isClear(deathRow, i - k))
1567                     es[w++] = es[i];
1568             if (to == end) break;
1569             // In this loop, w is on the first leg, i on the second
1570             for (i = 0, to = end, k -= capacity; i < to && w < capacity; i++)
1571                 if (isClear(deathRow, i - k))
1572                     es[w++] = es[i];
1573             if (i >= to) {
1574                 if (w == capacity) w = 0; // "corner" case
1575                 break;
1576             }
1577         }
             // w is the new putIndex; the slots from w up to the old end no
             // longer hold live elements, so count shrinks by that distance.
1578         count -= distanceNonEmpty(w, end);
             // circularClear is defined elsewhere in this file; presumably it
             // nulls the vacated slots from w up to end -- TODO confirm.
1579         circularClear(es, putIndex = w, end);
1580         return true;
1581     }
1582 
1583     /** debugging */
1584     void checkInvariants() {
1585         // meta-assertions
1586         // assert lock.isHeldByCurrentThread();
1587         try {
1588             // Unlike ArrayDeque, we have a count field but no spare slot.
1589             // We prefer ArrayDeque's strategy (and the names of its fields!),
1590             // but our field layout is baked into the serial form, and so is
1591             // too annoying to change.
1592             //
1593             // putIndex == takeIndex must be disambiguated by checking count.
1594             int capacity = items.length;
1595             // assert capacity > 0;
1596             // assert takeIndex >= 0 && takeIndex < capacity;
1597             // assert putIndex >= 0 && putIndex < capacity;
1598             // assert count <= capacity;
1599             // assert takeIndex == putIndex || items[takeIndex] != null;
1600             // assert count == capacity || items[putIndex] == null;
1601             // assert takeIndex == putIndex || items[dec(putIndex, capacity)] != null;
1602         } catch (Throwable t) {
1603             System.err.printf("takeIndex=%d putIndex=%d count=%d capacity=%d%n",
1604                               takeIndex, putIndex, count, items.length);
1605             System.err.printf("items=%s%n",
1606                               Arrays.toString(items));
1607             throw t;
1608         }
1609     }
1610 
1611 }
--- EOF ---