18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent.locks;
37
38 import java.lang.invoke.MethodHandles;
39 import java.lang.invoke.VarHandle;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.Date;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.locks.AbstractQueuedSynchronizer.Node;
45
46 /**
47 * A version of {@link AbstractQueuedSynchronizer} in
48 * which synchronization state is maintained as a {@code long}.
49 * This class has exactly the same structure, properties, and methods
50 * as {@code AbstractQueuedSynchronizer} with the exception
51 * that all state-related parameters and results are defined
52 * as {@code long} rather than {@code int}. This class
53 * may be useful when creating synchronizers such as
54 * multilevel locks and barriers that require
55 * 64 bits of state.
56 *
57 * <p>See {@link AbstractQueuedSynchronizer} for usage
58 * notes and examples.
59 *
60 * @since 1.6
61 * @author Doug Lea
62 */
63 public abstract class AbstractQueuedLongSynchronizer
64 extends AbstractOwnableSynchronizer
65 implements java.io.Serializable {
66
67 private static final long serialVersionUID = 7373984972572414692L;
68
69 /*
70 * To keep sources in sync, the remainder of this source file is
71 * exactly cloned from AbstractQueuedSynchronizer, replacing class
72 * name and changing ints related with sync state to longs. Please
73 * keep it that way.
74 */
75
76 /**
77 * Creates a new {@code AbstractQueuedLongSynchronizer} instance
78 * with initial synchronization state of zero.
79 */
80 protected AbstractQueuedLongSynchronizer() { }
81
82 /**
83 * Head of the wait queue, lazily initialized. Except for
84 * initialization, it is modified only via method setHead. Note:
85 * If head exists, its waitStatus is guaranteed not to be
86 * CANCELLED.
87 */
88 private transient volatile Node head;
89
90 /**
91 * Tail of the wait queue, lazily initialized. Modified only via
92 * method enq to add new wait node.
93 */
94 private transient volatile Node tail;
95
96 /**
97 * The synchronization state.
98 */
99 private volatile long state;
100
101 /**
102 * Returns the current value of synchronization state.
103 * This operation has memory semantics of a {@code volatile} read.
104 * @return current state value
105 */
106 protected final long getState() {
107 return state;
108 }
109
110 /**
111 * Sets the value of synchronization state.
112 * This operation has memory semantics of a {@code volatile} write.
113 * @param newState the new state value
114 */
115 protected final void setState(long newState) {
116 // See JDK-8180620: Clarify VarHandle mixed-access subtleties
117 STATE.setVolatile(this, newState);
118 }
119
120 /**
121 * Atomically sets synchronization state to the given updated
122 * value if the current state value equals the expected value.
123 * This operation has memory semantics of a {@code volatile} read
124 * and write.
125 *
126 * @param expect the expected value
127 * @param update the new value
128 * @return {@code true} if successful. False return indicates that the actual
129 * value was not equal to the expected value.
130 */
131 protected final boolean compareAndSetState(long expect, long update) {
132 return STATE.compareAndSet(this, expect, update);
133 }
134
135 // Queuing utilities
136
137 /**
138 * The number of nanoseconds for which it is faster to spin
139 * rather than to use timed park. A rough estimate suffices
140 * to improve responsiveness with very short timeouts.
141 */
142 static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
143
144 /**
145 * Inserts node into queue, initializing if necessary. See picture above.
146 * @param node the node to insert
147 * @return node's predecessor
148 */
149 private Node enq(Node node) {
150 for (;;) {
151 Node oldTail = tail;
152 if (oldTail != null) {
153 node.setPrevRelaxed(oldTail);
154 if (compareAndSetTail(oldTail, node)) {
155 oldTail.next = node;
156 return oldTail;
157 }
158 } else {
159 initializeSyncQueue();
160 }
161 }
162 }
163
164 /**
165 * Creates and enqueues node for current thread and given mode.
166 *
167 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
168 * @return the new node
169 */
170 private Node addWaiter(Node mode) {
171 Node node = new Node(mode);
172
173 for (;;) {
174 Node oldTail = tail;
175 if (oldTail != null) {
176 node.setPrevRelaxed(oldTail);
177 if (compareAndSetTail(oldTail, node)) {
178 oldTail.next = node;
179 return node;
180 }
181 } else {
182 initializeSyncQueue();
183 }
184 }
185 }
186
187 /**
188 * Sets head of queue to be node, thus dequeuing. Called only by
189 * acquire methods. Also nulls out unused fields for sake of GC
190 * and to suppress unnecessary signals and traversals.
191 *
192 * @param node the node
193 */
194 private void setHead(Node node) {
195 head = node;
196 node.thread = null;
197 node.prev = null;
198 }
199
200 /**
201 * Wakes up node's successor, if one exists.
202 *
203 * @param node the node
204 */
205 private void unparkSuccessor(Node node) {
206 /*
207 * If status is negative (i.e., possibly needing signal) try
208 * to clear in anticipation of signalling. It is OK if this
209 * fails or if status is changed by waiting thread.
210 */
211 int ws = node.waitStatus;
212 if (ws < 0)
213 node.compareAndSetWaitStatus(ws, 0);
214
215 /*
216 * Thread to unpark is held in successor, which is normally
217 * just the next node. But if cancelled or apparently null,
218 * traverse backwards from tail to find the actual
219 * non-cancelled successor.
220 */
221 Node s = node.next;
222 if (s == null || s.waitStatus > 0) {
223 s = null;
224 for (Node p = tail; p != node && p != null; p = p.prev)
225 if (p.waitStatus <= 0)
226 s = p;
227 }
228 if (s != null)
229 LockSupport.unpark(s.thread);
230 }
231
232 /**
233 * Release action for shared mode -- signals successor and ensures
234 * propagation. (Note: For exclusive mode, release just amounts
235 * to calling unparkSuccessor of head if it needs signal.)
236 */
237 private void doReleaseShared() {
238 /*
239 * Ensure that a release propagates, even if there are other
240 * in-progress acquires/releases. This proceeds in the usual
241 * way of trying to unparkSuccessor of head if it needs
242 * signal. But if it does not, status is set to PROPAGATE to
243 * ensure that upon release, propagation continues.
244 * Additionally, we must loop in case a new node is added
245 * while we are doing this. Also, unlike other uses of
246 * unparkSuccessor, we need to know if CAS to reset status
247 * fails, if so rechecking.
248 */
249 for (;;) {
250 Node h = head;
251 if (h != null && h != tail) {
252 int ws = h.waitStatus;
253 if (ws == Node.SIGNAL) {
254 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
255 continue; // loop to recheck cases
256 unparkSuccessor(h);
257 }
258 else if (ws == 0 &&
259 !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
260 continue; // loop on failed CAS
261 }
262 if (h == head) // loop if head changed
263 break;
264 }
265 }
266
267 /**
268 * Sets head of queue, and checks if successor may be waiting
269 * in shared mode, if so propagating if either propagate > 0 or
270 * PROPAGATE status was set.
271 *
272 * @param node the node
273 * @param propagate the return value from a tryAcquireShared
274 */
275 private void setHeadAndPropagate(Node node, long propagate) {
276 Node h = head; // Record old head for check below
277 setHead(node);
278 /*
279 * Try to signal next queued node if:
280 * Propagation was indicated by caller,
281 * or was recorded (as h.waitStatus either before
282 * or after setHead) by a previous operation
283 * (note: this uses sign-check of waitStatus because
284 * PROPAGATE status may transition to SIGNAL.)
285 * and
286 * The next node is waiting in shared mode,
287 * or we don't know, because it appears null
288 *
289 * The conservatism in both of these checks may cause
290 * unnecessary wake-ups, but only when there are multiple
291 * racing acquires/releases, so most need signals now or soon
292 * anyway.
293 */
294 if (propagate > 0 || h == null || h.waitStatus < 0 ||
295 (h = head) == null || h.waitStatus < 0) {
296 Node s = node.next;
297 if (s == null || s.isShared())
298 doReleaseShared();
299 }
300 }
301
302 // Utilities for various versions of acquire
303
304 /**
305 * Cancels an ongoing attempt to acquire.
306 *
307 * @param node the node
308 */
309 private void cancelAcquire(Node node) {
310 // Ignore if node doesn't exist
311 if (node == null)
312 return;
313
314 node.thread = null;
315
316 // Skip cancelled predecessors
317 Node pred = node.prev;
318 while (pred.waitStatus > 0)
319 node.prev = pred = pred.prev;
320
321 // predNext is the apparent node to unsplice. CASes below will
322 // fail if not, in which case, we lost race vs another cancel
323 // or signal, so no further action is necessary, although with
324 // a possibility that a cancelled node may transiently remain
325 // reachable.
326 Node predNext = pred.next;
327
328 // Can use unconditional write instead of CAS here.
329 // After this atomic step, other Nodes can skip past us.
330 // Before, we are free of interference from other threads.
331 node.waitStatus = Node.CANCELLED;
332
333 // If we are the tail, remove ourselves.
334 if (node == tail && compareAndSetTail(node, pred)) {
335 pred.compareAndSetNext(predNext, null);
336 } else {
337 // If successor needs signal, try to set pred's next-link
338 // so it will get one. Otherwise wake it up to propagate.
339 int ws;
340 if (pred != head &&
341 ((ws = pred.waitStatus) == Node.SIGNAL ||
342 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
343 pred.thread != null) {
344 Node next = node.next;
345 if (next != null && next.waitStatus <= 0)
346 pred.compareAndSetNext(predNext, next);
347 } else {
348 unparkSuccessor(node);
349 }
350
351 node.next = node; // help GC
352 }
353 }
354
355 /**
356 * Checks and updates status for a node that failed to acquire.
357 * Returns true if thread should block. This is the main signal
358 * control in all acquire loops. Requires that pred == node.prev.
359 *
360 * @param pred node's predecessor holding status
361 * @param node the node
362 * @return {@code true} if thread should block
363 */
364 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
365 int ws = pred.waitStatus;
366 if (ws == Node.SIGNAL)
367 /*
368 * This node has already set status asking a release
369 * to signal it, so it can safely park.
370 */
371 return true;
372 if (ws > 0) {
373 /*
374 * Predecessor was cancelled. Skip over predecessors and
375 * indicate retry.
376 */
377 do {
378 node.prev = pred = pred.prev;
379 } while (pred.waitStatus > 0);
380 pred.next = node;
381 } else {
382 /*
383 * waitStatus must be 0 or PROPAGATE. Indicate that we
384 * need a signal, but don't park yet. Caller will need to
385 * retry to make sure it cannot acquire before parking.
386 */
387 pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
388 }
389 return false;
390 }
391
392 /**
393 * Convenience method to interrupt current thread.
394 */
395 static void selfInterrupt() {
396 Thread.currentThread().interrupt();
397 }
398
399 /**
400 * Convenience method to park and then check if interrupted.
401 *
402 * @return {@code true} if interrupted
403 */
404 private final boolean parkAndCheckInterrupt() {
405 LockSupport.park(this);
406 return Thread.interrupted();
407 }
408
409 /*
410 * Various flavors of acquire, varying in exclusive/shared and
411 * control modes. Each is mostly the same, but annoyingly
412 * different. Only a little bit of factoring is possible due to
413 * interactions of exception mechanics (including ensuring that we
414 * cancel if tryAcquire throws exception) and other control, at
415 * least not without hurting performance too much.
416 */
417
418 /**
419 * Acquires in exclusive uninterruptible mode for thread already in
420 * queue. Used by condition wait methods as well as acquire.
421 *
422 * @param node the node
423 * @param arg the acquire argument
424 * @return {@code true} if interrupted while waiting
425 */
426 final boolean acquireQueued(final Node node, long arg) {
427 boolean interrupted = false;
428 try {
429 for (;;) {
430 final Node p = node.predecessor();
431 if (p == head && tryAcquire(arg)) {
432 setHead(node);
433 p.next = null; // help GC
434 return interrupted;
435 }
436 if (shouldParkAfterFailedAcquire(p, node))
437 interrupted |= parkAndCheckInterrupt();
438 }
439 } catch (Throwable t) {
440 cancelAcquire(node);
441 if (interrupted)
442 selfInterrupt();
443 throw t;
444 }
445 }
446
447 /**
448 * Acquires in exclusive interruptible mode.
449 * @param arg the acquire argument
450 */
451 private void doAcquireInterruptibly(long arg)
452 throws InterruptedException {
453 final Node node = addWaiter(Node.EXCLUSIVE);
454 try {
455 for (;;) {
456 final Node p = node.predecessor();
457 if (p == head && tryAcquire(arg)) {
458 setHead(node);
459 p.next = null; // help GC
460 return;
461 }
462 if (shouldParkAfterFailedAcquire(p, node) &&
463 parkAndCheckInterrupt())
464 throw new InterruptedException();
465 }
466 } catch (Throwable t) {
467 cancelAcquire(node);
468 throw t;
469 }
470 }
471
472 /**
473 * Acquires in exclusive timed mode.
474 *
475 * @param arg the acquire argument
476 * @param nanosTimeout max wait time
477 * @return {@code true} if acquired
478 */
479 private boolean doAcquireNanos(long arg, long nanosTimeout)
480 throws InterruptedException {
481 if (nanosTimeout <= 0L)
482 return false;
483 final long deadline = System.nanoTime() + nanosTimeout;
484 final Node node = addWaiter(Node.EXCLUSIVE);
485 try {
486 for (;;) {
487 final Node p = node.predecessor();
488 if (p == head && tryAcquire(arg)) {
489 setHead(node);
490 p.next = null; // help GC
491 return true;
492 }
493 nanosTimeout = deadline - System.nanoTime();
494 if (nanosTimeout <= 0L) {
495 cancelAcquire(node);
496 return false;
497 }
498 if (shouldParkAfterFailedAcquire(p, node) &&
499 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
500 LockSupport.parkNanos(this, nanosTimeout);
501 if (Thread.interrupted())
502 throw new InterruptedException();
503 }
504 } catch (Throwable t) {
505 cancelAcquire(node);
506 throw t;
507 }
508 }
509
510 /**
511 * Acquires in shared uninterruptible mode.
512 * @param arg the acquire argument
513 */
514 private void doAcquireShared(long arg) {
515 final Node node = addWaiter(Node.SHARED);
516 boolean interrupted = false;
517 try {
518 for (;;) {
519 final Node p = node.predecessor();
520 if (p == head) {
521 long r = tryAcquireShared(arg);
522 if (r >= 0) {
523 setHeadAndPropagate(node, r);
524 p.next = null; // help GC
525 return;
526 }
527 }
528 if (shouldParkAfterFailedAcquire(p, node))
529 interrupted |= parkAndCheckInterrupt();
530 }
531 } catch (Throwable t) {
532 cancelAcquire(node);
533 throw t;
534 } finally {
535 if (interrupted)
536 selfInterrupt();
537 }
538 }
539
540 /**
541 * Acquires in shared interruptible mode.
542 * @param arg the acquire argument
543 */
544 private void doAcquireSharedInterruptibly(long arg)
545 throws InterruptedException {
546 final Node node = addWaiter(Node.SHARED);
547 try {
548 for (;;) {
549 final Node p = node.predecessor();
550 if (p == head) {
551 long r = tryAcquireShared(arg);
552 if (r >= 0) {
553 setHeadAndPropagate(node, r);
554 p.next = null; // help GC
555 return;
556 }
557 }
558 if (shouldParkAfterFailedAcquire(p, node) &&
559 parkAndCheckInterrupt())
560 throw new InterruptedException();
561 }
562 } catch (Throwable t) {
563 cancelAcquire(node);
564 throw t;
565 }
566 }
567
568 /**
569 * Acquires in shared timed mode.
570 *
571 * @param arg the acquire argument
572 * @param nanosTimeout max wait time
573 * @return {@code true} if acquired
574 */
575 private boolean doAcquireSharedNanos(long arg, long nanosTimeout)
576 throws InterruptedException {
577 if (nanosTimeout <= 0L)
578 return false;
579 final long deadline = System.nanoTime() + nanosTimeout;
580 final Node node = addWaiter(Node.SHARED);
581 try {
582 for (;;) {
583 final Node p = node.predecessor();
584 if (p == head) {
585 long r = tryAcquireShared(arg);
586 if (r >= 0) {
587 setHeadAndPropagate(node, r);
588 p.next = null; // help GC
589 return true;
590 }
591 }
592 nanosTimeout = deadline - System.nanoTime();
593 if (nanosTimeout <= 0L) {
594 cancelAcquire(node);
595 return false;
596 }
597 if (shouldParkAfterFailedAcquire(p, node) &&
598 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
599 LockSupport.parkNanos(this, nanosTimeout);
600 if (Thread.interrupted())
601 throw new InterruptedException();
602 }
603 } catch (Throwable t) {
604 cancelAcquire(node);
605 throw t;
606 }
607 }
608
609 // Main exported methods
610
611 /**
612 * Attempts to acquire in exclusive mode. This method should query
613 * if the state of the object permits it to be acquired in the
614 * exclusive mode, and if so to acquire it.
615 *
616 * <p>This method is always invoked by the thread performing
617 * acquire. If this method reports failure, the acquire method
618 * may queue the thread, if it is not already queued, until it is
619 * signalled by a release from some other thread. This can be used
620 * to implement method {@link Lock#tryLock()}.
621 *
622 * <p>The default
623 * implementation throws {@link UnsupportedOperationException}.
624 *
625 * @param arg the acquire argument. This value is always the one
626 * passed to an acquire method, or is the value saved on entry
739 * {@code false} otherwise
740 * @throws UnsupportedOperationException if conditions are not supported
741 */
742 protected boolean isHeldExclusively() {
743 throw new UnsupportedOperationException();
744 }
745
746 /**
747 * Acquires in exclusive mode, ignoring interrupts. Implemented
748 * by invoking at least once {@link #tryAcquire},
749 * returning on success. Otherwise the thread is queued, possibly
750 * repeatedly blocking and unblocking, invoking {@link
751 * #tryAcquire} until success. This method can be used
752 * to implement method {@link Lock#lock}.
753 *
754 * @param arg the acquire argument. This value is conveyed to
755 * {@link #tryAcquire} but is otherwise uninterpreted and
756 * can represent anything you like.
757 */
758 public final void acquire(long arg) {
759 if (!tryAcquire(arg) &&
760 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
761 selfInterrupt();
762 }
763
764 /**
765 * Acquires in exclusive mode, aborting if interrupted.
766 * Implemented by first checking interrupt status, then invoking
767 * at least once {@link #tryAcquire}, returning on
768 * success. Otherwise the thread is queued, possibly repeatedly
769 * blocking and unblocking, invoking {@link #tryAcquire}
770 * until success or the thread is interrupted. This method can be
771 * used to implement method {@link Lock#lockInterruptibly}.
772 *
773 * @param arg the acquire argument. This value is conveyed to
774 * {@link #tryAcquire} but is otherwise uninterpreted and
775 * can represent anything you like.
776 * @throws InterruptedException if the current thread is interrupted
777 */
778 public final void acquireInterruptibly(long arg)
779 throws InterruptedException {
780 if (Thread.interrupted())
781 throw new InterruptedException();
782 if (!tryAcquire(arg))
783 doAcquireInterruptibly(arg);
784 }
785
786 /**
787 * Attempts to acquire in exclusive mode, aborting if interrupted,
788 * and failing if the given timeout elapses. Implemented by first
789 * checking interrupt status, then invoking at least once {@link
790 * #tryAcquire}, returning on success. Otherwise, the thread is
791 * queued, possibly repeatedly blocking and unblocking, invoking
792 * {@link #tryAcquire} until success or the thread is interrupted
793 * or the timeout elapses. This method can be used to implement
794 * method {@link Lock#tryLock(long, TimeUnit)}.
795 *
796 * @param arg the acquire argument. This value is conveyed to
797 * {@link #tryAcquire} but is otherwise uninterpreted and
798 * can represent anything you like.
799 * @param nanosTimeout the maximum number of nanoseconds to wait
800 * @return {@code true} if acquired; {@code false} if timed out
801 * @throws InterruptedException if the current thread is interrupted
802 */
803 public final boolean tryAcquireNanos(long arg, long nanosTimeout)
804 throws InterruptedException {
805 if (Thread.interrupted())
806 throw new InterruptedException();
807 return tryAcquire(arg) ||
808 doAcquireNanos(arg, nanosTimeout);
809 }
810
811 /**
812 * Releases in exclusive mode. Implemented by unblocking one or
813 * more threads if {@link #tryRelease} returns true.
814 * This method can be used to implement method {@link Lock#unlock}.
815 *
816 * @param arg the release argument. This value is conveyed to
817 * {@link #tryRelease} but is otherwise uninterpreted and
818 * can represent anything you like.
819 * @return the value returned from {@link #tryRelease}
820 */
821 public final boolean release(long arg) {
822 if (tryRelease(arg)) {
823 Node h = head;
824 if (h != null && h.waitStatus != 0)
825 unparkSuccessor(h);
826 return true;
827 }
828 return false;
829 }
830
831 /**
832 * Acquires in shared mode, ignoring interrupts. Implemented by
833 * first invoking at least once {@link #tryAcquireShared},
834 * returning on success. Otherwise the thread is queued, possibly
835 * repeatedly blocking and unblocking, invoking {@link
836 * #tryAcquireShared} until success.
837 *
838 * @param arg the acquire argument. This value is conveyed to
839 * {@link #tryAcquireShared} but is otherwise uninterpreted
840 * and can represent anything you like.
841 */
842 public final void acquireShared(long arg) {
843 if (tryAcquireShared(arg) < 0)
844 doAcquireShared(arg);
845 }
846
847 /**
848 * Acquires in shared mode, aborting if interrupted. Implemented
849 * by first checking interrupt status, then invoking at least once
850 * {@link #tryAcquireShared}, returning on success. Otherwise the
851 * thread is queued, possibly repeatedly blocking and unblocking,
852 * invoking {@link #tryAcquireShared} until success or the thread
853 * is interrupted.
854 * @param arg the acquire argument.
855 * This value is conveyed to {@link #tryAcquireShared} but is
856 * otherwise uninterpreted and can represent anything
857 * you like.
858 * @throws InterruptedException if the current thread is interrupted
859 */
860 public final void acquireSharedInterruptibly(long arg)
861 throws InterruptedException {
862 if (Thread.interrupted())
863 throw new InterruptedException();
864 if (tryAcquireShared(arg) < 0)
865 doAcquireSharedInterruptibly(arg);
866 }
867
868 /**
869 * Attempts to acquire in shared mode, aborting if interrupted, and
870 * failing if the given timeout elapses. Implemented by first
871 * checking interrupt status, then invoking at least once {@link
872 * #tryAcquireShared}, returning on success. Otherwise, the
873 * thread is queued, possibly repeatedly blocking and unblocking,
874 * invoking {@link #tryAcquireShared} until success or the thread
875 * is interrupted or the timeout elapses.
876 *
877 * @param arg the acquire argument. This value is conveyed to
878 * {@link #tryAcquireShared} but is otherwise uninterpreted
879 * and can represent anything you like.
880 * @param nanosTimeout the maximum number of nanoseconds to wait
881 * @return {@code true} if acquired; {@code false} if timed out
882 * @throws InterruptedException if the current thread is interrupted
883 */
884 public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
885 throws InterruptedException {
886 if (Thread.interrupted())
887 throw new InterruptedException();
888 return tryAcquireShared(arg) >= 0 ||
889 doAcquireSharedNanos(arg, nanosTimeout);
890 }
891
892 /**
893 * Releases in shared mode. Implemented by unblocking one or more
894 * threads if {@link #tryReleaseShared} returns true.
895 *
896 * @param arg the release argument. This value is conveyed to
897 * {@link #tryReleaseShared} but is otherwise uninterpreted
898 * and can represent anything you like.
899 * @return the value returned from {@link #tryReleaseShared}
900 */
901 public final boolean releaseShared(long arg) {
902 if (tryReleaseShared(arg)) {
903 doReleaseShared();
904 return true;
905 }
906 return false;
907 }
908
909 // Queue inspection methods
910
911 /**
912 * Queries whether any threads are waiting to acquire. Note that
913 * because cancellations due to interrupts and timeouts may occur
914 * at any time, a {@code true} return does not guarantee that any
915 * other thread will ever acquire.
916 *
917 * @return {@code true} if there may be other threads waiting to acquire
918 */
919 public final boolean hasQueuedThreads() {
920 for (Node p = tail, h = head; p != h && p != null; p = p.prev)
921 if (p.waitStatus <= 0)
922 return true;
923 return false;
924 }
925
926 /**
927 * Queries whether any threads have ever contended to acquire this
928 * synchronizer; that is, if an acquire method has ever blocked.
929 *
930 * <p>In this implementation, this operation returns in
931 * constant time.
932 *
933 * @return {@code true} if there has ever been contention
934 */
935 public final boolean hasContended() {
936 return head != null;
937 }
938
939 /**
940 * Returns the first (longest-waiting) thread in the queue, or
941 * {@code null} if no threads are currently queued.
942 *
943 * <p>In this implementation, this operation normally returns in
944 * constant time, but may iterate upon contention if other threads are
945 * concurrently modifying the queue.
946 *
947 * @return the first (longest-waiting) thread in the queue, or
948 * {@code null} if no threads are currently queued
949 */
950 public final Thread getFirstQueuedThread() {
951 // handle only fast path, else relay
952 return (head == tail) ? null : fullGetFirstQueuedThread();
953 }
954
955 /**
956 * Version of getFirstQueuedThread called when fastpath fails.
957 */
958 private Thread fullGetFirstQueuedThread() {
959 /*
960 * The first node is normally head.next. Try to get its
961 * thread field, ensuring consistent reads: If thread
962 * field is nulled out or s.prev is no longer head, then
963 * some other thread(s) concurrently performed setHead in
964 * between some of our reads. We try this twice before
965 * resorting to traversal.
966 */
967 Node h, s;
968 Thread st;
969 if (((h = head) != null && (s = h.next) != null &&
970 s.prev == head && (st = s.thread) != null) ||
971 ((h = head) != null && (s = h.next) != null &&
972 s.prev == head && (st = s.thread) != null))
973 return st;
974
975 /*
976 * Head's next field might not have been set yet, or may have
977 * been unset after setHead. So we must check to see if tail
978 * is actually first node. If not, we continue on, safely
979 * traversing from tail back to head to find first,
980 * guaranteeing termination.
981 */
982
983 Thread firstThread = null;
984 for (Node p = tail; p != null && p != head; p = p.prev) {
985 Thread t = p.thread;
986 if (t != null)
987 firstThread = t;
988 }
989 return firstThread;
990 }
991
992 /**
993 * Returns true if the given thread is currently queued.
994 *
995 * <p>This implementation traverses the queue to determine
996 * presence of the given thread.
997 *
998 * @param thread the thread
999 * @return {@code true} if the given thread is on the queue
1000 * @throws NullPointerException if the thread is null
1001 */
1002 public final boolean isQueued(Thread thread) {
1003 if (thread == null)
1004 throw new NullPointerException();
1005 for (Node p = tail; p != null; p = p.prev)
1006 if (p.thread == thread)
1007 return true;
1008 return false;
1009 }
1010
1011 /**
1012 * Returns {@code true} if the apparent first queued thread, if one
1013 * exists, is waiting in exclusive mode. If this method returns
1014 * {@code true}, and the current thread is attempting to acquire in
1015 * shared mode (that is, this method is invoked from {@link
1016 * #tryAcquireShared}) then it is guaranteed that the current thread
1017 * is not the first queued thread. Used only as a heuristic in
1018 * ReentrantReadWriteLock.
1019 */
1020 final boolean apparentlyFirstQueuedIsExclusive() {
1021 Node h, s;
1022 return (h = head) != null &&
1023 (s = h.next) != null &&
1024 !s.isShared() &&
1025 s.thread != null;
1026 }
1027
1028 /**
1029 * Queries whether any threads have been waiting to acquire longer
1030 * than the current thread.
1031 *
1032 * <p>An invocation of this method is equivalent to (but may be
1033 * more efficient than):
1034 * <pre> {@code
1035 * getFirstQueuedThread() != Thread.currentThread()
1036 * && hasQueuedThreads()}</pre>
1037 *
1038 * <p>Note that because cancellations due to interrupts and
1039 * timeouts may occur at any time, a {@code true} return does not
1040 * guarantee that some other thread will acquire before the current
1041 * thread. Likewise, it is possible for another thread to win a
1042 * race to enqueue after this method has returned {@code false},
1043 * due to the queue being empty.
1044 *
1045 * <p>This method is designed to be used by a fair synchronizer to
1046 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
1047 * Such a synchronizer's {@link #tryAcquire} method should return
1048 * {@code false}, and its {@link #tryAcquireShared} method should
1049 * return a negative value, if this method returns {@code true}
1050 * (unless this is a reentrant acquire). For example, the {@code
1051 * tryAcquire} method for a fair, reentrant, exclusive mode
1052 * synchronizer might look like this:
1053 *
1054 * <pre> {@code
1055 * protected boolean tryAcquire(int arg) {
1056 * if (isHeldExclusively()) {
1057 * // A reentrant acquire; increment hold count
1058 * return true;
1059 * } else if (hasQueuedPredecessors()) {
1060 * return false;
1061 * } else {
1062 * // try to acquire normally
1063 * }
1064 * }}</pre>
1065 *
1066 * @return {@code true} if there is a queued thread preceding the
1067 * current thread, and {@code false} if the current thread
1068 * is at the head of the queue or the queue is empty
1069 * @since 1.7
1070 */
1071 public final boolean hasQueuedPredecessors() {
1072 Node h, s;
1073 if ((h = head) != null) {
1074 if ((s = h.next) == null || s.waitStatus > 0) {
1075 s = null; // traverse in case of concurrent cancellation
1076 for (Node p = tail; p != h && p != null; p = p.prev) {
1077 if (p.waitStatus <= 0)
1078 s = p;
1079 }
1080 }
1081 if (s != null && s.thread != Thread.currentThread())
1082 return true;
1083 }
1084 return false;
1085 }
1086
1087 // Instrumentation and monitoring methods
1088
1089 /**
1090 * Returns an estimate of the number of threads waiting to
1091 * acquire. The value is only an estimate because the number of
1092 * threads may change dynamically while this method traverses
1093 * internal data structures. This method is designed for use in
1094 * monitoring system state, not for synchronization control.
1095 *
1096 * @return the estimated number of threads waiting to acquire
1097 */
1098 public final int getQueueLength() {
1099 int n = 0;
1100 for (Node p = tail; p != null; p = p.prev) {
1101 if (p.thread != null)
1102 ++n;
1103 }
1104 return n;
1105 }
1106
1107 /**
1108 * Returns a collection containing threads that may be waiting to
1109 * acquire. Because the actual set of threads may change
1110 * dynamically while constructing this result, the returned
1111 * collection is only a best-effort estimate. The elements of the
1112 * returned collection are in no particular order. This method is
1113 * designed to facilitate construction of subclasses that provide
1114 * more extensive monitoring facilities.
1115 *
1116 * @return the collection of threads
1117 */
1118 public final Collection<Thread> getQueuedThreads() {
1119 ArrayList<Thread> list = new ArrayList<>();
1120 for (Node p = tail; p != null; p = p.prev) {
1121 Thread t = p.thread;
1122 if (t != null)
1123 list.add(t);
1124 }
1125 return list;
1126 }
1127
1128 /**
1129 * Returns a collection containing threads that may be waiting to
1130 * acquire in exclusive mode. This has the same properties
1131 * as {@link #getQueuedThreads} except that it only returns
1132 * those threads waiting due to an exclusive acquire.
1133 *
1134 * @return the collection of threads
1135 */
1136 public final Collection<Thread> getExclusiveQueuedThreads() {
1137 ArrayList<Thread> list = new ArrayList<>();
1138 for (Node p = tail; p != null; p = p.prev) {
1139 if (!p.isShared()) {
1140 Thread t = p.thread;
1141 if (t != null)
1142 list.add(t);
1143 }
1144 }
1145 return list;
1146 }
1147
1148 /**
1149 * Returns a collection containing threads that may be waiting to
1150 * acquire in shared mode. This has the same properties
1151 * as {@link #getQueuedThreads} except that it only returns
1152 * those threads waiting due to a shared acquire.
1153 *
1154 * @return the collection of threads
1155 */
1156 public final Collection<Thread> getSharedQueuedThreads() {
1157 ArrayList<Thread> list = new ArrayList<>();
1158 for (Node p = tail; p != null; p = p.prev) {
1159 if (p.isShared()) {
1160 Thread t = p.thread;
1161 if (t != null)
1162 list.add(t);
1163 }
1164 }
1165 return list;
1166 }
1167
1168 /**
1169 * Returns a string identifying this synchronizer, as well as its state.
1170 * The state, in brackets, includes the String {@code "State ="}
1171 * followed by the current value of {@link #getState}, and either
1172 * {@code "nonempty"} or {@code "empty"} depending on whether the
1173 * queue is empty.
1174 *
1175 * @return a string identifying this synchronizer, as well as its state
1176 */
1177 public String toString() {
1178 return super.toString()
1179 + "[State = " + getState() + ", "
1180 + (hasQueuedThreads() ? "non" : "") + "empty queue]";
1181 }
1182
1183
1184 // Internal support methods for Conditions
1185
1186 /**
1187 * Returns true if a node, always one that was initially placed on
1188 * a condition queue, is now waiting to reacquire on sync queue.
1189 * @param node the node
1190 * @return true if is reacquiring
1191 */
1192 final boolean isOnSyncQueue(Node node) {
1193 if (node.waitStatus == Node.CONDITION || node.prev == null)
1194 return false;
1195 if (node.next != null) // If has successor, it must be on queue
1196 return true;
1197 /*
1198 * node.prev can be non-null, but not yet on queue because
1199 * the CAS to place it on queue can fail. So we have to
1200 * traverse from tail to make sure it actually made it. It
1201 * will always be near the tail in calls to this method, and
1202 * unless the CAS failed (which is unlikely), it will be
1203 * there, so we hardly ever traverse much.
1204 */
1205 return findNodeFromTail(node);
1206 }
1207
1208 /**
1209 * Returns true if node is on sync queue by searching backwards from tail.
1210 * Called only when needed by isOnSyncQueue.
1211 * @return true if present
1212 */
1213 private boolean findNodeFromTail(Node node) {
1214 // We check for node first, since it's likely to be at or near tail.
1215 // tail is known to be non-null, so we could re-order to "save"
1216 // one null check, but we leave it this way to help the VM.
1217 for (Node p = tail;;) {
1218 if (p == node)
1219 return true;
1220 if (p == null)
1221 return false;
1222 p = p.prev;
1223 }
1224 }
1225
1226 /**
1227 * Transfers a node from a condition queue onto sync queue.
1228 * Returns true if successful.
1229 * @param node the node
1230 * @return true if successfully transferred (else the node was
1231 * cancelled before signal)
1232 */
1233 final boolean transferForSignal(Node node) {
1234 /*
1235 * If cannot change waitStatus, the node has been cancelled.
1236 */
1237 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
1238 return false;
1239
1240 /*
1241 * Splice onto queue and try to set waitStatus of predecessor to
1242 * indicate that thread is (probably) waiting. If cancelled or
1243 * attempt to set waitStatus fails, wake up to resync (in which
1244 * case the waitStatus can be transiently and harmlessly wrong).
1245 */
1246 Node p = enq(node);
1247 int ws = p.waitStatus;
1248 if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
1249 LockSupport.unpark(node.thread);
1250 return true;
1251 }
1252
1253 /**
1254 * Transfers node, if necessary, to sync queue after a cancelled wait.
1255 * Returns true if thread was cancelled before being signalled.
1256 *
1257 * @param node the node
1258 * @return true if cancelled before the node was signalled
1259 */
1260 final boolean transferAfterCancelledWait(Node node) {
1261 if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
1262 enq(node);
1263 return true;
1264 }
1265 /*
1266 * If we lost out to a signal(), then we can't proceed
1267 * until it finishes its enq(). Cancelling during an
1268 * incomplete transfer is both rare and transient, so just
1269 * spin.
1270 */
1271 while (!isOnSyncQueue(node))
1272 Thread.yield();
1273 return false;
1274 }
1275
1276 /**
1277 * Invokes release with current state value; returns saved state.
1278 * Cancels node and throws exception on failure.
1279 * @param node the condition node for this wait
1280 * @return previous sync state
1281 */
1282 final long fullyRelease(Node node) {
1283 try {
1284 long savedState = getState();
1285 if (release(savedState))
1286 return savedState;
1287 throw new IllegalMonitorStateException();
1288 } catch (Throwable t) {
1289 node.waitStatus = Node.CANCELLED;
1290 throw t;
1291 }
1292 }
1293
1294 // Instrumentation methods for conditions
1295
1296 /**
1297 * Queries whether the given ConditionObject
1298 * uses this synchronizer as its lock.
1299 *
1300 * @param condition the condition
1301 * @return {@code true} if owned
1302 * @throws NullPointerException if the condition is null
1303 */
1304 public final boolean owns(ConditionObject condition) {
1305 return condition.isOwnedBy(this);
1306 }
1307
1308 /**
1309 * Queries whether any threads are waiting on the given condition
1310 * associated with this synchronizer. Note that because timeouts
1311 * and interrupts may occur at any time, a {@code true} return
1312 * does not guarantee that a future {@code signal} will awaken
1313 * any threads. This method is designed primarily for use in
1367 */
1368 public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
1369 if (!owns(condition))
1370 throw new IllegalArgumentException("Not owner");
1371 return condition.getWaitingThreads();
1372 }
1373
1374 /**
1375 * Condition implementation for a {@link AbstractQueuedLongSynchronizer}
1376 * serving as the basis of a {@link Lock} implementation.
1377 *
1378 * <p>Method documentation for this class describes mechanics,
1379 * not behavioral specifications from the point of view of Lock
1380 * and Condition users. Exported versions of this class will in
1381 * general need to be accompanied by documentation describing
1382 * condition semantics that rely on those of the associated
1383 * {@code AbstractQueuedLongSynchronizer}.
1384 *
1385 * <p>This class is Serializable, but all fields are transient,
1386 * so deserialized conditions have no waiters.
1387 *
1388 * @since 1.6
1389 */
1390 public class ConditionObject implements Condition, java.io.Serializable {
1391 private static final long serialVersionUID = 1173984872572414699L;
1392 /** First node of condition queue. */
1393 private transient Node firstWaiter;
1394 /** Last node of condition queue. */
1395 private transient Node lastWaiter;
1396
1397 /**
1398 * Creates a new {@code ConditionObject} instance.
1399 */
1400 public ConditionObject() { }
1401
1402 // Internal methods
1403
1404 /**
1405 * Adds a new waiter to wait queue.
1406 * @return its new wait node
1407 */
1408 private Node addConditionWaiter() {
1409 if (!isHeldExclusively())
1410 throw new IllegalMonitorStateException();
1411 Node t = lastWaiter;
1412 // If lastWaiter is cancelled, clean out.
1413 if (t != null && t.waitStatus != Node.CONDITION) {
1414 unlinkCancelledWaiters();
1415 t = lastWaiter;
1416 }
1417
1418 Node node = new Node(Node.CONDITION);
1419
1420 if (t == null)
1421 firstWaiter = node;
1422 else
1423 t.nextWaiter = node;
1424 lastWaiter = node;
1425 return node;
1426 }
1427
1428 /**
1429 * Removes and transfers nodes until hit non-cancelled one or
1430 * null. Split out from signal in part to encourage compilers
1431 * to inline the case of no waiters.
1432 * @param first (non-null) the first node on condition queue
1433 */
1434 private void doSignal(Node first) {
1435 do {
1436 if ( (firstWaiter = first.nextWaiter) == null)
1437 lastWaiter = null;
1438 first.nextWaiter = null;
1439 } while (!transferForSignal(first) &&
1440 (first = firstWaiter) != null);
1441 }
1442
1443 /**
1444 * Removes and transfers all nodes.
1445 * @param first (non-null) the first node on condition queue
1446 */
1447 private void doSignalAll(Node first) {
1448 lastWaiter = firstWaiter = null;
1449 do {
1450 Node next = first.nextWaiter;
1451 first.nextWaiter = null;
1452 transferForSignal(first);
1453 first = next;
1454 } while (first != null);
1455 }
1456
1457 /**
1458 * Unlinks cancelled waiter nodes from condition queue.
1459 * Called only while holding lock. This is called when
1460 * cancellation occurred during condition wait, and upon
1461 * insertion of a new waiter when lastWaiter is seen to have
1462 * been cancelled. This method is needed to avoid garbage
1463 * retention in the absence of signals. So even though it may
1464 * require a full traversal, it comes into play only when
1465 * timeouts or cancellations occur in the absence of
1466 * signals. It traverses all nodes rather than stopping at a
1467 * particular target to unlink all pointers to garbage nodes
1468 * without requiring many re-traversals during cancellation
1469 * storms.
1470 */
1471 private void unlinkCancelledWaiters() {
1472 Node t = firstWaiter;
1473 Node trail = null;
1474 while (t != null) {
1475 Node next = t.nextWaiter;
1476 if (t.waitStatus != Node.CONDITION) {
1477 t.nextWaiter = null;
1478 if (trail == null)
1479 firstWaiter = next;
1480 else
1481 trail.nextWaiter = next;
1482 if (next == null)
1483 lastWaiter = trail;
1484 }
1485 else
1486 trail = t;
1487 t = next;
1488 }
1489 }
1490
1491 // public methods
1492
1493 /**
1494 * Moves the longest-waiting thread, if one exists, from the
1495 * wait queue for this condition to the wait queue for the
1496 * owning lock.
1497 *
1498 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1499 * returns {@code false}
1500 */
1501 public final void signal() {
1502 if (!isHeldExclusively())
1503 throw new IllegalMonitorStateException();
1504 Node first = firstWaiter;
1505 if (first != null)
1506 doSignal(first);
1507 }
1508
1509 /**
1510 * Moves all threads from the wait queue for this condition to
1511 * the wait queue for the owning lock.
1512 *
1513 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1514 * returns {@code false}
1515 */
1516 public final void signalAll() {
1517 if (!isHeldExclusively())
1518 throw new IllegalMonitorStateException();
1519 Node first = firstWaiter;
1520 if (first != null)
1521 doSignalAll(first);
1522 }
1523
1524 /**
1525 * Implements uninterruptible condition wait.
1526 * <ol>
1527 * <li>Save lock state returned by {@link #getState}.
1528 * <li>Invoke {@link #release} with saved state as argument,
1529 * throwing IllegalMonitorStateException if it fails.
1530 * <li>Block until signalled.
1531 * <li>Reacquire by invoking specialized version of
1532 * {@link #acquire} with saved state as argument.
1533 * </ol>
1534 */
1535 public final void awaitUninterruptibly() {
1536 Node node = addConditionWaiter();
1537 long savedState = fullyRelease(node);
1538 boolean interrupted = false;
1539 while (!isOnSyncQueue(node)) {
1540 LockSupport.park(this);
1541 if (Thread.interrupted())
1542 interrupted = true;
1543 }
1544 if (acquireQueued(node, savedState) || interrupted)
1545 selfInterrupt();
1546 }
1547
1548 /*
1549 * For interruptible waits, we need to track whether to throw
1550 * InterruptedException, if interrupted while blocked on
1551 * condition, versus reinterrupt current thread, if
1552 * interrupted while blocked waiting to re-acquire.
1553 */
1554
1555 /** Mode meaning to reinterrupt on exit from wait */
1556 private static final int REINTERRUPT = 1;
1557 /** Mode meaning to throw InterruptedException on exit from wait */
1558 private static final int THROW_IE = -1;
1559
1560 /**
1561 * Checks for interrupt, returning THROW_IE if interrupted
1562 * before signalled, REINTERRUPT if after signalled, or
1563 * 0 if not interrupted.
1564 */
1565 private int checkInterruptWhileWaiting(Node node) {
1566 return Thread.interrupted() ?
1567 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
1568 0;
1569 }
1570
1571 /**
1572 * Throws InterruptedException, reinterrupts current thread, or
1573 * does nothing, depending on mode.
1574 */
1575 private void reportInterruptAfterWait(int interruptMode)
1576 throws InterruptedException {
1577 if (interruptMode == THROW_IE)
1578 throw new InterruptedException();
1579 else if (interruptMode == REINTERRUPT)
1580 selfInterrupt();
1581 }
1582
1583 /**
1584 * Implements interruptible condition wait.
1585 * <ol>
1586 * <li>If current thread is interrupted, throw InterruptedException.
1587 * <li>Save lock state returned by {@link #getState}.
1588 * <li>Invoke {@link #release} with saved state as argument,
1589 * throwing IllegalMonitorStateException if it fails.
1590 * <li>Block until signalled or interrupted.
1591 * <li>Reacquire by invoking specialized version of
1592 * {@link #acquire} with saved state as argument.
1593 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1594 * </ol>
1595 */
1596 public final void await() throws InterruptedException {
1597 if (Thread.interrupted())
1598 throw new InterruptedException();
1599 Node node = addConditionWaiter();
1600 long savedState = fullyRelease(node);
1601 int interruptMode = 0;
1602 while (!isOnSyncQueue(node)) {
1603 LockSupport.park(this);
1604 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1605 break;
1606 }
1607 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1608 interruptMode = REINTERRUPT;
1609 if (node.nextWaiter != null) // clean up if cancelled
1610 unlinkCancelledWaiters();
1611 if (interruptMode != 0)
1612 reportInterruptAfterWait(interruptMode);
1613 }
1614
1615 /**
1616 * Implements timed condition wait.
1617 * <ol>
1618 * <li>If current thread is interrupted, throw InterruptedException.
1619 * <li>Save lock state returned by {@link #getState}.
1620 * <li>Invoke {@link #release} with saved state as argument,
1621 * throwing IllegalMonitorStateException if it fails.
1622 * <li>Block until signalled, interrupted, or timed out.
1623 * <li>Reacquire by invoking specialized version of
1624 * {@link #acquire} with saved state as argument.
1625 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1626 * </ol>
1627 */
1628 public final long awaitNanos(long nanosTimeout)
1629 throws InterruptedException {
1630 if (Thread.interrupted())
1631 throw new InterruptedException();
1632 // We don't check for nanosTimeout <= 0L here, to allow
1633 // awaitNanos(0) as a way to "yield the lock".
1634 final long deadline = System.nanoTime() + nanosTimeout;
1635 long initialNanos = nanosTimeout;
1636 Node node = addConditionWaiter();
1637 long savedState = fullyRelease(node);
1638 int interruptMode = 0;
1639 while (!isOnSyncQueue(node)) {
1640 if (nanosTimeout <= 0L) {
1641 transferAfterCancelledWait(node);
1642 break;
1643 }
1644 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
1645 LockSupport.parkNanos(this, nanosTimeout);
1646 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1647 break;
1648 nanosTimeout = deadline - System.nanoTime();
1649 }
1650 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1651 interruptMode = REINTERRUPT;
1652 if (node.nextWaiter != null)
1653 unlinkCancelledWaiters();
1654 if (interruptMode != 0)
1655 reportInterruptAfterWait(interruptMode);
1656 long remaining = deadline - System.nanoTime(); // avoid overflow
1657 return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
1658 }
1659
1660 /**
1661 * Implements absolute timed condition wait.
1662 * <ol>
1663 * <li>If current thread is interrupted, throw InterruptedException.
1664 * <li>Save lock state returned by {@link #getState}.
1665 * <li>Invoke {@link #release} with saved state as argument,
1666 * throwing IllegalMonitorStateException if it fails.
1667 * <li>Block until signalled, interrupted, or timed out.
1668 * <li>Reacquire by invoking specialized version of
1669 * {@link #acquire} with saved state as argument.
1670 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1671 * <li>If timed out while blocked in step 4, return false, else true.
1672 * </ol>
1673 */
1674 public final boolean awaitUntil(Date deadline)
1675 throws InterruptedException {
1676 long abstime = deadline.getTime();
1677 if (Thread.interrupted())
1678 throw new InterruptedException();
1679 Node node = addConditionWaiter();
1680 long savedState = fullyRelease(node);
1681 boolean timedout = false;
1682 int interruptMode = 0;
1683 while (!isOnSyncQueue(node)) {
1684 if (System.currentTimeMillis() >= abstime) {
1685 timedout = transferAfterCancelledWait(node);
1686 break;
1687 }
1688 LockSupport.parkUntil(this, abstime);
1689 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1690 break;
1691 }
1692 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1693 interruptMode = REINTERRUPT;
1694 if (node.nextWaiter != null)
1695 unlinkCancelledWaiters();
1696 if (interruptMode != 0)
1697 reportInterruptAfterWait(interruptMode);
1698 return !timedout;
1699 }
1700
1701 /**
1702 * Implements timed condition wait.
1703 * <ol>
1704 * <li>If current thread is interrupted, throw InterruptedException.
1705 * <li>Save lock state returned by {@link #getState}.
1706 * <li>Invoke {@link #release} with saved state as argument,
1707 * throwing IllegalMonitorStateException if it fails.
1708 * <li>Block until signalled, interrupted, or timed out.
1709 * <li>Reacquire by invoking specialized version of
1710 * {@link #acquire} with saved state as argument.
1711 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1712 * <li>If timed out while blocked in step 4, return false, else true.
1713 * </ol>
1714 */
1715 public final boolean await(long time, TimeUnit unit)
1716 throws InterruptedException {
1717 long nanosTimeout = unit.toNanos(time);
1718 if (Thread.interrupted())
1719 throw new InterruptedException();
1720 // We don't check for nanosTimeout <= 0L here, to allow
1721 // await(0, unit) as a way to "yield the lock".
1722 final long deadline = System.nanoTime() + nanosTimeout;
1723 Node node = addConditionWaiter();
1724 long savedState = fullyRelease(node);
1725 boolean timedout = false;
1726 int interruptMode = 0;
1727 while (!isOnSyncQueue(node)) {
1728 if (nanosTimeout <= 0L) {
1729 timedout = transferAfterCancelledWait(node);
1730 break;
1731 }
1732 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
1733 LockSupport.parkNanos(this, nanosTimeout);
1734 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1735 break;
1736 nanosTimeout = deadline - System.nanoTime();
1737 }
1738 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1739 interruptMode = REINTERRUPT;
1740 if (node.nextWaiter != null)
1741 unlinkCancelledWaiters();
1742 if (interruptMode != 0)
1743 reportInterruptAfterWait(interruptMode);
1744 return !timedout;
1745 }
1746
1747 // support for instrumentation
1748
1749 /**
1750 * Returns true if this condition was created by the given
1751 * synchronization object.
1752 *
1753 * @return {@code true} if owned
1754 */
1755 final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) {
1756 return sync == AbstractQueuedLongSynchronizer.this;
1757 }
1758
1759 /**
1760 * Queries whether any threads are waiting on this condition.
1761 * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters(ConditionObject)}.
1762 *
1763 * @return {@code true} if there are any waiting threads
1764 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1765 * returns {@code false}
1766 */
1767 protected final boolean hasWaiters() {
1768 if (!isHeldExclusively())
1769 throw new IllegalMonitorStateException();
1770 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1771 if (w.waitStatus == Node.CONDITION)
1772 return true;
1773 }
1774 return false;
1775 }
1776
1777 /**
1778 * Returns an estimate of the number of threads waiting on
1779 * this condition.
1780 * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength(ConditionObject)}.
1781 *
1782 * @return the estimated number of waiting threads
1783 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1784 * returns {@code false}
1785 */
1786 protected final int getWaitQueueLength() {
1787 if (!isHeldExclusively())
1788 throw new IllegalMonitorStateException();
1789 int n = 0;
1790 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1791 if (w.waitStatus == Node.CONDITION)
1792 ++n;
1793 }
1794 return n;
1795 }
1796
1797 /**
1798 * Returns a collection containing those threads that may be
1799 * waiting on this Condition.
1800 * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads(ConditionObject)}.
1801 *
1802 * @return the collection of threads
1803 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1804 * returns {@code false}
1805 */
1806 protected final Collection<Thread> getWaitingThreads() {
1807 if (!isHeldExclusively())
1808 throw new IllegalMonitorStateException();
1809 ArrayList<Thread> list = new ArrayList<>();
1810 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1811 if (w.waitStatus == Node.CONDITION) {
1812 Thread t = w.thread;
1813 if (t != null)
1814 list.add(t);
1815 }
1816 }
1817 return list;
1818 }
1819 }
1820
1821 // VarHandle mechanics
1822 private static final VarHandle STATE;
1823 private static final VarHandle HEAD;
1824 private static final VarHandle TAIL;
1825
1826 static {
1827 try {
1828 MethodHandles.Lookup l = MethodHandles.lookup();
1829 STATE = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "state", long.class);
1830 HEAD = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "head", Node.class);
1831 TAIL = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "tail", Node.class);
1832 } catch (ReflectiveOperationException e) {
1833 throw new ExceptionInInitializerError(e);
1834 }
1835
1836 // Reduce the risk of rare disastrous classloading in first call to
1837 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1838 Class<?> ensureLoaded = LockSupport.class;
1839 }
1840
1841 /**
1842 * Initializes head and tail fields on first contention.
1843 */
1844 private final void initializeSyncQueue() {
1845 Node h;
1846 if (HEAD.compareAndSet(this, null, (h = new Node())))
1847 tail = h;
1848 }
1849
1850 /**
1851 * CASes tail field.
1852 */
1853 private final boolean compareAndSetTail(Node expect, Node update) {
1854 return TAIL.compareAndSet(this, expect, update);
1855 }
1856 }
|
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent.locks;
37
38 import java.util.ArrayList;
39 import java.util.Collection;
40 import java.util.Date;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.ForkJoinPool;
43 import jdk.internal.misc.Unsafe;
44
45 /**
46 * A version of {@link AbstractQueuedSynchronizer} in
47 * which synchronization state is maintained as a {@code long}.
48 * This class has exactly the same structure, properties, and methods
49 * as {@code AbstractQueuedSynchronizer} with the exception
50 * that all state-related parameters and results are defined
51 * as {@code long} rather than {@code int}. This class
52 * may be useful when creating synchronizers such as
53 * multilevel locks and barriers that require
54 * 64 bits of state.
55 *
56 * <p>See {@link AbstractQueuedSynchronizer} for usage
57 * notes and examples.
58 *
59 * @since 1.6
60 * @author Doug Lea
61 */
62 public abstract class AbstractQueuedLongSynchronizer
63 extends AbstractOwnableSynchronizer
64 implements java.io.Serializable {
65
66 private static final long serialVersionUID = 7373984972572414692L;
67
68 /*
69 * To keep sources in sync, the remainder of this source file is
70 * exactly cloned from AbstractQueuedSynchronizer, replacing class
71 * name and changing ints related with sync state to longs. Please
72 * keep it that way.
73 */
74
75 // Node status bits, also used as argument and return values
76 static final int WAITING = 1; // must be 1
77 static final int CANCELLED = 0x80000000; // must be negative
78 static final int COND = 2; // in a condition wait
79
80 /** CLH Nodes */
81 abstract static class Node {
82 volatile Node prev; // initially attached via casTail
83 volatile Node next; // visibly nonnull when signallable
84 Thread waiter; // visibly nonnull when enqueued
85 volatile int status; // written by owner, atomic bit ops by others
86
87 // methods for atomic operations
88 final boolean casPrev(Node c, Node v) { // for cleanQueue
89 return U.weakCompareAndSetReference(this, PREV, c, v);
90 }
91 final boolean casNext(Node c, Node v) { // for cleanQueue
92 return U.weakCompareAndSetReference(this, NEXT, c, v);
93 }
94 final int getAndUnsetStatus(int v) { // for signalling
95 return U.getAndBitwiseAndInt(this, STATUS, ~v);
96 }
97 final void setPrevRelaxed(Node p) { // for off-queue assignment
98 U.putReference(this, PREV, p);
99 }
100 final void setStatusRelaxed(int s) { // for off-queue assignment
101 U.putInt(this, STATUS, s);
102 }
103 final void clearStatus() { // for reducing unneeded signals
104 U.putIntOpaque(this, STATUS, 0);
105 }
106
107 private static final long STATUS
108 = U.objectFieldOffset(Node.class, "status");
109 private static final long NEXT
110 = U.objectFieldOffset(Node.class, "next");
111 private static final long PREV
112 = U.objectFieldOffset(Node.class, "prev");
113 }
114
115 // Concrete classes tagged by type
116 static final class ExclusiveNode extends Node { }
117 static final class SharedNode extends Node { }
118
119 static final class ConditionNode extends Node
120 implements ForkJoinPool.ManagedBlocker {
121 ConditionNode nextWaiter; // link to next waiting node
122
123 /**
124 * Allows Conditions to be used in ForkJoinPools without
125 * risking fixed pool exhaustion. This is usable only for
126 * untimed Condition waits, not timed versions.
127 */
128 public final boolean isReleasable() {
129 return status <= 1 || Thread.currentThread().isInterrupted();
130 }
131
132 public final boolean block() {
133 while (!isReleasable()) LockSupport.park(this);
134 return true;
135 }
136 }
137
138 /**
139 * Head of the wait queue, lazily initialized.
140 */
141 private transient volatile Node head;
142
143 /**
144 * Tail of the wait queue. After initialization, modified only via casTail.
145 */
146 private transient volatile Node tail;
147
148 /**
149 * The synchronization state.
150 */
151 private volatile long state;
152
153 /**
154 * Returns the current value of synchronization state.
155 * This operation has memory semantics of a {@code volatile} read.
156 * @return current state value
157 */
158 protected final long getState() {
159 return state;
160 }
161
162 /**
163 * Sets the value of synchronization state.
164 * This operation has memory semantics of a {@code volatile} write.
165 * @param newState the new state value
166 */
167 protected final void setState(long newState) {
168 state = newState;
169 }
170
171 /**
172 * Atomically sets synchronization state to the given updated
173 * value if the current state value equals the expected value.
174 * This operation has memory semantics of a {@code volatile} read
175 * and write.
176 *
177 * @param expect the expected value
178 * @param update the new value
179 * @return {@code true} if successful. False return indicates that the actual
180 * value was not equal to the expected value.
181 */
182 protected final boolean compareAndSetState(long expect, long update) {
183 return U.compareAndSetLong(this, STATE, expect, update);
184 }
185
186 // Queuing utilities
187
188 private boolean casTail(Node c, Node v) {
189 return U.compareAndSetReference(this, TAIL, c, v);
190 }
191
192 /** tries once to CAS a new dummy node for head */
193 private void tryInitializeHead() {
194 Node h = new ExclusiveNode();
195 if (U.compareAndSetReference(this, HEAD, null, h))
196 tail = h;
197 }
198
199 /**
200 * Enqueues the node unless null. (Currently used only for
201 * ConditionNodes; other cases are interleaved with acquires.)
202 */
203 final void enqueue(Node node) {
204 if (node != null) {
205 for (;;) {
206 Node t = tail;
207 node.setPrevRelaxed(t); // avoid unnecessary fence
208 if (t == null) // initialize
209 tryInitializeHead();
210 else if (casTail(t, node)) {
211 t.next = node;
212 if (t.status < 0) // wake up to clean link
213 LockSupport.unpark(node.waiter);
214 break;
215 }
216 }
217 }
218 }
219
220 /** Returns true if node is found in traversal from tail */
221 final boolean isEnqueued(Node node) {
222 for (Node t = tail; t != null; t = t.prev)
223 if (t == node)
224 return true;
225 return false;
226 }
227
228 /**
229 * Wakes up the successor of given node, if one exists, and unsets its
230 * WAITING status to avoid park race. This may fail to wake up an
231 * eligible thread when one or more have been cancelled, but
232 * cancelAcquire ensures liveness.
233 */
234 private static void signalNext(Node h) {
235 Node s;
236 if (h != null && (s = h.next) != null && s.status != 0) {
237 s.getAndUnsetStatus(WAITING);
238 LockSupport.unpark(s.waiter);
239 }
240 }
241
242 /** Wakes up the given node if in shared mode */
243 private static void signalNextIfShared(Node h) {
244 Node s;
245 if (h != null && (s = h.next) != null &&
246 (s instanceof SharedNode) && s.status != 0) {
247 s.getAndUnsetStatus(WAITING);
248 LockSupport.unpark(s.waiter);
249 }
250 }
251
252 /**
253 * Main acquire method, invoked by all exported acquire methods.
254 *
255 * @param node null unless a reacquiring Condition
256 * @param arg the acquire argument
257 * @param shared true if shared mode else exclusive
258 * @param interruptible if abort and return negative on interrupt
259 * @param timed if true use timed waits
260 * @param time if timed, the System.nanoTime value to timeout
261 * @return positive if acquired, 0 if timed out, negative if interrupted
262 */
263 final int acquire(Node node, long arg, boolean shared,
264 boolean interruptible, boolean timed, long time) {
265 Thread current = Thread.currentThread();
266 byte spins = 0, postSpins = 0; // retries upon unpark of first thread
267 boolean interrupted = false, first = false;
268 Node pred = null; // predecessor of node when enqueued
269
270 /*
271 * Repeatedly:
272 * Check if node now first
273 * if so, ensure head stable, else ensure valid predecessor
274 * if node is first or not yet enqueued, try acquiring
275 * else if node not yet created, create it
276 * else if not yet enqueued, try once to enqueue
277 * else if woken from park, retry (up to postSpins times)
278 * else if WAITING status not set, set and retry
279 * else park and clear WAITING status, and check cancellation
280 */
281
282 for (;;) {
283 if (!first && (pred = (node == null) ? null : node.prev) != null &&
284 !(first = (head == pred))) {
285 if (pred.status < 0) {
286 cleanQueue(); // predecessor cancelled
287 continue;
288 } else if (pred.prev == null) {
289 Thread.onSpinWait(); // ensure serialization
290 continue;
291 }
292 }
293 if (first || pred == null) {
294 boolean acquired;
295 try {
296 if (shared)
297 acquired = (tryAcquireShared(arg) >= 0);
298 else
299 acquired = tryAcquire(arg);
300 } catch (Throwable ex) {
301 cancelAcquire(node, interrupted, false);
302 throw ex;
303 }
304 if (acquired) {
305 if (first) {
306 node.prev = null;
307 head = node;
308 pred.next = null;
309 node.waiter = null;
310 if (shared)
311 signalNextIfShared(node);
312 if (interrupted)
313 current.interrupt();
314 }
315 return 1;
316 }
317 }
318 if (node == null) { // allocate; retry before enqueue
319 if (shared)
320 node = new SharedNode();
321 else
322 node = new ExclusiveNode();
323 } else if (pred == null) { // try to enqueue
324 node.waiter = current;
325 Node t = tail;
326 node.setPrevRelaxed(t); // avoid unnecessary fence
327 if (t == null)
328 tryInitializeHead();
329 else if (!casTail(t, node))
330 node.setPrevRelaxed(null); // back out
331 else
332 t.next = node;
333 } else if (first && spins != 0) {
334 --spins; // reduce unfairness on rewaits
335 Thread.onSpinWait();
336 } else if (node.status == 0) {
337 node.status = WAITING; // enable signal and recheck
338 } else {
339 long nanos;
340 spins = postSpins = (byte)((postSpins << 1) | 1);
341 if (!timed)
342 LockSupport.park(this);
343 else if ((nanos = time - System.nanoTime()) > 0L)
344 LockSupport.parkNanos(this, nanos);
345 else
346 break;
347 node.clearStatus();
348 if ((interrupted |= Thread.interrupted()) && interruptible)
349 break;
350 }
351 }
352 return cancelAcquire(node, interrupted, interruptible);
353 }
354
355 /**
356 * Possibly repeatedly traverses from tail, unsplicing cancelled
357 * nodes until none are found.
358 */
359 private void cleanQueue() {
360 for (;;) { // restart point
361 for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
362 if (q == null || (p = q.prev) == null)
363 return; // end of list
364 if (s == null ? tail != q : (s.prev != q || s.status < 0))
365 break; // inconsistent
366 if (q.status < 0) { // cancelled
367 if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
368 q.prev == p) {
369 p.casNext(q, s); // OK if fails
370 if (p.prev == null)
371 signalNext(p);
372 }
373 break;
374 }
375 if ((n = p.next) != q) { // help finish
376 if (n != null && q.prev == p) {
377 p.casNext(n, q);
378 if (p.prev == null)
379 signalNext(p);
380 }
381 break;
382 }
383 s = q;
384 q = q.prev;
385 }
386 }
387 }
388
389 /**
390 * Cancels an ongoing attempt to acquire.
391 *
392 * @param node the node (may be null if cancelled before enqueuing)
393 * @param interrupted true if thread interrupted
394 * @param interruptible if should report interruption vs reset
395 */
396 private int cancelAcquire(Node node, boolean interrupted,
397 boolean interruptible) {
398 if (node != null) {
399 node.waiter = null;
400 node.status = CANCELLED;
401 if (node.prev != null)
402 cleanQueue();
403 }
404 if (interrupted) {
405 if (interruptible)
406 return CANCELLED;
407 else
408 Thread.currentThread().interrupt();
409 }
410 return 0;
411 }
412
413 // Main exported methods
414
415 /**
416 * Attempts to acquire in exclusive mode. This method should query
417 * if the state of the object permits it to be acquired in the
418 * exclusive mode, and if so to acquire it.
419 *
420 * <p>This method is always invoked by the thread performing
421 * acquire. If this method reports failure, the acquire method
422 * may queue the thread, if it is not already queued, until it is
423 * signalled by a release from some other thread. This can be used
424 * to implement method {@link Lock#tryLock()}.
425 *
426 * <p>The default
427 * implementation throws {@link UnsupportedOperationException}.
428 *
429 * @param arg the acquire argument. This value is always the one
430 * passed to an acquire method, or is the value saved on entry
543 * {@code false} otherwise
544 * @throws UnsupportedOperationException if conditions are not supported
545 */
546 protected boolean isHeldExclusively() {
547 throw new UnsupportedOperationException();
548 }
549
550 /**
551 * Acquires in exclusive mode, ignoring interrupts. Implemented
552 * by invoking at least once {@link #tryAcquire},
553 * returning on success. Otherwise the thread is queued, possibly
554 * repeatedly blocking and unblocking, invoking {@link
555 * #tryAcquire} until success. This method can be used
556 * to implement method {@link Lock#lock}.
557 *
558 * @param arg the acquire argument. This value is conveyed to
559 * {@link #tryAcquire} but is otherwise uninterpreted and
560 * can represent anything you like.
561 */
562 public final void acquire(long arg) {
563 if (!tryAcquire(arg))
564 acquire(null, arg, false, false, false, 0L);
565 }
566
567 /**
568 * Acquires in exclusive mode, aborting if interrupted.
569 * Implemented by first checking interrupt status, then invoking
570 * at least once {@link #tryAcquire}, returning on
571 * success. Otherwise the thread is queued, possibly repeatedly
572 * blocking and unblocking, invoking {@link #tryAcquire}
573 * until success or the thread is interrupted. This method can be
574 * used to implement method {@link Lock#lockInterruptibly}.
575 *
576 * @param arg the acquire argument. This value is conveyed to
577 * {@link #tryAcquire} but is otherwise uninterpreted and
578 * can represent anything you like.
579 * @throws InterruptedException if the current thread is interrupted
580 */
581 public final void acquireInterruptibly(long arg)
582 throws InterruptedException {
583 if (Thread.interrupted() ||
584 (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0))
585 throw new InterruptedException();
586 }
587
588 /**
589 * Attempts to acquire in exclusive mode, aborting if interrupted,
590 * and failing if the given timeout elapses. Implemented by first
591 * checking interrupt status, then invoking at least once {@link
592 * #tryAcquire}, returning on success. Otherwise, the thread is
593 * queued, possibly repeatedly blocking and unblocking, invoking
594 * {@link #tryAcquire} until success or the thread is interrupted
595 * or the timeout elapses. This method can be used to implement
596 * method {@link Lock#tryLock(long, TimeUnit)}.
597 *
598 * @param arg the acquire argument. This value is conveyed to
599 * {@link #tryAcquire} but is otherwise uninterpreted and
600 * can represent anything you like.
601 * @param nanosTimeout the maximum number of nanoseconds to wait
602 * @return {@code true} if acquired; {@code false} if timed out
603 * @throws InterruptedException if the current thread is interrupted
604 */
605 public final boolean tryAcquireNanos(long arg, long nanosTimeout)
606 throws InterruptedException {
607 if (!Thread.interrupted()) {
608 if (tryAcquire(arg))
609 return true;
610 if (nanosTimeout <= 0L)
611 return false;
612 int stat = acquire(null, arg, false, true, true,
613 System.nanoTime() + nanosTimeout);
614 if (stat > 0)
615 return true;
616 if (stat == 0)
617 return false;
618 }
619 throw new InterruptedException();
620 }
621
622 /**
623 * Releases in exclusive mode. Implemented by unblocking one or
624 * more threads if {@link #tryRelease} returns true.
625 * This method can be used to implement method {@link Lock#unlock}.
626 *
627 * @param arg the release argument. This value is conveyed to
628 * {@link #tryRelease} but is otherwise uninterpreted and
629 * can represent anything you like.
630 * @return the value returned from {@link #tryRelease}
631 */
632 public final boolean release(long arg) {
633 if (tryRelease(arg)) {
634 signalNext(head);
635 return true;
636 }
637 return false;
638 }
639
640 /**
641 * Acquires in shared mode, ignoring interrupts. Implemented by
642 * first invoking at least once {@link #tryAcquireShared},
643 * returning on success. Otherwise the thread is queued, possibly
644 * repeatedly blocking and unblocking, invoking {@link
645 * #tryAcquireShared} until success.
646 *
647 * @param arg the acquire argument. This value is conveyed to
648 * {@link #tryAcquireShared} but is otherwise uninterpreted
649 * and can represent anything you like.
650 */
651 public final void acquireShared(long arg) {
652 if (tryAcquireShared(arg) < 0)
653 acquire(null, arg, true, false, false, 0L);
654 }
655
656 /**
657 * Acquires in shared mode, aborting if interrupted. Implemented
658 * by first checking interrupt status, then invoking at least once
659 * {@link #tryAcquireShared}, returning on success. Otherwise the
660 * thread is queued, possibly repeatedly blocking and unblocking,
661 * invoking {@link #tryAcquireShared} until success or the thread
662 * is interrupted.
663 * @param arg the acquire argument.
664 * This value is conveyed to {@link #tryAcquireShared} but is
665 * otherwise uninterpreted and can represent anything
666 * you like.
667 * @throws InterruptedException if the current thread is interrupted
668 */
669 public final void acquireSharedInterruptibly(long arg)
670 throws InterruptedException {
671 if (Thread.interrupted() ||
672 (tryAcquireShared(arg) < 0 &&
673 acquire(null, arg, true, true, false, 0L) < 0))
674 throw new InterruptedException();
675 }
676
677 /**
678 * Attempts to acquire in shared mode, aborting if interrupted, and
679 * failing if the given timeout elapses. Implemented by first
680 * checking interrupt status, then invoking at least once {@link
681 * #tryAcquireShared}, returning on success. Otherwise, the
682 * thread is queued, possibly repeatedly blocking and unblocking,
683 * invoking {@link #tryAcquireShared} until success or the thread
684 * is interrupted or the timeout elapses.
685 *
686 * @param arg the acquire argument. This value is conveyed to
687 * {@link #tryAcquireShared} but is otherwise uninterpreted
688 * and can represent anything you like.
689 * @param nanosTimeout the maximum number of nanoseconds to wait
690 * @return {@code true} if acquired; {@code false} if timed out
691 * @throws InterruptedException if the current thread is interrupted
692 */
693 public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
694 throws InterruptedException {
695 if (!Thread.interrupted()) {
696 if (tryAcquireShared(arg) >= 0)
697 return true;
698 if (nanosTimeout <= 0L)
699 return false;
700 int stat = acquire(null, arg, true, true, true,
701 System.nanoTime() + nanosTimeout);
702 if (stat > 0)
703 return true;
704 if (stat == 0)
705 return false;
706 }
707 throw new InterruptedException();
708 }
709
710 /**
711 * Releases in shared mode. Implemented by unblocking one or more
712 * threads if {@link #tryReleaseShared} returns true.
713 *
714 * @param arg the release argument. This value is conveyed to
715 * {@link #tryReleaseShared} but is otherwise uninterpreted
716 * and can represent anything you like.
717 * @return the value returned from {@link #tryReleaseShared}
718 */
719 public final boolean releaseShared(long arg) {
720 if (tryReleaseShared(arg)) {
721 signalNext(head);
722 return true;
723 }
724 return false;
725 }
726
727 // Queue inspection methods
728
729 /**
730 * Queries whether any threads are waiting to acquire. Note that
731 * because cancellations due to interrupts and timeouts may occur
732 * at any time, a {@code true} return does not guarantee that any
733 * other thread will ever acquire.
734 *
735 * @return {@code true} if there may be other threads waiting to acquire
736 */
737 public final boolean hasQueuedThreads() {
738 for (Node p = tail, h = head; p != h && p != null; p = p.prev)
739 if (p.status >= 0)
740 return true;
741 return false;
742 }
743
744 /**
745 * Queries whether any threads have ever contended to acquire this
746 * synchronizer; that is, if an acquire method has ever blocked.
747 *
748 * <p>In this implementation, this operation returns in
749 * constant time.
750 *
751 * @return {@code true} if there has ever been contention
752 */
753 public final boolean hasContended() {
754 return head != null;
755 }
756
757 /**
758 * Returns the first (longest-waiting) thread in the queue, or
759 * {@code null} if no threads are currently queued.
760 *
761 * <p>In this implementation, this operation normally returns in
762 * constant time, but may iterate upon contention if other threads are
763 * concurrently modifying the queue.
764 *
765 * @return the first (longest-waiting) thread in the queue, or
766 * {@code null} if no threads are currently queued
767 */
768 public final Thread getFirstQueuedThread() {
769 Thread first = null, w; Node h, s;
770 if ((h = head) != null && ((s = h.next) == null ||
771 (first = s.waiter) == null ||
772 s.prev == null)) {
773 // traverse from tail on stale reads
774 for (Node p = tail, q; p != null && (q = p.prev) != null; p = q)
775 if ((w = p.waiter) != null)
776 first = w;
777 }
778 return first;
779 }
780
781 /**
782 * Returns true if the given thread is currently queued.
783 *
784 * <p>This implementation traverses the queue to determine
785 * presence of the given thread.
786 *
787 * @param thread the thread
788 * @return {@code true} if the given thread is on the queue
789 * @throws NullPointerException if the thread is null
790 */
791 public final boolean isQueued(Thread thread) {
792 if (thread == null)
793 throw new NullPointerException();
794 for (Node p = tail; p != null; p = p.prev)
795 if (p.waiter == thread)
796 return true;
797 return false;
798 }
799
800 /**
801 * Returns {@code true} if the apparent first queued thread, if one
802 * exists, is waiting in exclusive mode. If this method returns
803 * {@code true}, and the current thread is attempting to acquire in
804 * shared mode (that is, this method is invoked from {@link
805 * #tryAcquireShared}) then it is guaranteed that the current thread
806 * is not the first queued thread. Used only as a heuristic in
807 * ReentrantReadWriteLock.
808 */
809 final boolean apparentlyFirstQueuedIsExclusive() {
810 Node h, s;
811 return (h = head) != null && (s = h.next) != null &&
812 !(s instanceof SharedNode) && s.waiter != null;
813 }
814
815 /**
816 * Queries whether any threads have been waiting to acquire longer
817 * than the current thread.
818 *
819 * <p>An invocation of this method is equivalent to (but may be
820 * more efficient than):
821 * <pre> {@code
822 * getFirstQueuedThread() != Thread.currentThread()
823 * && hasQueuedThreads()}</pre>
824 *
825 * <p>Note that because cancellations due to interrupts and
826 * timeouts may occur at any time, a {@code true} return does not
827 * guarantee that some other thread will acquire before the current
828 * thread. Likewise, it is possible for another thread to win a
829 * race to enqueue after this method has returned {@code false},
830 * due to the queue being empty.
831 *
832 * <p>This method is designed to be used by a fair synchronizer to
833 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
834 * Such a synchronizer's {@link #tryAcquire} method should return
835 * {@code false}, and its {@link #tryAcquireShared} method should
836 * return a negative value, if this method returns {@code true}
837 * (unless this is a reentrant acquire). For example, the {@code
838 * tryAcquire} method for a fair, reentrant, exclusive mode
839 * synchronizer might look like this:
840 *
841 * <pre> {@code
842 * protected boolean tryAcquire(long arg) {
843 * if (isHeldExclusively()) {
844 * // A reentrant acquire; increment hold count
845 * return true;
846 * } else if (hasQueuedPredecessors()) {
847 * return false;
848 * } else {
849 * // try to acquire normally
850 * }
851 * }}</pre>
852 *
853 * @return {@code true} if there is a queued thread preceding the
854 * current thread, and {@code false} if the current thread
855 * is at the head of the queue or the queue is empty
856 * @since 1.7
857 */
858 public final boolean hasQueuedPredecessors() {
859 Thread first = null; Node h, s;
860 if ((h = head) != null && ((s = h.next) == null ||
861 (first = s.waiter) == null ||
862 s.prev == null))
863 first = getFirstQueuedThread(); // retry via getFirstQueuedThread
864 return first != null && first != Thread.currentThread();
865 }
866
867 // Instrumentation and monitoring methods
868
869 /**
870 * Returns an estimate of the number of threads waiting to
871 * acquire. The value is only an estimate because the number of
872 * threads may change dynamically while this method traverses
873 * internal data structures. This method is designed for use in
874 * monitoring system state, not for synchronization control.
875 *
876 * @return the estimated number of threads waiting to acquire
877 */
878 public final int getQueueLength() {
879 int n = 0;
880 for (Node p = tail; p != null; p = p.prev) {
881 if (p.waiter != null)
882 ++n;
883 }
884 return n;
885 }
886
887 /**
888 * Returns a collection containing threads that may be waiting to
889 * acquire. Because the actual set of threads may change
890 * dynamically while constructing this result, the returned
891 * collection is only a best-effort estimate. The elements of the
892 * returned collection are in no particular order. This method is
893 * designed to facilitate construction of subclasses that provide
894 * more extensive monitoring facilities.
895 *
896 * @return the collection of threads
897 */
898 public final Collection<Thread> getQueuedThreads() {
899 ArrayList<Thread> list = new ArrayList<>();
900 for (Node p = tail; p != null; p = p.prev) {
901 Thread t = p.waiter;
902 if (t != null)
903 list.add(t);
904 }
905 return list;
906 }
907
908 /**
909 * Returns a collection containing threads that may be waiting to
910 * acquire in exclusive mode. This has the same properties
911 * as {@link #getQueuedThreads} except that it only returns
912 * those threads waiting due to an exclusive acquire.
913 *
914 * @return the collection of threads
915 */
916 public final Collection<Thread> getExclusiveQueuedThreads() {
917 ArrayList<Thread> list = new ArrayList<>();
918 for (Node p = tail; p != null; p = p.prev) {
919 if (!(p instanceof SharedNode)) {
920 Thread t = p.waiter;
921 if (t != null)
922 list.add(t);
923 }
924 }
925 return list;
926 }
927
928 /**
929 * Returns a collection containing threads that may be waiting to
930 * acquire in shared mode. This has the same properties
931 * as {@link #getQueuedThreads} except that it only returns
932 * those threads waiting due to a shared acquire.
933 *
934 * @return the collection of threads
935 */
936 public final Collection<Thread> getSharedQueuedThreads() {
937 ArrayList<Thread> list = new ArrayList<>();
938 for (Node p = tail; p != null; p = p.prev) {
939 if (p instanceof SharedNode) {
940 Thread t = p.waiter;
941 if (t != null)
942 list.add(t);
943 }
944 }
945 return list;
946 }
947
948 /**
949 * Returns a string identifying this synchronizer, as well as its state.
950 * The state, in brackets, includes the String {@code "State ="}
951 * followed by the current value of {@link #getState}, and either
952 * {@code "nonempty"} or {@code "empty"} depending on whether the
953 * queue is empty.
954 *
955 * @return a string identifying this synchronizer, as well as its state
956 */
957 public String toString() {
958 return super.toString()
959 + "[State = " + getState() + ", "
960 + (hasQueuedThreads() ? "non" : "") + "empty queue]";
961 }
962
963 // Instrumentation methods for conditions
964
965 /**
966 * Queries whether the given ConditionObject
967 * uses this synchronizer as its lock.
968 *
969 * @param condition the condition
970 * @return {@code true} if owned
971 * @throws NullPointerException if the condition is null
972 */
973 public final boolean owns(ConditionObject condition) {
974 return condition.isOwnedBy(this);
975 }
976
977 /**
978 * Queries whether any threads are waiting on the given condition
979 * associated with this synchronizer. Note that because timeouts
980 * and interrupts may occur at any time, a {@code true} return
981 * does not guarantee that a future {@code signal} will awaken
982 * any threads. This method is designed primarily for use in
1036 */
1037 public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
1038 if (!owns(condition))
1039 throw new IllegalArgumentException("Not owner");
1040 return condition.getWaitingThreads();
1041 }
1042
1043 /**
1044 * Condition implementation for a {@link AbstractQueuedLongSynchronizer}
1045 * serving as the basis of a {@link Lock} implementation.
1046 *
1047 * <p>Method documentation for this class describes mechanics,
1048 * not behavioral specifications from the point of view of Lock
1049 * and Condition users. Exported versions of this class will in
1050 * general need to be accompanied by documentation describing
1051 * condition semantics that rely on those of the associated
1052 * {@code AbstractQueuedLongSynchronizer}.
1053 *
1054 * <p>This class is Serializable, but all fields are transient,
1055 * so deserialized conditions have no waiters.
1056 */
1057 public class ConditionObject implements Condition, java.io.Serializable {
1058 private static final long serialVersionUID = 1173984872572414699L;
1059 /** First node of condition queue. */
1060 private transient ConditionNode firstWaiter;
1061 /** Last node of condition queue. */
1062 private transient ConditionNode lastWaiter;
1063
1064 /**
1065 * Creates a new {@code ConditionObject} instance.
1066 */
1067 public ConditionObject() { }
1068
1069 // Signalling methods
1070
1071 /**
1072 * Removes and transfers one or all waiters to sync queue.
1073 */
1074 private void doSignal(ConditionNode first, boolean all) {
1075 while (first != null) {
1076 ConditionNode next = first.nextWaiter;
1077 if ((firstWaiter = next) == null)
1078 lastWaiter = null;
1079 if ((first.getAndUnsetStatus(COND) & COND) != 0) {
1080 enqueue(first);
1081 if (!all)
1082 break;
1083 }
1084 first = next;
1085 }
1086 }
1087
1088 /**
1089 * Moves the longest-waiting thread, if one exists, from the
1090 * wait queue for this condition to the wait queue for the
1091 * owning lock.
1092 *
1093 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1094 * returns {@code false}
1095 */
1096 public final void signal() {
1097 ConditionNode first = firstWaiter;
1098 if (!isHeldExclusively())
1099 throw new IllegalMonitorStateException();
1100 if (first != null)
1101 doSignal(first, false);
1102 }
1103
1104 /**
1105 * Moves all threads from the wait queue for this condition to
1106 * the wait queue for the owning lock.
1107 *
1108 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1109 * returns {@code false}
1110 */
1111 public final void signalAll() {
1112 ConditionNode first = firstWaiter;
1113 if (!isHeldExclusively())
1114 throw new IllegalMonitorStateException();
1115 if (first != null)
1116 doSignal(first, true);
1117 }
1118
1119 // Waiting methods
1120
1121 /**
1122 * Adds node to condition list and releases lock.
1123 *
1124 * @param node the node
1125 * @return savedState to reacquire after wait
1126 */
1127 private long enableWait(ConditionNode node) {
1128 if (isHeldExclusively()) {
1129 node.waiter = Thread.currentThread();
1130 node.setStatusRelaxed(COND | WAITING);
1131 ConditionNode last = lastWaiter;
1132 if (last == null)
1133 firstWaiter = node;
1134 else
1135 last.nextWaiter = node;
1136 lastWaiter = node;
1137 long savedState = getState();
1138 if (release(savedState))
1139 return savedState;
1140 }
1141 node.status = CANCELLED; // lock not held or inconsistent
1142 throw new IllegalMonitorStateException();
1143 }
1144
1145 /**
1146 * Returns true if a node that was initially placed on a condition
1147 * queue is now ready to reacquire on sync queue.
1148 * @param node the node
1149 * @return true if is reacquiring
1150 */
1151 private boolean canReacquire(ConditionNode node) {
1152 // check links, not status to avoid enqueue race
1153 return node != null && node.prev != null && isEnqueued(node);
1154 }
1155
1156 /**
1157 * Unlinks the given node and other non-waiting nodes from
1158 * condition queue unless already unlinked.
1159 */
1160 private void unlinkCancelledWaiters(ConditionNode node) {
1161 if (node == null || node.nextWaiter != null || node == lastWaiter) {
1162 ConditionNode w = firstWaiter, trail = null;
1163 while (w != null) {
1164 ConditionNode next = w.nextWaiter;
1165 if ((w.status & COND) == 0) {
1166 w.nextWaiter = null;
1167 if (trail == null)
1168 firstWaiter = next;
1169 else
1170 trail.nextWaiter = next;
1171 if (next == null)
1172 lastWaiter = trail;
1173 } else
1174 trail = w;
1175 w = next;
1176 }
1177 }
1178 }
1179
1180 /**
1181 * Implements uninterruptible condition wait.
1182 * <ol>
1183 * <li>Save lock state returned by {@link #getState}.
1184 * <li>Invoke {@link #release} with saved state as argument,
1185 * throwing IllegalMonitorStateException if it fails.
1186 * <li>Block until signalled.
1187 * <li>Reacquire by invoking specialized version of
1188 * {@link #acquire} with saved state as argument.
1189 * </ol>
1190 */
1191 public final void awaitUninterruptibly() {
1192 ConditionNode node = new ConditionNode();
1193 long savedState = enableWait(node);
1194 LockSupport.setCurrentBlocker(this); // for back-compatibility
1195 boolean interrupted = false;
1196 while (!canReacquire(node)) {
1197 if (Thread.interrupted())
1198 interrupted = true;
1199 else if ((node.status & COND) != 0) {
1200 try {
1201 ForkJoinPool.managedBlock(node);
1202 } catch (InterruptedException ie) {
1203 interrupted = true;
1204 }
1205 } else
1206 Thread.onSpinWait(); // awoke while enqueuing
1207 }
1208 LockSupport.setCurrentBlocker(null);
1209 node.clearStatus();
1210 acquire(node, savedState, false, false, false, 0L);
1211 if (interrupted)
1212 Thread.currentThread().interrupt();
1213 }
1214
1215 /**
1216 * Implements interruptible condition wait.
1217 * <ol>
1218 * <li>If current thread is interrupted, throw InterruptedException.
1219 * <li>Save lock state returned by {@link #getState}.
1220 * <li>Invoke {@link #release} with saved state as argument,
1221 * throwing IllegalMonitorStateException if it fails.
1222 * <li>Block until signalled or interrupted.
1223 * <li>Reacquire by invoking specialized version of
1224 * {@link #acquire} with saved state as argument.
1225 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1226 * </ol>
1227 */
1228 public final void await() throws InterruptedException {
1229 if (Thread.interrupted())
1230 throw new InterruptedException();
1231 ConditionNode node = new ConditionNode();
1232 long savedState = enableWait(node);
1233 LockSupport.setCurrentBlocker(this); // for back-compatibility
1234 boolean interrupted = false, cancelled = false;
1235 while (!canReacquire(node)) {
1236 if (interrupted |= Thread.interrupted()) {
1237 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
1238 break; // else interrupted after signal
1239 } else if ((node.status & COND) != 0) {
1240 try {
1241 ForkJoinPool.managedBlock(node);
1242 } catch (InterruptedException ie) {
1243 interrupted = true;
1244 }
1245 } else
1246 Thread.onSpinWait(); // awoke while enqueuing
1247 }
1248 LockSupport.setCurrentBlocker(null);
1249 node.clearStatus();
1250 acquire(node, savedState, false, false, false, 0L);
1251 if (interrupted) {
1252 if (cancelled) {
1253 unlinkCancelledWaiters(node);
1254 throw new InterruptedException();
1255 }
1256 Thread.currentThread().interrupt();
1257 }
1258 }
1259
1260 /**
1261 * Implements timed condition wait.
1262 * <ol>
1263 * <li>If current thread is interrupted, throw InterruptedException.
1264 * <li>Save lock state returned by {@link #getState}.
1265 * <li>Invoke {@link #release} with saved state as argument,
1266 * throwing IllegalMonitorStateException if it fails.
1267 * <li>Block until signalled, interrupted, or timed out.
1268 * <li>Reacquire by invoking specialized version of
1269 * {@link #acquire} with saved state as argument.
1270 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1271 * </ol>
1272 */
1273 public final long awaitNanos(long nanosTimeout)
1274 throws InterruptedException {
1275 if (Thread.interrupted())
1276 throw new InterruptedException();
1277 ConditionNode node = new ConditionNode();
1278 long savedState = enableWait(node);
1279 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
1280 long deadline = System.nanoTime() + nanos;
1281 boolean cancelled = false, interrupted = false;
1282 while (!canReacquire(node)) {
1283 if ((interrupted |= Thread.interrupted()) ||
1284 (nanos = deadline - System.nanoTime()) <= 0L) {
1285 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
1286 break;
1287 } else
1288 LockSupport.parkNanos(this, nanos);
1289 }
1290 node.clearStatus();
1291 acquire(node, savedState, false, false, false, 0L);
1292 if (cancelled) {
1293 unlinkCancelledWaiters(node);
1294 if (interrupted)
1295 throw new InterruptedException();
1296 } else if (interrupted)
1297 Thread.currentThread().interrupt();
1298 long remaining = deadline - System.nanoTime(); // avoid overflow
1299 return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE;
1300 }
1301
1302 /**
1303 * Implements absolute timed condition wait.
1304 * <ol>
1305 * <li>If current thread is interrupted, throw InterruptedException.
1306 * <li>Save lock state returned by {@link #getState}.
1307 * <li>Invoke {@link #release} with saved state as argument,
1308 * throwing IllegalMonitorStateException if it fails.
1309 * <li>Block until signalled, interrupted, or timed out.
1310 * <li>Reacquire by invoking specialized version of
1311 * {@link #acquire} with saved state as argument.
1312 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1313 * <li>If timed out while blocked in step 4, return false, else true.
1314 * </ol>
1315 */
1316 public final boolean awaitUntil(Date deadline)
1317 throws InterruptedException {
1318 long abstime = deadline.getTime();
1319 if (Thread.interrupted())
1320 throw new InterruptedException();
1321 ConditionNode node = new ConditionNode();
1322 long savedState = enableWait(node);
1323 boolean cancelled = false, interrupted = false;
1324 while (!canReacquire(node)) {
1325 if ((interrupted |= Thread.interrupted()) ||
1326 System.currentTimeMillis() >= abstime) {
1327 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
1328 break;
1329 } else
1330 LockSupport.parkUntil(this, abstime);
1331 }
1332 node.clearStatus();
1333 acquire(node, savedState, false, false, false, 0L);
1334 if (cancelled) {
1335 unlinkCancelledWaiters(node);
1336 if (interrupted)
1337 throw new InterruptedException();
1338 } else if (interrupted)
1339 Thread.currentThread().interrupt();
1340 return !cancelled;
1341 }
1342
1343 /**
1344 * Implements timed condition wait.
1345 * <ol>
1346 * <li>If current thread is interrupted, throw InterruptedException.
1347 * <li>Save lock state returned by {@link #getState}.
1348 * <li>Invoke {@link #release} with saved state as argument,
1349 * throwing IllegalMonitorStateException if it fails.
1350 * <li>Block until signalled, interrupted, or timed out.
1351 * <li>Reacquire by invoking specialized version of
1352 * {@link #acquire} with saved state as argument.
1353 * <li>If interrupted while blocked in step 4, throw InterruptedException.
1354 * <li>If timed out while blocked in step 4, return false, else true.
1355 * </ol>
1356 */
1357 public final boolean await(long time, TimeUnit unit)
1358 throws InterruptedException {
1359 long nanosTimeout = unit.toNanos(time);
1360 if (Thread.interrupted())
1361 throw new InterruptedException();
1362 ConditionNode node = new ConditionNode();
1363 long savedState = enableWait(node);
1364 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
1365 long deadline = System.nanoTime() + nanos;
1366 boolean cancelled = false, interrupted = false;
1367 while (!canReacquire(node)) {
1368 if ((interrupted |= Thread.interrupted()) ||
1369 (nanos = deadline - System.nanoTime()) <= 0L) {
1370 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
1371 break;
1372 } else
1373 LockSupport.parkNanos(this, nanos);
1374 }
1375 node.clearStatus();
1376 acquire(node, savedState, false, false, false, 0L);
1377 if (cancelled) {
1378 unlinkCancelledWaiters(node);
1379 if (interrupted)
1380 throw new InterruptedException();
1381 } else if (interrupted)
1382 Thread.currentThread().interrupt();
1383 return !cancelled;
1384 }
1385
1386 // support for instrumentation
1387
1388 /**
1389 * Returns true if this condition was created by the given
1390 * synchronization object.
1391 *
1392 * @return {@code true} if owned
1393 */
1394 final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) {
1395 return sync == AbstractQueuedLongSynchronizer.this;
1396 }
1397
1398 /**
1399 * Queries whether any threads are waiting on this condition.
1400 * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters(ConditionObject)}.
1401 *
1402 * @return {@code true} if there are any waiting threads
1403 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1404 * returns {@code false}
1405 */
1406 protected final boolean hasWaiters() {
1407 if (!isHeldExclusively())
1408 throw new IllegalMonitorStateException();
1409 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
1410 if ((w.status & COND) != 0)
1411 return true;
1412 }
1413 return false;
1414 }
1415
1416 /**
1417 * Returns an estimate of the number of threads waiting on
1418 * this condition.
1419 * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength(ConditionObject)}.
1420 *
1421 * @return the estimated number of waiting threads
1422 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1423 * returns {@code false}
1424 */
1425 protected final int getWaitQueueLength() {
1426 if (!isHeldExclusively())
1427 throw new IllegalMonitorStateException();
1428 int n = 0;
1429 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
1430 if ((w.status & COND) != 0)
1431 ++n;
1432 }
1433 return n;
1434 }
1435
1436 /**
1437 * Returns a collection containing those threads that may be
1438 * waiting on this Condition.
1439 * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads(ConditionObject)}.
1440 *
1441 * @return the collection of threads
1442 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1443 * returns {@code false}
1444 */
1445 protected final Collection<Thread> getWaitingThreads() {
1446 if (!isHeldExclusively())
1447 throw new IllegalMonitorStateException();
1448 ArrayList<Thread> list = new ArrayList<>();
1449 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
1450 if ((w.status & COND) != 0) {
1451 Thread t = w.waiter;
1452 if (t != null)
1453 list.add(t);
1454 }
1455 }
1456 return list;
1457 }
1458 }
1459
1460 // Unsafe
1461 private static final Unsafe U = Unsafe.getUnsafe();
1462 private static final long STATE
1463 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "state");
1464 private static final long HEAD
1465 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "head");
1466 private static final long TAIL
1467 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "tail");
1468
1469 static {
1470 Class<?> ensureLoaded = LockSupport.class;
1471 }
1472 }
|