Print this page
Split |
Close |
Expand all |
Collapse all |
--- old/src/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java
+++ new/src/share/classes/java/util/concurrent/locks/AbstractQueuedLongSynchronizer.java
1 1 /*
2 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 3 *
4 4 * This code is free software; you can redistribute it and/or modify it
5 5 * under the terms of the GNU General Public License version 2 only, as
6 6 * published by the Free Software Foundation. Oracle designates this
7 7 * particular file as subject to the "Classpath" exception as provided
8 8 * by Oracle in the LICENSE file that accompanied this code.
9 9 *
10 10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 13 * version 2 for more details (a copy is included in the LICENSE file that
14 14 * accompanied this code).
15 15 *
16 16 * You should have received a copy of the GNU General Public License version
17 17 * 2 along with this work; if not, write to the Free Software Foundation,
18 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 19 *
20 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 21 * or visit www.oracle.com if you need additional information or have any
22 22 * questions.
23 23 */
24 24
25 25 /*
26 26 * This file is available under and governed by the GNU General Public
27 27 * License version 2 only, as published by the Free Software Foundation.
28 28 * However, the following notice accompanied the original version of this
29 29 * file:
30 30 *
31 31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 32 * Expert Group and released to the public domain, as explained at
33 33 * http://creativecommons.org/licenses/publicdomain
34 34 */
35 35
36 36 package java.util.concurrent.locks;
37 37 import java.util.*;
38 38 import java.util.concurrent.*;
39 39 import java.util.concurrent.atomic.*;
40 40 import sun.misc.Unsafe;
41 41
42 42 /**
43 43 * A version of {@link AbstractQueuedSynchronizer} in
44 44 * which synchronization state is maintained as a <tt>long</tt>.
45 45 * This class has exactly the same structure, properties, and methods
46 46 * as <tt>AbstractQueuedSynchronizer</tt> with the exception
47 47 * that all state-related parameters and results are defined
48 48 * as <tt>long</tt> rather than <tt>int</tt>. This class
49 49 * may be useful when creating synchronizers such as
50 50 * multilevel locks and barriers that require
51 51 * 64 bits of state.
52 52 *
53 53 * <p>See {@link AbstractQueuedSynchronizer} for usage
54 54 * notes and examples.
55 55 *
56 56 * @since 1.6
57 57 * @author Doug Lea
58 58 */
59 59 public abstract class AbstractQueuedLongSynchronizer
60 60 extends AbstractOwnableSynchronizer
61 61 implements java.io.Serializable {
62 62
63 63 private static final long serialVersionUID = 7373984972572414692L; // explicit stream version id; the class implements java.io.Serializable
64 64
65 65 /*
66 66 To keep sources in sync, the remainder of this source file is
67 67 exactly cloned from AbstractQueuedSynchronizer, replacing class
68 68 name and changing ints related with sync state to longs. Please
69 69 keep it that way.
70 70 */
71 71
72 72 /**
73 73 * Creates a new {@code AbstractQueuedLongSynchronizer} instance
74 74 * with initial synchronization state of zero (the constructor body
* is empty, so the {@code state} field keeps its default value).
75 75 */
76 76 protected AbstractQueuedLongSynchronizer() { }
77 77
78 78 /**
79 79 * Wait queue node class.
80 80 *
81 81 * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
82 82 * Hagersten) lock queue. CLH locks are normally used for
83 83 * spinlocks. We instead use them for blocking synchronizers, but
84 84 * use the same basic tactic of holding some of the control
85 85 * information about a thread in the predecessor of its node. A
86 86 * "status" field in each node keeps track of whether a thread
87 87 * should block. A node is signalled when its predecessor
88 88 * releases. Each node of the queue otherwise serves as a
89 89 * specific-notification-style monitor holding a single waiting
90 90 * thread. The status field does NOT control whether threads are
91 91 * granted locks etc though. A thread may try to acquire if it is
92 92 * first in the queue. But being first does not guarantee success;
93 93 * it only gives the right to contend. So the currently released
94 94 * contender thread may need to rewait.
95 95 *
96 96 * <p>To enqueue into a CLH lock, you atomically splice it in as new
97 97 * tail. To dequeue, you just set the head field.
98 98 * <pre>
99 99 * +------+ prev +-----+ +-----+
100 100 * head | | <---- | | <---- | | tail
101 101 * +------+ +-----+ +-----+
102 102 * </pre>
103 103 *
104 104 * <p>Insertion into a CLH queue requires only a single atomic
105 105 * operation on "tail", so there is a simple atomic point of
106 106 * demarcation from unqueued to queued. Similarly, dequeuing
107 107 * involves only updating the "head". However, it takes a bit
108 108 * more work for nodes to determine who their successors are,
109 109 * in part to deal with possible cancellation due to timeouts
110 110 * and interrupts.
111 111 *
112 112 * <p>The "prev" links (not used in original CLH locks), are mainly
113 113 * needed to handle cancellation. If a node is cancelled, its
114 114 * successor is (normally) relinked to a non-cancelled
115 115 * predecessor. For explanation of similar mechanics in the case
116 116 * of spin locks, see the papers by Scott and Scherer at
117 117 * http://www.cs.rochester.edu/u/scott/synchronization/
118 118 *
119 119 * <p>We also use "next" links to implement blocking mechanics.
120 120 * The thread id for each node is kept in its own node, so a
121 121 * predecessor signals the next node to wake up by traversing
122 122 * next link to determine which thread it is. Determination of
123 123 * successor must avoid races with newly queued nodes to set
124 124 * the "next" fields of their predecessors. This is solved
125 125 * when necessary by checking backwards from the atomically
126 126 * updated "tail" when a node's successor appears to be null.
127 127 * (Or, said differently, the next-links are an optimization
128 128 * so that we don't usually need a backward scan.)
129 129 *
130 130 * <p>Cancellation introduces some conservatism to the basic
131 131 * algorithms. Since we must poll for cancellation of other
132 132 * nodes, we can miss noticing whether a cancelled node is
133 133 * ahead or behind us. This is dealt with by always unparking
134 134 * successors upon cancellation, allowing them to stabilize on
135 135 * a new predecessor, unless we can identify an uncancelled
136 136 * predecessor who will carry this responsibility.
137 137 *
138 138 * <p>CLH queues need a dummy header node to get started. But
139 139 * we don't create them on construction, because it would be wasted
140 140 * effort if there is never contention. Instead, the node
141 141 * is constructed and head and tail pointers are set upon first
142 142 * contention.
143 143 *
144 144 * <p>Threads waiting on Conditions use the same nodes, but
145 145 * use an additional link. Conditions only need to link nodes
146 146 * in simple (non-concurrent) linked queues because they are
147 147 * only accessed when exclusively held. Upon await, a node is
148 148 * inserted into a condition queue. Upon signal, the node is
149 149 * transferred to the main queue. A special value of status
150 150 * field is used to mark which queue a node is on.
151 151 *
152 152 * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
153 153 * Scherer and Michael Scott, along with members of JSR-166
154 154 * expert group, for helpful ideas, discussions, and critiques
155 155 * on the design of this class.
156 156 */
157 157 static final class Node {
158 158 /** Marker to indicate a node is waiting in shared mode */
159 159 static final Node SHARED = new Node();
160 160 /** Marker to indicate a node is waiting in exclusive mode */
161 161 static final Node EXCLUSIVE = null;
162 162
163 163 /** waitStatus value to indicate thread has cancelled */
164 164 static final int CANCELLED = 1;
165 165 /** waitStatus value to indicate successor's thread needs unparking */
166 166 static final int SIGNAL = -1;
167 167 /** waitStatus value to indicate thread is waiting on condition */
168 168 static final int CONDITION = -2;
169 169 /**
170 170 * waitStatus value to indicate the next acquireShared should
171 171 * unconditionally propagate
172 172 */
173 173 static final int PROPAGATE = -3;
174 174
175 175 /**
176 176 * Status field, taking on only the values:
177 177 * SIGNAL: The successor of this node is (or will soon be)
178 178 * blocked (via park), so the current node must
179 179 * unpark its successor when it releases or
180 180 * cancels. To avoid races, acquire methods must
181 181 * first indicate they need a signal,
182 182 * then retry the atomic acquire, and then,
183 183 * on failure, block.
184 184 * CANCELLED: This node is cancelled due to timeout or interrupt.
185 185 * Nodes never leave this state. In particular,
186 186 * a thread with cancelled node never again blocks.
187 187 * CONDITION: This node is currently on a condition queue.
188 188 * It will not be used as a sync queue node
189 189 * until transferred, at which time the status
190 190 * will be set to 0. (Use of this value here has
191 191 * nothing to do with the other uses of the
192 192 * field, but simplifies mechanics.)
193 193 * PROPAGATE: A releaseShared should be propagated to other
194 194 * nodes. This is set (for head node only) in
195 195 * doReleaseShared to ensure propagation
196 196 * continues, even if other operations have
197 197 * since intervened.
198 198 * 0: None of the above
199 199 *
200 200 * The values are arranged numerically to simplify use.
201 201 * Non-negative values mean that a node doesn't need to
202 202 * signal. So, most code doesn't need to check for particular
203 203 * values, just for sign.
204 204 *
205 205 * The field is initialized to 0 for normal sync nodes, and
206 206 * CONDITION for condition nodes. It is modified using CAS
207 207 * (or when possible, unconditional volatile writes).
208 208 */
209 209 volatile int waitStatus;
210 210
211 211 /**
212 212 * Link to predecessor node that current node/thread relies on
213 213 * for checking waitStatus. Assigned during enqueing, and nulled
214 214 * out (for sake of GC) only upon dequeuing. Also, upon
215 215 * cancellation of a predecessor, we short-circuit while
216 216 * finding a non-cancelled one, which will always exist
217 217 * because the head node is never cancelled: A node becomes
218 218 * head only as a result of successful acquire. A
219 219 * cancelled thread never succeeds in acquiring, and a thread only
220 220 * cancels itself, not any other node.
221 221 */
222 222 volatile Node prev;
223 223
224 224 /**
225 225 * Link to the successor node that the current node/thread
226 226 * unparks upon release. Assigned during enqueuing, adjusted
227 227 * when bypassing cancelled predecessors, and nulled out (for
228 228 * sake of GC) when dequeued. The enq operation does not
229 229 * assign next field of a predecessor until after attachment,
230 230 * so seeing a null next field does not necessarily mean that
231 231 * node is at end of queue. However, if a next field appears
232 232 * to be null, we can scan prev's from the tail to
233 233 * double-check. The next field of cancelled nodes is set to
234 234 * point to the node itself instead of null, to make life
235 235 * easier for isOnSyncQueue.
236 236 */
237 237 volatile Node next;
238 238
239 239 /**
240 240 * The thread that enqueued this node. Initialized on
241 241 * construction and nulled out after use.
242 242 */
243 243 volatile Thread thread;
244 244
245 245 /**
246 246 * Link to next node waiting on condition, or the special
247 247 * value SHARED. Because condition queues are accessed only
248 248 * when holding in exclusive mode, we just need a simple
249 249 * linked queue to hold nodes while they are waiting on
250 250 * conditions. They are then transferred to the queue to
251 251 * re-acquire. And because conditions can only be exclusive,
252 252 * we save a field by using special value to indicate shared
253 253 * mode.
254 254 */
255 255 Node nextWaiter;
256 256
257 257 /**
258 258 * Returns true if node is waiting in shared mode
259 259 */
260 260 final boolean isShared() {
261 261 return nextWaiter == SHARED;
262 262 }
263 263
264 264 /**
265 265 * Returns previous node, or throws NullPointerException if null.
266 266 * Use when predecessor cannot be null. The null check could
267 267 * be elided, but is present to help the VM.
268 268 *
269 269 * @return the predecessor of this node
270 270 */
271 271 final Node predecessor() throws NullPointerException {
272 272 Node p = prev;
273 273 if (p == null)
274 274 throw new NullPointerException();
275 275 else
276 276 return p;
277 277 }
278 278
279 279 Node() { // Used to establish initial head or SHARED marker
280 280 }
281 281
282 282 Node(Thread thread, Node mode) { // Used by addWaiter
283 283 this.nextWaiter = mode;
284 284 this.thread = thread;
285 285 }
286 286
287 287 Node(Thread thread, int waitStatus) { // Used by Condition
288 288 this.waitStatus = waitStatus;
289 289 this.thread = thread;
290 290 }
291 291 }
292 292
293 293 /**
294 294 * Head of the wait queue, lazily initialized. Except for
295 295 * initialization (in enq), it is modified only via method setHead. Note:
296 296 * If head exists, its waitStatus is guaranteed not to be
297 297 * CANCELLED.
* Declared transient, so it is excluded from default serialization.
298 298 */
299 299 private transient volatile Node head;
300 300
301 301 /**
302 302 * Tail of the wait queue, lazily initialized in method enq.
303 303 * Afterwards it is changed only through compareAndSetTail: by enq and
* addWaiter to append a new wait node, and by cancelAcquire to drop a
* cancelled node that is currently the tail.
* Declared transient, so it is excluded from default serialization.
304 304 */
305 305 private transient volatile Node tail;
306 306
307 307 /**
308 308 * The synchronization state.
* Read by getState, written by setState, and updated atomically by
* compareAndSetState. Unlike head and tail it is not transient.
309 309 */
310 310 private volatile long state;
311 311
312 312 /**
313 313 * Returns the current value of synchronization state.
314 314 * This operation has memory semantics of a <tt>volatile</tt> read.
315 315 * @return current state value
316 316 */
317 317 protected final long getState() {
318 318 return state;
319 319 }
320 320
321 321 /**
322 322 * Sets the value of synchronization state.
323 323 * This operation has memory semantics of a <tt>volatile</tt> write.
324 324 * @param newState the new state value
325 325 */
326 326 protected final void setState(long newState) {
327 327 state = newState;
328 328 }
329 329
330 330 /**
331 331 * Atomically sets synchronization state to the given updated
332 332 * value if the current state value equals the expected value.
333 333 * This operation has memory semantics of a <tt>volatile</tt> read
334 334 * and write.
335 335 *
336 336 * @param expect the expected value
337 337 * @param update the new value
338 338 * @return true if successful. False return indicates that the actual
339 339 * value was not equal to the expected value.
340 340 */
341 341 protected final boolean compareAndSetState(long expect, long update) {
342 342 // See below for intrinsics setup to support this
343 343 return unsafe.compareAndSwapLong(this, stateOffset, expect, update);
344 344 }
345 345
346 346 // Queuing utilities
347 347
348 348 /**
349 349 * The number of nanoseconds for which it is faster to spin
350 350 * rather than to use timed park. A rough estimate suffices
351 351 * to improve responsiveness with very short timeouts.
* The timed acquire loops (doAcquireNanos, doAcquireSharedNanos) call
* parkNanos only while the remaining timeout exceeds this value.
352 352 */
353 353 static final long spinForTimeoutThreshold = 1000L;
354 354
355 355 /**
356 356 * Inserts node into queue, initializing if necessary. See picture above.
357 357 * @param node the node to insert
358 358 * @return node's predecessor
359 359 */
360 360 private Node enq(final Node node) {
361 361 for (;;) {
362 362 Node t = tail;
363 363 if (t == null) { // Must initialize
364 364 if (compareAndSetHead(new Node()))
365 365 tail = head;
366 366 } else {
367 367 node.prev = t;
368 368 if (compareAndSetTail(t, node)) {
369 369 t.next = node;
370 370 return t;
371 371 }
372 372 }
373 373 }
374 374 }
375 375
376 376 /**
377 377 * Creates and enqueues node for current thread and given mode.
378 378 *
379 379 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
380 380 * @return the new node
381 381 */
382 382 private Node addWaiter(Node mode) {
383 383 Node node = new Node(Thread.currentThread(), mode);
384 384 // Try the fast path of enq; backup to full enq on failure
385 385 Node pred = tail;
386 386 if (pred != null) {
387 387 node.prev = pred;
388 388 if (compareAndSetTail(pred, node)) {
389 389 pred.next = node;
390 390 return node;
391 391 }
392 392 }
393 393 enq(node);
394 394 return node;
395 395 }
396 396
397 397 /**
398 398 * Sets head of queue to be node, thus dequeuing. Called only by
399 399 * acquire methods. Also nulls out unused fields for sake of GC
400 400 * and to suppress unnecessary signals and traversals.
* The node's {@code next} link is not touched here.
401 401 *
402 402 * @param node the node that becomes the new head; its {@code thread}
* and {@code prev} fields are cleared
403 403 */
404 404 private void setHead(Node node) {
405 405 head = node;
406 406 node.thread = null; // unused once the node is head
407 407 node.prev = null; // unused once the node is head
408 408 }
409 409
410 410 /**
411 411 * Wakes up node's successor, if one exists.
412 412 *
413 413 * @param node the node
414 414 */
415 415 private void unparkSuccessor(Node node) {
416 416 /*
417 417 * If status is negative (i.e., possibly needing signal) try
418 418 * to clear in anticipation of signalling. It is OK if this
419 419 * fails or if status is changed by waiting thread.
420 420 */
421 421 int ws = node.waitStatus;
422 422 if (ws < 0)
423 423 compareAndSetWaitStatus(node, ws, 0);
424 424
425 425 /*
426 426 * Thread to unpark is held in successor, which is normally
427 427 * just the next node. But if cancelled or apparently null,
428 428 * traverse backwards from tail to find the actual
429 429 * non-cancelled successor.
430 430 */
431 431 Node s = node.next;
432 432 if (s == null || s.waitStatus > 0) {
433 433 s = null;
434 434 for (Node t = tail; t != null && t != node; t = t.prev)
435 435 if (t.waitStatus <= 0)
436 436 s = t;
437 437 }
438 438 if (s != null)
439 439 LockSupport.unpark(s.thread);
440 440 }
441 441
442 442 /**
443 443 * Release action for shared mode -- signal successor and ensure
444 444 * propagation. (Note: For exclusive mode, release just amounts
445 445 * to calling unparkSuccessor of head if it needs signal.)
446 446 */
447 447 private void doReleaseShared() {
448 448 /*
449 449 * Ensure that a release propagates, even if there are other
450 450 * in-progress acquires/releases. This proceeds in the usual
451 451 * way of trying to unparkSuccessor of head if it needs
452 452 * signal. But if it does not, status is set to PROPAGATE to
453 453 * ensure that upon release, propagation continues.
454 454 * Additionally, we must loop in case a new node is added
455 455 * while we are doing this. Also, unlike other uses of
456 456 * unparkSuccessor, we need to know if CAS to reset status
457 457 * fails, if so rechecking.
458 458 */
459 459 for (;;) {
460 460 Node h = head;
461 461 if (h != null && h != tail) {
462 462 int ws = h.waitStatus;
463 463 if (ws == Node.SIGNAL) {
464 464 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
465 465 continue; // loop to recheck cases
466 466 unparkSuccessor(h);
467 467 }
468 468 else if (ws == 0 &&
469 469 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
470 470 continue; // loop on failed CAS
471 471 }
472 472 if (h == head) // loop if head changed
473 473 break;
474 474 }
475 475 }
476 476
477 477 /**
478 478 * Sets head of queue, and checks if successor may be waiting
479 479 * in shared mode, if so propagating if either propagate > 0 or
480 480 * PROPAGATE status was set.
481 481 *
482 482 * @param node the node
483 483 * @param propagate the return value from a tryAcquireShared
484 484 */
485 485 private void setHeadAndPropagate(Node node, long propagate) {
486 486 Node h = head; // Record old head for check below
487 487 setHead(node);
488 488 /*
489 489 * Try to signal next queued node if:
490 490 * Propagation was indicated by caller,
491 491 * or was recorded (as h.waitStatus) by a previous operation
492 492 * (note: this uses sign-check of waitStatus because
493 493 * PROPAGATE status may transition to SIGNAL.)
494 494 * and
495 495 * The next node is waiting in shared mode,
496 496 * or we don't know, because it appears null
497 497 *
498 498 * The conservatism in both of these checks may cause
499 499 * unnecessary wake-ups, but only when there are multiple
500 500 * racing acquires/releases, so most need signals now or soon
501 501 * anyway.
502 502 */
503 503 if (propagate > 0 || h == null || h.waitStatus < 0) {
504 504 Node s = node.next;
505 505 if (s == null || s.isShared())
506 506 doReleaseShared();
507 507 }
508 508 }
509 509
510 510 // Utilities for various versions of acquire
511 511
512 512 /**
513 513 * Cancels an ongoing attempt to acquire.
514 514 *
515 515 * @param node the node
516 516 */
517 517 private void cancelAcquire(Node node) {
518 518 // Ignore if node doesn't exist
519 519 if (node == null)
520 520 return;
521 521
522 522 node.thread = null;
523 523
524 524 // Skip cancelled predecessors
525 525 Node pred = node.prev;
526 526 while (pred.waitStatus > 0)
527 527 node.prev = pred = pred.prev;
528 528
529 529 // predNext is the apparent node to unsplice. CASes below will
530 530 // fail if not, in which case, we lost race vs another cancel
531 531 // or signal, so no further action is necessary.
532 532 Node predNext = pred.next;
533 533
534 534 // Can use unconditional write instead of CAS here.
535 535 // After this atomic step, other Nodes can skip past us.
536 536 // Before, we are free of interference from other threads.
537 537 node.waitStatus = Node.CANCELLED;
538 538
539 539 // If we are the tail, remove ourselves.
540 540 if (node == tail && compareAndSetTail(node, pred)) {
541 541 compareAndSetNext(pred, predNext, null);
542 542 } else {
543 543 // If successor needs signal, try to set pred's next-link
544 544 // so it will get one. Otherwise wake it up to propagate.
545 545 int ws;
546 546 if (pred != head &&
547 547 ((ws = pred.waitStatus) == Node.SIGNAL ||
548 548 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
549 549 pred.thread != null) {
550 550 Node next = node.next;
551 551 if (next != null && next.waitStatus <= 0)
552 552 compareAndSetNext(pred, predNext, next);
553 553 } else {
554 554 unparkSuccessor(node);
555 555 }
556 556
557 557 node.next = node; // help GC
558 558 }
559 559 }
560 560
561 561 /**
562 562 * Checks and updates status for a node that failed to acquire.
563 563 * Returns true if thread should block. This is the main signal
564 564 * control in all acquire loops. Requires that pred == node.prev
565 565 *
566 566 * @param pred node's predecessor holding status
567 567 * @param node the node
568 568 * @return {@code true} if thread should block
569 569 */
570 570 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
571 571 int ws = pred.waitStatus;
572 572 if (ws == Node.SIGNAL)
573 573 /*
574 574 * This node has already set status asking a release
575 575 * to signal it, so it can safely park.
576 576 */
577 577 return true;
578 578 if (ws > 0) {
579 579 /*
580 580 * Predecessor was cancelled. Skip over predecessors and
581 581 * indicate retry.
582 582 */
583 583 do {
584 584 node.prev = pred = pred.prev;
585 585 } while (pred.waitStatus > 0);
586 586 pred.next = node;
587 587 } else {
588 588 /*
589 589 * waitStatus must be 0 or PROPAGATE. Indicate that we
590 590 * need a signal, but don't park yet. Caller will need to
591 591 * retry to make sure it cannot acquire before parking.
592 592 */
593 593 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
594 594 }
595 595 return false;
596 596 }
597 597
598 598 /**
599 599 * Convenience method to interrupt current thread.
600 600 */
601 601 private static void selfInterrupt() {
602 602 Thread.currentThread().interrupt();
603 603 }
604 604
605 605 /**
606 606 * Convenience method to park and then check if interrupted
607 607 *
608 608 * @return {@code true} if interrupted
609 609 */
610 610 private final boolean parkAndCheckInterrupt() {
611 611 LockSupport.park(this);
612 612 return Thread.interrupted();
613 613 }
614 614
615 615 /*
616 616 * Various flavors of acquire, varying in exclusive/shared and
617 617 * control modes. Each is mostly the same, but annoyingly
618 618 * different. Only a little bit of factoring is possible due to
619 619 * interactions of exception mechanics (including ensuring that we
620 620 * cancel if tryAcquire throws exception) and other control, at
621 621 * least not without hurting performance too much.
622 622 */
623 623
624 624 /**
625 625 * Acquires in exclusive uninterruptible mode for thread already in
626 626 * queue. Used by condition wait methods as well as acquire.
627 627 *
628 628 * @param node the node
629 629 * @param arg the acquire argument
630 630 * @return {@code true} if interrupted while waiting
631 631 */
632 632 final boolean acquireQueued(final Node node, long arg) {
633 633 boolean failed = true;
634 634 try {
635 635 boolean interrupted = false;
636 636 for (;;) {
637 637 final Node p = node.predecessor();
638 638 if (p == head && tryAcquire(arg)) {
639 639 setHead(node);
640 640 p.next = null; // help GC
641 641 failed = false;
642 642 return interrupted;
643 643 }
644 644 if (shouldParkAfterFailedAcquire(p, node) &&
645 645 parkAndCheckInterrupt())
646 646 interrupted = true;
647 647 }
648 648 } finally {
649 649 if (failed)
650 650 cancelAcquire(node);
651 651 }
652 652 }
653 653
654 654 /**
655 655 * Acquires in exclusive interruptible mode.
656 656 * @param arg the acquire argument
657 657 */
658 658 private void doAcquireInterruptibly(long arg)
659 659 throws InterruptedException {
660 660 final Node node = addWaiter(Node.EXCLUSIVE);
661 661 boolean failed = true;
662 662 try {
663 663 for (;;) {
664 664 final Node p = node.predecessor();
665 665 if (p == head && tryAcquire(arg)) {
666 666 setHead(node);
667 667 p.next = null; // help GC
668 668 failed = false;
669 669 return;
670 670 }
671 671 if (shouldParkAfterFailedAcquire(p, node) &&
672 672 parkAndCheckInterrupt())
673 673 throw new InterruptedException();
674 674 }
675 675 } finally {
676 676 if (failed)
677 677 cancelAcquire(node);
678 678 }
679 679 }
680 680
681 681 /**
682 682 * Acquires in exclusive timed mode.
683 683 *
684 684 * @param arg the acquire argument
685 685 * @param nanosTimeout max wait time
686 686 * @return {@code true} if acquired
687 687 */
688 688 private boolean doAcquireNanos(long arg, long nanosTimeout)
689 689 throws InterruptedException {
690 690 long lastTime = System.nanoTime();
691 691 final Node node = addWaiter(Node.EXCLUSIVE);
692 692 boolean failed = true;
693 693 try {
694 694 for (;;) {
695 695 final Node p = node.predecessor();
696 696 if (p == head && tryAcquire(arg)) {
697 697 setHead(node);
698 698 p.next = null; // help GC
699 699 failed = false;
700 700 return true;
701 701 }
702 702 if (nanosTimeout <= 0)
703 703 return false;
704 704 if (shouldParkAfterFailedAcquire(p, node) &&
705 705 nanosTimeout > spinForTimeoutThreshold)
706 706 LockSupport.parkNanos(this, nanosTimeout);
707 707 long now = System.nanoTime();
708 708 nanosTimeout -= now - lastTime;
709 709 lastTime = now;
710 710 if (Thread.interrupted())
711 711 throw new InterruptedException();
712 712 }
713 713 } finally {
714 714 if (failed)
715 715 cancelAcquire(node);
716 716 }
717 717 }
718 718
719 719 /**
720 720 * Acquires in shared uninterruptible mode.
721 721 * @param arg the acquire argument
722 722 */
723 723 private void doAcquireShared(long arg) {
724 724 final Node node = addWaiter(Node.SHARED);
725 725 boolean failed = true;
726 726 try {
727 727 boolean interrupted = false;
728 728 for (;;) {
729 729 final Node p = node.predecessor();
730 730 if (p == head) {
731 731 long r = tryAcquireShared(arg);
732 732 if (r >= 0) {
733 733 setHeadAndPropagate(node, r);
734 734 p.next = null; // help GC
735 735 if (interrupted)
736 736 selfInterrupt();
737 737 failed = false;
738 738 return;
739 739 }
740 740 }
741 741 if (shouldParkAfterFailedAcquire(p, node) &&
742 742 parkAndCheckInterrupt())
743 743 interrupted = true;
744 744 }
745 745 } finally {
746 746 if (failed)
747 747 cancelAcquire(node);
748 748 }
749 749 }
750 750
751 751 /**
752 752 * Acquires in shared interruptible mode.
753 753 * @param arg the acquire argument
754 754 */
755 755 private void doAcquireSharedInterruptibly(long arg)
756 756 throws InterruptedException {
757 757 final Node node = addWaiter(Node.SHARED);
758 758 boolean failed = true;
759 759 try {
760 760 for (;;) {
761 761 final Node p = node.predecessor();
762 762 if (p == head) {
763 763 long r = tryAcquireShared(arg);
764 764 if (r >= 0) {
765 765 setHeadAndPropagate(node, r);
766 766 p.next = null; // help GC
767 767 failed = false;
768 768 return;
769 769 }
770 770 }
771 771 if (shouldParkAfterFailedAcquire(p, node) &&
772 772 parkAndCheckInterrupt())
773 773 throw new InterruptedException();
774 774 }
775 775 } finally {
776 776 if (failed)
777 777 cancelAcquire(node);
778 778 }
779 779 }
780 780
781 781 /**
782 782 * Acquires in shared timed mode.
783 783 *
784 784 * @param arg the acquire argument
785 785 * @param nanosTimeout max wait time
786 786 * @return {@code true} if acquired
787 787 */
788 788 private boolean doAcquireSharedNanos(long arg, long nanosTimeout)
789 789 throws InterruptedException {
790 790
791 791 long lastTime = System.nanoTime();
792 792 final Node node = addWaiter(Node.SHARED);
793 793 boolean failed = true;
794 794 try {
795 795 for (;;) {
796 796 final Node p = node.predecessor();
797 797 if (p == head) {
798 798 long r = tryAcquireShared(arg);
799 799 if (r >= 0) {
800 800 setHeadAndPropagate(node, r);
801 801 p.next = null; // help GC
802 802 failed = false;
803 803 return true;
804 804 }
805 805 }
806 806 if (nanosTimeout <= 0)
807 807 return false;
808 808 if (shouldParkAfterFailedAcquire(p, node) &&
809 809 nanosTimeout > spinForTimeoutThreshold)
810 810 LockSupport.parkNanos(this, nanosTimeout);
811 811 long now = System.nanoTime();
812 812 nanosTimeout -= now - lastTime;
813 813 lastTime = now;
814 814 if (Thread.interrupted())
815 815 throw new InterruptedException();
816 816 }
817 817 } finally {
818 818 if (failed)
819 819 cancelAcquire(node);
820 820 }
821 821 }
822 822
823 823 // Main exported methods
824 824
825 825 /**
826 826 * Attempts to acquire in exclusive mode. This method should query
827 827 * if the state of the object permits it to be acquired in the
828 828 * exclusive mode, and if so to acquire it.
829 829 *
830 830 * <p>This method is always invoked by the thread performing
831 831 * acquire. If this method reports failure, the acquire method
832 832 * may queue the thread, if it is not already queued, until it is
833 833 * signalled by a release from some other thread. This can be used
834 834 * to implement method {@link Lock#tryLock()}.
* Within this class it is called from the queued exclusive acquire
* loops (acquireQueued, doAcquireInterruptibly and doAcquireNanos)
* once the caller's node directly follows the head.
835 835 *
836 836 * <p>The default
837 837 * implementation throws {@link UnsupportedOperationException}.
838 838 *
839 839 * @param arg the acquire argument. This value is always the one
840 840 * passed to an acquire method, or is the value saved on entry
841 841 * to a condition wait. The value is otherwise uninterpreted
842 842 * and can represent anything you like.
843 843 * @return {@code true} if successful. Upon success, this object has
844 844 * been acquired.
845 845 * @throws IllegalMonitorStateException if acquiring would place this
846 846 * synchronizer in an illegal state. This exception must be
847 847 * thrown in a consistent fashion for synchronization to work
848 848 * correctly.
849 849 * @throws UnsupportedOperationException if exclusive mode is not supported
850 850 */
851 851 protected boolean tryAcquire(long arg) {
852 852 throw new UnsupportedOperationException(); // default: exclusive mode unsupported unless overridden
853 853 }
854 854
855 855 /**
856 856 * Attempts to set the state to reflect a release in exclusive
857 857 * mode.
858 858 *
859 859 * <p>This method is always invoked by the thread performing release.
860 860 *
861 861 * <p>The default implementation throws
862 862 * {@link UnsupportedOperationException}.
863 863 *
864 864 * @param arg the release argument. This value is always the one
865 865 * passed to a release method, or the current state value upon
866 866 * entry to a condition wait. The value is otherwise
867 867 * uninterpreted and can represent anything you like.
868 868 * @return {@code true} if this object is now in a fully released
869 869 * state, so that any waiting threads may attempt to acquire;
870 870 * and {@code false} otherwise.
871 871 * @throws IllegalMonitorStateException if releasing would place this
872 872 * synchronizer in an illegal state. This exception must be
873 873 * thrown in a consistent fashion for synchronization to work
874 874 * correctly.
875 875 * @throws UnsupportedOperationException if exclusive mode is not supported
876 876 */
877 877 protected boolean tryRelease(long arg) {
878 878 throw new UnsupportedOperationException();
879 879 }
880 880
881 881 /**
882 882 * Attempts to acquire in shared mode. This method should query if
883 883 * the state of the object permits it to be acquired in the shared
884 884 * mode, and if so to acquire it.
885 885 *
886 886 * <p>This method is always invoked by the thread performing
887 887 * acquire. If this method reports failure, the acquire method
888 888 * may queue the thread, if it is not already queued, until it is
889 889 * signalled by a release from some other thread.
890 890 *
891 891 * <p>The default implementation throws {@link
892 892 * UnsupportedOperationException}.
893 893 *
894 894 * @param arg the acquire argument. This value is always the one
895 895 * passed to an acquire method, or is the value saved on entry
896 896 * to a condition wait. The value is otherwise uninterpreted
897 897 * and can represent anything you like.
898 898 * @return a negative value on failure; zero if acquisition in shared
899 899 * mode succeeded but no subsequent shared-mode acquire can
900 900 * succeed; and a positive value if acquisition in shared
901 901 * mode succeeded and subsequent shared-mode acquires might
902 902 * also succeed, in which case a subsequent waiting thread
903 903 * must check availability. (Support for three different
904 904 * return values enables this method to be used in contexts
905 905 * where acquires only sometimes act exclusively.) Upon
906 906 * success, this object has been acquired.
907 907 * @throws IllegalMonitorStateException if acquiring would place this
908 908 * synchronizer in an illegal state. This exception must be
909 909 * thrown in a consistent fashion for synchronization to work
910 910 * correctly.
911 911 * @throws UnsupportedOperationException if shared mode is not supported
912 912 */
913 913 protected long tryAcquireShared(long arg) {
914 914 throw new UnsupportedOperationException();
915 915 }
916 916
917 917 /**
918 918 * Attempts to set the state to reflect a release in shared mode.
919 919 *
920 920 * <p>This method is always invoked by the thread performing release.
921 921 *
922 922 * <p>The default implementation throws
923 923 * {@link UnsupportedOperationException}.
924 924 *
925 925 * @param arg the release argument. This value is always the one
926 926 * passed to a release method, or the current state value upon
927 927 * entry to a condition wait. The value is otherwise
928 928 * uninterpreted and can represent anything you like.
929 929 * @return {@code true} if this release of shared mode may permit a
930 930 * waiting acquire (shared or exclusive) to succeed; and
931 931 * {@code false} otherwise
932 932 * @throws IllegalMonitorStateException if releasing would place this
933 933 * synchronizer in an illegal state. This exception must be
934 934 * thrown in a consistent fashion for synchronization to work
935 935 * correctly.
936 936 * @throws UnsupportedOperationException if shared mode is not supported
937 937 */
938 938 protected boolean tryReleaseShared(long arg) {
939 939 throw new UnsupportedOperationException();
940 940 }
941 941
942 942 /**
943 943 * Returns {@code true} if synchronization is held exclusively with
944 944 * respect to the current (calling) thread. This method is invoked
945 945 * upon each call to a non-waiting {@link ConditionObject} method.
946 946 * (Waiting methods instead invoke {@link #release}.)
947 947 *
948 948 * <p>The default implementation throws {@link
949 949 * UnsupportedOperationException}. This method is invoked
950 950 * internally only within {@link ConditionObject} methods, so need
951 951 * not be defined if conditions are not used.
952 952 *
953 953 * @return {@code true} if synchronization is held exclusively;
954 954 * {@code false} otherwise
955 955 * @throws UnsupportedOperationException if conditions are not supported
956 956 */
957 957 protected boolean isHeldExclusively() {
958 958 throw new UnsupportedOperationException();
959 959 }
960 960
961 961 /**
962 962 * Acquires in exclusive mode, ignoring interrupts. Implemented
963 963 * by invoking at least once {@link #tryAcquire},
964 964 * returning on success. Otherwise the thread is queued, possibly
965 965 * repeatedly blocking and unblocking, invoking {@link
966 966 * #tryAcquire} until success. This method can be used
967 967 * to implement method {@link Lock#lock}.
968 968 *
969 969 * @param arg the acquire argument. This value is conveyed to
970 970 * {@link #tryAcquire} but is otherwise uninterpreted and
971 971 * can represent anything you like.
972 972 */
973 973 public final void acquire(long arg) {
974 974 if (!tryAcquire(arg) &&
975 975 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
976 976 selfInterrupt();
977 977 }
978 978
979 979 /**
980 980 * Acquires in exclusive mode, aborting if interrupted.
981 981 * Implemented by first checking interrupt status, then invoking
982 982 * at least once {@link #tryAcquire}, returning on
983 983 * success. Otherwise the thread is queued, possibly repeatedly
984 984 * blocking and unblocking, invoking {@link #tryAcquire}
985 985 * until success or the thread is interrupted. This method can be
986 986 * used to implement method {@link Lock#lockInterruptibly}.
987 987 *
988 988 * @param arg the acquire argument. This value is conveyed to
989 989 * {@link #tryAcquire} but is otherwise uninterpreted and
990 990 * can represent anything you like.
991 991 * @throws InterruptedException if the current thread is interrupted
992 992 */
993 - public final void acquireInterruptibly(long arg) throws InterruptedException {
993 + public final void acquireInterruptibly(long arg)
994 + throws InterruptedException {
994 995 if (Thread.interrupted())
995 996 throw new InterruptedException();
996 997 if (!tryAcquire(arg))
997 998 doAcquireInterruptibly(arg);
998 999 }
999 1000
1000 1001 /**
1001 1002 * Attempts to acquire in exclusive mode, aborting if interrupted,
1002 1003 * and failing if the given timeout elapses. Implemented by first
1003 1004 * checking interrupt status, then invoking at least once {@link
1004 1005 * #tryAcquire}, returning on success. Otherwise, the thread is
1005 1006 * queued, possibly repeatedly blocking and unblocking, invoking
1006 1007 * {@link #tryAcquire} until success or the thread is interrupted
1007 1008 * or the timeout elapses. This method can be used to implement
1008 1009 * method {@link Lock#tryLock(long, TimeUnit)}.
1009 1010 *
1010 1011 * @param arg the acquire argument. This value is conveyed to
1011 1012 * {@link #tryAcquire} but is otherwise uninterpreted and
1012 1013 * can represent anything you like.
1013 1014 * @param nanosTimeout the maximum number of nanoseconds to wait
1014 1015 * @return {@code true} if acquired; {@code false} if timed out
1015 1016 * @throws InterruptedException if the current thread is interrupted
1016 1017 */
1017 - public final boolean tryAcquireNanos(long arg, long nanosTimeout) throws InterruptedException {
1018 + public final boolean tryAcquireNanos(long arg, long nanosTimeout)
1019 + throws InterruptedException {
1018 1020 if (Thread.interrupted())
1019 1021 throw new InterruptedException();
1020 1022 return tryAcquire(arg) ||
1021 1023 doAcquireNanos(arg, nanosTimeout);
1022 1024 }
1023 1025
1024 1026 /**
1025 1027 * Releases in exclusive mode. Implemented by unblocking one or
1026 1028 * more threads if {@link #tryRelease} returns true.
1027 1029 * This method can be used to implement method {@link Lock#unlock}.
1028 1030 *
1029 1031 * @param arg the release argument. This value is conveyed to
1030 1032 * {@link #tryRelease} but is otherwise uninterpreted and
1031 1033 * can represent anything you like.
1032 1034 * @return the value returned from {@link #tryRelease}
1033 1035 */
1034 1036 public final boolean release(long arg) {
1035 1037 if (tryRelease(arg)) {
1036 1038 Node h = head;
1037 1039 if (h != null && h.waitStatus != 0)
1038 1040 unparkSuccessor(h);
1039 1041 return true;
1040 1042 }
1041 1043 return false;
1042 1044 }
1043 1045
1044 1046 /**
1045 1047 * Acquires in shared mode, ignoring interrupts. Implemented by
1046 1048 * first invoking at least once {@link #tryAcquireShared},
1047 1049 * returning on success. Otherwise the thread is queued, possibly
1048 1050 * repeatedly blocking and unblocking, invoking {@link
1049 1051 * #tryAcquireShared} until success.
1050 1052 *
1051 1053 * @param arg the acquire argument. This value is conveyed to
1052 1054 * {@link #tryAcquireShared} but is otherwise uninterpreted
1053 1055 * and can represent anything you like.
1054 1056 */
1055 1057 public final void acquireShared(long arg) {
1056 1058 if (tryAcquireShared(arg) < 0)
1057 1059 doAcquireShared(arg);
1058 1060 }
1059 1061
1060 1062 /**
1061 1063 * Acquires in shared mode, aborting if interrupted. Implemented
1062 1064 * by first checking interrupt status, then invoking at least once
1063 1065 * {@link #tryAcquireShared}, returning on success. Otherwise the
1064 1066 * thread is queued, possibly repeatedly blocking and unblocking,
1065 1067 * invoking {@link #tryAcquireShared} until success or the thread
1066 1068 * is interrupted.
1067 1069 * @param arg the acquire argument.
1068 1070 * This value is conveyed to {@link #tryAcquireShared} but is
1069 1071 * otherwise uninterpreted and can represent anything
1070 1072 * you like.
1071 1073 * @throws InterruptedException if the current thread is interrupted
1072 1074 */
1073 - public final void acquireSharedInterruptibly(long arg) throws InterruptedException {
1075 + public final void acquireSharedInterruptibly(long arg)
1076 + throws InterruptedException {
1074 1077 if (Thread.interrupted())
1075 1078 throw new InterruptedException();
1076 1079 if (tryAcquireShared(arg) < 0)
1077 1080 doAcquireSharedInterruptibly(arg);
1078 1081 }
1079 1082
1080 1083 /**
1081 1084 * Attempts to acquire in shared mode, aborting if interrupted, and
1082 1085 * failing if the given timeout elapses. Implemented by first
1083 1086 * checking interrupt status, then invoking at least once {@link
1084 1087 * #tryAcquireShared}, returning on success. Otherwise, the
1085 1088 * thread is queued, possibly repeatedly blocking and unblocking,
1086 1089 * invoking {@link #tryAcquireShared} until success or the thread
1087 1090 * is interrupted or the timeout elapses.
1088 1091 *
1089 1092 * @param arg the acquire argument. This value is conveyed to
1090 1093 * {@link #tryAcquireShared} but is otherwise uninterpreted
1091 1094 * and can represent anything you like.
1092 1095 * @param nanosTimeout the maximum number of nanoseconds to wait
1093 1096 * @return {@code true} if acquired; {@code false} if timed out
1094 1097 * @throws InterruptedException if the current thread is interrupted
1095 1098 */
1096 - public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) throws InterruptedException {
1099 + public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
1100 + throws InterruptedException {
1097 1101 if (Thread.interrupted())
1098 1102 throw new InterruptedException();
1099 1103 return tryAcquireShared(arg) >= 0 ||
1100 1104 doAcquireSharedNanos(arg, nanosTimeout);
1101 1105 }
1102 1106
1103 1107 /**
1104 1108 * Releases in shared mode. Implemented by unblocking one or more
1105 1109 * threads if {@link #tryReleaseShared} returns true.
1106 1110 *
1107 1111 * @param arg the release argument. This value is conveyed to
1108 1112 * {@link #tryReleaseShared} but is otherwise uninterpreted
1109 1113 * and can represent anything you like.
1110 1114 * @return the value returned from {@link #tryReleaseShared}
1111 1115 */
1112 1116 public final boolean releaseShared(long arg) {
1113 1117 if (tryReleaseShared(arg)) {
1114 1118 doReleaseShared();
1115 1119 return true;
1116 1120 }
1117 1121 return false;
1118 1122 }
1119 1123
1120 1124 // Queue inspection methods
1121 1125
1122 1126 /**
1123 1127 * Queries whether any threads are waiting to acquire. Note that
1124 1128 * because cancellations due to interrupts and timeouts may occur
1125 1129 * at any time, a {@code true} return does not guarantee that any
1126 1130 * other thread will ever acquire.
1127 1131 *
1128 1132 * <p>In this implementation, this operation returns in
1129 1133 * constant time.
1130 1134 *
1131 1135 * @return {@code true} if there may be other threads waiting to acquire
1132 1136 */
1133 1137 public final boolean hasQueuedThreads() {
1134 1138 return head != tail;
1135 1139 }
1136 1140
1137 1141 /**
1138 1142 * Queries whether any threads have ever contended to acquire this
1139 1143 * synchronizer; that is if an acquire method has ever blocked.
1140 1144 *
1141 1145 * <p>In this implementation, this operation returns in
1142 1146 * constant time.
1143 1147 *
1144 1148 * @return {@code true} if there has ever been contention
1145 1149 */
1146 1150 public final boolean hasContended() {
1147 1151 return head != null;
1148 1152 }
1149 1153
1150 1154 /**
1151 1155 * Returns the first (longest-waiting) thread in the queue, or
1152 1156 * {@code null} if no threads are currently queued.
1153 1157 *
1154 1158 * <p>In this implementation, this operation normally returns in
1155 1159 * constant time, but may iterate upon contention if other threads are
1156 1160 * concurrently modifying the queue.
1157 1161 *
1158 1162 * @return the first (longest-waiting) thread in the queue, or
1159 1163 * {@code null} if no threads are currently queued
1160 1164 */
1161 1165 public final Thread getFirstQueuedThread() {
1162 1166 // handle only fast path, else relay
1163 1167 return (head == tail) ? null : fullGetFirstQueuedThread();
1164 1168 }
1165 1169
1166 1170 /**
1167 1171 * Version of getFirstQueuedThread called when the fast path fails.
1168 1172 */
1169 1173 private Thread fullGetFirstQueuedThread() {
1170 1174 /*
1171 1175 * The first node is normally head.next. Try to get its
1172 1176 * thread field, ensuring consistent reads: If thread
1173 1177 * field is nulled out or s.prev is no longer head, then
1174 1178 * some other thread(s) concurrently performed setHead in
1175 1179 * between some of our reads. We try this twice before
1176 1180 * resorting to traversal.
1177 1181 */
1178 1182 Node h, s;
1179 1183 Thread st;
1180 1184 if (((h = head) != null && (s = h.next) != null &&
1181 1185 s.prev == head && (st = s.thread) != null) ||
1182 1186 ((h = head) != null && (s = h.next) != null &&
1183 1187 s.prev == head && (st = s.thread) != null))
1184 1188 return st;
1185 1189
1186 1190 /*
1187 1191 * Head's next field might not have been set yet, or may have
1188 1192 * been unset after setHead. So we must check to see if tail
1189 1193 * is actually first node. If not, we continue on, safely
1190 1194 * traversing from tail back to head to find first,
1191 1195 * guaranteeing termination.
1192 1196 */
1193 1197
1194 1198 Node t = tail;
1195 1199 Thread firstThread = null;
1196 1200 while (t != null && t != head) {
1197 1201 Thread tt = t.thread;
1198 1202 if (tt != null)
1199 1203 firstThread = tt;
1200 1204 t = t.prev;
1201 1205 }
1202 1206 return firstThread;
1203 1207 }
1204 1208
1205 1209 /**
1206 1210 * Returns true if the given thread is currently queued.
1207 1211 *
1208 1212 * <p>This implementation traverses the queue to determine
1209 1213 * presence of the given thread.
1210 1214 *
1211 1215 * @param thread the thread
1212 1216 * @return {@code true} if the given thread is on the queue
1213 1217 * @throws NullPointerException if the thread is null
1214 1218 */
1215 1219 public final boolean isQueued(Thread thread) {
1216 1220 if (thread == null)
1217 1221 throw new NullPointerException();
1218 1222 for (Node p = tail; p != null; p = p.prev)
1219 1223 if (p.thread == thread)
1220 1224 return true;
1221 1225 return false;
1222 1226 }
1223 1227
1224 1228 /**
1225 1229 * Returns {@code true} if the apparent first queued thread, if one
1226 1230 * exists, is waiting in exclusive mode. If this method returns
1227 1231 * {@code true}, and the current thread is attempting to acquire in
1228 1232 * shared mode (that is, this method is invoked from {@link
1229 1233 * #tryAcquireShared}) then it is guaranteed that the current thread
1230 1234 * is not the first queued thread. Used only as a heuristic in
1231 1235 * ReentrantReadWriteLock.
1232 1236 */
1233 1237 final boolean apparentlyFirstQueuedIsExclusive() {
1234 1238 Node h, s;
1235 1239 return (h = head) != null &&
1236 1240 (s = h.next) != null &&
1237 1241 !s.isShared() &&
1238 1242 s.thread != null;
1239 1243 }
1240 1244
1241 1245 /**
1242 1246 * Queries whether any threads have been waiting to acquire longer
1243 1247 * than the current thread.
1244 1248 *
1245 1249 * <p>An invocation of this method is equivalent to (but may be
1246 1250 * more efficient than):
1247 1251 * <pre> {@code
1248 1252 * getFirstQueuedThread() != Thread.currentThread() &&
1249 1253 * hasQueuedThreads()}</pre>
1250 1254 *
1251 1255 * <p>Note that because cancellations due to interrupts and
1252 1256 * timeouts may occur at any time, a {@code true} return does not
1253 1257 * guarantee that some other thread will acquire before the current
1254 1258 * thread. Likewise, it is possible for another thread to win a
1255 1259 * race to enqueue after this method has returned {@code false},
1256 1260 * due to the queue being empty.
1257 1261 *
1258 1262 * <p>This method is designed to be used by a fair synchronizer to
1259 1263 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
1260 1264 * Such a synchronizer's {@link #tryAcquire} method should return
1261 1265 * {@code false}, and its {@link #tryAcquireShared} method should
1262 1266 * return a negative value, if this method returns {@code true}
1263 1267 * (unless this is a reentrant acquire). For example, the {@code
1264 1268 * tryAcquire} method for a fair, reentrant, exclusive mode
1265 1269 * synchronizer might look like this:
1266 1270 *
1267 1271 * <pre> {@code
1268 1272 * protected boolean tryAcquire(long arg) {
1269 1273 * if (isHeldExclusively()) {
1270 1274 * // A reentrant acquire; increment hold count
1271 1275 * return true;
1272 1276 * } else if (hasQueuedPredecessors()) {
1273 1277 * return false;
1274 1278 * } else {
1275 1279 * // try to acquire normally
1276 1280 * }
1277 1281 * }}</pre>
1278 1282 *
1279 1283 * @return {@code true} if there is a queued thread preceding the
1280 1284 * current thread, and {@code false} if the current thread
1281 1285 * is at the head of the queue or the queue is empty
1282 1286 * @since 1.7
1283 1287 */
1284 1288 public final boolean hasQueuedPredecessors() {
1285 1289 // The correctness of this depends on head being initialized
1286 1290 // before tail and on head.next being accurate if the current
1287 1291 // thread is first in queue.
1288 1292 Node t = tail; // Read fields in reverse initialization order
1289 1293 Node h = head;
1290 1294 Node s;
1291 1295 return h != t &&
1292 1296 ((s = h.next) == null || s.thread != Thread.currentThread());
1293 1297 }
1294 1298
1295 1299
1296 1300 // Instrumentation and monitoring methods
1297 1301
1298 1302 /**
1299 1303 * Returns an estimate of the number of threads waiting to
1300 1304 * acquire. The value is only an estimate because the number of
1301 1305 * threads may change dynamically while this method traverses
1302 1306 * internal data structures. This method is designed for use in
1303 1307 * monitoring system state, not for synchronization
1304 1308 * control.
1305 1309 *
1306 1310 * @return the estimated number of threads waiting to acquire
1307 1311 */
1308 1312 public final int getQueueLength() {
1309 1313 int n = 0;
1310 1314 for (Node p = tail; p != null; p = p.prev) {
1311 1315 if (p.thread != null)
1312 1316 ++n;
1313 1317 }
1314 1318 return n;
1315 1319 }
1316 1320
1317 1321 /**
1318 1322 * Returns a collection containing threads that may be waiting to
1319 1323 * acquire. Because the actual set of threads may change
1320 1324 * dynamically while constructing this result, the returned
1321 1325 * collection is only a best-effort estimate. The elements of the
1322 1326 * returned collection are in no particular order. This method is
1323 1327 * designed to facilitate construction of subclasses that provide
1324 1328 * more extensive monitoring facilities.
1325 1329 *
1326 1330 * @return the collection of threads
1327 1331 */
1328 1332 public final Collection<Thread> getQueuedThreads() {
1329 1333 ArrayList<Thread> list = new ArrayList<Thread>();
1330 1334 for (Node p = tail; p != null; p = p.prev) {
1331 1335 Thread t = p.thread;
1332 1336 if (t != null)
1333 1337 list.add(t);
1334 1338 }
1335 1339 return list;
1336 1340 }
1337 1341
1338 1342 /**
1339 1343 * Returns a collection containing threads that may be waiting to
1340 1344 * acquire in exclusive mode. This has the same properties
1341 1345 * as {@link #getQueuedThreads} except that it only returns
1342 1346 * those threads waiting due to an exclusive acquire.
1343 1347 *
1344 1348 * @return the collection of threads
1345 1349 */
1346 1350 public final Collection<Thread> getExclusiveQueuedThreads() {
1347 1351 ArrayList<Thread> list = new ArrayList<Thread>();
1348 1352 for (Node p = tail; p != null; p = p.prev) {
1349 1353 if (!p.isShared()) {
1350 1354 Thread t = p.thread;
1351 1355 if (t != null)
1352 1356 list.add(t);
1353 1357 }
1354 1358 }
1355 1359 return list;
1356 1360 }
1357 1361
1358 1362 /**
1359 1363 * Returns a collection containing threads that may be waiting to
1360 1364 * acquire in shared mode. This has the same properties
1361 1365 * as {@link #getQueuedThreads} except that it only returns
1362 1366 * those threads waiting due to a shared acquire.
1363 1367 *
1364 1368 * @return the collection of threads
1365 1369 */
1366 1370 public final Collection<Thread> getSharedQueuedThreads() {
1367 1371 ArrayList<Thread> list = new ArrayList<Thread>();
1368 1372 for (Node p = tail; p != null; p = p.prev) {
1369 1373 if (p.isShared()) {
1370 1374 Thread t = p.thread;
1371 1375 if (t != null)
1372 1376 list.add(t);
1373 1377 }
1374 1378 }
1375 1379 return list;
1376 1380 }
1377 1381
1378 1382 /**
1379 1383 * Returns a string identifying this synchronizer, as well as its state.
1380 1384 * The state, in brackets, includes the String {@code "State ="}
1381 1385 * followed by the current value of {@link #getState}, and either
1382 1386 * {@code "nonempty"} or {@code "empty"} depending on whether the
1383 1387 * queue is empty.
1384 1388 *
1385 1389 * @return a string identifying this synchronizer, as well as its state
1386 1390 */
1387 1391 public String toString() {
1388 1392 long s = getState();
1389 1393 String q = hasQueuedThreads() ? "non" : "";
1390 1394 return super.toString() +
1391 1395 "[State = " + s + ", " + q + "empty queue]";
1392 1396 }
1393 1397
1394 1398
1395 1399 // Internal support methods for Conditions
1396 1400
1397 1401 /**
1398 1402 * Returns true if a node, always one that was initially placed on
1399 1403 * a condition queue, is now waiting to reacquire on sync queue.
1400 1404 * @param node the node
1401 1405 * @return true if the node is reacquiring
1402 1406 */
1403 1407 final boolean isOnSyncQueue(Node node) {
1404 1408 if (node.waitStatus == Node.CONDITION || node.prev == null)
1405 1409 return false;
1406 1410 if (node.next != null) // If has successor, it must be on queue
1407 1411 return true;
1408 1412 /*
1409 1413 * node.prev can be non-null, but not yet on queue because
1410 1414 * the CAS to place it on queue can fail. So we have to
1411 1415 * traverse from tail to make sure it actually made it. It
1412 1416 * will always be near the tail in calls to this method, and
1413 1417 * unless the CAS failed (which is unlikely), it will be
1414 1418 * there, so we hardly ever traverse much.
1415 1419 */
1416 1420 return findNodeFromTail(node);
1417 1421 }
1418 1422
1419 1423 /**
1420 1424 * Returns true if node is on sync queue by searching backwards from tail.
1421 1425 * Called only when needed by isOnSyncQueue.
1422 1426 * @return true if present
1423 1427 */
1424 1428 private boolean findNodeFromTail(Node node) {
1425 1429 Node t = tail;
1426 1430 for (;;) {
1427 1431 if (t == node)
1428 1432 return true;
1429 1433 if (t == null)
1430 1434 return false;
1431 1435 t = t.prev;
1432 1436 }
1433 1437 }
1434 1438
1435 1439 /**
1436 1440 * Transfers a node from a condition queue onto sync queue.
1437 1441 * Returns true if successful.
1438 1442 * @param node the node
1439 1443 * @return true if successfully transferred (else the node was
1440 1444 * cancelled before signal).
1441 1445 */
1442 1446 final boolean transferForSignal(Node node) {
1443 1447 /*
1444 1448 * If cannot change waitStatus, the node has been cancelled.
1445 1449 */
1446 1450 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
1447 1451 return false;
1448 1452
1449 1453 /*
1450 1454 * Splice onto queue and try to set waitStatus of predecessor to
1451 1455 * indicate that thread is (probably) waiting. If cancelled or
1452 1456 * attempt to set waitStatus fails, wake up to resync (in which
1453 1457 * case the waitStatus can be transiently and harmlessly wrong).
1454 1458 */
1455 1459 Node p = enq(node);
1456 1460 int ws = p.waitStatus;
1457 1461 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
1458 1462 LockSupport.unpark(node.thread);
1459 1463 return true;
1460 1464 }
1461 1465
1462 1466 /**
1463 1467 * Transfers node, if necessary, to sync queue after a cancelled
1464 1468 * wait. Returns true if thread was cancelled before being
1465 1469 * signalled.
1466 1470 * @param node the node of the waiting thread, transferred to the
1467 1471 * sync queue if its wait was cancelled before a signal
1468 1472 * @return true if cancelled before the node was signalled
1469 1473 */
1470 1474 final boolean transferAfterCancelledWait(Node node) {
1471 1475 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
1472 1476 enq(node);
1473 1477 return true;
1474 1478 }
1475 1479 /*
1476 1480 * If we lost out to a signal(), then we can't proceed
1477 1481 * until it finishes its enq(). Cancelling during an
1478 1482 * incomplete transfer is both rare and transient, so just
1479 1483 * spin.
1480 1484 */
1481 1485 while (!isOnSyncQueue(node))
1482 1486 Thread.yield();
1483 1487 return false;
1484 1488 }
1485 1489
1486 1490 /**
1487 1491 * Invokes release with current state value; returns saved state.
1488 1492 * Cancels node and throws exception on failure.
1489 1493 * @param node the condition node for this wait
1490 1494 * @return previous sync state
1491 1495 */
1492 1496 final long fullyRelease(Node node) {
1493 1497 boolean failed = true;
1494 1498 try {
1495 1499 long savedState = getState();
1496 1500 if (release(savedState)) {
1497 1501 failed = false;
1498 1502 return savedState;
1499 1503 } else {
1500 1504 throw new IllegalMonitorStateException();
1501 1505 }
1502 1506 } finally {
1503 1507 if (failed)
1504 1508 node.waitStatus = Node.CANCELLED;
1505 1509 }
1506 1510 }
1507 1511
1508 1512 // Instrumentation methods for conditions
1509 1513
1510 1514 /**
1511 1515 * Queries whether the given ConditionObject
1512 1516 * uses this synchronizer as its lock.
1513 1517 *
1514 1518 * @param condition the condition
1515 1519 * @return <tt>true</tt> if owned
1516 1520 * @throws NullPointerException if the condition is null
1517 1521 */
1518 1522 public final boolean owns(ConditionObject condition) {
1519 1523 if (condition == null)
1520 1524 throw new NullPointerException();
1521 1525 return condition.isOwnedBy(this);
1522 1526 }
1523 1527
1524 1528 /**
1525 1529 * Queries whether any threads are waiting on the given condition
1526 1530 * associated with this synchronizer. Note that because timeouts
1527 1531 * and interrupts may occur at any time, a <tt>true</tt> return
1528 1532 * does not guarantee that a future <tt>signal</tt> will awaken
1529 1533 * any threads. This method is designed primarily for use in
1530 1534 * monitoring of the system state.
1531 1535 *
1532 1536 * @param condition the condition
1533 1537 * @return <tt>true</tt> if there are any waiting threads
1534 1538 * @throws IllegalMonitorStateException if exclusive synchronization
1535 1539 * is not held
1536 1540 * @throws IllegalArgumentException if the given condition is
1537 1541 * not associated with this synchronizer
1538 1542 * @throws NullPointerException if the condition is null
1539 1543 */
1540 1544 public final boolean hasWaiters(ConditionObject condition) {
1541 1545 if (!owns(condition))
1542 1546 throw new IllegalArgumentException("Not owner");
1543 1547 return condition.hasWaiters();
1544 1548 }
1545 1549
1546 1550 /**
1547 1551 * Returns an estimate of the number of threads waiting on the
1548 1552 * given condition associated with this synchronizer. Note that
1549 1553 * because timeouts and interrupts may occur at any time, the
1550 1554 * estimate serves only as an upper bound on the actual number of
1551 1555 * waiters. This method is designed for use in monitoring of the
1552 1556 * system state, not for synchronization control.
1553 1557 *
1554 1558 * @param condition the condition
1555 1559 * @return the estimated number of waiting threads
1556 1560 * @throws IllegalMonitorStateException if exclusive synchronization
1557 1561 * is not held
1558 1562 * @throws IllegalArgumentException if the given condition is
1559 1563 * not associated with this synchronizer
1560 1564 * @throws NullPointerException if the condition is null
1561 1565 */
1562 1566 public final int getWaitQueueLength(ConditionObject condition) {
1563 1567 if (!owns(condition))
1564 1568 throw new IllegalArgumentException("Not owner");
1565 1569 return condition.getWaitQueueLength();
1566 1570 }
1567 1571
1568 1572 /**
1569 1573 * Returns a collection containing those threads that may be
1570 1574 * waiting on the given condition associated with this
1571 1575 * synchronizer. Because the actual set of threads may change
1572 1576 * dynamically while constructing this result, the returned
1573 1577 * collection is only a best-effort estimate. The elements of the
1574 1578 * returned collection are in no particular order.
1575 1579 *
1576 1580 * @param condition the condition
1577 1581 * @return the collection of threads
1578 1582 * @throws IllegalMonitorStateException if exclusive synchronization
1579 1583 * is not held
1580 1584 * @throws IllegalArgumentException if the given condition is
1581 1585 * not associated with this synchronizer
1582 1586 * @throws NullPointerException if the condition is null
1583 1587 */
1584 1588 public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
1585 1589 if (!owns(condition))
1586 1590 throw new IllegalArgumentException("Not owner");
1587 1591 return condition.getWaitingThreads();
1588 1592 }
1589 1593
1590 1594 /**
1591 1595 * Condition implementation for a {@link
1592 1596 * AbstractQueuedLongSynchronizer} serving as the basis of a {@link
1593 1597 * Lock} implementation.
1594 1598 *
1595 1599 * <p>Method documentation for this class describes mechanics,
1596 1600 * not behavioral specifications from the point of view of Lock
1597 1601 * and Condition users. Exported versions of this class will in
1598 1602 * general need to be accompanied by documentation describing
1599 1603 * condition semantics that rely on those of the associated
1600 1604 * <tt>AbstractQueuedLongSynchronizer</tt>.
1601 1605 *
1602 1606 * <p>This class is Serializable, but all fields are transient,
1603 1607 * so deserialized conditions have no waiters.
1604 1608 *
1605 1609 * @since 1.6
1606 1610 */
1607 1611 public class ConditionObject implements Condition, java.io.Serializable {
1608 1612 private static final long serialVersionUID = 1173984872572414699L;
1609 1613 /** First node of condition queue. */
1610 1614 private transient Node firstWaiter;
1611 1615 /** Last node of condition queue. */
1612 1616 private transient Node lastWaiter;
1613 1617
1614 1618 /**
1615 1619 * Creates a new <tt>ConditionObject</tt> instance.
1616 1620 */
1617 1621 public ConditionObject() { }
1618 1622
1619 1623 // Internal methods
1620 1624
1621 1625 /**
1622 1626 * Adds a new waiter to wait queue.
1623 1627 * @return its new wait node
1624 1628 */
1625 1629 private Node addConditionWaiter() {
1626 1630 Node t = lastWaiter;
1627 1631 // If lastWaiter is cancelled, clean out.
1628 1632 if (t != null && t.waitStatus != Node.CONDITION) {
1629 1633 unlinkCancelledWaiters();
1630 1634 t = lastWaiter;
1631 1635 }
1632 1636 Node node = new Node(Thread.currentThread(), Node.CONDITION);
1633 1637 if (t == null)
1634 1638 firstWaiter = node;
1635 1639 else
1636 1640 t.nextWaiter = node;
1637 1641 lastWaiter = node;
1638 1642 return node;
1639 1643 }
1640 1644
1641 1645 /**
1642 1646 * Removes and transfers nodes until hit non-cancelled one or
1643 1647 * null. Split out from signal in part to encourage compilers
1644 1648 * to inline the case of no waiters.
1645 1649 * @param first (non-null) the first node on condition queue
1646 1650 */
1647 1651 private void doSignal(Node first) {
1648 1652 do {
1649 1653 if ( (firstWaiter = first.nextWaiter) == null)
1650 1654 lastWaiter = null;
1651 1655 first.nextWaiter = null;
1652 1656 } while (!transferForSignal(first) &&
1653 1657 (first = firstWaiter) != null);
1654 1658 }
1655 1659
1656 1660 /**
1657 1661 * Removes and transfers all nodes.
1658 1662 * @param first (non-null) the first node on condition queue
1659 1663 */
1660 1664 private void doSignalAll(Node first) {
1661 1665 lastWaiter = firstWaiter = null;
1662 1666 do {
1663 1667 Node next = first.nextWaiter;
1664 1668 first.nextWaiter = null;
1665 1669 transferForSignal(first);
1666 1670 first = next;
1667 1671 } while (first != null);
1668 1672 }
1669 1673
1670 1674 /**
1671 1675 * Unlinks cancelled waiter nodes from condition queue.
1672 1676 * Called only while holding lock. This is called when
1673 1677 * cancellation occurred during condition wait, and upon
1674 1678 * insertion of a new waiter when lastWaiter is seen to have
1675 1679 * been cancelled. This method is needed to avoid garbage
1676 1680 * retention in the absence of signals. So even though it may
1677 1681 * require a full traversal, it comes into play only when
1678 1682 * timeouts or cancellations occur in the absence of
1679 1683 * signals. It traverses all nodes rather than stopping at a
1680 1684 * particular target to unlink all pointers to garbage nodes
1681 1685 * without requiring many re-traversals during cancellation
1682 1686 * storms.
1683 1687 */
1684 1688 private void unlinkCancelledWaiters() {
1685 1689 Node t = firstWaiter;
1686 1690 Node trail = null;
1687 1691 while (t != null) {
1688 1692 Node next = t.nextWaiter;
1689 1693 if (t.waitStatus != Node.CONDITION) {
1690 1694 t.nextWaiter = null;
1691 1695 if (trail == null)
1692 1696 firstWaiter = next;
1693 1697 else
1694 1698 trail.nextWaiter = next;
1695 1699 if (next == null)
1696 1700 lastWaiter = trail;
1697 1701 }
1698 1702 else
1699 1703 trail = t;
1700 1704 t = next;
1701 1705 }
1702 1706 }
1703 1707
1704 1708 // public methods
1705 1709
1706 1710 /**
1707 1711 * Moves the longest-waiting thread, if one exists, from the
1708 1712 * wait queue for this condition to the wait queue for the
1709 1713 * owning lock.
1710 1714 *
1711 1715 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1712 1716 * returns {@code false}
1713 1717 */
1714 1718 public final void signal() {
1715 1719 if (!isHeldExclusively())
1716 1720 throw new IllegalMonitorStateException();
1717 1721 Node first = firstWaiter;
1718 1722 if (first != null)
1719 1723 doSignal(first);
1720 1724 }
1721 1725
1722 1726 /**
1723 1727 * Moves all threads from the wait queue for this condition to
1724 1728 * the wait queue for the owning lock.
1725 1729 *
1726 1730 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1727 1731 * returns {@code false}
1728 1732 */
1729 1733 public final void signalAll() {
1730 1734 if (!isHeldExclusively())
1731 1735 throw new IllegalMonitorStateException();
1732 1736 Node first = firstWaiter;
1733 1737 if (first != null)
1734 1738 doSignalAll(first);
1735 1739 }
1736 1740
1737 1741 /**
1738 1742 * Implements uninterruptible condition wait.
1739 1743 * <ol>
1740 1744 * <li> Save lock state returned by {@link #getState}.
1741 1745 * <li> Invoke {@link #release} with
1742 1746 * saved state as argument, throwing
1743 1747 * IllegalMonitorStateException if it fails.
1744 1748 * <li> Block until signalled.
1745 1749 * <li> Reacquire by invoking specialized version of
1746 1750 * {@link #acquire} with saved state as argument.
1747 1751 * </ol>
1748 1752 */
1749 1753 public final void awaitUninterruptibly() {
1750 1754 Node node = addConditionWaiter();
1751 1755 long savedState = fullyRelease(node);
1752 1756 boolean interrupted = false;
1753 1757 while (!isOnSyncQueue(node)) {
1754 1758 LockSupport.park(this);
1755 1759 if (Thread.interrupted())
1756 1760 interrupted = true;
1757 1761 }
1758 1762 if (acquireQueued(node, savedState) || interrupted)
1759 1763 selfInterrupt();
1760 1764 }
1761 1765
1762 1766 /*
1763 1767 * For interruptible waits, we need to track whether to throw
1764 1768 * InterruptedException, if interrupted while blocked on
1765 1769 * condition, versus reinterrupt current thread, if
1766 1770 * interrupted while blocked waiting to re-acquire.
1767 1771 */
1768 1772
1769 1773 /** Mode meaning to reinterrupt on exit from wait */
1770 1774 private static final int REINTERRUPT = 1;
1771 1775 /** Mode meaning to throw InterruptedException on exit from wait */
1772 1776 private static final int THROW_IE = -1;
1773 1777
1774 1778 /**
1775 1779 * Checks for interrupt, returning THROW_IE if interrupted
1776 1780 * before signalled, REINTERRUPT if after signalled, or
1777 1781 * 0 if not interrupted.
1778 1782 */
1779 1783 private int checkInterruptWhileWaiting(Node node) {
1780 1784 return Thread.interrupted() ?
1781 1785 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
1782 1786 0;
1783 1787 }
1784 1788
1785 1789 /**
1786 1790 * Throws InterruptedException, reinterrupts current thread, or
1787 1791 * does nothing, depending on mode.
1788 1792 */
1789 1793 private void reportInterruptAfterWait(int interruptMode)
1790 1794 throws InterruptedException {
1791 1795 if (interruptMode == THROW_IE)
1792 1796 throw new InterruptedException();
1793 1797 else if (interruptMode == REINTERRUPT)
1794 1798 selfInterrupt();
1795 1799 }
1796 1800
1797 1801 /**
1798 1802 * Implements interruptible condition wait.
1799 1803 * <ol>
1800 1804 * <li> If current thread is interrupted, throw InterruptedException.
1801 1805 * <li> Save lock state returned by {@link #getState}.
1802 1806 * <li> Invoke {@link #release} with
1803 1807 * saved state as argument, throwing
1804 1808 * IllegalMonitorStateException if it fails.
1805 1809 * <li> Block until signalled or interrupted.
1806 1810 * <li> Reacquire by invoking specialized version of
1807 1811 * {@link #acquire} with saved state as argument.
1808 1812 * <li> If interrupted while blocked in step 4, throw InterruptedException.
1809 1813 * </ol>
1810 1814 */
1811 1815 public final void await() throws InterruptedException {
1812 1816 if (Thread.interrupted())
1813 1817 throw new InterruptedException();
1814 1818 Node node = addConditionWaiter();
1815 1819 long savedState = fullyRelease(node);
1816 1820 int interruptMode = 0;
1817 1821 while (!isOnSyncQueue(node)) {
1818 1822 LockSupport.park(this);
1819 1823 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1820 1824 break;
1821 1825 }
1822 1826 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1823 1827 interruptMode = REINTERRUPT;
1824 1828 if (node.nextWaiter != null) // clean up if cancelled
1825 1829 unlinkCancelledWaiters();
1826 1830 if (interruptMode != 0)
1827 1831 reportInterruptAfterWait(interruptMode);
1828 1832 }
1829 1833
1830 1834 /**
1831 1835 * Implements timed condition wait.
1832 1836 * <ol>
1833 1837 * <li> If current thread is interrupted, throw InterruptedException.
↓ open down ↓ |
727 lines elided |
↑ open up ↑ |
1834 1838 * <li> Save lock state returned by {@link #getState}.
1835 1839 * <li> Invoke {@link #release} with
1836 1840 * saved state as argument, throwing
1837 1841 * IllegalMonitorStateException if it fails.
1838 1842 * <li> Block until signalled, interrupted, or timed out.
1839 1843 * <li> Reacquire by invoking specialized version of
1840 1844 * {@link #acquire} with saved state as argument.
1841 1845 * <li> If interrupted while blocked in step 4, throw InterruptedException.
1842 1846 * </ol>
1843 1847 */
1844 - public final long awaitNanos(long nanosTimeout) throws InterruptedException {
1848 + public final long awaitNanos(long nanosTimeout)
1849 + throws InterruptedException {
1845 1850 if (Thread.interrupted())
1846 1851 throw new InterruptedException();
1847 1852 Node node = addConditionWaiter();
1848 1853 long savedState = fullyRelease(node);
1849 1854 long lastTime = System.nanoTime();
1850 1855 int interruptMode = 0;
1851 1856 while (!isOnSyncQueue(node)) {
1852 1857 if (nanosTimeout <= 0L) {
1853 1858 transferAfterCancelledWait(node);
1854 1859 break;
1855 1860 }
1856 1861 LockSupport.parkNanos(this, nanosTimeout);
1857 1862 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1858 1863 break;
1859 1864
1860 1865 long now = System.nanoTime();
1861 1866 nanosTimeout -= now - lastTime;
1862 1867 lastTime = now;
1863 1868 }
1864 1869 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1865 1870 interruptMode = REINTERRUPT;
1866 1871 if (node.nextWaiter != null)
1867 1872 unlinkCancelledWaiters();
1868 1873 if (interruptMode != 0)
1869 1874 reportInterruptAfterWait(interruptMode);
1870 1875 return nanosTimeout - (System.nanoTime() - lastTime);
1871 1876 }
1872 1877
1873 1878 /**
1874 1879 * Implements absolute timed condition wait.
1875 1880 * <ol>
1876 1881 * <li> If current thread is interrupted, throw InterruptedException.
1877 1882 * <li> Save lock state returned by {@link #getState}.
↓ open down ↓ |
23 lines elided |
↑ open up ↑ |
1878 1883 * <li> Invoke {@link #release} with
1879 1884 * saved state as argument, throwing
1880 1885 * IllegalMonitorStateException if it fails.
1881 1886 * <li> Block until signalled, interrupted, or timed out.
1882 1887 * <li> Reacquire by invoking specialized version of
1883 1888 * {@link #acquire} with saved state as argument.
1884 1889 * <li> If interrupted while blocked in step 4, throw InterruptedException.
1885 1890 * <li> If timed out while blocked in step 4, return false, else true.
1886 1891 * </ol>
1887 1892 */
1888 - public final boolean awaitUntil(Date deadline) throws InterruptedException {
1893 + public final boolean awaitUntil(Date deadline)
1894 + throws InterruptedException {
1889 1895 if (deadline == null)
1890 1896 throw new NullPointerException();
1891 1897 long abstime = deadline.getTime();
1892 1898 if (Thread.interrupted())
1893 1899 throw new InterruptedException();
1894 1900 Node node = addConditionWaiter();
1895 1901 long savedState = fullyRelease(node);
1896 1902 boolean timedout = false;
1897 1903 int interruptMode = 0;
1898 1904 while (!isOnSyncQueue(node)) {
1899 1905 if (System.currentTimeMillis() > abstime) {
1900 1906 timedout = transferAfterCancelledWait(node);
1901 1907 break;
1902 1908 }
1903 1909 LockSupport.parkUntil(this, abstime);
1904 1910 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1905 1911 break;
1906 1912 }
1907 1913 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1908 1914 interruptMode = REINTERRUPT;
1909 1915 if (node.nextWaiter != null)
1910 1916 unlinkCancelledWaiters();
1911 1917 if (interruptMode != 0)
1912 1918 reportInterruptAfterWait(interruptMode);
1913 1919 return !timedout;
1914 1920 }
1915 1921
1916 1922 /**
1917 1923 * Implements timed condition wait.
1918 1924 * <ol>
1919 1925 * <li> If current thread is interrupted, throw InterruptedException.
1920 1926 * <li> Save lock state returned by {@link #getState}.
↓ open down ↓ |
22 lines elided |
↑ open up ↑ |
1921 1927 * <li> Invoke {@link #release} with
1922 1928 * saved state as argument, throwing
1923 1929 * IllegalMonitorStateException if it fails.
1924 1930 * <li> Block until signalled, interrupted, or timed out.
1925 1931 * <li> Reacquire by invoking specialized version of
1926 1932 * {@link #acquire} with saved state as argument.
1927 1933 * <li> If interrupted while blocked in step 4, throw InterruptedException.
1928 1934 * <li> If timed out while blocked in step 4, return false, else true.
1929 1935 * </ol>
1930 1936 */
1931 - public final boolean await(long time, TimeUnit unit) throws InterruptedException {
1937 + public final boolean await(long time, TimeUnit unit)
1938 + throws InterruptedException {
1932 1939 if (unit == null)
1933 1940 throw new NullPointerException();
1934 1941 long nanosTimeout = unit.toNanos(time);
1935 1942 if (Thread.interrupted())
1936 1943 throw new InterruptedException();
1937 1944 Node node = addConditionWaiter();
1938 1945 long savedState = fullyRelease(node);
1939 1946 long lastTime = System.nanoTime();
1940 1947 boolean timedout = false;
1941 1948 int interruptMode = 0;
1942 1949 while (!isOnSyncQueue(node)) {
1943 1950 if (nanosTimeout <= 0L) {
1944 1951 timedout = transferAfterCancelledWait(node);
1945 1952 break;
1946 1953 }
1947 1954 if (nanosTimeout >= spinForTimeoutThreshold)
1948 1955 LockSupport.parkNanos(this, nanosTimeout);
1949 1956 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1950 1957 break;
1951 1958 long now = System.nanoTime();
1952 1959 nanosTimeout -= now - lastTime;
1953 1960 lastTime = now;
1954 1961 }
1955 1962 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1956 1963 interruptMode = REINTERRUPT;
1957 1964 if (node.nextWaiter != null)
1958 1965 unlinkCancelledWaiters();
1959 1966 if (interruptMode != 0)
1960 1967 reportInterruptAfterWait(interruptMode);
1961 1968 return !timedout;
1962 1969 }
1963 1970
1964 1971 // support for instrumentation
1965 1972
1966 1973 /**
1967 1974 * Returns true if this condition was created by the given
1968 1975 * synchronization object.
1969 1976 *
1970 1977 * @return {@code true} if owned
1971 1978 */
1972 1979 final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) {
1973 1980 return sync == AbstractQueuedLongSynchronizer.this;
1974 1981 }
1975 1982
1976 1983 /**
1977 1984 * Queries whether any threads are waiting on this condition.
1978 1985 * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters}.
1979 1986 *
1980 1987 * @return {@code true} if there are any waiting threads
1981 1988 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1982 1989 * returns {@code false}
1983 1990 */
1984 1991 protected final boolean hasWaiters() {
1985 1992 if (!isHeldExclusively())
1986 1993 throw new IllegalMonitorStateException();
1987 1994 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1988 1995 if (w.waitStatus == Node.CONDITION)
1989 1996 return true;
1990 1997 }
1991 1998 return false;
1992 1999 }
1993 2000
1994 2001 /**
1995 2002 * Returns an estimate of the number of threads waiting on
1996 2003 * this condition.
1997 2004 * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength}.
1998 2005 *
1999 2006 * @return the estimated number of waiting threads
2000 2007 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
2001 2008 * returns {@code false}
2002 2009 */
2003 2010 protected final int getWaitQueueLength() {
2004 2011 if (!isHeldExclusively())
2005 2012 throw new IllegalMonitorStateException();
2006 2013 int n = 0;
2007 2014 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
2008 2015 if (w.waitStatus == Node.CONDITION)
2009 2016 ++n;
2010 2017 }
2011 2018 return n;
2012 2019 }
2013 2020
2014 2021 /**
2015 2022 * Returns a collection containing those threads that may be
2016 2023 * waiting on this Condition.
2017 2024 * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads}.
2018 2025 *
2019 2026 * @return the collection of threads
2020 2027 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
2021 2028 * returns {@code false}
2022 2029 */
2023 2030 protected final Collection<Thread> getWaitingThreads() {
2024 2031 if (!isHeldExclusively())
2025 2032 throw new IllegalMonitorStateException();
2026 2033 ArrayList<Thread> list = new ArrayList<Thread>();
2027 2034 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
2028 2035 if (w.waitStatus == Node.CONDITION) {
2029 2036 Thread t = w.thread;
2030 2037 if (t != null)
2031 2038 list.add(t);
2032 2039 }
2033 2040 }
2034 2041 return list;
2035 2042 }
2036 2043 }
2037 2044
/**
 * Setup to support compareAndSet. This has to be implemented
 * natively here: to leave room for future enhancements, this class
 * cannot explicitly subclass AtomicLong, which would otherwise be
 * efficient and useful. As the lesser of evils, the hotspot
 * intrinsics API is used directly. The same is done for the other
 * CASable fields, which could otherwise be handled with atomic
 * field updaters.
 */
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
    try {
        final Class<?> syncClass = AbstractQueuedLongSynchronizer.class;
        final Class<?> nodeClass = Node.class;

        // Field offsets within the synchronizer itself.
        stateOffset =
            unsafe.objectFieldOffset(syncClass.getDeclaredField("state"));
        headOffset =
            unsafe.objectFieldOffset(syncClass.getDeclaredField("head"));
        tailOffset =
            unsafe.objectFieldOffset(syncClass.getDeclaredField("tail"));

        // Field offsets within queue nodes.
        waitStatusOffset =
            unsafe.objectFieldOffset(nodeClass.getDeclaredField("waitStatus"));
        nextOffset =
            unsafe.objectFieldOffset(nodeClass.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}
2069 2076
2070 2077 /**
2071 2078 * CAS head field. Used only by enq.
2072 2079 */
2073 2080 private final boolean compareAndSetHead(Node update) {
2074 2081 return unsafe.compareAndSwapObject(this, headOffset, null, update);
2075 2082 }
2076 2083
↓ open down ↓ |
135 lines elided |
↑ open up ↑ |
2077 2084 /**
2078 2085 * CAS tail field. Used only by enq.
2079 2086 */
2080 2087 private final boolean compareAndSetTail(Node expect, Node update) {
2081 2088 return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
2082 2089 }
2083 2090
2084 2091 /**
2085 2092 * CAS waitStatus field of a node.
2086 2093 */
2087 - private final static boolean compareAndSetWaitStatus(Node node,
2094 + private static final boolean compareAndSetWaitStatus(Node node,
2088 2095 int expect,
2089 2096 int update) {
2090 2097 return unsafe.compareAndSwapInt(node, waitStatusOffset,
2091 2098 expect, update);
2092 2099 }
2093 2100
2094 2101 /**
2095 2102 * CAS next field of a node.
2096 2103 */
2097 - private final static boolean compareAndSetNext(Node node,
2104 + private static final boolean compareAndSetNext(Node node,
2098 2105 Node expect,
2099 2106 Node update) {
2100 2107 return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
2101 2108 }
2102 2109 }
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX