Print this page


Split Close
Expand all
Collapse all
          --- old/src/share/classes/java/util/concurrent/ArrayBlockingQueue.java
          +++ new/src/share/classes/java/util/concurrent/ArrayBlockingQueue.java
↓ open down ↓ 41 lines elided ↑ open up ↑
  42   42   * array.  This queue orders elements FIFO (first-in-first-out).  The
  43   43   * <em>head</em> of the queue is that element that has been on the
  44   44   * queue the longest time.  The <em>tail</em> of the queue is that
  45   45   * element that has been on the queue the shortest time. New elements
  46   46   * are inserted at the tail of the queue, and the queue retrieval
  47   47   * operations obtain elements at the head of the queue.
  48   48   *
  49   49   * <p>This is a classic &quot;bounded buffer&quot;, in which a
  50   50   * fixed-sized array holds elements inserted by producers and
  51   51   * extracted by consumers.  Once created, the capacity cannot be
  52      - * increased.  Attempts to <tt>put</tt> an element into a full queue
  53      - * will result in the operation blocking; attempts to <tt>take</tt> an
       52 + * changed.  Attempts to {@code put} an element into a full queue
       53 + * will result in the operation blocking; attempts to {@code take} an
  54   54   * element from an empty queue will similarly block.
  55   55   *
  56      - * <p> This class supports an optional fairness policy for ordering
       56 + * <p>This class supports an optional fairness policy for ordering
  57   57   * waiting producer and consumer threads.  By default, this ordering
  58   58   * is not guaranteed. However, a queue constructed with fairness set
  59      - * to <tt>true</tt> grants threads access in FIFO order. Fairness
       59 + * to {@code true} grants threads access in FIFO order. Fairness
  60   60   * generally decreases throughput but reduces variability and avoids
  61   61   * starvation.
  62   62   *
  63   63   * <p>This class and its iterator implement all of the
  64   64   * <em>optional</em> methods of the {@link Collection} and {@link
  65   65   * Iterator} interfaces.
  66   66   *
  67   67   * <p>This class is a member of the
  68   68   * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  69   69   * Java Collections Framework</a>.
↓ open down ↓ 6 lines elided ↑ open up ↑
  76   76          implements BlockingQueue<E>, java.io.Serializable {
  77   77  
  78   78      /**
  79   79       * Serialization ID. This class relies on default serialization
  80   80       * even for the items array, which is default-serialized, even if
  81   81       * it is empty. Otherwise the items field could not be declared
  82   82       * final, which is necessary here.
  83   83       */
  84   84      private static final long serialVersionUID = -817911632652898426L;
  85   85  
  86      -    /** The queued items  */
  87      -    private final E[] items;
  88      -    /** items index for next take, poll or remove */
  89      -    private int takeIndex;
  90      -    /** items index for next put, offer, or add. */
  91      -    private int putIndex;
  92      -    /** Number of items in the queue */
  93      -    private int count;
       86 +    /** The queued items */
       87 +    final Object[] items;
  94   88  
       89 +    /** items index for next take, poll, peek or remove */
       90 +    int takeIndex;
       91 +
       92 +    /** items index for next put, offer, or add */
       93 +    int putIndex;
       94 +
       95 +    /** Number of elements in the queue */
       96 +    int count;
       97 +
  95   98      /*
  96   99       * Concurrency control uses the classic two-condition algorithm
  97  100       * found in any textbook.
  98  101       */
  99  102  
 100  103      /** Main lock guarding all access */
 101      -    private final ReentrantLock lock;
      104 +    final ReentrantLock lock;
 102  105      /** Condition for waiting takes */
 103  106      private final Condition notEmpty;
 104  107      /** Condition for waiting puts */
 105  108      private final Condition notFull;
 106  109  
 107  110      // Internal helper methods
 108  111  
 109  112      /**
 110  113       * Circularly increment i.
 111  114       */
 112  115      final int inc(int i) {
 113      -        return (++i == items.length)? 0 : i;
      116 +        return (++i == items.length) ? 0 : i;
 114  117      }
 115  118  
 116  119      /**
      120 +     * Circularly decrement i.
      121 +     */
      122 +    final int dec(int i) {
      123 +        return ((i == 0) ? items.length : i) - 1;
      124 +    }
      125 +
      126 +    @SuppressWarnings("unchecked")
      127 +    static <E> E cast(Object item) {
      128 +        return (E) item;
      129 +    }
      130 +
      131 +    /**
      132 +     * Returns item at index i.
      133 +     */
      134 +    final E itemAt(int i) {
      135 +        return this.<E>cast(items[i]);
      136 +    }
      137 +
      138 +    /**
      139 +     * Throws NullPointerException if argument is null.
      140 +     *
      141 +     * @param v the element
      142 +     */
      143 +    private static void checkNotNull(Object v) {
      144 +        if (v == null)
      145 +            throw new NullPointerException();
      146 +    }
      147 +
      148 +    /**
 117  149       * Inserts element at current put position, advances, and signals.
 118  150       * Call only when holding lock.
 119  151       */
 120  152      private void insert(E x) {
 121  153          items[putIndex] = x;
 122  154          putIndex = inc(putIndex);
 123  155          ++count;
 124  156          notEmpty.signal();
 125  157      }
 126  158  
 127  159      /**
 128  160       * Extracts element at current take position, advances, and signals.
 129  161       * Call only when holding lock.
 130  162       */
 131  163      private E extract() {
 132      -        final E[] items = this.items;
 133      -        E x = items[takeIndex];
      164 +        final Object[] items = this.items;
      165 +        E x = this.<E>cast(items[takeIndex]);
 134  166          items[takeIndex] = null;
 135  167          takeIndex = inc(takeIndex);
 136  168          --count;
 137  169          notFull.signal();
 138  170          return x;
 139  171      }
 140  172  
 141  173      /**
 142      -     * Utility for remove and iterator.remove: Delete item at position i.
      174 +     * Deletes item at position i.
      175 +     * Utility for remove and iterator.remove.
 143  176       * Call only when holding lock.
 144  177       */
 145  178      void removeAt(int i) {
 146      -        final E[] items = this.items;
      179 +        final Object[] items = this.items;
 147  180          // if removing front item, just advance
 148  181          if (i == takeIndex) {
 149  182              items[takeIndex] = null;
 150  183              takeIndex = inc(takeIndex);
 151  184          } else {
 152  185              // slide over all others up through putIndex.
 153  186              for (;;) {
 154  187                  int nexti = inc(i);
 155  188                  if (nexti != putIndex) {
 156  189                      items[i] = items[nexti];
↓ open down ↓ 3 lines elided ↑ open up ↑
 160  193                      putIndex = i;
 161  194                      break;
 162  195                  }
 163  196              }
 164  197          }
 165  198          --count;
 166  199          notFull.signal();
 167  200      }
 168  201  
 169  202      /**
 170      -     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
      203 +     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 171  204       * capacity and default access policy.
 172  205       *
 173  206       * @param capacity the capacity of this queue
 174      -     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
      207 +     * @throws IllegalArgumentException if {@code capacity < 1}
 175  208       */
 176  209      public ArrayBlockingQueue(int capacity) {
 177  210          this(capacity, false);
 178  211      }
 179  212  
 180  213      /**
 181      -     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
      214 +     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 182  215       * capacity and the specified access policy.
 183  216       *
 184  217       * @param capacity the capacity of this queue
 185      -     * @param fair if <tt>true</tt> then queue accesses for threads blocked
      218 +     * @param fair if {@code true} then queue accesses for threads blocked
 186  219       *        on insertion or removal are processed in FIFO order;
 187      -     *        if <tt>false</tt> the access order is unspecified.
 188      -     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
      220 +     *        if {@code false} the access order is unspecified.
      221 +     * @throws IllegalArgumentException if {@code capacity < 1}
 189  222       */
 190  223      public ArrayBlockingQueue(int capacity, boolean fair) {
 191  224          if (capacity <= 0)
 192  225              throw new IllegalArgumentException();
 193      -        this.items = (E[]) new Object[capacity];
      226 +        this.items = new Object[capacity];
 194  227          lock = new ReentrantLock(fair);
 195  228          notEmpty = lock.newCondition();
 196  229          notFull =  lock.newCondition();
 197  230      }
 198  231  
 199  232      /**
 200      -     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
      233 +     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 201  234       * capacity, the specified access policy and initially containing the
 202  235       * elements of the given collection,
 203  236       * added in traversal order of the collection's iterator.
 204  237       *
 205  238       * @param capacity the capacity of this queue
 206      -     * @param fair if <tt>true</tt> then queue accesses for threads blocked
      239 +     * @param fair if {@code true} then queue accesses for threads blocked
 207  240       *        on insertion or removal are processed in FIFO order;
 208      -     *        if <tt>false</tt> the access order is unspecified.
      241 +     *        if {@code false} the access order is unspecified.
 209  242       * @param c the collection of elements to initially contain
 210      -     * @throws IllegalArgumentException if <tt>capacity</tt> is less than
 211      -     *         <tt>c.size()</tt>, or less than 1.
      243 +     * @throws IllegalArgumentException if {@code capacity} is less than
      244 +     *         {@code c.size()}, or less than 1.
 212  245       * @throws NullPointerException if the specified collection or any
 213  246       *         of its elements are null
 214  247       */
 215  248      public ArrayBlockingQueue(int capacity, boolean fair,
 216  249                                Collection<? extends E> c) {
 217  250          this(capacity, fair);
 218      -        if (capacity < c.size())
 219      -            throw new IllegalArgumentException();
 220  251  
 221      -        for (E e : c)
 222      -            add(e);
      252 +        final ReentrantLock lock = this.lock;
      253 +        lock.lock(); // Lock only for visibility, not mutual exclusion
      254 +        try {
      255 +            int i = 0;
      256 +            try {
      257 +                for (E e : c) {
      258 +                    checkNotNull(e);
      259 +                    items[i++] = e;
      260 +                }
      261 +            } catch (ArrayIndexOutOfBoundsException ex) {
      262 +                throw new IllegalArgumentException();
      263 +            }
      264 +            count = i;
      265 +            putIndex = (i == capacity) ? 0 : i;
      266 +        } finally {
      267 +            lock.unlock();
      268 +        }
 223  269      }
 224  270  
 225  271      /**
 226  272       * Inserts the specified element at the tail of this queue if it is
 227  273       * possible to do so immediately without exceeding the queue's capacity,
 228      -     * returning <tt>true</tt> upon success and throwing an
 229      -     * <tt>IllegalStateException</tt> if this queue is full.
      274 +     * returning {@code true} upon success and throwing an
      275 +     * {@code IllegalStateException} if this queue is full.
 230  276       *
 231  277       * @param e the element to add
 232      -     * @return <tt>true</tt> (as specified by {@link Collection#add})
      278 +     * @return {@code true} (as specified by {@link Collection#add})
 233  279       * @throws IllegalStateException if this queue is full
 234  280       * @throws NullPointerException if the specified element is null
 235  281       */
 236  282      public boolean add(E e) {
 237  283          return super.add(e);
 238  284      }
 239  285  
 240  286      /**
 241  287       * Inserts the specified element at the tail of this queue if it is
 242  288       * possible to do so immediately without exceeding the queue's capacity,
 243      -     * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
      289 +     * returning {@code true} upon success and {@code false} if this queue
 244  290       * is full.  This method is generally preferable to method {@link #add},
 245  291       * which can fail to insert an element only by throwing an exception.
 246  292       *
 247  293       * @throws NullPointerException if the specified element is null
 248  294       */
 249  295      public boolean offer(E e) {
 250      -        if (e == null) throw new NullPointerException();
      296 +        checkNotNull(e);
 251  297          final ReentrantLock lock = this.lock;
 252  298          lock.lock();
 253  299          try {
 254  300              if (count == items.length)
 255  301                  return false;
 256  302              else {
 257  303                  insert(e);
 258  304                  return true;
 259  305              }
 260  306          } finally {
↓ open down ↓ 2 lines elided ↑ open up ↑
 263  309      }
 264  310  
 265  311      /**
 266  312       * Inserts the specified element at the tail of this queue, waiting
 267  313       * for space to become available if the queue is full.
 268  314       *
 269  315       * @throws InterruptedException {@inheritDoc}
 270  316       * @throws NullPointerException {@inheritDoc}
 271  317       */
 272  318      public void put(E e) throws InterruptedException {
 273      -        if (e == null) throw new NullPointerException();
 274      -        final E[] items = this.items;
      319 +        checkNotNull(e);
 275  320          final ReentrantLock lock = this.lock;
 276  321          lock.lockInterruptibly();
 277  322          try {
 278      -            try {
 279      -                while (count == items.length)
 280      -                    notFull.await();
 281      -            } catch (InterruptedException ie) {
 282      -                notFull.signal(); // propagate to non-interrupted thread
 283      -                throw ie;
 284      -            }
      323 +            while (count == items.length)
      324 +                notFull.await();
 285  325              insert(e);
 286  326          } finally {
 287  327              lock.unlock();
 288  328          }
 289  329      }
 290  330  
 291  331      /**
 292  332       * Inserts the specified element at the tail of this queue, waiting
 293  333       * up to the specified wait time for space to become available if
 294  334       * the queue is full.
 295  335       *
 296  336       * @throws InterruptedException {@inheritDoc}
 297  337       * @throws NullPointerException {@inheritDoc}
 298  338       */
 299  339      public boolean offer(E e, long timeout, TimeUnit unit)
 300  340          throws InterruptedException {
 301  341  
 302      -        if (e == null) throw new NullPointerException();
      342 +        checkNotNull(e);
 303  343          long nanos = unit.toNanos(timeout);
 304  344          final ReentrantLock lock = this.lock;
 305  345          lock.lockInterruptibly();
 306  346          try {
 307      -            for (;;) {
 308      -                if (count != items.length) {
 309      -                    insert(e);
 310      -                    return true;
 311      -                }
      347 +            while (count == items.length) {
 312  348                  if (nanos <= 0)
 313  349                      return false;
 314      -                try {
 315      -                    nanos = notFull.awaitNanos(nanos);
 316      -                } catch (InterruptedException ie) {
 317      -                    notFull.signal(); // propagate to non-interrupted thread
 318      -                    throw ie;
 319      -                }
      350 +                nanos = notFull.awaitNanos(nanos);
 320  351              }
      352 +            insert(e);
      353 +            return true;
 321  354          } finally {
 322  355              lock.unlock();
 323  356          }
 324  357      }
 325  358  
 326  359      public E poll() {
 327  360          final ReentrantLock lock = this.lock;
 328  361          lock.lock();
 329  362          try {
 330      -            if (count == 0)
 331      -                return null;
 332      -            E x = extract();
 333      -            return x;
      363 +            return (count == 0) ? null : extract();
 334  364          } finally {
 335  365              lock.unlock();
 336  366          }
 337  367      }
 338  368  
 339  369      public E take() throws InterruptedException {
 340  370          final ReentrantLock lock = this.lock;
 341  371          lock.lockInterruptibly();
 342  372          try {
 343      -            try {
 344      -                while (count == 0)
 345      -                    notEmpty.await();
 346      -            } catch (InterruptedException ie) {
 347      -                notEmpty.signal(); // propagate to non-interrupted thread
 348      -                throw ie;
 349      -            }
 350      -            E x = extract();
 351      -            return x;
      373 +            while (count == 0)
      374 +                notEmpty.await();
      375 +            return extract();
 352  376          } finally {
 353  377              lock.unlock();
 354  378          }
 355  379      }
 356  380  
 357  381      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 358  382          long nanos = unit.toNanos(timeout);
 359  383          final ReentrantLock lock = this.lock;
 360  384          lock.lockInterruptibly();
 361  385          try {
 362      -            for (;;) {
 363      -                if (count != 0) {
 364      -                    E x = extract();
 365      -                    return x;
 366      -                }
      386 +            while (count == 0) {
 367  387                  if (nanos <= 0)
 368  388                      return null;
 369      -                try {
 370      -                    nanos = notEmpty.awaitNanos(nanos);
 371      -                } catch (InterruptedException ie) {
 372      -                    notEmpty.signal(); // propagate to non-interrupted thread
 373      -                    throw ie;
 374      -                }
 375      -
      389 +                nanos = notEmpty.awaitNanos(nanos);
 376  390              }
      391 +            return extract();
 377  392          } finally {
 378  393              lock.unlock();
 379  394          }
 380  395      }
 381  396  
 382  397      public E peek() {
 383  398          final ReentrantLock lock = this.lock;
 384  399          lock.lock();
 385  400          try {
 386      -            return (count == 0) ? null : items[takeIndex];
      401 +            return (count == 0) ? null : itemAt(takeIndex);
 387  402          } finally {
 388  403              lock.unlock();
 389  404          }
 390  405      }
 391  406  
 392  407      // this doc comment is overridden to remove the reference to collections
 393  408      // greater in size than Integer.MAX_VALUE
 394  409      /**
 395  410       * Returns the number of elements in this queue.
 396  411       *
↓ open down ↓ 8 lines elided ↑ open up ↑
 405  420              lock.unlock();
 406  421          }
 407  422      }
 408  423  
 409  424      // this doc comment is a modified copy of the inherited doc comment,
 410  425      // without the reference to unlimited queues.
 411  426      /**
 412  427       * Returns the number of additional elements that this queue can ideally
 413  428       * (in the absence of memory or resource constraints) accept without
 414  429       * blocking. This is always equal to the initial capacity of this queue
 415      -     * less the current <tt>size</tt> of this queue.
      430 +     * less the current {@code size} of this queue.
 416  431       *
 417  432       * <p>Note that you <em>cannot</em> always tell if an attempt to insert
 418      -     * an element will succeed by inspecting <tt>remainingCapacity</tt>
      433 +     * an element will succeed by inspecting {@code remainingCapacity}
 419  434       * because it may be the case that another thread is about to
 420  435       * insert or remove an element.
 421  436       */
 422  437      public int remainingCapacity() {
 423  438          final ReentrantLock lock = this.lock;
 424  439          lock.lock();
 425  440          try {
 426  441              return items.length - count;
 427  442          } finally {
 428  443              lock.unlock();
 429  444          }
 430  445      }
 431  446  
 432  447      /**
 433  448       * Removes a single instance of the specified element from this queue,
 434      -     * if it is present.  More formally, removes an element <tt>e</tt> such
 435      -     * that <tt>o.equals(e)</tt>, if this queue contains one or more such
      449 +     * if it is present.  More formally, removes an element {@code e} such
      450 +     * that {@code o.equals(e)}, if this queue contains one or more such
 436  451       * elements.
 437      -     * Returns <tt>true</tt> if this queue contained the specified element
      452 +     * Returns {@code true} if this queue contained the specified element
 438  453       * (or equivalently, if this queue changed as a result of the call).
 439  454       *
      455 +     * <p>Removal of interior elements in circular array-based queues
      456 +     * is an intrinsically slow and disruptive operation, so it should
      457 +     * be undertaken only in exceptional circumstances, ideally
      458 +     * only when the queue is known not to be accessible by other
      459 +     * threads.
      460 +     *
 440  461       * @param o element to be removed from this queue, if present
 441      -     * @return <tt>true</tt> if this queue changed as a result of the call
      462 +     * @return {@code true} if this queue changed as a result of the call
 442  463       */
 443  464      public boolean remove(Object o) {
 444  465          if (o == null) return false;
 445      -        final E[] items = this.items;
      466 +        final Object[] items = this.items;
 446  467          final ReentrantLock lock = this.lock;
 447  468          lock.lock();
 448  469          try {
 449      -            int i = takeIndex;
 450      -            int k = 0;
 451      -            for (;;) {
 452      -                if (k++ >= count)
 453      -                    return false;
      470 +            for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
 454  471                  if (o.equals(items[i])) {
 455  472                      removeAt(i);
 456  473                      return true;
 457  474                  }
 458      -                i = inc(i);
 459  475              }
 460      -
      476 +            return false;
 461  477          } finally {
 462  478              lock.unlock();
 463  479          }
 464  480      }
 465  481  
 466  482      /**
 467      -     * Returns <tt>true</tt> if this queue contains the specified element.
 468      -     * More formally, returns <tt>true</tt> if and only if this queue contains
 469      -     * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
      483 +     * Returns {@code true} if this queue contains the specified element.
      484 +     * More formally, returns {@code true} if and only if this queue contains
      485 +     * at least one element {@code e} such that {@code o.equals(e)}.
 470  486       *
 471  487       * @param o object to be checked for containment in this queue
 472      -     * @return <tt>true</tt> if this queue contains the specified element
      488 +     * @return {@code true} if this queue contains the specified element
 473  489       */
 474  490      public boolean contains(Object o) {
 475  491          if (o == null) return false;
 476      -        final E[] items = this.items;
      492 +        final Object[] items = this.items;
 477  493          final ReentrantLock lock = this.lock;
 478  494          lock.lock();
 479  495          try {
 480      -            int i = takeIndex;
 481      -            int k = 0;
 482      -            while (k++ < count) {
      496 +            for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
 483  497                  if (o.equals(items[i]))
 484  498                      return true;
 485      -                i = inc(i);
 486      -            }
 487  499              return false;
 488  500          } finally {
 489  501              lock.unlock();
 490  502          }
 491  503      }
 492  504  
 493  505      /**
 494  506       * Returns an array containing all of the elements in this queue, in
 495  507       * proper sequence.
 496  508       *
 497  509       * <p>The returned array will be "safe" in that no references to it are
 498  510       * maintained by this queue.  (In other words, this method must allocate
 499  511       * a new array).  The caller is thus free to modify the returned array.
 500  512       *
 501  513       * <p>This method acts as a bridge between array-based and collection-based
 502  514       * APIs.
 503  515       *
 504  516       * @return an array containing all of the elements in this queue
 505  517       */
 506  518      public Object[] toArray() {
 507      -        final E[] items = this.items;
      519 +        final Object[] items = this.items;
 508  520          final ReentrantLock lock = this.lock;
 509  521          lock.lock();
 510  522          try {
      523 +            final int count = this.count;
 511  524              Object[] a = new Object[count];
 512      -            int k = 0;
 513      -            int i = takeIndex;
 514      -            while (k < count) {
 515      -                a[k++] = items[i];
 516      -                i = inc(i);
 517      -            }
      525 +            for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
      526 +                a[k] = items[i];
 518  527              return a;
 519  528          } finally {
 520  529              lock.unlock();
 521  530          }
 522  531      }
 523  532  
 524  533      /**
 525  534       * Returns an array containing all of the elements in this queue, in
 526  535       * proper sequence; the runtime type of the returned array is that of
 527  536       * the specified array.  If the queue fits in the specified array, it
 528  537       * is returned therein.  Otherwise, a new array is allocated with the
 529  538       * runtime type of the specified array and the size of this queue.
 530  539       *
 531  540       * <p>If this queue fits in the specified array with room to spare
 532  541       * (i.e., the array has more elements than this queue), the element in
 533  542       * the array immediately following the end of the queue is set to
 534      -     * <tt>null</tt>.
      543 +     * {@code null}.
 535  544       *
 536  545       * <p>Like the {@link #toArray()} method, this method acts as a bridge between
 537  546       * array-based and collection-based APIs.  Further, this method allows
 538  547       * precise control over the runtime type of the output array, and may,
 539  548       * under certain circumstances, be used to save allocation costs.
 540  549       *
 541      -     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
      550 +     * <p>Suppose {@code x} is a queue known to contain only strings.
 542  551       * The following code can be used to dump the queue into a newly
 543      -     * allocated array of <tt>String</tt>:
      552 +     * allocated array of {@code String}:
 544  553       *
 545  554       * <pre>
 546  555       *     String[] y = x.toArray(new String[0]);</pre>
 547  556       *
 548      -     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
 549      -     * <tt>toArray()</tt>.
      557 +     * Note that {@code toArray(new Object[0])} is identical in function to
      558 +     * {@code toArray()}.
 550  559       *
 551  560       * @param a the array into which the elements of the queue are to
 552  561       *          be stored, if it is big enough; otherwise, a new array of the
 553  562       *          same runtime type is allocated for this purpose
 554  563       * @return an array containing all of the elements in this queue
 555  564       * @throws ArrayStoreException if the runtime type of the specified array
 556  565       *         is not a supertype of the runtime type of every element in
 557  566       *         this queue
 558  567       * @throws NullPointerException if the specified array is null
 559  568       */
      569 +    @SuppressWarnings("unchecked")
 560  570      public <T> T[] toArray(T[] a) {
 561      -        final E[] items = this.items;
      571 +        final Object[] items = this.items;
 562  572          final ReentrantLock lock = this.lock;
 563  573          lock.lock();
 564  574          try {
 565      -            if (a.length < count)
      575 +            final int count = this.count;
      576 +            final int len = a.length;
      577 +            if (len < count)
 566  578                  a = (T[])java.lang.reflect.Array.newInstance(
 567      -                    a.getClass().getComponentType(),
 568      -                    count
 569      -                    );
 570      -
 571      -            int k = 0;
 572      -            int i = takeIndex;
 573      -            while (k < count) {
 574      -                a[k++] = (T)items[i];
 575      -                i = inc(i);
 576      -            }
 577      -            if (a.length > count)
      579 +                    a.getClass().getComponentType(), count);
      580 +            for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
      581 +                a[k] = (T) items[i];
      582 +            if (len > count)
 578  583                  a[count] = null;
 579  584              return a;
 580  585          } finally {
 581  586              lock.unlock();
 582  587          }
 583  588      }
 584  589  
 585  590      public String toString() {
 586  591          final ReentrantLock lock = this.lock;
 587  592          lock.lock();
 588  593          try {
 589      -            return super.toString();
      594 +            int k = count;
      595 +            if (k == 0)
      596 +                return "[]";
      597 +
      598 +            StringBuilder sb = new StringBuilder();
      599 +            sb.append('[');
      600 +            for (int i = takeIndex; ; i = inc(i)) {
      601 +                Object e = items[i];
      602 +                sb.append(e == this ? "(this Collection)" : e);
      603 +                if (--k == 0)
      604 +                    return sb.append(']').toString();
      605 +                sb.append(',').append(' ');
      606 +            }
 590  607          } finally {
 591  608              lock.unlock();
 592  609          }
 593  610      }
 594  611  
 595  612      /**
 596  613       * Atomically removes all of the elements from this queue.
 597  614       * The queue will be empty after this call returns.
 598  615       */
 599  616      public void clear() {
 600      -        final E[] items = this.items;
      617 +        final Object[] items = this.items;
 601  618          final ReentrantLock lock = this.lock;
 602  619          lock.lock();
 603  620          try {
 604      -            int i = takeIndex;
 605      -            int k = count;
 606      -            while (k-- > 0) {
      621 +            for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
 607  622                  items[i] = null;
 608      -                i = inc(i);
 609      -            }
 610  623              count = 0;
 611  624              putIndex = 0;
 612  625              takeIndex = 0;
 613  626              notFull.signalAll();
 614  627          } finally {
 615  628              lock.unlock();
 616  629          }
 617  630      }
 618  631  
 619  632      /**
 620  633       * @throws UnsupportedOperationException {@inheritDoc}
 621  634       * @throws ClassCastException            {@inheritDoc}
 622  635       * @throws NullPointerException          {@inheritDoc}
 623  636       * @throws IllegalArgumentException      {@inheritDoc}
 624  637       */
 625  638      public int drainTo(Collection<? super E> c) {
 626      -        if (c == null)
 627      -            throw new NullPointerException();
      639 +        checkNotNull(c);
 628  640          if (c == this)
 629  641              throw new IllegalArgumentException();
 630      -        final E[] items = this.items;
      642 +        final Object[] items = this.items;
 631  643          final ReentrantLock lock = this.lock;
 632  644          lock.lock();
 633  645          try {
 634  646              int i = takeIndex;
 635  647              int n = 0;
 636  648              int max = count;
 637  649              while (n < max) {
 638      -                c.add(items[i]);
      650 +                c.add(this.<E>cast(items[i]));
 639  651                  items[i] = null;
 640  652                  i = inc(i);
 641  653                  ++n;
 642  654              }
 643  655              if (n > 0) {
 644  656                  count = 0;
 645  657                  putIndex = 0;
 646  658                  takeIndex = 0;
 647  659                  notFull.signalAll();
 648  660              }
↓ open down ↓ 3 lines elided ↑ open up ↑
 652  664          }
 653  665      }
 654  666  
 655  667      /**
 656  668       * @throws UnsupportedOperationException {@inheritDoc}
 657  669       * @throws ClassCastException            {@inheritDoc}
 658  670       * @throws NullPointerException          {@inheritDoc}
 659  671       * @throws IllegalArgumentException      {@inheritDoc}
 660  672       */
 661  673      public int drainTo(Collection<? super E> c, int maxElements) {
 662      -        if (c == null)
 663      -            throw new NullPointerException();
      674 +        checkNotNull(c);
 664  675          if (c == this)
 665  676              throw new IllegalArgumentException();
 666  677          if (maxElements <= 0)
 667  678              return 0;
 668      -        final E[] items = this.items;
      679 +        final Object[] items = this.items;
 669  680          final ReentrantLock lock = this.lock;
 670  681          lock.lock();
 671  682          try {
 672  683              int i = takeIndex;
 673  684              int n = 0;
 674      -            int sz = count;
 675      -            int max = (maxElements < count)? maxElements : count;
      685 +            int max = (maxElements < count) ? maxElements : count;
 676  686              while (n < max) {
 677      -                c.add(items[i]);
      687 +                c.add(this.<E>cast(items[i]));
 678  688                  items[i] = null;
 679  689                  i = inc(i);
 680  690                  ++n;
 681  691              }
 682  692              if (n > 0) {
 683  693                  count -= n;
 684  694                  takeIndex = i;
 685  695                  notFull.signalAll();
 686  696              }
 687  697              return n;
 688  698          } finally {
 689  699              lock.unlock();
 690  700          }
 691  701      }
 692  702  
 693      -
 694  703      /**
 695  704       * Returns an iterator over the elements in this queue in proper sequence.
 696      -     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
 697      -     * will never throw {@link ConcurrentModificationException},
      705 +     * The elements will be returned in order from first (head) to last (tail).
      706 +     *
      707 +     * <p>The returned {@code Iterator} is a "weakly consistent" iterator that
      708 +     * will never throw {@link java.util.ConcurrentModificationException
      709 +     * ConcurrentModificationException},
 698  710       * and guarantees to traverse elements as they existed upon
 699  711       * construction of the iterator, and may (but is not guaranteed to)
 700  712       * reflect any modifications subsequent to construction.
 701  713       *
 702  714       * @return an iterator over the elements in this queue in proper sequence
 703  715       */
 704  716      public Iterator<E> iterator() {
 705      -        final ReentrantLock lock = this.lock;
 706      -        lock.lock();
 707      -        try {
 708      -            return new Itr();
 709      -        } finally {
 710      -            lock.unlock();
 711      -        }
      717 +        return new Itr();
 712  718      }
 713  719  
 714  720      /**
 715      -     * Iterator for ArrayBlockingQueue
      721 +     * Iterator for ArrayBlockingQueue. To maintain weak consistency
      722 +     * with respect to puts and takes, we (1) read ahead one slot, so
      723 +     * as to not report hasNext true but then not have an element to
      724 +     * return -- however we later recheck this slot to use the most
      725 +     * current value; (2) ensure that each array slot is traversed at
      726 +     * most once (by tracking "remaining" elements); (3) skip over
      727 +     * null slots, which can occur if takes race ahead of iterators.
      728 +     * However, for circular array-based queues, we cannot rely on any
      729 +     * well established definition of what it means to be weakly
      730 +     * consistent with respect to interior removes since these may
      731 +     * require slot overwrites in the process of sliding elements to
      732 +     * cover gaps. So we settle for resiliency, operating on
      733 +     * established apparent nexts, which may miss some elements that
      734 +     * have moved between calls to next.
 716  735       */
 717  736      private class Itr implements Iterator<E> {
 718      -        /**
 719      -         * Index of element to be returned by next,
 720      -         * or a negative number if no such.
 721      -         */
 722      -        private int nextIndex;
      737 +        private int remaining; // Number of elements yet to be returned
      738 +        private int nextIndex; // Index of element to be returned by next
      739 +        private E nextItem;    // Element to be returned by next call to next
      740 +        private E lastItem;    // Element returned by last call to next
      741 +        private int lastRet;   // Index of last element returned, or -1 if none
 723  742  
 724      -        /**
 725      -         * nextItem holds on to item fields because once we claim
 726      -         * that an element exists in hasNext(), we must return it in
 727      -         * the following next() call even if it was in the process of
 728      -         * being removed when hasNext() was called.
 729      -         */
 730      -        private E nextItem;
 731      -
 732      -        /**
 733      -         * Index of element returned by most recent call to next.
 734      -         * Reset to -1 if this element is deleted by a call to remove.
 735      -         */
 736      -        private int lastRet;
 737      -
 738  743          Itr() {
 739      -            lastRet = -1;
 740      -            if (count == 0)
 741      -                nextIndex = -1;
 742      -            else {
 743      -                nextIndex = takeIndex;
 744      -                nextItem = items[takeIndex];
      744 +            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
      745 +            lock.lock();
      746 +            try {
      747 +                lastRet = -1;
      748 +                if ((remaining = count) > 0)
      749 +                    nextItem = itemAt(nextIndex = takeIndex);
      750 +            } finally {
      751 +                lock.unlock();
 745  752              }
 746  753          }
 747  754  
 748  755          public boolean hasNext() {
 749      -            /*
 750      -             * No sync. We can return true by mistake here
 751      -             * only if this iterator passed across threads,
 752      -             * which we don't support anyway.
 753      -             */
 754      -            return nextIndex >= 0;
      756 +            return remaining > 0;
 755  757          }
 756  758  
 757      -        /**
 758      -         * Checks whether nextIndex is valid; if so setting nextItem.
 759      -         * Stops iterator when either hits putIndex or sees null item.
 760      -         */
 761      -        private void checkNext() {
 762      -            if (nextIndex == putIndex) {
 763      -                nextIndex = -1;
 764      -                nextItem = null;
 765      -            } else {
 766      -                nextItem = items[nextIndex];
 767      -                if (nextItem == null)
 768      -                    nextIndex = -1;
 769      -            }
 770      -        }
 771      -
 772  759          public E next() {
 773  760              final ReentrantLock lock = ArrayBlockingQueue.this.lock;
 774  761              lock.lock();
 775  762              try {
 776      -                if (nextIndex < 0)
      763 +                if (remaining <= 0)
 777  764                      throw new NoSuchElementException();
 778  765                  lastRet = nextIndex;
 779      -                E x = nextItem;
 780      -                nextIndex = inc(nextIndex);
 781      -                checkNext();
      766 +                E x = itemAt(nextIndex);  // check for fresher value
      767 +                if (x == null) {
      768 +                    x = nextItem;         // we are forced to report old value
      769 +                    lastItem = null;      // but ensure remove fails
      770 +                }
      771 +                else
      772 +                    lastItem = x;
      773 +                while (--remaining > 0 && // skip over nulls
      774 +                       (nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
      775 +                    ;
 782  776                  return x;
 783  777              } finally {
 784  778                  lock.unlock();
 785  779              }
 786  780          }
 787  781  
 788  782          public void remove() {
 789  783              final ReentrantLock lock = ArrayBlockingQueue.this.lock;
 790  784              lock.lock();
 791  785              try {
 792  786                  int i = lastRet;
 793  787                  if (i == -1)
 794  788                      throw new IllegalStateException();
 795  789                  lastRet = -1;
 796      -
 797      -                int ti = takeIndex;
 798      -                removeAt(i);
 799      -                // back up cursor (reset to front if was first element)
 800      -                nextIndex = (i == ti) ? takeIndex : i;
 801      -                checkNext();
      790 +                E x = lastItem;
      791 +                lastItem = null;
      792 +                // only remove if item still at index
      793 +                if (x != null && x == items[i]) {
      794 +                    boolean removingHead = (i == takeIndex);
      795 +                    removeAt(i);
      796 +                    if (!removingHead)
      797 +                        nextIndex = dec(nextIndex);
      798 +                }
 802  799              } finally {
 803  800                  lock.unlock();
 804  801              }
 805  802          }
 806  803      }
      804 +
 807  805  }
    
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX