rev 12972 : 8140606: Update library code to use internal Unsafe
Reviewed-by: duke

   1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea, Bill Scherer, and Michael Scott with
  32  * assistance from members of JCP JSR-166 Expert Group and released to
  33  * the public domain, as explained at
  34  * http://creativecommons.org/publicdomain/zero/1.0/
  35  */
  36 
  37 package java.util.concurrent;
  38 
  39 import java.util.AbstractQueue;
  40 import java.util.Collection;
  41 import java.util.Collections;
  42 import java.util.Iterator;
  43 import java.util.Spliterator;
  44 import java.util.Spliterators;
  45 import java.util.concurrent.locks.LockSupport;
  46 import java.util.concurrent.locks.ReentrantLock;
  47 
  48 /**
  49  * A {@linkplain BlockingQueue blocking queue} in which each insert
  50  * operation must wait for a corresponding remove operation by another
  51  * thread, and vice versa.  A synchronous queue does not have any
  52  * internal capacity, not even a capacity of one.  You cannot
  53  * {@code peek} at a synchronous queue because an element is only
  54  * present when you try to remove it; you cannot insert an element
  55  * (using any method) unless another thread is trying to remove it;
  56  * you cannot iterate as there is nothing to iterate.  The
  57  * <em>head</em> of the queue is the element that the first queued
  58  * inserting thread is trying to add to the queue; if there is no such
  59  * queued thread then no element is available for removal and
  60  * {@code poll()} will return {@code null}.  For purposes of other
  61  * {@code Collection} methods (for example {@code contains}), a
  62  * {@code SynchronousQueue} acts as an empty collection.  This queue
  63  * does not permit {@code null} elements.
  64  *
  65  * <p>Synchronous queues are similar to rendezvous channels used in
  66  * CSP and Ada. They are well suited for handoff designs, in which an
  67  * object running in one thread must sync up with an object running
  68  * in another thread in order to hand it some information, event, or
  69  * task.
  70  *
  71  * <p>This class supports an optional fairness policy for ordering
  72  * waiting producer and consumer threads.  By default, this ordering
  73  * is not guaranteed. However, a queue constructed with fairness set
  74  * to {@code true} grants threads access in FIFO order.
  75  *
  76  * <p>This class and its iterator implement all of the
  77  * <em>optional</em> methods of the {@link Collection} and {@link
  78  * Iterator} interfaces.
  79  *
  80  * <p>This class is a member of the
  81  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  82  * Java Collections Framework</a>.
  83  *
  84  * @since 1.5
  85  * @author Doug Lea and Bill Scherer and Michael Scott
  86  * @param <E> the type of elements held in this queue
  87  */
  88 public class SynchronousQueue<E> extends AbstractQueue<E>
  89     implements BlockingQueue<E>, java.io.Serializable {
  90     private static final long serialVersionUID = -3223113410248163686L;
  91 
  92     /*
  93      * This class implements extensions of the dual stack and dual
  94      * queue algorithms described in "Nonblocking Concurrent Objects
  95      * with Condition Synchronization", by W. N. Scherer III and
  96      * M. L. Scott.  18th Annual Conf. on Distributed Computing,
  97      * Oct. 2004 (see also
  98      * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
  99      * The (Lifo) stack is used for non-fair mode, and the (Fifo)
 100      * queue for fair mode. The performance of the two is generally
 101      * similar. Fifo usually supports higher throughput under
 102      * contention but Lifo maintains higher thread locality in common
 103      * applications.
 104      *
 105      * A dual queue (and similarly stack) is one that at any given
 106      * time either holds "data" -- items provided by put operations,
 107      * or "requests" -- slots representing take operations, or is
 108      * empty. A call to "fulfill" (i.e., a call requesting an item
 109      * from a queue holding data or vice versa) dequeues a
 110      * complementary node.  The most interesting feature of these
 111      * queues is that any operation can figure out which mode the
 112      * queue is in, and act accordingly without needing locks.
 113      *
 114      * Both the queue and stack extend abstract class Transferer
 115      * defining the single method transfer that does a put or a
 116      * take. These are unified into a single method because in dual
 117      * data structures, the put and take operations are symmetrical,
 118      * so nearly all code can be combined. The resulting transfer
 119      * methods are on the long side, but are easier to follow than
 120      * they would be if broken up into nearly-duplicated parts.
 121      *
 122      * The queue and stack data structures share many conceptual
 123      * similarities but very few concrete details. For simplicity,
 124      * they are kept distinct so that they can later evolve
 125      * separately.
 126      *
 127      * The algorithms here differ from the versions in the above paper
 128      * in extending them for use in synchronous queues, as well as
 129      * dealing with cancellation. The main differences include:
 130      *
 131      *  1. The original algorithms used bit-marked pointers, but
 132      *     the ones here use mode bits in nodes, leading to a number
 133      *     of further adaptations.
 134      *  2. SynchronousQueues must block threads waiting to become
 135      *     fulfilled.
 136      *  3. Support for cancellation via timeout and interrupts,
 137      *     including cleaning out cancelled nodes/threads
 138      *     from lists to avoid garbage retention and memory depletion.
 139      *
 140      * Blocking is mainly accomplished using LockSupport park/unpark,
 141      * except that nodes that appear to be the next ones to become
 142      * fulfilled first spin a bit (on multiprocessors only). On very
 143      * busy synchronous queues, spinning can dramatically improve
 144      * throughput. And on less busy ones, the amount of spinning is
 145      * small enough not to be noticeable.
 146      *
 147      * Cleaning is done in different ways in queues vs stacks.  For
 148      * queues, we can almost always remove a node immediately in O(1)
 149      * time (modulo retries for consistency checks) when it is
 150      * cancelled. But if it may be pinned as the current tail, it must
 151      * wait until some subsequent cancellation. For stacks, we need a
 152      * potentially O(n) traversal to be sure that we can remove the
 153      * node, but this can run concurrently with other threads
 154      * accessing the stack.
 155      *
 156      * While garbage collection takes care of most node reclamation
 157      * issues that otherwise complicate nonblocking algorithms, care
 158      * is taken to "forget" references to data, other nodes, and
 159      * threads that might be held on to long-term by blocked
 160      * threads. In cases where setting to null would otherwise
 161      * conflict with main algorithms, this is done by changing a
 162      * node's link to now point to the node itself. This doesn't arise
 163      * much for Stack nodes (because blocked threads do not hang on to
 164      * old head pointers), but references in Queue nodes must be
 165      * aggressively forgotten to avoid reachability of everything any
 166      * node has ever referred to since arrival.
 167      */
 168 
 169     /**
 170      * Shared internal API for dual stacks and queues.
 171      */
 172     abstract static class Transferer<E> {
 173         /**
 174          * Performs a put or take.
 175          *
 176          * @param e if non-null, the item to be handed to a consumer;
 177          *          if null, requests that transfer return an item
 178          *          offered by producer.
 179          * @param timed if this operation should timeout
 180          * @param nanos the timeout, in nanoseconds
 181          * @return if non-null, the item provided or received; if null,
 182          *         the operation failed due to timeout or interrupt --
 183          *         the caller can distinguish which of these occurred
 184          *         by checking Thread.interrupted.
 185          */
 186         abstract E transfer(E e, boolean timed, long nanos);
 187     }
 188 
 189     /**
 190      * The number of times to spin before blocking in timed waits.
 191      * The value is empirically derived -- it works well across a
 192      * variety of processors and OSes. Empirically, the best value
 193      * seems not to vary with number of CPUs (beyond 2) so is just
 194      * a constant.
 195      */
 196     static final int MAX_TIMED_SPINS =
 197         (Runtime.getRuntime().availableProcessors() < 2) ? 0 : 32;
 198 
 199     /**
 200      * The number of times to spin before blocking in untimed waits.
 201      * This is greater than timed value because untimed waits spin
 202      * faster since they don't need to check times on each spin.
 203      */
 204     static final int MAX_UNTIMED_SPINS = MAX_TIMED_SPINS * 16;
 205 
 206     /**
 207      * The number of nanoseconds for which it is faster to spin
 208      * rather than to use timed park. A rough estimate suffices.
 209      */
 210     static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
 211 
 212     /** Dual stack */
 213     static final class TransferStack<E> extends Transferer<E> {
 214         /*
 215          * This extends Scherer-Scott dual stack algorithm, differing,
 216          * among other ways, by using "covering" nodes rather than
 217          * bit-marked pointers: Fulfilling operations push on marker
 218          * nodes (with FULFILLING bit set in mode) to reserve a spot
 219          * to match a waiting node.
 220          */
 221 
 222         /* Modes for SNodes, ORed together in node fields */
 223         /** Node represents an unfulfilled consumer */
 224         static final int REQUEST    = 0;
 225         /** Node represents an unfulfilled producer */
 226         static final int DATA       = 1;
 227         /** Node is fulfilling another unfulfilled DATA or REQUEST */
 228         static final int FULFILLING = 2;
 229 
 230         /** Returns true if m has fulfilling bit set. */
 231         static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
 232 
 233         /** Node class for TransferStacks. */
 234         static final class SNode {
 235             volatile SNode next;        // next node in stack
 236             volatile SNode match;       // the node matched to this
 237             volatile Thread waiter;     // to control park/unpark
 238             Object item;                // data; or null for REQUESTs
 239             int mode;
 240             // Note: item and mode fields don't need to be volatile
 241             // since they are always written before, and read after,
 242             // other volatile/atomic operations.
 243 
 244             SNode(Object item) {
 245                 this.item = item;
 246             }
 247 
 248             boolean casNext(SNode cmp, SNode val) {
 249                 return cmp == next &&
 250                     U.compareAndSwapObject(this, NEXT, cmp, val);
 251             }
 252 
 253             /**
 254              * Tries to match node s to this node, if so, waking up thread.
 255              * Fulfillers call tryMatch to identify their waiters.
 256              * Waiters block until they have been matched.
 257              *
 258              * @param s the node to match
 259              * @return true if successfully matched to s
 260              */
 261             boolean tryMatch(SNode s) {
 262                 if (match == null &&
 263                     U.compareAndSwapObject(this, MATCH, null, s)) {
 264                     Thread w = waiter;
 265                     if (w != null) {    // waiters need at most one unpark
 266                         waiter = null;
 267                         LockSupport.unpark(w);
 268                     }
 269                     return true;
 270                 }
 271                 return match == s;
 272             }
 273 
 274             /**
 275              * Tries to cancel a wait by matching node to itself.
 276              */
 277             void tryCancel() {
 278                 U.compareAndSwapObject(this, MATCH, null, this);
 279             }
 280 
 281             boolean isCancelled() {
 282                 return match == this;
 283             }
 284 
 285             // Unsafe mechanics
 286             private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
 287             private static final long MATCH;
 288             private static final long NEXT;
 289 
 290             static {
 291                 try {
 292                     MATCH = U.objectFieldOffset
 293                         (SNode.class.getDeclaredField("match"));
 294                     NEXT = U.objectFieldOffset
 295                         (SNode.class.getDeclaredField("next"));
 296                 } catch (ReflectiveOperationException e) {
 297                     throw new Error(e);
 298                 }
 299             }
 300         }
 301 
 302         /** The head (top) of the stack */
 303         volatile SNode head;
 304 
 305         boolean casHead(SNode h, SNode nh) {
 306             return h == head &&
 307                 U.compareAndSwapObject(this, HEAD, h, nh);
 308         }
 309 
 310         /**
 311          * Creates or resets fields of a node. Called only from transfer
 312          * where the node to push on stack is lazily created and
 313          * reused when possible to help reduce intervals between reads
 314          * and CASes of head and to avoid surges of garbage when CASes
 315          * to push nodes fail due to contention.
 316          */
 317         static SNode snode(SNode s, Object e, SNode next, int mode) {
 318             if (s == null) s = new SNode(e);
 319             s.mode = mode;
 320             s.next = next;
 321             return s;
 322         }
 323 
 324         /**
 325          * Puts or takes an item.
 326          */
 327         @SuppressWarnings("unchecked")
 328         E transfer(E e, boolean timed, long nanos) {
 329             /*
 330              * Basic algorithm is to loop trying one of three actions:
 331              *
 332              * 1. If apparently empty or already containing nodes of same
 333              *    mode, try to push node on stack and wait for a match,
 334              *    returning it, or null if cancelled.
 335              *
 336              * 2. If apparently containing node of complementary mode,
 337              *    try to push a fulfilling node on to stack, match
 338              *    with corresponding waiting node, pop both from
 339              *    stack, and return matched item. The matching or
 340              *    unlinking might not actually be necessary because of
 341              *    other threads performing action 3:
 342              *
 343              * 3. If top of stack already holds another fulfilling node,
 344              *    help it out by doing its match and/or pop
 345              *    operations, and then continue. The code for helping
 346              *    is essentially the same as for fulfilling, except
 347              *    that it doesn't return the item.
 348              */
 349 
 350             SNode s = null; // constructed/reused as needed
 351             int mode = (e == null) ? REQUEST : DATA;
 352 
 353             for (;;) {
 354                 SNode h = head;
 355                 if (h == null || h.mode == mode) {  // empty or same-mode
 356                     if (timed && nanos <= 0L) {     // can't wait
 357                         if (h != null && h.isCancelled())
 358                             casHead(h, h.next);     // pop cancelled node
 359                         else
 360                             return null;
 361                     } else if (casHead(h, s = snode(s, e, h, mode))) {
 362                         SNode m = awaitFulfill(s, timed, nanos);
 363                         if (m == s) {               // wait was cancelled
 364                             clean(s);
 365                             return null;
 366                         }
 367                         if ((h = head) != null && h.next == s)
 368                             casHead(h, s.next);     // help s's fulfiller
 369                         return (E) ((mode == REQUEST) ? m.item : s.item);
 370                     }
 371                 } else if (!isFulfilling(h.mode)) { // try to fulfill
 372                     if (h.isCancelled())            // already cancelled
 373                         casHead(h, h.next);         // pop and retry
 374                     else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
 375                         for (;;) { // loop until matched or waiters disappear
 376                             SNode m = s.next;       // m is s's match
 377                             if (m == null) {        // all waiters are gone
 378                                 casHead(s, null);   // pop fulfill node
 379                                 s = null;           // use new node next time
 380                                 break;              // restart main loop
 381                             }
 382                             SNode mn = m.next;
 383                             if (m.tryMatch(s)) {
 384                                 casHead(s, mn);     // pop both s and m
 385                                 return (E) ((mode == REQUEST) ? m.item : s.item);
 386                             } else                  // lost match
 387                                 s.casNext(m, mn);   // help unlink
 388                         }
 389                     }
 390                 } else {                            // help a fulfiller
 391                     SNode m = h.next;               // m is h's match
 392                     if (m == null)                  // waiter is gone
 393                         casHead(h, null);           // pop fulfilling node
 394                     else {
 395                         SNode mn = m.next;
 396                         if (m.tryMatch(h))          // help match
 397                             casHead(h, mn);         // pop both h and m
 398                         else                        // lost match
 399                             h.casNext(m, mn);       // help unlink
 400                     }
 401                 }
 402             }
 403         }
 404 
 405         /**
 406          * Spins/blocks until node s is matched by a fulfill operation.
 407          *
 408          * @param s the waiting node
 409          * @param timed true if timed wait
 410          * @param nanos timeout value
 411          * @return matched node, or s if cancelled
 412          */
 413         SNode awaitFulfill(SNode s, boolean timed, long nanos) {
 414             /*
 415              * When a node/thread is about to block, it sets its waiter
 416              * field and then rechecks state at least one more time
 417              * before actually parking, thus covering race vs
 418              * fulfiller noticing that waiter is non-null so should be
 419              * woken.
 420              *
 421              * When invoked by nodes that appear at the point of call
 422              * to be at the head of the stack, calls to park are
 423              * preceded by spins to avoid blocking when producers and
 424              * consumers are arriving very close in time.  This can
 425              * happen enough to bother only on multiprocessors.
 426              *
 427              * The order of checks for returning out of main loop
 428              * reflects fact that interrupts have precedence over
 429              * normal returns, which have precedence over
 430              * timeouts. (So, on timeout, one last check for match is
 431              * done before giving up.) Except that calls from untimed
 432              * SynchronousQueue.{poll/offer} don't check interrupts
 433              * and don't wait at all, so are trapped in transfer
 434              * method rather than calling awaitFulfill.
 435              */
 436             final long deadline = timed ? System.nanoTime() + nanos : 0L;
 437             Thread w = Thread.currentThread();
 438             int spins = shouldSpin(s)
 439                 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
 440                 : 0;
 441             for (;;) {
 442                 if (w.isInterrupted())
 443                     s.tryCancel();
 444                 SNode m = s.match;
 445                 if (m != null)
 446                     return m;
 447                 if (timed) {
 448                     nanos = deadline - System.nanoTime();
 449                     if (nanos <= 0L) {
 450                         s.tryCancel();
 451                         continue;
 452                     }
 453                 }
 454                 if (spins > 0)
 455                     spins = shouldSpin(s) ? (spins - 1) : 0;
 456                 else if (s.waiter == null)
 457                     s.waiter = w; // establish waiter so can park next iter
 458                 else if (!timed)
 459                     LockSupport.park(this);
 460                 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
 461                     LockSupport.parkNanos(this, nanos);
 462             }
 463         }
 464 
 465         /**
 466          * Returns true if node s is at head or there is an active
 467          * fulfiller.
 468          */
 469         boolean shouldSpin(SNode s) {
 470             SNode h = head;
 471             return (h == s || h == null || isFulfilling(h.mode));
 472         }
 473 
 474         /**
 475          * Unlinks s from the stack.
 476          */
 477         void clean(SNode s) {
 478             s.item = null;   // forget item
 479             s.waiter = null; // forget thread
 480 
 481             /*
 482              * At worst we may need to traverse entire stack to unlink
 483              * s. If there are multiple concurrent calls to clean, we
 484              * might not see s if another thread has already removed
 485              * it. But we can stop when we see any node known to
 486              * follow s. We use s.next unless it too is cancelled, in
 487              * which case we try the node one past. We don't check any
 488              * further because we don't want to doubly traverse just to
 489              * find sentinel.
 490              */
 491 
 492             SNode past = s.next;
 493             if (past != null && past.isCancelled())
 494                 past = past.next;
 495 
 496             // Absorb cancelled nodes at head
 497             SNode p;
 498             while ((p = head) != null && p != past && p.isCancelled())
 499                 casHead(p, p.next);
 500 
 501             // Unsplice embedded nodes
 502             while (p != null && p != past) {
 503                 SNode n = p.next;
 504                 if (n != null && n.isCancelled())
 505                     p.casNext(n, n.next);
 506                 else
 507                     p = n;
 508             }
 509         }
 510 
 511         // Unsafe mechanics
 512         private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
 513         private static final long HEAD;
 514         static {
 515             try {
 516                 HEAD = U.objectFieldOffset
 517                     (TransferStack.class.getDeclaredField("head"));
 518             } catch (ReflectiveOperationException e) {
 519                 throw new Error(e);
 520             }
 521         }
 522     }
 523 
 524     /** Dual Queue */
 525     static final class TransferQueue<E> extends Transferer<E> {
 526         /*
 527          * This extends Scherer-Scott dual queue algorithm, differing,
 528          * among other ways, by using modes within nodes rather than
 529          * marked pointers. The algorithm is a little simpler than
 530          * that for stacks because fulfillers do not need explicit
 531          * nodes, and matching is done by CAS'ing QNode.item field
 532          * from non-null to null (for put) or vice versa (for take).
 533          */
 534 
 535         /** Node class for TransferQueue. */
 536         static final class QNode {
 537             volatile QNode next;          // next node in queue
 538             volatile Object item;         // CAS'ed to or from null
 539             volatile Thread waiter;       // to control park/unpark
 540             final boolean isData;
 541 
 542             QNode(Object item, boolean isData) {
 543                 this.item = item;
 544                 this.isData = isData;
 545             }
 546 
 547             boolean casNext(QNode cmp, QNode val) {
 548                 return next == cmp &&
 549                     U.compareAndSwapObject(this, NEXT, cmp, val);
 550             }
 551 
 552             boolean casItem(Object cmp, Object val) {
 553                 return item == cmp &&
 554                     U.compareAndSwapObject(this, ITEM, cmp, val);
 555             }
 556 
 557             /**
 558              * Tries to cancel by CAS'ing ref to this as item.
 559              */
 560             void tryCancel(Object cmp) {
 561                 U.compareAndSwapObject(this, ITEM, cmp, this);
 562             }
 563 
 564             boolean isCancelled() {
 565                 return item == this;
 566             }
 567 
 568             /**
 569              * Returns true if this node is known to be off the queue
 570              * because its next pointer has been forgotten due to
 571              * an advanceHead operation.
 572              */
 573             boolean isOffList() {
 574                 return next == this;
 575             }
 576 
 577             // Unsafe mechanics
 578             private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
 579             private static final long ITEM;
 580             private static final long NEXT;
 581 
 582             static {
 583                 try {
 584                     ITEM = U.objectFieldOffset
 585                         (QNode.class.getDeclaredField("item"));
 586                     NEXT = U.objectFieldOffset
 587                         (QNode.class.getDeclaredField("next"));
 588                 } catch (ReflectiveOperationException e) {
 589                     throw new Error(e);
 590                 }
 591             }
 592         }
 593 
 594         /** Head of queue */
 595         transient volatile QNode head;
 596         /** Tail of queue */
 597         transient volatile QNode tail;
 598         /**
 599          * Reference to a cancelled node that might not yet have been
 600          * unlinked from queue because it was the last inserted node
 601          * when it was cancelled.
 602          */
 603         transient volatile QNode cleanMe;
 604 
 605         TransferQueue() {
 606             QNode h = new QNode(null, false); // initialize to dummy node.
 607             head = h;
 608             tail = h;
 609         }
 610 
 611         /**
 612          * Tries to cas nh as new head; if successful, unlink
 613          * old head's next node to avoid garbage retention.
 614          */
 615         void advanceHead(QNode h, QNode nh) {
 616             if (h == head &&
 617                 U.compareAndSwapObject(this, HEAD, h, nh))
 618                 h.next = h; // forget old next
 619         }
 620 
 621         /**
 622          * Tries to cas nt as new tail.
 623          */
 624         void advanceTail(QNode t, QNode nt) {
 625             if (tail == t)
 626                 U.compareAndSwapObject(this, TAIL, t, nt);
 627         }
 628 
 629         /**
 630          * Tries to CAS cleanMe slot.
 631          */
 632         boolean casCleanMe(QNode cmp, QNode val) {
 633             return cleanMe == cmp &&
 634                 U.compareAndSwapObject(this, CLEANME, cmp, val);
 635         }
 636 
 637         /**
 638          * Puts or takes an item.
 639          */
 640         @SuppressWarnings("unchecked")
 641         E transfer(E e, boolean timed, long nanos) {
 642             /* Basic algorithm is to loop trying to take either of
 643              * two actions:
 644              *
 645              * 1. If queue apparently empty or holding same-mode nodes,
 646              *    try to add node to queue of waiters, wait to be
 647              *    fulfilled (or cancelled) and return matching item.
 648              *
 649              * 2. If queue apparently contains waiting items, and this
 650              *    call is of complementary mode, try to fulfill by CAS'ing
 651              *    item field of waiting node and dequeuing it, and then
 652              *    returning matching item.
 653              *
 654              * In each case, along the way, check for and try to help
 655              * advance head and tail on behalf of other stalled/slow
 656              * threads.
 657              *
 658              * The loop starts off with a null check guarding against
 659              * seeing uninitialized head or tail values. This never
 660              * happens in current SynchronousQueue, but could if
 661              * callers held non-volatile/final ref to the
 662              * transferer. The check is here anyway because it places
 663              * null checks at top of loop, which is usually faster
 664              * than having them implicitly interspersed.
 665              */
 666 
 667             QNode s = null; // constructed/reused as needed
 668             boolean isData = (e != null);
 669 
 670             for (;;) {
 671                 QNode t = tail;
 672                 QNode h = head;
 673                 if (t == null || h == null)         // saw uninitialized value
 674                     continue;                       // spin
 675 
 676                 if (h == t || t.isData == isData) { // empty or same-mode
 677                     QNode tn = t.next;
 678                     if (t != tail)                  // inconsistent read
 679                         continue;
 680                     if (tn != null) {               // lagging tail
 681                         advanceTail(t, tn);
 682                         continue;
 683                     }
 684                     if (timed && nanos <= 0L)       // can't wait
 685                         return null;
 686                     if (s == null)
 687                         s = new QNode(e, isData);
 688                     if (!t.casNext(null, s))        // failed to link in
 689                         continue;
 690 
 691                     advanceTail(t, s);              // swing tail and wait
 692                     Object x = awaitFulfill(s, e, timed, nanos);
 693                     if (x == s) {                   // wait was cancelled
 694                         clean(t, s);
 695                         return null;
 696                     }
 697 
 698                     if (!s.isOffList()) {           // not already unlinked
 699                         advanceHead(t, s);          // unlink if head
 700                         if (x != null)              // and forget fields
 701                             s.item = s;
 702                         s.waiter = null;
 703                     }
 704                     return (x != null) ? (E)x : e;
 705 
 706                 } else {                            // complementary-mode
 707                     QNode m = h.next;               // node to fulfill
 708                     if (t != tail || m == null || h != head)
 709                         continue;                   // inconsistent read
 710 
 711                     Object x = m.item;
 712                     if (isData == (x != null) ||    // m already fulfilled
 713                         x == m ||                   // m cancelled
 714                         !m.casItem(x, e)) {         // lost CAS
 715                         advanceHead(h, m);          // dequeue and retry
 716                         continue;
 717                     }
 718 
 719                     advanceHead(h, m);              // successfully fulfilled
 720                     LockSupport.unpark(m.waiter);
 721                     return (x != null) ? (E)x : e;
 722                 }
 723             }
 724         }
 725 
 726         /**
 727          * Spins/blocks until node s is fulfilled.
 728          *
 729          * @param s the waiting node
 730          * @param e the comparison value for checking match
 731          * @param timed true if timed wait
 732          * @param nanos timeout value
 733          * @return matched item, or s if cancelled
 734          */
 735         Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
 736             /* Same idea as TransferStack.awaitFulfill */
 737             final long deadline = timed ? System.nanoTime() + nanos : 0L;
 738             Thread w = Thread.currentThread();
 739             int spins = (head.next == s)
 740                 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
 741                 : 0;
 742             for (;;) {
 743                 if (w.isInterrupted())
 744                     s.tryCancel(e);
 745                 Object x = s.item;
 746                 if (x != e)
 747                     return x;
 748                 if (timed) {
 749                     nanos = deadline - System.nanoTime();
 750                     if (nanos <= 0L) {
 751                         s.tryCancel(e);
 752                         continue;
 753                     }
 754                 }
 755                 if (spins > 0)
 756                     --spins;
 757                 else if (s.waiter == null)
 758                     s.waiter = w;
 759                 else if (!timed)
 760                     LockSupport.park(this);
 761                 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
 762                     LockSupport.parkNanos(this, nanos);
 763             }
 764         }
 765 
 766         /**
 767          * Gets rid of cancelled node s with original predecessor pred.
 768          */
 769         void clean(QNode pred, QNode s) {
 770             s.waiter = null; // forget thread
 771             /*
 772              * At any given time, exactly one node on list cannot be
 773              * deleted -- the last inserted node. To accommodate this,
 774              * if we cannot delete s, we save its predecessor as
 775              * "cleanMe", deleting the previously saved version
 776              * first. At least one of node s or the node previously
 777              * saved can always be deleted, so this always terminates.
 778              */
 779             while (pred.next == s) { // Return early if already unlinked
 780                 QNode h = head;
 781                 QNode hn = h.next;   // Absorb cancelled first node as head
 782                 if (hn != null && hn.isCancelled()) {
 783                     advanceHead(h, hn);
 784                     continue;
 785                 }
 786                 QNode t = tail;      // Ensure consistent read for tail
 787                 if (t == h)
 788                     return;
 789                 QNode tn = t.next;
 790                 if (t != tail)
 791                     continue;
 792                 if (tn != null) {
 793                     advanceTail(t, tn);
 794                     continue;
 795                 }
 796                 if (s != t) {        // If not tail, try to unsplice
 797                     QNode sn = s.next;
 798                     if (sn == s || pred.casNext(s, sn))
 799                         return;
 800                 }
 801                 QNode dp = cleanMe;
 802                 if (dp != null) {    // Try unlinking previous cancelled node
 803                     QNode d = dp.next;
 804                     QNode dn;
 805                     if (d == null ||               // d is gone or
 806                         d == dp ||                 // d is off list or
 807                         !d.isCancelled() ||        // d not cancelled or
 808                         (d != t &&                 // d not tail and
 809                          (dn = d.next) != null &&  //   has successor
 810                          dn != d &&                //   that is on list
 811                          dp.casNext(d, dn)))       // d unspliced
 812                         casCleanMe(dp, null);
 813                     if (dp == pred)
 814                         return;      // s is already saved node
 815                 } else if (casCleanMe(null, pred))
 816                     return;          // Postpone cleaning s
 817             }
 818         }
 819 
 820         private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
 821         private static final long HEAD;
 822         private static final long TAIL;
 823         private static final long CLEANME;
 824         static {
 825             try {
 826                 HEAD = U.objectFieldOffset
 827                     (TransferQueue.class.getDeclaredField("head"));
 828                 TAIL = U.objectFieldOffset
 829                     (TransferQueue.class.getDeclaredField("tail"));
 830                 CLEANME = U.objectFieldOffset
 831                     (TransferQueue.class.getDeclaredField("cleanMe"));
 832             } catch (ReflectiveOperationException e) {
 833                 throw new Error(e);
 834             }
 835         }
 836     }
 837 
 838     /**
 839      * The transferer. Set only in constructor, but cannot be declared
 840      * as final without further complicating serialization.  Since
 841      * this is accessed only at most once per public method, there
 842      * isn't a noticeable performance penalty for using volatile
 843      * instead of final here.
 844      */
 845     private transient volatile Transferer<E> transferer;
 846 
 847     /**
 848      * Creates a {@code SynchronousQueue} with nonfair access policy.
 849      */
 850     public SynchronousQueue() {
 851         this(false);
 852     }
 853 
 854     /**
 855      * Creates a {@code SynchronousQueue} with the specified fairness policy.
 856      *
 857      * @param fair if true, waiting threads contend in FIFO order for
 858      *        access; otherwise the order is unspecified.
 859      */
 860     public SynchronousQueue(boolean fair) {
 861         transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
 862     }
 863 
 864     /**
 865      * Adds the specified element to this queue, waiting if necessary for
 866      * another thread to receive it.
 867      *
 868      * @throws InterruptedException {@inheritDoc}
 869      * @throws NullPointerException {@inheritDoc}
 870      */
 871     public void put(E e) throws InterruptedException {
 872         if (e == null) throw new NullPointerException();
 873         if (transferer.transfer(e, false, 0) == null) {
 874             Thread.interrupted();
 875             throw new InterruptedException();
 876         }
 877     }
 878 
 879     /**
 880      * Inserts the specified element into this queue, waiting if necessary
 881      * up to the specified wait time for another thread to receive it.
 882      *
 883      * @return {@code true} if successful, or {@code false} if the
 884      *         specified waiting time elapses before a consumer appears
 885      * @throws InterruptedException {@inheritDoc}
 886      * @throws NullPointerException {@inheritDoc}
 887      */
 888     public boolean offer(E e, long timeout, TimeUnit unit)
 889         throws InterruptedException {
 890         if (e == null) throw new NullPointerException();
 891         if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
 892             return true;
 893         if (!Thread.interrupted())
 894             return false;
 895         throw new InterruptedException();
 896     }
 897 
 898     /**
 899      * Inserts the specified element into this queue, if another thread is
 900      * waiting to receive it.
 901      *
 902      * @param e the element to add
 903      * @return {@code true} if the element was added to this queue, else
 904      *         {@code false}
 905      * @throws NullPointerException if the specified element is null
 906      */
 907     public boolean offer(E e) {
 908         if (e == null) throw new NullPointerException();
 909         return transferer.transfer(e, true, 0) != null;
 910     }
 911 
 912     /**
 913      * Retrieves and removes the head of this queue, waiting if necessary
 914      * for another thread to insert it.
 915      *
 916      * @return the head of this queue
 917      * @throws InterruptedException {@inheritDoc}
 918      */
 919     public E take() throws InterruptedException {
 920         E e = transferer.transfer(null, false, 0);
 921         if (e != null)
 922             return e;
 923         Thread.interrupted();
 924         throw new InterruptedException();
 925     }
 926 
 927     /**
 928      * Retrieves and removes the head of this queue, waiting
 929      * if necessary up to the specified wait time, for another thread
 930      * to insert it.
 931      *
 932      * @return the head of this queue, or {@code null} if the
 933      *         specified waiting time elapses before an element is present
 934      * @throws InterruptedException {@inheritDoc}
 935      */
 936     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 937         E e = transferer.transfer(null, true, unit.toNanos(timeout));
 938         if (e != null || !Thread.interrupted())
 939             return e;
 940         throw new InterruptedException();
 941     }
 942 
 943     /**
 944      * Retrieves and removes the head of this queue, if another thread
 945      * is currently making an element available.
 946      *
 947      * @return the head of this queue, or {@code null} if no
 948      *         element is available
 949      */
 950     public E poll() {
 951         return transferer.transfer(null, true, 0);
 952     }
 953 
 954     /**
 955      * Always returns {@code true}.
 956      * A {@code SynchronousQueue} has no internal capacity.
 957      *
 958      * @return {@code true}
 959      */
 960     public boolean isEmpty() {
 961         return true;
 962     }
 963 
 964     /**
 965      * Always returns zero.
 966      * A {@code SynchronousQueue} has no internal capacity.
 967      *
 968      * @return zero
 969      */
 970     public int size() {
 971         return 0;
 972     }
 973 
 974     /**
 975      * Always returns zero.
 976      * A {@code SynchronousQueue} has no internal capacity.
 977      *
 978      * @return zero
 979      */
 980     public int remainingCapacity() {
 981         return 0;
 982     }
 983 
 984     /**
 985      * Does nothing.
 986      * A {@code SynchronousQueue} has no internal capacity.
 987      */
 988     public void clear() {
 989     }
 990 
 991     /**
 992      * Always returns {@code false}.
 993      * A {@code SynchronousQueue} has no internal capacity.
 994      *
 995      * @param o the element
 996      * @return {@code false}
 997      */
 998     public boolean contains(Object o) {
 999         return false;
1000     }
1001 
1002     /**
1003      * Always returns {@code false}.
1004      * A {@code SynchronousQueue} has no internal capacity.
1005      *
1006      * @param o the element to remove
1007      * @return {@code false}
1008      */
1009     public boolean remove(Object o) {
1010         return false;
1011     }
1012 
1013     /**
1014      * Returns {@code false} unless the given collection is empty.
1015      * A {@code SynchronousQueue} has no internal capacity.
1016      *
1017      * @param c the collection
1018      * @return {@code false} unless given collection is empty
1019      */
1020     public boolean containsAll(Collection<?> c) {
1021         return c.isEmpty();
1022     }
1023 
1024     /**
1025      * Always returns {@code false}.
1026      * A {@code SynchronousQueue} has no internal capacity.
1027      *
1028      * @param c the collection
1029      * @return {@code false}
1030      */
1031     public boolean removeAll(Collection<?> c) {
1032         return false;
1033     }
1034 
1035     /**
1036      * Always returns {@code false}.
1037      * A {@code SynchronousQueue} has no internal capacity.
1038      *
1039      * @param c the collection
1040      * @return {@code false}
1041      */
1042     public boolean retainAll(Collection<?> c) {
1043         return false;
1044     }
1045 
1046     /**
1047      * Always returns {@code null}.
1048      * A {@code SynchronousQueue} does not return elements
1049      * unless actively waited on.
1050      *
1051      * @return {@code null}
1052      */
1053     public E peek() {
1054         return null;
1055     }
1056 
1057     /**
1058      * Returns an empty iterator in which {@code hasNext} always returns
1059      * {@code false}.
1060      *
1061      * @return an empty iterator
1062      */
1063     public Iterator<E> iterator() {
1064         return Collections.emptyIterator();
1065     }
1066 
1067     /**
1068      * Returns an empty spliterator in which calls to
1069      * {@link java.util.Spliterator#trySplit()} always return {@code null}.
1070      *
1071      * @return an empty spliterator
1072      * @since 1.8
1073      */
1074     public Spliterator<E> spliterator() {
1075         return Spliterators.emptySpliterator();
1076     }
1077 
1078     /**
1079      * Returns a zero-length array.
1080      * @return a zero-length array
1081      */
1082     public Object[] toArray() {
1083         return new Object[0];
1084     }
1085 
1086     /**
1087      * Sets the zeroth element of the specified array to {@code null}
1088      * (if the array has non-zero length) and returns it.
1089      *
1090      * @param a the array
1091      * @return the specified array
1092      * @throws NullPointerException if the specified array is null
1093      */
1094     public <T> T[] toArray(T[] a) {
1095         if (a.length > 0)
1096             a[0] = null;
1097         return a;
1098     }
1099 
1100     /**
1101      * Always returns {@code "[]"}.
1102      * @return {@code "[]"}
1103      */
1104     public String toString() {
1105         return "[]";
1106     }
1107 
1108     /**
1109      * @throws UnsupportedOperationException {@inheritDoc}
1110      * @throws ClassCastException            {@inheritDoc}
1111      * @throws NullPointerException          {@inheritDoc}
1112      * @throws IllegalArgumentException      {@inheritDoc}
1113      */
1114     public int drainTo(Collection<? super E> c) {
1115         if (c == null)
1116             throw new NullPointerException();
1117         if (c == this)
1118             throw new IllegalArgumentException();
1119         int n = 0;
1120         for (E e; (e = poll()) != null;) {
1121             c.add(e);
1122             ++n;
1123         }
1124         return n;
1125     }
1126 
1127     /**
1128      * @throws UnsupportedOperationException {@inheritDoc}
1129      * @throws ClassCastException            {@inheritDoc}
1130      * @throws NullPointerException          {@inheritDoc}
1131      * @throws IllegalArgumentException      {@inheritDoc}
1132      */
1133     public int drainTo(Collection<? super E> c, int maxElements) {
1134         if (c == null)
1135             throw new NullPointerException();
1136         if (c == this)
1137             throw new IllegalArgumentException();
1138         int n = 0;
1139         for (E e; n < maxElements && (e = poll()) != null;) {
1140             c.add(e);
1141             ++n;
1142         }
1143         return n;
1144     }
1145 
1146     /*
1147      * To cope with serialization strategy in the 1.5 version of
1148      * SynchronousQueue, we declare some unused classes and fields
1149      * that exist solely to enable serializability across versions.
1150      * These fields are never used, so are initialized only if this
1151      * object is ever serialized or deserialized.
1152      */
1153 
1154     @SuppressWarnings("serial")
1155     static class WaitQueue implements java.io.Serializable { }
1156     static class LifoWaitQueue extends WaitQueue {
1157         private static final long serialVersionUID = -3633113410248163686L;
1158     }
1159     static class FifoWaitQueue extends WaitQueue {
1160         private static final long serialVersionUID = -3623113410248163686L;
1161     }
1162     private ReentrantLock qlock;
1163     private WaitQueue waitingProducers;
1164     private WaitQueue waitingConsumers;
1165 
1166     /**
1167      * Saves this queue to a stream (that is, serializes it).
1168      * @param s the stream
1169      * @throws java.io.IOException if an I/O error occurs
1170      */
1171     private void writeObject(java.io.ObjectOutputStream s)
1172         throws java.io.IOException {
1173         boolean fair = transferer instanceof TransferQueue;
1174         if (fair) {
1175             qlock = new ReentrantLock(true);
1176             waitingProducers = new FifoWaitQueue();
1177             waitingConsumers = new FifoWaitQueue();
1178         }
1179         else {
1180             qlock = new ReentrantLock();
1181             waitingProducers = new LifoWaitQueue();
1182             waitingConsumers = new LifoWaitQueue();
1183         }
1184         s.defaultWriteObject();
1185     }
1186 
1187     /**
1188      * Reconstitutes this queue from a stream (that is, deserializes it).
1189      * @param s the stream
1190      * @throws ClassNotFoundException if the class of a serialized object
1191      *         could not be found
1192      * @throws java.io.IOException if an I/O error occurs
1193      */
1194     private void readObject(java.io.ObjectInputStream s)
1195         throws java.io.IOException, ClassNotFoundException {
1196         s.defaultReadObject();
1197         if (waitingProducers instanceof FifoWaitQueue)
1198             transferer = new TransferQueue<E>();
1199         else
1200             transferer = new TransferStack<E>();
1201     }
1202 
1203     static {
1204         // Reduce the risk of rare disastrous classloading in first call to
1205         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1206         Class<?> ensureLoaded = LockSupport.class;
1207     }
1208 }
--- EOF ---