jdk/src/share/classes/java/util/concurrent/locks/StampedLock.java

Print this page

        

*** 224,234 **** * Anderson, especially http://www.cs.unc.edu/~bbb/diss/). In * particular, we use the phase-fair anti-barging rule: If an * incoming reader arrives while read lock is held but there is a * queued writer, this incoming reader is queued. (This rule is * responsible for some of the complexity of method acquireRead, ! * but without it, the lock becomes highly unfair.) * * These rules apply to threads actually queued. All tryLock forms * opportunistically try to acquire locks regardless of preference * rules, and so may "barge" their way in. Randomized spinning is * used in the acquire methods to reduce (increasingly expensive) --- 224,238 ---- * Anderson, especially http://www.cs.unc.edu/~bbb/diss/). In * particular, we use the phase-fair anti-barging rule: If an * incoming reader arrives while read lock is held but there is a * queued writer, this incoming reader is queued. (This rule is * responsible for some of the complexity of method acquireRead, ! * but without it, the lock becomes highly unfair.) Method release ! * does not (and sometimes cannot) itself wake up cowaiters. This ! * is done by the primary thread, but helped by any other threads ! * with nothing better to do in methods acquireRead and ! * acquireWrite. * * These rules apply to threads actually queued. All tryLock forms * opportunistically try to acquire locks regardless of preference * rules, and so may "barge" their way in. Randomized spinning is * used in the acquire methods to reduce (increasingly expensive)
*** 265,279 **** private static final long serialVersionUID = -6001602636862214147L; /** Number of processors, for spin control */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); ! /** Maximum number of retries before blocking on acquisition */ private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0; /** Maximum number of retries before re-blocking */ ! private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 12 : 0; /** The period for yielding when waiting for overflow spinlock */ private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1 /** The number of bits to use for reader count before overflowing */ --- 269,286 ---- private static final long serialVersionUID = -6001602636862214147L; /** Number of processors, for spin control */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); ! /** Maximum number of retries before enqueuing on acquisition */ private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0; + /** Maximum number of retries before blocking at head on acquisition */ + private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0; + /** Maximum number of retries before re-blocking */ ! private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0; /** The period for yielding when waiting for overflow spinlock */ private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1 /** The number of bits to use for reader count before overflowing */
*** 413,424 **** * until available. * * @return a stamp that can be used to unlock or convert mode */ public long readLock() { ! long s, next; // bypass acquireRead on fully unlocked case only ! return ((((s = state) & ABITS) == 0L && U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? next : acquireRead(false, 0L)); } /** --- 420,431 ---- * until available. * * @return a stamp that can be used to unlock or convert mode */ public long readLock() { ! long s = state, next; // bypass acquireRead on common uncontended case ! return ((whead == wtail && (s & ABITS) < RFULL && U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? next : acquireRead(false, 0L)); } /**
*** 1010,1031 **** if ((q = h.next) == null || q.status == CANCELLED) { for (WNode t = wtail; t != null && t != h; t = t.prev) if (t.status <= 0) q = t; } ! if (q != null) { ! for (WNode r = q;;) { // release co-waiters too ! if ((w = r.thread) != null) { ! r.thread = null; U.unpark(w); } - if ((r = q.cowait) == null) - break; - U.compareAndSwapObject(q, WCOWAIT, r, r.cowait); - } - } - } } /** * See above for explanation. * --- 1017,1029 ---- if ((q = h.next) == null || q.status == CANCELLED) { for (WNode t = wtail; t != null && t != h; t = t.prev) if (t.status <= 0) q = t; } ! if (q != null && (w = q.thread) != null) U.unpark(w); } } /** * See above for explanation. *
*** 1036,1092 **** * @return next state, or INTERRUPTED */ private long acquireWrite(boolean interruptible, long deadline) { WNode node = null, p; for (int spins = -1;;) { // spin while enqueuing ! long s, ns; ! if (((s = state) & ABITS) == 0L) { if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) return ns; } else if (spins > 0) { if (LockSupport.nextSecondarySeed() >= 0) --spins; } else if ((p = wtail) == null) { // initialize queue ! WNode h = new WNode(WMODE, null); ! if (U.compareAndSwapObject(this, WHEAD, null, h)) ! wtail = h; } - else if (spins < 0) - spins = (p == whead) ? SPINS : 0; else if (node == null) node = new WNode(WMODE, p); else if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; break; } } ! for (int spins = SPINS;;) { ! WNode np, pp; int ps; long s, ns; Thread w; ! while ((np = node.prev) != p && np != null) ! (p = np).next = node; // stale ! if (whead == p) { for (int k = spins;;) { // spin at head if (((s = state) & ABITS) == 0L) { ! if (U.compareAndSwapLong(this, STATE, s, ns = s+WBIT)) { whead = node; node.prev = null; return ns; } } else if (LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } - if (spins < MAX_HEAD_SPINS) - spins <<= 1; } ! if ((ps = p.status) == 0) U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; --- 1034,1105 ---- * @return next state, or INTERRUPTED */ private long acquireWrite(boolean interruptible, long deadline) { WNode node = null, p; for (int spins = -1;;) { // spin while enqueuing ! long m, s, ns; ! if ((m = (s = state) & ABITS) == 0L) { if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) return ns; } + else if (spins < 0) + spins = (m == WBIT && wtail == whead) ? SPINS : 0; else if (spins > 0) { if (LockSupport.nextSecondarySeed() >= 0) --spins; } else if ((p = wtail) == null) { // initialize queue ! WNode hd = new WNode(WMODE, null); ! if (U.compareAndSwapObject(this, WHEAD, null, hd)) ! wtail = hd; } else if (node == null) node = new WNode(WMODE, p); else if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; break; } } ! for (int spins = -1;;) { ! WNode h, np, pp; int ps; ! if ((h = whead) == p) { ! if (spins < 0) ! spins = HEAD_SPINS; ! else if (spins < MAX_HEAD_SPINS) ! spins <<= 1; for (int k = spins;;) { // spin at head + long s, ns; if (((s = state) & ABITS) == 0L) { ! if (U.compareAndSwapLong(this, STATE, s, ! ns = s + WBIT)) { whead = node; node.prev = null; return ns; } } else if (LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } ! else if (h != null) { // help release stale waiters ! WNode c; Thread w; ! while ((c = h.cowait) != null) { ! if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && ! (w = c.thread) != null) ! U.unpark(w); ! } ! } ! if (whead == h) { ! if ((np = node.prev) != p) { ! if (np != null) ! (p = np).next = node; // stale ! } ! else if ((ps = p.status) == 0) U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node;
*** 1097,1118 **** if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); ! U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport.park node.thread = wt; ! if (node.prev == p && p.status == WAITING && // recheck ! (p != whead || (state & ABITS) != 0L)) ! U.park(false, time); node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } } /** * See above for explanation. * * @param interruptible true if should check interrupts and if so --- 1110,1132 ---- if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); ! U.putObject(wt, PARKBLOCKER, this); node.thread = wt; ! if (p.status < 0 && (p != h || (state & ABITS) != 0L) && ! whead == h && node.prev == p) ! U.park(false, time); // emulate LockSupport.park node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } } + } /** * See above for explanation. * * @param interruptible true if should check interrupts and if so
*** 1120,1238 **** * @param deadline if nonzero, the System.nanoTime value to timeout * at (and return zero) * @return next state, or INTERRUPTED */ private long acquireRead(boolean interruptible, long deadline) { ! WNode node = null, group = null, p; for (int spins = -1;;) { ! for (;;) { ! long s, m, ns; WNode h, q; Thread w; // anti-barging guard ! if (group == null && (h = whead) != null && ! (q = h.next) != null && q.mode != RMODE) ! break; if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : ! (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { ! if (group != null) { // help release others ! for (WNode r = group;;) { ! if ((w = r.thread) != null) { ! r.thread = null; ! U.unpark(w); } ! if ((r = group.cowait) == null) break; - U.compareAndSwapObject(group, WCOWAIT, r, r.cowait); } } - return ns; } - if (m >= WBIT) - break; } - if (spins > 0) { - if (LockSupport.nextSecondarySeed() >= 0) - --spins; } ! else if ((p = wtail) == null) { ! WNode h = new WNode(WMODE, null); ! if (U.compareAndSwapObject(this, WHEAD, null, h)) ! wtail = h; } - else if (spins < 0) - spins = (p == whead) ? SPINS : 0; else if (node == null) ! node = new WNode(WMODE, p); ! else if (node.prev != p) node.prev = p; ! else if (p.mode == RMODE && p != whead) { ! WNode pp = p.prev; // become co-waiter with group p ! if (pp != null && p == wtail && ! U.compareAndSwapObject(p, WCOWAIT, ! node.cowait = p.cowait, node)) { ! node.thread = Thread.currentThread(); ! for (long time;;) { if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, p, false); - if (node.thread == null) - break; - if (p.prev != pp || p.status == CANCELLED || - p == whead || p.prev != pp) { - node.thread = null; - break; - } Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); ! if (node.thread == null) // must recheck ! break; U.park(false, time); U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, p, true); } - group = p; } - node = null; // throw away - } - else if (U.compareAndSwapObject(this, WTAIL, p, node)) { - p.next = node; - break; } } ! for (int spins = SPINS;;) { ! WNode np, pp, r; int ps; long m, s, ns; Thread w; ! while ((np = node.prev) != p && np != null) ! (p = np).next = node; ! if (whead == p) { ! for (int k = spins;;) { ! if ((m = (s = state) & ABITS) != WBIT) { ! if (m < RFULL ? ! U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT): ! (ns = tryIncReaderOverflow(s)) != 0L) { whead = node; node.prev = null; ! while ((r = node.cowait) != null) { if (U.compareAndSwapObject(node, WCOWAIT, ! r, r.cowait) && ! (w = r.thread) != null) { ! r.thread = null; ! U.unpark(w); // release co-waiter ! } } return ns; } ! } ! else if (LockSupport.nextSecondarySeed() >= 0 && ! --k <= 0) break; } - if (spins < MAX_HEAD_SPINS) - spins <<= 1; } ! if ((ps = p.status) == 0) U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; --- 1134,1271 ---- * @param deadline if nonzero, the System.nanoTime value to timeout * at (and return zero) * @return next state, or INTERRUPTED */ private long acquireRead(boolean interruptible, long deadline) { ! WNode node = null, p; for (int spins = -1;;) { ! WNode h; ! if ((h = whead) == (p = wtail)) { ! for (long m, s, ns;;) { if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : ! (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) ! return ns; ! else if (m >= WBIT) { ! if (spins > 0) { ! if (LockSupport.nextSecondarySeed() >= 0) ! --spins; } ! else { ! if (spins == 0) { ! WNode nh = whead, np = wtail; ! if ((nh == h && np == p) || (h = nh) != (p = np)) break; } + spins = SPINS; } } } } ! if (p == null) { // initialize queue ! WNode hd = new WNode(WMODE, null); ! if (U.compareAndSwapObject(this, WHEAD, null, hd)) ! wtail = hd; } else if (node == null) ! node = new WNode(RMODE, p); ! else if (h == p || p.mode != RMODE) { ! if (node.prev != p) node.prev = p; ! else if (U.compareAndSwapObject(this, WTAIL, p, node)) { ! p.next = node; ! break; ! } ! } ! else if (!U.compareAndSwapObject(p, WCOWAIT, ! node.cowait = p.cowait, node)) ! node.cowait = null; ! else { ! for (;;) { ! WNode pp, c; Thread w; ! if ((h = whead) != null && (c = h.cowait) != null && ! U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && ! (w = c.thread) != null) // help release ! U.unpark(w); ! if (h == (pp = p.prev) || h == p || pp == null) { ! long m, s, ns; ! do { ! if ((m = (s = state) & ABITS) < RFULL ? ! U.compareAndSwapLong(this, STATE, s, ! ns = s + RUNIT) : ! (m < WBIT && ! (ns = tryIncReaderOverflow(s)) != 0L)) ! return ns; ! } while (m < WBIT); ! } ! if (whead == h && p.prev == pp) { ! long time; ! if (pp == null || h == p || p.status > 0) { ! node = null; // throw away ! break; ! } if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, p, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); ! node.thread = wt; ! if ((h != pp || (state & ABITS) == WBIT) && ! whead == h && p.prev == pp) U.park(false, time); + node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, p, true); } } } } ! for (int spins = -1;;) { ! WNode h, np, pp; int ps; ! if ((h = whead) == p) { ! if (spins < 0) ! spins = HEAD_SPINS; ! else if (spins < MAX_HEAD_SPINS) ! spins <<= 1; ! for (int k = spins;;) { // spin at head ! long m, s, ns; ! if ((m = (s = state) & ABITS) < RFULL ? ! U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : ! (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { ! WNode c; Thread w; whead = node; node.prev = null; ! while ((c = node.cowait) != null) { if (U.compareAndSwapObject(node, WCOWAIT, ! c, c.cowait) && ! (w = c.thread) != null) ! U.unpark(w); } return ns; } ! else if (m >= WBIT && ! LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } ! else if (h != null) { ! WNode c; Thread w; ! while ((c = h.cowait) != null) { ! if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && ! (w = c.thread) != null) ! U.unpark(w); ! } ! } ! if (whead == h) { ! if ((np = node.prev) != p) { ! if (np != null) ! (p = np).next = node; // stale ! } ! else if ((ps = p.status) == 0) U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node;
*** 1245,1264 **** else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; ! if (node.prev == p && p.status == WAITING && ! (p != whead || (state & ABITS) != WBIT)) U.park(false, time); node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } } /** * If node non-null, forces cancel status and unsplices it from * queue if possible and wakes up any cowaiters (of the node, or * group, as applicable), and in any case helps release current --- 1278,1299 ---- else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; ! if (p.status < 0 && ! (p != h || (state & ABITS) == WBIT) && ! whead == h && node.prev == p) U.park(false, time); node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } } + } /** * If node non-null, forces cancel status and unsplices it from * queue if possible and wakes up any cowaiters (of the node, or * group, as applicable), and in any case helps release current
*** 1276,1301 **** */ private long cancelWaiter(WNode node, WNode group, boolean interrupted) { if (node != null && group != null) { Thread w; node.status = CANCELLED; - node.thread = null; // unsplice cancelled nodes from group for (WNode p = group, q; (q = p.cowait) != null;) { ! if (q.status == CANCELLED) ! U.compareAndSwapObject(p, WNEXT, q, q.next); else p = q; } if (group == node) { ! WNode r; // detach and wake up uncancelled co-waiters ! while ((r = node.cowait) != null) { ! if (U.compareAndSwapObject(node, WCOWAIT, r, r.cowait) && ! (w = r.thread) != null) { ! r.thread = null; ! U.unpark(w); ! } } for (WNode pred = node.prev; pred != null; ) { // unsplice WNode succ, pp; // find valid successor while ((succ = node.next) == null || succ.status == CANCELLED) { --- 1311,1333 ---- */ private long cancelWaiter(WNode node, WNode group, boolean interrupted) { if (node != null && group != null) { Thread w; node.status = CANCELLED; // unsplice cancelled nodes from group for (WNode p = group, q; (q = p.cowait) != null;) { ! if (q.status == CANCELLED) { ! U.compareAndSwapObject(p, WCOWAIT, q, q.cowait); ! p = group; // restart ! } else p = q; } if (group == node) { ! for (WNode r = group.cowait; r != null; r = r.cowait) { ! if ((w = r.thread) != null) ! U.unpark(w); // wake up uncancelled co-waiters } for (WNode pred = node.prev; pred != null; ) { // unsplice WNode succ, pp; // find valid successor while ((succ = node.next) == null || succ.status == CANCELLED) {