1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.lang.ref.WeakReference; 39 import java.util.AbstractQueue; 40 import java.util.Arrays; 41 import java.util.Collection; 42 import java.util.Iterator; 43 import java.util.NoSuchElementException; 44 import java.util.Objects; 45 import java.util.Spliterator; 46 import java.util.Spliterators; 47 import java.util.concurrent.locks.Condition; 48 import java.util.concurrent.locks.ReentrantLock; 49 import java.util.function.Consumer; 50 import java.util.function.Predicate; 51 52 /** 53 * A bounded {@linkplain BlockingQueue blocking queue} backed by an 54 * array. This queue orders elements FIFO (first-in-first-out). The 55 * <em>head</em> of the queue is that element that has been on the 56 * queue the longest time. The <em>tail</em> of the queue is that 57 * element that has been on the queue the shortest time. New elements 58 * are inserted at the tail of the queue, and the queue retrieval 59 * operations obtain elements at the head of the queue. 60 * 61 * <p>This is a classic "bounded buffer", in which a 62 * fixed-sized array holds elements inserted by producers and 63 * extracted by consumers. Once created, the capacity cannot be 64 * changed. Attempts to {@code put} an element into a full queue 65 * will result in the operation blocking; attempts to {@code take} an 66 * element from an empty queue will similarly block. 67 * 68 * <p>This class supports an optional fairness policy for ordering 69 * waiting producer and consumer threads. By default, this ordering 70 * is not guaranteed. However, a queue constructed with fairness set 71 * to {@code true} grants threads access in FIFO order. Fairness 72 * generally decreases throughput but reduces variability and avoids 73 * starvation. 74 * 75 * <p>This class and its iterator implement all of the <em>optional</em> 76 * methods of the {@link Collection} and {@link Iterator} interfaces. 77 * 78 * <p>This class is a member of the 79 * <a href="{@docRoot}/../technotes/guides/collections/index.html"> 80 * Java Collections Framework</a>. 81 * 82 * @since 1.5 83 * @author Doug Lea 84 * @param <E> the type of elements held in this queue 85 */ 86 public class ArrayBlockingQueue<E> extends AbstractQueue<E> 87 implements BlockingQueue<E>, java.io.Serializable { 88 89 /* 90 * Much of the implementation mechanics, especially the unusual 91 * nested loops, are shared and co-maintained with ArrayDeque. 92 */ 93 94 /** 95 * Serialization ID. This class relies on default serialization 96 * even for the items array, which is default-serialized, even if 97 * it is empty. Otherwise it could not be declared final, which is 98 * necessary here. 99 */ 100 private static final long serialVersionUID = -817911632652898426L; 101 102 /** The queued items */ 103 final Object[] items; 104 105 /** items index for next take, poll, peek or remove */ 106 int takeIndex; 107 108 /** items index for next put, offer, or add */ 109 int putIndex; 110 111 /** Number of elements in the queue */ 112 int count; 113 114 /* 115 * Concurrency control uses the classic two-condition algorithm 116 * found in any textbook. 117 */ 118 119 /** Main lock guarding all access */ 120 final ReentrantLock lock; 121 122 /** Condition for waiting takes */ 123 private final Condition notEmpty; 124 125 /** Condition for waiting puts */ 126 private final Condition notFull; 127 128 /** 129 * Shared state for currently active iterators, or null if there 130 * are known not to be any. Allows queue operations to update 131 * iterator state. 132 */ 133 transient Itrs itrs; 134 135 // Internal helper methods 136 137 /** 138 * Increments i, mod modulus. 139 * Precondition and postcondition: 0 <= i < modulus. 140 */ 141 static final int inc(int i, int modulus) { 142 if (++i >= modulus) i = 0; 143 return i; 144 } 145 146 /** 147 * Decrements i, mod modulus. 148 * Precondition and postcondition: 0 <= i < modulus. 149 */ 150 static final int dec(int i, int modulus) { 151 if (--i < 0) i = modulus - 1; 152 return i; 153 } 154 155 /** 156 * Returns item at index i. 157 */ 158 @SuppressWarnings("unchecked") 159 final E itemAt(int i) { 160 return (E) items[i]; 161 } 162 163 /** 164 * Returns element at array index i. 165 * This is a slight abuse of generics, accepted by javac. 166 */ 167 @SuppressWarnings("unchecked") 168 static <E> E itemAt(Object[] items, int i) { 169 return (E) items[i]; 170 } 171 172 /** 173 * Inserts element at current put position, advances, and signals. 174 * Call only when holding lock. 175 */ 176 private void enqueue(E e) { 177 // assert lock.isHeldByCurrentThread(); 178 // assert lock.getHoldCount() == 1; 179 // assert items[putIndex] == null; 180 final Object[] items = this.items; 181 items[putIndex] = e; 182 if (++putIndex == items.length) putIndex = 0; 183 count++; 184 notEmpty.signal(); 185 } 186 187 /** 188 * Extracts element at current take position, advances, and signals. 189 * Call only when holding lock. 190 */ 191 private E dequeue() { 192 // assert lock.isHeldByCurrentThread(); 193 // assert lock.getHoldCount() == 1; 194 // assert items[takeIndex] != null; 195 final Object[] items = this.items; 196 @SuppressWarnings("unchecked") 197 E e = (E) items[takeIndex]; 198 items[takeIndex] = null; 199 if (++takeIndex == items.length) takeIndex = 0; 200 count--; 201 if (itrs != null) 202 itrs.elementDequeued(); 203 notFull.signal(); 204 return e; 205 } 206 207 /** 208 * Deletes item at array index removeIndex. 209 * Utility for remove(Object) and iterator.remove. 210 * Call only when holding lock. 211 */ 212 void removeAt(final int removeIndex) { 213 // assert lock.isHeldByCurrentThread(); 214 // assert lock.getHoldCount() == 1; 215 // assert items[removeIndex] != null; 216 // assert removeIndex >= 0 && removeIndex < items.length; 217 final Object[] items = this.items; 218 if (removeIndex == takeIndex) { 219 // removing front item; just advance 220 items[takeIndex] = null; 221 if (++takeIndex == items.length) takeIndex = 0; 222 count--; 223 if (itrs != null) 224 itrs.elementDequeued(); 225 } else { 226 // an "interior" remove 227 228 // slide over all others up through putIndex. 229 for (int i = removeIndex, putIndex = this.putIndex;;) { 230 int pred = i; 231 if (++i == items.length) i = 0; 232 if (i == putIndex) { 233 items[pred] = null; 234 this.putIndex = pred; 235 break; 236 } 237 items[pred] = items[i]; 238 } 239 count--; 240 if (itrs != null) 241 itrs.removedAt(removeIndex); 242 } 243 notFull.signal(); 244 } 245 246 /** 247 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 248 * capacity and default access policy. 249 * 250 * @param capacity the capacity of this queue 251 * @throws IllegalArgumentException if {@code capacity < 1} 252 */ 253 public ArrayBlockingQueue(int capacity) { 254 this(capacity, false); 255 } 256 257 /** 258 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 259 * capacity and the specified access policy. 260 * 261 * @param capacity the capacity of this queue 262 * @param fair if {@code true} then queue accesses for threads blocked 263 * on insertion or removal, are processed in FIFO order; 264 * if {@code false} the access order is unspecified. 265 * @throws IllegalArgumentException if {@code capacity < 1} 266 */ 267 public ArrayBlockingQueue(int capacity, boolean fair) { 268 if (capacity <= 0) 269 throw new IllegalArgumentException(); 270 this.items = new Object[capacity]; 271 lock = new ReentrantLock(fair); 272 notEmpty = lock.newCondition(); 273 notFull = lock.newCondition(); 274 } 275 276 /** 277 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 278 * capacity, the specified access policy and initially containing the 279 * elements of the given collection, 280 * added in traversal order of the collection's iterator. 281 * 282 * @param capacity the capacity of this queue 283 * @param fair if {@code true} then queue accesses for threads blocked 284 * on insertion or removal, are processed in FIFO order; 285 * if {@code false} the access order is unspecified. 286 * @param c the collection of elements to initially contain 287 * @throws IllegalArgumentException if {@code capacity} is less than 288 * {@code c.size()}, or less than 1. 289 * @throws NullPointerException if the specified collection or any 290 * of its elements are null 291 */ 292 public ArrayBlockingQueue(int capacity, boolean fair, 293 Collection<? extends E> c) { 294 this(capacity, fair); 295 296 final ReentrantLock lock = this.lock; 297 lock.lock(); // Lock only for visibility, not mutual exclusion 298 try { 299 final Object[] items = this.items; 300 int i = 0; 301 try { 302 for (E e : c) 303 items[i++] = Objects.requireNonNull(e); 304 } catch (ArrayIndexOutOfBoundsException ex) { 305 throw new IllegalArgumentException(); 306 } 307 count = i; 308 putIndex = (i == capacity) ? 0 : i; 309 } finally { 310 lock.unlock(); 311 } 312 } 313 314 /** 315 * Inserts the specified element at the tail of this queue if it is 316 * possible to do so immediately without exceeding the queue's capacity, 317 * returning {@code true} upon success and throwing an 318 * {@code IllegalStateException} if this queue is full. 319 * 320 * @param e the element to add 321 * @return {@code true} (as specified by {@link Collection#add}) 322 * @throws IllegalStateException if this queue is full 323 * @throws NullPointerException if the specified element is null 324 */ 325 public boolean add(E e) { 326 return super.add(e); 327 } 328 329 /** 330 * Inserts the specified element at the tail of this queue if it is 331 * possible to do so immediately without exceeding the queue's capacity, 332 * returning {@code true} upon success and {@code false} if this queue 333 * is full. This method is generally preferable to method {@link #add}, 334 * which can fail to insert an element only by throwing an exception. 335 * 336 * @throws NullPointerException if the specified element is null 337 */ 338 public boolean offer(E e) { 339 Objects.requireNonNull(e); 340 final ReentrantLock lock = this.lock; 341 lock.lock(); 342 try { 343 if (count == items.length) 344 return false; 345 else { 346 enqueue(e); 347 return true; 348 } 349 } finally { 350 lock.unlock(); 351 } 352 } 353 354 /** 355 * Inserts the specified element at the tail of this queue, waiting 356 * for space to become available if the queue is full. 357 * 358 * @throws InterruptedException {@inheritDoc} 359 * @throws NullPointerException {@inheritDoc} 360 */ 361 public void put(E e) throws InterruptedException { 362 Objects.requireNonNull(e); 363 final ReentrantLock lock = this.lock; 364 lock.lockInterruptibly(); 365 try { 366 while (count == items.length) 367 notFull.await(); 368 enqueue(e); 369 } finally { 370 lock.unlock(); 371 } 372 } 373 374 /** 375 * Inserts the specified element at the tail of this queue, waiting 376 * up to the specified wait time for space to become available if 377 * the queue is full. 378 * 379 * @throws InterruptedException {@inheritDoc} 380 * @throws NullPointerException {@inheritDoc} 381 */ 382 public boolean offer(E e, long timeout, TimeUnit unit) 383 throws InterruptedException { 384 385 Objects.requireNonNull(e); 386 long nanos = unit.toNanos(timeout); 387 final ReentrantLock lock = this.lock; 388 lock.lockInterruptibly(); 389 try { 390 while (count == items.length) { 391 if (nanos <= 0L) 392 return false; 393 nanos = notFull.awaitNanos(nanos); 394 } 395 enqueue(e); 396 return true; 397 } finally { 398 lock.unlock(); 399 } 400 } 401 402 public E poll() { 403 final ReentrantLock lock = this.lock; 404 lock.lock(); 405 try { 406 return (count == 0) ? null : dequeue(); 407 } finally { 408 lock.unlock(); 409 } 410 } 411 412 public E take() throws InterruptedException { 413 final ReentrantLock lock = this.lock; 414 lock.lockInterruptibly(); 415 try { 416 while (count == 0) 417 notEmpty.await(); 418 return dequeue(); 419 } finally { 420 lock.unlock(); 421 } 422 } 423 424 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 425 long nanos = unit.toNanos(timeout); 426 final ReentrantLock lock = this.lock; 427 lock.lockInterruptibly(); 428 try { 429 while (count == 0) { 430 if (nanos <= 0L) 431 return null; 432 nanos = notEmpty.awaitNanos(nanos); 433 } 434 return dequeue(); 435 } finally { 436 lock.unlock(); 437 } 438 } 439 440 public E peek() { 441 final ReentrantLock lock = this.lock; 442 lock.lock(); 443 try { 444 return itemAt(takeIndex); // null when queue is empty 445 } finally { 446 lock.unlock(); 447 } 448 } 449 450 // this doc comment is overridden to remove the reference to collections 451 // greater in size than Integer.MAX_VALUE 452 /** 453 * Returns the number of elements in this queue. 454 * 455 * @return the number of elements in this queue 456 */ 457 public int size() { 458 final ReentrantLock lock = this.lock; 459 lock.lock(); 460 try { 461 return count; 462 } finally { 463 lock.unlock(); 464 } 465 } 466 467 // this doc comment is a modified copy of the inherited doc comment, 468 // without the reference to unlimited queues. 469 /** 470 * Returns the number of additional elements that this queue can ideally 471 * (in the absence of memory or resource constraints) accept without 472 * blocking. This is always equal to the initial capacity of this queue 473 * less the current {@code size} of this queue. 474 * 475 * <p>Note that you <em>cannot</em> always tell if an attempt to insert 476 * an element will succeed by inspecting {@code remainingCapacity} 477 * because it may be the case that another thread is about to 478 * insert or remove an element. 479 */ 480 public int remainingCapacity() { 481 final ReentrantLock lock = this.lock; 482 lock.lock(); 483 try { 484 return items.length - count; 485 } finally { 486 lock.unlock(); 487 } 488 } 489 490 /** 491 * Removes a single instance of the specified element from this queue, 492 * if it is present. More formally, removes an element {@code e} such 493 * that {@code o.equals(e)}, if this queue contains one or more such 494 * elements. 495 * Returns {@code true} if this queue contained the specified element 496 * (or equivalently, if this queue changed as a result of the call). 497 * 498 * <p>Removal of interior elements in circular array based queues 499 * is an intrinsically slow and disruptive operation, so should 500 * be undertaken only in exceptional circumstances, ideally 501 * only when the queue is known not to be accessible by other 502 * threads. 503 * 504 * @param o element to be removed from this queue, if present 505 * @return {@code true} if this queue changed as a result of the call 506 */ 507 public boolean remove(Object o) { 508 if (o == null) return false; 509 final ReentrantLock lock = this.lock; 510 lock.lock(); 511 try { 512 if (count > 0) { 513 final Object[] items = this.items; 514 for (int i = takeIndex, end = putIndex, 515 to = (i < end) ? end : items.length; 516 ; i = 0, to = end) { 517 for (; i < to; i++) 518 if (o.equals(items[i])) { 519 removeAt(i); 520 return true; 521 } 522 if (to == end) break; 523 } 524 } 525 return false; 526 } finally { 527 lock.unlock(); 528 } 529 } 530 531 /** 532 * Returns {@code true} if this queue contains the specified element. 533 * More formally, returns {@code true} if and only if this queue contains 534 * at least one element {@code e} such that {@code o.equals(e)}. 535 * 536 * @param o object to be checked for containment in this queue 537 * @return {@code true} if this queue contains the specified element 538 */ 539 public boolean contains(Object o) { 540 if (o == null) return false; 541 final ReentrantLock lock = this.lock; 542 lock.lock(); 543 try { 544 if (count > 0) { 545 final Object[] items = this.items; 546 for (int i = takeIndex, end = putIndex, 547 to = (i < end) ? end : items.length; 548 ; i = 0, to = end) { 549 for (; i < to; i++) 550 if (o.equals(items[i])) 551 return true; 552 if (to == end) break; 553 } 554 } 555 return false; 556 } finally { 557 lock.unlock(); 558 } 559 } 560 561 /** 562 * Returns an array containing all of the elements in this queue, in 563 * proper sequence. 564 * 565 * <p>The returned array will be "safe" in that no references to it are 566 * maintained by this queue. (In other words, this method must allocate 567 * a new array). The caller is thus free to modify the returned array. 568 * 569 * <p>This method acts as bridge between array-based and collection-based 570 * APIs. 571 * 572 * @return an array containing all of the elements in this queue 573 */ 574 public Object[] toArray() { 575 final ReentrantLock lock = this.lock; 576 lock.lock(); 577 try { 578 final Object[] items = this.items; 579 final int end = takeIndex + count; 580 final Object[] a = Arrays.copyOfRange(items, takeIndex, end); 581 if (end != putIndex) 582 System.arraycopy(items, 0, a, items.length - takeIndex, putIndex); 583 return a; 584 } finally { 585 lock.unlock(); 586 } 587 } 588 589 /** 590 * Returns an array containing all of the elements in this queue, in 591 * proper sequence; the runtime type of the returned array is that of 592 * the specified array. If the queue fits in the specified array, it 593 * is returned therein. Otherwise, a new array is allocated with the 594 * runtime type of the specified array and the size of this queue. 595 * 596 * <p>If this queue fits in the specified array with room to spare 597 * (i.e., the array has more elements than this queue), the element in 598 * the array immediately following the end of the queue is set to 599 * {@code null}. 600 * 601 * <p>Like the {@link #toArray()} method, this method acts as bridge between 602 * array-based and collection-based APIs. Further, this method allows 603 * precise control over the runtime type of the output array, and may, 604 * under certain circumstances, be used to save allocation costs. 605 * 606 * <p>Suppose {@code x} is a queue known to contain only strings. 607 * The following code can be used to dump the queue into a newly 608 * allocated array of {@code String}: 609 * 610 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> 611 * 612 * Note that {@code toArray(new Object[0])} is identical in function to 613 * {@code toArray()}. 614 * 615 * @param a the array into which the elements of the queue are to 616 * be stored, if it is big enough; otherwise, a new array of the 617 * same runtime type is allocated for this purpose 618 * @return an array containing all of the elements in this queue 619 * @throws ArrayStoreException if the runtime type of the specified array 620 * is not a supertype of the runtime type of every element in 621 * this queue 622 * @throws NullPointerException if the specified array is null 623 */ 624 @SuppressWarnings("unchecked") 625 public <T> T[] toArray(T[] a) { 626 final ReentrantLock lock = this.lock; 627 lock.lock(); 628 try { 629 final Object[] items = this.items; 630 final int count = this.count; 631 final int firstLeg = Math.min(items.length - takeIndex, count); 632 if (a.length < count) { 633 a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count, 634 a.getClass()); 635 } else { 636 System.arraycopy(items, takeIndex, a, 0, firstLeg); 637 if (a.length > count) 638 a[count] = null; 639 } 640 if (firstLeg < count) 641 System.arraycopy(items, 0, a, firstLeg, putIndex); 642 return a; 643 } finally { 644 lock.unlock(); 645 } 646 } 647 648 public String toString() { 649 return Helpers.collectionToString(this); 650 } 651 652 /** 653 * Atomically removes all of the elements from this queue. 654 * The queue will be empty after this call returns. 655 */ 656 public void clear() { 657 final ReentrantLock lock = this.lock; 658 lock.lock(); 659 try { 660 int k; 661 if ((k = count) > 0) { 662 circularClear(items, takeIndex, putIndex); 663 takeIndex = putIndex; 664 count = 0; 665 if (itrs != null) 666 itrs.queueIsEmpty(); 667 for (; k > 0 && lock.hasWaiters(notFull); k--) 668 notFull.signal(); 669 } 670 } finally { 671 lock.unlock(); 672 } 673 } 674 675 /** 676 * Nulls out slots starting at array index i, upto index end. 677 * Condition i == end means "full" - the entire array is cleared. 678 */ 679 private static void circularClear(Object[] items, int i, int end) { 680 // assert 0 <= i && i < items.length; 681 // assert 0 <= end && end < items.length; 682 for (int to = (i < end) ? end : items.length; 683 ; i = 0, to = end) { 684 for (; i < to; i++) items[i] = null; 685 if (to == end) break; 686 } 687 } 688 689 /** 690 * @throws UnsupportedOperationException {@inheritDoc} 691 * @throws ClassCastException {@inheritDoc} 692 * @throws NullPointerException {@inheritDoc} 693 * @throws IllegalArgumentException {@inheritDoc} 694 */ 695 public int drainTo(Collection<? super E> c) { 696 return drainTo(c, Integer.MAX_VALUE); 697 } 698 699 /** 700 * @throws UnsupportedOperationException {@inheritDoc} 701 * @throws ClassCastException {@inheritDoc} 702 * @throws NullPointerException {@inheritDoc} 703 * @throws IllegalArgumentException {@inheritDoc} 704 */ 705 public int drainTo(Collection<? super E> c, int maxElements) { 706 Objects.requireNonNull(c); 707 if (c == this) 708 throw new IllegalArgumentException(); 709 if (maxElements <= 0) 710 return 0; 711 final Object[] items = this.items; 712 final ReentrantLock lock = this.lock; 713 lock.lock(); 714 try { 715 int n = Math.min(maxElements, count); 716 int take = takeIndex; 717 int i = 0; 718 try { 719 while (i < n) { 720 @SuppressWarnings("unchecked") 721 E e = (E) items[take]; 722 c.add(e); 723 items[take] = null; 724 if (++take == items.length) take = 0; 725 i++; 726 } 727 return n; 728 } finally { 729 // Restore invariants even if c.add() threw 730 if (i > 0) { 731 count -= i; 732 takeIndex = take; 733 if (itrs != null) { 734 if (count == 0) 735 itrs.queueIsEmpty(); 736 else if (i > take) 737 itrs.takeIndexWrapped(); 738 } 739 for (; i > 0 && lock.hasWaiters(notFull); i--) 740 notFull.signal(); 741 } 742 } 743 } finally { 744 lock.unlock(); 745 } 746 } 747 748 /** 749 * Returns an iterator over the elements in this queue in proper sequence. 750 * The elements will be returned in order from first (head) to last (tail). 751 * 752 * <p>The returned iterator is 753 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 754 * 755 * @return an iterator over the elements in this queue in proper sequence 756 */ 757 public Iterator<E> iterator() { 758 return new Itr(); 759 } 760 761 /** 762 * Shared data between iterators and their queue, allowing queue 763 * modifications to update iterators when elements are removed. 764 * 765 * This adds a lot of complexity for the sake of correctly 766 * handling some uncommon operations, but the combination of 767 * circular-arrays and supporting interior removes (i.e., those 768 * not at head) would cause iterators to sometimes lose their 769 * places and/or (re)report elements they shouldn't. To avoid 770 * this, when a queue has one or more iterators, it keeps iterator 771 * state consistent by: 772 * 773 * (1) keeping track of the number of "cycles", that is, the 774 * number of times takeIndex has wrapped around to 0. 775 * (2) notifying all iterators via the callback removedAt whenever 776 * an interior element is removed (and thus other elements may 777 * be shifted). 778 * 779 * These suffice to eliminate iterator inconsistencies, but 780 * unfortunately add the secondary responsibility of maintaining 781 * the list of iterators. We track all active iterators in a 782 * simple linked list (accessed only when the queue's lock is 783 * held) of weak references to Itr. The list is cleaned up using 784 * 3 different mechanisms: 785 * 786 * (1) Whenever a new iterator is created, do some O(1) checking for 787 * stale list elements. 788 * 789 * (2) Whenever takeIndex wraps around to 0, check for iterators 790 * that have been unused for more than one wrap-around cycle. 791 * 792 * (3) Whenever the queue becomes empty, all iterators are notified 793 * and this entire data structure is discarded. 794 * 795 * So in addition to the removedAt callback that is necessary for 796 * correctness, iterators have the shutdown and takeIndexWrapped 797 * callbacks that help remove stale iterators from the list. 798 * 799 * Whenever a list element is examined, it is expunged if either 800 * the GC has determined that the iterator is discarded, or if the 801 * iterator reports that it is "detached" (does not need any 802 * further state updates). Overhead is maximal when takeIndex 803 * never advances, iterators are discarded before they are 804 * exhausted, and all removals are interior removes, in which case 805 * all stale iterators are discovered by the GC. But even in this 806 * case we don't increase the amortized complexity. 807 * 808 * Care must be taken to keep list sweeping methods from 809 * reentrantly invoking another such method, causing subtle 810 * corruption bugs. 811 */ 812 class Itrs { 813 814 /** 815 * Node in a linked list of weak iterator references. 816 */ 817 private class Node extends WeakReference<Itr> { 818 Node next; 819 820 Node(Itr iterator, Node next) { 821 super(iterator); 822 this.next = next; 823 } 824 } 825 826 /** Incremented whenever takeIndex wraps around to 0 */ 827 int cycles; 828 829 /** Linked list of weak iterator references */ 830 private Node head; 831 832 /** Used to expunge stale iterators */ 833 private Node sweeper; 834 835 private static final int SHORT_SWEEP_PROBES = 4; 836 private static final int LONG_SWEEP_PROBES = 16; 837 838 Itrs(Itr initial) { 839 register(initial); 840 } 841 842 /** 843 * Sweeps itrs, looking for and expunging stale iterators. 844 * If at least one was found, tries harder to find more. 845 * Called only from iterating thread. 846 * 847 * @param tryHarder whether to start in try-harder mode, because 848 * there is known to be at least one iterator to collect 849 */ 850 void doSomeSweeping(boolean tryHarder) { 851 // assert lock.isHeldByCurrentThread(); 852 // assert head != null; 853 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES; 854 Node o, p; 855 final Node sweeper = this.sweeper; 856 boolean passedGo; // to limit search to one full sweep 857 858 if (sweeper == null) { 859 o = null; 860 p = head; 861 passedGo = true; 862 } else { 863 o = sweeper; 864 p = o.next; 865 passedGo = false; 866 } 867 868 for (; probes > 0; probes--) { 869 if (p == null) { 870 if (passedGo) 871 break; 872 o = null; 873 p = head; 874 passedGo = true; 875 } 876 final Itr it = p.get(); 877 final Node next = p.next; 878 if (it == null || it.isDetached()) { 879 // found a discarded/exhausted iterator 880 probes = LONG_SWEEP_PROBES; // "try harder" 881 // unlink p 882 p.clear(); 883 p.next = null; 884 if (o == null) { 885 head = next; 886 if (next == null) { 887 // We've run out of iterators to track; retire 888 itrs = null; 889 return; 890 } 891 } 892 else 893 o.next = next; 894 } else { 895 o = p; 896 } 897 p = next; 898 } 899 900 this.sweeper = (p == null) ? null : o; 901 } 902 903 /** 904 * Adds a new iterator to the linked list of tracked iterators. 905 */ 906 void register(Itr itr) { 907 // assert lock.isHeldByCurrentThread(); 908 head = new Node(itr, head); 909 } 910 911 /** 912 * Called whenever takeIndex wraps around to 0. 913 * 914 * Notifies all iterators, and expunges any that are now stale. 915 */ 916 void takeIndexWrapped() { 917 // assert lock.isHeldByCurrentThread(); 918 cycles++; 919 for (Node o = null, p = head; p != null;) { 920 final Itr it = p.get(); 921 final Node next = p.next; 922 if (it == null || it.takeIndexWrapped()) { 923 // unlink p 924 // assert it == null || it.isDetached(); 925 p.clear(); 926 p.next = null; 927 if (o == null) 928 head = next; 929 else 930 o.next = next; 931 } else { 932 o = p; 933 } 934 p = next; 935 } 936 if (head == null) // no more iterators to track 937 itrs = null; 938 } 939 940 /** 941 * Called whenever an interior remove (not at takeIndex) occurred. 942 * 943 * Notifies all iterators, and expunges any that are now stale. 944 */ 945 void removedAt(int removedIndex) { 946 for (Node o = null, p = head; p != null;) { 947 final Itr it = p.get(); 948 final Node next = p.next; 949 if (it == null || it.removedAt(removedIndex)) { 950 // unlink p 951 // assert it == null || it.isDetached(); 952 p.clear(); 953 p.next = null; 954 if (o == null) 955 head = next; 956 else 957 o.next = next; 958 } else { 959 o = p; 960 } 961 p = next; 962 } 963 if (head == null) // no more iterators to track 964 itrs = null; 965 } 966 967 /** 968 * Called whenever the queue becomes empty. 969 * 970 * Notifies all active iterators that the queue is empty, 971 * clears all weak refs, and unlinks the itrs datastructure. 972 */ 973 void queueIsEmpty() { 974 // assert lock.isHeldByCurrentThread(); 975 for (Node p = head; p != null; p = p.next) { 976 Itr it = p.get(); 977 if (it != null) { 978 p.clear(); 979 it.shutdown(); 980 } 981 } 982 head = null; 983 itrs = null; 984 } 985 986 /** 987 * Called whenever an element has been dequeued (at takeIndex). 988 */ 989 void elementDequeued() { 990 // assert lock.isHeldByCurrentThread(); 991 if (count == 0) 992 queueIsEmpty(); 993 else if (takeIndex == 0) 994 takeIndexWrapped(); 995 } 996 } 997 998 /** 999 * Iterator for ArrayBlockingQueue. 1000 * 1001 * To maintain weak consistency with respect to puts and takes, we 1002 * read ahead one slot, so as to not report hasNext true but then 1003 * not have an element to return. 1004 * 1005 * We switch into "detached" mode (allowing prompt unlinking from 1006 * itrs without help from the GC) when all indices are negative, or 1007 * when hasNext returns false for the first time. This allows the 1008 * iterator to track concurrent updates completely accurately, 1009 * except for the corner case of the user calling Iterator.remove() 1010 * after hasNext() returned false. Even in this case, we ensure 1011 * that we don't remove the wrong element by keeping track of the 1012 * expected element to remove, in lastItem. Yes, we may fail to 1013 * remove lastItem from the queue if it moved due to an interleaved 1014 * interior remove while in detached mode. 1015 * 1016 * Method forEachRemaining, added in Java 8, is treated similarly 1017 * to hasNext returning false, in that we switch to detached mode, 1018 * but we regard it as an even stronger request to "close" this 1019 * iteration, and don't bother supporting subsequent remove(). 1020 */ 1021 private class Itr implements Iterator<E> { 1022 /** Index to look for new nextItem; NONE at end */ 1023 private int cursor; 1024 1025 /** Element to be returned by next call to next(); null if none */ 1026 private E nextItem; 1027 1028 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */ 1029 private int nextIndex; 1030 1031 /** Last element returned; null if none or not detached. */ 1032 private E lastItem; 1033 1034 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */ 1035 private int lastRet; 1036 1037 /** Previous value of takeIndex, or DETACHED when detached */ 1038 private int prevTakeIndex; 1039 1040 /** Previous value of iters.cycles */ 1041 private int prevCycles; 1042 1043 /** Special index value indicating "not available" or "undefined" */ 1044 private static final int NONE = -1; 1045 1046 /** 1047 * Special index value indicating "removed elsewhere", that is, 1048 * removed by some operation other than a call to this.remove(). 1049 */ 1050 private static final int REMOVED = -2; 1051 1052 /** Special value for prevTakeIndex indicating "detached mode" */ 1053 private static final int DETACHED = -3; 1054 1055 Itr() { 1056 lastRet = NONE; 1057 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1058 lock.lock(); 1059 try { 1060 if (count == 0) { 1061 // assert itrs == null; 1062 cursor = NONE; 1063 nextIndex = NONE; 1064 prevTakeIndex = DETACHED; 1065 } else { 1066 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1067 prevTakeIndex = takeIndex; 1068 nextItem = itemAt(nextIndex = takeIndex); 1069 cursor = incCursor(takeIndex); 1070 if (itrs == null) { 1071 itrs = new Itrs(this); 1072 } else { 1073 itrs.register(this); // in this order 1074 itrs.doSomeSweeping(false); 1075 } 1076 prevCycles = itrs.cycles; 1077 // assert takeIndex >= 0; 1078 // assert prevTakeIndex == takeIndex; 1079 // assert nextIndex >= 0; 1080 // assert nextItem != null; 1081 } 1082 } finally { 1083 lock.unlock(); 1084 } 1085 } 1086 1087 boolean isDetached() { 1088 // assert lock.isHeldByCurrentThread(); 1089 return prevTakeIndex < 0; 1090 } 1091 1092 private int incCursor(int index) { 1093 // assert lock.isHeldByCurrentThread(); 1094 if (++index == items.length) index = 0; 1095 if (index == putIndex) index = NONE; 1096 return index; 1097 } 1098 1099 /** 1100 * Returns true if index is invalidated by the given number of 1101 * dequeues, starting from prevTakeIndex. 1102 */ 1103 private boolean invalidated(int index, int prevTakeIndex, 1104 long dequeues, int length) { 1105 if (index < 0) 1106 return false; 1107 int distance = index - prevTakeIndex; 1108 if (distance < 0) 1109 distance += length; 1110 return dequeues > distance; 1111 } 1112 1113 /** 1114 * Adjusts indices to incorporate all dequeues since the last 1115 * operation on this iterator. Call only from iterating thread. 1116 */ 1117 private void incorporateDequeues() { 1118 // assert lock.isHeldByCurrentThread(); 1119 // assert itrs != null; 1120 // assert !isDetached(); 1121 // assert count > 0; 1122 1123 final int cycles = itrs.cycles; 1124 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1125 final int prevCycles = this.prevCycles; 1126 final int prevTakeIndex = this.prevTakeIndex; 1127 1128 if (cycles != prevCycles || takeIndex != prevTakeIndex) { 1129 final int len = items.length; 1130 // how far takeIndex has advanced since the previous 1131 // operation of this iterator 1132 long dequeues = (cycles - prevCycles) * len 1133 + (takeIndex - prevTakeIndex); 1134 1135 // Check indices for invalidation 1136 if (invalidated(lastRet, prevTakeIndex, dequeues, len)) 1137 lastRet = REMOVED; 1138 if (invalidated(nextIndex, prevTakeIndex, dequeues, len)) 1139 nextIndex = REMOVED; 1140 if (invalidated(cursor, prevTakeIndex, dequeues, len)) 1141 cursor = takeIndex; 1142 1143 if (cursor < 0 && nextIndex < 0 && lastRet < 0) 1144 detach(); 1145 else { 1146 this.prevCycles = cycles; 1147 this.prevTakeIndex = takeIndex; 1148 } 1149 } 1150 } 1151 1152 /** 1153 * Called when itrs should stop tracking this iterator, either 1154 * because there are no more indices to update (cursor < 0 && 1155 * nextIndex < 0 && lastRet < 0) or as a special exception, when 1156 * lastRet >= 0, because hasNext() is about to return false for the 1157 * first time. Call only from iterating thread. 1158 */ 1159 private void detach() { 1160 // Switch to detached mode 1161 // assert lock.isHeldByCurrentThread(); 1162 // assert cursor == NONE; 1163 // assert nextIndex < 0; 1164 // assert lastRet < 0 || nextItem == null; 1165 // assert lastRet < 0 ^ lastItem != null; 1166 if (prevTakeIndex >= 0) { 1167 // assert itrs != null; 1168 prevTakeIndex = DETACHED; 1169 // try to unlink from itrs (but not too hard) 1170 itrs.doSomeSweeping(true); 1171 } 1172 } 1173 1174 /** 1175 * For performance reasons, we would like not to acquire a lock in 1176 * hasNext in the common case. To allow for this, we only access 1177 * fields (i.e. nextItem) that are not modified by update operations 1178 * triggered by queue modifications. 1179 */ 1180 public boolean hasNext() { 1181 if (nextItem != null) 1182 return true; 1183 noNext(); 1184 return false; 1185 } 1186 1187 private void noNext() { 1188 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1189 lock.lock(); 1190 try { 1191 // assert cursor == NONE; 1192 // assert nextIndex == NONE; 1193 if (!isDetached()) { 1194 // assert lastRet >= 0; 1195 incorporateDequeues(); // might update lastRet 1196 if (lastRet >= 0) { 1197 lastItem = itemAt(lastRet); 1198 // assert lastItem != null; 1199 detach(); 1200 } 1201 } 1202 // assert isDetached(); 1203 // assert lastRet < 0 ^ lastItem != null; 1204 } finally { 1205 lock.unlock(); 1206 } 1207 } 1208 1209 public E next() { 1210 final E e = nextItem; 1211 if (e == null) 1212 throw new NoSuchElementException(); 1213 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1214 lock.lock(); 1215 try { 1216 if (!isDetached()) 1217 incorporateDequeues(); 1218 // assert nextIndex != NONE; 1219 // assert lastItem == null; 1220 lastRet = nextIndex; 1221 final int cursor = this.cursor; 1222 if (cursor >= 0) { 1223 nextItem = itemAt(nextIndex = cursor); 1224 // assert nextItem != null; 1225 this.cursor = incCursor(cursor); 1226 } else { 1227 nextIndex = NONE; 1228 nextItem = null; 1229 if (lastRet == REMOVED) detach(); 1230 } 1231 } finally { 1232 lock.unlock(); 1233 } 1234 return e; 1235 } 1236 1237 public void forEachRemaining(Consumer<? super E> action) { 1238 Objects.requireNonNull(action); 1239 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1240 lock.lock(); 1241 try { 1242 final E e = nextItem; 1243 if (e == null) return; 1244 if (!isDetached()) 1245 incorporateDequeues(); 1246 action.accept(e); 1247 if (isDetached() || cursor < 0) return; 1248 final Object[] items = ArrayBlockingQueue.this.items; 1249 for (int i = cursor, end = putIndex, 1250 to = (i < end) ? end : items.length; 1251 ; i = 0, to = end) { 1252 for (; i < to; i++) 1253 action.accept(itemAt(items, i)); 1254 if (to == end) break; 1255 } 1256 } finally { 1257 // Calling forEachRemaining is a strong hint that this 1258 // iteration is surely over; supporting remove() after 1259 // forEachRemaining() is more trouble than it's worth 1260 cursor = nextIndex = lastRet = NONE; 1261 nextItem = lastItem = null; 1262 detach(); 1263 lock.unlock(); 1264 } 1265 } 1266 1267 public void remove() { 1268 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1269 lock.lock(); 1270 // assert lock.getHoldCount() == 1; 1271 try { 1272 if (!isDetached()) 1273 incorporateDequeues(); // might update lastRet or detach 1274 final int lastRet = this.lastRet; 1275 this.lastRet = NONE; 1276 if (lastRet >= 0) { 1277 if (!isDetached()) 1278 removeAt(lastRet); 1279 else { 1280 final E lastItem = this.lastItem; 1281 // assert lastItem != null; 1282 this.lastItem = null; 1283 if (itemAt(lastRet) == lastItem) 1284 removeAt(lastRet); 1285 } 1286 } else if (lastRet == NONE) 1287 throw new IllegalStateException(); 1288 // else lastRet == REMOVED and the last returned element was 1289 // previously asynchronously removed via an operation other 1290 // than this.remove(), so nothing to do. 1291 1292 if (cursor < 0 && nextIndex < 0) 1293 detach(); 1294 } finally { 1295 lock.unlock(); 1296 // assert lastRet == NONE; 1297 // assert lastItem == null; 1298 } 1299 } 1300 1301 /** 1302 * Called to notify the iterator that the queue is empty, or that it 1303 * has fallen hopelessly behind, so that it should abandon any 1304 * further iteration, except possibly to return one more element 1305 * from next(), as promised by returning true from hasNext(). 1306 */ 1307 void shutdown() { 1308 // assert lock.isHeldByCurrentThread(); 1309 cursor = NONE; 1310 if (nextIndex >= 0) 1311 nextIndex = REMOVED; 1312 if (lastRet >= 0) { 1313 lastRet = REMOVED; 1314 lastItem = null; 1315 } 1316 prevTakeIndex = DETACHED; 1317 // Don't set nextItem to null because we must continue to be 1318 // able to return it on next(). 1319 // 1320 // Caller will unlink from itrs when convenient. 1321 } 1322 1323 private int distance(int index, int prevTakeIndex, int length) { 1324 int distance = index - prevTakeIndex; 1325 if (distance < 0) 1326 distance += length; 1327 return distance; 1328 } 1329 1330 /** 1331 * Called whenever an interior remove (not at takeIndex) occurred. 1332 * 1333 * @return true if this iterator should be unlinked from itrs 1334 */ 1335 boolean removedAt(int removedIndex) { 1336 // assert lock.isHeldByCurrentThread(); 1337 if (isDetached()) 1338 return true; 1339 1340 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1341 final int prevTakeIndex = this.prevTakeIndex; 1342 final int len = items.length; 1343 // distance from prevTakeIndex to removedIndex 1344 final int removedDistance = 1345 len * (itrs.cycles - this.prevCycles 1346 + ((removedIndex < takeIndex) ? 1 : 0)) 1347 + (removedIndex - prevTakeIndex); 1348 // assert itrs.cycles - this.prevCycles >= 0; 1349 // assert itrs.cycles - this.prevCycles <= 1; 1350 // assert removedDistance > 0; 1351 // assert removedIndex != takeIndex; 1352 int cursor = this.cursor; 1353 if (cursor >= 0) { 1354 int x = distance(cursor, prevTakeIndex, len); 1355 if (x == removedDistance) { 1356 if (cursor == putIndex) 1357 this.cursor = cursor = NONE; 1358 } 1359 else if (x > removedDistance) { 1360 // assert cursor != prevTakeIndex; 1361 this.cursor = cursor = dec(cursor, len); 1362 } 1363 } 1364 int lastRet = this.lastRet; 1365 if (lastRet >= 0) { 1366 int x = distance(lastRet, prevTakeIndex, len); 1367 if (x == removedDistance) 1368 this.lastRet = lastRet = REMOVED; 1369 else if (x > removedDistance) 1370 this.lastRet = lastRet = dec(lastRet, len); 1371 } 1372 int nextIndex = this.nextIndex; 1373 if (nextIndex >= 0) { 1374 int x = distance(nextIndex, prevTakeIndex, len); 1375 if (x == removedDistance) 1376 this.nextIndex = nextIndex = REMOVED; 1377 else if (x > removedDistance) 1378 this.nextIndex = nextIndex = dec(nextIndex, len); 1379 } 1380 if (cursor < 0 && nextIndex < 0 && lastRet < 0) { 1381 this.prevTakeIndex = DETACHED; 1382 return true; 1383 } 1384 return false; 1385 } 1386 1387 /** 1388 * Called whenever takeIndex wraps around to zero. 1389 * 1390 * @return true if this iterator should be unlinked from itrs 1391 */ 1392 boolean takeIndexWrapped() { 1393 // assert lock.isHeldByCurrentThread(); 1394 if (isDetached()) 1395 return true; 1396 if (itrs.cycles - prevCycles > 1) { 1397 // All the elements that existed at the time of the last 1398 // operation are gone, so abandon further iteration. 1399 shutdown(); 1400 return true; 1401 } 1402 return false; 1403 } 1404 1405 // /** Uncomment for debugging. */ 1406 // public String toString() { 1407 // return ("cursor=" + cursor + " " + 1408 // "nextIndex=" + nextIndex + " " + 1409 // "lastRet=" + lastRet + " " + 1410 // "nextItem=" + nextItem + " " + 1411 // "lastItem=" + lastItem + " " + 1412 // "prevCycles=" + prevCycles + " " + 1413 // "prevTakeIndex=" + prevTakeIndex + " " + 1414 // "size()=" + size() + " " + 1415 // "remainingCapacity()=" + remainingCapacity()); 1416 // } 1417 } 1418 1419 /** 1420 * Returns a {@link Spliterator} over the elements in this queue. 1421 * 1422 * <p>The returned spliterator is 1423 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 1424 * 1425 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT}, 1426 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}. 1427 * 1428 * @implNote 1429 * The {@code Spliterator} implements {@code trySplit} to permit limited 1430 * parallelism. 1431 * 1432 * @return a {@code Spliterator} over the elements in this queue 1433 * @since 1.8 1434 */ 1435 public Spliterator<E> spliterator() { 1436 return Spliterators.spliterator 1437 (this, (Spliterator.ORDERED | 1438 Spliterator.NONNULL | 1439 Spliterator.CONCURRENT)); 1440 } 1441 1442 /** 1443 * @throws NullPointerException {@inheritDoc} 1444 */ 1445 public void forEach(Consumer<? super E> action) { 1446 Objects.requireNonNull(action); 1447 final ReentrantLock lock = this.lock; 1448 lock.lock(); 1449 try { 1450 if (count > 0) { 1451 final Object[] items = this.items; 1452 for (int i = takeIndex, end = putIndex, 1453 to = (i < end) ? end : items.length; 1454 ; i = 0, to = end) { 1455 for (; i < to; i++) 1456 action.accept(itemAt(items, i)); 1457 if (to == end) break; 1458 } 1459 } 1460 } finally { 1461 lock.unlock(); 1462 } 1463 } 1464 1465 /** 1466 * @throws NullPointerException {@inheritDoc} 1467 */ 1468 public boolean removeIf(Predicate<? super E> filter) { 1469 Objects.requireNonNull(filter); 1470 return bulkRemove(filter); 1471 } 1472 1473 /** 1474 * @throws NullPointerException {@inheritDoc} 1475 */ 1476 public boolean removeAll(Collection<?> c) { 1477 Objects.requireNonNull(c); 1478 return bulkRemove(e -> c.contains(e)); 1479 } 1480 1481 /** 1482 * @throws NullPointerException {@inheritDoc} 1483 */ 1484 public boolean retainAll(Collection<?> c) { 1485 Objects.requireNonNull(c); 1486 return bulkRemove(e -> !c.contains(e)); 1487 } 1488 1489 /** Implementation of bulk remove methods. */ 1490 private boolean bulkRemove(Predicate<? super E> filter) { 1491 final ReentrantLock lock = this.lock; 1492 lock.lock(); 1493 try { 1494 if (itrs == null) { // check for active iterators 1495 if (count > 0) { 1496 final Object[] items = this.items; 1497 // Optimize for initial run of survivors 1498 for (int i = takeIndex, end = putIndex, 1499 to = (i < end) ? end : items.length; 1500 ; i = 0, to = end) { 1501 for (; i < to; i++) 1502 if (filter.test(itemAt(items, i))) 1503 return bulkRemoveModified(filter, i); 1504 if (to == end) break; 1505 } 1506 } 1507 return false; 1508 } 1509 } finally { 1510 lock.unlock(); 1511 } 1512 // Active iterators are too hairy! 1513 // Punting (for now) to the slow n^2 algorithm ... 1514 return super.removeIf(filter); 1515 } 1516 1517 // A tiny bit set implementation 1518 1519 private static long[] nBits(int n) { 1520 return new long[((n - 1) >> 6) + 1]; 1521 } 1522 private static void setBit(long[] bits, int i) { 1523 bits[i >> 6] |= 1L << i; 1524 } 1525 private static boolean isClear(long[] bits, int i) { 1526 return (bits[i >> 6] & (1L << i)) == 0; 1527 } 1528 1529 /** 1530 * Returns circular distance from i to j, disambiguating i == j to 1531 * items.length; never returns 0. 1532 */ 1533 private int distanceNonEmpty(int i, int j) { 1534 if ((j -= i) <= 0) j += items.length; 1535 return j; 1536 } 1537 1538 /** 1539 * Helper for bulkRemove, in case of at least one deletion. 1540 * Tolerate predicates that reentrantly access the collection for 1541 * read (but not write), so traverse once to find elements to 1542 * delete, a second pass to physically expunge. 1543 * 1544 * @param beg valid index of first element to be deleted 1545 */ 1546 private boolean bulkRemoveModified( 1547 Predicate<? super E> filter, final int beg) { 1548 final Object[] es = items; 1549 final int capacity = items.length; 1550 final int end = putIndex; 1551 final long[] deathRow = nBits(distanceNonEmpty(beg, putIndex)); 1552 deathRow[0] = 1L; // set bit 0 1553 for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg; 1554 ; i = 0, to = end, k -= capacity) { 1555 for (; i < to; i++) 1556 if (filter.test(itemAt(es, i))) 1557 setBit(deathRow, i - k); 1558 if (to == end) break; 1559 } 1560 // a two-finger traversal, with hare i reading, tortoise w writing 1561 int w = beg; 1562 for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg; 1563 ; w = 0) { // w rejoins i on second leg 1564 // In this loop, i and w are on the same leg, with i > w 1565 for (; i < to; i++) 1566 if (isClear(deathRow, i - k)) 1567 es[w++] = es[i]; 1568 if (to == end) break; 1569 // In this loop, w is on the first leg, i on the second 1570 for (i = 0, to = end, k -= capacity; i < to && w < capacity; i++) 1571 if (isClear(deathRow, i - k)) 1572 es[w++] = es[i]; 1573 if (i >= to) { 1574 if (w == capacity) w = 0; // "corner" case 1575 break; 1576 } 1577 } 1578 count -= distanceNonEmpty(w, end); 1579 circularClear(es, putIndex = w, end); 1580 return true; 1581 } 1582 1583 /** debugging */ 1584 void checkInvariants() { 1585 // meta-assertions 1586 // assert lock.isHeldByCurrentThread(); 1587 try { 1588 // Unlike ArrayDeque, we have a count field but no spare slot. 1589 // We prefer ArrayDeque's strategy (and the names of its fields!), 1590 // but our field layout is baked into the serial form, and so is 1591 // too annoying to change. 1592 // 1593 // putIndex == takeIndex must be disambiguated by checking count. 1594 int capacity = items.length; 1595 // assert capacity > 0; 1596 // assert takeIndex >= 0 && takeIndex < capacity; 1597 // assert putIndex >= 0 && putIndex < capacity; 1598 // assert count <= capacity; 1599 // assert takeIndex == putIndex || items[takeIndex] != null; 1600 // assert count == capacity || items[putIndex] == null; 1601 // assert takeIndex == putIndex || items[dec(putIndex, capacity)] != null; 1602 } catch (Throwable t) { 1603 System.err.printf("takeIndex=%d putIndex=%d count=%d capacity=%d%n", 1604 takeIndex, putIndex, count, items.length); 1605 System.err.printf("items=%s%n", 1606 Arrays.toString(items)); 1607 throw t; 1608 } 1609 } 1610 1611 }