diff --git a/src/java.base/share/classes/java/util/concurrent/ConcurrentHashMap.java b/src/java.base/share/classes/java/util/concurrent/ConcurrentHashMap.java --- a/src/java.base/share/classes/java/util/concurrent/ConcurrentHashMap.java +++ b/src/java.base/share/classes/java/util/concurrent/ConcurrentHashMap.java @@ -385,7 +385,7 @@ * cases where old nodes can be reused because their next fields * won't change. On average, only about one-sixth of them need * cloning when a table doubles. The nodes they replace will be - * garbage collectable as soon as they are no longer referenced by + * garbage collectible as soon as they are no longer referenced by * any reader thread that may be in the midst of concurrently * traversing table. Upon transfer, the old table bin contains * only a special forwarding node (with hash field "MOVED") that @@ -3286,9 +3286,8 @@ return true; } - private static final Unsafe U = Unsafe.getUnsafe(); private static final long LOCKSTATE - = U.objectFieldOffset(TreeBin.class, "lockState"); + = U.objectFieldOffset(TreeBin.class, "lockState"); } /* ----------------Table Traversal -------------- */ @@ -6345,28 +6344,20 @@ // Unsafe mechanics private static final Unsafe U = Unsafe.getUnsafe(); - private static final long SIZECTL; - private static final long TRANSFERINDEX; - private static final long BASECOUNT; - private static final long CELLSBUSY; - private static final long CELLVALUE; - private static final int ABASE; + private static final long SIZECTL + = U.objectFieldOffset(ConcurrentHashMap.class, "sizeCtl"); + private static final long TRANSFERINDEX + = U.objectFieldOffset(ConcurrentHashMap.class, "transferIndex"); + private static final long BASECOUNT + = U.objectFieldOffset(ConcurrentHashMap.class, "baseCount"); + private static final long CELLSBUSY + = U.objectFieldOffset(ConcurrentHashMap.class, "cellsBusy"); + private static final long CELLVALUE + = U.objectFieldOffset(CounterCell.class, "value"); + private static final int ABASE = U.arrayBaseOffset(Node[].class); private static final int ASHIFT; static { - SIZECTL = U.objectFieldOffset - (ConcurrentHashMap.class, "sizeCtl"); - TRANSFERINDEX = U.objectFieldOffset - (ConcurrentHashMap.class, "transferIndex"); - BASECOUNT = U.objectFieldOffset - (ConcurrentHashMap.class, "baseCount"); - CELLSBUSY = U.objectFieldOffset - (ConcurrentHashMap.class, "cellsBusy"); - - CELLVALUE = U.objectFieldOffset - (CounterCell.class, "value"); - - ABASE = U.arrayBaseOffset(Node[].class); int scale = U.arrayIndexScale(Node[].class); if ((scale & (scale - 1)) != 0) throw new ExceptionInInitializerError("array index scale not a power of two"); diff --git a/src/java.base/share/classes/java/util/concurrent/Phaser.java b/src/java.base/share/classes/java/util/concurrent/Phaser.java --- a/src/java.base/share/classes/java/util/concurrent/Phaser.java +++ b/src/java.base/share/classes/java/util/concurrent/Phaser.java @@ -97,7 +97,7 @@ * associated recovery within handlers of those exceptions, * often after invoking {@code forceTermination}. Phasers may * also be used by tasks executing in a {@link ForkJoinPool}. - * Progress is ensured if the pool's parallelismLevel can + * Progress is ensured if the pool's parallelism level can * accommodate the maximum number of simultaneously blocked * parties. * diff --git a/src/java.base/share/classes/java/util/concurrent/ThreadLocalRandom.java b/src/java.base/share/classes/java/util/concurrent/ThreadLocalRandom.java --- a/src/java.base/share/classes/java/util/concurrent/ThreadLocalRandom.java +++ b/src/java.base/share/classes/java/util/concurrent/ThreadLocalRandom.java @@ -1053,18 +1053,18 @@ // Unsafe mechanics private static final Unsafe U = Unsafe.getUnsafe(); - private static final long SEED = U.objectFieldOffset - (Thread.class, "threadLocalRandomSeed"); - private static final long PROBE = U.objectFieldOffset - (Thread.class, "threadLocalRandomProbe"); - private static final long SECONDARY = U.objectFieldOffset - (Thread.class, "threadLocalRandomSecondarySeed"); - private static final long THREADLOCALS = U.objectFieldOffset - (Thread.class, "threadLocals"); - private static final long INHERITABLETHREADLOCALS = U.objectFieldOffset - (Thread.class, "inheritableThreadLocals"); - private static final long INHERITEDACCESSCONTROLCONTEXT = U.objectFieldOffset - (Thread.class, "inheritedAccessControlContext"); + private static final long SEED + = U.objectFieldOffset(Thread.class, "threadLocalRandomSeed"); + private static final long PROBE + = U.objectFieldOffset(Thread.class, "threadLocalRandomProbe"); + private static final long SECONDARY + = U.objectFieldOffset(Thread.class, "threadLocalRandomSecondarySeed"); + private static final long THREADLOCALS + = U.objectFieldOffset(Thread.class, "threadLocals"); + private static final long INHERITABLETHREADLOCALS + = U.objectFieldOffset(Thread.class, "inheritableThreadLocals"); + private static final long INHERITEDACCESSCONTROLCONTEXT + = U.objectFieldOffset(Thread.class, "inheritedAccessControlContext"); /** Rarely-used holder for the second of a pair of Gaussians */ private static final ThreadLocal nextLocalGaussian = diff --git a/src/java.base/share/classes/java/util/concurrent/atomic/AtomicInteger.java b/src/java.base/share/classes/java/util/concurrent/atomic/AtomicInteger.java --- a/src/java.base/share/classes/java/util/concurrent/atomic/AtomicInteger.java +++ b/src/java.base/share/classes/java/util/concurrent/atomic/AtomicInteger.java @@ -38,6 +38,7 @@ import java.lang.invoke.VarHandle; import java.util.function.IntBinaryOperator; import java.util.function.IntUnaryOperator; +import jdk.internal.misc.Unsafe; /** * An {@code int} value that may be updated atomically. See the @@ -58,8 +59,9 @@ * This class intended to be implemented using VarHandles, but there * are unresolved cyclic startup dependencies. */ - private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe(); - private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value"); + private static final Unsafe U = Unsafe.getUnsafe(); + private static final long VALUE + = U.objectFieldOffset(AtomicInteger.class, "value"); private volatile int value; diff --git a/src/java.base/share/classes/java/util/concurrent/atomic/AtomicLong.java b/src/java.base/share/classes/java/util/concurrent/atomic/AtomicLong.java --- a/src/java.base/share/classes/java/util/concurrent/atomic/AtomicLong.java +++ b/src/java.base/share/classes/java/util/concurrent/atomic/AtomicLong.java @@ -38,6 +38,7 @@ import java.lang.invoke.VarHandle; import java.util.function.LongBinaryOperator; import java.util.function.LongUnaryOperator; +import jdk.internal.misc.Unsafe; /** * A {@code long} value that may be updated atomically. See the @@ -72,8 +73,9 @@ * This class intended to be implemented using VarHandles, but there * are unresolved cyclic startup dependencies. */ - private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe(); - private static final long VALUE = U.objectFieldOffset(AtomicLong.class, "value"); + private static final Unsafe U = Unsafe.getUnsafe(); + private static final long VALUE + = U.objectFieldOffset(AtomicLong.class, "value"); private volatile long value; diff --git a/src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java b/src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java --- a/src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java +++ b/src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java @@ -35,13 +35,12 @@ package java.util.concurrent.locks; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.VarHandle; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.AbstractQueuedSynchronizer.Node; +import java.util.concurrent.ForkJoinPool; +import jdk.internal.misc.Unsafe; /** * A version of {@link AbstractQueuedSynchronizer} in @@ -73,23 +72,76 @@ * keep it that way. */ - /** - * Creates a new {@code AbstractQueuedLongSynchronizer} instance - * with initial synchronization state of zero. - */ - protected AbstractQueuedLongSynchronizer() { } + // Node status bits, also used as argument and return values + static final int WAITING = 1; // must be 1 + static final int CANCELLED = 0x80000000; // must be negative + static final int COND = 2; // in a condition wait + + /** CLH Nodes */ + abstract static class Node { + volatile Node prev; // initially attached via casTail + volatile Node next; // visibly nonnull when signallable + Thread waiter; // visibly nonnull when enqueued + volatile int status; // written by owner, atomic bit ops by others + + // methods for atomic operations + final boolean casPrev(Node c, Node v) { // for cleanQueue + return U.weakCompareAndSetReference(this, PREV, c, v); + } + final boolean casNext(Node c, Node v) { // for cleanQueue + return U.weakCompareAndSetReference(this, NEXT, c, v); + } + final int getAndUnsetStatus(int v) { // for signalling + return U.getAndBitwiseAndInt(this, STATUS, ~v); + } + final void setPrevRelaxed(Node p) { // for off-queue assignment + U.putReference(this, PREV, p); + } + final void setStatusRelaxed(int s) { // for off-queue assignment + U.putInt(this, STATUS, s); + } + final void clearStatus() { // for reducing unneeded signals + U.putIntOpaque(this, STATUS, 0); + } + + private static final long STATUS + = U.objectFieldOffset(Node.class, "status"); + private static final long NEXT + = U.objectFieldOffset(Node.class, "next"); + private static final long PREV + = U.objectFieldOffset(Node.class, "prev"); + } + + // Concrete classes tagged by type + static final class ExclusiveNode extends Node { } + static final class SharedNode extends Node { } + + static final class ConditionNode extends Node + implements ForkJoinPool.ManagedBlocker { + ConditionNode nextWaiter; // link to next waiting node + + /** + * Allows Conditions to be used in ForkJoinPools without + * risking fixed pool exhaustion. This is usable only for + * untimed Condition waits, not timed versions. + */ + public final boolean isReleasable() { + return status <= 1 || Thread.currentThread().isInterrupted(); + } + + public final boolean block() { + while (!isReleasable()) LockSupport.park(this); + return true; + } + } /** - * Head of the wait queue, lazily initialized. Except for - * initialization, it is modified only via method setHead. Note: - * If head exists, its waitStatus is guaranteed not to be - * CANCELLED. + * Head of the wait queue, lazily initialized. */ private transient volatile Node head; /** - * Tail of the wait queue, lazily initialized. Modified only via - * method enq to add new wait node. + * Tail of the wait queue. After initialization, modified only via casTail. */ private transient volatile Node tail; @@ -113,8 +165,7 @@ * @param newState the new state value */ protected final void setState(long newState) { - // See JDK-8180620: Clarify VarHandle mixed-access subtleties - STATE.setVolatile(this, newState); + state = newState; } /** @@ -129,481 +180,234 @@ * value was not equal to the expected value. */ protected final boolean compareAndSetState(long expect, long update) { - return STATE.compareAndSet(this, expect, update); + return U.compareAndSetLong(this, STATE, expect, update); } // Queuing utilities - /** - * The number of nanoseconds for which it is faster to spin - * rather than to use timed park. A rough estimate suffices - * to improve responsiveness with very short timeouts. - */ - static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L; + private boolean casTail(Node c, Node v) { + return U.compareAndSetReference(this, TAIL, c, v); + } + + /** tries once to CAS a new dummy node for head */ + private void tryInitializeHead() { + Node h = new ExclusiveNode(); + if (U.compareAndSetReference(this, HEAD, null, h)) + tail = h; + } /** - * Inserts node into queue, initializing if necessary. See picture above. - * @param node the node to insert - * @return node's predecessor + * Enqueues the node unless null. (Currently used only for + * ConditionNodes; other cases are interleaved with acquires.) */ - private Node enq(Node node) { - for (;;) { - Node oldTail = tail; - if (oldTail != null) { - node.setPrevRelaxed(oldTail); - if (compareAndSetTail(oldTail, node)) { - oldTail.next = node; - return oldTail; + final void enqueue(Node node) { + if (node != null) { + for (;;) { + Node t = tail; + node.setPrevRelaxed(t); // avoid unnecessary fence + if (t == null) // initialize + tryInitializeHead(); + else if (casTail(t, node)) { + t.next = node; + if (t.status < 0) // wake up to clean link + LockSupport.unpark(node.waiter); + break; } - } else { - initializeSyncQueue(); } } } + /** Returns true if node is found in traversal from tail */ + final boolean isEnqueued(Node node) { + for (Node t = tail; t != null; t = t.prev) + if (t == node) + return true; + return false; + } + /** - * Creates and enqueues node for current thread and given mode. + * Wakes up the successor of given node, if one exists, and unsets its + * WAITING status to avoid park race. This may fail to wake up an + * eligible thread when one or more have been cancelled, but + * cancelAcquire ensures liveness. + */ + private static void signalNext(Node h) { + Node s; + if (h != null && (s = h.next) != null && s.status != 0) { + s.getAndUnsetStatus(WAITING); + LockSupport.unpark(s.waiter); + } + } + + /** Wakes up the given node if in shared mode */ + private static void signalNextIfShared(Node h) { + Node s; + if (h != null && (s = h.next) != null && + (s instanceof SharedNode) && s.status != 0) { + s.getAndUnsetStatus(WAITING); + LockSupport.unpark(s.waiter); + } + } + + /** + * Main acquire method, invoked by all exported acquire methods. * - * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared - * @return the new node + * @param node null unless a reacquiring Condition + * @param arg the acquire argument + * @param shared true if shared mode else exclusive + * @param interruptible if abort and return negative on interrupt + * @param timed if true use timed waits + * @param time if timed, the System.nanoTime value to timeout + * @return positive if acquired, 0 if timed out, negative if interrupted */ - private Node addWaiter(Node mode) { - Node node = new Node(mode); + final int acquire(Node node, long arg, boolean shared, + boolean interruptible, boolean timed, long time) { + Thread current = Thread.currentThread(); + byte spins = 0, postSpins = 0; // retries upon unpark of first thread + boolean interrupted = false, first = false; + Node pred = null; // predecessor of node when enqueued + + /* + * Repeatedly: + * Check if node now first + * if so, ensure head stable, else ensure valid predecessor + * if node is first or not yet enqueued, try acquiring + * else if node not yet created, create it + * else if not yet enqueued, try once to enqueue + * else if woken from park, retry (up to postSpins times) + * else if WAITING status not set, set and retry + * else park and clear WAITING status, and check cancellation + */ for (;;) { - Node oldTail = tail; - if (oldTail != null) { - node.setPrevRelaxed(oldTail); - if (compareAndSetTail(oldTail, node)) { - oldTail.next = node; - return node; + if (!first && (pred = (node == null) ? null : node.prev) != null && + !(first = (head == pred))) { + if (pred.status < 0) { + cleanQueue(); // predecessor cancelled + continue; + } else if (pred.prev == null) { + Thread.onSpinWait(); // ensure serialization + continue; } + } + if (first || pred == null) { + boolean acquired; + try { + if (shared) + acquired = (tryAcquireShared(arg) >= 0); + else + acquired = tryAcquire(arg); + } catch (Throwable ex) { + cancelAcquire(node, interrupted, false); + throw ex; + } + if (acquired) { + if (first) { + node.prev = null; + head = node; + pred.next = null; + node.waiter = null; + if (shared) + signalNextIfShared(node); + if (interrupted) + current.interrupt(); + } + return 1; + } + } + if (node == null) { // allocate; retry before enqueue + if (shared) + node = new SharedNode(); + else + node = new ExclusiveNode(); + } else if (pred == null) { // try to enqueue + node.waiter = current; + Node t = tail; + node.setPrevRelaxed(t); // avoid unnecessary fence + if (t == null) + tryInitializeHead(); + else if (!casTail(t, node)) + node.setPrevRelaxed(null); // back out + else + t.next = node; + } else if (first && spins != 0) { + --spins; // reduce unfairness on rewaits + Thread.onSpinWait(); + } else if (node.status == 0) { + node.status = WAITING; // enable signal and recheck } else { - initializeSyncQueue(); + long nanos; + spins = postSpins = (byte)((postSpins << 1) | 1); + if (!timed) + LockSupport.park(this); + else if ((nanos = time - System.nanoTime()) > 0L) + LockSupport.parkNanos(this, nanos); + else + break; + node.clearStatus(); + if ((interrupted |= Thread.interrupted()) && interruptible) + break; + } + } + return cancelAcquire(node, interrupted, interruptible); + } + + /** + * Possibly repeatedly traverses from tail, unsplicing cancelled + * nodes until none are found. + */ + private void cleanQueue() { + for (;;) { // restart point + for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples + if (q == null || (p = q.prev) == null) + return; // end of list + if (s == null ? tail != q : (s.prev != q || s.status < 0)) + break; // inconsistent + if (q.status < 0) { // cancelled + if ((s == null ? casTail(q, p) : s.casPrev(q, p)) && + q.prev == p) { + p.casNext(q, s); // OK if fails + if (p.prev == null) + signalNext(p); + } + break; + } + if ((n = p.next) != q) { // help finish + if (n != null && q.prev == p) { + p.casNext(n, q); + if (p.prev == null) + signalNext(p); + } + break; + } + s = q; + q = q.prev; } } } /** - * Sets head of queue to be node, thus dequeuing. Called only by - * acquire methods. Also nulls out unused fields for sake of GC - * and to suppress unnecessary signals and traversals. - * - * @param node the node - */ - private void setHead(Node node) { - head = node; - node.thread = null; - node.prev = null; - } - - /** - * Wakes up node's successor, if one exists. - * - * @param node the node - */ - private void unparkSuccessor(Node node) { - /* - * If status is negative (i.e., possibly needing signal) try - * to clear in anticipation of signalling. It is OK if this - * fails or if status is changed by waiting thread. - */ - int ws = node.waitStatus; - if (ws < 0) - node.compareAndSetWaitStatus(ws, 0); - - /* - * Thread to unpark is held in successor, which is normally - * just the next node. But if cancelled or apparently null, - * traverse backwards from tail to find the actual - * non-cancelled successor. - */ - Node s = node.next; - if (s == null || s.waitStatus > 0) { - s = null; - for (Node p = tail; p != node && p != null; p = p.prev) - if (p.waitStatus <= 0) - s = p; - } - if (s != null) - LockSupport.unpark(s.thread); - } - - /** - * Release action for shared mode -- signals successor and ensures - * propagation. (Note: For exclusive mode, release just amounts - * to calling unparkSuccessor of head if it needs signal.) - */ - private void doReleaseShared() { - /* - * Ensure that a release propagates, even if there are other - * in-progress acquires/releases. This proceeds in the usual - * way of trying to unparkSuccessor of head if it needs - * signal. But if it does not, status is set to PROPAGATE to - * ensure that upon release, propagation continues. - * Additionally, we must loop in case a new node is added - * while we are doing this. Also, unlike other uses of - * unparkSuccessor, we need to know if CAS to reset status - * fails, if so rechecking. - */ - for (;;) { - Node h = head; - if (h != null && h != tail) { - int ws = h.waitStatus; - if (ws == Node.SIGNAL) { - if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) - continue; // loop to recheck cases - unparkSuccessor(h); - } - else if (ws == 0 && - !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) - continue; // loop on failed CAS - } - if (h == head) // loop if head changed - break; - } - } - - /** - * Sets head of queue, and checks if successor may be waiting - * in shared mode, if so propagating if either propagate > 0 or - * PROPAGATE status was set. - * - * @param node the node - * @param propagate the return value from a tryAcquireShared - */ - private void setHeadAndPropagate(Node node, long propagate) { - Node h = head; // Record old head for check below - setHead(node); - /* - * Try to signal next queued node if: - * Propagation was indicated by caller, - * or was recorded (as h.waitStatus either before - * or after setHead) by a previous operation - * (note: this uses sign-check of waitStatus because - * PROPAGATE status may transition to SIGNAL.) - * and - * The next node is waiting in shared mode, - * or we don't know, because it appears null - * - * The conservatism in both of these checks may cause - * unnecessary wake-ups, but only when there are multiple - * racing acquires/releases, so most need signals now or soon - * anyway. - */ - if (propagate > 0 || h == null || h.waitStatus < 0 || - (h = head) == null || h.waitStatus < 0) { - Node s = node.next; - if (s == null || s.isShared()) - doReleaseShared(); - } - } - - // Utilities for various versions of acquire - - /** * Cancels an ongoing attempt to acquire. * - * @param node the node - */ - private void cancelAcquire(Node node) { - // Ignore if node doesn't exist - if (node == null) - return; - - node.thread = null; - - // Skip cancelled predecessors - Node pred = node.prev; - while (pred.waitStatus > 0) - node.prev = pred = pred.prev; - - // predNext is the apparent node to unsplice. CASes below will - // fail if not, in which case, we lost race vs another cancel - // or signal, so no further action is necessary, although with - // a possibility that a cancelled node may transiently remain - // reachable. - Node predNext = pred.next; - - // Can use unconditional write instead of CAS here. - // After this atomic step, other Nodes can skip past us. - // Before, we are free of interference from other threads. - node.waitStatus = Node.CANCELLED; - - // If we are the tail, remove ourselves. - if (node == tail && compareAndSetTail(node, pred)) { - pred.compareAndSetNext(predNext, null); - } else { - // If successor needs signal, try to set pred's next-link - // so it will get one. Otherwise wake it up to propagate. - int ws; - if (pred != head && - ((ws = pred.waitStatus) == Node.SIGNAL || - (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && - pred.thread != null) { - Node next = node.next; - if (next != null && next.waitStatus <= 0) - pred.compareAndSetNext(predNext, next); - } else { - unparkSuccessor(node); - } - - node.next = node; // help GC - } - } - - /** - * Checks and updates status for a node that failed to acquire. - * Returns true if thread should block. This is the main signal - * control in all acquire loops. Requires that pred == node.prev. - * - * @param pred node's predecessor holding status - * @param node the node - * @return {@code true} if thread should block - */ - private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { - int ws = pred.waitStatus; - if (ws == Node.SIGNAL) - /* - * This node has already set status asking a release - * to signal it, so it can safely park. - */ - return true; - if (ws > 0) { - /* - * Predecessor was cancelled. Skip over predecessors and - * indicate retry. - */ - do { - node.prev = pred = pred.prev; - } while (pred.waitStatus > 0); - pred.next = node; - } else { - /* - * waitStatus must be 0 or PROPAGATE. Indicate that we - * need a signal, but don't park yet. Caller will need to - * retry to make sure it cannot acquire before parking. - */ - pred.compareAndSetWaitStatus(ws, Node.SIGNAL); - } - return false; - } - - /** - * Convenience method to interrupt current thread. - */ - static void selfInterrupt() { - Thread.currentThread().interrupt(); - } - - /** - * Convenience method to park and then check if interrupted. - * - * @return {@code true} if interrupted - */ - private final boolean parkAndCheckInterrupt() { - LockSupport.park(this); - return Thread.interrupted(); - } - - /* - * Various flavors of acquire, varying in exclusive/shared and - * control modes. Each is mostly the same, but annoyingly - * different. Only a little bit of factoring is possible due to - * interactions of exception mechanics (including ensuring that we - * cancel if tryAcquire throws exception) and other control, at - * least not without hurting performance too much. - */ - - /** - * Acquires in exclusive uninterruptible mode for thread already in - * queue. Used by condition wait methods as well as acquire. - * - * @param node the node - * @param arg the acquire argument - * @return {@code true} if interrupted while waiting - */ - final boolean acquireQueued(final Node node, long arg) { - boolean interrupted = false; - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head && tryAcquire(arg)) { - setHead(node); - p.next = null; // help GC - return interrupted; - } - if (shouldParkAfterFailedAcquire(p, node)) - interrupted |= parkAndCheckInterrupt(); - } - } catch (Throwable t) { - cancelAcquire(node); - if (interrupted) - selfInterrupt(); - throw t; - } - } - - /** - * Acquires in exclusive interruptible mode. - * @param arg the acquire argument + * @param node the node (may be null if cancelled before enqueuing) + * @param interrupted true if thread interrupted + * @param interruptible if should report interruption vs reset */ - private void doAcquireInterruptibly(long arg) - throws InterruptedException { - final Node node = addWaiter(Node.EXCLUSIVE); - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head && tryAcquire(arg)) { - setHead(node); - p.next = null; // help GC - return; - } - if (shouldParkAfterFailedAcquire(p, node) && - parkAndCheckInterrupt()) - throw new InterruptedException(); - } - } catch (Throwable t) { - cancelAcquire(node); - throw t; - } - } - - /** - * Acquires in exclusive timed mode. - * - * @param arg the acquire argument - * @param nanosTimeout max wait time - * @return {@code true} if acquired - */ - private boolean doAcquireNanos(long arg, long nanosTimeout) - throws InterruptedException { - if (nanosTimeout <= 0L) - return false; - final long deadline = System.nanoTime() + nanosTimeout; - final Node node = addWaiter(Node.EXCLUSIVE); - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head && tryAcquire(arg)) { - setHead(node); - p.next = null; // help GC - return true; - } - nanosTimeout = deadline - System.nanoTime(); - if (nanosTimeout <= 0L) { - cancelAcquire(node); - return false; - } - if (shouldParkAfterFailedAcquire(p, node) && - nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanosTimeout); - if (Thread.interrupted()) - throw new InterruptedException(); - } - } catch (Throwable t) { - cancelAcquire(node); - throw t; + private int cancelAcquire(Node node, boolean interrupted, + boolean interruptible) { + if (node != null) { + node.waiter = null; + node.status = CANCELLED; + if (node.prev != null) + cleanQueue(); } - } - - /** - * Acquires in shared uninterruptible mode. - * @param arg the acquire argument - */ - private void doAcquireShared(long arg) { - final Node node = addWaiter(Node.SHARED); - boolean interrupted = false; - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head) { - long r = tryAcquireShared(arg); - if (r >= 0) { - setHeadAndPropagate(node, r); - p.next = null; // help GC - return; - } - } - if (shouldParkAfterFailedAcquire(p, node)) - interrupted |= parkAndCheckInterrupt(); - } - } catch (Throwable t) { - cancelAcquire(node); - throw t; - } finally { - if (interrupted) - selfInterrupt(); + if (interrupted) { + if (interruptible) + return CANCELLED; + else + Thread.currentThread().interrupt(); } - } - - /** - * Acquires in shared interruptible mode. - * @param arg the acquire argument - */ - private void doAcquireSharedInterruptibly(long arg) - throws InterruptedException { - final Node node = addWaiter(Node.SHARED); - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head) { - long r = tryAcquireShared(arg); - if (r >= 0) { - setHeadAndPropagate(node, r); - p.next = null; // help GC - return; - } - } - if (shouldParkAfterFailedAcquire(p, node) && - parkAndCheckInterrupt()) - throw new InterruptedException(); - } - } catch (Throwable t) { - cancelAcquire(node); - throw t; - } - } - - /** - * Acquires in shared timed mode. - * - * @param arg the acquire argument - * @param nanosTimeout max wait time - * @return {@code true} if acquired - */ - private boolean doAcquireSharedNanos(long arg, long nanosTimeout) - throws InterruptedException { - if (nanosTimeout <= 0L) - return false; - final long deadline = System.nanoTime() + nanosTimeout; - final Node node = addWaiter(Node.SHARED); - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head) { - long r = tryAcquireShared(arg); - if (r >= 0) { - setHeadAndPropagate(node, r); - p.next = null; // help GC - return true; - } - } - nanosTimeout = deadline - System.nanoTime(); - if (nanosTimeout <= 0L) { - cancelAcquire(node); - return false; - } - if (shouldParkAfterFailedAcquire(p, node) && - nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanosTimeout); - if (Thread.interrupted()) - throw new InterruptedException(); - } - } catch (Throwable t) { - cancelAcquire(node); - throw t; - } + return 0; } // Main exported methods @@ -756,9 +560,8 @@ * can represent anything you like. */ public final void acquire(long arg) { - if (!tryAcquire(arg) && - acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) - selfInterrupt(); + if (!tryAcquire(arg)) + acquire(null, arg, false, false, false, 0L); } /** @@ -776,11 +579,10 @@ * @throws InterruptedException if the current thread is interrupted */ public final void acquireInterruptibly(long arg) - throws InterruptedException { - if (Thread.interrupted()) + throws InterruptedException { + if (Thread.interrupted() || + (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0)) throw new InterruptedException(); - if (!tryAcquire(arg)) - doAcquireInterruptibly(arg); } /** @@ -801,11 +603,20 @@ * @throws InterruptedException if the current thread is interrupted */ public final boolean tryAcquireNanos(long arg, long nanosTimeout) - throws InterruptedException { - if (Thread.interrupted()) - throw new InterruptedException(); - return tryAcquire(arg) || - doAcquireNanos(arg, nanosTimeout); + throws InterruptedException { + if (!Thread.interrupted()) { + if (tryAcquire(arg)) + return true; + if (nanosTimeout <= 0L) + return false; + int stat = acquire(null, arg, false, true, true, + System.nanoTime() + nanosTimeout); + if (stat > 0) + return true; + if (stat == 0) + return false; + } + throw new InterruptedException(); } /** @@ -820,9 +631,7 @@ */ public final boolean release(long arg) { if (tryRelease(arg)) { - Node h = head; - if (h != null && h.waitStatus != 0) - unparkSuccessor(h); + signalNext(head); return true; } return false; @@ -841,7 +650,7 @@ */ public final void acquireShared(long arg) { if (tryAcquireShared(arg) < 0) - doAcquireShared(arg); + acquire(null, arg, true, false, false, 0L); } /** @@ -858,11 +667,11 @@ * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(long arg) - throws InterruptedException { - if (Thread.interrupted()) + throws InterruptedException { + if (Thread.interrupted() || + (tryAcquireShared(arg) < 0 && + acquire(null, arg, true, true, false, 0L) < 0)) throw new InterruptedException(); - if (tryAcquireShared(arg) < 0) - doAcquireSharedInterruptibly(arg); } /** @@ -883,10 +692,19 @@ */ public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) throws InterruptedException { - if (Thread.interrupted()) - throw new InterruptedException(); - return tryAcquireShared(arg) >= 0 || - doAcquireSharedNanos(arg, nanosTimeout); + if (!Thread.interrupted()) { + if (tryAcquireShared(arg) >= 0) + return true; + if (nanosTimeout <= 0L) + return false; + int stat = acquire(null, arg, true, true, true, + System.nanoTime() + nanosTimeout); + if (stat > 0) + return true; + if (stat == 0) + return false; + } + throw new InterruptedException(); } /** @@ -900,7 +718,7 @@ */ public final boolean releaseShared(long arg) { if (tryReleaseShared(arg)) { - doReleaseShared(); + signalNext(head); return true; } return false; @@ -918,7 +736,7 @@ */ public final boolean hasQueuedThreads() { for (Node p = tail, h = head; p != h && p != null; p = p.prev) - if (p.waitStatus <= 0) + if (p.status >= 0) return true; return false; } @@ -948,45 +766,16 @@ * {@code null} if no threads are currently queued */ public final Thread getFirstQueuedThread() { - // handle only fast path, else relay - return (head == tail) ? null : fullGetFirstQueuedThread(); - } - - /** - * Version of getFirstQueuedThread called when fastpath fails. - */ - private Thread fullGetFirstQueuedThread() { - /* - * The first node is normally head.next. Try to get its - * thread field, ensuring consistent reads: If thread - * field is nulled out or s.prev is no longer head, then - * some other thread(s) concurrently performed setHead in - * between some of our reads. We try this twice before - * resorting to traversal. - */ - Node h, s; - Thread st; - if (((h = head) != null && (s = h.next) != null && - s.prev == head && (st = s.thread) != null) || - ((h = head) != null && (s = h.next) != null && - s.prev == head && (st = s.thread) != null)) - return st; - - /* - * Head's next field might not have been set yet, or may have - * been unset after setHead. So we must check to see if tail - * is actually first node. If not, we continue on, safely - * traversing from tail back to head to find first, - * guaranteeing termination. - */ - - Thread firstThread = null; - for (Node p = tail; p != null && p != head; p = p.prev) { - Thread t = p.thread; - if (t != null) - firstThread = t; + Thread first = null, w; Node h, s; + if ((h = head) != null && ((s = h.next) == null || + (first = s.waiter) == null || + s.prev == null)) { + // traverse from tail on stale reads + for (Node p = tail, q; p != null && (q = p.prev) != null; p = q) + if ((w = p.waiter) != null) + first = w; } - return firstThread; + return first; } /** @@ -1003,7 +792,7 @@ if (thread == null) throw new NullPointerException(); for (Node p = tail; p != null; p = p.prev) - if (p.thread == thread) + if (p.waiter == thread) return true; return false; } @@ -1019,10 +808,8 @@ */ final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; - return (h = head) != null && - (s = h.next) != null && - !s.isShared() && - s.thread != null; + return (h = head) != null && (s = h.next) != null && + !(s instanceof SharedNode) && s.waiter != null; } /** @@ -1052,7 +839,7 @@ * synchronizer might look like this: * *
 {@code
-     * protected boolean tryAcquire(int arg) {
+     * protected boolean tryAcquire(long arg) {
      *   if (isHeldExclusively()) {
      *     // A reentrant acquire; increment hold count
      *     return true;
@@ -1069,19 +856,12 @@
      * @since 1.7
      */
     public final boolean hasQueuedPredecessors() {
-        Node h, s;
-        if ((h = head) != null) {
-            if ((s = h.next) == null || s.waitStatus > 0) {
-                s = null; // traverse in case of concurrent cancellation
-                for (Node p = tail; p != h && p != null; p = p.prev) {
-                    if (p.waitStatus <= 0)
-                        s = p;
-                }
-            }
-            if (s != null && s.thread != Thread.currentThread())
-                return true;
-        }
-        return false;
+        Thread first = null; Node h, s;
+        if ((h = head) != null && ((s = h.next) == null ||
+                                   (first = s.waiter) == null ||
+                                   s.prev == null))
+            first = getFirstQueuedThread(); // retry via getFirstQueuedThread
+        return first != null && first != Thread.currentThread();
     }
 
     // Instrumentation and monitoring methods
@@ -1098,7 +878,7 @@
     public final int getQueueLength() {
         int n = 0;
         for (Node p = tail; p != null; p = p.prev) {
-            if (p.thread != null)
+            if (p.waiter != null)
                 ++n;
         }
         return n;
@@ -1118,7 +898,7 @@
     public final Collection getQueuedThreads() {
         ArrayList list = new ArrayList<>();
         for (Node p = tail; p != null; p = p.prev) {
-            Thread t = p.thread;
+            Thread t = p.waiter;
             if (t != null)
                 list.add(t);
         }
@@ -1136,8 +916,8 @@
     public final Collection getExclusiveQueuedThreads() {
         ArrayList list = new ArrayList<>();
         for (Node p = tail; p != null; p = p.prev) {
-            if (!p.isShared()) {
-                Thread t = p.thread;
+            if (!(p instanceof SharedNode)) {
+                Thread t = p.waiter;
                 if (t != null)
                     list.add(t);
             }
@@ -1156,8 +936,8 @@
     public final Collection getSharedQueuedThreads() {
         ArrayList list = new ArrayList<>();
         for (Node p = tail; p != null; p = p.prev) {
-            if (p.isShared()) {
-                Thread t = p.thread;
+            if (p instanceof SharedNode) {
+                Thread t = p.waiter;
                 if (t != null)
                     list.add(t);
             }
@@ -1180,117 +960,6 @@
             + (hasQueuedThreads() ? "non" : "") + "empty queue]";
     }
 
-
-    // Internal support methods for Conditions
-
-    /**
-     * Returns true if a node, always one that was initially placed on
-     * a condition queue, is now waiting to reacquire on sync queue.
-     * @param node the node
-     * @return true if is reacquiring
-     */
-    final boolean isOnSyncQueue(Node node) {
-        if (node.waitStatus == Node.CONDITION || node.prev == null)
-            return false;
-        if (node.next != null) // If has successor, it must be on queue
-            return true;
-        /*
-         * node.prev can be non-null, but not yet on queue because
-         * the CAS to place it on queue can fail. So we have to
-         * traverse from tail to make sure it actually made it.  It
-         * will always be near the tail in calls to this method, and
-         * unless the CAS failed (which is unlikely), it will be
-         * there, so we hardly ever traverse much.
-         */
-        return findNodeFromTail(node);
-    }
-
-    /**
-     * Returns true if node is on sync queue by searching backwards from tail.
-     * Called only when needed by isOnSyncQueue.
-     * @return true if present
-     */
-    private boolean findNodeFromTail(Node node) {
-        // We check for node first, since it's likely to be at or near tail.
-        // tail is known to be non-null, so we could re-order to "save"
-        // one null check, but we leave it this way to help the VM.
-        for (Node p = tail;;) {
-            if (p == node)
-                return true;
-            if (p == null)
-                return false;
-            p = p.prev;
-        }
-    }
-
-    /**
-     * Transfers a node from a condition queue onto sync queue.
-     * Returns true if successful.
-     * @param node the node
-     * @return true if successfully transferred (else the node was
-     * cancelled before signal)
-     */
-    final boolean transferForSignal(Node node) {
-        /*
-         * If cannot change waitStatus, the node has been cancelled.
-         */
-        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
-            return false;
-
-        /*
-         * Splice onto queue and try to set waitStatus of predecessor to
-         * indicate that thread is (probably) waiting. If cancelled or
-         * attempt to set waitStatus fails, wake up to resync (in which
-         * case the waitStatus can be transiently and harmlessly wrong).
-         */
-        Node p = enq(node);
-        int ws = p.waitStatus;
-        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
-            LockSupport.unpark(node.thread);
-        return true;
-    }
-
-    /**
-     * Transfers node, if necessary, to sync queue after a cancelled wait.
-     * Returns true if thread was cancelled before being signalled.
-     *
-     * @param node the node
-     * @return true if cancelled before the node was signalled
-     */
-    final boolean transferAfterCancelledWait(Node node) {
-        if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
-            enq(node);
-            return true;
-        }
-        /*
-         * If we lost out to a signal(), then we can't proceed
-         * until it finishes its enq().  Cancelling during an
-         * incomplete transfer is both rare and transient, so just
-         * spin.
-         */
-        while (!isOnSyncQueue(node))
-            Thread.yield();
-        return false;
-    }
-
-    /**
-     * Invokes release with current state value; returns saved state.
-     * Cancels node and throws exception on failure.
-     * @param node the condition node for this wait
-     * @return previous sync state
-     */
-    final long fullyRelease(Node node) {
-        try {
-            long savedState = getState();
-            if (release(savedState))
-                return savedState;
-            throw new IllegalMonitorStateException();
-        } catch (Throwable t) {
-            node.waitStatus = Node.CANCELLED;
-            throw t;
-        }
-    }
-
     // Instrumentation methods for conditions
 
     /**
@@ -1384,112 +1053,38 @@
      *
      * 

This class is Serializable, but all fields are transient, * so deserialized conditions have no waiters. - * - * @since 1.6 */ public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ - private transient Node firstWaiter; + private transient ConditionNode firstWaiter; /** Last node of condition queue. */ - private transient Node lastWaiter; + private transient ConditionNode lastWaiter; /** * Creates a new {@code ConditionObject} instance. */ public ConditionObject() { } - // Internal methods - - /** - * Adds a new waiter to wait queue. - * @return its new wait node - */ - private Node addConditionWaiter() { - if (!isHeldExclusively()) - throw new IllegalMonitorStateException(); - Node t = lastWaiter; - // If lastWaiter is cancelled, clean out. - if (t != null && t.waitStatus != Node.CONDITION) { - unlinkCancelledWaiters(); - t = lastWaiter; - } - - Node node = new Node(Node.CONDITION); - - if (t == null) - firstWaiter = node; - else - t.nextWaiter = node; - lastWaiter = node; - return node; - } - - /** - * Removes and transfers nodes until hit non-cancelled one or - * null. Split out from signal in part to encourage compilers - * to inline the case of no waiters. - * @param first (non-null) the first node on condition queue - */ - private void doSignal(Node first) { - do { - if ( (firstWaiter = first.nextWaiter) == null) - lastWaiter = null; - first.nextWaiter = null; - } while (!transferForSignal(first) && - (first = firstWaiter) != null); - } + // Signalling methods /** - * Removes and transfers all nodes. - * @param first (non-null) the first node on condition queue + * Removes and transfers one or all waiters to sync queue. */ - private void doSignalAll(Node first) { - lastWaiter = firstWaiter = null; - do { - Node next = first.nextWaiter; - first.nextWaiter = null; - transferForSignal(first); + private void doSignal(ConditionNode first, boolean all) { + while (first != null) { + ConditionNode next = first.nextWaiter; + if ((firstWaiter = next) == null) + lastWaiter = null; + if ((first.getAndUnsetStatus(COND) & COND) != 0) { + enqueue(first); + if (!all) + break; + } first = next; - } while (first != null); - } - - /** - * Unlinks cancelled waiter nodes from condition queue. - * Called only while holding lock. This is called when - * cancellation occurred during condition wait, and upon - * insertion of a new waiter when lastWaiter is seen to have - * been cancelled. This method is needed to avoid garbage - * retention in the absence of signals. So even though it may - * require a full traversal, it comes into play only when - * timeouts or cancellations occur in the absence of - * signals. It traverses all nodes rather than stopping at a - * particular target to unlink all pointers to garbage nodes - * without requiring many re-traversals during cancellation - * storms. - */ - private void unlinkCancelledWaiters() { - Node t = firstWaiter; - Node trail = null; - while (t != null) { - Node next = t.nextWaiter; - if (t.waitStatus != Node.CONDITION) { - t.nextWaiter = null; - if (trail == null) - firstWaiter = next; - else - trail.nextWaiter = next; - if (next == null) - lastWaiter = trail; - } - else - trail = t; - t = next; } } - // public methods - /** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the @@ -1499,11 +1094,11 @@ * returns {@code false} */ public final void signal() { + ConditionNode first = firstWaiter; if (!isHeldExclusively()) throw new IllegalMonitorStateException(); - Node first = firstWaiter; if (first != null) - doSignal(first); + doSignal(first, false); } /** @@ -1514,11 +1109,72 @@ * returns {@code false} */ public final void signalAll() { + ConditionNode first = firstWaiter; if (!isHeldExclusively()) throw new IllegalMonitorStateException(); - Node first = firstWaiter; if (first != null) - doSignalAll(first); + doSignal(first, true); + } + + // Waiting methods + + /** + * Adds node to condition list and releases lock. + * + * @param node the node + * @return savedState to reacquire after wait + */ + private long enableWait(ConditionNode node) { + if (isHeldExclusively()) { + node.waiter = Thread.currentThread(); + node.setStatusRelaxed(COND | WAITING); + ConditionNode last = lastWaiter; + if (last == null) + firstWaiter = node; + else + last.nextWaiter = node; + lastWaiter = node; + long savedState = getState(); + if (release(savedState)) + return savedState; + } + node.status = CANCELLED; // lock not held or inconsistent + throw new IllegalMonitorStateException(); + } + + /** + * Returns true if a node that was initially placed on a condition + * queue is now ready to reacquire on sync queue. + * @param node the node + * @return true if is reacquiring + */ + private boolean canReacquire(ConditionNode node) { + // check links, not status to avoid enqueue race + return node != null && node.prev != null && isEnqueued(node); + } + + /** + * Unlinks the given node and other non-waiting nodes from + * condition queue unless already unlinked. + */ + private void unlinkCancelledWaiters(ConditionNode node) { + if (node == null || node.nextWaiter != null || node == lastWaiter) { + ConditionNode w = firstWaiter, trail = null; + while (w != null) { + ConditionNode next = w.nextWaiter; + if ((w.status & COND) == 0) { + w.nextWaiter = null; + if (trail == null) + firstWaiter = next; + else + trail.nextWaiter = next; + if (next == null) + lastWaiter = trail; + } else + trail = w; + w = next; + } + } } /** @@ -1533,51 +1189,27 @@ * */ public final void awaitUninterruptibly() { - Node node = addConditionWaiter(); - long savedState = fullyRelease(node); + ConditionNode node = new ConditionNode(); + long savedState = enableWait(node); + LockSupport.setCurrentBlocker(this); // for back-compatibility boolean interrupted = false; - while (!isOnSyncQueue(node)) { - LockSupport.park(this); + while (!canReacquire(node)) { if (Thread.interrupted()) interrupted = true; + else if ((node.status & COND) != 0) { + try { + ForkJoinPool.managedBlock(node); + } catch (InterruptedException ie) { + interrupted = true; + } + } else + Thread.onSpinWait(); // awoke while enqueuing } - if (acquireQueued(node, savedState) || interrupted) - selfInterrupt(); - } - - /* - * For interruptible waits, we need to track whether to throw - * InterruptedException, if interrupted while blocked on - * condition, versus reinterrupt current thread, if - * interrupted while blocked waiting to re-acquire. - */ - - /** Mode meaning to reinterrupt on exit from wait */ - private static final int REINTERRUPT = 1; - /** Mode meaning to throw InterruptedException on exit from wait */ - private static final int THROW_IE = -1; - - /** - * Checks for interrupt, returning THROW_IE if interrupted - * before signalled, REINTERRUPT if after signalled, or - * 0 if not interrupted. - */ - private int checkInterruptWhileWaiting(Node node) { - return Thread.interrupted() ? - (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : - 0; - } - - /** - * Throws InterruptedException, reinterrupts current thread, or - * does nothing, depending on mode. - */ - private void reportInterruptAfterWait(int interruptMode) - throws InterruptedException { - if (interruptMode == THROW_IE) - throw new InterruptedException(); - else if (interruptMode == REINTERRUPT) - selfInterrupt(); + LockSupport.setCurrentBlocker(null); + node.clearStatus(); + acquire(node, savedState, false, false, false, 0L); + if (interrupted) + Thread.currentThread().interrupt(); } /** @@ -1596,20 +1228,33 @@ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); - Node node = addConditionWaiter(); - long savedState = fullyRelease(node); - int interruptMode = 0; - while (!isOnSyncQueue(node)) { - LockSupport.park(this); - if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) - break; + ConditionNode node = new ConditionNode(); + long savedState = enableWait(node); + LockSupport.setCurrentBlocker(this); // for back-compatibility + boolean interrupted = false, cancelled = false; + while (!canReacquire(node)) { + if (interrupted |= Thread.interrupted()) { + if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) + break; // else interrupted after signal + } else if ((node.status & COND) != 0) { + try { + ForkJoinPool.managedBlock(node); + } catch (InterruptedException ie) { + interrupted = true; + } + } else + Thread.onSpinWait(); // awoke while enqueuing } - if (acquireQueued(node, savedState) && interruptMode != THROW_IE) - interruptMode = REINTERRUPT; - if (node.nextWaiter != null) // clean up if cancelled - unlinkCancelledWaiters(); - if (interruptMode != 0) - reportInterruptAfterWait(interruptMode); + LockSupport.setCurrentBlocker(null); + node.clearStatus(); + acquire(node, savedState, false, false, false, 0L); + if (interrupted) { + if (cancelled) { + unlinkCancelledWaiters(node); + throw new InterruptedException(); + } + Thread.currentThread().interrupt(); + } } /** @@ -1629,32 +1274,29 @@ throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); - // We don't check for nanosTimeout <= 0L here, to allow - // awaitNanos(0) as a way to "yield the lock". - final long deadline = System.nanoTime() + nanosTimeout; - long initialNanos = nanosTimeout; - Node node = addConditionWaiter(); - long savedState = fullyRelease(node); - int interruptMode = 0; - while (!isOnSyncQueue(node)) { - if (nanosTimeout <= 0L) { - transferAfterCancelledWait(node); - break; - } - if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanosTimeout); - if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) - break; - nanosTimeout = deadline - System.nanoTime(); + ConditionNode node = new ConditionNode(); + long savedState = enableWait(node); + long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; + long deadline = System.nanoTime() + nanos; + boolean cancelled = false, interrupted = false; + while (!canReacquire(node)) { + if ((interrupted |= Thread.interrupted()) || + (nanos = deadline - System.nanoTime()) <= 0L) { + if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) + break; + } else + LockSupport.parkNanos(this, nanos); } - if (acquireQueued(node, savedState) && interruptMode != THROW_IE) - interruptMode = REINTERRUPT; - if (node.nextWaiter != null) - unlinkCancelledWaiters(); - if (interruptMode != 0) - reportInterruptAfterWait(interruptMode); + node.clearStatus(); + acquire(node, savedState, false, false, false, 0L); + if (cancelled) { + unlinkCancelledWaiters(node); + if (interrupted) + throw new InterruptedException(); + } else if (interrupted) + Thread.currentThread().interrupt(); long remaining = deadline - System.nanoTime(); // avoid overflow - return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE; + return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE; } /** @@ -1676,26 +1318,26 @@ long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); - Node node = addConditionWaiter(); - long savedState = fullyRelease(node); - boolean timedout = false; - int interruptMode = 0; - while (!isOnSyncQueue(node)) { - if (System.currentTimeMillis() >= abstime) { - timedout = transferAfterCancelledWait(node); - break; - } - LockSupport.parkUntil(this, abstime); - if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) - break; + ConditionNode node = new ConditionNode(); + long savedState = enableWait(node); + boolean cancelled = false, interrupted = false; + while (!canReacquire(node)) { + if ((interrupted |= Thread.interrupted()) || + System.currentTimeMillis() >= abstime) { + if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) + break; + } else + LockSupport.parkUntil(this, abstime); } - if (acquireQueued(node, savedState) && interruptMode != THROW_IE) - interruptMode = REINTERRUPT; - if (node.nextWaiter != null) - unlinkCancelledWaiters(); - if (interruptMode != 0) - reportInterruptAfterWait(interruptMode); - return !timedout; + node.clearStatus(); + acquire(node, savedState, false, false, false, 0L); + if (cancelled) { + unlinkCancelledWaiters(node); + if (interrupted) + throw new InterruptedException(); + } else if (interrupted) + Thread.currentThread().interrupt(); + return !cancelled; } /** @@ -1717,31 +1359,28 @@ long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); - // We don't check for nanosTimeout <= 0L here, to allow - // await(0, unit) as a way to "yield the lock". - final long deadline = System.nanoTime() + nanosTimeout; - Node node = addConditionWaiter(); - long savedState = fullyRelease(node); - boolean timedout = false; - int interruptMode = 0; - while (!isOnSyncQueue(node)) { - if (nanosTimeout <= 0L) { - timedout = transferAfterCancelledWait(node); - break; - } - if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanosTimeout); - if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) - break; - nanosTimeout = deadline - System.nanoTime(); + ConditionNode node = new ConditionNode(); + long savedState = enableWait(node); + long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; + long deadline = System.nanoTime() + nanos; + boolean cancelled = false, interrupted = false; + while (!canReacquire(node)) { + if ((interrupted |= Thread.interrupted()) || + (nanos = deadline - System.nanoTime()) <= 0L) { + if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) + break; + } else + LockSupport.parkNanos(this, nanos); } - if (acquireQueued(node, savedState) && interruptMode != THROW_IE) - interruptMode = REINTERRUPT; - if (node.nextWaiter != null) - unlinkCancelledWaiters(); - if (interruptMode != 0) - reportInterruptAfterWait(interruptMode); - return !timedout; + node.clearStatus(); + acquire(node, savedState, false, false, false, 0L); + if (cancelled) { + unlinkCancelledWaiters(node); + if (interrupted) + throw new InterruptedException(); + } else if (interrupted) + Thread.currentThread().interrupt(); + return !cancelled; } // support for instrumentation @@ -1767,8 +1406,8 @@ protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); - for (Node w = firstWaiter; w != null; w = w.nextWaiter) { - if (w.waitStatus == Node.CONDITION) + for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { + if ((w.status & COND) != 0) return true; } return false; @@ -1787,8 +1426,8 @@ if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; - for (Node w = firstWaiter; w != null; w = w.nextWaiter) { - if (w.waitStatus == Node.CONDITION) + for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { + if ((w.status & COND) != 0) ++n; } return n; @@ -1807,9 +1446,9 @@ if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList list = new ArrayList<>(); - for (Node w = firstWaiter; w != null; w = w.nextWaiter) { - if (w.waitStatus == Node.CONDITION) { - Thread t = w.thread; + for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { + if ((w.status & COND) != 0) { + Thread t = w.waiter; if (t != null) list.add(t); } @@ -1818,39 +1457,16 @@ } } - // VarHandle mechanics - private static final VarHandle STATE; - private static final VarHandle HEAD; - private static final VarHandle TAIL; + // Unsafe + private static final Unsafe U = Unsafe.getUnsafe(); + private static final long STATE + = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "state"); + private static final long HEAD + = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "head"); + private static final long TAIL + = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "tail"); static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - STATE = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "state", long.class); - HEAD = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "head", Node.class); - TAIL = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "tail", Node.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); - } - - // Reduce the risk of rare disastrous classloading in first call to - // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 Class ensureLoaded = LockSupport.class; } - - /** - * Initializes head and tail fields on first contention. - */ - private final void initializeSyncQueue() { - Node h; - if (HEAD.compareAndSet(this, null, (h = new Node()))) - tail = h; - } - - /** - * CASes tail field. - */ - private final boolean compareAndSetTail(Node expect, Node update) { - return TAIL.compareAndSet(this, expect, update); - } } diff --git a/src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java b/src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java --- a/src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java +++ b/src/java.base/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java @@ -35,12 +35,12 @@ package java.util.concurrent.locks; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.VarHandle; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ForkJoinPool; +import jdk.internal.misc.Unsafe; /** * Provides a framework for implementing blocking locks and related @@ -312,265 +312,208 @@ */ protected AbstractQueuedSynchronizer() { } - /** - * Wait queue node class. - * - *

The wait queue is a variant of a "CLH" (Craig, Landin, and - * Hagersten) lock queue. CLH locks are normally used for - * spinlocks. We instead use them for blocking synchronizers, but - * use the same basic tactic of holding some of the control - * information about a thread in the predecessor of its node. A - * "status" field in each node keeps track of whether a thread - * should block. A node is signalled when its predecessor - * releases. Each node of the queue otherwise serves as a - * specific-notification-style monitor holding a single waiting - * thread. The status field does NOT control whether threads are - * granted locks etc though. A thread may try to acquire if it is - * first in the queue. But being first does not guarantee success; - * it only gives the right to contend. So the currently released - * contender thread may need to rewait. - * - *

To enqueue into a CLH lock, you atomically splice it in as new - * tail. To dequeue, you just set the head field. - *

-     *      +------+  prev +-----+       +-----+
-     * head |      | <---- |     | <---- |     |  tail
-     *      +------+       +-----+       +-----+
-     * 
+ /* + * Overview. * - *

Insertion into a CLH queue requires only a single atomic - * operation on "tail", so there is a simple atomic point of - * demarcation from unqueued to queued. Similarly, dequeuing - * involves only updating the "head". However, it takes a bit - * more work for nodes to determine who their successors are, - * in part to deal with possible cancellation due to timeouts - * and interrupts. - * - *

The "prev" links (not used in original CLH locks), are mainly - * needed to handle cancellation. If a node is cancelled, its - * successor is (normally) relinked to a non-cancelled - * predecessor. For explanation of similar mechanics in the case - * of spin locks, see the papers by Scott and Scherer at - * http://www.cs.rochester.edu/u/scott/synchronization/ + * The wait queue is a variant of a "CLH" (Craig, Landin, and + * Hagersten) lock queue. CLH locks are normally used for + * spinlocks. We instead use them for blocking synchronizers by + * including explicit ("prev" and "next") links plus a "status" + * field that allow nodes to signal successors when releasing + * locks, and handle cancellation due to interrupts and timeouts. + * The status field includes bits that track whether a thread + * needs a signal (using LockSupport.unpark). Despite these + * additions, we maintain most CLH locality properties. * - *

We also use "next" links to implement blocking mechanics. - * The thread id for each node is kept in its own node, so a - * predecessor signals the next node to wake up by traversing - * next link to determine which thread it is. Determination of - * successor must avoid races with newly queued nodes to set - * the "next" fields of their predecessors. This is solved - * when necessary by checking backwards from the atomically - * updated "tail" when a node's successor appears to be null. - * (Or, said differently, the next-links are an optimization - * so that we don't usually need a backward scan.) + * To enqueue into a CLH lock, you atomically splice it in as new + * tail. To dequeue, you set the head field, so the next eligible + * waiter becomes first. * - *

Cancellation introduces some conservatism to the basic - * algorithms. Since we must poll for cancellation of other - * nodes, we can miss noticing whether a cancelled node is - * ahead or behind us. This is dealt with by always unparking - * successors upon cancellation, allowing them to stabilize on - * a new predecessor, unless we can identify an uncancelled - * predecessor who will carry this responsibility. + * +------+ prev +-------+ +------+ + * | head | <---- | first | <---- | tail | + * +------+ +-------+ +------+ * - *

CLH queues need a dummy header node to get started. But + * Insertion into a CLH queue requires only a single atomic + * operation on "tail", so there is a simple point of demarcation + * from unqueued to queued. The "next" link of the predecessor is + * set by the enqueuing thread after successful CAS. Even though + * non-atomic, this suffices to ensure that any blocked thread is + * signalled by a predecessor when eligible (although in the case + * of cancellation, possibly with the assistance of a signal in + * method cleanQueue). Signalling is based in part on a + * Dekker-like scheme in which the to-be waiting thread indicates + * WAITING status, then retries acquiring, and then rechecks + * status before blocking. The signaller atomically clears WAITING + * status when unparking. + * + * Dequeuing on acquire involves detaching (nulling) a node's + * "prev" node and then updating the "head". Other threads check + * if a node is or was dequeued by checking "prev" rather than + * head. We enforce the nulling then setting order by spin-waiting + * if necessary. Because of this, the lock algorithm is not itself + * strictly "lock-free" because an acquiring thread may need to + * wait for a previous acquire to make progress. When used with + * exclusive locks, such progress is required anyway. However + * Shared mode may (uncommonly) require a spin-wait before + * setting head field to ensure proper propagation. (Historical + * note: This allows some simplifications and efficiencies + * compared to previous versions of this class.) + * + * A node's predecessor can change due to cancellation while it is + * waiting, until the node is first in queue, at which point it + * cannot change. The acquire methods cope with this by rechecking + * "prev" before waiting. The prev and next fields are modified + * only via CAS by cancelled nodes in method cleanQueue. The + * unsplice strategy is reminiscent of Michael-Scott queues in + * that after a successful CAS to prev field, other threads help + * fix next fields. Because cancellation often occurs in bunches + * that complicate decisions about necessary signals, each call to + * cleanQueue traverses the queue until a clean sweep. Nodes that + * become relinked as first are unconditionally unparked + * (sometimes unnecessarily, but those cases are not worth + * avoiding). + * + * A thread may try to acquire if it is first (frontmost) in the + * queue, and sometimes before. Being first does not guarantee + * success; it only gives the right to contend. We balance + * throughput, overhead, and fairness by allowing incoming threads + * to "barge" and acquire the synchronizer while in the process of + * enqueuing, in which case an awakened first thread may need to + * rewait. To counteract possible repeated unlucky rewaits, we + * exponentially increase retries (up to 256) to acquire each time + * a thread is unparked. Except in this case, AQS locks do not + * spin; they instead interleave attempts to acquire with + * bookkeeping steps. (Users who want spinlocks can use + * tryAcquire.) + * + * To improve garbage collectibility, fields of nodes not yet on + * list are null. (It is not rare to create and then throw away a + * node without using it.) Fields of nodes coming off the list are + * nulled out as soon as possible. This accentuates the challenge + * of externally determining the first waiting thread (as in + * method getFirstQueuedThread). This sometimes requires the + * fallback of traversing backwards from the atomically updated + * "tail" when fields appear null. (This is never needed in the + * process of signalling though.) + * + * CLH queues need a dummy header node to get started. But * we don't create them on construction, because it would be wasted * effort if there is never contention. Instead, the node * is constructed and head and tail pointers are set upon first * contention. * - *

Threads waiting on Conditions use the same nodes, but - * use an additional link. Conditions only need to link nodes - * in simple (non-concurrent) linked queues because they are - * only accessed when exclusively held. Upon await, a node is - * inserted into a condition queue. Upon signal, the node is - * transferred to the main queue. A special value of status - * field is used to mark which queue a node is on. + * Shared mode operations differ from Exclusive in that an acquire + * signals the next waiter to try to acquire if it is also + * Shared. The tryAcquireShared API allows users to indicate the + * degree of propagation, but in most applications, it is more + * efficient to ignore this, allowing the successor to try + * acquiring in any case. * - *

Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill + * Threads waiting on Conditions use nodes with an additional + * link to maintain the (FIFO) list of conditions. Conditions only + * need to link nodes in simple (non-concurrent) linked queues + * because they are only accessed when exclusively held. Upon + * await, a node is inserted into a condition queue. Upon signal, + * the node is enqueued on the main queue. A special status field + * value is used to track and atomically trigger this. + * + * Accesses to fields head, tail, and state use full Volatile + * mode, along with CAS. Node fields status, prev and next also do + * so while threads may be signallable, but sometimes use weaker + * modes otherwise. Accesses to field "waiter" (the thread to be + * signalled) are always sandwiched between other atomic accesses + * so are used in Plain mode. We use jdk.internal Unsafe versions + * of atomic access methods rather than VarHandles to avoid + * potential VM bootstrap issues. + * + * Most of the above is performed by primary internal method + * acquire, that is invoked in some way by all exported acquire + * methods. (It is usually easy for compilers to optimize + * call-site specializations when heavily used.) + * + * There are several arbitrary decisions about when and how to + * check interrupts in both acquire and await before and/or after + * blocking. The decisions are less arbitrary in implementation + * updates because some users appear to rely on original behaviors + * in ways that are racy and so (rarely) wrong in general but hard + * to justify changing. + * + * Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill * Scherer and Michael Scott, along with members of JSR-166 * expert group, for helpful ideas, discussions, and critiques * on the design of this class. */ - static final class Node { - /** Marker to indicate a node is waiting in shared mode */ - static final Node SHARED = new Node(); - /** Marker to indicate a node is waiting in exclusive mode */ - static final Node EXCLUSIVE = null; - - /** waitStatus value to indicate thread has cancelled. */ - static final int CANCELLED = 1; - /** waitStatus value to indicate successor's thread needs unparking. */ - static final int SIGNAL = -1; - /** waitStatus value to indicate thread is waiting on condition. */ - static final int CONDITION = -2; - /** - * waitStatus value to indicate the next acquireShared should - * unconditionally propagate. - */ - static final int PROPAGATE = -3; - /** - * Status field, taking on only the values: - * SIGNAL: The successor of this node is (or will soon be) - * blocked (via park), so the current node must - * unpark its successor when it releases or - * cancels. To avoid races, acquire methods must - * first indicate they need a signal, - * then retry the atomic acquire, and then, - * on failure, block. - * CANCELLED: This node is cancelled due to timeout or interrupt. - * Nodes never leave this state. In particular, - * a thread with cancelled node never again blocks. - * CONDITION: This node is currently on a condition queue. - * It will not be used as a sync queue node - * until transferred, at which time the status - * will be set to 0. (Use of this value here has - * nothing to do with the other uses of the - * field, but simplifies mechanics.) - * PROPAGATE: A releaseShared should be propagated to other - * nodes. This is set (for head node only) in - * doReleaseShared to ensure propagation - * continues, even if other operations have - * since intervened. - * 0: None of the above - * - * The values are arranged numerically to simplify use. - * Non-negative values mean that a node doesn't need to - * signal. So, most code doesn't need to check for particular - * values, just for sign. - * - * The field is initialized to 0 for normal sync nodes, and - * CONDITION for condition nodes. It is modified using CAS - * (or when possible, unconditional volatile writes). - */ - volatile int waitStatus; + // Node status bits, also used as argument and return values + static final int WAITING = 1; // must be 1 + static final int CANCELLED = 0x80000000; // must be negative + static final int COND = 2; // in a condition wait - /** - * Link to predecessor node that current node/thread relies on - * for checking waitStatus. Assigned during enqueuing, and nulled - * out (for sake of GC) only upon dequeuing. Also, upon - * cancellation of a predecessor, we short-circuit while - * finding a non-cancelled one, which will always exist - * because the head node is never cancelled: A node becomes - * head only as a result of successful acquire. A - * cancelled thread never succeeds in acquiring, and a thread only - * cancels itself, not any other node. - */ - volatile Node prev; + /** CLH Nodes */ + abstract static class Node { + volatile Node prev; // initially attached via casTail + volatile Node next; // visibly nonnull when signallable + Thread waiter; // visibly nonnull when enqueued + volatile int status; // written by owner, atomic bit ops by others - /** - * Link to the successor node that the current node/thread - * unparks upon release. Assigned during enqueuing, adjusted - * when bypassing cancelled predecessors, and nulled out (for - * sake of GC) when dequeued. The enq operation does not - * assign next field of a predecessor until after attachment, - * so seeing a null next field does not necessarily mean that - * node is at end of queue. However, if a next field appears - * to be null, we can scan prev's from the tail to - * double-check. The next field of cancelled nodes is set to - * point to the node itself instead of null, to make life - * easier for isOnSyncQueue. - */ - volatile Node next; + // methods for atomic operations + final boolean casPrev(Node c, Node v) { // for cleanQueue + return U.weakCompareAndSetReference(this, PREV, c, v); + } + final boolean casNext(Node c, Node v) { // for cleanQueue + return U.weakCompareAndSetReference(this, NEXT, c, v); + } + final int getAndUnsetStatus(int v) { // for signalling + return U.getAndBitwiseAndInt(this, STATUS, ~v); + } + final void setPrevRelaxed(Node p) { // for off-queue assignment + U.putReference(this, PREV, p); + } + final void setStatusRelaxed(int s) { // for off-queue assignment + U.putInt(this, STATUS, s); + } + final void clearStatus() { // for reducing unneeded signals + U.putIntOpaque(this, STATUS, 0); + } + + private static final long STATUS + = U.objectFieldOffset(Node.class, "status"); + private static final long NEXT + = U.objectFieldOffset(Node.class, "next"); + private static final long PREV + = U.objectFieldOffset(Node.class, "prev"); + } + + // Concrete classes tagged by type + static final class ExclusiveNode extends Node { } + static final class SharedNode extends Node { } + + static final class ConditionNode extends Node + implements ForkJoinPool.ManagedBlocker { + ConditionNode nextWaiter; // link to next waiting node /** - * The thread that enqueued this node. Initialized on - * construction and nulled out after use. - */ - volatile Thread thread; - - /** - * Link to next node waiting on condition, or the special - * value SHARED. Because condition queues are accessed only - * when holding in exclusive mode, we just need a simple - * linked queue to hold nodes while they are waiting on - * conditions. They are then transferred to the queue to - * re-acquire. And because conditions can only be exclusive, - * we save a field by using special value to indicate shared - * mode. + * Allows Conditions to be used in ForkJoinPools without + * risking fixed pool exhaustion. This is usable only for + * untimed Condition waits, not timed versions. */ - Node nextWaiter; - - /** - * Returns true if node is waiting in shared mode. - */ - final boolean isShared() { - return nextWaiter == SHARED; - } - - /** - * Returns previous node, or throws NullPointerException if null. - * Use when predecessor cannot be null. The null check could - * be elided, but is present to help the VM. - * - * @return the predecessor of this node - */ - final Node predecessor() { - Node p = prev; - if (p == null) - throw new NullPointerException(); - else - return p; + public final boolean isReleasable() { + return status <= 1 || Thread.currentThread().isInterrupted(); } - /** Establishes initial head or SHARED marker. */ - Node() {} - - /** Constructor used by addWaiter. */ - Node(Node nextWaiter) { - this.nextWaiter = nextWaiter; - THREAD.set(this, Thread.currentThread()); - } - - /** Constructor used by addConditionWaiter. */ - Node(int waitStatus) { - WAITSTATUS.set(this, waitStatus); - THREAD.set(this, Thread.currentThread()); - } - - /** CASes waitStatus field. */ - final boolean compareAndSetWaitStatus(int expect, int update) { - return WAITSTATUS.compareAndSet(this, expect, update); - } - - /** CASes next field. */ - final boolean compareAndSetNext(Node expect, Node update) { - return NEXT.compareAndSet(this, expect, update); - } - - final void setPrevRelaxed(Node p) { - PREV.set(this, p); - } - - // VarHandle mechanics - private static final VarHandle NEXT; - private static final VarHandle PREV; - private static final VarHandle THREAD; - private static final VarHandle WAITSTATUS; - static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - NEXT = l.findVarHandle(Node.class, "next", Node.class); - PREV = l.findVarHandle(Node.class, "prev", Node.class); - THREAD = l.findVarHandle(Node.class, "thread", Thread.class); - WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); - } + public final boolean block() { + while (!isReleasable()) LockSupport.park(this); + return true; } } /** - * Head of the wait queue, lazily initialized. Except for - * initialization, it is modified only via method setHead. Note: - * If head exists, its waitStatus is guaranteed not to be - * CANCELLED. + * Head of the wait queue, lazily initialized. */ private transient volatile Node head; /** - * Tail of the wait queue, lazily initialized. Modified only via - * method enq to add new wait node. + * Tail of the wait queue. After initialization, modified only via casTail. */ private transient volatile Node tail; @@ -609,481 +552,235 @@ * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { - return STATE.compareAndSet(this, expect, update); + return U.compareAndSetInt(this, STATE, expect, update); } // Queuing utilities - /** - * The number of nanoseconds for which it is faster to spin - * rather than to use timed park. A rough estimate suffices - * to improve responsiveness with very short timeouts. - */ - static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L; + private boolean casTail(Node c, Node v) { + return U.compareAndSetReference(this, TAIL, c, v); + } + + /** tries once to CAS a new dummy node for head */ + private void tryInitializeHead() { + Node h = new ExclusiveNode(); + if (U.compareAndSetReference(this, HEAD, null, h)) + tail = h; + } /** - * Inserts node into queue, initializing if necessary. See picture above. - * @param node the node to insert - * @return node's predecessor + * Enqueues the node unless null. (Currently used only for + * ConditionNodes; other cases are interleaved with acquires.) */ - private Node enq(Node node) { - for (;;) { - Node oldTail = tail; - if (oldTail != null) { - node.setPrevRelaxed(oldTail); - if (compareAndSetTail(oldTail, node)) { - oldTail.next = node; - return oldTail; + final void enqueue(Node node) { + if (node != null) { + for (;;) { + Node t = tail; + node.setPrevRelaxed(t); // avoid unnecessary fence + if (t == null) // initialize + tryInitializeHead(); + else if (casTail(t, node)) { + t.next = node; + if (t.status < 0) // wake up to clean link + LockSupport.unpark(node.waiter); + break; } - } else { - initializeSyncQueue(); } } } + /** Returns true if node is found in traversal from tail */ + final boolean isEnqueued(Node node) { + for (Node t = tail; t != null; t = t.prev) + if (t == node) + return true; + return false; + } + /** - * Creates and enqueues node for current thread and given mode. + * Wakes up the successor of given node, if one exists, and unsets its + * WAITING status to avoid park race. This may fail to wake up an + * eligible thread when one or more have been cancelled, but + * cancelAcquire ensures liveness. + */ + private static void signalNext(Node h) { + Node s; + if (h != null && (s = h.next) != null && s.status != 0) { + s.getAndUnsetStatus(WAITING); + LockSupport.unpark(s.waiter); + } + } + + /** Wakes up the given node if in shared mode */ + private static void signalNextIfShared(Node h) { + Node s; + if (h != null && (s = h.next) != null && + (s instanceof SharedNode) && s.status != 0) { + s.getAndUnsetStatus(WAITING); + LockSupport.unpark(s.waiter); + } + } + + /** + * Main acquire method, invoked by all exported acquire methods. * - * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared - * @return the new node + * @param node null unless a reacquiring Condition + * @param arg the acquire argument + * @param shared true if shared mode else exclusive + * @param interruptible if abort and return negative on interrupt + * @param timed if true use timed waits + * @param time if timed, the System.nanoTime value to timeout + * @return positive if acquired, 0 if timed out, negative if interrupted */ - private Node addWaiter(Node mode) { - Node node = new Node(mode); + final int acquire(Node node, int arg, boolean shared, + boolean interruptible, boolean timed, long time) { + Thread current = Thread.currentThread(); + byte spins = 0, postSpins = 0; // retries upon unpark of first thread + boolean interrupted = false, first = false; + Node pred = null; // predecessor of node when enqueued + + /* + * Repeatedly: + * Check if node now first + * if so, ensure head stable, else ensure valid predecessor + * if node is first or not yet enqueued, try acquiring + * else if node not yet created, create it + * else if not yet enqueued, try once to enqueue + * else if woken from park, retry (up to postSpins times) + * else if WAITING status not set, set and retry + * else park and clear WAITING status, and check cancellation + */ for (;;) { - Node oldTail = tail; - if (oldTail != null) { - node.setPrevRelaxed(oldTail); - if (compareAndSetTail(oldTail, node)) { - oldTail.next = node; - return node; + if (!first && (pred = (node == null) ? null : node.prev) != null && + !(first = (head == pred))) { + if (pred.status < 0) { + cleanQueue(); // predecessor cancelled + continue; + } else if (pred.prev == null) { + Thread.onSpinWait(); // ensure serialization + continue; } + } + if (first || pred == null) { + boolean acquired; + try { + if (shared) + acquired = (tryAcquireShared(arg) >= 0); + else + acquired = tryAcquire(arg); + } catch (Throwable ex) { + cancelAcquire(node, interrupted, false); + throw ex; + } + if (acquired) { + if (first) { + node.prev = null; + head = node; + pred.next = null; + node.waiter = null; + if (shared) + signalNextIfShared(node); + if (interrupted) + current.interrupt(); + } + return 1; + } + } + if (node == null) { // allocate; retry before enqueue + if (shared) + node = new SharedNode(); + else + node = new ExclusiveNode(); + } else if (pred == null) { // try to enqueue + node.waiter = current; + Node t = tail; + node.setPrevRelaxed(t); // avoid unnecessary fence + if (t == null) + tryInitializeHead(); + else if (!casTail(t, node)) + node.setPrevRelaxed(null); // back out + else + t.next = node; + } else if (first && spins != 0) { + --spins; // reduce unfairness on rewaits + Thread.onSpinWait(); + } else if (node.status == 0) { + node.status = WAITING; // enable signal and recheck } else { - initializeSyncQueue(); + long nanos; + spins = postSpins = (byte)((postSpins << 1) | 1); + if (!timed) + LockSupport.park(this); + else if ((nanos = time - System.nanoTime()) > 0L) + LockSupport.parkNanos(this, nanos); + else + break; + node.clearStatus(); + if ((interrupted |= Thread.interrupted()) && interruptible) + break; + } + } + return cancelAcquire(node, interrupted, interruptible); + } + + /** + * Possibly repeatedly traverses from tail, unsplicing cancelled + * nodes until none are found. Unparks nodes that may have been + * relinked to be next eligible acquirer. + */ + private void cleanQueue() { + for (;;) { // restart point + for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples + if (q == null || (p = q.prev) == null) + return; // end of list + if (s == null ? tail != q : (s.prev != q || s.status < 0)) + break; // inconsistent + if (q.status < 0) { // cancelled + if ((s == null ? casTail(q, p) : s.casPrev(q, p)) && + q.prev == p) { + p.casNext(q, s); // OK if fails + if (p.prev == null) + signalNext(p); + } + break; + } + if ((n = p.next) != q) { // help finish + if (n != null && q.prev == p) { + p.casNext(n, q); + if (p.prev == null) + signalNext(p); + } + break; + } + s = q; + q = q.prev; } } } /** - * Sets head of queue to be node, thus dequeuing. Called only by - * acquire methods. Also nulls out unused fields for sake of GC - * and to suppress unnecessary signals and traversals. - * - * @param node the node - */ - private void setHead(Node node) { - head = node; - node.thread = null; - node.prev = null; - } - - /** - * Wakes up node's successor, if one exists. - * - * @param node the node - */ - private void unparkSuccessor(Node node) { - /* - * If status is negative (i.e., possibly needing signal) try - * to clear in anticipation of signalling. It is OK if this - * fails or if status is changed by waiting thread. - */ - int ws = node.waitStatus; - if (ws < 0) - node.compareAndSetWaitStatus(ws, 0); - - /* - * Thread to unpark is held in successor, which is normally - * just the next node. But if cancelled or apparently null, - * traverse backwards from tail to find the actual - * non-cancelled successor. - */ - Node s = node.next; - if (s == null || s.waitStatus > 0) { - s = null; - for (Node p = tail; p != node && p != null; p = p.prev) - if (p.waitStatus <= 0) - s = p; - } - if (s != null) - LockSupport.unpark(s.thread); - } - - /** - * Release action for shared mode -- signals successor and ensures - * propagation. (Note: For exclusive mode, release just amounts - * to calling unparkSuccessor of head if it needs signal.) - */ - private void doReleaseShared() { - /* - * Ensure that a release propagates, even if there are other - * in-progress acquires/releases. This proceeds in the usual - * way of trying to unparkSuccessor of head if it needs - * signal. But if it does not, status is set to PROPAGATE to - * ensure that upon release, propagation continues. - * Additionally, we must loop in case a new node is added - * while we are doing this. Also, unlike other uses of - * unparkSuccessor, we need to know if CAS to reset status - * fails, if so rechecking. - */ - for (;;) { - Node h = head; - if (h != null && h != tail) { - int ws = h.waitStatus; - if (ws == Node.SIGNAL) { - if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) - continue; // loop to recheck cases - unparkSuccessor(h); - } - else if (ws == 0 && - !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) - continue; // loop on failed CAS - } - if (h == head) // loop if head changed - break; - } - } - - /** - * Sets head of queue, and checks if successor may be waiting - * in shared mode, if so propagating if either propagate > 0 or - * PROPAGATE status was set. - * - * @param node the node - * @param propagate the return value from a tryAcquireShared - */ - private void setHeadAndPropagate(Node node, int propagate) { - Node h = head; // Record old head for check below - setHead(node); - /* - * Try to signal next queued node if: - * Propagation was indicated by caller, - * or was recorded (as h.waitStatus either before - * or after setHead) by a previous operation - * (note: this uses sign-check of waitStatus because - * PROPAGATE status may transition to SIGNAL.) - * and - * The next node is waiting in shared mode, - * or we don't know, because it appears null - * - * The conservatism in both of these checks may cause - * unnecessary wake-ups, but only when there are multiple - * racing acquires/releases, so most need signals now or soon - * anyway. - */ - if (propagate > 0 || h == null || h.waitStatus < 0 || - (h = head) == null || h.waitStatus < 0) { - Node s = node.next; - if (s == null || s.isShared()) - doReleaseShared(); - } - } - - // Utilities for various versions of acquire - - /** * Cancels an ongoing attempt to acquire. * - * @param node the node - */ - private void cancelAcquire(Node node) { - // Ignore if node doesn't exist - if (node == null) - return; - - node.thread = null; - - // Skip cancelled predecessors - Node pred = node.prev; - while (pred.waitStatus > 0) - node.prev = pred = pred.prev; - - // predNext is the apparent node to unsplice. CASes below will - // fail if not, in which case, we lost race vs another cancel - // or signal, so no further action is necessary, although with - // a possibility that a cancelled node may transiently remain - // reachable. - Node predNext = pred.next; - - // Can use unconditional write instead of CAS here. - // After this atomic step, other Nodes can skip past us. - // Before, we are free of interference from other threads. - node.waitStatus = Node.CANCELLED; - - // If we are the tail, remove ourselves. - if (node == tail && compareAndSetTail(node, pred)) { - pred.compareAndSetNext(predNext, null); - } else { - // If successor needs signal, try to set pred's next-link - // so it will get one. Otherwise wake it up to propagate. - int ws; - if (pred != head && - ((ws = pred.waitStatus) == Node.SIGNAL || - (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && - pred.thread != null) { - Node next = node.next; - if (next != null && next.waitStatus <= 0) - pred.compareAndSetNext(predNext, next); - } else { - unparkSuccessor(node); - } - - node.next = node; // help GC - } - } - - /** - * Checks and updates status for a node that failed to acquire. - * Returns true if thread should block. This is the main signal - * control in all acquire loops. Requires that pred == node.prev. - * - * @param pred node's predecessor holding status - * @param node the node - * @return {@code true} if thread should block - */ - private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { - int ws = pred.waitStatus; - if (ws == Node.SIGNAL) - /* - * This node has already set status asking a release - * to signal it, so it can safely park. - */ - return true; - if (ws > 0) { - /* - * Predecessor was cancelled. Skip over predecessors and - * indicate retry. - */ - do { - node.prev = pred = pred.prev; - } while (pred.waitStatus > 0); - pred.next = node; - } else { - /* - * waitStatus must be 0 or PROPAGATE. Indicate that we - * need a signal, but don't park yet. Caller will need to - * retry to make sure it cannot acquire before parking. - */ - pred.compareAndSetWaitStatus(ws, Node.SIGNAL); - } - return false; - } - - /** - * Convenience method to interrupt current thread. - */ - static void selfInterrupt() { - Thread.currentThread().interrupt(); - } - - /** - * Convenience method to park and then check if interrupted. - * - * @return {@code true} if interrupted - */ - private final boolean parkAndCheckInterrupt() { - LockSupport.park(this); - return Thread.interrupted(); - } - - /* - * Various flavors of acquire, varying in exclusive/shared and - * control modes. Each is mostly the same, but annoyingly - * different. Only a little bit of factoring is possible due to - * interactions of exception mechanics (including ensuring that we - * cancel if tryAcquire throws exception) and other control, at - * least not without hurting performance too much. - */ - - /** - * Acquires in exclusive uninterruptible mode for thread already in - * queue. Used by condition wait methods as well as acquire. - * - * @param node the node - * @param arg the acquire argument - * @return {@code true} if interrupted while waiting - */ - final boolean acquireQueued(final Node node, int arg) { - boolean interrupted = false; - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head && tryAcquire(arg)) { - setHead(node); - p.next = null; // help GC - return interrupted; - } - if (shouldParkAfterFailedAcquire(p, node)) - interrupted |= parkAndCheckInterrupt(); - } - } catch (Throwable t) { - cancelAcquire(node); - if (interrupted) - selfInterrupt(); - throw t; - } - } - - /** - * Acquires in exclusive interruptible mode. - * @param arg the acquire argument + * @param node the node (may be null if cancelled before enqueuing) + * @param interrupted true if thread interrupted + * @param interruptible if should report interruption vs reset */ - private void doAcquireInterruptibly(int arg) - throws InterruptedException { - final Node node = addWaiter(Node.EXCLUSIVE); - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head && tryAcquire(arg)) { - setHead(node); - p.next = null; // help GC - return; - } - if (shouldParkAfterFailedAcquire(p, node) && - parkAndCheckInterrupt()) - throw new InterruptedException(); - } - } catch (Throwable t) { - cancelAcquire(node); - throw t; - } - } - - /** - * Acquires in exclusive timed mode. - * - * @param arg the acquire argument - * @param nanosTimeout max wait time - * @return {@code true} if acquired - */ - private boolean doAcquireNanos(int arg, long nanosTimeout) - throws InterruptedException { - if (nanosTimeout <= 0L) - return false; - final long deadline = System.nanoTime() + nanosTimeout; - final Node node = addWaiter(Node.EXCLUSIVE); - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head && tryAcquire(arg)) { - setHead(node); - p.next = null; // help GC - return true; - } - nanosTimeout = deadline - System.nanoTime(); - if (nanosTimeout <= 0L) { - cancelAcquire(node); - return false; - } - if (shouldParkAfterFailedAcquire(p, node) && - nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanosTimeout); - if (Thread.interrupted()) - throw new InterruptedException(); - } - } catch (Throwable t) { - cancelAcquire(node); - throw t; + private int cancelAcquire(Node node, boolean interrupted, + boolean interruptible) { + if (node != null) { + node.waiter = null; + node.status = CANCELLED; + if (node.prev != null) + cleanQueue(); } - } - - /** - * Acquires in shared uninterruptible mode. - * @param arg the acquire argument - */ - private void doAcquireShared(int arg) { - final Node node = addWaiter(Node.SHARED); - boolean interrupted = false; - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head) { - int r = tryAcquireShared(arg); - if (r >= 0) { - setHeadAndPropagate(node, r); - p.next = null; // help GC - return; - } - } - if (shouldParkAfterFailedAcquire(p, node)) - interrupted |= parkAndCheckInterrupt(); - } - } catch (Throwable t) { - cancelAcquire(node); - throw t; - } finally { - if (interrupted) - selfInterrupt(); + if (interrupted) { + if (interruptible) + return CANCELLED; + else + Thread.currentThread().interrupt(); } - } - - /** - * Acquires in shared interruptible mode. - * @param arg the acquire argument - */ - private void doAcquireSharedInterruptibly(int arg) - throws InterruptedException { - final Node node = addWaiter(Node.SHARED); - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head) { - int r = tryAcquireShared(arg); - if (r >= 0) { - setHeadAndPropagate(node, r); - p.next = null; // help GC - return; - } - } - if (shouldParkAfterFailedAcquire(p, node) && - parkAndCheckInterrupt()) - throw new InterruptedException(); - } - } catch (Throwable t) { - cancelAcquire(node); - throw t; - } - } - - /** - * Acquires in shared timed mode. - * - * @param arg the acquire argument - * @param nanosTimeout max wait time - * @return {@code true} if acquired - */ - private boolean doAcquireSharedNanos(int arg, long nanosTimeout) - throws InterruptedException { - if (nanosTimeout <= 0L) - return false; - final long deadline = System.nanoTime() + nanosTimeout; - final Node node = addWaiter(Node.SHARED); - try { - for (;;) { - final Node p = node.predecessor(); - if (p == head) { - int r = tryAcquireShared(arg); - if (r >= 0) { - setHeadAndPropagate(node, r); - p.next = null; // help GC - return true; - } - } - nanosTimeout = deadline - System.nanoTime(); - if (nanosTimeout <= 0L) { - cancelAcquire(node); - return false; - } - if (shouldParkAfterFailedAcquire(p, node) && - nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanosTimeout); - if (Thread.interrupted()) - throw new InterruptedException(); - } - } catch (Throwable t) { - cancelAcquire(node); - throw t; - } + return 0; } // Main exported methods @@ -1236,9 +933,8 @@ * can represent anything you like. */ public final void acquire(int arg) { - if (!tryAcquire(arg) && - acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) - selfInterrupt(); + if (!tryAcquire(arg)) + acquire(null, arg, false, false, false, 0L); } /** @@ -1256,11 +952,10 @@ * @throws InterruptedException if the current thread is interrupted */ public final void acquireInterruptibly(int arg) - throws InterruptedException { - if (Thread.interrupted()) + throws InterruptedException { + if (Thread.interrupted() || + (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0)) throw new InterruptedException(); - if (!tryAcquire(arg)) - doAcquireInterruptibly(arg); } /** @@ -1281,11 +976,20 @@ * @throws InterruptedException if the current thread is interrupted */ public final boolean tryAcquireNanos(int arg, long nanosTimeout) - throws InterruptedException { - if (Thread.interrupted()) - throw new InterruptedException(); - return tryAcquire(arg) || - doAcquireNanos(arg, nanosTimeout); + throws InterruptedException { + if (!Thread.interrupted()) { + if (tryAcquire(arg)) + return true; + if (nanosTimeout <= 0L) + return false; + int stat = acquire(null, arg, false, true, true, + System.nanoTime() + nanosTimeout); + if (stat > 0) + return true; + if (stat == 0) + return false; + } + throw new InterruptedException(); } /** @@ -1300,9 +1004,7 @@ */ public final boolean release(int arg) { if (tryRelease(arg)) { - Node h = head; - if (h != null && h.waitStatus != 0) - unparkSuccessor(h); + signalNext(head); return true; } return false; @@ -1321,7 +1023,7 @@ */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) - doAcquireShared(arg); + acquire(null, arg, true, false, false, 0L); } /** @@ -1338,11 +1040,11 @@ * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(int arg) - throws InterruptedException { - if (Thread.interrupted()) + throws InterruptedException { + if (Thread.interrupted() || + (tryAcquireShared(arg) < 0 && + acquire(null, arg, true, true, false, 0L) < 0)) throw new InterruptedException(); - if (tryAcquireShared(arg) < 0) - doAcquireSharedInterruptibly(arg); } /** @@ -1363,10 +1065,19 @@ */ public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { - if (Thread.interrupted()) - throw new InterruptedException(); - return tryAcquireShared(arg) >= 0 || - doAcquireSharedNanos(arg, nanosTimeout); + if (!Thread.interrupted()) { + if (tryAcquireShared(arg) >= 0) + return true; + if (nanosTimeout <= 0L) + return false; + int stat = acquire(null, arg, true, true, true, + System.nanoTime() + nanosTimeout); + if (stat > 0) + return true; + if (stat == 0) + return false; + } + throw new InterruptedException(); } /** @@ -1380,7 +1091,7 @@ */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { - doReleaseShared(); + signalNext(head); return true; } return false; @@ -1398,7 +1109,7 @@ */ public final boolean hasQueuedThreads() { for (Node p = tail, h = head; p != h && p != null; p = p.prev) - if (p.waitStatus <= 0) + if (p.status >= 0) return true; return false; } @@ -1428,45 +1139,16 @@ * {@code null} if no threads are currently queued */ public final Thread getFirstQueuedThread() { - // handle only fast path, else relay - return (head == tail) ? null : fullGetFirstQueuedThread(); - } - - /** - * Version of getFirstQueuedThread called when fastpath fails. - */ - private Thread fullGetFirstQueuedThread() { - /* - * The first node is normally head.next. Try to get its - * thread field, ensuring consistent reads: If thread - * field is nulled out or s.prev is no longer head, then - * some other thread(s) concurrently performed setHead in - * between some of our reads. We try this twice before - * resorting to traversal. - */ - Node h, s; - Thread st; - if (((h = head) != null && (s = h.next) != null && - s.prev == head && (st = s.thread) != null) || - ((h = head) != null && (s = h.next) != null && - s.prev == head && (st = s.thread) != null)) - return st; - - /* - * Head's next field might not have been set yet, or may have - * been unset after setHead. So we must check to see if tail - * is actually first node. If not, we continue on, safely - * traversing from tail back to head to find first, - * guaranteeing termination. - */ - - Thread firstThread = null; - for (Node p = tail; p != null && p != head; p = p.prev) { - Thread t = p.thread; - if (t != null) - firstThread = t; + Thread first = null, w; Node h, s; + if ((h = head) != null && ((s = h.next) == null || + (first = s.waiter) == null || + s.prev == null)) { + // traverse from tail on stale reads + for (Node p = tail, q; p != null && (q = p.prev) != null; p = q) + if ((w = p.waiter) != null) + first = w; } - return firstThread; + return first; } /** @@ -1483,7 +1165,7 @@ if (thread == null) throw new NullPointerException(); for (Node p = tail; p != null; p = p.prev) - if (p.thread == thread) + if (p.waiter == thread) return true; return false; } @@ -1499,10 +1181,8 @@ */ final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; - return (h = head) != null && - (s = h.next) != null && - !s.isShared() && - s.thread != null; + return (h = head) != null && (s = h.next) != null && + !(s instanceof SharedNode) && s.waiter != null; } /** @@ -1549,19 +1229,12 @@ * @since 1.7 */ public final boolean hasQueuedPredecessors() { - Node h, s; - if ((h = head) != null) { - if ((s = h.next) == null || s.waitStatus > 0) { - s = null; // traverse in case of concurrent cancellation - for (Node p = tail; p != h && p != null; p = p.prev) { - if (p.waitStatus <= 0) - s = p; - } - } - if (s != null && s.thread != Thread.currentThread()) - return true; - } - return false; + Thread first = null; Node h, s; + if ((h = head) != null && ((s = h.next) == null || + (first = s.waiter) == null || + s.prev == null)) + first = getFirstQueuedThread(); // retry via getFirstQueuedThread + return first != null && first != Thread.currentThread(); } // Instrumentation and monitoring methods @@ -1578,7 +1251,7 @@ public final int getQueueLength() { int n = 0; for (Node p = tail; p != null; p = p.prev) { - if (p.thread != null) + if (p.waiter != null) ++n; } return n; @@ -1598,7 +1271,7 @@ public final Collection getQueuedThreads() { ArrayList list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { - Thread t = p.thread; + Thread t = p.waiter; if (t != null) list.add(t); } @@ -1616,8 +1289,8 @@ public final Collection getExclusiveQueuedThreads() { ArrayList list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { - if (!p.isShared()) { - Thread t = p.thread; + if (!(p instanceof SharedNode)) { + Thread t = p.waiter; if (t != null) list.add(t); } @@ -1636,8 +1309,8 @@ public final Collection getSharedQueuedThreads() { ArrayList list = new ArrayList<>(); for (Node p = tail; p != null; p = p.prev) { - if (p.isShared()) { - Thread t = p.thread; + if (p instanceof SharedNode) { + Thread t = p.waiter; if (t != null) list.add(t); } @@ -1660,117 +1333,6 @@ + (hasQueuedThreads() ? "non" : "") + "empty queue]"; } - - // Internal support methods for Conditions - - /** - * Returns true if a node, always one that was initially placed on - * a condition queue, is now waiting to reacquire on sync queue. - * @param node the node - * @return true if is reacquiring - */ - final boolean isOnSyncQueue(Node node) { - if (node.waitStatus == Node.CONDITION || node.prev == null) - return false; - if (node.next != null) // If has successor, it must be on queue - return true; - /* - * node.prev can be non-null, but not yet on queue because - * the CAS to place it on queue can fail. So we have to - * traverse from tail to make sure it actually made it. It - * will always be near the tail in calls to this method, and - * unless the CAS failed (which is unlikely), it will be - * there, so we hardly ever traverse much. - */ - return findNodeFromTail(node); - } - - /** - * Returns true if node is on sync queue by searching backwards from tail. - * Called only when needed by isOnSyncQueue. - * @return true if present - */ - private boolean findNodeFromTail(Node node) { - // We check for node first, since it's likely to be at or near tail. - // tail is known to be non-null, so we could re-order to "save" - // one null check, but we leave it this way to help the VM. - for (Node p = tail;;) { - if (p == node) - return true; - if (p == null) - return false; - p = p.prev; - } - } - - /** - * Transfers a node from a condition queue onto sync queue. - * Returns true if successful. - * @param node the node - * @return true if successfully transferred (else the node was - * cancelled before signal) - */ - final boolean transferForSignal(Node node) { - /* - * If cannot change waitStatus, the node has been cancelled. - */ - if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) - return false; - - /* - * Splice onto queue and try to set waitStatus of predecessor to - * indicate that thread is (probably) waiting. If cancelled or - * attempt to set waitStatus fails, wake up to resync (in which - * case the waitStatus can be transiently and harmlessly wrong). - */ - Node p = enq(node); - int ws = p.waitStatus; - if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) - LockSupport.unpark(node.thread); - return true; - } - - /** - * Transfers node, if necessary, to sync queue after a cancelled wait. - * Returns true if thread was cancelled before being signalled. - * - * @param node the node - * @return true if cancelled before the node was signalled - */ - final boolean transferAfterCancelledWait(Node node) { - if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) { - enq(node); - return true; - } - /* - * If we lost out to a signal(), then we can't proceed - * until it finishes its enq(). Cancelling during an - * incomplete transfer is both rare and transient, so just - * spin. - */ - while (!isOnSyncQueue(node)) - Thread.yield(); - return false; - } - - /** - * Invokes release with current state value; returns saved state. - * Cancels node and throws exception on failure. - * @param node the condition node for this wait - * @return previous sync state - */ - final int fullyRelease(Node node) { - try { - int savedState = getState(); - if (release(savedState)) - return savedState; - throw new IllegalMonitorStateException(); - } catch (Throwable t) { - node.waitStatus = Node.CANCELLED; - throw t; - } - } - // Instrumentation methods for conditions /** @@ -1868,106 +1430,34 @@ public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ - private transient Node firstWaiter; + private transient ConditionNode firstWaiter; /** Last node of condition queue. */ - private transient Node lastWaiter; + private transient ConditionNode lastWaiter; /** * Creates a new {@code ConditionObject} instance. */ public ConditionObject() { } - // Internal methods - - /** - * Adds a new waiter to wait queue. - * @return its new wait node - */ - private Node addConditionWaiter() { - if (!isHeldExclusively()) - throw new IllegalMonitorStateException(); - Node t = lastWaiter; - // If lastWaiter is cancelled, clean out. - if (t != null && t.waitStatus != Node.CONDITION) { - unlinkCancelledWaiters(); - t = lastWaiter; - } - - Node node = new Node(Node.CONDITION); - - if (t == null) - firstWaiter = node; - else - t.nextWaiter = node; - lastWaiter = node; - return node; - } - - /** - * Removes and transfers nodes until hit non-cancelled one or - * null. Split out from signal in part to encourage compilers - * to inline the case of no waiters. - * @param first (non-null) the first node on condition queue - */ - private void doSignal(Node first) { - do { - if ( (firstWaiter = first.nextWaiter) == null) - lastWaiter = null; - first.nextWaiter = null; - } while (!transferForSignal(first) && - (first = firstWaiter) != null); - } + // Signalling methods /** - * Removes and transfers all nodes. - * @param first (non-null) the first node on condition queue + * Removes and transfers one or all waiters to sync queue. */ - private void doSignalAll(Node first) { - lastWaiter = firstWaiter = null; - do { - Node next = first.nextWaiter; - first.nextWaiter = null; - transferForSignal(first); + private void doSignal(ConditionNode first, boolean all) { + while (first != null) { + ConditionNode next = first.nextWaiter; + if ((firstWaiter = next) == null) + lastWaiter = null; + if ((first.getAndUnsetStatus(COND) & COND) != 0) { + enqueue(first); + if (!all) + break; + } first = next; - } while (first != null); - } - - /** - * Unlinks cancelled waiter nodes from condition queue. - * Called only while holding lock. This is called when - * cancellation occurred during condition wait, and upon - * insertion of a new waiter when lastWaiter is seen to have - * been cancelled. This method is needed to avoid garbage - * retention in the absence of signals. So even though it may - * require a full traversal, it comes into play only when - * timeouts or cancellations occur in the absence of - * signals. It traverses all nodes rather than stopping at a - * particular target to unlink all pointers to garbage nodes - * without requiring many re-traversals during cancellation - * storms. - */ - private void unlinkCancelledWaiters() { - Node t = firstWaiter; - Node trail = null; - while (t != null) { - Node next = t.nextWaiter; - if (t.waitStatus != Node.CONDITION) { - t.nextWaiter = null; - if (trail == null) - firstWaiter = next; - else - trail.nextWaiter = next; - if (next == null) - lastWaiter = trail; - } - else - trail = t; - t = next; } } - // public methods - /** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the @@ -1977,11 +1467,11 @@ * returns {@code false} */ public final void signal() { + ConditionNode first = firstWaiter; if (!isHeldExclusively()) throw new IllegalMonitorStateException(); - Node first = firstWaiter; if (first != null) - doSignal(first); + doSignal(first, false); } /** @@ -1992,11 +1482,72 @@ * returns {@code false} */ public final void signalAll() { + ConditionNode first = firstWaiter; if (!isHeldExclusively()) throw new IllegalMonitorStateException(); - Node first = firstWaiter; if (first != null) - doSignalAll(first); + doSignal(first, true); + } + + // Waiting methods + + /** + * Adds node to condition list and releases lock. + * + * @param node the node + * @return savedState to reacquire after wait + */ + private int enableWait(ConditionNode node) { + if (isHeldExclusively()) { + node.waiter = Thread.currentThread(); + node.setStatusRelaxed(COND | WAITING); + ConditionNode last = lastWaiter; + if (last == null) + firstWaiter = node; + else + last.nextWaiter = node; + lastWaiter = node; + int savedState = getState(); + if (release(savedState)) + return savedState; + } + node.status = CANCELLED; // lock not held or inconsistent + throw new IllegalMonitorStateException(); + } + + /** + * Returns true if a node that was initially placed on a condition + * queue is now ready to reacquire on sync queue. + * @param node the node + * @return true if is reacquiring + */ + private boolean canReacquire(ConditionNode node) { + // check links, not status to avoid enqueue race + return node != null && node.prev != null && isEnqueued(node); + } + + /** + * Unlinks the given node and other non-waiting nodes from + * condition queue unless already unlinked. + */ + private void unlinkCancelledWaiters(ConditionNode node) { + if (node == null || node.nextWaiter != null || node == lastWaiter) { + ConditionNode w = firstWaiter, trail = null; + while (w != null) { + ConditionNode next = w.nextWaiter; + if ((w.status & COND) == 0) { + w.nextWaiter = null; + if (trail == null) + firstWaiter = next; + else + trail.nextWaiter = next; + if (next == null) + lastWaiter = trail; + } else + trail = w; + w = next; + } + } } /** @@ -2011,51 +1562,27 @@ * */ public final void awaitUninterruptibly() { - Node node = addConditionWaiter(); - int savedState = fullyRelease(node); + ConditionNode node = new ConditionNode(); + int savedState = enableWait(node); + LockSupport.setCurrentBlocker(this); // for back-compatibility boolean interrupted = false; - while (!isOnSyncQueue(node)) { - LockSupport.park(this); + while (!canReacquire(node)) { if (Thread.interrupted()) interrupted = true; + else if ((node.status & COND) != 0) { + try { + ForkJoinPool.managedBlock(node); + } catch (InterruptedException ie) { + interrupted = true; + } + } else + Thread.onSpinWait(); // awoke while enqueuing } - if (acquireQueued(node, savedState) || interrupted) - selfInterrupt(); - } - - /* - * For interruptible waits, we need to track whether to throw - * InterruptedException, if interrupted while blocked on - * condition, versus reinterrupt current thread, if - * interrupted while blocked waiting to re-acquire. - */ - - /** Mode meaning to reinterrupt on exit from wait */ - private static final int REINTERRUPT = 1; - /** Mode meaning to throw InterruptedException on exit from wait */ - private static final int THROW_IE = -1; - - /** - * Checks for interrupt, returning THROW_IE if interrupted - * before signalled, REINTERRUPT if after signalled, or - * 0 if not interrupted. - */ - private int checkInterruptWhileWaiting(Node node) { - return Thread.interrupted() ? - (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : - 0; - } - - /** - * Throws InterruptedException, reinterrupts current thread, or - * does nothing, depending on mode. - */ - private void reportInterruptAfterWait(int interruptMode) - throws InterruptedException { - if (interruptMode == THROW_IE) - throw new InterruptedException(); - else if (interruptMode == REINTERRUPT) - selfInterrupt(); + LockSupport.setCurrentBlocker(null); + node.clearStatus(); + acquire(node, savedState, false, false, false, 0L); + if (interrupted) + Thread.currentThread().interrupt(); } /** @@ -2074,20 +1601,33 @@ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); - Node node = addConditionWaiter(); - int savedState = fullyRelease(node); - int interruptMode = 0; - while (!isOnSyncQueue(node)) { - LockSupport.park(this); - if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) - break; + ConditionNode node = new ConditionNode(); + int savedState = enableWait(node); + LockSupport.setCurrentBlocker(this); // for back-compatibility + boolean interrupted = false, cancelled = false; + while (!canReacquire(node)) { + if (interrupted |= Thread.interrupted()) { + if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) + break; // else interrupted after signal + } else if ((node.status & COND) != 0) { + try { + ForkJoinPool.managedBlock(node); + } catch (InterruptedException ie) { + interrupted = true; + } + } else + Thread.onSpinWait(); // awoke while enqueuing } - if (acquireQueued(node, savedState) && interruptMode != THROW_IE) - interruptMode = REINTERRUPT; - if (node.nextWaiter != null) // clean up if cancelled - unlinkCancelledWaiters(); - if (interruptMode != 0) - reportInterruptAfterWait(interruptMode); + LockSupport.setCurrentBlocker(null); + node.clearStatus(); + acquire(node, savedState, false, false, false, 0L); + if (interrupted) { + if (cancelled) { + unlinkCancelledWaiters(node); + throw new InterruptedException(); + } + Thread.currentThread().interrupt(); + } } /** @@ -2107,32 +1647,29 @@ throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); - // We don't check for nanosTimeout <= 0L here, to allow - // awaitNanos(0) as a way to "yield the lock". - final long deadline = System.nanoTime() + nanosTimeout; - long initialNanos = nanosTimeout; - Node node = addConditionWaiter(); - int savedState = fullyRelease(node); - int interruptMode = 0; - while (!isOnSyncQueue(node)) { - if (nanosTimeout <= 0L) { - transferAfterCancelledWait(node); - break; - } - if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanosTimeout); - if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) - break; - nanosTimeout = deadline - System.nanoTime(); + ConditionNode node = new ConditionNode(); + int savedState = enableWait(node); + long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; + long deadline = System.nanoTime() + nanos; + boolean cancelled = false, interrupted = false; + while (!canReacquire(node)) { + if ((interrupted |= Thread.interrupted()) || + (nanos = deadline - System.nanoTime()) <= 0L) { + if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) + break; + } else + LockSupport.parkNanos(this, nanos); } - if (acquireQueued(node, savedState) && interruptMode != THROW_IE) - interruptMode = REINTERRUPT; - if (node.nextWaiter != null) - unlinkCancelledWaiters(); - if (interruptMode != 0) - reportInterruptAfterWait(interruptMode); + node.clearStatus(); + acquire(node, savedState, false, false, false, 0L); + if (cancelled) { + unlinkCancelledWaiters(node); + if (interrupted) + throw new InterruptedException(); + } else if (interrupted) + Thread.currentThread().interrupt(); long remaining = deadline - System.nanoTime(); // avoid overflow - return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE; + return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE; } /** @@ -2154,26 +1691,26 @@ long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); - Node node = addConditionWaiter(); - int savedState = fullyRelease(node); - boolean timedout = false; - int interruptMode = 0; - while (!isOnSyncQueue(node)) { - if (System.currentTimeMillis() >= abstime) { - timedout = transferAfterCancelledWait(node); - break; - } - LockSupport.parkUntil(this, abstime); - if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) - break; + ConditionNode node = new ConditionNode(); + int savedState = enableWait(node); + boolean cancelled = false, interrupted = false; + while (!canReacquire(node)) { + if ((interrupted |= Thread.interrupted()) || + System.currentTimeMillis() >= abstime) { + if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) + break; + } else + LockSupport.parkUntil(this, abstime); } - if (acquireQueued(node, savedState) && interruptMode != THROW_IE) - interruptMode = REINTERRUPT; - if (node.nextWaiter != null) - unlinkCancelledWaiters(); - if (interruptMode != 0) - reportInterruptAfterWait(interruptMode); - return !timedout; + node.clearStatus(); + acquire(node, savedState, false, false, false, 0L); + if (cancelled) { + unlinkCancelledWaiters(node); + if (interrupted) + throw new InterruptedException(); + } else if (interrupted) + Thread.currentThread().interrupt(); + return !cancelled; } /** @@ -2195,31 +1732,28 @@ long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); - // We don't check for nanosTimeout <= 0L here, to allow - // await(0, unit) as a way to "yield the lock". - final long deadline = System.nanoTime() + nanosTimeout; - Node node = addConditionWaiter(); - int savedState = fullyRelease(node); - boolean timedout = false; - int interruptMode = 0; - while (!isOnSyncQueue(node)) { - if (nanosTimeout <= 0L) { - timedout = transferAfterCancelledWait(node); - break; - } - if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanosTimeout); - if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) - break; - nanosTimeout = deadline - System.nanoTime(); + ConditionNode node = new ConditionNode(); + int savedState = enableWait(node); + long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; + long deadline = System.nanoTime() + nanos; + boolean cancelled = false, interrupted = false; + while (!canReacquire(node)) { + if ((interrupted |= Thread.interrupted()) || + (nanos = deadline - System.nanoTime()) <= 0L) { + if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) + break; + } else + LockSupport.parkNanos(this, nanos); } - if (acquireQueued(node, savedState) && interruptMode != THROW_IE) - interruptMode = REINTERRUPT; - if (node.nextWaiter != null) - unlinkCancelledWaiters(); - if (interruptMode != 0) - reportInterruptAfterWait(interruptMode); - return !timedout; + node.clearStatus(); + acquire(node, savedState, false, false, false, 0L); + if (cancelled) { + unlinkCancelledWaiters(node); + if (interrupted) + throw new InterruptedException(); + } else if (interrupted) + Thread.currentThread().interrupt(); + return !cancelled; } // support for instrumentation @@ -2245,8 +1779,8 @@ protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); - for (Node w = firstWaiter; w != null; w = w.nextWaiter) { - if (w.waitStatus == Node.CONDITION) + for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { + if ((w.status & COND) != 0) return true; } return false; @@ -2265,8 +1799,8 @@ if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; - for (Node w = firstWaiter; w != null; w = w.nextWaiter) { - if (w.waitStatus == Node.CONDITION) + for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { + if ((w.status & COND) != 0) ++n; } return n; @@ -2285,9 +1819,9 @@ if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList list = new ArrayList<>(); - for (Node w = firstWaiter; w != null; w = w.nextWaiter) { - if (w.waitStatus == Node.CONDITION) { - Thread t = w.thread; + for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { + if ((w.status & COND) != 0) { + Thread t = w.waiter; if (t != null) list.add(t); } @@ -2296,39 +1830,16 @@ } } - // VarHandle mechanics - private static final VarHandle STATE; - private static final VarHandle HEAD; - private static final VarHandle TAIL; + // Unsafe + private static final Unsafe U = Unsafe.getUnsafe(); + private static final long STATE + = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "state"); + private static final long HEAD + = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "head"); + private static final long TAIL + = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "tail"); static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class); - HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class); - TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); - } - - // Reduce the risk of rare disastrous classloading in first call to - // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 Class ensureLoaded = LockSupport.class; } - - /** - * Initializes head and tail fields on first contention. - */ - private final void initializeSyncQueue() { - Node h; - if (HEAD.compareAndSet(this, null, (h = new Node()))) - tail = h; - } - - /** - * CASes tail field. - */ - private final boolean compareAndSetTail(Node expect, Node update) { - return TAIL.compareAndSet(this, expect, update); - } } diff --git a/src/java.base/share/classes/java/util/concurrent/locks/Lock.java b/src/java.base/share/classes/java/util/concurrent/locks/Lock.java --- a/src/java.base/share/classes/java/util/concurrent/locks/Lock.java +++ b/src/java.base/share/classes/java/util/concurrent/locks/Lock.java @@ -122,9 +122,8 @@ *

All {@code Lock} implementations must enforce the same * memory synchronization semantics as provided by the built-in monitor * lock, as described in - * * Chapter 17 of - * The Java™ Language Specification: + * The Java™ Language Specification: *

    *
  • A successful {@code lock} operation has the same memory * synchronization effects as a successful Lock action. @@ -162,6 +161,7 @@ * @see ReentrantLock * @see Condition * @see ReadWriteLock + * @jls 17.4 Memory Model * * @since 1.5 * @author Doug Lea diff --git a/src/java.base/share/classes/java/util/concurrent/locks/LockSupport.java b/src/java.base/share/classes/java/util/concurrent/locks/LockSupport.java --- a/src/java.base/share/classes/java/util/concurrent/locks/LockSupport.java +++ b/src/java.base/share/classes/java/util/concurrent/locks/LockSupport.java @@ -140,8 +140,25 @@ private LockSupport() {} // Cannot be instantiated. private static void setBlocker(Thread t, Object arg) { - // Even though volatile, hotspot doesn't need a write barrier here. - U.putReference(t, PARKBLOCKER, arg); + U.putReferenceOpaque(t, PARKBLOCKER, arg); + } + + /** + * Sets the object to be returned by invocations of {@link + * #getBlocker getBlocker} for the current thread. This method may + * be used before invoking the no-argument version of {@link + * LockSupport#park() park()} from non-public objects, allowing + * more helpful diagnostics, or retaining compatibility with + * previous implementations of blocking methods. Previous values + * of the blocker are not automatically restored after blocking. + * To obtain the effects of {@code park(b}}, use {@code + * setCurrentBlocker(b); park(); setCurrentBlocker(null);} + * + * @param blocker the blocker object + * @since 14 + */ + public static void setCurrentBlocker(Object blocker) { + U.putReferenceOpaque(Thread.currentThread(), PARKBLOCKER, blocker); } /** @@ -292,7 +309,7 @@ public static Object getBlocker(Thread t) { if (t == null) throw new NullPointerException(); - return U.getReferenceVolatile(t, PARKBLOCKER); + return U.getReferenceOpaque(t, PARKBLOCKER); } /** @@ -394,24 +411,6 @@ } /** - * Returns the pseudo-randomly initialized or updated secondary seed. - * Copied from ThreadLocalRandom due to package access restrictions. - */ - static final int nextSecondarySeed() { - int r; - Thread t = Thread.currentThread(); - if ((r = U.getInt(t, SECONDARY)) != 0) { - r ^= r << 13; // xorshift - r ^= r >>> 17; - r ^= r << 5; - } - else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0) - r = 1; // avoid zero - U.putInt(t, SECONDARY, r); - return r; - } - - /** * Returns the thread id for the given thread. We must access * this directly rather than via method Thread.getId() because * getId() has been known to be overridden in ways that do not @@ -423,11 +422,9 @@ // Hotspot implementation via intrinsics API private static final Unsafe U = Unsafe.getUnsafe(); - private static final long PARKBLOCKER = U.objectFieldOffset - (Thread.class, "parkBlocker"); - private static final long SECONDARY = U.objectFieldOffset - (Thread.class, "threadLocalRandomSecondarySeed"); - private static final long TID = U.objectFieldOffset - (Thread.class, "tid"); + private static final long PARKBLOCKER + = U.objectFieldOffset(Thread.class, "parkBlocker"); + private static final long TID + = U.objectFieldOffset(Thread.class, "tid"); } diff --git a/src/java.base/share/classes/java/util/concurrent/locks/ReentrantLock.java b/src/java.base/share/classes/java/util/concurrent/locks/ReentrantLock.java --- a/src/java.base/share/classes/java/util/concurrent/locks/ReentrantLock.java +++ b/src/java.base/share/classes/java/util/concurrent/locks/ReentrantLock.java @@ -119,39 +119,63 @@ private static final long serialVersionUID = -5179523762034025860L; /** - * Performs non-fair tryLock. tryAcquire is implemented in - * subclasses, but both need nonfair try for trylock method. + * Performs non-fair tryLock. */ @ReservedStackAccess - final boolean nonfairTryAcquire(int acquires) { - final Thread current = Thread.currentThread(); + final boolean tryLock() { + Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { - if (compareAndSetState(0, acquires)) { + if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(current); return true; } - } - else if (current == getExclusiveOwnerThread()) { - int nextc = c + acquires; - if (nextc < 0) // overflow + } else if (getExclusiveOwnerThread() == current) { + if (++c < 0) // overflow throw new Error("Maximum lock count exceeded"); - setState(nextc); + setState(c); return true; } return false; } + /** + * Checks for reentrancy and acquires if lock immediately + * available under fair vs nonfair rules. Locking methods + * perform initialTryLock check before relaying to + * corresponding AQS acquire methods. + */ + abstract boolean initialTryLock(); + + @ReservedStackAccess + final void lock() { + if (!initialTryLock()) + acquire(1); + } + + @ReservedStackAccess + final void lockInterruptibly() throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + if (!initialTryLock()) + acquireInterruptibly(1); + } + + @ReservedStackAccess + final boolean tryLockNanos(long nanos) throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + return initialTryLock() || tryAcquireNanos(1, nanos); + } + @ReservedStackAccess protected final boolean tryRelease(int releases) { int c = getState() - releases; - if (Thread.currentThread() != getExclusiveOwnerThread()) + if (getExclusiveOwnerThread() != Thread.currentThread()) throw new IllegalMonitorStateException(); - boolean free = false; - if (c == 0) { - free = true; + boolean free = (c == 0); + if (free) setExclusiveOwnerThread(null); - } setState(c); return free; } @@ -195,8 +219,31 @@ */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; + + final boolean initialTryLock() { + Thread current = Thread.currentThread(); + if (compareAndSetState(0, 1)) { // first attempt is unguarded + setExclusiveOwnerThread(current); + return true; + } else if (getExclusiveOwnerThread() == current) { + int c = getState() + 1; + if (c < 0) // overflow + throw new Error("Maximum lock count exceeded"); + setState(c); + return true; + } else + return false; + } + + /** + * Acquire for non-reentrant cases after initialTryLock prescreen + */ protected final boolean tryAcquire(int acquires) { - return nonfairTryAcquire(acquires); + if (getState() == 0 && compareAndSetState(0, acquires)) { + setExclusiveOwnerThread(Thread.currentThread()); + return true; + } + return false; } } @@ -205,26 +252,34 @@ */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; + /** - * Fair version of tryAcquire. Don't grant access unless - * recursive call or no waiters or is first. + * Acquires only if reentrant or queue is empty. */ - @ReservedStackAccess - protected final boolean tryAcquire(int acquires) { - final Thread current = Thread.currentThread(); + final boolean initialTryLock() { + Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { - if (!hasQueuedPredecessors() && - compareAndSetState(0, acquires)) { + if (!hasQueuedThreads() && compareAndSetState(0, 1)) { setExclusiveOwnerThread(current); return true; } + } else if (getExclusiveOwnerThread() == current) { + if (++c < 0) // overflow + throw new Error("Maximum lock count exceeded"); + setState(c); + return true; } - else if (current == getExclusiveOwnerThread()) { - int nextc = c + acquires; - if (nextc < 0) - throw new Error("Maximum lock count exceeded"); - setState(nextc); + return false; + } + + /** + * Acquires only if thread is first waiter or empty + */ + protected final boolean tryAcquire(int acquires) { + if (getState() == 0 && !hasQueuedPredecessors() && + compareAndSetState(0, acquires)) { + setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; @@ -264,7 +319,7 @@ * at which time the lock hold count is set to one. */ public void lock() { - sync.acquire(1); + sync.lock(); } /** @@ -314,7 +369,7 @@ * @throws InterruptedException if the current thread is interrupted */ public void lockInterruptibly() throws InterruptedException { - sync.acquireInterruptibly(1); + sync.lockInterruptibly(); } /** @@ -344,7 +399,7 @@ * thread; and {@code false} otherwise */ public boolean tryLock() { - return sync.nonfairTryAcquire(1); + return sync.tryLock(); } /** @@ -421,7 +476,7 @@ */ public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { - return sync.tryAcquireNanos(1, unit.toNanos(timeout)); + return sync.tryLockNanos(unit.toNanos(timeout)); } /** diff --git a/src/java.base/share/classes/java/util/concurrent/locks/StampedLock.java b/src/java.base/share/classes/java/util/concurrent/locks/StampedLock.java --- a/src/java.base/share/classes/java/util/concurrent/locks/StampedLock.java +++ b/src/java.base/share/classes/java/util/concurrent/locks/StampedLock.java @@ -35,9 +35,8 @@ package java.util.concurrent.locks; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.VarHandle; import java.util.concurrent.TimeUnit; +import jdk.internal.misc.Unsafe; import jdk.internal.vm.annotation.ReservedStackAccess; /** @@ -132,9 +131,8 @@ * *

    Memory Synchronization. Methods with the effect of * successfully locking in any mode have the same memory - * synchronization effects as a Lock action described in - * - * Chapter 17 of The Java™ Language Specification. + * synchronization effects as a Lock action, as described in + * Chapter 17 of The Java™ Language Specification. * Methods successfully unlocking in write mode have the same memory * synchronization effects as an Unlock action. In optimistic * read usages, actions prior to the most recent write mode unlock action @@ -237,6 +235,7 @@ * } * }}

* + * @jls 17.4 Memory Model * @since 1.8 * @author Doug Lea */ @@ -264,122 +263,54 @@ * updates. * * Waiters use a modified form of CLH lock used in - * AbstractQueuedSynchronizer (see its internal documentation for - * a fuller account), where each node is tagged (field mode) as - * either a reader or writer. Sets of waiting readers are grouped - * (linked) under a common node (field cowait) so act as a single - * node with respect to most CLH mechanics. By virtue of the - * queue structure, wait nodes need not actually carry sequence - * numbers; we know each is greater than its predecessor. This - * simplifies the scheduling policy to a mainly-FIFO scheme that - * incorporates elements of Phase-Fair locks (see Brandenburg & - * Anderson, especially http://www.cs.unc.edu/~bbb/diss/). In - * particular, we use the phase-fair anti-barging rule: If an - * incoming reader arrives while read lock is held but there is a - * queued writer, this incoming reader is queued. (This rule is - * responsible for some of the complexity of method acquireRead, - * but without it, the lock becomes highly unfair.) Method release - * does not (and sometimes cannot) itself wake up cowaiters. This - * is done by the primary thread, but helped by any other threads - * with nothing better to do in methods acquireRead and - * acquireWrite. + * AbstractQueuedSynchronizer (AQS; see its internal documentation + * for a fuller account), where each node is either a ReaderNode + * or WriterNode. Implementation of queued Writer mode is + * identical to AQS except for lock-state operations. Sets of + * waiting readers are grouped (linked) under a common node (field + * cowaiters) so act as a single node with respect to most CLH + * mechanics. This simplifies the scheduling policy to a + * mainly-FIFO scheme that incorporates elements of Phase-Fair + * locks (see Brandenburg & Anderson, especially + * http://www.cs.unc.edu/~bbb/diss/). Method release does not + * itself wake up cowaiters. This is done by the primary thread, + * but helped by other cowaiters as they awaken. * - * These rules apply to threads actually queued. All tryLock forms - * opportunistically try to acquire locks regardless of preference - * rules, and so may "barge" their way in. Randomized spinning is - * used in the acquire methods to reduce (increasingly expensive) - * context switching while also avoiding sustained memory - * thrashing among many threads. We limit spins to the head of - * queue. If, upon wakening, a thread fails to obtain lock, and is - * still (or becomes) the first waiting thread (which indicates - * that some other thread barged and obtained lock), it escalates - * spins (up to MAX_HEAD_SPINS) to reduce the likelihood of - * continually losing to barging threads. + * These rules apply to threads actually queued. Threads may also + * try to acquire locks before or in the process of enqueueing + * regardless of preference rules, and so may "barge" their way + * in. Methods writeLock and readLock (but not the other variants + * of each) first unconditionally try to CAS state, falling back + * to test-and-test-and-set retries on failure, slightly shrinking + * race windows on initial attempts, thus making success more + * likely. Also, when some threads cancel (via interrupt or + * timeout), phase-fairness is at best roughly approximated. * * Nearly all of these mechanics are carried out in methods * acquireWrite and acquireRead, that, as typical of such code, * sprawl out because actions and retries rely on consistent sets * of locally cached reads. * - * As noted in Boehm's paper (above), sequence validation (mainly - * method validate()) requires stricter ordering rules than apply - * to normal volatile reads (of "state"). To force orderings of - * reads before a validation and the validation itself in those - * cases where this is not already forced, we use acquireFence. - * Unlike in that paper, we allow writers to use plain writes. - * One would not expect reorderings of such writes with the lock - * acquisition CAS because there is a "control dependency", but it - * is theoretically possible, so we additionally add a - * storeStoreFence after lock acquisition CAS. - * - * ---------------------------------------------------------------- - * Here's an informal proof that plain reads by _successful_ - * readers see plain writes from preceding but not following - * writers (following Boehm and the C++ standard [atomics.fences]): - * - * Because of the total synchronization order of accesses to - * volatile long state containing the sequence number, writers and - * _successful_ readers can be globally sequenced. - * - * int x, y; - * - * Writer 1: - * inc sequence (odd - "locked") - * storeStoreFence(); - * x = 1; y = 2; - * inc sequence (even - "unlocked") - * - * Successful Reader: - * read sequence (even) - * // must see writes from Writer 1 but not Writer 2 - * r1 = x; r2 = y; - * acquireFence(); - * read sequence (even - validated unchanged) - * // use r1 and r2 - * - * Writer 2: - * inc sequence (odd - "locked") - * storeStoreFence(); - * x = 3; y = 4; - * inc sequence (even - "unlocked") - * - * Visibility of writer 1's stores is normal - reader's initial - * read of state synchronizes with writer 1's final write to state. - * Lack of visibility of writer 2's plain writes is less obvious. - * If reader's read of x or y saw writer 2's write, then (assuming - * semantics of C++ fences) the storeStoreFence would "synchronize" - * with reader's acquireFence and reader's validation read must see - * writer 2's initial write to state and so validation must fail. - * But making this "proof" formal and rigorous is an open problem! - * ---------------------------------------------------------------- + * For an explanation of the use of acquireFence, see + * http://gee.cs.oswego.edu/dl/html/j9mm.html as well as Boehm's + * paper (above). Note that sequence validation (mainly method + * validate()) requires stricter ordering rules than apply to + * normal volatile reads (of "state"). To ensure that writeLock + * acquisitions strictly precede subsequent writes in cases where + * this is not already forced, we use a storeStoreFence. * * The memory layout keeps lock state and queue pointers together * (normally on the same cache line). This usually works well for * read-mostly loads. In most other cases, the natural tendency of - * adaptive-spin CLH locks to reduce memory contention lessens - * motivation to further spread out contended locations, but might - * be subject to future improvements. + * CLH locks to reduce memory contention lessens motivation to + * further spread out contended locations, but might be subject to + * future improvements. */ private static final long serialVersionUID = -6001602636862214147L; - /** Number of processors, for spin control */ - private static final int NCPU = Runtime.getRuntime().availableProcessors(); - - /** Maximum number of retries before enqueuing on acquisition; at least 1 */ - private static final int SPINS = (NCPU > 1) ? 1 << 6 : 1; - - /** Maximum number of tries before blocking at head on acquisition */ - private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 1; - - /** Maximum number of retries before re-blocking */ - private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 1; - - /** The period for yielding when waiting for overflow spinlock */ - private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1 - /** The number of bits to use for reader count before overflowing */ - private static final int LG_READERS = 7; + private static final int LG_READERS = 7; // 127 readers // Values for lock state and stamp operations private static final long RUNIT = 1L; @@ -388,6 +319,8 @@ private static final long RFULL = RBITS - 1L; private static final long ABITS = RBITS | WBIT; private static final long SBITS = ~RBITS; // note overlap with ABITS + // not writing and conservatively non-overflowing + private static final long RSAFE = ~(3L << (LG_READERS - 1)); /* * 3 stamp modes can be distinguished by examining (m = stamp & ABITS): @@ -408,29 +341,64 @@ // Special value from cancelled acquire methods so caller can throw IE private static final long INTERRUPTED = 1L; - // Values for node status; order matters - private static final int WAITING = -1; - private static final int CANCELLED = 1; - - // Modes for nodes (int not boolean to allow arithmetic) - private static final int RMODE = 0; - private static final int WMODE = 1; + // Bits for Node.status + static final int WAITING = 1; + static final int CANCELLED = 0x80000000; // must be negative - /** Wait nodes */ - static final class WNode { - volatile WNode prev; - volatile WNode next; - volatile WNode cowait; // list of linked readers - volatile Thread thread; // non-null while possibly parked - volatile int status; // 0, WAITING, or CANCELLED - final int mode; // RMODE or WMODE - WNode(int m, WNode p) { mode = m; prev = p; } + /** CLH nodes */ + abstract static class Node { + volatile Node prev; // initially attached via casTail + volatile Node next; // visibly nonnull when signallable + Thread waiter; // visibly nonnull when enqueued + volatile int status; // written by owner, atomic bit ops by others + + // methods for atomic operations + final boolean casPrev(Node c, Node v) { // for cleanQueue + return U.weakCompareAndSetReference(this, PREV, c, v); + } + final boolean casNext(Node c, Node v) { // for cleanQueue + return U.weakCompareAndSetReference(this, NEXT, c, v); + } + final int getAndUnsetStatus(int v) { // for signalling + return U.getAndBitwiseAndInt(this, STATUS, ~v); + } + final void setPrevRelaxed(Node p) { // for off-queue assignment + U.putReference(this, PREV, p); + } + final void setStatusRelaxed(int s) { // for off-queue assignment + U.putInt(this, STATUS, s); + } + final void clearStatus() { // for reducing unneeded signals + U.putIntOpaque(this, STATUS, 0); + } + + private static final long STATUS + = U.objectFieldOffset(Node.class, "status"); + private static final long NEXT + = U.objectFieldOffset(Node.class, "next"); + private static final long PREV + = U.objectFieldOffset(Node.class, "prev"); + } + + static final class WriterNode extends Node { // node for writers + } + + static final class ReaderNode extends Node { // node for readers + volatile ReaderNode cowaiters; // list of linked readers + final boolean casCowaiters(ReaderNode c, ReaderNode v) { + return U.weakCompareAndSetReference(this, COWAITERS, c, v); + } + final void setCowaitersRelaxed(ReaderNode p) { + U.putReference(this, COWAITERS, p); + } + private static final long COWAITERS + = U.objectFieldOffset(ReaderNode.class, "cowaiters"); } /** Head of CLH queue */ - private transient volatile WNode whead; + private transient volatile Node head; /** Tail (last) of CLH queue */ - private transient volatile WNode wtail; + private transient volatile Node tail; // views transient ReadLockView readLockView; @@ -449,20 +417,52 @@ state = ORIGIN; } - private boolean casState(long expectedValue, long newValue) { - return STATE.compareAndSet(this, expectedValue, newValue); + // internal lock methods + + private boolean casState(long expect, long update) { + return U.compareAndSetLong(this, STATE, expect, update); } - private long tryWriteLock(long s) { - // assert (s & ABITS) == 0L; - long next; - if (casState(s, next = s | WBIT)) { - VarHandle.storeStoreFence(); - return next; + @ReservedStackAccess + private long tryAcquireWrite() { + long s, nextState; + if (((s = state) & ABITS) == 0L && casState(s, nextState = s | WBIT)) { + U.storeStoreFence(); + return nextState; } return 0L; } + @ReservedStackAccess + private long tryAcquireRead() { + for (long s, m, nextState;;) { + if ((m = (s = state) & ABITS) < RFULL) { + if (casState(s, nextState = s + RUNIT)) + return nextState; + } + else if (m == WBIT) + return 0L; + else if ((nextState = tryIncReaderOverflow(s)) != 0L) + return nextState; + } + } + + /** + * Returns an unlocked state, incrementing the version and + * avoiding special failure value 0L. + * + * @param s a write-locked state (or stamp) + */ + private static long unlockWriteState(long s) { + return ((s += WBIT) == 0L) ? ORIGIN : s; + } + + private long releaseWrite(long s) { + long nextState = state = unlockWriteState(s); + signalNext(head); + return nextState; + } + /** * Exclusively acquires the lock, blocking if necessary * until available. @@ -471,8 +471,13 @@ */ @ReservedStackAccess public long writeLock() { - long next; - return ((next = tryWriteLock()) != 0L) ? next : acquireWrite(false, 0L); + // try unconditional CAS confirming weak read + long s = U.getLongOpaque(this, STATE) & ~ABITS, nextState; + if (casState(s, nextState = s | WBIT)) { + U.storeStoreFence(); + return nextState; + } + return acquireWrite(false, false, 0L); } /** @@ -481,10 +486,8 @@ * @return a write stamp that can be used to unlock or convert mode, * or zero if the lock is not available */ - @ReservedStackAccess public long tryWriteLock() { - long s; - return (((s = state) & ABITS) == 0L) ? tryWriteLock(s) : 0L; + return tryAcquireWrite(); } /** @@ -504,15 +507,14 @@ throws InterruptedException { long nanos = unit.toNanos(time); if (!Thread.interrupted()) { - long next, deadline; - if ((next = tryWriteLock()) != 0L) - return next; + long nextState; + if ((nextState = tryAcquireWrite()) != 0L) + return nextState; if (nanos <= 0L) return 0L; - if ((deadline = System.nanoTime() + nanos) == 0L) - deadline = 1L; - if ((next = acquireWrite(true, deadline)) != INTERRUPTED) - return next; + nextState = acquireWrite(true, true, System.nanoTime() + nanos); + if (nextState != INTERRUPTED) + return nextState; } throw new InterruptedException(); } @@ -527,12 +529,12 @@ * @throws InterruptedException if the current thread is interrupted * before acquiring the lock */ - @ReservedStackAccess public long writeLockInterruptibly() throws InterruptedException { - long next; + long nextState; if (!Thread.interrupted() && - (next = acquireWrite(true, 0L)) != INTERRUPTED) - return next; + ((nextState = tryAcquireWrite()) != 0L || + (nextState = acquireWrite(true, false, 0L)) != INTERRUPTED)) + return nextState; throw new InterruptedException(); } @@ -544,13 +546,12 @@ */ @ReservedStackAccess public long readLock() { - long s, next; - // bypass acquireRead on common uncontended case - return (whead == wtail - && ((s = state) & ABITS) < RFULL - && casState(s, next = s + RUNIT)) - ? next - : acquireRead(false, 0L); + // unconditionally optimistically try non-overflow case once + long s = U.getLongOpaque(this, STATE) & RSAFE, nextState; + if (casState(s, nextState = s + RUNIT)) + return nextState; + else + return acquireRead(false, false, 0L); } /** @@ -559,18 +560,8 @@ * @return a read stamp that can be used to unlock or convert mode, * or zero if the lock is not available */ - @ReservedStackAccess public long tryReadLock() { - long s, m, next; - while ((m = (s = state) & ABITS) != WBIT) { - if (m < RFULL) { - if (casState(s, next = s + RUNIT)) - return next; - } - else if ((next = tryIncReaderOverflow(s)) != 0L) - return next; - } - return 0L; + return tryAcquireRead(); } /** @@ -586,26 +577,18 @@ * @throws InterruptedException if the current thread is interrupted * before acquiring the lock */ - @ReservedStackAccess public long tryReadLock(long time, TimeUnit unit) throws InterruptedException { - long s, m, next, deadline; long nanos = unit.toNanos(time); if (!Thread.interrupted()) { - if ((m = (s = state) & ABITS) != WBIT) { - if (m < RFULL) { - if (casState(s, next = s + RUNIT)) - return next; - } - else if ((next = tryIncReaderOverflow(s)) != 0L) - return next; - } + long nextState; + if (tail == head && (nextState = tryAcquireRead()) != 0L) + return nextState; if (nanos <= 0L) return 0L; - if ((deadline = System.nanoTime() + nanos) == 0L) - deadline = 1L; - if ((next = acquireRead(true, deadline)) != INTERRUPTED) - return next; + nextState = acquireRead(true, true, System.nanoTime() + nanos); + if (nextState != INTERRUPTED) + return nextState; } throw new InterruptedException(); } @@ -620,17 +603,12 @@ * @throws InterruptedException if the current thread is interrupted * before acquiring the lock */ - @ReservedStackAccess public long readLockInterruptibly() throws InterruptedException { - long s, next; - if (!Thread.interrupted() - // bypass acquireRead on common uncontended case - && ((whead == wtail - && ((s = state) & ABITS) < RFULL - && casState(s, next = s + RUNIT)) - || - (next = acquireRead(true, 0L)) != INTERRUPTED)) - return next; + long nextState; + if (!Thread.interrupted() && + ((nextState = tryAcquireRead()) != 0L || + (nextState = acquireRead(true, false, 0L)) != INTERRUPTED)) + return nextState; throw new InterruptedException(); } @@ -658,29 +636,11 @@ * since issuance of the given stamp; else false */ public boolean validate(long stamp) { - VarHandle.acquireFence(); + U.loadFence(); return (stamp & SBITS) == (state & SBITS); } /** - * Returns an unlocked state, incrementing the version and - * avoiding special failure value 0L. - * - * @param s a write-locked state (or stamp) - */ - private static long unlockWriteState(long s) { - return ((s += WBIT) == 0L) ? ORIGIN : s; - } - - private long unlockWriteInternal(long s) { - long next; WNode h; - STATE.setVolatile(this, next = unlockWriteState(s)); - if ((h = whead) != null && h.status != 0) - release(h); - return next; - } - - /** * If the lock state matches the given stamp, releases the * exclusive lock. * @@ -692,7 +652,7 @@ public void unlockWrite(long stamp) { if (state != stamp || (stamp & WBIT) == 0L) throw new IllegalMonitorStateException(); - unlockWriteInternal(stamp); + releaseWrite(stamp); } /** @@ -705,19 +665,20 @@ */ @ReservedStackAccess public void unlockRead(long stamp) { - long s, m; WNode h; - while (((s = state) & SBITS) == (stamp & SBITS) - && (stamp & RBITS) > 0L - && ((m = s & RBITS) > 0L)) { - if (m < RFULL) { - if (casState(s, s - RUNIT)) { - if (m == RUNIT && (h = whead) != null && h.status != 0) - release(h); + long s, m; + if ((stamp & RBITS) != 0L) { + while (((s = state) & SBITS) == (stamp & SBITS) && + ((m = s & RBITS) != 0L)) { + if (m < RFULL) { + if (casState(s, s - RUNIT)) { + if (m == RUNIT) + signalNext(head); + return; + } + } + else if (tryDecReaderOverflow(s) != 0L) return; - } } - else if (tryDecReaderOverflow(s) != 0L) - return; } throw new IllegalMonitorStateException(); } @@ -730,7 +691,6 @@ * @throws IllegalMonitorStateException if the stamp does * not match the current state of this lock */ - @ReservedStackAccess public void unlock(long stamp) { if ((stamp & WBIT) != 0L) unlockWrite(stamp); @@ -751,26 +711,23 @@ * @return a valid write stamp, or zero on failure */ public long tryConvertToWriteLock(long stamp) { - long a = stamp & ABITS, m, s, next; + long a = stamp & ABITS, m, s, nextState; while (((s = state) & SBITS) == (stamp & SBITS)) { if ((m = s & ABITS) == 0L) { if (a != 0L) break; - if ((next = tryWriteLock(s)) != 0L) - return next; - } - else if (m == WBIT) { + if (casState(s, nextState = s | WBIT)) { + U.storeStoreFence(); + return nextState; + } + } else if (m == WBIT) { if (a != m) break; return stamp; - } - else if (m == RUNIT && a != 0L) { - if (casState(s, next = s - RUNIT + WBIT)) { - VarHandle.storeStoreFence(); - return next; - } - } - else + } else if (m == RUNIT && a != 0L) { + if (casState(s, nextState = s - RUNIT + WBIT)) + return nextState; + } else break; } return 0L; @@ -788,28 +745,21 @@ * @return a valid read stamp, or zero on failure */ public long tryConvertToReadLock(long stamp) { - long a, s, next; WNode h; + long a, s, nextState; while (((s = state) & SBITS) == (stamp & SBITS)) { if ((a = stamp & ABITS) >= WBIT) { - // write stamp - if (s != stamp) + if (s != stamp) // write stamp break; - STATE.setVolatile(this, next = unlockWriteState(s) + RUNIT); - if ((h = whead) != null && h.status != 0) - release(h); - return next; - } - else if (a == 0L) { - // optimistic read stamp + nextState = state = unlockWriteState(s) + RUNIT; + signalNext(head); + return nextState; + } else if (a == 0L) { // optimistic read stamp if ((s & ABITS) < RFULL) { - if (casState(s, next = s + RUNIT)) - return next; - } - else if ((next = tryIncReaderOverflow(s)) != 0L) - return next; - } - else { - // already a read stamp + if (casState(s, nextState = s + RUNIT)) + return nextState; + } else if ((nextState = tryIncReaderOverflow(s)) != 0L) + return nextState; + } else { // already a read stamp if ((s & ABITS) == 0L) break; return stamp; @@ -829,29 +779,25 @@ * @return a valid optimistic read stamp, or zero on failure */ public long tryConvertToOptimisticRead(long stamp) { - long a, m, s, next; WNode h; - VarHandle.acquireFence(); + long a, m, s, nextState; + U.loadFence(); while (((s = state) & SBITS) == (stamp & SBITS)) { if ((a = stamp & ABITS) >= WBIT) { - // write stamp - if (s != stamp) + if (s != stamp) // write stamp break; - return unlockWriteInternal(s); - } - else if (a == 0L) - // already an optimistic read stamp + return releaseWrite(s); + } else if (a == 0L) { // already an optimistic read stamp return stamp; - else if ((m = s & ABITS) == 0L) // invalid read stamp + } else if ((m = s & ABITS) == 0L) { // invalid read stamp break; - else if (m < RFULL) { - if (casState(s, next = s - RUNIT)) { - if (m == RUNIT && (h = whead) != null && h.status != 0) - release(h); - return next & SBITS; + } else if (m < RFULL) { + if (casState(s, nextState = s - RUNIT)) { + if (m == RUNIT) + signalNext(head); + return nextState & SBITS; } - } - else if ((next = tryDecReaderOverflow(s)) != 0L) - return next & SBITS; + } else if ((nextState = tryDecReaderOverflow(s)) != 0L) + return nextState & SBITS; } return 0L; } @@ -867,7 +813,7 @@ public boolean tryUnlockWrite() { long s; if (((s = state) & WBIT) != 0L) { - unlockWriteInternal(s); + releaseWrite(s); return true; } return false; @@ -882,12 +828,12 @@ */ @ReservedStackAccess public boolean tryUnlockRead() { - long s, m; WNode h; + long s, m; while ((m = (s = state) & ABITS) != 0L && m < WBIT) { if (m < RFULL) { if (casState(s, s - RUNIT)) { - if (m == RUNIT && (h = whead) != null && h.status != 0) - release(h); + if (m == RUNIT) + signalNext(head); return true; } } @@ -1133,16 +1079,16 @@ long s; if (((s = state) & WBIT) == 0L) throw new IllegalMonitorStateException(); - unlockWriteInternal(s); + releaseWrite(s); } final void unstampedUnlockRead() { - long s, m; WNode h; + long s, m; while ((m = (s = state) & RBITS) > 0L) { if (m < RFULL) { if (casState(s, s - RUNIT)) { - if (m == RUNIT && (h = whead) != null && h.status != 0) - release(h); + if (m == RUNIT) + signalNext(head); return; } } @@ -1155,10 +1101,10 @@ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); - STATE.setVolatile(this, ORIGIN); // reset to unlocked state + state = ORIGIN; // reset to unlocked state } - // internals + // overflow handling methods /** * Tries to increment readerOverflow by first setting state @@ -1170,17 +1116,12 @@ */ private long tryIncReaderOverflow(long s) { // assert (s & ABITS) >= RFULL; - if ((s & ABITS) == RFULL) { - if (casState(s, s | RBITS)) { - ++readerOverflow; - STATE.setVolatile(this, s); - return s; - } + if ((s & ABITS) != RFULL) + Thread.onSpinWait(); + else if (casState(s, s | RBITS)) { + ++readerOverflow; + return state = s; } - else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0) - Thread.yield(); - else - Thread.onSpinWait(); return 0L; } @@ -1192,153 +1133,132 @@ */ private long tryDecReaderOverflow(long s) { // assert (s & ABITS) >= RFULL; - if ((s & ABITS) == RFULL) { - if (casState(s, s | RBITS)) { - int r; long next; - if ((r = readerOverflow) > 0) { - readerOverflow = r - 1; - next = s; - } - else - next = s - RUNIT; - STATE.setVolatile(this, next); - return next; + if ((s & ABITS) != RFULL) + Thread.onSpinWait(); + else if (casState(s, s | RBITS)) { + int r; long nextState; + if ((r = readerOverflow) > 0) { + readerOverflow = r - 1; + nextState = s; } + else + nextState = s - RUNIT; + return state = nextState; } - else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0) - Thread.yield(); - else - Thread.onSpinWait(); return 0L; } + // release methods + /** - * Wakes up the successor of h (normally whead). This is normally - * just h.next, but may require traversal from wtail if next - * pointers are lagging. This may fail to wake up an acquiring - * thread when one or more have been cancelled, but the cancel - * methods themselves provide extra safeguards to ensure liveness. + * Wakes up the successor of given node, if one exists, and unsets its + * WAITING status to avoid park race. This may fail to wake up an + * eligible thread when one or more have been cancelled, but + * cancelAcquire ensures liveness. */ - private void release(WNode h) { - if (h != null) { - WNode q; Thread w; - WSTATUS.compareAndSet(h, WAITING, 0); - if ((q = h.next) == null || q.status == CANCELLED) { - for (WNode t = wtail; t != null && t != h; t = t.prev) - if (t.status <= 0) - q = t; - } - if (q != null && (w = q.thread) != null) - LockSupport.unpark(w); + static final void signalNext(Node h) { + Node s; + if (h != null && (s = h.next) != null && s.status > 0) { + s.getAndUnsetStatus(WAITING); + LockSupport.unpark(s.waiter); } } /** - * See above for explanation. + * Removes and unparks all cowaiters of node, if it exists. + */ + private static void signalCowaiters(ReaderNode node) { + if (node != null) { + for (ReaderNode c; (c = node.cowaiters) != null; ) { + if (node.casCowaiters(c, c.cowaiters)) + LockSupport.unpark(c.waiter); + } + } + } + + // queue link methods + private boolean casTail(Node c, Node v) { + return U.compareAndSetReference(this, TAIL, c, v); + } + + /** tries once to CAS a new dummy node for head */ + private void tryInitializeHead() { + Node h = new WriterNode(); + if (U.compareAndSetReference(this, HEAD, null, h)) + tail = h; + } + + /** + * For explanation, see above and AbstractQueuedSynchronizer + * internal documentation. * * @param interruptible true if should check interrupts and if so * return INTERRUPTED - * @param deadline if nonzero, the System.nanoTime value to timeout - * at (and return zero) + * @param timed if true use timed waits + * @param time the System.nanoTime value to timeout at (and return zero) * @return next state, or INTERRUPTED */ - private long acquireWrite(boolean interruptible, long deadline) { - WNode node = null, p; - for (int spins = -1;;) { // spin while enqueuing - long m, s, ns; - if ((m = (s = state) & ABITS) == 0L) { - if ((ns = tryWriteLock(s)) != 0L) - return ns; + private long acquireWrite(boolean interruptible, boolean timed, long time) { + byte spins = 0, postSpins = 0; // retries upon unpark of first thread + boolean interrupted = false, first = false; + WriterNode node = null; + Node pred = null; + for (long s, nextState;;) { + if (!first && (pred = (node == null) ? null : node.prev) != null && + !(first = (head == pred))) { + if (pred.status < 0) { + cleanQueue(); // predecessor cancelled + continue; + } else if (pred.prev == null) { + Thread.onSpinWait(); // ensure serialization + continue; + } } - else if (spins < 0) - spins = (m == WBIT && wtail == whead) ? SPINS : 0; - else if (spins > 0) { + if ((first || pred == null) && ((s = state) & ABITS) == 0L && + casState(s, nextState = s | WBIT)) { + U.storeStoreFence(); + if (first) { + node.prev = null; + head = node; + pred.next = null; + node.waiter = null; + if (interrupted) + Thread.currentThread().interrupt(); + } + return nextState; + } else if (node == null) { // retry before enqueuing + node = new WriterNode(); + } else if (pred == null) { // try to enqueue + Node t = tail; + node.setPrevRelaxed(t); + if (t == null) + tryInitializeHead(); + else if (!casTail(t, node)) + node.setPrevRelaxed(null); // back out + else + t.next = node; + } else if (first && spins != 0) { // reduce unfairness --spins; Thread.onSpinWait(); - } - else if ((p = wtail) == null) { // initialize queue - WNode hd = new WNode(WMODE, null); - if (WHEAD.weakCompareAndSet(this, null, hd)) - wtail = hd; - } - else if (node == null) - node = new WNode(WMODE, p); - else if (node.prev != p) - node.prev = p; - else if (WTAIL.weakCompareAndSet(this, p, node)) { - p.next = node; - break; + } else if (node.status == 0) { // enable signal + if (node.waiter == null) + node.waiter = Thread.currentThread(); + node.status = WAITING; + } else { + long nanos; + spins = postSpins = (byte)((postSpins << 1) | 1); + if (!timed) + LockSupport.park(this); + else if ((nanos = time - System.nanoTime()) > 0L) + LockSupport.parkNanos(this, nanos); + else + break; + node.clearStatus(); + if ((interrupted |= Thread.interrupted()) && interruptible) + break; } } - - boolean wasInterrupted = false; - for (int spins = -1;;) { - WNode h, np, pp; int ps; - if ((h = whead) == p) { - if (spins < 0) - spins = HEAD_SPINS; - else if (spins < MAX_HEAD_SPINS) - spins <<= 1; - for (int k = spins; k > 0; --k) { // spin at head - long s, ns; - if (((s = state) & ABITS) == 0L) { - if ((ns = tryWriteLock(s)) != 0L) { - whead = node; - node.prev = null; - if (wasInterrupted) - Thread.currentThread().interrupt(); - return ns; - } - } - else - Thread.onSpinWait(); - } - } - else if (h != null) { // help release stale waiters - WNode c; Thread w; - while ((c = h.cowait) != null) { - if (WCOWAIT.weakCompareAndSet(h, c, c.cowait) && - (w = c.thread) != null) - LockSupport.unpark(w); - } - } - if (whead == h) { - if ((np = node.prev) != p) { - if (np != null) - (p = np).next = node; // stale - } - else if ((ps = p.status) == 0) - WSTATUS.compareAndSet(p, 0, WAITING); - else if (ps == CANCELLED) { - if ((pp = p.prev) != null) { - node.prev = pp; - pp.next = node; - } - } - else { - long time; // 0 argument to park means no timeout - if (deadline == 0L) - time = 0L; - else if ((time = deadline - System.nanoTime()) <= 0L) - return cancelWaiter(node, node, false); - Thread wt = Thread.currentThread(); - node.thread = wt; - if (p.status < 0 && (p != h || (state & ABITS) != 0L) && - whead == h && node.prev == p) { - if (time == 0L) - LockSupport.park(this); - else - LockSupport.parkNanos(this, time); - } - node.thread = null; - if (Thread.interrupted()) { - if (interruptible) - return cancelWaiter(node, node, true); - wasInterrupted = true; - } - } - } - } + return cancelAcquire(node, interrupted); } /** @@ -1346,182 +1266,178 @@ * * @param interruptible true if should check interrupts and if so * return INTERRUPTED - * @param deadline if nonzero, the System.nanoTime value to timeout - * at (and return zero) + * @param timed if true use timed waits + * @param time the System.nanoTime value to timeout at (and return zero) * @return next state, or INTERRUPTED */ - private long acquireRead(boolean interruptible, long deadline) { - boolean wasInterrupted = false; - WNode node = null, p; - for (int spins = -1;;) { - WNode h; - if ((h = whead) == (p = wtail)) { - for (long m, s, ns;;) { - if ((m = (s = state) & ABITS) < RFULL ? - casState(s, ns = s + RUNIT) : - (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { - if (wasInterrupted) - Thread.currentThread().interrupt(); - return ns; + private long acquireRead(boolean interruptible, boolean timed, long time) { + boolean interrupted = false; + ReaderNode node = null; + /* + * Loop: + * if empty, try to acquire + * if tail is Reader, try to cowait; restart if leader stale or cancels + * else try to create and enqueue node, and wait in 2nd loop below + */ + for (;;) { + ReaderNode leader; long nextState; + Node tailPred = null, t = tail; + if ((t == null || (tailPred = t.prev) == null) && + (nextState = tryAcquireRead()) != 0L) // try now if empty + return nextState; + else if (t == null) + tryInitializeHead(); + else if (tailPred == null || !(t instanceof ReaderNode)) { + if (node == null) + node = new ReaderNode(); + if (tail == t) { + node.setPrevRelaxed(t); + if (casTail(t, node)) { + t.next = node; + break; // node is leader; wait in loop below } - else if (m >= WBIT) { - if (spins > 0) { - --spins; - Thread.onSpinWait(); - } - else { - if (spins == 0) { - WNode nh = whead, np = wtail; - if ((nh == h && np == p) || (h = nh) != (p = np)) - break; - } - spins = SPINS; - } + node.setPrevRelaxed(null); + } + } else if ((leader = (ReaderNode)t) == tail) { // try to cowait + for (boolean attached = false;;) { + if (leader.status < 0 || leader.prev == null) + break; + else if (node == null) + node = new ReaderNode(); + else if (node.waiter == null) + node.waiter = Thread.currentThread(); + else if (!attached) { + ReaderNode c = leader.cowaiters; + node.setCowaitersRelaxed(c); + attached = leader.casCowaiters(c, node); + if (!attached) + node.setCowaitersRelaxed(null); + } else { + long nanos = 0L; + if (!timed) + LockSupport.park(this); + else if ((nanos = time - System.nanoTime()) > 0L) + LockSupport.parkNanos(this, nanos); + interrupted |= Thread.interrupted(); + if ((interrupted && interruptible) || + (timed && nanos <= 0L)) + return cancelCowaiter(node, leader, interrupted); } } - } - if (p == null) { // initialize queue - WNode hd = new WNode(WMODE, null); - if (WHEAD.weakCompareAndSet(this, null, hd)) - wtail = hd; - } - else if (node == null) - node = new WNode(RMODE, p); - else if (h == p || p.mode != RMODE) { - if (node.prev != p) - node.prev = p; - else if (WTAIL.weakCompareAndSet(this, p, node)) { - p.next = node; - break; - } - } - else if (!WCOWAIT.compareAndSet(p, node.cowait = p.cowait, node)) - node.cowait = null; - else { - for (;;) { - WNode pp, c; Thread w; - if ((h = whead) != null && (c = h.cowait) != null && - WCOWAIT.compareAndSet(h, c, c.cowait) && - (w = c.thread) != null) // help release - LockSupport.unpark(w); - if (Thread.interrupted()) { - if (interruptible) - return cancelWaiter(node, p, true); - wasInterrupted = true; - } - if (h == (pp = p.prev) || h == p || pp == null) { - long m, s, ns; - do { - if ((m = (s = state) & ABITS) < RFULL ? - casState(s, ns = s + RUNIT) : - (m < WBIT && - (ns = tryIncReaderOverflow(s)) != 0L)) { - if (wasInterrupted) - Thread.currentThread().interrupt(); - return ns; - } - } while (m < WBIT); - } - if (whead == h && p.prev == pp) { - long time; - if (pp == null || h == p || p.status > 0) { - node = null; // throw away - break; - } - if (deadline == 0L) - time = 0L; - else if ((time = deadline - System.nanoTime()) <= 0L) { - if (wasInterrupted) - Thread.currentThread().interrupt(); - return cancelWaiter(node, p, false); - } - Thread wt = Thread.currentThread(); - node.thread = wt; - if ((h != pp || (state & ABITS) == WBIT) && - whead == h && p.prev == pp) { - if (time == 0L) - LockSupport.park(this); - else - LockSupport.parkNanos(this, time); - } - node.thread = null; - } - } + if (node != null) + node.waiter = null; + long ns = tryAcquireRead(); + signalCowaiters(leader); + if (interrupted) + Thread.currentThread().interrupt(); + if (ns != 0L) + return ns; + else + node = null; // restart if stale, missed, or leader cancelled } } - for (int spins = -1;;) { - WNode h, np, pp; int ps; - if ((h = whead) == p) { - if (spins < 0) - spins = HEAD_SPINS; - else if (spins < MAX_HEAD_SPINS) - spins <<= 1; - for (int k = spins;;) { // spin at head - long m, s, ns; - if ((m = (s = state) & ABITS) < RFULL ? - casState(s, ns = s + RUNIT) : - (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { - WNode c; Thread w; - whead = node; - node.prev = null; - while ((c = node.cowait) != null) { - if (WCOWAIT.compareAndSet(node, c, c.cowait) && - (w = c.thread) != null) - LockSupport.unpark(w); - } - if (wasInterrupted) - Thread.currentThread().interrupt(); - return ns; - } - else if (m >= WBIT && --k <= 0) - break; - else - Thread.onSpinWait(); + // node is leader of a cowait group; almost same as acquireWrite + byte spins = 0, postSpins = 0; // retries upon unpark of first thread + boolean first = false; + Node pred = null; + for (long nextState;;) { + if (!first && (pred = node.prev) != null && + !(first = (head == pred))) { + if (pred.status < 0) { + cleanQueue(); // predecessor cancelled + continue; + } else if (pred.prev == null) { + Thread.onSpinWait(); // ensure serialization + continue; } } - else if (h != null) { - WNode c; Thread w; - while ((c = h.cowait) != null) { - if (WCOWAIT.compareAndSet(h, c, c.cowait) && - (w = c.thread) != null) - LockSupport.unpark(w); - } - } - if (whead == h) { - if ((np = node.prev) != p) { - if (np != null) - (p = np).next = node; // stale - } - else if ((ps = p.status) == 0) - WSTATUS.compareAndSet(p, 0, WAITING); - else if (ps == CANCELLED) { - if ((pp = p.prev) != null) { - node.prev = pp; - pp.next = node; - } + if ((first || pred == null) && + (nextState = tryAcquireRead()) != 0L) { + if (first) { + node.prev = null; + head = node; + pred.next = null; + node.waiter = null; } - else { - long time; - if (deadline == 0L) - time = 0L; - else if ((time = deadline - System.nanoTime()) <= 0L) - return cancelWaiter(node, node, false); - Thread wt = Thread.currentThread(); - node.thread = wt; - if (p.status < 0 && - (p != h || (state & ABITS) == WBIT) && - whead == h && node.prev == p) { - if (time == 0L) - LockSupport.park(this); - else - LockSupport.parkNanos(this, time); + signalCowaiters(node); + if (interrupted) + Thread.currentThread().interrupt(); + return nextState; + } else if (first && spins != 0) { + --spins; + Thread.onSpinWait(); + } else if (node.status == 0) { + if (node.waiter == null) + node.waiter = Thread.currentThread(); + node.status = WAITING; + } else { + long nanos; + spins = postSpins = (byte)((postSpins << 1) | 1); + if (!timed) + LockSupport.park(this); + else if ((nanos = time - System.nanoTime()) > 0L) + LockSupport.parkNanos(this, nanos); + else + break; + node.clearStatus(); + if ((interrupted |= Thread.interrupted()) && interruptible) + break; + } + } + return cancelAcquire(node, interrupted); + } + + // Cancellation support + + /** + * Possibly repeatedly traverses from tail, unsplicing cancelled + * nodes until none are found. Unparks nodes that may have been + * relinked to be next eligible acquirer. + */ + private void cleanQueue() { + for (;;) { // restart point + for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples + if (q == null || (p = q.prev) == null) + return; // end of list + if (s == null ? tail != q : (s.prev != q || s.status < 0)) + break; // inconsistent + if (q.status < 0) { // cancelled + if ((s == null ? casTail(q, p) : s.casPrev(q, p)) && + q.prev == p) { + p.casNext(q, s); // OK if fails + if (p.prev == null) + signalNext(p); } - node.thread = null; - if (Thread.interrupted()) { - if (interruptible) - return cancelWaiter(node, node, true); - wasInterrupted = true; + break; + } + if ((n = p.next) != q) { // help finish + if (n != null && q.prev == p && q.status >= 0) { + p.casNext(n, q); + if (p.prev == null) + signalNext(p); + } + break; + } + s = q; + q = q.prev; + } + } + } + + /** + * If leader exists, possibly repeatedly traverses cowaiters, + * unsplicing the given cancelled node until not found. + */ + private void unlinkCowaiter(ReaderNode node, ReaderNode leader) { + if (leader != null) { + while (leader.prev != null && leader.status >= 0) { + for (ReaderNode p = leader, q; ; p = q) { + if ((q = p.cowaiters) == null) + return; + if (q == node) { + p.casCowaiters(q, q.cowaiters); + break; // recheck even if succeeded } } } @@ -1530,105 +1446,53 @@ /** * If node non-null, forces cancel status and unsplices it from - * queue if possible and wakes up any cowaiters (of the node, or - * group, as applicable), and in any case helps release current - * first waiter if lock is free. (Calling with null arguments - * serves as a conditional form of release, which is not currently - * needed but may be needed under possible future cancellation - * policies). This is a variant of cancellation methods in - * AbstractQueuedSynchronizer (see its detailed explanation in AQS - * internal documentation). + * queue, wakes up any cowaiters, and possibly wakes up successor + * to recheck status. * - * @param node if non-null, the waiter - * @param group either node or the group node is cowaiting with + * @param node the waiter (may be null if not yet enqueued) * @param interrupted if already interrupted * @return INTERRUPTED if interrupted or Thread.interrupted, else zero */ - private long cancelWaiter(WNode node, WNode group, boolean interrupted) { - if (node != null && group != null) { - Thread w; + private long cancelAcquire(Node node, boolean interrupted) { + if (node != null) { + node.waiter = null; node.status = CANCELLED; - // unsplice cancelled nodes from group - for (WNode p = group, q; (q = p.cowait) != null;) { - if (q.status == CANCELLED) { - WCOWAIT.compareAndSet(p, q, q.cowait); - p = group; // restart - } - else - p = q; - } - if (group == node) { - for (WNode r = group.cowait; r != null; r = r.cowait) { - if ((w = r.thread) != null) - LockSupport.unpark(w); // wake up uncancelled co-waiters - } - for (WNode pred = node.prev; pred != null; ) { // unsplice - WNode succ, pp; // find valid successor - while ((succ = node.next) == null || - succ.status == CANCELLED) { - WNode q = null; // find successor the slow way - for (WNode t = wtail; t != null && t != node; t = t.prev) - if (t.status != CANCELLED) - q = t; // don't link if succ cancelled - if (succ == q || // ensure accurate successor - WNEXT.compareAndSet(node, succ, succ = q)) { - if (succ == null && node == wtail) - WTAIL.compareAndSet(this, node, pred); - break; - } - } - if (pred.next == node) // unsplice pred link - WNEXT.compareAndSet(pred, node, succ); - if (succ != null && (w = succ.thread) != null) { - // wake up succ to observe new pred - succ.thread = null; - LockSupport.unpark(w); - } - if (pred.status != CANCELLED || (pp = pred.prev) == null) - break; - node.prev = pp; // repeat if new pred wrong/cancelled - WNEXT.compareAndSet(pp, pred, succ); - pred = pp; - } - } - } - WNode h; // Possibly release first waiter - while ((h = whead) != null) { - long s; WNode q; // similar to release() but check eligibility - if ((q = h.next) == null || q.status == CANCELLED) { - for (WNode t = wtail; t != null && t != h; t = t.prev) - if (t.status <= 0) - q = t; - } - if (h == whead) { - if (q != null && h.status == 0 && - ((s = state) & ABITS) != WBIT && // waiter is eligible - (s == 0L || q.mode == RMODE)) - release(h); - break; - } + cleanQueue(); + if (node instanceof ReaderNode) + signalCowaiters((ReaderNode)node); } return (interrupted || Thread.interrupted()) ? INTERRUPTED : 0L; } - // VarHandle mechanics - private static final VarHandle STATE; - private static final VarHandle WHEAD; - private static final VarHandle WTAIL; - private static final VarHandle WNEXT; - private static final VarHandle WSTATUS; - private static final VarHandle WCOWAIT; + /** + * If node non-null, forces cancel status and unsplices from + * leader's cowaiters list unless/until it is also cancelled. + * + * @param node if non-null, the waiter + * @param leader if non-null, the node heading cowaiters list + * @param interrupted if already interrupted + * @return INTERRUPTED if interrupted or Thread.interrupted, else zero + */ + private long cancelCowaiter(ReaderNode node, ReaderNode leader, + boolean interrupted) { + if (node != null) { + node.waiter = null; + node.status = CANCELLED; + unlinkCowaiter(node, leader); + } + return (interrupted || Thread.interrupted()) ? INTERRUPTED : 0L; + } + + // Unsafe + private static final Unsafe U = Unsafe.getUnsafe(); + private static final long STATE + = U.objectFieldOffset(StampedLock.class, "state"); + private static final long HEAD + = U.objectFieldOffset(StampedLock.class, "head"); + private static final long TAIL + = U.objectFieldOffset(StampedLock.class, "tail"); + static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - STATE = l.findVarHandle(StampedLock.class, "state", long.class); - WHEAD = l.findVarHandle(StampedLock.class, "whead", WNode.class); - WTAIL = l.findVarHandle(StampedLock.class, "wtail", WNode.class); - WSTATUS = l.findVarHandle(WNode.class, "status", int.class); - WNEXT = l.findVarHandle(WNode.class, "next", WNode.class); - WCOWAIT = l.findVarHandle(WNode.class, "cowait", WNode.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); - } + Class ensureLoaded = LockSupport.class; } } diff --git a/src/java.base/share/classes/java/util/concurrent/package-info.java b/src/java.base/share/classes/java/util/concurrent/package-info.java --- a/src/java.base/share/classes/java/util/concurrent/package-info.java +++ b/src/java.base/share/classes/java/util/concurrent/package-info.java @@ -226,9 +226,8 @@ * *

Memory Consistency Properties

* - * * Chapter 17 of - * The Java™ Language Specification defines the + * The Java™ Language Specification defines the * happens-before relation on memory operations such as reads and * writes of shared variables. The results of a write by one thread are * guaranteed to be visible to a read by another thread only if the write @@ -302,6 +301,8 @@ * * * + * @jls 17.4.5 Happens-before Order + * * @since 1.5 */ package java.util.concurrent; diff --git a/test/jdk/java/util/Map/Get.java b/test/jdk/java/util/Map/Get.java --- a/test/jdk/java/util/Map/Get.java +++ b/test/jdk/java/util/Map/Get.java @@ -120,8 +120,8 @@ //--------------------- Infrastructure --------------------------- static volatile int passed = 0, failed = 0; static void pass() { passed++; } - static void fail() { failed++; (new Error("Failure")).printStackTrace(System.err); } - static void fail(String msg) { failed++; (new Error("Failure: " + msg)).printStackTrace(System.err); } + static void fail() { failed++; new Error("Failure").printStackTrace(System.err); } + static void fail(String msg) { failed++; new Error("Failure: " + msg).printStackTrace(System.err); } static void unexpected(String msg, Throwable t) { System.err.println("Unexpected: " + msg); unexpected(t); } static void unexpected(Throwable t) { failed++; t.printStackTrace(System.err); } static void check(boolean cond) { if (cond) pass(); else fail(); } diff --git a/test/jdk/java/util/concurrent/BlockingQueue/OfferDrainToLoops.java b/test/jdk/java/util/concurrent/BlockingQueue/OfferDrainToLoops.java --- a/test/jdk/java/util/concurrent/BlockingQueue/OfferDrainToLoops.java +++ b/test/jdk/java/util/concurrent/BlockingQueue/OfferDrainToLoops.java @@ -34,6 +34,7 @@ /* * @test * @bug 6805775 6815766 + * @library /test/lib * @run main OfferDrainToLoops 100 * @summary Test concurrent offer vs. drainTo */ @@ -47,10 +48,12 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.atomic.AtomicLong; +import jdk.test.lib.Utils; @SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) public class OfferDrainToLoops { - final long testDurationMillisDefault = 10L * 1000L; + static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000); + final long testDurationMillisDefault = 10_000L; final long testDurationMillis; OfferDrainToLoops(String[] args) { @@ -76,7 +79,6 @@ System.out.println(q.getClass().getSimpleName()); final long testDurationNanos = testDurationMillis * 1000L * 1000L; final long quittingTimeNanos = System.nanoTime() + testDurationNanos; - final long timeoutMillis = 10L * 1000L; final SplittableRandom rnd = new SplittableRandom(); // Poor man's bounded buffer. @@ -155,13 +157,13 @@ }}}; for (Thread thread : new Thread[] { offerer, drainer, scanner }) { - thread.join(timeoutMillis + testDurationMillis); + thread.join(LONG_DELAY_MS + testDurationMillis); if (thread.isAlive()) { System.err.printf("Hung thread: %s%n", thread.getName()); failed++; for (StackTraceElement e : thread.getStackTrace()) System.err.println(e); - thread.join(timeoutMillis); + thread.join(LONG_DELAY_MS); } } } diff --git a/test/jdk/java/util/concurrent/ConcurrentHashMap/MapCheck.java b/test/jdk/java/util/concurrent/ConcurrentHashMap/MapCheck.java --- a/test/jdk/java/util/concurrent/ConcurrentHashMap/MapCheck.java +++ b/test/jdk/java/util/concurrent/ConcurrentHashMap/MapCheck.java @@ -537,7 +537,7 @@ static void printStats() { for (Iterator it = accum.entrySet().iterator(); it.hasNext(); ) { Map.Entry e = (Map.Entry)(it.next()); - Stats stats = ((Stats)(e.getValue())); + Stats stats = (Stats)(e.getValue()); int n = stats.number; double t; if (n > 0) diff --git a/test/jdk/java/util/concurrent/ConcurrentHashMap/MapLoops.java b/test/jdk/java/util/concurrent/ConcurrentHashMap/MapLoops.java --- a/test/jdk/java/util/concurrent/ConcurrentHashMap/MapLoops.java +++ b/test/jdk/java/util/concurrent/ConcurrentHashMap/MapLoops.java @@ -99,8 +99,8 @@ nops = Integer.parseInt(args[5]); // normalize probabilities wrt random number generator - removesPerMaxRandom = (int)(((double)premove/100.0 * 0x7FFFFFFFL)); - insertsPerMaxRandom = (int)(((double)pinsert/100.0 * 0x7FFFFFFFL)); + removesPerMaxRandom = (int)((double)premove/100.0 * 0x7FFFFFFFL); + insertsPerMaxRandom = (int)((double)pinsert/100.0 * 0x7FFFFFFFL); System.out.print("Class: " + mapClass.getName()); System.out.print(" threads: " + maxThreads); @@ -172,7 +172,7 @@ long time = timer.getTime(); long tpo = time / (i * (long)nops); System.out.print(LoopHelpers.rightJustify(tpo) + " ns per op"); - double secs = (double)(time) / 1000000000.0; + double secs = (double)time / 1000000000.0; System.out.println("\t " + secs + "s run time"); map.clear(); } diff --git a/test/jdk/java/util/concurrent/ConcurrentHashMap/ToArray.java b/test/jdk/java/util/concurrent/ConcurrentHashMap/ToArray.java --- a/test/jdk/java/util/concurrent/ConcurrentHashMap/ToArray.java +++ b/test/jdk/java/util/concurrent/ConcurrentHashMap/ToArray.java @@ -24,37 +24,40 @@ /* * @test * @bug 4486658 8010293 - * @summary thread safety of toArray methods of subCollections + * @summary thread safety of toArray methods of collection views * @author Martin Buchholz */ +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; import java.util.stream.IntStream; public class ToArray { public static void main(String[] args) throws Throwable { - // Execute a number of times to increase the probability of - // failure if there is an issue - for (int i = 0; i < 16; i++) { + final int runsPerTest = Integer.getInteger("jsr166.runsPerTest", 1); + final int reps = 10 * runsPerTest; + for (int i = reps; i--> 0; ) executeTest(); - } } static void executeTest() throws Throwable { - final Throwable[] throwable = new Throwable[1]; final ConcurrentHashMap m = new ConcurrentHashMap<>(); - - // Number of workers equal to the number of processors - // Each worker will put globally unique keys into the map - final int nWorkers = Runtime.getRuntime().availableProcessors(); + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + final int nCPU = Runtime.getRuntime().availableProcessors(); + final int minWorkers = 2; + final int maxWorkers = Math.max(minWorkers, Math.min(32, nCPU)); + final int nWorkers = rnd.nextInt(minWorkers, maxWorkers + 1); final int sizePerWorker = 1024; final int maxSize = nWorkers * sizePerWorker; - // The foreman keeps checking that the size of the arrays - // obtained from the key and value sets is never less than the - // previously observed size and is never greater than the maximum size + // The foreman busy-checks that the size of the arrays obtained + // from the keys and values views grows monotonically until it + // reaches the maximum size. + // NOTE: these size constraints are not specific to toArray and are // applicable to any form of traversal of the collection views CompletableFuture foreman = CompletableFuture.runAsync(new Runnable() { @@ -62,44 +65,37 @@ private boolean checkProgress(Object[] a) { int size = a.length; - if (size < prevSize) throw new RuntimeException("WRONG WAY"); - if (size > maxSize) throw new RuntimeException("OVERSHOOT"); - if (size == maxSize) return true; + if (size < prevSize || size > maxSize) + throw new AssertionError( + String.format("prevSize=%d size=%d maxSize=%d", + prevSize, size, maxSize)); prevSize = size; - return false; + return size == maxSize; } - @Override public void run() { - try { - Integer[] empty = new Integer[0]; - while (true) { - if (checkProgress(m.values().toArray())) return; - if (checkProgress(m.keySet().toArray())) return; - if (checkProgress(m.values().toArray(empty))) return; - if (checkProgress(m.keySet().toArray(empty))) return; - } - } - catch (Throwable t) { - throwable[0] = t; - } + Integer[] empty = new Integer[0]; + for (;;) + if (checkProgress(m.values().toArray()) + & checkProgress(m.keySet().toArray()) + & checkProgress(m.values().toArray(empty)) + & checkProgress(m.keySet().toArray(empty))) + return; } }); - // Create workers - // Each worker will put globally unique keys into the map - CompletableFuture[] workers = IntStream.range(0, nWorkers). - mapToObj(w -> CompletableFuture.runAsync(() -> { - for (int i = 0, o = w * sizePerWorker; i < sizePerWorker; i++) - m.put(o + i, i); - })). - toArray(CompletableFuture[]::new); + // Each worker puts globally unique keys into the map + List> workers = + IntStream.range(0, nWorkers) + .mapToObj(w -> (Runnable) () -> { + for (int i = 0, o = w * sizePerWorker; i < sizePerWorker; i++) + m.put(o + i, i); + }) + .map(CompletableFuture::runAsync) + .collect(Collectors.toList()); - // Wait for workers and then foreman to complete - CompletableFuture.allOf(workers).join(); + // Wait for workers and foreman to complete + workers.forEach(CompletableFuture::join); foreman.join(); - - if (throwable[0] != null) - throw throwable[0]; } } diff --git a/test/jdk/java/util/concurrent/ConcurrentQueues/OfferRemoveLoops.java b/test/jdk/java/util/concurrent/ConcurrentQueues/OfferRemoveLoops.java --- a/test/jdk/java/util/concurrent/ConcurrentQueues/OfferRemoveLoops.java +++ b/test/jdk/java/util/concurrent/ConcurrentQueues/OfferRemoveLoops.java @@ -25,7 +25,8 @@ * @test * @bug 6316155 6595669 6871697 6868712 * @summary Test concurrent offer vs. remove - * @run main OfferRemoveLoops 300 + * @library /test/lib + * @run main OfferRemoveLoops 100 * @author Martin Buchholz */ @@ -43,10 +44,12 @@ import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.Semaphore; +import jdk.test.lib.Utils; @SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) public class OfferRemoveLoops { - final long testDurationMillisDefault = 10L * 1000L; + static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000); + final long testDurationMillisDefault = 10_000L; final long testDurationMillis; OfferRemoveLoops(String[] args) { @@ -75,7 +78,6 @@ System.err.println(q.getClass().getSimpleName()); final long testDurationNanos = testDurationMillis * 1000L * 1000L; final long quittingTimeNanos = System.nanoTime() + testDurationNanos; - final long timeoutMillis = 10L * 1000L; final int maxChunkSize = 1042; final int maxQueueSize = 10 * maxChunkSize; final CountDownLatch done = new CountDownLatch(3); @@ -156,7 +158,7 @@ done.countDown(); }}; - if (! done.await(timeoutMillis + testDurationMillis, MILLISECONDS)) { + if (! done.await(LONG_DELAY_MS + testDurationMillis, MILLISECONDS)) { for (Thread thread : new Thread[] { offerer, remover, scanner }) { if (thread.isAlive()) { System.err.printf("Hung thread: %s%n", thread.getName()); diff --git a/test/jdk/java/util/concurrent/CountDownLatch/Basic.java b/test/jdk/java/util/concurrent/CountDownLatch/Basic.java --- a/test/jdk/java/util/concurrent/CountDownLatch/Basic.java +++ b/test/jdk/java/util/concurrent/CountDownLatch/Basic.java @@ -23,70 +23,57 @@ /* * @test - * @bug 6332435 + * @bug 6332435 8221168 * @summary Basic tests for CountDownLatch * @library /test/lib * @author Seetharam Avadhanam, Martin Buchholz */ import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import jdk.test.lib.Utils; public class Basic { static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000); - interface AwaiterFactory { - Awaiter getAwaiter(); - } - abstract static class Awaiter extends Thread { - private volatile Throwable result = null; - protected void result(Throwable result) { this.result = result; } - public Throwable result() { return this.result; } - } - - private void toTheStartingGate(CountDownLatch gate) { - try { - gate.await(); - } - catch (Throwable t) { fail(t); } + volatile Throwable exception; + volatile boolean interrupted; + abstract void realRun() throws Exception; + public final void run() { + try { realRun(); } + catch (Throwable ex) { exception = ex; } + interrupted = Thread.interrupted(); + }; } - private Awaiter awaiter(final CountDownLatch latch, - final CountDownLatch gate) { - return new Awaiter() { public void run() { - System.out.println("without millis: " + latch.toString()); - gate.countDown(); - - try { + static Awaiter awaiter(CountDownLatch latch, + CountDownLatch gate) { + return new Awaiter() { + public void realRun() throws InterruptedException { + gate.countDown(); latch.await(); - System.out.println("without millis - ComingOut"); - } - catch (Throwable result) { result(result); }}}; + }}; } - private Awaiter awaiter(final CountDownLatch latch, - final CountDownLatch gate, - final long millis) { - return new Awaiter() { public void run() { - System.out.println("with millis: "+latch.toString()); - gate.countDown(); - - try { - latch.await(millis, TimeUnit.MILLISECONDS); - System.out.println("with millis - ComingOut"); - } - catch (Throwable result) { result(result); }}}; + static Awaiter awaiter(CountDownLatch latch, + CountDownLatch gate, + long timeoutMillis) { + return new Awaiter() { + public void realRun() throws InterruptedException { + gate.countDown(); + latch.await(timeoutMillis, TimeUnit.MILLISECONDS); + }}; } - AwaiterFactory awaiterFactory(CountDownLatch latch, CountDownLatch gate) { - return () -> awaiter(latch, gate); - } - - AwaiterFactory timedAwaiterFactory(CountDownLatch latch, CountDownLatch gate) { - return () -> awaiter(latch, gate, LONG_DELAY_MS); + static Supplier randomAwaiterSupplier( + CountDownLatch latch, CountDownLatch gate) { + return () -> (ThreadLocalRandom.current().nextBoolean()) + ? awaiter(latch, gate) + : awaiter(latch, gate, LONG_DELAY_MS); } //---------------------------------------------------------------- @@ -94,28 +81,24 @@ //---------------------------------------------------------------- public static void normalUse() throws Throwable { int count = 0; - Basic test = new Basic(); CountDownLatch latch = new CountDownLatch(3); Awaiter[] a = new Awaiter[12]; for (int i = 0; i < 3; i++) { CountDownLatch gate = new CountDownLatch(4); - AwaiterFactory factory1 = test.awaiterFactory(latch, gate); - AwaiterFactory factory2 = test.timedAwaiterFactory(latch, gate); - a[count] = factory1.getAwaiter(); a[count++].start(); - a[count] = factory1.getAwaiter(); a[count++].start(); - a[count] = factory2.getAwaiter(); a[count++].start(); - a[count] = factory2.getAwaiter(); a[count++].start(); - test.toTheStartingGate(gate); - System.out.println("Main Thread: " + latch.toString()); + Supplier s = randomAwaiterSupplier(latch, gate); + a[count] = s.get(); a[count++].start(); + a[count] = s.get(); a[count++].start(); + a[count] = s.get(); a[count++].start(); + a[count] = s.get(); a[count++].start(); + gate.await(); latch.countDown(); checkCount(latch, 2-i); } - for (int i = 0; i < 12; i++) - a[i].join(); - - for (int i = 0; i < 12; i++) - checkResult(a[i], null); + for (Awaiter awaiter : a) + awaiter.join(); + for (Awaiter awaiter : a) + checkException(awaiter, null); } //---------------------------------------------------------------- @@ -123,38 +106,38 @@ //---------------------------------------------------------------- public static void threadInterrupted() throws Throwable { int count = 0; - Basic test = new Basic(); CountDownLatch latch = new CountDownLatch(3); Awaiter[] a = new Awaiter[12]; for (int i = 0; i < 3; i++) { CountDownLatch gate = new CountDownLatch(4); - AwaiterFactory factory1 = test.awaiterFactory(latch, gate); - AwaiterFactory factory2 = test.timedAwaiterFactory(latch, gate); - a[count] = factory1.getAwaiter(); a[count++].start(); - a[count] = factory1.getAwaiter(); a[count++].start(); - a[count] = factory2.getAwaiter(); a[count++].start(); - a[count] = factory2.getAwaiter(); a[count++].start(); + Supplier s = randomAwaiterSupplier(latch, gate); + a[count] = s.get(); a[count++].start(); + a[count] = s.get(); a[count++].start(); + a[count] = s.get(); a[count++].start(); + a[count] = s.get(); a[count++].start(); + gate.await(); a[count-1].interrupt(); - test.toTheStartingGate(gate); - System.out.println("Main Thread: " + latch.toString()); latch.countDown(); checkCount(latch, 2-i); } - for (int i = 0; i < 12; i++) - a[i].join(); - - for (int i = 0; i < 12; i++) - checkResult(a[i], - (i % 4) == 3 ? InterruptedException.class : null); + for (Awaiter awaiter : a) + awaiter.join(); + for (int i = 0; i < a.length; i++) { + Awaiter awaiter = a[i]; + Throwable ex = awaiter.exception; + if ((i % 4) == 3 && !awaiter.interrupted) + checkException(awaiter, InterruptedException.class); + else + checkException(awaiter, null); + } } //---------------------------------------------------------------- // One thread timed out //---------------------------------------------------------------- public static void timeOut() throws Throwable { - int count =0; - Basic test = new Basic(); + int count = 0; CountDownLatch latch = new CountDownLatch(3); Awaiter[] a = new Awaiter[12]; @@ -162,54 +145,56 @@ for (int i = 0; i < 3; i++) { CountDownLatch gate = new CountDownLatch(4); - AwaiterFactory factory1 = test.awaiterFactory(latch, gate); - AwaiterFactory factory2 = test.timedAwaiterFactory(latch, gate); - a[count] = test.awaiter(latch, gate, timeout[i]); a[count++].start(); - a[count] = factory1.getAwaiter(); a[count++].start(); - a[count] = factory2.getAwaiter(); a[count++].start(); - a[count] = factory2.getAwaiter(); a[count++].start(); - test.toTheStartingGate(gate); - System.out.println("Main Thread: " + latch.toString()); + Supplier s = randomAwaiterSupplier(latch, gate); + a[count] = awaiter(latch, gate, timeout[i]); a[count++].start(); + a[count] = s.get(); a[count++].start(); + a[count] = s.get(); a[count++].start(); + a[count] = s.get(); a[count++].start(); + gate.await(); latch.countDown(); checkCount(latch, 2-i); } - for (int i = 0; i < 12; i++) - a[i].join(); - - for (int i = 0; i < 12; i++) - checkResult(a[i], null); + for (Awaiter awaiter : a) + awaiter.join(); + for (Awaiter awaiter : a) + checkException(awaiter, null); } public static void main(String[] args) throws Throwable { - normalUse(); - threadInterrupted(); - timeOut(); + try { + normalUse(); + } catch (Throwable ex) { fail(ex); } + try { + threadInterrupted(); + } catch (Throwable ex) { fail(ex); } + try { + timeOut(); + } catch (Throwable ex) { fail(ex); } + if (failures.get() > 0L) throw new AssertionError(failures.get() + " failures"); } - private static final AtomicInteger failures = new AtomicInteger(0); + static final AtomicInteger failures = new AtomicInteger(0); - private static void fail(String msg) { + static void fail(String msg) { fail(new AssertionError(msg)); } - private static void fail(Throwable t) { + static void fail(Throwable t) { t.printStackTrace(); failures.getAndIncrement(); } - private static void checkCount(CountDownLatch b, int expected) { + static void checkCount(CountDownLatch b, int expected) { if (b.getCount() != expected) fail("Count = " + b.getCount() + ", expected = " + expected); } - private static void checkResult(Awaiter a, Class c) { - Throwable t = a.result(); - if (! ((t == null && c == null) || c.isInstance(t))) { - System.out.println("Mismatch: " + t + ", " + c.getName()); - failures.getAndIncrement(); - } + static void checkException(Awaiter awaiter, Class c) { + Throwable ex = awaiter.exception; + if (! ((ex == null && c == null) || c.isInstance(ex))) + fail("Expected: " + c + ", got: " + ex); } } diff --git a/test/jdk/java/util/concurrent/CyclicBarrier/Basic.java b/test/jdk/java/util/concurrent/CyclicBarrier/Basic.java --- a/test/jdk/java/util/concurrent/CyclicBarrier/Basic.java +++ b/test/jdk/java/util/concurrent/CyclicBarrier/Basic.java @@ -37,6 +37,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import jdk.test.lib.Utils; @@ -293,37 +294,42 @@ * Handling of extra interrupts while waiting - tests for bug 6366811 */ private static void testInterrupts() { - final int N = 10; + final int N = ThreadLocalRandom.current().nextInt(2, 10); final CyclicBarrier startingGate = new CyclicBarrier(N+1); /** * A version of Awaiter that also records interrupted state. */ class Waiter extends CheckedThread { - private boolean timed; - private CyclicBarrier barrier; - private CountDownLatch doneSignal; - private Throwable throwable; - private boolean interrupted; + private final boolean timed; + private final CyclicBarrier barrier; + private final CountDownLatch doneSignal; + volatile Throwable throwable; + volatile boolean interruptStatusSetAfterAwait; - public Waiter(boolean timed, - CountDownLatch doneSignal, - CyclicBarrier barrier) { - this.timed = timed; + public Waiter(CountDownLatch doneSignal, CyclicBarrier barrier) { + this.timed = ThreadLocalRandom.current().nextBoolean(); this.doneSignal = doneSignal; this.barrier = barrier; } - Throwable throwable() { return this.throwable; } - boolean interruptBit() { return this.interrupted; } + void realRun() throws Throwable { startingGate.await(LONG_DELAY_MS, MILLISECONDS); + try { if (timed) barrier.await(LONG_DELAY_MS, MILLISECONDS); - else barrier.await(); } - catch (Throwable throwable) { this.throwable = throwable; } + else barrier.await(); + } catch (Throwable throwable) { + this.throwable = throwable; + } - try { doneSignal.await(LONG_DELAY_MS, MILLISECONDS); } - catch (InterruptedException e) { interrupted = true; } + try { + check(doneSignal.await(LONG_DELAY_MS, MILLISECONDS)); + if (Thread.interrupted()) + interruptStatusSetAfterAwait = true; + } catch (InterruptedException e) { + interruptStatusSetAfterAwait = true; + } } } @@ -352,7 +358,7 @@ } catch (Throwable t) { unexpected(t); } }}; for (int i = 0; i < N; i++) { - Waiter waiter = new Waiter(i < N/2, doneSignal, barrier); + Waiter waiter = new Waiter(doneSignal, barrier); waiter.start(); waiters.add(waiter); } @@ -360,16 +366,14 @@ while (barrier.getNumberWaiting() < N) Thread.yield(); barrier.await(); doneSignal.countDown(); - int countInterrupted = 0; - int countInterruptedException = 0; - int countBrokenBarrierException = 0; + int countInterruptStatusSetAfterAwait = 0; for (Waiter waiter : waiters) { waiter.join(); - equal(waiter.throwable(), null); - if (waiter.interruptBit()) - countInterrupted++; + equal(waiter.throwable, null); + if (waiter.interruptStatusSetAfterAwait) + countInterruptStatusSetAfterAwait++; } - equal(countInterrupted, N/2); + equal(countInterruptStatusSetAfterAwait, N/2); check(! barrier.isBroken()); } catch (Throwable t) { unexpected(t); } @@ -381,31 +385,33 @@ final CyclicBarrier barrier = new CyclicBarrier(N+1); final List waiters = new ArrayList<>(N); for (int i = 0; i < N; i++) { - Waiter waiter = new Waiter(i < N/2, doneSignal, barrier); + Waiter waiter = new Waiter(doneSignal, barrier); waiter.start(); waiters.add(waiter); } startingGate.await(LONG_DELAY_MS, MILLISECONDS); while (barrier.getNumberWaiting() < N) Thread.yield(); - for (int i = 0; i < N/2; i++) - waiters.get(i).interrupt(); + for (int i = 0; i < N/2; i++) { + Thread waiter = waiters.get(i); + waiter.interrupt(); + } doneSignal.countDown(); - int countInterrupted = 0; int countInterruptedException = 0; int countBrokenBarrierException = 0; + int countInterruptStatusSetAfterAwait = 0; for (Waiter waiter : waiters) { waiter.join(); - if (waiter.throwable() instanceof InterruptedException) + if (waiter.throwable instanceof InterruptedException) countInterruptedException++; - if (waiter.throwable() instanceof BrokenBarrierException) + if (waiter.throwable instanceof BrokenBarrierException) countBrokenBarrierException++; - if (waiter.interruptBit()) - countInterrupted++; + if (waiter.interruptStatusSetAfterAwait) + countInterruptStatusSetAfterAwait++; } - equal(countInterrupted, N/2-1); equal(countInterruptedException, 1); equal(countBrokenBarrierException, N-1); checkBroken(barrier); + equal(countInterruptStatusSetAfterAwait, N/2-1); reset(barrier); } catch (Throwable t) { unexpected(t); } } diff --git a/test/jdk/java/util/concurrent/FutureTask/BlockingTaskExecutor.java b/test/jdk/java/util/concurrent/FutureTask/BlockingTaskExecutor.java --- a/test/jdk/java/util/concurrent/FutureTask/BlockingTaskExecutor.java +++ b/test/jdk/java/util/concurrent/FutureTask/BlockingTaskExecutor.java @@ -103,7 +103,7 @@ */ static class NotificationReceiver { /** Has the notifiee been notified? */ - boolean notified = false; + boolean notified; /** * Notify the notification receiver. diff --git a/test/jdk/java/util/concurrent/FutureTask/CancelledFutureLoops.java b/test/jdk/java/util/concurrent/FutureTask/CancelledFutureLoops.java --- a/test/jdk/java/util/concurrent/FutureTask/CancelledFutureLoops.java +++ b/test/jdk/java/util/concurrent/FutureTask/CancelledFutureLoops.java @@ -130,7 +130,7 @@ long endTime = System.nanoTime(); long time = endTime - timer.startTime; if (print) { - double secs = (double)(time) / 1000000000.0; + double secs = (double)time / 1000000000.0; System.out.println("\t " + secs + "s run time"); } diff --git a/test/jdk/java/util/concurrent/FutureTask/DoneTimedGetLoops.java b/test/jdk/java/util/concurrent/FutureTask/DoneTimedGetLoops.java --- a/test/jdk/java/util/concurrent/FutureTask/DoneTimedGetLoops.java +++ b/test/jdk/java/util/concurrent/FutureTask/DoneTimedGetLoops.java @@ -33,6 +33,7 @@ /* * @test + * @library /test/lib * @run main DoneTimedGetLoops 300 * @summary isDone returning true guarantees that subsequent timed get * will never throw TimeoutException. @@ -42,10 +43,12 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import jdk.test.lib.Utils; @SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) public class DoneTimedGetLoops { - final long testDurationMillisDefault = 10L * 1000L; + static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000); + final long testDurationMillisDefault = 10_000L; final long testDurationMillis; static class PublicFutureTask extends FutureTask { @@ -63,7 +66,6 @@ void test(String[] args) throws Throwable { final long testDurationNanos = testDurationMillis * 1000L * 1000L; final long quittingTimeNanos = System.nanoTime() + testDurationNanos; - final long timeoutMillis = 10L * 1000L; final AtomicReference normalRef = new AtomicReference<>(); @@ -136,13 +138,13 @@ setterException, doneTimedGetNormal, doneTimedGetAbnormal }) { - thread.join(timeoutMillis + testDurationMillis); + thread.join(LONG_DELAY_MS + testDurationMillis); if (thread.isAlive()) { System.err.printf("Hung thread: %s%n", thread.getName()); failed++; for (StackTraceElement e : thread.getStackTrace()) System.err.println(e); - thread.join(timeoutMillis); + thread.join(LONG_DELAY_MS); } } } diff --git a/test/jdk/java/util/concurrent/Phaser/FickleRegister.java b/test/jdk/java/util/concurrent/Phaser/FickleRegister.java --- a/test/jdk/java/util/concurrent/Phaser/FickleRegister.java +++ b/test/jdk/java/util/concurrent/Phaser/FickleRegister.java @@ -43,7 +43,7 @@ public class FickleRegister { final AtomicLong count = new AtomicLong(0); - final long testDurationMillisDefault = 10L * 1000L; + final long testDurationMillisDefault = 10_000L; final long testDurationMillis; final long quittingTimeNanos; final int chunkSize = 1000; diff --git a/test/jdk/java/util/concurrent/Phaser/TieredArriveLoops.java b/test/jdk/java/util/concurrent/Phaser/TieredArriveLoops.java --- a/test/jdk/java/util/concurrent/Phaser/TieredArriveLoops.java +++ b/test/jdk/java/util/concurrent/Phaser/TieredArriveLoops.java @@ -40,7 +40,7 @@ import java.util.concurrent.Phaser; public class TieredArriveLoops { - final long testDurationMillisDefault = 10L * 1000L; + final long testDurationMillisDefault = 10_000L; final long testDurationMillis; final long quittingTimeNanos; diff --git a/test/jdk/java/util/concurrent/ScheduledThreadPoolExecutor/GCRetention.java b/test/jdk/java/util/concurrent/ScheduledThreadPoolExecutor/GCRetention.java --- a/test/jdk/java/util/concurrent/ScheduledThreadPoolExecutor/GCRetention.java +++ b/test/jdk/java/util/concurrent/ScheduledThreadPoolExecutor/GCRetention.java @@ -86,7 +86,7 @@ if (q.remove(1000) != null) break; System.out.printf( - "%d/%d unqueued references remaining%n", j, n); + "%d/%d unqueued references remaining%n", j + 1, n); } } } diff --git a/test/jdk/java/util/concurrent/TimeUnit/Basic.java b/test/jdk/java/util/concurrent/TimeUnit/Basic.java --- a/test/jdk/java/util/concurrent/TimeUnit/Basic.java +++ b/test/jdk/java/util/concurrent/TimeUnit/Basic.java @@ -24,6 +24,7 @@ /* @test * @bug 5057341 6363898 * @summary Basic tests for TimeUnit + * @library /test/lib * @author Martin Buchholz */ @@ -39,12 +40,20 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.util.List; import java.io.ObjectOutputStream; import java.io.ObjectInputStream; import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import jdk.test.lib.Utils; public class Basic { + static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000); + private static void realMain(String[] args) throws Throwable { for (TimeUnit u : TimeUnit.values()) { @@ -71,18 +80,33 @@ equal(1000L, MILLISECONDS.toMicros(1)); equal(1000L, MICROSECONDS.toNanos(1)); - long t0 = System.nanoTime(); - MILLISECONDS.sleep(3); /* See windows bug 6313903, might not sleep */ - long elapsedMillis = (System.nanoTime() - t0)/(1000L * 1000L); - System.out.printf("elapsed=%d%n", elapsedMillis); - check(elapsedMillis >= 0); - /* Might not sleep on windows: check(elapsedMillis >= 3); */ - check(elapsedMillis < 1000); + //---------------------------------------------------------------- + // TimeUnit.sleep sleeps for at least the specified time. + // TimeUnit.sleep(x, unit) for x <= 0 does not sleep at all. + //---------------------------------------------------------------- + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + int maxTimeoutMillis = rnd.nextInt(1, 12); + List> workers = + IntStream.range(-1, maxTimeoutMillis + 1) + .mapToObj(timeoutMillis -> (Runnable) () -> { + try { + long startTime = System.nanoTime(); + MILLISECONDS.sleep(timeoutMillis); + long elapsedNanos = System.nanoTime() - startTime; + long timeoutNanos = MILLISECONDS.toNanos(timeoutMillis); + check(elapsedNanos >= timeoutNanos); + } catch (InterruptedException fail) { + throw new AssertionError(fail); + }}) + .map(CompletableFuture::runAsync) + .collect(Collectors.toList()); + + workers.forEach(CompletableFuture::join); //---------------------------------------------------------------- // Tests for serialized form compatibility with previous release //---------------------------------------------------------------- - byte[] serializedForm = /* Generated using tiger */ + byte[] serializedForm = /* Generated using JDK 5 */ {-84, -19, 0, 5, '~', 'r', 0, 29, 'j', 'a', 'v', 'a', '.', 'u', 't', 'i', 'l', '.', 'c', 'o', 'n', 'c', 'u', 'r', 'r', 'e', 'n', 't', '.', 'T', 'i', 'm', 'e', 'U', 'n', 'i', 't', 0, 0, diff --git a/test/jdk/java/util/concurrent/atomic/DoubleAdderDemo.java b/test/jdk/java/util/concurrent/atomic/DoubleAdderDemo.java --- a/test/jdk/java/util/concurrent/atomic/DoubleAdderDemo.java +++ b/test/jdk/java/util/concurrent/atomic/DoubleAdderDemo.java @@ -103,8 +103,8 @@ long total = (long)nthreads * incs; if (sum != (double)total) throw new Error(sum + " != " + total); - double secs = (double)time / (1000L * 1000 * 1000); - long rate = total * (1000L) / time; + double secs = (double)time / 1000_000_000L; + long rate = total * 1000L / time; System.out.printf("threads:%3d Time: %7.3fsec Incs per microsec: %4d\n", nthreads, secs, rate); } diff --git a/test/jdk/java/util/concurrent/locks/Lock/CheckedLockLoops.java b/test/jdk/java/util/concurrent/locks/Lock/CheckedLockLoops.java --- a/test/jdk/java/util/concurrent/locks/Lock/CheckedLockLoops.java +++ b/test/jdk/java/util/concurrent/locks/Lock/CheckedLockLoops.java @@ -136,7 +136,7 @@ long time = timer.getTime(); long tpi = time / (iters * nthreads); System.out.print("\t" + LoopHelpers.rightJustify(tpi) + " ns per update"); - // double secs = (double)(time) / 1000000000.0; + // double secs = (double)time / 1000000000.0; // System.out.print("\t " + secs + "s run time"); System.out.println(); diff --git a/test/jdk/java/util/concurrent/locks/Lock/FlakyMutex.java b/test/jdk/java/util/concurrent/locks/Lock/FlakyMutex.java --- a/test/jdk/java/util/concurrent/locks/Lock/FlakyMutex.java +++ b/test/jdk/java/util/concurrent/locks/Lock/FlakyMutex.java @@ -49,31 +49,51 @@ static class MyRuntimeException extends RuntimeException {} static void checkThrowable(Throwable t) { - check((t instanceof MyError) || + if (!((t instanceof MyError) || (t instanceof MyException) || - (t instanceof MyRuntimeException)); + (t instanceof MyRuntimeException))) + unexpected(t); } static void realMain(String[] args) throws Throwable { - final int nThreads = 3; + final ThreadLocalRandom rndMain = ThreadLocalRandom.current(); + final int nCpus = Runtime.getRuntime().availableProcessors(); + final int maxThreads = Math.min(4, nCpus); + final int nThreads = rndMain.nextInt(1, maxThreads + 1); final int iterations = 10_000; final CyclicBarrier startingGate = new CyclicBarrier(nThreads); + final ExecutorService es = Executors.newFixedThreadPool(nThreads); final FlakyMutex mutex = new FlakyMutex(); - final ExecutorService es = Executors.newFixedThreadPool(nThreads); final Runnable task = () -> { try { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); startingGate.await(); for (int i = 0; i < iterations; i++) { for (;;) { - try { mutex.lock(); break; } - catch (Throwable t) { checkThrowable(t); } + try { + if (rnd.nextBoolean()) + mutex.lock(); + else + mutex.lockInterruptibly(); + break; + } catch (Throwable t) { checkThrowable(t); } } - try { check(! mutex.tryLock()); } - catch (Throwable t) { checkThrowable(t); } + if (rnd.nextBoolean()) { + try { + check(! mutex.tryLock()); + } catch (Throwable t) { checkThrowable(t); } + } - try { check(! mutex.tryLock(1, TimeUnit.MICROSECONDS)); } - catch (Throwable t) { checkThrowable(t); } + if (rnd.nextInt(10) == 0) { + try { + check(! mutex.tryLock(1, TimeUnit.MICROSECONDS)); + } catch (Throwable t) { checkThrowable(t); } + } + + if (rnd.nextBoolean()) { + check(mutex.isLocked()); + } mutex.unlock(); } @@ -146,7 +166,11 @@ if (x == null ? y == null : x.equals(y)) pass(); else fail(x + " not equal to " + y);} public static void main(String[] args) throws Throwable { - try {realMain(args);} catch (Throwable t) {unexpected(t);} + int runsPerTest = Integer.getInteger("jsr166.runsPerTest", 1); + try { + for (int i = runsPerTest; i--> 0; ) + realMain(args); + } catch (Throwable t) { unexpected(t); } System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); if (failed > 0) throw new AssertionError("Some tests failed");} @SuppressWarnings("unchecked") diff --git a/test/jdk/java/util/concurrent/locks/Lock/TimedAcquireLeak.java b/test/jdk/java/util/concurrent/locks/Lock/TimedAcquireLeak.java --- a/test/jdk/java/util/concurrent/locks/Lock/TimedAcquireLeak.java +++ b/test/jdk/java/util/concurrent/locks/Lock/TimedAcquireLeak.java @@ -42,6 +42,8 @@ import java.io.Reader; import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.Collections; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -72,9 +74,9 @@ return new File(bin, programName).getPath(); } - static final String java = javaProgramPath("java"); - static final String jmap = javaProgramPath("jmap"); - static final String jps = javaProgramPath("jps"); + static final String javaPath = javaProgramPath("java"); + static final String jmapPath = javaProgramPath("jmap"); + static final String jpsPath = javaProgramPath("jps"); static String outputOf(Reader r) throws IOException { final StringBuilder sb = new StringBuilder(); @@ -159,7 +161,11 @@ static String match(String s, String regex, int group) { Matcher matcher = Pattern.compile(regex).matcher(s); - matcher.find(); + if (! matcher.find()) { + String msg = String.format( + "match failed: s=%s regex=%s", s, regex); + throw new AssertionError(msg); + } return matcher.group(group); } @@ -171,21 +177,20 @@ static int objectsInUse(final Process child, final String childPid, - final String className) { - final String regex = - "(?m)^ *[0-9]+: +([0-9]+) +[0-9]+ +\\Q"+className+"\\E(?:$| )"; - final Callable objectsInUse = - new Callable() { public Integer call() { - Integer i = Integer.parseInt( - match(commandOutputOf(jmap, "-histo:live", childPid), - regex, 1)); - if (i > 100) - System.out.print( - commandOutputOf(jmap, - "-dump:file=dump,format=b", - childPid)); - return i; - }}; + final String classNameRegex) { + String regex = + "(?m)^ *[0-9]+: +([0-9]+) +[0-9]+ +"+classNameRegex+"(?:$| )"; + Callable objectsInUse = () -> { + int i = Integer.parseInt( + match(commandOutputOf(jmapPath, "-histo:live", childPid), + regex, 1)); + if (i > 100) + System.out.print( + commandOutputOf(jmapPath, + "-dump:file=dump,format=b", + childPid)); + return i; + }; try { return rendezvousParent(child, objectsInUse); } catch (Throwable t) { unexpected(t); return -1; } } @@ -196,26 +201,27 @@ return; final String childClassName = Job.class.getName(); - final String classToCheckForLeaks = Job.classToCheckForLeaks(); - final String uniqueID = - String.valueOf(ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)); + final String classNameRegex = Job.classNameRegexToCheckForLeaks(); + final String uniqueID = String.valueOf( + ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)); - final String[] jobCmd = { - java, "-Xmx8m", "-XX:+UsePerfData", - "-classpath", System.getProperty("test.class.path"), - childClassName, uniqueID - }; + final ArrayList jobCmd = new ArrayList<>(); + Collections.addAll( + jobCmd, javaPath, "-Xmx8m", "-XX:+UsePerfData", + "-classpath", System.getProperty("test.class.path")); + Collections.addAll(jobCmd, Utils.getTestJavaOpts()); + Collections.addAll(jobCmd, childClassName, uniqueID); final Process p = new ProcessBuilder(jobCmd).start(); // Ensure subprocess jvm has started, so that jps can find it p.getInputStream().read(); sendByte(p.getOutputStream()); final String childPid = - match(commandOutputOf(jps, "-m"), + match(commandOutputOf(jpsPath, "-m"), "(?m)^ *([0-9]+) +\\Q"+childClassName+"\\E *"+uniqueID+"$", 1); - final int n0 = objectsInUse(p, childPid, classToCheckForLeaks); - final int n1 = objectsInUse(p, childPid, classToCheckForLeaks); + final int n0 = objectsInUse(p, childPid, classNameRegex); + final int n1 = objectsInUse(p, childPid, classNameRegex); equal(p.waitFor(), 0); equal(p.exitValue(), 0); failed += p.exitValue(); @@ -226,7 +232,7 @@ // implementation, and needing occasional adjustment. System.out.printf("%d -> %d%n", n0, n1); // Almost always n0 == n1 - // Maximum jitter observed in practice is 10 -> 17 + // Maximum jitter observed in practice is 7 check(Math.abs(n1 - n0) < 10); check(n1 < 25); drainers.shutdown(); @@ -244,9 +250,9 @@ // - in between calls to rendezvousChild, run code that may leak. //---------------------------------------------------------------- public static class Job { - static String classToCheckForLeaks() { + static String classNameRegexToCheckForLeaks() { return - "java.util.concurrent.locks.AbstractQueuedSynchronizer$Node"; + "\\Qjava.util.concurrent.locks.AbstractQueuedSynchronizer$\\E[A-Za-z]+"; } public static void main(String[] args) throws Throwable { diff --git a/test/jdk/java/util/concurrent/locks/ReentrantLock/CancelledLockLoops.java b/test/jdk/java/util/concurrent/locks/ReentrantLock/CancelledLockLoops.java --- a/test/jdk/java/util/concurrent/locks/ReentrantLock/CancelledLockLoops.java +++ b/test/jdk/java/util/concurrent/locks/ReentrantLock/CancelledLockLoops.java @@ -93,7 +93,7 @@ barrier.await(); if (print) { long time = timer.getTime(); - double secs = (double)(time) / 1000000000.0; + double secs = (double)time / 1000000000.0; System.out.println("\t " + secs + "s run time"); } diff --git a/test/jdk/java/util/concurrent/locks/ReentrantLock/LockOncePerThreadLoops.java b/test/jdk/java/util/concurrent/locks/ReentrantLock/LockOncePerThreadLoops.java --- a/test/jdk/java/util/concurrent/locks/ReentrantLock/LockOncePerThreadLoops.java +++ b/test/jdk/java/util/concurrent/locks/ReentrantLock/LockOncePerThreadLoops.java @@ -94,7 +94,7 @@ barrier.await(); if (print) { long time = timer.getTime(); - double secs = (double)(time) / 1000000000.0; + double secs = (double)time / 1000000000.0; System.out.println("\t " + secs + "s run time"); } diff --git a/test/jdk/java/util/concurrent/locks/ReentrantLock/SimpleReentrantLockLoops.java b/test/jdk/java/util/concurrent/locks/ReentrantLock/SimpleReentrantLockLoops.java --- a/test/jdk/java/util/concurrent/locks/ReentrantLock/SimpleReentrantLockLoops.java +++ b/test/jdk/java/util/concurrent/locks/ReentrantLock/SimpleReentrantLockLoops.java @@ -95,7 +95,7 @@ long time = timer.getTime(); long tpi = time / ((long)iters * nthreads); System.out.print("\t" + LoopHelpers.rightJustify(tpi) + " ns per lock"); - double secs = (double)(time) / 1000000000.0; + double secs = (double)time / 1000000000.0; System.out.println("\t " + secs + "s run time"); } diff --git a/test/jdk/java/util/concurrent/locks/ReentrantLock/TimeoutLockLoops.java b/test/jdk/java/util/concurrent/locks/ReentrantLock/TimeoutLockLoops.java --- a/test/jdk/java/util/concurrent/locks/ReentrantLock/TimeoutLockLoops.java +++ b/test/jdk/java/util/concurrent/locks/ReentrantLock/TimeoutLockLoops.java @@ -96,7 +96,7 @@ barrier.await(); if (print) { long time = timer.getTime(); - double secs = (double)(time) / 1000000000.0; + double secs = (double)time / 1000000000.0; System.out.println("\t " + secs + "s run time"); } diff --git a/test/jdk/java/util/concurrent/locks/ReentrantReadWriteLock/MapLoops.java b/test/jdk/java/util/concurrent/locks/ReentrantReadWriteLock/MapLoops.java --- a/test/jdk/java/util/concurrent/locks/ReentrantReadWriteLock/MapLoops.java +++ b/test/jdk/java/util/concurrent/locks/ReentrantReadWriteLock/MapLoops.java @@ -91,8 +91,8 @@ premove = Integer.parseInt(args[4]); // normalize probabilities wrt random number generator - removesPerMaxRandom = (int)(((double)premove/100.0 * 0x7FFFFFFFL)); - insertsPerMaxRandom = (int)(((double)pinsert/100.0 * 0x7FFFFFFFL)); + removesPerMaxRandom = (int)((double)premove/100.0 * 0x7FFFFFFFL); + insertsPerMaxRandom = (int)((double)pinsert/100.0 * 0x7FFFFFFFL); System.out.println("Using " + mapClass.getName()); @@ -125,7 +125,7 @@ long time = timer.getTime(); long tpo = time / (i * (long)nops); System.out.print(LoopHelpers.rightJustify(tpo) + " ns per op"); - double secs = (double)(time) / 1000000000.0; + double secs = (double)time / 1000000000.0; System.out.println("\t " + secs + "s run time"); map.clear(); } diff --git a/test/jdk/java/util/concurrent/tck/AbstractQueuedLongSynchronizerTest.java b/test/jdk/java/util/concurrent/tck/AbstractQueuedLongSynchronizerTest.java --- a/test/jdk/java/util/concurrent/tck/AbstractQueuedLongSynchronizerTest.java +++ b/test/jdk/java/util/concurrent/tck/AbstractQueuedLongSynchronizerTest.java @@ -39,7 +39,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashSet; -import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.AbstractQueuedLongSynchronizer; import java.util.concurrent.locks.AbstractQueuedLongSynchronizer.ConditionObject; @@ -1286,19 +1286,31 @@ /** * Tests scenario for * JDK-8191937: Lost interrupt in AbstractQueuedSynchronizer when tryAcquire methods throw + * ant -Djsr166.tckTestClass=AbstractQueuedLongSynchronizerTest -Djsr166.methodFilter=testInterruptedFailingAcquire -Djsr166.runsPerTest=10000 tck */ - public void testInterruptedFailingAcquire() throws InterruptedException { - final RuntimeException ex = new RuntimeException(); + public void testInterruptedFailingAcquire() throws Throwable { + class PleaseThrow extends RuntimeException {} + final PleaseThrow ex = new PleaseThrow(); + final AtomicBoolean thrown = new AtomicBoolean(); // A synchronizer only offering a choice of failure modes class Sync extends AbstractQueuedLongSynchronizer { - boolean pleaseThrow; + volatile boolean pleaseThrow; + void maybeThrow() { + if (pleaseThrow) { + // assert: tryAcquire methods can throw at most once + if (! thrown.compareAndSet(false, true)) + throw new AssertionError(); + throw ex; + } + } + @Override protected boolean tryAcquire(long ignored) { - if (pleaseThrow) throw ex; + maybeThrow(); return false; } @Override protected long tryAcquireShared(long ignored) { - if (pleaseThrow) throw ex; + maybeThrow(); return -1; } @Override protected boolean tryRelease(long ignored) { @@ -1310,30 +1322,87 @@ } final Sync s = new Sync(); + final boolean acquireInterruptibly = randomBoolean(); + final Action[] uninterruptibleAcquireActions = { + () -> s.acquire(1), + () -> s.acquireShared(1), + }; + final long nanosTimeout = MILLISECONDS.toNanos(2 * LONG_DELAY_MS); + final Action[] interruptibleAcquireActions = { + () -> s.acquireInterruptibly(1), + () -> s.acquireSharedInterruptibly(1), + () -> s.tryAcquireNanos(1, nanosTimeout), + () -> s.tryAcquireSharedNanos(1, nanosTimeout), + }; + final Action[] releaseActions = { + () -> s.release(1), + () -> s.releaseShared(1), + }; + final Action acquireAction = acquireInterruptibly + ? chooseRandomly(interruptibleAcquireActions) + : chooseRandomly(uninterruptibleAcquireActions); + final Action releaseAction + = chooseRandomly(releaseActions); + // From os_posix.cpp: + // + // NOTE that since there is no "lock" around the interrupt and + // is_interrupted operations, there is the possibility that the + // interrupted flag (in osThread) will be "false" but that the + // low-level events will be in the signaled state. This is + // intentional. The effect of this is that Object.wait() and + // LockSupport.park() will appear to have a spurious wakeup, which + // is allowed and not harmful, and the possibility is so rare that + // it is not worth the added complexity to add yet another lock. final Thread thread = newStartedThread(new CheckedRunnable() { - public void realRun() { + public void realRun() throws Throwable { try { - if (ThreadLocalRandom.current().nextBoolean()) - s.acquire(1); - else - s.acquireShared(1); + acquireAction.run(); shouldThrow(); - } catch (Throwable t) { - assertSame(ex, t); - assertTrue(Thread.interrupted()); + } catch (InterruptedException possible) { + assertTrue(acquireInterruptibly); + assertFalse(Thread.interrupted()); + } catch (PleaseThrow possible) { + awaitInterrupted(); } }}); - waitForThreadToEnterWaitState(thread); - assertSame(thread, s.getFirstQueuedThread()); - assertTrue(s.hasQueuedPredecessors()); - assertTrue(s.hasQueuedThreads()); - assertEquals(1, s.getQueueLength()); + for (long startTime = 0L;; ) { + waitForThreadToEnterWaitState(thread); + if (s.getFirstQueuedThread() == thread + && s.hasQueuedPredecessors() + && s.hasQueuedThreads() + && s.getQueueLength() == 1 + && s.hasContended()) + break; + if (startTime == 0L) + startTime = System.nanoTime(); + else if (millisElapsedSince(startTime) > LONG_DELAY_MS) + fail("timed out waiting for AQS state: " + + "thread state=" + thread.getState() + + ", queued threads=" + s.getQueuedThreads()); + Thread.yield(); + } s.pleaseThrow = true; - thread.interrupt(); - s.release(1); + // release and interrupt, in random order + if (randomBoolean()) { + thread.interrupt(); + releaseAction.run(); + } else { + releaseAction.run(); + thread.interrupt(); + } awaitTermination(thread); + + if (! acquireInterruptibly) + assertTrue(thrown.get()); + + assertNull(s.getFirstQueuedThread()); + assertFalse(s.hasQueuedPredecessors()); + assertFalse(s.hasQueuedThreads()); + assertEquals(0, s.getQueueLength()); + assertTrue(s.getQueuedThreads().isEmpty()); + assertTrue(s.hasContended()); } } diff --git a/test/jdk/java/util/concurrent/tck/AbstractQueuedSynchronizerTest.java b/test/jdk/java/util/concurrent/tck/AbstractQueuedSynchronizerTest.java --- a/test/jdk/java/util/concurrent/tck/AbstractQueuedSynchronizerTest.java +++ b/test/jdk/java/util/concurrent/tck/AbstractQueuedSynchronizerTest.java @@ -40,7 +40,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashSet; -import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject; @@ -1337,19 +1337,31 @@ /** * Tests scenario for * JDK-8191937: Lost interrupt in AbstractQueuedSynchronizer when tryAcquire methods throw + * ant -Djsr166.tckTestClass=AbstractQueuedSynchronizerTest -Djsr166.methodFilter=testInterruptedFailingAcquire -Djsr166.runsPerTest=10000 tck */ - public void testInterruptedFailingAcquire() throws InterruptedException { - final RuntimeException ex = new RuntimeException(); + public void testInterruptedFailingAcquire() throws Throwable { + class PleaseThrow extends RuntimeException {} + final PleaseThrow ex = new PleaseThrow(); + final AtomicBoolean thrown = new AtomicBoolean(); // A synchronizer only offering a choice of failure modes class Sync extends AbstractQueuedSynchronizer { - boolean pleaseThrow; + volatile boolean pleaseThrow; + void maybeThrow() { + if (pleaseThrow) { + // assert: tryAcquire methods can throw at most once + if (! thrown.compareAndSet(false, true)) + throw new AssertionError(); + throw ex; + } + } + @Override protected boolean tryAcquire(int ignored) { - if (pleaseThrow) throw ex; + maybeThrow(); return false; } @Override protected int tryAcquireShared(int ignored) { - if (pleaseThrow) throw ex; + maybeThrow(); return -1; } @Override protected boolean tryRelease(int ignored) { @@ -1361,30 +1373,87 @@ } final Sync s = new Sync(); + final boolean acquireInterruptibly = randomBoolean(); + final Action[] uninterruptibleAcquireActions = { + () -> s.acquire(1), + () -> s.acquireShared(1), + }; + final long nanosTimeout = MILLISECONDS.toNanos(2 * LONG_DELAY_MS); + final Action[] interruptibleAcquireActions = { + () -> s.acquireInterruptibly(1), + () -> s.acquireSharedInterruptibly(1), + () -> s.tryAcquireNanos(1, nanosTimeout), + () -> s.tryAcquireSharedNanos(1, nanosTimeout), + }; + final Action[] releaseActions = { + () -> s.release(1), + () -> s.releaseShared(1), + }; + final Action acquireAction = acquireInterruptibly + ? chooseRandomly(interruptibleAcquireActions) + : chooseRandomly(uninterruptibleAcquireActions); + final Action releaseAction + = chooseRandomly(releaseActions); + // From os_posix.cpp: + // + // NOTE that since there is no "lock" around the interrupt and + // is_interrupted operations, there is the possibility that the + // interrupted flag (in osThread) will be "false" but that the + // low-level events will be in the signaled state. This is + // intentional. The effect of this is that Object.wait() and + // LockSupport.park() will appear to have a spurious wakeup, which + // is allowed and not harmful, and the possibility is so rare that + // it is not worth the added complexity to add yet another lock. final Thread thread = newStartedThread(new CheckedRunnable() { - public void realRun() { + public void realRun() throws Throwable { try { - if (ThreadLocalRandom.current().nextBoolean()) - s.acquire(1); - else - s.acquireShared(1); + acquireAction.run(); shouldThrow(); - } catch (Throwable t) { - assertSame(ex, t); - assertTrue(Thread.interrupted()); + } catch (InterruptedException possible) { + assertTrue(acquireInterruptibly); + assertFalse(Thread.interrupted()); + } catch (PleaseThrow possible) { + awaitInterrupted(); } }}); - waitForThreadToEnterWaitState(thread); - assertSame(thread, s.getFirstQueuedThread()); - assertTrue(s.hasQueuedPredecessors()); - assertTrue(s.hasQueuedThreads()); - assertEquals(1, s.getQueueLength()); + for (long startTime = 0L;; ) { + waitForThreadToEnterWaitState(thread); + if (s.getFirstQueuedThread() == thread + && s.hasQueuedPredecessors() + && s.hasQueuedThreads() + && s.getQueueLength() == 1 + && s.hasContended()) + break; + if (startTime == 0L) + startTime = System.nanoTime(); + else if (millisElapsedSince(startTime) > LONG_DELAY_MS) + fail("timed out waiting for AQS state: " + + "thread state=" + thread.getState() + + ", queued threads=" + s.getQueuedThreads()); + Thread.yield(); + } s.pleaseThrow = true; - thread.interrupt(); - s.release(1); + // release and interrupt, in random order + if (randomBoolean()) { + thread.interrupt(); + releaseAction.run(); + } else { + releaseAction.run(); + thread.interrupt(); + } awaitTermination(thread); + + if (! acquireInterruptibly) + assertTrue(thrown.get()); + + assertNull(s.getFirstQueuedThread()); + assertFalse(s.hasQueuedPredecessors()); + assertFalse(s.hasQueuedThreads()); + assertEquals(0, s.getQueueLength()); + assertTrue(s.getQueuedThreads().isEmpty()); + assertTrue(s.hasContended()); } } diff --git a/test/jdk/java/util/concurrent/tck/ArrayBlockingQueueTest.java b/test/jdk/java/util/concurrent/tck/ArrayBlockingQueueTest.java --- a/test/jdk/java/util/concurrent/tck/ArrayBlockingQueueTest.java +++ b/test/jdk/java/util/concurrent/tck/ArrayBlockingQueueTest.java @@ -61,7 +61,7 @@ class Implementation implements CollectionImplementation { public Class klazz() { return ArrayBlockingQueue.class; } public Collection emptyCollection() { - boolean fair = ThreadLocalRandom.current().nextBoolean(); + boolean fair = randomBoolean(); return populatedQueue(0, SIZE, 2 * SIZE, fair); } public Object makeElement(int i) { return i; } @@ -367,7 +367,7 @@ }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); assertEquals(SIZE, q.size()); @@ -409,7 +409,7 @@ assertEquals(0, q.take()); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); assertEquals(0, q.remainingCapacity()); @@ -431,21 +431,21 @@ Thread.currentThread().interrupt(); try { - q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); + q.offer(new Object(), randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); + q.offer(new Object(), LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); } @@ -486,7 +486,7 @@ }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); } @@ -539,29 +539,26 @@ final CountDownLatch pleaseInterrupt = new CountDownLatch(1); Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { - long startTime = System.nanoTime(); for (int i = 0; i < SIZE; i++) assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS)); Thread.currentThread().interrupt(); try { - q.poll(LONG_DELAY_MS, MILLISECONDS); + q.poll(randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - q.poll(LONG_DELAY_MS, MILLISECONDS); + q.poll(LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); - - assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); checkEmpty(q); diff --git a/test/jdk/java/util/concurrent/tck/BlockingQueueTest.java b/test/jdk/java/util/concurrent/tck/BlockingQueueTest.java --- a/test/jdk/java/util/concurrent/tck/BlockingQueueTest.java +++ b/test/jdk/java/util/concurrent/tck/BlockingQueueTest.java @@ -253,7 +253,7 @@ Thread.currentThread().interrupt(); try { - q.poll(LONG_DELAY_MS, MILLISECONDS); + q.poll(randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); @@ -274,7 +274,7 @@ assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); barrier.await(); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); } @@ -296,7 +296,7 @@ }}); await(threadStarted); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); } @@ -325,19 +325,19 @@ */ public void testTimedPollFromEmptyBlocksInterruptibly() { final BlockingQueue q = emptyCollection(); - final CountDownLatch threadStarted = new CountDownLatch(1); + final CountDownLatch pleaseInterrupt = new CountDownLatch(1); Thread t = newStartedThread(new CheckedRunnable() { public void realRun() { - threadStarted.countDown(); + pleaseInterrupt.countDown(); try { - q.poll(2 * LONG_DELAY_MS, MILLISECONDS); + q.poll(LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); }}); - await(threadStarted); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + await(pleaseInterrupt); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); } @@ -352,7 +352,7 @@ public void realRun() { Thread.currentThread().interrupt(); try { - q.poll(2 * LONG_DELAY_MS, MILLISECONDS); + q.poll(LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); diff --git a/test/jdk/java/util/concurrent/tck/ConcurrentLinkedDequeTest.java b/test/jdk/java/util/concurrent/tck/ConcurrentLinkedDequeTest.java --- a/test/jdk/java/util/concurrent/tck/ConcurrentLinkedDequeTest.java +++ b/test/jdk/java/util/concurrent/tck/ConcurrentLinkedDequeTest.java @@ -940,7 +940,7 @@ } void runAsync(Runnable r1, Runnable r2) { - boolean b = ThreadLocalRandom.current().nextBoolean(); + boolean b = randomBoolean(); CompletableFuture f1 = CompletableFuture.runAsync(b ? r1 : r2); CompletableFuture f2 = CompletableFuture.runAsync(b ? r2 : r1); f1.join(); @@ -1003,10 +1003,6 @@ } } - T chooseRandomly(T... choices) { - return choices[ThreadLocalRandom.current().nextInt(choices.length)]; - } - /** * Non-traversing Deque operations (that return null) are linearizable. * Don't return null when the deque is observably never empty. diff --git a/test/jdk/java/util/concurrent/tck/CountDownLatchTest.java b/test/jdk/java/util/concurrent/tck/CountDownLatchTest.java --- a/test/jdk/java/util/concurrent/tck/CountDownLatchTest.java +++ b/test/jdk/java/util/concurrent/tck/CountDownLatchTest.java @@ -36,6 +36,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import junit.framework.Test; import junit.framework.TestSuite; @@ -99,7 +100,7 @@ assertEquals(2, l.getCount()); l.countDown(); assertEquals(1, l.getCount()); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); l.countDown(); assertEquals(0, l.getCount()); awaitTermination(t); @@ -124,7 +125,7 @@ assertEquals(2, l.getCount()); l.countDown(); assertEquals(1, l.getCount()); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); l.countDown(); assertEquals(0, l.getCount()); awaitTermination(t); @@ -156,7 +157,7 @@ }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); } @@ -165,29 +166,30 @@ * timed await throws InterruptedException if interrupted before counted down */ public void testTimedAwait_Interruptible() { - final CountDownLatch l = new CountDownLatch(1); + final int initialCount = ThreadLocalRandom.current().nextInt(1, 3); + final CountDownLatch l = new CountDownLatch(initialCount); final CountDownLatch pleaseInterrupt = new CountDownLatch(1); Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { Thread.currentThread().interrupt(); try { - l.await(LONG_DELAY_MS, MILLISECONDS); + l.await(randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - l.await(LONG_DELAY_MS, MILLISECONDS); + l.await(LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); - assertEquals(1, l.getCount()); + assertEquals(initialCount, l.getCount()); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); } @@ -200,7 +202,11 @@ Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { assertEquals(1, l.getCount()); + + long startTime = System.nanoTime(); assertFalse(l.await(timeoutMillis(), MILLISECONDS)); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + assertEquals(1, l.getCount()); }}); diff --git a/test/jdk/java/util/concurrent/tck/CyclicBarrierTest.java b/test/jdk/java/util/concurrent/tck/CyclicBarrierTest.java --- a/test/jdk/java/util/concurrent/tck/CyclicBarrierTest.java +++ b/test/jdk/java/util/concurrent/tck/CyclicBarrierTest.java @@ -42,7 +42,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Test; @@ -323,34 +322,6 @@ } /** - * All threads block while a barrier is broken. - */ - public void testReset_Leakage() throws InterruptedException { - final CyclicBarrier c = new CyclicBarrier(2); - final AtomicBoolean done = new AtomicBoolean(); - Thread t = newStartedThread(new CheckedRunnable() { - public void realRun() { - while (!done.get()) { - try { - while (c.isBroken()) - c.reset(); - - c.await(); - shouldThrow(); - } - catch (BrokenBarrierException | InterruptedException ok) {} - }}}); - - for (int i = 0; i < 4; i++) { - delay(timeoutMillis()); - t.interrupt(); - } - done.set(true); - t.interrupt(); - awaitTermination(t); - } - - /** * Reset of a non-broken barrier does not break barrier */ public void testResetWithoutBreakage() throws Exception { @@ -505,7 +476,7 @@ final ExecutorService e = Executors.newFixedThreadPool(nTasks); final Runnable awaiter = () -> { try { - if (ThreadLocalRandom.current().nextBoolean()) + if (randomBoolean()) barrier.await(); else barrier.await(LONG_DELAY_MS, MILLISECONDS); diff --git a/test/jdk/java/util/concurrent/tck/DelayQueueTest.java b/test/jdk/java/util/concurrent/tck/DelayQueueTest.java --- a/test/jdk/java/util/concurrent/tck/DelayQueueTest.java +++ b/test/jdk/java/util/concurrent/tck/DelayQueueTest.java @@ -332,7 +332,7 @@ } /** - * timed offer does not time out + * Queue is unbounded, so timed offer never times out */ public void testTimedOffer() throws InterruptedException { final DelayQueue q = new DelayQueue(); @@ -384,7 +384,7 @@ }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); } @@ -436,30 +436,27 @@ final DelayQueue q = populatedQueue(SIZE); Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { - long startTime = System.nanoTime(); for (int i = 0; i < SIZE; i++) assertEquals(new PDelay(i), ((PDelay)q.poll(LONG_DELAY_MS, MILLISECONDS))); Thread.currentThread().interrupt(); try { - q.poll(LONG_DELAY_MS, MILLISECONDS); + q.poll(randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - q.poll(LONG_DELAY_MS, MILLISECONDS); + q.poll(LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); - - assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); checkEmpty(q); diff --git a/test/jdk/java/util/concurrent/tck/DoubleAccumulatorTest.java b/test/jdk/java/util/concurrent/tck/DoubleAccumulatorTest.java --- a/test/jdk/java/util/concurrent/tck/DoubleAccumulatorTest.java +++ b/test/jdk/java/util/concurrent/tck/DoubleAccumulatorTest.java @@ -156,7 +156,7 @@ = new DoubleAccumulator((x, y) -> x + y, 0.0); final int nThreads = ThreadLocalRandom.current().nextInt(1, 5); final Phaser phaser = new Phaser(nThreads + 1); - final int incs = 1_000_000; + final int incs = expensiveTests ? 1_000_000 : 100_000; final double total = nThreads * incs/2.0 * (incs - 1); // Gauss final Runnable task = () -> { phaser.arriveAndAwaitAdvance(); diff --git a/test/jdk/java/util/concurrent/tck/ForkJoinPool9Test.java b/test/jdk/java/util/concurrent/tck/ForkJoinPool9Test.java --- a/test/jdk/java/util/concurrent/tck/ForkJoinPool9Test.java +++ b/test/jdk/java/util/concurrent/tck/ForkJoinPool9Test.java @@ -38,7 +38,6 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.Future; -import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Stream; import junit.framework.Test; @@ -81,7 +80,7 @@ Thread currentThread = Thread.currentThread(); Stream.of(systemClassLoader, null).forEach(cl -> { - if (ThreadLocalRandom.current().nextBoolean()) + if (randomBoolean()) // should always be permitted, without effect currentThread.setContextClassLoader(cl); }); diff --git a/test/jdk/java/util/concurrent/tck/ForkJoinTask8Test.java b/test/jdk/java/util/concurrent/tck/ForkJoinTask8Test.java --- a/test/jdk/java/util/concurrent/tck/ForkJoinTask8Test.java +++ b/test/jdk/java/util/concurrent/tck/ForkJoinTask8Test.java @@ -559,6 +559,8 @@ AsyncFib f = new AsyncFib(8); assertSame(f, f.fork()); helpQuiesce(); + while (!f.isDone()) // wait out race + ; assertEquals(0, getQueuedTaskCount()); f.checkCompletedNormally(); }}; diff --git a/test/jdk/java/util/concurrent/tck/ForkJoinTaskTest.java b/test/jdk/java/util/concurrent/tck/ForkJoinTaskTest.java --- a/test/jdk/java/util/concurrent/tck/ForkJoinTaskTest.java +++ b/test/jdk/java/util/concurrent/tck/ForkJoinTaskTest.java @@ -526,6 +526,8 @@ AsyncFib f = new AsyncFib(8); assertSame(f, f.fork()); helpQuiesce(); + while (!f.isDone()) // wait out race + ; assertEquals(21, f.number); assertEquals(0, getQueuedTaskCount()); checkCompletedNormally(f); diff --git a/test/jdk/java/util/concurrent/tck/FutureTaskTest.java b/test/jdk/java/util/concurrent/tck/FutureTaskTest.java --- a/test/jdk/java/util/concurrent/tck/FutureTaskTest.java +++ b/test/jdk/java/util/concurrent/tck/FutureTaskTest.java @@ -747,7 +747,7 @@ /** * get is interruptible */ - public void testGet_interruptible() { + public void testGet_Interruptible() { final CountDownLatch pleaseInterrupt = new CountDownLatch(1); final FutureTask task = new FutureTask(new NoOpCallable()); Thread t = newStartedThread(new CheckedRunnable() { @@ -776,27 +776,28 @@ /** * timed get is interruptible */ - public void testTimedGet_interruptible() { + public void testTimedGet_Interruptible() { final CountDownLatch pleaseInterrupt = new CountDownLatch(1); final FutureTask task = new FutureTask(new NoOpCallable()); Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws Exception { Thread.currentThread().interrupt(); try { - task.get(2*LONG_DELAY_MS, MILLISECONDS); + task.get(randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - task.get(2*LONG_DELAY_MS, MILLISECONDS); + task.get(LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); }}); await(pleaseInterrupt); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); checkNotDone(task); diff --git a/test/jdk/java/util/concurrent/tck/JSR166TestCase.java b/test/jdk/java/util/concurrent/tck/JSR166TestCase.java --- a/test/jdk/java/util/concurrent/tck/JSR166TestCase.java +++ b/test/jdk/java/util/concurrent/tck/JSR166TestCase.java @@ -311,12 +311,13 @@ static volatile TestCase currentTestCase; // static volatile int currentRun = 0; static { - Runnable checkForWedgedTest = new Runnable() { public void run() { + Runnable wedgedTestDetector = new Runnable() { public void run() { // Avoid spurious reports with enormous runsPerTest. // A single test case run should never take more than 1 second. // But let's cap it at the high end too ... - final int timeoutMinutes = - Math.min(15, Math.max(runsPerTest / 60, 1)); + final int timeoutMinutesMin = Math.max(runsPerTest / 60, 1) + * Math.max((int) delayFactor, 1); + final int timeoutMinutes = Math.min(15, timeoutMinutesMin); for (TestCase lastTestCase = currentTestCase;;) { try { MINUTES.sleep(timeoutMinutes); } catch (InterruptedException unexpected) { break; } @@ -336,7 +337,7 @@ } lastTestCase = currentTestCase; }}}; - Thread thread = new Thread(checkForWedgedTest, "checkForWedgedTest"); + Thread thread = new Thread(wedgedTestDetector, "WedgedTestDetector"); thread.setDaemon(true); thread.start(); } @@ -380,7 +381,7 @@ // Never report first run of any test; treat it as a // warmup run, notably to trigger all needed classloading, if (i > 0) - System.out.printf("%n%s: %d%n", toString(), elapsedMillis); + System.out.printf("%s: %d%n", toString(), elapsedMillis); } } @@ -683,6 +684,12 @@ public static long MEDIUM_DELAY_MS; public static long LONG_DELAY_MS; + /** + * A delay significantly longer than LONG_DELAY_MS. + * Use this in a thread that is waited for via awaitTermination(Thread). + */ + public static long LONGER_DELAY_MS; + private static final long RANDOM_TIMEOUT; private static final long RANDOM_EXPIRED_TIMEOUT; private static final TimeUnit RANDOM_TIMEUNIT; @@ -711,6 +718,20 @@ static TimeUnit randomTimeUnit() { return RANDOM_TIMEUNIT; } /** + * Returns a random boolean; a "coin flip". + */ + static boolean randomBoolean() { + return ThreadLocalRandom.current().nextBoolean(); + } + + /** + * Returns a random element from given choices. + */ + T chooseRandomly(T... choices) { + return choices[ThreadLocalRandom.current().nextInt(choices.length)]; + } + + /** * Returns the shortest timed delay. This can be scaled up for * slow machines using the jsr166.delay.factor system property, * or via jtreg's -timeoutFactor: flag. @@ -728,6 +749,7 @@ SMALL_DELAY_MS = SHORT_DELAY_MS * 5; MEDIUM_DELAY_MS = SHORT_DELAY_MS * 10; LONG_DELAY_MS = SHORT_DELAY_MS * 200; + LONGER_DELAY_MS = 2 * LONG_DELAY_MS; } private static final long TIMEOUT_DELAY_MS @@ -766,8 +788,8 @@ */ public void threadRecordFailure(Throwable t) { System.err.println(t); - dumpTestThreads(); - threadFailure.compareAndSet(null, t); + if (threadFailure.compareAndSet(null, t)) + dumpTestThreads(); } public void setUp() { @@ -1088,6 +1110,39 @@ } } + /** Returns true if thread info might be useful in a thread dump. */ + static boolean threadOfInterest(ThreadInfo info) { + final String name = info.getThreadName(); + String lockName; + if (name == null) + return true; + if (name.equals("Signal Dispatcher") + || name.equals("WedgedTestDetector")) + return false; + if (name.equals("Reference Handler")) { + // Reference Handler stacktrace changed in JDK-8156500 + StackTraceElement[] stackTrace; String methodName; + if ((stackTrace = info.getStackTrace()) != null + && stackTrace.length > 0 + && (methodName = stackTrace[0].getMethodName()) != null + && methodName.equals("waitForReferencePendingList")) + return false; + // jdk8 Reference Handler stacktrace + if ((lockName = info.getLockName()) != null + && lockName.startsWith("java.lang.ref")) + return false; + } + if ((name.equals("Finalizer") || name.equals("Common-Cleaner")) + && (lockName = info.getLockName()) != null + && lockName.startsWith("java.lang.ref")) + return false; + if (name.startsWith("ForkJoinPool.commonPool-worker") + && (lockName = info.getLockName()) != null + && lockName.startsWith("java.util.concurrent.ForkJoinPool")) + return false; + return true; + } + /** * A debugging tool to print stack traces of most threads, as jstack does. * Uninteresting threads are filtered out. @@ -1104,23 +1159,9 @@ ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); System.err.println("------ stacktrace dump start ------"); - for (ThreadInfo info : threadMXBean.dumpAllThreads(true, true)) { - final String name = info.getThreadName(); - String lockName; - if ("Signal Dispatcher".equals(name)) - continue; - if ("Reference Handler".equals(name) - && (lockName = info.getLockName()) != null - && lockName.startsWith("java.lang.ref.Reference$Lock")) - continue; - if ("Finalizer".equals(name) - && (lockName = info.getLockName()) != null - && lockName.startsWith("java.lang.ref.ReferenceQueue$Lock")) - continue; - if ("checkForWedgedTest".equals(name)) - continue; - System.err.print(info); - } + for (ThreadInfo info : threadMXBean.dumpAllThreads(true, true)) + if (threadOfInterest(info)) + System.err.print(info); System.err.println("------ stacktrace dump end ------"); if (sm != null) System.setSecurityManager(sm); @@ -1393,6 +1434,20 @@ } /** + * Spin-waits up to LONG_DELAY_MS milliseconds for the current thread to + * be interrupted. Clears the interrupt status before returning. + */ + void awaitInterrupted() { + for (long startTime = 0L; !Thread.interrupted(); ) { + if (startTime == 0L) + startTime = System.nanoTime(); + else if (millisElapsedSince(startTime) > LONG_DELAY_MS) + fail("timed out waiting for thread interrupt"); + Thread.yield(); + } + } + + /** * Returns the number of milliseconds since time given by * startNanoTime, which must have been previously returned from a * call to {@link System#nanoTime()}. @@ -1401,19 +1456,6 @@ return NANOSECONDS.toMillis(System.nanoTime() - startNanoTime); } -// void assertTerminatesPromptly(long timeoutMillis, Runnable r) { -// long startTime = System.nanoTime(); -// try { -// r.run(); -// } catch (Throwable fail) { threadUnexpectedException(fail); } -// if (millisElapsedSince(startTime) > timeoutMillis/2) -// throw new AssertionError("did not return promptly"); -// } - -// void assertTerminatesPromptly(Runnable r) { -// assertTerminatesPromptly(LONG_DELAY_MS/2, r); -// } - /** * Checks that timed f.get() returns the expected value, and does not * wait for the timeout to elapse before returning. @@ -1448,15 +1490,21 @@ * to terminate (using {@link Thread#join(long)}), else interrupts * the thread (in the hope that it may terminate later) and fails. */ - void awaitTermination(Thread t, long timeoutMillis) { + void awaitTermination(Thread thread, long timeoutMillis) { try { - t.join(timeoutMillis); + thread.join(timeoutMillis); } catch (InterruptedException fail) { threadUnexpectedException(fail); - } finally { - if (t.getState() != Thread.State.TERMINATED) { - t.interrupt(); - threadFail("timed out waiting for thread to terminate"); + } + if (thread.getState() != Thread.State.TERMINATED) { + String detail = String.format( + "timed out waiting for thread to terminate, thread=%s, state=%s" , + thread, thread.getState()); + try { + threadFail(detail); + } finally { + // Interrupt thread __after__ having reported its stack trace + thread.interrupt(); } } } diff --git a/test/jdk/java/util/concurrent/tck/LinkedBlockingDequeTest.java b/test/jdk/java/util/concurrent/tck/LinkedBlockingDequeTest.java --- a/test/jdk/java/util/concurrent/tck/LinkedBlockingDequeTest.java +++ b/test/jdk/java/util/concurrent/tck/LinkedBlockingDequeTest.java @@ -631,7 +631,7 @@ }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); assertEquals(SIZE, q.size()); @@ -673,7 +673,7 @@ assertEquals(0, q.take()); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); assertEquals(0, q.remainingCapacity()); @@ -690,26 +690,27 @@ q.put(new Object()); q.put(new Object()); long startTime = System.nanoTime(); + assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS)); assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); Thread.currentThread().interrupt(); try { - q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); + q.offer(new Object(), randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); + q.offer(new Object(), LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); } @@ -750,7 +751,7 @@ }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); } @@ -802,29 +803,26 @@ final CountDownLatch pleaseInterrupt = new CountDownLatch(1); Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { - long startTime = System.nanoTime(); for (int i = 0; i < SIZE; i++) assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS)); Thread.currentThread().interrupt(); try { - q.poll(LONG_DELAY_MS, MILLISECONDS); + q.poll(randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - q.poll(LONG_DELAY_MS, MILLISECONDS); + q.poll(LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); - - assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); checkEmpty(q); @@ -883,7 +881,7 @@ }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); assertEquals(SIZE, q.size()); @@ -918,7 +916,7 @@ assertEquals(capacity - 1, q.take()); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); assertEquals(0, q.remainingCapacity()); @@ -935,26 +933,27 @@ q.putFirst(new Object()); q.putFirst(new Object()); long startTime = System.nanoTime(); + assertFalse(q.offerFirst(new Object(), timeoutMillis(), MILLISECONDS)); assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); Thread.currentThread().interrupt(); try { - q.offerFirst(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); + q.offerFirst(new Object(), randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - q.offerFirst(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); + q.offerFirst(new Object(), LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); } @@ -986,7 +985,7 @@ }}); await(threadStarted); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); } @@ -1027,7 +1026,7 @@ }}); await(threadStarted); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); } @@ -1077,7 +1076,7 @@ }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); } @@ -1118,29 +1117,26 @@ final CountDownLatch pleaseInterrupt = new CountDownLatch(1); Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { - long startTime = System.nanoTime(); for (int i = 0; i < SIZE; i++) assertEquals(i, q.pollFirst(LONG_DELAY_MS, MILLISECONDS)); Thread.currentThread().interrupt(); try { - q.pollFirst(LONG_DELAY_MS, MILLISECONDS); + q.pollFirst(randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - q.pollFirst(LONG_DELAY_MS, MILLISECONDS); + q.pollFirst(LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); - - assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); } @@ -1164,7 +1160,7 @@ Thread.currentThread().interrupt(); try { - q.pollFirst(LONG_DELAY_MS, MILLISECONDS); + q.pollFirst(randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} @@ -1174,6 +1170,7 @@ shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); + assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); }}); @@ -1182,7 +1179,7 @@ assertTrue(q.offerFirst(zero, LONG_DELAY_MS, MILLISECONDS)); assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); barrier.await(); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); } @@ -1240,7 +1237,7 @@ }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); assertEquals(SIZE, q.size()); @@ -1282,7 +1279,7 @@ assertEquals(0, q.take()); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); assertEquals(0, q.remainingCapacity()); @@ -1299,24 +1296,25 @@ q.putLast(new Object()); q.putLast(new Object()); long startTime = System.nanoTime(); + assertFalse(q.offerLast(new Object(), timeoutMillis(), MILLISECONDS)); assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); Thread.currentThread().interrupt(); try { - q.offerLast(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); + q.offerLast(new Object(), randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} pleaseInterrupt.countDown(); try { - q.offerLast(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); + q.offerLast(new Object(), LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); } @@ -1358,7 +1356,7 @@ }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); } @@ -1399,30 +1397,27 @@ final CountDownLatch pleaseInterrupt = new CountDownLatch(1); Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { - long startTime = System.nanoTime(); for (int i = 0; i < SIZE; i++) assertEquals(SIZE - i - 1, q.pollLast(LONG_DELAY_MS, MILLISECONDS)); Thread.currentThread().interrupt(); try { - q.pollLast(LONG_DELAY_MS, MILLISECONDS); + q.pollLast(randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - q.pollLast(LONG_DELAY_MS, MILLISECONDS); + q.pollLast(LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); - - assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); checkEmpty(q); @@ -1447,7 +1442,7 @@ Thread.currentThread().interrupt(); try { - q.poll(LONG_DELAY_MS, MILLISECONDS); + q.poll(randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); @@ -1468,7 +1463,7 @@ assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); barrier.await(); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); } diff --git a/test/jdk/java/util/concurrent/tck/LinkedBlockingQueueTest.java b/test/jdk/java/util/concurrent/tck/LinkedBlockingQueueTest.java --- a/test/jdk/java/util/concurrent/tck/LinkedBlockingQueueTest.java +++ b/test/jdk/java/util/concurrent/tck/LinkedBlockingQueueTest.java @@ -318,7 +318,7 @@ }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); assertEquals(SIZE, q.size()); @@ -360,7 +360,7 @@ assertEquals(0, q.take()); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); assertEquals(0, q.remainingCapacity()); @@ -376,28 +376,28 @@ public void realRun() throws InterruptedException { q.put(new Object()); q.put(new Object()); + long startTime = System.nanoTime(); - long startTime = System.nanoTime(); assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS)); assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); Thread.currentThread().interrupt(); try { - q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); + q.offer(new Object(), randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); + q.offer(new Object(), LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); } @@ -438,7 +438,7 @@ }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); } @@ -490,29 +490,26 @@ final CountDownLatch pleaseInterrupt = new CountDownLatch(1); Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { - long startTime = System.nanoTime(); for (int i = 0; i < SIZE; i++) assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS)); Thread.currentThread().interrupt(); try { - q.poll(LONG_DELAY_MS, MILLISECONDS); + q.poll(randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - q.poll(LONG_DELAY_MS, MILLISECONDS); + q.poll(LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); - - assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); checkEmpty(q); diff --git a/test/jdk/java/util/concurrent/tck/LinkedTransferQueueTest.java b/test/jdk/java/util/concurrent/tck/LinkedTransferQueueTest.java --- a/test/jdk/java/util/concurrent/tck/LinkedTransferQueueTest.java +++ b/test/jdk/java/util/concurrent/tck/LinkedTransferQueueTest.java @@ -254,7 +254,7 @@ }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); } @@ -308,29 +308,26 @@ final CountDownLatch pleaseInterrupt = new CountDownLatch(1); Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { - long startTime = System.nanoTime(); for (int i = 0; i < SIZE; i++) assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS)); Thread.currentThread().interrupt(); try { - q.poll(LONG_DELAY_MS, MILLISECONDS); + q.poll(randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - q.poll(LONG_DELAY_MS, MILLISECONDS); + q.poll(LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); - - assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); checkEmpty(q); @@ -344,16 +341,14 @@ final BlockingQueue q = populatedQueue(SIZE); Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { - long startTime = System.nanoTime(); Thread.currentThread().interrupt(); for (int i = 0; i < SIZE; ++i) - assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS)); + assertEquals(i, (int) q.poll(randomTimeout(), randomTimeUnit())); try { - q.poll(LONG_DELAY_MS, MILLISECONDS); + q.poll(randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); - assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); }}); awaitTermination(t); @@ -982,25 +977,23 @@ Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { - long startTime = System.nanoTime(); Thread.currentThread().interrupt(); try { - q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS); + q.tryTransfer(new Object(), randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS); + q.tryTransfer(new Object(), LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); - assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); checkEmpty(q); diff --git a/test/jdk/java/util/concurrent/tck/LongAccumulatorTest.java b/test/jdk/java/util/concurrent/tck/LongAccumulatorTest.java --- a/test/jdk/java/util/concurrent/tck/LongAccumulatorTest.java +++ b/test/jdk/java/util/concurrent/tck/LongAccumulatorTest.java @@ -150,7 +150,7 @@ = new LongAccumulator((x, y) -> x + y, 0L); final int nThreads = ThreadLocalRandom.current().nextInt(1, 5); final Phaser phaser = new Phaser(nThreads + 1); - final int incs = 1_000_000; + final int incs = expensiveTests ? 1_000_000 : 100_000; final long total = nThreads * incs/2L * (incs - 1); // Gauss final Runnable task = () -> { phaser.arriveAndAwaitAdvance(); diff --git a/test/jdk/java/util/concurrent/tck/MapTest.java b/test/jdk/java/util/concurrent/tck/MapTest.java --- a/test/jdk/java/util/concurrent/tck/MapTest.java +++ b/test/jdk/java/util/concurrent/tck/MapTest.java @@ -122,6 +122,7 @@ */ public void testBug8186171() { if (!impl.supportsSetValue()) return; + if (!atLeastJava10()) return; // jdk9 is no longer maintained final ThreadLocalRandom rnd = ThreadLocalRandom.current(); final boolean permitsNullValues = impl.permitsNullValues(); final Object v1 = (permitsNullValues && rnd.nextBoolean()) diff --git a/test/jdk/java/util/concurrent/tck/PhaserTest.java b/test/jdk/java/util/concurrent/tck/PhaserTest.java --- a/test/jdk/java/util/concurrent/tck/PhaserTest.java +++ b/test/jdk/java/util/concurrent/tck/PhaserTest.java @@ -480,7 +480,7 @@ /** * awaitAdvanceInterruptibly blocks interruptibly */ - public void testAwaitAdvanceInterruptibly_interruptible() throws InterruptedException { + public void testAwaitAdvanceInterruptibly_Interruptible() throws InterruptedException { final Phaser phaser = new Phaser(1); final CountDownLatch pleaseInterrupt = new CountDownLatch(2); @@ -505,14 +505,14 @@ public void realRun() throws TimeoutException { Thread.currentThread().interrupt(); try { - phaser.awaitAdvanceInterruptibly(0, 2*LONG_DELAY_MS, MILLISECONDS); + phaser.awaitAdvanceInterruptibly(0, randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - phaser.awaitAdvanceInterruptibly(0, 2*LONG_DELAY_MS, MILLISECONDS); + phaser.awaitAdvanceInterruptibly(0, LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); @@ -520,8 +520,8 @@ await(pleaseInterrupt); assertState(phaser, 0, 1, 1); - assertThreadBlocks(t1, Thread.State.WAITING); - assertThreadBlocks(t2, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t1, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t2, Thread.State.TIMED_WAITING); t1.interrupt(); t2.interrupt(); awaitTermination(t1); diff --git a/test/jdk/java/util/concurrent/tck/PriorityBlockingQueueTest.java b/test/jdk/java/util/concurrent/tck/PriorityBlockingQueueTest.java --- a/test/jdk/java/util/concurrent/tck/PriorityBlockingQueueTest.java +++ b/test/jdk/java/util/concurrent/tck/PriorityBlockingQueueTest.java @@ -346,7 +346,7 @@ } /** - * timed offer does not time out + * Queue is unbounded, so timed offer never times out */ public void testTimedOffer() { final PriorityBlockingQueue q = new PriorityBlockingQueue(2); @@ -397,7 +397,7 @@ }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); } @@ -449,29 +449,26 @@ final CountDownLatch pleaseInterrupt = new CountDownLatch(1); Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { - long startTime = System.nanoTime(); for (int i = 0; i < SIZE; i++) assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS)); Thread.currentThread().interrupt(); try { - q.poll(LONG_DELAY_MS, MILLISECONDS); + q.poll(randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - q.poll(LONG_DELAY_MS, MILLISECONDS); + q.poll(LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); - - assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); } diff --git a/test/jdk/java/util/concurrent/tck/ScheduledExecutorSubclassTest.java b/test/jdk/java/util/concurrent/tck/ScheduledExecutorSubclassTest.java --- a/test/jdk/java/util/concurrent/tck/ScheduledExecutorSubclassTest.java +++ b/test/jdk/java/util/concurrent/tck/ScheduledExecutorSubclassTest.java @@ -707,7 +707,7 @@ Runnable waiter = new CheckedRunnable() { public void realRun() { threadsStarted.countDown(); try { - MILLISECONDS.sleep(2 * LONG_DELAY_MS); + MILLISECONDS.sleep(LONGER_DELAY_MS); } catch (InterruptedException success) {} ran.getAndIncrement(); }}; diff --git a/test/jdk/java/util/concurrent/tck/ScheduledExecutorTest.java b/test/jdk/java/util/concurrent/tck/ScheduledExecutorTest.java --- a/test/jdk/java/util/concurrent/tck/ScheduledExecutorTest.java +++ b/test/jdk/java/util/concurrent/tck/ScheduledExecutorTest.java @@ -666,7 +666,7 @@ Runnable waiter = new CheckedRunnable() { public void realRun() { threadsStarted.countDown(); try { - MILLISECONDS.sleep(2 * LONG_DELAY_MS); + MILLISECONDS.sleep(LONGER_DELAY_MS); } catch (InterruptedException success) {} ran.getAndIncrement(); }}; diff --git a/test/jdk/java/util/concurrent/tck/SemaphoreTest.java b/test/jdk/java/util/concurrent/tck/SemaphoreTest.java --- a/test/jdk/java/util/concurrent/tck/SemaphoreTest.java +++ b/test/jdk/java/util/concurrent/tck/SemaphoreTest.java @@ -38,7 +38,6 @@ import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadLocalRandom; import junit.framework.Test; import junit.framework.TestSuite; @@ -220,24 +219,22 @@ /** * timed tryAcquire times out */ - public void testTryAcquire_timeout() { - final boolean fair = ThreadLocalRandom.current().nextBoolean(); + public void testTryAcquire_timeout() throws InterruptedException { + final boolean fair = randomBoolean(); final Semaphore s = new Semaphore(0, fair); final long startTime = System.nanoTime(); - try { assertFalse(s.tryAcquire(timeoutMillis(), MILLISECONDS)); } - catch (InterruptedException e) { threadUnexpectedException(e); } + assertFalse(s.tryAcquire(timeoutMillis(), MILLISECONDS)); assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); } /** * timed tryAcquire(N) times out */ - public void testTryAcquireN_timeout() { - final boolean fair = ThreadLocalRandom.current().nextBoolean(); + public void testTryAcquireN_timeout() throws InterruptedException { + final boolean fair = randomBoolean(); final Semaphore s = new Semaphore(2, fair); final long startTime = System.nanoTime(); - try { assertFalse(s.tryAcquire(3, timeoutMillis(), MILLISECONDS)); } - catch (InterruptedException e) { threadUnexpectedException(e); } + assertFalse(s.tryAcquire(3, timeoutMillis(), MILLISECONDS)); assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); } diff --git a/test/jdk/java/util/concurrent/tck/SynchronousQueueTest.java b/test/jdk/java/util/concurrent/tck/SynchronousQueueTest.java --- a/test/jdk/java/util/concurrent/tck/SynchronousQueueTest.java +++ b/test/jdk/java/util/concurrent/tck/SynchronousQueueTest.java @@ -45,7 +45,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadLocalRandom; import junit.framework.Test; @@ -166,7 +165,7 @@ }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); assertEquals(0, q.remainingCapacity()); @@ -207,7 +206,7 @@ catch (InterruptedException e) { threadUnexpectedException(e); } await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); t.interrupt(); awaitTermination(t); assertEquals(0, q.remainingCapacity()); @@ -217,32 +216,33 @@ * timed offer times out if elements not taken */ public void testTimedOffer() { - final boolean fair = ThreadLocalRandom.current().nextBoolean(); + final boolean fair = randomBoolean(); final SynchronousQueue q = new SynchronousQueue(fair); final CountDownLatch pleaseInterrupt = new CountDownLatch(1); Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { long startTime = System.nanoTime(); + assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS)); assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); Thread.currentThread().interrupt(); try { - q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); + q.offer(new Object(), randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); + q.offer(new Object(), LONGER_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); } @@ -272,7 +272,7 @@ * timed poll with nonzero timeout times out if no active putter */ public void testTimedPoll() { - final boolean fair = ThreadLocalRandom.current().nextBoolean(); + final boolean fair = randomBoolean(); final SynchronousQueue q = new SynchronousQueue(fair); final long startTime = System.nanoTime(); try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); } @@ -285,7 +285,7 @@ * after offer succeeds; on interruption throws */ public void testTimedPollWithOffer() { - final boolean fair = ThreadLocalRandom.current().nextBoolean(); + final boolean fair = randomBoolean(); final SynchronousQueue q = new SynchronousQueue(fair); final CountDownLatch pleaseOffer = new CountDownLatch(1); final CountDownLatch pleaseInterrupt = new CountDownLatch(1); @@ -301,7 +301,7 @@ Thread.currentThread().interrupt(); try { - q.poll(LONG_DELAY_MS, MILLISECONDS); + q.poll(randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); @@ -323,7 +323,7 @@ assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); } diff --git a/test/jdk/java/util/concurrent/tck/ThreadPoolExecutorSubclassTest.java b/test/jdk/java/util/concurrent/tck/ThreadPoolExecutorSubclassTest.java --- a/test/jdk/java/util/concurrent/tck/ThreadPoolExecutorSubclassTest.java +++ b/test/jdk/java/util/concurrent/tck/ThreadPoolExecutorSubclassTest.java @@ -796,7 +796,7 @@ Runnable waiter = new CheckedRunnable() { public void realRun() { threadsStarted.countDown(); try { - MILLISECONDS.sleep(2 * LONG_DELAY_MS); + MILLISECONDS.sleep(LONGER_DELAY_MS); } catch (InterruptedException success) {} ran.getAndIncrement(); }}; @@ -1669,7 +1669,7 @@ l.add(latchAwaitingStringTask(latch)); l.add(null); try { - e.invokeAny(l, randomTimeout(), MILLISECONDS); + e.invokeAny(l, randomTimeout(), randomTimeUnit()); shouldThrow(); } catch (NullPointerException success) {} latch.countDown(); diff --git a/test/jdk/java/util/concurrent/tck/ThreadPoolExecutorTest.java b/test/jdk/java/util/concurrent/tck/ThreadPoolExecutorTest.java --- a/test/jdk/java/util/concurrent/tck/ThreadPoolExecutorTest.java +++ b/test/jdk/java/util/concurrent/tck/ThreadPoolExecutorTest.java @@ -699,7 +699,7 @@ Runnable waiter = new CheckedRunnable() { public void realRun() { threadsStarted.countDown(); try { - MILLISECONDS.sleep(2 * LONG_DELAY_MS); + MILLISECONDS.sleep(LONGER_DELAY_MS); } catch (InterruptedException success) {} ran.getAndIncrement(); }}; diff --git a/test/jdk/java/util/concurrent/tck/TimeUnitTest.java b/test/jdk/java/util/concurrent/tck/TimeUnitTest.java --- a/test/jdk/java/util/concurrent/tck/TimeUnitTest.java +++ b/test/jdk/java/util/concurrent/tck/TimeUnitTest.java @@ -56,243 +56,56 @@ return new TestSuite(TimeUnitTest.class); } - // (loops to 88888 check increments at all time divisions.) - - /** - * convert correctly converts sample values across the units - */ - public void testConvert() { - for (long t = 0; t < 88888; ++t) { - assertEquals(t*60*60*24, - SECONDS.convert(t, DAYS)); - assertEquals(t*60*60, - SECONDS.convert(t, HOURS)); - assertEquals(t*60, - SECONDS.convert(t, MINUTES)); - assertEquals(t, - SECONDS.convert(t, SECONDS)); - assertEquals(t, - SECONDS.convert(1000L*t, MILLISECONDS)); - assertEquals(t, - SECONDS.convert(1000000L*t, MICROSECONDS)); - assertEquals(t, - SECONDS.convert(1000000000L*t, NANOSECONDS)); - - assertEquals(1000L*t*60*60*24, - MILLISECONDS.convert(t, DAYS)); - assertEquals(1000L*t*60*60, - MILLISECONDS.convert(t, HOURS)); - assertEquals(1000L*t*60, - MILLISECONDS.convert(t, MINUTES)); - assertEquals(1000L*t, - MILLISECONDS.convert(t, SECONDS)); - assertEquals(t, - MILLISECONDS.convert(t, MILLISECONDS)); - assertEquals(t, - MILLISECONDS.convert(1000L*t, MICROSECONDS)); - assertEquals(t, - MILLISECONDS.convert(1000000L*t, NANOSECONDS)); - - assertEquals(1000000L*t*60*60*24, - MICROSECONDS.convert(t, DAYS)); - assertEquals(1000000L*t*60*60, - MICROSECONDS.convert(t, HOURS)); - assertEquals(1000000L*t*60, - MICROSECONDS.convert(t, MINUTES)); - assertEquals(1000000L*t, - MICROSECONDS.convert(t, SECONDS)); - assertEquals(1000L*t, - MICROSECONDS.convert(t, MILLISECONDS)); - assertEquals(t, - MICROSECONDS.convert(t, MICROSECONDS)); - assertEquals(t, - MICROSECONDS.convert(1000L*t, NANOSECONDS)); - - assertEquals(1000000000L*t*60*60*24, - NANOSECONDS.convert(t, DAYS)); - assertEquals(1000000000L*t*60*60, - NANOSECONDS.convert(t, HOURS)); - assertEquals(1000000000L*t*60, - NANOSECONDS.convert(t, MINUTES)); - assertEquals(1000000000L*t, - NANOSECONDS.convert(t, SECONDS)); - assertEquals(1000000L*t, - NANOSECONDS.convert(t, MILLISECONDS)); - assertEquals(1000L*t, - NANOSECONDS.convert(t, MICROSECONDS)); - assertEquals(t, - NANOSECONDS.convert(t, NANOSECONDS)); + void testConversion(TimeUnit x, TimeUnit y, long n, long expected) { + assertEquals(expected, x.convert(n, y)); + switch (x) { + case NANOSECONDS: assertEquals(expected, y.toNanos(n)); break; + case MICROSECONDS: assertEquals(expected, y.toMicros(n)); break; + case MILLISECONDS: assertEquals(expected, y.toMillis(n)); break; + case SECONDS: assertEquals(expected, y.toSeconds(n)); break; + case MINUTES: assertEquals(expected, y.toMinutes(n)); break; + case HOURS: assertEquals(expected, y.toHours(n)); break; + case DAYS: assertEquals(expected, y.toDays(n)); break; + default: throw new AssertionError(); } - for (TimeUnit x : TimeUnit.values()) { - long[] zs = { - 0, 1, -1, - Integer.MAX_VALUE, Integer.MIN_VALUE, - Long.MAX_VALUE, Long.MIN_VALUE, - }; - for (long z : zs) assertEquals(z, x.convert(z, x)); - } + if (n > 0) testConversion(x, y, -n, -expected); } - /** - * toNanos correctly converts sample values in different units to - * nanoseconds - */ - public void testToNanos() { - for (long t = 0; t < 88888; ++t) { - assertEquals(t*1000000000L*60*60*24, - DAYS.toNanos(t)); - assertEquals(t*1000000000L*60*60, - HOURS.toNanos(t)); - assertEquals(t*1000000000L*60, - MINUTES.toNanos(t)); - assertEquals(1000000000L*t, - SECONDS.toNanos(t)); - assertEquals(1000000L*t, - MILLISECONDS.toNanos(t)); - assertEquals(1000L*t, - MICROSECONDS.toNanos(t)); - assertEquals(t, - NANOSECONDS.toNanos(t)); + void testConversion(TimeUnit x, TimeUnit y) { + long ratio = x.toNanos(1)/y.toNanos(1); + assertTrue(ratio > 0); + long[] ns = { 0, 1, 2, Long.MAX_VALUE/ratio, Long.MIN_VALUE/ratio }; + for (long n : ns) { + testConversion(y, x, n, n * ratio); + long[] ks = { n * ratio, n * ratio + 1, n * ratio - 1 }; + for (long k : ks) { + testConversion(x, y, k, k / ratio); + } } } /** - * toMicros correctly converts sample values in different units to - * microseconds - */ - public void testToMicros() { - for (long t = 0; t < 88888; ++t) { - assertEquals(t*1000000L*60*60*24, - DAYS.toMicros(t)); - assertEquals(t*1000000L*60*60, - HOURS.toMicros(t)); - assertEquals(t*1000000L*60, - MINUTES.toMicros(t)); - assertEquals(1000000L*t, - SECONDS.toMicros(t)); - assertEquals(1000L*t, - MILLISECONDS.toMicros(t)); - assertEquals(t, - MICROSECONDS.toMicros(t)); - assertEquals(t, - NANOSECONDS.toMicros(t*1000L)); - } - } - - /** - * toMillis correctly converts sample values in different units to - * milliseconds + * Conversion methods correctly convert sample values */ - public void testToMillis() { - for (long t = 0; t < 88888; ++t) { - assertEquals(t*1000L*60*60*24, - DAYS.toMillis(t)); - assertEquals(t*1000L*60*60, - HOURS.toMillis(t)); - assertEquals(t*1000L*60, - MINUTES.toMillis(t)); - assertEquals(1000L*t, - SECONDS.toMillis(t)); - assertEquals(t, - MILLISECONDS.toMillis(t)); - assertEquals(t, - MICROSECONDS.toMillis(t*1000L)); - assertEquals(t, - NANOSECONDS.toMillis(t*1000000L)); - } - } + public void testConversions() { + // Sanity check + assertEquals(1, NANOSECONDS.toNanos(1)); + assertEquals(1000L * NANOSECONDS.toNanos(1), MICROSECONDS.toNanos(1)); + assertEquals(1000L * MICROSECONDS.toNanos(1), MILLISECONDS.toNanos(1)); + assertEquals(1000L * MILLISECONDS.toNanos(1), SECONDS.toNanos(1)); + assertEquals(60L * SECONDS.toNanos(1), MINUTES.toNanos(1)); + assertEquals(60L * MINUTES.toNanos(1), HOURS.toNanos(1)); + assertEquals(24L * HOURS.toNanos(1), DAYS.toNanos(1)); - /** - * toSeconds correctly converts sample values in different units to - * seconds - */ - public void testToSeconds() { - for (long t = 0; t < 88888; ++t) { - assertEquals(t*60*60*24, - DAYS.toSeconds(t)); - assertEquals(t*60*60, - HOURS.toSeconds(t)); - assertEquals(t*60, - MINUTES.toSeconds(t)); - assertEquals(t, - SECONDS.toSeconds(t)); - assertEquals(t, - MILLISECONDS.toSeconds(t*1000L)); - assertEquals(t, - MICROSECONDS.toSeconds(t*1000000L)); - assertEquals(t, - NANOSECONDS.toSeconds(t*1000000000L)); + for (TimeUnit x : TimeUnit.values()) { + assertEquals(x.toNanos(1), NANOSECONDS.convert(1, x)); } - } - - /** - * toMinutes correctly converts sample values in different units to - * minutes - */ - public void testToMinutes() { - for (long t = 0; t < 88888; ++t) { - assertEquals(t*60*24, - DAYS.toMinutes(t)); - assertEquals(t*60, - HOURS.toMinutes(t)); - assertEquals(t, - MINUTES.toMinutes(t)); - assertEquals(t, - SECONDS.toMinutes(t*60)); - assertEquals(t, - MILLISECONDS.toMinutes(t*1000L*60)); - assertEquals(t, - MICROSECONDS.toMinutes(t*1000000L*60)); - assertEquals(t, - NANOSECONDS.toMinutes(t*1000000000L*60)); - } - } - /** - * toHours correctly converts sample values in different units to - * hours - */ - public void testToHours() { - for (long t = 0; t < 88888; ++t) { - assertEquals(t*24, - DAYS.toHours(t)); - assertEquals(t, - HOURS.toHours(t)); - assertEquals(t, - MINUTES.toHours(t*60)); - assertEquals(t, - SECONDS.toHours(t*60*60)); - assertEquals(t, - MILLISECONDS.toHours(t*1000L*60*60)); - assertEquals(t, - MICROSECONDS.toHours(t*1000000L*60*60)); - assertEquals(t, - NANOSECONDS.toHours(t*1000000000L*60*60)); - } - } - - /** - * toDays correctly converts sample values in different units to - * days - */ - public void testToDays() { - for (long t = 0; t < 88888; ++t) { - assertEquals(t, - DAYS.toDays(t)); - assertEquals(t, - HOURS.toDays(t*24)); - assertEquals(t, - MINUTES.toDays(t*60*24)); - assertEquals(t, - SECONDS.toDays(t*60*60*24)); - assertEquals(t, - MILLISECONDS.toDays(t*1000L*60*60*24)); - assertEquals(t, - MICROSECONDS.toDays(t*1000000L*60*60*24)); - assertEquals(t, - NANOSECONDS.toDays(t*1000000000L*60*60*24)); - } + for (TimeUnit x : TimeUnit.values()) + for (TimeUnit y : TimeUnit.values()) + if (x.toNanos(1) >= y.toNanos(1)) + testConversion(x, y); } /** @@ -494,14 +307,21 @@ * toString returns name of unit */ public void testToString() { + assertEquals("NANOSECONDS", NANOSECONDS.toString()); + assertEquals("MICROSECONDS", MICROSECONDS.toString()); + assertEquals("MILLISECONDS", MILLISECONDS.toString()); assertEquals("SECONDS", SECONDS.toString()); + assertEquals("MINUTES", MINUTES.toString()); + assertEquals("HOURS", HOURS.toString()); + assertEquals("DAYS", DAYS.toString()); } /** * name returns name of unit */ public void testName() { - assertEquals("SECONDS", SECONDS.name()); + for (TimeUnit x : TimeUnit.values()) + assertEquals(x.toString(), x.name()); } /** @@ -512,10 +332,8 @@ Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { Object o = new Object(); - TimeUnit tu = MILLISECONDS; - try { - tu.timedWait(o, LONG_DELAY_MS); + MILLISECONDS.timedWait(o, LONGER_DELAY_MS); threadShouldThrow(); } catch (IllegalMonitorStateException success) {} }}); @@ -531,12 +349,11 @@ Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { Object o = new Object(); - TimeUnit tu = MILLISECONDS; Thread.currentThread().interrupt(); try { synchronized (o) { - tu.timedWait(o, LONG_DELAY_MS); + MILLISECONDS.timedWait(o, LONGER_DELAY_MS); } shouldThrow(); } catch (InterruptedException success) {} @@ -545,7 +362,7 @@ pleaseInterrupt.countDown(); try { synchronized (o) { - tu.timedWait(o, LONG_DELAY_MS); + MILLISECONDS.timedWait(o, LONGER_DELAY_MS); } shouldThrow(); } catch (InterruptedException success) {} @@ -553,7 +370,7 @@ }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); } @@ -565,28 +382,27 @@ final CountDownLatch pleaseInterrupt = new CountDownLatch(1); final Thread s = newStartedThread(new CheckedInterruptedRunnable() { public void realRun() throws InterruptedException { - Thread.sleep(LONG_DELAY_MS); + Thread.sleep(LONGER_DELAY_MS); }}); final Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { - TimeUnit tu = MILLISECONDS; Thread.currentThread().interrupt(); try { - tu.timedJoin(s, LONG_DELAY_MS); + MILLISECONDS.timedJoin(s, LONGER_DELAY_MS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - tu.timedJoin(s, LONG_DELAY_MS); + MILLISECONDS.timedJoin(s, LONGER_DELAY_MS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); s.interrupt(); @@ -594,35 +410,46 @@ } /** - * timedSleep throws InterruptedException when interrupted + * timeUnit.sleep throws InterruptedException when interrupted */ public void testTimedSleep_Interruptible() { final CountDownLatch pleaseInterrupt = new CountDownLatch(1); Thread t = newStartedThread(new CheckedRunnable() { public void realRun() throws InterruptedException { - TimeUnit tu = MILLISECONDS; Thread.currentThread().interrupt(); try { - tu.sleep(LONG_DELAY_MS); + MILLISECONDS.sleep(LONGER_DELAY_MS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); pleaseInterrupt.countDown(); try { - tu.sleep(LONG_DELAY_MS); + MILLISECONDS.sleep(LONGER_DELAY_MS); shouldThrow(); } catch (InterruptedException success) {} assertFalse(Thread.interrupted()); }}); await(pleaseInterrupt); - assertThreadBlocks(t, Thread.State.TIMED_WAITING); + if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); t.interrupt(); awaitTermination(t); } /** + * timeUnit.sleep(x) for x <= 0 does not sleep at all. + */ + public void testTimedSleep_nonPositive() throws InterruptedException { + boolean interrupt = randomBoolean(); + if (interrupt) Thread.currentThread().interrupt(); + randomTimeUnit().sleep(0L); + randomTimeUnit().sleep(-1L); + randomTimeUnit().sleep(Long.MIN_VALUE); + if (interrupt) assertTrue(Thread.interrupted()); + } + + /** * a deserialized/reserialized unit is the same instance */ public void testSerialization() throws Exception {