Print this page
Split |
Close |
Expand all |
Collapse all |
--- old/src/share/classes/java/util/concurrent/LinkedBlockingQueue.java
+++ new/src/share/classes/java/util/concurrent/LinkedBlockingQueue.java
1 1 /*
2 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 3 *
4 4 * This code is free software; you can redistribute it and/or modify it
5 5 * under the terms of the GNU General Public License version 2 only, as
6 6 * published by the Free Software Foundation. Oracle designates this
7 7 * particular file as subject to the "Classpath" exception as provided
8 8 * by Oracle in the LICENSE file that accompanied this code.
9 9 *
10 10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 13 * version 2 for more details (a copy is included in the LICENSE file that
14 14 * accompanied this code).
15 15 *
16 16 * You should have received a copy of the GNU General Public License version
17 17 * 2 along with this work; if not, write to the Free Software Foundation,
18 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 19 *
20 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 21 * or visit www.oracle.com if you need additional information or have any
22 22 * questions.
23 23 */
24 24
25 25 /*
26 26 * This file is available under and governed by the GNU General Public
27 27 * License version 2 only, as published by the Free Software Foundation.
28 28 * However, the following notice accompanied the original version of this
29 29 * file:
30 30 *
31 31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 32 * Expert Group and released to the public domain, as explained at
33 33 * http://creativecommons.org/licenses/publicdomain
34 34 */
35 35
36 36 package java.util.concurrent;
37 37
38 38 import java.util.concurrent.atomic.AtomicInteger;
39 39 import java.util.concurrent.locks.Condition;
40 40 import java.util.concurrent.locks.ReentrantLock;
41 41 import java.util.AbstractQueue;
42 42 import java.util.Collection;
43 43 import java.util.Iterator;
44 44 import java.util.NoSuchElementException;
45 45
46 46 /**
47 47 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
48 48 * linked nodes.
49 49 * This queue orders elements FIFO (first-in-first-out).
50 50 * The <em>head</em> of the queue is that element that has been on the
51 51 * queue the longest time.
52 52 * The <em>tail</em> of the queue is that element that has been on the
53 53 * queue the shortest time. New elements
54 54 * are inserted at the tail of the queue, and the queue retrieval
55 55 * operations obtain elements at the head of the queue.
56 56 * Linked queues typically have higher throughput than array-based queues but
57 57 * less predictable performance in most concurrent applications.
58 58 *
59 59 * <p> The optional capacity bound constructor argument serves as a
60 60 * way to prevent excessive queue expansion. The capacity, if unspecified,
61 61 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
62 62 * dynamically created upon each insertion unless this would bring the
63 63 * queue above capacity.
64 64 *
65 65 * <p>This class and its iterator implement all of the
66 66 * <em>optional</em> methods of the {@link Collection} and {@link
67 67 * Iterator} interfaces.
68 68 *
69 69 * <p>This class is a member of the
70 70 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
71 71 * Java Collections Framework</a>.
72 72 *
73 73 * @since 1.5
74 74 * @author Doug Lea
75 75 * @param <E> the type of elements held in this collection
76 76 *
77 77 */
78 78 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
79 79 implements BlockingQueue<E>, java.io.Serializable {
80 80 private static final long serialVersionUID = -6903933977591709194L;
81 81
82 82 /*
83 83 * A variant of the "two lock queue" algorithm. The putLock gates
84 84 * entry to put (and offer), and has an associated condition for
85 85 * waiting puts. Similarly for the takeLock. The "count" field
86 86 * that they both rely on is maintained as an atomic to avoid
87 87 * needing to get both locks in most cases. Also, to minimize need
88 88 * for puts to get takeLock and vice-versa, cascading notifies are
89 89 * used. When a put notices that it has enabled at least one take,
90 90 * it signals taker. That taker in turn signals others if more
91 91 * items have been entered since the signal. And symmetrically for
92 92 * takes signalling puts. Operations such as remove(Object) and
93 93 * iterators acquire both locks.
94 94 *
95 95 * Visibility between writers and readers is provided as follows:
96 96 *
97 97 * Whenever an element is enqueued, the putLock is acquired and
98 98 * count updated. A subsequent reader guarantees visibility to the
99 99 * enqueued Node by either acquiring the putLock (via fullyLock)
100 100 * or by acquiring the takeLock, and then reading n = count.get();
101 101 * this gives visibility to the first n items.
102 102 *
103 103 * To implement weakly consistent iterators, it appears we need to
104 104 * keep all Nodes GC-reachable from a predecessor dequeued Node.
105 105 * That would cause two problems:
106 106 * - allow a rogue Iterator to cause unbounded memory retention
107 107 * - cause cross-generational linking of old Nodes to new Nodes if
108 108 * a Node was tenured while live, which generational GCs have a
109 109 * hard time dealing with, causing repeated major collections.
110 110 * However, only non-deleted Nodes need to be reachable from
111 111 * dequeued Nodes, and reachability does not necessarily have to
112 112 * be of the kind understood by the GC. We use the trick of
113 113 * linking a Node that has just been dequeued to itself. Such a
114 114 * self-link implicitly means to advance to head.next.
115 115 */
116 116
117 117 /**
118 118 * Linked list node class
119 119 */
120 120 static class Node<E> {
121 121 E item;
122 122
123 123 /**
124 124 * One of:
125 125 * - the real successor Node
126 126 * - this Node, meaning the successor is head.next
127 127 * - null, meaning there is no successor (this is the last node)
128 128 */
129 129 Node<E> next;
130 130
131 131 Node(E x) { item = x; }
132 132 }
133 133
134 134 /** The capacity bound, or Integer.MAX_VALUE if none */
135 135 private final int capacity;
136 136
137 137 /** Current number of elements */
138 138 private final AtomicInteger count = new AtomicInteger(0);
139 139
140 140 /**
141 141 * Head of linked list.
142 142 * Invariant: head.item == null
143 143 */
144 144 private transient Node<E> head;
145 145
146 146 /**
147 147 * Tail of linked list.
148 148 * Invariant: last.next == null
149 149 */
150 150 private transient Node<E> last;
151 151
152 152 /** Lock held by take, poll, etc */
153 153 private final ReentrantLock takeLock = new ReentrantLock();
154 154
155 155 /** Wait queue for waiting takes */
156 156 private final Condition notEmpty = takeLock.newCondition();
157 157
158 158 /** Lock held by put, offer, etc */
159 159 private final ReentrantLock putLock = new ReentrantLock();
160 160
161 161 /** Wait queue for waiting puts */
162 162 private final Condition notFull = putLock.newCondition();
163 163
164 164 /**
165 165 * Signals a waiting take. Called only from put/offer (which do not
166 166 * otherwise ordinarily lock takeLock.)
167 167 */
168 168 private void signalNotEmpty() {
169 169 final ReentrantLock takeLock = this.takeLock;
170 170 takeLock.lock();
171 171 try {
172 172 notEmpty.signal();
173 173 } finally {
174 174 takeLock.unlock();
175 175 }
176 176 }
177 177
178 178 /**
179 179 * Signals a waiting put. Called only from take/poll.
180 180 */
181 181 private void signalNotFull() {
↓ open down ↓ |
181 lines elided |
↑ open up ↑ |
182 182 final ReentrantLock putLock = this.putLock;
183 183 putLock.lock();
184 184 try {
185 185 notFull.signal();
186 186 } finally {
187 187 putLock.unlock();
188 188 }
189 189 }
190 190
191 191 /**
192 - * Creates a node and links it at end of queue.
192 + * Links node at end of queue.
193 193 *
194 - * @param x the item
194 + * @param node the node
195 195 */
196 - private void enqueue(E x) {
196 + private void enqueue(Node<E> node) {
197 197 // assert putLock.isHeldByCurrentThread();
198 198 // assert last.next == null;
199 - last = last.next = new Node<E>(x);
199 + last = last.next = node;
200 200 }
201 201
202 202 /**
203 203 * Removes a node from head of queue.
204 204 *
205 205 * @return the item that was at the head of the queue
206 206 */
207 207 private E dequeue() {
208 208 // assert takeLock.isHeldByCurrentThread();
209 209 // assert head.item == null;
210 210 Node<E> h = head;
211 211 Node<E> first = h.next;
212 212 h.next = h; // help GC
213 213 head = first;
214 214 E x = first.item;
215 215 first.item = null;
216 216 return x;
217 217 }
218 218
219 219 /**
220 220 * Lock to prevent both puts and takes.
221 221 */
222 222 void fullyLock() {
223 223 putLock.lock();
224 224 takeLock.lock();
225 225 }
226 226
227 227 /**
228 228 * Unlock to allow both puts and takes.
229 229 */
230 230 void fullyUnlock() {
231 231 takeLock.unlock();
232 232 putLock.unlock();
233 233 }
234 234
235 235 // /**
236 236 // * Tells whether both locks are held by current thread.
237 237 // */
238 238 // boolean isFullyLocked() {
239 239 // return (putLock.isHeldByCurrentThread() &&
240 240 // takeLock.isHeldByCurrentThread());
241 241 // }
242 242
243 243 /**
244 244 * Creates a {@code LinkedBlockingQueue} with a capacity of
245 245 * {@link Integer#MAX_VALUE}.
246 246 */
247 247 public LinkedBlockingQueue() {
248 248 this(Integer.MAX_VALUE);
249 249 }
250 250
251 251 /**
252 252 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
253 253 *
254 254 * @param capacity the capacity of this queue
255 255 * @throws IllegalArgumentException if {@code capacity} is not greater
256 256 * than zero
257 257 */
258 258 public LinkedBlockingQueue(int capacity) {
259 259 if (capacity <= 0) throw new IllegalArgumentException();
260 260 this.capacity = capacity;
261 261 last = head = new Node<E>(null);
262 262 }
263 263
264 264 /**
265 265 * Creates a {@code LinkedBlockingQueue} with a capacity of
266 266 * {@link Integer#MAX_VALUE}, initially containing the elements of the
267 267 * given collection,
268 268 * added in traversal order of the collection's iterator.
269 269 *
270 270 * @param c the collection of elements to initially contain
271 271 * @throws NullPointerException if the specified collection or any
272 272 * of its elements are null
273 273 */
274 274 public LinkedBlockingQueue(Collection<? extends E> c) {
↓ open down ↓ |
65 lines elided |
↑ open up ↑ |
275 275 this(Integer.MAX_VALUE);
276 276 final ReentrantLock putLock = this.putLock;
277 277 putLock.lock(); // Never contended, but necessary for visibility
278 278 try {
279 279 int n = 0;
280 280 for (E e : c) {
281 281 if (e == null)
282 282 throw new NullPointerException();
283 283 if (n == capacity)
284 284 throw new IllegalStateException("Queue full");
285 - enqueue(e);
285 + enqueue(new Node<E>(e));
286 286 ++n;
287 287 }
288 288 count.set(n);
289 289 } finally {
290 290 putLock.unlock();
291 291 }
292 292 }
293 293
294 294
295 295 // this doc comment is overridden to remove the reference to collections
296 296 // greater in size than Integer.MAX_VALUE
297 297 /**
298 298 * Returns the number of elements in this queue.
299 299 *
300 300 * @return the number of elements in this queue
301 301 */
302 302 public int size() {
303 303 return count.get();
304 304 }
305 305
306 306 // this doc comment is a modified copy of the inherited doc comment,
307 307 // without the reference to unlimited queues.
308 308 /**
309 309 * Returns the number of additional elements that this queue can ideally
310 310 * (in the absence of memory or resource constraints) accept without
311 311 * blocking. This is always equal to the initial capacity of this queue
312 312 * less the current {@code size} of this queue.
313 313 *
314 314 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
315 315 * an element will succeed by inspecting {@code remainingCapacity}
316 316 * because it may be the case that another thread is about to
317 317 * insert or remove an element.
318 318 */
319 319 public int remainingCapacity() {
320 320 return capacity - count.get();
321 321 }
322 322
323 323 /**
324 324 * Inserts the specified element at the tail of this queue, waiting if
↓ open down ↓ |
29 lines elided |
↑ open up ↑ |
325 325 * necessary for space to become available.
326 326 *
327 327 * @throws InterruptedException {@inheritDoc}
328 328 * @throws NullPointerException {@inheritDoc}
329 329 */
330 330 public void put(E e) throws InterruptedException {
331 331 if (e == null) throw new NullPointerException();
332 332 // Note: convention in all put/take/etc is to preset local var
333 333 // holding count negative to indicate failure unless set.
334 334 int c = -1;
335 + Node<E> node = new Node<E>(e); // parameterized (not raw) to avoid an unchecked conversion; created before putLock is taken
335 336 final ReentrantLock putLock = this.putLock;
336 337 final AtomicInteger count = this.count;
337 338 putLock.lockInterruptibly();
338 339 try {
339 340 /*
340 341 * Note that count is used in wait guard even though it is
341 342 * not protected by lock. This works because count can
342 343 * only decrease at this point (all other puts are shut
343 344 * out by lock), and we (or some other waiting put) are
344 345 * signalled if it ever changes from capacity. Similarly
345 346 * for all other uses of count in other wait guards.
346 347 */
347 348 while (count.get() == capacity) {
348 349 notFull.await();
349 350 }
350 - enqueue(e);
351 + enqueue(node);
351 352 c = count.getAndIncrement(); // c holds the count as it was before this insert
352 353 if (c + 1 < capacity)
353 354 notFull.signal();
354 355 } finally {
355 356 putLock.unlock();
356 357 }
357 358 if (c == 0) // queue was empty before this insert, so signal notEmpty
358 359 signalNotEmpty();
359 360 }
360 361
361 362 /**
362 363 * Inserts the specified element at the tail of this queue, waiting if
363 364 * necessary up to the specified wait time for space to become available.
364 365 *
365 366 * @return {@code true} if successful, or {@code false} if
366 367 * the specified waiting time elapses before space is available.
367 368 * @throws InterruptedException {@inheritDoc}
368 369 * @throws NullPointerException {@inheritDoc}
369 370 */
370 371 public boolean offer(E e, long timeout, TimeUnit unit)
371 372 throws InterruptedException {
372 373
373 374 if (e == null) throw new NullPointerException();
374 375 long nanos = unit.toNanos(timeout);
↓ open down ↓ |
14 lines elided |
↑ open up ↑ |
375 376 int c = -1;
376 377 final ReentrantLock putLock = this.putLock;
377 378 final AtomicInteger count = this.count;
378 379 putLock.lockInterruptibly();
379 380 try {
380 381 while (count.get() == capacity) {
381 382 if (nanos <= 0)
382 383 return false;
383 384 nanos = notFull.awaitNanos(nanos);
384 385 }
385 - enqueue(e);
386 + enqueue(new Node<E>(e));
386 387 c = count.getAndIncrement();
387 388 if (c + 1 < capacity)
388 389 notFull.signal();
389 390 } finally {
390 391 putLock.unlock();
391 392 }
392 393 if (c == 0)
393 394 signalNotEmpty();
394 395 return true;
395 396 }
396 397
397 398 /**
398 399 * Inserts the specified element at the tail of this queue if it is
399 400 * possible to do so immediately without exceeding the queue's capacity,
400 401 * returning {@code true} upon success and {@code false} if this queue
401 402 * is full.
402 403 * When using a capacity-restricted queue, this method is generally
403 404 * preferable to method {@link BlockingQueue#add add}, which can fail to
↓ open down ↓ |
8 lines elided |
↑ open up ↑ |
404 405 * insert an element only by throwing an exception.
405 406 *
406 407 * @throws NullPointerException if the specified element is null
407 408 */
408 409 public boolean offer(E e) {
409 410 if (e == null) throw new NullPointerException();
410 411 final AtomicInteger count = this.count;
411 412 if (count.get() == capacity) // full on the unlocked check: fail without taking putLock
412 413 return false;
413 414 int c = -1;
415 + Node<E> node = new Node<E>(e); // parameterized (not raw) to avoid an unchecked conversion; created before putLock is taken
414 416 final ReentrantLock putLock = this.putLock;
415 417 putLock.lock();
416 418 try {
417 419 if (count.get() < capacity) { // re-check under putLock
418 - enqueue(e);
420 + enqueue(node);
419 421 c = count.getAndIncrement(); // c holds the count as it was before this insert
420 422 if (c + 1 < capacity)
421 423 notFull.signal();
422 424 }
423 425 } finally {
424 426 putLock.unlock();
425 427 }
426 428 if (c == 0) // queue was empty before this insert, so signal notEmpty
427 429 signalNotEmpty();
428 430 return c >= 0; // c stays -1 when nothing was enqueued
429 431 }
430 432
431 433
432 434 public E take() throws InterruptedException {
433 435 E x;
434 436 int c = -1;
435 437 final AtomicInteger count = this.count;
436 438 final ReentrantLock takeLock = this.takeLock;
437 439 takeLock.lockInterruptibly();
438 440 try {
439 441 while (count.get() == 0) {
440 442 notEmpty.await();
441 443 }
442 444 x = dequeue();
443 445 c = count.getAndDecrement();
444 446 if (c > 1)
445 447 notEmpty.signal();
446 448 } finally {
447 449 takeLock.unlock();
448 450 }
449 451 if (c == capacity)
450 452 signalNotFull();
451 453 return x;
452 454 }
453 455
454 456 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
455 457 E x = null;
456 458 int c = -1;
457 459 long nanos = unit.toNanos(timeout);
458 460 final AtomicInteger count = this.count;
459 461 final ReentrantLock takeLock = this.takeLock;
460 462 takeLock.lockInterruptibly();
461 463 try {
462 464 while (count.get() == 0) {
463 465 if (nanos <= 0)
464 466 return null;
465 467 nanos = notEmpty.awaitNanos(nanos);
466 468 }
467 469 x = dequeue();
468 470 c = count.getAndDecrement();
469 471 if (c > 1)
470 472 notEmpty.signal();
471 473 } finally {
472 474 takeLock.unlock();
473 475 }
474 476 if (c == capacity)
475 477 signalNotFull();
476 478 return x;
477 479 }
478 480
479 481 public E poll() {
480 482 final AtomicInteger count = this.count;
481 483 if (count.get() == 0)
482 484 return null;
483 485 E x = null;
484 486 int c = -1;
485 487 final ReentrantLock takeLock = this.takeLock;
486 488 takeLock.lock();
487 489 try {
488 490 if (count.get() > 0) {
489 491 x = dequeue();
490 492 c = count.getAndDecrement();
491 493 if (c > 1)
492 494 notEmpty.signal();
493 495 }
494 496 } finally {
495 497 takeLock.unlock();
496 498 }
497 499 if (c == capacity)
498 500 signalNotFull();
499 501 return x;
500 502 }
501 503
502 504 public E peek() {
503 505 if (count.get() == 0)
504 506 return null;
505 507 final ReentrantLock takeLock = this.takeLock;
506 508 takeLock.lock();
507 509 try {
508 510 Node<E> first = head.next;
509 511 if (first == null)
510 512 return null;
511 513 else
512 514 return first.item;
513 515 } finally {
514 516 takeLock.unlock();
515 517 }
516 518 }
517 519
518 520 /**
519 521 * Unlinks interior Node p with predecessor trail.
520 522 */
521 523 void unlink(Node<E> p, Node<E> trail) {
522 524 // assert isFullyLocked();
523 525 // p.next is not changed, to allow iterators that are
524 526 // traversing p to maintain their weak-consistency guarantee.
525 527 p.item = null;
526 528 trail.next = p.next;
527 529 if (last == p)
528 530 last = trail;
529 531 if (count.getAndDecrement() == capacity)
530 532 notFull.signal();
531 533 }
532 534
533 535 /**
534 536 * Removes a single instance of the specified element from this queue,
535 537 * if it is present. More formally, removes an element {@code e} such
536 538 * that {@code o.equals(e)}, if this queue contains one or more such
537 539 * elements.
538 540 * Returns {@code true} if this queue contained the specified element
539 541 * (or equivalently, if this queue changed as a result of the call).
540 542 *
541 543 * @param o element to be removed from this queue, if present
542 544 * @return {@code true} if this queue changed as a result of the call
543 545 */
544 546 public boolean remove(Object o) {
545 547 if (o == null) return false;
546 548 fullyLock();
547 549 try {
548 550 for (Node<E> trail = head, p = trail.next;
549 551 p != null;
550 552 trail = p, p = p.next) {
551 553 if (o.equals(p.item)) {
552 554 unlink(p, trail);
↓ open down ↓ |
124 lines elided |
↑ open up ↑ |
553 555 return true;
554 556 }
555 557 }
556 558 return false;
557 559 } finally {
558 560 fullyUnlock();
559 561 }
560 562 }
561 563
562 564 /**
565 + * Returns {@code true} if this queue contains the specified element.
566 + * More formally, returns {@code true} if and only if this queue contains
567 + * at least one element {@code e} such that {@code o.equals(e)}.
568 + *
569 + * @param o object to be checked for containment in this queue
570 + * @return {@code true} if this queue contains the specified element
571 + */
572 + public boolean contains(Object o) {
573 + if (o == null) return false;
574 + fullyLock();
575 + try {
576 + for (Node<E> p = head.next; p != null; p = p.next)
577 + if (o.equals(p.item))
578 + return true;
579 + return false;
580 + } finally {
581 + fullyUnlock();
582 + }
583 + }
584 +
585 + /**
563 586 * Returns an array containing all of the elements in this queue, in
564 587 * proper sequence.
565 588 *
566 589 * <p>The returned array will be "safe" in that no references to it are
567 590 * maintained by this queue. (In other words, this method must allocate
568 591 * a new array). The caller is thus free to modify the returned array.
569 592 *
570 593 * <p>This method acts as bridge between array-based and collection-based
571 594 * APIs.
572 595 *
573 596 * @return an array containing all of the elements in this queue
574 597 */
575 598 public Object[] toArray() {
576 599 fullyLock();
577 600 try {
578 601 int size = count.get();
579 602 Object[] a = new Object[size];
580 603 int k = 0;
581 604 for (Node<E> p = head.next; p != null; p = p.next)
582 605 a[k++] = p.item;
583 606 return a;
584 607 } finally {
585 608 fullyUnlock();
586 609 }
587 610 }
588 611
589 612 /**
590 613 * Returns an array containing all of the elements in this queue, in
591 614 * proper sequence; the runtime type of the returned array is that of
592 615 * the specified array. If the queue fits in the specified array, it
593 616 * is returned therein. Otherwise, a new array is allocated with the
594 617 * runtime type of the specified array and the size of this queue.
595 618 *
596 619 * <p>If this queue fits in the specified array with room to spare
597 620 * (i.e., the array has more elements than this queue), the element in
598 621 * the array immediately following the end of the queue is set to
599 622 * {@code null}.
600 623 *
601 624 * <p>Like the {@link #toArray()} method, this method acts as bridge between
602 625 * array-based and collection-based APIs. Further, this method allows
603 626 * precise control over the runtime type of the output array, and may,
604 627 * under certain circumstances, be used to save allocation costs.
605 628 *
606 629 * <p>Suppose {@code x} is a queue known to contain only strings.
607 630 * The following code can be used to dump the queue into a newly
608 631 * allocated array of {@code String}:
609 632 *
610 633 * <pre>
611 634 * String[] y = x.toArray(new String[0]);</pre>
612 635 *
613 636 * Note that {@code toArray(new Object[0])} is identical in function to
614 637 * {@code toArray()}.
615 638 *
616 639 * @param a the array into which the elements of the queue are to
617 640 * be stored, if it is big enough; otherwise, a new array of the
618 641 * same runtime type is allocated for this purpose
619 642 * @return an array containing all of the elements in this queue
620 643 * @throws ArrayStoreException if the runtime type of the specified array
621 644 * is not a supertype of the runtime type of every element in
622 645 * this queue
623 646 * @throws NullPointerException if the specified array is null
624 647 */
625 648 @SuppressWarnings("unchecked")
626 649 public <T> T[] toArray(T[] a) {
627 650 fullyLock();
628 651 try {
629 652 int size = count.get();
630 653 if (a.length < size)
631 654 a = (T[])java.lang.reflect.Array.newInstance
632 655 (a.getClass().getComponentType(), size);
633 656
634 657 int k = 0;
635 658 for (Node<E> p = head.next; p != null; p = p.next)
636 659 a[k++] = (T)p.item;
637 660 if (a.length > k)
↓ open down ↓ |
65 lines elided |
↑ open up ↑ |
638 661 a[k] = null;
639 662 return a;
640 663 } finally {
641 664 fullyUnlock();
642 665 }
643 666 }
644 667
645 668 public String toString() {
646 669 fullyLock();
647 670 try {
648 - return super.toString();
671 + Node<E> p = head.next;
672 + if (p == null)
673 + return "[]";
674 +
675 + StringBuilder sb = new StringBuilder();
676 + sb.append('[');
677 + for (;;) {
678 + E e = p.item;
679 + sb.append(e == this ? "(this Collection)" : e);
680 + p = p.next;
681 + if (p == null)
682 + return sb.append(']').toString();
683 + sb.append(',').append(' ');
684 + }
649 685 } finally {
650 686 fullyUnlock();
651 687 }
652 688 }
653 689
654 690 /**
655 691 * Atomically removes all of the elements from this queue.
656 692 * The queue will be empty after this call returns.
657 693 */
658 694 public void clear() {
659 695 fullyLock();
660 696 try {
661 697 for (Node<E> p, h = head; (p = h.next) != null; h = p) {
662 698 h.next = h;
663 699 p.item = null;
664 700 }
665 701 head = last;
666 702 // assert head.item == null && head.next == null;
667 703 if (count.getAndSet(0) == capacity)
668 704 notFull.signal();
669 705 } finally {
670 706 fullyUnlock();
671 707 }
672 708 }
673 709
674 710 /**
675 711 * @throws UnsupportedOperationException {@inheritDoc}
676 712 * @throws ClassCastException {@inheritDoc}
677 713 * @throws NullPointerException {@inheritDoc}
678 714 * @throws IllegalArgumentException {@inheritDoc}
679 715 */
680 716 public int drainTo(Collection<? super E> c) {
681 717 return drainTo(c, Integer.MAX_VALUE);
682 718 }
683 719
684 720 /**
685 721 * @throws UnsupportedOperationException {@inheritDoc}
686 722 * @throws ClassCastException {@inheritDoc}
687 723 * @throws NullPointerException {@inheritDoc}
688 724 * @throws IllegalArgumentException {@inheritDoc}
689 725 */
690 726 public int drainTo(Collection<? super E> c, int maxElements) {
691 727 if (c == null)
692 728 throw new NullPointerException();
693 729 if (c == this)
694 730 throw new IllegalArgumentException();
695 731 boolean signalNotFull = false;
696 732 final ReentrantLock takeLock = this.takeLock;
697 733 takeLock.lock();
698 734 try {
699 735 int n = Math.max(0, Math.min(maxElements, count.get())); // clamp at 0 so a negative maxElements yields a return of 0, not a negative count
700 736 // count.get provides visibility to first n Nodes
701 737 Node<E> h = head;
702 738 int i = 0;
703 739 try {
704 740 while (i < n) {
705 741 Node<E> p = h.next;
706 742 c.add(p.item);
707 743 p.item = null;
708 744 h.next = h;
709 745 h = p;
710 746 ++i;
711 747 }
712 748 return n;
713 749 } finally {
714 750 // Restore invariants even if c.add() threw
715 751 if (i > 0) {
716 752 // assert h.item == null;
717 753 head = h;
718 754 signalNotFull = (count.getAndAdd(-i) == capacity);
719 755 }
↓ open down ↓ |
61 lines elided |
↑ open up ↑ |
720 756 }
721 757 } finally {
722 758 takeLock.unlock();
723 759 if (signalNotFull)
724 760 signalNotFull();
725 761 }
726 762 }
727 763
728 764 /**
729 765 * Returns an iterator over the elements in this queue in proper sequence.
730 - * The returned {@code Iterator} is a "weakly consistent" iterator that
766 + * The elements will be returned in order from first (head) to last (tail).
767 + *
768 + * <p>The returned iterator is a "weakly consistent" iterator that
731 769 * will never throw {@link java.util.ConcurrentModificationException
732 - * ConcurrentModificationException},
733 - * and guarantees to traverse elements as they existed upon
734 - * construction of the iterator, and may (but is not guaranteed to)
735 - * reflect any modifications subsequent to construction.
770 + * ConcurrentModificationException}, and guarantees to traverse
771 + * elements as they existed upon construction of the iterator, and
772 + * may (but is not guaranteed to) reflect any modifications
773 + * subsequent to construction.
736 774 *
737 775 * @return an iterator over the elements in this queue in proper sequence
738 776 */
739 777 public Iterator<E> iterator() {
740 778 return new Itr();
741 779 }
742 780
743 781 private class Itr implements Iterator<E> {
744 782 /*
745 783 * Basic weakly-consistent iterator. At all times hold the next
746 784 * item to hand out so that if hasNext() reports true, we will
747 785 * still have it to return even if lost race with a take etc.
748 786 */
749 787 private Node<E> current;
750 788 private Node<E> lastRet;
751 789 private E currentElement;
752 790
753 791 Itr() {
754 792 fullyLock();
755 793 try {
756 794 current = head.next;
757 795 if (current != null)
758 796 currentElement = current.item;
759 797 } finally {
760 798 fullyUnlock();
761 799 }
762 800 }
763 801
764 802 public boolean hasNext() {
765 803 return current != null;
766 804 }
767 805
768 806 /**
769 807 * Returns the next live successor of p, or null if no such.
770 808 *
771 809 * Unlike other traversal methods, iterators need to handle both:
772 810 * - dequeued nodes (p.next == p)
773 811 * - (possibly multiple) interior removed nodes (p.item == null)
774 812 */
775 813 private Node<E> nextNode(Node<E> p) {
776 814 for (;;) {
777 815 Node<E> s = p.next;
778 816 if (s == p)
779 817 return head.next;
780 818 if (s == null || s.item != null)
781 819 return s;
782 820 p = s;
783 821 }
784 822 }
785 823
786 824 public E next() {
787 825 fullyLock();
788 826 try {
789 827 if (current == null)
790 828 throw new NoSuchElementException();
791 829 E x = currentElement;
792 830 lastRet = current;
793 831 current = nextNode(current);
794 832 currentElement = (current == null) ? null : current.item;
795 833 return x;
796 834 } finally {
797 835 fullyUnlock();
798 836 }
799 837 }
800 838
801 839 public void remove() {
802 840 if (lastRet == null)
803 841 throw new IllegalStateException();
804 842 fullyLock();
805 843 try {
806 844 Node<E> node = lastRet;
807 845 lastRet = null;
808 846 for (Node<E> trail = head, p = trail.next;
809 847 p != null;
810 848 trail = p, p = p.next) {
811 849 if (p == node) {
812 850 unlink(p, trail);
813 851 break;
814 852 }
815 853 }
816 854 } finally {
817 855 fullyUnlock();
818 856 }
819 857 }
820 858 }
821 859
822 860 /**
823 861 * Save the state to a stream (that is, serialize it).
824 862 *
825 863 * @serialData The capacity is emitted (int), followed by all of
826 864 * its elements (each an {@code Object}) in the proper order,
827 865 * followed by a null
828 866 * @param s the stream
829 867 */
830 868 private void writeObject(java.io.ObjectOutputStream s)
831 869 throws java.io.IOException {
832 870
833 871 fullyLock();
834 872 try {
835 873 // Write out any hidden stuff, plus capacity
836 874 s.defaultWriteObject();
837 875
838 876 // Write out all elements in the proper order.
839 877 for (Node<E> p = head.next; p != null; p = p.next)
840 878 s.writeObject(p.item);
841 879
842 880 // Use trailing null as sentinel
843 881 s.writeObject(null);
844 882 } finally {
845 883 fullyUnlock();
846 884 }
847 885 }
848 886
849 887 /**
850 888 * Reconstitute this queue instance from a stream (that is,
851 889 * deserialize it).
852 890 *
853 891 * @param s the stream
854 892 */
855 893 private void readObject(java.io.ObjectInputStream s)
856 894 throws java.io.IOException, ClassNotFoundException {
857 895 // Read in capacity, and any hidden stuff
858 896 s.defaultReadObject();
859 897
860 898 count.set(0);
861 899 last = head = new Node<E>(null);
862 900
863 901 // Read in all elements and place in queue
864 902 for (;;) {
865 903 @SuppressWarnings("unchecked")
866 904 E item = (E)s.readObject();
867 905 if (item == null)
868 906 break;
869 907 add(item);
870 908 }
871 909 }
872 910 }
↓ open down ↓ |
127 lines elided |
↑ open up ↑ |
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX