< prev index next >

src/java.base/share/classes/java/util/concurrent/SynchronousQueue.java

Print this page
8246677: LinkedTransferQueue and SynchronousQueue synchronization updates
Reviewed-by: martin

*** 164,173 **** --- 164,185 ---- * node's link to now point to the node itself. This doesn't arise * much for Stack nodes (because blocked threads do not hang on to * old head pointers), but references in Queue nodes must be * aggressively forgotten to avoid reachability of everything any * node has ever referred to since arrival. + * + * The above steps improve throughput when many threads produce + * and/or consume data. But they don't help much with + * single-source / single-sink usages in which one side or the + * other is always transiently blocked, and so throughput is + * mainly a function of thread scheduling. This is not usually + * noticeably improved with bounded short spin-waits. Instead both + * forms of transfer try Thread.yield if apparently the sole + * waiter. This works well when there are more tasks than cores, + * which is expected to be the main usage context of this mode. In + * other cases, waiters may help with some bookkeeping, then + * park/unpark. */ /** * Shared internal API for dual stacks and queues. */
*** 187,217 **** */ abstract E transfer(E e, boolean timed, long nanos); } /** - * The number of times to spin before blocking in timed waits. - * The value is empirically derived -- it works well across a - * variety of processors and OSes. Empirically, the best value - * seems not to vary with number of CPUs (beyond 2) so is just - * a constant. - */ - static final int MAX_TIMED_SPINS = - (Runtime.getRuntime().availableProcessors() < 2) ? 0 : 32; - - /** - * The number of times to spin before blocking in untimed waits. - * This is greater than timed value because untimed waits spin - * faster since they don't need to check times on each spin. - */ - static final int MAX_UNTIMED_SPINS = MAX_TIMED_SPINS * 16; - - /** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices. */ ! static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L; /** Dual stack */ static final class TransferStack<E> extends Transferer<E> { /* * This extends Scherer-Scott dual stack algorithm, differing, --- 199,212 ---- */ abstract E transfer(E e, boolean timed, long nanos); } /** * The number of nanoseconds for which it is faster to spin * rather than to use timed park. A rough estimate suffices. */ ! static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L; /** Dual stack */ static final class TransferStack<E> extends Transferer<E> { /* * This extends Scherer-Scott dual stack algorithm, differing,
*** 231,241 **** /** Returns true if m has fulfilling bit set. */ static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; } /** Node class for TransferStacks. */ ! static final class SNode { volatile SNode next; // next node in stack volatile SNode match; // the node matched to this volatile Thread waiter; // to control park/unpark Object item; // data; or null for REQUESTs int mode; --- 226,236 ---- /** Returns true if m has fulfilling bit set. */ static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; } /** Node class for TransferStacks. */ ! static final class SNode implements ForkJoinPool.ManagedBlocker { volatile SNode next; // next node in stack volatile SNode match; // the node matched to this volatile Thread waiter; // to control park/unpark Object item; // data; or null for REQUESTs int mode;
*** 259,299 **** * * @param s the node to match * @return true if successfully matched to s */ boolean tryMatch(SNode s) { ! if (match == null && ! SMATCH.compareAndSet(this, null, s)) { ! Thread w = waiter; ! if (w != null) { // waiters need at most one unpark ! waiter = null; LockSupport.unpark(w); - } return true; } ! return match == s; } /** * Tries to cancel a wait by matching node to itself. */ ! void tryCancel() { ! SMATCH.compareAndSet(this, null, this); } boolean isCancelled() { return match == this; } // VarHandle mechanics private static final VarHandle SMATCH; private static final VarHandle SNEXT; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); SMATCH = l.findVarHandle(SNode.class, "match", SNode.class); SNEXT = l.findVarHandle(SNode.class, "next", SNode.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } } --- 254,310 ---- * * @param s the node to match * @return true if successfully matched to s */ boolean tryMatch(SNode s) { ! SNode m; Thread w; ! if ((m = match) == null) { ! if (SMATCH.compareAndSet(this, null, s)) { ! if ((w = waiter) != null) LockSupport.unpark(w); return true; } ! else ! m = match; ! } ! return m == s; } /** * Tries to cancel a wait by matching node to itself. */ ! boolean tryCancel() { ! 
return SMATCH.compareAndSet(this, null, this); } boolean isCancelled() { return match == this; } + public final boolean isReleasable() { + return match != null || Thread.currentThread().isInterrupted(); + } + + public final boolean block() { + while (!isReleasable()) LockSupport.park(); + return true; + } + + void forgetWaiter() { + SWAITER.setOpaque(this, null); + } + // VarHandle mechanics private static final VarHandle SMATCH; private static final VarHandle SNEXT; + private static final VarHandle SWAITER; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); SMATCH = l.findVarHandle(SNode.class, "match", SNode.class); SNEXT = l.findVarHandle(SNode.class, "next", SNode.class); + SWAITER = l.findVarHandle(SNode.class, "waiter", Thread.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } }
*** 356,373 **** if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { ! SNode m = awaitFulfill(s, timed, nanos); ! if (m == s) { // wait was cancelled ! clean(s); return null; } ! if ((h = head) != null && h.next == s) ! casHead(h, s.next); // help s's fulfiller ! return (E) ((mode == REQUEST) ? m.item : s.item); } } else if (!isFulfilling(h.mode)) { // try to fulfill if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { --- 367,413 ---- if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { ! long deadline = timed ? System.nanoTime() + nanos : 0L; ! Thread w = Thread.currentThread(); ! int stat = -1; // -1: may yield, +1: park, else 0 ! SNode m; // await fulfill or cancel ! while ((m = s.match) == null) { ! if ((timed && ! (nanos = deadline - System.nanoTime()) <= 0) || ! w.isInterrupted()) { ! if (s.tryCancel()) { ! clean(s); // wait cancelled return null; } ! } else if ((m = s.match) != null) { ! break; // recheck ! } else if (stat <= 0) { ! if (stat < 0 && h == null && head == s) { ! stat = 0; // yield once if was empty ! Thread.yield(); ! } else { ! stat = 1; ! s.waiter = w; // enable signal ! } ! } else if (!timed) { ! LockSupport.setCurrentBlocker(this); ! try { ! ForkJoinPool.managedBlock(s); ! } catch (InterruptedException cannotHappen) { } ! LockSupport.setCurrentBlocker(null); ! } else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) ! LockSupport.parkNanos(this, nanos); ! } ! if (stat == 1) ! s.forgetWaiter(); ! Object result = (mode == REQUEST) ? m.item : s.item; ! if (h != null && h.next == s) ! casHead(h, s.next); // help fulfiller ! 
return (E) result; } } else if (!isFulfilling(h.mode)) { // try to fulfill if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
*** 400,485 **** } } } /** - * Spins/blocks until node s is matched by a fulfill operation. - * - * @param s the waiting node - * @param timed true if timed wait - * @param nanos timeout value - * @return matched node, or s if cancelled - */ - SNode awaitFulfill(SNode s, boolean timed, long nanos) { - /* - * When a node/thread is about to block, it sets its waiter - * field and then rechecks state at least one more time - * before actually parking, thus covering race vs - * fulfiller noticing that waiter is non-null so should be - * woken. - * - * When invoked by nodes that appear at the point of call - * to be at the head of the stack, calls to park are - * preceded by spins to avoid blocking when producers and - * consumers are arriving very close in time. This can - * happen enough to bother only on multiprocessors. - * - * The order of checks for returning out of main loop - * reflects fact that interrupts have precedence over - * normal returns, which have precedence over - * timeouts. (So, on timeout, one last check for match is - * done before giving up.) Except that calls from untimed - * SynchronousQueue.{poll/offer} don't check interrupts - * and don't wait at all, so are trapped in transfer - * method rather than calling awaitFulfill. - */ - final long deadline = timed ? System.nanoTime() + nanos : 0L; - Thread w = Thread.currentThread(); - int spins = shouldSpin(s) - ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS) - : 0; - for (;;) { - if (w.isInterrupted()) - s.tryCancel(); - SNode m = s.match; - if (m != null) - return m; - if (timed) { - nanos = deadline - System.nanoTime(); - if (nanos <= 0L) { - s.tryCancel(); - continue; - } - } - if (spins > 0) { - Thread.onSpinWait(); - spins = shouldSpin(s) ? 
(spins - 1) : 0; - } - else if (s.waiter == null) - s.waiter = w; // establish waiter so can park next iter - else if (!timed) - LockSupport.park(this); - else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanos); - } - } - - /** - * Returns true if node s is at head or there is an active - * fulfiller. - */ - boolean shouldSpin(SNode s) { - SNode h = head; - return (h == s || h == null || isFulfilling(h.mode)); - } - - /** * Unlinks s from the stack. */ void clean(SNode s) { s.item = null; // forget item ! s.waiter = null; // forget thread /* * At worst we may need to traverse entire stack to unlink * s. If there are multiple concurrent calls to clean, we * might not see s if another thread has already removed --- 440,454 ---- } } } /** * Unlinks s from the stack. */ void clean(SNode s) { s.item = null; // forget item ! s.forgetWaiter(); /* * At worst we may need to traverse entire stack to unlink * s. If there are multiple concurrent calls to clean, we * might not see s if another thread has already removed
*** 531,541 **** * nodes, and matching is done by CAS'ing QNode.item field * from non-null to null (for put) or vice versa (for take). */ /** Node class for TransferQueue. */ ! static final class QNode { volatile QNode next; // next node in queue volatile Object item; // CAS'ed to or from null volatile Thread waiter; // to control park/unpark final boolean isData; --- 500,510 ---- * nodes, and matching is done by CAS'ing QNode.item field * from non-null to null (for put) or vice versa (for take). */ /** Node class for TransferQueue. */ ! static final class QNode implements ForkJoinPool.ManagedBlocker { volatile QNode next; // next node in queue volatile Object item; // CAS'ed to or from null volatile Thread waiter; // to control park/unpark final boolean isData;
*** 555,566 **** } /** * Tries to cancel by CAS'ing ref to this as item. */ ! void tryCancel(Object cmp) { ! QITEM.compareAndSet(this, cmp, this); } boolean isCancelled() { return item == this; } --- 524,535 ---- } /** * Tries to cancel by CAS'ing ref to this as item. */ ! boolean tryCancel(Object cmp) { ! return QITEM.compareAndSet(this, cmp, this); } boolean isCancelled() { return item == this; }
*** 572,589 **** --- 541,580 ---- */ boolean isOffList() { return next == this; } + void forgetWaiter() { + QWAITER.setOpaque(this, null); + } + + boolean isFulfilled() { + Object x; + return isData == ((x = item) == null) || x == this; + } + + public final boolean isReleasable() { + Object x; + return isData == ((x = item) == null) || x == this || + Thread.currentThread().isInterrupted(); + } + + public final boolean block() { + while (!isReleasable()) LockSupport.park(); + return true; + } + // VarHandle mechanics private static final VarHandle QITEM; private static final VarHandle QNEXT; + private static final VarHandle QWAITER; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); QITEM = l.findVarHandle(QNode.class, "item", Object.class); QNEXT = l.findVarHandle(QNode.class, "next", QNode.class); + QWAITER = l.findVarHandle(QNode.class, "waiter", Thread.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } }
*** 661,774 **** * than having them implicitly interspersed. */ QNode s = null; // constructed/reused as needed boolean isData = (e != null); - for (;;) { ! QNode t = tail; ! QNode h = head; ! if (t == null || h == null) // saw uninitialized value ! continue; // spin ! ! if (h == t || t.isData == isData) { // empty or same-mode ! QNode tn = t.next; ! if (t != tail) // inconsistent read ! continue; ! if (tn != null) { // lagging tail advanceTail(t, tn); ! continue; ! } ! if (timed && nanos <= 0L) // can't wait return null; ! if (s == null) ! s = new QNode(e, isData); ! if (!t.casNext(null, s)) // failed to link in ! continue; ! ! advanceTail(t, s); // swing tail and wait ! Object x = awaitFulfill(s, e, timed, nanos); ! if (x == s) { // wait was cancelled clean(t, s); return null; } ! if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head ! if (x != null) // and forget fields s.item = s; - s.waiter = null; } ! return (x != null) ? (E)x : e; ! ! } else { // complementary-mode ! QNode m = h.next; // node to fulfill ! if (t != tail || m == null || h != head) ! continue; // inconsistent read ! ! Object x = m.item; ! if (isData == (x != null) || // m already fulfilled ! x == m || // m cancelled ! !m.casItem(x, e)) { // lost CAS ! advanceHead(h, m); // dequeue and retry ! continue; } ! advanceHead(h, m); // successfully fulfilled ! LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } } - - /** - * Spins/blocks until node s is fulfilled. - * - * @param s the waiting node - * @param e the comparison value for checking match - * @param timed true if timed wait - * @param nanos timeout value - * @return matched item, or s if cancelled - */ - Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { - /* Same idea as TransferStack.awaitFulfill */ - final long deadline = timed ? System.nanoTime() + nanos : 0L; - Thread w = Thread.currentThread(); - int spins = (head.next == s) - ? (timed ? 
MAX_TIMED_SPINS : MAX_UNTIMED_SPINS) - : 0; - for (;;) { - if (w.isInterrupted()) - s.tryCancel(e); - Object x = s.item; - if (x != e) - return x; - if (timed) { - nanos = deadline - System.nanoTime(); - if (nanos <= 0L) { - s.tryCancel(e); - continue; - } - } - if (spins > 0) { - --spins; - Thread.onSpinWait(); - } - else if (s.waiter == null) - s.waiter = w; - else if (!timed) - LockSupport.park(this); - else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanos); - } } /** * Gets rid of cancelled node s with original predecessor pred. */ void clean(QNode pred, QNode s) { ! s.waiter = null; // forget thread /* * At any given time, exactly one node on list cannot be * deleted -- the last inserted node. To accommodate this, * if we cannot delete s, we save its predecessor as * "cleanMe", deleting the previously saved version --- 652,740 ---- * than having them implicitly interspersed. */ QNode s = null; // constructed/reused as needed boolean isData = (e != null); for (;;) { ! QNode t = tail, h = head, m, tn; // m is node to fulfill ! if (t == null || h == null) ! ; // inconsistent ! else if (h == t || t.isData == isData) { // empty or same-mode ! if (t != tail) // inconsistent ! ; ! else if ((tn = t.next) != null) // lagging tail advanceTail(t, tn); ! else if (timed && nanos <= 0L) // can't wait return null; ! else if (t.casNext(null, (s != null) ? s : ! (s = new QNode(e, isData)))) { ! advanceTail(t, s); ! long deadline = timed ? System.nanoTime() + nanos : 0L; ! Thread w = Thread.currentThread(); ! int stat = -1; // same idea as TransferStack ! Object item; ! while ((item = s.item) == e) { ! if ((timed && ! (nanos = deadline - System.nanoTime()) <= 0) || ! w.isInterrupted()) { ! if (s.tryCancel(e)) { clean(t, s); return null; } ! } else if ((item = s.item) != e) { ! break; // recheck ! } else if (stat <= 0) { ! if (t.next == s) { ! if (stat < 0 && t.isFulfilled()) { ! stat = 0; // yield once if first ! Thread.yield(); ! } ! else { ! 
stat = 1; ! s.waiter = w; ! } ! } ! } else if (!timed) { ! LockSupport.setCurrentBlocker(this); ! try { ! ForkJoinPool.managedBlock(s); ! } catch (InterruptedException cannotHappen) { } ! LockSupport.setCurrentBlocker(null); ! } ! else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) ! LockSupport.parkNanos(this, nanos); ! } ! if (stat == 1) ! s.forgetWaiter(); if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head ! if (item != null) // and forget fields s.item = s; } ! return (item != null) ? (E)item : e; } ! } else if ((m = h.next) != null && t == tail && h == head) { ! Thread waiter; ! Object x = m.item; ! boolean fulfilled = ((isData == (x == null)) && ! x != m && m.casItem(x, e)); ! advanceHead(h, m); // (help) dequeue ! if (fulfilled) { ! if ((waiter = m.waiter) != null) ! LockSupport.unpark(waiter); return (x != null) ? (E)x : e; } } } } /** * Gets rid of cancelled node s with original predecessor pred. */ void clean(QNode pred, QNode s) { ! s.forgetWaiter(); /* * At any given time, exactly one node on list cannot be * deleted -- the last inserted node. To accommodate this, * if we cannot delete s, we save its predecessor as * "cleanMe", deleting the previously saved version
< prev index next >