/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea, Bill Scherer, and Michael Scott with
 * assistance from members of JCP JSR-166 Expert Group and released to
 * the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;

/**
 * A {@linkplain BlockingQueue blocking queue} in which each insert
 * operation must wait for a corresponding remove operation by another
 * thread, and vice versa.
 * A synchronous queue does not have any
 * internal capacity, not even a capacity of one.  You cannot
 * <tt>peek</tt> at a synchronous queue because an element is only
 * present when you try to remove it; you cannot insert an element
 * (using any method) unless another thread is trying to remove it;
 * you cannot iterate as there is nothing to iterate.  The
 * <em>head</em> of the queue is the element that the first queued
 * inserting thread is trying to add to the queue; if there is no such
 * queued thread then no element is available for removal and
 * <tt>poll()</tt> will return <tt>null</tt>.  For purposes of other
 * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
 * <tt>SynchronousQueue</tt> acts as an empty collection.  This queue
 * does not permit <tt>null</tt> elements.
 *
 * <p>Synchronous queues are similar to rendezvous channels used in
 * CSP and Ada. They are well suited for handoff designs, in which an
 * object running in one thread must sync up with an object running
 * in another thread in order to hand it some information, event, or
 * task.
 *
 * <p> This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads.  By default, this ordering
 * is not guaranteed. However, a queue constructed with fairness set
 * to <tt>true</tt> grants threads access in FIFO order.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
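 *
 * <p>A minimal handoff sketch, assuming one producer thread and one
 * consumer thread. The class name HandoffDemo and its handOff helper are
 * hypothetical names used only for illustration and are not part of
 * this class. The sketch constructs the queue in fair mode and relies
 * only on <tt>put</tt> and <tt>take</tt> as documented here:
 *
```java
import java.util.concurrent.SynchronousQueue;

public class HandoffDemo {
    // Hands one message from a producer thread to the calling (consumer) thread.
    static String handOff(final String message) {
        // Fair mode: waiting threads are served in FIFO order.
        final SynchronousQueue<String> queue = new SynchronousQueue<>(true);

        Thread producer = new Thread(() -> {
            try {
                queue.put(message); // blocks until the consumer calls take()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            String received = queue.take(); // blocks until the producer calls put()
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during handoff", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(handOff("task")); // prints "task"
    }
}
```
 *
 * <p>In this sketch neither <tt>put</tt> nor <tt>take</tt> returns until
 * the other side has arrived, which is the rendezvous behavior
 * described above.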
 *
 * @since 1.5
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <E> the type of elements held in this collection
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
     * This class implements extensions of the dual stack and dual
     * queue algorithms described in "Nonblocking Concurrent Objects
     * with Condition Synchronization", by W. N. Scherer III and
     * M. L. Scott.  18th Annual Conf. on Distributed Computing,
     * Oct. 2004 (see also
     * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
     * The (Lifo) stack is used for non-fair mode, and the (Fifo)
     * queue for fair mode. The performance of the two is generally
     * similar. Fifo usually supports higher throughput under
     * contention but Lifo maintains higher thread locality in common
     * applications.
     *
     * A dual queue (and similarly stack) is one that at any given
     * time either holds "data" -- items provided by put operations,
     * or "requests" -- slots representing take operations, or is
     * empty. A call to "fulfill" (i.e., a call requesting an item
     * from a queue holding data or vice versa) dequeues a
     * complementary node.  The most interesting feature of these
     * queues is that any operation can figure out which mode the
     * queue is in, and act accordingly without needing locks.
     *
     * Both the queue and stack extend abstract class Transferer
     * defining the single method transfer that does a put or a
     * take. These are unified into a single method because in dual
     * data structures, the put and take operations are symmetrical,
     * so nearly all code can be combined. The resulting transfer
     * methods are on the long side, but are easier to follow than
     * they would be if broken up into nearly-duplicated parts.
     *
     * The queue and stack data structures share many conceptual
     * similarities but very few concrete details. For simplicity,
     * they are kept distinct so that they can later evolve
     * separately.
     *
     * The algorithms here differ from the versions in the above paper
     * in extending them for use in synchronous queues, as well as
     * dealing with cancellation. The main differences include:
     *
     *  1. The original algorithms used bit-marked pointers, but
     *     the ones here use mode bits in nodes, leading to a number
     *     of further adaptations.
     *  2. SynchronousQueues must block threads waiting to become
     *     fulfilled.
     *  3. Support for cancellation via timeout and interrupts,
     *     including cleaning out cancelled nodes/threads
     *     from lists to avoid garbage retention and memory depletion.
     *
     * Blocking is mainly accomplished using LockSupport park/unpark,
     * except that nodes that appear to be the next ones to become
     * fulfilled first spin a bit (on multiprocessors only). On very
     * busy synchronous queues, spinning can dramatically improve
     * throughput. And on less busy ones, the amount of spinning is
     * small enough not to be noticeable.
     *
     * Cleaning is done in different ways in queues vs stacks.  For
     * queues, we can almost always remove a node immediately in O(1)
     * time (modulo retries for consistency checks) when it is
     * cancelled. But if it may be pinned as the current tail, it must
     * wait until some subsequent cancellation. For stacks, we need a
     * potentially O(n) traversal to be sure that we can remove the
     * node, but this can run concurrently with other threads
     * accessing the stack.
     *
     * While garbage collection takes care of most node reclamation
     * issues that otherwise complicate nonblocking algorithms, care
     * is taken to "forget" references to data, other nodes, and
     * threads that might be held on to long-term by blocked
     * threads. In cases where setting to null would otherwise
     * conflict with main algorithms, this is done by changing a
     * node's link to now point to the node itself. This doesn't arise
     * much for Stack nodes (because blocked threads do not hang on to
     * old head pointers), but references in Queue nodes must be
     * aggressively forgotten to avoid reachability of everything any
     * node has ever referred to since arrival.
     */

    /**
     * Shared internal API for dual stacks and queues.
     */
    abstract static class Transferer<E> {
        /**
         * Performs a put or take.
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract E transfer(E e, boolean timed, long nanos);
    }

    /** The number of CPUs, for spin control */
    static final int NCPUS = Runtime.getRuntime().availableProcessors();

    /**
     * The number of times to spin before blocking in timed waits.
     * The value is empirically derived -- it works well across a
     * variety of processors and OSes. Empirically, the best value
     * seems not to vary with number of CPUs (beyond 2) so is just
     * a constant.
     */
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

    /**
     * The number of times to spin before blocking in untimed waits.
     * This is greater than timed value because untimed waits spin
     * faster since they don't need to check times on each spin.
     */
    static final int maxUntimedSpins = maxTimedSpins * 16;

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices.
     */
    static final long spinForTimeoutThreshold = 1000L;

    /** Dual stack */
    static final class TransferStack<E> extends Transferer<E> {
        /*
         * This extends Scherer-Scott dual stack algorithm, differing,
         * among other ways, by using "covering" nodes rather than
         * bit-marked pointers: Fulfilling operations push on marker
         * nodes (with FULFILLING bit set in mode) to reserve a spot
         * to match a waiting node.
         */

        /* Modes for SNodes, ORed together in node fields */
        /** Node represents an unfulfilled consumer */
        static final int REQUEST    = 0;
        /** Node represents an unfulfilled producer */
        static final int DATA       = 1;
        /** Node is fulfilling another unfulfilled DATA or REQUEST */
        static final int FULFILLING = 2;

        /** Return true if m has fulfilling bit set */
        static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

        /** Node class for TransferStacks. */
        static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;
            // Note: item and mode fields don't need to be volatile
            // since they are always written before, and read after,
            // other volatile/atomic operations.

            SNode(Object item) {
                this.item = item;
            }

            boolean casNext(SNode cmp, SNode val) {
                return cmp == next &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }

            /**
             * Tries to match node s to this node, if so, waking up thread.
             * Fulfillers call tryMatch to identify their waiters.
             * Waiters block until they have been matched.
             *
             * @param s the node to match
             * @return true if successfully matched to s
             */
            boolean tryMatch(SNode s) {
                if (match == null &&
                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                    Thread w = waiter;
                    if (w != null) {    // waiters need at most one unpark
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                return match == s;
            }

            /**
             * Tries to cancel a wait by matching node to itself.
             */
            void tryCancel() {
                UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
            }

            boolean isCancelled() {
                return match == this;
            }

            // Unsafe mechanics
            private static final sun.misc.Unsafe UNSAFE;
            private static final long matchOffset;
            private static final long nextOffset;

            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class<?> k = SNode.class;
                    matchOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("match"));
                    nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
        }

        /** The head (top) of the stack */
        volatile SNode head;

        boolean casHead(SNode h, SNode nh) {
            return h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
        }

        /**
         * Creates or resets fields of a node. Called only from transfer
         * where the node to push on stack is lazily created and
         * reused when possible to help reduce intervals between reads
         * and CASes of head and to avoid surges of garbage when CASes
         * to push nodes fail due to contention.
         */
        static SNode snode(SNode s, Object e, SNode next, int mode) {
            if (s == null) s = new SNode(e);
            s.mode = mode;
            s.next = next;
            return s;
        }

        /**
         * Puts or takes an item.
         */
        @SuppressWarnings("unchecked")
        E transfer(E e, boolean timed, long nanos) {
            /*
             * Basic algorithm is to loop trying one of three actions:
             *
             * 1. If apparently empty or already containing nodes of same
             *    mode, try to push node on stack and wait for a match,
             *    returning it, or null if cancelled.
             *
             * 2. If apparently containing node of complementary mode,
             *    try to push a fulfilling node on to stack, match
             *    with corresponding waiting node, pop both from
             *    stack, and return matched item. The matching or
             *    unlinking might not actually be necessary because of
             *    other threads performing action 3:
             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue. The code for helping
             *    is essentially the same as for fulfilling, except
             *    that it doesn't return the item.
             */

            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (E) ((mode == REQUEST) ?
                                    m.item : s.item);
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }

        /**
         * Spins/blocks until node s is matched by a fulfill operation.
         *
         * @param s the waiting node
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched node, or s if cancelled
         */
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            /*
             * When a node/thread is about to block, it sets its waiter
             * field and then rechecks state at least one more time
             * before actually parking, thus covering race vs
             * fulfiller noticing that waiter is non-null so should be
             * woken.
             *
             * When invoked by nodes that appear at the point of call
             * to be at the head of the stack, calls to park are
             * preceded by spins to avoid blocking when producers and
             * consumers are arriving very close in time.  This can
             * happen enough to bother only on multiprocessors.
             *
             * The order of checks for returning out of main loop
             * reflects fact that interrupts have precedence over
             * normal returns, which have precedence over
             * timeouts. (So, on timeout, one last check for match is
             * done before giving up.) Except that calls from untimed
             * SynchronousQueue.{poll/offer} don't check interrupts
             * and don't wait at all, so are trapped in transfer
             * method rather than calling awaitFulfill.
             */
            long lastTime = timed ? System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            SNode h = head;
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)
                    return m;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        s.tryCancel();
                        continue;
                    }
                }
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

        /**
         * Returns true if node s is at head or there is an active
         * fulfiller.
         */
        boolean shouldSpin(SNode s) {
            SNode h = head;
            return (h == s || h == null || isFulfilling(h.mode));
        }

        /**
         * Unlinks s from the stack.
         */
        void clean(SNode s) {
            s.item = null;   // forget item
            s.waiter = null; // forget thread

            /*
             * At worst we may need to traverse entire stack to unlink
             * s. If there are multiple concurrent calls to clean, we
             * might not see s if another thread has already removed
             * it. But we can stop when we see any node known to
             * follow s. We use s.next unless it too is cancelled, in
             * which case we try the node one past.
             * We don't check any
             * further because we don't want to doubly traverse just to
             * find sentinel.
             */

            SNode past = s.next;
            if (past != null && past.isCancelled())
                past = past.next;

            // Absorb cancelled nodes at head
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);

            // Unsplice embedded nodes
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);
                else
                    p = n;
            }
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long headOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = TransferStack.class;
                headOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("head"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    /** Dual Queue */
    static final class TransferQueue<E> extends Transferer<E> {
        /*
         * This extends Scherer-Scott dual queue algorithm, differing,
         * among other ways, by using modes within nodes rather than
         * marked pointers. The algorithm is a little simpler than
         * that for stacks because fulfillers do not need explicit
         * nodes, and matching is done by CAS'ing QNode.item field
         * from non-null to null (for put) or vice versa (for take).
         */

        /** Node class for TransferQueue.
         */
        static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;

            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }

            boolean casNext(QNode cmp, QNode val) {
                return next == cmp &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }

            boolean casItem(Object cmp, Object val) {
                return item == cmp &&
                    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
            }

            /**
             * Tries to cancel by CAS'ing ref to this as item.
             */
            void tryCancel(Object cmp) {
                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
            }

            boolean isCancelled() {
                return item == this;
            }

            /**
             * Returns true if this node is known to be off the queue
             * because its next pointer has been forgotten due to
             * an advanceHead operation.
             */
            boolean isOffList() {
                return next == this;
            }

            // Unsafe mechanics
            private static final sun.misc.Unsafe UNSAFE;
            private static final long itemOffset;
            private static final long nextOffset;

            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class<?> k = QNode.class;
                    itemOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("item"));
                    nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
        }

        /** Head of queue */
        transient volatile QNode head;
        /** Tail of queue */
        transient volatile QNode tail;
        /**
         * Reference to a cancelled node that might not yet have been
         * unlinked from queue because it was the last inserted node
         * when it cancelled.
         */
        transient volatile QNode cleanMe;

        TransferQueue() {
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }

        /**
         * Tries to cas nh as new head; if successful, unlink
         * old head's next node to avoid garbage retention.
         */
        void advanceHead(QNode h, QNode nh) {
            if (h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
                h.next = h; // forget old next
        }

        /**
         * Tries to cas nt as new tail.
         */
        void advanceTail(QNode t, QNode nt) {
            if (tail == t)
                UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
        }

        /**
         * Tries to CAS cleanMe slot.
         */
        boolean casCleanMe(QNode cmp, QNode val) {
            return cleanMe == cmp &&
                UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
        }

        /**
         * Puts or takes an item.
         */
        @SuppressWarnings("unchecked")
        E transfer(E e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */

            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }

        /**
         * Spins/blocks until node s is fulfilled.
         *
         * @param s the waiting node
         * @param e the comparison value for checking match
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched item, or s if cancelled
         */
        Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            long lastTime = timed ?
                System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)
                    return x;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)
                    --spins;
                else if (s.waiter == null)
                    s.waiter = w;
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

        /**
         * Gets rid of cancelled node s with original predecessor pred.
         */
        void clean(QNode pred, QNode s) {
            s.waiter = null; // forget thread
            /*
             * At any given time, exactly one node on list cannot be
             * deleted -- the last inserted node. To accommodate this,
             * if we cannot delete s, we save its predecessor as
             * "cleanMe", deleting the previously saved version
             * first. At least one of node s or the node previously
             * saved can always be deleted, so this always terminates.
             */
            while (pred.next == s) { // Return early if already unlinked
                QNode h = head;
                QNode hn = h.next;   // Absorb cancelled first node as head
                if (hn != null && hn.isCancelled()) {
                    advanceHead(h, hn);
                    continue;
                }
                QNode t = tail;      // Ensure consistent read for tail
                if (t == h)
                    return;
                QNode tn = t.next;
                if (t != tail)
                    continue;
                if (tn != null) {
                    advanceTail(t, tn);
                    continue;
                }
                if (s != t) {        // If not tail, try to unsplice
                    QNode sn = s.next;
                    if (sn == s || pred.casNext(s, sn))
                        return;
                }
                QNode dp = cleanMe;
                if (dp != null) {    // Try unlinking previous cancelled node
                    QNode d = dp.next;
                    QNode dn;
                    if (d == null ||               // d is gone or
                        d == dp ||                 // d is off list or
                        !d.isCancelled() ||        // d not cancelled or
                        (d != t &&                 // d not tail and
                         (dn = d.next) != null &&  //   has successor
                         dn != d &&                //   that is on list
                         dp.casNext(d, dn)))       // d unspliced
                        casCleanMe(dp, null);
                    if (dp == pred)
                        return;      // s is already saved node
                } else if (casCleanMe(null, pred))
                    return;          // Postpone cleaning s
            }
        }

        private static final sun.misc.Unsafe UNSAFE;
        private static final long headOffset;
        private static final long tailOffset;
        private static final long cleanMeOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = TransferQueue.class;
                headOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("head"));
                tailOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("tail"));
                cleanMeOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("cleanMe"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    /**
     * The transferer. Set only in constructor, but cannot be declared
     * as final without further complicating serialization.
     * Since
     * this is accessed only at most once per public method, there
     * isn't a noticeable performance penalty for using volatile
     * instead of final here.
     */
    private transient volatile Transferer<E> transferer;

    /**
     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

    /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     *
     * @return <tt>true</tt> if successful, or <tt>false</tt> if the
     *         specified waiting time elapses before a consumer appears.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

    /**
     * Inserts the specified element into this queue, if another thread is
     * waiting to receive it.
     *
     * @param e the element to add
     * @return <tt>true</tt> if the element was added to this queue, else
     *         <tt>false</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     *
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is present.
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = transferer.transfer(null, true, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return e;
        throw new InterruptedException();
    }

    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or <tt>null</tt> if no
     *         element is available.
     */
    public E poll() {
        return transferer.transfer(null, true, 0);
    }

    /**
     * Always returns <tt>true</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return <tt>true</tt>
     */
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return zero.
     */
    public int size() {
        return 0;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return zero.
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Does nothing.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     */
    public void clear() {
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element
     * @return <tt>false</tt>
     */
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element to remove
     * @return <tt>false</tt>
     */
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns <tt>false</tt> unless the given collection is empty.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt> unless given collection is empty
     */
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>null</tt>.
     * A <tt>SynchronousQueue</tt> does not return elements
     * unless actively waited on.
     *
     * @return <tt>null</tt>
     */
    public E peek() {
        return null;
    }

    /**
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
     * <tt>false</tt>.
     *
     * @return an empty iterator
     */
    @SuppressWarnings("unchecked")
    public Iterator<E> iterator() {
        return (Iterator<E>) EmptyIterator.EMPTY_ITERATOR;
    }

    // Replicated from a previous version of Collections
    private static class EmptyIterator<E> implements Iterator<E> {
        static final EmptyIterator<Object> EMPTY_ITERATOR
            = new EmptyIterator<Object>();

        public boolean hasNext() { return false; }
        public E next() { throw new NoSuchElementException(); }
        public void remove() { throw new IllegalStateException(); }
    }

    /**
     * Returns a zero-length array.
     * @return a zero-length array
     */
    public Object[] toArray() {
        return new Object[0];
    }

    /**
     * Sets the zeroth element of the specified array to <tt>null</tt>
     * (if the array has non-zero length) and returns it.
     *
     * @param a the array
     * @return the specified array
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        for (E e; (e = poll()) != null;) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        for (E e; n < maxElements && (e = poll()) != null;) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /*
     * To cope with serialization strategy in the 1.5 version of
     * SynchronousQueue, we declare some unused classes and fields
     * that exist solely to enable serializability across versions.
     * These fields are never used, so are initialized only if this
     * object is ever serialized or deserialized.
     */

    @SuppressWarnings("serial")
    static class WaitQueue implements java.io.Serializable { }
    static class LifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3633113410248163686L;
    }
    static class FifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3623113410248163686L;
    }
    private ReentrantLock qlock;
    private WaitQueue waitingProducers;
    private WaitQueue waitingConsumers;

    /**
     * Saves the state to a stream (that is, serializes it).
     *
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        boolean fair = transferer instanceof TransferQueue;
        if (fair) {
            qlock = new ReentrantLock(true);
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        }
        else {
            qlock = new ReentrantLock();
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
        s.defaultWriteObject();
    }

    private void readObject(final java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        if (waitingProducers instanceof FifoWaitQueue)
            transferer = new TransferQueue<E>();
        else
            transferer = new TransferStack<E>();
    }

    // Unsafe mechanics
    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
                                  String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }

}
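
/*
 * Usage sketch (not part of the original file). The class name
 * HandoffSketch and its methods are hypothetical and exist only to
 * illustrate the public methods documented above: put and take
 * rendezvous across two threads, while the untimed offer and poll
 * return immediately when no counterpart thread is waiting. It is a
 * minimal sketch meant to be compiled as a separate source file, not
 * a recommended production pattern.
 */

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; the name and methods are illustrative only.
class HandoffSketch {

    /** Hands one value from a producer thread to the calling thread. */
    static String handoff(final String value) {
        // true selects the fair (FIFO) policy; false would also work here.
        final SynchronousQueue<String> queue = new SynchronousQueue<String>(true);
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    queue.put(value); // blocks until a consumer receives the value
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                }
            }
        });
        producer.start();
        try {
            String received = queue.take(); // blocks until the producer arrives
            producer.join();
            return received;
        } catch (InterruptedException e) {
            producer.interrupt();               // do not leave the producer blocked
            Thread.currentThread().interrupt(); // preserve interrupt status
            return null;
        }
    }

    /** With no consumer waiting, the untimed offer fails immediately. */
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("x");
    }

    /** With no producer waiting, the untimed poll returns null immediately. */
    static String pollWithoutProducer() {
        return new SynchronousQueue<String>().poll();
    }

    public static void main(String[] args) {
        System.out.println(handoff("task"));        // the value handed over
        System.out.println(offerWithoutConsumer()); // false
        System.out.println(pollWithoutProducer());  // null
    }
}
```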