/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/**
 * An unbounded {@link TransferQueue} based on linked nodes.
 * This queue orders elements FIFO (first-in-first-out) with respect
 * to any given producer.  The <em>head</em> of the queue is that
 * element that has been on the queue the longest time for some
 * producer.  The <em>tail</em> of the queue is that element that has
 * been on the queue the shortest time for some producer.
 *
 * <p>Beware that, unlike in most collections, the {@code size}
 * method is <em>NOT</em> a constant-time operation. Because of the
 * asynchronous nature of these queues, determining the current number
 * of elements requires a traversal of the elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>Memory consistency effects: As with other concurrent
 * collections, actions in a thread prior to placing an object into a
 * {@code LinkedTransferQueue}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions subsequent to the access or removal of that element from
 * the {@code LinkedTransferQueue} in another thread.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
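 *
 * <p>Sample usage (an illustrative sketch): {@code transfer} hands an
 * element directly to a consumer, blocking until one receives it,
 * while {@code put} enqueues asynchronously:
 *
 * <pre> {@code
 * class TransferExample {
 *   public static void main(String[] args) throws InterruptedException {
 *     final LinkedTransferQueue<String> q = new LinkedTransferQueue<String>();
 *     new Thread(new Runnable() {
 *       public void run() {
 *         try {
 *           System.out.println(q.take()); // blocks until an element arrives
 *         } catch (InterruptedException ignore) {}
 *       }
 *     }).start();
 *     q.transfer("hello"); // returns only after the consumer receives it
 *     q.put("world");      // enqueues and returns immediately
 *   }
 * }}</pre>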
 *
 * @since 1.7
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
     * *** Overview of Dual Queues with Slack ***
     *
     * Dual Queues, introduced by Scherer and Scott
     * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are
     * (linked) queues in which nodes may represent either data or
     * requests.  When a thread tries to enqueue a data node, but
     * encounters a request node, it instead "matches" and removes it;
     * and vice versa for enqueuing requests. Blocking Dual Queues
     * arrange that threads enqueuing unmatched requests block until
     * other threads provide the match. Dual Synchronous Queues (see
     * Scherer, Lea, & Scott
     * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
     * additionally arrange that threads enqueuing unmatched data also
     * block.  Dual Transfer Queues support all of these modes, as
     * dictated by callers.
     *
     * A FIFO dual queue may be implemented using a variation of the
     * Michael & Scott (M&S) lock-free queue algorithm
     * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
     * It maintains two pointer fields, "head", pointing to a
     * (matched) node that in turn points to the first actual
     * (unmatched) queue node (or null if empty); and "tail" that
     * points to the last node on the queue (or again null if
     * empty). For example, here is a possible queue with four data
     * elements:
     *
     *  head                tail
     *    |                   |
     *    v                   v
     *    M -> U -> U -> U -> U
     *
     * The M&S queue algorithm is known to be prone to scalability and
     * overhead limitations when maintaining (via CAS) these head and
     * tail pointers. This has led to the development of
     * contention-reducing variants such as elimination arrays (see
     * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and
     * optimistic back pointers (see Ladan-Mozes & Shavit
     * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf).
     * However, the nature of dual queues enables a simpler tactic for
     * improving M&S-style implementations when dual-ness is needed.
     *
     * In a dual queue, each node must atomically maintain its match
     * status. While there are other possible variants, we implement
     * this here as: for a data-mode node, matching entails CASing an
     * "item" field from a non-null data value to null upon match, and
     * vice-versa for request nodes, CASing from null to a data
     * value. (Note that the linearization properties of this style of
     * queue are easy to verify -- elements are made available by
     * linking, and unavailable by matching.) Compared to plain M&S
     * queues, this property of dual queues requires one additional
     * successful atomic operation per enq/deq pair. But it also
     * enables lower cost variants of queue maintenance mechanics. (A
     * variation of this idea applies even for non-dual queues that
     * support deletion of interior elements, such as
     * j.u.c.ConcurrentLinkedQueue.)
     *
     * Once a node is matched, its match status can never again
     * change.  We may thus arrange that the linked list of them
     * contain a prefix of zero or more matched nodes, followed by a
     * suffix of zero or more unmatched nodes.
     * (Note that we allow
     * both the prefix and suffix to be zero length, which in turn
     * means that we do not use a dummy header.)  If we were not
     * concerned with either time or space efficiency, we could
     * correctly perform enqueue and dequeue operations by traversing
     * from a pointer to the initial node; CASing the item of the
     * first unmatched node on match and CASing the next field of the
     * trailing node on appends. (Plus some special-casing when
     * initially empty.)  While this would be a terrible idea in
     * itself, it does have the benefit of not requiring ANY atomic
     * updates on head/tail fields.
     *
     * We introduce here an approach that lies between the extremes of
     * never versus always updating queue (head and tail) pointers.
     * This offers a tradeoff between sometimes requiring extra
     * traversal steps to locate the first and/or last unmatched
     * nodes, versus the reduced overhead and contention of fewer
     * updates to queue pointers. For example, a possible snapshot of
     * a queue is:
     *
     *  head           tail
     *    |              |
     *    v              v
     *    M -> M -> U -> U -> U -> U
     *
     * The best value for this "slack" (the targeted maximum distance
     * between the value of "head" and the first unmatched node, and
     * similarly for "tail") is an empirical matter. We have found
     * that very small constants in the range of 1-3 work best over a
     * range of platforms. Larger values introduce increasing costs of
     * cache misses and risks of long traversal chains, while smaller
     * values increase CAS contention and overhead.
     *
     * Dual queues with slack differ from plain M&S dual queues by
     * virtue of only sometimes updating head or tail pointers when
     * matching, appending, or even traversing nodes, in order to
     * maintain a targeted slack. The idea of "sometimes" may be
     * operationalized in several ways. The simplest is to use a
     * per-operation counter incremented on each traversal step, and
     * to try (via CAS) to update the associated queue pointer
     * whenever the count exceeds a threshold. Another, that requires
     * more overhead, is to use random number generators to update
     * with a given probability per traversal step.
     *
     * In any strategy along these lines, because CASes updating
     * fields may fail, the actual slack may exceed targeted
     * slack. However, such updates may be retried at any time to
     * maintain targets.  Even when using very small slack values,
     * this approach works well for dual queues because it allows all
     * operations up to the point of matching or appending an item
     * (hence potentially allowing progress by another thread) to be
     * read-only, thus not introducing any further contention.  As
     * described below, we implement this by performing slack
     * maintenance retries only after these points.
     *
     * As an accompaniment to such techniques, traversal overhead can
     * be further reduced without increasing contention of head
     * pointer updates: Threads may sometimes shortcut the "next" link
     * path from the current "head" node to be closer to the currently
     * known first unmatched node, and similarly for tail. Again, this
     * may be triggered using thresholds or randomization.
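     *
     * As a rough sketch of the counter-based variant (illustrative
     * only -- SLACK and the elided match step are hypothetical; the
     * implementation below instead hard-wires a slack threshold of
     * two with no explicit counter):
     *
     *   int steps = 0;
     *   for (Node h = head, p = h; p != null; p = p.next) {
     *     if (!p.isMatched()) {
     *       // ... try to match or append at p ...
     *       if (steps > SLACK)
     *         casHead(h, p); // best-effort; a failed CAS just leaves
     *                        // extra slack for a later retry
     *       break;
     *     }
     *     ++steps;
     *   }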
     *
     * These ideas must be further extended to avoid unbounded amounts
     * of costly-to-reclaim garbage caused by the sequential "next"
     * links of nodes starting at old forgotten head nodes: As first
     * described in detail by Boehm
     * (http://portal.acm.org/citation.cfm?doid=503272.503282) if a GC
     * delays noticing that any arbitrarily old node has become
     * garbage, all newer dead nodes will also be unreclaimed.
     * (Similar issues arise in non-GC environments.)  To cope with
     * this in our implementation, upon CASing to advance the head
     * pointer, we set the "next" link of the previous head to point
     * only to itself; thus limiting the length of connected dead lists.
     * (We also take similar care to wipe out possibly garbage
     * retaining values held in other Node fields.)  However, doing so
     * adds some further complexity to traversal: If any "next"
     * pointer links to itself, it indicates that the current thread
     * has lagged behind a head-update, and so the traversal must
     * continue from the "head".  Traversals trying to find the
     * current tail starting from "tail" may also encounter
     * self-links, in which case they also continue at "head".
     *
     * It is tempting in a slack-based scheme to not even use CAS for
     * updates (similarly to Ladan-Mozes & Shavit). However, this
     * cannot be done for head updates under the above link-forgetting
     * mechanics because an update may leave head at a detached node.
     * And while direct writes are possible for tail updates, they
     * increase the risk of long retraversals, and hence long garbage
     * chains, which can be much more costly than is worthwhile,
     * considering that the cost difference between performing a CAS
     * and a write is smaller when they are not triggered on each
     * operation (especially considering that writes and CASes equally
     * require additional GC bookkeeping ("write barriers") that is
     * sometimes more costly than the writes themselves because of
     * contention).
     *
     * *** Overview of implementation ***
     *
     * We use a threshold-based approach to updates, with a slack
     * threshold of two -- that is, we update head/tail when the
     * current pointer appears to be two or more steps away from the
     * first/last node. The slack value is hard-wired: a path greater
     * than one is naturally implemented by checking equality of
     * traversal pointers except when the list has only one element,
     * in which case we keep the slack threshold at one. Avoiding
     * tracking explicit counts across method calls slightly
     * simplifies an already-messy implementation. Using randomization
     * would probably work better if there were a low-quality
     * dirt-cheap per-thread one available, but even ThreadLocalRandom
     * is too heavy for these purposes.
     *
     * With such a small slack threshold value, it is not worthwhile
     * to augment this with path short-circuiting (i.e., unsplicing
     * interior nodes) except in the case of cancellation/removal (see
     * below).
     *
     * We allow both the head and tail fields to be null before any
     * nodes are enqueued; initializing upon first append. This
     * simplifies some other logic, as well as providing more
     * efficient explicit control paths instead of letting JVMs insert
     * implicit NullPointerExceptions when they are null.
     * While not
     * currently fully implemented, we also leave open the possibility
     * of re-nulling these fields when empty (which is complicated to
     * arrange, for little benefit.)
     *
     * All enqueue/dequeue operations are handled by the single method
     * "xfer" with parameters indicating whether to act as some form
     * of offer, put, poll, take, or transfer (each possibly with
     * timeout). The relative complexity of using one monolithic
     * method is outweighed by the code bulk and maintenance problems
     * of using separate methods for each case.
     *
     * Operation consists of up to three phases. The first is
     * implemented within method xfer, the second in tryAppend, and
     * the third in method awaitMatch.
     *
     * 1. Try to match an existing node
     *
     *    Starting at head, skip already-matched nodes until finding
     *    an unmatched node of opposite mode, if one exists; in that
     *    case, match it and return, if necessary also updating head
     *    to one past the matched node (or to the node itself if the
     *    list has no other unmatched nodes). If the CAS misses, then
     *    a loop retries advancing head by two steps until either
     *    success or the slack is at most two. By requiring that each
     *    attempt advances head by two (if applicable), we ensure that
     *    the slack does not grow without bound. Traversals also check
     *    if the initial head is now off-list, in which case they
     *    start at the new head.
     *
     *    If no candidates are found and the call was untimed
     *    poll/offer (argument "how" is NOW), return.
     *
     * 2. Try to append a new node (method tryAppend)
     *
     *    Starting at current tail pointer, find the actual last node
     *    and try to append a new node (or if head was null, establish
     *    the first node). Nodes can be appended only if their
     *    predecessors are either already matched or are of the same
     *    mode. If we detect otherwise, then a new node with opposite
     *    mode must have been appended during traversal, so we must
     *    restart at phase 1. The traversal and update steps are
     *    otherwise similar to phase 1: Retrying upon CAS misses and
     *    checking for staleness. In particular, if a self-link is
     *    encountered, then we can safely jump to a node on the list
     *    by continuing the traversal at current head.
     *
     *    On successful append, if the call was ASYNC, return.
     *
     * 3. Await match or cancellation (method awaitMatch)
     *
     *    Wait for another thread to match the node, cancelling
     *    instead if the current thread is interrupted or the wait
     *    times out. On multiprocessors, we use front-of-queue
     *    spinning: If a node appears to be the first unmatched node
     *    in the queue, it spins a bit before blocking. In either
     *    case, before blocking it tries to unsplice any nodes between
     *    the current "head" and the first unmatched node.
     *
     *    Front-of-queue spinning vastly improves performance of
     *    heavily contended queues. And so long as it is relatively
     *    brief and "quiet", spinning does not much impact performance
     *    of less-contended queues.  During spins threads check their
     *    interrupt status and generate a thread-local random number
     *    to decide to occasionally perform a Thread.yield. While
     *    yield has underdefined specs, we assume that it might help,
     *    and will not hurt, in limiting the impact of spinning on
     *    busy systems.
     *    We also use smaller (1/2) spins for nodes that are not known
     *    to be front but whose predecessors have not blocked -- these
     *    "chained" spins avoid artifacts of front-of-queue rules
     *    which otherwise lead to alternating nodes spinning vs
     *    blocking. Further, front threads that represent phase
     *    changes (from data to request node or vice versa) compared
     *    to their predecessors receive additional chained spins,
     *    reflecting longer paths typically required to unblock
     *    threads during phase changes.
     *
     * ** Unlinking removed interior nodes **
     *
     * In addition to minimizing garbage retention via self-linking
     * described above, we also unlink removed interior nodes. These
     * may arise due to timed out or interrupted waits, or calls to
     * remove(x) or Iterator.remove.  Normally, given a node that was
     * at one time known to be the predecessor of some node s that is
     * to be removed, we can unsplice s by CASing the next field of
     * its predecessor if it still points to s (otherwise s must
     * already have been removed or is now offlist). But there are two
     * situations in which we cannot guarantee to make node s
     * unreachable in this way: (1) If s is the trailing node of the
     * list (i.e., with null next), then it is pinned as the target
     * node for appends, so can only be removed later after other
     * nodes are appended. (2) We cannot necessarily unlink s given a
     * predecessor node that is matched (including the case of being
     * cancelled): the predecessor may already be unspliced, in which
     * case some previous reachable node may still point to s.
     * (For further explanation see Herlihy & Shavit "The Art of
     * Multiprocessor Programming" chapter 9.)  However, in both
     * cases, we can rule out the need for further action if either s
     * or its predecessor is (or can be made to be) at, or falls off
     * from, the head of the list.
     *
     * Without taking these into account, it would be possible for an
     * unbounded number of supposedly removed nodes to remain
     * reachable.  Situations leading to such buildup are uncommon but
     * can occur in practice; for example when a series of short timed
     * calls to poll repeatedly time out but never otherwise fall off
     * the list because of an untimed call to take at the front of the
     * queue.
     *
     * When these cases arise, rather than always retraversing the
     * entire list to find an actual predecessor to unlink (which
     * won't help for case (1) anyway), we record a conservative
     * estimate of possible unsplice failures (in "sweepVotes").
     * We trigger a full sweep when the estimate exceeds a threshold
     * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
     * removal failures to tolerate before sweeping through, unlinking
     * cancelled nodes that were not unlinked upon initial removal.
     * We perform sweeps by the thread hitting the threshold (rather
     * than background threads or by spreading work to other threads)
     * because in the main contexts in which removal occurs, the
     * caller is already timed-out, cancelled, or performing a
     * potentially O(n) operation (e.g. remove(x)), none of which are
     * time-critical enough to warrant the overhead that alternatives
     * would impose on other threads.
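     *
     * Concretely, the voting step (as performed in method unsplice
     * below) amounts to:
     *
     *   for (;;) {
     *     int v = sweepVotes;
     *     if (v < SWEEP_THRESHOLD) {
     *       if (casSweepVotes(v, v + 1))
     *         break;           // recorded one more apparent failure
     *     }
     *     else if (casSweepVotes(v, 0)) {
     *       sweep();           // threshold reached: unlink cancelled nodes
     *       break;
     *     }
     *   }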
     *
     * Because the sweepVotes estimate is conservative, and because
     * nodes become unlinked "naturally" as they fall off the head of
     * the queue, and because we allow votes to accumulate even while
     * sweeps are in progress, there are typically significantly fewer
     * such nodes than estimated.  Choice of a threshold value
     * balances the likelihood of wasted effort and contention against
     * providing a worst-case bound on retention of interior nodes in
     * quiescent queues.  The value defined below was chosen
     * empirically to balance these under various timeout scenarios.
     *
     * Note that we cannot self-link unlinked interior nodes during
     * sweeps. However, the associated garbage chains terminate when
     * some successor ultimately falls off the head of the list and is
     * self-linked.
     */

    /** True if on multiprocessor */
    private static final boolean MP =
        Runtime.getRuntime().availableProcessors() > 1;

    /**
     * The number of times to spin (with randomly interspersed calls
     * to Thread.yield) on multiprocessor before blocking when a node
     * is apparently the first waiter in the queue.  See above for
     * explanation. Must be a power of two. The value is empirically
     * derived -- it works pretty well across a variety of processors,
     * numbers of CPUs, and OSes.
     */
    private static final int FRONT_SPINS   = 1 << 7;

    /**
     * The number of times to spin before blocking when a node is
     * preceded by another node that is apparently spinning.  Also
     * serves as an increment to FRONT_SPINS on phase changes, and as
     * base average frequency for yielding during spins. Must be a
     * power of two.
     */
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    /**
     * The maximum number of estimated removal failures (sweepVotes)
     * to tolerate before sweeping through the queue unlinking
     * cancelled nodes that were not unlinked upon initial
     * removal. See above for explanation. The value must be at least
     * two to avoid useless sweeps when removing trailing nodes.
     */
    static final int SWEEP_THRESHOLD = 32;

    /**
     * Queue nodes. Uses Object, not E, for items to allow forgetting
     * them after use.  Relies heavily on Unsafe mechanics to minimize
     * unnecessary ordering constraints: Writes that are intrinsically
     * ordered wrt other accesses or CASes use simple relaxed forms.
     */
    static final class Node {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;
        volatile Thread waiter; // null until waiting

        // CAS methods for fields
        final boolean casNext(Node cmp, Node val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        final boolean casItem(Object cmp, Object val) {
            // assert cmp == null || cmp.getClass() != Node.class;
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(Object item, boolean isData) {
            UNSAFE.putObject(this, itemOffset, item); // relaxed write
            this.isData = isData;
        }

        /**
         * Links node to itself to avoid garbage retention.  Called
         * only after CASing head field, so uses relaxed write.
         */
        final void forgetNext() {
            UNSAFE.putObject(this, nextOffset, this);
        }

        /**
         * Sets item to self and waiter to null, to avoid garbage
         * retention after matching or cancelling. Uses relaxed writes
         * because order is already constrained in the only calling
         * contexts: item is forgotten only after volatile/atomic
         * mechanics that extract items.  Similarly, clearing waiter
         * follows either CAS or return from park (if ever parked;
         * else we don't care).
         */
        final void forgetContents() {
            UNSAFE.putObject(this, itemOffset, this);
            UNSAFE.putObject(this, waiterOffset, null);
        }

        /**
         * Returns true if this node has been matched, including the
         * case of artificial matches due to cancellation.
         */
        final boolean isMatched() {
            Object x = item;
            return (x == this) || ((x == null) == isData);
        }

        /**
         * Returns true if this is an unmatched request node.
         */
        final boolean isUnmatchedRequest() {
            return !isData && item == null;
        }

        /**
         * Returns true if a node with the given mode cannot be
         * appended to this node because this node is unmatched and
         * has opposite data mode.
         */
        final boolean cannotPrecede(boolean haveData) {
            boolean d = isData;
            Object x;
            return d != haveData && (x = item) != this && (x != null) == d;
        }

        /**
         * Tries to artificially match a data node -- used by remove.
         */
        final boolean tryMatchData() {
            // assert isData;
            Object x = item;
            if (x != null && x != this && casItem(x, null)) {
                LockSupport.unpark(waiter);
                return true;
            }
            return false;
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
        private static final long nextOffset =
            objectFieldOffset(UNSAFE, "next", Node.class);
        private static final long itemOffset =
            objectFieldOffset(UNSAFE, "item", Node.class);
        private static final long waiterOffset =
            objectFieldOffset(UNSAFE, "waiter", Node.class);

        private static final long serialVersionUID = -3375979862319811754L;
    }

    /** head of the queue; null until first enqueue */
    transient volatile Node head;

    /** tail of the queue; null until first append */
    private transient volatile Node tail;

    /** The number of apparent failures to unsplice removed nodes */
    private transient volatile int sweepVotes;

    // CAS methods for fields
    private boolean casTail(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
    }

    private boolean casHead(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
    }

    private boolean casSweepVotes(int cmp, int val) {
        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
    }

    /*
     * Possible values for "how" argument in xfer method.
     */
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC  = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll, tryTransfer

    @SuppressWarnings("unchecked")
    static <E> E cast(Object item) {
        // assert item == null || item.getClass() != Node.class;
        return (E) item;
    }

    /**
     * Implements all queuing methods. See above for explanation.
     *
     * @param e the item or null for take
     * @param haveData true if this is a put, else a take
     * @param how NOW, ASYNC, SYNC, or TIMED
     * @param nanos timeout in nanosecs, used only if mode is TIMED
     * @return an item if matched, else e
     * @throws NullPointerException if haveData mode but e is null
     */
    private E xfer(E e, boolean haveData, int how, long nanos) {
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed

        retry:
        for (;;) {                            // restart on append race

            for (Node h = head, p = h; p != null;) { // find & match first node
                boolean isData = p.isData;
                Object item = p.item;
                if (item != p && (item != null) == isData) { // unmatched
                    if (isData == haveData)   // can't match
                        break;
                    if (p.casItem(item, e)) { // match
                        for (Node q = p; q != h;) {
                            Node n = q.next;  // update by 2 unless singleton
                            if (head == h && casHead(h, n == null ? q : n)) {
                                h.forgetNext();
                                break;
                            }                 // advance and retry
                            if ((h = head) == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                        }
                        LockSupport.unpark(p.waiter);
                        return this.<E>cast(item);
                    }
                }
                Node n = p.next;
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }

            if (how != NOW) {                 // No matches available
                if (s == null)
                    s = new Node(e, haveData);
                Node pred = tryAppend(s, haveData);
                if (pred == null)
                    continue retry;           // lost race vs opposite mode
                if (how != ASYNC)
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }

    /**
     * Tries to append node s as tail.
     *
     * @param s the node to append
     * @param haveData true if appending in data mode
     * @return null on failure due to losing race with append in
     * different mode, else s's predecessor, or s itself if no
     * predecessor
     */
    private Node tryAppend(Node s, boolean haveData) {
        for (Node t = tail, p = t;;) {        // move p to last node and append
            Node n, u;                        // temps for reads of next & tail
            if (p == null && (p = head) == null) {
                if (casHead(null, s))
                    return s;                 // initialize
            }
            else if (p.cannotPrecede(haveData))
                return null;                  // lost race vs opposite mode
            else if ((n = p.next) != null)    // not last; keep traversing
                p = p != t && t != (u = tail) ? (t = u) : // stale tail
                    (p != n) ? n : null;      // restart if off list
            else if (!p.casNext(null, s))
                p = p.next;                   // re-read on CAS failure
            else {
                if (p != t) {                 // update if slack now >= 2
                    while ((tail != t || !casTail(t, s)) &&
                           (t = tail)   != null &&
                           (s = t.next) != null && // advance and retry
                           (s = s.next) != null && s != t);
                }
                return p;
            }
        }
    }

    /**
     * Spins/yields/blocks until node s is matched or caller gives up.
     *
     * @param s the waiting node
     * @param pred the predecessor of s, or s itself if it has no
     * predecessor, or null if unknown (the null case does not occur
     * in any current calls but may in possible future extensions)
     * @param e the comparison value for checking match
     * @param timed if true, wait only until timeout elapses
     * @param nanos timeout in nanosecs, used only if timed is true
     * @return matched item, or e if unmatched on interrupt or timeout
     */
    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
        long lastTime = timed ? System.nanoTime() : 0L;
        Thread w = Thread.currentThread();
        int spins = -1; // initialized after first item and cancel checks
        ThreadLocalRandom randomYields = null; // bound if needed

        for (;;) {
            Object item = s.item;
            if (item != e) {                  // matched
                // assert item != s;
                s.forgetContents();           // avoid garbage
                return this.<E>cast(item);
            }
            if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                    s.casItem(e, s)) {        // cancel
                unsplice(pred, s);
                return e;
            }

            if (spins < 0) {                  // establish spins at/near front
                if ((spins = spinsFor(pred, s.isData)) > 0)
                    randomYields = ThreadLocalRandom.current();
            }
            else if (spins > 0) {             // spin
                --spins;
                if (randomYields.nextInt(CHAINED_SPINS) == 0)
                    Thread.yield();           // occasionally yield
            }
            else if (s.waiter == null) {
                s.waiter = w;                 // request unpark then recheck
            }
            else if (timed) {
                long now = System.nanoTime();
                if ((nanos -= now - lastTime) > 0)
                    LockSupport.parkNanos(this, nanos);
                lastTime = now;
            }
            else {
                LockSupport.park(this);
            }
        }
    }

    /**
     * Returns spin/yield value for a node with given predecessor and
     * data mode. See above for explanation.
     */
    private static int spinsFor(Node pred, boolean haveData) {
        if (MP && pred != null) {
            if (pred.isData != haveData)      // phase change
                return FRONT_SPINS + CHAINED_SPINS;
            if (pred.isMatched())             // probably at front
                return FRONT_SPINS;
            if (pred.waiter == null)          // pred apparently spinning
                return CHAINED_SPINS;
        }
        return 0;
    }

    /* -------------- Traversal methods -------------- */

    /**
     * Returns the successor of p, or the head node if p.next has been
     * linked to self, which will only be true if traversing with a
     * stale pointer that is now off the list.
     */
    final Node succ(Node p) {
        Node next = p.next;
        return (p == next) ? head : next;
    }

    /**
     * Returns the first unmatched node of the given mode, or null if
     * none.  Used by methods isEmpty, hasWaitingConsumer.
     */
    private Node firstOfMode(boolean isData) {
        for (Node p = head; p != null; p = succ(p)) {
            if (!p.isMatched())
                return (p.isData == isData) ? p : null;
        }
        return null;
    }

    /**
     * Returns the item in the first unmatched node with isData; or
     * null if none.  Used by peek.
     */
    private E firstDataItem() {
        for (Node p = head; p != null; p = succ(p)) {
            Object item = p.item;
            if (p.isData) {
                if (item != null && item != p)
                    return this.<E>cast(item);
            }
            else if (item == null)
                return null;
        }
        return null;
    }

    /**
     * Traverses and counts unmatched nodes of the given mode.
     * Used by methods size and getWaitingConsumerCount.
     */
    private int countOfMode(boolean data) {
        int count = 0;
        for (Node p = head; p != null; ) {
            if (!p.isMatched()) {
                if (p.isData != data)
                    return 0;
                if (++count == Integer.MAX_VALUE) // saturated
                    break;
            }
            Node n = p.next;
            if (n != p)
                p = n;
            else {
                count = 0;
                p = head;
            }
        }
        return count;
    }

    final class Itr implements Iterator<E> {
        private Node nextNode;   // next node to return item for
        private E nextItem;      // the corresponding item
        private Node lastRet;    // last returned node, to support remove
        private Node lastPred;   // predecessor to unlink lastRet

        /**
         * Moves to next node after prev, or first node if prev null.
         */
        private void advance(Node prev) {
            /*
             * To track and avoid buildup of deleted nodes in the face
             * of calls to both Queue.remove and Itr.remove, we must
             * include variants of unsplice and sweep upon each
             * advance: Upon Itr.remove, we may need to catch up links
             * from lastPred, and upon other removes, we might need to
             * skip ahead from stale nodes and unsplice deleted ones
             * found while advancing.
             */

            Node r, b; // reset lastPred upon possible deletion of lastRet
            if ((r = lastRet) != null && !r.isMatched())
                lastPred = r;    // next lastPred is old lastRet
            else if ((b = lastPred) == null || b.isMatched())
                lastPred = null; // at start of list
            else {
                Node s, n;       // help with removal of lastPred.next
                while ((s = b.next) != null &&
                       s != b && s.isMatched() &&
                       (n = s.next) != null && n != s)
                    b.casNext(s, n);
            }

            this.lastRet = prev;

            for (Node p = prev, s, n;;) {
                s = (p == null) ? head : p.next;
                if (s == null)
                    break;
                else if (s == p) {
                    p = null;
                    continue;
                }
                Object item = s.item;
                if (s.isData) {
                    if (item != null && item != s) {
                        nextItem = LinkedTransferQueue.<E>cast(item);
                        nextNode = s;
                        return;
                    }
                }
                else if (item == null)
                    break;
                // assert s.isMatched();
                if (p == null)
                    p = s;
                else if ((n = s.next) == null)
                    break;
                else if (s == n)
                    p = null;
                else
                    p.casNext(s, n);
            }
            nextNode = null;
            nextItem = null;
        }

        Itr() {
            advance(null);
        }

        public final boolean hasNext() {
            return nextNode != null;
        }

        public final E next() {
            Node p = nextNode;
            if (p == null) throw new NoSuchElementException();
            E e = nextItem;
            advance(p);
            return e;
        }

        public final void remove() {
            final Node lastRet = this.lastRet;
            if (lastRet == null)
                throw new IllegalStateException();
            this.lastRet = null;
            if (lastRet.tryMatchData())
                unsplice(lastPred, lastRet);
        }
    }

    /* -------------- Removal methods -------------- */

    /**
     * Unsplices (now or later) the given deleted/cancelled node with
     * the given predecessor.
     *
     * @param pred a node that was at one time known to be the
     * predecessor of s, or null or s itself if s is/was at head
     * @param s the node to be unspliced
     */
    final void unsplice(Node pred, Node s) {
        s.forgetContents(); // forget unneeded fields
        /*
         * See above for rationale. Briefly: if pred still points to
         * s, try to unlink s.  If s cannot be unlinked, because it is
         * the trailing node or pred might be unlinked, and neither
         * pred nor s are head or offlist, add to sweepVotes, and if
         * enough votes have accumulated, sweep.
         */
        if (pred != null && pred != s && pred.next == s) {
            Node n = s.next;
            if (n == null ||
                (n != s && pred.casNext(s, n) && pred.isMatched())) {
                for (;;) {               // check if at, or could be, head
                    Node h = head;
                    if (h == pred || h == s || h == null)
                        return;          // at head or list empty
                    if (!h.isMatched())
                        break;
                    Node hn = h.next;
                    if (hn == null)
                        return;          // now empty
                    if (hn != h && casHead(h, hn))
                        h.forgetNext();  // advance head
                }
                if (pred.next != pred && s.next != s) { // recheck if offlist
                    for (;;) {           // sweep now if enough votes
                        int v = sweepVotes;
                        if (v < SWEEP_THRESHOLD) {
                            if (casSweepVotes(v, v + 1))
                                break;
                        }
                        else if (casSweepVotes(v, 0)) {
                            sweep();
                            break;
                        }
                    }
                }
            }
        }
    }

    /**
     * Unlinks matched (typically cancelled) nodes encountered in a
     * traversal from head.
     */
    private void sweep() {
        for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
            if (!s.isMatched())
                // Unmatched nodes are never self-linked
                p = s;
            else if ((n = s.next) == null) // trailing node is pinned
                break;
            else if (s == n)    // stale
                // No need to also check for p == s, since that implies s == n
                p = head;
            else
                p.casNext(s, n);
        }
    }

    /**
     * Main implementation of remove(Object)
     */
    private boolean findAndRemove(Object e) {
        if (e != null) {
            for (Node pred = null, p = head; p != null; ) {
                Object item = p.item;
                if (p.isData) {
                    if (item != null && item != p && e.equals(item) &&
                        p.tryMatchData()) {
                        unsplice(pred, p);
                        return true;
                    }
                }
                else if (item == null)
                    break;
                pred = p;
                if ((p = p.next) == pred) { // stale
                    pred = null;
                    p = head;
                }
            }
        }
        return false;
    }

    /**
     * Creates an initially empty {@code LinkedTransferQueue}.
     */
    public LinkedTransferQueue() {
    }

    /**
     * Creates a {@code LinkedTransferQueue}
     * initially containing the elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedTransferQueue(Collection<? extends E> c) {
        this();
        addAll(c);
    }

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never block.
     *
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        xfer(e, true, ASYNC, 0);
    }

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never block or
     * return {@code false}.
     *
     * @return {@code true} (as specified by
     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never throw
     * {@link IllegalStateException} or return {@code false}.
     *
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    /**
     * Transfers the element to a waiting consumer immediately, if possible.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * otherwise returning {@code false} without enqueuing the element.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean tryTransfer(E e) {
        return xfer(e, true, NOW, 0) == null;
    }

    /**
     * Transfers the element to a consumer, waiting if necessary to do so.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * else inserts the specified element at the tail of this queue
     * and waits until the element is received by a consumer.
     *
     * @throws NullPointerException if the specified element is null
     */
    public void transfer(E e) throws InterruptedException {
        if (xfer(e, true, SYNC, 0) != null) {
            Thread.interrupted(); // failure possible only due to interrupt
            throw new InterruptedException();
        }
    }

    /**
     * Transfers the element to a consumer if it is possible to do so
     * before the timeout elapses.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * else inserts the specified element at the tail of this queue
     * and waits until the element is received by a consumer,
     * returning {@code false} if the specified wait time elapses
     * before the element can be transferred.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

    public E take() throws InterruptedException {
        E e = xfer(null, false, SYNC, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return e;
        throw new InterruptedException();
    }

    public E poll() {
        return xfer(null, false, NOW, 0);
    }

    /**
     * @throws NullPointerException     {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ( (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * @throws NullPointerException     {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     *
     * <p>The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    public E peek() {
        return firstDataItem();
    }

    /**
     * Returns {@code true} if this queue contains no elements.
     *
     * @return {@code true} if this queue contains no elements
     */
    public boolean isEmpty() {
        for (Node p = head; p != null; p = succ(p)) {
            if (!p.isMatched())
                return !p.isData;
        }
        return true;
    }

    public boolean hasWaitingConsumer() {
        return firstOfMode(false) != null;
    }

    /**
     * Returns the number of elements in this queue.  If this queue
     * contains more than {@code Integer.MAX_VALUE} elements, returns
     * {@code Integer.MAX_VALUE}.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these queues, determining the current
     * number of elements requires an O(n) traversal.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        return countOfMode(true);
    }

    public int getWaitingConsumerCount() {
        return countOfMode(false);
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        return findAndRemove(o);
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        for (Node p = head; p != null; p = succ(p)) {
            Object item = p.item;
            if (p.isData) {
                if (item != null && item != p && o.equals(item))
                    return true;
            }
            else if (item == null)
                break;
        }
        return false;
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because a
     * {@code LinkedTransferQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE} (as specified by
     *         {@link BlockingQueue#remainingCapacity()})
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Saves the state to a stream (that is, serializes it).
     *
     * @serialData All of the elements (each an {@code E}) in
     * the proper order, followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        s.defaultWriteObject();
        for (E e : this)
            s.writeObject(e);
        // Use trailing null as sentinel
        s.writeObject(null);
    }

    /**
     * Reconstitutes the Queue instance from a stream (that is,
     * deserializes it).
     *
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        for (;;) {
            @SuppressWarnings("unchecked") E item = (E) s.readObject();
            if (item == null)
                break;
            else
                offer(item);
        }
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long headOffset =
        objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
    private static final long tailOffset =
        objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
    private static final long sweepVotesOffset =
        objectFieldOffset(UNSAFE, "sweepVotes", LinkedTransferQueue.class);

    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
                                  String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
}