292 * this call was ASYNC (e.g. offer), an element was
293 * successfully added to the end of the queue and we return.
294 *
295 * Of course, this naive traversal is O(n) when no match is
296 * possible. We optimize the traversal by maintaining a tail
297 * pointer, which is expected to be "near" the end of the list.
298 * It is only safe to fast-forward to tail (in the presence of
299 * arbitrary concurrent changes) if it is pointing to a node of
300 * the same mode, even if it is dead (in this case no preceding
301 * node could still be matchable by this traversal). If we
302 * need to restart due to falling off-list, we can again
303 * fast-forward to tail, but only if it has changed since the
304 * last traversal (else we might loop forever). If tail cannot
305 * be used, traversal starts at head (but in this case we
306 * expect to be able to match near head). As with head, we
307 * CAS-advance the tail pointer by at least two hops.
308 *
309 * 2. Await match or cancellation (method awaitMatch)
310 *
311          * Wait for another thread to match the node, cancelling instead
312          * if the current thread was interrupted or the wait timed out. On
313 * multiprocessors, we use front-of-queue spinning: If a node
314 * appears to be the first unmatched node in the queue, it
315 * spins a bit before blocking. In either case, before blocking
316 * it tries to unsplice any nodes between the current "head"
317 * and the first unmatched node.
318 *
319 * Front-of-queue spinning vastly improves performance of
320 * heavily contended queues. And so long as it is relatively
321 * brief and "quiet", spinning does not much impact performance
322 * of less-contended queues. During spins threads check their
323 * interrupt status and generate a thread-local random number
324 * to decide to occasionally perform a Thread.yield. While
325 * yield has underdefined specs, we assume that it might help,
326 * and will not hurt, in limiting impact of spinning on busy
327 * systems. We also use smaller (1/2) spins for nodes that are
328 * not known to be front but whose predecessors have not
329 * blocked -- these "chained" spins avoid artifacts of
330 * front-of-queue rules which otherwise lead to alternating
331 * nodes spinning vs blocking. Further, front threads that
332 * represent phase changes (from data to request node or vice
333 * versa) compared to their predecessors receive additional
334 * chained spins, reflecting longer paths typically required to
335 * unblock threads during phase changes.
336 *
337 *
338 * ** Unlinking removed interior nodes **
339 *
340 * In addition to minimizing garbage retention via self-linking
341 * described above, we also unlink removed interior nodes. These
342 * may arise due to timed out or interrupted waits, or calls to
343 * remove(x) or Iterator.remove. Normally, given a node that was
344 * at one time known to be the predecessor of some node s that is
345 * to be removed, we can unsplice s by CASing the next field of
346 * its predecessor if it still points to s (otherwise s must
347 * already have been removed or is now offlist). But there are two
348 * situations in which we cannot guarantee to make node s
349 * unreachable in this way: (1) If s is the trailing node of list
350 * (i.e., with null next), then it is pinned as the target node
351 * for appends, so can only be removed later after other nodes are
352 * appended. (2) We cannot necessarily unlink s given a
353 * predecessor node that is matched (including the case of being
354 * cancelled): the predecessor may already be unspliced, in which
355 * case some previous reachable node may still point to s.
356 * (For further explanation see Herlihy & Shavit "The Art of
357 * Multiprocessor Programming" chapter 9). Although, in both
358 * cases, we can rule out the need for further action if either s
359          * or its predecessor is (or can be made to be) at, or falls off
360          * from, the head of the list.
361 *
362 * Without taking these into account, it would be possible for an
363 * unbounded number of supposedly removed nodes to remain reachable.
364 * Situations leading to such buildup are uncommon but can occur
365 * in practice; for example when a series of short timed calls to
366 * poll repeatedly time out at the trailing node but otherwise
367 * never fall off the list because of an untimed call to take() at
368 * the front of the queue.
369 *
370 * When these cases arise, rather than always retraversing the
371 * entire list to find an actual predecessor to unlink (which
372 * won't help for case (1) anyway), we record a conservative
373 * estimate of possible unsplice failures (in "sweepVotes").
374 * We trigger a full sweep when the estimate exceeds a threshold
375 * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
376 * removal failures to tolerate before sweeping through, unlinking
377 * cancelled nodes that were not unlinked upon initial removal.
378 * We perform sweeps by the thread hitting threshold (rather than
379 * background threads or by spreading work to other threads)
380 * because in the main contexts in which removal occurs, the
381 * caller is timed-out or cancelled, which are not time-critical
382 * enough to warrant the overhead that alternatives would impose
383 * on other threads.
384 *
385 * Because the sweepVotes estimate is conservative, and because
386 * nodes become unlinked "naturally" as they fall off the head of
387 * the queue, and because we allow votes to accumulate even while
388 * sweeps are in progress, there are typically significantly fewer
389 * such nodes than estimated. Choice of a threshold value
390 * balances the likelihood of wasted effort and contention, versus
391 * providing a worst-case bound on retention of interior nodes in
392 * quiescent queues. The value defined below was chosen
393 * empirically to balance these under various timeout scenarios.
394 *
395 * Because traversal operations on the linked list of nodes are a
396 * natural opportunity to sweep dead nodes, we generally do so,
397 * including all the operations that might remove elements as they
398 * traverse, such as removeIf and Iterator.remove. This largely
399 * eliminates long chains of dead interior nodes, except from
400 * cancelled or timed out blocking operations.
401 *
402 * Note that we cannot self-link unlinked interior nodes during
403 * sweeps. However, the associated garbage chains terminate when
404 * some successor ultimately falls off the head of the list and is
405 * self-linked.
406 */
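The mode/state encoding that the comment above relies on throughout — a node is matched exactly when `isData == (item == null)` — can be checked in isolation. A minimal, hypothetical sketch (not the real `Node` class):

```java
// Minimal sketch of the matched-state encoding described above:
// a node is matched exactly when isData == (item == null).
public class MatchedEncodingDemo {
    static boolean isMatched(boolean isData, Object item) {
        return isData == (item == null);
    }

    public static void main(String[] args) {
        // Live data node: holds a non-null item until a consumer takes it.
        if (isMatched(true, "payload")) throw new AssertionError();
        // Matched data node: item was CASed to null by the matching consumer.
        if (!isMatched(true, null)) throw new AssertionError();
        // Live request node: item is null until a producer fulfills it.
        if (isMatched(false, null)) throw new AssertionError();
        // Matched request node: item was CASed to the transferred value.
        if (!isMatched(false, "payload")) throw new AssertionError();
    }
}
```

The same test covers cancellation: a cancelled data node has its item CASed to null, and a cancelled request node has its item CASed to the node itself (non-null), so both read as matched.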
407
408 /** True if on multiprocessor */
409 private static final boolean MP =
410 Runtime.getRuntime().availableProcessors() > 1;
411
412 /**
413 * The number of times to spin (with randomly interspersed calls
414 * to Thread.yield) on multiprocessor before blocking when a node
415 * is apparently the first waiter in the queue. See above for
416 * explanation. Must be a power of two. The value is empirically
417 * derived -- it works pretty well across a variety of processors,
418 * numbers of CPUs, and OSes.
419 */
420 private static final int FRONT_SPINS = 1 << 7;
421
422 /**
423 * The number of times to spin before blocking when a node is
424 * preceded by another node that is apparently spinning. Also
425 * serves as an increment to FRONT_SPINS on phase changes, and as
426 * base average frequency for yielding during spins. Must be a
427 * power of two.
428 */
429 private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
430
431 /**
432 * The maximum number of estimated removal failures (sweepVotes)
433 * to tolerate before sweeping through the queue unlinking
434 * cancelled nodes that were not unlinked upon initial
435 * removal. See above for explanation. The value must be at least
436 * two to avoid useless sweeps when removing trailing nodes.
437 */
438 static final int SWEEP_THRESHOLD = 32;
439
440 /**
441 * Queue nodes. Uses Object, not E, for items to allow forgetting
442 * them after use. Writes that are intrinsically ordered wrt
443 * other accesses or CASes use simple relaxed forms.
444 */
445 static final class Node {
446 final boolean isData; // false if this is a request node
447 volatile Object item; // initially non-null if isData; CASed to match
448 volatile Node next;
449 volatile Thread waiter; // null when not waiting for a match
450
451 /**
452 * Constructs a data node holding item if item is non-null,
453 * else a request node. Uses relaxed write because item can
454 * only be seen after piggy-backing publication via CAS.
455 */
456 Node(Object item) {
457 ITEM.set(this, item);
458 isData = (item != null);
459 }
460
461 /** Constructs a (matched data) dummy node. */
462 Node() {
463 isData = true;
464 }
465
470
471 final boolean casItem(Object cmp, Object val) {
472 // assert isData == (cmp != null);
473 // assert isData == (val == null);
474 // assert !(cmp instanceof Node);
475 return ITEM.compareAndSet(this, cmp, val);
476 }
477
478 /**
479 * Links node to itself to avoid garbage retention. Called
480 * only after CASing head field, so uses relaxed write.
481 */
482 final void selfLink() {
483 // assert isMatched();
484 NEXT.setRelease(this, this);
485 }
486
487 final void appendRelaxed(Node next) {
488 // assert next != null;
489 // assert this.next == null;
490 NEXT.set(this, next);
491 }
492
493 /**
494 * Sets item (of a request node) to self and waiter to null,
495 * to avoid garbage retention after matching or cancelling.
496 * Uses relaxed writes because order is already constrained in
497 * the only calling contexts: item is forgotten only after
498 * volatile/atomic mechanics that extract items, and visitors
499 * of request nodes only ever check whether item is null.
500 * Similarly, clearing waiter follows either CAS or return
501 * from park (if ever parked; else we don't care).
502 */
503 final void forgetContents() {
504 // assert isMatched();
505 if (!isData)
506 ITEM.set(this, this);
507 WAITER.set(this, null);
508 }
509
510 /**
511 * Returns true if this node has been matched, including the
512 * case of artificial matches due to cancellation.
513 */
514 final boolean isMatched() {
515 return isData == (item == null);
516 }
517
518 /** Tries to CAS-match this node; if successful, wakes waiter. */
519 final boolean tryMatch(Object cmp, Object val) {
520 if (casItem(cmp, val)) {
521 LockSupport.unpark(waiter);
522 return true;
523 }
524 return false;
525 }
526
527 /**
528 * Returns true if a node with the given mode cannot be
529 * appended to this node because this node is unmatched and
530 * has opposite data mode.
531 */
532 final boolean cannotPrecede(boolean haveData) {
533 boolean d = isData;
534 return d != haveData && d != (item == null);
535 }
536
537 private static final long serialVersionUID = -3375979862319811754L;
538 }
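`Node.cannotPrecede` above refuses an append only when this node is both of the opposite mode and still unmatched. A standalone mirror of that predicate, using the same `d != (item == null)` liveness test:

```java
// Standalone mirror of Node.cannotPrecede: appending a node of mode
// haveData is refused only when this node has the opposite mode AND is
// still unmatched (d != (item == null) means "live").
public class CannotPrecedeDemo {
    static boolean cannotPrecede(boolean d, Object item, boolean haveData) {
        return d != haveData && d != (item == null);
    }

    public static void main(String[] args) {
        // A live data node (non-null item) refuses a request append...
        if (!cannotPrecede(true, "x", false)) throw new AssertionError();
        // ...but a matched data node (null item) does not block it.
        if (cannotPrecede(true, null, false)) throw new AssertionError();
        // Same-mode appends are always allowed.
        if (cannotPrecede(true, "x", true)) throw new AssertionError();
    }
}
```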
539
540 /**
541 * A node from which the first live (non-matched) node (if any)
542 * can be reached in O(1) time.
543 * Invariants:
544 * - all live nodes are reachable from head via .next
545 * - head != null
546 * - (tmp = head).next != tmp || tmp != head
547 * Non-invariants:
548 * - head may or may not be live
549 * - it is permitted for tail to lag behind head, that is, for tail
550 * to not be reachable from head!
551 */
552 transient volatile Node head;
553
554 /**
555 * A node from which the last node on list (that is, the unique
556 * node with node.next == null) can be reached in O(1) time.
557 * Invariants:
558 * - the last node is always reachable from tail via .next
559 * - tail != null
560 * Non-invariants:
561 * - tail may or may not be live
562 * - it is permitted for tail to lag behind head, that is, for tail
563 * to not be reachable from head!
564 * - tail.next may or may not be self-linked.
565 */
566 private transient volatile Node tail;
567
568 /** The number of apparent failures to unsplice cancelled nodes */
569 private transient volatile int sweepVotes;
570
571 private boolean casTail(Node cmp, Node val) {
572 // assert cmp != null;
573 // assert val != null;
574 return TAIL.compareAndSet(this, cmp, val);
575 }
576
577 private boolean casHead(Node cmp, Node val) {
578 return HEAD.compareAndSet(this, cmp, val);
579 }
580
581 /** Atomic version of ++sweepVotes. */
582 private int incSweepVotes() {
583 return (int) SWEEPVOTES.getAndAdd(this, 1) + 1;
584 }
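`incSweepVotes` pairs `VarHandle.getAndAdd` with the power-of-two `SWEEP_THRESHOLD`, so the mask test `(votes & (SWEEP_THRESHOLD - 1)) == 0` used later in `unsplice` fires exactly once per threshold crossing. A self-contained illustration (class and field names are illustrative, not the real queue):

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Illustrative sketch: atomic vote counting via VarHandle.getAndAdd plus
// the power-of-two mask trick that triggers a sweep every N votes.
public class SweepVotesDemo {
    static final int SWEEP_THRESHOLD = 32;     // must be a power of two
    private volatile int sweepVotes;
    private static final VarHandle SWEEPVOTES;
    static {
        try {
            SWEEPVOTES = MethodHandles.lookup()
                .findVarHandle(SweepVotesDemo.class, "sweepVotes", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Atomic version of ++sweepVotes; returns the incremented value. */
    int incSweepVotes() {
        return (int) SWEEPVOTES.getAndAdd(this, 1) + 1;
    }

    public static void main(String[] args) {
        SweepVotesDemo d = new SweepVotesDemo();
        int sweeps = 0;
        for (int i = 0; i < 64; i++)
            if ((d.incSweepVotes() & (SWEEP_THRESHOLD - 1)) == 0)
                sweeps++;
        if (sweeps != 2) throw new AssertionError(); // fires at votes 32 and 64
    }
}
```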
585
586 /**
587 * Tries to CAS pred.next (or head, if pred is null) from c to p.
588 * Caller must ensure that we're not unlinking the trailing node.
589 */
590 private boolean tryCasSuccessor(Node pred, Node c, Node p) {
591 // assert p != null;
592 // assert c.isData != (c.item != null);
593 // assert c != p;
594 if (pred != null)
595 return pred.casNext(c, p);
596 if (casHead(c, p)) {
597 c.selfLink();
598 return true;
599 }
600 return false;
601 }
602
603 /**
604 * Collapses dead (matched) nodes between pred and q.
605 * @param pred the last known live node, or null if none
672 if (h == null) h = head;
673 if (p.tryMatch(item, e)) {
674 if (h != p) skipDeadNodesNearHead(h, p);
675 return (E) item;
676 }
677 }
678 if ((q = p.next) == null) {
679 if (how == NOW) return e;
680 if (s == null) s = new Node(e);
681 if (!p.casNext(null, s)) continue;
682 if (p != t) casTail(t, s);
683 if (how == ASYNC) return e;
684 return awaitMatch(s, p, e, (how == TIMED), nanos);
685 }
686 if (p == (p = q)) continue restart;
687 }
688 }
689 }
690
691 /**
692 * Spins/yields/blocks until node s is matched or caller gives up.
693 *
694 * @param s the waiting node
695 * @param pred the predecessor of s, or null if unknown (the null
696 * case does not occur in any current calls but may in possible
697 * future extensions)
698 * @param e the comparison value for checking match
699 * @param timed if true, wait only until timeout elapses
700 * @param nanos timeout in nanosecs, used only if timed is true
701 * @return matched item, or e if unmatched on interrupt or timeout
702 */
703 private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
704 final long deadline = timed ? System.nanoTime() + nanos : 0L;
705 Thread w = Thread.currentThread();
706 int spins = -1; // initialized after first item and cancel checks
707 ThreadLocalRandom randomYields = null; // bound if needed
708
709 for (;;) {
710 final Object item;
711 if ((item = s.item) != e) { // matched
712 // assert item != s;
713 s.forgetContents(); // avoid garbage
714 @SuppressWarnings("unchecked") E itemE = (E) item;
715 return itemE;
716 }
717 else if (w.isInterrupted() || (timed && nanos <= 0L)) {
718 // try to cancel and unlink
719 if (s.casItem(e, s.isData ? null : s)) {
720 unsplice(pred, s);
721 return e;
722 }
723 // return normally if lost CAS
724 }
725 else if (spins < 0) { // establish spins at/near front
726 if ((spins = spinsFor(pred, s.isData)) > 0)
727 randomYields = ThreadLocalRandom.current();
728 }
729 else if (spins > 0) { // spin
730 --spins;
731 if (randomYields.nextInt(CHAINED_SPINS) == 0)
732 Thread.yield(); // occasionally yield
733 }
734 else if (s.waiter == null) {
735 s.waiter = w; // request unpark then recheck
736 }
737 else if (timed) {
738 nanos = deadline - System.nanoTime();
739 if (nanos > 0L)
740 LockSupport.parkNanos(this, nanos);
741 }
742 else {
743 LockSupport.park(this);
744 }
745 }
746 }
747
748 /**
749 * Returns spin/yield value for a node with given predecessor and
750 * data mode. See above for explanation.
751 */
752 private static int spinsFor(Node pred, boolean haveData) {
753 if (MP && pred != null) {
754 if (pred.isData != haveData) // phase change
755 return FRONT_SPINS + CHAINED_SPINS;
756 if (pred.isMatched()) // probably at front
757 return FRONT_SPINS;
758 if (pred.waiter == null) // pred apparently spinning
759 return CHAINED_SPINS;
760 }
761 return 0;
762 }
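The spin budgets chosen by `spinsFor` form a small decision table. A standalone restatement, assuming a multiprocessor (`MP == true`) and a non-null predecessor, with the predecessor's state flattened into booleans for illustration:

```java
// Standalone restatement of the spinsFor decision table above, assuming
// MP == true and a non-null predecessor.
public class SpinsForDemo {
    static final int FRONT_SPINS   = 1 << 7;
    static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    static int spinsFor(boolean predIsData, boolean predMatched,
                        boolean predWaiterNull, boolean haveData) {
        if (predIsData != haveData)          // phase change
            return FRONT_SPINS + CHAINED_SPINS;
        if (predMatched)                     // probably at front
            return FRONT_SPINS;
        if (predWaiterNull)                  // pred apparently spinning
            return CHAINED_SPINS;
        return 0;                            // pred blocked; just park
    }

    public static void main(String[] args) {
        // Data -> request phase change gets the largest budget.
        if (spinsFor(true, false, false, false) != FRONT_SPINS + CHAINED_SPINS)
            throw new AssertionError();
        // Matched predecessor: this node is probably at the front.
        if (spinsFor(true, true, false, true) != FRONT_SPINS)
            throw new AssertionError();
        // Predecessor has no waiter recorded: chained half-length spin.
        if (spinsFor(true, false, true, true) != CHAINED_SPINS)
            throw new AssertionError();
        // Predecessor already blocked: no point spinning at all.
        if (spinsFor(true, false, false, true) != 0)
            throw new AssertionError();
    }
}
```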
763
764 /* -------------- Traversal methods -------------- */
765
766 /**
767 * Returns the first unmatched data node, or null if none.
768 * Callers must recheck if the returned node is unmatched
769 * before using.
770 */
771 final Node firstDataNode() {
772 Node first = null;
773 restartFromHead: for (;;) {
774 Node h = head, p = h;
775 while (p != null) {
776 if (p.item != null) {
777 if (p.isData) {
778 first = p;
779 break;
780 }
781 }
1164
1165 /**
1166 * Unsplices (now or later) the given deleted/cancelled node with
1167 * the given predecessor.
1168 *
1169 * @param pred a node that was at one time known to be the
1170 * predecessor of s
1171 * @param s the node to be unspliced
1172 */
1173 final void unsplice(Node pred, Node s) {
1174 // assert pred != null;
1175 // assert pred != s;
1176 // assert s != null;
1177 // assert s.isMatched();
1178 // assert (SWEEP_THRESHOLD & (SWEEP_THRESHOLD - 1)) == 0;
1179 s.waiter = null; // disable signals
1180 /*
1181 * See above for rationale. Briefly: if pred still points to
1182 * s, try to unlink s. If s cannot be unlinked, because it is
1183 * trailing node or pred might be unlinked, and neither pred
1184 * nor s are head or offlist, add to sweepVotes, and if enough
1185 * votes have accumulated, sweep.
1186 */
1187 if (pred != null && pred.next == s) {
1188 Node n = s.next;
1189 if (n == null ||
1190 (n != s && pred.casNext(s, n) && pred.isMatched())) {
1191 for (;;) { // check if at, or could be, head
1192 Node h = head;
1193 if (h == pred || h == s)
1194 return; // at head or list empty
1195 if (!h.isMatched())
1196 break;
1197 Node hn = h.next;
1198 if (hn == null)
1199 return; // now empty
1200 if (hn != h && casHead(h, hn))
1201 h.selfLink(); // advance head
1202 }
1203 // sweep every SWEEP_THRESHOLD votes
1204 if (pred.next != pred && s.next != s // recheck if offlist
1205 && (incSweepVotes() & (SWEEP_THRESHOLD - 1)) == 0)
1206 sweep();
1207 }
1208 }
1209 }
1210
1211 /**
1212 * Unlinks matched (typically cancelled) nodes encountered in a
1213 * traversal from head.
1214 */
1215 private void sweep() {
1216 for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
1217 if (!s.isMatched())
1218 // Unmatched nodes are never self-linked
1219 p = s;
1220 else if ((n = s.next) == null) // trailing node is pinned
1221 break;
1222 else if (s == n) // stale
1223 // No need to also check for p == s, since that implies s == n
1224 p = head;
1225 else
1226 p.casNext(s, n);
1227 }
1228 }
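The sweep loop is easier to follow single-threaded. A simplified sketch over a plain singly-linked list, with a `matched` flag standing in for the real `isData`/`item` encoding and the concurrency-only stale-node check omitted:

```java
// Single-threaded sketch of sweep(): unlink matched interior nodes but
// leave the pinned trailing node in place. "matched" stands in for the
// real isData/item encoding; there is no concurrency here, so the
// self-link (stale) check from the real code is omitted.
public class SweepSketch {
    static final class N {
        final boolean matched;
        N next;
        N(boolean matched) { this.matched = matched; }
    }

    static void sweep(N head) {
        for (N p = head, s, n; p != null && (s = p.next) != null; ) {
            if (!s.matched)
                p = s;                       // live node: keep, advance
            else if ((n = s.next) == null)   // trailing node is pinned
                break;
            else
                p.next = n;                  // unlink matched interior node
        }
    }

    public static void main(String[] args) {
        // head -> dead1 -> live -> dead2 -> trail(dead, trailing)
        N head = new N(true);
        N dead1 = new N(true), live = new N(false);
        N dead2 = new N(true), trail = new N(true);
        head.next = dead1; dead1.next = live;
        live.next = dead2; dead2.next = trail;
        sweep(head);
        // dead1 and dead2 are unlinked; the trailing node stays pinned.
        if (head.next != live || live.next != trail)
            throw new AssertionError();
    }
}
```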
1229
1230 /**
1231 * Creates an initially empty {@code LinkedTransferQueue}.
1232 */
1233 public LinkedTransferQueue() {
1234 head = tail = new Node();
1235 }
1248 for (E e : c) {
1249 Node newNode = new Node(Objects.requireNonNull(e));
1250 if (h == null)
1251 h = t = newNode;
1252 else
1253 t.appendRelaxed(t = newNode);
1254 }
1255 if (h == null)
1256 h = t = new Node();
1257 head = h;
1258 tail = t;
1259 }
1260
1261 /**
1262 * Inserts the specified element at the tail of this queue.
1263 * As the queue is unbounded, this method will never block.
1264 *
1265 * @throws NullPointerException if the specified element is null
1266 */
1267 public void put(E e) {
1268 xfer(e, true, ASYNC, 0);
1269 }
1270
1271 /**
1272 * Inserts the specified element at the tail of this queue.
1273 * As the queue is unbounded, this method will never block or
1274 * return {@code false}.
1275 *
1276 * @return {@code true} (as specified by
1277 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
1278 * @throws NullPointerException if the specified element is null
1279 */
1280 public boolean offer(E e, long timeout, TimeUnit unit) {
1281 xfer(e, true, ASYNC, 0);
1282 return true;
1283 }
1284
1285 /**
1286 * Inserts the specified element at the tail of this queue.
1287 * As the queue is unbounded, this method will never return {@code false}.
1288 *
1289 * @return {@code true} (as specified by {@link Queue#offer})
1290 * @throws NullPointerException if the specified element is null
1291 */
1292 public boolean offer(E e) {
1293 xfer(e, true, ASYNC, 0);
1294 return true;
1295 }
1296
1297 /**
1298 * Inserts the specified element at the tail of this queue.
1299 * As the queue is unbounded, this method will never throw
1300 * {@link IllegalStateException} or return {@code false}.
1301 *
1302 * @return {@code true} (as specified by {@link Collection#add})
1303 * @throws NullPointerException if the specified element is null
1304 */
1305 public boolean add(E e) {
1306 xfer(e, true, ASYNC, 0);
1307 return true;
1308 }
1309
1310 /**
1311 * Transfers the element to a waiting consumer immediately, if possible.
1312 *
1313 * <p>More precisely, transfers the specified element immediately
1314 * if there exists a consumer already waiting to receive it (in
1315 * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1316 * otherwise returning {@code false} without enqueuing the element.
1317 *
1318 * @throws NullPointerException if the specified element is null
1319 */
1320 public boolean tryTransfer(E e) {
1321 return xfer(e, true, NOW, 0) == null;
1322 }
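The `NOW` mode contract is observable from the public API: with no consumer already waiting, `tryTransfer` returns false without enqueuing, unlike `offer`, which always enqueues on this unbounded queue:

```java
import java.util.concurrent.LinkedTransferQueue;

// tryTransfer never enqueues when no consumer is waiting, whereas offer
// always enqueues (the queue is unbounded).
public class TryTransferDemo {
    public static void main(String[] args) {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        boolean transferred = q.tryTransfer("hello");
        if (transferred || !q.isEmpty()) throw new AssertionError();
        if (!q.offer("hello") || q.size() != 1) throw new AssertionError();
    }
}
```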
1323
1324 /**
1325 * Transfers the element to a consumer, waiting if necessary to do so.
1326 *
1327 * <p>More precisely, transfers the specified element immediately
1328 * if there exists a consumer already waiting to receive it (in
1329 * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1330 * else inserts the specified element at the tail of this queue
1331 * and waits until the element is received by a consumer.
1332 *
1333 * @throws NullPointerException if the specified element is null
1334 */
1335 public void transfer(E e) throws InterruptedException {
1336 if (xfer(e, true, SYNC, 0) != null) {
1337 Thread.interrupted(); // failure possible only due to interrupt
1338 throw new InterruptedException();
1339 }
1340 }
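In `SYNC` mode the producer blocks until the element is received, which gives a simple handoff guarantee:

```java
import java.util.concurrent.LinkedTransferQueue;

// transfer blocks the producer until a consumer receives the element, so
// once transfer returns, the take below is guaranteed to have happened.
public class TransferDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        Thread consumer = new Thread(() -> {
            try { q.take(); } catch (InterruptedException ignored) { }
        });
        consumer.start();
        q.transfer("handoff");   // returns only after the consumer took it
        consumer.join();
        if (!q.isEmpty()) throw new AssertionError();
    }
}
```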
1341
1342 /**
1343 * Transfers the element to a consumer if it is possible to do so
1344 * before the timeout elapses.
1345 *
1346 * <p>More precisely, transfers the specified element immediately
1347 * if there exists a consumer already waiting to receive it (in
1348 * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1349 * else inserts the specified element at the tail of this queue
1350 * and waits until the element is received by a consumer,
1351 * returning {@code false} if the specified wait time elapses
1352 * before the element can be transferred.
1353 *
1354 * @throws NullPointerException if the specified element is null
1355 */
1356 public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1357 throws InterruptedException {
1358 if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
1359 return true;
1360 if (!Thread.interrupted())
1361 return false;
1362 throw new InterruptedException();
1363 }
1364
1365 public E take() throws InterruptedException {
1366 E e = xfer(null, false, SYNC, 0);
1367 if (e != null)
1368 return e;
1369 Thread.interrupted();
1370 throw new InterruptedException();
1371 }
1372
1373 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1374 E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1375 if (e != null || !Thread.interrupted())
1376 return e;
1377 throw new InterruptedException();
1378 }
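The three retrieval modes map directly onto the public `poll` variants: untimed `poll` is `NOW`, timed `poll` is `TIMED`, and `take` is `SYNC`. A quick demonstration of the first two:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

// Untimed poll (NOW) returns null immediately on an empty queue; timed
// poll (TIMED) waits out its timeout first, then returns null.
public class PollDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
        if (q.poll() != null) throw new AssertionError();
        if (q.poll(10, TimeUnit.MILLISECONDS) != null)
            throw new AssertionError();
        q.put(42);
        if (q.poll(10, TimeUnit.MILLISECONDS) != 42)
            throw new AssertionError();   // element present: matched at once
    }
}
```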
1379
1380 public E poll() {
1381 return xfer(null, false, NOW, 0);
1382 }
1383
1384 /**
1385 * @throws NullPointerException {@inheritDoc}
1386 * @throws IllegalArgumentException {@inheritDoc}
1387 */
1388 public int drainTo(Collection<? super E> c) {
1389 Objects.requireNonNull(c);
1390 if (c == this)
1391 throw new IllegalArgumentException();
1392 int n = 0;
1393 for (E e; (e = poll()) != null; n++)
1394 c.add(e);
1395 return n;
1396 }
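`drainTo` is built on repeated `poll`, so it moves every available element, in FIFO order, into the target collection (which must not be the queue itself) and returns the count:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;

// drainTo empties the queue into a collection in FIFO order and returns
// the number of elements transferred.
public class DrainToDemo {
    public static void main(String[] args) {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        q.add("a"); q.add("b"); q.add("c");
        List<String> sink = new ArrayList<>();
        int n = q.drainTo(sink);
        if (n != 3 || !q.isEmpty()) throw new AssertionError();
        if (!sink.equals(List.of("a", "b", "c"))) throw new AssertionError();
    }
}
```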
1397
1398 /**
1399 * @throws NullPointerException {@inheritDoc}
1400 * @throws IllegalArgumentException {@inheritDoc}
1401 */
1705 for (Node c = p;; q = p.next) {
1706 if (q == null || !q.isMatched()) {
1707 pred = skipDeadNodes(pred, c, p, q); p = q; break;
1708 }
1709 if (p == (p = q)) { pred = null; p = head; break; }
1710 }
1711 }
1712 }
1713
1714 /**
1715 * @throws NullPointerException {@inheritDoc}
1716 */
1717 public void forEach(Consumer<? super E> action) {
1718 Objects.requireNonNull(action);
1719 forEachFrom(action, head);
1720 }
1721
1722 // VarHandle mechanics
1723 private static final VarHandle HEAD;
1724 private static final VarHandle TAIL;
1725 private static final VarHandle SWEEPVOTES;
1726 static final VarHandle ITEM;
1727 static final VarHandle NEXT;
1728 static final VarHandle WAITER;
1729 static {
1730 try {
1731 MethodHandles.Lookup l = MethodHandles.lookup();
1732 HEAD = l.findVarHandle(LinkedTransferQueue.class, "head",
1733 Node.class);
1734 TAIL = l.findVarHandle(LinkedTransferQueue.class, "tail",
1735 Node.class);
1736 SWEEPVOTES = l.findVarHandle(LinkedTransferQueue.class, "sweepVotes",
1737 int.class);
1738 ITEM = l.findVarHandle(Node.class, "item", Object.class);
1739 NEXT = l.findVarHandle(Node.class, "next", Node.class);
1740 WAITER = l.findVarHandle(Node.class, "waiter", Thread.class);
1741 } catch (ReflectiveOperationException e) {
1742 throw new ExceptionInInitializerError(e);
1743 }
1744
1745 // Reduce the risk of rare disastrous classloading in first call to
1746 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1747 Class<?> ensureLoaded = LockSupport.class;
1748 }
1749 }
|
292 * this call was ASYNC (e.g. offer), an element was
293 * successfully added to the end of the queue and we return.
294 *
295 * Of course, this naive traversal is O(n) when no match is
296 * possible. We optimize the traversal by maintaining a tail
297 * pointer, which is expected to be "near" the end of the list.
298 * It is only safe to fast-forward to tail (in the presence of
299 * arbitrary concurrent changes) if it is pointing to a node of
300 * the same mode, even if it is dead (in this case no preceding
301 * node could still be matchable by this traversal). If we
302 * need to restart due to falling off-list, we can again
303 * fast-forward to tail, but only if it has changed since the
304 * last traversal (else we might loop forever). If tail cannot
305 * be used, traversal starts at head (but in this case we
306 * expect to be able to match near head). As with head, we
307 * CAS-advance the tail pointer by at least two hops.
308 *
309 * 2. Await match or cancellation (method awaitMatch)
310 *
311 * Wait for another thread to match node; instead cancelling if
312 * the current thread was interrupted or the wait timed out. To
313 * improve performance in common single-source / single-sink
314 * usages when there are more tasks that cores, an initial
315 * Thread.yield is tried when there is apparently only one
316 * waiter. In other cases, waiters may help with some
317 * bookkeeping, then park/unpark.
318 *
319 * ** Unlinking removed interior nodes **
320 *
321 * In addition to minimizing garbage retention via self-linking
322 * described above, we also unlink removed interior nodes. These
323 * may arise due to timed out or interrupted waits, or calls to
324 * remove(x) or Iterator.remove. Normally, given a node that was
325 * at one time known to be the predecessor of some node s that is
326 * to be removed, we can unsplice s by CASing the next field of
327 * its predecessor if it still points to s (otherwise s must
328 * already have been removed or is now offlist). But there are two
329 * situations in which we cannot guarantee to make node s
330 * unreachable in this way: (1) If s is the trailing node of list
331 * (i.e., with null next), then it is pinned as the target node
332 * for appends, so can only be removed later after other nodes are
333 * appended. (2) We cannot necessarily unlink s given a
334 * predecessor node that is matched (including the case of being
335 * cancelled): the predecessor may already be unspliced, in which
336 * case some previous reachable node may still point to s.
337 * (For further explanation see Herlihy & Shavit "The Art of
338 * Multiprocessor Programming" chapter 9). Although, in both
339 * cases, we can rule out the need for further action if either s
340 * or its predecessor are (or can be made to be) at, or fall off
341 * from, the head of list.
342 *
343 * Without taking these into account, it would be possible for an
344 * unbounded number of supposedly removed nodes to remain reachable.
345 * Situations leading to such buildup are uncommon but can occur
346 * in practice; for example when a series of short timed calls to
347 * poll repeatedly time out at the trailing node but otherwise
348 * never fall off the list because of an untimed call to take() at
349 * the front of the queue.
350 *
351 * When these cases arise, rather than always retraversing the
352 * entire list to find an actual predecessor to unlink (which
353 * won't help for case (1) anyway), we record the need to sweep the
354 * next time any thread would otherwise block in awaitMatch. Also,
355 * because traversal operations on the linked list of nodes are a
356 * natural opportunity to sweep dead nodes, we generally do so,
357 * including all the operations that might remove elements as they
358 * traverse, such as removeIf and Iterator.remove. This largely
359 * eliminates long chains of dead interior nodes, except from
360 * cancelled or timed out blocking operations.
361 *
362 * Note that we cannot self-link unlinked interior nodes during
363 * sweeps. However, the associated garbage chains terminate when
364 * some successor ultimately falls off the head of the list and is
365 * self-linked.
366 */
367
368 /**
369 * The number of nanoseconds for which it is faster to spin
370 * rather than to use timed park. A rough estimate suffices.
371 * Using a power of two minus one simplifies some comparisons.
372 */
373 static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;
374
375 /**
376 * The maximum number of estimated removal failures (sweepVotes)
377 * to tolerate before sweeping through the queue unlinking
378 * cancelled nodes that were not unlinked upon initial
379 * removal. See above for explanation. The value must be at least
380 * two to avoid useless sweeps when removing trailing nodes.
381 */
382 static final int SWEEP_THRESHOLD = 32;
383
384 /**
385 * Queue nodes. Uses Object, not E, for items to allow forgetting
386 * them after use. Writes that are intrinsically ordered wrt
387 * other accesses or CASes use simple relaxed forms.
388 */
389 static final class Node implements ForkJoinPool.ManagedBlocker {
390 final boolean isData; // false if this is a request node
391 volatile Object item; // initially non-null if isData; CASed to match
392 volatile Node next;
393 volatile Thread waiter; // null when not waiting for a match
394
395 /**
396 * Constructs a data node holding item if item is non-null,
397 * else a request node. Uses relaxed write because item can
398 * only be seen after piggy-backing publication via CAS.
399 */
400 Node(Object item) {
401 ITEM.set(this, item);
402 isData = (item != null);
403 }
404
405 /** Constructs a (matched data) dummy node. */
406 Node() {
407 isData = true;
408 }
409
414
415 final boolean casItem(Object cmp, Object val) {
416 // assert isData == (cmp != null);
417 // assert isData == (val == null);
418 // assert !(cmp instanceof Node);
419 return ITEM.compareAndSet(this, cmp, val);
420 }
421
422 /**
423 * Links node to itself to avoid garbage retention. Called
424 * only after CASing head field, so uses relaxed write.
425 */
426 final void selfLink() {
427 // assert isMatched();
428 NEXT.setRelease(this, this);
429 }
430
431 final void appendRelaxed(Node next) {
432 // assert next != null;
433 // assert this.next == null;
434 NEXT.setOpaque(this, next);
435 }
436
437 /**
438 * Returns true if this node has been matched, including the
439 * case of artificial matches due to cancellation.
440 */
441 final boolean isMatched() {
442 return isData == (item == null);
443 }
444
445 /** Tries to CAS-match this node; if successful, wakes waiter. */
446 final boolean tryMatch(Object cmp, Object val) {
447 if (casItem(cmp, val)) {
448 LockSupport.unpark(waiter);
449 return true;
450 }
451 return false;
452 }
453
454 /**
455 * Returns true if a node with the given mode cannot be
456 * appended to this node because this node is unmatched and
457 * has opposite data mode.
458 */
459 final boolean cannotPrecede(boolean haveData) {
460 boolean d = isData;
461 return d != haveData && d != (item == null);
462 }
463
464 public final boolean isReleasable() {
465 return (isData == (item == null)) ||
466 Thread.currentThread().isInterrupted();
467 }
468
469 public final boolean block() {
470 while (!isReleasable()) LockSupport.park();
471 return true;
472 }
473
474 private static final long serialVersionUID = -3375979862319811754L;
475 }
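The matching protocol implemented by Node can be seen in isolation: a data node is matched when its item has been CASed to null by a consumer, and a request node is matched when its item has been CASed from null to the transferred value by a producer, so the single test `isData == (item == null)` detects both. A minimal standalone sketch (the `DualNodeDemo.DualNode` class here is hypothetical, using `AtomicReference` in place of the VarHandle plumbing):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simplified dual node (not the real Node class) showing the
// matching invariant: a node is matched exactly when isData == (item == null).
public class DualNodeDemo {
    static final class DualNode {
        final boolean isData;
        final AtomicReference<Object> item;

        DualNode(Object item) {        // data node if item != null, else request node
            this.isData = (item != null);
            this.item = new AtomicReference<>(item);
        }

        boolean isMatched() {
            return isData == (item.get() == null);
        }

        // A consumer matches a data node by CASing its item to null;
        // a producer matches a request node by CASing the value in over null.
        boolean tryMatch(Object cmp, Object val) {
            return item.compareAndSet(cmp, val);
        }
    }

    public static void main(String[] args) {
        DualNode data = new DualNode("x");
        System.out.println(data.isMatched());    // false: live data node
        data.tryMatch("x", null);                // consumer takes the item
        System.out.println(data.isMatched());    // true

        DualNode request = new DualNode(null);
        request.tryMatch(null, "y");             // producer fulfills the request
        System.out.println(request.isMatched()); // true
    }
}
```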
476
477 /**
478 * A node from which the first live (non-matched) node (if any)
479 * can be reached in O(1) time.
480 * Invariants:
481 * - all live nodes are reachable from head via .next
482 * - head != null
483 * - (tmp = head).next != tmp || tmp != head
484 * Non-invariants:
485 * - head may or may not be live
486 * - it is permitted for tail to lag behind head, that is, for tail
487 * to not be reachable from head!
488 */
489 transient volatile Node head;
490
491 /**
492 * A node from which the last node on list (that is, the unique
493 * node with node.next == null) can be reached in O(1) time.
494 * Invariants:
495 * - the last node is always reachable from tail via .next
496 * - tail != null
497 * Non-invariants:
498 * - tail may or may not be live
499 * - it is permitted for tail to lag behind head, that is, for tail
500 * to not be reachable from head!
501 * - tail.next may or may not be self-linked.
502 */
503 private transient volatile Node tail;
504
505 /** True if a sweep to unsplice matched (cancelled) nodes is needed. */
506 private transient volatile boolean needSweep;
507
508 private boolean casTail(Node cmp, Node val) {
509 // assert cmp != null;
510 // assert val != null;
511 return TAIL.compareAndSet(this, cmp, val);
512 }
513
514 private boolean casHead(Node cmp, Node val) {
515 return HEAD.compareAndSet(this, cmp, val);
516 }
517
518 /**
519 * Tries to CAS pred.next (or head, if pred is null) from c to p.
520 * Caller must ensure that we're not unlinking the trailing node.
521 */
522 private boolean tryCasSuccessor(Node pred, Node c, Node p) {
523 // assert p != null;
524 // assert c.isData != (c.item != null);
525 // assert c != p;
526 if (pred != null)
527 return pred.casNext(c, p);
528 if (casHead(c, p)) {
529 c.selfLink();
530 return true;
531 }
532 return false;
533 }
534
535 /**
536 * Collapses dead (matched) nodes between pred and q.
537 * @param pred the last known live node, or null if none
604 if (h == null) h = head;
605 if (p.tryMatch(item, e)) {
606 if (h != p) skipDeadNodesNearHead(h, p);
607 return (E) item;
608 }
609 }
610 if ((q = p.next) == null) {
611 if (how == NOW) return e;
612 if (s == null) s = new Node(e);
613 if (!p.casNext(null, s)) continue;
614 if (p != t) casTail(t, s);
615 if (how == ASYNC) return e;
616 return awaitMatch(s, p, e, (how == TIMED), nanos);
617 }
618 if (p == (p = q)) continue restart;
619 }
620 }
621 }
622
623 /**
624 * Possibly blocks until node s is matched or caller gives up.
625 *
626 * @param s the waiting node
627 * @param pred the predecessor of s, or null if unknown (the null
628 * case does not occur in any current calls but may in possible
629 * future extensions)
630 * @param e the comparison value for checking match
631 * @param timed if true, wait only until timeout elapses
632 * @param nanos timeout in nanosecs, used only if timed is true
633 * @return matched item, or e if unmatched on interrupt or timeout
634 */
635 @SuppressWarnings("unchecked")
636 private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
637 final boolean isData = s.isData;
638 final long deadline = timed ? System.nanoTime() + nanos : 0L;
639 final Thread w = Thread.currentThread();
640 int stat = -1; // -1: may yield, +1: park, else 0
641 Object item;
642 while ((item = s.item) == e) {
643 if (needSweep) // help clean
644 sweep();
645 else if ((timed && nanos <= 0L) || w.isInterrupted()) {
646 if (s.casItem(e, (e == null) ? s : null)) {
647 unsplice(pred, s); // cancelled
648 return e;
649 }
650 }
651 else if (stat <= 0) {
652 if (pred != null && pred.next == s) {
653 if (stat < 0 &&
654 (pred.isData != isData || pred.isMatched())) {
655 stat = 0; // yield once if first
656 Thread.yield();
657 }
658 else {
659 stat = 1;
660 s.waiter = w; // enable unpark
661 }
662 } // else signal in progress
663 }
664 else if ((item = s.item) != e)
665 break; // recheck
666 else if (!timed) {
667 LockSupport.setCurrentBlocker(this);
668 try {
669 ForkJoinPool.managedBlock(s);
670 } catch (InterruptedException cannotHappen) { }
671 LockSupport.setCurrentBlocker(null);
672 }
673 else {
674 nanos = deadline - System.nanoTime();
675 if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
676 LockSupport.parkNanos(this, nanos);
677 }
678 }
679 if (stat == 1)
680 WAITER.set(s, null);
681 if (!isData)
682 ITEM.set(s, s); // self-link to avoid garbage
683 return (E) item;
684 }
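awaitMatch hands untimed blocking off to ForkJoinPool.managedBlock so that, when the caller is a ForkJoinPool worker, the pool can compensate with a spare thread. The protocol needs only the two ManagedBlocker methods that Node implements above: isReleasable is polled before and after parking, and block parks until it reports true. A self-contained sketch with a hypothetical flag-based blocker (illustrative names; not the queue's Node):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical one-shot blocker demonstrating the managedBlock protocol
// that awaitMatch relies on.
public class FlagBlocker implements ForkJoinPool.ManagedBlocker {
    final AtomicBoolean done = new AtomicBoolean();
    volatile Thread waiter;

    public boolean isReleasable() {
        return done.get();            // polled before and after parking
    }

    public boolean block() {
        waiter = Thread.currentThread();
        while (!isReleasable())
            LockSupport.park(this);   // woken by release()
        return true;
    }

    void release() {
        done.set(true);
        LockSupport.unpark(waiter);   // unpark(null) is a no-op
    }

    public static void main(String[] args) throws InterruptedException {
        FlagBlocker b = new FlagBlocker();
        Thread t = new Thread(() -> {
            try { ForkJoinPool.managedBlock(b); }
            catch (InterruptedException ignored) { }
            System.out.println("released");
        });
        t.start();
        Thread.sleep(50);
        b.release();
        t.join();
    }
}
```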
685
686 /* -------------- Traversal methods -------------- */
687
688 /**
689 * Returns the first unmatched data node, or null if none.
690 * Callers must recheck if the returned node is unmatched
691 * before using.
692 */
693 final Node firstDataNode() {
694 Node first = null;
695 restartFromHead: for (;;) {
696 Node h = head, p = h;
697 while (p != null) {
698 if (p.item != null) {
699 if (p.isData) {
700 first = p;
701 break;
702 }
703 }
1086
1087 /**
1088 * Unsplices (now or later) the given deleted/cancelled node with
1089 * the given predecessor.
1090 *
1091 * @param pred a node that was at one time known to be the
1092 * predecessor of s
1093 * @param s the node to be unspliced
1094 */
1095 final void unsplice(Node pred, Node s) {
1096 // assert pred != null;
1097 // assert pred != s;
1098 // assert s != null;
1099 // assert s.isMatched();
1100 // assert (SWEEP_THRESHOLD & (SWEEP_THRESHOLD - 1)) == 0;
1101 s.waiter = null; // disable signals
1102 /*
1103 * See above for rationale. Briefly: if pred still points to
1104 * s, try to unlink s. If s cannot be unlinked (because it is
1105 * the trailing node, or pred might itself be unlinked) and
1106 * neither pred nor s is head or off-list, set needSweep.
1107 */
1108 if (pred != null && pred.next == s) {
1109 Node n = s.next;
1110 if (n == null ||
1111 (n != s && pred.casNext(s, n) && pred.isMatched())) {
1112 for (;;) { // check if at, or could be, head
1113 Node h = head;
1114 if (h == pred || h == s)
1115 return; // at head or list empty
1116 if (!h.isMatched())
1117 break;
1118 Node hn = h.next;
1119 if (hn == null)
1120 return; // now empty
1121 if (hn != h && casHead(h, hn))
1122 h.selfLink(); // advance head
1123 }
1124 if (pred.next != pred && s.next != s)
1125 needSweep = true;
1126 }
1127 }
1128 }
1129
1130 /**
1131 * Unlinks matched (typically cancelled) nodes encountered in a
1132 * traversal from head.
1133 */
1134 private void sweep() {
1135 needSweep = false;
1136 for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
1137 if (!s.isMatched())
1138 // Unmatched nodes are never self-linked
1139 p = s;
1140 else if ((n = s.next) == null) // trailing node is pinned
1141 break;
1142 else if (s == n) // stale
1143 // No need to also check for p == s, since that implies s == n
1144 p = head;
1145 else
1146 p.casNext(s, n);
1147 }
1148 }
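The unlinking pass above can be illustrated on a plain singly linked list: advance past live nodes, splice out dead interior nodes, and never touch the trailing node (it may be in the middle of an append). A sketch with illustrative names (single-threaded, so plain writes stand in for casNext and no stale/self-link restart is needed):

```java
import java.util.ArrayList;
import java.util.List;

// Single-threaded sketch of the sweep idea, not the concurrent algorithm.
public class SweepSketch {
    static final class N {
        final String id; final boolean dead; N next;
        N(String id, boolean dead) { this.id = id; this.dead = dead; }
    }

    static void sweep(N head) {
        for (N p = head, s; p != null && (s = p.next) != null; ) {
            if (!s.dead)
                p = s;               // keep live nodes
            else if (s.next == null)
                break;               // trailing node is pinned
            else
                p.next = s.next;     // unlink dead interior node
        }
    }

    static List<String> ids(N head) {
        List<String> out = new ArrayList<>();
        for (N p = head; p != null; p = p.next) out.add(p.id);
        return out;
    }

    public static void main(String[] args) {
        N a = new N("a", false), b = new N("b", true),
          c = new N("c", false), d = new N("d", true);
        a.next = b; b.next = c; c.next = d;
        sweep(a);
        // Dead interior node b is unlinked; trailing dead node d stays pinned.
        System.out.println(ids(a));  // [a, c, d]
    }
}
```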
1149
1150 /**
1151 * Creates an initially empty {@code LinkedTransferQueue}.
1152 */
1153 public LinkedTransferQueue() {
1154 head = tail = new Node();
1155 }
1156 
1157 /**
1158 * Creates a {@code LinkedTransferQueue}
1159 * initially containing the elements of the given collection,
1160 * added in traversal order of the collection's iterator.
1161 *
1162 * @param c the collection of elements to initially contain
1163 * @throws NullPointerException if the specified collection or any
1164 * of its elements are null
1165 */
1166 public LinkedTransferQueue(Collection<? extends E> c) {
1167 Node h = null, t = null;
1168 for (E e : c) {
1169 Node newNode = new Node(Objects.requireNonNull(e));
1170 if (h == null)
1171 h = t = newNode;
1172 else
1173 t.appendRelaxed(t = newNode);
1174 }
1175 if (h == null)
1176 h = t = new Node();
1177 head = h;
1178 tail = t;
1179 }
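The collection constructor above links the chain with relaxed appends before publishing head and tail, preserving the collection's iteration order. A quick usage sketch (the variable names are just for the demo):

```java
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;

public class CtorDemo {
    public static void main(String[] args) {
        // Elements appear in the queue in the collection's iteration order.
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>(List.of("a", "b"));
        System.out.println(q.size());  // 2
        System.out.println(q.peek());  // a
    }
}
```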
1180
1181 /**
1182 * Inserts the specified element at the tail of this queue.
1183 * As the queue is unbounded, this method will never block.
1184 *
1185 * @throws NullPointerException if the specified element is null
1186 */
1187 public void put(E e) {
1188 xfer(e, true, ASYNC, 0L);
1189 }
1190
1191 /**
1192 * Inserts the specified element at the tail of this queue.
1193 * As the queue is unbounded, this method will never block or
1194 * return {@code false}.
1195 *
1196 * @return {@code true} (as specified by
1197 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
1198 * @throws NullPointerException if the specified element is null
1199 */
1200 public boolean offer(E e, long timeout, TimeUnit unit) {
1201 xfer(e, true, ASYNC, 0L);
1202 return true;
1203 }
1204
1205 /**
1206 * Inserts the specified element at the tail of this queue.
1207 * As the queue is unbounded, this method will never return {@code false}.
1208 *
1209 * @return {@code true} (as specified by {@link Queue#offer})
1210 * @throws NullPointerException if the specified element is null
1211 */
1212 public boolean offer(E e) {
1213 xfer(e, true, ASYNC, 0L);
1214 return true;
1215 }
1216
1217 /**
1218 * Inserts the specified element at the tail of this queue.
1219 * As the queue is unbounded, this method will never throw
1220 * {@link IllegalStateException} or return {@code false}.
1221 *
1222 * @return {@code true} (as specified by {@link Collection#add})
1223 * @throws NullPointerException if the specified element is null
1224 */
1225 public boolean add(E e) {
1226 xfer(e, true, ASYNC, 0L);
1227 return true;
1228 }
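Since the queue is unbounded, put, both offer overloads, and add all reduce to the same ASYNC xfer call and always succeed immediately; the timeout passed to the timed offer is never consulted. For example:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class InsertDemo {
    public static void main(String[] args) {
        LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
        q.put(1);                                             // never blocks
        System.out.println(q.offer(2));                       // true
        System.out.println(q.offer(3, 5, TimeUnit.SECONDS));  // true, timeout ignored
        System.out.println(q.add(4));                         // true
        System.out.println(q.size());                         // 4
    }
}
```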
1229
1230 /**
1231 * Transfers the element to a waiting consumer immediately, if possible.
1232 *
1233 * <p>More precisely, transfers the specified element immediately
1234 * if there exists a consumer already waiting to receive it (in
1235 * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1236 * otherwise returning {@code false} without enqueuing the element.
1237 *
1238 * @throws NullPointerException if the specified element is null
1239 */
1240 public boolean tryTransfer(E e) {
1241 return xfer(e, true, NOW, 0L) == null;
1242 }
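This is the NOW mode in action: tryTransfer hands the element to an already-waiting consumer or fails without side effects. Without a waiting consumer, nothing is enqueued:

```java
import java.util.concurrent.LinkedTransferQueue;

public class TryTransferDemo {
    public static void main(String[] args) {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        // No consumer is waiting, so the element is NOT enqueued.
        boolean transferred = q.tryTransfer("x");
        System.out.println(transferred);  // false
        System.out.println(q.isEmpty());  // true
    }
}
```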
1243
1244 /**
1245 * Transfers the element to a consumer, waiting if necessary to do so.
1246 *
1247 * <p>More precisely, transfers the specified element immediately
1248 * if there exists a consumer already waiting to receive it (in
1249 * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1250 * else inserts the specified element at the tail of this queue
1251 * and waits until the element is received by a consumer.
1252 *
1253 * @throws NullPointerException if the specified element is null
1254 */
1255 public void transfer(E e) throws InterruptedException {
1256 if (xfer(e, true, SYNC, 0L) != null) {
1257 Thread.interrupted(); // failure possible only due to interrupt
1258 throw new InterruptedException();
1259 }
1260 }
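transfer uses SYNC mode: unlike the queue's offer/put family, the producer parks until a consumer actually receives the element. A small two-thread sketch:

```java
import java.util.concurrent.LinkedTransferQueue;

public class TransferDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("got " + q.take());
            } catch (InterruptedException ignored) { }
        });
        consumer.start();
        q.transfer("hello");  // returns only after the consumer receives it
        consumer.join();
    }
}
```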
1261
1262 /**
1263 * Transfers the element to a consumer if it is possible to do so
1264 * before the timeout elapses.
1265 *
1266 * <p>More precisely, transfers the specified element immediately
1267 * if there exists a consumer already waiting to receive it (in
1268 * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1269 * else inserts the specified element at the tail of this queue
1270 * and waits until the element is received by a consumer,
1271 * returning {@code false} if the specified wait time elapses
1272 * before the element can be transferred.
1273 *
1274 * @throws NullPointerException if the specified element is null
1275 */
1276 public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1277 throws InterruptedException {
1278 if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
1279 return true;
1280 if (!Thread.interrupted())
1281 return false;
1282 throw new InterruptedException();
1283 }
1284
1285 public E take() throws InterruptedException {
1286 E e = xfer(null, false, SYNC, 0L);
1287 if (e != null)
1288 return e;
1289 Thread.interrupted();
1290 throw new InterruptedException();
1291 }
1292
1293 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1294 E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1295 if (e != null || !Thread.interrupted())
1296 return e;
1297 throw new InterruptedException();
1298 }
1299
1300 public E poll() {
1301 return xfer(null, false, NOW, 0L);
1302 }
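The three retrieval flavors map to the xfer modes directly: poll() is NOW, the timed poll is TIMED, and take() is SYNC. For example:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class PollDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        System.out.println(q.poll());                           // null: empty
        System.out.println(q.poll(10, TimeUnit.MILLISECONDS));  // null after timeout
        q.put("a");
        System.out.println(q.poll());                           // a
    }
}
```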
1303
1304 /**
1305 * @throws NullPointerException {@inheritDoc}
1306 * @throws IllegalArgumentException {@inheritDoc}
1307 */
1308 public int drainTo(Collection<? super E> c) {
1309 Objects.requireNonNull(c);
1310 if (c == this)
1311 throw new IllegalArgumentException();
1312 int n = 0;
1313 for (E e; (e = poll()) != null; n++)
1314 c.add(e);
1315 return n;
1316 }
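drainTo above is just a poll loop, so it moves elements in FIFO order and returns the count. A usage sketch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;

public class DrainDemo {
    public static void main(String[] args) {
        LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
        q.add(1); q.add(2); q.add(3);
        List<Integer> sink = new ArrayList<>();
        int n = q.drainTo(sink);
        System.out.println(n);            // 3
        System.out.println(sink);         // [1, 2, 3]
        System.out.println(q.isEmpty());  // true
    }
}
```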
1317
1318 /**
1319 * @throws NullPointerException {@inheritDoc}
1320 * @throws IllegalArgumentException {@inheritDoc}
1321 */
1625 for (Node c = p;; q = p.next) {
1626 if (q == null || !q.isMatched()) {
1627 pred = skipDeadNodes(pred, c, p, q); p = q; break;
1628 }
1629 if (p == (p = q)) { pred = null; p = head; break; }
1630 }
1631 }
1632 }
1633
1634 /**
1635 * @throws NullPointerException {@inheritDoc}
1636 */
1637 public void forEach(Consumer<? super E> action) {
1638 Objects.requireNonNull(action);
1639 forEachFrom(action, head);
1640 }
1641
1642 // VarHandle mechanics
1643 private static final VarHandle HEAD;
1644 private static final VarHandle TAIL;
1645 static final VarHandle ITEM;
1646 static final VarHandle NEXT;
1647 static final VarHandle WAITER;
1648 static {
1649 try {
1650 MethodHandles.Lookup l = MethodHandles.lookup();
1651 HEAD = l.findVarHandle(LinkedTransferQueue.class, "head",
1652 Node.class);
1653 TAIL = l.findVarHandle(LinkedTransferQueue.class, "tail",
1654 Node.class);
1655 ITEM = l.findVarHandle(Node.class, "item", Object.class);
1656 NEXT = l.findVarHandle(Node.class, "next", Node.class);
1657 WAITER = l.findVarHandle(Node.class, "waiter", Thread.class);
1658 } catch (ReflectiveOperationException e) {
1659 throw new ExceptionInInitializerError(e);
1660 }
1661
1662 // Reduce the risk of rare disastrous classloading in first call to
1663 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1664 Class<?> ensureLoaded = LockSupport.class;
1665 }
1666 }
|