--- old/src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java 2019-09-14 11:10:59.528873990 -0700 +++ new/src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java 2019-09-14 11:10:59.180873746 -0700 @@ -35,12 +35,12 @@ package java.util.concurrent.locks; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.VarHandle; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ForkJoinPool; +import jdk.internal.misc.Unsafe; /** * Provides a framework for implementing blocking locks and related @@ -312,265 +312,208 @@ */ protected AbstractQueuedSynchronizer() { } - /** - * Wait queue node class. + /* + * Overview. * - *

The wait queue is a variant of a "CLH" (Craig, Landin, and + * The wait queue is a variant of a "CLH" (Craig, Landin, and * Hagersten) lock queue. CLH locks are normally used for - * spinlocks. We instead use them for blocking synchronizers, but - * use the same basic tactic of holding some of the control - * information about a thread in the predecessor of its node. A - * "status" field in each node keeps track of whether a thread - * should block. A node is signalled when its predecessor - * releases. Each node of the queue otherwise serves as a - * specific-notification-style monitor holding a single waiting - * thread. The status field does NOT control whether threads are - * granted locks etc though. A thread may try to acquire if it is - * first in the queue. But being first does not guarantee success; - * it only gives the right to contend. So the currently released - * contender thread may need to rewait. - * - *

To enqueue into a CLH lock, you atomically splice it in as new - * tail. To dequeue, you just set the head field. - *

-     *      +------+  prev +-----+       +-----+
-     * head |      | <---- |     | <---- |     |  tail
-     *      +------+       +-----+       +-----+
-     * 
- * - *

Insertion into a CLH queue requires only a single atomic - * operation on "tail", so there is a simple atomic point of - * demarcation from unqueued to queued. Similarly, dequeuing - * involves only updating the "head". However, it takes a bit - * more work for nodes to determine who their successors are, - * in part to deal with possible cancellation due to timeouts - * and interrupts. - * - *

The "prev" links (not used in original CLH locks), are mainly - * needed to handle cancellation. If a node is cancelled, its - * successor is (normally) relinked to a non-cancelled - * predecessor. For explanation of similar mechanics in the case - * of spin locks, see the papers by Scott and Scherer at - * http://www.cs.rochester.edu/u/scott/synchronization/ - * - *

We also use "next" links to implement blocking mechanics. - * The thread id for each node is kept in its own node, so a - * predecessor signals the next node to wake up by traversing - * next link to determine which thread it is. Determination of - * successor must avoid races with newly queued nodes to set - * the "next" fields of their predecessors. This is solved - * when necessary by checking backwards from the atomically - * updated "tail" when a node's successor appears to be null. - * (Or, said differently, the next-links are an optimization - * so that we don't usually need a backward scan.) - * - *

Cancellation introduces some conservatism to the basic - * algorithms. Since we must poll for cancellation of other - * nodes, we can miss noticing whether a cancelled node is - * ahead or behind us. This is dealt with by always unparking - * successors upon cancellation, allowing them to stabilize on - * a new predecessor, unless we can identify an uncancelled - * predecessor who will carry this responsibility. + * spinlocks. We instead use them for blocking synchronizers by + * including explicit ("prev" and "next") links plus a "status" + * field that allow nodes to signal successors when releasing + * locks, and handle cancellation due to interrupts and timeouts. + * The status field includes bits that track whether a thread + * needs a signal (using LockSupport.unpark). Despite these + * additions, we maintain most CLH locality properties. + * + * To enqueue into a CLH lock, you atomically splice it in as new + * tail. To dequeue, you set the head field, so the next eligible + * waiter becomes first. + * + * +------+ prev +-------+ +------+ + * | head | <---- | first | <---- | tail | + * +------+ +-------+ +------+ + * + * Insertion into a CLH queue requires only a single atomic + * operation on "tail", so there is a simple point of demarcation + * from unqueued to queued. The "next" link of the predecessor is + * set by the enqueuing thread after successful CAS. Even though + * non-atomic, this suffices to ensure that any blocked thread is + * signalled by a predecessor when eligible (although in the case + * of cancellation, possibly with the assistance of a signal in + * method cleanQueue). Signalling is based in part on a + * Dekker-like scheme in which the to-be waiting thread indicates + * WAITING status, then retries acquiring, and then rechecks + * status before blocking. The signaller atomically clears WAITING + * status when unparking. + * + * Dequeuing on acquire involves detaching (nulling) a node's + * "prev" node and then updating the "head". Other threads check + * if a node is or was dequeued by checking "prev" rather than + * head. We enforce the nulling then setting order by spin-waiting + * if necessary. Because of this, the lock algorithm is not itself + * strictly "lock-free" because an acquiring thread may need to + * wait for a previous acquire to make progress. When used with + * exclusive locks, such progress is required anyway. However + * Shared mode may (uncommonly) require a spin-wait before + * setting head field to ensure proper propagation. (Historical + * note: This allows some simplifications and efficiencies + * compared to previous versions of this class.) + * + * A node's predecessor can change due to cancellation while it is + * waiting, until the node is first in queue, at which point it + * cannot change. The acquire methods cope with this by rechecking + * "prev" before waiting. The prev and next fields are modified + * only via CAS by cancelled nodes in method cleanQueue. The + * unsplice strategy is reminiscent of Michael-Scott queues in + * that after a successful CAS to prev field, other threads help + * fix next fields. Because cancellation often occurs in bunches + * that complicate decisions about necessary signals, each call to + * cleanQueue traverses the queue until a clean sweep. Nodes that + * become relinked as first are unconditionally unparked + * (sometimes unnecessarily, but those cases are not worth + * avoiding). + * + * A thread may try to acquire if it is first (frontmost) in the + * queue, and sometimes before. Being first does not guarantee + * success; it only gives the right to contend. We balance + * throughput, overhead, and fairness by allowing incoming threads + * to "barge" and acquire the synchronizer while in the process of + * enqueuing, in which case an awakened first thread may need to + * rewait. To counteract possible repeated unlucky rewaits, we + * exponentially increase retries (up to 256) to acquire each time + * a thread is unparked. Except in this case, AQS locks do not + * spin; they instead interleave attempts to acquire with + * bookkeeping steps. (Users who want spinlocks can use + * tryAcquire.) + * + * To improve garbage collectibility, fields of nodes not yet on + * list are null. (It is not rare to create and then throw away a + * node without using it.) Fields of nodes coming off the list are + * nulled out as soon as possible. This accentuates the challenge + * of externally determining the first waiting thread (as in + * method getFirstQueuedThread). This sometimes requires the + * fallback of traversing backwards from the atomically updated + * "tail" when fields appear null. (This is never needed in the + * process of signalling though.) * - *

CLH queues need a dummy header node to get started. But + * CLH queues need a dummy header node to get started. But * we don't create them on construction, because it would be wasted * effort if there is never contention. Instead, the node * is constructed and head and tail pointers are set upon first * contention. * - *

Threads waiting on Conditions use the same nodes, but - * use an additional link. Conditions only need to link nodes - * in simple (non-concurrent) linked queues because they are - * only accessed when exclusively held. Upon await, a node is - * inserted into a condition queue. Upon signal, the node is - * transferred to the main queue. A special value of status - * field is used to mark which queue a node is on. + * Shared mode operations differ from Exclusive in that an acquire + * signals the next waiter to try to acquire if it is also + * Shared. The tryAcquireShared API allows users to indicate the + * degree of propagation, but in most applications, it is more + * efficient to ignore this, allowing the successor to try + * acquiring in any case. + * + * Threads waiting on Conditions use nodes with an additional + * link to maintain the (FIFO) list of conditions. Conditions only + * need to link nodes in simple (non-concurrent) linked queues + * because they are only accessed when exclusively held. Upon + * await, a node is inserted into a condition queue. Upon signal, + * the node is enqueued on the main queue. A special status field + * value is used to track and atomically trigger this. + * + * Accesses to fields head, tail, and state use full Volatile + * mode, along with CAS. Node fields status, prev and next also do + * so while threads may be signallable, but sometimes use weaker + * modes otherwise. Accesses to field "waiter" (the thread to be + * signalled) are always sandwiched between other atomic accesses + * so are used in Plain mode. We use jdk.internal Unsafe versions + * of atomic access methods rather than VarHandles to avoid + * potential VM bootstrap issues. + * + * Most of the above is performed by primary internal method + * acquire, that is invoked in some way by all exported acquire + * methods. (It is usually easy for compilers to optimize + * call-site specializations when heavily used.) + * + * There are several arbitrary decisions about when and how to + * check interrupts in both acquire and await before and/or after + * blocking. The decisions are less arbitrary in implementation + * updates because some users appear to rely on original behaviors + * in ways that are racy and so (rarely) wrong in general but hard + * to justify changing. * - *

Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill + * Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill * Scherer and Michael Scott, along with members of JSR-166 * expert group, for helpful ideas, discussions, and critiques * on the design of this class. */ - static final class Node { - /** Marker to indicate a node is waiting in shared mode */ - static final Node SHARED = new Node(); - /** Marker to indicate a node is waiting in exclusive mode */ - static final Node EXCLUSIVE = null; - - /** waitStatus value to indicate thread has cancelled. */ - static final int CANCELLED = 1; - /** waitStatus value to indicate successor's thread needs unparking. */ - static final int SIGNAL = -1; - /** waitStatus value to indicate thread is waiting on condition. */ - static final int CONDITION = -2; - /** - * waitStatus value to indicate the next acquireShared should - * unconditionally propagate. - */ - static final int PROPAGATE = -3; - - /** - * Status field, taking on only the values: - * SIGNAL: The successor of this node is (or will soon be) - * blocked (via park), so the current node must - * unpark its successor when it releases or - * cancels. To avoid races, acquire methods must - * first indicate they need a signal, - * then retry the atomic acquire, and then, - * on failure, block. - * CANCELLED: This node is cancelled due to timeout or interrupt. - * Nodes never leave this state. In particular, - * a thread with cancelled node never again blocks. - * CONDITION: This node is currently on a condition queue. - * It will not be used as a sync queue node - * until transferred, at which time the status - * will be set to 0. (Use of this value here has - * nothing to do with the other uses of the - * field, but simplifies mechanics.) - * PROPAGATE: A releaseShared should be propagated to other - * nodes. This is set (for head node only) in - * doReleaseShared to ensure propagation - * continues, even if other operations have - * since intervened. - * 0: None of the above - * - * The values are arranged numerically to simplify use. - * Non-negative values mean that a node doesn't need to - * signal. So, most code doesn't need to check for particular - * values, just for sign. - * - * The field is initialized to 0 for normal sync nodes, and - * CONDITION for condition nodes. It is modified using CAS - * (or when possible, unconditional volatile writes). - */ - volatile int waitStatus; - - /** - * Link to predecessor node that current node/thread relies on - * for checking waitStatus. Assigned during enqueuing, and nulled - * out (for sake of GC) only upon dequeuing. Also, upon - * cancellation of a predecessor, we short-circuit while - * finding a non-cancelled one, which will always exist - * because the head node is never cancelled: A node becomes - * head only as a result of successful acquire. A - * cancelled thread never succeeds in acquiring, and a thread only - * cancels itself, not any other node. - */ - volatile Node prev; - /** - * Link to the successor node that the current node/thread - * unparks upon release. Assigned during enqueuing, adjusted - * when bypassing cancelled predecessors, and nulled out (for - * sake of GC) when dequeued. The enq operation does not - * assign next field of a predecessor until after attachment, - * so seeing a null next field does not necessarily mean that - * node is at end of queue. However, if a next field appears - * to be null, we can scan prev's from the tail to - * double-check. The next field of cancelled nodes is set to - * point to the node itself instead of null, to make life - * easier for isOnSyncQueue. - */ - volatile Node next; + // Node status bits, also used as argument and return values + static final int WAITING = 1; // must be 1 + static final int CANCELLED = 0x80000000; // must be negative + static final int COND = 2; // in a condition wait - /** - * The thread that enqueued this node. Initialized on - * construction and nulled out after use. - */ - volatile Thread thread; + /** CLH Nodes */ + abstract static class Node { + volatile Node prev; // initially attached via casTail + volatile Node next; // visibly nonnull when signallable + Thread waiter; // visibly nonnull when enqueued + volatile int status; // written by owner, atomic bit ops by others - /** - * Link to next node waiting on condition, or the special - * value SHARED. Because condition queues are accessed only - * when holding in exclusive mode, we just need a simple - * linked queue to hold nodes while they are waiting on - * conditions. They are then transferred to the queue to - * re-acquire. And because conditions can only be exclusive, - * we save a field by using special value to indicate shared - * mode. - */ - Node nextWaiter; - - /** - * Returns true if node is waiting in shared mode. - */ - final boolean isShared() { - return nextWaiter == SHARED; + // methods for atomic operations + final boolean casPrev(Node c, Node v) { // for cleanQueue + return U.weakCompareAndSetReference(this, PREV, c, v); } - - /** - * Returns previous node, or throws NullPointerException if null. - * Use when predecessor cannot be null. The null check could - * be elided, but is present to help the VM. - * - * @return the predecessor of this node - */ - final Node predecessor() { - Node p = prev; - if (p == null) - throw new NullPointerException(); - else - return p; + final boolean casNext(Node c, Node v) { // for cleanQueue + return U.weakCompareAndSetReference(this, NEXT, c, v); } - - /** Establishes initial head or SHARED marker. */ - Node() {} - - /** Constructor used by addWaiter. */ - Node(Node nextWaiter) { - this.nextWaiter = nextWaiter; - THREAD.set(this, Thread.currentThread()); + final int getAndUnsetStatus(int v) { // for signalling + return U.getAndBitwiseAndInt(this, STATUS, ~v); } - - /** Constructor used by addConditionWaiter. */ - Node(int waitStatus) { - WAITSTATUS.set(this, waitStatus); - THREAD.set(this, Thread.currentThread()); + final void setPrevRelaxed(Node p) { // for off-queue assignment + U.putReference(this, PREV, p); } - - /** CASes waitStatus field. */ - final boolean compareAndSetWaitStatus(int expect, int update) { - return WAITSTATUS.compareAndSet(this, expect, update); + final void setStatusRelaxed(int s) { // for off-queue assignment + U.putInt(this, STATUS, s); } - - /** CASes next field. */ - final boolean compareAndSetNext(Node expect, Node update) { - return NEXT.compareAndSet(this, expect, update); + final void clearStatus() { // for reducing unneeded signals + U.putIntOpaque(this, STATUS, 0); } - final void setPrevRelaxed(Node p) { - PREV.set(this, p); + private static final long STATUS + = U.objectFieldOffset(Node.class, "status"); + private static final long NEXT + = U.objectFieldOffset(Node.class, "next"); + private static final long PREV + = U.objectFieldOffset(Node.class, "prev"); + } + + // Concrete classes tagged by type + static final class ExclusiveNode extends Node { } + static final class SharedNode extends Node { } + + static final class ConditionNode extends Node + implements ForkJoinPool.ManagedBlocker { + ConditionNode nextWaiter; // link to next waiting node + + /** + * Allows Conditions to be used in ForkJoinPools without + * risking fixed pool exhaustion. This is usable only for + * untimed Condition waits, not timed versions. + */ + public final boolean isReleasable() { + return status <= 1 || Thread.currentThread().isInterrupted(); } - // VarHandle mechanics - private static final VarHandle NEXT; - private static final VarHandle PREV; - private static final VarHandle THREAD; - private static final VarHandle WAITSTATUS; - static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - NEXT = l.findVarHandle(Node.class, "next", Node.class); - PREV = l.findVarHandle(Node.class, "prev", Node.class); - THREAD = l.findVarHandle(Node.class, "thread", Thread.class); - WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); - } + public final boolean block() { + while (!isReleasable()) LockSupport.park(this); + return true; } } /** - * Head of the wait queue, lazily initialized. Except for - * initialization, it is modified only via method setHead. Note: - * If head exists, its waitStatus is guaranteed not to be - * CANCELLED. + * Head of the wait queue, lazily initialized. */ private transient volatile Node head; /** - * Tail of the wait queue, lazily initialized. Modified only via - * method enq to add new wait node. + * Tail of the wait queue. After initialization, modified only via casTail. */ private transient volatile Node tail; @@ -609,481 +552,235 @@ * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { - return STATE.compareAndSet(this, expect, update); + return U.compareAndSetInt(this, STATE, expect, update); } // Queuing utilities - /** - * The number of nanoseconds for which it is faster to spin - * rather than to use timed park. A rough estimate suffices - * to improve responsiveness with very short timeouts. - */ - static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L; - - /** - * Inserts node into queue, initializing if necessary. See picture above. - * @param node the node to insert - * @return node's predecessor - */ - private Node enq(Node node) { - for (;;) { - Node oldTail = tail; - if (oldTail != null) { - node.setPrevRelaxed(oldTail); - if (compareAndSetTail(oldTail, node)) { - oldTail.next = node; - return oldTail; - } - } else { - initializeSyncQueue(); - } - } - } - - /** - * Creates and enqueues node for current thread and given mode. - * - * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared - * @return the new node - */ - private Node addWaiter(Node mode) { - Node node = new Node(mode); - - for (;;) { - Node oldTail = tail; - if (oldTail != null) { - node.setPrevRelaxed(oldTail); - if (compareAndSetTail(oldTail, node)) { - oldTail.next = node; - return node; - } - } else { - initializeSyncQueue(); - } - } + private boolean casTail(Node c, Node v) { + return U.compareAndSetReference(this, TAIL, c, v); } - /** - * Sets head of queue to be node, thus dequeuing. Called only by - * acquire methods. Also nulls out unused fields for sake of GC - * and to suppress unnecessary signals and traversals. - * - * @param node the node - */ - private void setHead(Node node) { - head = node; - node.thread = null; - node.prev = null; - } - - /** - * Wakes up node's successor, if one exists. - * - * @param node the node - */ - private void unparkSuccessor(Node node) { - /* - * If status is negative (i.e., possibly needing signal) try - * to clear in anticipation of signalling. It is OK if this - * fails or if status is changed by waiting thread. - */ - int ws = node.waitStatus; - if (ws < 0) - node.compareAndSetWaitStatus(ws, 0); - - /* - * Thread to unpark is held in successor, which is normally - * just the next node. But if cancelled or apparently null, - * traverse backwards from tail to find the actual - * non-cancelled successor. - */ - Node s = node.next; - if (s == null || s.waitStatus > 0) { - s = null; - for (Node p = tail; p != node && p != null; p = p.prev) - if (p.waitStatus <= 0) - s = p; - } - if (s != null) - LockSupport.unpark(s.thread); + /** tries once to CAS a new dummy node for head */ + private void tryInitializeHead() { + Node h = new ExclusiveNode(); + if (U.compareAndSetReference(this, HEAD, null, h)) + tail = h; } /** - * Release action for shared mode -- signals successor and ensures - * propagation. (Note: For exclusive mode, release just amounts - * to calling unparkSuccessor of head if it needs signal.) + * Enqueues the node unless null. (Currently used only for + * ConditionNodes; other cases are interleaved with acquires.) */ - private void doReleaseShared() { - /* - * Ensure that a release propagates, even if there are other - * in-progress acquires/releases. This proceeds in the usual - * way of trying to unparkSuccessor of head if it needs - * signal. But if it does not, status is set to PROPAGATE to - * ensure that upon release, propagation continues. - * Additionally, we must loop in case a new node is added - * while we are doing this. Also, unlike other uses of - * unparkSuccessor, we need to know if CAS to reset status - * fails, if so rechecking. - */ - for (;;) { - Node h = head; - if (h != null && h != tail) { - int ws = h.waitStatus; - if (ws == Node.SIGNAL) { - if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) - continue; // loop to recheck cases - unparkSuccessor(h); + final void enqueue(Node node) { + if (node != null) { + for (;;) { + Node t = tail; + node.setPrevRelaxed(t); // avoid unnecessary fence + if (t == null) // initialize + tryInitializeHead(); + else if (casTail(t, node)) { + t.next = node; + if (t.status < 0) // wake up to clean link + LockSupport.unpark(node.waiter); + break; } - else if (ws == 0 && - !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) - continue; // loop on failed CAS } - if (h == head) // loop if head changed - break; } } - /** - * Sets head of queue, and checks if successor may be waiting - * in shared mode, if so propagating if either propagate > 0 or - * PROPAGATE status was set. - * - * @param node the node - * @param propagate the return value from a tryAcquireShared - */ - private void setHeadAndPropagate(Node node, int propagate) { - Node h = head; // Record old head for check below - setHead(node); - /* - * Try to signal next queued node if: - * Propagation was indicated by caller, - * or was recorded (as h.waitStatus either before - * or after setHead) by a previous operation - * (note: this uses sign-check of waitStatus because - * PROPAGATE status may transition to SIGNAL.) - * and - * The next node is waiting in shared mode, - * or we don't know, because it appears null - * - * The conservatism in both of these checks may cause - * unnecessary wake-ups, but only when there are multiple - * racing acquires/releases, so most need signals now or soon - * anyway. - */ - if (propagate > 0 || h == null || h.waitStatus < 0 || - (h = head) == null || h.waitStatus < 0) { - Node s = node.next; - if (s == null || s.isShared()) - doReleaseShared(); - } + /** Returns true if node is found in traversal from tail */ + final boolean isEnqueued(Node node) { + for (Node t = tail; t != null; t = t.prev) + if (t == node) + return true; + return false; } - // Utilities for various versions of acquire - /** - * Cancels an ongoing attempt to acquire. - * - * @param node the node + * Wakes up the successor of given node, if one exists, and unsets its + * WAITING status to avoid park race. This may fail to wake up an + * eligible thread when one or more have been cancelled, but + * cancelAcquire ensures liveness. */ - private void cancelAcquire(Node node) { - // Ignore if node doesn't exist - if (node == null) - return; - - node.thread = null; - - // Skip cancelled predecessors - Node pred = node.prev; - while (pred.waitStatus > 0) - node.prev = pred = pred.prev; - - // predNext is the apparent node to unsplice. CASes below will - // fail if not, in which case, we lost race vs another cancel - // or signal, so no further action is necessary, although with - // a possibility that a cancelled node may transiently remain - // reachable. - Node predNext = pred.next; - - // Can use unconditional write instead of CAS here. - // After this atomic step, other Nodes can skip past us. - // Before, we are free of interference from other threads. - node.waitStatus = Node.CANCELLED; - - // If we are the tail, remove ourselves. - if (node == tail && compareAndSetTail(node, pred)) { - pred.compareAndSetNext(predNext, null); - } else { - // If successor needs signal, try to set pred's next-link - // so it will get one. Otherwise wake it up to propagate. - int ws; - if (pred != head && - ((ws = pred.waitStatus) == Node.SIGNAL || - (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && - pred.thread != null) { - Node next = node.next; - if (next != null && next.waitStatus <= 0) - pred.compareAndSetNext(predNext, next); - } else { - unparkSuccessor(node); - } - - node.next = node; // help GC + private static void signalNext(Node h) { + Node s; + if (h != null && (s = h.next) != null && s.status != 0) { + s.getAndUnsetStatus(WAITING); + LockSupport.unpark(s.waiter); } } - /** - * Checks and updates status for a node that failed to acquire. - * Returns true if thread should block. This is the main signal - * control in all acquire loops. Requires that pred == node.prev. - * - * @param pred node's predecessor holding status - * @param node the node - * @return {@code true} if thread should block - */ - private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { - int ws = pred.waitStatus; - if (ws == Node.SIGNAL) - /* - * This node has already set status asking a release - * to signal it, so it can safely park. - */ - return true; - if (ws > 0) { - /* - * Predecessor was cancelled. Skip over predecessors and - * indicate retry. - */ - do { - node.prev = pred = pred.prev; - } while (pred.waitStatus > 0); - pred.next = node; - } else { - /* - * waitStatus must be 0 or PROPAGATE. Indicate that we - * need a signal, but don't park yet. Caller will need to - * retry to make sure it cannot acquire before parking. - */ - pred.compareAndSetWaitStatus(ws, Node.SIGNAL); + /** Wakes up the given node if in shared mode */ + private static void signalNextIfShared(Node h) { + Node s; + if (h != null && (s = h.next) != null && + (s instanceof SharedNode) && s.status != 0) { + s.getAndUnsetStatus(WAITING); + LockSupport.unpark(s.waiter); } - return false; - } - - /** - * Convenience method to interrupt current thread. - */ - static void selfInterrupt() { - Thread.currentThread().interrupt(); - } - - /** - * Convenience method to park and then check if interrupted. - * - * @return {@code true} if interrupted - */ - private final boolean parkAndCheckInterrupt() { - LockSupport.park(this); - return Thread.interrupted(); } - /* - * Various flavors of acquire, varying in exclusive/shared and - * control modes. Each is mostly the same, but annoyingly - * different. Only a little bit of factoring is possible due to - * interactions of exception mechanics (including ensuring that we - * cancel if tryAcquire throws exception) and other control, at - * least not without hurting performance too much. - */ - /** - * Acquires in exclusive uninterruptible mode for thread already in - * queue. Used by condition wait methods as well as acquire. + * Main acquire method, invoked by all exported acquire methods. * - * @param node the node + * @param node null unless a reacquiring Condition * @param arg the acquire argument - * @return {@code true} if interrupted while waiting - */ - final boolean acquireQueued(final Node node, int arg) { - boolean interrupted = false; - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head && tryAcquire(arg)) { - setHead(node); - p.next = null; // help GC - return interrupted; - } - if (shouldParkAfterFailedAcquire(p, node)) - interrupted |= parkAndCheckInterrupt(); - } - } catch (Throwable t) { - cancelAcquire(node); - if (interrupted) - selfInterrupt(); - throw t; - } - } + * @param shared true if shared mode else exclusive + * @param interruptible if abort and return negative on interrupt + * @param timed if true use timed waits + * @param time if timed, the System.nanoTime value to timeout + * @return positive if acquired, 0 if timed out, negative if interrupted + */ + final int acquire(Node node, int arg, boolean shared, + boolean interruptible, boolean timed, long time) { + Thread current = Thread.currentThread(); + byte spins = 0, postSpins = 0; // retries upon unpark of first thread + boolean interrupted = false, first = false; + Node pred = null; // predecessor of node when enqueued - /** - * Acquires in exclusive interruptible mode. - * @param arg the acquire argument - */ - private void doAcquireInterruptibly(int arg) - throws InterruptedException { - final Node node = addWaiter(Node.EXCLUSIVE); - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head && tryAcquire(arg)) { - setHead(node); - p.next = null; // help GC - return; + /* + * Repeatedly: + * Check if node now first + * if so, ensure head stable, else ensure valid predecessor + * if node is first or not yet enqueued, try acquiring + * else if node not yet created, create it + * else if not yet enqueued, try once to enqueue + * else if woken from park, retry (up to postSpins times) + * else if WAITING status not set, set and retry + * else park and clear WAITING status, and check cancellation + */ + + for (;;) { + if (!first && (pred = (node == null) ? null : node.prev) != null && + !(first = (head == pred))) { + if (pred.status < 0) { + cleanQueue(); // predecessor cancelled + continue; + } else if (pred.prev == null) { + Thread.onSpinWait(); // ensure serialization + continue; } - if (shouldParkAfterFailedAcquire(p, node) && - parkAndCheckInterrupt()) - throw new InterruptedException(); } - } catch (Throwable t) { - cancelAcquire(node); - throw t; - } - } - - /** - * Acquires in exclusive timed mode. - * - * @param arg the acquire argument - * @param nanosTimeout max wait time - * @return {@code true} if acquired - */ - private boolean doAcquireNanos(int arg, long nanosTimeout) - throws InterruptedException { - if (nanosTimeout <= 0L) - return false; - final long deadline = System.nanoTime() + nanosTimeout; - final Node node = addWaiter(Node.EXCLUSIVE); - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head && tryAcquire(arg)) { - setHead(node); - p.next = null; // help GC - return true; + if (first || pred == null) { + boolean acquired; + try { + if (shared) + acquired = (tryAcquireShared(arg) >= 0); + else + acquired = tryAcquire(arg); + } catch (Throwable ex) { + cancelAcquire(node, interrupted, false); + throw ex; } - nanosTimeout = deadline - System.nanoTime(); - if (nanosTimeout <= 0L) { - cancelAcquire(node); - return false; + if (acquired) { + if (first) { + node.prev = null; + head = node; + pred.next = null; + node.waiter = null; + if (shared) + signalNextIfShared(node); + if (interrupted) + current.interrupt(); + } + return 1; } - if (shouldParkAfterFailedAcquire(p, node) && - nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanosTimeout); - if (Thread.interrupted()) - throw new InterruptedException(); } - } catch (Throwable t) { - cancelAcquire(node); - throw t; + if (node == null) { // allocate; retry before enqueue + if (shared) + node = new SharedNode(); + else + node = new ExclusiveNode(); + } else if (pred == null) { // try to enqueue + node.waiter = current; + Node t = tail; + node.setPrevRelaxed(t); // avoid unnecessary fence + if (t == null) + tryInitializeHead(); + else if (!casTail(t, node)) + node.setPrevRelaxed(null); // back out + else + t.next = node; + } else if (first && spins != 0) { + --spins; // reduce unfairness on rewaits + Thread.onSpinWait(); + } else if (node.status == 0) { + node.status = WAITING; // enable signal and recheck + } else { + long nanos; + spins = postSpins = (byte)((postSpins << 1) | 1); + if (!timed) + LockSupport.park(this); + else if ((nanos = time - System.nanoTime()) > 0L) + LockSupport.parkNanos(this, nanos); + else + break; + node.clearStatus(); + if ((interrupted |= Thread.interrupted()) && interruptible) + break; + } } + return cancelAcquire(node, interrupted, interruptible); } /** - * Acquires in shared uninterruptible mode. - * @param arg the acquire argument - */ - private void doAcquireShared(int arg) { - final Node node = addWaiter(Node.SHARED); - boolean interrupted = false; - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head) { - int r = tryAcquireShared(arg); - if (r >= 0) { - setHeadAndPropagate(node, r); - p.next = null; // help GC - return; + * Possibly repeatedly traverses from tail, unsplicing cancelled + * nodes until none are found. Unparks nodes that may have been + * relinked to be next eligible acquirer. + */ + private void cleanQueue() { + for (;;) { // restart point + for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples + if (q == null || (p = q.prev) == null) + return; // end of list + if (s == null ? tail != q : (s.prev != q || s.status < 0)) + break; // inconsistent + if (q.status < 0) { // cancelled + if ((s == null ? casTail(q, p) : s.casPrev(q, p)) && + q.prev == p) { + p.casNext(q, s); // OK if fails + if (p.prev == null) + signalNext(p); } + break; } - if (shouldParkAfterFailedAcquire(p, node)) - interrupted |= parkAndCheckInterrupt(); - } - } catch (Throwable t) { - cancelAcquire(node); - throw t; - } finally { - if (interrupted) - selfInterrupt(); - } - } - - /** - * Acquires in shared interruptible mode. - * @param arg the acquire argument - */ - private void doAcquireSharedInterruptibly(int arg) - throws InterruptedException { - final Node node = addWaiter(Node.SHARED); - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head) { - int r = tryAcquireShared(arg); - if (r >= 0) { - setHeadAndPropagate(node, r); - p.next = null; // help GC - return; + if ((n = p.next) != q) { // help finish + if (n != null && q.prev == p) { + p.casNext(n, q); + if (p.prev == null) + signalNext(p); } + break; } - if (shouldParkAfterFailedAcquire(p, node) && - parkAndCheckInterrupt()) - throw new InterruptedException(); + s = q; + q = q.prev; } - } catch (Throwable t) { - cancelAcquire(node); - throw t; } } /** - * Acquires in shared timed mode. + * Cancels an ongoing attempt to acquire. * - * @param arg the acquire argument - * @param nanosTimeout max wait time - * @return {@code true} if acquired - */ - private boolean doAcquireSharedNanos(int arg, long nanosTimeout) - throws InterruptedException { - if (nanosTimeout <= 0L) - return false; - final long deadline = System.nanoTime() + nanosTimeout; - final Node node = addWaiter(Node.SHARED); - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head) { - int r = tryAcquireShared(arg); - if (r >= 0) { - setHeadAndPropagate(node, r); - p.next = null; // help GC - return true; - } - } - nanosTimeout = deadline - System.nanoTime(); - if (nanosTimeout <= 0L) { - cancelAcquire(node); - return false; - } - if (shouldParkAfterFailedAcquire(p, node) && - nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanosTimeout); - if (Thread.interrupted()) - throw new InterruptedException(); - } - } catch (Throwable t) { - cancelAcquire(node); - throw t; + * @param node the node (may be null if cancelled before enqueuing) + * @param interrupted true if thread interrupted + * @param interruptible if should report interruption vs reset + */ + private int cancelAcquire(Node node, boolean interrupted, + boolean interruptible) { + if (node != null) { + node.waiter = null; + node.status = CANCELLED; + if (node.prev != null) + cleanQueue(); + } + if (interrupted) { + if (interruptible) + return CANCELLED; + else + Thread.currentThread().interrupt(); } + return 0; } // Main exported methods @@ -1236,9 +933,8 @@ * can represent anything you like. */ public final void acquire(int arg) { - if (!tryAcquire(arg) && - acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) - selfInterrupt(); + if (!tryAcquire(arg)) + acquire(null, arg, false, false, false, 0L); } /** @@ -1256,11 +952,10 @@ * @throws InterruptedException if the current thread is interrupted */ public final void acquireInterruptibly(int arg) - throws InterruptedException { - if (Thread.interrupted()) + throws InterruptedException { + if (Thread.interrupted() || + (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0)) throw new InterruptedException(); - if (!tryAcquire(arg)) - doAcquireInterruptibly(arg); } /** @@ -1281,11 +976,20 @@ * @throws InterruptedException if the current thread is interrupted */ public final boolean tryAcquireNanos(int arg, long nanosTimeout) - throws InterruptedException { - if (Thread.interrupted()) - throw new InterruptedException(); - return tryAcquire(arg) || - doAcquireNanos(arg, nanosTimeout); + throws InterruptedException { + if (!Thread.interrupted()) { + if (tryAcquire(arg)) + return true; + if (nanosTimeout <= 0L) + return false; + int stat = acquire(null, arg, false, true, true, + System.nanoTime() + nanosTimeout); + if (stat > 0) + return true; + if (stat == 0) + return false; + } + throw new InterruptedException(); } /** @@ -1300,9 +1004,7 @@ */ public final boolean release(int arg) { if (tryRelease(arg)) { - Node h = head; - if (h != null && h.waitStatus != 0) - unparkSuccessor(h); + signalNext(head); return true; } return false; @@ -1321,7 +1023,7 @@ */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) - doAcquireShared(arg); + acquire(null, arg, true, false, false, 0L); } /** @@ -1338,11 +1040,11 @@ * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(int arg) - throws InterruptedException { - if (Thread.interrupted()) + throws InterruptedException { + if (Thread.interrupted() || + (tryAcquireShared(arg) < 0 && + acquire(null, arg, true, true, false, 0L) < 0)) throw new InterruptedException(); - if (tryAcquireShared(arg) < 0) - doAcquireSharedInterruptibly(arg); } /** @@ -1363,10 +1065,19 @@ */ public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { - if (Thread.interrupted()) - throw new InterruptedException(); - return tryAcquireShared(arg) >= 0 || - doAcquireSharedNanos(arg, nanosTimeout); + if (!Thread.interrupted()) { + if (tryAcquireShared(arg) >= 0) + return true; + if (nanosTimeout <= 0L) + return false; + int stat = acquire(null, arg, true, true, true, + System.nanoTime() + nanosTimeout); + if (stat > 0) + return true; + if (stat == 0) + return false; + } + throw new InterruptedException(); } /** @@ -1380,7 +1091,7 @@ */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { - doReleaseShared(); + signalNext(head); return true; } return false; @@ -1398,7 +1109,7 @@ */ public final boolean hasQueuedThreads() { for (Node p = tail, h = head; p != h && p != null; p = p.prev) - if (p.waitStatus <= 0) + if (p.status >= 0) return true; return false; } @@ -1428,45 +1139,16 @@ * {@code null} if no threads are currently queued */ public final Thread getFirstQueuedThread() { - // handle only fast path, else relay - return (head == tail) ? null : fullGetFirstQueuedThread(); - } - - /** - * Version of getFirstQueuedThread called when fastpath fails. - */ - private Thread fullGetFirstQueuedThread() { - /* - * The first node is normally head.next. Try to get its - * thread field, ensuring consistent reads: If thread - * field is nulled out or s.prev is no longer head, then - * some other thread(s) concurrently performed setHead in - * between some of our reads. We try this twice before - * resorting to traversal. - */ - Node h, s; - Thread st; - if (((h = head) != null && (s = h.next) != null && - s.prev == head && (st = s.thread) != null) || - ((h = head) != null && (s = h.next) != null && - s.prev == head && (st = s.thread) != null)) - return st; - - /* - * Head's next field might not have been set yet, or may have - * been unset after setHead. So we must check to see if tail - * is actually first node. If not, we continue on, safely - * traversing from tail back to head to find first, - * guaranteeing termination. - */ - - Thread firstThread = null; - for (Node p = tail; p != null && p != head; p = p.prev) { - Thread t = p.thread; - if (t != null) - firstThread = t; + Thread first = null, w; Node h, s; + if ((h = head) != null && ((s = h.next) == null || + (first = s.waiter) == null || + s.prev == null)) { + // traverse from tail on stale reads + for (Node p = tail, q; p != null && (q = p.prev) != null; p = q) + if ((w = p.waiter) != null) + first = w; } - return firstThread; + return first; } /** @@ -1483,7 +1165,7 @@ if (thread == null) throw new NullPointerException(); for (Node p = tail; p != null; p = p.prev) - if (p.thread == thread) + if (p.waiter == thread) return true; return false; } @@ -1499,10 +1181,8 @@ */ final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; - return (h = head) != null && - (s = h.next) != null && - !s.isShared() && - s.thread != null; + return (h = head) != null && (s = h.next) != null && + !(s instanceof SharedNode) && s.waiter != null; } /** @@ -1549,19 +1229,12 @@ * @since 1.7 */ public final boolean hasQueuedPredecessors() { - Node h, s; - if ((h = head) != null) { - if ((s = h.next) == null || s.waitStatus > 0) { - s = null; // traverse in case of concurrent cancellation - for (Node p = tail; p != h && p != null; p = p.prev) { - if (p.waitStatus <= 0) - s = p; - } - } - if (s != null && s.thread != Thread.currentThread()) - return true; - } - return false; + Thread first = null; Node h, s; + if ((h = head) != null && ((s = h.next) == null || + (first = s.waiter) == null || + s.prev == null)) + first = getFirstQueuedThread(); // retry via getFirstQueuedThread + return first != null && first != Thread.currentThread(); } // Instrumentation and monitoring methods @@ -1578,7 +1251,7 @@ public final int getQueueLength() { int n = 0; for (Node p = tail; p != null; p = p.prev) { - if (p.thread != null) + if (p.waiter != null) ++n; } return n; @@ -1598,7 +1271,7 @@ public final Collection getQueuedThreads() { ArrayList list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { - Thread t = p.thread; + Thread t = p.waiter; if (t != null) list.add(t); } @@ -1616,8 +1289,8 @@ public final Collection getExclusiveQueuedThreads() { ArrayList list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { - if (!p.isShared()) { - Thread t = p.thread; + if (!(p instanceof SharedNode)) { + Thread t = p.waiter; if (t != null) list.add(t); } @@ -1636,8 +1309,8 @@ public final Collection getSharedQueuedThreads() { ArrayList list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { - if (p.isShared()) { - Thread t = p.thread; + if (p instanceof SharedNode) { + Thread t = p.waiter; if (t != null) list.add(t); } @@ -1660,117 +1333,6 @@ + (hasQueuedThreads() ? "non" : "") + "empty queue]"; } - - // Internal support methods for Conditions - - /** - * Returns true if a node, always one that was initially placed on - * a condition queue, is now waiting to reacquire on sync queue. - * @param node the node - * @return true if is reacquiring - */ - final boolean isOnSyncQueue(Node node) { - if (node.waitStatus == Node.CONDITION || node.prev == null) - return false; - if (node.next != null) // If has successor, it must be on queue - return true; - /* - * node.prev can be non-null, but not yet on queue because - * the CAS to place it on queue can fail. So we have to - * traverse from tail to make sure it actually made it. It - * will always be near the tail in calls to this method, and - * unless the CAS failed (which is unlikely), it will be - * there, so we hardly ever traverse much. - */ - return findNodeFromTail(node); - } - - /** - * Returns true if node is on sync queue by searching backwards from tail. - * Called only when needed by isOnSyncQueue. - * @return true if present - */ - private boolean findNodeFromTail(Node node) { - // We check for node first, since it's likely to be at or near tail. - // tail is known to be non-null, so we could re-order to "save" - // one null check, but we leave it this way to help the VM. - for (Node p = tail;;) { - if (p == node) - return true; - if (p == null) - return false; - p = p.prev; - } - } - - /** - * Transfers a node from a condition queue onto sync queue. - * Returns true if successful. - * @param node the node - * @return true if successfully transferred (else the node was - * cancelled before signal) - */ - final boolean transferForSignal(Node node) { - /* - * If cannot change waitStatus, the node has been cancelled. - */ - if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) - return false; - - /* - * Splice onto queue and try to set waitStatus of predecessor to - * indicate that thread is (probably) waiting. If cancelled or - * attempt to set waitStatus fails, wake up to resync (in which - * case the waitStatus can be transiently and harmlessly wrong). - */ - Node p = enq(node); - int ws = p.waitStatus; - if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) - LockSupport.unpark(node.thread); - return true; - } - - /** - * Transfers node, if necessary, to sync queue after a cancelled wait. - * Returns true if thread was cancelled before being signalled. - * - * @param node the node - * @return true if cancelled before the node was signalled - */ - final boolean transferAfterCancelledWait(Node node) { - if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) { - enq(node); - return true; - } - /* - * If we lost out to a signal(), then we can't proceed - * until it finishes its enq(). Cancelling during an - * incomplete transfer is both rare and transient, so just - * spin. - */ - while (!isOnSyncQueue(node)) - Thread.yield(); - return false; - } - - /** - * Invokes release with current state value; returns saved state. - * Cancels node and throws exception on failure. - * @param node the condition node for this wait - * @return previous sync state - */ - final int fullyRelease(Node node) { - try { - int savedState = getState(); - if (release(savedState)) - return savedState; - throw new IllegalMonitorStateException(); - } catch (Throwable t) { - node.waitStatus = Node.CANCELLED; - throw t; - } - } - // Instrumentation methods for conditions /** @@ -1868,106 +1430,34 @@ public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ - private transient Node firstWaiter; + private transient ConditionNode firstWaiter; /** Last node of condition queue. */ - private transient Node lastWaiter; + private transient ConditionNode lastWaiter; /** * Creates a new {@code ConditionObject} instance. */ public ConditionObject() { } - // Internal methods + // Signalling methods /** - * Adds a new waiter to wait queue. - * @return its new wait node + * Removes and transfers one or all waiters to sync queue. */ - private Node addConditionWaiter() { - if (!isHeldExclusively()) - throw new IllegalMonitorStateException(); - Node t = lastWaiter; - // If lastWaiter is cancelled, clean out. - if (t != null && t.waitStatus != Node.CONDITION) { - unlinkCancelledWaiters(); - t = lastWaiter; - } - - Node node = new Node(Node.CONDITION); - - if (t == null) - firstWaiter = node; - else - t.nextWaiter = node; - lastWaiter = node; - return node; - } - - /** - * Removes and transfers nodes until hit non-cancelled one or - * null. Split out from signal in part to encourage compilers - * to inline the case of no waiters. - * @param first (non-null) the first node on condition queue - */ - private void doSignal(Node first) { - do { - if ( (firstWaiter = first.nextWaiter) == null) + private void doSignal(ConditionNode first, boolean all) { + while (first != null) { + ConditionNode next = first.nextWaiter; + if ((firstWaiter = next) == null) lastWaiter = null; - first.nextWaiter = null; - } while (!transferForSignal(first) && - (first = firstWaiter) != null); - } - - /** - * Removes and transfers all nodes. - * @param first (non-null) the first node on condition queue - */ - private void doSignalAll(Node first) { - lastWaiter = firstWaiter = null; - do { - Node next = first.nextWaiter; - first.nextWaiter = null; - transferForSignal(first); - first = next; - } while (first != null); - } - - /** - * Unlinks cancelled waiter nodes from condition queue. - * Called only while holding lock. This is called when - * cancellation occurred during condition wait, and upon - * insertion of a new waiter when lastWaiter is seen to have - * been cancelled. This method is needed to avoid garbage - * retention in the absence of signals. So even though it may - * require a full traversal, it comes into play only when - * timeouts or cancellations occur in the absence of - * signals. It traverses all nodes rather than stopping at a - * particular target to unlink all pointers to garbage nodes - * without requiring many re-traversals during cancellation - * storms. - */ - private void unlinkCancelledWaiters() { - Node t = firstWaiter; - Node trail = null; - while (t != null) { - Node next = t.nextWaiter; - if (t.waitStatus != Node.CONDITION) { - t.nextWaiter = null; - if (trail == null) - firstWaiter = next; - else - trail.nextWaiter = next; - if (next == null) - lastWaiter = trail; + if ((first.getAndUnsetStatus(COND) & COND) != 0) { + enqueue(first); + if (!all) + break; } - else - trail = t; - t = next; + first = next; } } - // public methods - /** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the @@ -1977,11 +1467,11 @@ * returns {@code false} */ public final void signal() { + ConditionNode first = firstWaiter; if (!isHeldExclusively()) throw new IllegalMonitorStateException(); - Node first = firstWaiter; if (first != null) - doSignal(first); + doSignal(first, false); } /** @@ -1992,11 +1482,72 @@ * returns {@code false} */ public final void signalAll() { + ConditionNode first = firstWaiter; if (!isHeldExclusively()) throw new IllegalMonitorStateException(); - Node first = firstWaiter; if (first != null) - doSignalAll(first); + doSignal(first, true); + } + + // Waiting methods + + /** + * Adds node to condition list and releases lock. + * + * @param node the node + * @return savedState to reacquire after wait + */ + private int enableWait(ConditionNode node) { + if (isHeldExclusively()) { + node.waiter = Thread.currentThread(); + node.setStatusRelaxed(COND | WAITING); + ConditionNode last = lastWaiter; + if (last == null) + firstWaiter = node; + else + last.nextWaiter = node; + lastWaiter = node; + int savedState = getState(); + if (release(savedState)) + return savedState; + } + node.status = CANCELLED; // lock not held or inconsistent + throw new IllegalMonitorStateException(); + } + + /** + * Returns true if a node that was initially placed on a condition + * queue is now ready to reacquire on sync queue. + * @param node the node + * @return true if is reacquiring + */ + private boolean canReacquire(ConditionNode node) { + // check links, not status to avoid enqueue race + return node != null && node.prev != null && isEnqueued(node); + } + + /** + * Unlinks the given node and other non-waiting nodes from + * condition queue unless already unlinked. + */ + private void unlinkCancelledWaiters(ConditionNode node) { + if (node == null || node.nextWaiter != null || node == lastWaiter) { + ConditionNode w = firstWaiter, trail = null; + while (w != null) { + ConditionNode next = w.nextWaiter; + if ((w.status & COND) == 0) { + w.nextWaiter = null; + if (trail == null) + firstWaiter = next; + else + trail.nextWaiter = next; + if (next == null) + lastWaiter = trail; + } else + trail = w; + w = next; + } + } } /** @@ -2011,51 +1562,27 @@ * */ public final void awaitUninterruptibly() { - Node node = addConditionWaiter(); - int savedState = fullyRelease(node); + ConditionNode node = new ConditionNode(); + int savedState = enableWait(node); + LockSupport.setCurrentBlocker(this); // for back-compatibility boolean interrupted = false; - while (!isOnSyncQueue(node)) { - LockSupport.park(this); + while (!canReacquire(node)) { if (Thread.interrupted()) interrupted = true; + else if ((node.status & COND) != 0) { + try { + ForkJoinPool.managedBlock(node); + } catch (InterruptedException ie) { + interrupted = true; + } + } else + Thread.onSpinWait(); // awoke while enqueuing } - if (acquireQueued(node, savedState) || interrupted) - selfInterrupt(); - } - - /* - * For interruptible waits, we need to track whether to throw - * InterruptedException, if interrupted while blocked on - * condition, versus reinterrupt current thread, if - * interrupted while blocked waiting to re-acquire. - */ - - /** Mode meaning to reinterrupt on exit from wait */ - private static final int REINTERRUPT = 1; - /** Mode meaning to throw InterruptedException on exit from wait */ - private static final int THROW_IE = -1; - - /** - * Checks for interrupt, returning THROW_IE if interrupted - * before signalled, REINTERRUPT if after signalled, or - * 0 if not interrupted. - */ - private int checkInterruptWhileWaiting(Node node) { - return Thread.interrupted() ? - (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : - 0; - } - - /** - * Throws InterruptedException, reinterrupts current thread, or - * does nothing, depending on mode. - */ - private void reportInterruptAfterWait(int interruptMode) - throws InterruptedException { - if (interruptMode == THROW_IE) - throw new InterruptedException(); - else if (interruptMode == REINTERRUPT) - selfInterrupt(); + LockSupport.setCurrentBlocker(null); + node.clearStatus(); + acquire(node, savedState, false, false, false, 0L); + if (interrupted) + Thread.currentThread().interrupt(); } /** @@ -2074,20 +1601,33 @@ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); - Node node = addConditionWaiter(); - int savedState = fullyRelease(node); - int interruptMode = 0; - while (!isOnSyncQueue(node)) { - LockSupport.park(this); - if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) - break; + ConditionNode node = new ConditionNode(); + int savedState = enableWait(node); + LockSupport.setCurrentBlocker(this); // for back-compatibility + boolean interrupted = false, cancelled = false; + while (!canReacquire(node)) { + if (interrupted |= Thread.interrupted()) { + if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) + break; // else interrupted after signal + } else if ((node.status & COND) != 0) { + try { + ForkJoinPool.managedBlock(node); + } catch (InterruptedException ie) { + interrupted = true; + } + } else + Thread.onSpinWait(); // awoke while enqueuing + } + LockSupport.setCurrentBlocker(null); + node.clearStatus(); + acquire(node, savedState, false, false, false, 0L); + if (interrupted) { + if (cancelled) { + unlinkCancelledWaiters(node); + throw new InterruptedException(); + } + Thread.currentThread().interrupt(); } - if (acquireQueued(node, savedState) && interruptMode != THROW_IE) - interruptMode = REINTERRUPT; - if (node.nextWaiter != null) // clean up if cancelled - unlinkCancelledWaiters(); - if (interruptMode != 0) - reportInterruptAfterWait(interruptMode); } /** @@ -2107,32 +1647,29 @@ throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); - // We don't check for nanosTimeout <= 0L here, to allow - // awaitNanos(0) as a way to "yield the lock". - final long deadline = System.nanoTime() + nanosTimeout; - long initialNanos = nanosTimeout; - Node node = addConditionWaiter(); - int savedState = fullyRelease(node); - int interruptMode = 0; - while (!isOnSyncQueue(node)) { - if (nanosTimeout <= 0L) { - transferAfterCancelledWait(node); - break; - } - if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanosTimeout); - if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) - break; - nanosTimeout = deadline - System.nanoTime(); - } - if (acquireQueued(node, savedState) && interruptMode != THROW_IE) - interruptMode = REINTERRUPT; - if (node.nextWaiter != null) - unlinkCancelledWaiters(); - if (interruptMode != 0) - reportInterruptAfterWait(interruptMode); + ConditionNode node = new ConditionNode(); + int savedState = enableWait(node); + long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; + long deadline = System.nanoTime() + nanos; + boolean cancelled = false, interrupted = false; + while (!canReacquire(node)) { + if ((interrupted |= Thread.interrupted()) || + (nanos = deadline - System.nanoTime()) <= 0L) { + if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) + break; + } else + LockSupport.parkNanos(this, nanos); + } + node.clearStatus(); + acquire(node, savedState, false, false, false, 0L); + if (cancelled) { + unlinkCancelledWaiters(node); + if (interrupted) + throw new InterruptedException(); + } else if (interrupted) + Thread.currentThread().interrupt(); long remaining = deadline - System.nanoTime(); // avoid overflow - return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE; + return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE; } /** @@ -2154,26 +1691,26 @@ long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); - Node node = addConditionWaiter(); - int savedState = fullyRelease(node); - boolean timedout = false; - int interruptMode = 0; - while (!isOnSyncQueue(node)) { - if (System.currentTimeMillis() >= abstime) { - timedout = transferAfterCancelledWait(node); - break; - } - LockSupport.parkUntil(this, abstime); - if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) - break; - } - if (acquireQueued(node, savedState) && interruptMode != THROW_IE) - interruptMode = REINTERRUPT; - if (node.nextWaiter != null) - unlinkCancelledWaiters(); - if (interruptMode != 0) - reportInterruptAfterWait(interruptMode); - return !timedout; + ConditionNode node = new ConditionNode(); + int savedState = enableWait(node); + boolean cancelled = false, interrupted = false; + while (!canReacquire(node)) { + if ((interrupted |= Thread.interrupted()) || + System.currentTimeMillis() >= abstime) { + if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) + break; + } else + LockSupport.parkUntil(this, abstime); + } + node.clearStatus(); + acquire(node, savedState, false, false, false, 0L); + if (cancelled) { + unlinkCancelledWaiters(node); + if (interrupted) + throw new InterruptedException(); + } else if (interrupted) + Thread.currentThread().interrupt(); + return !cancelled; } /** @@ -2195,31 +1732,28 @@ long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); - // We don't check for nanosTimeout <= 0L here, to allow - // await(0, unit) as a way to "yield the lock". - final long deadline = System.nanoTime() + nanosTimeout; - Node node = addConditionWaiter(); - int savedState = fullyRelease(node); - boolean timedout = false; - int interruptMode = 0; - while (!isOnSyncQueue(node)) { - if (nanosTimeout <= 0L) { - timedout = transferAfterCancelledWait(node); - break; - } - if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanosTimeout); - if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) - break; - nanosTimeout = deadline - System.nanoTime(); - } - if (acquireQueued(node, savedState) && interruptMode != THROW_IE) - interruptMode = REINTERRUPT; - if (node.nextWaiter != null) - unlinkCancelledWaiters(); - if (interruptMode != 0) - reportInterruptAfterWait(interruptMode); - return !timedout; + ConditionNode node = new ConditionNode(); + int savedState = enableWait(node); + long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; + long deadline = System.nanoTime() + nanos; + boolean cancelled = false, interrupted = false; + while (!canReacquire(node)) { + if ((interrupted |= Thread.interrupted()) || + (nanos = deadline - System.nanoTime()) <= 0L) { + if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) + break; + } else + LockSupport.parkNanos(this, nanos); + } + node.clearStatus(); + acquire(node, savedState, false, false, false, 0L); + if (cancelled) { + unlinkCancelledWaiters(node); + if (interrupted) + throw new InterruptedException(); + } else if (interrupted) + Thread.currentThread().interrupt(); + return !cancelled; } // support for instrumentation @@ -2245,8 +1779,8 @@ protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); - for (Node w = firstWaiter; w != null; w = w.nextWaiter) { - if (w.waitStatus == Node.CONDITION) + for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { + if ((w.status & COND) != 0) return true; } return false; @@ -2265,8 +1799,8 @@ if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; - for (Node w = firstWaiter; w != null; w = w.nextWaiter) { - if (w.waitStatus == Node.CONDITION) + for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { + if ((w.status & COND) != 0) ++n; } return n; @@ -2285,9 +1819,9 @@ if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList list = new ArrayList<>(); - for (Node w = firstWaiter; w != null; w = w.nextWaiter) { - if (w.waitStatus == Node.CONDITION) { - Thread t = w.thread; + for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { + if ((w.status & COND) != 0) { + Thread t = w.waiter; if (t != null) list.add(t); } @@ -2296,39 +1830,16 @@ } } - // VarHandle mechanics - private static final VarHandle STATE; - private static final VarHandle HEAD; - private static final VarHandle TAIL; + // Unsafe + private static final Unsafe U = Unsafe.getUnsafe(); + private static final long STATE + = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "state"); + private static final long HEAD + = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "head"); + private static final long TAIL + = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "tail"); static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class); - HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class); - TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); - } - - // Reduce the risk of rare disastrous classloading in first call to - // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 Class ensureLoaded = LockSupport.class; } - - /** - * Initializes head and tail fields on first contention. - */ - private final void initializeSyncQueue() { - Node h; - if (HEAD.compareAndSet(this, null, (h = new Node()))) - tail = h; - } - - /** - * CASes tail field. - */ - private final boolean compareAndSetTail(Node expect, Node update) { - return TAIL.compareAndSet(this, expect, update); - } }