1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent.locks; 37 38 import java.util.ArrayList; 39 import java.util.Collection; 40 import java.util.Date; 41 import java.util.concurrent.TimeUnit; 42 import java.util.concurrent.locks.AbstractQueuedSynchronizer.Node; 43 44 /** 45 * A version of {@link AbstractQueuedSynchronizer} in 46 * which synchronization state is maintained as a {@code long}. 
47 * This class has exactly the same structure, properties, and methods 48 * as {@code AbstractQueuedSynchronizer} with the exception 49 * that all state-related parameters and results are defined 50 * as {@code long} rather than {@code int}. This class 51 * may be useful when creating synchronizers such as 52 * multilevel locks and barriers that require 53 * 64 bits of state. 54 * 55 * <p>See {@link AbstractQueuedSynchronizer} for usage 56 * notes and examples. 57 * 58 * @since 1.6 59 * @author Doug Lea 60 */ 61 public abstract class AbstractQueuedLongSynchronizer 62 extends AbstractOwnableSynchronizer 63 implements java.io.Serializable { 64 65 private static final long serialVersionUID = 7373984972572414692L; 66 67 /* 68 To keep sources in sync, the remainder of this source file is 69 exactly cloned from AbstractQueuedSynchronizer, replacing class 70 name and changing ints related with sync state to longs. Please 71 keep it that way. 72 */ 73 74 /** 75 * Creates a new {@code AbstractQueuedLongSynchronizer} instance 76 * with initial synchronization state of zero. 77 */ 78 protected AbstractQueuedLongSynchronizer() { } 79 80 /** 81 * Head of the wait queue, lazily initialized. Except for 82 * initialization, it is modified only via method setHead. Note: 83 * If head exists, its waitStatus is guaranteed not to be 84 * CANCELLED. 85 */ 86 private transient volatile Node head; 87 88 /** 89 * Tail of the wait queue, lazily initialized. Modified only via 90 * method enq to add new wait node. 91 */ 92 private transient volatile Node tail; 93 94 /** 95 * The synchronization state. 96 */ 97 private volatile long state; 98 99 /** 100 * Returns the current value of synchronization state. 101 * This operation has memory semantics of a {@code volatile} read. 102 * @return current state value 103 */ 104 protected final long getState() { 105 return state; 106 } 107 108 /** 109 * Sets the value of synchronization state. 
110 * This operation has memory semantics of a {@code volatile} write. 111 * @param newState the new state value 112 */ 113 protected final void setState(long newState) { 114 // Use putLongVolatile instead of ordinary volatile store when 115 // using compareAndSwapLong, for sake of some 32bit systems. 116 U.putLongVolatile(this, STATE, newState); 117 } 118 119 /** 120 * Atomically sets synchronization state to the given updated 121 * value if the current state value equals the expected value. 122 * This operation has memory semantics of a {@code volatile} read 123 * and write. 124 * 125 * @param expect the expected value 126 * @param update the new value 127 * @return {@code true} if successful. False return indicates that the actual 128 * value was not equal to the expected value. 129 */ 130 protected final boolean compareAndSetState(long expect, long update) { 131 return U.compareAndSwapLong(this, STATE, expect, update); 132 } 133 134 // Queuing utilities 135 136 /** 137 * The number of nanoseconds for which it is faster to spin 138 * rather than to use timed park. A rough estimate suffices 139 * to improve responsiveness with very short timeouts. 140 */ 141 static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L; 142 143 /** 144 * Inserts node into queue, initializing if necessary. See picture above. 145 * @param node the node to insert 146 * @return node's predecessor 147 */ 148 private Node enq(Node node) { 149 for (;;) { 150 Node oldTail = tail; 151 if (oldTail != null) { 152 U.putObject(node, Node.PREV, oldTail); 153 if (compareAndSetTail(oldTail, node)) { 154 oldTail.next = node; 155 return oldTail; 156 } 157 } else { 158 initializeSyncQueue(); 159 } 160 } 161 } 162 163 /** 164 * Creates and enqueues node for current thread and given mode. 
165 * 166 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 167 * @return the new node 168 */ 169 private Node addWaiter(Node mode) { 170 Node node = new Node(mode); 171 172 for (;;) { 173 Node oldTail = tail; 174 if (oldTail != null) { 175 U.putObject(node, Node.PREV, oldTail); 176 if (compareAndSetTail(oldTail, node)) { 177 oldTail.next = node; 178 return node; 179 } 180 } else { 181 initializeSyncQueue(); 182 } 183 } 184 } 185 186 /** 187 * Sets head of queue to be node, thus dequeuing. Called only by 188 * acquire methods. Also nulls out unused fields for sake of GC 189 * and to suppress unnecessary signals and traversals. 190 * 191 * @param node the node 192 */ 193 private void setHead(Node node) { 194 head = node; 195 node.thread = null; 196 node.prev = null; 197 } 198 199 /** 200 * Wakes up node's successor, if one exists. 201 * 202 * @param node the node 203 */ 204 private void unparkSuccessor(Node node) { 205 /* 206 * If status is negative (i.e., possibly needing signal) try 207 * to clear in anticipation of signalling. It is OK if this 208 * fails or if status is changed by waiting thread. 209 */ 210 int ws = node.waitStatus; 211 if (ws < 0) 212 node.compareAndSetWaitStatus(ws, 0); 213 214 /* 215 * Thread to unpark is held in successor, which is normally 216 * just the next node. But if cancelled or apparently null, 217 * traverse backwards from tail to find the actual 218 * non-cancelled successor. 219 */ 220 Node s = node.next; 221 if (s == null || s.waitStatus > 0) { 222 s = null; 223 for (Node p = tail; p != node && p != null; p = p.prev) 224 if (p.waitStatus <= 0) 225 s = p; 226 } 227 if (s != null) 228 LockSupport.unpark(s.thread); 229 } 230 231 /** 232 * Release action for shared mode -- signals successor and ensures 233 * propagation. (Note: For exclusive mode, release just amounts 234 * to calling unparkSuccessor of head if it needs signal.) 
235 */ 236 private void doReleaseShared() { 237 /* 238 * Ensure that a release propagates, even if there are other 239 * in-progress acquires/releases. This proceeds in the usual 240 * way of trying to unparkSuccessor of head if it needs 241 * signal. But if it does not, status is set to PROPAGATE to 242 * ensure that upon release, propagation continues. 243 * Additionally, we must loop in case a new node is added 244 * while we are doing this. Also, unlike other uses of 245 * unparkSuccessor, we need to know if CAS to reset status 246 * fails, if so rechecking. 247 */ 248 for (;;) { 249 Node h = head; 250 if (h != null && h != tail) { 251 int ws = h.waitStatus; 252 if (ws == Node.SIGNAL) { 253 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) 254 continue; // loop to recheck cases 255 unparkSuccessor(h); 256 } 257 else if (ws == 0 && 258 !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) 259 continue; // loop on failed CAS 260 } 261 if (h == head) // loop if head changed 262 break; 263 } 264 } 265 266 /** 267 * Sets head of queue, and checks if successor may be waiting 268 * in shared mode, if so propagating if either propagate > 0 or 269 * PROPAGATE status was set. 270 * 271 * @param node the node 272 * @param propagate the return value from a tryAcquireShared 273 */ 274 private void setHeadAndPropagate(Node node, long propagate) { 275 Node h = head; // Record old head for check below 276 setHead(node); 277 /* 278 * Try to signal next queued node if: 279 * Propagation was indicated by caller, 280 * or was recorded (as h.waitStatus either before 281 * or after setHead) by a previous operation 282 * (note: this uses sign-check of waitStatus because 283 * PROPAGATE status may transition to SIGNAL.) 
284 * and 285 * The next node is waiting in shared mode, 286 * or we don't know, because it appears null 287 * 288 * The conservatism in both of these checks may cause 289 * unnecessary wake-ups, but only when there are multiple 290 * racing acquires/releases, so most need signals now or soon 291 * anyway. 292 */ 293 if (propagate > 0 || h == null || h.waitStatus < 0 || 294 (h = head) == null || h.waitStatus < 0) { 295 Node s = node.next; 296 if (s == null || s.isShared()) 297 doReleaseShared(); 298 } 299 } 300 301 // Utilities for various versions of acquire 302 303 /** 304 * Cancels an ongoing attempt to acquire. 305 * 306 * @param node the node 307 */ 308 private void cancelAcquire(Node node) { 309 // Ignore if node doesn't exist 310 if (node == null) 311 return; 312 313 node.thread = null; 314 315 // Skip cancelled predecessors 316 Node pred = node.prev; 317 while (pred.waitStatus > 0) 318 node.prev = pred = pred.prev; 319 320 // predNext is the apparent node to unsplice. CASes below will 321 // fail if not, in which case, we lost race vs another cancel 322 // or signal, so no further action is necessary. 323 Node predNext = pred.next; 324 325 // Can use unconditional write instead of CAS here. 326 // After this atomic step, other Nodes can skip past us. 327 // Before, we are free of interference from other threads. 328 node.waitStatus = Node.CANCELLED; 329 330 // If we are the tail, remove ourselves. 331 if (node == tail && compareAndSetTail(node, pred)) { 332 pred.compareAndSetNext(predNext, null); 333 } else { 334 // If successor needs signal, try to set pred's next-link 335 // so it will get one. Otherwise wake it up to propagate. 
336 int ws; 337 if (pred != head && 338 ((ws = pred.waitStatus) == Node.SIGNAL || 339 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && 340 pred.thread != null) { 341 Node next = node.next; 342 if (next != null && next.waitStatus <= 0) 343 pred.compareAndSetNext(predNext, next); 344 } else { 345 unparkSuccessor(node); 346 } 347 348 node.next = node; // help GC 349 } 350 } 351 352 /** 353 * Checks and updates status for a node that failed to acquire. 354 * Returns true if thread should block. This is the main signal 355 * control in all acquire loops. Requires that pred == node.prev. 356 * 357 * @param pred node's predecessor holding status 358 * @param node the node 359 * @return {@code true} if thread should block 360 */ 361 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 362 int ws = pred.waitStatus; 363 if (ws == Node.SIGNAL) 364 /* 365 * This node has already set status asking a release 366 * to signal it, so it can safely park. 367 */ 368 return true; 369 if (ws > 0) { 370 /* 371 * Predecessor was cancelled. Skip over predecessors and 372 * indicate retry. 373 */ 374 do { 375 node.prev = pred = pred.prev; 376 } while (pred.waitStatus > 0); 377 pred.next = node; 378 } else { 379 /* 380 * waitStatus must be 0 or PROPAGATE. Indicate that we 381 * need a signal, but don't park yet. Caller will need to 382 * retry to make sure it cannot acquire before parking. 383 */ 384 pred.compareAndSetWaitStatus(ws, Node.SIGNAL); 385 } 386 return false; 387 } 388 389 /** 390 * Convenience method to interrupt current thread. 391 */ 392 static void selfInterrupt() { 393 Thread.currentThread().interrupt(); 394 } 395 396 /** 397 * Convenience method to park and then check if interrupted. 
398 * 399 * @return {@code true} if interrupted 400 */ 401 private final boolean parkAndCheckInterrupt() { 402 LockSupport.park(this); 403 return Thread.interrupted(); 404 } 405 406 /* 407 * Various flavors of acquire, varying in exclusive/shared and 408 * control modes. Each is mostly the same, but annoyingly 409 * different. Only a little bit of factoring is possible due to 410 * interactions of exception mechanics (including ensuring that we 411 * cancel if tryAcquire throws exception) and other control, at 412 * least not without hurting performance too much. 413 */ 414 415 /** 416 * Acquires in exclusive uninterruptible mode for thread already in 417 * queue. Used by condition wait methods as well as acquire. 418 * 419 * @param node the node 420 * @param arg the acquire argument 421 * @return {@code true} if interrupted while waiting 422 */ 423 final boolean acquireQueued(final Node node, long arg) { 424 try { 425 boolean interrupted = false; 426 for (;;) { 427 final Node p = node.predecessor(); 428 if (p == head && tryAcquire(arg)) { 429 setHead(node); 430 p.next = null; // help GC 431 return interrupted; 432 } 433 if (shouldParkAfterFailedAcquire(p, node) && 434 parkAndCheckInterrupt()) 435 interrupted = true; 436 } 437 } catch (Throwable t) { 438 cancelAcquire(node); 439 throw t; 440 } 441 } 442 443 /** 444 * Acquires in exclusive interruptible mode. 445 * @param arg the acquire argument 446 */ 447 private void doAcquireInterruptibly(long arg) 448 throws InterruptedException { 449 final Node node = addWaiter(Node.EXCLUSIVE); 450 try { 451 for (;;) { 452 final Node p = node.predecessor(); 453 if (p == head && tryAcquire(arg)) { 454 setHead(node); 455 p.next = null; // help GC 456 return; 457 } 458 if (shouldParkAfterFailedAcquire(p, node) && 459 parkAndCheckInterrupt()) 460 throw new InterruptedException(); 461 } 462 } catch (Throwable t) { 463 cancelAcquire(node); 464 throw t; 465 } 466 } 467 468 /** 469 * Acquires in exclusive timed mode. 
470 * 471 * @param arg the acquire argument 472 * @param nanosTimeout max wait time 473 * @return {@code true} if acquired 474 */ 475 private boolean doAcquireNanos(long arg, long nanosTimeout) 476 throws InterruptedException { 477 if (nanosTimeout <= 0L) 478 return false; 479 final long deadline = System.nanoTime() + nanosTimeout; 480 final Node node = addWaiter(Node.EXCLUSIVE); 481 try { 482 for (;;) { 483 final Node p = node.predecessor(); 484 if (p == head && tryAcquire(arg)) { 485 setHead(node); 486 p.next = null; // help GC 487 return true; 488 } 489 nanosTimeout = deadline - System.nanoTime(); 490 if (nanosTimeout <= 0L) { 491 cancelAcquire(node); 492 return false; 493 } 494 if (shouldParkAfterFailedAcquire(p, node) && 495 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) 496 LockSupport.parkNanos(this, nanosTimeout); 497 if (Thread.interrupted()) 498 throw new InterruptedException(); 499 } 500 } catch (Throwable t) { 501 cancelAcquire(node); 502 throw t; 503 } 504 } 505 506 /** 507 * Acquires in shared uninterruptible mode. 508 * @param arg the acquire argument 509 */ 510 private void doAcquireShared(long arg) { 511 final Node node = addWaiter(Node.SHARED); 512 try { 513 boolean interrupted = false; 514 for (;;) { 515 final Node p = node.predecessor(); 516 if (p == head) { 517 long r = tryAcquireShared(arg); 518 if (r >= 0) { 519 setHeadAndPropagate(node, r); 520 p.next = null; // help GC 521 if (interrupted) 522 selfInterrupt(); 523 return; 524 } 525 } 526 if (shouldParkAfterFailedAcquire(p, node) && 527 parkAndCheckInterrupt()) 528 interrupted = true; 529 } 530 } catch (Throwable t) { 531 cancelAcquire(node); 532 throw t; 533 } 534 } 535 536 /** 537 * Acquires in shared interruptible mode. 
538 * @param arg the acquire argument 539 */ 540 private void doAcquireSharedInterruptibly(long arg) 541 throws InterruptedException { 542 final Node node = addWaiter(Node.SHARED); 543 try { 544 for (;;) { 545 final Node p = node.predecessor(); 546 if (p == head) { 547 long r = tryAcquireShared(arg); 548 if (r >= 0) { 549 setHeadAndPropagate(node, r); 550 p.next = null; // help GC 551 return; 552 } 553 } 554 if (shouldParkAfterFailedAcquire(p, node) && 555 parkAndCheckInterrupt()) 556 throw new InterruptedException(); 557 } 558 } catch (Throwable t) { 559 cancelAcquire(node); 560 throw t; 561 } 562 } 563 564 /** 565 * Acquires in shared timed mode. 566 * 567 * @param arg the acquire argument 568 * @param nanosTimeout max wait time 569 * @return {@code true} if acquired 570 */ 571 private boolean doAcquireSharedNanos(long arg, long nanosTimeout) 572 throws InterruptedException { 573 if (nanosTimeout <= 0L) 574 return false; 575 final long deadline = System.nanoTime() + nanosTimeout; 576 final Node node = addWaiter(Node.SHARED); 577 try { 578 for (;;) { 579 final Node p = node.predecessor(); 580 if (p == head) { 581 long r = tryAcquireShared(arg); 582 if (r >= 0) { 583 setHeadAndPropagate(node, r); 584 p.next = null; // help GC 585 return true; 586 } 587 } 588 nanosTimeout = deadline - System.nanoTime(); 589 if (nanosTimeout <= 0L) { 590 cancelAcquire(node); 591 return false; 592 } 593 if (shouldParkAfterFailedAcquire(p, node) && 594 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) 595 LockSupport.parkNanos(this, nanosTimeout); 596 if (Thread.interrupted()) 597 throw new InterruptedException(); 598 } 599 } catch (Throwable t) { 600 cancelAcquire(node); 601 throw t; 602 } 603 } 604 605 // Main exported methods 606 607 /** 608 * Attempts to acquire in exclusive mode. This method should query 609 * if the state of the object permits it to be acquired in the 610 * exclusive mode, and if so to acquire it. 
611 * 612 * <p>This method is always invoked by the thread performing 613 * acquire. If this method reports failure, the acquire method 614 * may queue the thread, if it is not already queued, until it is 615 * signalled by a release from some other thread. This can be used 616 * to implement method {@link Lock#tryLock()}. 617 * 618 * <p>The default 619 * implementation throws {@link UnsupportedOperationException}. 620 * 621 * @param arg the acquire argument. This value is always the one 622 * passed to an acquire method, or is the value saved on entry 623 * to a condition wait. The value is otherwise uninterpreted 624 * and can represent anything you like. 625 * @return {@code true} if successful. Upon success, this object has 626 * been acquired. 627 * @throws IllegalMonitorStateException if acquiring would place this 628 * synchronizer in an illegal state. This exception must be 629 * thrown in a consistent fashion for synchronization to work 630 * correctly. 631 * @throws UnsupportedOperationException if exclusive mode is not supported 632 */ 633 protected boolean tryAcquire(long arg) { 634 throw new UnsupportedOperationException(); 635 } 636 637 /** 638 * Attempts to set the state to reflect a release in exclusive 639 * mode. 640 * 641 * <p>This method is always invoked by the thread performing release. 642 * 643 * <p>The default implementation throws 644 * {@link UnsupportedOperationException}. 645 * 646 * @param arg the release argument. This value is always the one 647 * passed to a release method, or the current state value upon 648 * entry to a condition wait. The value is otherwise 649 * uninterpreted and can represent anything you like. 650 * @return {@code true} if this object is now in a fully released 651 * state, so that any waiting threads may attempt to acquire; 652 * and {@code false} otherwise. 653 * @throws IllegalMonitorStateException if releasing would place this 654 * synchronizer in an illegal state. 
This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryRelease(long arg) {
        // Hook for subclasses; unsupported unless overridden.
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to acquire in shared mode. This method should query if
     * the state of the object permits it to be acquired in the shared
     * mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire.  If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread.
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.  The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return a negative value on failure; zero if acquisition in shared
     *         mode succeeded but no subsequent shared-mode acquire can
     *         succeed; and a positive value if acquisition in shared
     *         mode succeeded and subsequent shared-mode acquires might
     *         also succeed, in which case a subsequent waiting thread
     *         must check availability. (Support for three different
     *         return values enables this method to be used in contexts
     *         where acquires only sometimes act exclusively.)  Upon
     *         success, this object has been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
693 * @throws UnsupportedOperationException if shared mode is not supported 694 */ 695 protected long tryAcquireShared(long arg) { 696 throw new UnsupportedOperationException(); 697 } 698 699 /** 700 * Attempts to set the state to reflect a release in shared mode. 701 * 702 * <p>This method is always invoked by the thread performing release. 703 * 704 * <p>The default implementation throws 705 * {@link UnsupportedOperationException}. 706 * 707 * @param arg the release argument. This value is always the one 708 * passed to a release method, or the current state value upon 709 * entry to a condition wait. The value is otherwise 710 * uninterpreted and can represent anything you like. 711 * @return {@code true} if this release of shared mode may permit a 712 * waiting acquire (shared or exclusive) to succeed; and 713 * {@code false} otherwise 714 * @throws IllegalMonitorStateException if releasing would place this 715 * synchronizer in an illegal state. This exception must be 716 * thrown in a consistent fashion for synchronization to work 717 * correctly. 718 * @throws UnsupportedOperationException if shared mode is not supported 719 */ 720 protected boolean tryReleaseShared(long arg) { 721 throw new UnsupportedOperationException(); 722 } 723 724 /** 725 * Returns {@code true} if synchronization is held exclusively with 726 * respect to the current (calling) thread. This method is invoked 727 * upon each call to a non-waiting {@link ConditionObject} method. 728 * (Waiting methods instead invoke {@link #release}.) 729 * 730 * <p>The default implementation throws {@link 731 * UnsupportedOperationException}. This method is invoked 732 * internally only within {@link ConditionObject} methods, so need 733 * not be defined if conditions are not used. 
734 * 735 * @return {@code true} if synchronization is held exclusively; 736 * {@code false} otherwise 737 * @throws UnsupportedOperationException if conditions are not supported 738 */ 739 protected boolean isHeldExclusively() { 740 throw new UnsupportedOperationException(); 741 } 742 743 /** 744 * Acquires in exclusive mode, ignoring interrupts. Implemented 745 * by invoking at least once {@link #tryAcquire}, 746 * returning on success. Otherwise the thread is queued, possibly 747 * repeatedly blocking and unblocking, invoking {@link 748 * #tryAcquire} until success. This method can be used 749 * to implement method {@link Lock#lock}. 750 * 751 * @param arg the acquire argument. This value is conveyed to 752 * {@link #tryAcquire} but is otherwise uninterpreted and 753 * can represent anything you like. 754 */ 755 public final void acquire(long arg) { 756 if (!tryAcquire(arg) && 757 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 758 selfInterrupt(); 759 } 760 761 /** 762 * Acquires in exclusive mode, aborting if interrupted. 763 * Implemented by first checking interrupt status, then invoking 764 * at least once {@link #tryAcquire}, returning on 765 * success. Otherwise the thread is queued, possibly repeatedly 766 * blocking and unblocking, invoking {@link #tryAcquire} 767 * until success or the thread is interrupted. This method can be 768 * used to implement method {@link Lock#lockInterruptibly}. 769 * 770 * @param arg the acquire argument. This value is conveyed to 771 * {@link #tryAcquire} but is otherwise uninterpreted and 772 * can represent anything you like. 
773 * @throws InterruptedException if the current thread is interrupted 774 */ 775 public final void acquireInterruptibly(long arg) 776 throws InterruptedException { 777 if (Thread.interrupted()) 778 throw new InterruptedException(); 779 if (!tryAcquire(arg)) 780 doAcquireInterruptibly(arg); 781 } 782 783 /** 784 * Attempts to acquire in exclusive mode, aborting if interrupted, 785 * and failing if the given timeout elapses. Implemented by first 786 * checking interrupt status, then invoking at least once {@link 787 * #tryAcquire}, returning on success. Otherwise, the thread is 788 * queued, possibly repeatedly blocking and unblocking, invoking 789 * {@link #tryAcquire} until success or the thread is interrupted 790 * or the timeout elapses. This method can be used to implement 791 * method {@link Lock#tryLock(long, TimeUnit)}. 792 * 793 * @param arg the acquire argument. This value is conveyed to 794 * {@link #tryAcquire} but is otherwise uninterpreted and 795 * can represent anything you like. 796 * @param nanosTimeout the maximum number of nanoseconds to wait 797 * @return {@code true} if acquired; {@code false} if timed out 798 * @throws InterruptedException if the current thread is interrupted 799 */ 800 public final boolean tryAcquireNanos(long arg, long nanosTimeout) 801 throws InterruptedException { 802 if (Thread.interrupted()) 803 throw new InterruptedException(); 804 return tryAcquire(arg) || 805 doAcquireNanos(arg, nanosTimeout); 806 } 807 808 /** 809 * Releases in exclusive mode. Implemented by unblocking one or 810 * more threads if {@link #tryRelease} returns true. 811 * This method can be used to implement method {@link Lock#unlock}. 812 * 813 * @param arg the release argument. This value is conveyed to 814 * {@link #tryRelease} but is otherwise uninterpreted and 815 * can represent anything you like. 
816 * @return the value returned from {@link #tryRelease} 817 */ 818 public final boolean release(long arg) { 819 if (tryRelease(arg)) { 820 Node h = head; 821 if (h != null && h.waitStatus != 0) 822 unparkSuccessor(h); 823 return true; 824 } 825 return false; 826 } 827 828 /** 829 * Acquires in shared mode, ignoring interrupts. Implemented by 830 * first invoking at least once {@link #tryAcquireShared}, 831 * returning on success. Otherwise the thread is queued, possibly 832 * repeatedly blocking and unblocking, invoking {@link 833 * #tryAcquireShared} until success. 834 * 835 * @param arg the acquire argument. This value is conveyed to 836 * {@link #tryAcquireShared} but is otherwise uninterpreted 837 * and can represent anything you like. 838 */ 839 public final void acquireShared(long arg) { 840 if (tryAcquireShared(arg) < 0) 841 doAcquireShared(arg); 842 } 843 844 /** 845 * Acquires in shared mode, aborting if interrupted. Implemented 846 * by first checking interrupt status, then invoking at least once 847 * {@link #tryAcquireShared}, returning on success. Otherwise the 848 * thread is queued, possibly repeatedly blocking and unblocking, 849 * invoking {@link #tryAcquireShared} until success or the thread 850 * is interrupted. 851 * @param arg the acquire argument. 852 * This value is conveyed to {@link #tryAcquireShared} but is 853 * otherwise uninterpreted and can represent anything 854 * you like. 855 * @throws InterruptedException if the current thread is interrupted 856 */ 857 public final void acquireSharedInterruptibly(long arg) 858 throws InterruptedException { 859 if (Thread.interrupted()) 860 throw new InterruptedException(); 861 if (tryAcquireShared(arg) < 0) 862 doAcquireSharedInterruptibly(arg); 863 } 864 865 /** 866 * Attempts to acquire in shared mode, aborting if interrupted, and 867 * failing if the given timeout elapses. 
Implemented by first 868 * checking interrupt status, then invoking at least once {@link 869 * #tryAcquireShared}, returning on success. Otherwise, the 870 * thread is queued, possibly repeatedly blocking and unblocking, 871 * invoking {@link #tryAcquireShared} until success or the thread 872 * is interrupted or the timeout elapses. 873 * 874 * @param arg the acquire argument. This value is conveyed to 875 * {@link #tryAcquireShared} but is otherwise uninterpreted 876 * and can represent anything you like. 877 * @param nanosTimeout the maximum number of nanoseconds to wait 878 * @return {@code true} if acquired; {@code false} if timed out 879 * @throws InterruptedException if the current thread is interrupted 880 */ 881 public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) 882 throws InterruptedException { 883 if (Thread.interrupted()) 884 throw new InterruptedException(); 885 return tryAcquireShared(arg) >= 0 || 886 doAcquireSharedNanos(arg, nanosTimeout); 887 } 888 889 /** 890 * Releases in shared mode. Implemented by unblocking one or more 891 * threads if {@link #tryReleaseShared} returns true. 892 * 893 * @param arg the release argument. This value is conveyed to 894 * {@link #tryReleaseShared} but is otherwise uninterpreted 895 * and can represent anything you like. 896 * @return the value returned from {@link #tryReleaseShared} 897 */ 898 public final boolean releaseShared(long arg) { 899 if (tryReleaseShared(arg)) { 900 doReleaseShared(); 901 return true; 902 } 903 return false; 904 } 905 906 // Queue inspection methods 907 908 /** 909 * Queries whether any threads are waiting to acquire. Note that 910 * because cancellations due to interrupts and timeouts may occur 911 * at any time, a {@code true} return does not guarantee that any 912 * other thread will ever acquire. 913 * 914 * <p>In this implementation, this operation returns in 915 * constant time. 
*
     * @return {@code true} if there may be other threads waiting to acquire
     */
    public final boolean hasQueuedThreads() {
        // Reports waiters whenever head and tail do not refer to the same node.
        return head != tail;
    }

    /**
     * Queries whether any threads have ever contended to acquire this
     * synchronizer; that is if an acquire method has ever blocked.
     *
     * <p>In this implementation, this operation returns in
     * constant time.
     *
     * @return {@code true} if there has ever been contention
     */
    public final boolean hasContended() {
        // head is lazily initialized, so a non-null head is the signal used here.
        return head != null;
    }

    /**
     * Returns the first (longest-waiting) thread in the queue, or
     * {@code null} if no threads are currently queued.
     *
     * <p>In this implementation, this operation normally returns in
     * constant time, but may iterate upon contention if other threads are
     * concurrently modifying the queue.
     *
     * @return the first (longest-waiting) thread in the queue, or
     *         {@code null} if no threads are currently queued
     */
    public final Thread getFirstQueuedThread() {
        // handle only fast path, else relay
        return (head == tail) ? null : fullGetFirstQueuedThread();
    }

    /**
     * Version of getFirstQueuedThread called when fastpath fails.
     *
     * @return the thread of the queued node found nearest to head, or
     *         {@code null} if no node with a non-null thread was found
     */
    private Thread fullGetFirstQueuedThread() {
        /*
         * The first node is normally head.next. Try to get its
         * thread field, ensuring consistent reads: If thread
         * field is nulled out or s.prev is no longer head, then
         * some other thread(s) concurrently performed setHead in
         * between some of our reads. We try this twice before
         * resorting to traversal.
         */
        Node h, s;
        Thread st;
        // The two disjuncts are intentionally identical: the second one is
        // the retry described above, evaluated only if the first fails.
        if (((h = head) != null && (s = h.next) != null &&
             s.prev == head && (st = s.thread) != null) ||
            ((h = head) != null && (s = h.next) != null &&
             s.prev == head && (st = s.thread) != null))
            return st;

        /*
         * Head's next field might not have been set yet, or may have
         * been unset after setHead.  So we must check to see if tail
         * is actually first node. If not, we continue on, safely
         * traversing from tail back to head to find first,
         * guaranteeing termination.
         */

        Node t = tail;
        Thread firstThread = null;
        while (t != null && t != head) {
            Thread tt = t.thread;
            // Overwrite on every hit: the last value kept belongs to the
            // node closest to head that still has a thread.
            if (tt != null)
                firstThread = tt;
            t = t.prev;
        }
        return firstThread;
    }

    /**
     * Returns true if the given thread is currently queued.
     *
     * <p>This implementation traverses the queue to determine
     * presence of the given thread.
     *
     * @param thread the thread
     * @return {@code true} if the given thread is on the queue
     * @throws NullPointerException if the thread is null
     */
    public final boolean isQueued(Thread thread) {
        if (thread == null)
            throw new NullPointerException();
        // Walk backwards from tail via prev links, comparing by identity.
        for (Node p = tail; p != null; p = p.prev)
            if (p.thread == thread)
                return true;
        return false;
    }

    /**
     * Returns {@code true} if the apparent first queued thread, if one
     * exists, is waiting in exclusive mode.  If this method returns
     * {@code true}, and the current thread is attempting to acquire in
     * shared mode (that is, this method is invoked from {@link
     * #tryAcquireShared}) then it is guaranteed that the current thread
     * is not the first queued thread.  Used only as a heuristic in
     * ReentrantReadWriteLock.
     *
     * @return {@code true} if head has a successor that is not shared
     *         and whose thread field is non-null
     */
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }

    /**
     * Queries whether any threads have been waiting to acquire longer
     * than the current thread.
*
     * <p>An invocation of this method is equivalent to (but may be
     * more efficient than):
     * <pre> {@code
     * getFirstQueuedThread() != Thread.currentThread() &&
     * hasQueuedThreads()}</pre>
     *
     * <p>Note that because cancellations due to interrupts and
     * timeouts may occur at any time, a {@code true} return does not
     * guarantee that some other thread will acquire before the current
     * thread.  Likewise, it is possible for another thread to win a
     * race to enqueue after this method has returned {@code false},
     * due to the queue being empty.
     *
     * <p>This method is designed to be used by a fair synchronizer to
     * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
     * Such a synchronizer's {@link #tryAcquire} method should return
     * {@code false}, and its {@link #tryAcquireShared} method should
     * return a negative value, if this method returns {@code true}
     * (unless this is a reentrant acquire).  For example, the {@code
     * tryAcquire} method for a fair, reentrant, exclusive mode
     * synchronizer might look like this:
     *
     * <pre> {@code
     * protected boolean tryAcquire(long arg) {
     *   if (isHeldExclusively()) {
     *     // A reentrant acquire; increment hold count
     *     return true;
     *   } else if (hasQueuedPredecessors()) {
     *     return false;
     *   } else {
     *     // try to acquire normally
     *   }
     * }}</pre>
     *
     * @return {@code true} if there is a queued thread preceding the
     *         current thread, and {@code false} if the current thread
     *         is at the head of the queue or the queue is empty
     * @since 1.7
     */
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        // A null successor of head is also reported as "has predecessors".
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }


    // Instrumentation and monitoring methods

    /**
     * Returns an estimate of the number of threads waiting to
     * acquire.  The value is only an estimate because the number of
     * threads may change dynamically while this method traverses
     * internal data structures.  This method is designed for use in
     * monitoring system state, not for synchronization control.
     *
     * @return the estimated number of threads waiting to acquire
     */
    public final int getQueueLength() {
        int n = 0;
        // Count only nodes that still carry a thread, walking tail to head.
        for (Node p = tail; p != null; p = p.prev) {
            if (p.thread != null)
                ++n;
        }
        return n;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire.  Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate.  The elements of the
     * returned collection are in no particular order.  This method is
     * designed to facilitate construction of subclasses that provide
     * more extensive monitoring facilities.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<>();
        for (Node p = tail; p != null; p = p.prev) {
            // Read the field once so the null check and the add see the same value.
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire in exclusive mode. This has the same properties
     * as {@link #getQueuedThreads} except that it only returns
     * those threads waiting due to an exclusive acquire.
1128 * 1129 * @return the collection of threads 1130 */ 1131 public final Collection<Thread> getExclusiveQueuedThreads() { 1132 ArrayList<Thread> list = new ArrayList<>(); 1133 for (Node p = tail; p != null; p = p.prev) { 1134 if (!p.isShared()) { 1135 Thread t = p.thread; 1136 if (t != null) 1137 list.add(t); 1138 } 1139 } 1140 return list; 1141 } 1142 1143 /** 1144 * Returns a collection containing threads that may be waiting to 1145 * acquire in shared mode. This has the same properties 1146 * as {@link #getQueuedThreads} except that it only returns 1147 * those threads waiting due to a shared acquire. 1148 * 1149 * @return the collection of threads 1150 */ 1151 public final Collection<Thread> getSharedQueuedThreads() { 1152 ArrayList<Thread> list = new ArrayList<>(); 1153 for (Node p = tail; p != null; p = p.prev) { 1154 if (p.isShared()) { 1155 Thread t = p.thread; 1156 if (t != null) 1157 list.add(t); 1158 } 1159 } 1160 return list; 1161 } 1162 1163 /** 1164 * Returns a string identifying this synchronizer, as well as its state. 1165 * The state, in brackets, includes the String {@code "State ="} 1166 * followed by the current value of {@link #getState}, and either 1167 * {@code "nonempty"} or {@code "empty"} depending on whether the 1168 * queue is empty. 1169 * 1170 * @return a string identifying this synchronizer, as well as its state 1171 */ 1172 public String toString() { 1173 return super.toString() 1174 + "[State = " + getState() + ", " 1175 + (hasQueuedThreads() ? "non" : "") + "empty queue]"; 1176 } 1177 1178 1179 // Internal support methods for Conditions 1180 1181 /** 1182 * Returns true if a node, always one that was initially placed on 1183 * a condition queue, is now waiting to reacquire on sync queue. 
* @param node the node
     * @return true if is reacquiring
     */
    final boolean isOnSyncQueue(Node node) {
        // Still marked CONDITION, or no predecessor link yet: not on the sync queue.
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

    /**
     * Returns true if node is on sync queue by searching backwards from tail.
     * Called only when needed by isOnSyncQueue.
     * @param node the node to look for (compared by identity)
     * @return true if present
     */
    private boolean findNodeFromTail(Node node) {
        // We check for node first, since it's likely to be at or near tail.
        // tail is known to be non-null, so we could re-order to "save"
        // one null check, but we leave it this way to help the VM.
        for (Node p = tail;;) {
            if (p == node)
                return true;
            if (p == null)
                return false;
            p = p.prev;
        }
    }

    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        // ws > 0 is the "cancelled" case mentioned in the comment above.
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    /**
     * Transfers node, if necessary, to sync queue after a cancelled wait.
     * Returns true if thread was cancelled before being signalled.
     *
     * @param node the node
     * @return true if cancelled before the node was signalled
     */
    final boolean transferAfterCancelledWait(Node node) {
        // Winning this CAS means no signal has claimed the node yet,
        // so this thread performs the enqueue itself.
        if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

    /**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     * @throws IllegalMonitorStateException if {@code release} returns
     *         {@code false}
     */
    final long fullyRelease(Node node) {
        try {
            long savedState = getState();
            if (release(savedState))
                return savedState;
            throw new IllegalMonitorStateException();
        } catch (Throwable t) {
            // Any failure, including the exception thrown just above,
            // marks the node cancelled before propagating.
            node.waitStatus = Node.CANCELLED;
            throw t;
        }
    }

    // Instrumentation methods for conditions

    /**
     * Queries whether the given ConditionObject
     * uses this synchronizer as its lock.
1294 * 1295 * @param condition the condition 1296 * @return {@code true} if owned 1297 * @throws NullPointerException if the condition is null 1298 */ 1299 public final boolean owns(ConditionObject condition) { 1300 return condition.isOwnedBy(this); 1301 } 1302 1303 /** 1304 * Queries whether any threads are waiting on the given condition 1305 * associated with this synchronizer. Note that because timeouts 1306 * and interrupts may occur at any time, a {@code true} return 1307 * does not guarantee that a future {@code signal} will awaken 1308 * any threads. This method is designed primarily for use in 1309 * monitoring of the system state. 1310 * 1311 * @param condition the condition 1312 * @return {@code true} if there are any waiting threads 1313 * @throws IllegalMonitorStateException if exclusive synchronization 1314 * is not held 1315 * @throws IllegalArgumentException if the given condition is 1316 * not associated with this synchronizer 1317 * @throws NullPointerException if the condition is null 1318 */ 1319 public final boolean hasWaiters(ConditionObject condition) { 1320 if (!owns(condition)) 1321 throw new IllegalArgumentException("Not owner"); 1322 return condition.hasWaiters(); 1323 } 1324 1325 /** 1326 * Returns an estimate of the number of threads waiting on the 1327 * given condition associated with this synchronizer. Note that 1328 * because timeouts and interrupts may occur at any time, the 1329 * estimate serves only as an upper bound on the actual number of 1330 * waiters. This method is designed for use in monitoring system 1331 * state, not for synchronization control. 
1332 * 1333 * @param condition the condition 1334 * @return the estimated number of waiting threads 1335 * @throws IllegalMonitorStateException if exclusive synchronization 1336 * is not held 1337 * @throws IllegalArgumentException if the given condition is 1338 * not associated with this synchronizer 1339 * @throws NullPointerException if the condition is null 1340 */ 1341 public final int getWaitQueueLength(ConditionObject condition) { 1342 if (!owns(condition)) 1343 throw new IllegalArgumentException("Not owner"); 1344 return condition.getWaitQueueLength(); 1345 } 1346 1347 /** 1348 * Returns a collection containing those threads that may be 1349 * waiting on the given condition associated with this 1350 * synchronizer. Because the actual set of threads may change 1351 * dynamically while constructing this result, the returned 1352 * collection is only a best-effort estimate. The elements of the 1353 * returned collection are in no particular order. 1354 * 1355 * @param condition the condition 1356 * @return the collection of threads 1357 * @throws IllegalMonitorStateException if exclusive synchronization 1358 * is not held 1359 * @throws IllegalArgumentException if the given condition is 1360 * not associated with this synchronizer 1361 * @throws NullPointerException if the condition is null 1362 */ 1363 public final Collection<Thread> getWaitingThreads(ConditionObject condition) { 1364 if (!owns(condition)) 1365 throw new IllegalArgumentException("Not owner"); 1366 return condition.getWaitingThreads(); 1367 } 1368 1369 /** 1370 * Condition implementation for a {@link 1371 * AbstractQueuedLongSynchronizer} serving as the basis of a {@link 1372 * Lock} implementation. 1373 * 1374 * <p>Method documentation for this class describes mechanics, 1375 * not behavioral specifications from the point of view of Lock 1376 * and Condition users. 
Exported versions of this class will in
     * general need to be accompanied by documentation describing
     * condition semantics that rely on those of the associated
     * {@code AbstractQueuedLongSynchronizer}.
     *
     * <p>This class is Serializable, but all fields are transient,
     * so deserialized conditions have no waiters.
     *
     * @since 1.6
     */
    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        /**
         * Creates a new {@code ConditionObject} instance.
         */
        public ConditionObject() { }

        // Internal methods

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                // Re-read: the cleanup may have replaced lastWaiter.
                t = lastWaiter;
            }

            Node node = new Node(Node.CONDITION);

            // Append to the singly linked list threaded through nextWaiter.
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                // Pop 'first' off the list; clear lastWaiter if the list is now empty.
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

        /**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            // Detach the whole list first, then transfer node by node.
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

        /**
         * Unlinks cancelled waiter nodes from condition queue.
         * Called only while holding lock. This is called when
         * cancellation occurred during condition wait, and upon
         * insertion of a new waiter when lastWaiter is seen to have
         * been cancelled. This method is needed to avoid garbage
         * retention in the absence of signals. So even though it may
         * require a full traversal, it comes into play only when
         * timeouts or cancellations occur in the absence of
         * signals. It traverses all nodes rather than stopping at a
         * particular target to unlink all pointers to garbage nodes
         * without requiring many re-traversals during cancellation
         * storms.
         */
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            // Last node seen that is still in CONDITION state, if any.
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

        // public methods

        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        /**
         * Moves all threads from the wait queue for this condition to
         * the wait queue for the owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

        /**
         * Implements uninterruptible condition wait.
         * <ol>
         * <li>Save lock state returned by {@link #getState}.
         * <li>Invoke {@link #release} with saved state as argument,
         *     throwing IllegalMonitorStateException if it fails.
         * <li>Block until signalled.
         * <li>Reacquire by invoking specialized version of
         *     {@link #acquire} with saved state as argument.
         * </ol>
         */
        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                // Remember the interrupt but keep waiting for a signal.
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

        /*
         * For interruptible waits, we need to track whether to throw
         * InterruptedException, if interrupted while blocked on
         * condition, versus reinterrupt current thread, if
         * interrupted while blocked waiting to re-acquire.
         */

        /** Mode meaning to reinterrupt on exit from wait */
        private static final int REINTERRUPT =  1;
        /** Mode meaning to throw InterruptedException on exit from wait */
        private static final int THROW_IE    = -1;

        /**
         * Checks for interrupt, returning THROW_IE if interrupted
         * before signalled, REINTERRUPT if after signalled, or
         * 0 if not interrupted.
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

        /**
         * Throws InterruptedException, reinterrupts current thread, or
         * does nothing, depending on mode.
         */
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

        /**
         * Implements interruptible condition wait.
         * <ol>
         * <li>If current thread is interrupted, throw InterruptedException.
         * <li>Save lock state returned by {@link #getState}.
         * <li>Invoke {@link #release} with saved state as argument,
         *     throwing IllegalMonitorStateException if it fails.
         * <li>Block until signalled or interrupted.
         * <li>Reacquire by invoking specialized version of
         *     {@link #acquire} with saved state as argument.
         * <li>If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         *
         * @throws InterruptedException if the current thread is interrupted
         *         on entry, or interrupted while waiting before being
         *         signalled
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // An interrupt seen while re-acquiring never downgrades THROW_IE.
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

        /**
         * Implements timed condition wait.
         * <ol>
         * <li>If current thread is interrupted, throw InterruptedException.
         * <li>Save lock state returned by {@link #getState}.
         * <li>Invoke {@link #release} with saved state as argument,
         *     throwing IllegalMonitorStateException if it fails.
         * <li>Block until signalled, interrupted, or timed out.
         * <li>Reacquire by invoking specialized version of
         *     {@link #acquire} with saved state as argument.
         * <li>If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         *
         * @param nanosTimeout the maximum time to wait, in nanoseconds
         * @return the computed deadline minus the current
         *         {@code System.nanoTime()} at return (may be zero or
         *         negative), or {@code Long.MIN_VALUE} if that difference
         *         is greater than the supplied timeout
         * @throws InterruptedException if the current thread is interrupted
         *         on entry, or interrupted while waiting before being
         *         signalled
         */
        public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // We don't check for nanosTimeout <= 0L here, to allow
            // awaitNanos(0) as a way to "yield the lock".
            final long deadline = System.nanoTime() + nanosTimeout;
            long initialNanos = nanosTimeout;
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                // Below the threshold the thread does not park; it just
                // re-checks and loops until the remaining time runs out.
                if (nanosTimeout >= SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            long remaining = deadline - System.nanoTime(); // avoid overflow
            return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
        }

        /**
         * Implements absolute timed condition wait.
         * <ol>
         * <li>If current thread is interrupted, throw InterruptedException.
         * <li>Save lock state returned by {@link #getState}.
         * <li>Invoke {@link #release} with saved state as argument,
         *     throwing IllegalMonitorStateException if it fails.
         * <li>Block until signalled, interrupted, or timed out.
         * <li>Reacquire by invoking specialized version of
         *     {@link #acquire} with saved state as argument.
         * <li>If interrupted while blocked in step 4, throw InterruptedException.
         * <li>If timed out while blocked in step 4, return false, else true.
         * </ol>
         *
         * @param deadline the absolute time to wait until; its
         *        {@code getTime()} value is compared against
         *        {@code System.currentTimeMillis()}
         * @return {@code false} if the wait timed out before a signal,
         *         else {@code true}
         * @throws InterruptedException if the current thread is interrupted
         *         on entry, or interrupted while waiting before being
         *         signalled
         * @throws NullPointerException if {@code deadline} is null
         */
        public final boolean awaitUntil(Date deadline)
                throws InterruptedException {
            long abstime = deadline.getTime();
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (System.currentTimeMillis() >= abstime) {
                    // Counts as a timeout only if this thread, not a
                    // signaller, moved the node off the condition queue.
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkUntil(this, abstime);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        /**
         * Implements timed condition wait.
         * <ol>
         * <li>If current thread is interrupted, throw InterruptedException.
         * <li>Save lock state returned by {@link #getState}.
         * <li>Invoke {@link #release} with saved state as argument,
         *     throwing IllegalMonitorStateException if it fails.
         * <li>Block until signalled, interrupted, or timed out.
         * <li>Reacquire by invoking specialized version of
         *     {@link #acquire} with saved state as argument.
         * <li>If interrupted while blocked in step 4, throw InterruptedException.
         * <li>If timed out while blocked in step 4, return false, else true.
         * </ol>
         *
         * @param time the maximum time to wait
         * @param unit the time unit of the {@code time} argument
         * @return {@code false} if the wait timed out before a signal,
         *         else {@code true}
         * @throws InterruptedException if the current thread is interrupted
         *         on entry, or interrupted while waiting before being
         *         signalled
         */
        public final boolean await(long time, TimeUnit unit)
                throws InterruptedException {
            long nanosTimeout = unit.toNanos(time);
            if (Thread.interrupted())
                throw new InterruptedException();
            // We don't check for nanosTimeout <= 0L here, to allow
            // await(0, unit) as a way to "yield the lock".
            final long deadline = System.nanoTime() + nanosTimeout;
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    // Counts as a timeout only if this thread, not a
                    // signaller, moved the node off the condition queue.
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                // Below the threshold the thread does not park; it just
                // re-checks and loops until the remaining time runs out.
                if (nanosTimeout >= SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        //  support for instrumentation

        /**
         * Returns true if this condition was created by the given
         * synchronization object.
         *
         * @param sync the synchronizer to compare, by identity, with the
         *        enclosing instance
         * @return {@code true} if owned
         */
        final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) {
            return sync == AbstractQueuedLongSynchronizer.this;
        }

        /**
         * Queries whether any threads are waiting on this condition.
         * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters(ConditionObject)}.
         *
         * @return {@code true} if there are any waiting threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final boolean hasWaiters() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // Only nodes still in CONDITION state count as waiters.
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    return true;
            }
            return false;
        }

        /**
         * Returns an estimate of the number of threads waiting on
         * this condition.
         * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength(ConditionObject)}.
         *
         * @return the estimated number of waiting threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final int getWaitQueueLength() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int n = 0;
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    ++n;
            }
            return n;
        }

        /**
         * Returns a collection containing those threads that may be
         * waiting on this Condition.
         * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads(ConditionObject)}.
         *
         * @return the collection of threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final Collection<Thread> getWaitingThreads() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            ArrayList<Thread> list = new ArrayList<>();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION) {
                    Thread t = w.thread;
                    if (t != null)
                        list.add(t);
                }
            }
            return list;
        }
    }

    /**
     * Setup to support compareAndSet. We need to natively implement
     * this here: For the sake of permitting future enhancements, we
     * cannot explicitly subclass AtomicLong, which would be
     * efficient and useful otherwise. So, as the lesser of evils, we
     * natively implement using hotspot intrinsics API. And while we
     * are at it, we do the same for other CASable fields (which could
     * otherwise be done with atomic field updaters).
1823 */ 1824 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); 1825 private static final long STATE; 1826 private static final long HEAD; 1827 private static final long TAIL; 1828 1829 static { 1830 try { 1831 STATE = U.objectFieldOffset 1832 (AbstractQueuedLongSynchronizer.class.getDeclaredField("state")); 1833 HEAD = U.objectFieldOffset 1834 (AbstractQueuedLongSynchronizer.class.getDeclaredField("head")); 1835 TAIL = U.objectFieldOffset 1836 (AbstractQueuedLongSynchronizer.class.getDeclaredField("tail")); 1837 } catch (ReflectiveOperationException e) { 1838 throw new Error(e); 1839 } 1840 1841 // Reduce the risk of rare disastrous classloading in first call to 1842 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 1843 Class<?> ensureLoaded = LockSupport.class; 1844 } 1845 1846 /** 1847 * Initializes head and tail fields on first contention. 1848 */ 1849 private final void initializeSyncQueue() { 1850 if (U.compareAndSwapObject(this, HEAD, null, new Node())) 1851 tail = head; 1852 } 1853 1854 /** 1855 * CASes tail field. 1856 */ 1857 private final boolean compareAndSetTail(Node expect, Node update) { 1858 return U.compareAndSwapObject(this, TAIL, expect, update); 1859 } 1860 }