1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.util.AbstractQueue; 39 import java.util.Collection; 40 import java.util.Iterator; 41 import java.util.NoSuchElementException; 42 import java.util.Objects; 43 import java.util.Spliterator; 44 import java.util.Spliterators; 45 import java.util.concurrent.atomic.AtomicInteger; 46 import java.util.concurrent.locks.Condition; 47 import java.util.concurrent.locks.ReentrantLock; 48 import java.util.function.Consumer; 49 import java.util.function.Predicate; 50 51 /** 52 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on 53 * linked nodes. 54 * This queue orders elements FIFO (first-in-first-out). 55 * The <em>head</em> of the queue is that element that has been on the 56 * queue the longest time. 57 * The <em>tail</em> of the queue is that element that has been on the 58 * queue the shortest time. New elements 59 * are inserted at the tail of the queue, and the queue retrieval 60 * operations obtain elements at the head of the queue. 61 * Linked queues typically have higher throughput than array-based queues but 62 * less predictable performance in most concurrent applications. 63 * 64 * <p>The optional capacity bound constructor argument serves as a 65 * way to prevent excessive queue expansion. The capacity, if unspecified, 66 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are 67 * dynamically created upon each insertion unless this would bring the 68 * queue above capacity. 69 * 70 * <p>This class and its iterator implement all of the <em>optional</em> 71 * methods of the {@link Collection} and {@link Iterator} interfaces. 72 * 73 * <p>This class is a member of the 74 * <a href="{@docRoot}/../technotes/guides/collections/index.html"> 75 * Java Collections Framework</a>. 76 * 77 * @since 1.5 78 * @author Doug Lea 79 * @param <E> the type of elements held in this queue 80 */ 81 public class LinkedBlockingQueue<E> extends AbstractQueue<E> 82 implements BlockingQueue<E>, java.io.Serializable { 83 private static final long serialVersionUID = -6903933977591709194L; 84 85 /* 86 * A variant of the "two lock queue" algorithm. The putLock gates 87 * entry to put (and offer), and has an associated condition for 88 * waiting puts. Similarly for the takeLock. The "count" field 89 * that they both rely on is maintained as an atomic to avoid 90 * needing to get both locks in most cases. Also, to minimize need 91 * for puts to get takeLock and vice-versa, cascading notifies are 92 * used. When a put notices that it has enabled at least one take, 93 * it signals taker. That taker in turn signals others if more 94 * items have been entered since the signal. And symmetrically for 95 * takes signalling puts. Operations such as remove(Object) and 96 * iterators acquire both locks. 97 * 98 * Visibility between writers and readers is provided as follows: 99 * 100 * Whenever an element is enqueued, the putLock is acquired and 101 * count updated. A subsequent reader guarantees visibility to the 102 * enqueued Node by either acquiring the putLock (via fullyLock) 103 * or by acquiring the takeLock, and then reading n = count.get(); 104 * this gives visibility to the first n items. 105 * 106 * To implement weakly consistent iterators, it appears we need to 107 * keep all Nodes GC-reachable from a predecessor dequeued Node. 108 * That would cause two problems: 109 * - allow a rogue Iterator to cause unbounded memory retention 110 * - cause cross-generational linking of old Nodes to new Nodes if 111 * a Node was tenured while live, which generational GCs have a 112 * hard time dealing with, causing repeated major collections. 113 * However, only non-deleted Nodes need to be reachable from 114 * dequeued Nodes, and reachability does not necessarily have to 115 * be of the kind understood by the GC. We use the trick of 116 * linking a Node that has just been dequeued to itself. Such a 117 * self-link implicitly means to advance to head.next. 118 */ 119 120 /** 121 * Linked list node class. 122 */ 123 static class Node<E> { 124 E item; 125 126 /** 127 * One of: 128 * - the real successor Node 129 * - this Node, meaning the successor is head.next 130 * - null, meaning there is no successor (this is the last node) 131 */ 132 Node<E> next; 133 134 Node(E x) { item = x; } 135 } 136 137 /** The capacity bound, or Integer.MAX_VALUE if none */ 138 private final int capacity; 139 140 /** Current number of elements */ 141 private final AtomicInteger count = new AtomicInteger(); 142 143 /** 144 * Head of linked list. 145 * Invariant: head.item == null 146 */ 147 transient Node<E> head; 148 149 /** 150 * Tail of linked list. 151 * Invariant: last.next == null 152 */ 153 private transient Node<E> last; 154 155 /** Lock held by take, poll, etc */ 156 private final ReentrantLock takeLock = new ReentrantLock(); 157 158 /** Wait queue for waiting takes */ 159 private final Condition notEmpty = takeLock.newCondition(); 160 161 /** Lock held by put, offer, etc */ 162 private final ReentrantLock putLock = new ReentrantLock(); 163 164 /** Wait queue for waiting puts */ 165 private final Condition notFull = putLock.newCondition(); 166 167 /** 168 * Signals a waiting take. Called only from put/offer (which do not 169 * otherwise ordinarily lock takeLock.) 170 */ 171 private void signalNotEmpty() { 172 final ReentrantLock takeLock = this.takeLock; 173 takeLock.lock(); 174 try { 175 notEmpty.signal(); 176 } finally { 177 takeLock.unlock(); 178 } 179 } 180 181 /** 182 * Signals a waiting put. Called only from take/poll. 183 */ 184 private void signalNotFull() { 185 final ReentrantLock putLock = this.putLock; 186 putLock.lock(); 187 try { 188 notFull.signal(); 189 } finally { 190 putLock.unlock(); 191 } 192 } 193 194 /** 195 * Links node at end of queue. 196 * 197 * @param node the node 198 */ 199 private void enqueue(Node<E> node) { 200 // assert putLock.isHeldByCurrentThread(); 201 // assert last.next == null; 202 last = last.next = node; 203 } 204 205 /** 206 * Removes a node from head of queue. 207 * 208 * @return the node 209 */ 210 private E dequeue() { 211 // assert takeLock.isHeldByCurrentThread(); 212 // assert head.item == null; 213 Node<E> h = head; 214 Node<E> first = h.next; 215 h.next = h; // help GC 216 head = first; 217 E x = first.item; 218 first.item = null; 219 return x; 220 } 221 222 /** 223 * Locks to prevent both puts and takes. 224 */ 225 void fullyLock() { 226 putLock.lock(); 227 takeLock.lock(); 228 } 229 230 /** 231 * Unlocks to allow both puts and takes. 232 */ 233 void fullyUnlock() { 234 takeLock.unlock(); 235 putLock.unlock(); 236 } 237 238 /** 239 * Creates a {@code LinkedBlockingQueue} with a capacity of 240 * {@link Integer#MAX_VALUE}. 241 */ 242 public LinkedBlockingQueue() { 243 this(Integer.MAX_VALUE); 244 } 245 246 /** 247 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. 248 * 249 * @param capacity the capacity of this queue 250 * @throws IllegalArgumentException if {@code capacity} is not greater 251 * than zero 252 */ 253 public LinkedBlockingQueue(int capacity) { 254 if (capacity <= 0) throw new IllegalArgumentException(); 255 this.capacity = capacity; 256 last = head = new Node<E>(null); 257 } 258 259 /** 260 * Creates a {@code LinkedBlockingQueue} with a capacity of 261 * {@link Integer#MAX_VALUE}, initially containing the elements of the 262 * given collection, 263 * added in traversal order of the collection's iterator. 264 * 265 * @param c the collection of elements to initially contain 266 * @throws NullPointerException if the specified collection or any 267 * of its elements are null 268 */ 269 public LinkedBlockingQueue(Collection<? extends E> c) { 270 this(Integer.MAX_VALUE); 271 final ReentrantLock putLock = this.putLock; 272 putLock.lock(); // Never contended, but necessary for visibility 273 try { 274 int n = 0; 275 for (E e : c) { 276 if (e == null) 277 throw new NullPointerException(); 278 if (n == capacity) 279 throw new IllegalStateException("Queue full"); 280 enqueue(new Node<E>(e)); 281 ++n; 282 } 283 count.set(n); 284 } finally { 285 putLock.unlock(); 286 } 287 } 288 289 // this doc comment is overridden to remove the reference to collections 290 // greater in size than Integer.MAX_VALUE 291 /** 292 * Returns the number of elements in this queue. 293 * 294 * @return the number of elements in this queue 295 */ 296 public int size() { 297 return count.get(); 298 } 299 300 // this doc comment is a modified copy of the inherited doc comment, 301 // without the reference to unlimited queues. 302 /** 303 * Returns the number of additional elements that this queue can ideally 304 * (in the absence of memory or resource constraints) accept without 305 * blocking. This is always equal to the initial capacity of this queue 306 * less the current {@code size} of this queue. 307 * 308 * <p>Note that you <em>cannot</em> always tell if an attempt to insert 309 * an element will succeed by inspecting {@code remainingCapacity} 310 * because it may be the case that another thread is about to 311 * insert or remove an element. 312 */ 313 public int remainingCapacity() { 314 return capacity - count.get(); 315 } 316 317 /** 318 * Inserts the specified element at the tail of this queue, waiting if 319 * necessary for space to become available. 320 * 321 * @throws InterruptedException {@inheritDoc} 322 * @throws NullPointerException {@inheritDoc} 323 */ 324 public void put(E e) throws InterruptedException { 325 if (e == null) throw new NullPointerException(); 326 // Note: convention in all put/take/etc is to preset local var 327 // holding count negative to indicate failure unless set. 328 int c = -1; 329 Node<E> node = new Node<E>(e); 330 final ReentrantLock putLock = this.putLock; 331 final AtomicInteger count = this.count; 332 putLock.lockInterruptibly(); 333 try { 334 /* 335 * Note that count is used in wait guard even though it is 336 * not protected by lock. This works because count can 337 * only decrease at this point (all other puts are shut 338 * out by lock), and we (or some other waiting put) are 339 * signalled if it ever changes from capacity. Similarly 340 * for all other uses of count in other wait guards. 341 */ 342 while (count.get() == capacity) { 343 notFull.await(); 344 } 345 enqueue(node); 346 c = count.getAndIncrement(); 347 if (c + 1 < capacity) 348 notFull.signal(); 349 } finally { 350 putLock.unlock(); 351 } 352 if (c == 0) 353 signalNotEmpty(); 354 } 355 356 /** 357 * Inserts the specified element at the tail of this queue, waiting if 358 * necessary up to the specified wait time for space to become available. 359 * 360 * @return {@code true} if successful, or {@code false} if 361 * the specified waiting time elapses before space is available 362 * @throws InterruptedException {@inheritDoc} 363 * @throws NullPointerException {@inheritDoc} 364 */ 365 public boolean offer(E e, long timeout, TimeUnit unit) 366 throws InterruptedException { 367 368 if (e == null) throw new NullPointerException(); 369 long nanos = unit.toNanos(timeout); 370 int c = -1; 371 final ReentrantLock putLock = this.putLock; 372 final AtomicInteger count = this.count; 373 putLock.lockInterruptibly(); 374 try { 375 while (count.get() == capacity) { 376 if (nanos <= 0L) 377 return false; 378 nanos = notFull.awaitNanos(nanos); 379 } 380 enqueue(new Node<E>(e)); 381 c = count.getAndIncrement(); 382 if (c + 1 < capacity) 383 notFull.signal(); 384 } finally { 385 putLock.unlock(); 386 } 387 if (c == 0) 388 signalNotEmpty(); 389 return true; 390 } 391 392 /** 393 * Inserts the specified element at the tail of this queue if it is 394 * possible to do so immediately without exceeding the queue's capacity, 395 * returning {@code true} upon success and {@code false} if this queue 396 * is full. 397 * When using a capacity-restricted queue, this method is generally 398 * preferable to method {@link BlockingQueue#add add}, which can fail to 399 * insert an element only by throwing an exception. 400 * 401 * @throws NullPointerException if the specified element is null 402 */ 403 public boolean offer(E e) { 404 if (e == null) throw new NullPointerException(); 405 final AtomicInteger count = this.count; 406 if (count.get() == capacity) 407 return false; 408 int c = -1; 409 Node<E> node = new Node<E>(e); 410 final ReentrantLock putLock = this.putLock; 411 putLock.lock(); 412 try { 413 if (count.get() < capacity) { 414 enqueue(node); 415 c = count.getAndIncrement(); 416 if (c + 1 < capacity) 417 notFull.signal(); 418 } 419 } finally { 420 putLock.unlock(); 421 } 422 if (c == 0) 423 signalNotEmpty(); 424 return c >= 0; 425 } 426 427 public E take() throws InterruptedException { 428 E x; 429 int c = -1; 430 final AtomicInteger count = this.count; 431 final ReentrantLock takeLock = this.takeLock; 432 takeLock.lockInterruptibly(); 433 try { 434 while (count.get() == 0) { 435 notEmpty.await(); 436 } 437 x = dequeue(); 438 c = count.getAndDecrement(); 439 if (c > 1) 440 notEmpty.signal(); 441 } finally { 442 takeLock.unlock(); 443 } 444 if (c == capacity) 445 signalNotFull(); 446 return x; 447 } 448 449 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 450 E x = null; 451 int c = -1; 452 long nanos = unit.toNanos(timeout); 453 final AtomicInteger count = this.count; 454 final ReentrantLock takeLock = this.takeLock; 455 takeLock.lockInterruptibly(); 456 try { 457 while (count.get() == 0) { 458 if (nanos <= 0L) 459 return null; 460 nanos = notEmpty.awaitNanos(nanos); 461 } 462 x = dequeue(); 463 c = count.getAndDecrement(); 464 if (c > 1) 465 notEmpty.signal(); 466 } finally { 467 takeLock.unlock(); 468 } 469 if (c == capacity) 470 signalNotFull(); 471 return x; 472 } 473 474 public E poll() { 475 final AtomicInteger count = this.count; 476 if (count.get() == 0) 477 return null; 478 E x = null; 479 int c = -1; 480 final ReentrantLock takeLock = this.takeLock; 481 takeLock.lock(); 482 try { 483 if (count.get() > 0) { 484 x = dequeue(); 485 c = count.getAndDecrement(); 486 if (c > 1) 487 notEmpty.signal(); 488 } 489 } finally { 490 takeLock.unlock(); 491 } 492 if (c == capacity) 493 signalNotFull(); 494 return x; 495 } 496 497 public E peek() { 498 if (count.get() == 0) 499 return null; 500 final ReentrantLock takeLock = this.takeLock; 501 takeLock.lock(); 502 try { 503 return (count.get() > 0) ? head.next.item : null; 504 } finally { 505 takeLock.unlock(); 506 } 507 } 508 509 /** 510 * Unlinks interior Node p with predecessor pred. 511 */ 512 void unlink(Node<E> p, Node<E> pred) { 513 // assert putLock.isHeldByCurrentThread(); 514 // assert takeLock.isHeldByCurrentThread(); 515 // p.next is not changed, to allow iterators that are 516 // traversing p to maintain their weak-consistency guarantee. 517 p.item = null; 518 pred.next = p.next; 519 if (last == p) 520 last = pred; 521 if (count.getAndDecrement() == capacity) 522 notFull.signal(); 523 } 524 525 /** 526 * Removes a single instance of the specified element from this queue, 527 * if it is present. More formally, removes an element {@code e} such 528 * that {@code o.equals(e)}, if this queue contains one or more such 529 * elements. 530 * Returns {@code true} if this queue contained the specified element 531 * (or equivalently, if this queue changed as a result of the call). 532 * 533 * @param o element to be removed from this queue, if present 534 * @return {@code true} if this queue changed as a result of the call 535 */ 536 public boolean remove(Object o) { 537 if (o == null) return false; 538 fullyLock(); 539 try { 540 for (Node<E> pred = head, p = pred.next; 541 p != null; 542 pred = p, p = p.next) { 543 if (o.equals(p.item)) { 544 unlink(p, pred); 545 return true; 546 } 547 } 548 return false; 549 } finally { 550 fullyUnlock(); 551 } 552 } 553 554 /** 555 * Returns {@code true} if this queue contains the specified element. 556 * More formally, returns {@code true} if and only if this queue contains 557 * at least one element {@code e} such that {@code o.equals(e)}. 558 * 559 * @param o object to be checked for containment in this queue 560 * @return {@code true} if this queue contains the specified element 561 */ 562 public boolean contains(Object o) { 563 if (o == null) return false; 564 fullyLock(); 565 try { 566 for (Node<E> p = head.next; p != null; p = p.next) 567 if (o.equals(p.item)) 568 return true; 569 return false; 570 } finally { 571 fullyUnlock(); 572 } 573 } 574 575 /** 576 * Returns an array containing all of the elements in this queue, in 577 * proper sequence. 578 * 579 * <p>The returned array will be "safe" in that no references to it are 580 * maintained by this queue. (In other words, this method must allocate 581 * a new array). The caller is thus free to modify the returned array. 582 * 583 * <p>This method acts as bridge between array-based and collection-based 584 * APIs. 585 * 586 * @return an array containing all of the elements in this queue 587 */ 588 public Object[] toArray() { 589 fullyLock(); 590 try { 591 int size = count.get(); 592 Object[] a = new Object[size]; 593 int k = 0; 594 for (Node<E> p = head.next; p != null; p = p.next) 595 a[k++] = p.item; 596 return a; 597 } finally { 598 fullyUnlock(); 599 } 600 } 601 602 /** 603 * Returns an array containing all of the elements in this queue, in 604 * proper sequence; the runtime type of the returned array is that of 605 * the specified array. If the queue fits in the specified array, it 606 * is returned therein. Otherwise, a new array is allocated with the 607 * runtime type of the specified array and the size of this queue. 608 * 609 * <p>If this queue fits in the specified array with room to spare 610 * (i.e., the array has more elements than this queue), the element in 611 * the array immediately following the end of the queue is set to 612 * {@code null}. 613 * 614 * <p>Like the {@link #toArray()} method, this method acts as bridge between 615 * array-based and collection-based APIs. Further, this method allows 616 * precise control over the runtime type of the output array, and may, 617 * under certain circumstances, be used to save allocation costs. 618 * 619 * <p>Suppose {@code x} is a queue known to contain only strings. 620 * The following code can be used to dump the queue into a newly 621 * allocated array of {@code String}: 622 * 623 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> 624 * 625 * Note that {@code toArray(new Object[0])} is identical in function to 626 * {@code toArray()}. 627 * 628 * @param a the array into which the elements of the queue are to 629 * be stored, if it is big enough; otherwise, a new array of the 630 * same runtime type is allocated for this purpose 631 * @return an array containing all of the elements in this queue 632 * @throws ArrayStoreException if the runtime type of the specified array 633 * is not a supertype of the runtime type of every element in 634 * this queue 635 * @throws NullPointerException if the specified array is null 636 */ 637 @SuppressWarnings("unchecked") 638 public <T> T[] toArray(T[] a) { 639 fullyLock(); 640 try { 641 int size = count.get(); 642 if (a.length < size) 643 a = (T[])java.lang.reflect.Array.newInstance 644 (a.getClass().getComponentType(), size); 645 646 int k = 0; 647 for (Node<E> p = head.next; p != null; p = p.next) 648 a[k++] = (T)p.item; 649 if (a.length > k) 650 a[k] = null; 651 return a; 652 } finally { 653 fullyUnlock(); 654 } 655 } 656 657 public String toString() { 658 return Helpers.collectionToString(this); 659 } 660 661 /** 662 * Atomically removes all of the elements from this queue. 663 * The queue will be empty after this call returns. 664 */ 665 public void clear() { 666 fullyLock(); 667 try { 668 for (Node<E> p, h = head; (p = h.next) != null; h = p) { 669 h.next = h; 670 p.item = null; 671 } 672 head = last; 673 // assert head.item == null && head.next == null; 674 if (count.getAndSet(0) == capacity) 675 notFull.signal(); 676 } finally { 677 fullyUnlock(); 678 } 679 } 680 681 /** 682 * @throws UnsupportedOperationException {@inheritDoc} 683 * @throws ClassCastException {@inheritDoc} 684 * @throws NullPointerException {@inheritDoc} 685 * @throws IllegalArgumentException {@inheritDoc} 686 */ 687 public int drainTo(Collection<? super E> c) { 688 return drainTo(c, Integer.MAX_VALUE); 689 } 690 691 /** 692 * @throws UnsupportedOperationException {@inheritDoc} 693 * @throws ClassCastException {@inheritDoc} 694 * @throws NullPointerException {@inheritDoc} 695 * @throws IllegalArgumentException {@inheritDoc} 696 */ 697 public int drainTo(Collection<? super E> c, int maxElements) { 698 Objects.requireNonNull(c); 699 if (c == this) 700 throw new IllegalArgumentException(); 701 if (maxElements <= 0) 702 return 0; 703 boolean signalNotFull = false; 704 final ReentrantLock takeLock = this.takeLock; 705 takeLock.lock(); 706 try { 707 int n = Math.min(maxElements, count.get()); 708 // count.get provides visibility to first n Nodes 709 Node<E> h = head; 710 int i = 0; 711 try { 712 while (i < n) { 713 Node<E> p = h.next; 714 c.add(p.item); 715 p.item = null; 716 h.next = h; 717 h = p; 718 ++i; 719 } 720 return n; 721 } finally { 722 // Restore invariants even if c.add() threw 723 if (i > 0) { 724 // assert h.item == null; 725 head = h; 726 signalNotFull = (count.getAndAdd(-i) == capacity); 727 } 728 } 729 } finally { 730 takeLock.unlock(); 731 if (signalNotFull) 732 signalNotFull(); 733 } 734 } 735 736 /** 737 * Used for any element traversal that is not entirely under lock. 738 * Such traversals must handle both: 739 * - dequeued nodes (p.next == p) 740 * - (possibly multiple) interior removed nodes (p.item == null) 741 */ 742 Node<E> succ(Node<E> p) { 743 if (p == (p = p.next)) 744 p = head.next; 745 return p; 746 } 747 748 /** 749 * Returns an iterator over the elements in this queue in proper sequence. 750 * The elements will be returned in order from first (head) to last (tail). 751 * 752 * <p>The returned iterator is 753 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 754 * 755 * @return an iterator over the elements in this queue in proper sequence 756 */ 757 public Iterator<E> iterator() { 758 return new Itr(); 759 } 760 761 /** 762 * Weakly-consistent iterator. 763 * 764 * Lazily updated ancestor field provides expected O(1) remove(), 765 * but still O(n) in the worst case, whenever the saved ancestor 766 * is concurrently deleted. 767 */ 768 private class Itr implements Iterator<E> { 769 private Node<E> next; // Node holding nextItem 770 private E nextItem; // next item to hand out 771 private Node<E> lastRet; 772 private Node<E> ancestor; // Helps unlink lastRet on remove() 773 774 Itr() { 775 fullyLock(); 776 try { 777 if ((next = head.next) != null) 778 nextItem = next.item; 779 } finally { 780 fullyUnlock(); 781 } 782 } 783 784 public boolean hasNext() { 785 return next != null; 786 } 787 788 public E next() { 789 Node<E> p; 790 if ((p = next) == null) 791 throw new NoSuchElementException(); 792 lastRet = p; 793 E x = nextItem; 794 fullyLock(); 795 try { 796 E e = null; 797 for (p = p.next; p != null && (e = p.item) == null; ) 798 p = succ(p); 799 next = p; 800 nextItem = e; 801 } finally { 802 fullyUnlock(); 803 } 804 return x; 805 } 806 807 public void forEachRemaining(Consumer<? super E> action) { 808 // A variant of forEachFrom 809 Objects.requireNonNull(action); 810 Node<E> p; 811 if ((p = next) == null) return; 812 lastRet = p; 813 next = null; 814 final int batchSize = 64; 815 Object[] es = null; 816 int n, len = 1; 817 do { 818 fullyLock(); 819 try { 820 if (es == null) { 821 p = p.next; 822 for (Node<E> q = p; q != null; q = succ(q)) 823 if (q.item != null && ++len == batchSize) 824 break; 825 es = new Object[len]; 826 es[0] = nextItem; 827 nextItem = null; 828 n = 1; 829 } else 830 n = 0; 831 for (; p != null && n < len; p = succ(p)) 832 if ((es[n] = p.item) != null) { 833 lastRet = p; 834 n++; 835 } 836 } finally { 837 fullyUnlock(); 838 } 839 for (int i = 0; i < n; i++) { 840 @SuppressWarnings("unchecked") E e = (E) es[i]; 841 action.accept(e); 842 } 843 } while (n > 0 && p != null); 844 } 845 846 public void remove() { 847 Node<E> p = lastRet; 848 if (p == null) 849 throw new IllegalStateException(); 850 lastRet = null; 851 fullyLock(); 852 try { 853 if (p.item != null) { 854 if (ancestor == null) 855 ancestor = head; 856 ancestor = findPred(p, ancestor); 857 unlink(p, ancestor); 858 } 859 } finally { 860 fullyUnlock(); 861 } 862 } 863 } 864 865 /** 866 * A customized variant of Spliterators.IteratorSpliterator. 867 * Keep this class in sync with (very similar) LBDSpliterator. 868 */ 869 private final class LBQSpliterator implements Spliterator<E> { 870 static final int MAX_BATCH = 1 << 25; // max batch array size; 871 Node<E> current; // current node; null until initialized 872 int batch; // batch size for splits 873 boolean exhausted; // true when no more nodes 874 long est = size(); // size estimate 875 876 LBQSpliterator() {} 877 878 public long estimateSize() { return est; } 879 880 public Spliterator<E> trySplit() { 881 Node<E> h; 882 if (!exhausted && 883 ((h = current) != null || (h = head.next) != null) 884 && h.next != null) { 885 int n = batch = Math.min(batch + 1, MAX_BATCH); 886 Object[] a = new Object[n]; 887 int i = 0; 888 Node<E> p = current; 889 fullyLock(); 890 try { 891 if (p != null || (p = head.next) != null) 892 for (; p != null && i < n; p = succ(p)) 893 if ((a[i] = p.item) != null) 894 i++; 895 } finally { 896 fullyUnlock(); 897 } 898 if ((current = p) == null) { 899 est = 0L; 900 exhausted = true; 901 } 902 else if ((est -= i) < 0L) 903 est = 0L; 904 if (i > 0) 905 return Spliterators.spliterator 906 (a, 0, i, (Spliterator.ORDERED | 907 Spliterator.NONNULL | 908 Spliterator.CONCURRENT)); 909 } 910 return null; 911 } 912 913 public boolean tryAdvance(Consumer<? super E> action) { 914 Objects.requireNonNull(action); 915 if (!exhausted) { 916 E e = null; 917 fullyLock(); 918 try { 919 Node<E> p; 920 if ((p = current) != null || (p = head.next) != null) 921 do { 922 e = p.item; 923 p = succ(p); 924 } while (e == null && p != null); 925 if ((current = p) == null) 926 exhausted = true; 927 } finally { 928 fullyUnlock(); 929 } 930 if (e != null) { 931 action.accept(e); 932 return true; 933 } 934 } 935 return false; 936 } 937 938 public void forEachRemaining(Consumer<? super E> action) { 939 Objects.requireNonNull(action); 940 if (!exhausted) { 941 exhausted = true; 942 Node<E> p = current; 943 current = null; 944 forEachFrom(action, p); 945 } 946 } 947 948 public int characteristics() { 949 return (Spliterator.ORDERED | 950 Spliterator.NONNULL | 951 Spliterator.CONCURRENT); 952 } 953 } 954 955 /** 956 * Returns a {@link Spliterator} over the elements in this queue. 957 * 958 * <p>The returned spliterator is 959 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 960 * 961 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT}, 962 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}. 963 * 964 * @implNote 965 * The {@code Spliterator} implements {@code trySplit} to permit limited 966 * parallelism. 967 * 968 * @return a {@code Spliterator} over the elements in this queue 969 * @since 1.8 970 */ 971 public Spliterator<E> spliterator() { 972 return new LBQSpliterator(); 973 } 974 975 /** 976 * @throws NullPointerException {@inheritDoc} 977 */ 978 public void forEach(Consumer<? super E> action) { 979 Objects.requireNonNull(action); 980 forEachFrom(action, null); 981 } 982 983 /** 984 * Runs action on each element found during a traversal starting at p. 985 * If p is null, traversal starts at head. 986 */ 987 void forEachFrom(Consumer<? super E> action, Node<E> p) { 988 // Extract batches of elements while holding the lock; then 989 // run the action on the elements while not 990 final int batchSize = 64; // max number of elements per batch 991 Object[] es = null; // container for batch of elements 992 int n, len = 0; 993 do { 994 fullyLock(); 995 try { 996 if (es == null) { 997 if (p == null) p = head.next; 998 for (Node<E> q = p; q != null; q = succ(q)) 999 if (q.item != null && ++len == batchSize) 1000 break; 1001 es = new Object[len]; 1002 } 1003 for (n = 0; p != null && n < len; p = succ(p)) 1004 if ((es[n] = p.item) != null) 1005 n++; 1006 } finally { 1007 fullyUnlock(); 1008 } 1009 for (int i = 0; i < n; i++) { 1010 @SuppressWarnings("unchecked") E e = (E) es[i]; 1011 action.accept(e); 1012 } 1013 } while (n > 0 && p != null); 1014 } 1015 1016 /** 1017 * @throws NullPointerException {@inheritDoc} 1018 */ 1019 public boolean removeIf(Predicate<? super E> filter) { 1020 Objects.requireNonNull(filter); 1021 return bulkRemove(filter); 1022 } 1023 1024 /** 1025 * @throws NullPointerException {@inheritDoc} 1026 */ 1027 public boolean removeAll(Collection<?> c) { 1028 Objects.requireNonNull(c); 1029 return bulkRemove(e -> c.contains(e)); 1030 } 1031 1032 /** 1033 * @throws NullPointerException {@inheritDoc} 1034 */ 1035 public boolean retainAll(Collection<?> c) { 1036 Objects.requireNonNull(c); 1037 return bulkRemove(e -> !c.contains(e)); 1038 } 1039 1040 /** 1041 * Returns the predecessor of live node p, given a node that was 1042 * once a live ancestor of p (or head); allows unlinking of p. 1043 */ 1044 Node<E> findPred(Node<E> p, Node<E> ancestor) { 1045 // assert p.item != null; 1046 if (ancestor.item == null) 1047 ancestor = head; 1048 // Fails with NPE if precondition not satisfied 1049 for (Node<E> q; (q = ancestor.next) != p; ) 1050 ancestor = q; 1051 return ancestor; 1052 } 1053 1054 /** Implementation of bulk remove methods. */ 1055 @SuppressWarnings("unchecked") 1056 private boolean bulkRemove(Predicate<? super E> filter) { 1057 boolean removed = false; 1058 Node<E> p = null, ancestor = head; 1059 Node<E>[] nodes = null; 1060 int n, len = 0; 1061 do { 1062 // 1. Extract batch of up to 64 elements while holding the lock. 1063 long deathRow = 0; // "bitset" of size 64 1064 fullyLock(); 1065 try { 1066 if (nodes == null) { 1067 if (p == null) p = head.next; 1068 for (Node<E> q = p; q != null; q = succ(q)) 1069 if (q.item != null && ++len == 64) 1070 break; 1071 nodes = (Node<E>[]) new Node<?>[len]; 1072 } 1073 for (n = 0; p != null && n < len; p = succ(p)) 1074 nodes[n++] = p; 1075 } finally { 1076 fullyUnlock(); 1077 } 1078 1079 // 2. Run the filter on the elements while lock is free. 1080 for (int i = 0; i < n; i++) { 1081 final E e; 1082 if ((e = nodes[i].item) != null && filter.test(e)) 1083 deathRow |= 1L << i; 1084 } 1085 1086 // 3. Remove any filtered elements while holding the lock. 1087 if (deathRow != 0) { 1088 fullyLock(); 1089 try { 1090 for (int i = 0; i < n; i++) { 1091 final Node<E> q; 1092 if ((deathRow & (1L << i)) != 0L 1093 && (q = nodes[i]).item != null) { 1094 ancestor = findPred(q, ancestor); 1095 unlink(q, ancestor); 1096 removed = true; 1097 } 1098 } 1099 } finally { 1100 fullyUnlock(); 1101 } 1102 } 1103 } while (n > 0 && p != null); 1104 return removed; 1105 } 1106 1107 /** 1108 * Saves this queue to a stream (that is, serializes it). 1109 * 1110 * @param s the stream 1111 * @throws java.io.IOException if an I/O error occurs 1112 * @serialData The capacity is emitted (int), followed by all of 1113 * its elements (each an {@code Object}) in the proper order, 1114 * followed by a null 1115 */ 1116 private void writeObject(java.io.ObjectOutputStream s) 1117 throws java.io.IOException { 1118 1119 fullyLock(); 1120 try { 1121 // Write out any hidden stuff, plus capacity 1122 s.defaultWriteObject(); 1123 1124 // Write out all elements in the proper order. 1125 for (Node<E> p = head.next; p != null; p = p.next) 1126 s.writeObject(p.item); 1127 1128 // Use trailing null as sentinel 1129 s.writeObject(null); 1130 } finally { 1131 fullyUnlock(); 1132 } 1133 } 1134 1135 /** 1136 * Reconstitutes this queue from a stream (that is, deserializes it). 1137 * @param s the stream 1138 * @throws ClassNotFoundException if the class of a serialized object 1139 * could not be found 1140 * @throws java.io.IOException if an I/O error occurs 1141 */ 1142 private void readObject(java.io.ObjectInputStream s) 1143 throws java.io.IOException, ClassNotFoundException { 1144 // Read in capacity, and any hidden stuff 1145 s.defaultReadObject(); 1146 1147 count.set(0); 1148 last = head = new Node<E>(null); 1149 1150 // Read in all elements and place in queue 1151 for (;;) { 1152 @SuppressWarnings("unchecked") 1153 E item = (E)s.readObject(); 1154 if (item == null) 1155 break; 1156 add(item); 1157 } 1158 } 1159 }