1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.AbstractQueue;
  39 import java.util.Collection;
  40 import java.util.Iterator;
  41 import java.util.NoSuchElementException;
  42 import java.util.Objects;
  43 import java.util.Spliterator;
  44 import java.util.Spliterators;
  45 import java.util.concurrent.atomic.AtomicInteger;
  46 import java.util.concurrent.locks.Condition;
  47 import java.util.concurrent.locks.ReentrantLock;
  48 import java.util.function.Consumer;
  49 import java.util.function.Predicate;
  50 
  51 /**
  52  * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
  53  * linked nodes.
  54  * This queue orders elements FIFO (first-in-first-out).
  55  * The <em>head</em> of the queue is that element that has been on the
  56  * queue the longest time.
  57  * The <em>tail</em> of the queue is that element that has been on the
  58  * queue the shortest time. New elements
  59  * are inserted at the tail of the queue, and the queue retrieval
  60  * operations obtain elements at the head of the queue.
  61  * Linked queues typically have higher throughput than array-based queues but
  62  * less predictable performance in most concurrent applications.
  63  *
  64  * <p>The optional capacity bound constructor argument serves as a
  65  * way to prevent excessive queue expansion. The capacity, if unspecified,
  66  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
  67  * dynamically created upon each insertion unless this would bring the
  68  * queue above capacity.
  69  *
  70  * <p>This class and its iterator implement all of the <em>optional</em>
  71  * methods of the {@link Collection} and {@link Iterator} interfaces.
  72  *
  73  * <p>This class is a member of the
  74  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  75  * Java Collections Framework</a>.
  76  *
  77  * @since 1.5
  78  * @author Doug Lea
  79  * @param <E> the type of elements held in this queue
  80  */
  81 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
  82         implements BlockingQueue<E>, java.io.Serializable {
  83     private static final long serialVersionUID = -6903933977591709194L;
  84 
  85     /*
  86      * A variant of the "two lock queue" algorithm.  The putLock gates
  87      * entry to put (and offer), and has an associated condition for
  88      * waiting puts.  Similarly for the takeLock.  The "count" field
  89      * that they both rely on is maintained as an atomic to avoid
  90      * needing to get both locks in most cases. Also, to minimize need
  91      * for puts to get takeLock and vice-versa, cascading notifies are
  92      * used. When a put notices that it has enabled at least one take,
  93      * it signals taker. That taker in turn signals others if more
  94      * items have been entered since the signal. And symmetrically for
  95      * takes signalling puts. Operations such as remove(Object) and
  96      * iterators acquire both locks.
  97      *
  98      * Visibility between writers and readers is provided as follows:
  99      *
 100      * Whenever an element is enqueued, the putLock is acquired and
 101      * count updated.  A subsequent reader guarantees visibility to the
 102      * enqueued Node by either acquiring the putLock (via fullyLock)
 103      * or by acquiring the takeLock, and then reading n = count.get();
 104      * this gives visibility to the first n items.
 105      *
 106      * To implement weakly consistent iterators, it appears we need to
 107      * keep all Nodes GC-reachable from a predecessor dequeued Node.
 108      * That would cause two problems:
 109      * - allow a rogue Iterator to cause unbounded memory retention
 110      * - cause cross-generational linking of old Nodes to new Nodes if
 111      *   a Node was tenured while live, which generational GCs have a
 112      *   hard time dealing with, causing repeated major collections.
 113      * However, only non-deleted Nodes need to be reachable from
 114      * dequeued Nodes, and reachability does not necessarily have to
 115      * be of the kind understood by the GC.  We use the trick of
 116      * linking a Node that has just been dequeued to itself.  Such a
 117      * self-link implicitly means to advance to head.next.
 118      */
 119 
    /**
     * Linked list node class: one cell of the singly-linked list that
     * backs the queue.
     */
    static class Node<E> {
        /**
         * The element carried by this node. The head node keeps this null
         * (see the head invariant), and the queue nulls it out when a node
         * is dequeued or unlinked.
         */
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        /** Creates a node holding {@code x}; {@code next} starts out null. */
        Node(E x) { item = x; }
    }
 136 
    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /**
     * Current number of elements. Kept as an atomic so that producers and
     * consumers can both consult it while holding only their own lock.
     */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list. This node never carries an element; the first
     * real element, if any, lives in head.next.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list; new nodes are linked after it.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes; bound to takeLock */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts; bound to putLock */
    private final Condition notFull = putLock.newCondition();
 166 
 167     /**
 168      * Signals a waiting take. Called only from put/offer (which do not
 169      * otherwise ordinarily lock takeLock.)
 170      */
 171     private void signalNotEmpty() {
 172         final ReentrantLock takeLock = this.takeLock;
 173         takeLock.lock();
 174         try {
 175             notEmpty.signal();
 176         } finally {
 177             takeLock.unlock();
 178         }
 179     }
 180 
 181     /**
 182      * Signals a waiting put. Called only from take/poll.
 183      */
 184     private void signalNotFull() {
 185         final ReentrantLock putLock = this.putLock;
 186         putLock.lock();
 187         try {
 188             notFull.signal();
 189         } finally {
 190             putLock.unlock();
 191         }
 192     }
 193 
 194     /**
 195      * Links node at end of queue.
 196      *
 197      * @param node the node
 198      */
 199     private void enqueue(Node<E> node) {
 200         // assert putLock.isHeldByCurrentThread();
 201         // assert last.next == null;
 202         last = last.next = node;
 203     }
 204 
 205     /**
 206      * Removes a node from head of queue.
 207      *
 208      * @return the node
 209      */
 210     private E dequeue() {
 211         // assert takeLock.isHeldByCurrentThread();
 212         // assert head.item == null;
 213         Node<E> h = head;
 214         Node<E> first = h.next;
 215         h.next = h; // help GC
 216         head = first;
 217         E x = first.item;
 218         first.item = null;
 219         return x;
 220     }
 221 
    /**
     * Locks to prevent both puts and takes.
     * Acquires putLock first and takeLock second; fullyUnlock releases
     * them in the opposite order.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }
 229 
    /**
     * Unlocks to allow both puts and takes.
     * Releases takeLock first and putLock second, the reverse of the
     * acquisition order used by fullyLock.
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }
 237 
    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, by delegating to the capacity-taking
     * constructor.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
 245 
 246     /**
 247      * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
 248      *
 249      * @param capacity the capacity of this queue
 250      * @throws IllegalArgumentException if {@code capacity} is not greater
 251      *         than zero
 252      */
 253     public LinkedBlockingQueue(int capacity) {
 254         if (capacity <= 0) throw new IllegalArgumentException();
 255         this.capacity = capacity;
 256         last = head = new Node<E>(null);
 257     }
 258 
 259     /**
 260      * Creates a {@code LinkedBlockingQueue} with a capacity of
 261      * {@link Integer#MAX_VALUE}, initially containing the elements of the
 262      * given collection,
 263      * added in traversal order of the collection's iterator.
 264      *
 265      * @param c the collection of elements to initially contain
 266      * @throws NullPointerException if the specified collection or any
 267      *         of its elements are null
 268      */
 269     public LinkedBlockingQueue(Collection<? extends E> c) {
 270         this(Integer.MAX_VALUE);
 271         final ReentrantLock putLock = this.putLock;
 272         putLock.lock(); // Never contended, but necessary for visibility
 273         try {
 274             int n = 0;
 275             for (E e : c) {
 276                 if (e == null)
 277                     throw new NullPointerException();
 278                 if (n == capacity)
 279                     throw new IllegalStateException("Queue full");
 280                 enqueue(new Node<E>(e));
 281                 ++n;
 282             }
 283             count.set(n);
 284         } finally {
 285             putLock.unlock();
 286         }
 287     }
 288 
 289     // this doc comment is overridden to remove the reference to collections
 290     // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     * The value is read from the atomic counter without taking either
     * lock.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        return count.get();
    }
 299 
 300     // this doc comment is a modified copy of the inherited doc comment,
 301     // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     *
     * @return the capacity minus the current element count, with the
     *         count read without taking either lock
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }
 316 
 317     /**
 318      * Inserts the specified element at the tail of this queue, waiting if
 319      * necessary for space to become available.
 320      *
 321      * @throws InterruptedException {@inheritDoc}
 322      * @throws NullPointerException {@inheritDoc}
 323      */
 324     public void put(E e) throws InterruptedException {
 325         if (e == null) throw new NullPointerException();
 326         // Note: convention in all put/take/etc is to preset local var
 327         // holding count negative to indicate failure unless set.
 328         int c = -1;
 329         Node<E> node = new Node<E>(e);
 330         final ReentrantLock putLock = this.putLock;
 331         final AtomicInteger count = this.count;
 332         putLock.lockInterruptibly();
 333         try {
 334             /*
 335              * Note that count is used in wait guard even though it is
 336              * not protected by lock. This works because count can
 337              * only decrease at this point (all other puts are shut
 338              * out by lock), and we (or some other waiting put) are
 339              * signalled if it ever changes from capacity. Similarly
 340              * for all other uses of count in other wait guards.
 341              */
 342             while (count.get() == capacity) {
 343                 notFull.await();
 344             }
 345             enqueue(node);
 346             c = count.getAndIncrement();
 347             if (c + 1 < capacity)
 348                 notFull.signal();
 349         } finally {
 350             putLock.unlock();
 351         }
 352         if (c == 0)
 353             signalNotEmpty();
 354     }
 355 
 356     /**
 357      * Inserts the specified element at the tail of this queue, waiting if
 358      * necessary up to the specified wait time for space to become available.
 359      *
 360      * @return {@code true} if successful, or {@code false} if
 361      *         the specified waiting time elapses before space is available
 362      * @throws InterruptedException {@inheritDoc}
 363      * @throws NullPointerException {@inheritDoc}
 364      */
 365     public boolean offer(E e, long timeout, TimeUnit unit)
 366         throws InterruptedException {
 367 
 368         if (e == null) throw new NullPointerException();
 369         long nanos = unit.toNanos(timeout);
 370         int c = -1;
 371         final ReentrantLock putLock = this.putLock;
 372         final AtomicInteger count = this.count;
 373         putLock.lockInterruptibly();
 374         try {
 375             while (count.get() == capacity) {
 376                 if (nanos <= 0L)
 377                     return false;
 378                 nanos = notFull.awaitNanos(nanos);
 379             }
 380             enqueue(new Node<E>(e));
 381             c = count.getAndIncrement();
 382             if (c + 1 < capacity)
 383                 notFull.signal();
 384         } finally {
 385             putLock.unlock();
 386         }
 387         if (c == 0)
 388             signalNotEmpty();
 389         return true;
 390     }
 391 
 392     /**
 393      * Inserts the specified element at the tail of this queue if it is
 394      * possible to do so immediately without exceeding the queue's capacity,
 395      * returning {@code true} upon success and {@code false} if this queue
 396      * is full.
 397      * When using a capacity-restricted queue, this method is generally
 398      * preferable to method {@link BlockingQueue#add add}, which can fail to
 399      * insert an element only by throwing an exception.
 400      *
 401      * @throws NullPointerException if the specified element is null
 402      */
 403     public boolean offer(E e) {
 404         if (e == null) throw new NullPointerException();
 405         final AtomicInteger count = this.count;
 406         if (count.get() == capacity)
 407             return false;
 408         int c = -1;
 409         Node<E> node = new Node<E>(e);
 410         final ReentrantLock putLock = this.putLock;
 411         putLock.lock();
 412         try {
 413             if (count.get() < capacity) {
 414                 enqueue(node);
 415                 c = count.getAndIncrement();
 416                 if (c + 1 < capacity)
 417                     notFull.signal();
 418             }
 419         } finally {
 420             putLock.unlock();
 421         }
 422         if (c == 0)
 423             signalNotEmpty();
 424         return c >= 0;
 425     }
 426 
 427     public E take() throws InterruptedException {
 428         E x;
 429         int c = -1;
 430         final AtomicInteger count = this.count;
 431         final ReentrantLock takeLock = this.takeLock;
 432         takeLock.lockInterruptibly();
 433         try {
 434             while (count.get() == 0) {
 435                 notEmpty.await();
 436             }
 437             x = dequeue();
 438             c = count.getAndDecrement();
 439             if (c > 1)
 440                 notEmpty.signal();
 441         } finally {
 442             takeLock.unlock();
 443         }
 444         if (c == capacity)
 445             signalNotFull();
 446         return x;
 447     }
 448 
 449     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 450         E x = null;
 451         int c = -1;
 452         long nanos = unit.toNanos(timeout);
 453         final AtomicInteger count = this.count;
 454         final ReentrantLock takeLock = this.takeLock;
 455         takeLock.lockInterruptibly();
 456         try {
 457             while (count.get() == 0) {
 458                 if (nanos <= 0L)
 459                     return null;
 460                 nanos = notEmpty.awaitNanos(nanos);
 461             }
 462             x = dequeue();
 463             c = count.getAndDecrement();
 464             if (c > 1)
 465                 notEmpty.signal();
 466         } finally {
 467             takeLock.unlock();
 468         }
 469         if (c == capacity)
 470             signalNotFull();
 471         return x;
 472     }
 473 
 474     public E poll() {
 475         final AtomicInteger count = this.count;
 476         if (count.get() == 0)
 477             return null;
 478         E x = null;
 479         int c = -1;
 480         final ReentrantLock takeLock = this.takeLock;
 481         takeLock.lock();
 482         try {
 483             if (count.get() > 0) {
 484                 x = dequeue();
 485                 c = count.getAndDecrement();
 486                 if (c > 1)
 487                     notEmpty.signal();
 488             }
 489         } finally {
 490             takeLock.unlock();
 491         }
 492         if (c == capacity)
 493             signalNotFull();
 494         return x;
 495     }
 496 
 497     public E peek() {
 498         if (count.get() == 0)
 499             return null;
 500         final ReentrantLock takeLock = this.takeLock;
 501         takeLock.lock();
 502         try {
 503             return (count.get() > 0) ? head.next.item : null;
 504         } finally {
 505             takeLock.unlock();
 506         }
 507     }
 508 
 509     /**
 510      * Unlinks interior Node p with predecessor pred.
 511      */
 512     void unlink(Node<E> p, Node<E> pred) {
 513         // assert putLock.isHeldByCurrentThread();
 514         // assert takeLock.isHeldByCurrentThread();
 515         // p.next is not changed, to allow iterators that are
 516         // traversing p to maintain their weak-consistency guarantee.
 517         p.item = null;
 518         pred.next = p.next;
 519         if (last == p)
 520             last = pred;
 521         if (count.getAndDecrement() == capacity)
 522             notFull.signal();
 523     }
 524 
 525     /**
 526      * Removes a single instance of the specified element from this queue,
 527      * if it is present.  More formally, removes an element {@code e} such
 528      * that {@code o.equals(e)}, if this queue contains one or more such
 529      * elements.
 530      * Returns {@code true} if this queue contained the specified element
 531      * (or equivalently, if this queue changed as a result of the call).
 532      *
 533      * @param o element to be removed from this queue, if present
 534      * @return {@code true} if this queue changed as a result of the call
 535      */
 536     public boolean remove(Object o) {
 537         if (o == null) return false;
 538         fullyLock();
 539         try {
 540             for (Node<E> pred = head, p = pred.next;
 541                  p != null;
 542                  pred = p, p = p.next) {
 543                 if (o.equals(p.item)) {
 544                     unlink(p, pred);
 545                     return true;
 546                 }
 547             }
 548             return false;
 549         } finally {
 550             fullyUnlock();
 551         }
 552     }
 553 
 554     /**
 555      * Returns {@code true} if this queue contains the specified element.
 556      * More formally, returns {@code true} if and only if this queue contains
 557      * at least one element {@code e} such that {@code o.equals(e)}.
 558      *
 559      * @param o object to be checked for containment in this queue
 560      * @return {@code true} if this queue contains the specified element
 561      */
 562     public boolean contains(Object o) {
 563         if (o == null) return false;
 564         fullyLock();
 565         try {
 566             for (Node<E> p = head.next; p != null; p = p.next)
 567                 if (o.equals(p.item))
 568                     return true;
 569             return false;
 570         } finally {
 571             fullyUnlock();
 572         }
 573     }
 574 
 575     /**
 576      * Returns an array containing all of the elements in this queue, in
 577      * proper sequence.
 578      *
 579      * <p>The returned array will be "safe" in that no references to it are
 580      * maintained by this queue.  (In other words, this method must allocate
 581      * a new array).  The caller is thus free to modify the returned array.
 582      *
 583      * <p>This method acts as bridge between array-based and collection-based
 584      * APIs.
 585      *
 586      * @return an array containing all of the elements in this queue
 587      */
 588     public Object[] toArray() {
 589         fullyLock();
 590         try {
 591             int size = count.get();
 592             Object[] a = new Object[size];
 593             int k = 0;
 594             for (Node<E> p = head.next; p != null; p = p.next)
 595                 a[k++] = p.item;
 596             return a;
 597         } finally {
 598             fullyUnlock();
 599         }
 600     }
 601 
 602     /**
 603      * Returns an array containing all of the elements in this queue, in
 604      * proper sequence; the runtime type of the returned array is that of
 605      * the specified array.  If the queue fits in the specified array, it
 606      * is returned therein.  Otherwise, a new array is allocated with the
 607      * runtime type of the specified array and the size of this queue.
 608      *
 609      * <p>If this queue fits in the specified array with room to spare
 610      * (i.e., the array has more elements than this queue), the element in
 611      * the array immediately following the end of the queue is set to
 612      * {@code null}.
 613      *
 614      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 615      * array-based and collection-based APIs.  Further, this method allows
 616      * precise control over the runtime type of the output array, and may,
 617      * under certain circumstances, be used to save allocation costs.
 618      *
 619      * <p>Suppose {@code x} is a queue known to contain only strings.
 620      * The following code can be used to dump the queue into a newly
 621      * allocated array of {@code String}:
 622      *
 623      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
 624      *
 625      * Note that {@code toArray(new Object[0])} is identical in function to
 626      * {@code toArray()}.
 627      *
 628      * @param a the array into which the elements of the queue are to
 629      *          be stored, if it is big enough; otherwise, a new array of the
 630      *          same runtime type is allocated for this purpose
 631      * @return an array containing all of the elements in this queue
 632      * @throws ArrayStoreException if the runtime type of the specified array
 633      *         is not a supertype of the runtime type of every element in
 634      *         this queue
 635      * @throws NullPointerException if the specified array is null
 636      */
 637     @SuppressWarnings("unchecked")
 638     public <T> T[] toArray(T[] a) {
 639         fullyLock();
 640         try {
 641             int size = count.get();
 642             if (a.length < size)
 643                 a = (T[])java.lang.reflect.Array.newInstance
 644                     (a.getClass().getComponentType(), size);
 645 
 646             int k = 0;
 647             for (Node<E> p = head.next; p != null; p = p.next)
 648                 a[k++] = (T)p.item;
 649             if (a.length > k)
 650                 a[k] = null;
 651             return a;
 652         } finally {
 653             fullyUnlock();
 654         }
 655     }
 656 
    /**
     * Returns a string representation of this queue, produced by
     * {@code Helpers.collectionToString(this)}; the exact format is
     * whatever that helper generates.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        return Helpers.collectionToString(this);
    }
 660 
 661     /**
 662      * Atomically removes all of the elements from this queue.
 663      * The queue will be empty after this call returns.
 664      */
 665     public void clear() {
 666         fullyLock();
 667         try {
 668             for (Node<E> p, h = head; (p = h.next) != null; h = p) {
 669                 h.next = h;
 670                 p.item = null;
 671             }
 672             head = last;
 673             // assert head.item == null && head.next == null;
 674             if (count.getAndSet(0) == capacity)
 675                 notFull.signal();
 676         } finally {
 677             fullyUnlock();
 678         }
 679     }
 680 
    /**
     * Drains this queue into the given collection by delegating to
     * {@code drainTo(c, Integer.MAX_VALUE)}, i.e. with no effective
     * limit on the number of elements transferred.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }
 690 
 691     /**
 692      * @throws UnsupportedOperationException {@inheritDoc}
 693      * @throws ClassCastException            {@inheritDoc}
 694      * @throws NullPointerException          {@inheritDoc}
 695      * @throws IllegalArgumentException      {@inheritDoc}
 696      */
 697     public int drainTo(Collection<? super E> c, int maxElements) {
 698         Objects.requireNonNull(c);
 699         if (c == this)
 700             throw new IllegalArgumentException();
 701         if (maxElements <= 0)
 702             return 0;
 703         boolean signalNotFull = false;
 704         final ReentrantLock takeLock = this.takeLock;
 705         takeLock.lock();
 706         try {
 707             int n = Math.min(maxElements, count.get());
 708             // count.get provides visibility to first n Nodes
 709             Node<E> h = head;
 710             int i = 0;
 711             try {
 712                 while (i < n) {
 713                     Node<E> p = h.next;
 714                     c.add(p.item);
 715                     p.item = null;
 716                     h.next = h;
 717                     h = p;
 718                     ++i;
 719                 }
 720                 return n;
 721             } finally {
 722                 // Restore invariants even if c.add() threw
 723                 if (i > 0) {
 724                     // assert h.item == null;
 725                     head = h;
 726                     signalNotFull = (count.getAndAdd(-i) == capacity);
 727                 }
 728             }
 729         } finally {
 730             takeLock.unlock();
 731             if (signalNotFull)
 732                 signalNotFull();
 733         }
 734     }
 735 
 736     /**
 737      * Used for any element traversal that is not entirely under lock.
 738      * Such traversals must handle both:
 739      * - dequeued nodes (p.next == p)
 740      * - (possibly multiple) interior removed nodes (p.item == null)
 741      */
 742     Node<E> succ(Node<E> p) {
 743         if (p == (p = p.next))
 744             p = head.next;
 745         return p;
 746     }
 747 
    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     * Each call creates a new {@code Itr}.
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }
 760 
 761     /**
 762      * Weakly-consistent iterator.
 763      *
 764      * Lazily updated ancestor field provides expected O(1) remove(),
 765      * but still O(n) in the worst case, whenever the saved ancestor
 766      * is concurrently deleted.
 767      */
 768     private class Itr implements Iterator<E> {
 769         private Node<E> next;           // Node holding nextItem
 770         private E nextItem;             // next item to hand out
 771         private Node<E> lastRet;
 772         private Node<E> ancestor;       // Helps unlink lastRet on remove()
 773 
 774         Itr() {
 775             fullyLock();
 776             try {
 777                 if ((next = head.next) != null)
 778                     nextItem = next.item;
 779             } finally {
 780                 fullyUnlock();
 781             }
 782         }
 783 
 784         public boolean hasNext() {
 785             return next != null;
 786         }
 787 
 788         public E next() {
 789             Node<E> p;
 790             if ((p = next) == null)
 791                 throw new NoSuchElementException();
 792             lastRet = p;
 793             E x = nextItem;
 794             fullyLock();
 795             try {
 796                 E e = null;
 797                 for (p = p.next; p != null && (e = p.item) == null; )
 798                     p = succ(p);
 799                 next = p;
 800                 nextItem = e;
 801             } finally {
 802                 fullyUnlock();
 803             }
 804             return x;
 805         }
 806 
 807         public void forEachRemaining(Consumer<? super E> action) {
 808             // A variant of forEachFrom
 809             Objects.requireNonNull(action);
 810             Node<E> p;
 811             if ((p = next) == null) return;
 812             lastRet = p;
 813             next = null;
 814             final int batchSize = 64;
 815             Object[] es = null;
 816             int n, len = 1;
 817             do {
 818                 fullyLock();
 819                 try {
 820                     if (es == null) {
 821                         p = p.next;
 822                         for (Node<E> q = p; q != null; q = succ(q))
 823                             if (q.item != null && ++len == batchSize)
 824                                 break;
 825                         es = new Object[len];
 826                         es[0] = nextItem;
 827                         nextItem = null;
 828                         n = 1;
 829                     } else
 830                         n = 0;
 831                     for (; p != null && n < len; p = succ(p))
 832                         if ((es[n] = p.item) != null) {
 833                             lastRet = p;
 834                             n++;
 835                         }
 836                 } finally {
 837                     fullyUnlock();
 838                 }
 839                 for (int i = 0; i < n; i++) {
 840                     @SuppressWarnings("unchecked") E e = (E) es[i];
 841                     action.accept(e);
 842                 }
 843             } while (n > 0 && p != null);
 844         }
 845 
 846         public void remove() {
 847             Node<E> p = lastRet;
 848             if (p == null)
 849                 throw new IllegalStateException();
 850             lastRet = null;
 851             fullyLock();
 852             try {
 853                 if (p.item != null) {
 854                     if (ancestor == null)
 855                         ancestor = head;
 856                     ancestor = findPred(p, ancestor);
 857                     unlink(p, ancestor);
 858                 }
 859             } finally {
 860                 fullyUnlock();
 861             }
 862         }
 863     }
 864 
 865     /**
 866      * A customized variant of Spliterators.IteratorSpliterator.
 867      * Keep this class in sync with (very similar) LBDSpliterator.
 868      */
 869     private final class LBQSpliterator implements Spliterator<E> {
 870         static final int MAX_BATCH = 1 << 25;  // max batch array size;
 871         Node<E> current;    // current node; null until initialized
 872         int batch;          // batch size for splits
 873         boolean exhausted;  // true when no more nodes
 874         long est = size();  // size estimate
 875 
 876         LBQSpliterator() {}
 877 
 878         public long estimateSize() { return est; }
 879 
 880         public Spliterator<E> trySplit() {
 881             Node<E> h;
 882             if (!exhausted &&
 883                 ((h = current) != null || (h = head.next) != null)
 884                 && h.next != null) {
 885                 int n = batch = Math.min(batch + 1, MAX_BATCH);
 886                 Object[] a = new Object[n];
 887                 int i = 0;
 888                 Node<E> p = current;
 889                 fullyLock();
 890                 try {
 891                     if (p != null || (p = head.next) != null)
 892                         for (; p != null && i < n; p = succ(p))
 893                             if ((a[i] = p.item) != null)
 894                                 i++;
 895                 } finally {
 896                     fullyUnlock();
 897                 }
 898                 if ((current = p) == null) {
 899                     est = 0L;
 900                     exhausted = true;
 901                 }
 902                 else if ((est -= i) < 0L)
 903                     est = 0L;
 904                 if (i > 0)
 905                     return Spliterators.spliterator
 906                         (a, 0, i, (Spliterator.ORDERED |
 907                                    Spliterator.NONNULL |
 908                                    Spliterator.CONCURRENT));
 909             }
 910             return null;
 911         }
 912 
 913         public boolean tryAdvance(Consumer<? super E> action) {
 914             Objects.requireNonNull(action);
 915             if (!exhausted) {
 916                 E e = null;
 917                 fullyLock();
 918                 try {
 919                     Node<E> p;
 920                     if ((p = current) != null || (p = head.next) != null)
 921                         do {
 922                             e = p.item;
 923                             p = succ(p);
 924                         } while (e == null && p != null);
 925                     if ((current = p) == null)
 926                         exhausted = true;
 927                 } finally {
 928                     fullyUnlock();
 929                 }
 930                 if (e != null) {
 931                     action.accept(e);
 932                     return true;
 933                 }
 934             }
 935             return false;
 936         }
 937 
 938         public void forEachRemaining(Consumer<? super E> action) {
 939             Objects.requireNonNull(action);
 940             if (!exhausted) {
 941                 exhausted = true;
 942                 Node<E> p = current;
 943                 current = null;
 944                 forEachFrom(action, p);
 945             }
 946         }
 947 
 948         public int characteristics() {
 949             return (Spliterator.ORDERED |
 950                     Spliterator.NONNULL |
 951                     Spliterator.CONCURRENT);
 952         }
 953     }
 954 
 955     /**
 956      * Returns a {@link Spliterator} over the elements in this queue.
 957      *
 958      * <p>The returned spliterator is
 959      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
 960      *
 961      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
 962      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
 963      *
 964      * @implNote
 965      * The {@code Spliterator} implements {@code trySplit} to permit limited
 966      * parallelism.
 967      *
 968      * @return a {@code Spliterator} over the elements in this queue
 969      * @since 1.8
 970      */
 971     public Spliterator<E> spliterator() {
 972         return new LBQSpliterator();
 973     }
 974 
 975     /**
 976      * @throws NullPointerException {@inheritDoc}
 977      */
 978     public void forEach(Consumer<? super E> action) {
 979         Objects.requireNonNull(action);
 980         forEachFrom(action, null);
 981     }
 982 
 983     /**
 984      * Runs action on each element found during a traversal starting at p.
 985      * If p is null, traversal starts at head.
 986      */
 987     void forEachFrom(Consumer<? super E> action, Node<E> p) {
 988         // Extract batches of elements while holding the lock; then
 989         // run the action on the elements while not
 990         final int batchSize = 64;       // max number of elements per batch
 991         Object[] es = null;             // container for batch of elements
 992         int n, len = 0;
 993         do {
 994             fullyLock();
 995             try {
 996                 if (es == null) {
 997                     if (p == null) p = head.next;
 998                     for (Node<E> q = p; q != null; q = succ(q))
 999                         if (q.item != null && ++len == batchSize)
1000                             break;
1001                     es = new Object[len];
1002                 }
1003                 for (n = 0; p != null && n < len; p = succ(p))
1004                     if ((es[n] = p.item) != null)
1005                         n++;
1006             } finally {
1007                 fullyUnlock();
1008             }
1009             for (int i = 0; i < n; i++) {
1010                 @SuppressWarnings("unchecked") E e = (E) es[i];
1011                 action.accept(e);
1012             }
1013         } while (n > 0 && p != null);
1014     }
1015 
1016     /**
1017      * @throws NullPointerException {@inheritDoc}
1018      */
1019     public boolean removeIf(Predicate<? super E> filter) {
1020         Objects.requireNonNull(filter);
1021         return bulkRemove(filter);
1022     }
1023 
1024     /**
1025      * @throws NullPointerException {@inheritDoc}
1026      */
1027     public boolean removeAll(Collection<?> c) {
1028         Objects.requireNonNull(c);
1029         return bulkRemove(e -> c.contains(e));
1030     }
1031 
1032     /**
1033      * @throws NullPointerException {@inheritDoc}
1034      */
1035     public boolean retainAll(Collection<?> c) {
1036         Objects.requireNonNull(c);
1037         return bulkRemove(e -> !c.contains(e));
1038     }
1039 
1040     /**
1041      * Returns the predecessor of live node p, given a node that was
1042      * once a live ancestor of p (or head); allows unlinking of p.
1043      */
1044     Node<E> findPred(Node<E> p, Node<E> ancestor) {
1045         // assert p.item != null;
1046         if (ancestor.item == null)
1047             ancestor = head;
1048         // Fails with NPE if precondition not satisfied
1049         for (Node<E> q; (q = ancestor.next) != p; )
1050             ancestor = q;
1051         return ancestor;
1052     }
1053 
    /**
     * Implementation of bulk remove methods (removeIf, removeAll and
     * retainAll all delegate here).
     *
     * <p>Works in repeated passes of three phases: collect a batch of
     * nodes between fullyLock() and fullyUnlock(); run {@code filter}
     * on the batch's items after fullyUnlock(); then, between a second
     * fullyLock()/fullyUnlock() pair, unlink the nodes the filter
     * selected that still hold an item.
     *
     * @param filter predicate selecting the elements to remove
     * @return {@code true} if at least one node was unlinked
     */
    @SuppressWarnings("unchecked")
    private boolean bulkRemove(Predicate<? super E> filter) {
        boolean removed = false;
        // p is the traversal cursor; ancestor is the starting hint for
        // findPred and is carried over from one removal to the next.
        Node<E> p = null, ancestor = head;
        Node<E>[] nodes = null;         // batch buffer, allocated on the first pass
        int n, len = 0;                 // n: nodes in this batch; len: buffer length
        do {
            // 1. Extract batch of up to 64 elements while holding the lock.
            long deathRow = 0;          // "bitset" of size 64
            fullyLock();
            try {
                if (nodes == null) {
                    if (p == null) p = head.next;
                    // Size the buffer: count nodes that hold an item,
                    // stopping at 64 so every index fits in deathRow.
                    for (Node<E> q = p; q != null; q = succ(q))
                        if (q.item != null && ++len == 64)
                            break;
                    nodes = (Node<E>[]) new Node<?>[len];
                }
                // Every visited node is recorded, whether or not it
                // currently holds an item; null items are filtered out
                // in the later phases.
                for (n = 0; p != null && n < len; p = succ(p))
                    nodes[n++] = p;
            } finally {
                fullyUnlock();
            }

            // 2. Run the filter on the elements while lock is free.
            for (int i = 0; i < n; i++) {
                final E e;
                // Bit i of deathRow marks nodes[i] for removal.
                if ((e = nodes[i].item) != null && filter.test(e))
                    deathRow |= 1L << i;
            }

            // 3. Remove any filtered elements while holding the lock.
            if (deathRow != 0) {
                fullyLock();
                try {
                    for (int i = 0; i < n; i++) {
                        final Node<E> q;
                        // Re-check the item: it may have become null
                        // since the filter ran without the lock.
                        if ((deathRow & (1L << i)) != 0L
                            && (q = nodes[i]).item != null) {
                            ancestor = findPred(q, ancestor);
                            unlink(q, ancestor);
                            removed = true;
                        }
                    }
                } finally {
                    fullyUnlock();
                }
            }
            // Stop when a pass collected nothing or the cursor ran off the end.
        } while (n > 0 && p != null);
        return removed;
    }
1106 
1107     /**
1108      * Saves this queue to a stream (that is, serializes it).
1109      *
1110      * @param s the stream
1111      * @throws java.io.IOException if an I/O error occurs
1112      * @serialData The capacity is emitted (int), followed by all of
1113      * its elements (each an {@code Object}) in the proper order,
1114      * followed by a null
1115      */
1116     private void writeObject(java.io.ObjectOutputStream s)
1117         throws java.io.IOException {
1118 
1119         fullyLock();
1120         try {
1121             // Write out any hidden stuff, plus capacity
1122             s.defaultWriteObject();
1123 
1124             // Write out all elements in the proper order.
1125             for (Node<E> p = head.next; p != null; p = p.next)
1126                 s.writeObject(p.item);
1127 
1128             // Use trailing null as sentinel
1129             s.writeObject(null);
1130         } finally {
1131             fullyUnlock();
1132         }
1133     }
1134 
1135     /**
1136      * Reconstitutes this queue from a stream (that is, deserializes it).
1137      * @param s the stream
1138      * @throws ClassNotFoundException if the class of a serialized object
1139      *         could not be found
1140      * @throws java.io.IOException if an I/O error occurs
1141      */
1142     private void readObject(java.io.ObjectInputStream s)
1143         throws java.io.IOException, ClassNotFoundException {
1144         // Read in capacity, and any hidden stuff
1145         s.defaultReadObject();
1146 
1147         count.set(0);
1148         last = head = new Node<E>(null);
1149 
1150         // Read in all elements and place in queue
1151         for (;;) {
1152             @SuppressWarnings("unchecked")
1153             E item = (E)s.readObject();
1154             if (item == null)
1155                 break;
1156             add(item);
1157         }
1158     }
1159 }