--- old/src/java.base/share/classes/java/util/concurrent/locks/StampedLock.java 2019-09-14 11:11:06.116878624 -0700 +++ new/src/java.base/share/classes/java/util/concurrent/locks/StampedLock.java 2019-09-14 11:11:05.764878376 -0700 @@ -35,9 +35,8 @@ package java.util.concurrent.locks; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.VarHandle; import java.util.concurrent.TimeUnit; +import jdk.internal.misc.Unsafe; import jdk.internal.vm.annotation.ReservedStackAccess; /** @@ -132,9 +131,8 @@ * *

Memory Synchronization. Methods with the effect of * successfully locking in any mode have the same memory - * synchronization effects as a Lock action described in - * - * Chapter 17 of The Java™ Language Specification. + * synchronization effects as a Lock action, as described in + * Chapter 17 of The Java™ Language Specification. * Methods successfully unlocking in write mode have the same memory * synchronization effects as an Unlock action. In optimistic * read usages, actions prior to the most recent write mode unlock action @@ -237,6 +235,7 @@ * } * }} * + * @jls 17.4 Memory Model * @since 1.8 * @author Doug Lea */ @@ -264,122 +263,54 @@ * updates. * * Waiters use a modified form of CLH lock used in - * AbstractQueuedSynchronizer (see its internal documentation for - * a fuller account), where each node is tagged (field mode) as - * either a reader or writer. Sets of waiting readers are grouped - * (linked) under a common node (field cowait) so act as a single - * node with respect to most CLH mechanics. By virtue of the - * queue structure, wait nodes need not actually carry sequence - * numbers; we know each is greater than its predecessor. This - * simplifies the scheduling policy to a mainly-FIFO scheme that - * incorporates elements of Phase-Fair locks (see Brandenburg & - * Anderson, especially http://www.cs.unc.edu/~bbb/diss/). In - * particular, we use the phase-fair anti-barging rule: If an - * incoming reader arrives while read lock is held but there is a - * queued writer, this incoming reader is queued. (This rule is - * responsible for some of the complexity of method acquireRead, - * but without it, the lock becomes highly unfair.) Method release - * does not (and sometimes cannot) itself wake up cowaiters. This - * is done by the primary thread, but helped by any other threads - * with nothing better to do in methods acquireRead and - * acquireWrite. - * - * These rules apply to threads actually queued. All tryLock forms - * opportunistically try to acquire locks regardless of preference - * rules, and so may "barge" their way in. Randomized spinning is - * used in the acquire methods to reduce (increasingly expensive) - * context switching while also avoiding sustained memory - * thrashing among many threads. We limit spins to the head of - * queue. If, upon wakening, a thread fails to obtain lock, and is - * still (or becomes) the first waiting thread (which indicates - * that some other thread barged and obtained lock), it escalates - * spins (up to MAX_HEAD_SPINS) to reduce the likelihood of - * continually losing to barging threads. + * AbstractQueuedSynchronizer (AQS; see its internal documentation + * for a fuller account), where each node is either a ReaderNode + * or WriterNode. Implementation of queued Writer mode is + * identical to AQS except for lock-state operations. Sets of + * waiting readers are grouped (linked) under a common node (field + * cowaiters) so act as a single node with respect to most CLH + * mechanics. This simplifies the scheduling policy to a + * mainly-FIFO scheme that incorporates elements of Phase-Fair + * locks (see Brandenburg & Anderson, especially + * http://www.cs.unc.edu/~bbb/diss/). Method release does not + * itself wake up cowaiters. This is done by the primary thread, + * but helped by other cowaiters as they awaken. + * + * These rules apply to threads actually queued. Threads may also + * try to acquire locks before or in the process of enqueueing + * regardless of preference rules, and so may "barge" their way + * in. Methods writeLock and readLock (but not the other variants + * of each) first unconditionally try to CAS state, falling back + * to test-and-test-and-set retries on failure, slightly shrinking + * race windows on initial attempts, thus making success more + * likely. Also, when some threads cancel (via interrupt or + * timeout), phase-fairness is at best roughly approximated. * * Nearly all of these mechanics are carried out in methods * acquireWrite and acquireRead, that, as typical of such code, * sprawl out because actions and retries rely on consistent sets * of locally cached reads. * - * As noted in Boehm's paper (above), sequence validation (mainly - * method validate()) requires stricter ordering rules than apply - * to normal volatile reads (of "state"). To force orderings of - * reads before a validation and the validation itself in those - * cases where this is not already forced, we use acquireFence. - * Unlike in that paper, we allow writers to use plain writes. - * One would not expect reorderings of such writes with the lock - * acquisition CAS because there is a "control dependency", but it - * is theoretically possible, so we additionally add a - * storeStoreFence after lock acquisition CAS. - * - * ---------------------------------------------------------------- - * Here's an informal proof that plain reads by _successful_ - * readers see plain writes from preceding but not following - * writers (following Boehm and the C++ standard [atomics.fences]): - * - * Because of the total synchronization order of accesses to - * volatile long state containing the sequence number, writers and - * _successful_ readers can be globally sequenced. - * - * int x, y; - * - * Writer 1: - * inc sequence (odd - "locked") - * storeStoreFence(); - * x = 1; y = 2; - * inc sequence (even - "unlocked") - * - * Successful Reader: - * read sequence (even) - * // must see writes from Writer 1 but not Writer 2 - * r1 = x; r2 = y; - * acquireFence(); - * read sequence (even - validated unchanged) - * // use r1 and r2 - * - * Writer 2: - * inc sequence (odd - "locked") - * storeStoreFence(); - * x = 3; y = 4; - * inc sequence (even - "unlocked") - * - * Visibility of writer 1's stores is normal - reader's initial - * read of state synchronizes with writer 1's final write to state. - * Lack of visibility of writer 2's plain writes is less obvious. - * If reader's read of x or y saw writer 2's write, then (assuming - * semantics of C++ fences) the storeStoreFence would "synchronize" - * with reader's acquireFence and reader's validation read must see - * writer 2's initial write to state and so validation must fail. - * But making this "proof" formal and rigorous is an open problem! - * ---------------------------------------------------------------- + * For an explanation of the use of acquireFence, see + * http://gee.cs.oswego.edu/dl/html/j9mm.html as well as Boehm's + * paper (above). Note that sequence validation (mainly method + * validate()) requires stricter ordering rules than apply to + * normal volatile reads (of "state"). To ensure that writeLock + * acquisitions strictly precede subsequent writes in cases where + * this is not already forced, we use a storeStoreFence. * * The memory layout keeps lock state and queue pointers together * (normally on the same cache line). This usually works well for * read-mostly loads. In most other cases, the natural tendency of - * adaptive-spin CLH locks to reduce memory contention lessens - * motivation to further spread out contended locations, but might - * be subject to future improvements. + * CLH locks to reduce memory contention lessens motivation to + * further spread out contended locations, but might be subject to + * future improvements. */ private static final long serialVersionUID = -6001602636862214147L; - /** Number of processors, for spin control */ - private static final int NCPU = Runtime.getRuntime().availableProcessors(); - - /** Maximum number of retries before enqueuing on acquisition; at least 1 */ - private static final int SPINS = (NCPU > 1) ? 1 << 6 : 1; - - /** Maximum number of tries before blocking at head on acquisition */ - private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 1; - - /** Maximum number of retries before re-blocking */ - private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 1; - - /** The period for yielding when waiting for overflow spinlock */ - private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1 - /** The number of bits to use for reader count before overflowing */ - private static final int LG_READERS = 7; + private static final int LG_READERS = 7; // 127 readers // Values for lock state and stamp operations private static final long RUNIT = 1L; @@ -388,6 +319,8 @@ private static final long RFULL = RBITS - 1L; private static final long ABITS = RBITS | WBIT; private static final long SBITS = ~RBITS; // note overlap with ABITS + // not writing and conservatively non-overflowing + private static final long RSAFE = ~(3L << (LG_READERS - 1)); /* * 3 stamp modes can be distinguished by examining (m = stamp & ABITS): @@ -408,29 +341,64 @@ // Special value from cancelled acquire methods so caller can throw IE private static final long INTERRUPTED = 1L; - // Values for node status; order matters - private static final int WAITING = -1; - private static final int CANCELLED = 1; - - // Modes for nodes (int not boolean to allow arithmetic) - private static final int RMODE = 0; - private static final int WMODE = 1; - - /** Wait nodes */ - static final class WNode { - volatile WNode prev; - volatile WNode next; - volatile WNode cowait; // list of linked readers - volatile Thread thread; // non-null while possibly parked - volatile int status; // 0, WAITING, or CANCELLED - final int mode; // RMODE or WMODE - WNode(int m, WNode p) { mode = m; prev = p; } + // Bits for Node.status + static final int WAITING = 1; + static final int CANCELLED = 0x80000000; // must be negative + + /** CLH nodes */ + abstract static class Node { + volatile Node prev; // initially attached via casTail + volatile Node next; // visibly nonnull when signallable + Thread waiter; // visibly nonnull when enqueued + volatile int status; // written by owner, atomic bit ops by others + + // methods for atomic operations + final boolean casPrev(Node c, Node v) { // for cleanQueue + return U.weakCompareAndSetReference(this, PREV, c, v); + } + final boolean casNext(Node c, Node v) { // for cleanQueue + return U.weakCompareAndSetReference(this, NEXT, c, v); + } + final int getAndUnsetStatus(int v) { // for signalling + return U.getAndBitwiseAndInt(this, STATUS, ~v); + } + final void setPrevRelaxed(Node p) { // for off-queue assignment + U.putReference(this, PREV, p); + } + final void setStatusRelaxed(int s) { // for off-queue assignment + U.putInt(this, STATUS, s); + } + final void clearStatus() { // for reducing unneeded signals + U.putIntOpaque(this, STATUS, 0); + } + + private static final long STATUS + = U.objectFieldOffset(Node.class, "status"); + private static final long NEXT + = U.objectFieldOffset(Node.class, "next"); + private static final long PREV + = U.objectFieldOffset(Node.class, "prev"); + } + + static final class WriterNode extends Node { // node for writers + } + + static final class ReaderNode extends Node { // node for readers + volatile ReaderNode cowaiters; // list of linked readers + final boolean casCowaiters(ReaderNode c, ReaderNode v) { + return U.weakCompareAndSetReference(this, COWAITERS, c, v); + } + final void setCowaitersRelaxed(ReaderNode p) { + U.putReference(this, COWAITERS, p); + } + private static final long COWAITERS + = U.objectFieldOffset(ReaderNode.class, "cowaiters"); } /** Head of CLH queue */ - private transient volatile WNode whead; + private transient volatile Node head; /** Tail (last) of CLH queue */ - private transient volatile WNode wtail; + private transient volatile Node tail; // views transient ReadLockView readLockView; @@ -449,20 +417,52 @@ state = ORIGIN; } - private boolean casState(long expectedValue, long newValue) { - return STATE.compareAndSet(this, expectedValue, newValue); + // internal lock methods + + private boolean casState(long expect, long update) { + return U.compareAndSetLong(this, STATE, expect, update); } - private long tryWriteLock(long s) { - // assert (s & ABITS) == 0L; - long next; - if (casState(s, next = s | WBIT)) { - VarHandle.storeStoreFence(); - return next; + @ReservedStackAccess + private long tryAcquireWrite() { + long s, nextState; + if (((s = state) & ABITS) == 0L && casState(s, nextState = s | WBIT)) { + U.storeStoreFence(); + return nextState; } return 0L; } + @ReservedStackAccess + private long tryAcquireRead() { + for (long s, m, nextState;;) { + if ((m = (s = state) & ABITS) < RFULL) { + if (casState(s, nextState = s + RUNIT)) + return nextState; + } + else if (m == WBIT) + return 0L; + else if ((nextState = tryIncReaderOverflow(s)) != 0L) + return nextState; + } + } + + /** + * Returns an unlocked state, incrementing the version and + * avoiding special failure value 0L. + * + * @param s a write-locked state (or stamp) + */ + private static long unlockWriteState(long s) { + return ((s += WBIT) == 0L) ? ORIGIN : s; + } + + private long releaseWrite(long s) { + long nextState = state = unlockWriteState(s); + signalNext(head); + return nextState; + } + /** * Exclusively acquires the lock, blocking if necessary * until available. @@ -471,8 +471,13 @@ */ @ReservedStackAccess public long writeLock() { - long next; - return ((next = tryWriteLock()) != 0L) ? next : acquireWrite(false, 0L); + // try unconditional CAS confirming weak read + long s = U.getLongOpaque(this, STATE) & ~ABITS, nextState; + if (casState(s, nextState = s | WBIT)) { + U.storeStoreFence(); + return nextState; + } + return acquireWrite(false, false, 0L); } /** @@ -481,10 +486,8 @@ * @return a write stamp that can be used to unlock or convert mode, * or zero if the lock is not available */ - @ReservedStackAccess public long tryWriteLock() { - long s; - return (((s = state) & ABITS) == 0L) ? tryWriteLock(s) : 0L; + return tryAcquireWrite(); } /** @@ -504,15 +507,14 @@ throws InterruptedException { long nanos = unit.toNanos(time); if (!Thread.interrupted()) { - long next, deadline; - if ((next = tryWriteLock()) != 0L) - return next; + long nextState; + if ((nextState = tryAcquireWrite()) != 0L) + return nextState; if (nanos <= 0L) return 0L; - if ((deadline = System.nanoTime() + nanos) == 0L) - deadline = 1L; - if ((next = acquireWrite(true, deadline)) != INTERRUPTED) - return next; + nextState = acquireWrite(true, true, System.nanoTime() + nanos); + if (nextState != INTERRUPTED) + return nextState; } throw new InterruptedException(); } @@ -527,12 +529,12 @@ * @throws InterruptedException if the current thread is interrupted * before acquiring the lock */ - @ReservedStackAccess public long writeLockInterruptibly() throws InterruptedException { - long next; + long nextState; if (!Thread.interrupted() && - (next = acquireWrite(true, 0L)) != INTERRUPTED) - return next; + ((nextState = tryAcquireWrite()) != 0L || + (nextState = acquireWrite(true, false, 0L)) != INTERRUPTED)) + return nextState; throw new InterruptedException(); } @@ -544,13 +546,12 @@ */ @ReservedStackAccess public long readLock() { - long s, next; - // bypass acquireRead on common uncontended case - return (whead == wtail - && ((s = state) & ABITS) < RFULL - && casState(s, next = s + RUNIT)) - ? next - : acquireRead(false, 0L); + // unconditionally optimistically try non-overflow case once + long s = U.getLongOpaque(this, STATE) & RSAFE, nextState; + if (casState(s, nextState = s + RUNIT)) + return nextState; + else + return acquireRead(false, false, 0L); } /** @@ -559,18 +560,8 @@ * @return a read stamp that can be used to unlock or convert mode, * or zero if the lock is not available */ - @ReservedStackAccess public long tryReadLock() { - long s, m, next; - while ((m = (s = state) & ABITS) != WBIT) { - if (m < RFULL) { - if (casState(s, next = s + RUNIT)) - return next; - } - else if ((next = tryIncReaderOverflow(s)) != 0L) - return next; - } - return 0L; + return tryAcquireRead(); } /** @@ -586,26 +577,18 @@ * @throws InterruptedException if the current thread is interrupted * before acquiring the lock */ - @ReservedStackAccess public long tryReadLock(long time, TimeUnit unit) throws InterruptedException { - long s, m, next, deadline; long nanos = unit.toNanos(time); if (!Thread.interrupted()) { - if ((m = (s = state) & ABITS) != WBIT) { - if (m < RFULL) { - if (casState(s, next = s + RUNIT)) - return next; - } - else if ((next = tryIncReaderOverflow(s)) != 0L) - return next; - } + long nextState; + if (tail == head && (nextState = tryAcquireRead()) != 0L) + return nextState; if (nanos <= 0L) return 0L; - if ((deadline = System.nanoTime() + nanos) == 0L) - deadline = 1L; - if ((next = acquireRead(true, deadline)) != INTERRUPTED) - return next; + nextState = acquireRead(true, true, System.nanoTime() + nanos); + if (nextState != INTERRUPTED) + return nextState; } throw new InterruptedException(); } @@ -620,17 +603,12 @@ * @throws InterruptedException if the current thread is interrupted * before acquiring the lock */ - @ReservedStackAccess public long readLockInterruptibly() throws InterruptedException { - long s, next; - if (!Thread.interrupted() - // bypass acquireRead on common uncontended case - && ((whead == wtail - && ((s = state) & ABITS) < RFULL - && casState(s, next = s + RUNIT)) - || - (next = acquireRead(true, 0L)) != INTERRUPTED)) - return next; + long nextState; + if (!Thread.interrupted() && + ((nextState = tryAcquireRead()) != 0L || + (nextState = acquireRead(true, false, 0L)) != INTERRUPTED)) + return nextState; throw new InterruptedException(); } @@ -658,29 +636,11 @@ * since issuance of the given stamp; else false */ public boolean validate(long stamp) { - VarHandle.acquireFence(); + U.loadFence(); return (stamp & SBITS) == (state & SBITS); } /** - * Returns an unlocked state, incrementing the version and - * avoiding special failure value 0L. - * - * @param s a write-locked state (or stamp) - */ - private static long unlockWriteState(long s) { - return ((s += WBIT) == 0L) ? ORIGIN : s; - } - - private long unlockWriteInternal(long s) { - long next; WNode h; - STATE.setVolatile(this, next = unlockWriteState(s)); - if ((h = whead) != null && h.status != 0) - release(h); - return next; - } - - /** * If the lock state matches the given stamp, releases the * exclusive lock. * @@ -692,7 +652,7 @@ public void unlockWrite(long stamp) { if (state != stamp || (stamp & WBIT) == 0L) throw new IllegalMonitorStateException(); - unlockWriteInternal(stamp); + releaseWrite(stamp); } /** @@ -705,19 +665,20 @@ */ @ReservedStackAccess public void unlockRead(long stamp) { - long s, m; WNode h; - while (((s = state) & SBITS) == (stamp & SBITS) - && (stamp & RBITS) > 0L - && ((m = s & RBITS) > 0L)) { - if (m < RFULL) { - if (casState(s, s - RUNIT)) { - if (m == RUNIT && (h = whead) != null && h.status != 0) - release(h); - return; + long s, m; + if ((stamp & RBITS) != 0L) { + while (((s = state) & SBITS) == (stamp & SBITS) && + ((m = s & RBITS) != 0L)) { + if (m < RFULL) { + if (casState(s, s - RUNIT)) { + if (m == RUNIT) + signalNext(head); + return; + } } + else if (tryDecReaderOverflow(s) != 0L) + return; } - else if (tryDecReaderOverflow(s) != 0L) - return; } throw new IllegalMonitorStateException(); } @@ -730,7 +691,6 @@ * @throws IllegalMonitorStateException if the stamp does * not match the current state of this lock */ - @ReservedStackAccess public void unlock(long stamp) { if ((stamp & WBIT) != 0L) unlockWrite(stamp); @@ -751,26 +711,23 @@ * @return a valid write stamp, or zero on failure */ public long tryConvertToWriteLock(long stamp) { - long a = stamp & ABITS, m, s, next; + long a = stamp & ABITS, m, s, nextState; while (((s = state) & SBITS) == (stamp & SBITS)) { if ((m = s & ABITS) == 0L) { if (a != 0L) break; - if ((next = tryWriteLock(s)) != 0L) - return next; - } - else if (m == WBIT) { + if (casState(s, nextState = s | WBIT)) { + U.storeStoreFence(); + return nextState; + } + } else if (m == WBIT) { if (a != m) break; return stamp; - } - else if (m == RUNIT && a != 0L) { - if (casState(s, next = s - RUNIT + WBIT)) { - VarHandle.storeStoreFence(); - return next; - } - } - else + } else if (m == RUNIT && a != 0L) { + if (casState(s, nextState = s - RUNIT + WBIT)) + return nextState; + } else break; } return 0L; @@ -788,28 +745,21 @@ * @return a valid read stamp, or zero on failure */ public long tryConvertToReadLock(long stamp) { - long a, s, next; WNode h; + long a, s, nextState; while (((s = state) & SBITS) == (stamp & SBITS)) { if ((a = stamp & ABITS) >= WBIT) { - // write stamp - if (s != stamp) + if (s != stamp) // write stamp break; - STATE.setVolatile(this, next = unlockWriteState(s) + RUNIT); - if ((h = whead) != null && h.status != 0) - release(h); - return next; - } - else if (a == 0L) { - // optimistic read stamp + nextState = state = unlockWriteState(s) + RUNIT; + signalNext(head); + return nextState; + } else if (a == 0L) { // optimistic read stamp if ((s & ABITS) < RFULL) { - if (casState(s, next = s + RUNIT)) - return next; - } - else if ((next = tryIncReaderOverflow(s)) != 0L) - return next; - } - else { - // already a read stamp + if (casState(s, nextState = s + RUNIT)) + return nextState; + } else if ((nextState = tryIncReaderOverflow(s)) != 0L) + return nextState; + } else { // already a read stamp if ((s & ABITS) == 0L) break; return stamp; @@ -829,29 +779,25 @@ * @return a valid optimistic read stamp, or zero on failure */ public long tryConvertToOptimisticRead(long stamp) { - long a, m, s, next; WNode h; - VarHandle.acquireFence(); + long a, m, s, nextState; + U.loadFence(); while (((s = state) & SBITS) == (stamp & SBITS)) { if ((a = stamp & ABITS) >= WBIT) { - // write stamp - if (s != stamp) + if (s != stamp) // write stamp break; - return unlockWriteInternal(s); - } - else if (a == 0L) - // already an optimistic read stamp + return releaseWrite(s); + } else if (a == 0L) { // already an optimistic read stamp return stamp; - else if ((m = s & ABITS) == 0L) // invalid read stamp + } else if ((m = s & ABITS) == 0L) { // invalid read stamp break; - else if (m < RFULL) { - if (casState(s, next = s - RUNIT)) { - if (m == RUNIT && (h = whead) != null && h.status != 0) - release(h); - return next & SBITS; + } else if (m < RFULL) { + if (casState(s, nextState = s - RUNIT)) { + if (m == RUNIT) + signalNext(head); + return nextState & SBITS; } - } - else if ((next = tryDecReaderOverflow(s)) != 0L) - return next & SBITS; + } else if ((nextState = tryDecReaderOverflow(s)) != 0L) + return nextState & SBITS; } return 0L; } @@ -867,7 +813,7 @@ public boolean tryUnlockWrite() { long s; if (((s = state) & WBIT) != 0L) { - unlockWriteInternal(s); + releaseWrite(s); return true; } return false; @@ -882,12 +828,12 @@ */ @ReservedStackAccess public boolean tryUnlockRead() { - long s, m; WNode h; + long s, m; while ((m = (s = state) & ABITS) != 0L && m < WBIT) { if (m < RFULL) { if (casState(s, s - RUNIT)) { - if (m == RUNIT && (h = whead) != null && h.status != 0) - release(h); + if (m == RUNIT) + signalNext(head); return true; } } @@ -1133,16 +1079,16 @@ long s; if (((s = state) & WBIT) == 0L) throw new IllegalMonitorStateException(); - unlockWriteInternal(s); + releaseWrite(s); } final void unstampedUnlockRead() { - long s, m; WNode h; + long s, m; while ((m = (s = state) & RBITS) > 0L) { if (m < RFULL) { if (casState(s, s - RUNIT)) { - if (m == RUNIT && (h = whead) != null && h.status != 0) - release(h); + if (m == RUNIT) + signalNext(head); return; } } @@ -1155,10 +1101,10 @@ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); - STATE.setVolatile(this, ORIGIN); // reset to unlocked state + state = ORIGIN; // reset to unlocked state } - // internals + // overflow handling methods /** * Tries to increment readerOverflow by first setting state @@ -1170,17 +1116,12 @@ */ private long tryIncReaderOverflow(long s) { // assert (s & ABITS) >= RFULL; - if ((s & ABITS) == RFULL) { - if (casState(s, s | RBITS)) { - ++readerOverflow; - STATE.setVolatile(this, s); - return s; - } - } - else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0) - Thread.yield(); - else + if ((s & ABITS) != RFULL) Thread.onSpinWait(); + else if (casState(s, s | RBITS)) { + ++readerOverflow; + return state = s; + } return 0L; } @@ -1192,153 +1133,132 @@ */ private long tryDecReaderOverflow(long s) { // assert (s & ABITS) >= RFULL; - if ((s & ABITS) == RFULL) { - if (casState(s, s | RBITS)) { - int r; long next; - if ((r = readerOverflow) > 0) { - readerOverflow = r - 1; - next = s; - } - else - next = s - RUNIT; - STATE.setVolatile(this, next); - return next; + if ((s & ABITS) != RFULL) + Thread.onSpinWait(); + else if (casState(s, s | RBITS)) { + int r; long nextState; + if ((r = readerOverflow) > 0) { + readerOverflow = r - 1; + nextState = s; } + else + nextState = s - RUNIT; + return state = nextState; } - else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0) - Thread.yield(); - else - Thread.onSpinWait(); return 0L; } + // release methods + + /** + * Wakes up the successor of given node, if one exists, and unsets its + * WAITING status to avoid park race. This may fail to wake up an + * eligible thread when one or more have been cancelled, but + * cancelAcquire ensures liveness. + */ + static final void signalNext(Node h) { + Node s; + if (h != null && (s = h.next) != null && s.status > 0) { + s.getAndUnsetStatus(WAITING); + LockSupport.unpark(s.waiter); + } + } + /** - * Wakes up the successor of h (normally whead). This is normally - * just h.next, but may require traversal from wtail if next - * pointers are lagging. This may fail to wake up an acquiring - * thread when one or more have been cancelled, but the cancel - * methods themselves provide extra safeguards to ensure liveness. - */ - private void release(WNode h) { - if (h != null) { - WNode q; Thread w; - WSTATUS.compareAndSet(h, WAITING, 0); - if ((q = h.next) == null || q.status == CANCELLED) { - for (WNode t = wtail; t != null && t != h; t = t.prev) - if (t.status <= 0) - q = t; + * Removes and unparks all cowaiters of node, if it exists. + */ + private static void signalCowaiters(ReaderNode node) { + if (node != null) { + for (ReaderNode c; (c = node.cowaiters) != null; ) { + if (node.casCowaiters(c, c.cowaiters)) + LockSupport.unpark(c.waiter); } - if (q != null && (w = q.thread) != null) - LockSupport.unpark(w); } } + // queue link methods + private boolean casTail(Node c, Node v) { + return U.compareAndSetReference(this, TAIL, c, v); + } + + /** tries once to CAS a new dummy node for head */ + private void tryInitializeHead() { + Node h = new WriterNode(); + if (U.compareAndSetReference(this, HEAD, null, h)) + tail = h; + } + /** - * See above for explanation. + * For explanation, see above and AbstractQueuedSynchronizer + * internal documentation. * * @param interruptible true if should check interrupts and if so * return INTERRUPTED - * @param deadline if nonzero, the System.nanoTime value to timeout - * at (and return zero) + * @param timed if true use timed waits + * @param time the System.nanoTime value to timeout at (and return zero) * @return next state, or INTERRUPTED */ - private long acquireWrite(boolean interruptible, long deadline) { - WNode node = null, p; - for (int spins = -1;;) { // spin while enqueuing - long m, s, ns; - if ((m = (s = state) & ABITS) == 0L) { - if ((ns = tryWriteLock(s)) != 0L) - return ns; - } - else if (spins < 0) - spins = (m == WBIT && wtail == whead) ? SPINS : 0; - else if (spins > 0) { + private long acquireWrite(boolean interruptible, boolean timed, long time) { + byte spins = 0, postSpins = 0; // retries upon unpark of first thread + boolean interrupted = false, first = false; + WriterNode node = null; + Node pred = null; + for (long s, nextState;;) { + if (!first && (pred = (node == null) ? null : node.prev) != null && + !(first = (head == pred))) { + if (pred.status < 0) { + cleanQueue(); // predecessor cancelled + continue; + } else if (pred.prev == null) { + Thread.onSpinWait(); // ensure serialization + continue; + } + } + if ((first || pred == null) && ((s = state) & ABITS) == 0L && + casState(s, nextState = s | WBIT)) { + U.storeStoreFence(); + if (first) { + node.prev = null; + head = node; + pred.next = null; + node.waiter = null; + if (interrupted) + Thread.currentThread().interrupt(); + } + return nextState; + } else if (node == null) { // retry before enqueuing + node = new WriterNode(); + } else if (pred == null) { // try to enqueue + Node t = tail; + node.setPrevRelaxed(t); + if (t == null) + tryInitializeHead(); + else if (!casTail(t, node)) + node.setPrevRelaxed(null); // back out + else + t.next = node; + } else if (first && spins != 0) { // reduce unfairness --spins; Thread.onSpinWait(); - } - else if ((p = wtail) == null) { // initialize queue - WNode hd = new WNode(WMODE, null); - if (WHEAD.weakCompareAndSet(this, null, hd)) - wtail = hd; - } - else if (node == null) - node = new WNode(WMODE, p); - else if (node.prev != p) - node.prev = p; - else if (WTAIL.weakCompareAndSet(this, p, node)) { - p.next = node; - break; - } - } - - boolean wasInterrupted = false; - for (int spins = -1;;) { - WNode h, np, pp; int ps; - if ((h = whead) == p) { - if (spins < 0) - spins = HEAD_SPINS; - else if (spins < MAX_HEAD_SPINS) - spins <<= 1; - for (int k = spins; k > 0; --k) { // spin at head - long s, ns; - if (((s = state) & ABITS) == 0L) { - if ((ns = tryWriteLock(s)) != 0L) { - whead = node; - node.prev = null; - if (wasInterrupted) - Thread.currentThread().interrupt(); - return ns; - } - } - else - Thread.onSpinWait(); - } - } - else if (h != null) { // help release stale waiters - WNode c; Thread w; - while ((c = h.cowait) != null) { - if (WCOWAIT.weakCompareAndSet(h, c, c.cowait) && - (w = c.thread) != null) - LockSupport.unpark(w); - } - } - if (whead == h) { - if ((np = node.prev) != p) { - if (np != null) - (p = np).next = node; // stale - } - else if ((ps = p.status) == 0) - WSTATUS.compareAndSet(p, 0, WAITING); - else if (ps == CANCELLED) { - if ((pp = p.prev) != null) { - node.prev = pp; - pp.next = node; - } - } - else { - long time; // 0 argument to park means no timeout - if (deadline == 0L) - time = 0L; - else if ((time = deadline - System.nanoTime()) <= 0L) - return cancelWaiter(node, node, false); - Thread wt = Thread.currentThread(); - node.thread = wt; - if (p.status < 0 && (p != h || (state & ABITS) != 0L) && - whead == h && node.prev == p) { - if (time == 0L) - LockSupport.park(this); - else - LockSupport.parkNanos(this, time); - } - node.thread = null; - if (Thread.interrupted()) { - if (interruptible) - return cancelWaiter(node, node, true); - wasInterrupted = true; - } - } + } else if (node.status == 0) { // enable signal + if (node.waiter == null) + node.waiter = Thread.currentThread(); + node.status = WAITING; + } else { + long nanos; + spins = postSpins = (byte)((postSpins << 1) | 1); + if (!timed) + LockSupport.park(this); + else if ((nanos = time - System.nanoTime()) > 0L) + LockSupport.parkNanos(this, nanos); + else + break; + node.clearStatus(); + if ((interrupted |= Thread.interrupted()) && interruptible) + break; } } + return cancelAcquire(node, interrupted); } /** @@ -1346,182 +1266,178 @@ * * @param interruptible true if should check interrupts and if so * return INTERRUPTED - * @param deadline if nonzero, the System.nanoTime value to timeout - * at (and return zero) + * @param timed if true use timed waits + * @param time the System.nanoTime value to timeout at (and return zero) * @return next state, or INTERRUPTED */ - private long acquireRead(boolean interruptible, long deadline) { - boolean wasInterrupted = false; - WNode node = null, p; - for (int spins = -1;;) { - WNode h; - if ((h = whead) == (p = wtail)) { - for (long m, s, ns;;) { - if ((m = (s = state) & ABITS) < RFULL ? - casState(s, ns = s + RUNIT) : - (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { - if (wasInterrupted) - Thread.currentThread().interrupt(); - return ns; + private long acquireRead(boolean interruptible, boolean timed, long time) { + boolean interrupted = false; + ReaderNode node = null; + /* + * Loop: + * if empty, try to acquire + * if tail is Reader, try to cowait; restart if leader stale or cancels + * else try to create and enqueue node, and wait in 2nd loop below + */ + for (;;) { + ReaderNode leader; long nextState; + Node tailPred = null, t = tail; + if ((t == null || (tailPred = t.prev) == null) && + (nextState = tryAcquireRead()) != 0L) // try now if empty + return nextState; + else if (t == null) + tryInitializeHead(); + else if (tailPred == null || !(t instanceof ReaderNode)) { + if (node == null) + node = new ReaderNode(); + if (tail == t) { + node.setPrevRelaxed(t); + if (casTail(t, node)) { + t.next = node; + break; // node is leader; wait in loop below } - else if (m >= WBIT) { - if (spins > 0) { - --spins; - Thread.onSpinWait(); - } - else { - if (spins == 0) { - WNode nh = whead, np = wtail; - if ((nh == h && np == p) || (h = nh) != (p = np)) - break; - } - spins = SPINS; - } + node.setPrevRelaxed(null); + } + } else if ((leader = (ReaderNode)t) == tail) { // try to cowait + for (boolean attached = false;;) { + if (leader.status < 0 || leader.prev == null) + break; + else if (node == null) + node = new ReaderNode(); + else if (node.waiter == null) + node.waiter = Thread.currentThread(); + else if (!attached) { + ReaderNode c = leader.cowaiters; + node.setCowaitersRelaxed(c); + attached = leader.casCowaiters(c, node); + if (!attached) + node.setCowaitersRelaxed(null); + } else { + long nanos = 0L; + if (!timed) + LockSupport.park(this); + else if ((nanos = time - System.nanoTime()) > 0L) + LockSupport.parkNanos(this, nanos); + interrupted |= Thread.interrupted(); + if ((interrupted && interruptible) || + (timed && nanos <= 0L)) + return cancelCowaiter(node, leader, interrupted); } } + if (node != null) + node.waiter = null; + long ns = tryAcquireRead(); + signalCowaiters(leader); + if (interrupted) + Thread.currentThread().interrupt(); + if (ns != 0L) + return ns; + else + node = null; // restart if stale, missed, or leader cancelled } - if (p == null) { // initialize queue - WNode hd = new WNode(WMODE, null); - if (WHEAD.weakCompareAndSet(this, null, hd)) - wtail = hd; - } - else if (node == null) - node = new WNode(RMODE, p); - else if (h == p || p.mode != RMODE) { - if (node.prev != p) - node.prev = p; - else if (WTAIL.weakCompareAndSet(this, p, node)) { - p.next = node; + } + + // node is leader of a cowait group; almost same as acquireWrite + byte spins = 0, postSpins = 0; // retries upon unpark of first thread + boolean first = false; + Node pred = null; + for (long nextState;;) { + if (!first && (pred = node.prev) != null && + !(first = (head == pred))) { + if (pred.status < 0) { + cleanQueue(); // predecessor cancelled + continue; + } else if (pred.prev == null) { + Thread.onSpinWait(); // ensure serialization + continue; + } + } + if ((first || pred == null) && + (nextState = tryAcquireRead()) != 0L) { + if (first) { + node.prev = null; + head = node; + pred.next = null; + node.waiter = null; + } + signalCowaiters(node); + if (interrupted) + Thread.currentThread().interrupt(); + return nextState; + } else if (first && spins != 0) { + --spins; + Thread.onSpinWait(); + } else if (node.status == 0) { + if (node.waiter == null) + node.waiter = Thread.currentThread(); + node.status = WAITING; + } else { + long nanos; + spins = postSpins = (byte)((postSpins << 1) | 1); + if (!timed) + LockSupport.park(this); + else if ((nanos = time - System.nanoTime()) > 0L) + LockSupport.parkNanos(this, nanos); + else + break; + node.clearStatus(); + if ((interrupted |= Thread.interrupted()) && interruptible) break; - } - } - else if (!WCOWAIT.compareAndSet(p, node.cowait = p.cowait, node)) - node.cowait = null; - else { - for (;;) { - WNode pp, c; Thread w; - if ((h = whead) != null && (c = h.cowait) != null && - WCOWAIT.compareAndSet(h, c, c.cowait) && - (w = c.thread) != null) // help release - LockSupport.unpark(w); - if (Thread.interrupted()) { - if (interruptible) - return cancelWaiter(node, p, true); - wasInterrupted = true; - } - if (h == (pp = p.prev) || h == p || pp == null) { - long m, s, ns; - do { - if ((m = (s = state) & ABITS) < RFULL ? - casState(s, ns = s + RUNIT) : - (m < WBIT && - (ns = tryIncReaderOverflow(s)) != 0L)) { - if (wasInterrupted) - Thread.currentThread().interrupt(); - return ns; - } - } while (m < WBIT); - } - if (whead == h && p.prev == pp) { - long time; - if (pp == null || h == p || p.status > 0) { - node = null; // throw away - break; - } - if (deadline == 0L) - time = 0L; - else if ((time = deadline - System.nanoTime()) <= 0L) { - if (wasInterrupted) - Thread.currentThread().interrupt(); - return cancelWaiter(node, p, false); - } - Thread wt = Thread.currentThread(); - node.thread = wt; - if ((h != pp || (state & ABITS) == WBIT) && - whead == h && p.prev == pp) { - if (time == 0L) - LockSupport.park(this); - else - LockSupport.parkNanos(this, time); - } - node.thread = null; - } - } } } + return cancelAcquire(node, interrupted); + } + + // Cancellation support - for (int spins = -1;;) { - WNode h, np, pp; int ps; - if ((h = whead) == p) { - if (spins < 0) - spins = HEAD_SPINS; - else if (spins < MAX_HEAD_SPINS) - spins <<= 1; - for (int k = spins;;) { // spin at head - long m, s, ns; - if ((m = (s = state) & ABITS) < RFULL ? - casState(s, ns = s + RUNIT) : - (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { - WNode c; Thread w; - whead = node; - node.prev = null; - while ((c = node.cowait) != null) { - if (WCOWAIT.compareAndSet(node, c, c.cowait) && - (w = c.thread) != null) - LockSupport.unpark(w); - } - if (wasInterrupted) - Thread.currentThread().interrupt(); - return ns; + /** + * Possibly repeatedly traverses from tail, unsplicing cancelled + * nodes until none are found. Unparks nodes that may have been + * relinked to be next eligible acquirer. + */ + private void cleanQueue() { + for (;;) { // restart point + for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples + if (q == null || (p = q.prev) == null) + return; // end of list + if (s == null ? tail != q : (s.prev != q || s.status < 0)) + break; // inconsistent + if (q.status < 0) { // cancelled + if ((s == null ? casTail(q, p) : s.casPrev(q, p)) && + q.prev == p) { + p.casNext(q, s); // OK if fails + if (p.prev == null) + signalNext(p); } - else if (m >= WBIT && --k <= 0) - break; - else - Thread.onSpinWait(); - } - } - else if (h != null) { - WNode c; Thread w; - while ((c = h.cowait) != null) { - if (WCOWAIT.compareAndSet(h, c, c.cowait) && - (w = c.thread) != null) - LockSupport.unpark(w); - } - } - if (whead == h) { - if ((np = node.prev) != p) { - if (np != null) - (p = np).next = node; // stale + break; } - else if ((ps = p.status) == 0) - WSTATUS.compareAndSet(p, 0, WAITING); - else if (ps == CANCELLED) { - if ((pp = p.prev) != null) { - node.prev = pp; - pp.next = node; + if ((n = p.next) != q) { // help finish + if (n != null && q.prev == p && q.status >= 0) { + p.casNext(n, q); + if (p.prev == null) + signalNext(p); } + break; } - else { - long time; - if (deadline == 0L) - time = 0L; - else if ((time = deadline - System.nanoTime()) <= 0L) - return cancelWaiter(node, node, false); - Thread wt = Thread.currentThread(); - node.thread = wt; - if (p.status < 0 && - (p != h || (state & ABITS) == WBIT) && - whead == h && node.prev == p) { - if (time == 0L) - LockSupport.park(this); - else - LockSupport.parkNanos(this, time); - } - node.thread = null; - if (Thread.interrupted()) { - if (interruptible) - return cancelWaiter(node, node, true); - wasInterrupted = true; + s = q; + q = q.prev; + } + } + } + + /** + * If leader exists, possibly repeatedly traverses cowaiters, + * unsplicing the given cancelled node until not found. + */ + private void unlinkCowaiter(ReaderNode node, ReaderNode leader) { + if (leader != null) { + while (leader.prev != null && leader.status >= 0) { + for (ReaderNode p = leader, q; ; p = q) { + if ((q = p.cowaiters) == null) + return; + if (q == node) { + p.casCowaiters(q, q.cowaiters); + break; // recheck even if succeeded } } } @@ -1530,105 +1446,53 @@ /** * If node non-null, forces cancel status and unsplices it from - * queue if possible and wakes up any cowaiters (of the node, or - * group, as applicable), and in any case helps release current - * first waiter if lock is free. (Calling with null arguments - * serves as a conditional form of release, which is not currently - * needed but may be needed under possible future cancellation - * policies). This is a variant of cancellation methods in - * AbstractQueuedSynchronizer (see its detailed explanation in AQS - * internal documentation). + * queue, wakes up any cowaiters, and possibly wakes up successor + * to recheck status. * - * @param node if non-null, the waiter - * @param group either node or the group node is cowaiting with + * @param node the waiter (may be null if not yet enqueued) * @param interrupted if already interrupted * @return INTERRUPTED if interrupted or Thread.interrupted, else zero */ - private long cancelWaiter(WNode node, WNode group, boolean interrupted) { - if (node != null && group != null) { - Thread w; + private long cancelAcquire(Node node, boolean interrupted) { + if (node != null) { + node.waiter = null; node.status = CANCELLED; - // unsplice cancelled nodes from group - for (WNode p = group, q; (q = p.cowait) != null;) { - if (q.status == CANCELLED) { - WCOWAIT.compareAndSet(p, q, q.cowait); - p = group; // restart - } - else - p = q; - } - if (group == node) { - for (WNode r = group.cowait; r != null; r = r.cowait) { - if ((w = r.thread) != null) - LockSupport.unpark(w); // wake up uncancelled co-waiters - } - for (WNode pred = node.prev; pred != null; ) { // unsplice - WNode succ, pp; // find valid successor - while ((succ = node.next) == null || - succ.status == CANCELLED) { - WNode q = null; // find successor the slow way - for (WNode t = wtail; t != null && t != node; t = t.prev) - if (t.status != CANCELLED) - q = t; // don't link if succ cancelled - if (succ == q || // ensure accurate successor - WNEXT.compareAndSet(node, succ, succ = q)) { - if (succ == null && node == wtail) - WTAIL.compareAndSet(this, node, pred); - break; - } - } - if (pred.next == node) // unsplice pred link - WNEXT.compareAndSet(pred, node, succ); - if (succ != null && (w = succ.thread) != null) { - // wake up succ to observe new pred - succ.thread = null; - LockSupport.unpark(w); - } - if (pred.status != CANCELLED || (pp = pred.prev) == null) - break; - node.prev = pp; // repeat if new pred wrong/cancelled - WNEXT.compareAndSet(pp, pred, succ); - pred = pp; - } - } + cleanQueue(); + if (node instanceof ReaderNode) + signalCowaiters((ReaderNode)node); } - WNode h; // Possibly release first waiter - while ((h = whead) != null) { - long s; WNode q; // similar to release() but check eligibility - if ((q = h.next) == null || q.status == CANCELLED) { - for (WNode t = wtail; t != null && t != h; t = t.prev) - if (t.status <= 0) - q = t; - } - if (h == whead) { - if (q != null && h.status == 0 && - ((s = state) & ABITS) != WBIT && // waiter is eligible - (s == 0L || q.mode == RMODE)) - release(h); - break; - } + return (interrupted || Thread.interrupted()) ? INTERRUPTED : 0L; + } + + /** + * If node non-null, forces cancel status and unsplices from + * leader's cowaiters list unless/until it is also cancelled. + * + * @param node if non-null, the waiter + * @param leader if non-null, the node heading cowaiters list + * @param interrupted if already interrupted + * @return INTERRUPTED if interrupted or Thread.interrupted, else zero + */ + private long cancelCowaiter(ReaderNode node, ReaderNode leader, + boolean interrupted) { + if (node != null) { + node.waiter = null; + node.status = CANCELLED; + unlinkCowaiter(node, leader); } return (interrupted || Thread.interrupted()) ? INTERRUPTED : 0L; } - // VarHandle mechanics - private static final VarHandle STATE; - private static final VarHandle WHEAD; - private static final VarHandle WTAIL; - private static final VarHandle WNEXT; - private static final VarHandle WSTATUS; - private static final VarHandle WCOWAIT; + // Unsafe + private static final Unsafe U = Unsafe.getUnsafe(); + private static final long STATE + = U.objectFieldOffset(StampedLock.class, "state"); + private static final long HEAD + = U.objectFieldOffset(StampedLock.class, "head"); + private static final long TAIL + = U.objectFieldOffset(StampedLock.class, "tail"); + static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - STATE = l.findVarHandle(StampedLock.class, "state", long.class); - WHEAD = l.findVarHandle(StampedLock.class, "whead", WNode.class); - WTAIL = l.findVarHandle(StampedLock.class, "wtail", WNode.class); - WSTATUS = l.findVarHandle(WNode.class, "status", int.class); - WNEXT = l.findVarHandle(WNode.class, "next", WNode.class); - WCOWAIT = l.findVarHandle(WNode.class, "cowait", WNode.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); - } + Class ensureLoaded = LockSupport.class; } }