< prev index next >

src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java

Print this page
8229442: AQS and lock classes refresh
Reviewed-by: martin

*** 33,49 **** * http://creativecommons.org/publicdomain/zero/1.0/ */ package java.util.concurrent.locks; - import java.lang.invoke.MethodHandles; - import java.lang.invoke.VarHandle; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.concurrent.TimeUnit; ! import java.util.concurrent.locks.AbstractQueuedSynchronizer.Node; /** * A version of {@link AbstractQueuedSynchronizer} in * which synchronization state is maintained as a {@code long}. * This class has exactly the same structure, properties, and methods --- 33,48 ---- * http://creativecommons.org/publicdomain/zero/1.0/ */ package java.util.concurrent.locks; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.concurrent.TimeUnit; ! import java.util.concurrent.ForkJoinPool; ! import jdk.internal.misc.Unsafe; /** * A version of {@link AbstractQueuedSynchronizer} in * which synchronization state is maintained as a {@code long}. * This class has exactly the same structure, properties, and methods
*** 71,97 **** * exactly cloned from AbstractQueuedSynchronizer, replacing class * name and changing ints related with sync state to longs. Please * keep it that way. */ /** ! * Creates a new {@code AbstractQueuedLongSynchronizer} instance ! * with initial synchronization state of zero. */ ! protected AbstractQueuedLongSynchronizer() { } /** ! * Head of the wait queue, lazily initialized. Except for ! * initialization, it is modified only via method setHead. Note: ! * If head exists, its waitStatus is guaranteed not to be ! * CANCELLED. */ private transient volatile Node head; /** ! * Tail of the wait queue, lazily initialized. Modified only via ! * method enq to add new wait node. */ private transient volatile Node tail; /** * The synchronization state. --- 70,149 ---- * exactly cloned from AbstractQueuedSynchronizer, replacing class * name and changing ints related with sync state to longs. Please * keep it that way. */ + // Node status bits, also used as argument and return values + static final int WAITING = 1; // must be 1 + static final int CANCELLED = 0x80000000; // must be negative + static final int COND = 2; // in a condition wait + + /** CLH Nodes */ + abstract static class Node { + volatile Node prev; // initially attached via casTail + volatile Node next; // visibly nonnull when signallable + Thread waiter; // visibly nonnull when enqueued + volatile int status; // written by owner, atomic bit ops by others + + // methods for atomic operations + final boolean casPrev(Node c, Node v) { // for cleanQueue + return U.weakCompareAndSetReference(this, PREV, c, v); + } + final boolean casNext(Node c, Node v) { // for cleanQueue + return U.weakCompareAndSetReference(this, NEXT, c, v); + } + final int getAndUnsetStatus(int v) { // for signalling + return U.getAndBitwiseAndInt(this, STATUS, ~v); + } + final void setPrevRelaxed(Node p) { // for off-queue assignment + U.putReference(this, PREV, p); + } + final void setStatusRelaxed(int s) { // for off-queue assignment + U.putInt(this, STATUS, s); + } + final void clearStatus() { // for reducing unneeded signals + U.putIntOpaque(this, STATUS, 0); + } + + private static final long STATUS + = U.objectFieldOffset(Node.class, "status"); + private static final long NEXT + = U.objectFieldOffset(Node.class, "next"); + private static final long PREV + = U.objectFieldOffset(Node.class, "prev"); + } + + // Concrete classes tagged by type + static final class ExclusiveNode extends Node { } + static final class SharedNode extends Node { } + + static final class ConditionNode extends Node + implements ForkJoinPool.ManagedBlocker { + ConditionNode nextWaiter; // link to next waiting node + /** ! * Allows Conditions to be used in ForkJoinPools without ! * risking fixed pool exhaustion. This is usable only for ! * untimed Condition waits, not timed versions. */ ! public final boolean isReleasable() { ! return status <= 1 || Thread.currentThread().isInterrupted(); ! } ! ! public final boolean block() { ! while (!isReleasable()) LockSupport.park(this); ! return true; ! } ! } /** ! * Head of the wait queue, lazily initialized. */ private transient volatile Node head; /** ! * Tail of the wait queue. After initialization, modified only via casTail. */ private transient volatile Node tail; /** * The synchronization state.
*** 111,122 **** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */ protected final void setState(long newState) { ! // See JDK-8180620: Clarify VarHandle mixed-access subtleties ! STATE.setVolatile(this, newState); } /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. --- 163,173 ---- * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */ protected final void setState(long newState) { ! state = newState; } /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value.
*** 127,611 **** * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(long expect, long update) { ! return STATE.compareAndSet(this, expect, update); } // Queuing utilities ! /** ! * The number of nanoseconds for which it is faster to spin ! * rather than to use timed park. A rough estimate suffices ! * to improve responsiveness with very short timeouts. ! */ ! static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L; ! ! /** ! * Inserts node into queue, initializing if necessary. See picture above. ! * @param node the node to insert ! * @return node's predecessor ! */ ! private Node enq(Node node) { ! for (;;) { ! Node oldTail = tail; ! if (oldTail != null) { ! node.setPrevRelaxed(oldTail); ! if (compareAndSetTail(oldTail, node)) { ! oldTail.next = node; ! return oldTail; ! } ! } else { ! initializeSyncQueue(); ! } ! } ! } ! ! /** ! * Creates and enqueues node for current thread and given mode. ! * ! * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared ! * @return the new node ! */ ! private Node addWaiter(Node mode) { ! Node node = new Node(mode); ! ! for (;;) { ! Node oldTail = tail; ! if (oldTail != null) { ! node.setPrevRelaxed(oldTail); ! if (compareAndSetTail(oldTail, node)) { ! oldTail.next = node; ! return node; ! } ! } else { ! initializeSyncQueue(); ! } ! } ! } ! ! /** ! * Sets head of queue to be node, thus dequeuing. Called only by ! * acquire methods. Also nulls out unused fields for sake of GC ! * and to suppress unnecessary signals and traversals. ! * ! * @param node the node ! */ ! private void setHead(Node node) { ! head = node; ! node.thread = null; ! node.prev = null; } ! /** ! * Wakes up node's successor, if one exists. ! * ! * @param node the node ! */ ! private void unparkSuccessor(Node node) { ! /* ! * If status is negative (i.e., possibly needing signal) try ! * to clear in anticipation of signalling. It is OK if this ! * fails or if status is changed by waiting thread. ! */ ! int ws = node.waitStatus; ! if (ws < 0) ! node.compareAndSetWaitStatus(ws, 0); ! ! /* ! * Thread to unpark is held in successor, which is normally ! * just the next node. But if cancelled or apparently null, ! * traverse backwards from tail to find the actual ! * non-cancelled successor. ! */ ! Node s = node.next; ! if (s == null || s.waitStatus > 0) { ! s = null; ! for (Node p = tail; p != node && p != null; p = p.prev) ! if (p.waitStatus <= 0) ! s = p; ! } ! if (s != null) ! LockSupport.unpark(s.thread); } /** ! * Release action for shared mode -- signals successor and ensures ! * propagation. (Note: For exclusive mode, release just amounts ! * to calling unparkSuccessor of head if it needs signal.) ! */ ! private void doReleaseShared() { ! /* ! * Ensure that a release propagates, even if there are other ! * in-progress acquires/releases. This proceeds in the usual ! * way of trying to unparkSuccessor of head if it needs ! * signal. But if it does not, status is set to PROPAGATE to ! * ensure that upon release, propagation continues. ! * Additionally, we must loop in case a new node is added ! * while we are doing this. Also, unlike other uses of ! * unparkSuccessor, we need to know if CAS to reset status ! * fails, if so rechecking. */ for (;;) { ! Node h = head; ! if (h != null && h != tail) { ! int ws = h.waitStatus; ! if (ws == Node.SIGNAL) { ! if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) ! continue; // loop to recheck cases ! unparkSuccessor(h); ! } ! else if (ws == 0 && ! !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) ! continue; // loop on failed CAS ! } ! if (h == head) // loop if head changed break; } } - - /** - * Sets head of queue, and checks if successor may be waiting - * in shared mode, if so propagating if either propagate > 0 or - * PROPAGATE status was set. - * - * @param node the node - * @param propagate the return value from a tryAcquireShared - */ - private void setHeadAndPropagate(Node node, long propagate) { - Node h = head; // Record old head for check below - setHead(node); - /* - * Try to signal next queued node if: - * Propagation was indicated by caller, - * or was recorded (as h.waitStatus either before - * or after setHead) by a previous operation - * (note: this uses sign-check of waitStatus because - * PROPAGATE status may transition to SIGNAL.) - * and - * The next node is waiting in shared mode, - * or we don't know, because it appears null - * - * The conservatism in both of these checks may cause - * unnecessary wake-ups, but only when there are multiple - * racing acquires/releases, so most need signals now or soon - * anyway. - */ - if (propagate > 0 || h == null || h.waitStatus < 0 || - (h = head) == null || h.waitStatus < 0) { - Node s = node.next; - if (s == null || s.isShared()) - doReleaseShared(); } } ! // Utilities for various versions of acquire ! ! /** ! * Cancels an ongoing attempt to acquire. ! * ! * @param node the node ! */ ! private void cancelAcquire(Node node) { ! // Ignore if node doesn't exist ! if (node == null) ! return; ! ! node.thread = null; ! ! // Skip cancelled predecessors ! Node pred = node.prev; ! while (pred.waitStatus > 0) ! node.prev = pred = pred.prev; ! ! // predNext is the apparent node to unsplice. CASes below will ! // fail if not, in which case, we lost race vs another cancel ! // or signal, so no further action is necessary, although with ! // a possibility that a cancelled node may transiently remain ! // reachable. ! Node predNext = pred.next; ! ! // Can use unconditional write instead of CAS here. ! // After this atomic step, other Nodes can skip past us. ! // Before, we are free of interference from other threads. ! node.waitStatus = Node.CANCELLED; ! ! // If we are the tail, remove ourselves. ! if (node == tail && compareAndSetTail(node, pred)) { ! pred.compareAndSetNext(predNext, null); ! } else { ! // If successor needs signal, try to set pred's next-link ! // so it will get one. Otherwise wake it up to propagate. ! int ws; ! if (pred != head && ! ((ws = pred.waitStatus) == Node.SIGNAL || ! (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && ! pred.thread != null) { ! Node next = node.next; ! if (next != null && next.waitStatus <= 0) ! pred.compareAndSetNext(predNext, next); ! } else { ! unparkSuccessor(node); ! } ! ! node.next = node; // help GC ! } } /** ! * Checks and updates status for a node that failed to acquire. ! * Returns true if thread should block. This is the main signal ! * control in all acquire loops. Requires that pred == node.prev. ! * ! * @param pred node's predecessor holding status ! * @param node the node ! * @return {@code true} if thread should block */ ! private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { ! int ws = pred.waitStatus; ! if (ws == Node.SIGNAL) ! /* ! * This node has already set status asking a release ! * to signal it, so it can safely park. ! */ ! return true; ! if (ws > 0) { ! /* ! * Predecessor was cancelled. Skip over predecessors and ! * indicate retry. ! */ ! do { ! node.prev = pred = pred.prev; ! } while (pred.waitStatus > 0); ! pred.next = node; ! } else { ! /* ! * waitStatus must be 0 or PROPAGATE. Indicate that we ! * need a signal, but don't park yet. Caller will need to ! * retry to make sure it cannot acquire before parking. ! */ ! pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } - return false; } ! /** ! * Convenience method to interrupt current thread. ! */ ! static void selfInterrupt() { ! Thread.currentThread().interrupt(); } /** ! * Convenience method to park and then check if interrupted. * ! * @return {@code true} if interrupted ! */ ! private final boolean parkAndCheckInterrupt() { ! LockSupport.park(this); ! return Thread.interrupted(); ! } /* ! * Various flavors of acquire, varying in exclusive/shared and ! * control modes. Each is mostly the same, but annoyingly ! * different. Only a little bit of factoring is possible due to ! * interactions of exception mechanics (including ensuring that we ! * cancel if tryAcquire throws exception) and other control, at ! * least not without hurting performance too much. */ - /** - * Acquires in exclusive uninterruptible mode for thread already in - * queue. Used by condition wait methods as well as acquire. - * - * @param node the node - * @param arg the acquire argument - * @return {@code true} if interrupted while waiting - */ - final boolean acquireQueued(final Node node, long arg) { - boolean interrupted = false; - try { for (;;) { ! final Node p = node.predecessor(); ! if (p == head && tryAcquire(arg)) { ! setHead(node); ! p.next = null; // help GC ! return interrupted; ! } ! if (shouldParkAfterFailedAcquire(p, node)) ! interrupted |= parkAndCheckInterrupt(); ! } ! } catch (Throwable t) { ! cancelAcquire(node); ! if (interrupted) ! selfInterrupt(); ! throw t; } } ! ! /** ! * Acquires in exclusive interruptible mode. ! * @param arg the acquire argument ! */ ! private void doAcquireInterruptibly(long arg) ! throws InterruptedException { ! final Node node = addWaiter(Node.EXCLUSIVE); try { ! for (;;) { ! final Node p = node.predecessor(); ! if (p == head && tryAcquire(arg)) { ! setHead(node); ! p.next = null; // help GC ! return; ! } ! if (shouldParkAfterFailedAcquire(p, node) && ! parkAndCheckInterrupt()) ! throw new InterruptedException(); ! } ! } catch (Throwable t) { ! cancelAcquire(node); ! throw t; } } ! ! /** ! * Acquires in exclusive timed mode. ! * ! * @param arg the acquire argument ! * @param nanosTimeout max wait time ! * @return {@code true} if acquired ! */ ! private boolean doAcquireNanos(long arg, long nanosTimeout) ! throws InterruptedException { ! if (nanosTimeout <= 0L) ! return false; ! final long deadline = System.nanoTime() + nanosTimeout; ! final Node node = addWaiter(Node.EXCLUSIVE); ! try { ! for (;;) { ! final Node p = node.predecessor(); ! if (p == head && tryAcquire(arg)) { ! setHead(node); ! p.next = null; // help GC ! return true; } - nanosTimeout = deadline - System.nanoTime(); - if (nanosTimeout <= 0L) { - cancelAcquire(node); - return false; } ! if (shouldParkAfterFailedAcquire(p, node) && ! nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) ! LockSupport.parkNanos(this, nanosTimeout); ! if (Thread.interrupted()) ! throw new InterruptedException(); } - } catch (Throwable t) { - cancelAcquire(node); - throw t; } } /** ! * Acquires in shared uninterruptible mode. ! * @param arg the acquire argument */ ! private void doAcquireShared(long arg) { ! final Node node = addWaiter(Node.SHARED); ! boolean interrupted = false; ! try { ! for (;;) { ! final Node p = node.predecessor(); ! if (p == head) { ! long r = tryAcquireShared(arg); ! if (r >= 0) { ! setHeadAndPropagate(node, r); ! p.next = null; // help GC ! return; ! } ! } ! if (shouldParkAfterFailedAcquire(p, node)) ! interrupted |= parkAndCheckInterrupt(); ! } ! } catch (Throwable t) { ! cancelAcquire(node); ! throw t; ! } finally { ! if (interrupted) ! selfInterrupt(); } } ! ! /** ! * Acquires in shared interruptible mode. ! * @param arg the acquire argument ! */ ! private void doAcquireSharedInterruptibly(long arg) ! throws InterruptedException { ! final Node node = addWaiter(Node.SHARED); ! try { ! for (;;) { ! final Node p = node.predecessor(); ! if (p == head) { ! long r = tryAcquireShared(arg); ! if (r >= 0) { ! setHeadAndPropagate(node, r); ! p.next = null; // help GC ! return; } } ! if (shouldParkAfterFailedAcquire(p, node) && ! parkAndCheckInterrupt()) ! throw new InterruptedException(); } - } catch (Throwable t) { - cancelAcquire(node); - throw t; } } /** ! * Acquires in shared timed mode. * ! * @param arg the acquire argument ! * @param nanosTimeout max wait time ! * @return {@code true} if acquired ! */ ! private boolean doAcquireSharedNanos(long arg, long nanosTimeout) ! throws InterruptedException { ! if (nanosTimeout <= 0L) ! return false; ! final long deadline = System.nanoTime() + nanosTimeout; ! final Node node = addWaiter(Node.SHARED); ! try { ! for (;;) { ! final Node p = node.predecessor(); ! if (p == head) { ! long r = tryAcquireShared(arg); ! if (r >= 0) { ! setHeadAndPropagate(node, r); ! p.next = null; // help GC ! return true; ! } ! } ! nanosTimeout = deadline - System.nanoTime(); ! if (nanosTimeout <= 0L) { ! cancelAcquire(node); ! return false; ! } ! if (shouldParkAfterFailedAcquire(p, node) && ! nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) ! LockSupport.parkNanos(this, nanosTimeout); ! if (Thread.interrupted()) ! throw new InterruptedException(); ! } ! } catch (Throwable t) { ! cancelAcquire(node); ! throw t; } } // Main exported methods /** --- 178,415 ---- * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(long expect, long update) { ! return U.compareAndSetLong(this, STATE, expect, update); } // Queuing utilities ! private boolean casTail(Node c, Node v) { ! return U.compareAndSetReference(this, TAIL, c, v); } ! /** tries once to CAS a new dummy node for head */ ! private void tryInitializeHead() { ! Node h = new ExclusiveNode(); ! if (U.compareAndSetReference(this, HEAD, null, h)) ! tail = h; } /** ! * Enqueues the node unless null. (Currently used only for ! * ConditionNodes; other cases are interleaved with acquires.) */ + final void enqueue(Node node) { + if (node != null) { for (;;) { ! Node t = tail; ! node.setPrevRelaxed(t); // avoid unnecessary fence ! if (t == null) // initialize ! tryInitializeHead(); ! else if (casTail(t, node)) { ! t.next = node; ! if (t.status < 0) // wake up to clean link ! LockSupport.unpark(node.waiter); break; } } } } ! /** Returns true if node is found in traversal from tail */ ! final boolean isEnqueued(Node node) { ! for (Node t = tail; t != null; t = t.prev) ! if (t == node) ! return true; ! return false; } /** ! * Wakes up the successor of given node, if one exists, and unsets its ! * WAITING status to avoid park race. This may fail to wake up an ! * eligible thread when one or more have been cancelled, but ! * cancelAcquire ensures liveness. */ ! private static void signalNext(Node h) { ! Node s; ! if (h != null && (s = h.next) != null && s.status != 0) { ! s.getAndUnsetStatus(WAITING); ! LockSupport.unpark(s.waiter); } } ! /** Wakes up the given node if in shared mode */ ! private static void signalNextIfShared(Node h) { ! Node s; ! if (h != null && (s = h.next) != null && ! (s instanceof SharedNode) && s.status != 0) { ! s.getAndUnsetStatus(WAITING); ! LockSupport.unpark(s.waiter); ! } } /** ! * Main acquire method, invoked by all exported acquire methods. * ! * @param node null unless a reacquiring Condition ! * @param arg the acquire argument ! * @param shared true if shared mode else exclusive ! * @param interruptible if abort and return negative on interrupt ! * @param timed if true use timed waits ! * @param time if timed, the System.nanoTime value to timeout ! * @return positive if acquired, 0 if timed out, negative if interrupted ! */ ! final int acquire(Node node, long arg, boolean shared, ! boolean interruptible, boolean timed, long time) { ! Thread current = Thread.currentThread(); ! byte spins = 0, postSpins = 0; // retries upon unpark of first thread ! boolean interrupted = false, first = false; ! Node pred = null; // predecessor of node when enqueued /* ! * Repeatedly: ! * Check if node now first ! * if so, ensure head stable, else ensure valid predecessor ! * if node is first or not yet enqueued, try acquiring ! * else if node not yet created, create it ! * else if not yet enqueued, try once to enqueue ! * else if woken from park, retry (up to postSpins times) ! * else if WAITING status not set, set and retry ! * else park and clear WAITING status, and check cancellation */ for (;;) { ! if (!first && (pred = (node == null) ? null : node.prev) != null && ! !(first = (head == pred))) { ! if (pred.status < 0) { ! cleanQueue(); // predecessor cancelled ! continue; ! } else if (pred.prev == null) { ! Thread.onSpinWait(); // ensure serialization ! continue; } } ! if (first || pred == null) { ! boolean acquired; try { ! if (shared) ! acquired = (tryAcquireShared(arg) >= 0); ! else ! acquired = tryAcquire(arg); ! } catch (Throwable ex) { ! cancelAcquire(node, interrupted, false); ! throw ex; } + if (acquired) { + if (first) { + node.prev = null; + head = node; + pred.next = null; + node.waiter = null; + if (shared) + signalNextIfShared(node); + if (interrupted) + current.interrupt(); } ! return 1; } } ! if (node == null) { // allocate; retry before enqueue ! if (shared) ! node = new SharedNode(); ! else ! node = new ExclusiveNode(); ! } else if (pred == null) { // try to enqueue ! node.waiter = current; ! Node t = tail; ! node.setPrevRelaxed(t); // avoid unnecessary fence ! if (t == null) ! tryInitializeHead(); ! else if (!casTail(t, node)) ! node.setPrevRelaxed(null); // back out ! else ! t.next = node; ! } else if (first && spins != 0) { ! --spins; // reduce unfairness on rewaits ! Thread.onSpinWait(); ! } else if (node.status == 0) { ! node.status = WAITING; // enable signal and recheck ! } else { ! long nanos; ! spins = postSpins = (byte)((postSpins << 1) | 1); ! if (!timed) ! LockSupport.park(this); ! else if ((nanos = time - System.nanoTime()) > 0L) ! LockSupport.parkNanos(this, nanos); ! else ! break; ! node.clearStatus(); ! if ((interrupted |= Thread.interrupted()) && interruptible) ! break; } } + return cancelAcquire(node, interrupted, interruptible); } /** ! * Possibly repeatedly traverses from tail, unsplicing cancelled ! * nodes until none are found. */ ! private void cleanQueue() { ! for (;;) { // restart point ! for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples ! if (q == null || (p = q.prev) == null) ! return; // end of list ! if (s == null ? tail != q : (s.prev != q || s.status < 0)) ! break; // inconsistent ! if (q.status < 0) { // cancelled ! if ((s == null ? casTail(q, p) : s.casPrev(q, p)) && ! q.prev == p) { ! p.casNext(q, s); // OK if fails ! if (p.prev == null) ! signalNext(p); } + break; } ! if ((n = p.next) != q) { // help finish ! if (n != null && q.prev == p) { ! p.casNext(n, q); ! if (p.prev == null) ! signalNext(p); } + break; } ! s = q; ! q = q.prev; } } } /** ! * Cancels an ongoing attempt to acquire. * ! * @param node the node (may be null if cancelled before enqueuing) ! * @param interrupted true if thread interrupted ! * @param interruptible if should report interruption vs reset ! */ ! private int cancelAcquire(Node node, boolean interrupted, ! boolean interruptible) { ! if (node != null) { ! node.waiter = null; ! node.status = CANCELLED; ! if (node.prev != null) ! cleanQueue(); ! } ! if (interrupted) { ! if (interruptible) ! return CANCELLED; ! else ! Thread.currentThread().interrupt(); } + return 0; } // Main exported methods /**
*** 754,766 **** * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(long arg) { ! if (!tryAcquire(arg) && ! acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) ! selfInterrupt(); } /** * Acquires in exclusive mode, aborting if interrupted. * Implemented by first checking interrupt status, then invoking --- 558,569 ---- * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(long arg) { ! if (!tryAcquire(arg)) ! acquire(null, arg, false, false, false, 0L); } /** * Acquires in exclusive mode, aborting if interrupted. * Implemented by first checking interrupt status, then invoking
*** 775,788 **** * can represent anything you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireInterruptibly(long arg) throws InterruptedException { ! if (Thread.interrupted()) throw new InterruptedException(); - if (!tryAcquire(arg)) - doAcquireInterruptibly(arg); } /** * Attempts to acquire in exclusive mode, aborting if interrupted, * and failing if the given timeout elapses. Implemented by first --- 578,590 ---- * can represent anything you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireInterruptibly(long arg) throws InterruptedException { ! if (Thread.interrupted() || ! (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0)) throw new InterruptedException(); } /** * Attempts to acquire in exclusive mode, aborting if interrupted, * and failing if the given timeout elapses. Implemented by first
*** 800,813 **** * @return {@code true} if acquired; {@code false} if timed out * @throws InterruptedException if the current thread is interrupted */ public final boolean tryAcquireNanos(long arg, long nanosTimeout) throws InterruptedException { ! if (Thread.interrupted()) throw new InterruptedException(); - return tryAcquire(arg) || - doAcquireNanos(arg, nanosTimeout); } /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. --- 602,624 ---- * @return {@code true} if acquired; {@code false} if timed out * @throws InterruptedException if the current thread is interrupted */ public final boolean tryAcquireNanos(long arg, long nanosTimeout) throws InterruptedException { ! if (!Thread.interrupted()) { ! if (tryAcquire(arg)) ! return true; ! if (nanosTimeout <= 0L) ! return false; ! int stat = acquire(null, arg, false, true, true, ! System.nanoTime() + nanosTimeout); ! if (stat > 0) ! return true; ! if (stat == 0) ! return false; ! } throw new InterruptedException(); } /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true.
*** 818,830 **** * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(long arg) { if (tryRelease(arg)) { ! Node h = head; ! if (h != null && h.waitStatus != 0) ! unparkSuccessor(h); return true; } return false; } --- 629,639 ---- * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(long arg) { if (tryRelease(arg)) { ! signalNext(head); return true; } return false; }
*** 839,849 **** * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ public final void acquireShared(long arg) { if (tryAcquireShared(arg) < 0) ! doAcquireShared(arg); } /** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once --- 648,658 ---- * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ public final void acquireShared(long arg) { if (tryAcquireShared(arg) < 0) ! acquire(null, arg, true, false, false, 0L); } /** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once
*** 857,870 **** * you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(long arg) throws InterruptedException { ! if (Thread.interrupted()) throw new InterruptedException(); - if (tryAcquireShared(arg) < 0) - doAcquireSharedInterruptibly(arg); } /** * Attempts to acquire in shared mode, aborting if interrupted, and * failing if the given timeout elapses. Implemented by first --- 666,679 ---- * you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(long arg) throws InterruptedException { ! if (Thread.interrupted() || ! (tryAcquireShared(arg) < 0 && ! acquire(null, arg, true, true, false, 0L) < 0)) throw new InterruptedException(); } /** * Attempts to acquire in shared mode, aborting if interrupted, and * failing if the given timeout elapses. Implemented by first
*** 881,894 **** * @return {@code true} if acquired; {@code false} if timed out * @throws InterruptedException if the current thread is interrupted */ public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) throws InterruptedException { ! if (Thread.interrupted()) throw new InterruptedException(); - return tryAcquireShared(arg) >= 0 || - doAcquireSharedNanos(arg, nanosTimeout); } /** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. --- 690,712 ---- * @return {@code true} if acquired; {@code false} if timed out * @throws InterruptedException if the current thread is interrupted */ public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) throws InterruptedException { ! if (!Thread.interrupted()) { ! if (tryAcquireShared(arg) >= 0) ! return true; ! if (nanosTimeout <= 0L) ! return false; ! int stat = acquire(null, arg, true, true, true, ! System.nanoTime() + nanosTimeout); ! if (stat > 0) ! return true; ! if (stat == 0) ! return false; ! } throw new InterruptedException(); } /** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true.
*** 898,908 **** * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(long arg) { if (tryReleaseShared(arg)) { ! doReleaseShared(); return true; } return false; } --- 716,726 ---- * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(long arg) { if (tryReleaseShared(arg)) { ! signalNext(head); return true; } return false; }
*** 916,926 **** * * @return {@code true} if there may be other threads waiting to acquire */ public final boolean hasQueuedThreads() { for (Node p = tail, h = head; p != h && p != null; p = p.prev) ! if (p.waitStatus <= 0) return true; return false; } /** --- 734,744 ---- * * @return {@code true} if there may be other threads waiting to acquire */ public final boolean hasQueuedThreads() { for (Node p = tail, h = head; p != h && p != null; p = p.prev) ! if (p.status >= 0) return true; return false; } /**
*** 946,994 **** * * @return the first (longest-waiting) thread in the queue, or * {@code null} if no threads are currently queued */ public final Thread getFirstQueuedThread() { ! // handle only fast path, else relay ! return (head == tail) ? null : fullGetFirstQueuedThread(); ! } ! ! /** ! * Version of getFirstQueuedThread called when fastpath fails. ! */ ! private Thread fullGetFirstQueuedThread() { ! /* ! * The first node is normally head.next. Try to get its ! * thread field, ensuring consistent reads: If thread ! * field is nulled out or s.prev is no longer head, then ! * some other thread(s) concurrently performed setHead in ! * between some of our reads. We try this twice before ! * resorting to traversal. ! */ ! Node h, s; ! Thread st; ! if (((h = head) != null && (s = h.next) != null && ! s.prev == head && (st = s.thread) != null) || ! ((h = head) != null && (s = h.next) != null && ! s.prev == head && (st = s.thread) != null)) ! return st; ! ! /* ! * Head's next field might not have been set yet, or may have ! * been unset after setHead. So we must check to see if tail ! * is actually first node. If not, we continue on, safely ! * traversing from tail back to head to find first, ! * guaranteeing termination. ! */ ! ! Thread firstThread = null; ! for (Node p = tail; p != null && p != head; p = p.prev) { ! Thread t = p.thread; ! if (t != null) ! firstThread = t; } ! return firstThread; } /** * Returns true if the given thread is currently queued. * --- 764,783 ---- * * @return the first (longest-waiting) thread in the queue, or * {@code null} if no threads are currently queued */ public final Thread getFirstQueuedThread() { ! Thread first = null, w; Node h, s; ! if ((h = head) != null && ((s = h.next) == null || ! (first = s.waiter) == null || ! s.prev == null)) { ! // traverse from tail on stale reads ! for (Node p = tail, q; p != null && (q = p.prev) != null; p = q) ! if ((w = p.waiter) != null) ! first = w; } ! return first; } /** * Returns true if the given thread is currently queued. *
*** 1001,1011 **** */ public final boolean isQueued(Thread thread) { if (thread == null) throw new NullPointerException(); for (Node p = tail; p != null; p = p.prev) ! if (p.thread == thread) return true; return false; } /** --- 790,800 ---- */ public final boolean isQueued(Thread thread) { if (thread == null) throw new NullPointerException(); for (Node p = tail; p != null; p = p.prev) ! if (p.waiter == thread) return true; return false; } /**
*** 1017,1030 **** * is not the first queued thread. Used only as a heuristic in * ReentrantReadWriteLock. */ final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; ! return (h = head) != null && ! (s = h.next) != null && ! !s.isShared() && ! s.thread != null; } /** * Queries whether any threads have been waiting to acquire longer * than the current thread. --- 806,817 ---- * is not the first queued thread. Used only as a heuristic in * ReentrantReadWriteLock. */ final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; ! return (h = head) != null && (s = h.next) != null && ! !(s instanceof SharedNode) && s.waiter != null; } /** * Queries whether any threads have been waiting to acquire longer * than the current thread.
*** 1050,1060 **** * (unless this is a reentrant acquire). For example, the {@code * tryAcquire} method for a fair, reentrant, exclusive mode * synchronizer might look like this: * * <pre> {@code ! * protected boolean tryAcquire(int arg) { * if (isHeldExclusively()) { * // A reentrant acquire; increment hold count * return true; * } else if (hasQueuedPredecessors()) { * return false; --- 837,847 ---- * (unless this is a reentrant acquire). For example, the {@code * tryAcquire} method for a fair, reentrant, exclusive mode * synchronizer might look like this: * * <pre> {@code ! * protected boolean tryAcquire(long arg) { * if (isHeldExclusively()) { * // A reentrant acquire; increment hold count * return true; * } else if (hasQueuedPredecessors()) { * return false;
*** 1067,1089 **** * current thread, and {@code false} if the current thread * is at the head of the queue or the queue is empty * @since 1.7 */ public final boolean hasQueuedPredecessors() { ! Node h, s; ! if ((h = head) != null) { ! if ((s = h.next) == null || s.waitStatus > 0) { ! s = null; // traverse in case of concurrent cancellation ! for (Node p = tail; p != h && p != null; p = p.prev) { ! if (p.waitStatus <= 0) ! s = p; ! } ! } ! if (s != null && s.thread != Thread.currentThread()) ! return true; ! } ! return false; } // Instrumentation and monitoring methods /** --- 854,869 ---- * current thread, and {@code false} if the current thread * is at the head of the queue or the queue is empty * @since 1.7 */ public final boolean hasQueuedPredecessors() { ! Thread first = null; Node h, s; ! if ((h = head) != null && ((s = h.next) == null || ! (first = s.waiter) == null || ! s.prev == null)) ! first = getFirstQueuedThread(); // retry via getFirstQueuedThread ! return first != null && first != Thread.currentThread(); } // Instrumentation and monitoring methods /**
*** 1096,1106 **** * @return the estimated number of threads waiting to acquire */ public final int getQueueLength() { int n = 0; for (Node p = tail; p != null; p = p.prev) { ! if (p.thread != null) ++n; } return n; } --- 876,886 ---- * @return the estimated number of threads waiting to acquire */ public final int getQueueLength() { int n = 0; for (Node p = tail; p != null; p = p.prev) { ! if (p.waiter != null) ++n; } return n; }
*** 1116,1126 **** * @return the collection of threads */ public final Collection<Thread> getQueuedThreads() { ArrayList<Thread> list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { ! Thread t = p.thread; if (t != null) list.add(t); } return list; } --- 896,906 ---- * @return the collection of threads */ public final Collection<Thread> getQueuedThreads() { ArrayList<Thread> list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { ! Thread t = p.waiter; if (t != null) list.add(t); } return list; }
*** 1134,1145 **** * @return the collection of threads */ public final Collection<Thread> getExclusiveQueuedThreads() { ArrayList<Thread> list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { ! if (!p.isShared()) { ! Thread t = p.thread; if (t != null) list.add(t); } } return list; --- 914,925 ---- * @return the collection of threads */ public final Collection<Thread> getExclusiveQueuedThreads() { ArrayList<Thread> list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { ! if (!(p instanceof SharedNode)) { ! Thread t = p.waiter; if (t != null) list.add(t); } } return list;
*** 1154,1165 **** * @return the collection of threads */ public final Collection<Thread> getSharedQueuedThreads() { ArrayList<Thread> list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { ! if (p.isShared()) { ! Thread t = p.thread; if (t != null) list.add(t); } } return list; --- 934,945 ---- * @return the collection of threads */ public final Collection<Thread> getSharedQueuedThreads() { ArrayList<Thread> list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { ! if (p instanceof SharedNode) { ! Thread t = p.waiter; if (t != null) list.add(t); } } return list;
*** 1178,1298 **** return super.toString() + "[State = " + getState() + ", " + (hasQueuedThreads() ? "non" : "") + "empty queue]"; } - - // Internal support methods for Conditions - - /** - * Returns true if a node, always one that was initially placed on - * a condition queue, is now waiting to reacquire on sync queue. - * @param node the node - * @return true if is reacquiring - */ - final boolean isOnSyncQueue(Node node) { - if (node.waitStatus == Node.CONDITION || node.prev == null) - return false; - if (node.next != null) // If has successor, it must be on queue - return true; - /* - * node.prev can be non-null, but not yet on queue because - * the CAS to place it on queue can fail. So we have to - * traverse from tail to make sure it actually made it. It - * will always be near the tail in calls to this method, and - * unless the CAS failed (which is unlikely), it will be - * there, so we hardly ever traverse much. - */ - return findNodeFromTail(node); - } - - /** - * Returns true if node is on sync queue by searching backwards from tail. - * Called only when needed by isOnSyncQueue. - * @return true if present - */ - private boolean findNodeFromTail(Node node) { - // We check for node first, since it's likely to be at or near tail. - // tail is known to be non-null, so we could re-order to "save" - // one null check, but we leave it this way to help the VM. - for (Node p = tail;;) { - if (p == node) - return true; - if (p == null) - return false; - p = p.prev; - } - } - - /** - * Transfers a node from a condition queue onto sync queue. - * Returns true if successful. - * @param node the node - * @return true if successfully transferred (else the node was - * cancelled before signal) - */ - final boolean transferForSignal(Node node) { - /* - * If cannot change waitStatus, the node has been cancelled. - */ - if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) - return false; - - /* - * Splice onto queue and try to set waitStatus of predecessor to - * indicate that thread is (probably) waiting. If cancelled or - * attempt to set waitStatus fails, wake up to resync (in which - * case the waitStatus can be transiently and harmlessly wrong). - */ - Node p = enq(node); - int ws = p.waitStatus; - if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) - LockSupport.unpark(node.thread); - return true; - } - - /** - * Transfers node, if necessary, to sync queue after a cancelled wait. - * Returns true if thread was cancelled before being signalled. - * - * @param node the node - * @return true if cancelled before the node was signalled - */ - final boolean transferAfterCancelledWait(Node node) { - if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) { - enq(node); - return true; - } - /* - * If we lost out to a signal(), then we can't proceed - * until it finishes its enq(). Cancelling during an - * incomplete transfer is both rare and transient, so just - * spin. - */ - while (!isOnSyncQueue(node)) - Thread.yield(); - return false; - } - - /** - * Invokes release with current state value; returns saved state. - * Cancels node and throws exception on failure. - * @param node the condition node for this wait - * @return previous sync state - */ - final long fullyRelease(Node node) { - try { - long savedState = getState(); - if (release(savedState)) - return savedState; - throw new IllegalMonitorStateException(); - } catch (Throwable t) { - node.waitStatus = Node.CANCELLED; - throw t; - } - } - // Instrumentation methods for conditions /** * Queries whether the given ConditionObject * uses this synchronizer as its lock. --- 958,967 ----
*** 1382,1526 **** * condition semantics that rely on those of the associated * {@code AbstractQueuedLongSynchronizer}. * * <p>This class is Serializable, but all fields are transient, * so deserialized conditions have no waiters. - * - * @since 1.6 */ public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ ! private transient Node firstWaiter; /** Last node of condition queue. */ ! private transient Node lastWaiter; /** * Creates a new {@code ConditionObject} instance. */ public ConditionObject() { } ! // Internal methods /** ! * Adds a new waiter to wait queue. ! * @return its new wait node */ ! private Node addConditionWaiter() { ! if (!isHeldExclusively()) ! throw new IllegalMonitorStateException(); ! Node t = lastWaiter; ! // If lastWaiter is cancelled, clean out. ! if (t != null && t.waitStatus != Node.CONDITION) { ! unlinkCancelledWaiters(); ! t = lastWaiter; ! } ! ! Node node = new Node(Node.CONDITION); ! ! if (t == null) ! firstWaiter = node; ! else ! t.nextWaiter = node; ! lastWaiter = node; ! return node; ! } ! ! /** ! * Removes and transfers nodes until hit non-cancelled one or ! * null. Split out from signal in part to encourage compilers ! * to inline the case of no waiters. ! * @param first (non-null) the first node on condition queue ! */ ! private void doSignal(Node first) { ! do { ! if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; ! first.nextWaiter = null; ! } while (!transferForSignal(first) && ! (first = firstWaiter) != null); } - - /** - * Removes and transfers all nodes. - * @param first (non-null) the first node on condition queue - */ - private void doSignalAll(Node first) { - lastWaiter = firstWaiter = null; - do { - Node next = first.nextWaiter; - first.nextWaiter = null; - transferForSignal(first); first = next; - } while (first != null); } - - /** - * Unlinks cancelled waiter nodes from condition queue. - * Called only while holding lock. This is called when - * cancellation occurred during condition wait, and upon - * insertion of a new waiter when lastWaiter is seen to have - * been cancelled. This method is needed to avoid garbage - * retention in the absence of signals. So even though it may - * require a full traversal, it comes into play only when - * timeouts or cancellations occur in the absence of - * signals. It traverses all nodes rather than stopping at a - * particular target to unlink all pointers to garbage nodes - * without requiring many re-traversals during cancellation - * storms. - */ - private void unlinkCancelledWaiters() { - Node t = firstWaiter; - Node trail = null; - while (t != null) { - Node next = t.nextWaiter; - if (t.waitStatus != Node.CONDITION) { - t.nextWaiter = null; - if (trail == null) - firstWaiter = next; - else - trail.nextWaiter = next; - if (next == null) - lastWaiter = trail; - } - else - trail = t; - t = next; } - } - - // public methods /** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); - Node first = firstWaiter; if (first != null) ! doSignal(first); } /** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); - Node first = firstWaiter; if (first != null) ! doSignalAll(first); } /** * Implements uninterruptible condition wait. * <ol> --- 1051,1182 ---- * condition semantics that rely on those of the associated * {@code AbstractQueuedLongSynchronizer}. * * <p>This class is Serializable, but all fields are transient, * so deserialized conditions have no waiters. */ public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ ! private transient ConditionNode firstWaiter; /** Last node of condition queue. */ ! private transient ConditionNode lastWaiter; /** * Creates a new {@code ConditionObject} instance. */ public ConditionObject() { } ! // Signalling methods /** ! * Removes and transfers one or all waiters to sync queue. */ ! private void doSignal(ConditionNode first, boolean all) { ! while (first != null) { ! ConditionNode next = first.nextWaiter; ! if ((firstWaiter = next) == null) lastWaiter = null; ! if ((first.getAndUnsetStatus(COND) & COND) != 0) { ! enqueue(first); ! if (!all) ! break; } first = next; } } /** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { + ConditionNode first = firstWaiter; if (!isHeldExclusively()) throw new IllegalMonitorStateException(); if (first != null) ! doSignal(first, false); } /** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signalAll() { + ConditionNode first = firstWaiter; if (!isHeldExclusively()) throw new IllegalMonitorStateException(); if (first != null) ! doSignal(first, true); ! } ! ! // Waiting methods ! ! /** ! * Adds node to condition list and releases lock. ! * ! * @param node the node ! * @return savedState to reacquire after wait ! */ ! private long enableWait(ConditionNode node) { ! if (isHeldExclusively()) { ! node.waiter = Thread.currentThread(); ! node.setStatusRelaxed(COND | WAITING); ! ConditionNode last = lastWaiter; ! if (last == null) ! firstWaiter = node; ! else ! last.nextWaiter = node; ! lastWaiter = node; ! long savedState = getState(); ! if (release(savedState)) ! return savedState; ! } ! node.status = CANCELLED; // lock not held or inconsistent ! throw new IllegalMonitorStateException(); ! } ! ! /** ! * Returns true if a node that was initially placed on a condition ! * queue is now ready to reacquire on sync queue. ! * @param node the node ! * @return true if is reacquiring ! */ ! private boolean canReacquire(ConditionNode node) { ! // check links, not status to avoid enqueue race ! return node != null && node.prev != null && isEnqueued(node); ! } ! ! /** ! * Unlinks the given node and other non-waiting nodes from ! * condition queue unless already unlinked. ! */ ! private void unlinkCancelledWaiters(ConditionNode node) { ! if (node == null || node.nextWaiter != null || node == lastWaiter) { ! ConditionNode w = firstWaiter, trail = null; ! while (w != null) { ! ConditionNode next = w.nextWaiter; ! if ((w.status & COND) == 0) { ! w.nextWaiter = null; ! if (trail == null) ! firstWaiter = next; ! else ! trail.nextWaiter = next; ! if (next == null) ! lastWaiter = trail; ! } else ! trail = w; ! w = next; ! } ! } } /** * Implements uninterruptible condition wait. * <ol>
*** 1531,1585 **** * <li>Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * </ol> */ public final void awaitUninterruptibly() { ! Node node = addConditionWaiter(); ! long savedState = fullyRelease(node); boolean interrupted = false; ! while (!isOnSyncQueue(node)) { ! LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } ! if (acquireQueued(node, savedState) || interrupted) ! selfInterrupt(); ! } ! ! /* ! * For interruptible waits, we need to track whether to throw ! * InterruptedException, if interrupted while blocked on ! * condition, versus reinterrupt current thread, if ! * interrupted while blocked waiting to re-acquire. ! */ ! ! /** Mode meaning to reinterrupt on exit from wait */ ! private static final int REINTERRUPT = 1; ! /** Mode meaning to throw InterruptedException on exit from wait */ ! private static final int THROW_IE = -1; ! ! /** ! * Checks for interrupt, returning THROW_IE if interrupted ! * before signalled, REINTERRUPT if after signalled, or ! * 0 if not interrupted. ! */ ! private int checkInterruptWhileWaiting(Node node) { ! return Thread.interrupted() ? ! (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : ! 0; } ! ! /** ! * Throws InterruptedException, reinterrupts current thread, or ! * does nothing, depending on mode. ! */ ! private void reportInterruptAfterWait(int interruptMode) ! throws InterruptedException { ! if (interruptMode == THROW_IE) ! throw new InterruptedException(); ! else if (interruptMode == REINTERRUPT) ! selfInterrupt(); } /** * Implements interruptible condition wait. * <ol> --- 1187,1217 ---- * <li>Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * </ol> */ public final void awaitUninterruptibly() { ! ConditionNode node = new ConditionNode(); ! long savedState = enableWait(node); ! LockSupport.setCurrentBlocker(this); // for back-compatibility boolean interrupted = false; ! while (!canReacquire(node)) { if (Thread.interrupted()) interrupted = true; + else if ((node.status & COND) != 0) { + try { + ForkJoinPool.managedBlock(node); + } catch (InterruptedException ie) { + interrupted = true; } ! } else ! Thread.onSpinWait(); // awoke while enqueuing } ! LockSupport.setCurrentBlocker(null); ! node.clearStatus(); ! acquire(node, savedState, false, false, false, 0L); ! if (interrupted) ! Thread.currentThread().interrupt(); } /** * Implements interruptible condition wait. * <ol>
*** 1594,1617 **** * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); ! Node node = addConditionWaiter(); ! long savedState = fullyRelease(node); ! int interruptMode = 0; ! while (!isOnSyncQueue(node)) { ! LockSupport.park(this); ! if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) ! break; } - if (acquireQueued(node, savedState) && interruptMode != THROW_IE) - interruptMode = REINTERRUPT; - if (node.nextWaiter != null) // clean up if cancelled - unlinkCancelledWaiters(); - if (interruptMode != 0) - reportInterruptAfterWait(interruptMode); } /** * Implements timed condition wait. * <ol> --- 1226,1262 ---- * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); ! ConditionNode node = new ConditionNode(); ! long savedState = enableWait(node); ! LockSupport.setCurrentBlocker(this); // for back-compatibility ! boolean interrupted = false, cancelled = false; ! while (!canReacquire(node)) { ! if (interrupted |= Thread.interrupted()) { ! if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) ! break; // else interrupted after signal ! } else if ((node.status & COND) != 0) { ! try { ! ForkJoinPool.managedBlock(node); ! } catch (InterruptedException ie) { ! interrupted = true; ! } ! } else ! Thread.onSpinWait(); // awoke while enqueuing ! } ! LockSupport.setCurrentBlocker(null); ! node.clearStatus(); ! acquire(node, savedState, false, false, false, 0L); ! if (interrupted) { ! if (cancelled) { ! unlinkCancelledWaiters(node); ! throw new InterruptedException(); ! } ! Thread.currentThread().interrupt(); } } /** * Implements timed condition wait. * <ol>
*** 1627,1662 **** */ public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); ! // We don't check for nanosTimeout <= 0L here, to allow ! // awaitNanos(0) as a way to "yield the lock". ! final long deadline = System.nanoTime() + nanosTimeout; ! long initialNanos = nanosTimeout; ! Node node = addConditionWaiter(); ! long savedState = fullyRelease(node); ! int interruptMode = 0; ! while (!isOnSyncQueue(node)) { ! if (nanosTimeout <= 0L) { ! transferAfterCancelledWait(node); ! break; ! } ! if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) ! LockSupport.parkNanos(this, nanosTimeout); ! if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; ! nanosTimeout = deadline - System.nanoTime(); } ! if (acquireQueued(node, savedState) && interruptMode != THROW_IE) ! interruptMode = REINTERRUPT; ! if (node.nextWaiter != null) ! unlinkCancelledWaiters(); ! if (interruptMode != 0) ! reportInterruptAfterWait(interruptMode); long remaining = deadline - System.nanoTime(); // avoid overflow ! return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE; } /** * Implements absolute timed condition wait. * <ol> --- 1272,1304 ---- */ public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); ! ConditionNode node = new ConditionNode(); ! long savedState = enableWait(node); ! long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; ! long deadline = System.nanoTime() + nanos; ! boolean cancelled = false, interrupted = false; ! while (!canReacquire(node)) { ! if ((interrupted |= Thread.interrupted()) || ! (nanos = deadline - System.nanoTime()) <= 0L) { ! if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) break; ! } else ! LockSupport.parkNanos(this, nanos); } ! node.clearStatus(); ! acquire(node, savedState, false, false, false, 0L); ! if (cancelled) { ! unlinkCancelledWaiters(node); ! if (interrupted) ! throw new InterruptedException(); ! } else if (interrupted) ! Thread.currentThread().interrupt(); long remaining = deadline - System.nanoTime(); // avoid overflow ! return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE; } /** * Implements absolute timed condition wait. * <ol>
*** 1674,1703 **** public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); ! Node node = addConditionWaiter(); ! long savedState = fullyRelease(node); ! boolean timedout = false; ! int interruptMode = 0; ! while (!isOnSyncQueue(node)) { ! if (System.currentTimeMillis() >= abstime) { ! timedout = transferAfterCancelledWait(node); break; ! } LockSupport.parkUntil(this, abstime); - if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) - break; } ! if (acquireQueued(node, savedState) && interruptMode != THROW_IE) ! interruptMode = REINTERRUPT; ! if (node.nextWaiter != null) ! unlinkCancelledWaiters(); ! if (interruptMode != 0) ! reportInterruptAfterWait(interruptMode); ! return !timedout; } /** * Implements timed condition wait. * <ol> --- 1316,1345 ---- public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); ! ConditionNode node = new ConditionNode(); ! long savedState = enableWait(node); ! boolean cancelled = false, interrupted = false; ! while (!canReacquire(node)) { ! if ((interrupted |= Thread.interrupted()) || ! System.currentTimeMillis() >= abstime) { ! if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) break; ! } else LockSupport.parkUntil(this, abstime); } ! node.clearStatus(); ! acquire(node, savedState, false, false, false, 0L); ! if (cancelled) { ! unlinkCancelledWaiters(node); ! if (interrupted) ! throw new InterruptedException(); ! } else if (interrupted) ! Thread.currentThread().interrupt(); ! return !cancelled; } /** * Implements timed condition wait. * <ol>
*** 1715,1749 **** public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); ! // We don't check for nanosTimeout <= 0L here, to allow ! // await(0, unit) as a way to "yield the lock". ! final long deadline = System.nanoTime() + nanosTimeout; ! Node node = addConditionWaiter(); ! long savedState = fullyRelease(node); ! boolean timedout = false; ! int interruptMode = 0; ! while (!isOnSyncQueue(node)) { ! if (nanosTimeout <= 0L) { ! timedout = transferAfterCancelledWait(node); break; } ! if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) ! LockSupport.parkNanos(this, nanosTimeout); ! if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) ! break; ! nanosTimeout = deadline - System.nanoTime(); ! } ! if (acquireQueued(node, savedState) && interruptMode != THROW_IE) ! interruptMode = REINTERRUPT; ! if (node.nextWaiter != null) ! unlinkCancelledWaiters(); ! if (interruptMode != 0) ! reportInterruptAfterWait(interruptMode); ! return !timedout; } // support for instrumentation /** --- 1357,1388 ---- public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); ! ConditionNode node = new ConditionNode(); ! long savedState = enableWait(node); ! long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; ! long deadline = System.nanoTime() + nanos; ! boolean cancelled = false, interrupted = false; ! while (!canReacquire(node)) { ! if ((interrupted |= Thread.interrupted()) || ! (nanos = deadline - System.nanoTime()) <= 0L) { ! if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) break; + } else + LockSupport.parkNanos(this, nanos); } ! node.clearStatus(); ! acquire(node, savedState, false, false, false, 0L); ! if (cancelled) { ! unlinkCancelledWaiters(node); ! if (interrupted) ! throw new InterruptedException(); ! } else if (interrupted) ! Thread.currentThread().interrupt(); ! return !cancelled; } // support for instrumentation /**
*** 1765,1776 **** * returns {@code false} */ protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ! for (Node w = firstWaiter; w != null; w = w.nextWaiter) { ! if (w.waitStatus == Node.CONDITION) return true; } return false; } --- 1404,1415 ---- * returns {@code false} */ protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ! for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { ! if ((w.status & COND) != 0) return true; } return false; }
*** 1785,1796 **** */ protected final int getWaitQueueLength() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; ! for (Node w = firstWaiter; w != null; w = w.nextWaiter) { ! if (w.waitStatus == Node.CONDITION) ++n; } return n; } --- 1424,1435 ---- */ protected final int getWaitQueueLength() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; ! for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { ! if ((w.status & COND) != 0) ++n; } return n; }
*** 1805,1856 **** */ protected final Collection<Thread> getWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<>(); ! for (Node w = firstWaiter; w != null; w = w.nextWaiter) { ! if (w.waitStatus == Node.CONDITION) { ! Thread t = w.thread; if (t != null) list.add(t); } } return list; } } ! // VarHandle mechanics ! private static final VarHandle STATE; ! private static final VarHandle HEAD; ! private static final VarHandle TAIL; static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - STATE = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "state", long.class); - HEAD = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "head", Node.class); - TAIL = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "tail", Node.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); - } - - // Reduce the risk of rare disastrous classloading in first call to - // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 Class<?> ensureLoaded = LockSupport.class; } - - /** - * Initializes head and tail fields on first contention. - */ - private final void initializeSyncQueue() { - Node h; - if (HEAD.compareAndSet(this, null, (h = new Node()))) - tail = h; - } - - /** - * CASes tail field. - */ - private final boolean compareAndSetTail(Node expect, Node update) { - return TAIL.compareAndSet(this, expect, update); - } } --- 1444,1472 ---- */ protected final Collection<Thread> getWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<>(); ! for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { ! if ((w.status & COND) != 0) { ! Thread t = w.waiter; if (t != null) list.add(t); } } return list; } } ! // Unsafe ! private static final Unsafe U = Unsafe.getUnsafe(); ! private static final long STATE ! = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "state"); ! private static final long HEAD ! = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "head"); ! private static final long TAIL ! = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "tail"); static { Class<?> ensureLoaded = LockSupport.class; } }
< prev index next >