Print this page
Split |
Close |
Expand all |
Collapse all |
--- old/src/share/classes/java/util/concurrent/ArrayBlockingQueue.java
+++ new/src/share/classes/java/util/concurrent/ArrayBlockingQueue.java
1 1 /*
2 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 3 *
4 4 * This code is free software; you can redistribute it and/or modify it
5 5 * under the terms of the GNU General Public License version 2 only, as
6 6 * published by the Free Software Foundation. Oracle designates this
7 7 * particular file as subject to the "Classpath" exception as provided
8 8 * by Oracle in the LICENSE file that accompanied this code.
9 9 *
10 10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 13 * version 2 for more details (a copy is included in the LICENSE file that
14 14 * accompanied this code).
15 15 *
16 16 * You should have received a copy of the GNU General Public License version
17 17 * 2 along with this work; if not, write to the Free Software Foundation,
18 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 19 *
20 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 21 * or visit www.oracle.com if you need additional information or have any
22 22 * questions.
23 23 */
24 24
25 25 /*
26 26 * This file is available under and governed by the GNU General Public
27 27 * License version 2 only, as published by the Free Software Foundation.
28 28 * However, the following notice accompanied the original version of this
29 29 * file:
30 30 *
31 31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 32 * Expert Group and released to the public domain, as explained at
33 33 * http://creativecommons.org/licenses/publicdomain
34 34 */
35 35
36 36 package java.util.concurrent;
37 37 import java.util.concurrent.locks.*;
38 38 import java.util.*;
39 39
40 40 /**
41 41 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
↓ open down ↓ |
41 lines elided |
↑ open up ↑ |
42 42 * array. This queue orders elements FIFO (first-in-first-out). The
43 43 * <em>head</em> of the queue is that element that has been on the
44 44 * queue the longest time. The <em>tail</em> of the queue is that
45 45 * element that has been on the queue the shortest time. New elements
46 46 * are inserted at the tail of the queue, and the queue retrieval
47 47 * operations obtain elements at the head of the queue.
48 48 *
49 49 * <p>This is a classic "bounded buffer", in which a
50 50 * fixed-sized array holds elements inserted by producers and
51 51 * extracted by consumers. Once created, the capacity cannot be
52 - * increased. Attempts to <tt>put</tt> an element into a full queue
53 - * will result in the operation blocking; attempts to <tt>take</tt> an
52 + * changed. Attempts to {@code put} an element into a full queue
53 + * will result in the operation blocking; attempts to {@code take} an
54 54 * element from an empty queue will similarly block.
55 55 *
56 - * <p> This class supports an optional fairness policy for ordering
56 + * <p>This class supports an optional fairness policy for ordering
57 57 * waiting producer and consumer threads. By default, this ordering
58 58 * is not guaranteed. However, a queue constructed with fairness set
59 - * to <tt>true</tt> grants threads access in FIFO order. Fairness
59 + * to {@code true} grants threads access in FIFO order. Fairness
60 60 * generally decreases throughput but reduces variability and avoids
61 61 * starvation.
62 62 *
63 63 * <p>This class and its iterator implement all of the
64 64 * <em>optional</em> methods of the {@link Collection} and {@link
65 65 * Iterator} interfaces.
66 66 *
67 67 * <p>This class is a member of the
68 68 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
69 69 * Java Collections Framework</a>.
70 70 *
71 71 * @since 1.5
72 72 * @author Doug Lea
73 73 * @param <E> the type of elements held in this collection
74 74 */
75 75 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
↓ open down ↓ |
6 lines elided |
↑ open up ↑ |
76 76 implements BlockingQueue<E>, java.io.Serializable {
77 77
78 78 /**
79 79 * Serialization ID. This class relies on default serialization
80 80 * even for the items array, which is default-serialized, even if
81 81 * it is empty. Otherwise it could not be declared final, which is
82 82 * necessary here.
83 83 */
84 84 private static final long serialVersionUID = -817911632652898426L;
85 85
86 - /** The queued items */
87 - private final E[] items;
88 - /** items index for next take, poll or remove */
89 - private int takeIndex;
90 - /** items index for next put, offer, or add. */
91 - private int putIndex;
92 - /** Number of items in the queue */
93 - private int count;
86 + /** The queued items */
87 + final Object[] items;
94 88
89 + /** items index for next take, poll, peek or remove */
90 + int takeIndex;
91 +
92 + /** items index for next put, offer, or add */
93 + int putIndex;
94 +
95 + /** Number of elements in the queue */
96 + int count;
97 +
95 98 /*
96 99 * Concurrency control uses the classic two-condition algorithm
97 100 * found in any textbook.
98 101 */
99 102
100 103 /** Main lock guarding all access */
101 - private final ReentrantLock lock;
104 + final ReentrantLock lock;
102 105 /** Condition for waiting takes */
103 106 private final Condition notEmpty;
104 107 /** Condition for waiting puts */
105 108 private final Condition notFull;
106 109
107 110 // Internal helper methods
108 111
109 112 /**
110 113 * Circularly increment i.
111 114 */
112 115 final int inc(int i) {
113 - return (++i == items.length)? 0 : i;
116 + return (++i == items.length) ? 0 : i;
114 117 }
115 118
116 119 /**
120 + * Circularly decrement i.
121 + */
122 + final int dec(int i) {
123 + return ((i == 0) ? items.length : i) - 1;
124 + }
125 +
126 + @SuppressWarnings("unchecked")
127 + static <E> E cast(Object item) {
128 + return (E) item;
129 + }
130 +
131 + /**
132 + * Returns item at index i.
133 + */
134 + final E itemAt(int i) {
135 + return this.<E>cast(items[i]);
136 + }
137 +
138 + /**
139 + * Throws NullPointerException if argument is null.
140 + *
141 + * @param v the element
142 + */
143 + private static void checkNotNull(Object v) {
144 + if (v == null)
145 + throw new NullPointerException();
146 + }
147 +
148 + /**
117 149 * Inserts element at current put position, advances, and signals.
118 150 * Call only when holding lock.
119 151 */
120 152 private void insert(E x) {
121 153 items[putIndex] = x;
122 154 putIndex = inc(putIndex);
123 155 ++count;
124 156 notEmpty.signal();
125 157 }
126 158
127 159 /**
128 160 * Extracts element at current take position, advances, and signals.
129 161 * Call only when holding lock.
130 162 */
131 163 private E extract() {
132 - final E[] items = this.items;
133 - E x = items[takeIndex];
164 + final Object[] items = this.items;
165 + E x = this.<E>cast(items[takeIndex]);
134 166 items[takeIndex] = null;
135 167 takeIndex = inc(takeIndex);
136 168 --count;
137 169 notFull.signal();
138 170 return x;
139 171 }
140 172
141 173 /**
142 - * Utility for remove and iterator.remove: Delete item at position i.
174 + * Deletes item at position i.
175 + * Utility for remove and iterator.remove.
143 176 * Call only when holding lock.
144 177 */
145 178 void removeAt(int i) {
146 - final E[] items = this.items;
179 + final Object[] items = this.items;
147 180 // if removing front item, just advance
148 181 if (i == takeIndex) {
149 182 items[takeIndex] = null;
150 183 takeIndex = inc(takeIndex);
151 184 } else {
152 185 // slide over all others up through putIndex.
153 186 for (;;) {
154 187 int nexti = inc(i);
155 188 if (nexti != putIndex) {
156 189 items[i] = items[nexti];
157 190 i = nexti;
158 191 } else {
159 192 items[i] = null;
↓ open down ↓ |
3 lines elided |
↑ open up ↑ |
160 193 putIndex = i;
161 194 break;
162 195 }
163 196 }
164 197 }
165 198 --count;
166 199 notFull.signal();
167 200 }
168 201
169 202 /**
170 - * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
203 + * Creates an {@code ArrayBlockingQueue} with the given (fixed)
171 204 * capacity and default access policy.
172 205 *
173 206 * @param capacity the capacity of this queue
174 - * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
207 + * @throws IllegalArgumentException if {@code capacity < 1}
175 208 */
176 209 public ArrayBlockingQueue(int capacity) {
177 210 this(capacity, false);
178 211 }
179 212
180 213 /**
181 - * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
214 + * Creates an {@code ArrayBlockingQueue} with the given (fixed)
182 215 * capacity and the specified access policy.
183 216 *
184 217 * @param capacity the capacity of this queue
185 - * @param fair if <tt>true</tt> then queue accesses for threads blocked
218 + * @param fair if {@code true} then queue accesses for threads blocked
186 219 * on insertion or removal are processed in FIFO order;
187 - * if <tt>false</tt> the access order is unspecified.
188 - * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
220 + * if {@code false} the access order is unspecified.
221 + * @throws IllegalArgumentException if {@code capacity < 1}
189 222 */
190 223 public ArrayBlockingQueue(int capacity, boolean fair) {
191 224 if (capacity <= 0)
192 225 throw new IllegalArgumentException();
193 - this.items = (E[]) new Object[capacity];
226 + this.items = new Object[capacity];
194 227 lock = new ReentrantLock(fair);
195 228 notEmpty = lock.newCondition();
196 229 notFull = lock.newCondition();
197 230 }
198 231
199 232 /**
200 - * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
233 + * Creates an {@code ArrayBlockingQueue} with the given (fixed)
201 234 * capacity, the specified access policy and initially containing the
202 235 * elements of the given collection,
203 236 * added in traversal order of the collection's iterator.
204 237 *
205 238 * @param capacity the capacity of this queue
206 - * @param fair if <tt>true</tt> then queue accesses for threads blocked
239 + * @param fair if {@code true} then queue accesses for threads blocked
207 240 * on insertion or removal are processed in FIFO order;
208 - * if <tt>false</tt> the access order is unspecified.
241 + * if {@code false} the access order is unspecified.
209 242 * @param c the collection of elements to initially contain
210 - * @throws IllegalArgumentException if <tt>capacity</tt> is less than
211 - * <tt>c.size()</tt>, or less than 1.
243 + * @throws IllegalArgumentException if {@code capacity} is less than
244 + * {@code c.size()}, or less than 1.
212 245 * @throws NullPointerException if the specified collection or any
213 246 * of its elements are null
214 247 */
215 248 public ArrayBlockingQueue(int capacity, boolean fair,
216 249 Collection<? extends E> c) {
217 250 this(capacity, fair);
218 - if (capacity < c.size())
219 - throw new IllegalArgumentException();
220 251
221 - for (E e : c)
222 - add(e);
252 + final ReentrantLock lock = this.lock;
253 + lock.lock(); // Lock only for visibility, not mutual exclusion
254 + try {
255 + int i = 0;
256 + try {
257 + for (E e : c) {
258 + checkNotNull(e);
259 + items[i++] = e;
260 + }
261 + } catch (ArrayIndexOutOfBoundsException ex) {
262 + throw new IllegalArgumentException();
263 + }
264 + count = i;
265 + putIndex = (i == capacity) ? 0 : i;
266 + } finally {
267 + lock.unlock();
268 + }
223 269 }
224 270
225 271 /**
226 272 * Inserts the specified element at the tail of this queue if it is
227 273 * possible to do so immediately without exceeding the queue's capacity,
228 - * returning <tt>true</tt> upon success and throwing an
229 - * <tt>IllegalStateException</tt> if this queue is full.
274 + * returning {@code true} upon success and throwing an
275 + * {@code IllegalStateException} if this queue is full.
230 276 *
231 277 * @param e the element to add
232 - * @return <tt>true</tt> (as specified by {@link Collection#add})
278 + * @return {@code true} (as specified by {@link Collection#add})
233 279 * @throws IllegalStateException if this queue is full
234 280 * @throws NullPointerException if the specified element is null
235 281 */
236 282 public boolean add(E e) {
237 283 return super.add(e);
238 284 }
239 285
240 286 /**
241 287 * Inserts the specified element at the tail of this queue if it is
242 288 * possible to do so immediately without exceeding the queue's capacity,
243 - * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
289 + * returning {@code true} upon success and {@code false} if this queue
244 290 * is full. This method is generally preferable to method {@link #add},
245 291 * which can fail to insert an element only by throwing an exception.
246 292 *
247 293 * @throws NullPointerException if the specified element is null
248 294 */
249 295 public boolean offer(E e) {
250 - if (e == null) throw new NullPointerException();
296 + checkNotNull(e);
251 297 final ReentrantLock lock = this.lock;
252 298 lock.lock();
253 299 try {
254 300 if (count == items.length)
255 301 return false;
256 302 else {
257 303 insert(e);
258 304 return true;
259 305 }
260 306 } finally {
261 307 lock.unlock();
262 308 }
↓ open down ↓ |
2 lines elided |
↑ open up ↑ |
263 309 }
264 310
265 311 /**
266 312 * Inserts the specified element at the tail of this queue, waiting
267 313 * for space to become available if the queue is full.
268 314 *
269 315 * @throws InterruptedException {@inheritDoc}
270 316 * @throws NullPointerException {@inheritDoc}
271 317 */
272 318 public void put(E e) throws InterruptedException {
273 - if (e == null) throw new NullPointerException();
274 - final E[] items = this.items;
319 + checkNotNull(e);
275 320 final ReentrantLock lock = this.lock;
276 321 lock.lockInterruptibly();
277 322 try {
278 - try {
279 - while (count == items.length)
280 - notFull.await();
281 - } catch (InterruptedException ie) {
282 - notFull.signal(); // propagate to non-interrupted thread
283 - throw ie;
284 - }
323 + while (count == items.length)
324 + notFull.await();
285 325 insert(e);
286 326 } finally {
287 327 lock.unlock();
288 328 }
289 329 }
290 330
291 331 /**
292 332 * Inserts the specified element at the tail of this queue, waiting
293 333 * up to the specified wait time for space to become available if
294 334 * the queue is full.
295 335 *
296 336 * @throws InterruptedException {@inheritDoc}
297 337 * @throws NullPointerException {@inheritDoc}
298 338 */
299 339 public boolean offer(E e, long timeout, TimeUnit unit)
300 340 throws InterruptedException {
301 341
302 - if (e == null) throw new NullPointerException();
342 + checkNotNull(e);
303 343 long nanos = unit.toNanos(timeout);
304 344 final ReentrantLock lock = this.lock;
305 345 lock.lockInterruptibly();
306 346 try {
307 - for (;;) {
308 - if (count != items.length) {
309 - insert(e);
310 - return true;
311 - }
347 + while (count == items.length) {
312 348 if (nanos <= 0)
313 349 return false;
314 - try {
315 - nanos = notFull.awaitNanos(nanos);
316 - } catch (InterruptedException ie) {
317 - notFull.signal(); // propagate to non-interrupted thread
318 - throw ie;
319 - }
350 + nanos = notFull.awaitNanos(nanos);
320 351 }
352 + insert(e);
353 + return true;
321 354 } finally {
322 355 lock.unlock();
323 356 }
324 357 }
325 358
326 359 public E poll() {
327 360 final ReentrantLock lock = this.lock;
328 361 lock.lock();
329 362 try {
330 - if (count == 0)
331 - return null;
332 - E x = extract();
333 - return x;
363 + return (count == 0) ? null : extract();
334 364 } finally {
335 365 lock.unlock();
336 366 }
337 367 }
338 368
339 369 public E take() throws InterruptedException {
340 370 final ReentrantLock lock = this.lock;
341 371 lock.lockInterruptibly();
342 372 try {
343 - try {
344 - while (count == 0)
345 - notEmpty.await();
346 - } catch (InterruptedException ie) {
347 - notEmpty.signal(); // propagate to non-interrupted thread
348 - throw ie;
349 - }
350 - E x = extract();
351 - return x;
373 + while (count == 0)
374 + notEmpty.await();
375 + return extract();
352 376 } finally {
353 377 lock.unlock();
354 378 }
355 379 }
356 380
357 381 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
358 382 long nanos = unit.toNanos(timeout);
359 383 final ReentrantLock lock = this.lock;
360 384 lock.lockInterruptibly();
361 385 try {
362 - for (;;) {
363 - if (count != 0) {
364 - E x = extract();
365 - return x;
366 - }
386 + while (count == 0) {
367 387 if (nanos <= 0)
368 388 return null;
369 - try {
370 - nanos = notEmpty.awaitNanos(nanos);
371 - } catch (InterruptedException ie) {
372 - notEmpty.signal(); // propagate to non-interrupted thread
373 - throw ie;
374 - }
375 -
389 + nanos = notEmpty.awaitNanos(nanos);
376 390 }
391 + return extract();
377 392 } finally {
378 393 lock.unlock();
379 394 }
380 395 }
381 396
382 397 public E peek() {
383 398 final ReentrantLock lock = this.lock;
384 399 lock.lock();
385 400 try {
386 - return (count == 0) ? null : items[takeIndex];
401 + return (count == 0) ? null : itemAt(takeIndex);
387 402 } finally {
388 403 lock.unlock();
389 404 }
390 405 }
391 406
392 407 // this doc comment is overridden to remove the reference to collections
393 408 // greater in size than Integer.MAX_VALUE
394 409 /**
395 410 * Returns the number of elements in this queue.
396 411 *
397 412 * @return the number of elements in this queue
398 413 */
399 414 public int size() {
400 415 final ReentrantLock lock = this.lock;
401 416 lock.lock();
402 417 try {
403 418 return count;
404 419 } finally {
↓ open down ↓ |
8 lines elided |
↑ open up ↑ |
405 420 lock.unlock();
406 421 }
407 422 }
408 423
409 424 // this doc comment is a modified copy of the inherited doc comment,
410 425 // without the reference to unlimited queues.
411 426 /**
412 427 * Returns the number of additional elements that this queue can ideally
413 428 * (in the absence of memory or resource constraints) accept without
414 429 * blocking. This is always equal to the initial capacity of this queue
415 - * less the current <tt>size</tt> of this queue.
430 + * less the current {@code size} of this queue.
416 431 *
417 432 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
418 - * an element will succeed by inspecting <tt>remainingCapacity</tt>
433 + * an element will succeed by inspecting {@code remainingCapacity}
419 434 * because it may be the case that another thread is about to
420 435 * insert or remove an element.
421 436 */
422 437 public int remainingCapacity() {
423 438 final ReentrantLock lock = this.lock;
424 439 lock.lock();
425 440 try {
426 441 return items.length - count;
427 442 } finally {
428 443 lock.unlock();
429 444 }
430 445 }
431 446
432 447 /**
433 448 * Removes a single instance of the specified element from this queue,
434 - * if it is present. More formally, removes an element <tt>e</tt> such
435 - * that <tt>o.equals(e)</tt>, if this queue contains one or more such
449 + * if it is present. More formally, removes an element {@code e} such
450 + * that {@code o.equals(e)}, if this queue contains one or more such
436 451 * elements.
437 - * Returns <tt>true</tt> if this queue contained the specified element
452 + * Returns {@code true} if this queue contained the specified element
438 453 * (or equivalently, if this queue changed as a result of the call).
439 454 *
455 + * <p>Removal of interior elements in circular array-based queues
456 + * is an intrinsically slow and disruptive operation, so it should
457 + * be undertaken only in exceptional circumstances, ideally
458 + * only when the queue is known not to be accessible by other
459 + * threads.
460 + *
440 461 * @param o element to be removed from this queue, if present
441 - * @return <tt>true</tt> if this queue changed as a result of the call
462 + * @return {@code true} if this queue changed as a result of the call
442 463 */
443 464 public boolean remove(Object o) {
444 465 if (o == null) return false;
445 - final E[] items = this.items;
466 + final Object[] items = this.items;
446 467 final ReentrantLock lock = this.lock;
447 468 lock.lock();
448 469 try {
449 - int i = takeIndex;
450 - int k = 0;
451 - for (;;) {
452 - if (k++ >= count)
453 - return false;
470 + for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
454 471 if (o.equals(items[i])) {
455 472 removeAt(i);
456 473 return true;
457 474 }
458 - i = inc(i);
459 475 }
460 -
476 + return false;
461 477 } finally {
462 478 lock.unlock();
463 479 }
464 480 }
465 481
466 482 /**
467 - * Returns <tt>true</tt> if this queue contains the specified element.
468 - * More formally, returns <tt>true</tt> if and only if this queue contains
469 - * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
483 + * Returns {@code true} if this queue contains the specified element.
484 + * More formally, returns {@code true} if and only if this queue contains
485 + * at least one element {@code e} such that {@code o.equals(e)}.
470 486 *
471 487 * @param o object to be checked for containment in this queue
472 - * @return <tt>true</tt> if this queue contains the specified element
488 + * @return {@code true} if this queue contains the specified element
473 489 */
474 490 public boolean contains(Object o) {
475 491 if (o == null) return false;
476 - final E[] items = this.items;
492 + final Object[] items = this.items;
477 493 final ReentrantLock lock = this.lock;
478 494 lock.lock();
479 495 try {
480 - int i = takeIndex;
481 - int k = 0;
482 - while (k++ < count) {
496 + for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
483 497 if (o.equals(items[i]))
484 498 return true;
485 - i = inc(i);
486 - }
487 499 return false;
488 500 } finally {
489 501 lock.unlock();
490 502 }
491 503 }
492 504
493 505 /**
494 506 * Returns an array containing all of the elements in this queue, in
495 507 * proper sequence.
496 508 *
497 509 * <p>The returned array will be "safe" in that no references to it are
498 510 * maintained by this queue. (In other words, this method must allocate
499 511 * a new array). The caller is thus free to modify the returned array.
500 512 *
501 513 * <p>This method acts as a bridge between array-based and collection-based
502 514 * APIs.
503 515 *
504 516 * @return an array containing all of the elements in this queue
505 517 */
506 518 public Object[] toArray() {
507 - final E[] items = this.items;
519 + final Object[] items = this.items;
508 520 final ReentrantLock lock = this.lock;
509 521 lock.lock();
510 522 try {
523 + final int count = this.count;
511 524 Object[] a = new Object[count];
512 - int k = 0;
513 - int i = takeIndex;
514 - while (k < count) {
515 - a[k++] = items[i];
516 - i = inc(i);
517 - }
525 + for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
526 + a[k] = items[i];
518 527 return a;
519 528 } finally {
520 529 lock.unlock();
521 530 }
522 531 }
523 532
524 533 /**
525 534 * Returns an array containing all of the elements in this queue, in
526 535 * proper sequence; the runtime type of the returned array is that of
527 536 * the specified array. If the queue fits in the specified array, it
528 537 * is returned therein. Otherwise, a new array is allocated with the
529 538 * runtime type of the specified array and the size of this queue.
530 539 *
531 540 * <p>If this queue fits in the specified array with room to spare
532 541 * (i.e., the array has more elements than this queue), the element in
533 542 * the array immediately following the end of the queue is set to
534 - * <tt>null</tt>.
543 + * {@code null}.
535 544 *
536 545 * <p>Like the {@link #toArray()} method, this method acts as a bridge between
537 546 * array-based and collection-based APIs. Further, this method allows
538 547 * precise control over the runtime type of the output array, and may,
539 548 * under certain circumstances, be used to save allocation costs.
540 549 *
541 - * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
550 + * <p>Suppose {@code x} is a queue known to contain only strings.
542 551 * The following code can be used to dump the queue into a newly
543 - * allocated array of <tt>String</tt>:
552 + * allocated array of {@code String}:
544 553 *
545 554 * <pre>
546 555 * String[] y = x.toArray(new String[0]);</pre>
547 556 *
548 - * Note that <tt>toArray(new Object[0])</tt> is identical in function to
549 - * <tt>toArray()</tt>.
557 + * Note that {@code toArray(new Object[0])} is identical in function to
558 + * {@code toArray()}.
550 559 *
551 560 * @param a the array into which the elements of the queue are to
552 561 * be stored, if it is big enough; otherwise, a new array of the
553 562 * same runtime type is allocated for this purpose
554 563 * @return an array containing all of the elements in this queue
555 564 * @throws ArrayStoreException if the runtime type of the specified array
556 565 * is not a supertype of the runtime type of every element in
557 566 * this queue
558 567 * @throws NullPointerException if the specified array is null
559 568 */
569 + @SuppressWarnings("unchecked")
560 570 public <T> T[] toArray(T[] a) {
561 - final E[] items = this.items;
571 + final Object[] items = this.items;
562 572 final ReentrantLock lock = this.lock;
563 573 lock.lock();
564 574 try {
565 - if (a.length < count)
575 + final int count = this.count;
576 + final int len = a.length;
577 + if (len < count)
566 578 a = (T[])java.lang.reflect.Array.newInstance(
567 - a.getClass().getComponentType(),
568 - count
569 - );
570 -
571 - int k = 0;
572 - int i = takeIndex;
573 - while (k < count) {
574 - a[k++] = (T)items[i];
575 - i = inc(i);
576 - }
577 - if (a.length > count)
579 + a.getClass().getComponentType(), count);
580 + for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
581 + a[k] = (T) items[i];
582 + if (len > count)
578 583 a[count] = null;
579 584 return a;
580 585 } finally {
581 586 lock.unlock();
582 587 }
583 588 }
584 589
585 590 public String toString() {
586 591 final ReentrantLock lock = this.lock;
587 592 lock.lock();
588 593 try {
589 - return super.toString();
594 + int k = count;
595 + if (k == 0)
596 + return "[]";
597 +
598 + StringBuilder sb = new StringBuilder();
599 + sb.append('[');
600 + for (int i = takeIndex; ; i = inc(i)) {
601 + Object e = items[i];
602 + sb.append(e == this ? "(this Collection)" : e);
603 + if (--k == 0)
604 + return sb.append(']').toString();
605 + sb.append(',').append(' ');
606 + }
590 607 } finally {
591 608 lock.unlock();
592 609 }
593 610 }
594 611
595 612 /**
596 613 * Atomically removes all of the elements from this queue.
597 614 * The queue will be empty after this call returns.
598 615 */
599 616 public void clear() {
600 - final E[] items = this.items;
617 + final Object[] items = this.items;
601 618 final ReentrantLock lock = this.lock;
602 619 lock.lock();
603 620 try {
604 - int i = takeIndex;
605 - int k = count;
606 - while (k-- > 0) {
621 + for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
607 622 items[i] = null;
608 - i = inc(i);
609 - }
610 623 count = 0;
611 624 putIndex = 0;
612 625 takeIndex = 0;
613 626 notFull.signalAll();
614 627 } finally {
615 628 lock.unlock();
616 629 }
617 630 }
618 631
619 632 /**
620 633 * @throws UnsupportedOperationException {@inheritDoc}
621 634 * @throws ClassCastException {@inheritDoc}
622 635 * @throws NullPointerException {@inheritDoc}
623 636 * @throws IllegalArgumentException {@inheritDoc}
624 637 */
625 638 public int drainTo(Collection<? super E> c) {
626 - if (c == null)
627 - throw new NullPointerException();
639 + checkNotNull(c);
628 640 if (c == this)
629 641 throw new IllegalArgumentException();
630 - final E[] items = this.items;
642 + final Object[] items = this.items;
631 643 final ReentrantLock lock = this.lock;
632 644 lock.lock();
633 645 try {
634 646 int i = takeIndex;
635 647 int n = 0;
636 648 int max = count;
637 649 while (n < max) {
638 - c.add(items[i]);
650 + c.add(this.<E>cast(items[i]));
639 651 items[i] = null;
640 652 i = inc(i);
641 653 ++n;
642 654 }
643 655 if (n > 0) {
644 656 count = 0;
645 657 putIndex = 0;
646 658 takeIndex = 0;
647 659 notFull.signalAll();
648 660 }
649 661 return n;
650 662 } finally {
651 663 lock.unlock();
↓ open down ↓ |
3 lines elided |
↑ open up ↑ |
652 664 }
653 665 }
654 666
655 667 /**
656 668 * @throws UnsupportedOperationException {@inheritDoc}
657 669 * @throws ClassCastException {@inheritDoc}
658 670 * @throws NullPointerException {@inheritDoc}
659 671 * @throws IllegalArgumentException {@inheritDoc}
660 672 */
661 673 public int drainTo(Collection<? super E> c, int maxElements) {
662 - if (c == null)
663 - throw new NullPointerException();
674 + checkNotNull(c);
664 675 if (c == this)
665 676 throw new IllegalArgumentException();
666 677 if (maxElements <= 0)
667 678 return 0;
668 - final E[] items = this.items;
679 + final Object[] items = this.items;
669 680 final ReentrantLock lock = this.lock;
670 681 lock.lock();
671 682 try {
672 683 int i = takeIndex;
673 684 int n = 0;
674 - int sz = count;
675 - int max = (maxElements < count)? maxElements : count;
685 + int max = (maxElements < count) ? maxElements : count;
676 686 while (n < max) {
677 - c.add(items[i]);
687 + c.add(this.<E>cast(items[i]));
678 688 items[i] = null;
679 689 i = inc(i);
680 690 ++n;
681 691 }
682 692 if (n > 0) {
683 693 count -= n;
684 694 takeIndex = i;
685 695 notFull.signalAll();
686 696 }
687 697 return n;
688 698 } finally {
689 699 lock.unlock();
690 700 }
691 701 }
692 702
693 -
694 703 /**
695 704 * Returns an iterator over the elements in this queue in proper sequence.
696 - * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
697 - * will never throw {@link ConcurrentModificationException},
705 + * The elements will be returned in order from first (head) to last (tail).
706 + *
707 + * <p>The returned {@code Iterator} is a "weakly consistent" iterator that
708 + * will never throw {@link java.util.ConcurrentModificationException
709 + * ConcurrentModificationException},
698 710 * and guarantees to traverse elements as they existed upon
699 711 * construction of the iterator, and may (but is not guaranteed to)
700 712 * reflect any modifications subsequent to construction.
701 713 *
702 714 * @return an iterator over the elements in this queue in proper sequence
703 715 */
704 716 public Iterator<E> iterator() {
705 - final ReentrantLock lock = this.lock;
706 - lock.lock();
707 - try {
708 - return new Itr();
709 - } finally {
710 - lock.unlock();
711 - }
717 + return new Itr();
712 718 }
713 719
714 720 /**
715 - * Iterator for ArrayBlockingQueue
721 + * Iterator for ArrayBlockingQueue. To maintain weak consistency
722 + * with respect to puts and takes, we (1) read ahead one slot, so
723 + * as to not report hasNext true but then not have an element to
724 + * return -- however we later recheck this slot to use the most
725 + * current value; (2) ensure that each array slot is traversed at
726 + * most once (by tracking "remaining" elements); (3) skip over
727 + * null slots, which can occur if takes race ahead of iterators.
728 + * However, for circular array-based queues, we cannot rely on any
729 + * well-established definition of what it means to be weakly
730 + * consistent with respect to interior removes since these may
731 + * require slot overwrites in the process of sliding elements to
732 + * cover gaps. So we settle for resiliency, operating on
733 + * established apparent nexts, which may miss some elements that
734 + * have moved between calls to next.
716 735 */
717 736 private class Itr implements Iterator<E> {
718 - /**
719 - * Index of element to be returned by next,
720 - * or a negative number if no such.
721 - */
722 - private int nextIndex;
737 + private int remaining; // Number of elements yet to be returned
738 + private int nextIndex; // Index of element to be returned by next
739 + private E nextItem; // Element to be returned by next call to next
740 + private E lastItem; // Element returned by last call to next
741 + private int lastRet; // Index of last element returned, or -1 if none
723 742
724 - /**
725 - * nextItem holds on to item fields because once we claim
726 - * that an element exists in hasNext(), we must return it in
727 - * the following next() call even if it was in the process of
728 - * being removed when hasNext() was called.
729 - */
730 - private E nextItem;
731 -
732 - /**
733 - * Index of element returned by most recent call to next.
734 - * Reset to -1 if this element is deleted by a call to remove.
735 - */
736 - private int lastRet;
737 -
738 743 Itr() {
739 - lastRet = -1;
740 - if (count == 0)
741 - nextIndex = -1;
742 - else {
743 - nextIndex = takeIndex;
744 - nextItem = items[takeIndex];
744 + final ReentrantLock lock = ArrayBlockingQueue.this.lock;
745 + lock.lock();
746 + try {
747 + lastRet = -1;
748 + if ((remaining = count) > 0)
749 + nextItem = itemAt(nextIndex = takeIndex);
750 + } finally {
751 + lock.unlock();
745 752 }
746 753 }
747 754
748 755 public boolean hasNext() {
749 - /*
750 - * No sync. We can return true by mistake here
751 - * only if this iterator passed across threads,
752 - * which we don't support anyway.
753 - */
754 - return nextIndex >= 0;
756 + return remaining > 0;
755 757 }
756 758
757 - /**
758 - * Checks whether nextIndex is valid; if so setting nextItem.
759 - * Stops iterator when either hits putIndex or sees null item.
760 - */
761 - private void checkNext() {
762 - if (nextIndex == putIndex) {
763 - nextIndex = -1;
764 - nextItem = null;
765 - } else {
766 - nextItem = items[nextIndex];
767 - if (nextItem == null)
768 - nextIndex = -1;
769 - }
770 - }
771 -
772 759 public E next() {
773 760 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
774 761 lock.lock();
775 762 try {
776 - if (nextIndex < 0)
763 + if (remaining <= 0)
777 764 throw new NoSuchElementException();
778 765 lastRet = nextIndex;
779 - E x = nextItem;
780 - nextIndex = inc(nextIndex);
781 - checkNext();
766 + E x = itemAt(nextIndex); // check for fresher value
767 + if (x == null) {
768 + x = nextItem; // we are forced to report old value
769 + lastItem = null; // but ensure remove fails
770 + }
771 + else
772 + lastItem = x;
773 + while (--remaining > 0 && // skip over nulls
774 + (nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
775 + ;
782 776 return x;
783 777 } finally {
784 778 lock.unlock();
785 779 }
786 780 }
787 781
788 782 public void remove() {
789 783 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
790 784 lock.lock();
791 785 try {
792 786 int i = lastRet;
793 787 if (i == -1)
794 788 throw new IllegalStateException();
795 789 lastRet = -1;
796 -
797 - int ti = takeIndex;
798 - removeAt(i);
799 - // back up cursor (reset to front if was first element)
800 - nextIndex = (i == ti) ? takeIndex : i;
801 - checkNext();
790 + E x = lastItem;
791 + lastItem = null;
792 + // only remove if item still at index
793 + if (x != null && x == items[i]) {
794 + boolean removingHead = (i == takeIndex);
795 + removeAt(i);
796 + if (!removingHead)
797 + nextIndex = dec(nextIndex);
798 + }
802 799 } finally {
803 800 lock.unlock();
804 801 }
805 802 }
806 803 }
804 +
807 805 }
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX