1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent.locks; 37 38 import java.util.ArrayList; 39 import java.util.Collection; 40 import java.util.Date; 41 import java.util.concurrent.TimeUnit; 42 import java.util.concurrent.ForkJoinPool; 43 import java.util.concurrent.RejectedExecutionException; 44 import jdk.internal.misc.Unsafe; 45 46 /** 47 * A version of {@link AbstractQueuedSynchronizer} in 48 * which synchronization state is maintained as a {@code long}. 49 * This class has exactly the same structure, properties, and methods 50 * as {@code AbstractQueuedSynchronizer} with the exception 51 * that all state-related parameters and results are defined 52 * as {@code long} rather than {@code int}. This class 53 * may be useful when creating synchronizers such as 54 * multilevel locks and barriers that require 55 * 64 bits of state. 56 * 57 * <p>See {@link AbstractQueuedSynchronizer} for usage 58 * notes and examples. 59 * 60 * @since 1.6 61 * @author Doug Lea 62 */ 63 public abstract class AbstractQueuedLongSynchronizer 64 extends AbstractOwnableSynchronizer 65 implements java.io.Serializable { 66 67 private static final long serialVersionUID = 7373984972572414692L; 68 69 /** 70 * Constructor for subclasses to call. 71 */ 72 public AbstractQueuedLongSynchronizer() {} 73 74 /* 75 * To keep sources in sync, the remainder of this source file is 76 * exactly cloned from AbstractQueuedSynchronizer, replacing class 77 * name and changing ints related with sync state to longs. Please 78 * keep it that way. 79 */ 80 81 // Node status bits, also used as argument and return values 82 static final int WAITING = 1; // must be 1 83 static final int CANCELLED = 0x80000000; // must be negative 84 static final int COND = 2; // in a condition wait 85 86 /** CLH Nodes */ 87 abstract static class Node { 88 volatile Node prev; // initially attached via casTail 89 volatile Node next; // visibly nonnull when signallable 90 Thread waiter; // visibly nonnull when enqueued 91 volatile int status; // written by owner, atomic bit ops by others 92 93 // methods for atomic operations 94 final boolean casPrev(Node c, Node v) { // for cleanQueue 95 return U.weakCompareAndSetReference(this, PREV, c, v); 96 } 97 final boolean casNext(Node c, Node v) { // for cleanQueue 98 return U.weakCompareAndSetReference(this, NEXT, c, v); 99 } 100 final int getAndUnsetStatus(int v) { // for signalling 101 return U.getAndBitwiseAndInt(this, STATUS, ~v); 102 } 103 final void setPrevRelaxed(Node p) { // for off-queue assignment 104 U.putReference(this, PREV, p); 105 } 106 final void setStatusRelaxed(int s) { // for off-queue assignment 107 U.putInt(this, STATUS, s); 108 } 109 final void clearStatus() { // for reducing unneeded signals 110 U.putIntOpaque(this, STATUS, 0); 111 } 112 113 private static final long STATUS 114 = U.objectFieldOffset(Node.class, "status"); 115 private static final long NEXT 116 = U.objectFieldOffset(Node.class, "next"); 117 private static final long PREV 118 = U.objectFieldOffset(Node.class, "prev"); 119 } 120 121 // Concrete classes tagged by type 122 static final class ExclusiveNode extends Node { } 123 static final class SharedNode extends Node { } 124 125 static final class ConditionNode extends Node 126 implements ForkJoinPool.ManagedBlocker { 127 ConditionNode nextWaiter; // link to next waiting node 128 129 /** 130 * Allows Conditions to be used in ForkJoinPools without 131 * risking fixed pool exhaustion. This is usable only for 132 * untimed Condition waits, not timed versions. 133 */ 134 public final boolean isReleasable() { 135 return status <= 1 || Thread.currentThread().isInterrupted(); 136 } 137 138 public final boolean block() { 139 while (!isReleasable()) LockSupport.park(); 140 return true; 141 } 142 } 143 144 /** 145 * Head of the wait queue, lazily initialized. 146 */ 147 private transient volatile Node head; 148 149 /** 150 * Tail of the wait queue. After initialization, modified only via casTail. 151 */ 152 private transient volatile Node tail; 153 154 /** 155 * The synchronization state. 156 */ 157 private volatile long state; 158 159 /** 160 * Returns the current value of synchronization state. 161 * This operation has memory semantics of a {@code volatile} read. 162 * @return current state value 163 */ 164 protected final long getState() { 165 return state; 166 } 167 168 /** 169 * Sets the value of synchronization state. 170 * This operation has memory semantics of a {@code volatile} write. 171 * @param newState the new state value 172 */ 173 protected final void setState(long newState) { 174 state = newState; 175 } 176 177 /** 178 * Atomically sets synchronization state to the given updated 179 * value if the current state value equals the expected value. 180 * This operation has memory semantics of a {@code volatile} read 181 * and write. 182 * 183 * @param expect the expected value 184 * @param update the new value 185 * @return {@code true} if successful. False return indicates that the actual 186 * value was not equal to the expected value. 187 */ 188 protected final boolean compareAndSetState(long expect, long update) { 189 return U.compareAndSetLong(this, STATE, expect, update); 190 } 191 192 // Queuing utilities 193 194 private boolean casTail(Node c, Node v) { 195 return U.compareAndSetReference(this, TAIL, c, v); 196 } 197 198 /** tries once to CAS a new dummy node for head */ 199 private void tryInitializeHead() { 200 Node h = new ExclusiveNode(); 201 if (U.compareAndSetReference(this, HEAD, null, h)) 202 tail = h; 203 } 204 205 /** 206 * Enqueues the node unless null. (Currently used only for 207 * ConditionNodes; other cases are interleaved with acquires.) 208 */ 209 final void enqueue(Node node) { 210 if (node != null) { 211 for (;;) { 212 Node t = tail; 213 node.setPrevRelaxed(t); // avoid unnecessary fence 214 if (t == null) // initialize 215 tryInitializeHead(); 216 else if (casTail(t, node)) { 217 t.next = node; 218 if (t.status < 0) // wake up to clean link 219 LockSupport.unpark(node.waiter); 220 break; 221 } 222 } 223 } 224 } 225 226 /** Returns true if node is found in traversal from tail */ 227 final boolean isEnqueued(Node node) { 228 for (Node t = tail; t != null; t = t.prev) 229 if (t == node) 230 return true; 231 return false; 232 } 233 234 /** 235 * Wakes up the successor of given node, if one exists, and unsets its 236 * WAITING status to avoid park race. This may fail to wake up an 237 * eligible thread when one or more have been cancelled, but 238 * cancelAcquire ensures liveness. 239 */ 240 private static void signalNext(Node h) { 241 Node s; 242 if (h != null && (s = h.next) != null && s.status != 0) { 243 s.getAndUnsetStatus(WAITING); 244 LockSupport.unpark(s.waiter); 245 } 246 } 247 248 /** Wakes up the given node if in shared mode */ 249 private static void signalNextIfShared(Node h) { 250 Node s; 251 if (h != null && (s = h.next) != null && 252 (s instanceof SharedNode) && s.status != 0) { 253 s.getAndUnsetStatus(WAITING); 254 LockSupport.unpark(s.waiter); 255 } 256 } 257 258 /** 259 * Main acquire method, invoked by all exported acquire methods. 260 * 261 * @param node null unless a reacquiring Condition 262 * @param arg the acquire argument 263 * @param shared true if shared mode else exclusive 264 * @param interruptible if abort and return negative on interrupt 265 * @param timed if true use timed waits 266 * @param time if timed, the System.nanoTime value to timeout 267 * @return positive if acquired, 0 if timed out, negative if interrupted 268 */ 269 final int acquire(Node node, long arg, boolean shared, 270 boolean interruptible, boolean timed, long time) { 271 Thread current = Thread.currentThread(); 272 byte spins = 0, postSpins = 0; // retries upon unpark of first thread 273 boolean interrupted = false, first = false; 274 Node pred = null; // predecessor of node when enqueued 275 276 /* 277 * Repeatedly: 278 * Check if node now first 279 * if so, ensure head stable, else ensure valid predecessor 280 * if node is first or not yet enqueued, try acquiring 281 * else if node not yet created, create it 282 * else if not yet enqueued, try once to enqueue 283 * else if woken from park, retry (up to postSpins times) 284 * else if WAITING status not set, set and retry 285 * else park and clear WAITING status, and check cancellation 286 */ 287 288 for (;;) { 289 if (!first && (pred = (node == null) ? null : node.prev) != null && 290 !(first = (head == pred))) { 291 if (pred.status < 0) { 292 cleanQueue(); // predecessor cancelled 293 continue; 294 } else if (pred.prev == null) { 295 Thread.onSpinWait(); // ensure serialization 296 continue; 297 } 298 } 299 if (first || pred == null) { 300 boolean acquired; 301 try { 302 if (shared) 303 acquired = (tryAcquireShared(arg) >= 0); 304 else 305 acquired = tryAcquire(arg); 306 } catch (Throwable ex) { 307 cancelAcquire(node, interrupted, false); 308 throw ex; 309 } 310 if (acquired) { 311 if (first) { 312 node.prev = null; 313 head = node; 314 pred.next = null; 315 node.waiter = null; 316 if (shared) 317 signalNextIfShared(node); 318 if (interrupted) 319 current.interrupt(); 320 } 321 return 1; 322 } 323 } 324 if (node == null) { // allocate; retry before enqueue 325 if (shared) 326 node = new SharedNode(); 327 else 328 node = new ExclusiveNode(); 329 } else if (pred == null) { // try to enqueue 330 node.waiter = current; 331 Node t = tail; 332 node.setPrevRelaxed(t); // avoid unnecessary fence 333 if (t == null) 334 tryInitializeHead(); 335 else if (!casTail(t, node)) 336 node.setPrevRelaxed(null); // back out 337 else 338 t.next = node; 339 } else if (first && spins != 0) { 340 --spins; // reduce unfairness on rewaits 341 Thread.onSpinWait(); 342 } else if (node.status == 0) { 343 node.status = WAITING; // enable signal and recheck 344 } else { 345 long nanos; 346 spins = postSpins = (byte)((postSpins << 1) | 1); 347 if (!timed) 348 LockSupport.park(this); 349 else if ((nanos = time - System.nanoTime()) > 0L) 350 LockSupport.parkNanos(this, nanos); 351 else 352 break; 353 node.clearStatus(); 354 if ((interrupted |= Thread.interrupted()) && interruptible) 355 break; 356 } 357 } 358 return cancelAcquire(node, interrupted, interruptible); 359 } 360 361 /** 362 * Possibly repeatedly traverses from tail, unsplicing cancelled 363 * nodes until none are found. 364 */ 365 private void cleanQueue() { 366 for (;;) { // restart point 367 for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples 368 if (q == null || (p = q.prev) == null) 369 return; // end of list 370 if (s == null ? tail != q : (s.prev != q || s.status < 0)) 371 break; // inconsistent 372 if (q.status < 0) { // cancelled 373 if ((s == null ? casTail(q, p) : s.casPrev(q, p)) && 374 q.prev == p) { 375 p.casNext(q, s); // OK if fails 376 if (p.prev == null) 377 signalNext(p); 378 } 379 break; 380 } 381 if ((n = p.next) != q) { // help finish 382 if (n != null && q.prev == p) { 383 p.casNext(n, q); 384 if (p.prev == null) 385 signalNext(p); 386 } 387 break; 388 } 389 s = q; 390 q = q.prev; 391 } 392 } 393 } 394 395 /** 396 * Cancels an ongoing attempt to acquire. 397 * 398 * @param node the node (may be null if cancelled before enqueuing) 399 * @param interrupted true if thread interrupted 400 * @param interruptible if should report interruption vs reset 401 */ 402 private int cancelAcquire(Node node, boolean interrupted, 403 boolean interruptible) { 404 if (node != null) { 405 node.waiter = null; 406 node.status = CANCELLED; 407 if (node.prev != null) 408 cleanQueue(); 409 } 410 if (interrupted) { 411 if (interruptible) 412 return CANCELLED; 413 else 414 Thread.currentThread().interrupt(); 415 } 416 return 0; 417 } 418 419 // Main exported methods 420 421 /** 422 * Attempts to acquire in exclusive mode. This method should query 423 * if the state of the object permits it to be acquired in the 424 * exclusive mode, and if so to acquire it. 425 * 426 * <p>This method is always invoked by the thread performing 427 * acquire. If this method reports failure, the acquire method 428 * may queue the thread, if it is not already queued, until it is 429 * signalled by a release from some other thread. This can be used 430 * to implement method {@link Lock#tryLock()}. 431 * 432 * <p>The default 433 * implementation throws {@link UnsupportedOperationException}. 434 * 435 * @param arg the acquire argument. This value is always the one 436 * passed to an acquire method, or is the value saved on entry 437 * to a condition wait. The value is otherwise uninterpreted 438 * and can represent anything you like. 439 * @return {@code true} if successful. Upon success, this object has 440 * been acquired. 441 * @throws IllegalMonitorStateException if acquiring would place this 442 * synchronizer in an illegal state. This exception must be 443 * thrown in a consistent fashion for synchronization to work 444 * correctly. 445 * @throws UnsupportedOperationException if exclusive mode is not supported 446 */ 447 protected boolean tryAcquire(long arg) { 448 throw new UnsupportedOperationException(); 449 } 450 451 /** 452 * Attempts to set the state to reflect a release in exclusive 453 * mode. 454 * 455 * <p>This method is always invoked by the thread performing release. 456 * 457 * <p>The default implementation throws 458 * {@link UnsupportedOperationException}. 459 * 460 * @param arg the release argument. This value is always the one 461 * passed to a release method, or the current state value upon 462 * entry to a condition wait. The value is otherwise 463 * uninterpreted and can represent anything you like. 464 * @return {@code true} if this object is now in a fully released 465 * state, so that any waiting threads may attempt to acquire; 466 * and {@code false} otherwise. 467 * @throws IllegalMonitorStateException if releasing would place this 468 * synchronizer in an illegal state. This exception must be 469 * thrown in a consistent fashion for synchronization to work 470 * correctly. 471 * @throws UnsupportedOperationException if exclusive mode is not supported 472 */ 473 protected boolean tryRelease(long arg) { 474 throw new UnsupportedOperationException(); 475 } 476 477 /** 478 * Attempts to acquire in shared mode. This method should query if 479 * the state of the object permits it to be acquired in the shared 480 * mode, and if so to acquire it. 481 * 482 * <p>This method is always invoked by the thread performing 483 * acquire. If this method reports failure, the acquire method 484 * may queue the thread, if it is not already queued, until it is 485 * signalled by a release from some other thread. 486 * 487 * <p>The default implementation throws {@link 488 * UnsupportedOperationException}. 489 * 490 * @param arg the acquire argument. This value is always the one 491 * passed to an acquire method, or is the value saved on entry 492 * to a condition wait. The value is otherwise uninterpreted 493 * and can represent anything you like. 494 * @return a negative value on failure; zero if acquisition in shared 495 * mode succeeded but no subsequent shared-mode acquire can 496 * succeed; and a positive value if acquisition in shared 497 * mode succeeded and subsequent shared-mode acquires might 498 * also succeed, in which case a subsequent waiting thread 499 * must check availability. (Support for three different 500 * return values enables this method to be used in contexts 501 * where acquires only sometimes act exclusively.) Upon 502 * success, this object has been acquired. 503 * @throws IllegalMonitorStateException if acquiring would place this 504 * synchronizer in an illegal state. This exception must be 505 * thrown in a consistent fashion for synchronization to work 506 * correctly. 507 * @throws UnsupportedOperationException if shared mode is not supported 508 */ 509 protected long tryAcquireShared(long arg) { 510 throw new UnsupportedOperationException(); 511 } 512 513 /** 514 * Attempts to set the state to reflect a release in shared mode. 515 * 516 * <p>This method is always invoked by the thread performing release. 517 * 518 * <p>The default implementation throws 519 * {@link UnsupportedOperationException}. 520 * 521 * @param arg the release argument. This value is always the one 522 * passed to a release method, or the current state value upon 523 * entry to a condition wait. The value is otherwise 524 * uninterpreted and can represent anything you like. 525 * @return {@code true} if this release of shared mode may permit a 526 * waiting acquire (shared or exclusive) to succeed; and 527 * {@code false} otherwise 528 * @throws IllegalMonitorStateException if releasing would place this 529 * synchronizer in an illegal state. This exception must be 530 * thrown in a consistent fashion for synchronization to work 531 * correctly. 532 * @throws UnsupportedOperationException if shared mode is not supported 533 */ 534 protected boolean tryReleaseShared(long arg) { 535 throw new UnsupportedOperationException(); 536 } 537 538 /** 539 * Returns {@code true} if synchronization is held exclusively with 540 * respect to the current (calling) thread. This method is invoked 541 * upon each call to a {@link ConditionObject} method. 542 * 543 * <p>The default implementation throws {@link 544 * UnsupportedOperationException}. This method is invoked 545 * internally only within {@link ConditionObject} methods, so need 546 * not be defined if conditions are not used. 547 * 548 * @return {@code true} if synchronization is held exclusively; 549 * {@code false} otherwise 550 * @throws UnsupportedOperationException if conditions are not supported 551 */ 552 protected boolean isHeldExclusively() { 553 throw new UnsupportedOperationException(); 554 } 555 556 /** 557 * Acquires in exclusive mode, ignoring interrupts. Implemented 558 * by invoking at least once {@link #tryAcquire}, 559 * returning on success. Otherwise the thread is queued, possibly 560 * repeatedly blocking and unblocking, invoking {@link 561 * #tryAcquire} until success. This method can be used 562 * to implement method {@link Lock#lock}. 563 * 564 * @param arg the acquire argument. This value is conveyed to 565 * {@link #tryAcquire} but is otherwise uninterpreted and 566 * can represent anything you like. 567 */ 568 public final void acquire(long arg) { 569 if (!tryAcquire(arg)) 570 acquire(null, arg, false, false, false, 0L); 571 } 572 573 /** 574 * Acquires in exclusive mode, aborting if interrupted. 575 * Implemented by first checking interrupt status, then invoking 576 * at least once {@link #tryAcquire}, returning on 577 * success. Otherwise the thread is queued, possibly repeatedly 578 * blocking and unblocking, invoking {@link #tryAcquire} 579 * until success or the thread is interrupted. This method can be 580 * used to implement method {@link Lock#lockInterruptibly}. 581 * 582 * @param arg the acquire argument. This value is conveyed to 583 * {@link #tryAcquire} but is otherwise uninterpreted and 584 * can represent anything you like. 585 * @throws InterruptedException if the current thread is interrupted 586 */ 587 public final void acquireInterruptibly(long arg) 588 throws InterruptedException { 589 if (Thread.interrupted() || 590 (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0)) 591 throw new InterruptedException(); 592 } 593 594 /** 595 * Attempts to acquire in exclusive mode, aborting if interrupted, 596 * and failing if the given timeout elapses. Implemented by first 597 * checking interrupt status, then invoking at least once {@link 598 * #tryAcquire}, returning on success. Otherwise, the thread is 599 * queued, possibly repeatedly blocking and unblocking, invoking 600 * {@link #tryAcquire} until success or the thread is interrupted 601 * or the timeout elapses. This method can be used to implement 602 * method {@link Lock#tryLock(long, TimeUnit)}. 603 * 604 * @param arg the acquire argument. This value is conveyed to 605 * {@link #tryAcquire} but is otherwise uninterpreted and 606 * can represent anything you like. 607 * @param nanosTimeout the maximum number of nanoseconds to wait 608 * @return {@code true} if acquired; {@code false} if timed out 609 * @throws InterruptedException if the current thread is interrupted 610 */ 611 public final boolean tryAcquireNanos(long arg, long nanosTimeout) 612 throws InterruptedException { 613 if (!Thread.interrupted()) { 614 if (tryAcquire(arg)) 615 return true; 616 if (nanosTimeout <= 0L) 617 return false; 618 int stat = acquire(null, arg, false, true, true, 619 System.nanoTime() + nanosTimeout); 620 if (stat > 0) 621 return true; 622 if (stat == 0) 623 return false; 624 } 625 throw new InterruptedException(); 626 } 627 628 /** 629 * Releases in exclusive mode. Implemented by unblocking one or 630 * more threads if {@link #tryRelease} returns true. 631 * This method can be used to implement method {@link Lock#unlock}. 632 * 633 * @param arg the release argument. This value is conveyed to 634 * {@link #tryRelease} but is otherwise uninterpreted and 635 * can represent anything you like. 636 * @return the value returned from {@link #tryRelease} 637 */ 638 public final boolean release(long arg) { 639 if (tryRelease(arg)) { 640 signalNext(head); 641 return true; 642 } 643 return false; 644 } 645 646 /** 647 * Acquires in shared mode, ignoring interrupts. Implemented by 648 * first invoking at least once {@link #tryAcquireShared}, 649 * returning on success. Otherwise the thread is queued, possibly 650 * repeatedly blocking and unblocking, invoking {@link 651 * #tryAcquireShared} until success. 652 * 653 * @param arg the acquire argument. This value is conveyed to 654 * {@link #tryAcquireShared} but is otherwise uninterpreted 655 * and can represent anything you like. 656 */ 657 public final void acquireShared(long arg) { 658 if (tryAcquireShared(arg) < 0) 659 acquire(null, arg, true, false, false, 0L); 660 } 661 662 /** 663 * Acquires in shared mode, aborting if interrupted. Implemented 664 * by first checking interrupt status, then invoking at least once 665 * {@link #tryAcquireShared}, returning on success. Otherwise the 666 * thread is queued, possibly repeatedly blocking and unblocking, 667 * invoking {@link #tryAcquireShared} until success or the thread 668 * is interrupted. 669 * @param arg the acquire argument. 670 * This value is conveyed to {@link #tryAcquireShared} but is 671 * otherwise uninterpreted and can represent anything 672 * you like. 673 * @throws InterruptedException if the current thread is interrupted 674 */ 675 public final void acquireSharedInterruptibly(long arg) 676 throws InterruptedException { 677 if (Thread.interrupted() || 678 (tryAcquireShared(arg) < 0 && 679 acquire(null, arg, true, true, false, 0L) < 0)) 680 throw new InterruptedException(); 681 } 682 683 /** 684 * Attempts to acquire in shared mode, aborting if interrupted, and 685 * failing if the given timeout elapses. Implemented by first 686 * checking interrupt status, then invoking at least once {@link 687 * #tryAcquireShared}, returning on success. Otherwise, the 688 * thread is queued, possibly repeatedly blocking and unblocking, 689 * invoking {@link #tryAcquireShared} until success or the thread 690 * is interrupted or the timeout elapses. 691 * 692 * @param arg the acquire argument. This value is conveyed to 693 * {@link #tryAcquireShared} but is otherwise uninterpreted 694 * and can represent anything you like. 695 * @param nanosTimeout the maximum number of nanoseconds to wait 696 * @return {@code true} if acquired; {@code false} if timed out 697 * @throws InterruptedException if the current thread is interrupted 698 */ 699 public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) 700 throws InterruptedException { 701 if (!Thread.interrupted()) { 702 if (tryAcquireShared(arg) >= 0) 703 return true; 704 if (nanosTimeout <= 0L) 705 return false; 706 int stat = acquire(null, arg, true, true, true, 707 System.nanoTime() + nanosTimeout); 708 if (stat > 0) 709 return true; 710 if (stat == 0) 711 return false; 712 } 713 throw new InterruptedException(); 714 } 715 716 /** 717 * Releases in shared mode. Implemented by unblocking one or more 718 * threads if {@link #tryReleaseShared} returns true. 719 * 720 * @param arg the release argument. This value is conveyed to 721 * {@link #tryReleaseShared} but is otherwise uninterpreted 722 * and can represent anything you like. 723 * @return the value returned from {@link #tryReleaseShared} 724 */ 725 public final boolean releaseShared(long arg) { 726 if (tryReleaseShared(arg)) { 727 signalNext(head); 728 return true; 729 } 730 return false; 731 } 732 733 // Queue inspection methods 734 735 /** 736 * Queries whether any threads are waiting to acquire. Note that 737 * because cancellations due to interrupts and timeouts may occur 738 * at any time, a {@code true} return does not guarantee that any 739 * other thread will ever acquire. 740 * 741 * @return {@code true} if there may be other threads waiting to acquire 742 */ 743 public final boolean hasQueuedThreads() { 744 for (Node p = tail, h = head; p != h && p != null; p = p.prev) 745 if (p.status >= 0) 746 return true; 747 return false; 748 } 749 750 /** 751 * Queries whether any threads have ever contended to acquire this 752 * synchronizer; that is, if an acquire method has ever blocked. 753 * 754 * <p>In this implementation, this operation returns in 755 * constant time. 756 * 757 * @return {@code true} if there has ever been contention 758 */ 759 public final boolean hasContended() { 760 return head != null; 761 } 762 763 /** 764 * Returns the first (longest-waiting) thread in the queue, or 765 * {@code null} if no threads are currently queued. 766 * 767 * <p>In this implementation, this operation normally returns in 768 * constant time, but may iterate upon contention if other threads are 769 * concurrently modifying the queue. 770 * 771 * @return the first (longest-waiting) thread in the queue, or 772 * {@code null} if no threads are currently queued 773 */ 774 public final Thread getFirstQueuedThread() { 775 Thread first = null, w; Node h, s; 776 if ((h = head) != null && ((s = h.next) == null || 777 (first = s.waiter) == null || 778 s.prev == null)) { 779 // traverse from tail on stale reads 780 for (Node p = tail, q; p != null && (q = p.prev) != null; p = q) 781 if ((w = p.waiter) != null) 782 first = w; 783 } 784 return first; 785 } 786 787 /** 788 * Returns true if the given thread is currently queued. 789 * 790 * <p>This implementation traverses the queue to determine 791 * presence of the given thread. 792 * 793 * @param thread the thread 794 * @return {@code true} if the given thread is on the queue 795 * @throws NullPointerException if the thread is null 796 */ 797 public final boolean isQueued(Thread thread) { 798 if (thread == null) 799 throw new NullPointerException(); 800 for (Node p = tail; p != null; p = p.prev) 801 if (p.waiter == thread) 802 return true; 803 return false; 804 } 805 806 /** 807 * Returns {@code true} if the apparent first queued thread, if one 808 * exists, is waiting in exclusive mode. If this method returns 809 * {@code true}, and the current thread is attempting to acquire in 810 * shared mode (that is, this method is invoked from {@link 811 * #tryAcquireShared}) then it is guaranteed that the current thread 812 * is not the first queued thread. Used only as a heuristic in 813 * ReentrantReadWriteLock. 814 */ 815 final boolean apparentlyFirstQueuedIsExclusive() { 816 Node h, s; 817 return (h = head) != null && (s = h.next) != null && 818 !(s instanceof SharedNode) && s.waiter != null; 819 } 820 821 /** 822 * Queries whether any threads have been waiting to acquire longer 823 * than the current thread. 824 * 825 * <p>An invocation of this method is equivalent to (but may be 826 * more efficient than): 827 * <pre> {@code 828 * getFirstQueuedThread() != Thread.currentThread() 829 * && hasQueuedThreads()}</pre> 830 * 831 * <p>Note that because cancellations due to interrupts and 832 * timeouts may occur at any time, a {@code true} return does not 833 * guarantee that some other thread will acquire before the current 834 * thread. Likewise, it is possible for another thread to win a 835 * race to enqueue after this method has returned {@code false}, 836 * due to the queue being empty. 837 * 838 * <p>This method is designed to be used by a fair synchronizer to 839 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>. 840 * Such a synchronizer's {@link #tryAcquire} method should return 841 * {@code false}, and its {@link #tryAcquireShared} method should 842 * return a negative value, if this method returns {@code true} 843 * (unless this is a reentrant acquire). For example, the {@code 844 * tryAcquire} method for a fair, reentrant, exclusive mode 845 * synchronizer might look like this: 846 * 847 * <pre> {@code 848 * protected boolean tryAcquire(long arg) { 849 * if (isHeldExclusively()) { 850 * // A reentrant acquire; increment hold count 851 * return true; 852 * } else if (hasQueuedPredecessors()) { 853 * return false; 854 * } else { 855 * // try to acquire normally 856 * } 857 * }}</pre> 858 * 859 * @return {@code true} if there is a queued thread preceding the 860 * current thread, and {@code false} if the current thread 861 * is at the head of the queue or the queue is empty 862 * @since 1.7 863 */ 864 public final boolean hasQueuedPredecessors() { 865 Thread first = null; Node h, s; 866 if ((h = head) != null && ((s = h.next) == null || 867 (first = s.waiter) == null || 868 s.prev == null)) 869 first = getFirstQueuedThread(); // retry via getFirstQueuedThread 870 return first != null && first != Thread.currentThread(); 871 } 872 873 // Instrumentation and monitoring methods 874 875 /** 876 * Returns an estimate of the number of threads waiting to 877 * acquire. The value is only an estimate because the number of 878 * threads may change dynamically while this method traverses 879 * internal data structures. This method is designed for use in 880 * monitoring system state, not for synchronization control. 881 * 882 * @return the estimated number of threads waiting to acquire 883 */ 884 public final int getQueueLength() { 885 int n = 0; 886 for (Node p = tail; p != null; p = p.prev) { 887 if (p.waiter != null) 888 ++n; 889 } 890 return n; 891 } 892 893 /** 894 * Returns a collection containing threads that may be waiting to 895 * acquire. Because the actual set of threads may change 896 * dynamically while constructing this result, the returned 897 * collection is only a best-effort estimate. The elements of the 898 * returned collection are in no particular order. This method is 899 * designed to facilitate construction of subclasses that provide 900 * more extensive monitoring facilities. 901 * 902 * @return the collection of threads 903 */ 904 public final Collection<Thread> getQueuedThreads() { 905 ArrayList<Thread> list = new ArrayList<>(); 906 for (Node p = tail; p != null; p = p.prev) { 907 Thread t = p.waiter; 908 if (t != null) 909 list.add(t); 910 } 911 return list; 912 } 913 914 /** 915 * Returns a collection containing threads that may be waiting to 916 * acquire in exclusive mode. This has the same properties 917 * as {@link #getQueuedThreads} except that it only returns 918 * those threads waiting due to an exclusive acquire. 919 * 920 * @return the collection of threads 921 */ 922 public final Collection<Thread> getExclusiveQueuedThreads() { 923 ArrayList<Thread> list = new ArrayList<>(); 924 for (Node p = tail; p != null; p = p.prev) { 925 if (!(p instanceof SharedNode)) { 926 Thread t = p.waiter; 927 if (t != null) 928 list.add(t); 929 } 930 } 931 return list; 932 } 933 934 /** 935 * Returns a collection containing threads that may be waiting to 936 * acquire in shared mode. This has the same properties 937 * as {@link #getQueuedThreads} except that it only returns 938 * those threads waiting due to a shared acquire. 939 * 940 * @return the collection of threads 941 */ 942 public final Collection<Thread> getSharedQueuedThreads() { 943 ArrayList<Thread> list = new ArrayList<>(); 944 for (Node p = tail; p != null; p = p.prev) { 945 if (p instanceof SharedNode) { 946 Thread t = p.waiter; 947 if (t != null) 948 list.add(t); 949 } 950 } 951 return list; 952 } 953 954 /** 955 * Returns a string identifying this synchronizer, as well as its state. 956 * The state, in brackets, includes the String {@code "State ="} 957 * followed by the current value of {@link #getState}, and either 958 * {@code "nonempty"} or {@code "empty"} depending on whether the 959 * queue is empty. 960 * 961 * @return a string identifying this synchronizer, as well as its state 962 */ 963 public String toString() { 964 return super.toString() 965 + "[State = " + getState() + ", " 966 + (hasQueuedThreads() ? "non" : "") + "empty queue]"; 967 } 968 969 // Instrumentation methods for conditions 970 971 /** 972 * Queries whether the given ConditionObject 973 * uses this synchronizer as its lock. 974 * 975 * @param condition the condition 976 * @return {@code true} if owned 977 * @throws NullPointerException if the condition is null 978 */ 979 public final boolean owns(ConditionObject condition) { 980 return condition.isOwnedBy(this); 981 } 982 983 /** 984 * Queries whether any threads are waiting on the given condition 985 * associated with this synchronizer. Note that because timeouts 986 * and interrupts may occur at any time, a {@code true} return 987 * does not guarantee that a future {@code signal} will awaken 988 * any threads. This method is designed primarily for use in 989 * monitoring of the system state. 990 * 991 * @param condition the condition 992 * @return {@code true} if there are any waiting threads 993 * @throws IllegalMonitorStateException if exclusive synchronization 994 * is not held 995 * @throws IllegalArgumentException if the given condition is 996 * not associated with this synchronizer 997 * @throws NullPointerException if the condition is null 998 */ 999 public final boolean hasWaiters(ConditionObject condition) { 1000 if (!owns(condition)) 1001 throw new IllegalArgumentException("Not owner"); 1002 return condition.hasWaiters(); 1003 } 1004 1005 /** 1006 * Returns an estimate of the number of threads waiting on the 1007 * given condition associated with this synchronizer. Note that 1008 * because timeouts and interrupts may occur at any time, the 1009 * estimate serves only as an upper bound on the actual number of 1010 * waiters. This method is designed for use in monitoring system 1011 * state, not for synchronization control. 1012 * 1013 * @param condition the condition 1014 * @return the estimated number of waiting threads 1015 * @throws IllegalMonitorStateException if exclusive synchronization 1016 * is not held 1017 * @throws IllegalArgumentException if the given condition is 1018 * not associated with this synchronizer 1019 * @throws NullPointerException if the condition is null 1020 */ 1021 public final int getWaitQueueLength(ConditionObject condition) { 1022 if (!owns(condition)) 1023 throw new IllegalArgumentException("Not owner"); 1024 return condition.getWaitQueueLength(); 1025 } 1026 1027 /** 1028 * Returns a collection containing those threads that may be 1029 * waiting on the given condition associated with this 1030 * synchronizer. Because the actual set of threads may change 1031 * dynamically while constructing this result, the returned 1032 * collection is only a best-effort estimate. The elements of the 1033 * returned collection are in no particular order. 1034 * 1035 * @param condition the condition 1036 * @return the collection of threads 1037 * @throws IllegalMonitorStateException if exclusive synchronization 1038 * is not held 1039 * @throws IllegalArgumentException if the given condition is 1040 * not associated with this synchronizer 1041 * @throws NullPointerException if the condition is null 1042 */ 1043 public final Collection<Thread> getWaitingThreads(ConditionObject condition) { 1044 if (!owns(condition)) 1045 throw new IllegalArgumentException("Not owner"); 1046 return condition.getWaitingThreads(); 1047 } 1048 1049 /** 1050 * Condition implementation for a {@link AbstractQueuedLongSynchronizer} 1051 * serving as the basis of a {@link Lock} implementation. 1052 * 1053 * <p>Method documentation for this class describes mechanics, 1054 * not behavioral specifications from the point of view of Lock 1055 * and Condition users. Exported versions of this class will in 1056 * general need to be accompanied by documentation describing 1057 * condition semantics that rely on those of the associated 1058 * {@code AbstractQueuedLongSynchronizer}. 1059 * 1060 * <p>This class is Serializable, but all fields are transient, 1061 * so deserialized conditions have no waiters. 1062 */ 1063 public class ConditionObject implements Condition, java.io.Serializable { 1064 private static final long serialVersionUID = 1173984872572414699L; 1065 /** First node of condition queue. */ 1066 private transient ConditionNode firstWaiter; 1067 /** Last node of condition queue. */ 1068 private transient ConditionNode lastWaiter; 1069 1070 /** 1071 * Creates a new {@code ConditionObject} instance. 1072 */ 1073 public ConditionObject() { } 1074 1075 // Signalling methods 1076 1077 /** 1078 * Removes and transfers one or all waiters to sync queue. 1079 */ 1080 private void doSignal(ConditionNode first, boolean all) { 1081 while (first != null) { 1082 ConditionNode next = first.nextWaiter; 1083 if ((firstWaiter = next) == null) 1084 lastWaiter = null; 1085 if ((first.getAndUnsetStatus(COND) & COND) != 0) { 1086 enqueue(first); 1087 if (!all) 1088 break; 1089 } 1090 first = next; 1091 } 1092 } 1093 1094 /** 1095 * Moves the longest-waiting thread, if one exists, from the 1096 * wait queue for this condition to the wait queue for the 1097 * owning lock. 1098 * 1099 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1100 * returns {@code false} 1101 */ 1102 public final void signal() { 1103 ConditionNode first = firstWaiter; 1104 if (!isHeldExclusively()) 1105 throw new IllegalMonitorStateException(); 1106 if (first != null) 1107 doSignal(first, false); 1108 } 1109 1110 /** 1111 * Moves all threads from the wait queue for this condition to 1112 * the wait queue for the owning lock. 1113 * 1114 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1115 * returns {@code false} 1116 */ 1117 public final void signalAll() { 1118 ConditionNode first = firstWaiter; 1119 if (!isHeldExclusively()) 1120 throw new IllegalMonitorStateException(); 1121 if (first != null) 1122 doSignal(first, true); 1123 } 1124 1125 // Waiting methods 1126 1127 /** 1128 * Adds node to condition list and releases lock. 1129 * 1130 * @param node the node 1131 * @return savedState to reacquire after wait 1132 */ 1133 private long enableWait(ConditionNode node) { 1134 if (isHeldExclusively()) { 1135 node.waiter = Thread.currentThread(); 1136 node.setStatusRelaxed(COND | WAITING); 1137 ConditionNode last = lastWaiter; 1138 if (last == null) 1139 firstWaiter = node; 1140 else 1141 last.nextWaiter = node; 1142 lastWaiter = node; 1143 long savedState = getState(); 1144 if (release(savedState)) 1145 return savedState; 1146 } 1147 node.status = CANCELLED; // lock not held or inconsistent 1148 throw new IllegalMonitorStateException(); 1149 } 1150 1151 /** 1152 * Returns true if a node that was initially placed on a condition 1153 * queue is now ready to reacquire on sync queue. 1154 * @param node the node 1155 * @return true if is reacquiring 1156 */ 1157 private boolean canReacquire(ConditionNode node) { 1158 // check links, not status to avoid enqueue race 1159 return node != null && node.prev != null && isEnqueued(node); 1160 } 1161 1162 /** 1163 * Unlinks the given node and other non-waiting nodes from 1164 * condition queue unless already unlinked. 1165 */ 1166 private void unlinkCancelledWaiters(ConditionNode node) { 1167 if (node == null || node.nextWaiter != null || node == lastWaiter) { 1168 ConditionNode w = firstWaiter, trail = null; 1169 while (w != null) { 1170 ConditionNode next = w.nextWaiter; 1171 if ((w.status & COND) == 0) { 1172 w.nextWaiter = null; 1173 if (trail == null) 1174 firstWaiter = next; 1175 else 1176 trail.nextWaiter = next; 1177 if (next == null) 1178 lastWaiter = trail; 1179 } else 1180 trail = w; 1181 w = next; 1182 } 1183 } 1184 } 1185 1186 /** 1187 * Implements uninterruptible condition wait. 1188 * <ol> 1189 * <li>Save lock state returned by {@link #getState}. 1190 * <li>Invoke {@link #release} with saved state as argument, 1191 * throwing IllegalMonitorStateException if it fails. 1192 * <li>Block until signalled. 1193 * <li>Reacquire by invoking specialized version of 1194 * {@link #acquire} with saved state as argument. 1195 * </ol> 1196 */ 1197 public final void awaitUninterruptibly() { 1198 ConditionNode node = new ConditionNode(); 1199 long savedState = enableWait(node); 1200 LockSupport.setCurrentBlocker(this); // for back-compatibility 1201 boolean interrupted = false, rejected = false; 1202 while (!canReacquire(node)) { 1203 if (Thread.interrupted()) 1204 interrupted = true; 1205 else if ((node.status & COND) != 0) { 1206 try { 1207 if (rejected) 1208 node.block(); 1209 else 1210 ForkJoinPool.managedBlock(node); 1211 } catch (RejectedExecutionException ex) { 1212 rejected = true; 1213 } catch (InterruptedException ie) { 1214 interrupted = true; 1215 } 1216 } else 1217 Thread.onSpinWait(); // awoke while enqueuing 1218 } 1219 LockSupport.setCurrentBlocker(null); 1220 node.clearStatus(); 1221 acquire(node, savedState, false, false, false, 0L); 1222 if (interrupted) 1223 Thread.currentThread().interrupt(); 1224 } 1225 1226 /** 1227 * Implements interruptible condition wait. 1228 * <ol> 1229 * <li>If current thread is interrupted, throw InterruptedException. 1230 * <li>Save lock state returned by {@link #getState}. 1231 * <li>Invoke {@link #release} with saved state as argument, 1232 * throwing IllegalMonitorStateException if it fails. 1233 * <li>Block until signalled or interrupted. 1234 * <li>Reacquire by invoking specialized version of 1235 * {@link #acquire} with saved state as argument. 1236 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1237 * </ol> 1238 */ 1239 public final void await() throws InterruptedException { 1240 if (Thread.interrupted()) 1241 throw new InterruptedException(); 1242 ConditionNode node = new ConditionNode(); 1243 long savedState = enableWait(node); 1244 LockSupport.setCurrentBlocker(this); // for back-compatibility 1245 boolean interrupted = false, cancelled = false, rejected = false; 1246 while (!canReacquire(node)) { 1247 if (interrupted |= Thread.interrupted()) { 1248 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1249 break; // else interrupted after signal 1250 } else if ((node.status & COND) != 0) { 1251 try { 1252 if (rejected) 1253 node.block(); 1254 else 1255 ForkJoinPool.managedBlock(node); 1256 } catch (RejectedExecutionException ex) { 1257 rejected = true; 1258 } catch (InterruptedException ie) { 1259 interrupted = true; 1260 } 1261 } else 1262 Thread.onSpinWait(); // awoke while enqueuing 1263 } 1264 LockSupport.setCurrentBlocker(null); 1265 node.clearStatus(); 1266 acquire(node, savedState, false, false, false, 0L); 1267 if (interrupted) { 1268 if (cancelled) { 1269 unlinkCancelledWaiters(node); 1270 throw new InterruptedException(); 1271 } 1272 Thread.currentThread().interrupt(); 1273 } 1274 } 1275 1276 /** 1277 * Implements timed condition wait. 1278 * <ol> 1279 * <li>If current thread is interrupted, throw InterruptedException. 1280 * <li>Save lock state returned by {@link #getState}. 1281 * <li>Invoke {@link #release} with saved state as argument, 1282 * throwing IllegalMonitorStateException if it fails. 1283 * <li>Block until signalled, interrupted, or timed out. 1284 * <li>Reacquire by invoking specialized version of 1285 * {@link #acquire} with saved state as argument. 1286 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1287 * </ol> 1288 */ 1289 public final long awaitNanos(long nanosTimeout) 1290 throws InterruptedException { 1291 if (Thread.interrupted()) 1292 throw new InterruptedException(); 1293 ConditionNode node = new ConditionNode(); 1294 long savedState = enableWait(node); 1295 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; 1296 long deadline = System.nanoTime() + nanos; 1297 boolean cancelled = false, interrupted = false; 1298 while (!canReacquire(node)) { 1299 if ((interrupted |= Thread.interrupted()) || 1300 (nanos = deadline - System.nanoTime()) <= 0L) { 1301 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1302 break; 1303 } else 1304 LockSupport.parkNanos(this, nanos); 1305 } 1306 node.clearStatus(); 1307 acquire(node, savedState, false, false, false, 0L); 1308 if (cancelled) { 1309 unlinkCancelledWaiters(node); 1310 if (interrupted) 1311 throw new InterruptedException(); 1312 } else if (interrupted) 1313 Thread.currentThread().interrupt(); 1314 long remaining = deadline - System.nanoTime(); // avoid overflow 1315 return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE; 1316 } 1317 1318 /** 1319 * Implements absolute timed condition wait. 1320 * <ol> 1321 * <li>If current thread is interrupted, throw InterruptedException. 1322 * <li>Save lock state returned by {@link #getState}. 1323 * <li>Invoke {@link #release} with saved state as argument, 1324 * throwing IllegalMonitorStateException if it fails. 1325 * <li>Block until signalled, interrupted, or timed out. 1326 * <li>Reacquire by invoking specialized version of 1327 * {@link #acquire} with saved state as argument. 1328 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1329 * <li>If timed out while blocked in step 4, return false, else true. 1330 * </ol> 1331 */ 1332 public final boolean awaitUntil(Date deadline) 1333 throws InterruptedException { 1334 long abstime = deadline.getTime(); 1335 if (Thread.interrupted()) 1336 throw new InterruptedException(); 1337 ConditionNode node = new ConditionNode(); 1338 long savedState = enableWait(node); 1339 boolean cancelled = false, interrupted = false; 1340 while (!canReacquire(node)) { 1341 if ((interrupted |= Thread.interrupted()) || 1342 System.currentTimeMillis() >= abstime) { 1343 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1344 break; 1345 } else 1346 LockSupport.parkUntil(this, abstime); 1347 } 1348 node.clearStatus(); 1349 acquire(node, savedState, false, false, false, 0L); 1350 if (cancelled) { 1351 unlinkCancelledWaiters(node); 1352 if (interrupted) 1353 throw new InterruptedException(); 1354 } else if (interrupted) 1355 Thread.currentThread().interrupt(); 1356 return !cancelled; 1357 } 1358 1359 /** 1360 * Implements timed condition wait. 1361 * <ol> 1362 * <li>If current thread is interrupted, throw InterruptedException. 1363 * <li>Save lock state returned by {@link #getState}. 1364 * <li>Invoke {@link #release} with saved state as argument, 1365 * throwing IllegalMonitorStateException if it fails. 1366 * <li>Block until signalled, interrupted, or timed out. 1367 * <li>Reacquire by invoking specialized version of 1368 * {@link #acquire} with saved state as argument. 1369 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1370 * <li>If timed out while blocked in step 4, return false, else true. 1371 * </ol> 1372 */ 1373 public final boolean await(long time, TimeUnit unit) 1374 throws InterruptedException { 1375 long nanosTimeout = unit.toNanos(time); 1376 if (Thread.interrupted()) 1377 throw new InterruptedException(); 1378 ConditionNode node = new ConditionNode(); 1379 long savedState = enableWait(node); 1380 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; 1381 long deadline = System.nanoTime() + nanos; 1382 boolean cancelled = false, interrupted = false; 1383 while (!canReacquire(node)) { 1384 if ((interrupted |= Thread.interrupted()) || 1385 (nanos = deadline - System.nanoTime()) <= 0L) { 1386 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1387 break; 1388 } else 1389 LockSupport.parkNanos(this, nanos); 1390 } 1391 node.clearStatus(); 1392 acquire(node, savedState, false, false, false, 0L); 1393 if (cancelled) { 1394 unlinkCancelledWaiters(node); 1395 if (interrupted) 1396 throw new InterruptedException(); 1397 } else if (interrupted) 1398 Thread.currentThread().interrupt(); 1399 return !cancelled; 1400 } 1401 1402 // support for instrumentation 1403 1404 /** 1405 * Returns true if this condition was created by the given 1406 * synchronization object. 1407 * 1408 * @return {@code true} if owned 1409 */ 1410 final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) { 1411 return sync == AbstractQueuedLongSynchronizer.this; 1412 } 1413 1414 /** 1415 * Queries whether any threads are waiting on this condition. 1416 * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters(ConditionObject)}. 1417 * 1418 * @return {@code true} if there are any waiting threads 1419 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1420 * returns {@code false} 1421 */ 1422 protected final boolean hasWaiters() { 1423 if (!isHeldExclusively()) 1424 throw new IllegalMonitorStateException(); 1425 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1426 if ((w.status & COND) != 0) 1427 return true; 1428 } 1429 return false; 1430 } 1431 1432 /** 1433 * Returns an estimate of the number of threads waiting on 1434 * this condition. 1435 * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength(ConditionObject)}. 1436 * 1437 * @return the estimated number of waiting threads 1438 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1439 * returns {@code false} 1440 */ 1441 protected final int getWaitQueueLength() { 1442 if (!isHeldExclusively()) 1443 throw new IllegalMonitorStateException(); 1444 int n = 0; 1445 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1446 if ((w.status & COND) != 0) 1447 ++n; 1448 } 1449 return n; 1450 } 1451 1452 /** 1453 * Returns a collection containing those threads that may be 1454 * waiting on this Condition. 1455 * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads(ConditionObject)}. 1456 * 1457 * @return the collection of threads 1458 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1459 * returns {@code false} 1460 */ 1461 protected final Collection<Thread> getWaitingThreads() { 1462 if (!isHeldExclusively()) 1463 throw new IllegalMonitorStateException(); 1464 ArrayList<Thread> list = new ArrayList<>(); 1465 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1466 if ((w.status & COND) != 0) { 1467 Thread t = w.waiter; 1468 if (t != null) 1469 list.add(t); 1470 } 1471 } 1472 return list; 1473 } 1474 } 1475 1476 // Unsafe 1477 private static final Unsafe U = Unsafe.getUnsafe(); 1478 private static final long STATE 1479 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "state"); 1480 private static final long HEAD 1481 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "head"); 1482 private static final long TAIL 1483 = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "tail"); 1484 1485 static { 1486 Class<?> ensureLoaded = LockSupport.class; 1487 } 1488 }