/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.locks.LockSupport;

/**
 * An unbounded {@link TransferQueue} based on linked nodes.
 * This queue orders elements FIFO (first-in-first-out) with respect
 * to any given producer.  The <em>head</em> of the queue is that
 * element that has been on the queue the longest time for some
 * producer.  The <em>tail</em> of the queue is that element that has
 * been on the queue the shortest time for some producer.
 *
 * <p>Beware that, unlike in most collections, the {@code size}
 * method is <em>NOT</em> a constant-time operation. Because of the
 * asynchronous nature of these queues, determining the current number
 * of elements requires a traversal of the elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>Memory consistency effects: As with other concurrent
 * collections, actions in a thread prior to placing an object into a
 * {@code LinkedTransferQueue}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions subsequent to the access or removal of that element from
 * the {@code LinkedTransferQueue} in another thread.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
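 *
 * <p>As a rough usage sketch (added commentary, not part of the
 * original specification; the variable names are arbitrary), a
 * producer can hand an element directly to a consumer, with
 * {@code transfer} returning only once the element has been received:
 *
 * <pre> {@code
 * final LinkedTransferQueue<String> queue =
 *     new LinkedTransferQueue<String>();
 *
 * // Consumer: blocks in take() until a producer supplies an element.
 * new Thread(new Runnable() {
 *   public void run() {
 *     try {
 *       System.out.println("received " + queue.take());
 *     } catch (InterruptedException ie) {
 *       Thread.currentThread().interrupt();
 *     }
 *   }
 * }).start();
 *
 * // Producer: blocks until the consumer above receives "hello".
 * // (Assumes the enclosing method declares InterruptedException.)
 * queue.transfer("hello");}</pre>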
 *
 * @since 1.7
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
     * *** Overview of Dual Queues with Slack ***
     *
     * Dual Queues, introduced by Scherer and Scott
     * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are
     * (linked) queues in which nodes may represent either data or
     * requests.  When a thread tries to enqueue a data node, but
     * encounters a request node, it instead "matches" and removes it;
     * and vice versa for enqueuing requests. Blocking Dual Queues
     * arrange that threads enqueuing unmatched requests block until
     * other threads provide the match. Dual Synchronous Queues (see
     * Scherer, Lea, & Scott
     * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
     * additionally arrange that threads enqueuing unmatched data also
     * block.  Dual Transfer Queues support all of these modes, as
     * dictated by callers.
     *
     * A FIFO dual queue may be implemented using a variation of the
     * Michael & Scott (M&S) lock-free queue algorithm
     * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
     * It maintains two pointer fields, "head", pointing to a
     * (matched) node that in turn points to the first actual
     * (unmatched) queue node (or null if empty); and "tail" that
     * points to the last node on the queue (or again null if
     * empty). For example, here is a possible queue with four data
     * elements:
     *
     *  head                tail
     *    |                   |
     *    v                   v
     *    M -> U -> U -> U -> U
     *
     * The M&S queue algorithm is known to be prone to scalability and
     * overhead limitations when maintaining (via CAS) these head and
     * tail pointers. This has led to the development of
     * contention-reducing variants such as elimination arrays (see
     * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and
     * optimistic back pointers (see Ladan-Mozes & Shavit
     * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf).
     * However, the nature of dual queues enables a simpler tactic for
     * improving M&S-style implementations when dual-ness is needed.
     *
     * In a dual queue, each node must atomically maintain its match
     * status. While there are other possible variants, we implement
     * this here as: for a data-mode node, matching entails CASing an
     * "item" field from a non-null data value to null upon match, and
     * vice-versa for request nodes, CASing from null to a data
     * value.  (Note that the linearization properties of this style of
     * queue are easy to verify -- elements are made available by
     * linking, and unavailable by matching.) Compared to plain M&S
     * queues, this property of dual queues requires one additional
     * successful atomic operation per enq/deq pair. But it also
     * enables lower cost variants of queue maintenance mechanics. (A
     * variation of this idea applies even for non-dual queues that
     * support deletion of interior elements, such as
     * j.u.c.ConcurrentLinkedQueue.)
     *
     * Once a node is matched, its match status can never again
     * change.  We may thus arrange that the linked list of them
     * contain a prefix of zero or more matched nodes, followed by a
     * suffix of zero or more unmatched nodes.
     * (Note that we allow both the prefix and suffix to be zero
     * length, which in turn means that we do not use a dummy
     * header.)  If we were not concerned with either time or space
     * efficiency, we could correctly perform enqueue and dequeue
     * operations by traversing from a pointer to the initial node;
     * CASing the item of the first unmatched node on match and
     * CASing the next field of the trailing node on appends. (Plus
     * some special-casing when initially empty.)  While this would be
     * a terrible idea in itself, it does have the benefit of not
     * requiring ANY atomic updates on head/tail fields.
     *
     * We introduce here an approach that lies between the extremes of
     * never versus always updating queue (head and tail) pointers.
     * This offers a tradeoff between sometimes requiring extra
     * traversal steps to locate the first and/or last unmatched
     * nodes, versus the reduced overhead and contention of fewer
     * updates to queue pointers. For example, a possible snapshot of
     * a queue is:
     *
     *  head           tail
     *    |              |
     *    v              v
     *    M -> M -> U -> U -> U -> U
     *
     * The best value for this "slack" (the targeted maximum distance
     * between the value of "head" and the first unmatched node, and
     * similarly for "tail") is an empirical matter. We have found
     * that very small constants in the range of 1-3 work best over a
     * range of platforms. Larger values introduce increasing costs of
     * cache misses and risks of long traversal chains, while smaller
     * values increase CAS contention and overhead.
     *
     * Dual queues with slack differ from plain M&S dual queues by
     * virtue of only sometimes updating head or tail pointers when
     * matching, appending, or even traversing nodes, in order to
     * maintain a targeted slack. The idea of "sometimes" may be
     * operationalized in several ways. The simplest is to use a
     * per-operation counter incremented on each traversal step, and
     * to try (via CAS) to update the associated queue pointer
     * whenever the count exceeds a threshold. Another, that requires
     * more overhead, is to use random number generators to update
     * with a given probability per traversal step.
     *
     * In any strategy along these lines, because CASes updating
     * fields may fail, the actual slack may exceed targeted
     * slack. However, they may be retried at any time to maintain
     * targets.  Even when using very small slack values, this
     * approach works well for dual queues because it allows all
     * operations up to the point of matching or appending an item
     * (hence potentially allowing progress by another thread) to be
     * read-only, thus not introducing any further contention.  As
     * described below, we implement this by performing slack
     * maintenance retries only after these points.
     *
     * As an accompaniment to such techniques, traversal overhead can
     * be further reduced without increasing contention of head
     * pointer updates: Threads may sometimes shortcut the "next" link
     * path from the current "head" node to be closer to the currently
     * known first unmatched node, and similarly for tail. Again, this
     * may be triggered using thresholds or randomization.
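     *
     * (Added illustration -- this is a schematic sketch of the
     * counter-based variant just described, not the code actually
     * used below; SLACK is a hypothetical constant, whereas the real
     * code hard-wires a threshold of two.)  A dequeue traversal
     * maintaining slack might look like:
     *
     *   int steps = 0;
     *   for (Node h = head, p = h; p != null; p = p.next) {
     *     if (!p.isMatched()) {     // found first unmatched node
     *       if (steps >= SLACK)     // head has fallen too far behind;
     *         casHead(h, p);        //  try to catch up (a CAS failure
     *       break;                  //  is harmless and left unretried)
     *     }
     *     ++steps;
     *   }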
     *
     * These ideas must be further extended to avoid unbounded amounts
     * of costly-to-reclaim garbage caused by the sequential "next"
     * links of nodes starting at old forgotten head nodes: As first
     * described in detail by Boehm
     * (http://portal.acm.org/citation.cfm?doid=503272.503282) if a GC
     * delays noticing that any arbitrarily old node has become
     * garbage, all newer dead nodes will also be unreclaimed.
     * (Similar issues arise in non-GC environments.)  To cope with
     * this in our implementation, upon CASing to advance the head
     * pointer, we set the "next" link of the previous head to point
     * only to itself; thus limiting the length of connected dead lists.
     * (We also take similar care to wipe out possibly garbage
     * retaining values held in other Node fields.) However, doing so
     * adds some further complexity to traversal: If any "next"
     * pointer links to itself, it indicates that the current thread
     * has lagged behind a head-update, and so the traversal must
     * continue from the "head".  Traversals trying to find the
     * current tail starting from "tail" may also encounter
     * self-links, in which case they also continue at "head".
     *
     * It is tempting in a slack-based scheme to not even use CAS for
     * updates (similarly to Ladan-Mozes & Shavit). However, this
     * cannot be done for head updates under the above link-forgetting
     * mechanics because an update may leave head at a detached node.
     * And while direct writes are possible for tail updates, they
     * increase the risk of long retraversals, and hence long garbage
     * chains, which can be much more costly than is worthwhile
     * considering that the cost difference of performing a CAS vs
     * write is smaller when they are not triggered on each operation
     * (especially considering that writes and CASes equally require
     * additional GC bookkeeping ("write barriers") that are sometimes
     * more costly than the writes themselves because of contention).
     *
     * *** Overview of implementation ***
     *
     * We use a threshold-based approach to updates, with a slack
     * threshold of two -- that is, we update head/tail when the
     * current pointer appears to be two or more steps away from the
     * first/last node. The slack value is hard-wired: a path greater
     * than one is naturally implemented by checking equality of
     * traversal pointers except when the list has only one element,
     * in which case we keep slack threshold at one. Avoiding tracking
     * explicit counts across method calls slightly simplifies an
     * already-messy implementation. Using randomization would
     * probably work better if there were a low-quality dirt-cheap
     * per-thread one available, but even ThreadLocalRandom is too
     * heavy for these purposes.
     *
     * With such a small slack threshold value, it is not worthwhile
     * to augment this with path short-circuiting (i.e., unsplicing
     * interior nodes) except in the case of cancellation/removal (see
     * below).
     *
     * We allow both the head and tail fields to be null before any
     * nodes are enqueued; initializing upon first append. This
     * simplifies some other logic, as well as providing more
     * efficient explicit control paths instead of letting JVMs insert
     * implicit NullPointerExceptions when they are null.
     * While not currently fully implemented, we also leave open the
     * possibility of re-nulling these fields when empty (which is
     * complicated to arrange, for little benefit.)
     *
     * All enqueue/dequeue operations are handled by the single method
     * "xfer" with parameters indicating whether to act as some form
     * of offer, put, poll, take, or transfer (each possibly with
     * timeout).  The relative complexity of using one monolithic
     * method is outweighed by the code bulk and maintenance problems
     * of using separate methods for each case.
     *
     * Operation consists of up to three phases. The first is
     * implemented within method xfer, the second in tryAppend, and
     * the third in method awaitMatch.
     *
     * 1. Try to match an existing node
     *
     *    Starting at head, skip already-matched nodes until finding
     *    an unmatched node of opposite mode, if one exists, in which
     *    case we match it and return, if necessary also updating
     *    head to one past the matched node (or the node itself if the
     *    list has no other unmatched nodes). If the CAS misses, then
     *    a loop retries advancing head by two steps until either
     *    success or the slack is at most two. By requiring that each
     *    attempt advances head by two (if applicable), we ensure that
     *    the slack does not grow without bound. Traversals also check
     *    if the initial head is now off-list, in which case they
     *    start at the new head.
     *
     *    If no candidates are found and the call was untimed
     *    poll/offer (argument "how" is NOW), return.
     *
     * 2. Try to append a new node (method tryAppend)
     *
     *    Starting at current tail pointer, find the actual last node
     *    and try to append a new node (or if head was null, establish
     *    the first node). Nodes can be appended only if their
     *    predecessors are either already matched or are of the same
     *    mode. If we detect otherwise, then a new node with opposite
     *    mode must have been appended during traversal, so we must
     *    restart at phase 1. The traversal and update steps are
     *    otherwise similar to phase 1: Retrying upon CAS misses and
     *    checking for staleness. In particular, if a self-link is
     *    encountered, then we can safely jump to a node on the list
     *    by continuing the traversal at current head.
     *
     *    On successful append, if the call was ASYNC, return.
     *
     * 3. Await match or cancellation (method awaitMatch)
     *
     *    Wait for another thread to match node; instead cancelling if
     *    the current thread was interrupted or the wait timed out. On
     *    multiprocessors, we use front-of-queue spinning: If a node
     *    appears to be the first unmatched node in the queue, it
     *    spins a bit before blocking. In either case, before blocking
     *    it tries to unsplice any nodes between the current "head"
     *    and the first unmatched node.
     *
     *    Front-of-queue spinning vastly improves performance of
     *    heavily contended queues. And so long as it is relatively
     *    brief and "quiet", spinning does not much impact performance
     *    of less-contended queues.  During spins threads check their
     *    interrupt status and generate a thread-local random number
     *    to decide to occasionally perform a Thread.yield. While
     *    yield has underdefined specs, we assume that it might help,
     *    and will not hurt, in limiting the impact of spinning on
     *    busy systems.
     *    We also use smaller (1/2) spins for nodes that are
     *    not known to be front but whose predecessors have not
     *    blocked -- these "chained" spins avoid artifacts of
     *    front-of-queue rules which otherwise lead to alternating
     *    nodes spinning vs blocking. Further, front threads that
     *    represent phase changes (from data to request node or vice
     *    versa) compared to their predecessors receive additional
     *    chained spins, reflecting longer paths typically required to
     *    unblock threads during phase changes.
     *
     *
     * ** Unlinking removed interior nodes **
     *
     * In addition to minimizing garbage retention via self-linking
     * described above, we also unlink removed interior nodes. These
     * may arise due to timed out or interrupted waits, or calls to
     * remove(x) or Iterator.remove.  Normally, given a node that was
     * at one time known to be the predecessor of some node s that is
     * to be removed, we can unsplice s by CASing the next field of
     * its predecessor if it still points to s (otherwise s must
     * already have been removed or is now offlist). But there are two
     * situations in which we cannot guarantee to make node s
     * unreachable in this way: (1) If s is the trailing node of the
     * list (i.e., with null next), then it is pinned as the target
     * node for appends, so can only be removed later after other
     * nodes are appended. (2) We cannot necessarily unlink s given a
     * predecessor node that is matched (including the case of being
     * cancelled): the predecessor may already be unspliced, in which
     * case some previous reachable node may still point to s.
     * (For further explanation see Herlihy & Shavit "The Art of
     * Multiprocessor Programming" chapter 9).  However, in both
     * cases, we can rule out the need for further action if either s
     * or its predecessor is (or can be made to be) at, or falls off
     * from, the head of the list.
     *
     * Without taking these into account, it would be possible for an
     * unbounded number of supposedly removed nodes to remain
     * reachable. Situations leading to such buildup are uncommon but
     * can occur in practice; for example when a series of short timed
     * calls to poll repeatedly time out but never otherwise fall off
     * the list because of an untimed call to take at the front of the
     * queue.
     *
     * When these cases arise, rather than always retraversing the
     * entire list to find an actual predecessor to unlink (which
     * won't help for case (1) anyway), we record a conservative
     * estimate of possible unsplice failures (in "sweepVotes").
     * We trigger a full sweep when the estimate exceeds a threshold
     * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
     * removal failures to tolerate before sweeping through, unlinking
     * cancelled nodes that were not unlinked upon initial removal.
     * We perform sweeps by the thread hitting threshold (rather than
     * background threads or by spreading work to other threads)
     * because in the main contexts in which removal occurs, the
     * caller is already timed-out, cancelled, or performing a
     * potentially O(n) operation (e.g. remove(x)), none of which are
     * time-critical enough to warrant the overhead that alternatives
     * would impose on other threads.
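     *
     * (Added illustration of the buildup scenario above; the calls
     * shown are this class's own public methods, and "queue" names
     * an instance.)  Schematically:
     *
     *   // Consumer thread: pins the front of the queue indefinitely.
     *   E e = queue.take();
     *
     *   // Other threads, repeatedly: each call appends a request
     *   // node behind the blocked taker, then times out and
     *   // cancels it.
     *   queue.poll(10L, TimeUnit.MILLISECONDS);
     *
     * Without unsplicing and sweeping, each such cancelled node would
     * remain reachable from its predecessor.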
     *
     * Because the sweepVotes estimate is conservative, and because
     * nodes become unlinked "naturally" as they fall off the head of
     * the queue, and because we allow votes to accumulate even while
     * sweeps are in progress, there are typically significantly fewer
     * such nodes than estimated.  Choice of a threshold value
     * balances the likelihood of wasted effort and contention, versus
     * providing a worst-case bound on retention of interior nodes in
     * quiescent queues. The value defined below was chosen
     * empirically to balance these under various timeout scenarios.
     *
     * Note that we cannot self-link unlinked interior nodes during
     * sweeps. However, the associated garbage chains terminate when
     * some successor ultimately falls off the head of the list and is
     * self-linked.
     */

    /** True if on multiprocessor */
    private static final boolean MP =
        Runtime.getRuntime().availableProcessors() > 1;

    /**
     * The number of times to spin (with randomly interspersed calls
     * to Thread.yield) on multiprocessor before blocking when a node
     * is apparently the first waiter in the queue.  See above for
     * explanation. Must be a power of two. The value is empirically
     * derived -- it works pretty well across a variety of processors,
     * numbers of CPUs, and OSes.
     */
    private static final int FRONT_SPINS   = 1 << 7;

    /**
     * The number of times to spin before blocking when a node is
     * preceded by another node that is apparently spinning.  Also
     * serves as an increment to FRONT_SPINS on phase changes, and as
     * base average frequency for yielding during spins. Must be a
     * power of two.
     */
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    /**
     * The maximum number of estimated removal failures (sweepVotes)
     * to tolerate before sweeping through the queue unlinking
     * cancelled nodes that were not unlinked upon initial
     * removal. See above for explanation. The value must be at least
     * two to avoid useless sweeps when removing trailing nodes.
     */
    static final int SWEEP_THRESHOLD = 32;

    /**
     * Queue nodes. Uses Object, not E, for items to allow forgetting
     * them after use.  Relies heavily on Unsafe mechanics to minimize
     * unnecessary ordering constraints: Writes that are intrinsically
     * ordered wrt other accesses or CASes use simple relaxed forms.
     */
    static final class Node {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;
        volatile Thread waiter; // null until waiting

        // CAS methods for fields
        final boolean casNext(Node cmp, Node val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        final boolean casItem(Object cmp, Object val) {
            // assert cmp == null || cmp.getClass() != Node.class;
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        /**
         * Constructs a new node. Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(Object item, boolean isData) {
            UNSAFE.putObject(this, itemOffset, item); // relaxed write
            this.isData = isData;
        }

        /**
         * Links node to itself to avoid garbage retention. Called
         * only after CASing head field, so uses relaxed write.
         */
        final void forgetNext() {
            UNSAFE.putObject(this, nextOffset, this);
        }

        /**
         * Sets item to self and waiter to null, to avoid garbage
         * retention after matching or cancelling. Uses relaxed writes
         * because order is already constrained in the only calling
         * contexts: item is forgotten only after volatile/atomic
         * mechanics that extract items.  Similarly, clearing waiter
         * follows either CAS or return from park (if ever parked;
         * else we don't care).
         */
        final void forgetContents() {
            UNSAFE.putObject(this, itemOffset, this);
            UNSAFE.putObject(this, waiterOffset, null);
        }

        /**
         * Returns true if this node has been matched, including the
         * case of artificial matches due to cancellation.
         */
        final boolean isMatched() {
            Object x = item;
            return (x == this) || ((x == null) == isData);
        }

        /**
         * Returns true if this is an unmatched request node.
         */
        final boolean isUnmatchedRequest() {
            return !isData && item == null;
        }

        /**
         * Returns true if a node with the given mode cannot be
         * appended to this node because this node is unmatched and
         * has opposite data mode.
         */
        final boolean cannotPrecede(boolean haveData) {
            boolean d = isData;
            Object x;
            return d != haveData && (x = item) != this && (x != null) == d;
        }

        /**
         * Tries to artificially match a data node -- used by remove.
         */
        final boolean tryMatchData() {
            // assert isData;
            Object x = item;
            if (x != null && x != this && casItem(x, null)) {
                LockSupport.unpark(waiter);
                return true;
            }
            return false;
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
        private static final long nextOffset =
            objectFieldOffset(UNSAFE, "next", Node.class);
        private static final long itemOffset =
            objectFieldOffset(UNSAFE, "item", Node.class);
        private static final long waiterOffset =
            objectFieldOffset(UNSAFE, "waiter", Node.class);

        private static final long serialVersionUID = -3375979862319811754L;
    }

    /** head of the queue; null until first enqueue */
    transient volatile Node head;

    /** tail of the queue; null until first append */
    private transient volatile Node tail;

    /** The number of apparent failures to unsplice removed nodes */
    private transient volatile int sweepVotes;

    // CAS methods for fields
    private boolean casTail(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
    }

    private boolean casHead(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
    }

    private boolean casSweepVotes(int cmp, int val) {
        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
    }

    /*
     * Possible values for "how" argument in xfer method.
     */
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC  = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll, tryTransfer

    @SuppressWarnings("unchecked")
    static <E> E cast(Object item) {
        // assert item == null || item.getClass() != Node.class;
        return (E) item;
    }

    /**
     * Implements all queuing methods. See above for explanation.
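     *
     * <p>(Added summary for orientation; each row restates the call
     * made by the corresponding public method defined later in this
     * file.)
     *
     * <pre>
     *  poll()                 xfer(null, false, NOW,   0)
     *  take()                 xfer(null, false, SYNC,  0)
     *  poll(timeout, unit)    xfer(null, false, TIMED, nanos)
     *  offer(e), put(e)       xfer(e,    true,  ASYNC, 0)
     *  tryTransfer(e)         xfer(e,    true,  NOW,   0)
     *  transfer(e)            xfer(e,    true,  SYNC,  0)
     *  tryTransfer(e, t, u)   xfer(e,    true,  TIMED, nanos)
     * </pre>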
     *
     * @param e the item or null for take
     * @param haveData true if this is a put, else a take
     * @param how NOW, ASYNC, SYNC, or TIMED
     * @param nanos timeout in nanosecs, used only if mode is TIMED
     * @return an item if matched, else e
     * @throws NullPointerException if haveData mode but e is null
     */
    private E xfer(E e, boolean haveData, int how, long nanos) {
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed

        retry:
        for (;;) {                            // restart on append race

            for (Node h = head, p = h; p != null;) { // find & match first node
                boolean isData = p.isData;
                Object item = p.item;
                if (item != p && (item != null) == isData) { // unmatched
                    if (isData == haveData)   // can't match
                        break;
                    if (p.casItem(item, e)) { // match
                        for (Node q = p; q != h;) {
                            Node n = q.next;  // update by 2 unless singleton
                            if (head == h && casHead(h, n == null ? q : n)) {
                                h.forgetNext();
                                break;
                            }                 // advance and retry
                            if ((h = head) == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                        }
                        LockSupport.unpark(p.waiter);
                        return this.<E>cast(item);
                    }
                }
                Node n = p.next;
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }

            if (how != NOW) {                 // No matches available
                if (s == null)
                    s = new Node(e, haveData);
                Node pred = tryAppend(s, haveData);
                if (pred == null)
                    continue retry;           // lost race vs opposite mode
                if (how != ASYNC)
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }

    /**
     * Tries to append node s as tail.
     *
     * @param s the node to append
     * @param haveData true if appending in data mode
     * @return null on failure due to losing race with append in
     * different mode, else s's predecessor, or s itself if no
     * predecessor
     */
    private Node tryAppend(Node s, boolean haveData) {
        for (Node t = tail, p = t;;) {        // move p to last node and append
            Node n, u;                        // temps for reads of next & tail
            if (p == null && (p = head) == null) {
                if (casHead(null, s))
                    return s;                 // initialize
            }
            else if (p.cannotPrecede(haveData))
                return null;                  // lost race vs opposite mode
            else if ((n = p.next) != null)    // not last; keep traversing
                p = p != t && t != (u = tail) ? (t = u) : // stale tail
                    (p != n) ? n : null;      // restart if off list
            else if (!p.casNext(null, s))
                p = p.next;                   // re-read on CAS failure
            else {
                if (p != t) {                 // update if slack now >= 2
                    while ((tail != t || !casTail(t, s)) &&
                           (t = tail)   != null &&
                           (s = t.next) != null && // advance and retry
                           (s = s.next) != null && s != t);
                }
                return p;
            }
        }
    }

    /**
     * Spins/yields/blocks until node s is matched or caller gives up.
     *
     * @param s the waiting node
     * @param pred the predecessor of s, or s itself if it has no
     * predecessor, or null if unknown (the null case does not occur
     * in any current calls but may in possible future extensions)
     * @param e the comparison value for checking match
     * @param timed if true, wait only until timeout elapses
     * @param nanos timeout in nanosecs, used only if timed is true
     * @return matched item, or e if unmatched on interrupt or timeout
     */
    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
        long lastTime = timed ? System.nanoTime() : 0L;
        Thread w = Thread.currentThread();
        int spins = -1; // initialized after first item and cancel checks
        ThreadLocalRandom randomYields = null; // bound if needed

        for (;;) {
            Object item = s.item;
            if (item != e) {                  // matched
                // assert item != s;
                s.forgetContents();           // avoid garbage
                return this.<E>cast(item);
            }
            if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                    s.casItem(e, s)) {        // cancel
                unsplice(pred, s);
                return e;
            }

            if (spins < 0) {                  // establish spins at/near front
                if ((spins = spinsFor(pred, s.isData)) > 0)
                    randomYields = ThreadLocalRandom.current();
            }
            else if (spins > 0) {             // spin
                --spins;
                if (randomYields.nextInt(CHAINED_SPINS) == 0)
                    Thread.yield();           // occasionally yield
            }
            else if (s.waiter == null) {
                s.waiter = w;                 // request unpark then recheck
            }
            else if (timed) {
                long now = System.nanoTime();
                if ((nanos -= now - lastTime) > 0)
                    LockSupport.parkNanos(this, nanos);
                lastTime = now;
            }
            else {
                LockSupport.park(this);
            }
        }
    }

    /**
     * Returns spin/yield value for a node with given predecessor and
     * data mode. See above for explanation.
     */
    private static int spinsFor(Node pred, boolean haveData) {
        if (MP && pred != null) {
            if (pred.isData != haveData)      // phase change
                return FRONT_SPINS + CHAINED_SPINS;
            if (pred.isMatched())             // probably at front
                return FRONT_SPINS;
            if (pred.waiter == null)          // pred apparently spinning
                return CHAINED_SPINS;
        }
        return 0;
    }

    /* -------------- Traversal methods -------------- */

    /**
     * Returns the successor of p, or the head node if p.next has been
     * linked to self, which will only be true if traversing with a
     * stale pointer that is now off the list.
     */
    final Node succ(Node p) {
        Node next = p.next;
        return (p == next) ? head : next;
    }

    /**
     * Returns the first unmatched node of the given mode, or null if
     * none. Used by methods isEmpty, hasWaitingConsumer.
     */
    private Node firstOfMode(boolean isData) {
        for (Node p = head; p != null; p = succ(p)) {
            if (!p.isMatched())
                return (p.isData == isData) ? p : null;
        }
        return null;
    }

    /**
     * Returns the item in the first unmatched node with isData; or
     * null if none. Used by peek.
     */
    private E firstDataItem() {
        for (Node p = head; p != null; p = succ(p)) {
            Object item = p.item;
            if (p.isData) {
                if (item != null && item != p)
                    return this.<E>cast(item);
            }
            else if (item == null)
                return null;
        }
        return null;
    }

    /**
     * Traverses and counts unmatched nodes of the given mode.
     * Used by methods size and getWaitingConsumerCount.
     */
    private int countOfMode(boolean data) {
        int count = 0;
        for (Node p = head; p != null; ) {
            if (!p.isMatched()) {
                if (p.isData != data)
                    return 0;
                if (++count == Integer.MAX_VALUE) // saturated
                    break;
            }
            Node n = p.next;
            if (n != p)
                p = n;
            else {
                count = 0;
                p = head;
            }
        }
        return count;
    }

    final class Itr implements Iterator<E> {
        private Node nextNode;   // next node to return item for
        private E nextItem;      // the corresponding item
        private Node lastRet;    // last returned node, to support remove
        private Node lastPred;   // predecessor to unlink lastRet

        /**
         * Moves to next node after prev, or first node if prev null.
         */
        private void advance(Node prev) {
            lastPred = lastRet;
            lastRet = prev;
            for (Node p = (prev == null) ? head : succ(prev);
                 p != null; p = succ(p)) {
                Object item = p.item;
                if (p.isData) {
                    if (item != null && item != p) {
                        nextItem = LinkedTransferQueue.this.<E>cast(item);
                        nextNode = p;
                        return;
                    }
                }
                else if (item == null)
                    break;
            }
            nextNode = null;
        }

        Itr() {
            advance(null);
        }

        public final boolean hasNext() {
            return nextNode != null;
        }

        public final E next() {
            Node p = nextNode;
            if (p == null) throw new NoSuchElementException();
            E e = nextItem;
            advance(p);
            return e;
        }

        public final void remove() {
            Node p = lastRet;
            if (p == null) throw new IllegalStateException();
            if (p.tryMatchData())
                unsplice(lastPred, p);
        }
    }

    /* -------------- Removal methods -------------- */

    /**
     * Unsplices (now or later) the given deleted/cancelled node with
     * the given predecessor.
     *
     * @param pred a node that was at one time known to be the
     * predecessor of s, or null or s itself if s is/was at head
     * @param s the node to be unspliced
     */
    final void unsplice(Node pred, Node s) {
        s.forgetContents(); // forget unneeded fields
        /*
         * See above for rationale. Briefly: if pred still points to
         * s, try to unlink s.  If s cannot be unlinked, because it is
         * the trailing node or because pred might be unlinked, and
         * neither pred nor s is head or offlist, add to sweepVotes,
         * and if enough votes have accumulated, sweep.
         */
        if (pred != null && pred != s && pred.next == s) {
            Node n = s.next;
            if (n == null ||
                (n != s && pred.casNext(s, n) && pred.isMatched())) {
                for (;;) {               // check if at, or could be, head
                    Node h = head;
                    if (h == pred || h == s || h == null)
                        return;          // at head or list empty
                    if (!h.isMatched())
                        break;
                    Node hn = h.next;
                    if (hn == null)
                        return;          // now empty
                    if (hn != h && casHead(h, hn))
                        h.forgetNext();  // advance head
                }
                if (pred.next != pred && s.next != s) { // recheck if offlist
                    for (;;) {           // sweep now if enough votes
                        int v = sweepVotes;
                        if (v < SWEEP_THRESHOLD) {
                            if (casSweepVotes(v, v + 1))
                                break;
                        }
                        else if (casSweepVotes(v, 0)) {
                            sweep();
                            break;
                        }
                    }
                }
            }
        }
    }

    /**
     * Unlinks matched (typically cancelled) nodes encountered in a
     * traversal from head.
     */
    private void sweep() {
        for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
            if (!s.isMatched())
                // Unmatched nodes are never self-linked
                p = s;
            else if ((n = s.next) == null) // trailing node is pinned
                break;
            else if (s == n)    // stale
                // No need to also check for p == s, since that implies s == n
                p = head;
            else
                p.casNext(s, n);
        }
    }

    /**
     * Main implementation of remove(Object).
     */
    private boolean findAndRemove(Object e) {
        if (e != null) {
            for (Node pred = null, p = head; p != null; ) {
                Object item = p.item;
                if (p.isData) {
                    if (item != null && item != p && e.equals(item) &&
                        p.tryMatchData()) {
                        unsplice(pred, p);
                        return true;
                    }
                }
                else if (item == null)
                    break;
                pred = p;
                if ((p = p.next) == pred) { // stale
                    pred = null;
                    p = head;
                }
            }
        }
        return false;
    }

    /**
     * Creates an initially empty {@code LinkedTransferQueue}.
     */
    public LinkedTransferQueue() {
    }

    /**
     * Creates a {@code LinkedTransferQueue}
     * initially containing the elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedTransferQueue(Collection<? extends E> c) {
        this();
        addAll(c);
    }

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never block.
     *
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        xfer(e, true, ASYNC, 0);
    }

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never block or
     * return {@code false}.
     *
     * @return {@code true} (as specified by
     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by
     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never throw
     * {@link IllegalStateException} or return {@code false}.
     *
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    /**
     * Transfers the element to a waiting consumer immediately, if possible.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * otherwise returning {@code false} without enqueuing the element.
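     *
     * <p>(Added usage sketch; the names are illustrative.)  A
     * producer that must not enqueue when no consumer is ready can
     * first probe for a waiting taker:
     *
     * <pre> {@code
     * if (!queue.tryTransfer(task)) {
     *   // No consumer was waiting in take() or timed poll(), and
     *   // task was not enqueued; fall back to some other policy,
     *   // e.g. running it in the producing thread (hypothetical):
     *   runLocally(task);
     * }}</pre>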
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean tryTransfer(E e) {
        return xfer(e, true, NOW, 0) == null;
    }

    /**
     * Transfers the element to a consumer, waiting if necessary to do so.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * else inserts the specified element at the tail of this queue
     * and waits until the element is received by a consumer.
     *
     * @throws NullPointerException if the specified element is null
     */
    public void transfer(E e) throws InterruptedException {
        if (xfer(e, true, SYNC, 0) != null) {
            Thread.interrupted(); // failure possible only due to interrupt
            throw new InterruptedException();
        }
    }

    /**
     * Transfers the element to a consumer if it is possible to do so
     * before the timeout elapses.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * else inserts the specified element at the tail of this queue
     * and waits until the element is received by a consumer,
     * returning {@code false} if the specified wait time elapses
     * before the element can be transferred.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

    public E take() throws InterruptedException {
        E e = xfer(null, false, SYNC, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return e;
        throw new InterruptedException();
    }

    public E poll() {
        return xfer(null, false, NOW, 0);
    }

    /**
     * @throws NullPointerException     {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ((e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * @throws NullPointerException     {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * Returns an iterator over the elements in this queue in proper
     * sequence, from head to tail.
     *
     * <p>The returned iterator is a "weakly consistent" iterator that
     * will never throw
     * {@link ConcurrentModificationException ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed
     * to) reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    public E peek() {
        return firstDataItem();
    }

    /**
     * Returns {@code true} if this queue contains no elements.
     *
     * @return {@code true} if this queue contains no elements
     */
    public boolean isEmpty() {
        for (Node p = head; p != null; p = succ(p)) {
            if (!p.isMatched())
                return !p.isData;
        }
        return true;
    }

    public boolean hasWaitingConsumer() {
        return firstOfMode(false) != null;
    }

    /**
     * Returns the number of elements in this queue.  If this queue
     * contains more than {@code Integer.MAX_VALUE} elements, returns
     * {@code Integer.MAX_VALUE}.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these queues, determining the current
     * number of elements requires an O(n) traversal.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        return countOfMode(true);
    }

    public int getWaitingConsumerCount() {
        return countOfMode(false);
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        return findAndRemove(o);
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because a
     * {@code LinkedTransferQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE} (as specified by
     *         {@link BlockingQueue#remainingCapacity()})
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Saves the state to a stream (that is, serializes it).
     *
     * @serialData All of the elements (each an {@code E}) in
     * the proper order, followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        s.defaultWriteObject();
        for (E e : this)
            s.writeObject(e);
        // Use trailing null as sentinel
        s.writeObject(null);
    }

    /**
     * Reconstitutes the Queue instance from a stream (that is,
     * deserializes it).
     *
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        for (;;) {
            @SuppressWarnings("unchecked") E item = (E) s.readObject();
            if (item == null)
                break;
            else
                offer(item);
        }
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long headOffset =
        objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
    private static final long tailOffset =
        objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
    private static final long sweepVotesOffset =
        objectFieldOffset(UNSAFE, "sweepVotes", LinkedTransferQueue.class);

    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
                                  String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
}