1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent.locks; 37 38 import java.util.ArrayList; 39 import java.util.Collection; 40 import java.util.Date; 41 import java.util.concurrent.TimeUnit; 42 import java.util.concurrent.ForkJoinPool; 43 import jdk.internal.misc.Unsafe; 44 45 /** 46 * Provides a framework for implementing blocking locks and related 47 * synchronizers (semaphores, events, etc) that rely on 48 * first-in-first-out (FIFO) wait queues. This class is designed to 49 * be a useful basis for most kinds of synchronizers that rely on a 50 * single atomic {@code int} value to represent state. Subclasses 51 * must define the protected methods that change this state, and which 52 * define what that state means in terms of this object being acquired 53 * or released. Given these, the other methods in this class carry 54 * out all queuing and blocking mechanics. Subclasses can maintain 55 * other state fields, but only the atomically updated {@code int} 56 * value manipulated using methods {@link #getState}, {@link 57 * #setState} and {@link #compareAndSetState} is tracked with respect 58 * to synchronization. 59 * 60 * <p>Subclasses should be defined as non-public internal helper 61 * classes that are used to implement the synchronization properties 62 * of their enclosing class. Class 63 * {@code AbstractQueuedSynchronizer} does not implement any 64 * synchronization interface. Instead it defines methods such as 65 * {@link #acquireInterruptibly} that can be invoked as 66 * appropriate by concrete locks and related synchronizers to 67 * implement their public methods. 68 * 69 * <p>This class supports either or both a default <em>exclusive</em> 70 * mode and a <em>shared</em> mode. When acquired in exclusive mode, 71 * attempted acquires by other threads cannot succeed. Shared mode 72 * acquires by multiple threads may (but need not) succeed. This class 73 * does not "understand" these differences except in the 74 * mechanical sense that when a shared mode acquire succeeds, the next 75 * waiting thread (if one exists) must also determine whether it can 76 * acquire as well. Threads waiting in the different modes share the 77 * same FIFO queue. Usually, implementation subclasses support only 78 * one of these modes, but both can come into play for example in a 79 * {@link ReadWriteLock}. Subclasses that support only exclusive or 80 * only shared modes need not define the methods supporting the unused mode. 81 * 82 * <p>This class defines a nested {@link ConditionObject} class that 83 * can be used as a {@link Condition} implementation by subclasses 84 * supporting exclusive mode for which method {@link 85 * #isHeldExclusively} reports whether synchronization is exclusively 86 * held with respect to the current thread, method {@link #release} 87 * invoked with the current {@link #getState} value fully releases 88 * this object, and {@link #acquire}, given this saved state value, 89 * eventually restores this object to its previous acquired state. No 90 * {@code AbstractQueuedSynchronizer} method otherwise creates such a 91 * condition, so if this constraint cannot be met, do not use it. The 92 * behavior of {@link ConditionObject} depends of course on the 93 * semantics of its synchronizer implementation. 94 * 95 * <p>This class provides inspection, instrumentation, and monitoring 96 * methods for the internal queue, as well as similar methods for 97 * condition objects. These can be exported as desired into classes 98 * using an {@code AbstractQueuedSynchronizer} for their 99 * synchronization mechanics. 100 * 101 * <p>Serialization of this class stores only the underlying atomic 102 * integer maintaining state, so deserialized objects have empty 103 * thread queues. Typical subclasses requiring serializability will 104 * define a {@code readObject} method that restores this to a known 105 * initial state upon deserialization. 106 * 107 * <h2>Usage</h2> 108 * 109 * <p>To use this class as the basis of a synchronizer, redefine the 110 * following methods, as applicable, by inspecting and/or modifying 111 * the synchronization state using {@link #getState}, {@link 112 * #setState} and/or {@link #compareAndSetState}: 113 * 114 * <ul> 115 * <li>{@link #tryAcquire} 116 * <li>{@link #tryRelease} 117 * <li>{@link #tryAcquireShared} 118 * <li>{@link #tryReleaseShared} 119 * <li>{@link #isHeldExclusively} 120 * </ul> 121 * 122 * Each of these methods by default throws {@link 123 * UnsupportedOperationException}. Implementations of these methods 124 * must be internally thread-safe, and should in general be short and 125 * not block. Defining these methods is the <em>only</em> supported 126 * means of using this class. All other methods are declared 127 * {@code final} because they cannot be independently varied. 128 * 129 * <p>You may also find the inherited methods from {@link 130 * AbstractOwnableSynchronizer} useful to keep track of the thread 131 * owning an exclusive synchronizer. You are encouraged to use them 132 * -- this enables monitoring and diagnostic tools to assist users in 133 * determining which threads hold locks. 134 * 135 * <p>Even though this class is based on an internal FIFO queue, it 136 * does not automatically enforce FIFO acquisition policies. The core 137 * of exclusive synchronization takes the form: 138 * 139 * <pre> 140 * Acquire: 141 * while (!tryAcquire(arg)) { 142 * <em>enqueue thread if it is not already queued</em>; 143 * <em>possibly block current thread</em>; 144 * } 145 * 146 * Release: 147 * if (tryRelease(arg)) 148 * <em>unblock the first queued thread</em>; 149 * </pre> 150 * 151 * (Shared mode is similar but may involve cascading signals.) 152 * 153 * <p id="barging">Because checks in acquire are invoked before 154 * enqueuing, a newly acquiring thread may <em>barge</em> ahead of 155 * others that are blocked and queued. However, you can, if desired, 156 * define {@code tryAcquire} and/or {@code tryAcquireShared} to 157 * disable barging by internally invoking one or more of the inspection 158 * methods, thereby providing a <em>fair</em> FIFO acquisition order. 159 * In particular, most fair synchronizers can define {@code tryAcquire} 160 * to return {@code false} if {@link #hasQueuedPredecessors} (a method 161 * specifically designed to be used by fair synchronizers) returns 162 * {@code true}. Other variations are possible. 163 * 164 * <p>Throughput and scalability are generally highest for the 165 * default barging (also known as <em>greedy</em>, 166 * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy. 167 * While this is not guaranteed to be fair or starvation-free, earlier 168 * queued threads are allowed to recontend before later queued 169 * threads, and each recontention has an unbiased chance to succeed 170 * against incoming threads. Also, while acquires do not 171 * "spin" in the usual sense, they may perform multiple 172 * invocations of {@code tryAcquire} interspersed with other 173 * computations before blocking. This gives most of the benefits of 174 * spins when exclusive synchronization is only briefly held, without 175 * most of the liabilities when it isn't. If so desired, you can 176 * augment this by preceding calls to acquire methods with 177 * "fast-path" checks, possibly prechecking {@link #hasContended} 178 * and/or {@link #hasQueuedThreads} to only do so if the synchronizer 179 * is likely not to be contended. 180 * 181 * <p>This class provides an efficient and scalable basis for 182 * synchronization in part by specializing its range of use to 183 * synchronizers that can rely on {@code int} state, acquire, and 184 * release parameters, and an internal FIFO wait queue. When this does 185 * not suffice, you can build synchronizers from a lower level using 186 * {@link java.util.concurrent.atomic atomic} classes, your own custom 187 * {@link java.util.Queue} classes, and {@link LockSupport} blocking 188 * support. 189 * 190 * <h2>Usage Examples</h2> 191 * 192 * <p>Here is a non-reentrant mutual exclusion lock class that uses 193 * the value zero to represent the unlocked state, and one to 194 * represent the locked state. While a non-reentrant lock 195 * does not strictly require recording of the current owner 196 * thread, this class does so anyway to make usage easier to monitor. 197 * It also supports conditions and exposes some instrumentation methods: 198 * 199 * <pre> {@code 200 * class Mutex implements Lock, java.io.Serializable { 201 * 202 * // Our internal helper class 203 * private static class Sync extends AbstractQueuedSynchronizer { 204 * // Acquires the lock if state is zero 205 * public boolean tryAcquire(int acquires) { 206 * assert acquires == 1; // Otherwise unused 207 * if (compareAndSetState(0, 1)) { 208 * setExclusiveOwnerThread(Thread.currentThread()); 209 * return true; 210 * } 211 * return false; 212 * } 213 * 214 * // Releases the lock by setting state to zero 215 * protected boolean tryRelease(int releases) { 216 * assert releases == 1; // Otherwise unused 217 * if (!isHeldExclusively()) 218 * throw new IllegalMonitorStateException(); 219 * setExclusiveOwnerThread(null); 220 * setState(0); 221 * return true; 222 * } 223 * 224 * // Reports whether in locked state 225 * public boolean isLocked() { 226 * return getState() != 0; 227 * } 228 * 229 * public boolean isHeldExclusively() { 230 * // a data race, but safe due to out-of-thin-air guarantees 231 * return getExclusiveOwnerThread() == Thread.currentThread(); 232 * } 233 * 234 * // Provides a Condition 235 * public Condition newCondition() { 236 * return new ConditionObject(); 237 * } 238 * 239 * // Deserializes properly 240 * private void readObject(ObjectInputStream s) 241 * throws IOException, ClassNotFoundException { 242 * s.defaultReadObject(); 243 * setState(0); // reset to unlocked state 244 * } 245 * } 246 * 247 * // The sync object does all the hard work. We just forward to it. 248 * private final Sync sync = new Sync(); 249 * 250 * public void lock() { sync.acquire(1); } 251 * public boolean tryLock() { return sync.tryAcquire(1); } 252 * public void unlock() { sync.release(1); } 253 * public Condition newCondition() { return sync.newCondition(); } 254 * public boolean isLocked() { return sync.isLocked(); } 255 * public boolean isHeldByCurrentThread() { 256 * return sync.isHeldExclusively(); 257 * } 258 * public boolean hasQueuedThreads() { 259 * return sync.hasQueuedThreads(); 260 * } 261 * public void lockInterruptibly() throws InterruptedException { 262 * sync.acquireInterruptibly(1); 263 * } 264 * public boolean tryLock(long timeout, TimeUnit unit) 265 * throws InterruptedException { 266 * return sync.tryAcquireNanos(1, unit.toNanos(timeout)); 267 * } 268 * }}</pre> 269 * 270 * <p>Here is a latch class that is like a 271 * {@link java.util.concurrent.CountDownLatch CountDownLatch} 272 * except that it only requires a single {@code signal} to 273 * fire. Because a latch is non-exclusive, it uses the {@code shared} 274 * acquire and release methods. 275 * 276 * <pre> {@code 277 * class BooleanLatch { 278 * 279 * private static class Sync extends AbstractQueuedSynchronizer { 280 * boolean isSignalled() { return getState() != 0; } 281 * 282 * protected int tryAcquireShared(int ignore) { 283 * return isSignalled() ? 1 : -1; 284 * } 285 * 286 * protected boolean tryReleaseShared(int ignore) { 287 * setState(1); 288 * return true; 289 * } 290 * } 291 * 292 * private final Sync sync = new Sync(); 293 * public boolean isSignalled() { return sync.isSignalled(); } 294 * public void signal() { sync.releaseShared(1); } 295 * public void await() throws InterruptedException { 296 * sync.acquireSharedInterruptibly(1); 297 * } 298 * }}</pre> 299 * 300 * @since 1.5 301 * @author Doug Lea 302 */ 303 public abstract class AbstractQueuedSynchronizer 304 extends AbstractOwnableSynchronizer 305 implements java.io.Serializable { 306 307 private static final long serialVersionUID = 7373984972572414691L; 308 309 /** 310 * Creates a new {@code AbstractQueuedSynchronizer} instance 311 * with initial synchronization state of zero. 312 */ 313 protected AbstractQueuedSynchronizer() { } 314 315 /* 316 * Overview. 317 * 318 * The wait queue is a variant of a "CLH" (Craig, Landin, and 319 * Hagersten) lock queue. CLH locks are normally used for 320 * spinlocks. We instead use them for blocking synchronizers by 321 * including explicit ("prev" and "next") links plus a "status" 322 * field that allow nodes to signal successors when releasing 323 * locks, and handle cancellation due to interrupts and timeouts. 324 * The status field includes bits that track whether a thread 325 * needs a signal (using LockSupport.unpark). Despite these 326 * additions, we maintain most CLH locality properties. 327 * 328 * To enqueue into a CLH lock, you atomically splice it in as new 329 * tail. To dequeue, you set the head field, so the next eligible 330 * waiter becomes first. 331 * 332 * +------+ prev +-------+ +------+ 333 * | head | <---- | first | <---- | tail | 334 * +------+ +-------+ +------+ 335 * 336 * Insertion into a CLH queue requires only a single atomic 337 * operation on "tail", so there is a simple point of demarcation 338 * from unqueued to queued. The "next" link of the predecessor is 339 * set by the enqueuing thread after successful CAS. Even though 340 * non-atomic, this suffices to ensure that any blocked thread is 341 * signalled by a predecessor when eligible (although in the case 342 * of cancellation, possibly with the assistance of a signal in 343 * method cleanQueue). Signalling is based in part on a 344 * Dekker-like scheme in which the to-be waiting thread indicates 345 * WAITING status, then retries acquiring, and then rechecks 346 * status before blocking. The signaller atomically clears WAITING 347 * status when unparking. 348 * 349 * Dequeuing on acquire involves detaching (nulling) a node's 350 * "prev" node and then updating the "head". Other threads check 351 * if a node is or was dequeued by checking "prev" rather than 352 * head. We enforce the nulling then setting order by spin-waiting 353 * if necessary. Because of this, the lock algorithm is not itself 354 * strictly "lock-free" because an acquiring thread may need to 355 * wait for a previous acquire to make progress. When used with 356 * exclusive locks, such progress is required anyway. However 357 * Shared mode may (uncommonly) require a spin-wait before 358 * setting head field to ensure proper propagation. (Historical 359 * note: This allows some simplifications and efficiencies 360 * compared to previous versions of this class.) 361 * 362 * A node's predecessor can change due to cancellation while it is 363 * waiting, until the node is first in queue, at which point it 364 * cannot change. The acquire methods cope with this by rechecking 365 * "prev" before waiting. The prev and next fields are modified 366 * only via CAS by cancelled nodes in method cleanQueue. The 367 * unsplice strategy is reminiscent of Michael-Scott queues in 368 * that after a successful CAS to prev field, other threads help 369 * fix next fields. Because cancellation often occurs in bunches 370 * that complicate decisions about necessary signals, each call to 371 * cleanQueue traverses the queue until a clean sweep. Nodes that 372 * become relinked as first are unconditionally unparked 373 * (sometimes unnecessarily, but those cases are not worth 374 * avoiding). 375 * 376 * A thread may try to acquire if it is first (frontmost) in the 377 * queue, and sometimes before. Being first does not guarantee 378 * success; it only gives the right to contend. We balance 379 * throughput, overhead, and fairness by allowing incoming threads 380 * to "barge" and acquire the synchronizer while in the process of 381 * enqueuing, in which case an awakened first thread may need to 382 * rewait. To counteract possible repeated unlucky rewaits, we 383 * exponentially increase retries (up to 256) to acquire each time 384 * a thread is unparked. Except in this case, AQS locks do not 385 * spin; they instead interleave attempts to acquire with 386 * bookkeeping steps. (Users who want spinlocks can use 387 * tryAcquire.) 388 * 389 * To improve garbage collectibility, fields of nodes not yet on 390 * list are null. (It is not rare to create and then throw away a 391 * node without using it.) Fields of nodes coming off the list are 392 * nulled out as soon as possible. This accentuates the challenge 393 * of externally determining the first waiting thread (as in 394 * method getFirstQueuedThread). This sometimes requires the 395 * fallback of traversing backwards from the atomically updated 396 * "tail" when fields appear null. (This is never needed in the 397 * process of signalling though.) 398 * 399 * CLH queues need a dummy header node to get started. But 400 * we don't create them on construction, because it would be wasted 401 * effort if there is never contention. Instead, the node 402 * is constructed and head and tail pointers are set upon first 403 * contention. 404 * 405 * Shared mode operations differ from Exclusive in that an acquire 406 * signals the next waiter to try to acquire if it is also 407 * Shared. The tryAcquireShared API allows users to indicate the 408 * degree of propagation, but in most applications, it is more 409 * efficient to ignore this, allowing the successor to try 410 * acquiring in any case. 411 * 412 * Threads waiting on Conditions use nodes with an additional 413 * link to maintain the (FIFO) list of conditions. Conditions only 414 * need to link nodes in simple (non-concurrent) linked queues 415 * because they are only accessed when exclusively held. Upon 416 * await, a node is inserted into a condition queue. Upon signal, 417 * the node is enqueued on the main queue. A special status field 418 * value is used to track and atomically trigger this. 419 * 420 * Accesses to fields head, tail, and state use full Volatile 421 * mode, along with CAS. Node fields status, prev and next also do 422 * so while threads may be signallable, but sometimes use weaker 423 * modes otherwise. Accesses to field "waiter" (the thread to be 424 * signalled) are always sandwiched between other atomic accesses 425 * so are used in Plain mode. We use jdk.internal Unsafe versions 426 * of atomic access methods rather than VarHandles to avoid 427 * potential VM bootstrap issues. 428 * 429 * Most of the above is performed by primary internal method 430 * acquire, that is invoked in some way by all exported acquire 431 * methods. (It is usually easy for compilers to optimize 432 * call-site specializations when heavily used.) 433 * 434 * There are several arbitrary decisions about when and how to 435 * check interrupts in both acquire and await before and/or after 436 * blocking. The decisions are less arbitrary in implementation 437 * updates because some users appear to rely on original behaviors 438 * in ways that are racy and so (rarely) wrong in general but hard 439 * to justify changing. 440 * 441 * Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill 442 * Scherer and Michael Scott, along with members of JSR-166 443 * expert group, for helpful ideas, discussions, and critiques 444 * on the design of this class. 445 */ 446 447 // Node status bits, also used as argument and return values 448 static final int WAITING = 1; // must be 1 449 static final int CANCELLED = 0x80000000; // must be negative 450 static final int COND = 2; // in a condition wait 451 452 /** CLH Nodes */ 453 abstract static class Node { 454 volatile Node prev; // initially attached via casTail 455 volatile Node next; // visibly nonnull when signallable 456 Thread waiter; // visibly nonnull when enqueued 457 volatile int status; // written by owner, atomic bit ops by others 458 459 // methods for atomic operations 460 final boolean casPrev(Node c, Node v) { // for cleanQueue 461 return U.weakCompareAndSetReference(this, PREV, c, v); 462 } 463 final boolean casNext(Node c, Node v) { // for cleanQueue 464 return U.weakCompareAndSetReference(this, NEXT, c, v); 465 } 466 final int getAndUnsetStatus(int v) { // for signalling 467 return U.getAndBitwiseAndInt(this, STATUS, ~v); 468 } 469 final void setPrevRelaxed(Node p) { // for off-queue assignment 470 U.putReference(this, PREV, p); 471 } 472 final void setStatusRelaxed(int s) { // for off-queue assignment 473 U.putInt(this, STATUS, s); 474 } 475 final void clearStatus() { // for reducing unneeded signals 476 U.putIntOpaque(this, STATUS, 0); 477 } 478 479 private static final long STATUS 480 = U.objectFieldOffset(Node.class, "status"); 481 private static final long NEXT 482 = U.objectFieldOffset(Node.class, "next"); 483 private static final long PREV 484 = U.objectFieldOffset(Node.class, "prev"); 485 } 486 487 // Concrete classes tagged by type 488 static final class ExclusiveNode extends Node { } 489 static final class SharedNode extends Node { } 490 491 static final class ConditionNode extends Node 492 implements ForkJoinPool.ManagedBlocker { 493 ConditionNode nextWaiter; // link to next waiting node 494 495 /** 496 * Allows Conditions to be used in ForkJoinPools without 497 * risking fixed pool exhaustion. This is usable only for 498 * untimed Condition waits, not timed versions. 499 */ 500 public final boolean isReleasable() { 501 return status <= 1 || Thread.currentThread().isInterrupted(); 502 } 503 504 public final boolean block() { 505 while (!isReleasable()) LockSupport.park(this); 506 return true; 507 } 508 } 509 510 /** 511 * Head of the wait queue, lazily initialized. 512 */ 513 private transient volatile Node head; 514 515 /** 516 * Tail of the wait queue. After initialization, modified only via casTail. 517 */ 518 private transient volatile Node tail; 519 520 /** 521 * The synchronization state. 522 */ 523 private volatile int state; 524 525 /** 526 * Returns the current value of synchronization state. 527 * This operation has memory semantics of a {@code volatile} read. 528 * @return current state value 529 */ 530 protected final int getState() { 531 return state; 532 } 533 534 /** 535 * Sets the value of synchronization state. 536 * This operation has memory semantics of a {@code volatile} write. 537 * @param newState the new state value 538 */ 539 protected final void setState(int newState) { 540 state = newState; 541 } 542 543 /** 544 * Atomically sets synchronization state to the given updated 545 * value if the current state value equals the expected value. 546 * This operation has memory semantics of a {@code volatile} read 547 * and write. 548 * 549 * @param expect the expected value 550 * @param update the new value 551 * @return {@code true} if successful. False return indicates that the actual 552 * value was not equal to the expected value. 553 */ 554 protected final boolean compareAndSetState(int expect, int update) { 555 return U.compareAndSetInt(this, STATE, expect, update); 556 } 557 558 // Queuing utilities 559 560 private boolean casTail(Node c, Node v) { 561 return U.compareAndSetReference(this, TAIL, c, v); 562 } 563 564 /** tries once to CAS a new dummy node for head */ 565 private void tryInitializeHead() { 566 Node h = new ExclusiveNode(); 567 if (U.compareAndSetReference(this, HEAD, null, h)) 568 tail = h; 569 } 570 571 /** 572 * Enqueues the node unless null. (Currently used only for 573 * ConditionNodes; other cases are interleaved with acquires.) 574 */ 575 final void enqueue(Node node) { 576 if (node != null) { 577 for (;;) { 578 Node t = tail; 579 node.setPrevRelaxed(t); // avoid unnecessary fence 580 if (t == null) // initialize 581 tryInitializeHead(); 582 else if (casTail(t, node)) { 583 t.next = node; 584 if (t.status < 0) // wake up to clean link 585 LockSupport.unpark(node.waiter); 586 break; 587 } 588 } 589 } 590 } 591 592 /** Returns true if node is found in traversal from tail */ 593 final boolean isEnqueued(Node node) { 594 for (Node t = tail; t != null; t = t.prev) 595 if (t == node) 596 return true; 597 return false; 598 } 599 600 /** 601 * Wakes up the successor of given node, if one exists, and unsets its 602 * WAITING status to avoid park race. This may fail to wake up an 603 * eligible thread when one or more have been cancelled, but 604 * cancelAcquire ensures liveness. 605 */ 606 private static void signalNext(Node h) { 607 Node s; 608 if (h != null && (s = h.next) != null && s.status != 0) { 609 s.getAndUnsetStatus(WAITING); 610 LockSupport.unpark(s.waiter); 611 } 612 } 613 614 /** Wakes up the given node if in shared mode */ 615 private static void signalNextIfShared(Node h) { 616 Node s; 617 if (h != null && (s = h.next) != null && 618 (s instanceof SharedNode) && s.status != 0) { 619 s.getAndUnsetStatus(WAITING); 620 LockSupport.unpark(s.waiter); 621 } 622 } 623 624 /** 625 * Main acquire method, invoked by all exported acquire methods. 626 * 627 * @param node null unless a reacquiring Condition 628 * @param arg the acquire argument 629 * @param shared true if shared mode else exclusive 630 * @param interruptible if abort and return negative on interrupt 631 * @param timed if true use timed waits 632 * @param time if timed, the System.nanoTime value to timeout 633 * @return positive if acquired, 0 if timed out, negative if interrupted 634 */ 635 final int acquire(Node node, int arg, boolean shared, 636 boolean interruptible, boolean timed, long time) { 637 Thread current = Thread.currentThread(); 638 byte spins = 0, postSpins = 0; // retries upon unpark of first thread 639 boolean interrupted = false, first = false; 640 Node pred = null; // predecessor of node when enqueued 641 642 /* 643 * Repeatedly: 644 * Check if node now first 645 * if so, ensure head stable, else ensure valid predecessor 646 * if node is first or not yet enqueued, try acquiring 647 * else if node not yet created, create it 648 * else if not yet enqueued, try once to enqueue 649 * else if woken from park, retry (up to postSpins times) 650 * else if WAITING status not set, set and retry 651 * else park and clear WAITING status, and check cancellation 652 */ 653 654 for (;;) { 655 if (!first && (pred = (node == null) ? null : node.prev) != null && 656 !(first = (head == pred))) { 657 if (pred.status < 0) { 658 cleanQueue(); // predecessor cancelled 659 continue; 660 } else if (pred.prev == null) { 661 Thread.onSpinWait(); // ensure serialization 662 continue; 663 } 664 } 665 if (first || pred == null) { 666 boolean acquired; 667 try { 668 if (shared) 669 acquired = (tryAcquireShared(arg) >= 0); 670 else 671 acquired = tryAcquire(arg); 672 } catch (Throwable ex) { 673 cancelAcquire(node, interrupted, false); 674 throw ex; 675 } 676 if (acquired) { 677 if (first) { 678 node.prev = null; 679 head = node; 680 pred.next = null; 681 node.waiter = null; 682 if (shared) 683 signalNextIfShared(node); 684 if (interrupted) 685 current.interrupt(); 686 } 687 return 1; 688 } 689 } 690 if (node == null) { // allocate; retry before enqueue 691 if (shared) 692 node = new SharedNode(); 693 else 694 node = new ExclusiveNode(); 695 } else if (pred == null) { // try to enqueue 696 node.waiter = current; 697 Node t = tail; 698 node.setPrevRelaxed(t); // avoid unnecessary fence 699 if (t == null) 700 tryInitializeHead(); 701 else if (!casTail(t, node)) 702 node.setPrevRelaxed(null); // back out 703 else 704 t.next = node; 705 } else if (first && spins != 0) { 706 --spins; // reduce unfairness on rewaits 707 Thread.onSpinWait(); 708 } else if (node.status == 0) { 709 node.status = WAITING; // enable signal and recheck 710 } else { 711 long nanos; 712 spins = postSpins = (byte)((postSpins << 1) | 1); 713 if (!timed) 714 LockSupport.park(this); 715 else if ((nanos = time - System.nanoTime()) > 0L) 716 LockSupport.parkNanos(this, nanos); 717 else 718 break; 719 node.clearStatus(); 720 if ((interrupted |= Thread.interrupted()) && interruptible) 721 break; 722 } 723 } 724 return cancelAcquire(node, interrupted, interruptible); 725 } 726 727 /** 728 * Possibly repeatedly traverses from tail, unsplicing cancelled 729 * nodes until none are found. Unparks nodes that may have been 730 * relinked to be next eligible acquirer. 731 */ 732 private void cleanQueue() { 733 for (;;) { // restart point 734 for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples 735 if (q == null || (p = q.prev) == null) 736 return; // end of list 737 if (s == null ? tail != q : (s.prev != q || s.status < 0)) 738 break; // inconsistent 739 if (q.status < 0) { // cancelled 740 if ((s == null ? casTail(q, p) : s.casPrev(q, p)) && 741 q.prev == p) { 742 p.casNext(q, s); // OK if fails 743 if (p.prev == null) 744 signalNext(p); 745 } 746 break; 747 } 748 if ((n = p.next) != q) { // help finish 749 if (n != null && q.prev == p) { 750 p.casNext(n, q); 751 if (p.prev == null) 752 signalNext(p); 753 } 754 break; 755 } 756 s = q; 757 q = q.prev; 758 } 759 } 760 } 761 762 /** 763 * Cancels an ongoing attempt to acquire. 764 * 765 * @param node the node (may be null if cancelled before enqueuing) 766 * @param interrupted true if thread interrupted 767 * @param interruptible if should report interruption vs reset 768 */ 769 private int cancelAcquire(Node node, boolean interrupted, 770 boolean interruptible) { 771 if (node != null) { 772 node.waiter = null; 773 node.status = CANCELLED; 774 if (node.prev != null) 775 cleanQueue(); 776 } 777 if (interrupted) { 778 if (interruptible) 779 return CANCELLED; 780 else 781 Thread.currentThread().interrupt(); 782 } 783 return 0; 784 } 785 786 // Main exported methods 787 788 /** 789 * Attempts to acquire in exclusive mode. This method should query 790 * if the state of the object permits it to be acquired in the 791 * exclusive mode, and if so to acquire it. 792 * 793 * <p>This method is always invoked by the thread performing 794 * acquire. If this method reports failure, the acquire method 795 * may queue the thread, if it is not already queued, until it is 796 * signalled by a release from some other thread. This can be used 797 * to implement method {@link Lock#tryLock()}. 798 * 799 * <p>The default 800 * implementation throws {@link UnsupportedOperationException}. 801 * 802 * @param arg the acquire argument. This value is always the one 803 * passed to an acquire method, or is the value saved on entry 804 * to a condition wait. The value is otherwise uninterpreted 805 * and can represent anything you like. 806 * @return {@code true} if successful. Upon success, this object has 807 * been acquired. 808 * @throws IllegalMonitorStateException if acquiring would place this 809 * synchronizer in an illegal state. This exception must be 810 * thrown in a consistent fashion for synchronization to work 811 * correctly. 812 * @throws UnsupportedOperationException if exclusive mode is not supported 813 */ 814 protected boolean tryAcquire(int arg) { 815 throw new UnsupportedOperationException(); 816 } 817 818 /** 819 * Attempts to set the state to reflect a release in exclusive 820 * mode. 821 * 822 * <p>This method is always invoked by the thread performing release. 823 * 824 * <p>The default implementation throws 825 * {@link UnsupportedOperationException}. 826 * 827 * @param arg the release argument. This value is always the one 828 * passed to a release method, or the current state value upon 829 * entry to a condition wait. The value is otherwise 830 * uninterpreted and can represent anything you like. 831 * @return {@code true} if this object is now in a fully released 832 * state, so that any waiting threads may attempt to acquire; 833 * and {@code false} otherwise. 834 * @throws IllegalMonitorStateException if releasing would place this 835 * synchronizer in an illegal state. This exception must be 836 * thrown in a consistent fashion for synchronization to work 837 * correctly. 838 * @throws UnsupportedOperationException if exclusive mode is not supported 839 */ 840 protected boolean tryRelease(int arg) { 841 throw new UnsupportedOperationException(); 842 } 843 844 /** 845 * Attempts to acquire in shared mode. This method should query if 846 * the state of the object permits it to be acquired in the shared 847 * mode, and if so to acquire it. 848 * 849 * <p>This method is always invoked by the thread performing 850 * acquire. If this method reports failure, the acquire method 851 * may queue the thread, if it is not already queued, until it is 852 * signalled by a release from some other thread. 853 * 854 * <p>The default implementation throws {@link 855 * UnsupportedOperationException}. 856 * 857 * @param arg the acquire argument. This value is always the one 858 * passed to an acquire method, or is the value saved on entry 859 * to a condition wait. The value is otherwise uninterpreted 860 * and can represent anything you like. 861 * @return a negative value on failure; zero if acquisition in shared 862 * mode succeeded but no subsequent shared-mode acquire can 863 * succeed; and a positive value if acquisition in shared 864 * mode succeeded and subsequent shared-mode acquires might 865 * also succeed, in which case a subsequent waiting thread 866 * must check availability. (Support for three different 867 * return values enables this method to be used in contexts 868 * where acquires only sometimes act exclusively.) Upon 869 * success, this object has been acquired. 870 * @throws IllegalMonitorStateException if acquiring would place this 871 * synchronizer in an illegal state. This exception must be 872 * thrown in a consistent fashion for synchronization to work 873 * correctly. 874 * @throws UnsupportedOperationException if shared mode is not supported 875 */ 876 protected int tryAcquireShared(int arg) { 877 throw new UnsupportedOperationException(); 878 } 879 880 /** 881 * Attempts to set the state to reflect a release in shared mode. 882 * 883 * <p>This method is always invoked by the thread performing release. 884 * 885 * <p>The default implementation throws 886 * {@link UnsupportedOperationException}. 887 * 888 * @param arg the release argument. This value is always the one 889 * passed to a release method, or the current state value upon 890 * entry to a condition wait. The value is otherwise 891 * uninterpreted and can represent anything you like. 892 * @return {@code true} if this release of shared mode may permit a 893 * waiting acquire (shared or exclusive) to succeed; and 894 * {@code false} otherwise 895 * @throws IllegalMonitorStateException if releasing would place this 896 * synchronizer in an illegal state. This exception must be 897 * thrown in a consistent fashion for synchronization to work 898 * correctly. 899 * @throws UnsupportedOperationException if shared mode is not supported 900 */ 901 protected boolean tryReleaseShared(int arg) { 902 throw new UnsupportedOperationException(); 903 } 904 905 /** 906 * Returns {@code true} if synchronization is held exclusively with 907 * respect to the current (calling) thread. This method is invoked 908 * upon each call to a {@link ConditionObject} method. 909 * 910 * <p>The default implementation throws {@link 911 * UnsupportedOperationException}. This method is invoked 912 * internally only within {@link ConditionObject} methods, so need 913 * not be defined if conditions are not used. 914 * 915 * @return {@code true} if synchronization is held exclusively; 916 * {@code false} otherwise 917 * @throws UnsupportedOperationException if conditions are not supported 918 */ 919 protected boolean isHeldExclusively() { 920 throw new UnsupportedOperationException(); 921 } 922 923 /** 924 * Acquires in exclusive mode, ignoring interrupts. Implemented 925 * by invoking at least once {@link #tryAcquire}, 926 * returning on success. Otherwise the thread is queued, possibly 927 * repeatedly blocking and unblocking, invoking {@link 928 * #tryAcquire} until success. This method can be used 929 * to implement method {@link Lock#lock}. 930 * 931 * @param arg the acquire argument. This value is conveyed to 932 * {@link #tryAcquire} but is otherwise uninterpreted and 933 * can represent anything you like. 934 */ 935 public final void acquire(int arg) { 936 if (!tryAcquire(arg)) 937 acquire(null, arg, false, false, false, 0L); 938 } 939 940 /** 941 * Acquires in exclusive mode, aborting if interrupted. 942 * Implemented by first checking interrupt status, then invoking 943 * at least once {@link #tryAcquire}, returning on 944 * success. Otherwise the thread is queued, possibly repeatedly 945 * blocking and unblocking, invoking {@link #tryAcquire} 946 * until success or the thread is interrupted. This method can be 947 * used to implement method {@link Lock#lockInterruptibly}. 948 * 949 * @param arg the acquire argument. This value is conveyed to 950 * {@link #tryAcquire} but is otherwise uninterpreted and 951 * can represent anything you like. 952 * @throws InterruptedException if the current thread is interrupted 953 */ 954 public final void acquireInterruptibly(int arg) 955 throws InterruptedException { 956 if (Thread.interrupted() || 957 (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0)) 958 throw new InterruptedException(); 959 } 960 961 /** 962 * Attempts to acquire in exclusive mode, aborting if interrupted, 963 * and failing if the given timeout elapses. Implemented by first 964 * checking interrupt status, then invoking at least once {@link 965 * #tryAcquire}, returning on success. Otherwise, the thread is 966 * queued, possibly repeatedly blocking and unblocking, invoking 967 * {@link #tryAcquire} until success or the thread is interrupted 968 * or the timeout elapses. This method can be used to implement 969 * method {@link Lock#tryLock(long, TimeUnit)}. 970 * 971 * @param arg the acquire argument. This value is conveyed to 972 * {@link #tryAcquire} but is otherwise uninterpreted and 973 * can represent anything you like. 974 * @param nanosTimeout the maximum number of nanoseconds to wait 975 * @return {@code true} if acquired; {@code false} if timed out 976 * @throws InterruptedException if the current thread is interrupted 977 */ 978 public final boolean tryAcquireNanos(int arg, long nanosTimeout) 979 throws InterruptedException { 980 if (!Thread.interrupted()) { 981 if (tryAcquire(arg)) 982 return true; 983 if (nanosTimeout <= 0L) 984 return false; 985 int stat = acquire(null, arg, false, true, true, 986 System.nanoTime() + nanosTimeout); 987 if (stat > 0) 988 return true; 989 if (stat == 0) 990 return false; 991 } 992 throw new InterruptedException(); 993 } 994 995 /** 996 * Releases in exclusive mode. Implemented by unblocking one or 997 * more threads if {@link #tryRelease} returns true. 998 * This method can be used to implement method {@link Lock#unlock}. 999 * 1000 * @param arg the release argument. This value is conveyed to 1001 * {@link #tryRelease} but is otherwise uninterpreted and 1002 * can represent anything you like. 1003 * @return the value returned from {@link #tryRelease} 1004 */ 1005 public final boolean release(int arg) { 1006 if (tryRelease(arg)) { 1007 signalNext(head); 1008 return true; 1009 } 1010 return false; 1011 } 1012 1013 /** 1014 * Acquires in shared mode, ignoring interrupts. Implemented by 1015 * first invoking at least once {@link #tryAcquireShared}, 1016 * returning on success. Otherwise the thread is queued, possibly 1017 * repeatedly blocking and unblocking, invoking {@link 1018 * #tryAcquireShared} until success. 1019 * 1020 * @param arg the acquire argument. This value is conveyed to 1021 * {@link #tryAcquireShared} but is otherwise uninterpreted 1022 * and can represent anything you like. 1023 */ 1024 public final void acquireShared(int arg) { 1025 if (tryAcquireShared(arg) < 0) 1026 acquire(null, arg, true, false, false, 0L); 1027 } 1028 1029 /** 1030 * Acquires in shared mode, aborting if interrupted. Implemented 1031 * by first checking interrupt status, then invoking at least once 1032 * {@link #tryAcquireShared}, returning on success. Otherwise the 1033 * thread is queued, possibly repeatedly blocking and unblocking, 1034 * invoking {@link #tryAcquireShared} until success or the thread 1035 * is interrupted. 1036 * @param arg the acquire argument. 1037 * This value is conveyed to {@link #tryAcquireShared} but is 1038 * otherwise uninterpreted and can represent anything 1039 * you like. 1040 * @throws InterruptedException if the current thread is interrupted 1041 */ 1042 public final void acquireSharedInterruptibly(int arg) 1043 throws InterruptedException { 1044 if (Thread.interrupted() || 1045 (tryAcquireShared(arg) < 0 && 1046 acquire(null, arg, true, true, false, 0L) < 0)) 1047 throw new InterruptedException(); 1048 } 1049 1050 /** 1051 * Attempts to acquire in shared mode, aborting if interrupted, and 1052 * failing if the given timeout elapses. Implemented by first 1053 * checking interrupt status, then invoking at least once {@link 1054 * #tryAcquireShared}, returning on success. Otherwise, the 1055 * thread is queued, possibly repeatedly blocking and unblocking, 1056 * invoking {@link #tryAcquireShared} until success or the thread 1057 * is interrupted or the timeout elapses. 1058 * 1059 * @param arg the acquire argument. This value is conveyed to 1060 * {@link #tryAcquireShared} but is otherwise uninterpreted 1061 * and can represent anything you like. 1062 * @param nanosTimeout the maximum number of nanoseconds to wait 1063 * @return {@code true} if acquired; {@code false} if timed out 1064 * @throws InterruptedException if the current thread is interrupted 1065 */ 1066 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 1067 throws InterruptedException { 1068 if (!Thread.interrupted()) { 1069 if (tryAcquireShared(arg) >= 0) 1070 return true; 1071 if (nanosTimeout <= 0L) 1072 return false; 1073 int stat = acquire(null, arg, true, true, true, 1074 System.nanoTime() + nanosTimeout); 1075 if (stat > 0) 1076 return true; 1077 if (stat == 0) 1078 return false; 1079 } 1080 throw new InterruptedException(); 1081 } 1082 1083 /** 1084 * Releases in shared mode. Implemented by unblocking one or more 1085 * threads if {@link #tryReleaseShared} returns true. 1086 * 1087 * @param arg the release argument. This value is conveyed to 1088 * {@link #tryReleaseShared} but is otherwise uninterpreted 1089 * and can represent anything you like. 1090 * @return the value returned from {@link #tryReleaseShared} 1091 */ 1092 public final boolean releaseShared(int arg) { 1093 if (tryReleaseShared(arg)) { 1094 signalNext(head); 1095 return true; 1096 } 1097 return false; 1098 } 1099 1100 // Queue inspection methods 1101 1102 /** 1103 * Queries whether any threads are waiting to acquire. Note that 1104 * because cancellations due to interrupts and timeouts may occur 1105 * at any time, a {@code true} return does not guarantee that any 1106 * other thread will ever acquire. 1107 * 1108 * @return {@code true} if there may be other threads waiting to acquire 1109 */ 1110 public final boolean hasQueuedThreads() { 1111 for (Node p = tail, h = head; p != h && p != null; p = p.prev) 1112 if (p.status >= 0) 1113 return true; 1114 return false; 1115 } 1116 1117 /** 1118 * Queries whether any threads have ever contended to acquire this 1119 * synchronizer; that is, if an acquire method has ever blocked. 1120 * 1121 * <p>In this implementation, this operation returns in 1122 * constant time. 1123 * 1124 * @return {@code true} if there has ever been contention 1125 */ 1126 public final boolean hasContended() { 1127 return head != null; 1128 } 1129 1130 /** 1131 * Returns the first (longest-waiting) thread in the queue, or 1132 * {@code null} if no threads are currently queued. 1133 * 1134 * <p>In this implementation, this operation normally returns in 1135 * constant time, but may iterate upon contention if other threads are 1136 * concurrently modifying the queue. 1137 * 1138 * @return the first (longest-waiting) thread in the queue, or 1139 * {@code null} if no threads are currently queued 1140 */ 1141 public final Thread getFirstQueuedThread() { 1142 Thread first = null, w; Node h, s; 1143 if ((h = head) != null && ((s = h.next) == null || 1144 (first = s.waiter) == null || 1145 s.prev == null)) { 1146 // traverse from tail on stale reads 1147 for (Node p = tail, q; p != null && (q = p.prev) != null; p = q) 1148 if ((w = p.waiter) != null) 1149 first = w; 1150 } 1151 return first; 1152 } 1153 1154 /** 1155 * Returns true if the given thread is currently queued. 1156 * 1157 * <p>This implementation traverses the queue to determine 1158 * presence of the given thread. 1159 * 1160 * @param thread the thread 1161 * @return {@code true} if the given thread is on the queue 1162 * @throws NullPointerException if the thread is null 1163 */ 1164 public final boolean isQueued(Thread thread) { 1165 if (thread == null) 1166 throw new NullPointerException(); 1167 for (Node p = tail; p != null; p = p.prev) 1168 if (p.waiter == thread) 1169 return true; 1170 return false; 1171 } 1172 1173 /** 1174 * Returns {@code true} if the apparent first queued thread, if one 1175 * exists, is waiting in exclusive mode. If this method returns 1176 * {@code true}, and the current thread is attempting to acquire in 1177 * shared mode (that is, this method is invoked from {@link 1178 * #tryAcquireShared}) then it is guaranteed that the current thread 1179 * is not the first queued thread. Used only as a heuristic in 1180 * ReentrantReadWriteLock. 1181 */ 1182 final boolean apparentlyFirstQueuedIsExclusive() { 1183 Node h, s; 1184 return (h = head) != null && (s = h.next) != null && 1185 !(s instanceof SharedNode) && s.waiter != null; 1186 } 1187 1188 /** 1189 * Queries whether any threads have been waiting to acquire longer 1190 * than the current thread. 1191 * 1192 * <p>An invocation of this method is equivalent to (but may be 1193 * more efficient than): 1194 * <pre> {@code 1195 * getFirstQueuedThread() != Thread.currentThread() 1196 * && hasQueuedThreads()}</pre> 1197 * 1198 * <p>Note that because cancellations due to interrupts and 1199 * timeouts may occur at any time, a {@code true} return does not 1200 * guarantee that some other thread will acquire before the current 1201 * thread. Likewise, it is possible for another thread to win a 1202 * race to enqueue after this method has returned {@code false}, 1203 * due to the queue being empty. 1204 * 1205 * <p>This method is designed to be used by a fair synchronizer to 1206 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>. 1207 * Such a synchronizer's {@link #tryAcquire} method should return 1208 * {@code false}, and its {@link #tryAcquireShared} method should 1209 * return a negative value, if this method returns {@code true} 1210 * (unless this is a reentrant acquire). For example, the {@code 1211 * tryAcquire} method for a fair, reentrant, exclusive mode 1212 * synchronizer might look like this: 1213 * 1214 * <pre> {@code 1215 * protected boolean tryAcquire(int arg) { 1216 * if (isHeldExclusively()) { 1217 * // A reentrant acquire; increment hold count 1218 * return true; 1219 * } else if (hasQueuedPredecessors()) { 1220 * return false; 1221 * } else { 1222 * // try to acquire normally 1223 * } 1224 * }}</pre> 1225 * 1226 * @return {@code true} if there is a queued thread preceding the 1227 * current thread, and {@code false} if the current thread 1228 * is at the head of the queue or the queue is empty 1229 * @since 1.7 1230 */ 1231 public final boolean hasQueuedPredecessors() { 1232 Thread first = null; Node h, s; 1233 if ((h = head) != null && ((s = h.next) == null || 1234 (first = s.waiter) == null || 1235 s.prev == null)) 1236 first = getFirstQueuedThread(); // retry via getFirstQueuedThread 1237 return first != null && first != Thread.currentThread(); 1238 } 1239 1240 // Instrumentation and monitoring methods 1241 1242 /** 1243 * Returns an estimate of the number of threads waiting to 1244 * acquire. The value is only an estimate because the number of 1245 * threads may change dynamically while this method traverses 1246 * internal data structures. This method is designed for use in 1247 * monitoring system state, not for synchronization control. 1248 * 1249 * @return the estimated number of threads waiting to acquire 1250 */ 1251 public final int getQueueLength() { 1252 int n = 0; 1253 for (Node p = tail; p != null; p = p.prev) { 1254 if (p.waiter != null) 1255 ++n; 1256 } 1257 return n; 1258 } 1259 1260 /** 1261 * Returns a collection containing threads that may be waiting to 1262 * acquire. Because the actual set of threads may change 1263 * dynamically while constructing this result, the returned 1264 * collection is only a best-effort estimate. The elements of the 1265 * returned collection are in no particular order. This method is 1266 * designed to facilitate construction of subclasses that provide 1267 * more extensive monitoring facilities. 1268 * 1269 * @return the collection of threads 1270 */ 1271 public final Collection<Thread> getQueuedThreads() { 1272 ArrayList<Thread> list = new ArrayList<>(); 1273 for (Node p = tail; p != null; p = p.prev) { 1274 Thread t = p.waiter; 1275 if (t != null) 1276 list.add(t); 1277 } 1278 return list; 1279 } 1280 1281 /** 1282 * Returns a collection containing threads that may be waiting to 1283 * acquire in exclusive mode. This has the same properties 1284 * as {@link #getQueuedThreads} except that it only returns 1285 * those threads waiting due to an exclusive acquire. 1286 * 1287 * @return the collection of threads 1288 */ 1289 public final Collection<Thread> getExclusiveQueuedThreads() { 1290 ArrayList<Thread> list = new ArrayList<>(); 1291 for (Node p = tail; p != null; p = p.prev) { 1292 if (!(p instanceof SharedNode)) { 1293 Thread t = p.waiter; 1294 if (t != null) 1295 list.add(t); 1296 } 1297 } 1298 return list; 1299 } 1300 1301 /** 1302 * Returns a collection containing threads that may be waiting to 1303 * acquire in shared mode. This has the same properties 1304 * as {@link #getQueuedThreads} except that it only returns 1305 * those threads waiting due to a shared acquire. 1306 * 1307 * @return the collection of threads 1308 */ 1309 public final Collection<Thread> getSharedQueuedThreads() { 1310 ArrayList<Thread> list = new ArrayList<>(); 1311 for (Node p = tail; p != null; p = p.prev) { 1312 if (p instanceof SharedNode) { 1313 Thread t = p.waiter; 1314 if (t != null) 1315 list.add(t); 1316 } 1317 } 1318 return list; 1319 } 1320 1321 /** 1322 * Returns a string identifying this synchronizer, as well as its state. 1323 * The state, in brackets, includes the String {@code "State ="} 1324 * followed by the current value of {@link #getState}, and either 1325 * {@code "nonempty"} or {@code "empty"} depending on whether the 1326 * queue is empty. 1327 * 1328 * @return a string identifying this synchronizer, as well as its state 1329 */ 1330 public String toString() { 1331 return super.toString() 1332 + "[State = " + getState() + ", " 1333 + (hasQueuedThreads() ? "non" : "") + "empty queue]"; 1334 } 1335 1336 // Instrumentation methods for conditions 1337 1338 /** 1339 * Queries whether the given ConditionObject 1340 * uses this synchronizer as its lock. 1341 * 1342 * @param condition the condition 1343 * @return {@code true} if owned 1344 * @throws NullPointerException if the condition is null 1345 */ 1346 public final boolean owns(ConditionObject condition) { 1347 return condition.isOwnedBy(this); 1348 } 1349 1350 /** 1351 * Queries whether any threads are waiting on the given condition 1352 * associated with this synchronizer. Note that because timeouts 1353 * and interrupts may occur at any time, a {@code true} return 1354 * does not guarantee that a future {@code signal} will awaken 1355 * any threads. This method is designed primarily for use in 1356 * monitoring of the system state. 1357 * 1358 * @param condition the condition 1359 * @return {@code true} if there are any waiting threads 1360 * @throws IllegalMonitorStateException if exclusive synchronization 1361 * is not held 1362 * @throws IllegalArgumentException if the given condition is 1363 * not associated with this synchronizer 1364 * @throws NullPointerException if the condition is null 1365 */ 1366 public final boolean hasWaiters(ConditionObject condition) { 1367 if (!owns(condition)) 1368 throw new IllegalArgumentException("Not owner"); 1369 return condition.hasWaiters(); 1370 } 1371 1372 /** 1373 * Returns an estimate of the number of threads waiting on the 1374 * given condition associated with this synchronizer. Note that 1375 * because timeouts and interrupts may occur at any time, the 1376 * estimate serves only as an upper bound on the actual number of 1377 * waiters. This method is designed for use in monitoring system 1378 * state, not for synchronization control. 1379 * 1380 * @param condition the condition 1381 * @return the estimated number of waiting threads 1382 * @throws IllegalMonitorStateException if exclusive synchronization 1383 * is not held 1384 * @throws IllegalArgumentException if the given condition is 1385 * not associated with this synchronizer 1386 * @throws NullPointerException if the condition is null 1387 */ 1388 public final int getWaitQueueLength(ConditionObject condition) { 1389 if (!owns(condition)) 1390 throw new IllegalArgumentException("Not owner"); 1391 return condition.getWaitQueueLength(); 1392 } 1393 1394 /** 1395 * Returns a collection containing those threads that may be 1396 * waiting on the given condition associated with this 1397 * synchronizer. Because the actual set of threads may change 1398 * dynamically while constructing this result, the returned 1399 * collection is only a best-effort estimate. The elements of the 1400 * returned collection are in no particular order. 1401 * 1402 * @param condition the condition 1403 * @return the collection of threads 1404 * @throws IllegalMonitorStateException if exclusive synchronization 1405 * is not held 1406 * @throws IllegalArgumentException if the given condition is 1407 * not associated with this synchronizer 1408 * @throws NullPointerException if the condition is null 1409 */ 1410 public final Collection<Thread> getWaitingThreads(ConditionObject condition) { 1411 if (!owns(condition)) 1412 throw new IllegalArgumentException("Not owner"); 1413 return condition.getWaitingThreads(); 1414 } 1415 1416 /** 1417 * Condition implementation for a {@link AbstractQueuedSynchronizer} 1418 * serving as the basis of a {@link Lock} implementation. 1419 * 1420 * <p>Method documentation for this class describes mechanics, 1421 * not behavioral specifications from the point of view of Lock 1422 * and Condition users. Exported versions of this class will in 1423 * general need to be accompanied by documentation describing 1424 * condition semantics that rely on those of the associated 1425 * {@code AbstractQueuedSynchronizer}. 1426 * 1427 * <p>This class is Serializable, but all fields are transient, 1428 * so deserialized conditions have no waiters. 1429 */ 1430 public class ConditionObject implements Condition, java.io.Serializable { 1431 private static final long serialVersionUID = 1173984872572414699L; 1432 /** First node of condition queue. */ 1433 private transient ConditionNode firstWaiter; 1434 /** Last node of condition queue. */ 1435 private transient ConditionNode lastWaiter; 1436 1437 /** 1438 * Creates a new {@code ConditionObject} instance. 1439 */ 1440 public ConditionObject() { } 1441 1442 // Signalling methods 1443 1444 /** 1445 * Removes and transfers one or all waiters to sync queue. 1446 */ 1447 private void doSignal(ConditionNode first, boolean all) { 1448 while (first != null) { 1449 ConditionNode next = first.nextWaiter; 1450 if ((firstWaiter = next) == null) 1451 lastWaiter = null; 1452 if ((first.getAndUnsetStatus(COND) & COND) != 0) { 1453 enqueue(first); 1454 if (!all) 1455 break; 1456 } 1457 first = next; 1458 } 1459 } 1460 1461 /** 1462 * Moves the longest-waiting thread, if one exists, from the 1463 * wait queue for this condition to the wait queue for the 1464 * owning lock. 1465 * 1466 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1467 * returns {@code false} 1468 */ 1469 public final void signal() { 1470 ConditionNode first = firstWaiter; 1471 if (!isHeldExclusively()) 1472 throw new IllegalMonitorStateException(); 1473 if (first != null) 1474 doSignal(first, false); 1475 } 1476 1477 /** 1478 * Moves all threads from the wait queue for this condition to 1479 * the wait queue for the owning lock. 1480 * 1481 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1482 * returns {@code false} 1483 */ 1484 public final void signalAll() { 1485 ConditionNode first = firstWaiter; 1486 if (!isHeldExclusively()) 1487 throw new IllegalMonitorStateException(); 1488 if (first != null) 1489 doSignal(first, true); 1490 } 1491 1492 // Waiting methods 1493 1494 /** 1495 * Adds node to condition list and releases lock. 1496 * 1497 * @param node the node 1498 * @return savedState to reacquire after wait 1499 */ 1500 private int enableWait(ConditionNode node) { 1501 if (isHeldExclusively()) { 1502 node.waiter = Thread.currentThread(); 1503 node.setStatusRelaxed(COND | WAITING); 1504 ConditionNode last = lastWaiter; 1505 if (last == null) 1506 firstWaiter = node; 1507 else 1508 last.nextWaiter = node; 1509 lastWaiter = node; 1510 int savedState = getState(); 1511 if (release(savedState)) 1512 return savedState; 1513 } 1514 node.status = CANCELLED; // lock not held or inconsistent 1515 throw new IllegalMonitorStateException(); 1516 } 1517 1518 /** 1519 * Returns true if a node that was initially placed on a condition 1520 * queue is now ready to reacquire on sync queue. 1521 * @param node the node 1522 * @return true if is reacquiring 1523 */ 1524 private boolean canReacquire(ConditionNode node) { 1525 // check links, not status to avoid enqueue race 1526 return node != null && node.prev != null && isEnqueued(node); 1527 } 1528 1529 /** 1530 * Unlinks the given node and other non-waiting nodes from 1531 * condition queue unless already unlinked. 1532 */ 1533 private void unlinkCancelledWaiters(ConditionNode node) { 1534 if (node == null || node.nextWaiter != null || node == lastWaiter) { 1535 ConditionNode w = firstWaiter, trail = null; 1536 while (w != null) { 1537 ConditionNode next = w.nextWaiter; 1538 if ((w.status & COND) == 0) { 1539 w.nextWaiter = null; 1540 if (trail == null) 1541 firstWaiter = next; 1542 else 1543 trail.nextWaiter = next; 1544 if (next == null) 1545 lastWaiter = trail; 1546 } else 1547 trail = w; 1548 w = next; 1549 } 1550 } 1551 } 1552 1553 /** 1554 * Implements uninterruptible condition wait. 1555 * <ol> 1556 * <li>Save lock state returned by {@link #getState}. 1557 * <li>Invoke {@link #release} with saved state as argument, 1558 * throwing IllegalMonitorStateException if it fails. 1559 * <li>Block until signalled. 1560 * <li>Reacquire by invoking specialized version of 1561 * {@link #acquire} with saved state as argument. 1562 * </ol> 1563 */ 1564 public final void awaitUninterruptibly() { 1565 ConditionNode node = new ConditionNode(); 1566 int savedState = enableWait(node); 1567 LockSupport.setCurrentBlocker(this); // for back-compatibility 1568 boolean interrupted = false; 1569 while (!canReacquire(node)) { 1570 if (Thread.interrupted()) 1571 interrupted = true; 1572 else if ((node.status & COND) != 0) { 1573 try { 1574 ForkJoinPool.managedBlock(node); 1575 } catch (InterruptedException ie) { 1576 interrupted = true; 1577 } 1578 } else 1579 Thread.onSpinWait(); // awoke while enqueuing 1580 } 1581 LockSupport.setCurrentBlocker(null); 1582 node.clearStatus(); 1583 acquire(node, savedState, false, false, false, 0L); 1584 if (interrupted) 1585 Thread.currentThread().interrupt(); 1586 } 1587 1588 /** 1589 * Implements interruptible condition wait. 1590 * <ol> 1591 * <li>If current thread is interrupted, throw InterruptedException. 1592 * <li>Save lock state returned by {@link #getState}. 1593 * <li>Invoke {@link #release} with saved state as argument, 1594 * throwing IllegalMonitorStateException if it fails. 1595 * <li>Block until signalled or interrupted. 1596 * <li>Reacquire by invoking specialized version of 1597 * {@link #acquire} with saved state as argument. 1598 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1599 * </ol> 1600 */ 1601 public final void await() throws InterruptedException { 1602 if (Thread.interrupted()) 1603 throw new InterruptedException(); 1604 ConditionNode node = new ConditionNode(); 1605 int savedState = enableWait(node); 1606 LockSupport.setCurrentBlocker(this); // for back-compatibility 1607 boolean interrupted = false, cancelled = false; 1608 while (!canReacquire(node)) { 1609 if (interrupted |= Thread.interrupted()) { 1610 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1611 break; // else interrupted after signal 1612 } else if ((node.status & COND) != 0) { 1613 try { 1614 ForkJoinPool.managedBlock(node); 1615 } catch (InterruptedException ie) { 1616 interrupted = true; 1617 } 1618 } else 1619 Thread.onSpinWait(); // awoke while enqueuing 1620 } 1621 LockSupport.setCurrentBlocker(null); 1622 node.clearStatus(); 1623 acquire(node, savedState, false, false, false, 0L); 1624 if (interrupted) { 1625 if (cancelled) { 1626 unlinkCancelledWaiters(node); 1627 throw new InterruptedException(); 1628 } 1629 Thread.currentThread().interrupt(); 1630 } 1631 } 1632 1633 /** 1634 * Implements timed condition wait. 1635 * <ol> 1636 * <li>If current thread is interrupted, throw InterruptedException. 1637 * <li>Save lock state returned by {@link #getState}. 1638 * <li>Invoke {@link #release} with saved state as argument, 1639 * throwing IllegalMonitorStateException if it fails. 1640 * <li>Block until signalled, interrupted, or timed out. 1641 * <li>Reacquire by invoking specialized version of 1642 * {@link #acquire} with saved state as argument. 1643 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1644 * </ol> 1645 */ 1646 public final long awaitNanos(long nanosTimeout) 1647 throws InterruptedException { 1648 if (Thread.interrupted()) 1649 throw new InterruptedException(); 1650 ConditionNode node = new ConditionNode(); 1651 int savedState = enableWait(node); 1652 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; 1653 long deadline = System.nanoTime() + nanos; 1654 boolean cancelled = false, interrupted = false; 1655 while (!canReacquire(node)) { 1656 if ((interrupted |= Thread.interrupted()) || 1657 (nanos = deadline - System.nanoTime()) <= 0L) { 1658 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1659 break; 1660 } else 1661 LockSupport.parkNanos(this, nanos); 1662 } 1663 node.clearStatus(); 1664 acquire(node, savedState, false, false, false, 0L); 1665 if (cancelled) { 1666 unlinkCancelledWaiters(node); 1667 if (interrupted) 1668 throw new InterruptedException(); 1669 } else if (interrupted) 1670 Thread.currentThread().interrupt(); 1671 long remaining = deadline - System.nanoTime(); // avoid overflow 1672 return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE; 1673 } 1674 1675 /** 1676 * Implements absolute timed condition wait. 1677 * <ol> 1678 * <li>If current thread is interrupted, throw InterruptedException. 1679 * <li>Save lock state returned by {@link #getState}. 1680 * <li>Invoke {@link #release} with saved state as argument, 1681 * throwing IllegalMonitorStateException if it fails. 1682 * <li>Block until signalled, interrupted, or timed out. 1683 * <li>Reacquire by invoking specialized version of 1684 * {@link #acquire} with saved state as argument. 1685 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1686 * <li>If timed out while blocked in step 4, return false, else true. 1687 * </ol> 1688 */ 1689 public final boolean awaitUntil(Date deadline) 1690 throws InterruptedException { 1691 long abstime = deadline.getTime(); 1692 if (Thread.interrupted()) 1693 throw new InterruptedException(); 1694 ConditionNode node = new ConditionNode(); 1695 int savedState = enableWait(node); 1696 boolean cancelled = false, interrupted = false; 1697 while (!canReacquire(node)) { 1698 if ((interrupted |= Thread.interrupted()) || 1699 System.currentTimeMillis() >= abstime) { 1700 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1701 break; 1702 } else 1703 LockSupport.parkUntil(this, abstime); 1704 } 1705 node.clearStatus(); 1706 acquire(node, savedState, false, false, false, 0L); 1707 if (cancelled) { 1708 unlinkCancelledWaiters(node); 1709 if (interrupted) 1710 throw new InterruptedException(); 1711 } else if (interrupted) 1712 Thread.currentThread().interrupt(); 1713 return !cancelled; 1714 } 1715 1716 /** 1717 * Implements timed condition wait. 1718 * <ol> 1719 * <li>If current thread is interrupted, throw InterruptedException. 1720 * <li>Save lock state returned by {@link #getState}. 1721 * <li>Invoke {@link #release} with saved state as argument, 1722 * throwing IllegalMonitorStateException if it fails. 1723 * <li>Block until signalled, interrupted, or timed out. 1724 * <li>Reacquire by invoking specialized version of 1725 * {@link #acquire} with saved state as argument. 1726 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1727 * <li>If timed out while blocked in step 4, return false, else true. 1728 * </ol> 1729 */ 1730 public final boolean await(long time, TimeUnit unit) 1731 throws InterruptedException { 1732 long nanosTimeout = unit.toNanos(time); 1733 if (Thread.interrupted()) 1734 throw new InterruptedException(); 1735 ConditionNode node = new ConditionNode(); 1736 int savedState = enableWait(node); 1737 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; 1738 long deadline = System.nanoTime() + nanos; 1739 boolean cancelled = false, interrupted = false; 1740 while (!canReacquire(node)) { 1741 if ((interrupted |= Thread.interrupted()) || 1742 (nanos = deadline - System.nanoTime()) <= 0L) { 1743 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1744 break; 1745 } else 1746 LockSupport.parkNanos(this, nanos); 1747 } 1748 node.clearStatus(); 1749 acquire(node, savedState, false, false, false, 0L); 1750 if (cancelled) { 1751 unlinkCancelledWaiters(node); 1752 if (interrupted) 1753 throw new InterruptedException(); 1754 } else if (interrupted) 1755 Thread.currentThread().interrupt(); 1756 return !cancelled; 1757 } 1758 1759 // support for instrumentation 1760 1761 /** 1762 * Returns true if this condition was created by the given 1763 * synchronization object. 1764 * 1765 * @return {@code true} if owned 1766 */ 1767 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { 1768 return sync == AbstractQueuedSynchronizer.this; 1769 } 1770 1771 /** 1772 * Queries whether any threads are waiting on this condition. 1773 * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}. 1774 * 1775 * @return {@code true} if there are any waiting threads 1776 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1777 * returns {@code false} 1778 */ 1779 protected final boolean hasWaiters() { 1780 if (!isHeldExclusively()) 1781 throw new IllegalMonitorStateException(); 1782 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1783 if ((w.status & COND) != 0) 1784 return true; 1785 } 1786 return false; 1787 } 1788 1789 /** 1790 * Returns an estimate of the number of threads waiting on 1791 * this condition. 1792 * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}. 1793 * 1794 * @return the estimated number of waiting threads 1795 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1796 * returns {@code false} 1797 */ 1798 protected final int getWaitQueueLength() { 1799 if (!isHeldExclusively()) 1800 throw new IllegalMonitorStateException(); 1801 int n = 0; 1802 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1803 if ((w.status & COND) != 0) 1804 ++n; 1805 } 1806 return n; 1807 } 1808 1809 /** 1810 * Returns a collection containing those threads that may be 1811 * waiting on this Condition. 1812 * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}. 1813 * 1814 * @return the collection of threads 1815 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1816 * returns {@code false} 1817 */ 1818 protected final Collection<Thread> getWaitingThreads() { 1819 if (!isHeldExclusively()) 1820 throw new IllegalMonitorStateException(); 1821 ArrayList<Thread> list = new ArrayList<>(); 1822 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1823 if ((w.status & COND) != 0) { 1824 Thread t = w.waiter; 1825 if (t != null) 1826 list.add(t); 1827 } 1828 } 1829 return list; 1830 } 1831 } 1832 1833 // Unsafe 1834 private static final Unsafe U = Unsafe.getUnsafe(); 1835 private static final long STATE 1836 = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "state"); 1837 private static final long HEAD 1838 = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "head"); 1839 private static final long TAIL 1840 = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "tail"); 1841 1842 static { 1843 Class<?> ensureLoaded = LockSupport.class; 1844 } 1845 }