1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/licenses/publicdomain
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.concurrent.atomic.AtomicInteger;
  39 import java.util.concurrent.locks.Condition;
  40 import java.util.concurrent.locks.ReentrantLock;
  41 import java.util.AbstractQueue;
  42 import java.util.Collection;
  43 import java.util.Iterator;
  44 import java.util.NoSuchElementException;
  45 
  46 /**
  47  * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
  48  * linked nodes.
  49  * This queue orders elements FIFO (first-in-first-out).
  50  * The <em>head</em> of the queue is that element that has been on the
  51  * queue the longest time.
  52  * The <em>tail</em> of the queue is that element that has been on the
  53  * queue the shortest time. New elements
  54  * are inserted at the tail of the queue, and the queue retrieval
  55  * operations obtain elements at the head of the queue.
  56  * Linked queues typically have higher throughput than array-based queues but
  57  * less predictable performance in most concurrent applications.
  58  *
  59  * <p> The optional capacity bound constructor argument serves as a
  60  * way to prevent excessive queue expansion. The capacity, if unspecified,
  61  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
  62  * dynamically created upon each insertion unless this would bring the
  63  * queue above capacity.
  64  *
  65  * <p>This class and its iterator implement all of the
  66  * <em>optional</em> methods of the {@link Collection} and {@link
  67  * Iterator} interfaces.
  68  *
  69  * <p>This class is a member of the
  70  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  71  * Java Collections Framework</a>.
  72  *
  73  * @since 1.5
  74  * @author Doug Lea
  75  * @param <E> the type of elements held in this collection
  76  *
  77  */
  78 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
  79         implements BlockingQueue<E>, java.io.Serializable {
  80     private static final long serialVersionUID = -6903933977591709194L;
  81 
  82     /*
  83      * A variant of the "two lock queue" algorithm.  The putLock gates
  84      * entry to put (and offer), and has an associated condition for
  85      * waiting puts.  Similarly for the takeLock.  The "count" field
  86      * that they both rely on is maintained as an atomic to avoid
  87      * needing to get both locks in most cases. Also, to minimize need
  88      * for puts to get takeLock and vice-versa, cascading notifies are
  89      * used. When a put notices that it has enabled at least one take,
  90      * it signals taker. That taker in turn signals others if more
  91      * items have been entered since the signal. And symmetrically for
  92      * takes signalling puts. Operations such as remove(Object) and
  93      * iterators acquire both locks.
  94      *
  95      * Visibility between writers and readers is provided as follows:
  96      *
  97      * Whenever an element is enqueued, the putLock is acquired and
  98      * count updated.  A subsequent reader guarantees visibility to the
  99      * enqueued Node by either acquiring the putLock (via fullyLock)
 100      * or by acquiring the takeLock, and then reading n = count.get();
 101      * this gives visibility to the first n items.
 102      *
 103      * To implement weakly consistent iterators, it appears we need to
 104      * keep all Nodes GC-reachable from a predecessor dequeued Node.
 105      * That would cause two problems:
 106      * - allow a rogue Iterator to cause unbounded memory retention
 107      * - cause cross-generational linking of old Nodes to new Nodes if
 108      *   a Node was tenured while live, which generational GCs have a
 109      *   hard time dealing with, causing repeated major collections.
 110      * However, only non-deleted Nodes need to be reachable from
 111      * dequeued Nodes, and reachability does not necessarily have to
 112      * be of the kind understood by the GC.  We use the trick of
 113      * linking a Node that has just been dequeued to itself.  Such a
 114      * self-link implicitly means to advance to head.next.
 115      */
 116 
 117     /**
 118      * Linked list node class
 119      */
 120     static class Node<E> {
 121         E item;
 122 
 123         /**
 124          * One of:
 125          * - the real successor Node
 126          * - this Node, meaning the successor is head.next
 127          * - null, meaning there is no successor (this is the last node)
 128          */
 129         Node<E> next;
 130 
 131         Node(E x) { item = x; }
 132     }
 133 
 134     /** The capacity bound, or Integer.MAX_VALUE if none */
 135     private final int capacity;
 136 
 137     /** Current number of elements */
 138     private final AtomicInteger count = new AtomicInteger(0);
 139 
 140     /**
 141      * Head of linked list.
 142      * Invariant: head.item == null
 143      */
 144     private transient Node<E> head;
 145 
 146     /**
 147      * Tail of linked list.
 148      * Invariant: last.next == null
 149      */
 150     private transient Node<E> last;
 151 
 152     /** Lock held by take, poll, etc */
 153     private final ReentrantLock takeLock = new ReentrantLock();
 154 
 155     /** Wait queue for waiting takes */
 156     private final Condition notEmpty = takeLock.newCondition();
 157 
 158     /** Lock held by put, offer, etc */
 159     private final ReentrantLock putLock = new ReentrantLock();
 160 
 161     /** Wait queue for waiting puts */
 162     private final Condition notFull = putLock.newCondition();
 163 
 164     /**
 165      * Signals a waiting take. Called only from put/offer (which do not
 166      * otherwise ordinarily lock takeLock.)
 167      */
 168     private void signalNotEmpty() {
 169         final ReentrantLock takeLock = this.takeLock;
 170         takeLock.lock();
 171         try {
 172             notEmpty.signal();
 173         } finally {
 174             takeLock.unlock();
 175         }
 176     }
 177 
 178     /**
 179      * Signals a waiting put. Called only from take/poll.
 180      */
 181     private void signalNotFull() {
 182         final ReentrantLock putLock = this.putLock;
 183         putLock.lock();
 184         try {
 185             notFull.signal();
 186         } finally {
 187             putLock.unlock();
 188         }
 189     }
 190 
 191     /**
 192      * Creates a node and links it at end of queue.
 193      *
 194      * @param x the item
 195      */
 196     private void enqueue(E x) {
 197         // assert putLock.isHeldByCurrentThread();
 198         // assert last.next == null;
 199         last = last.next = new Node<E>(x);
 200     }
 201 
 202     /**
 203      * Removes a node from head of queue.
 204      *
 205      * @return the node
 206      */
 207     private E dequeue() {
 208         // assert takeLock.isHeldByCurrentThread();
 209         // assert head.item == null;
 210         Node<E> h = head;
 211         Node<E> first = h.next;
 212         h.next = h; // help GC
 213         head = first;
 214         E x = first.item;
 215         first.item = null;
 216         return x;
 217     }
 218 
 219     /**
 220      * Lock to prevent both puts and takes.
 221      */
 222     void fullyLock() {
 223         putLock.lock();
 224         takeLock.lock();
 225     }
 226 
 227     /**
 228      * Unlock to allow both puts and takes.
 229      */
 230     void fullyUnlock() {
 231         takeLock.unlock();
 232         putLock.unlock();
 233     }
 234 
 235 //     /**
 236 //      * Tells whether both locks are held by current thread.
 237 //      */
 238 //     boolean isFullyLocked() {
 239 //         return (putLock.isHeldByCurrentThread() &&
 240 //                 takeLock.isHeldByCurrentThread());
 241 //     }
 242 
 243     /**
 244      * Creates a {@code LinkedBlockingQueue} with a capacity of
 245      * {@link Integer#MAX_VALUE}.
 246      */
 247     public LinkedBlockingQueue() {
 248         this(Integer.MAX_VALUE);
 249     }
 250 
 251     /**
 252      * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
 253      *
 254      * @param capacity the capacity of this queue
 255      * @throws IllegalArgumentException if {@code capacity} is not greater
 256      *         than zero
 257      */
 258     public LinkedBlockingQueue(int capacity) {
 259         if (capacity <= 0) throw new IllegalArgumentException();
 260         this.capacity = capacity;
 261         last = head = new Node<E>(null);
 262     }
 263 
 264     /**
 265      * Creates a {@code LinkedBlockingQueue} with a capacity of
 266      * {@link Integer#MAX_VALUE}, initially containing the elements of the
 267      * given collection,
 268      * added in traversal order of the collection's iterator.
 269      *
 270      * @param c the collection of elements to initially contain
 271      * @throws NullPointerException if the specified collection or any
 272      *         of its elements are null
 273      */
 274     public LinkedBlockingQueue(Collection<? extends E> c) {
 275         this(Integer.MAX_VALUE);
 276         final ReentrantLock putLock = this.putLock;
 277         putLock.lock(); // Never contended, but necessary for visibility
 278         try {
 279             int n = 0;
 280             for (E e : c) {
 281                 if (e == null)
 282                     throw new NullPointerException();
 283                 if (n == capacity)
 284                     throw new IllegalStateException("Queue full");
 285                 enqueue(e);
 286                 ++n;
 287             }
 288             count.set(n);
 289         } finally {
 290             putLock.unlock();
 291         }
 292     }
 293 
 294 
 295     // this doc comment is overridden to remove the reference to collections
 296     // greater in size than Integer.MAX_VALUE
 297     /**
 298      * Returns the number of elements in this queue.
 299      *
 300      * @return the number of elements in this queue
 301      */
 302     public int size() {
 303         return count.get();
 304     }
 305 
 306     // this doc comment is a modified copy of the inherited doc comment,
 307     // without the reference to unlimited queues.
 308     /**
 309      * Returns the number of additional elements that this queue can ideally
 310      * (in the absence of memory or resource constraints) accept without
 311      * blocking. This is always equal to the initial capacity of this queue
 312      * less the current {@code size} of this queue.
 313      *
 314      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
 315      * an element will succeed by inspecting {@code remainingCapacity}
 316      * because it may be the case that another thread is about to
 317      * insert or remove an element.
 318      */
 319     public int remainingCapacity() {
 320         return capacity - count.get();
 321     }
 322 
 323     /**
 324      * Inserts the specified element at the tail of this queue, waiting if
 325      * necessary for space to become available.
 326      *
 327      * @throws InterruptedException {@inheritDoc}
 328      * @throws NullPointerException {@inheritDoc}
 329      */
 330     public void put(E e) throws InterruptedException {
 331         if (e == null) throw new NullPointerException();
 332         // Note: convention in all put/take/etc is to preset local var
 333         // holding count negative to indicate failure unless set.
 334         int c = -1;

 335         final ReentrantLock putLock = this.putLock;
 336         final AtomicInteger count = this.count;
 337         putLock.lockInterruptibly();
 338         try {
 339             /*
 340              * Note that count is used in wait guard even though it is
 341              * not protected by lock. This works because count can
 342              * only decrease at this point (all other puts are shut
 343              * out by lock), and we (or some other waiting put) are
 344              * signalled if it ever changes from capacity. Similarly
 345              * for all other uses of count in other wait guards.
 346              */
 347             while (count.get() == capacity) {
 348                 notFull.await();
 349             }
 350             enqueue(e);
 351             c = count.getAndIncrement();
 352             if (c + 1 < capacity)
 353                 notFull.signal();
 354         } finally {
 355             putLock.unlock();
 356         }
 357         if (c == 0)
 358             signalNotEmpty();
 359     }
 360 
 361     /**
 362      * Inserts the specified element at the tail of this queue, waiting if
 363      * necessary up to the specified wait time for space to become available.
 364      *
 365      * @return {@code true} if successful, or {@code false} if
 366      *         the specified waiting time elapses before space is available.
 367      * @throws InterruptedException {@inheritDoc}
 368      * @throws NullPointerException {@inheritDoc}
 369      */
 370     public boolean offer(E e, long timeout, TimeUnit unit)
 371         throws InterruptedException {
 372 
 373         if (e == null) throw new NullPointerException();
 374         long nanos = unit.toNanos(timeout);
 375         int c = -1;
 376         final ReentrantLock putLock = this.putLock;
 377         final AtomicInteger count = this.count;
 378         putLock.lockInterruptibly();
 379         try {
 380             while (count.get() == capacity) {
 381                 if (nanos <= 0)
 382                     return false;
 383                 nanos = notFull.awaitNanos(nanos);
 384             }
 385             enqueue(e);
 386             c = count.getAndIncrement();
 387             if (c + 1 < capacity)
 388                 notFull.signal();
 389         } finally {
 390             putLock.unlock();
 391         }
 392         if (c == 0)
 393             signalNotEmpty();
 394         return true;
 395     }
 396 
 397     /**
 398      * Inserts the specified element at the tail of this queue if it is
 399      * possible to do so immediately without exceeding the queue's capacity,
 400      * returning {@code true} upon success and {@code false} if this queue
 401      * is full.
 402      * When using a capacity-restricted queue, this method is generally
 403      * preferable to method {@link BlockingQueue#add add}, which can fail to
 404      * insert an element only by throwing an exception.
 405      *
 406      * @throws NullPointerException if the specified element is null
 407      */
 408     public boolean offer(E e) {
 409         if (e == null) throw new NullPointerException();
 410         final AtomicInteger count = this.count;
 411         if (count.get() == capacity)
 412             return false;
 413         int c = -1;

 414         final ReentrantLock putLock = this.putLock;
 415         putLock.lock();
 416         try {
 417             if (count.get() < capacity) {
 418                 enqueue(e);
 419                 c = count.getAndIncrement();
 420                 if (c + 1 < capacity)
 421                     notFull.signal();
 422             }
 423         } finally {
 424             putLock.unlock();
 425         }
 426         if (c == 0)
 427             signalNotEmpty();
 428         return c >= 0;
 429     }
 430 
 431 
 432     public E take() throws InterruptedException {
 433         E x;
 434         int c = -1;
 435         final AtomicInteger count = this.count;
 436         final ReentrantLock takeLock = this.takeLock;
 437         takeLock.lockInterruptibly();
 438         try {
 439             while (count.get() == 0) {
 440                 notEmpty.await();
 441             }
 442             x = dequeue();
 443             c = count.getAndDecrement();
 444             if (c > 1)
 445                 notEmpty.signal();
 446         } finally {
 447             takeLock.unlock();
 448         }
 449         if (c == capacity)
 450             signalNotFull();
 451         return x;
 452     }
 453 
 454     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 455         E x = null;
 456         int c = -1;
 457         long nanos = unit.toNanos(timeout);
 458         final AtomicInteger count = this.count;
 459         final ReentrantLock takeLock = this.takeLock;
 460         takeLock.lockInterruptibly();
 461         try {
 462             while (count.get() == 0) {
 463                 if (nanos <= 0)
 464                     return null;
 465                 nanos = notEmpty.awaitNanos(nanos);
 466             }
 467             x = dequeue();
 468             c = count.getAndDecrement();
 469             if (c > 1)
 470                 notEmpty.signal();
 471         } finally {
 472             takeLock.unlock();
 473         }
 474         if (c == capacity)
 475             signalNotFull();
 476         return x;
 477     }
 478 
 479     public E poll() {
 480         final AtomicInteger count = this.count;
 481         if (count.get() == 0)
 482             return null;
 483         E x = null;
 484         int c = -1;
 485         final ReentrantLock takeLock = this.takeLock;
 486         takeLock.lock();
 487         try {
 488             if (count.get() > 0) {
 489                 x = dequeue();
 490                 c = count.getAndDecrement();
 491                 if (c > 1)
 492                     notEmpty.signal();
 493             }
 494         } finally {
 495             takeLock.unlock();
 496         }
 497         if (c == capacity)
 498             signalNotFull();
 499         return x;
 500     }
 501 
 502     public E peek() {
 503         if (count.get() == 0)
 504             return null;
 505         final ReentrantLock takeLock = this.takeLock;
 506         takeLock.lock();
 507         try {
 508             Node<E> first = head.next;
 509             if (first == null)
 510                 return null;
 511             else
 512                 return first.item;
 513         } finally {
 514             takeLock.unlock();
 515         }
 516     }
 517 
 518     /**
 519      * Unlinks interior Node p with predecessor trail.
 520      */
 521     void unlink(Node<E> p, Node<E> trail) {
 522         // assert isFullyLocked();
 523         // p.next is not changed, to allow iterators that are
 524         // traversing p to maintain their weak-consistency guarantee.
 525         p.item = null;
 526         trail.next = p.next;
 527         if (last == p)
 528             last = trail;
 529         if (count.getAndDecrement() == capacity)
 530             notFull.signal();
 531     }
 532 
 533     /**
 534      * Removes a single instance of the specified element from this queue,
 535      * if it is present.  More formally, removes an element {@code e} such
 536      * that {@code o.equals(e)}, if this queue contains one or more such
 537      * elements.
 538      * Returns {@code true} if this queue contained the specified element
 539      * (or equivalently, if this queue changed as a result of the call).
 540      *
 541      * @param o element to be removed from this queue, if present
 542      * @return {@code true} if this queue changed as a result of the call
 543      */
 544     public boolean remove(Object o) {
 545         if (o == null) return false;
 546         fullyLock();
 547         try {
 548             for (Node<E> trail = head, p = trail.next;
 549                  p != null;
 550                  trail = p, p = p.next) {
 551                 if (o.equals(p.item)) {
 552                     unlink(p, trail);
 553                     return true;
 554                 }
 555             }
 556             return false;
 557         } finally {
 558             fullyUnlock();
 559         }
 560     }
 561 
 562     /**





















 563      * Returns an array containing all of the elements in this queue, in
 564      * proper sequence.
 565      *
 566      * <p>The returned array will be "safe" in that no references to it are
 567      * maintained by this queue.  (In other words, this method must allocate
 568      * a new array).  The caller is thus free to modify the returned array.
 569      *
 570      * <p>This method acts as bridge between array-based and collection-based
 571      * APIs.
 572      *
 573      * @return an array containing all of the elements in this queue
 574      */
 575     public Object[] toArray() {
 576         fullyLock();
 577         try {
 578             int size = count.get();
 579             Object[] a = new Object[size];
 580             int k = 0;
 581             for (Node<E> p = head.next; p != null; p = p.next)
 582                 a[k++] = p.item;
 583             return a;
 584         } finally {
 585             fullyUnlock();
 586         }
 587     }
 588 
 589     /**
 590      * Returns an array containing all of the elements in this queue, in
 591      * proper sequence; the runtime type of the returned array is that of
 592      * the specified array.  If the queue fits in the specified array, it
 593      * is returned therein.  Otherwise, a new array is allocated with the
 594      * runtime type of the specified array and the size of this queue.
 595      *
 596      * <p>If this queue fits in the specified array with room to spare
 597      * (i.e., the array has more elements than this queue), the element in
 598      * the array immediately following the end of the queue is set to
 599      * {@code null}.
 600      *
 601      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 602      * array-based and collection-based APIs.  Further, this method allows
 603      * precise control over the runtime type of the output array, and may,
 604      * under certain circumstances, be used to save allocation costs.
 605      *
 606      * <p>Suppose {@code x} is a queue known to contain only strings.
 607      * The following code can be used to dump the queue into a newly
 608      * allocated array of {@code String}:
 609      *
 610      * <pre>
 611      *     String[] y = x.toArray(new String[0]);</pre>
 612      *
 613      * Note that {@code toArray(new Object[0])} is identical in function to
 614      * {@code toArray()}.
 615      *
 616      * @param a the array into which the elements of the queue are to
 617      *          be stored, if it is big enough; otherwise, a new array of the
 618      *          same runtime type is allocated for this purpose
 619      * @return an array containing all of the elements in this queue
 620      * @throws ArrayStoreException if the runtime type of the specified array
 621      *         is not a supertype of the runtime type of every element in
 622      *         this queue
 623      * @throws NullPointerException if the specified array is null
 624      */
 625     @SuppressWarnings("unchecked")
 626     public <T> T[] toArray(T[] a) {
 627         fullyLock();
 628         try {
 629             int size = count.get();
 630             if (a.length < size)
 631                 a = (T[])java.lang.reflect.Array.newInstance
 632                     (a.getClass().getComponentType(), size);
 633 
 634             int k = 0;
 635             for (Node<E> p = head.next; p != null; p = p.next)
 636                 a[k++] = (T)p.item;
 637             if (a.length > k)
 638                 a[k] = null;
 639             return a;
 640         } finally {
 641             fullyUnlock();
 642         }
 643     }
 644 
 645     public String toString() {
 646         fullyLock();
 647         try {
 648             return super.toString();













 649         } finally {
 650             fullyUnlock();
 651         }
 652     }
 653 
 654     /**
 655      * Atomically removes all of the elements from this queue.
 656      * The queue will be empty after this call returns.
 657      */
 658     public void clear() {
 659         fullyLock();
 660         try {
 661             for (Node<E> p, h = head; (p = h.next) != null; h = p) {
 662                 h.next = h;
 663                 p.item = null;
 664             }
 665             head = last;
 666             // assert head.item == null && head.next == null;
 667             if (count.getAndSet(0) == capacity)
 668                 notFull.signal();
 669         } finally {
 670             fullyUnlock();
 671         }
 672     }
 673 
 674     /**
 675      * @throws UnsupportedOperationException {@inheritDoc}
 676      * @throws ClassCastException            {@inheritDoc}
 677      * @throws NullPointerException          {@inheritDoc}
 678      * @throws IllegalArgumentException      {@inheritDoc}
 679      */
 680     public int drainTo(Collection<? super E> c) {
 681         return drainTo(c, Integer.MAX_VALUE);
 682     }
 683 
 684     /**
 685      * @throws UnsupportedOperationException {@inheritDoc}
 686      * @throws ClassCastException            {@inheritDoc}
 687      * @throws NullPointerException          {@inheritDoc}
 688      * @throws IllegalArgumentException      {@inheritDoc}
 689      */
 690     public int drainTo(Collection<? super E> c, int maxElements) {
 691         if (c == null)
 692             throw new NullPointerException();
 693         if (c == this)
 694             throw new IllegalArgumentException();
 695         boolean signalNotFull = false;
 696         final ReentrantLock takeLock = this.takeLock;
 697         takeLock.lock();
 698         try {
 699             int n = Math.min(maxElements, count.get());
 700             // count.get provides visibility to first n Nodes
 701             Node<E> h = head;
 702             int i = 0;
 703             try {
 704                 while (i < n) {
 705                     Node<E> p = h.next;
 706                     c.add(p.item);
 707                     p.item = null;
 708                     h.next = h;
 709                     h = p;
 710                     ++i;
 711                 }
 712                 return n;
 713             } finally {
 714                 // Restore invariants even if c.add() threw
 715                 if (i > 0) {
 716                     // assert h.item == null;
 717                     head = h;
 718                     signalNotFull = (count.getAndAdd(-i) == capacity);
 719                 }
 720             }
 721         } finally {
 722             takeLock.unlock();
 723             if (signalNotFull)
 724                 signalNotFull();
 725         }
 726     }
 727 
 728     /**
 729      * Returns an iterator over the elements in this queue in proper sequence.
 730      * The returned {@code Iterator} is a "weakly consistent" iterator that


 731      * will never throw {@link java.util.ConcurrentModificationException
 732      * ConcurrentModificationException},
 733      * and guarantees to traverse elements as they existed upon
 734      * construction of the iterator, and may (but is not guaranteed to)
 735      * reflect any modifications subsequent to construction.
 736      *
 737      * @return an iterator over the elements in this queue in proper sequence
 738      */
 739     public Iterator<E> iterator() {
 740       return new Itr();
 741     }
 742 
 743     private class Itr implements Iterator<E> {
 744         /*
 745          * Basic weakly-consistent iterator.  At all times hold the next
 746          * item to hand out so that if hasNext() reports true, we will
 747          * still have it to return even if lost race with a take etc.
 748          */
 749         private Node<E> current;
 750         private Node<E> lastRet;
 751         private E currentElement;
 752 
 753         Itr() {
 754             fullyLock();
 755             try {
 756                 current = head.next;
 757                 if (current != null)
 758                     currentElement = current.item;
 759             } finally {
 760                 fullyUnlock();
 761             }
 762         }
 763 
 764         public boolean hasNext() {
 765             return current != null;
 766         }
 767 
 768         /**
 769          * Returns the next live successor of p, or null if no such.
 770          *
 771          * Unlike other traversal methods, iterators need to handle both:
 772          * - dequeued nodes (p.next == p)
 773          * - (possibly multiple) interior removed nodes (p.item == null)
 774          */
 775         private Node<E> nextNode(Node<E> p) {
 776             for (;;) {
 777                 Node<E> s = p.next;
 778                 if (s == p)
 779                     return head.next;
 780                 if (s == null || s.item != null)
 781                     return s;
 782                 p = s;
 783             }
 784         }
 785 
 786         public E next() {
 787             fullyLock();
 788             try {
 789                 if (current == null)
 790                     throw new NoSuchElementException();
 791                 E x = currentElement;
 792                 lastRet = current;
 793                 current = nextNode(current);
 794                 currentElement = (current == null) ? null : current.item;
 795                 return x;
 796             } finally {
 797                 fullyUnlock();
 798             }
 799         }
 800 
 801         public void remove() {
 802             if (lastRet == null)
 803                 throw new IllegalStateException();
 804             fullyLock();
 805             try {
 806                 Node<E> node = lastRet;
 807                 lastRet = null;
 808                 for (Node<E> trail = head, p = trail.next;
 809                      p != null;
 810                      trail = p, p = p.next) {
 811                     if (p == node) {
 812                         unlink(p, trail);
 813                         break;
 814                     }
 815                 }
 816             } finally {
 817                 fullyUnlock();
 818             }
 819         }
 820     }
 821 
 822     /**
 823      * Save the state to a stream (that is, serialize it).
 824      *
 825      * @serialData The capacity is emitted (int), followed by all of
 826      * its elements (each an {@code Object}) in the proper order,
 827      * followed by a null
 828      * @param s the stream
 829      */
 830     private void writeObject(java.io.ObjectOutputStream s)
 831         throws java.io.IOException {
 832 
 833         fullyLock();
 834         try {
 835             // Write out any hidden stuff, plus capacity
 836             s.defaultWriteObject();
 837 
 838             // Write out all elements in the proper order.
 839             for (Node<E> p = head.next; p != null; p = p.next)
 840                 s.writeObject(p.item);
 841 
 842             // Use trailing null as sentinel
 843             s.writeObject(null);
 844         } finally {
 845             fullyUnlock();
 846         }
 847     }
 848 
 849     /**
 850      * Reconstitute this queue instance from a stream (that is,
 851      * deserialize it).
 852      *
 853      * @param s the stream
 854      */
 855     private void readObject(java.io.ObjectInputStream s)
 856         throws java.io.IOException, ClassNotFoundException {
 857         // Read in capacity, and any hidden stuff
 858         s.defaultReadObject();
 859 
 860         count.set(0);
 861         last = head = new Node<E>(null);
 862 
 863         // Read in all elements and place in queue
 864         for (;;) {
 865             @SuppressWarnings("unchecked")
 866             E item = (E)s.readObject();
 867             if (item == null)
 868                 break;
 869             add(item);
 870         }
 871     }
 872 }
--- EOF ---