src/share/classes/java/util/concurrent/LinkedTransferQueue.java

Print this page




 313      *    by continuing the traversal at current head.
 314      *
 315      *    On successful append, if the call was ASYNC, return.
 316      *
 317      * 3. Await match or cancellation (method awaitMatch)
 318      *
 319      *    Wait for another thread to match node; instead cancelling if
 320      *    the current thread was interrupted or the wait timed out. On
 321      *    multiprocessors, we use front-of-queue spinning: If a node
 322      *    appears to be the first unmatched node in the queue, it
 323      *    spins a bit before blocking. In either case, before blocking
 324      *    it tries to unsplice any nodes between the current "head"
 325      *    and the first unmatched node.
 326      *
 327      *    Front-of-queue spinning vastly improves performance of
 328      *    heavily contended queues. And so long as it is relatively
 329      *    brief and "quiet", spinning does not much impact performance
 330      *    of less-contended queues.  During spins threads check their
 331      *    interrupt status and generate a thread-local random number
 332      *    to decide to occasionally perform a Thread.yield. While
 333      *    yield has underdefined specs, we assume that might it help,
 334      *    and will not hurt in limiting impact of spinning on busy
 335      *    systems.  We also use smaller (1/2) spins for nodes that are
 336      *    not known to be front but whose predecessors have not
 337      *    blocked -- these "chained" spins avoid artifacts of
 338      *    front-of-queue rules which otherwise lead to alternating
 339      *    nodes spinning vs blocking. Further, front threads that
 340      *    represent phase changes (from data to request node or vice
 341      *    versa) compared to their predecessors receive additional
 342      *    chained spins, reflecting longer paths typically required to
 343      *    unblock threads during phase changes.
 344      *
 345      *
 346      * ** Unlinking removed interior nodes **
 347      *
 348      * In addition to minimizing garbage retention via self-linking
 349      * described above, we also unlink removed interior nodes. These
 350      * may arise due to timed out or interrupted waits, or calls to
 351      * remove(x) or Iterator.remove.  Normally, given a node that was
 352      * at one time known to be the predecessor of some node s that is
 353      * to be removed, we can unsplice s by CASing the next field of
 354      * its predecessor if it still points to s (otherwise s must


 525         final boolean tryMatchData() {
 526             // assert isData;
 527             Object x = item;
 528             if (x != null && x != this && casItem(x, null)) {
 529                 LockSupport.unpark(waiter);
 530                 return true;
 531             }
 532             return false;
 533         }
 534 
 535         private static final long serialVersionUID = -3375979862319811754L;
 536 
 537         // Unsafe mechanics
 538         private static final sun.misc.Unsafe UNSAFE;
 539         private static final long itemOffset;
 540         private static final long nextOffset;
 541         private static final long waiterOffset;
 542         static {
 543             try {
 544                 UNSAFE = sun.misc.Unsafe.getUnsafe();
 545                 Class k = Node.class;
 546                 itemOffset = UNSAFE.objectFieldOffset
 547                     (k.getDeclaredField("item"));
 548                 nextOffset = UNSAFE.objectFieldOffset
 549                     (k.getDeclaredField("next"));
 550                 waiterOffset = UNSAFE.objectFieldOffset
 551                     (k.getDeclaredField("waiter"));
 552             } catch (Exception e) {
 553                 throw new Error(e);
 554             }
 555         }
 556     }
 557 
 558     /** head of the queue; null until first enqueue */
 559     transient volatile Node head;
 560 
 561     /** tail of the queue; null until first append */
 562     private transient volatile Node tail;
 563 
 564     /** The number of apparent failures to unsplice removed nodes */
 565     private transient volatile int sweepVotes;


 610         for (;;) {                            // restart on append race
 611 
 612             for (Node h = head, p = h; p != null;) { // find & match first node
 613                 boolean isData = p.isData;
 614                 Object item = p.item;
 615                 if (item != p && (item != null) == isData) { // unmatched
 616                     if (isData == haveData)   // can't match
 617                         break;
 618                     if (p.casItem(item, e)) { // match
 619                         for (Node q = p; q != h;) {
 620                             Node n = q.next;  // update by 2 unless singleton
 621                             if (head == h && casHead(h, n == null ? q : n)) {
 622                                 h.forgetNext();
 623                                 break;
 624                             }                 // advance and retry
 625                             if ((h = head)   == null ||
 626                                 (q = h.next) == null || !q.isMatched())
 627                                 break;        // unless slack < 2
 628                         }
 629                         LockSupport.unpark(p.waiter);
 630                         return this.<E>cast(item);
 631                     }
 632                 }
 633                 Node n = p.next;
 634                 p = (p != n) ? n : (h = head); // Use head if p offlist
 635             }
 636 
 637             if (how != NOW) {                 // No matches available
 638                 if (s == null)
 639                     s = new Node(e, haveData);
 640                 Node pred = tryAppend(s, haveData);
 641                 if (pred == null)
 642                     continue retry;           // lost race vs opposite mode
 643                 if (how != ASYNC)
 644                     return awaitMatch(s, pred, e, (how == TIMED), nanos);
 645             }
 646             return e; // not waiting
 647         }
 648     }
 649 
 650     /**


 688      * @param s the waiting node
 689      * @param pred the predecessor of s, or s itself if it has no
 690      * predecessor, or null if unknown (the null case does not occur
 691      * in any current calls but may in possible future extensions)
 692      * @param e the comparison value for checking match
 693      * @param timed if true, wait only until timeout elapses
 694      * @param nanos timeout in nanosecs, used only if timed is true
 695      * @return matched item, or e if unmatched on interrupt or timeout
 696      */
 697     private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
 698         long lastTime = timed ? System.nanoTime() : 0L;
 699         Thread w = Thread.currentThread();
 700         int spins = -1; // initialized after first item and cancel checks
 701         ThreadLocalRandom randomYields = null; // bound if needed
 702 
 703         for (;;) {
 704             Object item = s.item;
 705             if (item != e) {                  // matched
 706                 // assert item != s;
 707                 s.forgetContents();           // avoid garbage
 708                 return this.<E>cast(item);
 709             }
 710             if ((w.isInterrupted() || (timed && nanos <= 0)) &&
 711                     s.casItem(e, s)) {        // cancel
 712                 unsplice(pred, s);
 713                 return e;
 714             }
 715 
 716             if (spins < 0) {                  // establish spins at/near front
 717                 if ((spins = spinsFor(pred, s.isData)) > 0)
 718                     randomYields = ThreadLocalRandom.current();
 719             }
 720             else if (spins > 0) {             // spin
 721                 --spins;
 722                 if (randomYields.nextInt(CHAINED_SPINS) == 0)
 723                     Thread.yield();           // occasionally yield
 724             }
 725             else if (s.waiter == null) {
 726                 s.waiter = w;                 // request unpark then recheck
 727             }
 728             else if (timed) {


 769      * Returns the first unmatched node of the given mode, or null if
 770      * none.  Used by methods isEmpty, hasWaitingConsumer.
 771      */
 772     private Node firstOfMode(boolean isData) {
 773         for (Node p = head; p != null; p = succ(p)) {
 774             if (!p.isMatched())
 775                 return (p.isData == isData) ? p : null;
 776         }
 777         return null;
 778     }
 779 
 780     /**
 781      * Returns the item in the first unmatched node with isData; or
 782      * null if none.  Used by peek.
 783      */
 784     private E firstDataItem() {
 785         for (Node p = head; p != null; p = succ(p)) {
 786             Object item = p.item;
 787             if (p.isData) {
 788                 if (item != null && item != p)
 789                     return this.<E>cast(item);
 790             }
 791             else if (item == null)
 792                 return null;
 793         }
 794         return null;
 795     }
 796 
 797     /**
 798      * Traverses and counts unmatched nodes of the given mode.
 799      * Used by methods size and getWaitingConsumerCount.
 800      */
 801     private int countOfMode(boolean data) {
 802         int count = 0;
 803         for (Node p = head; p != null; ) {
 804             if (!p.isMatched()) {
 805                 if (p.isData != data)
 806                     return 0;
 807                 if (++count == Integer.MAX_VALUE) // saturated
 808                     break;
 809             }


 991                 Object item = p.item;
 992                 if (p.isData) {
 993                     if (item != null && item != p && e.equals(item) &&
 994                         p.tryMatchData()) {
 995                         unsplice(pred, p);
 996                         return true;
 997                     }
 998                 }
 999                 else if (item == null)
1000                     break;
1001                 pred = p;
1002                 if ((p = p.next) == pred) { // stale
1003                     pred = null;
1004                     p = head;
1005                 }
1006             }
1007         }
1008         return false;
1009     }
1010 
1011 
1012     /**
1013      * Creates an initially empty {@code LinkedTransferQueue}.
1014      */
1015     public LinkedTransferQueue() {
1016     }
1017 
1018     /**
1019      * Creates a {@code LinkedTransferQueue}
1020      * initially containing the elements of the given collection,
1021      * added in traversal order of the collection's iterator.
1022      *
1023      * @param c the collection of elements to initially contain
1024      * @throws NullPointerException if the specified collection or any
1025      *         of its elements are null
1026      */
1027     public LinkedTransferQueue(Collection<? extends E> c) {
1028         this();
1029         addAll(c);
1030     }
1031 
1032     /**
1033      * Inserts the specified element at the tail of this queue.
1034      * As the queue is unbounded, this method will never block.
1035      *
1036      * @throws NullPointerException if the specified element is null
1037      */
1038     public void put(E e) {
1039         xfer(e, true, ASYNC, 0);
1040     }
1041 
1042     /**
1043      * Inserts the specified element at the tail of this queue.
1044      * As the queue is unbounded, this method will never block or
1045      * return {@code false}.
1046      *
1047      * @return {@code true} (as specified by
1048      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})

1049      * @throws NullPointerException if the specified element is null
1050      */
1051     public boolean offer(E e, long timeout, TimeUnit unit) {
1052         xfer(e, true, ASYNC, 0);
1053         return true;
1054     }
1055 
1056     /**
1057      * Inserts the specified element at the tail of this queue.
1058      * As the queue is unbounded, this method will never return {@code false}.
1059      *
1060      * @return {@code true} (as specified by {@link Queue#offer})
1061      * @throws NullPointerException if the specified element is null
1062      */
1063     public boolean offer(E e) {
1064         xfer(e, true, ASYNC, 0);
1065         return true;
1066     }
1067 
1068     /**


1145         E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1146         if (e != null || !Thread.interrupted())
1147             return e;
1148         throw new InterruptedException();
1149     }
1150 
1151     public E poll() {
1152         return xfer(null, false, NOW, 0);
1153     }
1154 
1155     /**
1156      * @throws NullPointerException     {@inheritDoc}
1157      * @throws IllegalArgumentException {@inheritDoc}
1158      */
1159     public int drainTo(Collection<? super E> c) {
1160         if (c == null)
1161             throw new NullPointerException();
1162         if (c == this)
1163             throw new IllegalArgumentException();
1164         int n = 0;
1165         E e;
1166         while ( (e = poll()) != null) {
1167             c.add(e);
1168             ++n;
1169         }
1170         return n;
1171     }
1172 
1173     /**
1174      * @throws NullPointerException     {@inheritDoc}
1175      * @throws IllegalArgumentException {@inheritDoc}
1176      */
1177     public int drainTo(Collection<? super E> c, int maxElements) {
1178         if (c == null)
1179             throw new NullPointerException();
1180         if (c == this)
1181             throw new IllegalArgumentException();
1182         int n = 0;
1183         E e;
1184         while (n < maxElements && (e = poll()) != null) {
1185             c.add(e);
1186             ++n;
1187         }
1188         return n;
1189     }
1190 
1191     /**
1192      * Returns an iterator over the elements in this queue in proper sequence.
1193      * The elements will be returned in order from first (head) to last (tail).
1194      *
1195      * <p>The returned iterator is a "weakly consistent" iterator that
1196      * will never throw {@link java.util.ConcurrentModificationException
1197      * ConcurrentModificationException}, and guarantees to traverse
1198      * elements as they existed upon construction of the iterator, and
1199      * may (but is not guaranteed to) reflect any modifications
1200      * subsequent to construction.
1201      *
1202      * @return an iterator over the elements in this queue in proper sequence
1203      */
1204     public Iterator<E> iterator() {


1271      */
1272     public boolean contains(Object o) {
1273         if (o == null) return false;
1274         for (Node p = head; p != null; p = succ(p)) {
1275             Object item = p.item;
1276             if (p.isData) {
1277                 if (item != null && item != p && o.equals(item))
1278                     return true;
1279             }
1280             else if (item == null)
1281                 break;
1282         }
1283         return false;
1284     }
1285 
1286     /**
1287      * Always returns {@code Integer.MAX_VALUE} because a
1288      * {@code LinkedTransferQueue} is not capacity constrained.
1289      *
1290      * @return {@code Integer.MAX_VALUE} (as specified by
1291      *         {@link BlockingQueue#remainingCapacity()})

1292      */
1293     public int remainingCapacity() {
1294         return Integer.MAX_VALUE;
1295     }
1296 
1297     /**
1298      * Saves the state to a stream (that is, serializes it).
1299      *
1300      * @serialData All of the elements (each an {@code E}) in
1301      * the proper order, followed by a null
1302      * @param s the stream
1303      */
1304     private void writeObject(java.io.ObjectOutputStream s)
1305         throws java.io.IOException {
1306         s.defaultWriteObject();
1307         for (E e : this)
1308             s.writeObject(e);
1309         // Use trailing null as sentinel
1310         s.writeObject(null);
1311     }


1320         throws java.io.IOException, ClassNotFoundException {
1321         s.defaultReadObject();
1322         for (;;) {
1323             @SuppressWarnings("unchecked") E item = (E) s.readObject();
1324             if (item == null)
1325                 break;
1326             else
1327                 offer(item);
1328         }
1329     }
1330 
1331     // Unsafe mechanics
1332 
1333     private static final sun.misc.Unsafe UNSAFE;
1334     private static final long headOffset;
1335     private static final long tailOffset;
1336     private static final long sweepVotesOffset;
1337     static {
1338         try {
1339             UNSAFE = sun.misc.Unsafe.getUnsafe();
1340             Class k = LinkedTransferQueue.class;
1341             headOffset = UNSAFE.objectFieldOffset
1342                 (k.getDeclaredField("head"));
1343             tailOffset = UNSAFE.objectFieldOffset
1344                 (k.getDeclaredField("tail"));
1345             sweepVotesOffset = UNSAFE.objectFieldOffset
1346                 (k.getDeclaredField("sweepVotes"));
1347         } catch (Exception e) {
1348             throw new Error(e);
1349         }
1350     }
1351 }


 313      *    by continuing the traversal at current head.
 314      *
 315      *    On successful append, if the call was ASYNC, return.
 316      *
 317      * 3. Await match or cancellation (method awaitMatch)
 318      *
 319      *    Wait for another thread to match node; instead cancelling if
 320      *    the current thread was interrupted or the wait timed out. On
 321      *    multiprocessors, we use front-of-queue spinning: If a node
 322      *    appears to be the first unmatched node in the queue, it
 323      *    spins a bit before blocking. In either case, before blocking
 324      *    it tries to unsplice any nodes between the current "head"
 325      *    and the first unmatched node.
 326      *
 327      *    Front-of-queue spinning vastly improves performance of
 328      *    heavily contended queues. And so long as it is relatively
 329      *    brief and "quiet", spinning does not much impact performance
 330      *    of less-contended queues.  During spins threads check their
 331      *    interrupt status and generate a thread-local random number
 332      *    to decide to occasionally perform a Thread.yield. While
 333      *    yield has underdefined specs, we assume that it might help,
 334      *    and will not hurt, in limiting impact of spinning on busy
 335      *    systems.  We also use smaller (1/2) spins for nodes that are
 336      *    not known to be front but whose predecessors have not
 337      *    blocked -- these "chained" spins avoid artifacts of
 338      *    front-of-queue rules which otherwise lead to alternating
 339      *    nodes spinning vs blocking. Further, front threads that
 340      *    represent phase changes (from data to request node or vice
 341      *    versa) compared to their predecessors receive additional
 342      *    chained spins, reflecting longer paths typically required to
 343      *    unblock threads during phase changes.
 344      *
 345      *
 346      * ** Unlinking removed interior nodes **
 347      *
 348      * In addition to minimizing garbage retention via self-linking
 349      * described above, we also unlink removed interior nodes. These
 350      * may arise due to timed out or interrupted waits, or calls to
 351      * remove(x) or Iterator.remove.  Normally, given a node that was
 352      * at one time known to be the predecessor of some node s that is
 353      * to be removed, we can unsplice s by CASing the next field of
 354      * its predecessor if it still points to s (otherwise s must


 525         final boolean tryMatchData() {
 526             // assert isData;
 527             Object x = item;
 528             if (x != null && x != this && casItem(x, null)) {
 529                 LockSupport.unpark(waiter);
 530                 return true;
 531             }
 532             return false;
 533         }
 534 
 535         private static final long serialVersionUID = -3375979862319811754L;
 536 
 537         // Unsafe mechanics
 538         private static final sun.misc.Unsafe UNSAFE;
 539         private static final long itemOffset;
 540         private static final long nextOffset;
 541         private static final long waiterOffset;
 542         static {
 543             try {
 544                 UNSAFE = sun.misc.Unsafe.getUnsafe();
 545                 Class<?> k = Node.class;
 546                 itemOffset = UNSAFE.objectFieldOffset
 547                     (k.getDeclaredField("item"));
 548                 nextOffset = UNSAFE.objectFieldOffset
 549                     (k.getDeclaredField("next"));
 550                 waiterOffset = UNSAFE.objectFieldOffset
 551                     (k.getDeclaredField("waiter"));
 552             } catch (Exception e) {
 553                 throw new Error(e);
 554             }
 555         }
 556     }
 557 
 558     /** head of the queue; null until first enqueue */
 559     transient volatile Node head;
 560 
 561     /** tail of the queue; null until first append */
 562     private transient volatile Node tail;
 563 
 564     /** The number of apparent failures to unsplice removed nodes */
 565     private transient volatile int sweepVotes;


 610         for (;;) {                            // restart on append race
 611 
 612             for (Node h = head, p = h; p != null;) { // find & match first node
 613                 boolean isData = p.isData;
 614                 Object item = p.item;
 615                 if (item != p && (item != null) == isData) { // unmatched
 616                     if (isData == haveData)   // can't match
 617                         break;
 618                     if (p.casItem(item, e)) { // match
 619                         for (Node q = p; q != h;) {
 620                             Node n = q.next;  // update by 2 unless singleton
 621                             if (head == h && casHead(h, n == null ? q : n)) {
 622                                 h.forgetNext();
 623                                 break;
 624                             }                 // advance and retry
 625                             if ((h = head)   == null ||
 626                                 (q = h.next) == null || !q.isMatched())
 627                                 break;        // unless slack < 2
 628                         }
 629                         LockSupport.unpark(p.waiter);
 630                         return LinkedTransferQueue.<E>cast(item);
 631                     }
 632                 }
 633                 Node n = p.next;
 634                 p = (p != n) ? n : (h = head); // Use head if p offlist
 635             }
 636 
 637             if (how != NOW) {                 // No matches available
 638                 if (s == null)
 639                     s = new Node(e, haveData);
 640                 Node pred = tryAppend(s, haveData);
 641                 if (pred == null)
 642                     continue retry;           // lost race vs opposite mode
 643                 if (how != ASYNC)
 644                     return awaitMatch(s, pred, e, (how == TIMED), nanos);
 645             }
 646             return e; // not waiting
 647         }
 648     }
 649 
 650     /**


 688      * @param s the waiting node
 689      * @param pred the predecessor of s, or s itself if it has no
 690      * predecessor, or null if unknown (the null case does not occur
 691      * in any current calls but may in possible future extensions)
 692      * @param e the comparison value for checking match
 693      * @param timed if true, wait only until timeout elapses
 694      * @param nanos timeout in nanosecs, used only if timed is true
 695      * @return matched item, or e if unmatched on interrupt or timeout
 696      */
 697     private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
 698         long lastTime = timed ? System.nanoTime() : 0L;
 699         Thread w = Thread.currentThread();
 700         int spins = -1; // initialized after first item and cancel checks
 701         ThreadLocalRandom randomYields = null; // bound if needed
 702 
 703         for (;;) {
 704             Object item = s.item;
 705             if (item != e) {                  // matched
 706                 // assert item != s;
 707                 s.forgetContents();           // avoid garbage
 708                 return LinkedTransferQueue.<E>cast(item);
 709             }
 710             if ((w.isInterrupted() || (timed && nanos <= 0)) &&
 711                     s.casItem(e, s)) {        // cancel
 712                 unsplice(pred, s);
 713                 return e;
 714             }
 715 
 716             if (spins < 0) {                  // establish spins at/near front
 717                 if ((spins = spinsFor(pred, s.isData)) > 0)
 718                     randomYields = ThreadLocalRandom.current();
 719             }
 720             else if (spins > 0) {             // spin
 721                 --spins;
 722                 if (randomYields.nextInt(CHAINED_SPINS) == 0)
 723                     Thread.yield();           // occasionally yield
 724             }
 725             else if (s.waiter == null) {
 726                 s.waiter = w;                 // request unpark then recheck
 727             }
 728             else if (timed) {


 769      * Returns the first unmatched node of the given mode, or null if
 770      * none.  Used by methods isEmpty, hasWaitingConsumer.
 771      */
 772     private Node firstOfMode(boolean isData) {
 773         for (Node p = head; p != null; p = succ(p)) {
 774             if (!p.isMatched())
 775                 return (p.isData == isData) ? p : null;
 776         }
 777         return null;
 778     }
 779 
 780     /**
 781      * Returns the item in the first unmatched node with isData; or
 782      * null if none.  Used by peek.
 783      */
 784     private E firstDataItem() {
 785         for (Node p = head; p != null; p = succ(p)) {
 786             Object item = p.item;
 787             if (p.isData) {
 788                 if (item != null && item != p)
 789                     return LinkedTransferQueue.<E>cast(item);
 790             }
 791             else if (item == null)
 792                 return null;
 793         }
 794         return null;
 795     }
 796 
 797     /**
 798      * Traverses and counts unmatched nodes of the given mode.
 799      * Used by methods size and getWaitingConsumerCount.
 800      */
 801     private int countOfMode(boolean data) {
 802         int count = 0;
 803         for (Node p = head; p != null; ) {
 804             if (!p.isMatched()) {
 805                 if (p.isData != data)
 806                     return 0;
 807                 if (++count == Integer.MAX_VALUE) // saturated
 808                     break;
 809             }


 991                 Object item = p.item;
 992                 if (p.isData) {
 993                     if (item != null && item != p && e.equals(item) &&
 994                         p.tryMatchData()) {
 995                         unsplice(pred, p);
 996                         return true;
 997                     }
 998                 }
 999                 else if (item == null)
1000                     break;
1001                 pred = p;
1002                 if ((p = p.next) == pred) { // stale
1003                     pred = null;
1004                     p = head;
1005                 }
1006             }
1007         }
1008         return false;
1009     }
1010 

1011     /**
1012      * Creates an initially empty {@code LinkedTransferQueue}.
1013      */
1014     public LinkedTransferQueue() {
1015     }
1016 
1017     /**
1018      * Creates a {@code LinkedTransferQueue}
1019      * initially containing the elements of the given collection,
1020      * added in traversal order of the collection's iterator.
1021      *
1022      * @param c the collection of elements to initially contain
1023      * @throws NullPointerException if the specified collection or any
1024      *         of its elements are null
1025      */
1026     public LinkedTransferQueue(Collection<? extends E> c) {
1027         this();
1028         addAll(c);
1029     }
1030 
1031     /**
1032      * Inserts the specified element at the tail of this queue.
1033      * As the queue is unbounded, this method will never block.
1034      *
1035      * @throws NullPointerException if the specified element is null
1036      */
1037     public void put(E e) {
1038         xfer(e, true, ASYNC, 0);
1039     }
1040 
1041     /**
1042      * Inserts the specified element at the tail of this queue.
1043      * As the queue is unbounded, this method will never block or
1044      * return {@code false}.
1045      *
1046      * @return {@code true} (as specified by
1047      *  {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit)
1048      *  BlockingQueue.offer})
1049      * @throws NullPointerException if the specified element is null
1050      */
1051     public boolean offer(E e, long timeout, TimeUnit unit) {
1052         xfer(e, true, ASYNC, 0);
1053         return true;
1054     }
1055 
1056     /**
1057      * Inserts the specified element at the tail of this queue.
1058      * As the queue is unbounded, this method will never return {@code false}.
1059      *
1060      * @return {@code true} (as specified by {@link Queue#offer})
1061      * @throws NullPointerException if the specified element is null
1062      */
1063     public boolean offer(E e) {
1064         xfer(e, true, ASYNC, 0);
1065         return true;
1066     }
1067 
1068     /**


1145         E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1146         if (e != null || !Thread.interrupted())
1147             return e;
1148         throw new InterruptedException();
1149     }
1150 
1151     public E poll() {
1152         return xfer(null, false, NOW, 0);
1153     }
1154 
1155     /**
1156      * @throws NullPointerException     {@inheritDoc}
1157      * @throws IllegalArgumentException {@inheritDoc}
1158      */
1159     public int drainTo(Collection<? super E> c) {
1160         if (c == null)
1161             throw new NullPointerException();
1162         if (c == this)
1163             throw new IllegalArgumentException();
1164         int n = 0;
1165         for (E e; (e = poll()) != null;) {

1166             c.add(e);
1167             ++n;
1168         }
1169         return n;
1170     }
1171 
1172     /**
1173      * @throws NullPointerException     {@inheritDoc}
1174      * @throws IllegalArgumentException {@inheritDoc}
1175      */
1176     public int drainTo(Collection<? super E> c, int maxElements) {
1177         if (c == null)
1178             throw new NullPointerException();
1179         if (c == this)
1180             throw new IllegalArgumentException();
1181         int n = 0;
1182         for (E e; n < maxElements && (e = poll()) != null;) {

1183             c.add(e);
1184             ++n;
1185         }
1186         return n;
1187     }
1188 
1189     /**
1190      * Returns an iterator over the elements in this queue in proper sequence.
1191      * The elements will be returned in order from first (head) to last (tail).
1192      *
1193      * <p>The returned iterator is a "weakly consistent" iterator that
1194      * will never throw {@link java.util.ConcurrentModificationException
1195      * ConcurrentModificationException}, and guarantees to traverse
1196      * elements as they existed upon construction of the iterator, and
1197      * may (but is not guaranteed to) reflect any modifications
1198      * subsequent to construction.
1199      *
1200      * @return an iterator over the elements in this queue in proper sequence
1201      */
1202     public Iterator<E> iterator() {


1269      */
1270     public boolean contains(Object o) {
1271         if (o == null) return false;
1272         for (Node p = head; p != null; p = succ(p)) {
1273             Object item = p.item;
1274             if (p.isData) {
1275                 if (item != null && item != p && o.equals(item))
1276                     return true;
1277             }
1278             else if (item == null)
1279                 break;
1280         }
1281         return false;
1282     }
1283 
1284     /**
1285      * Always returns {@code Integer.MAX_VALUE} because a
1286      * {@code LinkedTransferQueue} is not capacity constrained.
1287      *
1288      * @return {@code Integer.MAX_VALUE} (as specified by
1289      *         {@link java.util.concurrent.BlockingQueue#remainingCapacity()
1290      *         BlockingQueue.remainingCapacity})
1291      */
1292     public int remainingCapacity() {
1293         return Integer.MAX_VALUE;
1294     }
1295 
1296     /**
1297      * Saves the state to a stream (that is, serializes it).
1298      *
1299      * @serialData All of the elements (each an {@code E}) in
1300      * the proper order, followed by a null
1301      * @param s the stream
1302      */
1303     private void writeObject(java.io.ObjectOutputStream s)
1304         throws java.io.IOException {
1305         s.defaultWriteObject();
1306         for (E e : this)
1307             s.writeObject(e);
1308         // Use trailing null as sentinel
1309         s.writeObject(null);
1310     }


1319         throws java.io.IOException, ClassNotFoundException {
1320         s.defaultReadObject();
1321         for (;;) {
1322             @SuppressWarnings("unchecked") E item = (E) s.readObject();
1323             if (item == null)
1324                 break;
1325             else
1326                 offer(item);
1327         }
1328     }
1329 
1330     // Unsafe mechanics
1331 
1332     private static final sun.misc.Unsafe UNSAFE;
1333     private static final long headOffset;
1334     private static final long tailOffset;
1335     private static final long sweepVotesOffset;
1336     static {
1337         try {
1338             UNSAFE = sun.misc.Unsafe.getUnsafe();
1339             Class<?> k = LinkedTransferQueue.class;
1340             headOffset = UNSAFE.objectFieldOffset
1341                 (k.getDeclaredField("head"));
1342             tailOffset = UNSAFE.objectFieldOffset
1343                 (k.getDeclaredField("tail"));
1344             sweepVotesOffset = UNSAFE.objectFieldOffset
1345                 (k.getDeclaredField("sweepVotes"));
1346         } catch (Exception e) {
1347             throw new Error(e);
1348         }
1349     }
1350 }