1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.AbstractQueue;
  39 import java.util.Arrays;
  40 import java.util.Collection;
  41 import java.util.Iterator;
  42 import java.util.NoSuchElementException;
  43 import java.util.Queue;
  44 import java.util.Spliterator;
  45 import java.util.Spliterators;
  46 import java.util.concurrent.locks.LockSupport;
  47 import java.util.function.Consumer;
  48 
  49 /**
  50  * An unbounded {@link TransferQueue} based on linked nodes.
  51  * This queue orders elements FIFO (first-in-first-out) with respect
  52  * to any given producer.  The <em>head</em> of the queue is that
  53  * element that has been on the queue the longest time for some
  54  * producer.  The <em>tail</em> of the queue is that element that has
  55  * been on the queue the shortest time for some producer.
  56  *
  57  * <p>Beware that, unlike in most collections, the {@code size} method
  58  * is <em>NOT</em> a constant-time operation. Because of the
  59  * asynchronous nature of these queues, determining the current number
  60  * of elements requires a traversal of the elements, and so may report
  61  * inaccurate results if this collection is modified during traversal.
  62  * Additionally, the bulk operations {@code addAll},
  63  * {@code removeAll}, {@code retainAll}, {@code containsAll},
  64  * {@code equals}, and {@code toArray} are <em>not</em> guaranteed
  65  * to be performed atomically. For example, an iterator operating
  66  * concurrently with an {@code addAll} operation might view only some
  67  * of the added elements.
  68  *
  69  * <p>This class and its iterator implement all of the
  70  * <em>optional</em> methods of the {@link Collection} and {@link
  71  * Iterator} interfaces.
  72  *
  73  * <p>Memory consistency effects: As with other concurrent
  74  * collections, actions in a thread prior to placing an object into a
  75  * {@code LinkedTransferQueue}
  76  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
  77  * actions subsequent to the access or removal of that element from
  78  * the {@code LinkedTransferQueue} in another thread.
  79  *
  80  * <p>This class is a member of the
  81  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  82  * Java Collections Framework</a>.
  83  *
  84  * @since 1.7
  85  * @author Doug Lea
  86  * @param <E> the type of elements held in this queue
  87  */
  88 public class LinkedTransferQueue<E> extends AbstractQueue<E>
  89     implements TransferQueue<E>, java.io.Serializable {
  90     private static final long serialVersionUID = -3223113410248163686L;
  91 
  92     /*
  93      * *** Overview of Dual Queues with Slack ***
  94      *
  95      * Dual Queues, introduced by Scherer and Scott
  96      * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are
  97      * (linked) queues in which nodes may represent either data or
  98      * requests.  When a thread tries to enqueue a data node, but
  99      * encounters a request node, it instead "matches" and removes it;
 100      * and vice versa for enqueuing requests. Blocking Dual Queues
 101      * arrange that threads enqueuing unmatched requests block until
 102      * other threads provide the match. Dual Synchronous Queues (see
 103      * Scherer, Lea, & Scott
 104      * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
 105      * additionally arrange that threads enqueuing unmatched data also
 106      * block.  Dual Transfer Queues support all of these modes, as
 107      * dictated by callers.
 108      *
 109      * A FIFO dual queue may be implemented using a variation of the
 110      * Michael & Scott (M&S) lock-free queue algorithm
 111      * (http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf).
 112      * It maintains two pointer fields, "head", pointing to a
 113      * (matched) node that in turn points to the first actual
 114      * (unmatched) queue node (or null if empty); and "tail" that
 115      * points to the last node on the queue (or again null if
 116      * empty). For example, here is a possible queue with four data
 117      * elements:
 118      *
 119      *  head                tail
 120      *    |                   |
 121      *    v                   v
 122      *    M -> U -> U -> U -> U
 123      *
 124      * The M&S queue algorithm is known to be prone to scalability and
 125      * overhead limitations when maintaining (via CAS) these head and
 126      * tail pointers. This has led to the development of
 127      * contention-reducing variants such as elimination arrays (see
 128      * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and
 129      * optimistic back pointers (see Ladan-Mozes & Shavit
 130      * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf).
 131      * However, the nature of dual queues enables a simpler tactic for
 132      * improving M&S-style implementations when dual-ness is needed.
 133      *
 134      * In a dual queue, each node must atomically maintain its match
 135      * status. While there are other possible variants, we implement
 136      * this here as: for a data-mode node, matching entails CASing an
 137      * "item" field from a non-null data value to null upon match, and
 138      * vice-versa for request nodes, CASing from null to a data
 139      * value. (Note that the linearization properties of this style of
 140      * queue are easy to verify -- elements are made available by
 141      * linking, and unavailable by matching.) Compared to plain M&S
 142      * queues, this property of dual queues requires one additional
 143      * successful atomic operation per enq/deq pair. But it also
 144      * enables lower cost variants of queue maintenance mechanics. (A
 145      * variation of this idea applies even for non-dual queues that
 146      * support deletion of interior elements, such as
 147      * j.u.c.ConcurrentLinkedQueue.)
 148      *
 149      * Once a node is matched, its match status can never again
 150      * change.  We may thus arrange that the linked list of them
 151      * contain a prefix of zero or more matched nodes, followed by a
 152      * suffix of zero or more unmatched nodes. (Note that we allow
 153      * both the prefix and suffix to be zero length, which in turn
 154      * means that we do not use a dummy header.)  If we were not
 155      * concerned with either time or space efficiency, we could
 156      * correctly perform enqueue and dequeue operations by traversing
 157      * from a pointer to the initial node; CASing the item of the
 158      * first unmatched node on match and CASing the next field of the
 159      * trailing node on appends. (Plus some special-casing when
 160      * initially empty).  While this would be a terrible idea in
 161      * itself, it does have the benefit of not requiring ANY atomic
 162      * updates on head/tail fields.
 163      *
 164      * We introduce here an approach that lies between the extremes of
 165      * never versus always updating queue (head and tail) pointers.
 166      * This offers a tradeoff between sometimes requiring extra
 167      * traversal steps to locate the first and/or last unmatched
 168      * nodes, versus the reduced overhead and contention of fewer
 169      * updates to queue pointers. For example, a possible snapshot of
 170      * a queue is:
 171      *
 172      *  head           tail
 173      *    |              |
 174      *    v              v
 175      *    M -> M -> U -> U -> U -> U
 176      *
 177      * The best value for this "slack" (the targeted maximum distance
 178      * between the value of "head" and the first unmatched node, and
 179      * similarly for "tail") is an empirical matter. We have found
 180      * that using very small constants in the range of 1-3 work best
 181      * over a range of platforms. Larger values introduce increasing
 182      * costs of cache misses and risks of long traversal chains, while
 183      * smaller values increase CAS contention and overhead.
 184      *
 185      * Dual queues with slack differ from plain M&S dual queues by
 186      * virtue of only sometimes updating head or tail pointers when
 187      * matching, appending, or even traversing nodes; in order to
 188      * maintain a targeted slack.  The idea of "sometimes" may be
 189      * operationalized in several ways. The simplest is to use a
 190      * per-operation counter incremented on each traversal step, and
 191      * to try (via CAS) to update the associated queue pointer
 192      * whenever the count exceeds a threshold. Another, that requires
 193      * more overhead, is to use random number generators to update
 194      * with a given probability per traversal step.
 195      *
 196      * In any strategy along these lines, because CASes updating
 197      * fields may fail, the actual slack may exceed targeted
 198      * slack. However, they may be retried at any time to maintain
 199      * targets.  Even when using very small slack values, this
 200      * approach works well for dual queues because it allows all
 201      * operations up to the point of matching or appending an item
 202      * (hence potentially allowing progress by another thread) to be
 203      * read-only, thus not introducing any further contention. As
 204      * described below, we implement this by performing slack
 205      * maintenance retries only after these points.
 206      *
 207      * As an accompaniment to such techniques, traversal overhead can
 208      * be further reduced without increasing contention of head
 209      * pointer updates: Threads may sometimes shortcut the "next" link
 210      * path from the current "head" node to be closer to the currently
 211      * known first unmatched node, and similarly for tail. Again, this
 212      * may be triggered with using thresholds or randomization.
 213      *
 214      * These ideas must be further extended to avoid unbounded amounts
 215      * of costly-to-reclaim garbage caused by the sequential "next"
 216      * links of nodes starting at old forgotten head nodes: As first
 217      * described in detail by Boehm
 218      * (http://portal.acm.org/citation.cfm?doid=503272.503282), if a GC
 219      * delays noticing that any arbitrarily old node has become
 220      * garbage, all newer dead nodes will also be unreclaimed.
 221      * (Similar issues arise in non-GC environments.)  To cope with
 222      * this in our implementation, upon CASing to advance the head
 223      * pointer, we set the "next" link of the previous head to point
 224      * only to itself; thus limiting the length of connected dead lists.
 225      * (We also take similar care to wipe out possibly garbage
 226      * retaining values held in other Node fields.)  However, doing so
 227      * adds some further complexity to traversal: If any "next"
 228      * pointer links to itself, it indicates that the current thread
 229      * has lagged behind a head-update, and so the traversal must
 230      * continue from the "head".  Traversals trying to find the
 231      * current tail starting from "tail" may also encounter
 232      * self-links, in which case they also continue at "head".
 233      *
 234      * It is tempting in slack-based scheme to not even use CAS for
 235      * updates (similarly to Ladan-Mozes & Shavit). However, this
 236      * cannot be done for head updates under the above link-forgetting
 237      * mechanics because an update may leave head at a detached node.
 238      * And while direct writes are possible for tail updates, they
 239      * increase the risk of long retraversals, and hence long garbage
 240      * chains, which can be much more costly than is worthwhile
 241      * considering that the cost difference of performing a CAS vs
 242      * write is smaller when they are not triggered on each operation
 243      * (especially considering that writes and CASes equally require
 244      * additional GC bookkeeping ("write barriers") that are sometimes
 245      * more costly than the writes themselves because of contention).
 246      *
 247      * *** Overview of implementation ***
 248      *
 249      * We use a threshold-based approach to updates, with a slack
 250      * threshold of two -- that is, we update head/tail when the
 251      * current pointer appears to be two or more steps away from the
 252      * first/last node. The slack value is hard-wired: a path greater
 253      * than one is naturally implemented by checking equality of
 254      * traversal pointers except when the list has only one element,
 255      * in which case we keep slack threshold at one. Avoiding tracking
 256      * explicit counts across method calls slightly simplifies an
 257      * already-messy implementation. Using randomization would
 258      * probably work better if there were a low-quality dirt-cheap
 259      * per-thread one available, but even ThreadLocalRandom is too
 260      * heavy for these purposes.
 261      *
 262      * With such a small slack threshold value, it is not worthwhile
 263      * to augment this with path short-circuiting (i.e., unsplicing
 264      * interior nodes) except in the case of cancellation/removal (see
 265      * below).
 266      *
 267      * We allow both the head and tail fields to be null before any
 268      * nodes are enqueued; initializing upon first append.  This
 269      * simplifies some other logic, as well as providing more
 270      * efficient explicit control paths instead of letting JVMs insert
 271      * implicit NullPointerExceptions when they are null.  While not
 272      * currently fully implemented, we also leave open the possibility
 273      * of re-nulling these fields when empty (which is complicated to
 274      * arrange, for little benefit.)
 275      *
 276      * All enqueue/dequeue operations are handled by the single method
 277      * "xfer" with parameters indicating whether to act as some form
 278      * of offer, put, poll, take, or transfer (each possibly with
 279      * timeout). The relative complexity of using one monolithic
 280      * method outweighs the code bulk and maintenance problems of
 281      * using separate methods for each case.
 282      *
 283      * Operation consists of up to three phases. The first is
 284      * implemented within method xfer, the second in tryAppend, and
 285      * the third in method awaitMatch.
 286      *
 287      * 1. Try to match an existing node
 288      *
 289      *    Starting at head, skip already-matched nodes until finding
 290      *    an unmatched node of opposite mode, if one exists, in which
 291      *    case matching it and returning, also if necessary updating
 292      *    head to one past the matched node (or the node itself if the
 293      *    list has no other unmatched nodes). If the CAS misses, then
 294      *    a loop retries advancing head by two steps until either
 295      *    success or the slack is at most two. By requiring that each
 296      *    attempt advances head by two (if applicable), we ensure that
 297      *    the slack does not grow without bound. Traversals also check
 298      *    if the initial head is now off-list, in which case they
 299      *    start at the new head.
 300      *
 301      *    If no candidates are found and the call was untimed
 302      *    poll/offer, (argument "how" is NOW) return.
 303      *
 304      * 2. Try to append a new node (method tryAppend)
 305      *
 306      *    Starting at current tail pointer, find the actual last node
 307      *    and try to append a new node (or if head was null, establish
 308      *    the first node). Nodes can be appended only if their
 309      *    predecessors are either already matched or are of the same
 310      *    mode. If we detect otherwise, then a new node with opposite
 311      *    mode must have been appended during traversal, so we must
 312      *    restart at phase 1. The traversal and update steps are
 313      *    otherwise similar to phase 1: Retrying upon CAS misses and
 314      *    checking for staleness.  In particular, if a self-link is
 315      *    encountered, then we can safely jump to a node on the list
 316      *    by continuing the traversal at current head.
 317      *
 318      *    On successful append, if the call was ASYNC, return.
 319      *
 320      * 3. Await match or cancellation (method awaitMatch)
 321      *
 322      *    Wait for another thread to match node; instead cancelling if
 323      *    the current thread was interrupted or the wait timed out. On
 324      *    multiprocessors, we use front-of-queue spinning: If a node
 325      *    appears to be the first unmatched node in the queue, it
 326      *    spins a bit before blocking. In either case, before blocking
 327      *    it tries to unsplice any nodes between the current "head"
 328      *    and the first unmatched node.
 329      *
 330      *    Front-of-queue spinning vastly improves performance of
 331      *    heavily contended queues. And so long as it is relatively
 332      *    brief and "quiet", spinning does not much impact performance
 333      *    of less-contended queues.  During spins threads check their
 334      *    interrupt status and generate a thread-local random number
 335      *    to decide to occasionally perform a Thread.yield. While
 336      *    yield has underdefined specs, we assume that it might help,
 337      *    and will not hurt, in limiting impact of spinning on busy
 338      *    systems.  We also use smaller (1/2) spins for nodes that are
 339      *    not known to be front but whose predecessors have not
 340      *    blocked -- these "chained" spins avoid artifacts of
 341      *    front-of-queue rules which otherwise lead to alternating
 342      *    nodes spinning vs blocking. Further, front threads that
 343      *    represent phase changes (from data to request node or vice
 344      *    versa) compared to their predecessors receive additional
 345      *    chained spins, reflecting longer paths typically required to
 346      *    unblock threads during phase changes.
 347      *
 348      *
 349      * ** Unlinking removed interior nodes **
 350      *
 351      * In addition to minimizing garbage retention via self-linking
 352      * described above, we also unlink removed interior nodes. These
 353      * may arise due to timed out or interrupted waits, or calls to
 354      * remove(x) or Iterator.remove.  Normally, given a node that was
 355      * at one time known to be the predecessor of some node s that is
 356      * to be removed, we can unsplice s by CASing the next field of
 357      * its predecessor if it still points to s (otherwise s must
 358      * already have been removed or is now offlist). But there are two
 359      * situations in which we cannot guarantee to make node s
 360      * unreachable in this way: (1) If s is the trailing node of list
 361      * (i.e., with null next), then it is pinned as the target node
 362      * for appends, so can only be removed later after other nodes are
 363      * appended. (2) We cannot necessarily unlink s given a
 364      * predecessor node that is matched (including the case of being
 365      * cancelled): the predecessor may already be unspliced, in which
 366      * case some previous reachable node may still point to s.
 367      * (For further explanation see Herlihy & Shavit "The Art of
 368      * Multiprocessor Programming" chapter 9).  Although, in both
 369      * cases, we can rule out the need for further action if either s
 370      * or its predecessor are (or can be made to be) at, or fall off
 371      * from, the head of list.
 372      *
 373      * Without taking these into account, it would be possible for an
 374      * unbounded number of supposedly removed nodes to remain
 375      * reachable.  Situations leading to such buildup are uncommon but
 376      * can occur in practice; for example when a series of short timed
 377      * calls to poll repeatedly time out but never otherwise fall off
 378      * the list because of an untimed call to take at the front of the
 379      * queue.
 380      *
 381      * When these cases arise, rather than always retraversing the
 382      * entire list to find an actual predecessor to unlink (which
 383      * won't help for case (1) anyway), we record a conservative
 384      * estimate of possible unsplice failures (in "sweepVotes").
 385      * We trigger a full sweep when the estimate exceeds a threshold
 386      * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
 387      * removal failures to tolerate before sweeping through, unlinking
 388      * cancelled nodes that were not unlinked upon initial removal.
 389      * We perform sweeps by the thread hitting threshold (rather than
 390      * background threads or by spreading work to other threads)
 391      * because in the main contexts in which removal occurs, the
 392      * caller is already timed-out, cancelled, or performing a
 393      * potentially O(n) operation (e.g. remove(x)), none of which are
 394      * time-critical enough to warrant the overhead that alternatives
 395      * would impose on other threads.
 396      *
 397      * Because the sweepVotes estimate is conservative, and because
 398      * nodes become unlinked "naturally" as they fall off the head of
 399      * the queue, and because we allow votes to accumulate even while
 400      * sweeps are in progress, there are typically significantly fewer
 401      * such nodes than estimated.  Choice of a threshold value
 402      * balances the likelihood of wasted effort and contention, versus
 403      * providing a worst-case bound on retention of interior nodes in
 404      * quiescent queues. The value defined below was chosen
 405      * empirically to balance these under various timeout scenarios.
 406      *
 407      * Note that we cannot self-link unlinked interior nodes during
 408      * sweeps. However, the associated garbage chains terminate when
 409      * some successor ultimately falls off the head of the list and is
 410      * self-linked.
 411      */
 412 
 413     /** True if on multiprocessor */
 414     private static final boolean MP =
 415         Runtime.getRuntime().availableProcessors() > 1;
 416 
 417     /**
 418      * The number of times to spin (with randomly interspersed calls
 419      * to Thread.yield) on multiprocessor before blocking when a node
 420      * is apparently the first waiter in the queue.  See above for
 421      * explanation. Must be a power of two. The value is empirically
 422      * derived -- it works pretty well across a variety of processors,
 423      * numbers of CPUs, and OSes.
 424      */
 425     private static final int FRONT_SPINS   = 1 << 7;
 426 
 427     /**
 428      * The number of times to spin before blocking when a node is
 429      * preceded by another node that is apparently spinning.  Also
 430      * serves as an increment to FRONT_SPINS on phase changes, and as
 431      * base average frequency for yielding during spins. Must be a
 432      * power of two.
 433      */
 434     private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
 435 
 436     /**
 437      * The maximum number of estimated removal failures (sweepVotes)
 438      * to tolerate before sweeping through the queue unlinking
 439      * cancelled nodes that were not unlinked upon initial
 440      * removal. See above for explanation. The value must be at least
 441      * two to avoid useless sweeps when removing trailing nodes.
 442      */
 443     static final int SWEEP_THRESHOLD = 32;
 444 
 445     /**
 446      * Queue nodes. Uses Object, not E, for items to allow forgetting
 447      * them after use.  Relies heavily on Unsafe mechanics to minimize
 448      * unnecessary ordering constraints: Writes that are intrinsically
 449      * ordered wrt other accesses or CASes use simple relaxed forms.
 450      */
 451     static final class Node {
 452         final boolean isData;   // false if this is a request node
 453         volatile Object item;   // initially non-null if isData; CASed to match
 454         volatile Node next;
 455         volatile Thread waiter; // null until waiting
 456 
 457         // CAS methods for fields
 458         final boolean casNext(Node cmp, Node val) {
 459             return U.compareAndSwapObject(this, NEXT, cmp, val);
 460         }
 461 
 462         final boolean casItem(Object cmp, Object val) {
 463             // assert cmp == null || cmp.getClass() != Node.class;
 464             return U.compareAndSwapObject(this, ITEM, cmp, val);
 465         }
 466 
 467         /**
 468          * Constructs a new node.  Uses relaxed write because item can
 469          * only be seen after publication via casNext.
 470          */
 471         Node(Object item, boolean isData) {
 472             U.putObject(this, ITEM, item); // relaxed write
 473             this.isData = isData;
 474         }
 475 
 476         /**
 477          * Links node to itself to avoid garbage retention.  Called
 478          * only after CASing head field, so uses relaxed write.
 479          */
 480         final void forgetNext() {
 481             U.putObject(this, NEXT, this);
 482         }
 483 
 484         /**
 485          * Sets item to self and waiter to null, to avoid garbage
 486          * retention after matching or cancelling. Uses relaxed writes
 487          * because order is already constrained in the only calling
 488          * contexts: item is forgotten only after volatile/atomic
 489          * mechanics that extract items.  Similarly, clearing waiter
 490          * follows either CAS or return from park (if ever parked;
 491          * else we don't care).
 492          */
 493         final void forgetContents() {
 494             U.putObject(this, ITEM, this);
 495             U.putObject(this, WAITER, null);
 496         }
 497 
 498         /**
 499          * Returns true if this node has been matched, including the
 500          * case of artificial matches due to cancellation.
 501          */
 502         final boolean isMatched() {
 503             Object x = item;
 504             return (x == this) || ((x == null) == isData);
 505         }
 506 
 507         /**
 508          * Returns true if this is an unmatched request node.
 509          */
 510         final boolean isUnmatchedRequest() {
 511             return !isData && item == null;
 512         }
 513 
 514         /**
 515          * Returns true if a node with the given mode cannot be
 516          * appended to this node because this node is unmatched and
 517          * has opposite data mode.
 518          */
 519         final boolean cannotPrecede(boolean haveData) {
 520             boolean d = isData;
 521             Object x;
 522             return d != haveData && (x = item) != this && (x != null) == d;
 523         }
 524 
 525         /**
 526          * Tries to artificially match a data node -- used by remove.
 527          */
 528         final boolean tryMatchData() {
 529             // assert isData;
 530             Object x = item;
 531             if (x != null && x != this && casItem(x, null)) {
 532                 LockSupport.unpark(waiter);
 533                 return true;
 534             }
 535             return false;
 536         }
 537 
 538         private static final long serialVersionUID = -3375979862319811754L;
 539 
 540         // Unsafe mechanics
 541         private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
 542         private static final long ITEM;
 543         private static final long NEXT;
 544         private static final long WAITER;
 545         static {
 546             try {
 547                 ITEM = U.objectFieldOffset
 548                     (Node.class.getDeclaredField("item"));
 549                 NEXT = U.objectFieldOffset
 550                     (Node.class.getDeclaredField("next"));
 551                 WAITER = U.objectFieldOffset
 552                     (Node.class.getDeclaredField("waiter"));
 553             } catch (ReflectiveOperationException e) {
 554                 throw new Error(e);
 555             }
 556         }
 557     }
 558 
 559     /** head of the queue; null until first enqueue */
 560     transient volatile Node head;
 561 
 562     /** tail of the queue; null until first append */
 563     private transient volatile Node tail;
 564 
 565     /** The number of apparent failures to unsplice removed nodes */
 566     private transient volatile int sweepVotes;
 567 
 568     // CAS methods for fields
 569     private boolean casTail(Node cmp, Node val) {
 570         return U.compareAndSwapObject(this, TAIL, cmp, val);
 571     }
 572 
 573     private boolean casHead(Node cmp, Node val) {
 574         return U.compareAndSwapObject(this, HEAD, cmp, val);
 575     }
 576 
 577     private boolean casSweepVotes(int cmp, int val) {
 578         return U.compareAndSwapInt(this, SWEEPVOTES, cmp, val);
 579     }
 580 
 581     /*
 582      * Possible values for "how" argument in xfer method.
 583      */
 584     private static final int NOW   = 0; // for untimed poll, tryTransfer
 585     private static final int ASYNC = 1; // for offer, put, add
 586     private static final int SYNC  = 2; // for transfer, take
 587     private static final int TIMED = 3; // for timed poll, tryTransfer
 588 
 589     /**
 590      * Implements all queuing methods. See above for explanation.
 591      *
 592      * @param e the item or null for take
 593      * @param haveData true if this is a put, else a take
 594      * @param how NOW, ASYNC, SYNC, or TIMED
 595      * @param nanos timeout in nanosecs, used only if mode is TIMED
 596      * @return an item if matched, else e
 597      * @throws NullPointerException if haveData mode but e is null
 598      */
 599     private E xfer(E e, boolean haveData, int how, long nanos) {
 600         if (haveData && (e == null))
 601             throw new NullPointerException();
 602         Node s = null;                        // the node to append, if needed
 603 
 604         retry:
 605         for (;;) {                            // restart on append race
 606 
 607             for (Node h = head, p = h; p != null;) { // find & match first node
 608                 boolean isData = p.isData;
 609                 Object item = p.item;
 610                 if (item != p && (item != null) == isData) { // unmatched
 611                     if (isData == haveData)   // can't match
 612                         break;
 613                     if (p.casItem(item, e)) { // match
 614                         for (Node q = p; q != h;) {
 615                             Node n = q.next;  // update by 2 unless singleton
 616                             if (head == h && casHead(h, n == null ? q : n)) {
 617                                 h.forgetNext();
 618                                 break;
 619                             }                 // advance and retry
 620                             if ((h = head)   == null ||
 621                                 (q = h.next) == null || !q.isMatched())
 622                                 break;        // unless slack < 2
 623                         }
 624                         LockSupport.unpark(p.waiter);
 625                         @SuppressWarnings("unchecked") E itemE = (E) item;
 626                         return itemE;
 627                     }
 628                 }
 629                 Node n = p.next;
 630                 p = (p != n) ? n : (h = head); // Use head if p offlist
 631             }
 632 
 633             if (how != NOW) {                 // No matches available
 634                 if (s == null)
 635                     s = new Node(e, haveData);
 636                 Node pred = tryAppend(s, haveData);
 637                 if (pred == null)
 638                     continue retry;           // lost race vs opposite mode
 639                 if (how != ASYNC)
 640                     return awaitMatch(s, pred, e, (how == TIMED), nanos);
 641             }
 642             return e; // not waiting
 643         }
 644     }
 645 
 646     /**
 647      * Tries to append node s as tail.
 648      *
 649      * @param s the node to append
 650      * @param haveData true if appending in data mode
 651      * @return null on failure due to losing race with append in
 652      * different mode, else s's predecessor, or s itself if no
 653      * predecessor
 654      */
 655     private Node tryAppend(Node s, boolean haveData) {
 656         for (Node t = tail, p = t;;) {        // move p to last node and append
 657             Node n, u;                        // temps for reads of next & tail
 658             if (p == null && (p = head) == null) {
 659                 if (casHead(null, s))
 660                     return s;                 // initialize
 661             }
 662             else if (p.cannotPrecede(haveData))
 663                 return null;                  // lost race vs opposite mode
 664             else if ((n = p.next) != null)    // not last; keep traversing
 665                 p = p != t && t != (u = tail) ? (t = u) : // stale tail
 666                     (p != n) ? n : null;      // restart if off list
 667             else if (!p.casNext(null, s))
 668                 p = p.next;                   // re-read on CAS failure
 669             else {
 670                 if (p != t) {                 // update if slack now >= 2
 671                     while ((tail != t || !casTail(t, s)) &&
 672                            (t = tail)   != null &&
 673                            (s = t.next) != null && // advance and retry
 674                            (s = s.next) != null && s != t);
 675                 }
 676                 return p;
 677             }
 678         }
 679     }
 680 
 681     /**
 682      * Spins/yields/blocks until node s is matched or caller gives up.
 683      *
 684      * @param s the waiting node
 685      * @param pred the predecessor of s, or s itself if it has no
 686      * predecessor, or null if unknown (the null case does not occur
 687      * in any current calls but may in possible future extensions)
 688      * @param e the comparison value for checking match
 689      * @param timed if true, wait only until timeout elapses
 690      * @param nanos timeout in nanosecs, used only if timed is true
 691      * @return matched item, or e if unmatched on interrupt or timeout
 692      */
 693     private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
 694         final long deadline = timed ? System.nanoTime() + nanos : 0L;
 695         Thread w = Thread.currentThread();
 696         int spins = -1; // initialized after first item and cancel checks
 697         ThreadLocalRandom randomYields = null; // bound if needed
 698 
 699         for (;;) {
 700             Object item = s.item;
 701             if (item != e) {                  // matched
 702                 // assert item != s;
 703                 s.forgetContents();           // avoid garbage
 704                 @SuppressWarnings("unchecked") E itemE = (E) item;
 705                 return itemE;
 706             }
 707             else if (w.isInterrupted() || (timed && nanos <= 0L)) {
 708                 unsplice(pred, s);           // try to unlink and cancel
 709                 if (s.casItem(e, s))         // return normally if lost CAS
 710                     return e;
 711             }
 712             else if (spins < 0) {            // establish spins at/near front
 713                 if ((spins = spinsFor(pred, s.isData)) > 0)
 714                     randomYields = ThreadLocalRandom.current();
 715             }
 716             else if (spins > 0) {             // spin
 717                 --spins;
 718                 if (randomYields.nextInt(CHAINED_SPINS) == 0)
 719                     Thread.yield();           // occasionally yield
 720             }
 721             else if (s.waiter == null) {
 722                 s.waiter = w;                 // request unpark then recheck
 723             }
 724             else if (timed) {
 725                 nanos = deadline - System.nanoTime();
 726                 if (nanos > 0L)
 727                     LockSupport.parkNanos(this, nanos);
 728             }
 729             else {
 730                 LockSupport.park(this);
 731             }
 732         }
 733     }
 734 
 735     /**
 736      * Returns spin/yield value for a node with given predecessor and
 737      * data mode. See above for explanation.
 738      */
 739     private static int spinsFor(Node pred, boolean haveData) {
 740         if (MP && pred != null) {
 741             if (pred.isData != haveData)      // phase change
 742                 return FRONT_SPINS + CHAINED_SPINS;
 743             if (pred.isMatched())             // probably at front
 744                 return FRONT_SPINS;
 745             if (pred.waiter == null)          // pred apparently spinning
 746                 return CHAINED_SPINS;
 747         }
 748         return 0;
 749     }
 750 
 751     /* -------------- Traversal methods -------------- */
 752 
 753     /**
 754      * Returns the successor of p, or the head node if p.next has been
 755      * linked to self, which will only be true if traversing with a
 756      * stale pointer that is now off the list.
 757      */
 758     final Node succ(Node p) {
 759         Node next = p.next;
 760         return (p == next) ? head : next;
 761     }
 762 
 763     /**
 764      * Returns the first unmatched data node, or null if none.
 765      * Callers must recheck if the returned node's item field is null
 766      * or self-linked before using.
 767      */
 768     final Node firstDataNode() {
 769         restartFromHead: for (;;) {
 770             for (Node p = head; p != null;) {
 771                 Object item = p.item;
 772                 if (p.isData) {
 773                     if (item != null && item != p)
 774                         return p;
 775                 }
 776                 else if (item == null)
 777                     break;
 778                 if (p == (p = p.next))
 779                     continue restartFromHead;
 780             }
 781             return null;
 782         }
 783     }
 784 
 785     /**
 786      * Traverses and counts unmatched nodes of the given mode.
 787      * Used by methods size and getWaitingConsumerCount.
 788      */
 789     private int countOfMode(boolean data) {
 790         restartFromHead: for (;;) {
 791             int count = 0;
 792             for (Node p = head; p != null;) {
 793                 if (!p.isMatched()) {
 794                     if (p.isData != data)
 795                         return 0;
 796                     if (++count == Integer.MAX_VALUE)
 797                         break;  // @see Collection.size()
 798                 }
 799                 if (p == (p = p.next))
 800                     continue restartFromHead;
 801             }
 802             return count;
 803         }
 804     }
 805 
 806     public String toString() {
 807         String[] a = null;
 808         restartFromHead: for (;;) {
 809             int charLength = 0;
 810             int size = 0;
 811             for (Node p = head; p != null;) {
 812                 Object item = p.item;
 813                 if (p.isData) {
 814                     if (item != null && item != p) {
 815                         if (a == null)
 816                             a = new String[4];
 817                         else if (size == a.length)
 818                             a = Arrays.copyOf(a, 2 * size);
 819                         String s = item.toString();
 820                         a[size++] = s;
 821                         charLength += s.length();
 822                     }
 823                 } else if (item == null)
 824                     break;
 825                 if (p == (p = p.next))
 826                     continue restartFromHead;
 827             }
 828 
 829             if (size == 0)
 830                 return "[]";
 831 
 832             return Helpers.toString(a, size, charLength);
 833         }
 834     }
 835 
 836     private Object[] toArrayInternal(Object[] a) {
 837         Object[] x = a;
 838         restartFromHead: for (;;) {
 839             int size = 0;
 840             for (Node p = head; p != null;) {
 841                 Object item = p.item;
 842                 if (p.isData) {
 843                     if (item != null && item != p) {
 844                         if (x == null)
 845                             x = new Object[4];
 846                         else if (size == x.length)
 847                             x = Arrays.copyOf(x, 2 * (size + 4));
 848                         x[size++] = item;
 849                     }
 850                 } else if (item == null)
 851                     break;
 852                 if (p == (p = p.next))
 853                     continue restartFromHead;
 854             }
 855             if (x == null)
 856                 return new Object[0];
 857             else if (a != null && size <= a.length) {
 858                 if (a != x)
 859                     System.arraycopy(x, 0, a, 0, size);
 860                 if (size < a.length)
 861                     a[size] = null;
 862                 return a;
 863             }
 864             return (size == x.length) ? x : Arrays.copyOf(x, size);
 865         }
 866     }
 867 
 868     /**
 869      * Returns an array containing all of the elements in this queue, in
 870      * proper sequence.
 871      *
 872      * <p>The returned array will be "safe" in that no references to it are
 873      * maintained by this queue.  (In other words, this method must allocate
 874      * a new array).  The caller is thus free to modify the returned array.
 875      *
 876      * <p>This method acts as bridge between array-based and collection-based
 877      * APIs.
 878      *
 879      * @return an array containing all of the elements in this queue
 880      */
 881     public Object[] toArray() {
 882         return toArrayInternal(null);
 883     }
 884 
 885     /**
 886      * Returns an array containing all of the elements in this queue, in
 887      * proper sequence; the runtime type of the returned array is that of
 888      * the specified array.  If the queue fits in the specified array, it
 889      * is returned therein.  Otherwise, a new array is allocated with the
 890      * runtime type of the specified array and the size of this queue.
 891      *
 892      * <p>If this queue fits in the specified array with room to spare
 893      * (i.e., the array has more elements than this queue), the element in
 894      * the array immediately following the end of the queue is set to
 895      * {@code null}.
 896      *
 897      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 898      * array-based and collection-based APIs.  Further, this method allows
 899      * precise control over the runtime type of the output array, and may,
 900      * under certain circumstances, be used to save allocation costs.
 901      *
 902      * <p>Suppose {@code x} is a queue known to contain only strings.
 903      * The following code can be used to dump the queue into a newly
 904      * allocated array of {@code String}:
 905      *
 906      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
 907      *
 908      * Note that {@code toArray(new Object[0])} is identical in function to
 909      * {@code toArray()}.
 910      *
 911      * @param a the array into which the elements of the queue are to
 912      *          be stored, if it is big enough; otherwise, a new array of the
 913      *          same runtime type is allocated for this purpose
 914      * @return an array containing all of the elements in this queue
 915      * @throws ArrayStoreException if the runtime type of the specified array
 916      *         is not a supertype of the runtime type of every element in
 917      *         this queue
 918      * @throws NullPointerException if the specified array is null
 919      */
 920     @SuppressWarnings("unchecked")
 921     public <T> T[] toArray(T[] a) {
 922         if (a == null) throw new NullPointerException();
 923         return (T[]) toArrayInternal(a);
 924     }
 925 
 926     final class Itr implements Iterator<E> {
 927         private Node nextNode;   // next node to return item for
 928         private E nextItem;      // the corresponding item
 929         private Node lastRet;    // last returned node, to support remove
 930         private Node lastPred;   // predecessor to unlink lastRet
 931 
 932         /**
 933          * Moves to next node after prev, or first node if prev null.
 934          */
 935         private void advance(Node prev) {
 936             /*
 937              * To track and avoid buildup of deleted nodes in the face
 938              * of calls to both Queue.remove and Itr.remove, we must
 939              * include variants of unsplice and sweep upon each
 940              * advance: Upon Itr.remove, we may need to catch up links
 941              * from lastPred, and upon other removes, we might need to
 942              * skip ahead from stale nodes and unsplice deleted ones
 943              * found while advancing.
 944              */
 945 
 946             Node r, b; // reset lastPred upon possible deletion of lastRet
 947             if ((r = lastRet) != null && !r.isMatched())
 948                 lastPred = r;    // next lastPred is old lastRet
 949             else if ((b = lastPred) == null || b.isMatched())
 950                 lastPred = null; // at start of list
 951             else {
 952                 Node s, n;       // help with removal of lastPred.next
 953                 while ((s = b.next) != null &&
 954                        s != b && s.isMatched() &&
 955                        (n = s.next) != null && n != s)
 956                     b.casNext(s, n);
 957             }
 958 
 959             this.lastRet = prev;
 960 
 961             for (Node p = prev, s, n;;) {
 962                 s = (p == null) ? head : p.next;
 963                 if (s == null)
 964                     break;
 965                 else if (s == p) {
 966                     p = null;
 967                     continue;
 968                 }
 969                 Object item = s.item;
 970                 if (s.isData) {
 971                     if (item != null && item != s) {
 972                         @SuppressWarnings("unchecked") E itemE = (E) item;
 973                         nextItem = itemE;
 974                         nextNode = s;
 975                         return;
 976                     }
 977                 }
 978                 else if (item == null)
 979                     break;
 980                 // assert s.isMatched();
 981                 if (p == null)
 982                     p = s;
 983                 else if ((n = s.next) == null)
 984                     break;
 985                 else if (s == n)
 986                     p = null;
 987                 else
 988                     p.casNext(s, n);
 989             }
 990             nextNode = null;
 991             nextItem = null;
 992         }
 993 
 994         Itr() {
 995             advance(null);
 996         }
 997 
 998         public final boolean hasNext() {
 999             return nextNode != null;
1000         }
1001 
1002         public final E next() {
1003             Node p = nextNode;
1004             if (p == null) throw new NoSuchElementException();
1005             E e = nextItem;
1006             advance(p);
1007             return e;
1008         }
1009 
1010         public final void remove() {
1011             final Node lastRet = this.lastRet;
1012             if (lastRet == null)
1013                 throw new IllegalStateException();
1014             this.lastRet = null;
1015             if (lastRet.tryMatchData())
1016                 unsplice(lastPred, lastRet);
1017         }
1018     }
1019 
1020     /** A customized variant of Spliterators.IteratorSpliterator */
1021     final class LTQSpliterator<E> implements Spliterator<E> {
1022         static final int MAX_BATCH = 1 << 25;  // max batch array size;
1023         Node current;       // current node; null until initialized
1024         int batch;          // batch size for splits
1025         boolean exhausted;  // true when no more nodes
1026         LTQSpliterator() {}
1027 
1028         public Spliterator<E> trySplit() {
1029             Node p;
1030             int b = batch;
1031             int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
1032             if (!exhausted &&
1033                 ((p = current) != null || (p = firstDataNode()) != null) &&
1034                 p.next != null) {
1035                 Object[] a = new Object[n];
1036                 int i = 0;
1037                 do {
1038                     Object e = p.item;
1039                     if (e != p && (a[i] = e) != null)
1040                         ++i;
1041                     if (p == (p = p.next))
1042                         p = firstDataNode();
1043                 } while (p != null && i < n && p.isData);
1044                 if ((current = p) == null)
1045                     exhausted = true;
1046                 if (i > 0) {
1047                     batch = i;
1048                     return Spliterators.spliterator
1049                         (a, 0, i, (Spliterator.ORDERED |
1050                                    Spliterator.NONNULL |
1051                                    Spliterator.CONCURRENT));
1052                 }
1053             }
1054             return null;
1055         }
1056 
1057         @SuppressWarnings("unchecked")
1058         public void forEachRemaining(Consumer<? super E> action) {
1059             Node p;
1060             if (action == null) throw new NullPointerException();
1061             if (!exhausted &&
1062                 ((p = current) != null || (p = firstDataNode()) != null)) {
1063                 exhausted = true;
1064                 do {
1065                     Object e = p.item;
1066                     if (e != null && e != p)
1067                         action.accept((E)e);
1068                     if (p == (p = p.next))
1069                         p = firstDataNode();
1070                 } while (p != null && p.isData);
1071             }
1072         }
1073 
1074         @SuppressWarnings("unchecked")
1075         public boolean tryAdvance(Consumer<? super E> action) {
1076             Node p;
1077             if (action == null) throw new NullPointerException();
1078             if (!exhausted &&
1079                 ((p = current) != null || (p = firstDataNode()) != null)) {
1080                 Object e;
1081                 do {
1082                     if ((e = p.item) == p)
1083                         e = null;
1084                     if (p == (p = p.next))
1085                         p = firstDataNode();
1086                 } while (e == null && p != null && p.isData);
1087                 if ((current = p) == null)
1088                     exhausted = true;
1089                 if (e != null) {
1090                     action.accept((E)e);
1091                     return true;
1092                 }
1093             }
1094             return false;
1095         }
1096 
1097         public long estimateSize() { return Long.MAX_VALUE; }
1098 
1099         public int characteristics() {
1100             return Spliterator.ORDERED | Spliterator.NONNULL |
1101                 Spliterator.CONCURRENT;
1102         }
1103     }
1104 
1105     /**
1106      * Returns a {@link Spliterator} over the elements in this queue.
1107      *
1108      * <p>The returned spliterator is
1109      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1110      *
1111      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1112      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1113      *
1114      * @implNote
1115      * The {@code Spliterator} implements {@code trySplit} to permit limited
1116      * parallelism.
1117      *
1118      * @return a {@code Spliterator} over the elements in this queue
1119      * @since 1.8
1120      */
1121     public Spliterator<E> spliterator() {
1122         return new LTQSpliterator<E>();
1123     }
1124 
1125     /* -------------- Removal methods -------------- */
1126 
1127     /**
1128      * Unsplices (now or later) the given deleted/cancelled node with
1129      * the given predecessor.
1130      *
1131      * @param pred a node that was at one time known to be the
1132      * predecessor of s, or null or s itself if s is/was at head
1133      * @param s the node to be unspliced
1134      */
1135     final void unsplice(Node pred, Node s) {
1136         s.waiter = null; // disable signals
1137         /*
1138          * See above for rationale. Briefly: if pred still points to
1139          * s, try to unlink s.  If s cannot be unlinked, because it is
1140          * trailing node or pred might be unlinked, and neither pred
1141          * nor s are head or offlist, add to sweepVotes, and if enough
1142          * votes have accumulated, sweep.
1143          */
1144         if (pred != null && pred != s && pred.next == s) {
1145             Node n = s.next;
1146             if (n == null ||
1147                 (n != s && pred.casNext(s, n) && pred.isMatched())) {
1148                 for (;;) {               // check if at, or could be, head
1149                     Node h = head;
1150                     if (h == pred || h == s || h == null)
1151                         return;          // at head or list empty
1152                     if (!h.isMatched())
1153                         break;
1154                     Node hn = h.next;
1155                     if (hn == null)
1156                         return;          // now empty
1157                     if (hn != h && casHead(h, hn))
1158                         h.forgetNext();  // advance head
1159                 }
1160                 if (pred.next != pred && s.next != s) { // recheck if offlist
1161                     for (;;) {           // sweep now if enough votes
1162                         int v = sweepVotes;
1163                         if (v < SWEEP_THRESHOLD) {
1164                             if (casSweepVotes(v, v + 1))
1165                                 break;
1166                         }
1167                         else if (casSweepVotes(v, 0)) {
1168                             sweep();
1169                             break;
1170                         }
1171                     }
1172                 }
1173             }
1174         }
1175     }
1176 
1177     /**
1178      * Unlinks matched (typically cancelled) nodes encountered in a
1179      * traversal from head.
1180      */
1181     private void sweep() {
1182         for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
1183             if (!s.isMatched())
1184                 // Unmatched nodes are never self-linked
1185                 p = s;
1186             else if ((n = s.next) == null) // trailing node is pinned
1187                 break;
1188             else if (s == n)    // stale
1189                 // No need to also check for p == s, since that implies s == n
1190                 p = head;
1191             else
1192                 p.casNext(s, n);
1193         }
1194     }
1195 
1196     /**
1197      * Main implementation of remove(Object)
1198      */
1199     private boolean findAndRemove(Object e) {
1200         if (e != null) {
1201             for (Node pred = null, p = head; p != null; ) {
1202                 Object item = p.item;
1203                 if (p.isData) {
1204                     if (item != null && item != p && e.equals(item) &&
1205                         p.tryMatchData()) {
1206                         unsplice(pred, p);
1207                         return true;
1208                     }
1209                 }
1210                 else if (item == null)
1211                     break;
1212                 pred = p;
1213                 if ((p = p.next) == pred) { // stale
1214                     pred = null;
1215                     p = head;
1216                 }
1217             }
1218         }
1219         return false;
1220     }
1221 
1222     /**
1223      * Creates an initially empty {@code LinkedTransferQueue}.
1224      */
1225     public LinkedTransferQueue() {
1226     }
1227 
1228     /**
1229      * Creates a {@code LinkedTransferQueue}
1230      * initially containing the elements of the given collection,
1231      * added in traversal order of the collection's iterator.
1232      *
1233      * @param c the collection of elements to initially contain
1234      * @throws NullPointerException if the specified collection or any
1235      *         of its elements are null
1236      */
1237     public LinkedTransferQueue(Collection<? extends E> c) {
1238         this();
1239         addAll(c);
1240     }
1241 
1242     /**
1243      * Inserts the specified element at the tail of this queue.
1244      * As the queue is unbounded, this method will never block.
1245      *
1246      * @throws NullPointerException if the specified element is null
1247      */
1248     public void put(E e) {
1249         xfer(e, true, ASYNC, 0);
1250     }
1251 
1252     /**
1253      * Inserts the specified element at the tail of this queue.
1254      * As the queue is unbounded, this method will never block or
1255      * return {@code false}.
1256      *
1257      * @return {@code true} (as specified by
1258      *  {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit)
1259      *  BlockingQueue.offer})
1260      * @throws NullPointerException if the specified element is null
1261      */
1262     public boolean offer(E e, long timeout, TimeUnit unit) {
1263         xfer(e, true, ASYNC, 0);
1264         return true;
1265     }
1266 
1267     /**
1268      * Inserts the specified element at the tail of this queue.
1269      * As the queue is unbounded, this method will never return {@code false}.
1270      *
1271      * @return {@code true} (as specified by {@link Queue#offer})
1272      * @throws NullPointerException if the specified element is null
1273      */
1274     public boolean offer(E e) {
1275         xfer(e, true, ASYNC, 0);
1276         return true;
1277     }
1278 
1279     /**
1280      * Inserts the specified element at the tail of this queue.
1281      * As the queue is unbounded, this method will never throw
1282      * {@link IllegalStateException} or return {@code false}.
1283      *
1284      * @return {@code true} (as specified by {@link Collection#add})
1285      * @throws NullPointerException if the specified element is null
1286      */
1287     public boolean add(E e) {
1288         xfer(e, true, ASYNC, 0);
1289         return true;
1290     }
1291 
1292     /**
1293      * Transfers the element to a waiting consumer immediately, if possible.
1294      *
1295      * <p>More precisely, transfers the specified element immediately
1296      * if there exists a consumer already waiting to receive it (in
1297      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1298      * otherwise returning {@code false} without enqueuing the element.
1299      *
1300      * @throws NullPointerException if the specified element is null
1301      */
1302     public boolean tryTransfer(E e) {
1303         return xfer(e, true, NOW, 0) == null;
1304     }
1305 
1306     /**
1307      * Transfers the element to a consumer, waiting if necessary to do so.
1308      *
1309      * <p>More precisely, transfers the specified element immediately
1310      * if there exists a consumer already waiting to receive it (in
1311      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1312      * else inserts the specified element at the tail of this queue
1313      * and waits until the element is received by a consumer.
1314      *
1315      * @throws NullPointerException if the specified element is null
1316      */
1317     public void transfer(E e) throws InterruptedException {
1318         if (xfer(e, true, SYNC, 0) != null) {
1319             Thread.interrupted(); // failure possible only due to interrupt
1320             throw new InterruptedException();
1321         }
1322     }
1323 
1324     /**
1325      * Transfers the element to a consumer if it is possible to do so
1326      * before the timeout elapses.
1327      *
1328      * <p>More precisely, transfers the specified element immediately
1329      * if there exists a consumer already waiting to receive it (in
1330      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1331      * else inserts the specified element at the tail of this queue
1332      * and waits until the element is received by a consumer,
1333      * returning {@code false} if the specified wait time elapses
1334      * before the element can be transferred.
1335      *
1336      * @throws NullPointerException if the specified element is null
1337      */
1338     public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1339         throws InterruptedException {
1340         if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
1341             return true;
1342         if (!Thread.interrupted())
1343             return false;
1344         throw new InterruptedException();
1345     }
1346 
1347     public E take() throws InterruptedException {
1348         E e = xfer(null, false, SYNC, 0);
1349         if (e != null)
1350             return e;
1351         Thread.interrupted();
1352         throw new InterruptedException();
1353     }
1354 
1355     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1356         E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1357         if (e != null || !Thread.interrupted())
1358             return e;
1359         throw new InterruptedException();
1360     }
1361 
1362     public E poll() {
1363         return xfer(null, false, NOW, 0);
1364     }
1365 
1366     /**
1367      * @throws NullPointerException     {@inheritDoc}
1368      * @throws IllegalArgumentException {@inheritDoc}
1369      */
1370     public int drainTo(Collection<? super E> c) {
1371         if (c == null)
1372             throw new NullPointerException();
1373         if (c == this)
1374             throw new IllegalArgumentException();
1375         int n = 0;
1376         for (E e; (e = poll()) != null;) {
1377             c.add(e);
1378             ++n;
1379         }
1380         return n;
1381     }
1382 
1383     /**
1384      * @throws NullPointerException     {@inheritDoc}
1385      * @throws IllegalArgumentException {@inheritDoc}
1386      */
1387     public int drainTo(Collection<? super E> c, int maxElements) {
1388         if (c == null)
1389             throw new NullPointerException();
1390         if (c == this)
1391             throw new IllegalArgumentException();
1392         int n = 0;
1393         for (E e; n < maxElements && (e = poll()) != null;) {
1394             c.add(e);
1395             ++n;
1396         }
1397         return n;
1398     }
1399 
1400     /**
1401      * Returns an iterator over the elements in this queue in proper sequence.
1402      * The elements will be returned in order from first (head) to last (tail).
1403      *
1404      * <p>The returned iterator is
1405      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1406      *
1407      * @return an iterator over the elements in this queue in proper sequence
1408      */
1409     public Iterator<E> iterator() {
1410         return new Itr();
1411     }
1412 
1413     public E peek() {
1414         restartFromHead: for (;;) {
1415             for (Node p = head; p != null;) {
1416                 Object item = p.item;
1417                 if (p.isData) {
1418                     if (item != null && item != p) {
1419                         @SuppressWarnings("unchecked") E e = (E) item;
1420                         return e;
1421                     }
1422                 }
1423                 else if (item == null)
1424                     break;
1425                 if (p == (p = p.next))
1426                     continue restartFromHead;
1427             }
1428             return null;
1429         }
1430     }
1431 
1432     /**
1433      * Returns {@code true} if this queue contains no elements.
1434      *
1435      * @return {@code true} if this queue contains no elements
1436      */
1437     public boolean isEmpty() {
1438         return firstDataNode() == null;
1439     }
1440 
1441     public boolean hasWaitingConsumer() {
1442         restartFromHead: for (;;) {
1443             for (Node p = head; p != null;) {
1444                 Object item = p.item;
1445                 if (p.isData) {
1446                     if (item != null && item != p)
1447                         break;
1448                 }
1449                 else if (item == null)
1450                     return true;
1451                 if (p == (p = p.next))
1452                     continue restartFromHead;
1453             }
1454             return false;
1455         }
1456     }
1457 
1458     /**
1459      * Returns the number of elements in this queue.  If this queue
1460      * contains more than {@code Integer.MAX_VALUE} elements, returns
1461      * {@code Integer.MAX_VALUE}.
1462      *
1463      * <p>Beware that, unlike in most collections, this method is
1464      * <em>NOT</em> a constant-time operation. Because of the
1465      * asynchronous nature of these queues, determining the current
1466      * number of elements requires an O(n) traversal.
1467      *
1468      * @return the number of elements in this queue
1469      */
1470     public int size() {
1471         return countOfMode(true);
1472     }
1473 
1474     public int getWaitingConsumerCount() {
1475         return countOfMode(false);
1476     }
1477 
1478     /**
1479      * Removes a single instance of the specified element from this queue,
1480      * if it is present.  More formally, removes an element {@code e} such
1481      * that {@code o.equals(e)}, if this queue contains one or more such
1482      * elements.
1483      * Returns {@code true} if this queue contained the specified element
1484      * (or equivalently, if this queue changed as a result of the call).
1485      *
1486      * @param o element to be removed from this queue, if present
1487      * @return {@code true} if this queue changed as a result of the call
1488      */
1489     public boolean remove(Object o) {
1490         return findAndRemove(o);
1491     }
1492 
1493     /**
1494      * Returns {@code true} if this queue contains the specified element.
1495      * More formally, returns {@code true} if and only if this queue contains
1496      * at least one element {@code e} such that {@code o.equals(e)}.
1497      *
1498      * @param o object to be checked for containment in this queue
1499      * @return {@code true} if this queue contains the specified element
1500      */
1501     public boolean contains(Object o) {
1502         if (o != null) {
1503             for (Node p = head; p != null; p = succ(p)) {
1504                 Object item = p.item;
1505                 if (p.isData) {
1506                     if (item != null && item != p && o.equals(item))
1507                         return true;
1508                 }
1509                 else if (item == null)
1510                     break;
1511             }
1512         }
1513         return false;
1514     }
1515 
1516     /**
1517      * Always returns {@code Integer.MAX_VALUE} because a
1518      * {@code LinkedTransferQueue} is not capacity constrained.
1519      *
1520      * @return {@code Integer.MAX_VALUE} (as specified by
1521      *         {@link java.util.concurrent.BlockingQueue#remainingCapacity()
1522      *         BlockingQueue.remainingCapacity})
1523      */
1524     public int remainingCapacity() {
1525         return Integer.MAX_VALUE;
1526     }
1527 
1528     /**
1529      * Saves this queue to a stream (that is, serializes it).
1530      *
1531      * @param s the stream
1532      * @throws java.io.IOException if an I/O error occurs
1533      * @serialData All of the elements (each an {@code E}) in
1534      * the proper order, followed by a null
1535      */
1536     private void writeObject(java.io.ObjectOutputStream s)
1537         throws java.io.IOException {
1538         s.defaultWriteObject();
1539         for (E e : this)
1540             s.writeObject(e);
1541         // Use trailing null as sentinel
1542         s.writeObject(null);
1543     }
1544 
1545     /**
1546      * Reconstitutes this queue from a stream (that is, deserializes it).
1547      * @param s the stream
1548      * @throws ClassNotFoundException if the class of a serialized object
1549      *         could not be found
1550      * @throws java.io.IOException if an I/O error occurs
1551      */
1552     private void readObject(java.io.ObjectInputStream s)
1553         throws java.io.IOException, ClassNotFoundException {
1554         s.defaultReadObject();
1555         for (;;) {
1556             @SuppressWarnings("unchecked")
1557             E item = (E) s.readObject();
1558             if (item == null)
1559                 break;
1560             else
1561                 offer(item);
1562         }
1563     }
1564 
1565     // Unsafe mechanics
1566 
1567     private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
1568     private static final long HEAD;
1569     private static final long TAIL;
1570     private static final long SWEEPVOTES;
1571     static {
1572         try {
1573             HEAD = U.objectFieldOffset
1574                 (LinkedTransferQueue.class.getDeclaredField("head"));
1575             TAIL = U.objectFieldOffset
1576                 (LinkedTransferQueue.class.getDeclaredField("tail"));
1577             SWEEPVOTES = U.objectFieldOffset
1578                 (LinkedTransferQueue.class.getDeclaredField("sweepVotes"));
1579         } catch (ReflectiveOperationException e) {
1580             throw new Error(e);
1581         }
1582 
1583         // Reduce the risk of rare disastrous classloading in first call to
1584         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1585         Class<?> ensureLoaded = LockSupport.class;
1586     }
1587 }