< prev index next >
src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java
Print this page
8229442: AQS and lock classes refresh
Reviewed-by: martin
*** 33,49 ****
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent.locks;
- import java.lang.invoke.MethodHandles;
- import java.lang.invoke.VarHandle;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.TimeUnit;
! import java.util.concurrent.locks.AbstractQueuedSynchronizer.Node;
/**
* A version of {@link AbstractQueuedSynchronizer} in
* which synchronization state is maintained as a {@code long}.
* This class has exactly the same structure, properties, and methods
--- 33,48 ----
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent.locks;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.TimeUnit;
! import java.util.concurrent.ForkJoinPool;
! import jdk.internal.misc.Unsafe;
/**
* A version of {@link AbstractQueuedSynchronizer} in
* which synchronization state is maintained as a {@code long}.
* This class has exactly the same structure, properties, and methods
*** 71,97 ****
* exactly cloned from AbstractQueuedSynchronizer, replacing class
* name and changing ints related with sync state to longs. Please
* keep it that way.
*/
/**
! * Creates a new {@code AbstractQueuedLongSynchronizer} instance
! * with initial synchronization state of zero.
*/
! protected AbstractQueuedLongSynchronizer() { }
/**
! * Head of the wait queue, lazily initialized. Except for
! * initialization, it is modified only via method setHead. Note:
! * If head exists, its waitStatus is guaranteed not to be
! * CANCELLED.
*/
private transient volatile Node head;
/**
! * Tail of the wait queue, lazily initialized. Modified only via
! * method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
--- 70,149 ----
* exactly cloned from AbstractQueuedSynchronizer, replacing class
* name and changing ints related with sync state to longs. Please
* keep it that way.
*/
+ // Node status bits, also used as argument and return values
+ static final int WAITING = 1; // must be 1
+ static final int CANCELLED = 0x80000000; // must be negative
+ static final int COND = 2; // in a condition wait
+
+ /** CLH Nodes */
+ abstract static class Node {
+ volatile Node prev; // initially attached via casTail
+ volatile Node next; // visibly nonnull when signallable
+ Thread waiter; // visibly nonnull when enqueued
+ volatile int status; // written by owner, atomic bit ops by others
+
+ // methods for atomic operations
+ final boolean casPrev(Node c, Node v) { // for cleanQueue
+ return U.weakCompareAndSetReference(this, PREV, c, v);
+ }
+ final boolean casNext(Node c, Node v) { // for cleanQueue
+ return U.weakCompareAndSetReference(this, NEXT, c, v);
+ }
+ final int getAndUnsetStatus(int v) { // for signalling
+ return U.getAndBitwiseAndInt(this, STATUS, ~v);
+ }
+ final void setPrevRelaxed(Node p) { // for off-queue assignment
+ U.putReference(this, PREV, p);
+ }
+ final void setStatusRelaxed(int s) { // for off-queue assignment
+ U.putInt(this, STATUS, s);
+ }
+ final void clearStatus() { // for reducing unneeded signals
+ U.putIntOpaque(this, STATUS, 0);
+ }
+
+ private static final long STATUS
+ = U.objectFieldOffset(Node.class, "status");
+ private static final long NEXT
+ = U.objectFieldOffset(Node.class, "next");
+ private static final long PREV
+ = U.objectFieldOffset(Node.class, "prev");
+ }
+
+ // Concrete classes tagged by type
+ static final class ExclusiveNode extends Node { }
+ static final class SharedNode extends Node { }
+
+ static final class ConditionNode extends Node
+ implements ForkJoinPool.ManagedBlocker {
+ ConditionNode nextWaiter; // link to next waiting node
+
/**
! * Allows Conditions to be used in ForkJoinPools without
! * risking fixed pool exhaustion. This is usable only for
! * untimed Condition waits, not timed versions.
*/
! public final boolean isReleasable() {
! return status <= 1 || Thread.currentThread().isInterrupted();
! }
!
! public final boolean block() {
! while (!isReleasable()) LockSupport.park(this);
! return true;
! }
! }
/**
! * Head of the wait queue, lazily initialized.
*/
private transient volatile Node head;
/**
! * Tail of the wait queue. After initialization, modified only via casTail.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*** 111,122 ****
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(long newState) {
! // See JDK-8180620: Clarify VarHandle mixed-access subtleties
! STATE.setVolatile(this, newState);
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
--- 163,173 ----
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(long newState) {
! state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
*** 127,611 ****
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(long expect, long update) {
! return STATE.compareAndSet(this, expect, update);
}
// Queuing utilities
! /**
! * The number of nanoseconds for which it is faster to spin
! * rather than to use timed park. A rough estimate suffices
! * to improve responsiveness with very short timeouts.
! */
! static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
!
! /**
! * Inserts node into queue, initializing if necessary. See picture above.
! * @param node the node to insert
! * @return node's predecessor
! */
! private Node enq(Node node) {
! for (;;) {
! Node oldTail = tail;
! if (oldTail != null) {
! node.setPrevRelaxed(oldTail);
! if (compareAndSetTail(oldTail, node)) {
! oldTail.next = node;
! return oldTail;
! }
! } else {
! initializeSyncQueue();
! }
! }
! }
!
! /**
! * Creates and enqueues node for current thread and given mode.
! *
! * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
! * @return the new node
! */
! private Node addWaiter(Node mode) {
! Node node = new Node(mode);
!
! for (;;) {
! Node oldTail = tail;
! if (oldTail != null) {
! node.setPrevRelaxed(oldTail);
! if (compareAndSetTail(oldTail, node)) {
! oldTail.next = node;
! return node;
! }
! } else {
! initializeSyncQueue();
! }
! }
! }
!
! /**
! * Sets head of queue to be node, thus dequeuing. Called only by
! * acquire methods. Also nulls out unused fields for sake of GC
! * and to suppress unnecessary signals and traversals.
! *
! * @param node the node
! */
! private void setHead(Node node) {
! head = node;
! node.thread = null;
! node.prev = null;
}
! /**
! * Wakes up node's successor, if one exists.
! *
! * @param node the node
! */
! private void unparkSuccessor(Node node) {
! /*
! * If status is negative (i.e., possibly needing signal) try
! * to clear in anticipation of signalling. It is OK if this
! * fails or if status is changed by waiting thread.
! */
! int ws = node.waitStatus;
! if (ws < 0)
! node.compareAndSetWaitStatus(ws, 0);
!
! /*
! * Thread to unpark is held in successor, which is normally
! * just the next node. But if cancelled or apparently null,
! * traverse backwards from tail to find the actual
! * non-cancelled successor.
! */
! Node s = node.next;
! if (s == null || s.waitStatus > 0) {
! s = null;
! for (Node p = tail; p != node && p != null; p = p.prev)
! if (p.waitStatus <= 0)
! s = p;
! }
! if (s != null)
! LockSupport.unpark(s.thread);
}
/**
! * Release action for shared mode -- signals successor and ensures
! * propagation. (Note: For exclusive mode, release just amounts
! * to calling unparkSuccessor of head if it needs signal.)
! */
! private void doReleaseShared() {
! /*
! * Ensure that a release propagates, even if there are other
! * in-progress acquires/releases. This proceeds in the usual
! * way of trying to unparkSuccessor of head if it needs
! * signal. But if it does not, status is set to PROPAGATE to
! * ensure that upon release, propagation continues.
! * Additionally, we must loop in case a new node is added
! * while we are doing this. Also, unlike other uses of
! * unparkSuccessor, we need to know if CAS to reset status
! * fails, if so rechecking.
*/
for (;;) {
! Node h = head;
! if (h != null && h != tail) {
! int ws = h.waitStatus;
! if (ws == Node.SIGNAL) {
! if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
! continue; // loop to recheck cases
! unparkSuccessor(h);
! }
! else if (ws == 0 &&
! !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
! continue; // loop on failed CAS
! }
! if (h == head) // loop if head changed
break;
}
}
-
- /**
- * Sets head of queue, and checks if successor may be waiting
- * in shared mode, if so propagating if either propagate > 0 or
- * PROPAGATE status was set.
- *
- * @param node the node
- * @param propagate the return value from a tryAcquireShared
- */
- private void setHeadAndPropagate(Node node, long propagate) {
- Node h = head; // Record old head for check below
- setHead(node);
- /*
- * Try to signal next queued node if:
- * Propagation was indicated by caller,
- * or was recorded (as h.waitStatus either before
- * or after setHead) by a previous operation
- * (note: this uses sign-check of waitStatus because
- * PROPAGATE status may transition to SIGNAL.)
- * and
- * The next node is waiting in shared mode,
- * or we don't know, because it appears null
- *
- * The conservatism in both of these checks may cause
- * unnecessary wake-ups, but only when there are multiple
- * racing acquires/releases, so most need signals now or soon
- * anyway.
- */
- if (propagate > 0 || h == null || h.waitStatus < 0 ||
- (h = head) == null || h.waitStatus < 0) {
- Node s = node.next;
- if (s == null || s.isShared())
- doReleaseShared();
}
}
! // Utilities for various versions of acquire
!
! /**
! * Cancels an ongoing attempt to acquire.
! *
! * @param node the node
! */
! private void cancelAcquire(Node node) {
! // Ignore if node doesn't exist
! if (node == null)
! return;
!
! node.thread = null;
!
! // Skip cancelled predecessors
! Node pred = node.prev;
! while (pred.waitStatus > 0)
! node.prev = pred = pred.prev;
!
! // predNext is the apparent node to unsplice. CASes below will
! // fail if not, in which case, we lost race vs another cancel
! // or signal, so no further action is necessary, although with
! // a possibility that a cancelled node may transiently remain
! // reachable.
! Node predNext = pred.next;
!
! // Can use unconditional write instead of CAS here.
! // After this atomic step, other Nodes can skip past us.
! // Before, we are free of interference from other threads.
! node.waitStatus = Node.CANCELLED;
!
! // If we are the tail, remove ourselves.
! if (node == tail && compareAndSetTail(node, pred)) {
! pred.compareAndSetNext(predNext, null);
! } else {
! // If successor needs signal, try to set pred's next-link
! // so it will get one. Otherwise wake it up to propagate.
! int ws;
! if (pred != head &&
! ((ws = pred.waitStatus) == Node.SIGNAL ||
! (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
! pred.thread != null) {
! Node next = node.next;
! if (next != null && next.waitStatus <= 0)
! pred.compareAndSetNext(predNext, next);
! } else {
! unparkSuccessor(node);
! }
!
! node.next = node; // help GC
! }
}
/**
! * Checks and updates status for a node that failed to acquire.
! * Returns true if thread should block. This is the main signal
! * control in all acquire loops. Requires that pred == node.prev.
! *
! * @param pred node's predecessor holding status
! * @param node the node
! * @return {@code true} if thread should block
*/
! private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
! int ws = pred.waitStatus;
! if (ws == Node.SIGNAL)
! /*
! * This node has already set status asking a release
! * to signal it, so it can safely park.
! */
! return true;
! if (ws > 0) {
! /*
! * Predecessor was cancelled. Skip over predecessors and
! * indicate retry.
! */
! do {
! node.prev = pred = pred.prev;
! } while (pred.waitStatus > 0);
! pred.next = node;
! } else {
! /*
! * waitStatus must be 0 or PROPAGATE. Indicate that we
! * need a signal, but don't park yet. Caller will need to
! * retry to make sure it cannot acquire before parking.
! */
! pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
- return false;
}
! /**
! * Convenience method to interrupt current thread.
! */
! static void selfInterrupt() {
! Thread.currentThread().interrupt();
}
/**
! * Convenience method to park and then check if interrupted.
*
! * @return {@code true} if interrupted
! */
! private final boolean parkAndCheckInterrupt() {
! LockSupport.park(this);
! return Thread.interrupted();
! }
/*
! * Various flavors of acquire, varying in exclusive/shared and
! * control modes. Each is mostly the same, but annoyingly
! * different. Only a little bit of factoring is possible due to
! * interactions of exception mechanics (including ensuring that we
! * cancel if tryAcquire throws exception) and other control, at
! * least not without hurting performance too much.
*/
- /**
- * Acquires in exclusive uninterruptible mode for thread already in
- * queue. Used by condition wait methods as well as acquire.
- *
- * @param node the node
- * @param arg the acquire argument
- * @return {@code true} if interrupted while waiting
- */
- final boolean acquireQueued(final Node node, long arg) {
- boolean interrupted = false;
- try {
for (;;) {
! final Node p = node.predecessor();
! if (p == head && tryAcquire(arg)) {
! setHead(node);
! p.next = null; // help GC
! return interrupted;
! }
! if (shouldParkAfterFailedAcquire(p, node))
! interrupted |= parkAndCheckInterrupt();
! }
! } catch (Throwable t) {
! cancelAcquire(node);
! if (interrupted)
! selfInterrupt();
! throw t;
}
}
!
! /**
! * Acquires in exclusive interruptible mode.
! * @param arg the acquire argument
! */
! private void doAcquireInterruptibly(long arg)
! throws InterruptedException {
! final Node node = addWaiter(Node.EXCLUSIVE);
try {
! for (;;) {
! final Node p = node.predecessor();
! if (p == head && tryAcquire(arg)) {
! setHead(node);
! p.next = null; // help GC
! return;
! }
! if (shouldParkAfterFailedAcquire(p, node) &&
! parkAndCheckInterrupt())
! throw new InterruptedException();
! }
! } catch (Throwable t) {
! cancelAcquire(node);
! throw t;
}
}
!
! /**
! * Acquires in exclusive timed mode.
! *
! * @param arg the acquire argument
! * @param nanosTimeout max wait time
! * @return {@code true} if acquired
! */
! private boolean doAcquireNanos(long arg, long nanosTimeout)
! throws InterruptedException {
! if (nanosTimeout <= 0L)
! return false;
! final long deadline = System.nanoTime() + nanosTimeout;
! final Node node = addWaiter(Node.EXCLUSIVE);
! try {
! for (;;) {
! final Node p = node.predecessor();
! if (p == head && tryAcquire(arg)) {
! setHead(node);
! p.next = null; // help GC
! return true;
}
- nanosTimeout = deadline - System.nanoTime();
- if (nanosTimeout <= 0L) {
- cancelAcquire(node);
- return false;
}
! if (shouldParkAfterFailedAcquire(p, node) &&
! nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
! LockSupport.parkNanos(this, nanosTimeout);
! if (Thread.interrupted())
! throw new InterruptedException();
}
- } catch (Throwable t) {
- cancelAcquire(node);
- throw t;
}
}
/**
! * Acquires in shared uninterruptible mode.
! * @param arg the acquire argument
*/
! private void doAcquireShared(long arg) {
! final Node node = addWaiter(Node.SHARED);
! boolean interrupted = false;
! try {
! for (;;) {
! final Node p = node.predecessor();
! if (p == head) {
! long r = tryAcquireShared(arg);
! if (r >= 0) {
! setHeadAndPropagate(node, r);
! p.next = null; // help GC
! return;
! }
! }
! if (shouldParkAfterFailedAcquire(p, node))
! interrupted |= parkAndCheckInterrupt();
! }
! } catch (Throwable t) {
! cancelAcquire(node);
! throw t;
! } finally {
! if (interrupted)
! selfInterrupt();
}
}
!
! /**
! * Acquires in shared interruptible mode.
! * @param arg the acquire argument
! */
! private void doAcquireSharedInterruptibly(long arg)
! throws InterruptedException {
! final Node node = addWaiter(Node.SHARED);
! try {
! for (;;) {
! final Node p = node.predecessor();
! if (p == head) {
! long r = tryAcquireShared(arg);
! if (r >= 0) {
! setHeadAndPropagate(node, r);
! p.next = null; // help GC
! return;
}
}
! if (shouldParkAfterFailedAcquire(p, node) &&
! parkAndCheckInterrupt())
! throw new InterruptedException();
}
- } catch (Throwable t) {
- cancelAcquire(node);
- throw t;
}
}
/**
! * Acquires in shared timed mode.
*
! * @param arg the acquire argument
! * @param nanosTimeout max wait time
! * @return {@code true} if acquired
! */
! private boolean doAcquireSharedNanos(long arg, long nanosTimeout)
! throws InterruptedException {
! if (nanosTimeout <= 0L)
! return false;
! final long deadline = System.nanoTime() + nanosTimeout;
! final Node node = addWaiter(Node.SHARED);
! try {
! for (;;) {
! final Node p = node.predecessor();
! if (p == head) {
! long r = tryAcquireShared(arg);
! if (r >= 0) {
! setHeadAndPropagate(node, r);
! p.next = null; // help GC
! return true;
! }
! }
! nanosTimeout = deadline - System.nanoTime();
! if (nanosTimeout <= 0L) {
! cancelAcquire(node);
! return false;
! }
! if (shouldParkAfterFailedAcquire(p, node) &&
! nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
! LockSupport.parkNanos(this, nanosTimeout);
! if (Thread.interrupted())
! throw new InterruptedException();
! }
! } catch (Throwable t) {
! cancelAcquire(node);
! throw t;
}
}
// Main exported methods
/**
--- 178,415 ----
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(long expect, long update) {
! return U.compareAndSetLong(this, STATE, expect, update);
}
// Queuing utilities
! private boolean casTail(Node c, Node v) {
! return U.compareAndSetReference(this, TAIL, c, v);
}
! /** tries once to CAS a new dummy node for head */
! private void tryInitializeHead() {
! Node h = new ExclusiveNode();
! if (U.compareAndSetReference(this, HEAD, null, h))
! tail = h;
}
/**
! * Enqueues the node unless null. (Currently used only for
! * ConditionNodes; other cases are interleaved with acquires.)
*/
+ final void enqueue(Node node) {
+ if (node != null) {
for (;;) {
! Node t = tail;
! node.setPrevRelaxed(t); // avoid unnecessary fence
! if (t == null) // initialize
! tryInitializeHead();
! else if (casTail(t, node)) {
! t.next = node;
! if (t.status < 0) // wake up to clean link
! LockSupport.unpark(node.waiter);
break;
}
}
}
}
! /** Returns true if node is found in traversal from tail */
! final boolean isEnqueued(Node node) {
! for (Node t = tail; t != null; t = t.prev)
! if (t == node)
! return true;
! return false;
}
/**
! * Wakes up the successor of given node, if one exists, and unsets its
! * WAITING status to avoid park race. This may fail to wake up an
! * eligible thread when one or more have been cancelled, but
! * cancelAcquire ensures liveness.
*/
! private static void signalNext(Node h) {
! Node s;
! if (h != null && (s = h.next) != null && s.status != 0) {
! s.getAndUnsetStatus(WAITING);
! LockSupport.unpark(s.waiter);
}
}
! /** Wakes up the given node if in shared mode */
! private static void signalNextIfShared(Node h) {
! Node s;
! if (h != null && (s = h.next) != null &&
! (s instanceof SharedNode) && s.status != 0) {
! s.getAndUnsetStatus(WAITING);
! LockSupport.unpark(s.waiter);
! }
}
/**
! * Main acquire method, invoked by all exported acquire methods.
*
! * @param node null unless a reacquiring Condition
! * @param arg the acquire argument
! * @param shared true if shared mode else exclusive
! * @param interruptible if abort and return negative on interrupt
! * @param timed if true use timed waits
! * @param time if timed, the System.nanoTime value to timeout
! * @return positive if acquired, 0 if timed out, negative if interrupted
! */
! final int acquire(Node node, long arg, boolean shared,
! boolean interruptible, boolean timed, long time) {
! Thread current = Thread.currentThread();
! byte spins = 0, postSpins = 0; // retries upon unpark of first thread
! boolean interrupted = false, first = false;
! Node pred = null; // predecessor of node when enqueued
/*
! * Repeatedly:
! * Check if node now first
! * if so, ensure head stable, else ensure valid predecessor
! * if node is first or not yet enqueued, try acquiring
! * else if node not yet created, create it
! * else if not yet enqueued, try once to enqueue
! * else if woken from park, retry (up to postSpins times)
! * else if WAITING status not set, set and retry
! * else park and clear WAITING status, and check cancellation
*/
for (;;) {
! if (!first && (pred = (node == null) ? null : node.prev) != null &&
! !(first = (head == pred))) {
! if (pred.status < 0) {
! cleanQueue(); // predecessor cancelled
! continue;
! } else if (pred.prev == null) {
! Thread.onSpinWait(); // ensure serialization
! continue;
}
}
! if (first || pred == null) {
! boolean acquired;
try {
! if (shared)
! acquired = (tryAcquireShared(arg) >= 0);
! else
! acquired = tryAcquire(arg);
! } catch (Throwable ex) {
! cancelAcquire(node, interrupted, false);
! throw ex;
}
+ if (acquired) {
+ if (first) {
+ node.prev = null;
+ head = node;
+ pred.next = null;
+ node.waiter = null;
+ if (shared)
+ signalNextIfShared(node);
+ if (interrupted)
+ current.interrupt();
}
! return 1;
}
}
! if (node == null) { // allocate; retry before enqueue
! if (shared)
! node = new SharedNode();
! else
! node = new ExclusiveNode();
! } else if (pred == null) { // try to enqueue
! node.waiter = current;
! Node t = tail;
! node.setPrevRelaxed(t); // avoid unnecessary fence
! if (t == null)
! tryInitializeHead();
! else if (!casTail(t, node))
! node.setPrevRelaxed(null); // back out
! else
! t.next = node;
! } else if (first && spins != 0) {
! --spins; // reduce unfairness on rewaits
! Thread.onSpinWait();
! } else if (node.status == 0) {
! node.status = WAITING; // enable signal and recheck
! } else {
! long nanos;
! spins = postSpins = (byte)((postSpins << 1) | 1);
! if (!timed)
! LockSupport.park(this);
! else if ((nanos = time - System.nanoTime()) > 0L)
! LockSupport.parkNanos(this, nanos);
! else
! break;
! node.clearStatus();
! if ((interrupted |= Thread.interrupted()) && interruptible)
! break;
}
}
+ return cancelAcquire(node, interrupted, interruptible);
}
/**
! * Possibly repeatedly traverses from tail, unsplicing cancelled
! * nodes until none are found.
*/
! private void cleanQueue() {
! for (;;) { // restart point
! for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
! if (q == null || (p = q.prev) == null)
! return; // end of list
! if (s == null ? tail != q : (s.prev != q || s.status < 0))
! break; // inconsistent
! if (q.status < 0) { // cancelled
! if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
! q.prev == p) {
! p.casNext(q, s); // OK if fails
! if (p.prev == null)
! signalNext(p);
}
+ break;
}
! if ((n = p.next) != q) { // help finish
! if (n != null && q.prev == p) {
! p.casNext(n, q);
! if (p.prev == null)
! signalNext(p);
}
+ break;
}
! s = q;
! q = q.prev;
}
}
}
/**
! * Cancels an ongoing attempt to acquire.
*
! * @param node the node (may be null if cancelled before enqueuing)
! * @param interrupted true if thread interrupted
! * @param interruptible if should report interruption vs reset
! */
! private int cancelAcquire(Node node, boolean interrupted,
! boolean interruptible) {
! if (node != null) {
! node.waiter = null;
! node.status = CANCELLED;
! if (node.prev != null)
! cleanQueue();
! }
! if (interrupted) {
! if (interruptible)
! return CANCELLED;
! else
! Thread.currentThread().interrupt();
}
+ return 0;
}
// Main exported methods
/**
*** 754,766 ****
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(long arg) {
! if (!tryAcquire(arg) &&
! acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
! selfInterrupt();
}
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
--- 558,569 ----
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(long arg) {
! if (!tryAcquire(arg))
! acquire(null, arg, false, false, false, 0L);
}
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
*** 775,788 ****
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(long arg)
throws InterruptedException {
! if (Thread.interrupted())
throw new InterruptedException();
- if (!tryAcquire(arg))
- doAcquireInterruptibly(arg);
}
/**
* Attempts to acquire in exclusive mode, aborting if interrupted,
* and failing if the given timeout elapses. Implemented by first
--- 578,590 ----
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(long arg)
throws InterruptedException {
! if (Thread.interrupted() ||
! (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0))
throw new InterruptedException();
}
/**
* Attempts to acquire in exclusive mode, aborting if interrupted,
* and failing if the given timeout elapses. Implemented by first
*** 800,813 ****
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireNanos(long arg, long nanosTimeout)
throws InterruptedException {
! if (Thread.interrupted())
throw new InterruptedException();
- return tryAcquire(arg) ||
- doAcquireNanos(arg, nanosTimeout);
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
--- 602,624 ----
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireNanos(long arg, long nanosTimeout)
throws InterruptedException {
! if (!Thread.interrupted()) {
! if (tryAcquire(arg))
! return true;
! if (nanosTimeout <= 0L)
! return false;
! int stat = acquire(null, arg, false, true, true,
! System.nanoTime() + nanosTimeout);
! if (stat > 0)
! return true;
! if (stat == 0)
! return false;
! }
throw new InterruptedException();
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
*** 818,830 ****
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(long arg) {
if (tryRelease(arg)) {
! Node h = head;
! if (h != null && h.waitStatus != 0)
! unparkSuccessor(h);
return true;
}
return false;
}
--- 629,639 ----
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(long arg) {
if (tryRelease(arg)) {
! signalNext(head);
return true;
}
return false;
}
*** 839,849 ****
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(long arg) {
if (tryAcquireShared(arg) < 0)
! doAcquireShared(arg);
}
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
--- 648,658 ----
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(long arg) {
if (tryAcquireShared(arg) < 0)
! acquire(null, arg, true, false, false, 0L);
}
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
*** 857,870 ****
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(long arg)
throws InterruptedException {
! if (Thread.interrupted())
throw new InterruptedException();
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
}
/**
* Attempts to acquire in shared mode, aborting if interrupted, and
* failing if the given timeout elapses. Implemented by first
--- 666,679 ----
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(long arg)
throws InterruptedException {
! if (Thread.interrupted() ||
! (tryAcquireShared(arg) < 0 &&
! acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
}
/**
* Attempts to acquire in shared mode, aborting if interrupted, and
* failing if the given timeout elapses. Implemented by first
*** 881,894 ****
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
throws InterruptedException {
! if (Thread.interrupted())
throw new InterruptedException();
- return tryAcquireShared(arg) >= 0 ||
- doAcquireSharedNanos(arg, nanosTimeout);
}
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
--- 690,712 ----
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
throws InterruptedException {
! if (!Thread.interrupted()) {
! if (tryAcquireShared(arg) >= 0)
! return true;
! if (nanosTimeout <= 0L)
! return false;
! int stat = acquire(null, arg, true, true, true,
! System.nanoTime() + nanosTimeout);
! if (stat > 0)
! return true;
! if (stat == 0)
! return false;
! }
throw new InterruptedException();
}
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*** 898,908 ****
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(long arg) {
if (tryReleaseShared(arg)) {
! doReleaseShared();
return true;
}
return false;
}
--- 716,726 ----
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(long arg) {
if (tryReleaseShared(arg)) {
! signalNext(head);
return true;
}
return false;
}
*** 916,926 ****
*
* @return {@code true} if there may be other threads waiting to acquire
*/
public final boolean hasQueuedThreads() {
for (Node p = tail, h = head; p != h && p != null; p = p.prev)
! if (p.waitStatus <= 0)
return true;
return false;
}
/**
--- 734,744 ----
*
* @return {@code true} if there may be other threads waiting to acquire
*/
public final boolean hasQueuedThreads() {
for (Node p = tail, h = head; p != h && p != null; p = p.prev)
! if (p.status >= 0)
return true;
return false;
}
/**
*** 946,994 ****
*
* @return the first (longest-waiting) thread in the queue, or
* {@code null} if no threads are currently queued
*/
public final Thread getFirstQueuedThread() {
! // handle only fast path, else relay
! return (head == tail) ? null : fullGetFirstQueuedThread();
! }
!
! /**
! * Version of getFirstQueuedThread called when fastpath fails.
! */
! private Thread fullGetFirstQueuedThread() {
! /*
! * The first node is normally head.next. Try to get its
! * thread field, ensuring consistent reads: If thread
! * field is nulled out or s.prev is no longer head, then
! * some other thread(s) concurrently performed setHead in
! * between some of our reads. We try this twice before
! * resorting to traversal.
! */
! Node h, s;
! Thread st;
! if (((h = head) != null && (s = h.next) != null &&
! s.prev == head && (st = s.thread) != null) ||
! ((h = head) != null && (s = h.next) != null &&
! s.prev == head && (st = s.thread) != null))
! return st;
!
! /*
! * Head's next field might not have been set yet, or may have
! * been unset after setHead. So we must check to see if tail
! * is actually first node. If not, we continue on, safely
! * traversing from tail back to head to find first,
! * guaranteeing termination.
! */
!
! Thread firstThread = null;
! for (Node p = tail; p != null && p != head; p = p.prev) {
! Thread t = p.thread;
! if (t != null)
! firstThread = t;
}
! return firstThread;
}
/**
* Returns true if the given thread is currently queued.
*
--- 764,783 ----
*
* @return the first (longest-waiting) thread in the queue, or
* {@code null} if no threads are currently queued
*/
public final Thread getFirstQueuedThread() {
! Thread first = null, w; Node h, s;
! if ((h = head) != null && ((s = h.next) == null ||
! (first = s.waiter) == null ||
! s.prev == null)) {
! // traverse from tail on stale reads
! for (Node p = tail, q; p != null && (q = p.prev) != null; p = q)
! if ((w = p.waiter) != null)
! first = w;
}
! return first;
}
/**
* Returns true if the given thread is currently queued.
*
*** 1001,1011 ****
*/
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)
! if (p.thread == thread)
return true;
return false;
}
/**
--- 790,800 ----
*/
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)
! if (p.waiter == thread)
return true;
return false;
}
/**
*** 1017,1030 ****
* is not the first queued thread. Used only as a heuristic in
* ReentrantReadWriteLock.
*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
! return (h = head) != null &&
! (s = h.next) != null &&
! !s.isShared() &&
! s.thread != null;
}
/**
* Queries whether any threads have been waiting to acquire longer
* than the current thread.
--- 806,817 ----
* is not the first queued thread. Used only as a heuristic in
* ReentrantReadWriteLock.
*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
! return (h = head) != null && (s = h.next) != null &&
! !(s instanceof SharedNode) && s.waiter != null;
}
/**
* Queries whether any threads have been waiting to acquire longer
* than the current thread.
*** 1050,1060 ****
* (unless this is a reentrant acquire). For example, the {@code
* tryAcquire} method for a fair, reentrant, exclusive mode
* synchronizer might look like this:
*
* <pre> {@code
! * protected boolean tryAcquire(int arg) {
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* return true;
* } else if (hasQueuedPredecessors()) {
* return false;
--- 837,847 ----
* (unless this is a reentrant acquire). For example, the {@code
* tryAcquire} method for a fair, reentrant, exclusive mode
* synchronizer might look like this:
*
* <pre> {@code
! * protected boolean tryAcquire(long arg) {
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* return true;
* } else if (hasQueuedPredecessors()) {
* return false;
*** 1067,1089 ****
* current thread, and {@code false} if the current thread
* is at the head of the queue or the queue is empty
* @since 1.7
*/
public final boolean hasQueuedPredecessors() {
! Node h, s;
! if ((h = head) != null) {
! if ((s = h.next) == null || s.waitStatus > 0) {
! s = null; // traverse in case of concurrent cancellation
! for (Node p = tail; p != h && p != null; p = p.prev) {
! if (p.waitStatus <= 0)
! s = p;
! }
! }
! if (s != null && s.thread != Thread.currentThread())
! return true;
! }
! return false;
}
// Instrumentation and monitoring methods
/**
--- 854,869 ----
* current thread, and {@code false} if the current thread
* is at the head of the queue or the queue is empty
* @since 1.7
*/
public final boolean hasQueuedPredecessors() {
! Thread first = null; Node h, s;
! if ((h = head) != null && ((s = h.next) == null ||
! (first = s.waiter) == null ||
! s.prev == null))
! first = getFirstQueuedThread(); // retry via getFirstQueuedThread
! return first != null && first != Thread.currentThread();
}
// Instrumentation and monitoring methods
/**
*** 1096,1106 ****
* @return the estimated number of threads waiting to acquire
*/
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
! if (p.thread != null)
++n;
}
return n;
}
--- 876,886 ----
* @return the estimated number of threads waiting to acquire
*/
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
! if (p.waiter != null)
++n;
}
return n;
}
*** 1116,1126 ****
* @return the collection of threads
*/
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<>();
for (Node p = tail; p != null; p = p.prev) {
! Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
--- 896,906 ----
* @return the collection of threads
*/
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<>();
for (Node p = tail; p != null; p = p.prev) {
! Thread t = p.waiter;
if (t != null)
list.add(t);
}
return list;
}
*** 1134,1145 ****
* @return the collection of threads
*/
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<>();
for (Node p = tail; p != null; p = p.prev) {
! if (!p.isShared()) {
! Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
--- 914,925 ----
* @return the collection of threads
*/
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<>();
for (Node p = tail; p != null; p = p.prev) {
! if (!(p instanceof SharedNode)) {
! Thread t = p.waiter;
if (t != null)
list.add(t);
}
}
return list;
*** 1154,1165 ****
* @return the collection of threads
*/
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<>();
for (Node p = tail; p != null; p = p.prev) {
! if (p.isShared()) {
! Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
--- 934,945 ----
* @return the collection of threads
*/
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<>();
for (Node p = tail; p != null; p = p.prev) {
! if (p instanceof SharedNode) {
! Thread t = p.waiter;
if (t != null)
list.add(t);
}
}
return list;
*** 1178,1298 ****
return super.toString()
+ "[State = " + getState() + ", "
+ (hasQueuedThreads() ? "non" : "") + "empty queue]";
}
-
- // Internal support methods for Conditions
-
- /**
- * Returns true if a node, always one that was initially placed on
- * a condition queue, is now waiting to reacquire on sync queue.
- * @param node the node
- * @return true if is reacquiring
- */
- final boolean isOnSyncQueue(Node node) {
- if (node.waitStatus == Node.CONDITION || node.prev == null)
- return false;
- if (node.next != null) // If has successor, it must be on queue
- return true;
- /*
- * node.prev can be non-null, but not yet on queue because
- * the CAS to place it on queue can fail. So we have to
- * traverse from tail to make sure it actually made it. It
- * will always be near the tail in calls to this method, and
- * unless the CAS failed (which is unlikely), it will be
- * there, so we hardly ever traverse much.
- */
- return findNodeFromTail(node);
- }
-
- /**
- * Returns true if node is on sync queue by searching backwards from tail.
- * Called only when needed by isOnSyncQueue.
- * @return true if present
- */
- private boolean findNodeFromTail(Node node) {
- // We check for node first, since it's likely to be at or near tail.
- // tail is known to be non-null, so we could re-order to "save"
- // one null check, but we leave it this way to help the VM.
- for (Node p = tail;;) {
- if (p == node)
- return true;
- if (p == null)
- return false;
- p = p.prev;
- }
- }
-
- /**
- * Transfers a node from a condition queue onto sync queue.
- * Returns true if successful.
- * @param node the node
- * @return true if successfully transferred (else the node was
- * cancelled before signal)
- */
- final boolean transferForSignal(Node node) {
- /*
- * If cannot change waitStatus, the node has been cancelled.
- */
- if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
- return false;
-
- /*
- * Splice onto queue and try to set waitStatus of predecessor to
- * indicate that thread is (probably) waiting. If cancelled or
- * attempt to set waitStatus fails, wake up to resync (in which
- * case the waitStatus can be transiently and harmlessly wrong).
- */
- Node p = enq(node);
- int ws = p.waitStatus;
- if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
- LockSupport.unpark(node.thread);
- return true;
- }
-
- /**
- * Transfers node, if necessary, to sync queue after a cancelled wait.
- * Returns true if thread was cancelled before being signalled.
- *
- * @param node the node
- * @return true if cancelled before the node was signalled
- */
- final boolean transferAfterCancelledWait(Node node) {
- if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
- enq(node);
- return true;
- }
- /*
- * If we lost out to a signal(), then we can't proceed
- * until it finishes its enq(). Cancelling during an
- * incomplete transfer is both rare and transient, so just
- * spin.
- */
- while (!isOnSyncQueue(node))
- Thread.yield();
- return false;
- }
-
- /**
- * Invokes release with current state value; returns saved state.
- * Cancels node and throws exception on failure.
- * @param node the condition node for this wait
- * @return previous sync state
- */
- final long fullyRelease(Node node) {
- try {
- long savedState = getState();
- if (release(savedState))
- return savedState;
- throw new IllegalMonitorStateException();
- } catch (Throwable t) {
- node.waitStatus = Node.CANCELLED;
- throw t;
- }
- }
-
// Instrumentation methods for conditions
/**
* Queries whether the given ConditionObject
* uses this synchronizer as its lock.
--- 958,967 ----
*** 1382,1526 ****
* condition semantics that rely on those of the associated
* {@code AbstractQueuedLongSynchronizer}.
*
* <p>This class is Serializable, but all fields are transient,
* so deserialized conditions have no waiters.
- *
- * @since 1.6
*/
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
! private transient Node firstWaiter;
/** Last node of condition queue. */
! private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
! // Internal methods
/**
! * Adds a new waiter to wait queue.
! * @return its new wait node
*/
! private Node addConditionWaiter() {
! if (!isHeldExclusively())
! throw new IllegalMonitorStateException();
! Node t = lastWaiter;
! // If lastWaiter is cancelled, clean out.
! if (t != null && t.waitStatus != Node.CONDITION) {
! unlinkCancelledWaiters();
! t = lastWaiter;
! }
!
! Node node = new Node(Node.CONDITION);
!
! if (t == null)
! firstWaiter = node;
! else
! t.nextWaiter = node;
! lastWaiter = node;
! return node;
! }
!
! /**
! * Removes and transfers nodes until hit non-cancelled one or
! * null. Split out from signal in part to encourage compilers
! * to inline the case of no waiters.
! * @param first (non-null) the first node on condition queue
! */
! private void doSignal(Node first) {
! do {
! if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
! first.nextWaiter = null;
! } while (!transferForSignal(first) &&
! (first = firstWaiter) != null);
}
-
- /**
- * Removes and transfers all nodes.
- * @param first (non-null) the first node on condition queue
- */
- private void doSignalAll(Node first) {
- lastWaiter = firstWaiter = null;
- do {
- Node next = first.nextWaiter;
- first.nextWaiter = null;
- transferForSignal(first);
first = next;
- } while (first != null);
}
-
- /**
- * Unlinks cancelled waiter nodes from condition queue.
- * Called only while holding lock. This is called when
- * cancellation occurred during condition wait, and upon
- * insertion of a new waiter when lastWaiter is seen to have
- * been cancelled. This method is needed to avoid garbage
- * retention in the absence of signals. So even though it may
- * require a full traversal, it comes into play only when
- * timeouts or cancellations occur in the absence of
- * signals. It traverses all nodes rather than stopping at a
- * particular target to unlink all pointers to garbage nodes
- * without requiring many re-traversals during cancellation
- * storms.
- */
- private void unlinkCancelledWaiters() {
- Node t = firstWaiter;
- Node trail = null;
- while (t != null) {
- Node next = t.nextWaiter;
- if (t.waitStatus != Node.CONDITION) {
- t.nextWaiter = null;
- if (trail == null)
- firstWaiter = next;
- else
- trail.nextWaiter = next;
- if (next == null)
- lastWaiter = trail;
- }
- else
- trail = t;
- t = next;
}
- }
-
- // public methods
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
- Node first = firstWaiter;
if (first != null)
! doSignal(first);
}
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
- Node first = firstWaiter;
if (first != null)
! doSignalAll(first);
}
/**
* Implements uninterruptible condition wait.
* <ol>
--- 1051,1182 ----
* condition semantics that rely on those of the associated
* {@code AbstractQueuedLongSynchronizer}.
*
* <p>This class is Serializable, but all fields are transient,
* so deserialized conditions have no waiters.
*/
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
! private transient ConditionNode firstWaiter;
/** Last node of condition queue. */
! private transient ConditionNode lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
! // Signalling methods
/**
! * Removes and transfers one or all waiters to sync queue.
*/
! private void doSignal(ConditionNode first, boolean all) {
! while (first != null) {
! ConditionNode next = first.nextWaiter;
! if ((firstWaiter = next) == null)
lastWaiter = null;
! if ((first.getAndUnsetStatus(COND) & COND) != 0) {
! enqueue(first);
! if (!all)
! break;
}
first = next;
}
}
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
+ ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
! doSignal(first, false);
}
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signalAll() {
+ ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
! doSignal(first, true);
! }
!
! // Waiting methods
!
! /**
! * Adds node to condition list and releases lock.
! *
! * @param node the node
! * @return savedState to reacquire after wait
! */
! private long enableWait(ConditionNode node) {
! if (isHeldExclusively()) {
! node.waiter = Thread.currentThread();
! node.setStatusRelaxed(COND | WAITING);
! ConditionNode last = lastWaiter;
! if (last == null)
! firstWaiter = node;
! else
! last.nextWaiter = node;
! lastWaiter = node;
! long savedState = getState();
! if (release(savedState))
! return savedState;
! }
! node.status = CANCELLED; // lock not held or inconsistent
! throw new IllegalMonitorStateException();
! }
!
! /**
! * Returns true if a node that was initially placed on a condition
! * queue is now ready to reacquire on sync queue.
! * @param node the node
! * @return true if is reacquiring
! */
! private boolean canReacquire(ConditionNode node) {
! // check links, not status to avoid enqueue race
! return node != null && node.prev != null && isEnqueued(node);
! }
!
! /**
! * Unlinks the given node and other non-waiting nodes from
! * condition queue unless already unlinked.
! */
! private void unlinkCancelledWaiters(ConditionNode node) {
! if (node == null || node.nextWaiter != null || node == lastWaiter) {
! ConditionNode w = firstWaiter, trail = null;
! while (w != null) {
! ConditionNode next = w.nextWaiter;
! if ((w.status & COND) == 0) {
! w.nextWaiter = null;
! if (trail == null)
! firstWaiter = next;
! else
! trail.nextWaiter = next;
! if (next == null)
! lastWaiter = trail;
! } else
! trail = w;
! w = next;
! }
! }
}
/**
* Implements uninterruptible condition wait.
* <ol>
*** 1531,1585 ****
* <li>Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* </ol>
*/
public final void awaitUninterruptibly() {
! Node node = addConditionWaiter();
! long savedState = fullyRelease(node);
boolean interrupted = false;
! while (!isOnSyncQueue(node)) {
! LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
! if (acquireQueued(node, savedState) || interrupted)
! selfInterrupt();
! }
!
! /*
! * For interruptible waits, we need to track whether to throw
! * InterruptedException, if interrupted while blocked on
! * condition, versus reinterrupt current thread, if
! * interrupted while blocked waiting to re-acquire.
! */
!
! /** Mode meaning to reinterrupt on exit from wait */
! private static final int REINTERRUPT = 1;
! /** Mode meaning to throw InterruptedException on exit from wait */
! private static final int THROW_IE = -1;
!
! /**
! * Checks for interrupt, returning THROW_IE if interrupted
! * before signalled, REINTERRUPT if after signalled, or
! * 0 if not interrupted.
! */
! private int checkInterruptWhileWaiting(Node node) {
! return Thread.interrupted() ?
! (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
! 0;
}
!
! /**
! * Throws InterruptedException, reinterrupts current thread, or
! * does nothing, depending on mode.
! */
! private void reportInterruptAfterWait(int interruptMode)
! throws InterruptedException {
! if (interruptMode == THROW_IE)
! throw new InterruptedException();
! else if (interruptMode == REINTERRUPT)
! selfInterrupt();
}
/**
* Implements interruptible condition wait.
* <ol>
--- 1187,1217 ----
* <li>Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* </ol>
*/
public final void awaitUninterruptibly() {
! ConditionNode node = new ConditionNode();
! long savedState = enableWait(node);
! LockSupport.setCurrentBlocker(this); // for back-compatibility
boolean interrupted = false;
! while (!canReacquire(node)) {
if (Thread.interrupted())
interrupted = true;
+ else if ((node.status & COND) != 0) {
+ try {
+ ForkJoinPool.managedBlock(node);
+ } catch (InterruptedException ie) {
+ interrupted = true;
}
! } else
! Thread.onSpinWait(); // awoke while enqueuing
}
! LockSupport.setCurrentBlocker(null);
! node.clearStatus();
! acquire(node, savedState, false, false, false, 0L);
! if (interrupted)
! Thread.currentThread().interrupt();
}
/**
* Implements interruptible condition wait.
* <ol>
*** 1594,1617 ****
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
! Node node = addConditionWaiter();
! long savedState = fullyRelease(node);
! int interruptMode = 0;
! while (!isOnSyncQueue(node)) {
! LockSupport.park(this);
! if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
! break;
}
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null) // clean up if cancelled
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
}
/**
* Implements timed condition wait.
* <ol>
--- 1226,1262 ----
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
! ConditionNode node = new ConditionNode();
! long savedState = enableWait(node);
! LockSupport.setCurrentBlocker(this); // for back-compatibility
! boolean interrupted = false, cancelled = false;
! while (!canReacquire(node)) {
! if (interrupted |= Thread.interrupted()) {
! if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
! break; // else interrupted after signal
! } else if ((node.status & COND) != 0) {
! try {
! ForkJoinPool.managedBlock(node);
! } catch (InterruptedException ie) {
! interrupted = true;
! }
! } else
! Thread.onSpinWait(); // awoke while enqueuing
! }
! LockSupport.setCurrentBlocker(null);
! node.clearStatus();
! acquire(node, savedState, false, false, false, 0L);
! if (interrupted) {
! if (cancelled) {
! unlinkCancelledWaiters(node);
! throw new InterruptedException();
! }
! Thread.currentThread().interrupt();
}
}
/**
* Implements timed condition wait.
* <ol>
*** 1627,1662 ****
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
! // We don't check for nanosTimeout <= 0L here, to allow
! // awaitNanos(0) as a way to "yield the lock".
! final long deadline = System.nanoTime() + nanosTimeout;
! long initialNanos = nanosTimeout;
! Node node = addConditionWaiter();
! long savedState = fullyRelease(node);
! int interruptMode = 0;
! while (!isOnSyncQueue(node)) {
! if (nanosTimeout <= 0L) {
! transferAfterCancelledWait(node);
! break;
! }
! if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
! LockSupport.parkNanos(this, nanosTimeout);
! if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
! nanosTimeout = deadline - System.nanoTime();
}
! if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
! interruptMode = REINTERRUPT;
! if (node.nextWaiter != null)
! unlinkCancelledWaiters();
! if (interruptMode != 0)
! reportInterruptAfterWait(interruptMode);
long remaining = deadline - System.nanoTime(); // avoid overflow
! return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
}
/**
* Implements absolute timed condition wait.
* <ol>
--- 1272,1304 ----
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
! ConditionNode node = new ConditionNode();
! long savedState = enableWait(node);
! long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
! long deadline = System.nanoTime() + nanos;
! boolean cancelled = false, interrupted = false;
! while (!canReacquire(node)) {
! if ((interrupted |= Thread.interrupted()) ||
! (nanos = deadline - System.nanoTime()) <= 0L) {
! if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break;
! } else
! LockSupport.parkNanos(this, nanos);
}
! node.clearStatus();
! acquire(node, savedState, false, false, false, 0L);
! if (cancelled) {
! unlinkCancelledWaiters(node);
! if (interrupted)
! throw new InterruptedException();
! } else if (interrupted)
! Thread.currentThread().interrupt();
long remaining = deadline - System.nanoTime(); // avoid overflow
! return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE;
}
/**
* Implements absolute timed condition wait.
* <ol>
*** 1674,1703 ****
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
! Node node = addConditionWaiter();
! long savedState = fullyRelease(node);
! boolean timedout = false;
! int interruptMode = 0;
! while (!isOnSyncQueue(node)) {
! if (System.currentTimeMillis() >= abstime) {
! timedout = transferAfterCancelledWait(node);
break;
! }
LockSupport.parkUntil(this, abstime);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
}
! if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
! interruptMode = REINTERRUPT;
! if (node.nextWaiter != null)
! unlinkCancelledWaiters();
! if (interruptMode != 0)
! reportInterruptAfterWait(interruptMode);
! return !timedout;
}
/**
* Implements timed condition wait.
* <ol>
--- 1316,1345 ----
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
! ConditionNode node = new ConditionNode();
! long savedState = enableWait(node);
! boolean cancelled = false, interrupted = false;
! while (!canReacquire(node)) {
! if ((interrupted |= Thread.interrupted()) ||
! System.currentTimeMillis() >= abstime) {
! if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break;
! } else
LockSupport.parkUntil(this, abstime);
}
! node.clearStatus();
! acquire(node, savedState, false, false, false, 0L);
! if (cancelled) {
! unlinkCancelledWaiters(node);
! if (interrupted)
! throw new InterruptedException();
! } else if (interrupted)
! Thread.currentThread().interrupt();
! return !cancelled;
}
/**
* Implements timed condition wait.
* <ol>
*** 1715,1749 ****
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
! // We don't check for nanosTimeout <= 0L here, to allow
! // await(0, unit) as a way to "yield the lock".
! final long deadline = System.nanoTime() + nanosTimeout;
! Node node = addConditionWaiter();
! long savedState = fullyRelease(node);
! boolean timedout = false;
! int interruptMode = 0;
! while (!isOnSyncQueue(node)) {
! if (nanosTimeout <= 0L) {
! timedout = transferAfterCancelledWait(node);
break;
}
! if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
! LockSupport.parkNanos(this, nanosTimeout);
! if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
! break;
! nanosTimeout = deadline - System.nanoTime();
! }
! if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
! interruptMode = REINTERRUPT;
! if (node.nextWaiter != null)
! unlinkCancelledWaiters();
! if (interruptMode != 0)
! reportInterruptAfterWait(interruptMode);
! return !timedout;
}
// support for instrumentation
/**
--- 1357,1388 ----
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
! ConditionNode node = new ConditionNode();
! long savedState = enableWait(node);
! long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
! long deadline = System.nanoTime() + nanos;
! boolean cancelled = false, interrupted = false;
! while (!canReacquire(node)) {
! if ((interrupted |= Thread.interrupted()) ||
! (nanos = deadline - System.nanoTime()) <= 0L) {
! if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break;
+ } else
+ LockSupport.parkNanos(this, nanos);
}
! node.clearStatus();
! acquire(node, savedState, false, false, false, 0L);
! if (cancelled) {
! unlinkCancelledWaiters(node);
! if (interrupted)
! throw new InterruptedException();
! } else if (interrupted)
! Thread.currentThread().interrupt();
! return !cancelled;
}
// support for instrumentation
/**
*** 1765,1776 ****
* returns {@code false}
*/
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
! for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
! if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
--- 1404,1415 ----
* returns {@code false}
*/
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
! for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
! if ((w.status & COND) != 0)
return true;
}
return false;
}
*** 1785,1796 ****
*/
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
! for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
! if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
--- 1424,1435 ----
*/
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
! for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
! if ((w.status & COND) != 0)
++n;
}
return n;
}
*** 1805,1856 ****
*/
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<>();
! for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
! if (w.waitStatus == Node.CONDITION) {
! Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
! // VarHandle mechanics
! private static final VarHandle STATE;
! private static final VarHandle HEAD;
! private static final VarHandle TAIL;
static {
- try {
- MethodHandles.Lookup l = MethodHandles.lookup();
- STATE = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "state", long.class);
- HEAD = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "head", Node.class);
- TAIL = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "tail", Node.class);
- } catch (ReflectiveOperationException e) {
- throw new ExceptionInInitializerError(e);
- }
-
- // Reduce the risk of rare disastrous classloading in first call to
- // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
}
-
- /**
- * Initializes head and tail fields on first contention.
- */
- private final void initializeSyncQueue() {
- Node h;
- if (HEAD.compareAndSet(this, null, (h = new Node())))
- tail = h;
- }
-
- /**
- * CASes tail field.
- */
- private final boolean compareAndSetTail(Node expect, Node update) {
- return TAIL.compareAndSet(this, expect, update);
- }
}
--- 1444,1472 ----
*/
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<>();
! for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
! if ((w.status & COND) != 0) {
! Thread t = w.waiter;
if (t != null)
list.add(t);
}
}
return list;
}
}
! // Unsafe
! private static final Unsafe U = Unsafe.getUnsafe();
! private static final long STATE
! = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "state");
! private static final long HEAD
! = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "head");
! private static final long TAIL
! = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "tail");
static {
Class<?> ensureLoaded = LockSupport.class;
}
}
< prev index next >