18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent.locks;
37
38 import java.lang.invoke.MethodHandles;
39 import java.lang.invoke.VarHandle;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.Date;
43 import java.util.concurrent.TimeUnit;
44
45 /**
46 * Provides a framework for implementing blocking locks and related
47 * synchronizers (semaphores, events, etc) that rely on
48 * first-in-first-out (FIFO) wait queues. This class is designed to
49 * be a useful basis for most kinds of synchronizers that rely on a
50 * single atomic {@code int} value to represent state. Subclasses
51 * must define the protected methods that change this state, and which
52 * define what that state means in terms of this object being acquired
53 * or released. Given these, the other methods in this class carry
54 * out all queuing and blocking mechanics. Subclasses can maintain
55 * other state fields, but only the atomically updated {@code int}
56 * value manipulated using methods {@link #getState}, {@link
57 * #setState} and {@link #compareAndSetState} is tracked with respect
58 * to synchronization.
59 *
60 * <p>Subclasses should be defined as non-public internal helper
61 * classes that are used to implement the synchronization properties
62 * of their enclosing class. Class
63 * {@code AbstractQueuedSynchronizer} does not implement any
295 * public void await() throws InterruptedException {
296 * sync.acquireSharedInterruptibly(1);
297 * }
298 * }}</pre>
299 *
300 * @since 1.5
301 * @author Doug Lea
302 */
303 public abstract class AbstractQueuedSynchronizer
304 extends AbstractOwnableSynchronizer
305 implements java.io.Serializable {
306
/** Version identifier used by Java serialization for this class. */
private static final long serialVersionUID = 7373984972572414691L;

/**
 * Constructs a new {@code AbstractQueuedSynchronizer}. The
 * synchronization state is not assigned here, so it starts at zero.
 */
protected AbstractQueuedSynchronizer() {
    super();
}
314
315 /**
316 * Wait queue node class.
317 *
318 * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
319 * Hagersten) lock queue. CLH locks are normally used for
320 * spinlocks. We instead use them for blocking synchronizers, but
321 * use the same basic tactic of holding some of the control
322 * information about a thread in the predecessor of its node. A
323 * "status" field in each node keeps track of whether a thread
324 * should block. A node is signalled when its predecessor
325 * releases. Each node of the queue otherwise serves as a
326 * specific-notification-style monitor holding a single waiting
327 * thread. The status field does NOT control whether threads are
328 * granted locks etc though. A thread may try to acquire if it is
329 * first in the queue. But being first does not guarantee success;
330 * it only gives the right to contend. So the currently released
331 * contender thread may need to rewait.
332 *
333 * <p>To enqueue into a CLH lock, you atomically splice it in as new
334 * tail. To dequeue, you just set the head field.
335 * <pre>
336 * +------+ prev +-----+ +-----+
337 * head | | <---- | | <---- | | tail
338 * +------+ +-----+ +-----+
339 * </pre>
340 *
341 * <p>Insertion into a CLH queue requires only a single atomic
342 * operation on "tail", so there is a simple atomic point of
343 * demarcation from unqueued to queued. Similarly, dequeuing
344 * involves only updating the "head". However, it takes a bit
345 * more work for nodes to determine who their successors are,
346 * in part to deal with possible cancellation due to timeouts
347 * and interrupts.
348 *
349 * <p>The "prev" links (not used in original CLH locks), are mainly
350 * needed to handle cancellation. If a node is cancelled, its
351 * successor is (normally) relinked to a non-cancelled
352 * predecessor. For explanation of similar mechanics in the case
353 * of spin locks, see the papers by Scott and Scherer at
354 * http://www.cs.rochester.edu/u/scott/synchronization/
355 *
356 * <p>We also use "next" links to implement blocking mechanics.
357 * The thread id for each node is kept in its own node, so a
358 * predecessor signals the next node to wake up by traversing
359 * next link to determine which thread it is. Determination of
360 * successor must avoid races with newly queued nodes to set
361 * the "next" fields of their predecessors. This is solved
362 * when necessary by checking backwards from the atomically
363 * updated "tail" when a node's successor appears to be null.
364 * (Or, said differently, the next-links are an optimization
365 * so that we don't usually need a backward scan.)
366 *
367 * <p>Cancellation introduces some conservatism to the basic
368 * algorithms. Since we must poll for cancellation of other
369 * nodes, we can miss noticing whether a cancelled node is
370 * ahead or behind us. This is dealt with by always unparking
371 * successors upon cancellation, allowing them to stabilize on
372 * a new predecessor, unless we can identify an uncancelled
373 * predecessor who will carry this responsibility.
374 *
375 * <p>CLH queues need a dummy header node to get started. But
376 * we don't create them on construction, because it would be wasted
377 * effort if there is never contention. Instead, the node
378 * is constructed and head and tail pointers are set upon first
379 * contention.
380 *
381 * <p>Threads waiting on Conditions use the same nodes, but
382 * use an additional link. Conditions only need to link nodes
383 * in simple (non-concurrent) linked queues because they are
384 * only accessed when exclusively held. Upon await, a node is
385 * inserted into a condition queue. Upon signal, the node is
386 * transferred to the main queue. A special value of status
387 * field is used to mark which queue a node is on.
388 *
389 * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
390 * Scherer and Michael Scott, along with members of JSR-166
391 * expert group, for helpful ideas, discussions, and critiques
392 * on the design of this class.
393 */
394 static final class Node {
395 /** Marker to indicate a node is waiting in shared mode */
396 static final Node SHARED = new Node();
397 /** Marker to indicate a node is waiting in exclusive mode */
398 static final Node EXCLUSIVE = null;
399
400 /** waitStatus value to indicate thread has cancelled. */
401 static final int CANCELLED = 1;
402 /** waitStatus value to indicate successor's thread needs unparking. */
403 static final int SIGNAL = -1;
404 /** waitStatus value to indicate thread is waiting on condition. */
405 static final int CONDITION = -2;
406 /**
407 * waitStatus value to indicate the next acquireShared should
408 * unconditionally propagate.
409 */
410 static final int PROPAGATE = -3;
411
412 /**
413 * Status field, taking on only the values:
414 * SIGNAL: The successor of this node is (or will soon be)
415 * blocked (via park), so the current node must
416 * unpark its successor when it releases or
417 * cancels. To avoid races, acquire methods must
418 * first indicate they need a signal,
419 * then retry the atomic acquire, and then,
420 * on failure, block.
421 * CANCELLED: This node is cancelled due to timeout or interrupt.
422 * Nodes never leave this state. In particular,
423 * a thread with cancelled node never again blocks.
424 * CONDITION: This node is currently on a condition queue.
425 * It will not be used as a sync queue node
426 * until transferred, at which time the status
427 * will be set to 0. (Use of this value here has
428 * nothing to do with the other uses of the
429 * field, but simplifies mechanics.)
430 * PROPAGATE: A releaseShared should be propagated to other
431 * nodes. This is set (for head node only) in
432 * doReleaseShared to ensure propagation
433 * continues, even if other operations have
434 * since intervened.
435 * 0: None of the above
436 *
437 * The values are arranged numerically to simplify use.
438 * Non-negative values mean that a node doesn't need to
439 * signal. So, most code doesn't need to check for particular
440 * values, just for sign.
441 *
442 * The field is initialized to 0 for normal sync nodes, and
443 * CONDITION for condition nodes. It is modified using CAS
444 * (or when possible, unconditional volatile writes).
445 */
446 volatile int waitStatus;
447
448 /**
449 * Link to predecessor node that current node/thread relies on
450 * for checking waitStatus. Assigned during enqueuing, and nulled
451 * out (for sake of GC) only upon dequeuing. Also, upon
452 * cancellation of a predecessor, we short-circuit while
453 * finding a non-cancelled one, which will always exist
454 * because the head node is never cancelled: A node becomes
455 * head only as a result of successful acquire. A
456 * cancelled thread never succeeds in acquiring, and a thread only
457 * cancels itself, not any other node.
458 */
459 volatile Node prev;
460
461 /**
462 * Link to the successor node that the current node/thread
463 * unparks upon release. Assigned during enqueuing, adjusted
464 * when bypassing cancelled predecessors, and nulled out (for
465 * sake of GC) when dequeued. The enq operation does not
466 * assign next field of a predecessor until after attachment,
467 * so seeing a null next field does not necessarily mean that
468 * node is at end of queue. However, if a next field appears
469 * to be null, we can scan prev's from the tail to
470 * double-check. The next field of cancelled nodes is set to
471 * point to the node itself instead of null, to make life
472 * easier for isOnSyncQueue.
473 */
474 volatile Node next;
475
476 /**
477 * The thread that enqueued this node. Initialized on
478 * construction and nulled out after use.
479 */
480 volatile Thread thread;
481
482 /**
483 * Link to next node waiting on condition, or the special
484 * value SHARED. Because condition queues are accessed only
485 * when holding in exclusive mode, we just need a simple
486 * linked queue to hold nodes while they are waiting on
487 * conditions. They are then transferred to the queue to
488 * re-acquire. And because conditions can only be exclusive,
489 * we save a field by using special value to indicate shared
490 * mode.
491 */
492 Node nextWaiter;
493
494 /**
495 * Returns true if node is waiting in shared mode.
496 */
497 final boolean isShared() {
498 return nextWaiter == SHARED;
499 }
500
501 /**
502 * Returns previous node, or throws NullPointerException if null.
503 * Use when predecessor cannot be null. The null check could
504 * be elided, but is present to help the VM.
505 *
506 * @return the predecessor of this node
507 */
508 final Node predecessor() {
509 Node p = prev;
510 if (p == null)
511 throw new NullPointerException();
512 else
513 return p;
514 }
515
516 /** Establishes initial head or SHARED marker. */
517 Node() {}
518
519 /** Constructor used by addWaiter. */
520 Node(Node nextWaiter) {
521 this.nextWaiter = nextWaiter;
522 THREAD.set(this, Thread.currentThread());
523 }
524
525 /** Constructor used by addConditionWaiter. */
526 Node(int waitStatus) {
527 WAITSTATUS.set(this, waitStatus);
528 THREAD.set(this, Thread.currentThread());
529 }
530
531 /** CASes waitStatus field. */
532 final boolean compareAndSetWaitStatus(int expect, int update) {
533 return WAITSTATUS.compareAndSet(this, expect, update);
534 }
535
536 /** CASes next field. */
537 final boolean compareAndSetNext(Node expect, Node update) {
538 return NEXT.compareAndSet(this, expect, update);
539 }
540
541 final void setPrevRelaxed(Node p) {
542 PREV.set(this, p);
543 }
544
545 // VarHandle mechanics
546 private static final VarHandle NEXT;
547 private static final VarHandle PREV;
548 private static final VarHandle THREAD;
549 private static final VarHandle WAITSTATUS;
550 static {
551 try {
552 MethodHandles.Lookup l = MethodHandles.lookup();
553 NEXT = l.findVarHandle(Node.class, "next", Node.class);
554 PREV = l.findVarHandle(Node.class, "prev", Node.class);
555 THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
556 WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
557 } catch (ReflectiveOperationException e) {
558 throw new ExceptionInInitializerError(e);
559 }
560 }
561 }
562
563 /**
564 * Head of the wait queue, lazily initialized. Except for
565 * initialization, it is modified only via method setHead. Note:
566 * If head exists, its waitStatus is guaranteed not to be
567 * CANCELLED.
568 */
569 private transient volatile Node head;
570
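/**
 * Tail of the wait queue, lazily initialized. Apart from
 * initialization, it is updated only by CAS: when a new wait node
 * is appended (methods enq and addWaiter) or when a cancelled tail
 * node is removed (method cancelAcquire).
 */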
575 private transient volatile Node tail;
576
577 /**
578 * The synchronization state.
579 */
580 private volatile int state;
581
582 /**
583 * Returns the current value of synchronization state.
584 * This operation has memory semantics of a {@code volatile} read.
585 * @return current state value
586 */
587 protected final int getState() {
588 return state;
589 }
590
591 /**
592 * Sets the value of synchronization state.
593 * This operation has memory semantics of a {@code volatile} write.
594 * @param newState the new state value
595 */
596 protected final void setState(int newState) {
597 state = newState;
598 }
599
600 /**
601 * Atomically sets synchronization state to the given updated
602 * value if the current state value equals the expected value.
603 * This operation has memory semantics of a {@code volatile} read
604 * and write.
605 *
606 * @param expect the expected value
607 * @param update the new value
608 * @return {@code true} if successful. False return indicates that the actual
609 * value was not equal to the expected value.
610 */
611 protected final boolean compareAndSetState(int expect, int update) {
612 return STATE.compareAndSet(this, expect, update);
613 }
614
615 // Queuing utilities
616
617 /**
618 * The number of nanoseconds for which it is faster to spin
619 * rather than to use timed park. A rough estimate suffices
620 * to improve responsiveness with very short timeouts.
621 */
622 static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
623
624 /**
625 * Inserts node into queue, initializing if necessary. See picture above.
626 * @param node the node to insert
627 * @return node's predecessor
628 */
629 private Node enq(Node node) {
630 for (;;) {
631 Node oldTail = tail;
632 if (oldTail != null) {
633 node.setPrevRelaxed(oldTail);
634 if (compareAndSetTail(oldTail, node)) {
635 oldTail.next = node;
636 return oldTail;
637 }
638 } else {
639 initializeSyncQueue();
640 }
641 }
642 }
643
644 /**
645 * Creates and enqueues node for current thread and given mode.
646 *
647 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
648 * @return the new node
649 */
650 private Node addWaiter(Node mode) {
651 Node node = new Node(mode);
652
653 for (;;) {
654 Node oldTail = tail;
655 if (oldTail != null) {
656 node.setPrevRelaxed(oldTail);
657 if (compareAndSetTail(oldTail, node)) {
658 oldTail.next = node;
659 return node;
660 }
661 } else {
662 initializeSyncQueue();
663 }
664 }
665 }
666
667 /**
668 * Sets head of queue to be node, thus dequeuing. Called only by
669 * acquire methods. Also nulls out unused fields for sake of GC
670 * and to suppress unnecessary signals and traversals.
671 *
672 * @param node the node
673 */
674 private void setHead(Node node) {
675 head = node;
676 node.thread = null;
677 node.prev = null;
678 }
679
680 /**
681 * Wakes up node's successor, if one exists.
682 *
683 * @param node the node
684 */
685 private void unparkSuccessor(Node node) {
686 /*
687 * If status is negative (i.e., possibly needing signal) try
688 * to clear in anticipation of signalling. It is OK if this
689 * fails or if status is changed by waiting thread.
690 */
691 int ws = node.waitStatus;
692 if (ws < 0)
693 node.compareAndSetWaitStatus(ws, 0);
694
695 /*
696 * Thread to unpark is held in successor, which is normally
697 * just the next node. But if cancelled or apparently null,
698 * traverse backwards from tail to find the actual
699 * non-cancelled successor.
700 */
701 Node s = node.next;
702 if (s == null || s.waitStatus > 0) {
703 s = null;
704 for (Node p = tail; p != node && p != null; p = p.prev)
705 if (p.waitStatus <= 0)
706 s = p;
707 }
708 if (s != null)
709 LockSupport.unpark(s.thread);
710 }
711
712 /**
713 * Release action for shared mode -- signals successor and ensures
714 * propagation. (Note: For exclusive mode, release just amounts
715 * to calling unparkSuccessor of head if it needs signal.)
716 */
717 private void doReleaseShared() {
718 /*
719 * Ensure that a release propagates, even if there are other
720 * in-progress acquires/releases. This proceeds in the usual
721 * way of trying to unparkSuccessor of head if it needs
722 * signal. But if it does not, status is set to PROPAGATE to
723 * ensure that upon release, propagation continues.
724 * Additionally, we must loop in case a new node is added
725 * while we are doing this. Also, unlike other uses of
726 * unparkSuccessor, we need to know if CAS to reset status
727 * fails, if so rechecking.
728 */
729 for (;;) {
730 Node h = head;
731 if (h != null && h != tail) {
732 int ws = h.waitStatus;
733 if (ws == Node.SIGNAL) {
734 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
735 continue; // loop to recheck cases
736 unparkSuccessor(h);
737 }
738 else if (ws == 0 &&
739 !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
740 continue; // loop on failed CAS
741 }
742 if (h == head) // loop if head changed
743 break;
744 }
745 }
746
747 /**
748 * Sets head of queue, and checks if successor may be waiting
749 * in shared mode, if so propagating if either propagate > 0 or
750 * PROPAGATE status was set.
751 *
752 * @param node the node
753 * @param propagate the return value from a tryAcquireShared
754 */
755 private void setHeadAndPropagate(Node node, int propagate) {
756 Node h = head; // Record old head for check below
757 setHead(node);
758 /*
759 * Try to signal next queued node if:
760 * Propagation was indicated by caller,
761 * or was recorded (as h.waitStatus either before
762 * or after setHead) by a previous operation
763 * (note: this uses sign-check of waitStatus because
764 * PROPAGATE status may transition to SIGNAL.)
765 * and
766 * The next node is waiting in shared mode,
767 * or we don't know, because it appears null
768 *
769 * The conservatism in both of these checks may cause
770 * unnecessary wake-ups, but only when there are multiple
771 * racing acquires/releases, so most need signals now or soon
772 * anyway.
773 */
774 if (propagate > 0 || h == null || h.waitStatus < 0 ||
775 (h = head) == null || h.waitStatus < 0) {
776 Node s = node.next;
777 if (s == null || s.isShared())
778 doReleaseShared();
779 }
780 }
781
782 // Utilities for various versions of acquire
783
784 /**
785 * Cancels an ongoing attempt to acquire.
786 *
787 * @param node the node
788 */
789 private void cancelAcquire(Node node) {
790 // Ignore if node doesn't exist
791 if (node == null)
792 return;
793
794 node.thread = null;
795
796 // Skip cancelled predecessors
797 Node pred = node.prev;
798 while (pred.waitStatus > 0)
799 node.prev = pred = pred.prev;
800
801 // predNext is the apparent node to unsplice. CASes below will
802 // fail if not, in which case, we lost race vs another cancel
803 // or signal, so no further action is necessary, although with
804 // a possibility that a cancelled node may transiently remain
805 // reachable.
806 Node predNext = pred.next;
807
808 // Can use unconditional write instead of CAS here.
809 // After this atomic step, other Nodes can skip past us.
810 // Before, we are free of interference from other threads.
811 node.waitStatus = Node.CANCELLED;
812
813 // If we are the tail, remove ourselves.
814 if (node == tail && compareAndSetTail(node, pred)) {
815 pred.compareAndSetNext(predNext, null);
816 } else {
817 // If successor needs signal, try to set pred's next-link
818 // so it will get one. Otherwise wake it up to propagate.
819 int ws;
820 if (pred != head &&
821 ((ws = pred.waitStatus) == Node.SIGNAL ||
822 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
823 pred.thread != null) {
824 Node next = node.next;
825 if (next != null && next.waitStatus <= 0)
826 pred.compareAndSetNext(predNext, next);
827 } else {
828 unparkSuccessor(node);
829 }
830
831 node.next = node; // help GC
832 }
833 }
834
835 /**
836 * Checks and updates status for a node that failed to acquire.
837 * Returns true if thread should block. This is the main signal
838 * control in all acquire loops. Requires that pred == node.prev.
839 *
840 * @param pred node's predecessor holding status
841 * @param node the node
842 * @return {@code true} if thread should block
843 */
844 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
845 int ws = pred.waitStatus;
846 if (ws == Node.SIGNAL)
847 /*
848 * This node has already set status asking a release
849 * to signal it, so it can safely park.
850 */
851 return true;
852 if (ws > 0) {
853 /*
854 * Predecessor was cancelled. Skip over predecessors and
855 * indicate retry.
856 */
857 do {
858 node.prev = pred = pred.prev;
859 } while (pred.waitStatus > 0);
860 pred.next = node;
861 } else {
862 /*
863 * waitStatus must be 0 or PROPAGATE. Indicate that we
864 * need a signal, but don't park yet. Caller will need to
865 * retry to make sure it cannot acquire before parking.
866 */
867 pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
868 }
869 return false;
870 }
871
872 /**
873 * Convenience method to interrupt current thread.
874 */
875 static void selfInterrupt() {
876 Thread.currentThread().interrupt();
877 }
878
879 /**
880 * Convenience method to park and then check if interrupted.
881 *
882 * @return {@code true} if interrupted
883 */
884 private final boolean parkAndCheckInterrupt() {
885 LockSupport.park(this);
886 return Thread.interrupted();
887 }
888
889 /*
890 * Various flavors of acquire, varying in exclusive/shared and
891 * control modes. Each is mostly the same, but annoyingly
892 * different. Only a little bit of factoring is possible due to
893 * interactions of exception mechanics (including ensuring that we
894 * cancel if tryAcquire throws exception) and other control, at
895 * least not without hurting performance too much.
896 */
897
898 /**
899 * Acquires in exclusive uninterruptible mode for thread already in
900 * queue. Used by condition wait methods as well as acquire.
901 *
902 * @param node the node
903 * @param arg the acquire argument
904 * @return {@code true} if interrupted while waiting
905 */
906 final boolean acquireQueued(final Node node, int arg) {
907 boolean interrupted = false;
908 try {
909 for (;;) {
910 final Node p = node.predecessor();
911 if (p == head && tryAcquire(arg)) {
912 setHead(node);
913 p.next = null; // help GC
914 return interrupted;
915 }
916 if (shouldParkAfterFailedAcquire(p, node))
917 interrupted |= parkAndCheckInterrupt();
918 }
919 } catch (Throwable t) {
920 cancelAcquire(node);
921 if (interrupted)
922 selfInterrupt();
923 throw t;
924 }
925 }
926
927 /**
928 * Acquires in exclusive interruptible mode.
929 * @param arg the acquire argument
930 */
931 private void doAcquireInterruptibly(int arg)
932 throws InterruptedException {
933 final Node node = addWaiter(Node.EXCLUSIVE);
934 try {
935 for (;;) {
936 final Node p = node.predecessor();
937 if (p == head && tryAcquire(arg)) {
938 setHead(node);
939 p.next = null; // help GC
940 return;
941 }
942 if (shouldParkAfterFailedAcquire(p, node) &&
943 parkAndCheckInterrupt())
944 throw new InterruptedException();
945 }
946 } catch (Throwable t) {
947 cancelAcquire(node);
948 throw t;
949 }
950 }
951
952 /**
953 * Acquires in exclusive timed mode.
954 *
955 * @param arg the acquire argument
956 * @param nanosTimeout max wait time
957 * @return {@code true} if acquired
958 */
959 private boolean doAcquireNanos(int arg, long nanosTimeout)
960 throws InterruptedException {
961 if (nanosTimeout <= 0L)
962 return false;
963 final long deadline = System.nanoTime() + nanosTimeout;
964 final Node node = addWaiter(Node.EXCLUSIVE);
965 try {
966 for (;;) {
967 final Node p = node.predecessor();
968 if (p == head && tryAcquire(arg)) {
969 setHead(node);
970 p.next = null; // help GC
971 return true;
972 }
973 nanosTimeout = deadline - System.nanoTime();
974 if (nanosTimeout <= 0L) {
975 cancelAcquire(node);
976 return false;
977 }
978 if (shouldParkAfterFailedAcquire(p, node) &&
979 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
980 LockSupport.parkNanos(this, nanosTimeout);
981 if (Thread.interrupted())
982 throw new InterruptedException();
983 }
984 } catch (Throwable t) {
985 cancelAcquire(node);
986 throw t;
987 }
988 }
989
990 /**
991 * Acquires in shared uninterruptible mode.
992 * @param arg the acquire argument
993 */
994 private void doAcquireShared(int arg) {
995 final Node node = addWaiter(Node.SHARED);
996 boolean interrupted = false;
997 try {
998 for (;;) {
999 final Node p = node.predecessor();
1000 if (p == head) {
1001 int r = tryAcquireShared(arg);
1002 if (r >= 0) {
1003 setHeadAndPropagate(node, r);
1004 p.next = null; // help GC
1005 return;
1006 }
1007 }
1008 if (shouldParkAfterFailedAcquire(p, node))
1009 interrupted |= parkAndCheckInterrupt();
1010 }
1011 } catch (Throwable t) {
1012 cancelAcquire(node);
1013 throw t;
1014 } finally {
1015 if (interrupted)
1016 selfInterrupt();
1017 }
1018 }
1019
1020 /**
1021 * Acquires in shared interruptible mode.
1022 * @param arg the acquire argument
1023 */
1024 private void doAcquireSharedInterruptibly(int arg)
1025 throws InterruptedException {
1026 final Node node = addWaiter(Node.SHARED);
1027 try {
1028 for (;;) {
1029 final Node p = node.predecessor();
1030 if (p == head) {
1031 int r = tryAcquireShared(arg);
1032 if (r >= 0) {
1033 setHeadAndPropagate(node, r);
1034 p.next = null; // help GC
1035 return;
1036 }
1037 }
1038 if (shouldParkAfterFailedAcquire(p, node) &&
1039 parkAndCheckInterrupt())
1040 throw new InterruptedException();
1041 }
1042 } catch (Throwable t) {
1043 cancelAcquire(node);
1044 throw t;
1045 }
1046 }
1047
1048 /**
1049 * Acquires in shared timed mode.
1050 *
1051 * @param arg the acquire argument
1052 * @param nanosTimeout max wait time
1053 * @return {@code true} if acquired
1054 */
1055 private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
1056 throws InterruptedException {
1057 if (nanosTimeout <= 0L)
1058 return false;
1059 final long deadline = System.nanoTime() + nanosTimeout;
1060 final Node node = addWaiter(Node.SHARED);
1061 try {
1062 for (;;) {
1063 final Node p = node.predecessor();
1064 if (p == head) {
1065 int r = tryAcquireShared(arg);
1066 if (r >= 0) {
1067 setHeadAndPropagate(node, r);
1068 p.next = null; // help GC
1069 return true;
1070 }
1071 }
1072 nanosTimeout = deadline - System.nanoTime();
1073 if (nanosTimeout <= 0L) {
1074 cancelAcquire(node);
1075 return false;
1076 }
1077 if (shouldParkAfterFailedAcquire(p, node) &&
1078 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
1079 LockSupport.parkNanos(this, nanosTimeout);
1080 if (Thread.interrupted())
1081 throw new InterruptedException();
1082 }
1083 } catch (Throwable t) {
1084 cancelAcquire(node);
1085 throw t;
1086 }
1087 }
1088
1089 // Main exported methods
1090
1091 /**
1092 * Attempts to acquire in exclusive mode. This method should query
1093 * if the state of the object permits it to be acquired in the
1094 * exclusive mode, and if so to acquire it.
1095 *
1096 * <p>This method is always invoked by the thread performing
1097 * acquire. If this method reports failure, the acquire method
1098 * may queue the thread, if it is not already queued, until it is
1099 * signalled by a release from some other thread. This can be used
1100 * to implement method {@link Lock#tryLock()}.
1101 *
1102 * <p>The default
1103 * implementation throws {@link UnsupportedOperationException}.
1104 *
1105 * @param arg the acquire argument. This value is always the one
1106 * passed to an acquire method, or is the value saved on entry
1219 * {@code false} otherwise
1220 * @throws UnsupportedOperationException if conditions are not supported
1221 */
1222 protected boolean isHeldExclusively() {
1223 throw new UnsupportedOperationException();
1224 }
1225
1226 /**
1227 * Acquires in exclusive mode, ignoring interrupts. Implemented
1228 * by invoking at least once {@link #tryAcquire},
1229 * returning on success. Otherwise the thread is queued, possibly
1230 * repeatedly blocking and unblocking, invoking {@link
1231 * #tryAcquire} until success. This method can be used
1232 * to implement method {@link Lock#lock}.
1233 *
1234 * @param arg the acquire argument. This value is conveyed to
1235 * {@link #tryAcquire} but is otherwise uninterpreted and
1236 * can represent anything you like.
1237 */
1238 public final void acquire(int arg) {
1239 if (!tryAcquire(arg) &&
1240 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
1241 selfInterrupt();
1242 }
1243
1244 /**
1245 * Acquires in exclusive mode, aborting if interrupted.
1246 * Implemented by first checking interrupt status, then invoking
1247 * at least once {@link #tryAcquire}, returning on
1248 * success. Otherwise the thread is queued, possibly repeatedly
1249 * blocking and unblocking, invoking {@link #tryAcquire}
1250 * until success or the thread is interrupted. This method can be
1251 * used to implement method {@link Lock#lockInterruptibly}.
1252 *
1253 * @param arg the acquire argument. This value is conveyed to
1254 * {@link #tryAcquire} but is otherwise uninterpreted and
1255 * can represent anything you like.
1256 * @throws InterruptedException if the current thread is interrupted
1257 */
1258 public final void acquireInterruptibly(int arg)
1259 throws InterruptedException {
1260 if (Thread.interrupted())
1261 throw new InterruptedException();
1262 if (!tryAcquire(arg))
1263 doAcquireInterruptibly(arg);
1264 }
1265
1266 /**
1267 * Attempts to acquire in exclusive mode, aborting if interrupted,
1268 * and failing if the given timeout elapses. Implemented by first
1269 * checking interrupt status, then invoking at least once {@link
1270 * #tryAcquire}, returning on success. Otherwise, the thread is
1271 * queued, possibly repeatedly blocking and unblocking, invoking
1272 * {@link #tryAcquire} until success or the thread is interrupted
1273 * or the timeout elapses. This method can be used to implement
1274 * method {@link Lock#tryLock(long, TimeUnit)}.
1275 *
1276 * @param arg the acquire argument. This value is conveyed to
1277 * {@link #tryAcquire} but is otherwise uninterpreted and
1278 * can represent anything you like.
1279 * @param nanosTimeout the maximum number of nanoseconds to wait
1280 * @return {@code true} if acquired; {@code false} if timed out
1281 * @throws InterruptedException if the current thread is interrupted
1282 */
1283 public final boolean tryAcquireNanos(int arg, long nanosTimeout)
1284 throws InterruptedException {
1285 if (Thread.interrupted())
1286 throw new InterruptedException();
1287 return tryAcquire(arg) ||
1288 doAcquireNanos(arg, nanosTimeout);
1289 }
1290
1291 /**
1292 * Releases in exclusive mode. Implemented by unblocking one or
1293 * more threads if {@link #tryRelease} returns true.
1294 * This method can be used to implement method {@link Lock#unlock}.
1295 *
1296 * @param arg the release argument. This value is conveyed to
1297 * {@link #tryRelease} but is otherwise uninterpreted and
1298 * can represent anything you like.
1299 * @return the value returned from {@link #tryRelease}
1300 */
1301 public final boolean release(int arg) {
1302 if (tryRelease(arg)) {
1303 Node h = head;
1304 if (h != null && h.waitStatus != 0)
1305 unparkSuccessor(h);
1306 return true;
1307 }
1308 return false;
1309 }
1310
1311 /**
1312 * Acquires in shared mode, ignoring interrupts. Implemented by
1313 * first invoking at least once {@link #tryAcquireShared},
1314 * returning on success. Otherwise the thread is queued, possibly
1315 * repeatedly blocking and unblocking, invoking {@link
1316 * #tryAcquireShared} until success.
1317 *
1318 * @param arg the acquire argument. This value is conveyed to
1319 * {@link #tryAcquireShared} but is otherwise uninterpreted
1320 * and can represent anything you like.
1321 */
1322 public final void acquireShared(int arg) {
1323 if (tryAcquireShared(arg) < 0)
1324 doAcquireShared(arg);
1325 }
1326
1327 /**
1328 * Acquires in shared mode, aborting if interrupted. Implemented
1329 * by first checking interrupt status, then invoking at least once
1330 * {@link #tryAcquireShared}, returning on success. Otherwise the
1331 * thread is queued, possibly repeatedly blocking and unblocking,
1332 * invoking {@link #tryAcquireShared} until success or the thread
1333 * is interrupted.
1334 * @param arg the acquire argument.
1335 * This value is conveyed to {@link #tryAcquireShared} but is
1336 * otherwise uninterpreted and can represent anything
1337 * you like.
1338 * @throws InterruptedException if the current thread is interrupted
1339 */
1340 public final void acquireSharedInterruptibly(int arg)
1341 throws InterruptedException {
1342 if (Thread.interrupted())
1343 throw new InterruptedException();
1344 if (tryAcquireShared(arg) < 0)
1345 doAcquireSharedInterruptibly(arg);
1346 }
1347
1348 /**
1349 * Attempts to acquire in shared mode, aborting if interrupted, and
1350 * failing if the given timeout elapses. Implemented by first
1351 * checking interrupt status, then invoking at least once {@link
1352 * #tryAcquireShared}, returning on success. Otherwise, the
1353 * thread is queued, possibly repeatedly blocking and unblocking,
1354 * invoking {@link #tryAcquireShared} until success or the thread
1355 * is interrupted or the timeout elapses.
1356 *
1357 * @param arg the acquire argument. This value is conveyed to
1358 * {@link #tryAcquireShared} but is otherwise uninterpreted
1359 * and can represent anything you like.
1360 * @param nanosTimeout the maximum number of nanoseconds to wait
1361 * @return {@code true} if acquired; {@code false} if timed out
1362 * @throws InterruptedException if the current thread is interrupted
1363 */
1364 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
1365 throws InterruptedException {
1366 if (Thread.interrupted())
1367 throw new InterruptedException();
1368 return tryAcquireShared(arg) >= 0 ||
1369 doAcquireSharedNanos(arg, nanosTimeout);
1370 }
1371
1372 /**
1373 * Releases in shared mode. Implemented by unblocking one or more
1374 * threads if {@link #tryReleaseShared} returns true.
1375 *
1376 * @param arg the release argument. This value is conveyed to
1377 * {@link #tryReleaseShared} but is otherwise uninterpreted
1378 * and can represent anything you like.
1379 * @return the value returned from {@link #tryReleaseShared}
1380 */
1381 public final boolean releaseShared(int arg) {
1382 if (tryReleaseShared(arg)) {
1383 doReleaseShared();
1384 return true;
1385 }
1386 return false;
1387 }
1388
1389 // Queue inspection methods
1390
1391 /**
1392 * Queries whether any threads are waiting to acquire. Note that
1393 * because cancellations due to interrupts and timeouts may occur
1394 * at any time, a {@code true} return does not guarantee that any
1395 * other thread will ever acquire.
1396 *
1397 * @return {@code true} if there may be other threads waiting to acquire
1398 */
1399 public final boolean hasQueuedThreads() {
1400 for (Node p = tail, h = head; p != h && p != null; p = p.prev)
1401 if (p.waitStatus <= 0)
1402 return true;
1403 return false;
1404 }
1405
1406 /**
1407 * Queries whether any threads have ever contended to acquire this
1408 * synchronizer; that is, if an acquire method has ever blocked.
1409 *
1410 * <p>In this implementation, this operation returns in
1411 * constant time.
1412 *
1413 * @return {@code true} if there has ever been contention
1414 */
1415 public final boolean hasContended() {
1416 return head != null;
1417 }
1418
1419 /**
1420 * Returns the first (longest-waiting) thread in the queue, or
1421 * {@code null} if no threads are currently queued.
1422 *
1423 * <p>In this implementation, this operation normally returns in
1424 * constant time, but may iterate upon contention if other threads are
1425 * concurrently modifying the queue.
1426 *
1427 * @return the first (longest-waiting) thread in the queue, or
1428 * {@code null} if no threads are currently queued
1429 */
1430 public final Thread getFirstQueuedThread() {
1431 // handle only fast path, else relay
1432 return (head == tail) ? null : fullGetFirstQueuedThread();
1433 }
1434
1435 /**
1436 * Version of getFirstQueuedThread called when fastpath fails.
1437 */
1438 private Thread fullGetFirstQueuedThread() {
1439 /*
1440 * The first node is normally head.next. Try to get its
1441 * thread field, ensuring consistent reads: If thread
1442 * field is nulled out or s.prev is no longer head, then
1443 * some other thread(s) concurrently performed setHead in
1444 * between some of our reads. We try this twice before
1445 * resorting to traversal.
1446 */
1447 Node h, s;
1448 Thread st;
1449 if (((h = head) != null && (s = h.next) != null &&
1450 s.prev == head && (st = s.thread) != null) ||
1451 ((h = head) != null && (s = h.next) != null &&
1452 s.prev == head && (st = s.thread) != null))
1453 return st;
1454
1455 /*
1456 * Head's next field might not have been set yet, or may have
1457 * been unset after setHead. So we must check to see if tail
1458 * is actually first node. If not, we continue on, safely
1459 * traversing from tail back to head to find first,
1460 * guaranteeing termination.
1461 */
1462
1463 Thread firstThread = null;
1464 for (Node p = tail; p != null && p != head; p = p.prev) {
1465 Thread t = p.thread;
1466 if (t != null)
1467 firstThread = t;
1468 }
1469 return firstThread;
1470 }
1471
1472 /**
1473 * Returns true if the given thread is currently queued.
1474 *
1475 * <p>This implementation traverses the queue to determine
1476 * presence of the given thread.
1477 *
1478 * @param thread the thread
1479 * @return {@code true} if the given thread is on the queue
1480 * @throws NullPointerException if the thread is null
1481 */
1482 public final boolean isQueued(Thread thread) {
1483 if (thread == null)
1484 throw new NullPointerException();
1485 for (Node p = tail; p != null; p = p.prev)
1486 if (p.thread == thread)
1487 return true;
1488 return false;
1489 }
1490
1491 /**
1492 * Returns {@code true} if the apparent first queued thread, if one
1493 * exists, is waiting in exclusive mode. If this method returns
1494 * {@code true}, and the current thread is attempting to acquire in
1495 * shared mode (that is, this method is invoked from {@link
1496 * #tryAcquireShared}) then it is guaranteed that the current thread
1497 * is not the first queued thread. Used only as a heuristic in
1498 * ReentrantReadWriteLock.
1499 */
1500 final boolean apparentlyFirstQueuedIsExclusive() {
1501 Node h, s;
1502 return (h = head) != null &&
1503 (s = h.next) != null &&
1504 !s.isShared() &&
1505 s.thread != null;
1506 }
1507
1508 /**
1509 * Queries whether any threads have been waiting to acquire longer
1510 * than the current thread.
1511 *
1512 * <p>An invocation of this method is equivalent to (but may be
1513 * more efficient than):
1514 * <pre> {@code
1515 * getFirstQueuedThread() != Thread.currentThread()
1516 * && hasQueuedThreads()}</pre>
1517 *
1518 * <p>Note that because cancellations due to interrupts and
1519 * timeouts may occur at any time, a {@code true} return does not
1520 * guarantee that some other thread will acquire before the current
1521 * thread. Likewise, it is possible for another thread to win a
1522 * race to enqueue after this method has returned {@code false},
1523 * due to the queue being empty.
1524 *
1525 * <p>This method is designed to be used by a fair synchronizer to
1532 * synchronizer might look like this:
1533 *
1534 * <pre> {@code
1535 * protected boolean tryAcquire(int arg) {
1536 * if (isHeldExclusively()) {
1537 * // A reentrant acquire; increment hold count
1538 * return true;
1539 * } else if (hasQueuedPredecessors()) {
1540 * return false;
1541 * } else {
1542 * // try to acquire normally
1543 * }
1544 * }}</pre>
1545 *
1546 * @return {@code true} if there is a queued thread preceding the
1547 * current thread, and {@code false} if the current thread
1548 * is at the head of the queue or the queue is empty
1549 * @since 1.7
1550 */
1551 public final boolean hasQueuedPredecessors() {
1552 Node h, s;
1553 if ((h = head) != null) {
1554 if ((s = h.next) == null || s.waitStatus > 0) {
1555 s = null; // traverse in case of concurrent cancellation
1556 for (Node p = tail; p != h && p != null; p = p.prev) {
1557 if (p.waitStatus <= 0)
1558 s = p;
1559 }
1560 }
1561 if (s != null && s.thread != Thread.currentThread())
1562 return true;
1563 }
1564 return false;
1565 }
1566
1567 // Instrumentation and monitoring methods
1568
1569 /**
1570 * Returns an estimate of the number of threads waiting to
1571 * acquire. The value is only an estimate because the number of
1572 * threads may change dynamically while this method traverses
1573 * internal data structures. This method is designed for use in
1574 * monitoring system state, not for synchronization control.
1575 *
1576 * @return the estimated number of threads waiting to acquire
1577 */
1578 public final int getQueueLength() {
1579 int n = 0;
1580 for (Node p = tail; p != null; p = p.prev) {
1581 if (p.thread != null)
1582 ++n;
1583 }
1584 return n;
1585 }
1586
1587 /**
1588 * Returns a collection containing threads that may be waiting to
1589 * acquire. Because the actual set of threads may change
1590 * dynamically while constructing this result, the returned
1591 * collection is only a best-effort estimate. The elements of the
1592 * returned collection are in no particular order. This method is
1593 * designed to facilitate construction of subclasses that provide
1594 * more extensive monitoring facilities.
1595 *
1596 * @return the collection of threads
1597 */
1598 public final Collection<Thread> getQueuedThreads() {
1599 ArrayList<Thread> list = new ArrayList<>();
1600 for (Node p = tail; p != null; p = p.prev) {
1601 Thread t = p.thread;
1602 if (t != null)
1603 list.add(t);
1604 }
1605 return list;
1606 }
1607
1608 /**
1609 * Returns a collection containing threads that may be waiting to
1610 * acquire in exclusive mode. This has the same properties
1611 * as {@link #getQueuedThreads} except that it only returns
1612 * those threads waiting due to an exclusive acquire.
1613 *
1614 * @return the collection of threads
1615 */
1616 public final Collection<Thread> getExclusiveQueuedThreads() {
1617 ArrayList<Thread> list = new ArrayList<>();
1618 for (Node p = tail; p != null; p = p.prev) {
1619 if (!p.isShared()) {
1620 Thread t = p.thread;
1621 if (t != null)
1622 list.add(t);
1623 }
1624 }
1625 return list;
1626 }
1627
1628 /**
1629 * Returns a collection containing threads that may be waiting to
1630 * acquire in shared mode. This has the same properties
1631 * as {@link #getQueuedThreads} except that it only returns
1632 * those threads waiting due to a shared acquire.
1633 *
1634 * @return the collection of threads
1635 */
1636 public final Collection<Thread> getSharedQueuedThreads() {
1637 ArrayList<Thread> list = new ArrayList<>();
1638 for (Node p = tail; p != null; p = p.prev) {
1639 if (p.isShared()) {
1640 Thread t = p.thread;
1641 if (t != null)
1642 list.add(t);
1643 }
1644 }
1645 return list;
1646 }
1647
1648 /**
1649 * Returns a string identifying this synchronizer, as well as its state.
1650 * The state, in brackets, includes the String {@code "State ="}
1651 * followed by the current value of {@link #getState}, and either
1652 * {@code "nonempty"} or {@code "empty"} depending on whether the
1653 * queue is empty.
1654 *
1655 * @return a string identifying this synchronizer, as well as its state
1656 */
1657 public String toString() {
1658 return super.toString()
1659 + "[State = " + getState() + ", "
1660 + (hasQueuedThreads() ? "non" : "") + "empty queue]";
1661 }
1662
1663
1664 // Internal support methods for Conditions
1665
1666 /**
1667 * Returns true if a node, always one that was initially placed on
1668 * a condition queue, is now waiting to reacquire on sync queue.
1669 * @param node the node
1670 * @return true if is reacquiring
1671 */
1672 final boolean isOnSyncQueue(Node node) {
1673 if (node.waitStatus == Node.CONDITION || node.prev == null)
1674 return false;
1675 if (node.next != null) // If has successor, it must be on queue
1676 return true;
1677 /*
1678 * node.prev can be non-null, but not yet on queue because
1679 * the CAS to place it on queue can fail. So we have to
1680 * traverse from tail to make sure it actually made it. It
1681 * will always be near the tail in calls to this method, and
1682 * unless the CAS failed (which is unlikely), it will be
1683 * there, so we hardly ever traverse much.
1684 */
1685 return findNodeFromTail(node);
1686 }
1687
1688 /**
1689 * Returns true if node is on sync queue by searching backwards from tail.
1690 * Called only when needed by isOnSyncQueue.
1691 * @return true if present
1692 */
1693 private boolean findNodeFromTail(Node node) {
1694 // We check for node first, since it's likely to be at or near tail.
1695 // tail is known to be non-null, so we could re-order to "save"
1696 // one null check, but we leave it this way to help the VM.
1697 for (Node p = tail;;) {
1698 if (p == node)
1699 return true;
1700 if (p == null)
1701 return false;
1702 p = p.prev;
1703 }
1704 }
1705
1706 /**
1707 * Transfers a node from a condition queue onto sync queue.
1708 * Returns true if successful.
1709 * @param node the node
1710 * @return true if successfully transferred (else the node was
1711 * cancelled before signal)
1712 */
1713 final boolean transferForSignal(Node node) {
1714 /*
1715 * If cannot change waitStatus, the node has been cancelled.
1716 */
1717 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
1718 return false;
1719
1720 /*
1721 * Splice onto queue and try to set waitStatus of predecessor to
1722 * indicate that thread is (probably) waiting. If cancelled or
1723 * attempt to set waitStatus fails, wake up to resync (in which
1724 * case the waitStatus can be transiently and harmlessly wrong).
1725 */
1726 Node p = enq(node);
1727 int ws = p.waitStatus;
1728 if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
1729 LockSupport.unpark(node.thread);
1730 return true;
1731 }
1732
1733 /**
1734 * Transfers node, if necessary, to sync queue after a cancelled wait.
1735 * Returns true if thread was cancelled before being signalled.
1736 *
1737 * @param node the node
1738 * @return true if cancelled before the node was signalled
1739 */
1740 final boolean transferAfterCancelledWait(Node node) {
1741 if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
1742 enq(node);
1743 return true;
1744 }
1745 /*
1746 * If we lost out to a signal(), then we can't proceed
1747 * until it finishes its enq(). Cancelling during an
1748 * incomplete transfer is both rare and transient, so just
1749 * spin.
1750 */
1751 while (!isOnSyncQueue(node))
1752 Thread.yield();
1753 return false;
1754 }
1755
1756 /**
1757 * Invokes release with current state value; returns saved state.
1758 * Cancels node and throws exception on failure.
1759 * @param node the condition node for this wait
1760 * @return previous sync state
1761 */
1762 final int fullyRelease(Node node) {
1763 try {
1764 int savedState = getState();
1765 if (release(savedState))
1766 return savedState;
1767 throw new IllegalMonitorStateException();
1768 } catch (Throwable t) {
1769 node.waitStatus = Node.CANCELLED;
1770 throw t;
1771 }
1772 }
1773
1774 // Instrumentation methods for conditions
1775
1776 /**
1777 * Queries whether the given ConditionObject
1778 * uses this synchronizer as its lock.
1779 *
1780 * @param condition the condition
1781 * @return {@code true} if owned
1782 * @throws NullPointerException if the condition is null
1783 */
1784 public final boolean owns(ConditionObject condition) {
1785 return condition.isOwnedBy(this);
1786 }
1787
1788 /**
1789 * Queries whether any threads are waiting on the given condition
1790 * associated with this synchronizer. Note that because timeouts
1791 * and interrupts may occur at any time, a {@code true} return
1792 * does not guarantee that a future {@code signal} will awaken
1793 * any threads. This method is designed primarily for use in
1851 return condition.getWaitingThreads();
1852 }
1853
1854 /**
1855 * Condition implementation for a {@link AbstractQueuedSynchronizer}
1856 * serving as the basis of a {@link Lock} implementation.
1857 *
1858 * <p>Method documentation for this class describes mechanics,
1859 * not behavioral specifications from the point of view of Lock
1860 * and Condition users. Exported versions of this class will in
1861 * general need to be accompanied by documentation describing
1862 * condition semantics that rely on those of the associated
1863 * {@code AbstractQueuedSynchronizer}.
1864 *
1865 * <p>This class is Serializable, but all fields are transient,
1866 * so deserialized conditions have no waiters.
1867 */
1868 public class ConditionObject implements Condition, java.io.Serializable {
1869 private static final long serialVersionUID = 1173984872572414699L;
1870 /** First node of condition queue. */
1871 private transient Node firstWaiter;
1872 /** Last node of condition queue. */
1873 private transient Node lastWaiter;
1874
1875 /**
1876 * Creates a new {@code ConditionObject} instance.
1877 */
1878 public ConditionObject() { }
1879
1880 // Internal methods
1881
1882 /**
1883 * Adds a new waiter to wait queue.
1884 * @return its new wait node
1885 */
1886 private Node addConditionWaiter() {
1887 if (!isHeldExclusively())
1888 throw new IllegalMonitorStateException();
1889 Node t = lastWaiter;
1890 // If lastWaiter is cancelled, clean out.
1891 if (t != null && t.waitStatus != Node.CONDITION) {
1892 unlinkCancelledWaiters();
1893 t = lastWaiter;
1894 }
1895
1896 Node node = new Node(Node.CONDITION);
1897
1898 if (t == null)
1899 firstWaiter = node;
1900 else
1901 t.nextWaiter = node;
1902 lastWaiter = node;
1903 return node;
1904 }
1905
1906 /**
1907 * Removes and transfers nodes until hit non-cancelled one or
1908 * null. Split out from signal in part to encourage compilers
1909 * to inline the case of no waiters.
1910 * @param first (non-null) the first node on condition queue
1911 */
1912 private void doSignal(Node first) {
1913 do {
1914 if ( (firstWaiter = first.nextWaiter) == null)
1915 lastWaiter = null;
1916 first.nextWaiter = null;
1917 } while (!transferForSignal(first) &&
1918 (first = firstWaiter) != null);
1919 }
1920
1921 /**
1922 * Removes and transfers all nodes.
1923 * @param first (non-null) the first node on condition queue
1924 */
1925 private void doSignalAll(Node first) {
1926 lastWaiter = firstWaiter = null;
1927 do {
1928 Node next = first.nextWaiter;
1929 first.nextWaiter = null;
1930 transferForSignal(first);
1931 first = next;
1932 } while (first != null);
1933 }
1934
1935 /**
1936 * Unlinks cancelled waiter nodes from condition queue.
1937 * Called only while holding lock. This is called when
1938 * cancellation occurred during condition wait, and upon
1939 * insertion of a new waiter when lastWaiter is seen to have
1940 * been cancelled. This method is needed to avoid garbage
1941 * retention in the absence of signals. So even though it may
1942 * require a full traversal, it comes into play only when
1943 * timeouts or cancellations occur in the absence of
1944 * signals. It traverses all nodes rather than stopping at a
1945 * particular target to unlink all pointers to garbage nodes
1946 * without requiring many re-traversals during cancellation
1947 * storms.
1948 */
1949 private void unlinkCancelledWaiters() {
1950 Node t = firstWaiter;
1951 Node trail = null;
1952 while (t != null) {
1953 Node next = t.nextWaiter;
1954 if (t.waitStatus != Node.CONDITION) {
1955 t.nextWaiter = null;
1956 if (trail == null)
1957 firstWaiter = next;
1958 else
1959 trail.nextWaiter = next;
1960 if (next == null)
1961 lastWaiter = trail;
1962 }
1963 else
1964 trail = t;
1965 t = next;
1966 }
1967 }
1968
1969 // public methods
1970
1971 /**
1972 * Moves the longest-waiting thread, if one exists, from the
1973 * wait queue for this condition to the wait queue for the
1974 * owning lock.
1975 *
1976 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1977 * returns {@code false}
1978 */
1979 public final void signal() {
1980 if (!isHeldExclusively())
1981 throw new IllegalMonitorStateException();
1982 Node first = firstWaiter;
1983 if (first != null)
1984 doSignal(first);
1985 }
1986
1987 /**
1988 * Moves all threads from the wait queue for this condition to
1989 * the wait queue for the owning lock.
1990 *
1991 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1992 * returns {@code false}
1993 */
1994 public final void signalAll() {
1995 if (!isHeldExclusively())
1996 throw new IllegalMonitorStateException();
1997 Node first = firstWaiter;
1998 if (first != null)
1999 doSignalAll(first);
2000 }
2001
2002 /**
2003 * Implements uninterruptible condition wait.
2004 * <ol>
2005 * <li>Save lock state returned by {@link #getState}.
2006 * <li>Invoke {@link #release} with saved state as argument,
2007 * throwing IllegalMonitorStateException if it fails.
2008 * <li>Block until signalled.
2009 * <li>Reacquire by invoking specialized version of
2010 * {@link #acquire} with saved state as argument.
2011 * </ol>
2012 */
2013 public final void awaitUninterruptibly() {
2014 Node node = addConditionWaiter();
2015 int savedState = fullyRelease(node);
2016 boolean interrupted = false;
2017 while (!isOnSyncQueue(node)) {
2018 LockSupport.park(this);
2019 if (Thread.interrupted())
2020 interrupted = true;
2021 }
2022 if (acquireQueued(node, savedState) || interrupted)
2023 selfInterrupt();
2024 }
2025
2026 /*
2027 * For interruptible waits, we need to track whether to throw
2028 * InterruptedException, if interrupted while blocked on
2029 * condition, versus reinterrupt current thread, if
2030 * interrupted while blocked waiting to re-acquire.
2031 */
2032
2033 /** Mode meaning to reinterrupt on exit from wait */
2034 private static final int REINTERRUPT = 1;
2035 /** Mode meaning to throw InterruptedException on exit from wait */
2036 private static final int THROW_IE = -1;
2037
2038 /**
2039 * Checks for interrupt, returning THROW_IE if interrupted
2040 * before signalled, REINTERRUPT if after signalled, or
2041 * 0 if not interrupted.
2042 */
2043 private int checkInterruptWhileWaiting(Node node) {
2044 return Thread.interrupted() ?
2045 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
2046 0;
2047 }
2048
2049 /**
2050 * Throws InterruptedException, reinterrupts current thread, or
2051 * does nothing, depending on mode.
2052 */
2053 private void reportInterruptAfterWait(int interruptMode)
2054 throws InterruptedException {
2055 if (interruptMode == THROW_IE)
2056 throw new InterruptedException();
2057 else if (interruptMode == REINTERRUPT)
2058 selfInterrupt();
2059 }
2060
2061 /**
2062 * Implements interruptible condition wait.
2063 * <ol>
2064 * <li>If current thread is interrupted, throw InterruptedException.
2065 * <li>Save lock state returned by {@link #getState}.
2066 * <li>Invoke {@link #release} with saved state as argument,
2067 * throwing IllegalMonitorStateException if it fails.
2068 * <li>Block until signalled or interrupted.
2069 * <li>Reacquire by invoking specialized version of
2070 * {@link #acquire} with saved state as argument.
2071 * <li>If interrupted while blocked in step 4, throw InterruptedException.
2072 * </ol>
2073 */
2074 public final void await() throws InterruptedException {
2075 if (Thread.interrupted())
2076 throw new InterruptedException();
2077 Node node = addConditionWaiter();
2078 int savedState = fullyRelease(node);
2079 int interruptMode = 0;
2080 while (!isOnSyncQueue(node)) {
2081 LockSupport.park(this);
2082 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
2083 break;
2084 }
2085 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
2086 interruptMode = REINTERRUPT;
2087 if (node.nextWaiter != null) // clean up if cancelled
2088 unlinkCancelledWaiters();
2089 if (interruptMode != 0)
2090 reportInterruptAfterWait(interruptMode);
2091 }
2092
2093 /**
2094 * Implements timed condition wait.
2095 * <ol>
2096 * <li>If current thread is interrupted, throw InterruptedException.
2097 * <li>Save lock state returned by {@link #getState}.
2098 * <li>Invoke {@link #release} with saved state as argument,
2099 * throwing IllegalMonitorStateException if it fails.
2100 * <li>Block until signalled, interrupted, or timed out.
2101 * <li>Reacquire by invoking specialized version of
2102 * {@link #acquire} with saved state as argument.
2103 * <li>If interrupted while blocked in step 4, throw InterruptedException.
2104 * </ol>
2105 */
2106 public final long awaitNanos(long nanosTimeout)
2107 throws InterruptedException {
2108 if (Thread.interrupted())
2109 throw new InterruptedException();
2110 // We don't check for nanosTimeout <= 0L here, to allow
2111 // awaitNanos(0) as a way to "yield the lock".
2112 final long deadline = System.nanoTime() + nanosTimeout;
2113 long initialNanos = nanosTimeout;
2114 Node node = addConditionWaiter();
2115 int savedState = fullyRelease(node);
2116 int interruptMode = 0;
2117 while (!isOnSyncQueue(node)) {
2118 if (nanosTimeout <= 0L) {
2119 transferAfterCancelledWait(node);
2120 break;
2121 }
2122 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
2123 LockSupport.parkNanos(this, nanosTimeout);
2124 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
2125 break;
2126 nanosTimeout = deadline - System.nanoTime();
2127 }
2128 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
2129 interruptMode = REINTERRUPT;
2130 if (node.nextWaiter != null)
2131 unlinkCancelledWaiters();
2132 if (interruptMode != 0)
2133 reportInterruptAfterWait(interruptMode);
2134 long remaining = deadline - System.nanoTime(); // avoid overflow
2135 return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
2136 }
2137
2138 /**
2139 * Implements absolute timed condition wait.
2140 * <ol>
2141 * <li>If current thread is interrupted, throw InterruptedException.
2142 * <li>Save lock state returned by {@link #getState}.
2143 * <li>Invoke {@link #release} with saved state as argument,
2144 * throwing IllegalMonitorStateException if it fails.
2145 * <li>Block until signalled, interrupted, or timed out.
2146 * <li>Reacquire by invoking specialized version of
2147 * {@link #acquire} with saved state as argument.
2148 * <li>If interrupted while blocked in step 4, throw InterruptedException.
2149 * <li>If timed out while blocked in step 4, return false, else true.
2150 * </ol>
2151 */
2152 public final boolean awaitUntil(Date deadline)
2153 throws InterruptedException {
2154 long abstime = deadline.getTime();
2155 if (Thread.interrupted())
2156 throw new InterruptedException();
2157 Node node = addConditionWaiter();
2158 int savedState = fullyRelease(node);
2159 boolean timedout = false;
2160 int interruptMode = 0;
2161 while (!isOnSyncQueue(node)) {
2162 if (System.currentTimeMillis() >= abstime) {
2163 timedout = transferAfterCancelledWait(node);
2164 break;
2165 }
2166 LockSupport.parkUntil(this, abstime);
2167 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
2168 break;
2169 }
2170 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
2171 interruptMode = REINTERRUPT;
2172 if (node.nextWaiter != null)
2173 unlinkCancelledWaiters();
2174 if (interruptMode != 0)
2175 reportInterruptAfterWait(interruptMode);
2176 return !timedout;
2177 }
2178
2179 /**
2180 * Implements timed condition wait.
2181 * <ol>
2182 * <li>If current thread is interrupted, throw InterruptedException.
2183 * <li>Save lock state returned by {@link #getState}.
2184 * <li>Invoke {@link #release} with saved state as argument,
2185 * throwing IllegalMonitorStateException if it fails.
2186 * <li>Block until signalled, interrupted, or timed out.
2187 * <li>Reacquire by invoking specialized version of
2188 * {@link #acquire} with saved state as argument.
2189 * <li>If interrupted while blocked in step 4, throw InterruptedException.
2190 * <li>If timed out while blocked in step 4, return false, else true.
2191 * </ol>
2192 */
2193 public final boolean await(long time, TimeUnit unit)
2194 throws InterruptedException {
2195 long nanosTimeout = unit.toNanos(time);
2196 if (Thread.interrupted())
2197 throw new InterruptedException();
2198 // We don't check for nanosTimeout <= 0L here, to allow
2199 // await(0, unit) as a way to "yield the lock".
2200 final long deadline = System.nanoTime() + nanosTimeout;
2201 Node node = addConditionWaiter();
2202 int savedState = fullyRelease(node);
2203 boolean timedout = false;
2204 int interruptMode = 0;
2205 while (!isOnSyncQueue(node)) {
2206 if (nanosTimeout <= 0L) {
2207 timedout = transferAfterCancelledWait(node);
2208 break;
2209 }
2210 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
2211 LockSupport.parkNanos(this, nanosTimeout);
2212 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
2213 break;
2214 nanosTimeout = deadline - System.nanoTime();
2215 }
2216 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
2217 interruptMode = REINTERRUPT;
2218 if (node.nextWaiter != null)
2219 unlinkCancelledWaiters();
2220 if (interruptMode != 0)
2221 reportInterruptAfterWait(interruptMode);
2222 return !timedout;
2223 }
2224
2225 // support for instrumentation
2226
2227 /**
2228 * Returns true if this condition was created by the given
2229 * synchronization object.
2230 *
2231 * @return {@code true} if owned
2232 */
2233 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
2234 return sync == AbstractQueuedSynchronizer.this;
2235 }
2236
2237 /**
2238 * Queries whether any threads are waiting on this condition.
2239 * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
2240 *
2241 * @return {@code true} if there are any waiting threads
2242 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
2243 * returns {@code false}
2244 */
2245 protected final boolean hasWaiters() {
2246 if (!isHeldExclusively())
2247 throw new IllegalMonitorStateException();
2248 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
2249 if (w.waitStatus == Node.CONDITION)
2250 return true;
2251 }
2252 return false;
2253 }
2254
2255 /**
2256 * Returns an estimate of the number of threads waiting on
2257 * this condition.
2258 * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
2259 *
2260 * @return the estimated number of waiting threads
2261 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
2262 * returns {@code false}
2263 */
2264 protected final int getWaitQueueLength() {
2265 if (!isHeldExclusively())
2266 throw new IllegalMonitorStateException();
2267 int n = 0;
2268 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
2269 if (w.waitStatus == Node.CONDITION)
2270 ++n;
2271 }
2272 return n;
2273 }
2274
2275 /**
2276 * Returns a collection containing those threads that may be
2277 * waiting on this Condition.
2278 * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
2279 *
2280 * @return the collection of threads
2281 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
2282 * returns {@code false}
2283 */
2284 protected final Collection<Thread> getWaitingThreads() {
2285 if (!isHeldExclusively())
2286 throw new IllegalMonitorStateException();
2287 ArrayList<Thread> list = new ArrayList<>();
2288 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
2289 if (w.waitStatus == Node.CONDITION) {
2290 Thread t = w.thread;
2291 if (t != null)
2292 list.add(t);
2293 }
2294 }
2295 return list;
2296 }
2297 }
2298
// VarHandle mechanics: handles onto the state, head and tail fields
private static final VarHandle STATE;
private static final VarHandle HEAD;
private static final VarHandle TAIL;

static {
    try {
        final MethodHandles.Lookup lookup = MethodHandles.lookup();
        final Class<AbstractQueuedSynchronizer> owner =
            AbstractQueuedSynchronizer.class;
        STATE = lookup.findVarHandle(owner, "state", int.class);
        HEAD = lookup.findVarHandle(owner, "head", Node.class);
        TAIL = lookup.findVarHandle(owner, "tail", Node.class);
    } catch (ReflectiveOperationException e) {
        // A failed lookup is reported as a class-initialization error.
        throw new ExceptionInInitializerError(e);
    }

    // Reduce the risk of rare disastrous classloading in first call to
    // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
    Class<?> ensureLoaded = LockSupport.class;
}
2318
2319 /**
2320 * Initializes head and tail fields on first contention.
2321 */
2322 private final void initializeSyncQueue() {
2323 Node h;
2324 if (HEAD.compareAndSet(this, null, (h = new Node())))
2325 tail = h;
2326 }
2327
2328 /**
2329 * CASes tail field.
2330 */
2331 private final boolean compareAndSetTail(Node expect, Node update) {
2332 return TAIL.compareAndSet(this, expect, update);
2333 }
2334 }
|
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent.locks;
37
38 import java.util.ArrayList;
39 import java.util.Collection;
40 import java.util.Date;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.ForkJoinPool;
43 import jdk.internal.misc.Unsafe;
44
45 /**
46 * Provides a framework for implementing blocking locks and related
47 * synchronizers (semaphores, events, etc) that rely on
48 * first-in-first-out (FIFO) wait queues. This class is designed to
49 * be a useful basis for most kinds of synchronizers that rely on a
50 * single atomic {@code int} value to represent state. Subclasses
51 * must define the protected methods that change this state, and which
52 * define what that state means in terms of this object being acquired
53 * or released. Given these, the other methods in this class carry
54 * out all queuing and blocking mechanics. Subclasses can maintain
55 * other state fields, but only the atomically updated {@code int}
56 * value manipulated using methods {@link #getState}, {@link
57 * #setState} and {@link #compareAndSetState} is tracked with respect
58 * to synchronization.
59 *
60 * <p>Subclasses should be defined as non-public internal helper
61 * classes that are used to implement the synchronization properties
62 * of their enclosing class. Class
63 * {@code AbstractQueuedSynchronizer} does not implement any
295 * public void await() throws InterruptedException {
296 * sync.acquireSharedInterruptibly(1);
297 * }
298 * }}</pre>
299 *
300 * @since 1.5
301 * @author Doug Lea
302 */
303 public abstract class AbstractQueuedSynchronizer
304 extends AbstractOwnableSynchronizer
305 implements java.io.Serializable {
306
private static final long serialVersionUID = 7373984972572414691L;

/**
 * Constructs an {@code AbstractQueuedSynchronizer} whose
 * synchronization state starts out as zero.
 */
protected AbstractQueuedSynchronizer() {
    // nothing to do: all fields keep their default values
}
314
315 /*
316 * Overview.
317 *
318 * The wait queue is a variant of a "CLH" (Craig, Landin, and
319 * Hagersten) lock queue. CLH locks are normally used for
320 * spinlocks. We instead use them for blocking synchronizers by
321 * including explicit ("prev" and "next") links plus a "status"
322 * field that allow nodes to signal successors when releasing
323 * locks, and handle cancellation due to interrupts and timeouts.
324 * The status field includes bits that track whether a thread
325 * needs a signal (using LockSupport.unpark). Despite these
326 * additions, we maintain most CLH locality properties.
327 *
328 * To enqueue into a CLH lock, you atomically splice it in as new
329 * tail. To dequeue, you set the head field, so the next eligible
330 * waiter becomes first.
331 *
332 * +------+ prev +-------+ +------+
333 * | head | <---- | first | <---- | tail |
334 * +------+ +-------+ +------+
335 *
336 * Insertion into a CLH queue requires only a single atomic
337 * operation on "tail", so there is a simple point of demarcation
338 * from unqueued to queued. The "next" link of the predecessor is
339 * set by the enqueuing thread after successful CAS. Even though
340 * non-atomic, this suffices to ensure that any blocked thread is
341 * signalled by a predecessor when eligible (although in the case
342 * of cancellation, possibly with the assistance of a signal in
343 * method cleanQueue). Signalling is based in part on a
344 * Dekker-like scheme in which the to-be waiting thread indicates
345 * WAITING status, then retries acquiring, and then rechecks
346 * status before blocking. The signaller atomically clears WAITING
347 * status when unparking.
348 *
349 * Dequeuing on acquire involves detaching (nulling) a node's
350 * "prev" node and then updating the "head". Other threads check
351 * if a node is or was dequeued by checking "prev" rather than
352 * head. We enforce the nulling then setting order by spin-waiting
353 * if necessary. Because of this, the lock algorithm is not itself
354 * strictly "lock-free" because an acquiring thread may need to
355 * wait for a previous acquire to make progress. When used with
356 * exclusive locks, such progress is required anyway. However
357 * Shared mode may (uncommonly) require a spin-wait before
358 * setting head field to ensure proper propagation. (Historical
359 * note: This allows some simplifications and efficiencies
360 * compared to previous versions of this class.)
361 *
362 * A node's predecessor can change due to cancellation while it is
363 * waiting, until the node is first in queue, at which point it
364 * cannot change. The acquire methods cope with this by rechecking
365 * "prev" before waiting. The prev and next fields are modified
366 * only via CAS by cancelled nodes in method cleanQueue. The
367 * unsplice strategy is reminiscent of Michael-Scott queues in
368 * that after a successful CAS to prev field, other threads help
369 * fix next fields. Because cancellation often occurs in bunches
370 * that complicate decisions about necessary signals, each call to
371 * cleanQueue traverses the queue until a clean sweep. Nodes that
372 * become relinked as first are unconditionally unparked
373 * (sometimes unnecessarily, but those cases are not worth
374 * avoiding).
375 *
376 * A thread may try to acquire if it is first (frontmost) in the
377 * queue, and sometimes before. Being first does not guarantee
378 * success; it only gives the right to contend. We balance
379 * throughput, overhead, and fairness by allowing incoming threads
380 * to "barge" and acquire the synchronizer while in the process of
381 * enqueuing, in which case an awakened first thread may need to
382 * rewait. To counteract possible repeated unlucky rewaits, we
383 * exponentially increase retries (up to 256) to acquire each time
384 * a thread is unparked. Except in this case, AQS locks do not
385 * spin; they instead interleave attempts to acquire with
386 * bookkeeping steps. (Users who want spinlocks can use
387 * tryAcquire.)
388 *
389 * To improve garbage collectibility, fields of nodes not yet on
390 * list are null. (It is not rare to create and then throw away a
391 * node without using it.) Fields of nodes coming off the list are
392 * nulled out as soon as possible. This accentuates the challenge
393 * of externally determining the first waiting thread (as in
394 * method getFirstQueuedThread). This sometimes requires the
395 * fallback of traversing backwards from the atomically updated
396 * "tail" when fields appear null. (This is never needed in the
397 * process of signalling though.)
398 *
399 * CLH queues need a dummy header node to get started. But
400 * we don't create them on construction, because it would be wasted
401 * effort if there is never contention. Instead, the node
402 * is constructed and head and tail pointers are set upon first
403 * contention.
404 *
405 * Shared mode operations differ from Exclusive in that an acquire
406 * signals the next waiter to try to acquire if it is also
407 * Shared. The tryAcquireShared API allows users to indicate the
408 * degree of propagation, but in most applications, it is more
409 * efficient to ignore this, allowing the successor to try
410 * acquiring in any case.
411 *
412 * Threads waiting on Conditions use nodes with an additional
413 * link to maintain the (FIFO) list of conditions. Conditions only
414 * need to link nodes in simple (non-concurrent) linked queues
415 * because they are only accessed when exclusively held. Upon
416 * await, a node is inserted into a condition queue. Upon signal,
417 * the node is enqueued on the main queue. A special status field
418 * value is used to track and atomically trigger this.
419 *
420 * Accesses to fields head, tail, and state use full Volatile
421 * mode, along with CAS. Node fields status, prev and next also do
422 * so while threads may be signallable, but sometimes use weaker
423 * modes otherwise. Accesses to field "waiter" (the thread to be
424 * signalled) are always sandwiched between other atomic accesses
425 * so are used in Plain mode. We use jdk.internal Unsafe versions
426 * of atomic access methods rather than VarHandles to avoid
427 * potential VM bootstrap issues.
428 *
429 * Most of the above is performed by primary internal method
430 * acquire, that is invoked in some way by all exported acquire
431 * methods. (It is usually easy for compilers to optimize
432 * call-site specializations when heavily used.)
433 *
434 * There are several arbitrary decisions about when and how to
435 * check interrupts in both acquire and await before and/or after
436 * blocking. The decisions are less arbitrary in implementation
437 * updates because some users appear to rely on original behaviors
438 * in ways that are racy and so (rarely) wrong in general but hard
439 * to justify changing.
440 *
441 * Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
442 * Scherer and Michael Scott, along with members of JSR-166
443 * expert group, for helpful ideas, discussions, and critiques
444 * on the design of this class.
445 */
446
447 // Node status bits, also used as argument and return values
448 static final int WAITING = 1; // must be 1
449 static final int CANCELLED = 0x80000000; // must be negative
450 static final int COND = 2; // in a condition wait
451
452 /** CLH Nodes */
453 abstract static class Node {
454 volatile Node prev; // initially attached via casTail
455 volatile Node next; // visibly nonnull when signallable
456 Thread waiter; // visibly nonnull when enqueued
457 volatile int status; // written by owner, atomic bit ops by others
458
459 // methods for atomic operations
460 final boolean casPrev(Node c, Node v) { // for cleanQueue
461 return U.weakCompareAndSetReference(this, PREV, c, v);
462 }
463 final boolean casNext(Node c, Node v) { // for cleanQueue
464 return U.weakCompareAndSetReference(this, NEXT, c, v);
465 }
466 final int getAndUnsetStatus(int v) { // for signalling
467 return U.getAndBitwiseAndInt(this, STATUS, ~v);
468 }
469 final void setPrevRelaxed(Node p) { // for off-queue assignment
470 U.putReference(this, PREV, p);
471 }
472 final void setStatusRelaxed(int s) { // for off-queue assignment
473 U.putInt(this, STATUS, s);
474 }
475 final void clearStatus() { // for reducing unneeded signals
476 U.putIntOpaque(this, STATUS, 0);
477 }
478
479 private static final long STATUS
480 = U.objectFieldOffset(Node.class, "status");
481 private static final long NEXT
482 = U.objectFieldOffset(Node.class, "next");
483 private static final long PREV
484 = U.objectFieldOffset(Node.class, "prev");
485 }
486
487 // Concrete classes tagged by type
488 static final class ExclusiveNode extends Node { }
489 static final class SharedNode extends Node { }
490
491 static final class ConditionNode extends Node
492 implements ForkJoinPool.ManagedBlocker {
493 ConditionNode nextWaiter; // link to next waiting node
494
495 /**
496 * Allows Conditions to be used in ForkJoinPools without
497 * risking fixed pool exhaustion. This is usable only for
498 * untimed Condition waits, not timed versions.
499 */
500 public final boolean isReleasable() {
501 return status <= 1 || Thread.currentThread().isInterrupted();
502 }
503
504 public final boolean block() {
505 while (!isReleasable()) LockSupport.park(this);
506 return true;
507 }
508 }
509
510 /**
511 * Head of the wait queue, lazily initialized.
 * Remains {@code null} until tryInitializeHead CASes in the first
 * node; hasContended reports whether it is non-null. Replaced by the
 * acquiring node inside the main acquire method.
512 */
513 private transient volatile Node head;
514
515 /**
516 * Tail of the wait queue. After initialization, modified only via casTail.
 * The initial value is assigned directly by tryInitializeHead right
 * after it has installed the head node.
517 */
518 private transient volatile Node tail;
519
520 /**
521 * The synchronization state.
 * Read and written through getState, setState and compareAndSetState.
522 */
523 private volatile int state;
524
525 /**
526 * Returns the current value of synchronization state.
527 * This operation has memory semantics of a {@code volatile} read.
528 * @return current state value
529 */
530 protected final int getState() {
531 return state;
532 }
533
534 /**
535 * Sets the value of synchronization state.
536 * This operation has memory semantics of a {@code volatile} write.
537 * @param newState the new state value
538 */
539 protected final void setState(int newState) {
540 state = newState;
541 }
542
543 /**
544 * Atomically sets synchronization state to the given updated
545 * value if the current state value equals the expected value.
546 * This operation has memory semantics of a {@code volatile} read
547 * and write.
548 *
549 * @param expect the expected value
550 * @param update the new value
551 * @return {@code true} if successful. False return indicates that the actual
552 * value was not equal to the expected value.
553 */
554 protected final boolean compareAndSetState(int expect, int update) {
555 return U.compareAndSetInt(this, STATE, expect, update);
556 }
557
558 // Queuing utilities
559
560 private boolean casTail(Node c, Node v) {
561 return U.compareAndSetReference(this, TAIL, c, v);
562 }
563
564 /** tries once to CAS a new dummy node for head */
565 private void tryInitializeHead() {
566 Node h = new ExclusiveNode();
567 if (U.compareAndSetReference(this, HEAD, null, h))
568 tail = h;
569 }
570
571 /**
572 * Enqueues the node unless null. (Currently used only for
573 * ConditionNodes; other cases are interleaved with acquires.)
574 */
575 final void enqueue(Node node) {
576 if (node != null) {
577 for (;;) {
578 Node t = tail;
579 node.setPrevRelaxed(t); // avoid unnecessary fence
580 if (t == null) // initialize
581 tryInitializeHead();
582 else if (casTail(t, node)) {
583 t.next = node;
584 if (t.status < 0) // wake up to clean link
585 LockSupport.unpark(node.waiter);
586 break;
587 }
588 }
589 }
590 }
591
592 /** Returns true if node is found in traversal from tail */
593 final boolean isEnqueued(Node node) {
594 for (Node t = tail; t != null; t = t.prev)
595 if (t == node)
596 return true;
597 return false;
598 }
599
600 /**
601 * Wakes up the successor of given node, if one exists, and unsets its
602 * WAITING status to avoid park race. This may fail to wake up an
603 * eligible thread when one or more have been cancelled, but
604 * cancelAcquire ensures liveness.
605 */
606 private static void signalNext(Node h) {
607 Node s;
608 if (h != null && (s = h.next) != null && s.status != 0) {
609 s.getAndUnsetStatus(WAITING);
610 LockSupport.unpark(s.waiter);
611 }
612 }
613
614 /** Wakes up the given node if in shared mode */
615 private static void signalNextIfShared(Node h) {
616 Node s;
617 if (h != null && (s = h.next) != null &&
618 (s instanceof SharedNode) && s.status != 0) {
619 s.getAndUnsetStatus(WAITING);
620 LockSupport.unpark(s.waiter);
621 }
622 }
623
624 /**
625 * Main acquire method, invoked by all exported acquire methods.
 *
 * <p>A successful acquire returns 1 from inside the retry loop. The
 * loop is left only when a timed wait has run past its deadline or
 * when an interruptible wait observed an interrupt; in both cases
 * the result comes from cancelAcquire. A Throwable raised by
 * tryAcquire or tryAcquireShared also cancels the node before it
 * is rethrown.
626 *
627 * @param node null unless a reacquiring Condition
628 * @param arg the acquire argument
629 * @param shared true if shared mode else exclusive
630 * @param interruptible if abort and return negative on interrupt
631 * @param timed if true use timed waits
632 * @param time if timed, the System.nanoTime value to timeout
633 * @return positive if acquired, 0 if timed out, negative if interrupted
634 */
635 final int acquire(Node node, int arg, boolean shared,
636 boolean interruptible, boolean timed, long time) {
637 Thread current = Thread.currentThread();
638 byte spins = 0, postSpins = 0; // retries upon unpark of first thread
639 boolean interrupted = false, first = false;
640 Node pred = null; // predecessor of node when enqueued
641
642 /*
643 * Repeatedly:
644 * Check if node now first
645 * if so, ensure head stable, else ensure valid predecessor
646 * if node is first or not yet enqueued, try acquiring
647 * else if node not yet created, create it
648 * else if not yet enqueued, try once to enqueue
649 * else if woken from park, retry (up to postSpins times)
650 * else if WAITING status not set, set and retry
651 * else park and clear WAITING status, and check cancellation
652 */
653
654 for (;;) {
// Until known to be first: reload pred (null while node is absent or
// not enqueued) and record whether pred is currently the head.
655 if (!first && (pred = (node == null) ? null : node.prev) != null &&
656 !(first = (head == pred))) {
657 if (pred.status < 0) {
658 cleanQueue(); // predecessor cancelled
659 continue;
660 } else if (pred.prev == null) {
661 Thread.onSpinWait(); // ensure serialization
662 continue;
663 }
664 }
665 if (first || pred == null) { // first in queue, or not enqueued: try to acquire
666 boolean acquired;
667 try {
668 if (shared)
669 acquired = (tryAcquireShared(arg) >= 0);
670 else
671 acquired = tryAcquire(arg);
672 } catch (Throwable ex) {
673 cancelAcquire(node, interrupted, false); // cancel node, then propagate
674 throw ex;
675 }
676 if (acquired) {
677 if (first) { // node was enqueued: make it the new head
678 node.prev = null; // detach prev before publishing node as head
679 head = node;
680 pred.next = null;
681 node.waiter = null;
682 if (shared)
683 signalNextIfShared(node);
684 if (interrupted) // restore the flag cleared by Thread.interrupted() below
685 current.interrupt();
686 }
687 return 1;
688 }
689 }
690 if (node == null) { // allocate; retry before enqueue
691 if (shared)
692 node = new SharedNode();
693 else
694 node = new ExclusiveNode();
695 } else if (pred == null) { // try to enqueue
696 node.waiter = current;
697 Node t = tail;
698 node.setPrevRelaxed(t); // avoid unnecessary fence
699 if (t == null)
700 tryInitializeHead();
701 else if (!casTail(t, node))
702 node.setPrevRelaxed(null); // back out
703 else
704 t.next = node;
705 } else if (first && spins != 0) {
706 --spins; // reduce unfairness on rewaits
707 Thread.onSpinWait();
708 } else if (node.status == 0) {
709 node.status = WAITING; // enable signal and recheck
710 } else {
711 long nanos;
// Each pass through here shifts the retry budget left and sets the
// low bit (1, 3, 7, ...), held in a byte; it is spent after unpark.
712 spins = postSpins = (byte)((postSpins << 1) | 1);
713 if (!timed)
714 LockSupport.park(this);
715 else if ((nanos = time - System.nanoTime()) > 0L)
716 LockSupport.parkNanos(this, nanos);
717 else
718 break; // deadline reached: leave loop and cancel
719 node.clearStatus();
// Thread.interrupted() clears the flag; it is remembered in
// 'interrupted' and only ends the wait when interruptible.
720 if ((interrupted |= Thread.interrupted()) && interruptible)
721 break;
722 }
723 }
// Reached only via break: timed out or interrupted.
724 return cancelAcquire(node, interrupted, interruptible);
725 }
726
727 /**
728 * Possibly repeatedly traverses from tail, unsplicing cancelled
729 * nodes until none are found. Unparks nodes that may have been
730 * relinked to be next eligible acquirer.
 *
 * <p>Within a scan, q is the node being inspected, p its predecessor
 * and s its successor (null while q is the tail). Every break
 * restarts the scan from the current tail; the method returns only
 * when a scan reaches a node without a predecessor or finds the
 * queue empty.
731 */
732 private void cleanQueue() {
733 for (;;) { // restart point
734 for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
735 if (q == null || (p = q.prev) == null)
736 return; // end of list
737 if (s == null ? tail != q : (s.prev != q || s.status < 0))
738 break; // inconsistent
739 if (q.status < 0) { // cancelled
// Unsplice q: CAS tail (when q is last) or s.prev from q to p,
// then confirm q still points back at p before fixing p.next.
740 if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
741 q.prev == p) {
742 p.casNext(q, s); // OK if fails
743 if (p.prev == null) // p has no predecessor: its successor may now be first
744 signalNext(p);
745 }
746 break; // rescan from tail
747 }
748 if ((n = p.next) != q) { // help finish
749 if (n != null && q.prev == p) {
750 p.casNext(n, q);
751 if (p.prev == null)
752 signalNext(p);
753 }
754 break; // rescan from tail
755 }
756 s = q; // step toward head: q becomes the successor
757 q = q.prev;
758 }
759 }
760 }
761
762 /**
763 * Cancels an ongoing attempt to acquire.
764 *
765 * @param node the node (may be null if cancelled before enqueuing)
766 * @param interrupted true if thread interrupted
767 * @param interruptible if should report interruption vs reset
768 */
769 private int cancelAcquire(Node node, boolean interrupted,
770 boolean interruptible) {
771 if (node != null) {
772 node.waiter = null;
773 node.status = CANCELLED;
774 if (node.prev != null)
775 cleanQueue();
776 }
777 if (interrupted) {
778 if (interruptible)
779 return CANCELLED;
780 else
781 Thread.currentThread().interrupt();
782 }
783 return 0;
784 }
785
786 // Main exported methods
787
788 /**
789 * Attempts to acquire in exclusive mode. This method should query
790 * if the state of the object permits it to be acquired in the
791 * exclusive mode, and if so to acquire it.
792 *
793 * <p>This method is always invoked by the thread performing
794 * acquire. If this method reports failure, the acquire method
795 * may queue the thread, if it is not already queued, until it is
796 * signalled by a release from some other thread. This can be used
797 * to implement method {@link Lock#tryLock()}.
798 *
799 * <p>The default
800 * implementation throws {@link UnsupportedOperationException}.
801 *
802 * @param arg the acquire argument. This value is always the one
803 * passed to an acquire method, or is the value saved on entry
916 * {@code false} otherwise
917 * @throws UnsupportedOperationException if conditions are not supported
918 */
919 protected boolean isHeldExclusively() {
920 throw new UnsupportedOperationException();
921 }
922
923 /**
924 * Acquires in exclusive mode, ignoring interrupts. Implemented
925 * by invoking at least once {@link #tryAcquire},
926 * returning on success. Otherwise the thread is queued, possibly
927 * repeatedly blocking and unblocking, invoking {@link
928 * #tryAcquire} until success. This method can be used
929 * to implement method {@link Lock#lock}.
930 *
931 * @param arg the acquire argument. This value is conveyed to
932 * {@link #tryAcquire} but is otherwise uninterpreted and
933 * can represent anything you like.
934 */
935 public final void acquire(int arg) {
936 if (!tryAcquire(arg))
937 acquire(null, arg, false, false, false, 0L);
938 }
939
940 /**
941 * Acquires in exclusive mode, aborting if interrupted.
942 * Implemented by first checking interrupt status, then invoking
943 * at least once {@link #tryAcquire}, returning on
944 * success. Otherwise the thread is queued, possibly repeatedly
945 * blocking and unblocking, invoking {@link #tryAcquire}
946 * until success or the thread is interrupted. This method can be
947 * used to implement method {@link Lock#lockInterruptibly}.
948 *
949 * @param arg the acquire argument. This value is conveyed to
950 * {@link #tryAcquire} but is otherwise uninterpreted and
951 * can represent anything you like.
952 * @throws InterruptedException if the current thread is interrupted
953 */
954 public final void acquireInterruptibly(int arg)
955 throws InterruptedException {
956 if (Thread.interrupted() ||
957 (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0))
958 throw new InterruptedException();
959 }
960
961 /**
962 * Attempts to acquire in exclusive mode, aborting if interrupted,
963 * and failing if the given timeout elapses. Implemented by first
964 * checking interrupt status, then invoking at least once {@link
965 * #tryAcquire}, returning on success. Otherwise, the thread is
966 * queued, possibly repeatedly blocking and unblocking, invoking
967 * {@link #tryAcquire} until success or the thread is interrupted
968 * or the timeout elapses. This method can be used to implement
969 * method {@link Lock#tryLock(long, TimeUnit)}.
970 *
971 * @param arg the acquire argument. This value is conveyed to
972 * {@link #tryAcquire} but is otherwise uninterpreted and
973 * can represent anything you like.
974 * @param nanosTimeout the maximum number of nanoseconds to wait
975 * @return {@code true} if acquired; {@code false} if timed out
976 * @throws InterruptedException if the current thread is interrupted
977 */
978 public final boolean tryAcquireNanos(int arg, long nanosTimeout)
979 throws InterruptedException {
980 if (!Thread.interrupted()) {
981 if (tryAcquire(arg))
982 return true;
983 if (nanosTimeout <= 0L)
984 return false;
985 int stat = acquire(null, arg, false, true, true,
986 System.nanoTime() + nanosTimeout);
987 if (stat > 0)
988 return true;
989 if (stat == 0)
990 return false;
991 }
992 throw new InterruptedException();
993 }
994
995 /**
996 * Releases in exclusive mode. Implemented by unblocking one or
997 * more threads if {@link #tryRelease} returns true.
998 * This method can be used to implement method {@link Lock#unlock}.
999 *
1000 * @param arg the release argument. This value is conveyed to
1001 * {@link #tryRelease} but is otherwise uninterpreted and
1002 * can represent anything you like.
1003 * @return the value returned from {@link #tryRelease}
1004 */
1005 public final boolean release(int arg) {
1006 if (tryRelease(arg)) {
1007 signalNext(head);
1008 return true;
1009 }
1010 return false;
1011 }
1012
1013 /**
1014 * Acquires in shared mode, ignoring interrupts. Implemented by
1015 * first invoking at least once {@link #tryAcquireShared},
1016 * returning on success. Otherwise the thread is queued, possibly
1017 * repeatedly blocking and unblocking, invoking {@link
1018 * #tryAcquireShared} until success.
1019 *
1020 * @param arg the acquire argument. This value is conveyed to
1021 * {@link #tryAcquireShared} but is otherwise uninterpreted
1022 * and can represent anything you like.
1023 */
1024 public final void acquireShared(int arg) {
1025 if (tryAcquireShared(arg) < 0)
1026 acquire(null, arg, true, false, false, 0L);
1027 }
1028
1029 /**
1030 * Acquires in shared mode, aborting if interrupted. Implemented
1031 * by first checking interrupt status, then invoking at least once
1032 * {@link #tryAcquireShared}, returning on success. Otherwise the
1033 * thread is queued, possibly repeatedly blocking and unblocking,
1034 * invoking {@link #tryAcquireShared} until success or the thread
1035 * is interrupted.
1036 * @param arg the acquire argument.
1037 * This value is conveyed to {@link #tryAcquireShared} but is
1038 * otherwise uninterpreted and can represent anything
1039 * you like.
1040 * @throws InterruptedException if the current thread is interrupted
1041 */
1042 public final void acquireSharedInterruptibly(int arg)
1043 throws InterruptedException {
1044 if (Thread.interrupted() ||
1045 (tryAcquireShared(arg) < 0 &&
1046 acquire(null, arg, true, true, false, 0L) < 0))
1047 throw new InterruptedException();
1048 }
1049
1050 /**
1051 * Attempts to acquire in shared mode, aborting if interrupted, and
1052 * failing if the given timeout elapses. Implemented by first
1053 * checking interrupt status, then invoking at least once {@link
1054 * #tryAcquireShared}, returning on success. Otherwise, the
1055 * thread is queued, possibly repeatedly blocking and unblocking,
1056 * invoking {@link #tryAcquireShared} until success or the thread
1057 * is interrupted or the timeout elapses.
1058 *
1059 * @param arg the acquire argument. This value is conveyed to
1060 * {@link #tryAcquireShared} but is otherwise uninterpreted
1061 * and can represent anything you like.
1062 * @param nanosTimeout the maximum number of nanoseconds to wait
1063 * @return {@code true} if acquired; {@code false} if timed out
1064 * @throws InterruptedException if the current thread is interrupted
1065 */
1066 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
1067 throws InterruptedException {
1068 if (!Thread.interrupted()) {
1069 if (tryAcquireShared(arg) >= 0)
1070 return true;
1071 if (nanosTimeout <= 0L)
1072 return false;
1073 int stat = acquire(null, arg, true, true, true,
1074 System.nanoTime() + nanosTimeout);
1075 if (stat > 0)
1076 return true;
1077 if (stat == 0)
1078 return false;
1079 }
1080 throw new InterruptedException();
1081 }
1082
1083 /**
1084 * Releases in shared mode. Implemented by unblocking one or more
1085 * threads if {@link #tryReleaseShared} returns true.
1086 *
1087 * @param arg the release argument. This value is conveyed to
1088 * {@link #tryReleaseShared} but is otherwise uninterpreted
1089 * and can represent anything you like.
1090 * @return the value returned from {@link #tryReleaseShared}
1091 */
1092 public final boolean releaseShared(int arg) {
1093 if (tryReleaseShared(arg)) {
1094 signalNext(head);
1095 return true;
1096 }
1097 return false;
1098 }
1099
1100 // Queue inspection methods
1101
1102 /**
1103 * Queries whether any threads are waiting to acquire. Note that
1104 * because cancellations due to interrupts and timeouts may occur
1105 * at any time, a {@code true} return does not guarantee that any
1106 * other thread will ever acquire.
1107 *
1108 * @return {@code true} if there may be other threads waiting to acquire
1109 */
1110 public final boolean hasQueuedThreads() {
1111 for (Node p = tail, h = head; p != h && p != null; p = p.prev)
1112 if (p.status >= 0)
1113 return true;
1114 return false;
1115 }
1116
1117 /**
1118 * Queries whether any threads have ever contended to acquire this
1119 * synchronizer; that is, if an acquire method has ever blocked.
1120 *
1121 * <p>In this implementation, this operation returns in
1122 * constant time.
1123 *
1124 * @return {@code true} if there has ever been contention
1125 */
1126 public final boolean hasContended() {
1127 return head != null;
1128 }
1129
1130 /**
1131 * Returns the first (longest-waiting) thread in the queue, or
1132 * {@code null} if no threads are currently queued.
1133 *
1134 * <p>In this implementation, this operation normally returns in
1135 * constant time, but may iterate upon contention if other threads are
1136 * concurrently modifying the queue.
1137 *
1138 * @return the first (longest-waiting) thread in the queue, or
1139 * {@code null} if no threads are currently queued
1140 */
1141 public final Thread getFirstQueuedThread() {
1142 Thread first = null, w; Node h, s;
1143 if ((h = head) != null && ((s = h.next) == null ||
1144 (first = s.waiter) == null ||
1145 s.prev == null)) {
1146 // traverse from tail on stale reads
1147 for (Node p = tail, q; p != null && (q = p.prev) != null; p = q)
1148 if ((w = p.waiter) != null)
1149 first = w;
1150 }
1151 return first;
1152 }
1153
1154 /**
1155 * Returns true if the given thread is currently queued.
1156 *
1157 * <p>This implementation traverses the queue to determine
1158 * presence of the given thread.
1159 *
1160 * @param thread the thread
1161 * @return {@code true} if the given thread is on the queue
1162 * @throws NullPointerException if the thread is null
1163 */
1164 public final boolean isQueued(Thread thread) {
1165 if (thread == null)
1166 throw new NullPointerException();
1167 for (Node p = tail; p != null; p = p.prev)
1168 if (p.waiter == thread)
1169 return true;
1170 return false;
1171 }
1172
1173 /**
1174 * Returns {@code true} if the apparent first queued thread, if one
1175 * exists, is waiting in exclusive mode. If this method returns
1176 * {@code true}, and the current thread is attempting to acquire in
1177 * shared mode (that is, this method is invoked from {@link
1178 * #tryAcquireShared}) then it is guaranteed that the current thread
1179 * is not the first queued thread. Used only as a heuristic in
1180 * ReentrantReadWriteLock.
1181 */
1182 final boolean apparentlyFirstQueuedIsExclusive() {
1183 Node h, s;
1184 return (h = head) != null && (s = h.next) != null &&
1185 !(s instanceof SharedNode) && s.waiter != null;
1186 }
1187
1188 /**
1189 * Queries whether any threads have been waiting to acquire longer
1190 * than the current thread.
1191 *
1192 * <p>An invocation of this method is equivalent to (but may be
1193 * more efficient than):
1194 * <pre> {@code
1195 * getFirstQueuedThread() != Thread.currentThread()
1196 * && hasQueuedThreads()}</pre>
1197 *
1198 * <p>Note that because cancellations due to interrupts and
1199 * timeouts may occur at any time, a {@code true} return does not
1200 * guarantee that some other thread will acquire before the current
1201 * thread. Likewise, it is possible for another thread to win a
1202 * race to enqueue after this method has returned {@code false},
1203 * due to the queue being empty.
1204 *
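 * <p>This method is designed to be used by a fair synchronizer to
 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
 * Such a synchronizer's {@link #tryAcquire} method should return
 * {@code false}, and its {@link #tryAcquireShared} method should
 * return a negative value, if this method returns {@code true}
 * (unless this is a reentrant acquire).  For example, the {@code
 * tryAcquire} method for a fair, reentrant, exclusive mode
 * synchronizer might look like this: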
1213 *
1214 * <pre> {@code
1215 * protected boolean tryAcquire(int arg) {
1216 * if (isHeldExclusively()) {
1217 * // A reentrant acquire; increment hold count
1218 * return true;
1219 * } else if (hasQueuedPredecessors()) {
1220 * return false;
1221 * } else {
1222 * // try to acquire normally
1223 * }
1224 * }}</pre>
1225 *
1226 * @return {@code true} if there is a queued thread preceding the
1227 * current thread, and {@code false} if the current thread
1228 * is at the head of the queue or the queue is empty
1229 * @since 1.7
1230 */
1231 public final boolean hasQueuedPredecessors() {
1232 Thread first = null; Node h, s;
1233 if ((h = head) != null && ((s = h.next) == null ||
1234 (first = s.waiter) == null ||
1235 s.prev == null))
1236 first = getFirstQueuedThread(); // retry via getFirstQueuedThread
1237 return first != null && first != Thread.currentThread();
1238 }
1239
1240 // Instrumentation and monitoring methods
1241
1242 /**
1243 * Returns an estimate of the number of threads waiting to
1244 * acquire. The value is only an estimate because the number of
1245 * threads may change dynamically while this method traverses
1246 * internal data structures. This method is designed for use in
1247 * monitoring system state, not for synchronization control.
1248 *
1249 * @return the estimated number of threads waiting to acquire
1250 */
1251 public final int getQueueLength() {
1252 int n = 0;
1253 for (Node p = tail; p != null; p = p.prev) {
1254 if (p.waiter != null)
1255 ++n;
1256 }
1257 return n;
1258 }
1259
1260 /**
1261 * Returns a collection containing threads that may be waiting to
1262 * acquire. Because the actual set of threads may change
1263 * dynamically while constructing this result, the returned
1264 * collection is only a best-effort estimate. The elements of the
1265 * returned collection are in no particular order. This method is
1266 * designed to facilitate construction of subclasses that provide
1267 * more extensive monitoring facilities.
1268 *
1269 * @return the collection of threads
1270 */
1271 public final Collection<Thread> getQueuedThreads() {
1272 ArrayList<Thread> list = new ArrayList<>();
1273 for (Node p = tail; p != null; p = p.prev) {
1274 Thread t = p.waiter;
1275 if (t != null)
1276 list.add(t);
1277 }
1278 return list;
1279 }
1280
1281 /**
1282 * Returns a collection containing threads that may be waiting to
1283 * acquire in exclusive mode. This has the same properties
1284 * as {@link #getQueuedThreads} except that it only returns
1285 * those threads waiting due to an exclusive acquire.
1286 *
1287 * @return the collection of threads
1288 */
1289 public final Collection<Thread> getExclusiveQueuedThreads() {
1290 ArrayList<Thread> list = new ArrayList<>();
1291 for (Node p = tail; p != null; p = p.prev) {
1292 if (!(p instanceof SharedNode)) {
1293 Thread t = p.waiter;
1294 if (t != null)
1295 list.add(t);
1296 }
1297 }
1298 return list;
1299 }
1300
1301 /**
1302 * Returns a collection containing threads that may be waiting to
1303 * acquire in shared mode. This has the same properties
1304 * as {@link #getQueuedThreads} except that it only returns
1305 * those threads waiting due to a shared acquire.
1306 *
1307 * @return the collection of threads
1308 */
1309 public final Collection<Thread> getSharedQueuedThreads() {
1310 ArrayList<Thread> list = new ArrayList<>();
1311 for (Node p = tail; p != null; p = p.prev) {
1312 if (p instanceof SharedNode) {
1313 Thread t = p.waiter;
1314 if (t != null)
1315 list.add(t);
1316 }
1317 }
1318 return list;
1319 }
1320
1321 /**
1322 * Returns a string identifying this synchronizer, as well as its state.
1323 * The state, in brackets, includes the String {@code "State ="}
1324 * followed by the current value of {@link #getState}, and either
1325 * {@code "nonempty"} or {@code "empty"} depending on whether the
1326 * queue is empty.
1327 *
1328 * @return a string identifying this synchronizer, as well as its state
1329 */
1330 public String toString() {
1331 return super.toString()
1332 + "[State = " + getState() + ", "
1333 + (hasQueuedThreads() ? "non" : "") + "empty queue]";
1334 }
1335
1336 // Instrumentation methods for conditions
1337
1338 /**
1339 * Queries whether the given ConditionObject
1340 * uses this synchronizer as its lock.
1341 *
1342 * @param condition the condition
1343 * @return {@code true} if owned
1344 * @throws NullPointerException if the condition is null
1345 */
1346 public final boolean owns(ConditionObject condition) {
1347 return condition.isOwnedBy(this);
1348 }
1349
1350 /**
1351 * Queries whether any threads are waiting on the given condition
1352 * associated with this synchronizer. Note that because timeouts
1353 * and interrupts may occur at any time, a {@code true} return
1354 * does not guarantee that a future {@code signal} will awaken
1355 * any threads. This method is designed primarily for use in
1413 return condition.getWaitingThreads();
1414 }
1415
1416 /**
1417 * Condition implementation for a {@link AbstractQueuedSynchronizer}
1418 * serving as the basis of a {@link Lock} implementation.
1419 *
1420 * <p>Method documentation for this class describes mechanics,
1421 * not behavioral specifications from the point of view of Lock
1422 * and Condition users. Exported versions of this class will in
1423 * general need to be accompanied by documentation describing
1424 * condition semantics that rely on those of the associated
1425 * {@code AbstractQueuedSynchronizer}.
1426 *
1427 * <p>This class is Serializable, but all fields are transient,
1428 * so deserialized conditions have no waiters.
1429 */
1430 public class ConditionObject implements Condition, java.io.Serializable {
1431 private static final long serialVersionUID = 1173984872572414699L;
1432 /** First node of condition queue. */
1433 private transient ConditionNode firstWaiter;
1434 /** Last node of condition queue. */
1435 private transient ConditionNode lastWaiter;
1436
1437 /**
1438 * Creates a new {@code ConditionObject} instance.
1439 */
public ConditionObject() {
    super();    // nothing else to initialize
}
1441
1442 // Signalling methods
1443
1444 /**
1445 * Removes and transfers one or all waiters to sync queue.
1446 */
1447 private void doSignal(ConditionNode first, boolean all) {
1448 while (first != null) {
1449 ConditionNode next = first.nextWaiter;
1450 if ((firstWaiter = next) == null)
1451 lastWaiter = null;
1452 if ((first.getAndUnsetStatus(COND) & COND) != 0) {
1453 enqueue(first);
1454 if (!all)
1455 break;
1456 }
1457 first = next;
1458 }
1459 }
1460
1461 /**
1462 * Moves the longest-waiting thread, if one exists, from the
1463 * wait queue for this condition to the wait queue for the
1464 * owning lock.
1465 *
1466 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1467 * returns {@code false}
1468 */
1469 public final void signal() {
1470 ConditionNode first = firstWaiter;
1471 if (!isHeldExclusively())
1472 throw new IllegalMonitorStateException();
1473 if (first != null)
1474 doSignal(first, false);
1475 }
1476
1477 /**
1478 * Moves all threads from the wait queue for this condition to
1479 * the wait queue for the owning lock.
1480 *
1481 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1482 * returns {@code false}
1483 */
1484 public final void signalAll() {
1485 ConditionNode first = firstWaiter;
1486 if (!isHeldExclusively())
1487 throw new IllegalMonitorStateException();
1488 if (first != null)
1489 doSignal(first, true);
1490 }
1491
1492 // Waiting methods
1493
1494 /**
1495 * Adds node to condition list and releases lock.
1496 *
1497 * @param node the node
1498 * @return savedState to reacquire after wait
1499 */
1500 private int enableWait(ConditionNode node) {
1501 if (isHeldExclusively()) {
1502 node.waiter = Thread.currentThread();
1503 node.setStatusRelaxed(COND | WAITING);
1504 ConditionNode last = lastWaiter;
1505 if (last == null)
1506 firstWaiter = node;
1507 else
1508 last.nextWaiter = node;
1509 lastWaiter = node;
1510 int savedState = getState();
1511 if (release(savedState))
1512 return savedState;
1513 }
1514 node.status = CANCELLED; // lock not held or inconsistent
1515 throw new IllegalMonitorStateException();
1516 }
1517
1518 /**
1519 * Returns true if a node that was initially placed on a condition
1520 * queue is now ready to reacquire on sync queue.
1521 * @param node the node
1522 * @return true if is reacquiring
1523 */
1524 private boolean canReacquire(ConditionNode node) {
1525 // check links, not status to avoid enqueue race
1526 return node != null && node.prev != null && isEnqueued(node);
1527 }
1528
1529 /**
1530 * Unlinks the given node and other non-waiting nodes from
1531 * condition queue unless already unlinked.
1532 */
1533 private void unlinkCancelledWaiters(ConditionNode node) {
1534 if (node == null || node.nextWaiter != null || node == lastWaiter) {
1535 ConditionNode w = firstWaiter, trail = null;
1536 while (w != null) {
1537 ConditionNode next = w.nextWaiter;
1538 if ((w.status & COND) == 0) {
1539 w.nextWaiter = null;
1540 if (trail == null)
1541 firstWaiter = next;
1542 else
1543 trail.nextWaiter = next;
1544 if (next == null)
1545 lastWaiter = trail;
1546 } else
1547 trail = w;
1548 w = next;
1549 }
1550 }
1551 }
1552
1553 /**
1554 * Implements uninterruptible condition wait.
1555 * <ol>
1556 * <li>Save lock state returned by {@link #getState}.
1557 * <li>Invoke {@link #release} with saved state as argument,
1558 * throwing IllegalMonitorStateException if it fails.
1559 * <li>Block until signalled.
1560 * <li>Reacquire by invoking specialized version of
1561 * {@link #acquire} with saved state as argument.
1562 * </ol>
1563 */
1564 public final void awaitUninterruptibly() {
1565 ConditionNode node = new ConditionNode();
1566 int savedState = enableWait(node);
1567 LockSupport.setCurrentBlocker(this); // for back-compatibility
1568 boolean interrupted = false;
1569 while (!canReacquire(node)) {
1570 if (Thread.interrupted())
1571 interrupted = true;
1572 else if ((node.status & COND) != 0) {
1573 try {
1574 ForkJoinPool.managedBlock(node);
1575 } catch (InterruptedException ie) {
1576 interrupted = true;
1577 }
1578 } else
1579 Thread.onSpinWait(); // awoke while enqueuing
1580 }
1581 LockSupport.setCurrentBlocker(null);
1582 node.clearStatus();
1583 acquire(node, savedState, false, false, false, 0L);
1584 if (interrupted)
1585 Thread.currentThread().interrupt();
1586 }
1587
1588 /**
1589 * Implements interruptible condition wait.
1590 * <ol>
1591 * <li>If current thread is interrupted, throw InterruptedException.
1592 * <li>Save lock state returned by {@link #getState}.
1593 * <li>Invoke {@link #release} with saved state as argument,
1594 * throwing IllegalMonitorStateException if it fails.
1595 * <li>Block until signalled or interrupted.
1596 * <li>Reacquire by invoking specialized version of
1597 * {@link #acquire} with saved state as argument.
1598 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1599 * </ol>
1600 */
1601 public final void await() throws InterruptedException {
1602 if (Thread.interrupted())
1603 throw new InterruptedException();
1604 ConditionNode node = new ConditionNode();
1605 int savedState = enableWait(node);
1606 LockSupport.setCurrentBlocker(this); // for back-compatibility
1607 boolean interrupted = false, cancelled = false;
1608 while (!canReacquire(node)) {
1609 if (interrupted |= Thread.interrupted()) {
1610 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
1611 break; // else interrupted after signal
1612 } else if ((node.status & COND) != 0) {
1613 try {
1614 ForkJoinPool.managedBlock(node);
1615 } catch (InterruptedException ie) {
1616 interrupted = true;
1617 }
1618 } else
1619 Thread.onSpinWait(); // awoke while enqueuing
1620 }
1621 LockSupport.setCurrentBlocker(null);
1622 node.clearStatus();
1623 acquire(node, savedState, false, false, false, 0L);
1624 if (interrupted) {
1625 if (cancelled) {
1626 unlinkCancelledWaiters(node);
1627 throw new InterruptedException();
1628 }
1629 Thread.currentThread().interrupt();
1630 }
1631 }
1632
1633 /**
1634 * Implements timed condition wait.
1635 * <ol>
1636 * <li>If current thread is interrupted, throw InterruptedException.
1637 * <li>Save lock state returned by {@link #getState}.
1638 * <li>Invoke {@link #release} with saved state as argument,
1639 * throwing IllegalMonitorStateException if it fails.
1640 * <li>Block until signalled, interrupted, or timed out.
1641 * <li>Reacquire by invoking specialized version of
1642 * {@link #acquire} with saved state as argument.
1643 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1644 * </ol>
1645 */
1646 public final long awaitNanos(long nanosTimeout)
1647 throws InterruptedException {
1648 if (Thread.interrupted())
1649 throw new InterruptedException();
1650 ConditionNode node = new ConditionNode();
1651 int savedState = enableWait(node);
1652 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
1653 long deadline = System.nanoTime() + nanos;
1654 boolean cancelled = false, interrupted = false;
1655 while (!canReacquire(node)) {
1656 if ((interrupted |= Thread.interrupted()) ||
1657 (nanos = deadline - System.nanoTime()) <= 0L) {
1658 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
1659 break;
1660 } else
1661 LockSupport.parkNanos(this, nanos);
1662 }
1663 node.clearStatus();
1664 acquire(node, savedState, false, false, false, 0L);
1665 if (cancelled) {
1666 unlinkCancelledWaiters(node);
1667 if (interrupted)
1668 throw new InterruptedException();
1669 } else if (interrupted)
1670 Thread.currentThread().interrupt();
1671 long remaining = deadline - System.nanoTime(); // avoid overflow
1672 return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE;
1673 }
1674
1675 /**
1676 * Implements absolute timed condition wait.
1677 * <ol>
1678 * <li>If current thread is interrupted, throw InterruptedException.
1679 * <li>Save lock state returned by {@link #getState}.
1680 * <li>Invoke {@link #release} with saved state as argument,
1681 * throwing IllegalMonitorStateException if it fails.
1682 * <li>Block until signalled, interrupted, or timed out.
1683 * <li>Reacquire by invoking specialized version of
1684 * {@link #acquire} with saved state as argument.
1685 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1686 * <li>If timed out while blocked in step 4, return false, else true.
1687 * </ol>
1688 */
1689 public final boolean awaitUntil(Date deadline)
1690 throws InterruptedException {
1691 long abstime = deadline.getTime();
1692 if (Thread.interrupted())
1693 throw new InterruptedException();
1694 ConditionNode node = new ConditionNode();
1695 int savedState = enableWait(node);
1696 boolean cancelled = false, interrupted = false;
1697 while (!canReacquire(node)) {
1698 if ((interrupted |= Thread.interrupted()) ||
1699 System.currentTimeMillis() >= abstime) {
1700 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
1701 break;
1702 } else
1703 LockSupport.parkUntil(this, abstime);
1704 }
1705 node.clearStatus();
1706 acquire(node, savedState, false, false, false, 0L);
1707 if (cancelled) {
1708 unlinkCancelledWaiters(node);
1709 if (interrupted)
1710 throw new InterruptedException();
1711 } else if (interrupted)
1712 Thread.currentThread().interrupt();
1713 return !cancelled;
1714 }
1715
1716 /**
1717 * Implements timed condition wait.
1718 * <ol>
1719 * <li>If current thread is interrupted, throw InterruptedException.
1720 * <li>Save lock state returned by {@link #getState}.
1721 * <li>Invoke {@link #release} with saved state as argument,
1722 * throwing IllegalMonitorStateException if it fails.
1723 * <li>Block until signalled, interrupted, or timed out.
1724 * <li>Reacquire by invoking specialized version of
1725 * {@link #acquire} with saved state as argument.
1726 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1727 * <li>If timed out while blocked in step 4, return false, else true.
1728 * </ol>
1729 */
1730 public final boolean await(long time, TimeUnit unit)
1731 throws InterruptedException {
1732 long nanosTimeout = unit.toNanos(time);
1733 if (Thread.interrupted())
1734 throw new InterruptedException();
1735 ConditionNode node = new ConditionNode();
1736 int savedState = enableWait(node);
1737 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
1738 long deadline = System.nanoTime() + nanos;
1739 boolean cancelled = false, interrupted = false;
1740 while (!canReacquire(node)) {
1741 if ((interrupted |= Thread.interrupted()) ||
1742 (nanos = deadline - System.nanoTime()) <= 0L) {
1743 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
1744 break;
1745 } else
1746 LockSupport.parkNanos(this, nanos);
1747 }
1748 node.clearStatus();
1749 acquire(node, savedState, false, false, false, 0L);
1750 if (cancelled) {
1751 unlinkCancelledWaiters(node);
1752 if (interrupted)
1753 throw new InterruptedException();
1754 } else if (interrupted)
1755 Thread.currentThread().interrupt();
1756 return !cancelled;
1757 }
1758
1759 // support for instrumentation
1760
1761 /**
1762 * Returns true if this condition was created by the given
1763 * synchronization object.
1764 *
1765 * @return {@code true} if owned
1766 */
1767 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
1768 return sync == AbstractQueuedSynchronizer.this;
1769 }
1770
1771 /**
1772 * Queries whether any threads are waiting on this condition.
1773 * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
1774 *
1775 * @return {@code true} if there are any waiting threads
1776 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1777 * returns {@code false}
1778 */
1779 protected final boolean hasWaiters() {
1780 if (!isHeldExclusively())
1781 throw new IllegalMonitorStateException();
1782 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
1783 if ((w.status & COND) != 0)
1784 return true;
1785 }
1786 return false;
1787 }
1788
1789 /**
1790 * Returns an estimate of the number of threads waiting on
1791 * this condition.
1792 * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
1793 *
1794 * @return the estimated number of waiting threads
1795 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1796 * returns {@code false}
1797 */
1798 protected final int getWaitQueueLength() {
1799 if (!isHeldExclusively())
1800 throw new IllegalMonitorStateException();
1801 int n = 0;
1802 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
1803 if ((w.status & COND) != 0)
1804 ++n;
1805 }
1806 return n;
1807 }
1808
1809 /**
1810 * Returns a collection containing those threads that may be
1811 * waiting on this Condition.
1812 * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
1813 *
1814 * @return the collection of threads
1815 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1816 * returns {@code false}
1817 */
1818 protected final Collection<Thread> getWaitingThreads() {
1819 if (!isHeldExclusively())
1820 throw new IllegalMonitorStateException();
1821 ArrayList<Thread> list = new ArrayList<>();
1822 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
1823 if ((w.status & COND) != 0) {
1824 Thread t = w.waiter;
1825 if (t != null)
1826 list.add(t);
1827 }
1828 }
1829 return list;
1830 }
1831 }
1832
1833 // Unsafe
// Unsafe handle plus the offsets of the state, head and tail fields.
private static final Unsafe U = Unsafe.getUnsafe();
private static final long STATE =
    U.objectFieldOffset(AbstractQueuedSynchronizer.class, "state");
private static final long HEAD =
    U.objectFieldOffset(AbstractQueuedSynchronizer.class, "head");
private static final long TAIL =
    U.objectFieldOffset(AbstractQueuedSynchronizer.class, "tail");

static {
    // The local is never read. Judging by its name, referencing
    // LockSupport.class here is meant to get that class loaded up front.
    Class<?> ensureLoaded = LockSupport.class;
}
1845 }
|