< prev index next >
src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java
Print this page
8229442: AQS and lock classes refresh
Reviewed-by: martin
@@ -33,17 +33,16 @@
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent.locks;
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer.Node;
+import java.util.concurrent.ForkJoinPool;
+import jdk.internal.misc.Unsafe;
/**
* A version of {@link AbstractQueuedSynchronizer} in
* which synchronization state is maintained as a {@code long}.
* This class has exactly the same structure, properties, and methods
@@ -71,27 +70,80 @@
* exactly cloned from AbstractQueuedSynchronizer, replacing class
* name and changing ints related with sync state to longs. Please
* keep it that way.
*/
+ // Node status bits, also used as argument and return values
+ static final int WAITING = 1; // must be 1
+ static final int CANCELLED = 0x80000000; // must be negative
+ static final int COND = 2; // in a condition wait
+
+ /** CLH Nodes */
+ abstract static class Node {
+ volatile Node prev; // initially attached via casTail
+ volatile Node next; // visibly nonnull when signallable
+ Thread waiter; // visibly nonnull when enqueued
+ volatile int status; // written by owner, atomic bit ops by others
+
+ // methods for atomic operations
+ final boolean casPrev(Node c, Node v) { // for cleanQueue
+ return U.weakCompareAndSetReference(this, PREV, c, v);
+ }
+ final boolean casNext(Node c, Node v) { // for cleanQueue
+ return U.weakCompareAndSetReference(this, NEXT, c, v);
+ }
+ final int getAndUnsetStatus(int v) { // for signalling
+ return U.getAndBitwiseAndInt(this, STATUS, ~v);
+ }
+ final void setPrevRelaxed(Node p) { // for off-queue assignment
+ U.putReference(this, PREV, p);
+ }
+ final void setStatusRelaxed(int s) { // for off-queue assignment
+ U.putInt(this, STATUS, s);
+ }
+ final void clearStatus() { // for reducing unneeded signals
+ U.putIntOpaque(this, STATUS, 0);
+ }
+
+ private static final long STATUS
+ = U.objectFieldOffset(Node.class, "status");
+ private static final long NEXT
+ = U.objectFieldOffset(Node.class, "next");
+ private static final long PREV
+ = U.objectFieldOffset(Node.class, "prev");
+ }
+
+ // Concrete classes tagged by type
+ static final class ExclusiveNode extends Node { }
+ static final class SharedNode extends Node { }
+
+ static final class ConditionNode extends Node
+ implements ForkJoinPool.ManagedBlocker {
+ ConditionNode nextWaiter; // link to next waiting node
+
/**
- * Creates a new {@code AbstractQueuedLongSynchronizer} instance
- * with initial synchronization state of zero.
+ * Allows Conditions to be used in ForkJoinPools without
+ * risking fixed pool exhaustion. This is usable only for
+ * untimed Condition waits, not timed versions.
*/
- protected AbstractQueuedLongSynchronizer() { }
+ public final boolean isReleasable() {
+ return status <= 1 || Thread.currentThread().isInterrupted();
+ }
+
+ public final boolean block() {
+ while (!isReleasable()) LockSupport.park(this);
+ return true;
+ }
+ }
/**
- * Head of the wait queue, lazily initialized. Except for
- * initialization, it is modified only via method setHead. Note:
- * If head exists, its waitStatus is guaranteed not to be
- * CANCELLED.
+ * Head of the wait queue, lazily initialized.
*/
private transient volatile Node head;
/**
- * Tail of the wait queue, lazily initialized. Modified only via
- * method enq to add new wait node.
+ * Tail of the wait queue. After initialization, modified only via casTail.
*/
private transient volatile Node tail;
/**
* The synchronization state.
@@ -111,12 +163,11 @@
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(long newState) {
- // See JDK-8180620: Clarify VarHandle mixed-access subtleties
- STATE.setVolatile(this, newState);
+ state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
@@ -127,485 +178,238 @@
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(long expect, long update) {
- return STATE.compareAndSet(this, expect, update);
+ return U.compareAndSetLong(this, STATE, expect, update);
}
// Queuing utilities
- /**
- * The number of nanoseconds for which it is faster to spin
- * rather than to use timed park. A rough estimate suffices
- * to improve responsiveness with very short timeouts.
- */
- static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
-
- /**
- * Inserts node into queue, initializing if necessary. See picture above.
- * @param node the node to insert
- * @return node's predecessor
- */
- private Node enq(Node node) {
- for (;;) {
- Node oldTail = tail;
- if (oldTail != null) {
- node.setPrevRelaxed(oldTail);
- if (compareAndSetTail(oldTail, node)) {
- oldTail.next = node;
- return oldTail;
- }
- } else {
- initializeSyncQueue();
- }
- }
- }
-
- /**
- * Creates and enqueues node for current thread and given mode.
- *
- * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
- * @return the new node
- */
- private Node addWaiter(Node mode) {
- Node node = new Node(mode);
-
- for (;;) {
- Node oldTail = tail;
- if (oldTail != null) {
- node.setPrevRelaxed(oldTail);
- if (compareAndSetTail(oldTail, node)) {
- oldTail.next = node;
- return node;
- }
- } else {
- initializeSyncQueue();
- }
- }
- }
-
- /**
- * Sets head of queue to be node, thus dequeuing. Called only by
- * acquire methods. Also nulls out unused fields for sake of GC
- * and to suppress unnecessary signals and traversals.
- *
- * @param node the node
- */
- private void setHead(Node node) {
- head = node;
- node.thread = null;
- node.prev = null;
+ private boolean casTail(Node c, Node v) {
+ return U.compareAndSetReference(this, TAIL, c, v);
}
- /**
- * Wakes up node's successor, if one exists.
- *
- * @param node the node
- */
- private void unparkSuccessor(Node node) {
- /*
- * If status is negative (i.e., possibly needing signal) try
- * to clear in anticipation of signalling. It is OK if this
- * fails or if status is changed by waiting thread.
- */
- int ws = node.waitStatus;
- if (ws < 0)
- node.compareAndSetWaitStatus(ws, 0);
-
- /*
- * Thread to unpark is held in successor, which is normally
- * just the next node. But if cancelled or apparently null,
- * traverse backwards from tail to find the actual
- * non-cancelled successor.
- */
- Node s = node.next;
- if (s == null || s.waitStatus > 0) {
- s = null;
- for (Node p = tail; p != node && p != null; p = p.prev)
- if (p.waitStatus <= 0)
- s = p;
- }
- if (s != null)
- LockSupport.unpark(s.thread);
+ /** tries once to CAS a new dummy node for head */
+ private void tryInitializeHead() {
+ Node h = new ExclusiveNode();
+ if (U.compareAndSetReference(this, HEAD, null, h))
+ tail = h;
}
/**
- * Release action for shared mode -- signals successor and ensures
- * propagation. (Note: For exclusive mode, release just amounts
- * to calling unparkSuccessor of head if it needs signal.)
- */
- private void doReleaseShared() {
- /*
- * Ensure that a release propagates, even if there are other
- * in-progress acquires/releases. This proceeds in the usual
- * way of trying to unparkSuccessor of head if it needs
- * signal. But if it does not, status is set to PROPAGATE to
- * ensure that upon release, propagation continues.
- * Additionally, we must loop in case a new node is added
- * while we are doing this. Also, unlike other uses of
- * unparkSuccessor, we need to know if CAS to reset status
- * fails, if so rechecking.
+ * Enqueues the node unless null. (Currently used only for
+ * ConditionNodes; other cases are interleaved with acquires.)
*/
+ final void enqueue(Node node) {
+ if (node != null) {
for (;;) {
- Node h = head;
- if (h != null && h != tail) {
- int ws = h.waitStatus;
- if (ws == Node.SIGNAL) {
- if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
- continue; // loop to recheck cases
- unparkSuccessor(h);
- }
- else if (ws == 0 &&
- !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
- continue; // loop on failed CAS
- }
- if (h == head) // loop if head changed
+ Node t = tail;
+ node.setPrevRelaxed(t); // avoid unnecessary fence
+ if (t == null) // initialize
+ tryInitializeHead();
+ else if (casTail(t, node)) {
+ t.next = node;
+ if (t.status < 0) // wake up to clean link
+ LockSupport.unpark(node.waiter);
break;
}
}
-
- /**
- * Sets head of queue, and checks if successor may be waiting
- * in shared mode, if so propagating if either propagate > 0 or
- * PROPAGATE status was set.
- *
- * @param node the node
- * @param propagate the return value from a tryAcquireShared
- */
- private void setHeadAndPropagate(Node node, long propagate) {
- Node h = head; // Record old head for check below
- setHead(node);
- /*
- * Try to signal next queued node if:
- * Propagation was indicated by caller,
- * or was recorded (as h.waitStatus either before
- * or after setHead) by a previous operation
- * (note: this uses sign-check of waitStatus because
- * PROPAGATE status may transition to SIGNAL.)
- * and
- * The next node is waiting in shared mode,
- * or we don't know, because it appears null
- *
- * The conservatism in both of these checks may cause
- * unnecessary wake-ups, but only when there are multiple
- * racing acquires/releases, so most need signals now or soon
- * anyway.
- */
- if (propagate > 0 || h == null || h.waitStatus < 0 ||
- (h = head) == null || h.waitStatus < 0) {
- Node s = node.next;
- if (s == null || s.isShared())
- doReleaseShared();
}
}
- // Utilities for various versions of acquire
-
- /**
- * Cancels an ongoing attempt to acquire.
- *
- * @param node the node
- */
- private void cancelAcquire(Node node) {
- // Ignore if node doesn't exist
- if (node == null)
- return;
-
- node.thread = null;
-
- // Skip cancelled predecessors
- Node pred = node.prev;
- while (pred.waitStatus > 0)
- node.prev = pred = pred.prev;
-
- // predNext is the apparent node to unsplice. CASes below will
- // fail if not, in which case, we lost race vs another cancel
- // or signal, so no further action is necessary, although with
- // a possibility that a cancelled node may transiently remain
- // reachable.
- Node predNext = pred.next;
-
- // Can use unconditional write instead of CAS here.
- // After this atomic step, other Nodes can skip past us.
- // Before, we are free of interference from other threads.
- node.waitStatus = Node.CANCELLED;
-
- // If we are the tail, remove ourselves.
- if (node == tail && compareAndSetTail(node, pred)) {
- pred.compareAndSetNext(predNext, null);
- } else {
- // If successor needs signal, try to set pred's next-link
- // so it will get one. Otherwise wake it up to propagate.
- int ws;
- if (pred != head &&
- ((ws = pred.waitStatus) == Node.SIGNAL ||
- (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
- pred.thread != null) {
- Node next = node.next;
- if (next != null && next.waitStatus <= 0)
- pred.compareAndSetNext(predNext, next);
- } else {
- unparkSuccessor(node);
- }
-
- node.next = node; // help GC
- }
+ /** Returns true if node is found in traversal from tail */
+ final boolean isEnqueued(Node node) {
+ for (Node t = tail; t != null; t = t.prev)
+ if (t == node)
+ return true;
+ return false;
}
/**
- * Checks and updates status for a node that failed to acquire.
- * Returns true if thread should block. This is the main signal
- * control in all acquire loops. Requires that pred == node.prev.
- *
- * @param pred node's predecessor holding status
- * @param node the node
- * @return {@code true} if thread should block
+ * Wakes up the successor of given node, if one exists, and unsets its
+ * WAITING status to avoid park race. This may fail to wake up an
+ * eligible thread when one or more have been cancelled, but
+ * cancelAcquire ensures liveness.
*/
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- if (ws == Node.SIGNAL)
- /*
- * This node has already set status asking a release
- * to signal it, so it can safely park.
- */
- return true;
- if (ws > 0) {
- /*
- * Predecessor was cancelled. Skip over predecessors and
- * indicate retry.
- */
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else {
- /*
- * waitStatus must be 0 or PROPAGATE. Indicate that we
- * need a signal, but don't park yet. Caller will need to
- * retry to make sure it cannot acquire before parking.
- */
- pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
+ private static void signalNext(Node h) {
+ Node s;
+ if (h != null && (s = h.next) != null && s.status != 0) {
+ s.getAndUnsetStatus(WAITING);
+ LockSupport.unpark(s.waiter);
}
- return false;
}
- /**
- * Convenience method to interrupt current thread.
- */
- static void selfInterrupt() {
- Thread.currentThread().interrupt();
+ /** Wakes up the given node if in shared mode */
+ private static void signalNextIfShared(Node h) {
+ Node s;
+ if (h != null && (s = h.next) != null &&
+ (s instanceof SharedNode) && s.status != 0) {
+ s.getAndUnsetStatus(WAITING);
+ LockSupport.unpark(s.waiter);
+ }
}
/**
- * Convenience method to park and then check if interrupted.
+ * Main acquire method, invoked by all exported acquire methods.
*
- * @return {@code true} if interrupted
- */
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);
- return Thread.interrupted();
- }
+ * @param node null unless a reacquiring Condition
+ * @param arg the acquire argument
+ * @param shared true if shared mode else exclusive
+ * @param interruptible if abort and return negative on interrupt
+ * @param timed if true use timed waits
+ * @param time if timed, the System.nanoTime value to timeout
+ * @return positive if acquired, 0 if timed out, negative if interrupted
+ */
+ final int acquire(Node node, long arg, boolean shared,
+ boolean interruptible, boolean timed, long time) {
+ Thread current = Thread.currentThread();
+ byte spins = 0, postSpins = 0; // retries upon unpark of first thread
+ boolean interrupted = false, first = false;
+ Node pred = null; // predecessor of node when enqueued
/*
- * Various flavors of acquire, varying in exclusive/shared and
- * control modes. Each is mostly the same, but annoyingly
- * different. Only a little bit of factoring is possible due to
- * interactions of exception mechanics (including ensuring that we
- * cancel if tryAcquire throws exception) and other control, at
- * least not without hurting performance too much.
+ * Repeatedly:
+ * Check if node now first
+ * if so, ensure head stable, else ensure valid predecessor
+ * if node is first or not yet enqueued, try acquiring
+ * else if node not yet created, create it
+ * else if not yet enqueued, try once to enqueue
+ * else if woken from park, retry (up to postSpins times)
+ * else if WAITING status not set, set and retry
+ * else park and clear WAITING status, and check cancellation
*/
- /**
- * Acquires in exclusive uninterruptible mode for thread already in
- * queue. Used by condition wait methods as well as acquire.
- *
- * @param node the node
- * @param arg the acquire argument
- * @return {@code true} if interrupted while waiting
- */
- final boolean acquireQueued(final Node node, long arg) {
- boolean interrupted = false;
- try {
for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node))
- interrupted |= parkAndCheckInterrupt();
- }
- } catch (Throwable t) {
- cancelAcquire(node);
- if (interrupted)
- selfInterrupt();
- throw t;
+ if (!first && (pred = (node == null) ? null : node.prev) != null &&
+ !(first = (head == pred))) {
+ if (pred.status < 0) {
+ cleanQueue(); // predecessor cancelled
+ continue;
+ } else if (pred.prev == null) {
+ Thread.onSpinWait(); // ensure serialization
+ continue;
}
}
-
- /**
- * Acquires in exclusive interruptible mode.
- * @param arg the acquire argument
- */
- private void doAcquireInterruptibly(long arg)
- throws InterruptedException {
- final Node node = addWaiter(Node.EXCLUSIVE);
+ if (first || pred == null) {
+ boolean acquired;
try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- return;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- throw new InterruptedException();
- }
- } catch (Throwable t) {
- cancelAcquire(node);
- throw t;
+ if (shared)
+ acquired = (tryAcquireShared(arg) >= 0);
+ else
+ acquired = tryAcquire(arg);
+ } catch (Throwable ex) {
+ cancelAcquire(node, interrupted, false);
+ throw ex;
}
+ if (acquired) {
+ if (first) {
+ node.prev = null;
+ head = node;
+ pred.next = null;
+ node.waiter = null;
+ if (shared)
+ signalNextIfShared(node);
+ if (interrupted)
+ current.interrupt();
}
-
- /**
- * Acquires in exclusive timed mode.
- *
- * @param arg the acquire argument
- * @param nanosTimeout max wait time
- * @return {@code true} if acquired
- */
- private boolean doAcquireNanos(long arg, long nanosTimeout)
- throws InterruptedException {
- if (nanosTimeout <= 0L)
- return false;
- final long deadline = System.nanoTime() + nanosTimeout;
- final Node node = addWaiter(Node.EXCLUSIVE);
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- return true;
+ return 1;
}
- nanosTimeout = deadline - System.nanoTime();
- if (nanosTimeout <= 0L) {
- cancelAcquire(node);
- return false;
}
- if (shouldParkAfterFailedAcquire(p, node) &&
- nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
- LockSupport.parkNanos(this, nanosTimeout);
- if (Thread.interrupted())
- throw new InterruptedException();
+ if (node == null) { // allocate; retry before enqueue
+ if (shared)
+ node = new SharedNode();
+ else
+ node = new ExclusiveNode();
+ } else if (pred == null) { // try to enqueue
+ node.waiter = current;
+ Node t = tail;
+ node.setPrevRelaxed(t); // avoid unnecessary fence
+ if (t == null)
+ tryInitializeHead();
+ else if (!casTail(t, node))
+ node.setPrevRelaxed(null); // back out
+ else
+ t.next = node;
+ } else if (first && spins != 0) {
+ --spins; // reduce unfairness on rewaits
+ Thread.onSpinWait();
+ } else if (node.status == 0) {
+ node.status = WAITING; // enable signal and recheck
+ } else {
+ long nanos;
+ spins = postSpins = (byte)((postSpins << 1) | 1);
+ if (!timed)
+ LockSupport.park(this);
+ else if ((nanos = time - System.nanoTime()) > 0L)
+ LockSupport.parkNanos(this, nanos);
+ else
+ break;
+ node.clearStatus();
+ if ((interrupted |= Thread.interrupted()) && interruptible)
+ break;
}
- } catch (Throwable t) {
- cancelAcquire(node);
- throw t;
}
+ return cancelAcquire(node, interrupted, interruptible);
}
/**
- * Acquires in shared uninterruptible mode.
- * @param arg the acquire argument
+ * Possibly repeatedly traverses from tail, unsplicing cancelled
+ * nodes until none are found.
*/
- private void doAcquireShared(long arg) {
- final Node node = addWaiter(Node.SHARED);
- boolean interrupted = false;
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head) {
- long r = tryAcquireShared(arg);
- if (r >= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- return;
- }
- }
- if (shouldParkAfterFailedAcquire(p, node))
- interrupted |= parkAndCheckInterrupt();
- }
- } catch (Throwable t) {
- cancelAcquire(node);
- throw t;
- } finally {
- if (interrupted)
- selfInterrupt();
+ private void cleanQueue() {
+ for (;;) { // restart point
+ for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
+ if (q == null || (p = q.prev) == null)
+ return; // end of list
+ if (s == null ? tail != q : (s.prev != q || s.status < 0))
+ break; // inconsistent
+ if (q.status < 0) { // cancelled
+ if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
+ q.prev == p) {
+ p.casNext(q, s); // OK if fails
+ if (p.prev == null)
+ signalNext(p);
}
+ break;
}
-
- /**
- * Acquires in shared interruptible mode.
- * @param arg the acquire argument
- */
- private void doAcquireSharedInterruptibly(long arg)
- throws InterruptedException {
- final Node node = addWaiter(Node.SHARED);
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head) {
- long r = tryAcquireShared(arg);
- if (r >= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- return;
+ if ((n = p.next) != q) { // help finish
+ if (n != null && q.prev == p) {
+ p.casNext(n, q);
+ if (p.prev == null)
+ signalNext(p);
}
+ break;
}
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- throw new InterruptedException();
+ s = q;
+ q = q.prev;
}
- } catch (Throwable t) {
- cancelAcquire(node);
- throw t;
}
}
/**
- * Acquires in shared timed mode.
+ * Cancels an ongoing attempt to acquire.
*
- * @param arg the acquire argument
- * @param nanosTimeout max wait time
- * @return {@code true} if acquired
- */
- private boolean doAcquireSharedNanos(long arg, long nanosTimeout)
- throws InterruptedException {
- if (nanosTimeout <= 0L)
- return false;
- final long deadline = System.nanoTime() + nanosTimeout;
- final Node node = addWaiter(Node.SHARED);
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head) {
- long r = tryAcquireShared(arg);
- if (r >= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- return true;
- }
- }
- nanosTimeout = deadline - System.nanoTime();
- if (nanosTimeout <= 0L) {
- cancelAcquire(node);
- return false;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
- LockSupport.parkNanos(this, nanosTimeout);
- if (Thread.interrupted())
- throw new InterruptedException();
- }
- } catch (Throwable t) {
- cancelAcquire(node);
- throw t;
+ * @param node the node (may be null if cancelled before enqueuing)
+ * @param interrupted true if thread interrupted
+ * @param interruptible if should report interruption vs reset
+ */
+ private int cancelAcquire(Node node, boolean interrupted,
+ boolean interruptible) {
+ if (node != null) {
+ node.waiter = null;
+ node.status = CANCELLED;
+ if (node.prev != null)
+ cleanQueue();
+ }
+ if (interrupted) {
+ if (interruptible)
+ return CANCELLED;
+ else
+ Thread.currentThread().interrupt();
}
+ return 0;
}
// Main exported methods
/**
@@ -754,13 +558,12 @@
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(long arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
+ if (!tryAcquire(arg))
+ acquire(null, arg, false, false, false, 0L);
}
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
@@ -775,14 +578,13 @@
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(long arg)
throws InterruptedException {
- if (Thread.interrupted())
+ if (Thread.interrupted() ||
+ (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0))
throw new InterruptedException();
- if (!tryAcquire(arg))
- doAcquireInterruptibly(arg);
}
/**
* Attempts to acquire in exclusive mode, aborting if interrupted,
* and failing if the given timeout elapses. Implemented by first
@@ -800,14 +602,23 @@
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireNanos(long arg, long nanosTimeout)
throws InterruptedException {
- if (Thread.interrupted())
+ if (!Thread.interrupted()) {
+ if (tryAcquire(arg))
+ return true;
+ if (nanosTimeout <= 0L)
+ return false;
+ int stat = acquire(null, arg, false, true, true,
+ System.nanoTime() + nanosTimeout);
+ if (stat > 0)
+ return true;
+ if (stat == 0)
+ return false;
+ }
throw new InterruptedException();
- return tryAcquire(arg) ||
- doAcquireNanos(arg, nanosTimeout);
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
@@ -818,13 +629,11 @@
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(long arg) {
if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
+ signalNext(head);
return true;
}
return false;
}
@@ -839,11 +648,11 @@
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(long arg) {
if (tryAcquireShared(arg) < 0)
- doAcquireShared(arg);
+ acquire(null, arg, true, false, false, 0L);
}
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
@@ -857,14 +666,14 @@
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(long arg)
throws InterruptedException {
- if (Thread.interrupted())
+ if (Thread.interrupted() ||
+ (tryAcquireShared(arg) < 0 &&
+ acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
}
/**
* Attempts to acquire in shared mode, aborting if interrupted, and
* failing if the given timeout elapses. Implemented by first
@@ -881,14 +690,23 @@
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
throws InterruptedException {
- if (Thread.interrupted())
+ if (!Thread.interrupted()) {
+ if (tryAcquireShared(arg) >= 0)
+ return true;
+ if (nanosTimeout <= 0L)
+ return false;
+ int stat = acquire(null, arg, true, true, true,
+ System.nanoTime() + nanosTimeout);
+ if (stat > 0)
+ return true;
+ if (stat == 0)
+ return false;
+ }
throw new InterruptedException();
- return tryAcquireShared(arg) >= 0 ||
- doAcquireSharedNanos(arg, nanosTimeout);
}
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
@@ -898,11 +716,11 @@
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(long arg) {
if (tryReleaseShared(arg)) {
- doReleaseShared();
+ signalNext(head);
return true;
}
return false;
}
@@ -916,11 +734,11 @@
*
* @return {@code true} if there may be other threads waiting to acquire
*/
public final boolean hasQueuedThreads() {
for (Node p = tail, h = head; p != h && p != null; p = p.prev)
- if (p.waitStatus <= 0)
+ if (p.status >= 0)
return true;
return false;
}
/**
@@ -946,49 +764,20 @@
*
* @return the first (longest-waiting) thread in the queue, or
* {@code null} if no threads are currently queued
*/
public final Thread getFirstQueuedThread() {
- // handle only fast path, else relay
- return (head == tail) ? null : fullGetFirstQueuedThread();
- }
-
- /**
- * Version of getFirstQueuedThread called when fastpath fails.
- */
- private Thread fullGetFirstQueuedThread() {
- /*
- * The first node is normally head.next. Try to get its
- * thread field, ensuring consistent reads: If thread
- * field is nulled out or s.prev is no longer head, then
- * some other thread(s) concurrently performed setHead in
- * between some of our reads. We try this twice before
- * resorting to traversal.
- */
- Node h, s;
- Thread st;
- if (((h = head) != null && (s = h.next) != null &&
- s.prev == head && (st = s.thread) != null) ||
- ((h = head) != null && (s = h.next) != null &&
- s.prev == head && (st = s.thread) != null))
- return st;
-
- /*
- * Head's next field might not have been set yet, or may have
- * been unset after setHead. So we must check to see if tail
- * is actually first node. If not, we continue on, safely
- * traversing from tail back to head to find first,
- * guaranteeing termination.
- */
-
- Thread firstThread = null;
- for (Node p = tail; p != null && p != head; p = p.prev) {
- Thread t = p.thread;
- if (t != null)
- firstThread = t;
+ Thread first = null, w; Node h, s;
+ if ((h = head) != null && ((s = h.next) == null ||
+ (first = s.waiter) == null ||
+ s.prev == null)) {
+ // traverse from tail on stale reads
+ for (Node p = tail, q; p != null && (q = p.prev) != null; p = q)
+ if ((w = p.waiter) != null)
+ first = w;
}
- return firstThread;
+ return first;
}
/**
* Returns true if the given thread is currently queued.
*
@@ -1001,11 +790,11 @@
*/
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)
- if (p.thread == thread)
+ if (p.waiter == thread)
return true;
return false;
}
/**
@@ -1017,14 +806,12 @@
* is not the first queued thread. Used only as a heuristic in
* ReentrantReadWriteLock.
*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
- return (h = head) != null &&
- (s = h.next) != null &&
- !s.isShared() &&
- s.thread != null;
+ return (h = head) != null && (s = h.next) != null &&
+ !(s instanceof SharedNode) && s.waiter != null;
}
/**
* Queries whether any threads have been waiting to acquire longer
* than the current thread.
@@ -1050,11 +837,11 @@
* (unless this is a reentrant acquire). For example, the {@code
* tryAcquire} method for a fair, reentrant, exclusive mode
* synchronizer might look like this:
*
* <pre> {@code
- * protected boolean tryAcquire(int arg) {
+ * protected boolean tryAcquire(long arg) {
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* return true;
* } else if (hasQueuedPredecessors()) {
* return false;
@@ -1067,23 +854,16 @@
* current thread, and {@code false} if the current thread
* is at the head of the queue or the queue is empty
* @since 1.7
*/
public final boolean hasQueuedPredecessors() {
- Node h, s;
- if ((h = head) != null) {
- if ((s = h.next) == null || s.waitStatus > 0) {
- s = null; // traverse in case of concurrent cancellation
- for (Node p = tail; p != h && p != null; p = p.prev) {
- if (p.waitStatus <= 0)
- s = p;
- }
- }
- if (s != null && s.thread != Thread.currentThread())
- return true;
- }
- return false;
+ Thread first = null; Node h, s;
+ if ((h = head) != null && ((s = h.next) == null ||
+ (first = s.waiter) == null ||
+ s.prev == null))
+ first = getFirstQueuedThread(); // retry via getFirstQueuedThread
+ return first != null && first != Thread.currentThread();
}
// Instrumentation and monitoring methods
/**
@@ -1096,11 +876,11 @@
* @return the estimated number of threads waiting to acquire
*/
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
- if (p.thread != null)
+ if (p.waiter != null)
++n;
}
return n;
}
@@ -1116,11 +896,11 @@
* @return the collection of threads
*/
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<>();
for (Node p = tail; p != null; p = p.prev) {
- Thread t = p.thread;
+ Thread t = p.waiter;
if (t != null)
list.add(t);
}
return list;
}
@@ -1134,12 +914,12 @@
* @return the collection of threads
*/
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<>();
for (Node p = tail; p != null; p = p.prev) {
- if (!p.isShared()) {
- Thread t = p.thread;
+ if (!(p instanceof SharedNode)) {
+ Thread t = p.waiter;
if (t != null)
list.add(t);
}
}
return list;
@@ -1154,12 +934,12 @@
* @return the collection of threads
*/
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<>();
for (Node p = tail; p != null; p = p.prev) {
- if (p.isShared()) {
- Thread t = p.thread;
+ if (p instanceof SharedNode) {
+ Thread t = p.waiter;
if (t != null)
list.add(t);
}
}
return list;
@@ -1178,121 +958,10 @@
return super.toString()
+ "[State = " + getState() + ", "
+ (hasQueuedThreads() ? "non" : "") + "empty queue]";
}
-
- // Internal support methods for Conditions
-
- /**
- * Returns true if a node, always one that was initially placed on
- * a condition queue, is now waiting to reacquire on sync queue.
- * @param node the node
- * @return true if is reacquiring
- */
- final boolean isOnSyncQueue(Node node) {
- if (node.waitStatus == Node.CONDITION || node.prev == null)
- return false;
- if (node.next != null) // If has successor, it must be on queue
- return true;
- /*
- * node.prev can be non-null, but not yet on queue because
- * the CAS to place it on queue can fail. So we have to
- * traverse from tail to make sure it actually made it. It
- * will always be near the tail in calls to this method, and
- * unless the CAS failed (which is unlikely), it will be
- * there, so we hardly ever traverse much.
- */
- return findNodeFromTail(node);
- }
-
- /**
- * Returns true if node is on sync queue by searching backwards from tail.
- * Called only when needed by isOnSyncQueue.
- * @return true if present
- */
- private boolean findNodeFromTail(Node node) {
- // We check for node first, since it's likely to be at or near tail.
- // tail is known to be non-null, so we could re-order to "save"
- // one null check, but we leave it this way to help the VM.
- for (Node p = tail;;) {
- if (p == node)
- return true;
- if (p == null)
- return false;
- p = p.prev;
- }
- }
-
- /**
- * Transfers a node from a condition queue onto sync queue.
- * Returns true if successful.
- * @param node the node
- * @return true if successfully transferred (else the node was
- * cancelled before signal)
- */
- final boolean transferForSignal(Node node) {
- /*
- * If cannot change waitStatus, the node has been cancelled.
- */
- if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
- return false;
-
- /*
- * Splice onto queue and try to set waitStatus of predecessor to
- * indicate that thread is (probably) waiting. If cancelled or
- * attempt to set waitStatus fails, wake up to resync (in which
- * case the waitStatus can be transiently and harmlessly wrong).
- */
- Node p = enq(node);
- int ws = p.waitStatus;
- if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
- LockSupport.unpark(node.thread);
- return true;
- }
-
- /**
- * Transfers node, if necessary, to sync queue after a cancelled wait.
- * Returns true if thread was cancelled before being signalled.
- *
- * @param node the node
- * @return true if cancelled before the node was signalled
- */
- final boolean transferAfterCancelledWait(Node node) {
- if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
- enq(node);
- return true;
- }
- /*
- * If we lost out to a signal(), then we can't proceed
- * until it finishes its enq(). Cancelling during an
- * incomplete transfer is both rare and transient, so just
- * spin.
- */
- while (!isOnSyncQueue(node))
- Thread.yield();
- return false;
- }
-
- /**
- * Invokes release with current state value; returns saved state.
- * Cancels node and throws exception on failure.
- * @param node the condition node for this wait
- * @return previous sync state
- */
- final long fullyRelease(Node node) {
- try {
- long savedState = getState();
- if (release(savedState))
- return savedState;
- throw new IllegalMonitorStateException();
- } catch (Throwable t) {
- node.waitStatus = Node.CANCELLED;
- throw t;
- }
- }
-
// Instrumentation methods for conditions
/**
* Queries whether the given ConditionObject
* uses this synchronizer as its lock.
@@ -1382,145 +1051,132 @@
* condition semantics that rely on those of the associated
* {@code AbstractQueuedLongSynchronizer}.
*
* <p>This class is Serializable, but all fields are transient,
* so deserialized conditions have no waiters.
- *
- * @since 1.6
*/
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
- private transient Node firstWaiter;
+ private transient ConditionNode firstWaiter;
/** Last node of condition queue. */
- private transient Node lastWaiter;
+ private transient ConditionNode lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
- // Internal methods
+ // Signalling methods
/**
- * Adds a new waiter to wait queue.
- * @return its new wait node
+ * Removes and transfers one or all waiters to sync queue.
*/
- private Node addConditionWaiter() {
- if (!isHeldExclusively())
- throw new IllegalMonitorStateException();
- Node t = lastWaiter;
- // If lastWaiter is cancelled, clean out.
- if (t != null && t.waitStatus != Node.CONDITION) {
- unlinkCancelledWaiters();
- t = lastWaiter;
- }
-
- Node node = new Node(Node.CONDITION);
-
- if (t == null)
- firstWaiter = node;
- else
- t.nextWaiter = node;
- lastWaiter = node;
- return node;
- }
-
- /**
- * Removes and transfers nodes until hit non-cancelled one or
- * null. Split out from signal in part to encourage compilers
- * to inline the case of no waiters.
- * @param first (non-null) the first node on condition queue
- */
- private void doSignal(Node first) {
- do {
- if ( (firstWaiter = first.nextWaiter) == null)
+ private void doSignal(ConditionNode first, boolean all) {
+ while (first != null) {
+ ConditionNode next = first.nextWaiter;
+ if ((firstWaiter = next) == null)
lastWaiter = null;
- first.nextWaiter = null;
- } while (!transferForSignal(first) &&
- (first = firstWaiter) != null);
+ if ((first.getAndUnsetStatus(COND) & COND) != 0) {
+ enqueue(first);
+ if (!all)
+ break;
}
-
- /**
- * Removes and transfers all nodes.
- * @param first (non-null) the first node on condition queue
- */
- private void doSignalAll(Node first) {
- lastWaiter = firstWaiter = null;
- do {
- Node next = first.nextWaiter;
- first.nextWaiter = null;
- transferForSignal(first);
first = next;
- } while (first != null);
}
-
- /**
- * Unlinks cancelled waiter nodes from condition queue.
- * Called only while holding lock. This is called when
- * cancellation occurred during condition wait, and upon
- * insertion of a new waiter when lastWaiter is seen to have
- * been cancelled. This method is needed to avoid garbage
- * retention in the absence of signals. So even though it may
- * require a full traversal, it comes into play only when
- * timeouts or cancellations occur in the absence of
- * signals. It traverses all nodes rather than stopping at a
- * particular target to unlink all pointers to garbage nodes
- * without requiring many re-traversals during cancellation
- * storms.
- */
- private void unlinkCancelledWaiters() {
- Node t = firstWaiter;
- Node trail = null;
- while (t != null) {
- Node next = t.nextWaiter;
- if (t.waitStatus != Node.CONDITION) {
- t.nextWaiter = null;
- if (trail == null)
- firstWaiter = next;
- else
- trail.nextWaiter = next;
- if (next == null)
- lastWaiter = trail;
- }
- else
- trail = t;
- t = next;
}
- }
-
- // public methods
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
+ ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
- Node first = firstWaiter;
if (first != null)
- doSignal(first);
+ doSignal(first, false);
}
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signalAll() {
+ ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
- Node first = firstWaiter;
if (first != null)
- doSignalAll(first);
+ doSignal(first, true);
+ }
+
+ // Waiting methods
+
+ /**
+ * Adds node to condition list and releases lock.
+ *
+ * @param node the node
+ * @return savedState to reacquire after wait
+ */
+ private long enableWait(ConditionNode node) {
+ if (isHeldExclusively()) {
+ node.waiter = Thread.currentThread();
+ node.setStatusRelaxed(COND | WAITING);
+ ConditionNode last = lastWaiter;
+ if (last == null)
+ firstWaiter = node;
+ else
+ last.nextWaiter = node;
+ lastWaiter = node;
+ long savedState = getState();
+ if (release(savedState))
+ return savedState;
+ }
+ node.status = CANCELLED; // lock not held or inconsistent
+ throw new IllegalMonitorStateException();
+ }
+
+ /**
+ * Returns true if a node that was initially placed on a condition
+ * queue is now ready to reacquire on sync queue.
+ * @param node the node
+ * @return true if is reacquiring
+ */
+ private boolean canReacquire(ConditionNode node) {
+ // check links, not status to avoid enqueue race
+ return node != null && node.prev != null && isEnqueued(node);
+ }
+
+ /**
+ * Unlinks the given node and other non-waiting nodes from
+ * condition queue unless already unlinked.
+ */
+ private void unlinkCancelledWaiters(ConditionNode node) {
+ if (node == null || node.nextWaiter != null || node == lastWaiter) {
+ ConditionNode w = firstWaiter, trail = null;
+ while (w != null) {
+ ConditionNode next = w.nextWaiter;
+ if ((w.status & COND) == 0) {
+ w.nextWaiter = null;
+ if (trail == null)
+ firstWaiter = next;
+ else
+ trail.nextWaiter = next;
+ if (next == null)
+ lastWaiter = trail;
+ } else
+ trail = w;
+ w = next;
+ }
+ }
}
/**
* Implements uninterruptible condition wait.
* <ol>
@@ -1531,55 +1187,31 @@
* <li>Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* </ol>
*/
public final void awaitUninterruptibly() {
- Node node = addConditionWaiter();
- long savedState = fullyRelease(node);
+ ConditionNode node = new ConditionNode();
+ long savedState = enableWait(node);
+ LockSupport.setCurrentBlocker(this); // for back-compatibility
boolean interrupted = false;
- while (!isOnSyncQueue(node)) {
- LockSupport.park(this);
+ while (!canReacquire(node)) {
if (Thread.interrupted())
interrupted = true;
+ else if ((node.status & COND) != 0) {
+ try {
+ ForkJoinPool.managedBlock(node);
+ } catch (InterruptedException ie) {
+ interrupted = true;
}
- if (acquireQueued(node, savedState) || interrupted)
- selfInterrupt();
- }
-
- /*
- * For interruptible waits, we need to track whether to throw
- * InterruptedException, if interrupted while blocked on
- * condition, versus reinterrupt current thread, if
- * interrupted while blocked waiting to re-acquire.
- */
-
- /** Mode meaning to reinterrupt on exit from wait */
- private static final int REINTERRUPT = 1;
- /** Mode meaning to throw InterruptedException on exit from wait */
- private static final int THROW_IE = -1;
-
- /**
- * Checks for interrupt, returning THROW_IE if interrupted
- * before signalled, REINTERRUPT if after signalled, or
- * 0 if not interrupted.
- */
- private int checkInterruptWhileWaiting(Node node) {
- return Thread.interrupted() ?
- (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
- 0;
+ } else
+ Thread.onSpinWait(); // awoke while enqueuing
}
-
- /**
- * Throws InterruptedException, reinterrupts current thread, or
- * does nothing, depending on mode.
- */
- private void reportInterruptAfterWait(int interruptMode)
- throws InterruptedException {
- if (interruptMode == THROW_IE)
- throw new InterruptedException();
- else if (interruptMode == REINTERRUPT)
- selfInterrupt();
+ LockSupport.setCurrentBlocker(null);
+ node.clearStatus();
+ acquire(node, savedState, false, false, false, 0L);
+ if (interrupted)
+ Thread.currentThread().interrupt();
}
/**
* Implements interruptible condition wait.
* <ol>
@@ -1594,24 +1226,37 @@
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
- Node node = addConditionWaiter();
- long savedState = fullyRelease(node);
- int interruptMode = 0;
- while (!isOnSyncQueue(node)) {
- LockSupport.park(this);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
+ ConditionNode node = new ConditionNode();
+ long savedState = enableWait(node);
+ LockSupport.setCurrentBlocker(this); // for back-compatibility
+ boolean interrupted = false, cancelled = false;
+ while (!canReacquire(node)) {
+ if (interrupted |= Thread.interrupted()) {
+ if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
+ break; // else interrupted after signal
+ } else if ((node.status & COND) != 0) {
+ try {
+ ForkJoinPool.managedBlock(node);
+ } catch (InterruptedException ie) {
+ interrupted = true;
+ }
+ } else
+ Thread.onSpinWait(); // awoke while enqueuing
+ }
+ LockSupport.setCurrentBlocker(null);
+ node.clearStatus();
+ acquire(node, savedState, false, false, false, 0L);
+ if (interrupted) {
+ if (cancelled) {
+ unlinkCancelledWaiters(node);
+ throw new InterruptedException();
+ }
+ Thread.currentThread().interrupt();
}
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null) // clean up if cancelled
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
}
/**
* Implements timed condition wait.
* <ol>
@@ -1627,36 +1272,33 @@
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
- // We don't check for nanosTimeout <= 0L here, to allow
- // awaitNanos(0) as a way to "yield the lock".
- final long deadline = System.nanoTime() + nanosTimeout;
- long initialNanos = nanosTimeout;
- Node node = addConditionWaiter();
- long savedState = fullyRelease(node);
- int interruptMode = 0;
- while (!isOnSyncQueue(node)) {
- if (nanosTimeout <= 0L) {
- transferAfterCancelledWait(node);
- break;
- }
- if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
- LockSupport.parkNanos(this, nanosTimeout);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
+ ConditionNode node = new ConditionNode();
+ long savedState = enableWait(node);
+ long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
+ long deadline = System.nanoTime() + nanos;
+ boolean cancelled = false, interrupted = false;
+ while (!canReacquire(node)) {
+ if ((interrupted |= Thread.interrupted()) ||
+ (nanos = deadline - System.nanoTime()) <= 0L) {
+ if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break;
- nanosTimeout = deadline - System.nanoTime();
+ } else
+ LockSupport.parkNanos(this, nanos);
}
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null)
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
+ node.clearStatus();
+ acquire(node, savedState, false, false, false, 0L);
+ if (cancelled) {
+ unlinkCancelledWaiters(node);
+ if (interrupted)
+ throw new InterruptedException();
+ } else if (interrupted)
+ Thread.currentThread().interrupt();
long remaining = deadline - System.nanoTime(); // avoid overflow
- return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
+ return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE;
}
/**
* Implements absolute timed condition wait.
* <ol>
@@ -1674,30 +1316,30 @@
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
- Node node = addConditionWaiter();
- long savedState = fullyRelease(node);
- boolean timedout = false;
- int interruptMode = 0;
- while (!isOnSyncQueue(node)) {
- if (System.currentTimeMillis() >= abstime) {
- timedout = transferAfterCancelledWait(node);
+ ConditionNode node = new ConditionNode();
+ long savedState = enableWait(node);
+ boolean cancelled = false, interrupted = false;
+ while (!canReacquire(node)) {
+ if ((interrupted |= Thread.interrupted()) ||
+ System.currentTimeMillis() >= abstime) {
+ if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break;
- }
+ } else
LockSupport.parkUntil(this, abstime);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
}
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null)
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- return !timedout;
+ node.clearStatus();
+ acquire(node, savedState, false, false, false, 0L);
+ if (cancelled) {
+ unlinkCancelledWaiters(node);
+ if (interrupted)
+ throw new InterruptedException();
+ } else if (interrupted)
+ Thread.currentThread().interrupt();
+ return !cancelled;
}
/**
* Implements timed condition wait.
* <ol>
@@ -1715,35 +1357,32 @@
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
- // We don't check for nanosTimeout <= 0L here, to allow
- // await(0, unit) as a way to "yield the lock".
- final long deadline = System.nanoTime() + nanosTimeout;
- Node node = addConditionWaiter();
- long savedState = fullyRelease(node);
- boolean timedout = false;
- int interruptMode = 0;
- while (!isOnSyncQueue(node)) {
- if (nanosTimeout <= 0L) {
- timedout = transferAfterCancelledWait(node);
+ ConditionNode node = new ConditionNode();
+ long savedState = enableWait(node);
+ long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
+ long deadline = System.nanoTime() + nanos;
+ boolean cancelled = false, interrupted = false;
+ while (!canReacquire(node)) {
+ if ((interrupted |= Thread.interrupted()) ||
+ (nanos = deadline - System.nanoTime()) <= 0L) {
+ if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break;
+ } else
+ LockSupport.parkNanos(this, nanos);
}
- if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
- LockSupport.parkNanos(this, nanosTimeout);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
- nanosTimeout = deadline - System.nanoTime();
- }
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null)
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- return !timedout;
+ node.clearStatus();
+ acquire(node, savedState, false, false, false, 0L);
+ if (cancelled) {
+ unlinkCancelledWaiters(node);
+ if (interrupted)
+ throw new InterruptedException();
+ } else if (interrupted)
+ Thread.currentThread().interrupt();
+ return !cancelled;
}
// support for instrumentation
/**
@@ -1765,12 +1404,12 @@
* returns {@code false}
*/
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
- for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
- if (w.waitStatus == Node.CONDITION)
+ for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
+ if ((w.status & COND) != 0)
return true;
}
return false;
}
@@ -1785,12 +1424,12 @@
*/
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
- for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
- if (w.waitStatus == Node.CONDITION)
+ for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
+ if ((w.status & COND) != 0)
++n;
}
return n;
}
@@ -1805,52 +1444,29 @@
*/
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<>();
- for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
- if (w.waitStatus == Node.CONDITION) {
- Thread t = w.thread;
+ for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
+ if ((w.status & COND) != 0) {
+ Thread t = w.waiter;
if (t != null)
list.add(t);
}
}
return list;
}
}
- // VarHandle mechanics
- private static final VarHandle STATE;
- private static final VarHandle HEAD;
- private static final VarHandle TAIL;
+ // Unsafe
+ private static final Unsafe U = Unsafe.getUnsafe();
+ private static final long STATE
+ = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "state");
+ private static final long HEAD
+ = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "head");
+ private static final long TAIL
+ = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "tail");
static {
- try {
- MethodHandles.Lookup l = MethodHandles.lookup();
- STATE = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "state", long.class);
- HEAD = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "head", Node.class);
- TAIL = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "tail", Node.class);
- } catch (ReflectiveOperationException e) {
- throw new ExceptionInInitializerError(e);
- }
-
- // Reduce the risk of rare disastrous classloading in first call to
- // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
}
-
- /**
- * Initializes head and tail fields on first contention.
- */
- private final void initializeSyncQueue() {
- Node h;
- if (HEAD.compareAndSet(this, null, (h = new Node())))
- tail = h;
- }
-
- /**
- * CASes tail field.
- */
- private final boolean compareAndSetTail(Node expect, Node update) {
- return TAIL.compareAndSet(this, expect, update);
- }
}
< prev index next >