
src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java

8246677: LinkedTransferQueue and SynchronousQueue synchronization updates
Reviewed-by: martin

*** 307,341 ****
     * CAS-advance the tail pointer by at least two hops.
     *
     * 2. Await match or cancellation (method awaitMatch)
     *
     * Wait for another thread to match node; instead cancelling if
!    * the current thread was interrupted or the wait timed out. On
!    * multiprocessors, we use front-of-queue spinning: If a node
!    * appears to be the first unmatched node in the queue, it
!    * spins a bit before blocking. In either case, before blocking
!    * it tries to unsplice any nodes between the current "head"
!    * and the first unmatched node.
!    *
!    * Front-of-queue spinning vastly improves performance of
!    * heavily contended queues. And so long as it is relatively
!    * brief and "quiet", spinning does not much impact performance
!    * of less-contended queues. During spins threads check their
!    * interrupt status and generate a thread-local random number
!    * to decide to occasionally perform a Thread.yield. While
!    * yield has underdefined specs, we assume that it might help,
!    * and will not hurt, in limiting impact of spinning on busy
!    * systems. We also use smaller (1/2) spins for nodes that are
!    * not known to be front but whose predecessors have not
!    * blocked -- these "chained" spins avoid artifacts of
!    * front-of-queue rules which otherwise lead to alternating
!    * nodes spinning vs blocking. Further, front threads that
!    * represent phase changes (from data to request node or vice
!    * versa) compared to their predecessors receive additional
!    * chained spins, reflecting longer paths typically required to
!    * unblock threads during phase changes.
!    *
     *
     * ** Unlinking removed interior nodes **
     *
     * In addition to minimizing garbage retention via self-linking
     * described above, we also unlink removed interior nodes. These
--- 307,322 ----
     * CAS-advance the tail pointer by at least two hops.
     *
     * 2. Await match or cancellation (method awaitMatch)
     *
     * Wait for another thread to match node; instead cancelling if
!    * the current thread was interrupted or the wait timed out. To
!    * improve performance in common single-source / single-sink
!    * usages when there are more tasks than cores, an initial
!    * Thread.yield is tried when there is apparently only one
!    * waiter. In other cases, waiters may help with some
!    * bookkeeping, then park/unpark.
     *
     * ** Unlinking removed interior nodes **
     *
     * In addition to minimizing garbage retention via self-linking
     * described above, we also unlink removed interior nodes. These
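The replacement policy above boils down to: if the waiter appears to be the lone, first waiter, spend one Thread.yield and recheck; otherwise record the waiting thread and park until the matching thread unparks it. A minimal standalone sketch of that shape, with illustrative names and a single hand-off slot rather than the patch's node list:

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.LockSupport;

    class YieldThenParkDemo {
        // One-shot slot: the consumer waits until a producer fills it.
        static final AtomicReference<String> slot = new AtomicReference<>();
        static volatile Thread waiter;               // enables targeted unpark

        static String await() {
            String v;
            boolean yielded = false;
            while ((v = slot.get()) == null) {
                if (!yielded) {                      // first pass: yield once, cheap
                    yielded = true;                  // when a producer is already
                    Thread.yield();                  // about to publish
                } else {
                    waiter = Thread.currentThread(); // publish waiter, then park
                    if (slot.get() == null)          // recheck to avoid a missed signal
                        LockSupport.park();
                    waiter = null;
                }
            }
            return v;
        }

        public static void main(String[] args) throws InterruptedException {
            Thread consumer = new Thread(() -> System.out.println(await()));
            consumer.start();
            slot.set("hello");                       // publish, then wake any waiter
            Thread t = waiter;
            if (t != null) LockSupport.unpark(t);
            consumer.join();
        }
    }

Because both the slot and the waiter field use volatile accesses, either the publisher sees the registered waiter and unparks it, or the waiter's recheck sees the published value and skips parking, so no wakeup is lost.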
*** 367,400 ****
     * never fall off the list because of an untimed call to take() at
     * the front of the queue.
     *
     * When these cases arise, rather than always retraversing the
     * entire list to find an actual predecessor to unlink (which
!    * won't help for case (1) anyway), we record a conservative
!    * estimate of possible unsplice failures (in "sweepVotes").
!    * We trigger a full sweep when the estimate exceeds a threshold
!    * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
!    * removal failures to tolerate before sweeping through, unlinking
!    * cancelled nodes that were not unlinked upon initial removal.
!    * We perform sweeps by the thread hitting threshold (rather than
!    * background threads or by spreading work to other threads)
!    * because in the main contexts in which removal occurs, the
!    * caller is timed-out or cancelled, which are not time-critical
!    * enough to warrant the overhead that alternatives would impose
!    * on other threads.
!    *
!    * Because the sweepVotes estimate is conservative, and because
!    * nodes become unlinked "naturally" as they fall off the head of
!    * the queue, and because we allow votes to accumulate even while
!    * sweeps are in progress, there are typically significantly fewer
!    * such nodes than estimated. Choice of a threshold value
!    * balances the likelihood of wasted effort and contention, versus
!    * providing a worst-case bound on retention of interior nodes in
!    * quiescent queues. The value defined below was chosen
!    * empirically to balance these under various timeout scenarios.
!    *
!    * Because traversal operations on the linked list of nodes are a
     * natural opportunity to sweep dead nodes, we generally do so,
     * including all the operations that might remove elements as they
     * traverse, such as removeIf and Iterator.remove. This largely
     * eliminates long chains of dead interior nodes, except from
     * cancelled or timed out blocking operations.
--- 348,360 ----
     * never fall off the list because of an untimed call to take() at
     * the front of the queue.
     *
     * When these cases arise, rather than always retraversing the
     * entire list to find an actual predecessor to unlink (which
!    * won't help for case (1) anyway), we record the need to sweep the
!    * next time any thread would otherwise block in awaitMatch. Also,
!    * because traversal operations on the linked list of nodes are a
     * natural opportunity to sweep dead nodes, we generally do so,
     * including all the operations that might remove elements as they
     * traverse, such as removeIf and Iterator.remove. This largely
     * eliminates long chains of dead interior nodes, except from
     * cancelled or timed out blocking operations.
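The idea is to fold cleanup into a moment when the thread was about to stall anyway. A rough sketch of that deferred-sweep pattern, using an illustrative structure rather than the patch's node list: removals only mark entries and raise a flag, and the next thread about to block performs one cleanup pass:

    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Illustrative only: removals mark entries dead and set a flag; the next
    // thread that is about to block drops the dead entries in one pass,
    // instead of every removal sweeping eagerly.
    class LazySweepDemo {
        static final class Entry {
            final String value;
            final AtomicBoolean dead = new AtomicBoolean();
            Entry(String value) { this.value = value; }
        }

        private final ConcurrentLinkedQueue<Entry> entries = new ConcurrentLinkedQueue<>();
        private volatile boolean needSweep;

        void add(String v) { entries.add(new Entry(v)); }

        void remove(Entry e) {
            e.dead.set(true);       // logical removal only
            needSweep = true;       // remember that a physical unlink is pending
        }

        // Called on a path where the caller would otherwise park; piggybacks cleanup.
        void helpCleanBeforeBlocking() {
            if (needSweep) {
                needSweep = false;
                entries.removeIf(e -> e.dead.get());
            }
        }
    }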
*** 403,434 ****
     * sweeps. However, the associated garbage chains terminate when
     * some successor ultimately falls off the head of the list and is
     * self-linked.
     */

-   /** True if on multiprocessor */
-   private static final boolean MP =
-       Runtime.getRuntime().availableProcessors() > 1;
-
-   /**
-    * The number of times to spin (with randomly interspersed calls
-    * to Thread.yield) on multiprocessor before blocking when a node
-    * is apparently the first waiter in the queue. See above for
-    * explanation. Must be a power of two. The value is empirically
-    * derived -- it works pretty well across a variety of processors,
-    * numbers of CPUs, and OSes.
-    */
-   private static final int FRONT_SPINS = 1 << 7;
-
    /**
!    * The number of times to spin before blocking when a node is
!    * preceded by another node that is apparently spinning. Also
!    * serves as an increment to FRONT_SPINS on phase changes, and as
!    * base average frequency for yielding during spins. Must be a
!    * power of two.
     */
!   private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    /**
     * The maximum number of estimated removal failures (sweepVotes)
     * to tolerate before sweeping through the queue unlinking
     * cancelled nodes that were not unlinked upon initial
--- 363,378 ----
     * sweeps. However, the associated garbage chains terminate when
     * some successor ultimately falls off the head of the list and is
     * self-linked.
     */

    /**
!    * The number of nanoseconds for which it is faster to spin
!    * rather than to use timed park. A rough estimate suffices.
!    * Using a power of two minus one simplifies some comparisons.
     */
!   static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;

    /**
     * The maximum number of estimated removal failures (sweepVotes)
     * to tolerate before sweeping through the queue unlinking
     * cancelled nodes that were not unlinked upon initial
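As a point of reference, a timed wait loop using such a threshold might look like the following sketch; the constant value and helper names are illustrative, and a real setter would normally also unpark the waiter:

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.LockSupport;

    class TimedWaitDemo {
        // Mirrors the constant above: below roughly a microsecond of remaining
        // wait, a timed park costs more than it saves, so just spin the rest out.
        static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;

        /** Waits until done becomes true or timeoutNanos elapse; true if done. */
        static boolean awaitDone(AtomicBoolean done, long timeoutNanos) {
            final long deadline = System.nanoTime() + timeoutNanos;
            while (!done.get()) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0L)
                    return false;                     // timed out
                if (remaining > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(remaining); // long wait: block
                else
                    Thread.onSpinWait();              // short wait: spin
            }
            return true;
        }
    }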
*** 440,450 ****
    /**
     * Queue nodes. Uses Object, not E, for items to allow forgetting
     * them after use. Writes that are intrinsically ordered wrt
     * other accesses or CASes use simple relaxed forms.
     */
!   static final class Node {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;
        volatile Thread waiter; // null when not waiting for a match
--- 384,394 ----
    /**
     * Queue nodes. Uses Object, not E, for items to allow forgetting
     * them after use. Writes that are intrinsically ordered wrt
     * other accesses or CASes use simple relaxed forms.
     */
!   static final class Node implements ForkJoinPool.ManagedBlocker {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;
        volatile Thread waiter; // null when not waiting for a match
*** 485,512 ****
        }

        final void appendRelaxed(Node next) {
            // assert next != null;
            // assert this.next == null;
!           NEXT.set(this, next);
!       }
!
!       /**
!        * Sets item (of a request node) to self and waiter to null,
!        * to avoid garbage retention after matching or cancelling.
!        * Uses relaxed writes because order is already constrained in
!        * the only calling contexts: item is forgotten only after
!        * volatile/atomic mechanics that extract items, and visitors
!        * of request nodes only ever check whether item is null.
!        * Similarly, clearing waiter follows either CAS or return
!        * from park (if ever parked; else we don't care).
!        */
!       final void forgetContents() {
!           // assert isMatched();
!           if (!isData)
!               ITEM.set(this, this);
!           WAITER.set(this, null);
        }

        /**
         * Returns true if this node has been matched, including the
         * case of artificial matches due to cancellation.
--- 429,439 ----
        }

        final void appendRelaxed(Node next) {
            // assert next != null;
            // assert this.next == null;
!           NEXT.setOpaque(this, next);
        }

        /**
         * Returns true if this node has been matched, including the
         * case of artificial matches due to cancellation.
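setOpaque is one of the weaker VarHandle access modes: the store is atomic and coherent for that one variable but carries no ordering with respect to surrounding accesses, consistent with the class comment that intrinsically ordered writes use relaxed forms. An illustrative standalone comparison of the modes (not the patch's Node class):

    import java.lang.invoke.MethodHandles;
    import java.lang.invoke.VarHandle;

    // Illustrative VarHandle access modes: plain set(), setOpaque() and
    // getVolatile() trade ordering strength for cost. setOpaque() guarantees
    // the write is not torn or eliminated and is per-variable coherent, but
    // imposes no ordering on surrounding accesses.
    class AccessModesDemo {
        volatile int x;             // declared volatile, but individual accesses
                                    // can still be relaxed through the VarHandle
        static final VarHandle X;
        static {
            try {
                X = MethodHandles.lookup().findVarHandle(AccessModesDemo.class, "x", int.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        void publishRelaxed(int v) { X.setOpaque(this, v); } // like NEXT.setOpaque above
        void publishPlain(int v)   { X.set(this, v); }       // plain mode, no ordering
        int  readVolatile()        { return (int) X.getVolatile(this); }
    }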
*** 532,541 ****
--- 459,478 ----
        final boolean cannotPrecede(boolean haveData) {
            boolean d = isData;
            return d != haveData && d != (item == null);
        }

+       public final boolean isReleasable() {
+           return (isData == (item == null)) ||
+               Thread.currentThread().isInterrupted();
+       }
+
+       public final boolean block() {
+           while (!isReleasable()) LockSupport.park();
+           return true;
+       }
+
        private static final long serialVersionUID = -3375979862319811754L;
    }

    /**
     * A node from which the first live (non-matched) node (if any)
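The added isReleasable/block pair is the ForkJoinPool.ManagedBlocker contract: isReleasable() reports whether the wait is already satisfied, block() parks until it is, and ForkJoinPool.managedBlock() consults both and, when called from a pool worker, may activate a spare thread so the pool keeps making progress while this one is parked. A self-contained sketch of the same contract with illustrative names:

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.locks.LockSupport;

    class FlagBlocker implements ForkJoinPool.ManagedBlocker {
        private volatile boolean released;
        private volatile Thread waiter;

        void release() {
            released = true;
            Thread t = waiter;
            if (t != null) LockSupport.unpark(t);    // wake a parked waiter, if any
        }

        public boolean isReleasable() {
            return released || Thread.currentThread().isInterrupted();
        }

        public boolean block() {
            waiter = Thread.currentThread();
            while (!isReleasable()) LockSupport.park();
            waiter = null;
            return true;
        }

        void await() throws InterruptedException {
            ForkJoinPool.managedBlock(this);         // safe on pool and plain threads
            if (Thread.interrupted()) throw new InterruptedException();
        }
    }

On a non-pool thread, managedBlock simply loops calling isReleasable/block, so the same code works for ordinary threads as well.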
*** 564,574 ****
     * - tail.next may or may not be self-linked.
     */
    private transient volatile Node tail;

    /** The number of apparent failures to unsplice cancelled nodes */
!   private transient volatile int sweepVotes;

    private boolean casTail(Node cmp, Node val) {
        // assert cmp != null;
        // assert val != null;
        return TAIL.compareAndSet(this, cmp, val);
--- 501,511 ----
     * - tail.next may or may not be self-linked.
     */
    private transient volatile Node tail;

    /** The number of apparent failures to unsplice cancelled nodes */
!   private transient volatile boolean needSweep;

    private boolean casTail(Node cmp, Node val) {
        // assert cmp != null;
        // assert val != null;
        return TAIL.compareAndSet(this, cmp, val);
*** 576,590 ****

    private boolean casHead(Node cmp, Node val) {
        return HEAD.compareAndSet(this, cmp, val);
    }

-   /** Atomic version of ++sweepVotes. */
-   private int incSweepVotes() {
-       return (int) SWEEPVOTES.getAndAdd(this, 1) + 1;
-   }
-
    /**
     * Tries to CAS pred.next (or head, if pred is null) from c to p.
     * Caller must ensure that we're not unlinking the trailing node.
     */
    private boolean tryCasSuccessor(Node pred, Node c, Node p) {
--- 513,522 ----
*** 687,766 ****
            }
        }
    }

    /**
!    * Spins/yields/blocks until node s is matched or caller gives up.
     *
     * @param s the waiting node
     * @param pred the predecessor of s, or null if unknown (the null
     * case does not occur in any current calls but may in possible
     * future extensions)
     * @param e the comparison value for checking match
     * @param timed if true, wait only until timeout elapses
     * @param nanos timeout in nanosecs, used only if timed is true
     * @return matched item, or e if unmatched on interrupt or timeout
     */
    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
!       Thread w = Thread.currentThread();
!       int spins = -1; // initialized after first item and cancel checks
!       ThreadLocalRandom randomYields = null; // bound if needed
!
!       for (;;) {
!           final Object item;
!           if ((item = s.item) != e) {       // matched
!               // assert item != s;
!               s.forgetContents();           // avoid garbage
!               @SuppressWarnings("unchecked") E itemE = (E) item;
!               return itemE;
!           }
!           else if (w.isInterrupted() || (timed && nanos <= 0L)) {
!               // try to cancel and unlink
!               if (s.casItem(e, s.isData ? null : s)) {
!                   unsplice(pred, s);
                    return e;
                }
-               // return normally if lost CAS
            }
!           else if (spins < 0) {             // establish spins at/near front
!               if ((spins = spinsFor(pred, s.isData)) > 0)
!                   randomYields = ThreadLocalRandom.current();
            }
!           else if (spins > 0) {             // spin
!               --spins;
!               if (randomYields.nextInt(CHAINED_SPINS) == 0)
!                   Thread.yield();           // occasionally yield
            }
!           else if (s.waiter == null) {
!               s.waiter = w;                 // request unpark then recheck
            }
!           else if (timed) {
!               nanos = deadline - System.nanoTime();
!               if (nanos > 0L)
!                   LockSupport.parkNanos(this, nanos);
            }
            else {
!               LockSupport.park(this);
!           }
!       }
    }
-
-   /**
-    * Returns spin/yield value for a node with given predecessor and
-    * data mode. See above for explanation.
-    */
-   private static int spinsFor(Node pred, boolean haveData) {
-       if (MP && pred != null) {
-           if (pred.isData != haveData)      // phase change
-               return FRONT_SPINS + CHAINED_SPINS;
-           if (pred.isMatched())             // probably at front
-               return FRONT_SPINS;
-           if (pred.waiter == null)          // pred apparently spinning
-               return CHAINED_SPINS;
        }
!       return 0;
    }

    /* -------------- Traversal methods -------------- */

    /**
--- 619,688 ----
            }
        }
    }

    /**
!    * Possibly blocks until node s is matched or caller gives up.
     *
     * @param s the waiting node
     * @param pred the predecessor of s, or null if unknown (the null
     * case does not occur in any current calls but may in possible
     * future extensions)
     * @param e the comparison value for checking match
     * @param timed if true, wait only until timeout elapses
     * @param nanos timeout in nanosecs, used only if timed is true
     * @return matched item, or e if unmatched on interrupt or timeout
     */
+   @SuppressWarnings("unchecked")
    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
+       final boolean isData = s.isData;
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
!       final Thread w = Thread.currentThread();
!       int stat = -1;                        // -1: may yield, +1: park, else 0
!       Object item;
!       while ((item = s.item) == e) {
!           if (needSweep)                    // help clean
!               sweep();
!           else if ((timed && nanos <= 0L) || w.isInterrupted()) {
!               if (s.casItem(e, (e == null) ? s : null)) {
!                   unsplice(pred, s);        // cancelled
                    return e;
                }
            }
!           else if (stat <= 0) {
!               if (pred != null && pred.next == s) {
!                   if (stat < 0 &&
!                       (pred.isData != isData || pred.isMatched())) {
!                       stat = 0;             // yield once if first
!                       Thread.yield();
                    }
!                   else {
!                       stat = 1;
!                       s.waiter = w;         // enable unpark
                    }
!               }                             // else signal in progress
            }
!           else if ((item = s.item) != e)
!               break;                        // recheck
!           else if (!timed) {
!               LockSupport.setCurrentBlocker(this);
!               try {
!                   ForkJoinPool.managedBlock(s);
!               } catch (InterruptedException cannotHappen) { }
!               LockSupport.setCurrentBlocker(null);
            }
            else {
!               nanos = deadline - System.nanoTime();
!               if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
!                   LockSupport.parkNanos(this, nanos);
            }
        }
!       if (stat == 1)
!           WAITER.set(s, null);
!       if (!isData)
!           ITEM.set(s, s);                   // self-link to avoid garbage
!       return (E) item;
    }

    /* -------------- Traversal methods -------------- */

    /**
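The public methods below all funnel into xfer/awaitMatch; a small usage sketch that exercises the untimed, timed, and cancellation paths of the new awaitMatch:

    import java.util.concurrent.LinkedTransferQueue;
    import java.util.concurrent.TimeUnit;

    // Exercises the paths changed above: take() waits via the untimed branch,
    // poll(timeout) via the timed branch, and tryTransfer(e, timeout) cancels
    // and unsplices its node when the wait times out.
    class AwaitMatchPaths {
        public static void main(String[] args) throws InterruptedException {
            LinkedTransferQueue<String> q = new LinkedTransferQueue<>();

            Thread consumer = new Thread(() -> {
                try {
                    System.out.println("took: " + q.take());        // untimed wait
                } catch (InterruptedException ignored) { }
            });
            consumer.start();

            q.transfer("handoff");          // returns only once the consumer matched it
            consumer.join();

            // Timed request that expires: the waiting node is cancelled and unspliced.
            System.out.println("poll: " + q.poll(10, TimeUnit.MILLISECONDS));          // null

            // Timed transfer with no consumer: returns false after the timeout.
            System.out.println("tryTransfer: " + q.tryTransfer("x", 10, TimeUnit.MILLISECONDS));
            System.out.println("left in queue: " + q.size());                          // 0
        }
    }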
*** 1179,1190 ****
        s.waiter = null; // disable signals
        /*
         * See above for rationale. Briefly: if pred still points to
         * s, try to unlink s. If s cannot be unlinked, because it is
         * trailing node or pred might be unlinked, and neither pred
!        * nor s are head or offlist, add to sweepVotes, and if enough
!        * votes have accumulated, sweep.
         */
        if (pred != null && pred.next == s) {
            Node n = s.next;
            if (n == null ||
                (n != s && pred.casNext(s, n) && pred.isMatched())) {
--- 1101,1111 ----
        s.waiter = null; // disable signals
        /*
         * See above for rationale. Briefly: if pred still points to
         * s, try to unlink s. If s cannot be unlinked, because it is
         * trailing node or pred might be unlinked, and neither pred
!        * nor s are head or offlist, set needSweep;
         */
        if (pred != null && pred.next == s) {
            Node n = s.next;
            if (n == null ||
                (n != s && pred.casNext(s, n) && pred.isMatched())) {
*** 1198,1220 ****
                if (hn == null)
                    return;          // now empty
                if (hn != h && casHead(h, hn))
                    h.selfLink();    // advance head
            }
!           // sweep every SWEEP_THRESHOLD votes
!           if (pred.next != pred && s.next != s // recheck if offlist
!               && (incSweepVotes() & (SWEEP_THRESHOLD - 1)) == 0)
!               sweep();
        }
    }

    /**
     * Unlinks matched (typically cancelled) nodes encountered in a
     * traversal from head.
     */
    private void sweep() {
        for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
            if (!s.isMatched())
                // Unmatched nodes are never self-linked
                p = s;
            else if ((n = s.next) == null) // trailing node is pinned
--- 1119,1140 ----
                if (hn == null)
                    return;          // now empty
                if (hn != h && casHead(h, hn))
                    h.selfLink();    // advance head
            }
!           if (pred.next != pred && s.next != s)
!               needSweep = true;
        }
    }

    /**
     * Unlinks matched (typically cancelled) nodes encountered in a
     * traversal from head.
     */
    private void sweep() {
+       needSweep = false;
        for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
            if (!s.isMatched())
                // Unmatched nodes are never self-linked
                p = s;
            else if ((n = s.next) == null) // trailing node is pinned
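For the traversal itself, sweep() is the classic unlink-dead-successors walk; a single-threaded toy version with illustrative names, omitting the concurrent self-link checks the real code also needs:

    // Same shape as sweep() above: walk from the head, unlink nodes flagged
    // dead, and never touch the trailing node so an append in progress still
    // has a stable predecessor. (Toy, single-threaded.)
    class SweepShape {
        static final class Node {
            int value; boolean dead; Node next;
            Node(int value) { this.value = value; }
        }

        static void sweep(Node head) {
            for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
                if (!s.dead)
                    p = s;                    // live: advance
                else if ((n = s.next) == null)
                    break;                    // trailing node is pinned
                else
                    p.next = n;               // unlink s; stay at p and re-examine
            }
        }
    }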
*** 1263,1273 ****
     * As the queue is unbounded, this method will never block.
     *
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
!       xfer(e, true, ASYNC, 0);
    }

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never block or
--- 1183,1193 ----
     * As the queue is unbounded, this method will never block.
     *
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
!       xfer(e, true, ASYNC, 0L);
    }

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never block or
*** 1276,1286 ****
     * @return {@code true} (as specified by
     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
!       xfer(e, true, ASYNC, 0);
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue.
--- 1196,1206 ----
     * @return {@code true} (as specified by
     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
!       xfer(e, true, ASYNC, 0L);
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue.
*** 1288,1298 ****
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
!       xfer(e, true, ASYNC, 0);
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue.
--- 1208,1218 ----
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
!       xfer(e, true, ASYNC, 0L);
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue.
*** 1301,1311 ****
     *
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
!       xfer(e, true, ASYNC, 0);
        return true;
    }

    /**
     * Transfers the element to a waiting consumer immediately, if possible.
--- 1221,1231 ----
     *
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
!       xfer(e, true, ASYNC, 0L);
        return true;
    }

    /**
     * Transfers the element to a waiting consumer immediately, if possible.
*** 1316,1326 ****
     * otherwise returning {@code false} without enqueuing the element.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean tryTransfer(E e) {
!       return xfer(e, true, NOW, 0) == null;
    }

    /**
     * Transfers the element to a consumer, waiting if necessary to do so.
     *
--- 1236,1246 ----
     * otherwise returning {@code false} without enqueuing the element.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean tryTransfer(E e) {
!       return xfer(e, true, NOW, 0L) == null;
    }

    /**
     * Transfers the element to a consumer, waiting if necessary to do so.
     *
*** 1331,1341 ****
     * and waits until the element is received by a consumer.
     *
     * @throws NullPointerException if the specified element is null
     */
    public void transfer(E e) throws InterruptedException {
!       if (xfer(e, true, SYNC, 0) != null) {
            Thread.interrupted(); // failure possible only due to interrupt
            throw new InterruptedException();
        }
    }

--- 1251,1261 ----
     * and waits until the element is received by a consumer.
     *
     * @throws NullPointerException if the specified element is null
     */
    public void transfer(E e) throws InterruptedException {
!       if (xfer(e, true, SYNC, 0L) != null) {
            Thread.interrupted(); // failure possible only due to interrupt
            throw new InterruptedException();
        }
    }

*** 1361,1371 ****
            return false;
        throw new InterruptedException();
    }

    public E take() throws InterruptedException {
!       E e = xfer(null, false, SYNC, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
--- 1281,1291 ----
            return false;
        throw new InterruptedException();
    }

    public E take() throws InterruptedException {
!       E e = xfer(null, false, SYNC, 0L);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
*** 1376,1386 ****
            return e;
        throw new InterruptedException();
    }

    public E poll() {
!       return xfer(null, false, NOW, 0);
    }

    /**
     * @throws NullPointerException {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
--- 1296,1306 ----
            return e;
        throw new InterruptedException();
    }

    public E poll() {
!       return xfer(null, false, NOW, 0L);
    }

    /**
     * @throws NullPointerException {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
*** 1720,1742 ****
    }

    // VarHandle mechanics
    private static final VarHandle HEAD;
    private static final VarHandle TAIL;
-   private static final VarHandle SWEEPVOTES;
    static final VarHandle ITEM;
    static final VarHandle NEXT;
    static final VarHandle WAITER;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            HEAD = l.findVarHandle(LinkedTransferQueue.class, "head",
                                   Node.class);
            TAIL = l.findVarHandle(LinkedTransferQueue.class, "tail",
                                   Node.class);
-           SWEEPVOTES = l.findVarHandle(LinkedTransferQueue.class, "sweepVotes",
-                                        int.class);
            ITEM = l.findVarHandle(Node.class, "item", Object.class);
            NEXT = l.findVarHandle(Node.class, "next", Node.class);
            WAITER = l.findVarHandle(Node.class, "waiter", Thread.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
--- 1640,1659 ----