172 notEmpty.signal();
173 } finally {
174 takeLock.unlock();
175 }
176 }
177
178 /**
179 * Signals a waiting put. Called only from take/poll.
180 */
181 private void signalNotFull() {
182 final ReentrantLock putLock = this.putLock;
183 putLock.lock();
184 try {
185 notFull.signal();
186 } finally {
187 putLock.unlock();
188 }
189 }
190
191 /**
192 * Creates a node and links it at end of queue.
193 *
194 * @param x the item
195 */
196 private void enqueue(E x) {
197 // assert putLock.isHeldByCurrentThread();
198 // assert last.next == null;
199 last = last.next = new Node<E>(x);
200 }
201
202 /**
203 * Removes a node from head of queue.
204 *
205 * @return the node
206 */
207 private E dequeue() {
208 // assert takeLock.isHeldByCurrentThread();
209 // assert head.item == null;
210 Node<E> h = head;
211 Node<E> first = h.next;
212 h.next = h; // help GC
213 head = first;
214 E x = first.item;
215 first.item = null;
216 return x;
217 }
218
/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}, initially containing the elements of the
 * given collection, added in the order the collection's iterator
 * returns them.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock lock = this.putLock;
    lock.lock(); // Never contended, but necessary for visibility
    try {
        int added = 0;
        final Iterator<? extends E> source = c.iterator();
        while (source.hasNext()) {
            final E element = source.next();
            if (element == null)
                throw new NullPointerException();
            if (added == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(element);
            added++;
        }
        // Publish the element count once, after all nodes are linked.
        count.set(added);
    } finally {
        lock.unlock();
    }
}
293
294
295 // this doc comment is overridden to remove the reference to collections
296 // greater in size than Integer.MAX_VALUE
297 /**
298 * Returns the number of elements in this queue.
299 *
300 * @return the number of elements in this queue
301 */
302 public int size() {
303 return count.get();
304 }
305
315 * an element will succeed by inspecting {@code remainingCapacity}
316 * because it may be the case that another thread is about to
317 * insert or remove an element.
318 */
319 public int remainingCapacity() {
320 return capacity - count.get();
321 }
322
323 /**
324 * Inserts the specified element at the tail of this queue, waiting if
325 * necessary for space to become available.
326 *
327 * @throws InterruptedException {@inheritDoc}
328 * @throws NullPointerException {@inheritDoc}
329 */
330 public void put(E e) throws InterruptedException {
331 if (e == null) throw new NullPointerException();
332 // Note: convention in all put/take/etc is to preset local var
333 // holding count negative to indicate failure unless set.
334 int c = -1;
335 final ReentrantLock putLock = this.putLock;
336 final AtomicInteger count = this.count;
337 putLock.lockInterruptibly();
338 try {
339 /*
340 * Note that count is used in wait guard even though it is
341 * not protected by lock. This works because count can
342 * only decrease at this point (all other puts are shut
343 * out by lock), and we (or some other waiting put) are
344 * signalled if it ever changes from capacity. Similarly
345 * for all other uses of count in other wait guards.
346 */
347 while (count.get() == capacity) {
348 notFull.await();
349 }
350 enqueue(e);
351 c = count.getAndIncrement();
352 if (c + 1 < capacity)
353 notFull.signal();
354 } finally {
355 putLock.unlock();
356 }
357 if (c == 0)
358 signalNotEmpty();
359 }
360
361 /**
362 * Inserts the specified element at the tail of this queue, waiting if
363 * necessary up to the specified wait time for space to become available.
364 *
365 * @return {@code true} if successful, or {@code false} if
366 * the specified waiting time elapses before space is available.
367 * @throws InterruptedException {@inheritDoc}
368 * @throws NullPointerException {@inheritDoc}
369 */
370 public boolean offer(E e, long timeout, TimeUnit unit)
371 throws InterruptedException {
372
373 if (e == null) throw new NullPointerException();
374 long nanos = unit.toNanos(timeout);
375 int c = -1;
376 final ReentrantLock putLock = this.putLock;
377 final AtomicInteger count = this.count;
378 putLock.lockInterruptibly();
379 try {
380 while (count.get() == capacity) {
381 if (nanos <= 0)
382 return false;
383 nanos = notFull.awaitNanos(nanos);
384 }
385 enqueue(e);
386 c = count.getAndIncrement();
387 if (c + 1 < capacity)
388 notFull.signal();
389 } finally {
390 putLock.unlock();
391 }
392 if (c == 0)
393 signalNotEmpty();
394 return true;
395 }
396
397 /**
398 * Inserts the specified element at the tail of this queue if it is
399 * possible to do so immediately without exceeding the queue's capacity,
400 * returning {@code true} upon success and {@code false} if this queue
401 * is full.
402 * When using a capacity-restricted queue, this method is generally
403 * preferable to method {@link BlockingQueue#add add}, which can fail to
404 * insert an element only by throwing an exception.
405 *
406 * @throws NullPointerException if the specified element is null
407 */
408 public boolean offer(E e) {
409 if (e == null) throw new NullPointerException();
410 final AtomicInteger count = this.count;
411 if (count.get() == capacity)
412 return false;
413 int c = -1;
414 final ReentrantLock putLock = this.putLock;
415 putLock.lock();
416 try {
417 if (count.get() < capacity) {
418 enqueue(e);
419 c = count.getAndIncrement();
420 if (c + 1 < capacity)
421 notFull.signal();
422 }
423 } finally {
424 putLock.unlock();
425 }
426 if (c == 0)
427 signalNotEmpty();
428 return c >= 0;
429 }
430
431
432 public E take() throws InterruptedException {
433 E x;
434 int c = -1;
435 final AtomicInteger count = this.count;
436 final ReentrantLock takeLock = this.takeLock;
437 takeLock.lockInterruptibly();
438 try {
543 */
544 public boolean remove(Object o) {
545 if (o == null) return false;
546 fullyLock();
547 try {
548 for (Node<E> trail = head, p = trail.next;
549 p != null;
550 trail = p, p = p.next) {
551 if (o.equals(p.item)) {
552 unlink(p, trail);
553 return true;
554 }
555 }
556 return false;
557 } finally {
558 fullyUnlock();
559 }
560 }
561
562 /**
563 * Returns an array containing all of the elements in this queue, in
564 * proper sequence.
565 *
566 * <p>The returned array will be "safe" in that no references to it are
567 * maintained by this queue. (In other words, this method must allocate
568 * a new array). The caller is thus free to modify the returned array.
569 *
570 * <p>This method acts as bridge between array-based and collection-based
571 * APIs.
572 *
573 * @return an array containing all of the elements in this queue
574 */
575 public Object[] toArray() {
576 fullyLock();
577 try {
578 int size = count.get();
579 Object[] a = new Object[size];
580 int k = 0;
581 for (Node<E> p = head.next; p != null; p = p.next)
582 a[k++] = p.item;
628 try {
629 int size = count.get();
630 if (a.length < size)
631 a = (T[])java.lang.reflect.Array.newInstance
632 (a.getClass().getComponentType(), size);
633
634 int k = 0;
635 for (Node<E> p = head.next; p != null; p = p.next)
636 a[k++] = (T)p.item;
637 if (a.length > k)
638 a[k] = null;
639 return a;
640 } finally {
641 fullyUnlock();
642 }
643 }
644
645 public String toString() {
646 fullyLock();
647 try {
648 return super.toString();
649 } finally {
650 fullyUnlock();
651 }
652 }
653
654 /**
655 * Atomically removes all of the elements from this queue.
656 * The queue will be empty after this call returns.
657 */
658 public void clear() {
659 fullyLock();
660 try {
661 for (Node<E> p, h = head; (p = h.next) != null; h = p) {
662 h.next = h;
663 p.item = null;
664 }
665 head = last;
666 // assert head.item == null && head.next == null;
667 if (count.getAndSet(0) == capacity)
668 notFull.signal();
710 ++i;
711 }
712 return n;
713 } finally {
714 // Restore invariants even if c.add() threw
715 if (i > 0) {
716 // assert h.item == null;
717 head = h;
718 signalNotFull = (count.getAndAdd(-i) == capacity);
719 }
720 }
721 } finally {
722 takeLock.unlock();
723 if (signalNotFull)
724 signalNotFull();
725 }
726 }
727
728 /**
729 * Returns an iterator over the elements in this queue in proper sequence.
730 * The returned {@code Iterator} is a "weakly consistent" iterator that
731 * will never throw {@link java.util.ConcurrentModificationException
732 * ConcurrentModificationException},
733 * and guarantees to traverse elements as they existed upon
734 * construction of the iterator, and may (but is not guaranteed to)
735 * reflect any modifications subsequent to construction.
736 *
737 * @return an iterator over the elements in this queue in proper sequence
738 */
739 public Iterator<E> iterator() {
740 return new Itr();
741 }
742
743 private class Itr implements Iterator<E> {
744 /*
745 * Basic weakly-consistent iterator. At all times hold the next
746 * item to hand out so that if hasNext() reports true, we will
747 * still have it to return even if lost race with a take etc.
748 */
749 private Node<E> current;
750 private Node<E> lastRet;
751 private E currentElement;
752
753 Itr() {
754 fullyLock();
755 try {
|
172 notEmpty.signal();
173 } finally {
174 takeLock.unlock();
175 }
176 }
177
178 /**
179 * Signals a waiting put. Called only from take/poll.
180 */
181 private void signalNotFull() {
182 final ReentrantLock putLock = this.putLock;
183 putLock.lock();
184 try {
185 notFull.signal();
186 } finally {
187 putLock.unlock();
188 }
189 }
190
191 /**
192 * Links node at end of queue.
193 *
194 * @param node the node
195 */
196 private void enqueue(Node<E> node) {
197 // assert putLock.isHeldByCurrentThread();
198 // assert last.next == null;
199 last = last.next = node;
200 }
201
202 /**
203 * Removes a node from head of queue.
204 *
205 * @return the node
206 */
207 private E dequeue() {
208 // assert takeLock.isHeldByCurrentThread();
209 // assert head.item == null;
210 Node<E> h = head;
211 Node<E> first = h.next;
212 h.next = h; // help GC
213 head = first;
214 E x = first.item;
215 first.item = null;
216 return x;
217 }
218
/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}, initially containing the elements of the
 * given collection, added in the order the collection's iterator
 * returns them.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock lock = this.putLock;
    lock.lock(); // Never contended, but necessary for visibility
    try {
        int added = 0;
        final Iterator<? extends E> source = c.iterator();
        while (source.hasNext()) {
            final E element = source.next();
            if (element == null)
                throw new NullPointerException();
            if (added == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(element));
            added++;
        }
        // Publish the element count once, after all nodes are linked.
        count.set(added);
    } finally {
        lock.unlock();
    }
}
293
294
295 // this doc comment is overridden to remove the reference to collections
296 // greater in size than Integer.MAX_VALUE
297 /**
298 * Returns the number of elements in this queue.
299 *
300 * @return the number of elements in this queue
301 */
302 public int size() {
303 return count.get();
304 }
305
315 * an element will succeed by inspecting {@code remainingCapacity}
316 * because it may be the case that another thread is about to
317 * insert or remove an element.
318 */
319 public int remainingCapacity() {
320 return capacity - count.get();
321 }
322
323 /**
324 * Inserts the specified element at the tail of this queue, waiting if
325 * necessary for space to become available.
326 *
327 * @throws InterruptedException {@inheritDoc}
328 * @throws NullPointerException {@inheritDoc}
329 */
330 public void put(E e) throws InterruptedException {
331 if (e == null) throw new NullPointerException();
332 // Note: convention in all put/take/etc is to preset local var
333 // holding count negative to indicate failure unless set.
334 int c = -1;
335 Node<E> node = new Node(e);
336 final ReentrantLock putLock = this.putLock;
337 final AtomicInteger count = this.count;
338 putLock.lockInterruptibly();
339 try {
340 /*
341 * Note that count is used in wait guard even though it is
342 * not protected by lock. This works because count can
343 * only decrease at this point (all other puts are shut
344 * out by lock), and we (or some other waiting put) are
345 * signalled if it ever changes from capacity. Similarly
346 * for all other uses of count in other wait guards.
347 */
348 while (count.get() == capacity) {
349 notFull.await();
350 }
351 enqueue(node);
352 c = count.getAndIncrement();
353 if (c + 1 < capacity)
354 notFull.signal();
355 } finally {
356 putLock.unlock();
357 }
358 if (c == 0)
359 signalNotEmpty();
360 }
361
362 /**
363 * Inserts the specified element at the tail of this queue, waiting if
364 * necessary up to the specified wait time for space to become available.
365 *
366 * @return {@code true} if successful, or {@code false} if
367 * the specified waiting time elapses before space is available.
368 * @throws InterruptedException {@inheritDoc}
369 * @throws NullPointerException {@inheritDoc}
370 */
371 public boolean offer(E e, long timeout, TimeUnit unit)
372 throws InterruptedException {
373
374 if (e == null) throw new NullPointerException();
375 long nanos = unit.toNanos(timeout);
376 int c = -1;
377 final ReentrantLock putLock = this.putLock;
378 final AtomicInteger count = this.count;
379 putLock.lockInterruptibly();
380 try {
381 while (count.get() == capacity) {
382 if (nanos <= 0)
383 return false;
384 nanos = notFull.awaitNanos(nanos);
385 }
386 enqueue(new Node<E>(e));
387 c = count.getAndIncrement();
388 if (c + 1 < capacity)
389 notFull.signal();
390 } finally {
391 putLock.unlock();
392 }
393 if (c == 0)
394 signalNotEmpty();
395 return true;
396 }
397
398 /**
399 * Inserts the specified element at the tail of this queue if it is
400 * possible to do so immediately without exceeding the queue's capacity,
401 * returning {@code true} upon success and {@code false} if this queue
402 * is full.
403 * When using a capacity-restricted queue, this method is generally
404 * preferable to method {@link BlockingQueue#add add}, which can fail to
405 * insert an element only by throwing an exception.
406 *
407 * @throws NullPointerException if the specified element is null
408 */
409 public boolean offer(E e) {
410 if (e == null) throw new NullPointerException();
411 final AtomicInteger count = this.count;
412 if (count.get() == capacity)
413 return false;
414 int c = -1;
415 Node<E> node = new Node(e);
416 final ReentrantLock putLock = this.putLock;
417 putLock.lock();
418 try {
419 if (count.get() < capacity) {
420 enqueue(node);
421 c = count.getAndIncrement();
422 if (c + 1 < capacity)
423 notFull.signal();
424 }
425 } finally {
426 putLock.unlock();
427 }
428 if (c == 0)
429 signalNotEmpty();
430 return c >= 0;
431 }
432
433
434 public E take() throws InterruptedException {
435 E x;
436 int c = -1;
437 final AtomicInteger count = this.count;
438 final ReentrantLock takeLock = this.takeLock;
439 takeLock.lockInterruptibly();
440 try {
545 */
546 public boolean remove(Object o) {
547 if (o == null) return false;
548 fullyLock();
549 try {
550 for (Node<E> trail = head, p = trail.next;
551 p != null;
552 trail = p, p = p.next) {
553 if (o.equals(p.item)) {
554 unlink(p, trail);
555 return true;
556 }
557 }
558 return false;
559 } finally {
560 fullyUnlock();
561 }
562 }
563
564 /**
565 * Returns {@code true} if this queue contains the specified element.
566 * More formally, returns {@code true} if and only if this queue contains
567 * at least one element {@code e} such that {@code o.equals(e)}.
568 *
569 * @param o object to be checked for containment in this queue
570 * @return {@code true} if this queue contains the specified element
571 */
572 public boolean contains(Object o) {
573 if (o == null) return false;
574 fullyLock();
575 try {
576 for (Node<E> p = head.next; p != null; p = p.next)
577 if (o.equals(p.item))
578 return true;
579 return false;
580 } finally {
581 fullyUnlock();
582 }
583 }
584
585 /**
586 * Returns an array containing all of the elements in this queue, in
587 * proper sequence.
588 *
589 * <p>The returned array will be "safe" in that no references to it are
590 * maintained by this queue. (In other words, this method must allocate
591 * a new array). The caller is thus free to modify the returned array.
592 *
593 * <p>This method acts as bridge between array-based and collection-based
594 * APIs.
595 *
596 * @return an array containing all of the elements in this queue
597 */
598 public Object[] toArray() {
599 fullyLock();
600 try {
601 int size = count.get();
602 Object[] a = new Object[size];
603 int k = 0;
604 for (Node<E> p = head.next; p != null; p = p.next)
605 a[k++] = p.item;
651 try {
652 int size = count.get();
653 if (a.length < size)
654 a = (T[])java.lang.reflect.Array.newInstance
655 (a.getClass().getComponentType(), size);
656
657 int k = 0;
658 for (Node<E> p = head.next; p != null; p = p.next)
659 a[k++] = (T)p.item;
660 if (a.length > k)
661 a[k] = null;
662 return a;
663 } finally {
664 fullyUnlock();
665 }
666 }
667
668 public String toString() {
669 fullyLock();
670 try {
671 Node<E> p = head.next;
672 if (p == null)
673 return "[]";
674
675 StringBuilder sb = new StringBuilder();
676 sb.append('[');
677 for (;;) {
678 E e = p.item;
679 sb.append(e == this ? "(this Collection)" : e);
680 p = p.next;
681 if (p == null)
682 return sb.append(']').toString();
683 sb.append(',').append(' ');
684 }
685 } finally {
686 fullyUnlock();
687 }
688 }
689
690 /**
691 * Atomically removes all of the elements from this queue.
692 * The queue will be empty after this call returns.
693 */
694 public void clear() {
695 fullyLock();
696 try {
697 for (Node<E> p, h = head; (p = h.next) != null; h = p) {
698 h.next = h;
699 p.item = null;
700 }
701 head = last;
702 // assert head.item == null && head.next == null;
703 if (count.getAndSet(0) == capacity)
704 notFull.signal();
746 ++i;
747 }
748 return n;
749 } finally {
750 // Restore invariants even if c.add() threw
751 if (i > 0) {
752 // assert h.item == null;
753 head = h;
754 signalNotFull = (count.getAndAdd(-i) == capacity);
755 }
756 }
757 } finally {
758 takeLock.unlock();
759 if (signalNotFull)
760 signalNotFull();
761 }
762 }
763
764 /**
765 * Returns an iterator over the elements in this queue in proper sequence.
766 * The elements will be returned in order from first (head) to last (tail).
767 *
768 * <p>The returned iterator is a "weakly consistent" iterator that
769 * will never throw {@link java.util.ConcurrentModificationException
770 * ConcurrentModificationException}, and guarantees to traverse
771 * elements as they existed upon construction of the iterator, and
772 * may (but is not guaranteed to) reflect any modifications
773 * subsequent to construction.
774 *
775 * @return an iterator over the elements in this queue in proper sequence
776 */
777 public Iterator<E> iterator() {
778 return new Itr();
779 }
780
781 private class Itr implements Iterator<E> {
782 /*
783 * Basic weakly-consistent iterator. At all times hold the next
784 * item to hand out so that if hasNext() reports true, we will
785 * still have it to return even if lost race with a take etc.
786 */
787 private Node<E> current;
788 private Node<E> lastRet;
789 private E currentElement;
790
791 Itr() {
792 fullyLock();
793 try {
|