1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.concurrent.atomic.AtomicInteger;
  39 import java.util.concurrent.locks.Condition;
  40 import java.util.concurrent.locks.ReentrantLock;
  41 import java.util.AbstractQueue;
  42 import java.util.Collection;
  43 import java.util.Iterator;
  44 import java.util.NoSuchElementException;
  45 
  46 /**
  47  * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
  48  * linked nodes.
  49  * This queue orders elements FIFO (first-in-first-out).
  50  * The <em>head</em> of the queue is that element that has been on the
  51  * queue the longest time.
  52  * The <em>tail</em> of the queue is that element that has been on the
  53  * queue the shortest time. New elements
  54  * are inserted at the tail of the queue, and the queue retrieval
  55  * operations obtain elements at the head of the queue.
  56  * Linked queues typically have higher throughput than array-based queues but
  57  * less predictable performance in most concurrent applications.
  58  *
  59  * <p> The optional capacity bound constructor argument serves as a
  60  * way to prevent excessive queue expansion. The capacity, if unspecified,
  61  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
  62  * dynamically created upon each insertion unless this would bring the
  63  * queue above capacity.
  64  *
  65  * <p>This class and its iterator implement all of the
  66  * <em>optional</em> methods of the {@link Collection} and {@link
  67  * Iterator} interfaces.
  68  *
  69  * <p>This class is a member of the
  70  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  71  * Java Collections Framework</a>.
  72  *
  73  * @since 1.5
  74  * @author Doug Lea
  75  * @param <E> the type of elements held in this collection
  76  *
  77  */
  78 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
  79         implements BlockingQueue<E>, java.io.Serializable {
  80     private static final long serialVersionUID = -6903933977591709194L;
  81 
  82     /*
  83      * A variant of the "two lock queue" algorithm.  The putLock gates
  84      * entry to put (and offer), and has an associated condition for
  85      * waiting puts.  Similarly for the takeLock.  The "count" field
  86      * that they both rely on is maintained as an atomic to avoid
  87      * needing to get both locks in most cases. Also, to minimize need
  88      * for puts to get takeLock and vice-versa, cascading notifies are
  89      * used. When a put notices that it has enabled at least one take,
  90      * it signals taker. That taker in turn signals others if more
  91      * items have been entered since the signal. And symmetrically for
  92      * takes signalling puts. Operations such as remove(Object) and
  93      * iterators acquire both locks.
  94      *
  95      * Visibility between writers and readers is provided as follows:
  96      *
  97      * Whenever an element is enqueued, the putLock is acquired and
  98      * count updated.  A subsequent reader guarantees visibility to the
  99      * enqueued Node by either acquiring the putLock (via fullyLock)
 100      * or by acquiring the takeLock, and then reading n = count.get();
 101      * this gives visibility to the first n items.
 102      *
 103      * To implement weakly consistent iterators, it appears we need to
 104      * keep all Nodes GC-reachable from a predecessor dequeued Node.
 105      * That would cause two problems:
 106      * - allow a rogue Iterator to cause unbounded memory retention
 107      * - cause cross-generational linking of old Nodes to new Nodes if
 108      *   a Node was tenured while live, which generational GCs have a
 109      *   hard time dealing with, causing repeated major collections.
 110      * However, only non-deleted Nodes need to be reachable from
 111      * dequeued Nodes, and reachability does not necessarily have to
 112      * be of the kind understood by the GC.  We use the trick of
 113      * linking a Node that has just been dequeued to itself.  Such a
 114      * self-link implicitly means to advance to head.next.
 115      */
 116 
 117     /**
 118      * Linked list node class
 119      */
 120     static class Node<E> {
 121         E item;
 122 
 123         /**
 124          * One of:
 125          * - the real successor Node
 126          * - this Node, meaning the successor is head.next
 127          * - null, meaning there is no successor (this is the last node)
 128          */
 129         Node<E> next;
 130 
 131         Node(E x) { item = x; }
 132     }
 133 
 134     /** The capacity bound, or Integer.MAX_VALUE if none */
 135     private final int capacity;
 136 
 137     /** Current number of elements */
 138     private final AtomicInteger count = new AtomicInteger(0);
 139 
 140     /**
 141      * Head of linked list.
 142      * Invariant: head.item == null
 143      */
 144     private transient Node<E> head;
 145 
 146     /**
 147      * Tail of linked list.
 148      * Invariant: last.next == null
 149      */
 150     private transient Node<E> last;
 151 
 152     /** Lock held by take, poll, etc */
 153     private final ReentrantLock takeLock = new ReentrantLock();
 154 
 155     /** Wait queue for waiting takes */
 156     private final Condition notEmpty = takeLock.newCondition();
 157 
 158     /** Lock held by put, offer, etc */
 159     private final ReentrantLock putLock = new ReentrantLock();
 160 
 161     /** Wait queue for waiting puts */
 162     private final Condition notFull = putLock.newCondition();
 163 
 164     /**
 165      * Signals a waiting take. Called only from put/offer (which do not
 166      * otherwise ordinarily lock takeLock.)
 167      */
 168     private void signalNotEmpty() {
 169         final ReentrantLock takeLock = this.takeLock;
 170         takeLock.lock();
 171         try {
 172             notEmpty.signal();
 173         } finally {
 174             takeLock.unlock();
 175         }
 176     }
 177 
 178     /**
 179      * Signals a waiting put. Called only from take/poll.
 180      */
 181     private void signalNotFull() {
 182         final ReentrantLock putLock = this.putLock;
 183         putLock.lock();
 184         try {
 185             notFull.signal();
 186         } finally {
 187             putLock.unlock();
 188         }
 189     }
 190 
 191     /**
 192      * Links node at end of queue.
 193      *
 194      * @param node the node
 195      */
 196     private void enqueue(Node<E> node) {
 197         // assert putLock.isHeldByCurrentThread();
 198         // assert last.next == null;
 199         last = last.next = node;
 200     }
 201 
 202     /**
 203      * Removes a node from head of queue.
 204      *
 205      * @return the node
 206      */
 207     private E dequeue() {
 208         // assert takeLock.isHeldByCurrentThread();
 209         // assert head.item == null;
 210         Node<E> h = head;
 211         Node<E> first = h.next;
 212         h.next = h; // help GC
 213         head = first;
 214         E x = first.item;
 215         first.item = null;
 216         return x;
 217     }
 218 
 219     /**
 220      * Lock to prevent both puts and takes.
 221      */
 222     void fullyLock() {
 223         putLock.lock();
 224         takeLock.lock();
 225     }
 226 
 227     /**
 228      * Unlock to allow both puts and takes.
 229      */
 230     void fullyUnlock() {
 231         takeLock.unlock();
 232         putLock.unlock();
 233     }
 234 
 235 //     /**
 236 //      * Tells whether both locks are held by current thread.
 237 //      */
 238 //     boolean isFullyLocked() {
 239 //         return (putLock.isHeldByCurrentThread() &&
 240 //                 takeLock.isHeldByCurrentThread());
 241 //     }
 242 
 243     /**
 244      * Creates a {@code LinkedBlockingQueue} with a capacity of
 245      * {@link Integer#MAX_VALUE}.
 246      */
 247     public LinkedBlockingQueue() {
 248         this(Integer.MAX_VALUE);
 249     }
 250 
 251     /**
 252      * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
 253      *
 254      * @param capacity the capacity of this queue
 255      * @throws IllegalArgumentException if {@code capacity} is not greater
 256      *         than zero
 257      */
 258     public LinkedBlockingQueue(int capacity) {
 259         if (capacity <= 0) throw new IllegalArgumentException();
 260         this.capacity = capacity;
 261         last = head = new Node<E>(null);
 262     }
 263 
 264     /**
 265      * Creates a {@code LinkedBlockingQueue} with a capacity of
 266      * {@link Integer#MAX_VALUE}, initially containing the elements of the
 267      * given collection,
 268      * added in traversal order of the collection's iterator.
 269      *
 270      * @param c the collection of elements to initially contain
 271      * @throws NullPointerException if the specified collection or any
 272      *         of its elements are null
 273      */
 274     public LinkedBlockingQueue(Collection<? extends E> c) {
 275         this(Integer.MAX_VALUE);
 276         final ReentrantLock putLock = this.putLock;
 277         putLock.lock(); // Never contended, but necessary for visibility
 278         try {
 279             int n = 0;
 280             for (E e : c) {
 281                 if (e == null)
 282                     throw new NullPointerException();
 283                 if (n == capacity)
 284                     throw new IllegalStateException("Queue full");
 285                 enqueue(new Node<E>(e));
 286                 ++n;
 287             }
 288             count.set(n);
 289         } finally {
 290             putLock.unlock();
 291         }
 292     }
 293 
 294 
 295     // this doc comment is overridden to remove the reference to collections
 296     // greater in size than Integer.MAX_VALUE
 297     /**
 298      * Returns the number of elements in this queue.
 299      *
 300      * @return the number of elements in this queue
 301      */
 302     public int size() {
 303         return count.get();
 304     }
 305 
 306     // this doc comment is a modified copy of the inherited doc comment,
 307     // without the reference to unlimited queues.
 308     /**
 309      * Returns the number of additional elements that this queue can ideally
 310      * (in the absence of memory or resource constraints) accept without
 311      * blocking. This is always equal to the initial capacity of this queue
 312      * less the current {@code size} of this queue.
 313      *
 314      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
 315      * an element will succeed by inspecting {@code remainingCapacity}
 316      * because it may be the case that another thread is about to
 317      * insert or remove an element.
 318      */
 319     public int remainingCapacity() {
 320         return capacity - count.get();
 321     }
 322 
 323     /**
 324      * Inserts the specified element at the tail of this queue, waiting if
 325      * necessary for space to become available.
 326      *
 327      * @throws InterruptedException {@inheritDoc}
 328      * @throws NullPointerException {@inheritDoc}
 329      */
 330     public void put(E e) throws InterruptedException {
 331         if (e == null) throw new NullPointerException();
 332         // Note: convention in all put/take/etc is to preset local var
 333         // holding count negative to indicate failure unless set.
 334         int c = -1;
 335         Node<E> node = new Node<E>(e);
 336         final ReentrantLock putLock = this.putLock;
 337         final AtomicInteger count = this.count;
 338         putLock.lockInterruptibly();
 339         try {
 340             /*
 341              * Note that count is used in wait guard even though it is
 342              * not protected by lock. This works because count can
 343              * only decrease at this point (all other puts are shut
 344              * out by lock), and we (or some other waiting put) are
 345              * signalled if it ever changes from capacity. Similarly
 346              * for all other uses of count in other wait guards.
 347              */
 348             while (count.get() == capacity) {
 349                 notFull.await();
 350             }
 351             enqueue(node);
 352             c = count.getAndIncrement();
 353             if (c + 1 < capacity)
 354                 notFull.signal();
 355         } finally {
 356             putLock.unlock();
 357         }
 358         if (c == 0)
 359             signalNotEmpty();
 360     }
 361 
 362     /**
 363      * Inserts the specified element at the tail of this queue, waiting if
 364      * necessary up to the specified wait time for space to become available.
 365      *
 366      * @return {@code true} if successful, or {@code false} if
 367      *         the specified waiting time elapses before space is available.
 368      * @throws InterruptedException {@inheritDoc}
 369      * @throws NullPointerException {@inheritDoc}
 370      */
 371     public boolean offer(E e, long timeout, TimeUnit unit)
 372         throws InterruptedException {
 373 
 374         if (e == null) throw new NullPointerException();
 375         long nanos = unit.toNanos(timeout);
 376         int c = -1;
 377         final ReentrantLock putLock = this.putLock;
 378         final AtomicInteger count = this.count;
 379         putLock.lockInterruptibly();
 380         try {
 381             while (count.get() == capacity) {
 382                 if (nanos <= 0)
 383                     return false;
 384                 nanos = notFull.awaitNanos(nanos);
 385             }
 386             enqueue(new Node<E>(e));
 387             c = count.getAndIncrement();
 388             if (c + 1 < capacity)
 389                 notFull.signal();
 390         } finally {
 391             putLock.unlock();
 392         }
 393         if (c == 0)
 394             signalNotEmpty();
 395         return true;
 396     }
 397 
 398     /**
 399      * Inserts the specified element at the tail of this queue if it is
 400      * possible to do so immediately without exceeding the queue's capacity,
 401      * returning {@code true} upon success and {@code false} if this queue
 402      * is full.
 403      * When using a capacity-restricted queue, this method is generally
 404      * preferable to method {@link BlockingQueue#add add}, which can fail to
 405      * insert an element only by throwing an exception.
 406      *
 407      * @throws NullPointerException if the specified element is null
 408      */
 409     public boolean offer(E e) {
 410         if (e == null) throw new NullPointerException();
 411         final AtomicInteger count = this.count;
 412         if (count.get() == capacity)
 413             return false;
 414         int c = -1;
 415         Node<E> node = new Node<E>(e);
 416         final ReentrantLock putLock = this.putLock;
 417         putLock.lock();
 418         try {
 419             if (count.get() < capacity) {
 420                 enqueue(node);
 421                 c = count.getAndIncrement();
 422                 if (c + 1 < capacity)
 423                     notFull.signal();
 424             }
 425         } finally {
 426             putLock.unlock();
 427         }
 428         if (c == 0)
 429             signalNotEmpty();
 430         return c >= 0;
 431     }
 432 
 433 
 434     public E take() throws InterruptedException {
 435         E x;
 436         int c = -1;
 437         final AtomicInteger count = this.count;
 438         final ReentrantLock takeLock = this.takeLock;
 439         takeLock.lockInterruptibly();
 440         try {
 441             while (count.get() == 0) {
 442                 notEmpty.await();
 443             }
 444             x = dequeue();
 445             c = count.getAndDecrement();
 446             if (c > 1)
 447                 notEmpty.signal();
 448         } finally {
 449             takeLock.unlock();
 450         }
 451         if (c == capacity)
 452             signalNotFull();
 453         return x;
 454     }
 455 
 456     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 457         E x = null;
 458         int c = -1;
 459         long nanos = unit.toNanos(timeout);
 460         final AtomicInteger count = this.count;
 461         final ReentrantLock takeLock = this.takeLock;
 462         takeLock.lockInterruptibly();
 463         try {
 464             while (count.get() == 0) {
 465                 if (nanos <= 0)
 466                     return null;
 467                 nanos = notEmpty.awaitNanos(nanos);
 468             }
 469             x = dequeue();
 470             c = count.getAndDecrement();
 471             if (c > 1)
 472                 notEmpty.signal();
 473         } finally {
 474             takeLock.unlock();
 475         }
 476         if (c == capacity)
 477             signalNotFull();
 478         return x;
 479     }
 480 
 481     public E poll() {
 482         final AtomicInteger count = this.count;
 483         if (count.get() == 0)
 484             return null;
 485         E x = null;
 486         int c = -1;
 487         final ReentrantLock takeLock = this.takeLock;
 488         takeLock.lock();
 489         try {
 490             if (count.get() > 0) {
 491                 x = dequeue();
 492                 c = count.getAndDecrement();
 493                 if (c > 1)
 494                     notEmpty.signal();
 495             }
 496         } finally {
 497             takeLock.unlock();
 498         }
 499         if (c == capacity)
 500             signalNotFull();
 501         return x;
 502     }
 503 
 504     public E peek() {
 505         if (count.get() == 0)
 506             return null;
 507         final ReentrantLock takeLock = this.takeLock;
 508         takeLock.lock();
 509         try {
 510             Node<E> first = head.next;
 511             if (first == null)
 512                 return null;
 513             else
 514                 return first.item;
 515         } finally {
 516             takeLock.unlock();
 517         }
 518     }
 519 
 520     /**
 521      * Unlinks interior Node p with predecessor trail.
 522      */
 523     void unlink(Node<E> p, Node<E> trail) {
 524         // assert isFullyLocked();
 525         // p.next is not changed, to allow iterators that are
 526         // traversing p to maintain their weak-consistency guarantee.
 527         p.item = null;
 528         trail.next = p.next;
 529         if (last == p)
 530             last = trail;
 531         if (count.getAndDecrement() == capacity)
 532             notFull.signal();
 533     }
 534 
 535     /**
 536      * Removes a single instance of the specified element from this queue,
 537      * if it is present.  More formally, removes an element {@code e} such
 538      * that {@code o.equals(e)}, if this queue contains one or more such
 539      * elements.
 540      * Returns {@code true} if this queue contained the specified element
 541      * (or equivalently, if this queue changed as a result of the call).
 542      *
 543      * @param o element to be removed from this queue, if present
 544      * @return {@code true} if this queue changed as a result of the call
 545      */
 546     public boolean remove(Object o) {
 547         if (o == null) return false;
 548         fullyLock();
 549         try {
 550             for (Node<E> trail = head, p = trail.next;
 551                  p != null;
 552                  trail = p, p = p.next) {
 553                 if (o.equals(p.item)) {
 554                     unlink(p, trail);
 555                     return true;
 556                 }
 557             }
 558             return false;
 559         } finally {
 560             fullyUnlock();
 561         }
 562     }
 563 
 564     /**
 565      * Returns {@code true} if this queue contains the specified element.
 566      * More formally, returns {@code true} if and only if this queue contains
 567      * at least one element {@code e} such that {@code o.equals(e)}.
 568      *
 569      * @param o object to be checked for containment in this queue
 570      * @return {@code true} if this queue contains the specified element
 571      */
 572     public boolean contains(Object o) {
 573         if (o == null) return false;
 574         fullyLock();
 575         try {
 576             for (Node<E> p = head.next; p != null; p = p.next)
 577                 if (o.equals(p.item))
 578                     return true;
 579             return false;
 580         } finally {
 581             fullyUnlock();
 582         }
 583     }
 584 
 585     /**
 586      * Returns an array containing all of the elements in this queue, in
 587      * proper sequence.
 588      *
 589      * <p>The returned array will be "safe" in that no references to it are
 590      * maintained by this queue.  (In other words, this method must allocate
 591      * a new array).  The caller is thus free to modify the returned array.
 592      *
 593      * <p>This method acts as bridge between array-based and collection-based
 594      * APIs.
 595      *
 596      * @return an array containing all of the elements in this queue
 597      */
 598     public Object[] toArray() {
 599         fullyLock();
 600         try {
 601             int size = count.get();
 602             Object[] a = new Object[size];
 603             int k = 0;
 604             for (Node<E> p = head.next; p != null; p = p.next)
 605                 a[k++] = p.item;
 606             return a;
 607         } finally {
 608             fullyUnlock();
 609         }
 610     }
 611 
 612     /**
 613      * Returns an array containing all of the elements in this queue, in
 614      * proper sequence; the runtime type of the returned array is that of
 615      * the specified array.  If the queue fits in the specified array, it
 616      * is returned therein.  Otherwise, a new array is allocated with the
 617      * runtime type of the specified array and the size of this queue.
 618      *
 619      * <p>If this queue fits in the specified array with room to spare
 620      * (i.e., the array has more elements than this queue), the element in
 621      * the array immediately following the end of the queue is set to
 622      * {@code null}.
 623      *
 624      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 625      * array-based and collection-based APIs.  Further, this method allows
 626      * precise control over the runtime type of the output array, and may,
 627      * under certain circumstances, be used to save allocation costs.
 628      *
 629      * <p>Suppose {@code x} is a queue known to contain only strings.
 630      * The following code can be used to dump the queue into a newly
 631      * allocated array of {@code String}:
 632      *
 633      * <pre>
 634      *     String[] y = x.toArray(new String[0]);</pre>
 635      *
 636      * Note that {@code toArray(new Object[0])} is identical in function to
 637      * {@code toArray()}.
 638      *
 639      * @param a the array into which the elements of the queue are to
 640      *          be stored, if it is big enough; otherwise, a new array of the
 641      *          same runtime type is allocated for this purpose
 642      * @return an array containing all of the elements in this queue
 643      * @throws ArrayStoreException if the runtime type of the specified array
 644      *         is not a supertype of the runtime type of every element in
 645      *         this queue
 646      * @throws NullPointerException if the specified array is null
 647      */
 648     @SuppressWarnings("unchecked")
 649     public <T> T[] toArray(T[] a) {
 650         fullyLock();
 651         try {
 652             int size = count.get();
 653             if (a.length < size)
 654                 a = (T[])java.lang.reflect.Array.newInstance
 655                     (a.getClass().getComponentType(), size);
 656 
 657             int k = 0;
 658             for (Node<E> p = head.next; p != null; p = p.next)
 659                 a[k++] = (T)p.item;
 660             if (a.length > k)
 661                 a[k] = null;
 662             return a;
 663         } finally {
 664             fullyUnlock();
 665         }
 666     }
 667 
 668     public String toString() {
 669         fullyLock();
 670         try {
 671             Node<E> p = head.next;
 672             if (p == null)
 673                 return "[]";
 674 
 675             StringBuilder sb = new StringBuilder();
 676             sb.append('[');
 677             for (;;) {
 678                 E e = p.item;
 679                 sb.append(e == this ? "(this Collection)" : e);
 680                 p = p.next;
 681                 if (p == null)
 682                     return sb.append(']').toString();
 683                 sb.append(',').append(' ');
 684             }
 685         } finally {
 686             fullyUnlock();
 687         }
 688     }
 689 
 690     /**
 691      * Atomically removes all of the elements from this queue.
 692      * The queue will be empty after this call returns.
 693      */
 694     public void clear() {
 695         fullyLock();
 696         try {
 697             for (Node<E> p, h = head; (p = h.next) != null; h = p) {
 698                 h.next = h;
 699                 p.item = null;
 700             }
 701             head = last;
 702             // assert head.item == null && head.next == null;
 703             if (count.getAndSet(0) == capacity)
 704                 notFull.signal();
 705         } finally {
 706             fullyUnlock();
 707         }
 708     }
 709 
 710     /**
 711      * @throws UnsupportedOperationException {@inheritDoc}
 712      * @throws ClassCastException            {@inheritDoc}
 713      * @throws NullPointerException          {@inheritDoc}
 714      * @throws IllegalArgumentException      {@inheritDoc}
 715      */
 716     public int drainTo(Collection<? super E> c) {
 717         return drainTo(c, Integer.MAX_VALUE);
 718     }
 719 
 720     /**
 721      * @throws UnsupportedOperationException {@inheritDoc}
 722      * @throws ClassCastException            {@inheritDoc}
 723      * @throws NullPointerException          {@inheritDoc}
 724      * @throws IllegalArgumentException      {@inheritDoc}
 725      */
 726     public int drainTo(Collection<? super E> c, int maxElements) {
 727         if (c == null)
 728             throw new NullPointerException();
 729         if (c == this)
 730             throw new IllegalArgumentException();
 731         if (maxElements <= 0)
 732             return 0;
 733         boolean signalNotFull = false;
 734         final ReentrantLock takeLock = this.takeLock;
 735         takeLock.lock();
 736         try {
 737             int n = Math.min(maxElements, count.get());
 738             // count.get provides visibility to first n Nodes
 739             Node<E> h = head;
 740             int i = 0;
 741             try {
 742                 while (i < n) {
 743                     Node<E> p = h.next;
 744                     c.add(p.item);
 745                     p.item = null;
 746                     h.next = h;
 747                     h = p;
 748                     ++i;
 749                 }
 750                 return n;
 751             } finally {
 752                 // Restore invariants even if c.add() threw
 753                 if (i > 0) {
 754                     // assert h.item == null;
 755                     head = h;
 756                     signalNotFull = (count.getAndAdd(-i) == capacity);
 757                 }
 758             }
 759         } finally {
 760             takeLock.unlock();
 761             if (signalNotFull)
 762                 signalNotFull();
 763         }
 764     }
 765 
 766     /**
 767      * Returns an iterator over the elements in this queue in proper sequence.
 768      * The elements will be returned in order from first (head) to last (tail).
 769      *
 770      * <p>The returned iterator is a "weakly consistent" iterator that
 771      * will never throw {@link java.util.ConcurrentModificationException
 772      * ConcurrentModificationException}, and guarantees to traverse
 773      * elements as they existed upon construction of the iterator, and
 774      * may (but is not guaranteed to) reflect any modifications
 775      * subsequent to construction.
 776      *
 777      * @return an iterator over the elements in this queue in proper sequence
 778      */
 779     public Iterator<E> iterator() {
 780       return new Itr();
 781     }
 782 
 783     private class Itr implements Iterator<E> {
 784         /*
 785          * Basic weakly-consistent iterator.  At all times hold the next
 786          * item to hand out so that if hasNext() reports true, we will
 787          * still have it to return even if lost race with a take etc.
 788          */
 789         private Node<E> current;
 790         private Node<E> lastRet;
 791         private E currentElement;
 792 
 793         Itr() {
 794             fullyLock();
 795             try {
 796                 current = head.next;
 797                 if (current != null)
 798                     currentElement = current.item;
 799             } finally {
 800                 fullyUnlock();
 801             }
 802         }
 803 
 804         public boolean hasNext() {
 805             return current != null;
 806         }
 807 
 808         /**
 809          * Returns the next live successor of p, or null if no such.
 810          *
 811          * Unlike other traversal methods, iterators need to handle both:
 812          * - dequeued nodes (p.next == p)
 813          * - (possibly multiple) interior removed nodes (p.item == null)
 814          */
 815         private Node<E> nextNode(Node<E> p) {
 816             for (;;) {
 817                 Node<E> s = p.next;
 818                 if (s == p)
 819                     return head.next;
 820                 if (s == null || s.item != null)
 821                     return s;
 822                 p = s;
 823             }
 824         }
 825 
 826         public E next() {
 827             fullyLock();
 828             try {
 829                 if (current == null)
 830                     throw new NoSuchElementException();
 831                 E x = currentElement;
 832                 lastRet = current;
 833                 current = nextNode(current);
 834                 currentElement = (current == null) ? null : current.item;
 835                 return x;
 836             } finally {
 837                 fullyUnlock();
 838             }
 839         }
 840 
 841         public void remove() {
 842             if (lastRet == null)
 843                 throw new IllegalStateException();
 844             fullyLock();
 845             try {
 846                 Node<E> node = lastRet;
 847                 lastRet = null;
 848                 for (Node<E> trail = head, p = trail.next;
 849                      p != null;
 850                      trail = p, p = p.next) {
 851                     if (p == node) {
 852                         unlink(p, trail);
 853                         break;
 854                     }
 855                 }
 856             } finally {
 857                 fullyUnlock();
 858             }
 859         }
 860     }
 861 
 862     /**
 863      * Save the state to a stream (that is, serialize it).
 864      *
 865      * @serialData The capacity is emitted (int), followed by all of
 866      * its elements (each an {@code Object}) in the proper order,
 867      * followed by a null
 868      * @param s the stream
 869      */
 870     private void writeObject(java.io.ObjectOutputStream s)
 871         throws java.io.IOException {
 872 
 873         fullyLock();
 874         try {
 875             // Write out any hidden stuff, plus capacity
 876             s.defaultWriteObject();
 877 
 878             // Write out all elements in the proper order.
 879             for (Node<E> p = head.next; p != null; p = p.next)
 880                 s.writeObject(p.item);
 881 
 882             // Use trailing null as sentinel
 883             s.writeObject(null);
 884         } finally {
 885             fullyUnlock();
 886         }
 887     }
 888 
 889     /**
 890      * Reconstitute this queue instance from a stream (that is,
 891      * deserialize it).
 892      *
 893      * @param s the stream
 894      */
 895     private void readObject(java.io.ObjectInputStream s)
 896         throws java.io.IOException, ClassNotFoundException {
 897         // Read in capacity, and any hidden stuff
 898         s.defaultReadObject();
 899 
 900         count.set(0);
 901         last = head = new Node<E>(null);
 902 
 903         // Read in all elements and place in queue
 904         for (;;) {
 905             @SuppressWarnings("unchecked")
 906             E item = (E)s.readObject();
 907             if (item == null)
 908                 break;
 909             add(item);
 910         }
 911     }
 912 }