1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent.locks; 37 38 import java.util.ArrayList; 39 import java.util.Collection; 40 import java.util.Date; 41 import java.util.concurrent.TimeUnit; 42 import java.util.concurrent.ForkJoinPool; 43 import jdk.internal.misc.Unsafe; 44 45 /** 46 * A version of {@link AbstractQueuedSynchronizer} in 47 * which synchronization state is maintained as a {@code long}. 48 * This class has exactly the same structure, properties, and methods 49 * as {@code AbstractQueuedSynchronizer} with the exception 50 * that all state-related parameters and results are defined 51 * as {@code long} rather than {@code int}. This class 52 * may be useful when creating synchronizers such as 53 * multilevel locks and barriers that require 54 * 64 bits of state. 55 * 56 * <p>See {@link AbstractQueuedSynchronizer} for usage 57 * notes and examples. 58 * 59 * @since 1.6 60 * @author Doug Lea 61 */ 62 public abstract class AbstractQueuedLongSynchronizer 63 extends AbstractOwnableSynchronizer 64 implements java.io.Serializable { 65 66 private static final long serialVersionUID = 7373984972572414692L; 67 68 /* 69 * To keep sources in sync, the remainder of this source file is 70 * exactly cloned from AbstractQueuedSynchronizer, replacing class 71 * name and changing ints related with sync state to longs. Please 72 * keep it that way. 73 */ 74 75 // Node status bits, also used as argument and return values 76 static final int WAITING = 1; // must be 1 77 static final int CANCELLED = 0x80000000; // must be negative 78 static final int COND = 2; // in a condition wait 79 80 /** CLH Nodes */ 81 abstract static class Node { 82 volatile Node prev; // initially attached via casTail 83 volatile Node next; // visibly nonnull when signallable 84 Thread waiter; // visibly nonnull when enqueued 85 volatile int status; // written by owner, atomic bit ops by others 86 87 // methods for atomic operations 88 final boolean casPrev(Node c, Node v) { // for cleanQueue 89 return U.weakCompareAndSetReference(this, PREV, c, v); 90 } 91 final boolean casNext(Node c, Node v) { // for cleanQueue 92 return U.weakCompareAndSetReference(this, NEXT, c, v); 93 } 94 final int getAndUnsetStatus(int v) { // for signalling 95 return U.getAndBitwiseAndInt(this, STATUS, ~v); 96 } 97 final void setPrevRelaxed(Node p) { // for off-queue assignment 98 U.putReference(this, PREV, p); 99 } 100 final void setStatusRelaxed(int s) { // for off-queue assignment 101 U.putInt(this, STATUS, s); 102 } 103 final void clearStatus() { // for reducing unneeded signals 104 U.putIntOpaque(this, STATUS, 0); 105 } 106 107 private static final long STATUS 108 = U.objectFieldOffset(Node.class, "status"); 109 private static final long NEXT 110 = U.objectFieldOffset(Node.class, "next"); 111 private static final long PREV 112 = U.objectFieldOffset(Node.class, "prev"); 113 } 114 115 // Concrete classes tagged by type 116 static final class ExclusiveNode extends Node { } 117 static final class SharedNode extends Node { } 118 119 static final class ConditionNode extends Node 120 implements ForkJoinPool.ManagedBlocker { 121 ConditionNode nextWaiter; // link to next waiting node 122 123 /** 124 * Allows Conditions to be used in ForkJoinPools without 125 * risking fixed pool exhaustion. This is usable only for 126 * untimed Condition waits, not timed versions. 127 */ 128 public final boolean isReleasable() { 129 return status <= 1 || Thread.currentThread().isInterrupted(); 130 } 131 132 public final boolean block() { 133 while (!isReleasable()) LockSupport.park(); 134 return true; 135 } 136 } 137 138 /** 139 * Head of the wait queue, lazily initialized. 140 */ 141 private transient volatile Node head; 142 143 /** 144 * Tail of the wait queue. After initialization, modified only via casTail. 145 */ 146 private transient volatile Node tail; 147 148 /** 149 * The synchronization state. 150 */ 151 private volatile long state; 152 153 /** 154 * Returns the current value of synchronization state. 155 * This operation has memory semantics of a {@code volatile} read. 156 * @return current state value 157 */ 158 protected final long getState() { 159 return state; 160 } 161 162 /** 163 * Sets the value of synchronization state. 164 * This operation has memory semantics of a {@code volatile} write. 165 * @param newState the new state value 166 */ 167 protected final void setState(long newState) { 168 state = newState; 169 } 170 171 /** 172 * Atomically sets synchronization state to the given updated 173 * value if the current state value equals the expected value. 174 * This operation has memory semantics of a {@code volatile} read 175 * and write. 176 * 177 * @param expect the expected value 178 * @param update the new value 179 * @return {@code true} if successful. False return indicates that the actual 180 * value was not equal to the expected value. 181 */ 182 protected final boolean compareAndSetState(long expect, long update) { 183 return U.compareAndSetLong(this, STATE, expect, update); 184 } 185 186 // Queuing utilities 187 188 private boolean casTail(Node c, Node v) { 189 return U.compareAndSetReference(this, TAIL, c, v); 190 } 191 192 /** tries once to CAS a new dummy node for head */ 193 private void tryInitializeHead() { 194 Node h = new ExclusiveNode(); 195 if (U.compareAndSetReference(this, HEAD, null, h)) 196 tail = h; 197 } 198 199 /** 200 * Enqueues the node unless null. (Currently used only for 201 * ConditionNodes; other cases are interleaved with acquires.) 202 */ 203 final void enqueue(Node node) { 204 if (node != null) { 205 for (;;) { 206 Node t = tail; 207 node.setPrevRelaxed(t); // avoid unnecessary fence 208 if (t == null) // initialize 209 tryInitializeHead(); 210 else if (casTail(t, node)) { 211 t.next = node; 212 if (t.status < 0) // wake up to clean link 213 LockSupport.unpark(node.waiter); 214 break; 215 } 216 } 217 } 218 } 219 220 /** Returns true if node is found in traversal from tail */ 221 final boolean isEnqueued(Node node) { 222 for (Node t = tail; t != null; t = t.prev) 223 if (t == node) 224 return true; 225 return false; 226 } 227 228 /** 229 * Wakes up the successor of given node, if one exists, and unsets its 230 * WAITING status to avoid park race. This may fail to wake up an 231 * eligible thread when one or more have been cancelled, but 232 * cancelAcquire ensures liveness. 233 */ 234 private static void signalNext(Node h) { 235 Node s; 236 if (h != null && (s = h.next) != null && s.status != 0) { 237 s.getAndUnsetStatus(WAITING); 238 LockSupport.unpark(s.waiter); 239 } 240 } 241 242 /** Wakes up the given node if in shared mode */ 243 private static void signalNextIfShared(Node h) { 244 Node s; 245 if (h != null && (s = h.next) != null && 246 (s instanceof SharedNode) && s.status != 0) { 247 s.getAndUnsetStatus(WAITING); 248 LockSupport.unpark(s.waiter); 249 } 250 } 251 252 /** 253 * Main acquire method, invoked by all exported acquire methods. 254 * 255 * @param node null unless a reacquiring Condition 256 * @param arg the acquire argument 257 * @param shared true if shared mode else exclusive 258 * @param interruptible if abort and return negative on interrupt 259 * @param timed if true use timed waits 260 * @param time if timed, the System.nanoTime value to timeout 261 * @return positive if acquired, 0 if timed out, negative if interrupted 262 */ 263 final int acquire(Node node, long arg, boolean shared, 264 boolean interruptible, boolean timed, long time) { 265 Thread current = Thread.currentThread(); 266 byte spins = 0, postSpins = 0; // retries upon unpark of first thread 267 boolean interrupted = false, first = false; 268 Node pred = null; // predecessor of node when enqueued 269 270 /* 271 * Repeatedly: 272 * Check if node now first 273 * if so, ensure head stable, else ensure valid predecessor 274 * if node is first or not yet enqueued, try acquiring 275 * else if node not yet created, create it 276 * else if not yet enqueued, try once to enqueue 277 * else if woken from park, retry (up to postSpins times) 278 * else if WAITING status not set, set and retry 279 * else park and clear WAITING status, and check cancellation 280 */ 281 282 for (;;) { 283 if (!first && (pred = (node == null) ? null : node.prev) != null && 284 !(first = (head == pred))) { 285 if (pred.status < 0) { 286 cleanQueue(); // predecessor cancelled 287 continue; 288 } else if (pred.prev == null) { 289 Thread.onSpinWait(); // ensure serialization 290 continue; 291 } 292 } 293 if (first || pred == null) { 294 boolean acquired; 295 try { 296 if (shared) 297 acquired = (tryAcquireShared(arg) >= 0); 298 else 299 acquired = tryAcquire(arg); 300 } catch (Throwable ex) { 301 cancelAcquire(node, interrupted, false); 302 throw ex; 303 } 304 if (acquired) { 305 if (first) { 306 node.prev = null; 307 head = node; 308 pred.next = null; 309 node.waiter = null; 310 if (shared) 311 signalNextIfShared(node); 312 if (interrupted) 313 current.interrupt(); 314 } 315 return 1; 316 } 317 } 318 if (node == null) { // allocate; retry before enqueue 319 if (shared) 320 node = new SharedNode(); 321 else 322 node = new ExclusiveNode(); 323 } else if (pred == null) { // try to enqueue 324 node.waiter = current; 325 Node t = tail; 326 node.setPrevRelaxed(t); // avoid unnecessary fence 327 if (t == null) 328 tryInitializeHead(); 329 else if (!casTail(t, node)) 330 node.setPrevRelaxed(null); // back out 331 else 332 t.next = node; 333 } else if (first && spins != 0) { 334 --spins; // reduce unfairness on rewaits 335 Thread.onSpinWait(); 336 } else if (node.status == 0) { 337 node.status = WAITING; // enable signal and recheck 338 } else { 339 long nanos; 340 spins = postSpins = (byte)((postSpins << 1) | 1); 341 if (!timed) 342 LockSupport.park(this); 343 else if ((nanos = time - System.nanoTime()) > 0L) 344 LockSupport.parkNanos(this, nanos); 345 else 346 break; 347 node.clearStatus(); 348 if ((interrupted |= Thread.interrupted()) && interruptible) 349 break; 350 } 351 } 352 return cancelAcquire(node, interrupted, interruptible); 353 } 354 355 /** 356 * Possibly repeatedly traverses from tail, unsplicing cancelled 357 * nodes until none are found. 358 */ 359 private void cleanQueue() { 360 for (;;) { // restart point 361 for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples 362 if (q == null || (p = q.prev) == null) 363 return; // end of list 364 if (s == null ? tail != q : (s.prev != q || s.status < 0)) 365 break; // inconsistent 366 if (q.status < 0) { // cancelled 367 if ((s == null ? casTail(q, p) : s.casPrev(q, p)) && 368 q.prev == p) { 369 p.casNext(q, s); // OK if fails 370 if (p.prev == null) 371 signalNext(p); 372 } 373 break; 374 } 375 if ((n = p.next) != q) { // help finish 376 if (n != null && q.prev == p) { 377 p.casNext(n, q); 378 if (p.prev == null) 379 signalNext(p); 380 } 381 break; 382 } 383 s = q; 384 q = q.prev; 385 } 386 } 387 } 388 389 /** 390 * Cancels an ongoing attempt to acquire. 391 * 392 * @param node the node (may be null if cancelled before enqueuing) 393 * @param interrupted true if thread interrupted 394 * @param interruptible if should report interruption vs reset 395 */ 396 private int cancelAcquire(Node node, boolean interrupted, 397 boolean interruptible) { 398 if (node != null) { 399 node.waiter = null; 400 node.status = CANCELLED; 401 if (node.prev != null) 402 cleanQueue(); 403 } 404 if (interrupted) { 405 if (interruptible) 406 return CANCELLED; 407 else 408 Thread.currentThread().interrupt(); 409 } 410 return 0; 411 } 412 413 // Main exported methods 414 415 /** 416 * Attempts to acquire in exclusive mode. This method should query 417 * if the state of the object permits it to be acquired in the 418 * exclusive mode, and if so to acquire it. 419 * 420 * <p>This method is always invoked by the thread performing 421 * acquire. If this method reports failure, the acquire method 422 * may queue the thread, if it is not already queued, until it is 423 * signalled by a release from some other thread. This can be used 424 * to implement method {@link Lock#tryLock()}. 425 * 426 * <p>The default 427 * implementation throws {@link UnsupportedOperationException}. 428 * 429 * @param arg the acquire argument. This value is always the one 430 * passed to an acquire method, or is the value saved on entry 431 * to a condition wait. The value is otherwise uninterpreted 432 * and can represent anything you like. 433 * @return {@code true} if successful. Upon success, this object has 434 * been acquired. 435 * @throws IllegalMonitorStateException if acquiring would place this 436 * synchronizer in an illegal state. This exception must be 437 * thrown in a consistent fashion for synchronization to work 438 * correctly. 439 * @throws UnsupportedOperationException if exclusive mode is not supported 440 */ 441 protected boolean tryAcquire(long arg) { 442 throw new UnsupportedOperationException(); 443 } 444 445 /** 446 * Attempts to set the state to reflect a release in exclusive 447 * mode. 448 * 449 * <p>This method is always invoked by the thread performing release. 450 * 451 * <p>The default implementation throws 452 * {@link UnsupportedOperationException}. 453 * 454 * @param arg the release argument. This value is always the one 455 * passed to a release method, or the current state value upon 456 * entry to a condition wait. The value is otherwise 457 * uninterpreted and can represent anything you like. 458 * @return {@code true} if this object is now in a fully released 459 * state, so that any waiting threads may attempt to acquire; 460 * and {@code false} otherwise. 461 * @throws IllegalMonitorStateException if releasing would place this 462 * synchronizer in an illegal state. This exception must be 463 * thrown in a consistent fashion for synchronization to work 464 * correctly. 465 * @throws UnsupportedOperationException if exclusive mode is not supported 466 */ 467 protected boolean tryRelease(long arg) { 468 throw new UnsupportedOperationException(); 469 } 470 471 /** 472 * Attempts to acquire in shared mode. This method should query if 473 * the state of the object permits it to be acquired in the shared 474 * mode, and if so to acquire it. 475 * 476 * <p>This method is always invoked by the thread performing 477 * acquire. If this method reports failure, the acquire method 478 * may queue the thread, if it is not already queued, until it is 479 * signalled by a release from some other thread. 480 * 481 * <p>The default implementation throws {@link 482 * UnsupportedOperationException}. 483 * 484 * @param arg the acquire argument. This value is always the one 485 * passed to an acquire method, or is the value saved on entry 486 * to a condition wait. The value is otherwise uninterpreted 487 * and can represent anything you like. 488 * @return a negative value on failure; zero if acquisition in shared 489 * mode succeeded but no subsequent shared-mode acquire can 490 * succeed; and a positive value if acquisition in shared 491 * mode succeeded and subsequent shared-mode acquires might 492 * also succeed, in which case a subsequent waiting thread 493 * must check availability. (Support for three different 494 * return values enables this method to be used in contexts 495 * where acquires only sometimes act exclusively.) Upon 496 * success, this object has been acquired. 497 * @throws IllegalMonitorStateException if acquiring would place this 498 * synchronizer in an illegal state. This exception must be 499 * thrown in a consistent fashion for synchronization to work 500 * correctly. 501 * @throws UnsupportedOperationException if shared mode is not supported 502 */ 503 protected long tryAcquireShared(long arg) { 504 throw new UnsupportedOperationException(); 505 } 506 507 /** 508 * Attempts to set the state to reflect a release in shared mode. 509 * 510 * <p>This method is always invoked by the thread performing release. 511 * 512 * <p>The default implementation throws 513 * {@link UnsupportedOperationException}. 514 * 515 * @param arg the release argument. This value is always the one 516 * passed to a release method, or the current state value upon 517 * entry to a condition wait. The value is otherwise 518 * uninterpreted and can represent anything you like. 519 * @return {@code true} if this release of shared mode may permit a 520 * waiting acquire (shared or exclusive) to succeed; and 521 * {@code false} otherwise 522 * @throws IllegalMonitorStateException if releasing would place this 523 * synchronizer in an illegal state. This exception must be 524 * thrown in a consistent fashion for synchronization to work 525 * correctly. 526 * @throws UnsupportedOperationException if shared mode is not supported 527 */ 528 protected boolean tryReleaseShared(long arg) { 529 throw new UnsupportedOperationException(); 530 } 531 532 /** 533 * Returns {@code true} if synchronization is held exclusively with 534 * respect to the current (calling) thread. This method is invoked 535 * upon each call to a {@link ConditionObject} method. 536 * 537 * <p>The default implementation throws {@link 538 * UnsupportedOperationException}. This method is invoked 539 * internally only within {@link ConditionObject} methods, so need 540 * not be defined if conditions are not used. 541 * 542 * @return {@code true} if synchronization is held exclusively; 543 * {@code false} otherwise 544 * @throws UnsupportedOperationException if conditions are not supported 545 */ 546 protected boolean isHeldExclusively() { 547 throw new UnsupportedOperationException(); 548 } 549 550 /** 551 * Acquires in exclusive mode, ignoring interrupts. Implemented 552 * by invoking at least once {@link #tryAcquire}, 553 * returning on success. Otherwise the thread is queued, possibly 554 * repeatedly blocking and unblocking, invoking {@link 555 * #tryAcquire} until success. This method can be used 556 * to implement method {@link Lock#lock}. 557 * 558 * @param arg the acquire argument. This value is conveyed to 559 * {@link #tryAcquire} but is otherwise uninterpreted and 560 * can represent anything you like. 561 */ 562 public final void acquire(long arg) { 563 if (!tryAcquire(arg)) 564 acquire(null, arg, false, false, false, 0L); 565 } 566 567 /** 568 * Acquires in exclusive mode, aborting if interrupted. 569 * Implemented by first checking interrupt status, then invoking 570 * at least once {@link #tryAcquire}, returning on 571 * success. Otherwise the thread is queued, possibly repeatedly 572 * blocking and unblocking, invoking {@link #tryAcquire} 573 * until success or the thread is interrupted. This method can be 574 * used to implement method {@link Lock#lockInterruptibly}. 575 * 576 * @param arg the acquire argument. This value is conveyed to 577 * {@link #tryAcquire} but is otherwise uninterpreted and 578 * can represent anything you like. 579 * @throws InterruptedException if the current thread is interrupted 580 */ 581 public final void acquireInterruptibly(long arg) 582 throws InterruptedException { 583 if (Thread.interrupted() || 584 (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0)) 585 throw new InterruptedException(); 586 } 587 588 /** 589 * Attempts to acquire in exclusive mode, aborting if interrupted, 590 * and failing if the given timeout elapses. Implemented by first 591 * checking interrupt status, then invoking at least once {@link 592 * #tryAcquire}, returning on success. Otherwise, the thread is 593 * queued, possibly repeatedly blocking and unblocking, invoking 594 * {@link #tryAcquire} until success or the thread is interrupted 595 * or the timeout elapses. This method can be used to implement 596 * method {@link Lock#tryLock(long, TimeUnit)}. 597 * 598 * @param arg the acquire argument. This value is conveyed to 599 * {@link #tryAcquire} but is otherwise uninterpreted and 600 * can represent anything you like. 601 * @param nanosTimeout the maximum number of nanoseconds to wait 602 * @return {@code true} if acquired; {@code false} if timed out 603 * @throws InterruptedException if the current thread is interrupted 604 */ 605 public final boolean tryAcquireNanos(long arg, long nanosTimeout) 606 throws InterruptedException { 607 if (!Thread.interrupted()) { 608 if (tryAcquire(arg)) 609 return true; 610 if (nanosTimeout <= 0L) 611 return false; 612 int stat = acquire(null, arg, false, true, true, 613 System.nanoTime() + nanosTimeout); 614 if (stat > 0) 615 return true; 616 if (stat == 0) 617 return false; 618 } 619 throw new InterruptedException(); 620 } 621 622 /** 623 * Releases in exclusive mode. Implemented by unblocking one or 624 * more threads if {@link #tryRelease} returns true. 625 * This method can be used to implement method {@link Lock#unlock}. 626 * 627 * @param arg the release argument. This value is conveyed to 628 * {@link #tryRelease} but is otherwise uninterpreted and 629 * can represent anything you like. 630 * @return the value returned from {@link #tryRelease} 631 */ 632 public final boolean release(long arg) { 633 if (tryRelease(arg)) { 634 signalNext(head); 635 return true; 636 } 637 return false; 638 } 639 640 /** 641 * Acquires in shared mode, ignoring interrupts. Implemented by 642 * first invoking at least once {@link #tryAcquireShared}, 643 * returning on success. Otherwise the thread is queued, possibly 644 * repeatedly blocking and unblocking, invoking {@link 645 * #tryAcquireShared} until success. 646 * 647 * @param arg the acquire argument. This value is conveyed to 648 * {@link #tryAcquireShared} but is otherwise uninterpreted 649 * and can represent anything you like. 650 */ 651 public final void acquireShared(long arg) { 652 if (tryAcquireShared(arg) < 0) 653 acquire(null, arg, true, false, false, 0L); 654 } 655 656 /** 657 * Acquires in shared mode, aborting if interrupted. Implemented 658 * by first checking interrupt status, then invoking at least once 659 * {@link #tryAcquireShared}, returning on success. Otherwise the 660 * thread is queued, possibly repeatedly blocking and unblocking, 661 * invoking {@link #tryAcquireShared} until success or the thread 662 * is interrupted. 663 * @param arg the acquire argument. 664 * This value is conveyed to {@link #tryAcquireShared} but is 665 * otherwise uninterpreted and can represent anything 666 * you like. 667 * @throws InterruptedException if the current thread is interrupted 668 */ 669 public final void acquireSharedInterruptibly(long arg) 670 throws InterruptedException { 671 if (Thread.interrupted() || 672 (tryAcquireShared(arg) < 0 && 673 acquire(null, arg, true, true, false, 0L) < 0)) 674 throw new InterruptedException(); 675 } 676 677 /** 678 * Attempts to acquire in shared mode, aborting if interrupted, and 679 * failing if the given timeout elapses. Implemented by first 680 * checking interrupt status, then invoking at least once {@link 681 * #tryAcquireShared}, returning on success. Otherwise, the 682 * thread is queued, possibly repeatedly blocking and unblocking, 683 * invoking {@link #tryAcquireShared} until success or the thread 684 * is interrupted or the timeout elapses. 685 * 686 * @param arg the acquire argument. This value is conveyed to 687 * {@link #tryAcquireShared} but is otherwise uninterpreted 688 * and can represent anything you like. 689 * @param nanosTimeout the maximum number of nanoseconds to wait 690 * @return {@code true} if acquired; {@code false} if timed out 691 * @throws InterruptedException if the current thread is interrupted 692 */ 693 public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) 694 throws InterruptedException { 695 if (!Thread.interrupted()) { 696 if (tryAcquireShared(arg) >= 0) 697 return true; 698 if (nanosTimeout <= 0L) 699 return false; 700 int stat = acquire(null, arg, true, true, true, 701 System.nanoTime() + nanosTimeout); 702 if (stat > 0) 703 return true; 704 if (stat == 0) 705 return false; 706 } 707 throw new InterruptedException(); 708 } 709 710 /** 711 * Releases in shared mode. Implemented by unblocking one or more 712 * threads if {@link #tryReleaseShared} returns true. 713 * 714 * @param arg the release argument. This value is conveyed to 715 * {@link #tryReleaseShared} but is otherwise uninterpreted 716 * and can represent anything you like. 717 * @return the value returned from {@link #tryReleaseShared} 718 */ 719 public final boolean releaseShared(long arg) { 720 if (tryReleaseShared(arg)) { 721 signalNext(head); 722 return true; 723 } 724 return false; 725 } 726 727 // Queue inspection methods 728 729 /** 730 * Queries whether any threads are waiting to acquire. Note that 731 * because cancellations due to interrupts and timeouts may occur 732 * at any time, a {@code true} return does not guarantee that any 733 * other thread will ever acquire. 734 * 735 * @return {@code true} if there may be other threads waiting to acquire 736 */ 737 public final boolean hasQueuedThreads() { 738 for (Node p = tail, h = head; p != h && p != null; p = p.prev) 739 if (p.status >= 0) 740 return true; 741 return false; 742 } 743 744 /** 745 * Queries whether any threads have ever contended to acquire this 746 * synchronizer; that is, if an acquire method has ever blocked. 747 * 748 * <p>In this implementation, this operation returns in 749 * constant time. 750 * 751 * @return {@code true} if there has ever been contention 752 */ 753 public final boolean hasContended() { 754 return head != null; 755 } 756 757 /** 758 * Returns the first (longest-waiting) thread in the queue, or 759 * {@code null} if no threads are currently queued. 760 * 761 * <p>In this implementation, this operation normally returns in 762 * constant time, but may iterate upon contention if other threads are 763 * concurrently modifying the queue. 764 * 765 * @return the first (longest-waiting) thread in the queue, or 766 * {@code null} if no threads are currently queued 767 */ 768 public final Thread getFirstQueuedThread() { 769 Thread first = null, w; Node h, s; 770 if ((h = head) != null && ((s = h.next) == null || 771 (first = s.waiter) == null || 772 s.prev == null)) { 773 // traverse from tail on stale reads 774 for (Node p = tail, q; p != null && (q = p.prev) != null; p = q) 775 if ((w = p.waiter) != null) 776 first = w; 777 } 778 return first; 779 } 780 781 /** 782 * Returns true if the given thread is currently queued. 783 * 784 * <p>This implementation traverses the queue to determine 785 * presence of the given thread. 786 * 787 * @param thread the thread 788 * @return {@code true} if the given thread is on the queue 789 * @throws NullPointerException if the thread is null 790 */ 791 public final boolean isQueued(Thread thread) { 792 if (thread == null) 793 throw new NullPointerException(); 794 for (Node p = tail; p != null; p = p.prev) 795 if (p.waiter == thread) 796 return true; 797 return false; 798 } 799 800 /** 801 * Returns {@code true} if the apparent first queued thread, if one 802 * exists, is waiting in exclusive mode. If this method returns 803 * {@code true}, and the current thread is attempting to acquire in 804 * shared mode (that is, this method is invoked from {@link 805 * #tryAcquireShared}) then it is guaranteed that the current thread 806 * is not the first queued thread. Used only as a heuristic in 807 * ReentrantReadWriteLock. 808 */ 809 final boolean apparentlyFirstQueuedIsExclusive() { 810 Node h, s; 811 return (h = head) != null && (s = h.next) != null && 812 !(s instanceof SharedNode) && s.waiter != null; 813 } 814 815 /** 816 * Queries whether any threads have been waiting to acquire longer 817 * than the current thread. 818 * 819 * <p>An invocation of this method is equivalent to (but may be 820 * more efficient than): 821 * <pre> {@code 822 * getFirstQueuedThread() != Thread.currentThread() 823 * && hasQueuedThreads()}</pre> 824 * 825 * <p>Note that because cancellations due to interrupts and 826 * timeouts may occur at any time, a {@code true} return does not 827 * guarantee that some other thread will acquire before the current 828 * thread. Likewise, it is possible for another thread to win a 829 * race to enqueue after this method has returned {@code false}, 830 * due to the queue being empty. 831 * 832 * <p>This method is designed to be used by a fair synchronizer to 833 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>. 834 * Such a synchronizer's {@link #tryAcquire} method should return 835 * {@code false}, and its {@link #tryAcquireShared} method should 836 * return a negative value, if this method returns {@code true} 837 * (unless this is a reentrant acquire). For example, the {@code 838 * tryAcquire} method for a fair, reentrant, exclusive mode 839 * synchronizer might look like this: 840 * 841 * <pre> {@code 842 * protected boolean tryAcquire(long arg) { 843 * if (isHeldExclusively()) { 844 * // A reentrant acquire; increment hold count 845 * return true; 846 * } else if (hasQueuedPredecessors()) { 847 * return false; 848 * } else { 849 * // try to acquire normally 850 * } 851 * }}</pre> 852 * 853 * @return {@code true} if there is a queued thread preceding the 854 * current thread, and {@code false} if the current thread 855 * is at the head of the queue or the queue is empty 856 * @since 1.7 857 */ 858 public final boolean hasQueuedPredecessors() { 859 Thread first = null; Node h, s; 860 if ((h = head) != null && ((s = h.next) == null || 861 (first = s.waiter) == null || 862 s.prev == null)) 863 first = getFirstQueuedThread(); // retry via getFirstQueuedThread 864 return first != null && first != Thread.currentThread(); 865 } 866 867 // Instrumentation and monitoring methods 868 869 /** 870 * Returns an estimate of the number of threads waiting to 871 * acquire. The value is only an estimate because the number of 872 * threads may change dynamically while this method traverses 873 * internal data structures. This method is designed for use in 874 * monitoring system state, not for synchronization control. 875 * 876 * @return the estimated number of threads waiting to acquire 877 */ 878 public final int getQueueLength() { 879 int n = 0; 880 for (Node p = tail; p != null; p = p.prev) { 881 if (p.waiter != null) 882 ++n; 883 } 884 return n; 885 } 886 887 /** 888 * Returns a collection containing threads that may be waiting to 889 * acquire. Because the actual set of threads may change 890 * dynamically while constructing this result, the returned 891 * collection is only a best-effort estimate. The elements of the 892 * returned collection are in no particular order. This method is 893 * designed to facilitate construction of subclasses that provide 894 * more extensive monitoring facilities. 895 * 896 * @return the collection of threads 897 */ 898 public final Collection<Thread> getQueuedThreads() { 899 ArrayList<Thread> list = new ArrayList<>(); 900 for (Node p = tail; p != null; p = p.prev) { 901 Thread t = p.waiter; 902 if (t != null) 903 list.add(t); 904 } 905 return list; 906 } 907 908 /** 909 * Returns a collection containing threads that may be waiting to 910 * acquire in exclusive mode. This has the same properties 911 * as {@link #getQueuedThreads} except that it only returns 912 * those threads waiting due to an exclusive acquire. 913 * 914 * @return the collection of threads 915 */ 916 public final Collection<Thread> getExclusiveQueuedThreads() { 917 ArrayList<Thread> list = new ArrayList<>(); 918 for (Node p = tail; p != null; p = p.prev) { 919 if (!(p instanceof SharedNode)) { 920 Thread t = p.waiter; 921 if (t != null) 922 list.add(t); 923 } 924 } 925 return list; 926 } 927 928 /** 929 * Returns a collection containing threads that may be waiting to 930 * acquire in shared mode. This has the same properties 931 * as {@link #getQueuedThreads} except that it only returns 932 * those threads waiting due to a shared acquire. 933 * 934 * @return the collection of threads 935 */ 936 public final Collection<Thread> getSharedQueuedThreads() { 937 ArrayList<Thread> list = new ArrayList<>(); 938 for (Node p = tail; p != null; p = p.prev) { 939 if (p instanceof SharedNode) { 940 Thread t = p.waiter; 941 if (t != null) 942 list.add(t); 943 } 944 } 945 return list; 946 } 947 948 /** 949 * Returns a string identifying this synchronizer, as well as its state. 950 * The state, in brackets, includes the String {@code "State ="} 951 * followed by the current value of {@link #getState}, and either 952 * {@code "nonempty"} or {@code "empty"} depending on whether the 953 * queue is empty. 954 * 955 * @return a string identifying this synchronizer, as well as its state 956 */ 957 public String toString() { 958 return super.toString() 959 + "[State = " + getState() + ", " 960 + (hasQueuedThreads() ? "non" : "") + "empty queue]"; 961 } 962 963 // Instrumentation methods for conditions 964 965 /** 966 * Queries whether the given ConditionObject 967 * uses this synchronizer as its lock. 968 * 969 * @param condition the condition 970 * @return {@code true} if owned 971 * @throws NullPointerException if the condition is null 972 */ 973 public final boolean owns(ConditionObject condition) { 974 return condition.isOwnedBy(this); 975 } 976 977 /** 978 * Queries whether any threads are waiting on the given condition 979 * associated with this synchronizer. Note that because timeouts 980 * and interrupts may occur at any time, a {@code true} return 981 * does not guarantee that a future {@code signal} will awaken 982 * any threads. This method is designed primarily for use in 983 * monitoring of the system state. 984 * 985 * @param condition the condition 986 * @return {@code true} if there are any waiting threads 987 * @throws IllegalMonitorStateException if exclusive synchronization 988 * is not held 989 * @throws IllegalArgumentException if the given condition is 990 * not associated with this synchronizer 991 * @throws NullPointerException if the condition is null 992 */ 993 public final boolean hasWaiters(ConditionObject condition) { 994 if (!owns(condition)) 995 throw new IllegalArgumentException("Not owner"); 996 return condition.hasWaiters(); 997 } 998 999 /** 1000 * Returns an estimate of the number of threads waiting on the 1001 * given condition associated with this synchronizer. Note that 1002 * because timeouts and interrupts may occur at any time, the 1003 * estimate serves only as an upper bound on the actual number of 1004 * waiters. This method is designed for use in monitoring system 1005 * state, not for synchronization control. 1006 * 1007 * @param condition the condition 1008 * @return the estimated number of waiting threads 1009 * @throws IllegalMonitorStateException if exclusive synchronization 1010 * is not held 1011 * @throws IllegalArgumentException if the given condition is 1012 * not associated with this synchronizer 1013 * @throws NullPointerException if the condition is null 1014 */ 1015 public final int getWaitQueueLength(ConditionObject condition) { 1016 if (!owns(condition)) 1017 throw new IllegalArgumentException("Not owner"); 1018 return condition.getWaitQueueLength(); 1019 } 1020 1021 /** 1022 * Returns a collection containing those threads that may be 1023 * waiting on the given condition associated with this 1024 * synchronizer. Because the actual set of threads may change 1025 * dynamically while constructing this result, the returned 1026 * collection is only a best-effort estimate. The elements of the 1027 * returned collection are in no particular order. 1028 * 1029 * @param condition the condition 1030 * @return the collection of threads 1031 * @throws IllegalMonitorStateException if exclusive synchronization 1032 * is not held 1033 * @throws IllegalArgumentException if the given condition is 1034 * not associated with this synchronizer 1035 * @throws NullPointerException if the condition is null 1036 */ 1037 public final Collection<Thread> getWaitingThreads(ConditionObject condition) { 1038 if (!owns(condition)) 1039 throw new IllegalArgumentException("Not owner"); 1040 return condition.getWaitingThreads(); 1041 } 1042 1043 /** 1044 * Condition implementation for a {@link AbstractQueuedLongSynchronizer} 1045 * serving as the basis of a {@link Lock} implementation. 1046 * 1047 * <p>Method documentation for this class describes mechanics, 1048 * not behavioral specifications from the point of view of Lock 1049 * and Condition users. Exported versions of this class will in 1050 * general need to be accompanied by documentation describing 1051 * condition semantics that rely on those of the associated 1052 * {@code AbstractQueuedLongSynchronizer}. 1053 * 1054 * <p>This class is Serializable, but all fields are transient, 1055 * so deserialized conditions have no waiters. 1056 */ 1057 public class ConditionObject implements Condition, java.io.Serializable { 1058 private static final long serialVersionUID = 1173984872572414699L; 1059 /** First node of condition queue. */ 1060 private transient ConditionNode firstWaiter; 1061 /** Last node of condition queue. */ 1062 private transient ConditionNode lastWaiter; 1063 1064 /** 1065 * Creates a new {@code ConditionObject} instance. 1066 */ 1067 public ConditionObject() { } 1068 1069 // Signalling methods 1070 1071 /** 1072 * Removes and transfers one or all waiters to sync queue. 1073 */ 1074 private void doSignal(ConditionNode first, boolean all) { 1075 while (first != null) { 1076 ConditionNode next = first.nextWaiter; 1077 if ((firstWaiter = next) == null) 1078 lastWaiter = null; 1079 if ((first.getAndUnsetStatus(COND) & COND) != 0) { 1080 enqueue(first); 1081 if (!all) 1082 break; 1083 } 1084 first = next; 1085 } 1086 } 1087 1088 /** 1089 * Moves the longest-waiting thread, if one exists, from the 1090 * wait queue for this condition to the wait queue for the 1091 * owning lock. 1092 * 1093 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1094 * returns {@code false} 1095 */ 1096 public final void signal() { 1097 ConditionNode first = firstWaiter; 1098 if (!isHeldExclusively()) 1099 throw new IllegalMonitorStateException(); 1100 if (first != null) 1101 doSignal(first, false); 1102 } 1103 1104 /** 1105 * Moves all threads from the wait queue for this condition to 1106 * the wait queue for the owning lock. 1107 * 1108 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1109 * returns {@code false} 1110 */ 1111 public final void signalAll() { 1112 ConditionNode first = firstWaiter; 1113 if (!isHeldExclusively()) 1114 throw new IllegalMonitorStateException(); 1115 if (first != null) 1116 doSignal(first, true); 1117 } 1118 1119 // Waiting methods 1120 1121 /** 1122 * Adds node to condition list and releases lock. 1123 * 1124 * @param node the node 1125 * @return savedState to reacquire after wait 1126 */ 1127 private long enableWait(ConditionNode node) { 1128 if (isHeldExclusively()) { 1129 node.waiter = Thread.currentThread(); 1130 node.setStatusRelaxed(COND | WAITING); 1131 ConditionNode last = lastWaiter; 1132 if (last == null) 1133 firstWaiter = node; 1134 else 1135 last.nextWaiter = node; 1136 lastWaiter = node; 1137 long savedState = getState(); 1138 if (release(savedState)) 1139 return savedState; 1140 } 1141 node.status = CANCELLED; // lock not held or inconsistent 1142 throw new IllegalMonitorStateException(); 1143 } 1144 1145 /** 1146 * Returns true if a node that was initially placed on a condition 1147 * queue is now ready to reacquire on sync queue. 1148 * @param node the node 1149 * @return true if is reacquiring 1150 */ 1151 private boolean canReacquire(ConditionNode node) { 1152 // check links, not status to avoid enqueue race 1153 return node != null && node.prev != null && isEnqueued(node); 1154 } 1155 1156 /** 1157 * Unlinks the given node and other non-waiting nodes from 1158 * condition queue unless already unlinked. 1159 */ 1160 private void unlinkCancelledWaiters(ConditionNode node) { 1161 if (node == null || node.nextWaiter != null || node == lastWaiter) { 1162 ConditionNode w = firstWaiter, trail = null; 1163 while (w != null) { 1164 ConditionNode next = w.nextWaiter; 1165 if ((w.status & COND) == 0) { 1166 w.nextWaiter = null; 1167 if (trail == null) 1168 firstWaiter = next; 1169 else 1170 trail.nextWaiter = next; 1171 if (next == null) 1172 lastWaiter = trail; 1173 } else 1174 trail = w; 1175 w = next; 1176 } 1177 } 1178 } 1179 1180 /** 1181 * Implements uninterruptible condition wait. 1182 * <ol> 1183 * <li>Save lock state returned by {@link #getState}. 1184 * <li>Invoke {@link #release} with saved state as argument, 1185 * throwing IllegalMonitorStateException if it fails. 1186 * <li>Block until signalled. 1187 * <li>Reacquire by invoking specialized version of 1188 * {@link #acquire} with saved state as argument. 1189 * </ol> 1190 */ 1191 public final void awaitUninterruptibly() { 1192 ConditionNode node = new ConditionNode(); 1193 long savedState = enableWait(node); 1194 LockSupport.setCurrentBlocker(this); // for back-compatibility 1195 boolean interrupted = false; 1196 while (!canReacquire(node)) { 1197 if (Thread.interrupted()) 1198 interrupted = true; 1199 else if ((node.status & COND) != 0) { 1200 try { 1201 ForkJoinPool.managedBlock(node); 1202 } catch (InterruptedException ie) { 1203 interrupted = true; 1204 } 1205 } else 1206 Thread.onSpinWait(); // awoke while enqueuing 1207 } 1208 LockSupport.setCurrentBlocker(null); 1209 node.clearStatus(); 1210 acquire(node, savedState, false, false, false, 0L); 1211 if (interrupted) 1212 Thread.currentThread().interrupt(); 1213 } 1214 1215 /** 1216 * Implements interruptible condition wait. 1217 * <ol> 1218 * <li>If current thread is interrupted, throw InterruptedException. 1219 * <li>Save lock state returned by {@link #getState}. 1220 * <li>Invoke {@link #release} with saved state as argument, 1221 * throwing IllegalMonitorStateException if it fails. 1222 * <li>Block until signalled or interrupted. 1223 * <li>Reacquire by invoking specialized version of 1224 * {@link #acquire} with saved state as argument. 1225 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1226 * </ol> 1227 */ 1228 public final void await() throws InterruptedException { 1229 if (Thread.interrupted()) 1230 throw new InterruptedException(); 1231 ConditionNode node = new ConditionNode(); 1232 long savedState = enableWait(node); 1233 LockSupport.setCurrentBlocker(this); // for back-compatibility 1234 boolean interrupted = false, cancelled = false; 1235 while (!canReacquire(node)) { 1236 if (interrupted |= Thread.interrupted()) { 1237 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1238 break; // else interrupted after signal 1239 } else if ((node.status & COND) != 0) { 1240 try { 1241 ForkJoinPool.managedBlock(node); 1242 } catch (InterruptedException ie) { 1243 interrupted = true; 1244 } 1245 } else 1246 Thread.onSpinWait(); // awoke while enqueuing 1247 } 1248 LockSupport.setCurrentBlocker(null); 1249 node.clearStatus(); 1250 acquire(node, savedState, false, false, false, 0L); 1251 if (interrupted) { 1252 if (cancelled) { 1253 unlinkCancelledWaiters(node); 1254 throw new InterruptedException(); 1255 } 1256 Thread.currentThread().interrupt(); 1257 } 1258 } 1259 1260 /** 1261 * Implements timed condition wait. 1262 * <ol> 1263 * <li>If current thread is interrupted, throw InterruptedException. 1264 * <li>Save lock state returned by {@link #getState}. 1265 * <li>Invoke {@link #release} with saved state as argument, 1266 * throwing IllegalMonitorStateException if it fails. 1267 * <li>Block until signalled, interrupted, or timed out. 1268 * <li>Reacquire by invoking specialized version of 1269 * {@link #acquire} with saved state as argument. 1270 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1271 * </ol> 1272 */ 1273 public final long awaitNanos(long nanosTimeout) 1274 throws InterruptedException { 1275 if (Thread.interrupted()) 1276 throw new InterruptedException(); 1277 ConditionNode node = new ConditionNode(); 1278 long savedState = enableWait(node); 1279 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; 1280 long deadline = System.nanoTime() + nanos; 1281 boolean cancelled = false, interrupted = false; 1282 while (!canReacquire(node)) { 1283 if ((interrupted |= Thread.interrupted()) || 1284 (nanos = deadline - System.nanoTime()) <= 0L) { 1285 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1286 break; 1287 } else 1288 LockSupport.parkNanos(this, nanos); 1289 } 1290 node.clearStatus(); 1291 acquire(node, savedState, false, false, false, 0L); 1292 if (cancelled) { 1293 unlinkCancelledWaiters(node); 1294 if (interrupted) 1295 throw new InterruptedException(); 1296 } else if (interrupted) 1297 Thread.currentThread().interrupt(); 1298 long remaining = deadline - System.nanoTime(); // avoid overflow 1299 return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE; 1300 } 1301 1302 /** 1303 * Implements absolute timed condition wait. 1304 * <ol> 1305 * <li>If current thread is interrupted, throw InterruptedException. 1306 * <li>Save lock state returned by {@link #getState}. 1307 * <li>Invoke {@link #release} with saved state as argument, 1308 * throwing IllegalMonitorStateException if it fails. 1309 * <li>Block until signalled, interrupted, or timed out. 1310 * <li>Reacquire by invoking specialized version of 1311 * {@link #acquire} with saved state as argument. 1312 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1313 * <li>If timed out while blocked in step 4, return false, else true. 1314 * </ol> 1315 */ 1316 public final boolean awaitUntil(Date deadline) 1317 throws InterruptedException { 1318 long abstime = deadline.getTime(); 1319 if (Thread.interrupted()) 1320 throw new InterruptedException(); 1321 ConditionNode node = new ConditionNode(); 1322 long savedState = enableWait(node); 1323 boolean cancelled = false, interrupted = false; 1324 while (!canReacquire(node)) { 1325 if ((interrupted |= Thread.interrupted()) || 1326 System.currentTimeMillis() >= abstime) { 1327 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1328 break; 1329 } else 1330 LockSupport.parkUntil(this, abstime); 1331 } 1332 node.clearStatus(); 1333 acquire(node, savedState, false, false, false, 0L); 1334 if (cancelled) { 1335 unlinkCancelledWaiters(node); 1336 if (interrupted) 1337 throw new InterruptedException(); 1338 } else if (interrupted) 1339 Thread.currentThread().interrupt(); 1340 return !cancelled; 1341 } 1342 1343 /** 1344 * Implements timed condition wait. 1345 * <ol> 1346 * <li>If current thread is interrupted, throw InterruptedException. 1347 * <li>Save lock state returned by {@link #getState}. 1348 * <li>Invoke {@link #release} with saved state as argument, 1349 * throwing IllegalMonitorStateException if it fails. 1350 * <li>Block until signalled, interrupted, or timed out. 1351 * <li>Reacquire by invoking specialized version of 1352 * {@link #acquire} with saved state as argument. 1353 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1354 * <li>If timed out while blocked in step 4, return false, else true. 1355 * </ol> 1356 */ 1357 public final boolean await(long time, TimeUnit unit) 1358 throws InterruptedException { 1359 long nanosTimeout = unit.toNanos(time); 1360 if (Thread.interrupted()) 1361 throw new InterruptedException(); 1362 ConditionNode node = new ConditionNode(); 1363 long savedState = enableWait(node); 1364 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; 1365 long deadline = System.nanoTime() + nanos; 1366 boolean cancelled = false, interrupted = false; 1367 while (!canReacquire(node)) { 1368 if ((interrupted |= Thread.interrupted()) || 1369 (nanos = deadline - System.nanoTime()) <= 0L) { 1370 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1371 break; 1372 } else 1373 LockSupport.parkNanos(this, nanos); 1374 } 1375 node.clearStatus(); 1376 acquire(node, savedState, false, false, false, 0L); 1377 if (cancelled) { 1378 unlinkCancelledWaiters(node); 1379 if (interrupted) 1380 throw new InterruptedException(); 1381 } else if (interrupted) 1382 Thread.currentThread().interrupt(); 1383 return !cancelled; 1384 } 1385 1386 // support for instrumentation 1387 1388 /** 1389 * Returns true if this condition was created by the given 1390 * synchronization object. 1391 * 1392 * @return {@code true} if owned 1393 */ 1394 final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) { 1395 return sync == AbstractQueuedLongSynchronizer.this; 1396 } 1397 1398 /** 1399 * Queries whether any threads are waiting on this condition. 1400 * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters(ConditionObject)}. 1401 * 1402 * @return {@code true} if there are any waiting threads 1403 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1404 * returns {@code false} 1405 */ 1406 protected final boolean hasWaiters() { 1407 if (!isHeldExclusively()) 1408 throw new IllegalMonitorStateException(); 1409 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1410 if ((w.status & COND) != 0) 1411 return true; 1412 } 1413 return false; 1414 } 1415 1416 /** 1417 * Returns an estimate of the number of threads waiting on 1418 * this condition. 1419 * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength(ConditionObject)}. 1420 * 1421 * @return the estimated number of waiting threads 1422 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1423 * returns {@code false} 1424 */ 1425 protected final int getWaitQueueLength() { 1426 if (!isHeldExclusively()) 1427 throw new IllegalMonitorStateException(); 1428 int n = 0; 1429 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1430 if ((w.status & COND) != 0) 1431 ++n; 1432 } 1433 return n; 1434 } 1435 1436 /** 1437 * Returns a collection containing those threads that may be 1438 * waiting on this Condition. 1439 * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads(ConditionObject)}. 1440 * 1441 * @return the collection of threads 1442 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1443 * returns {@code false} 1444 */ 1445 protected final Collection<Thread> getWaitingThreads() { 1446 if (!isHeldExclusively()) 1447 throw new IllegalMonitorStateException(); 1448 ArrayList<Thread> list = new ArrayList<>(); 1449 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1450 if ((w.status & COND) != 0) { 1451 Thread t = w.waiter; 1452 if (t != null) 1453 list.add(t); 1454 } 1455 } 1456 return list; 1457 } 1458 } 1459 1460 // Unsafe 1461 private static final Unsafe U = Unsafe.getUnsafe(); 1462 private static final long STATE 1463 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "state"); 1464 private static final long HEAD 1465 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "head"); 1466 private static final long TAIL 1467 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "tail"); 1468 1469 static { 1470 Class<?> ensureLoaded = LockSupport.class; 1471 } 1472 }