315 * an element will succeed by inspecting {@code remainingCapacity}
316 * because it may be the case that another thread is about to
317 * insert or remove an element.
318 */
319 public int remainingCapacity() {
320 return capacity - count.get();
321 }
322
323 /**
324 * Inserts the specified element at the tail of this queue, waiting if
325 * necessary for space to become available.
326 *
327 * @throws InterruptedException {@inheritDoc}
328 * @throws NullPointerException {@inheritDoc}
329 */
330 public void put(E e) throws InterruptedException {
331 if (e == null) throw new NullPointerException();
332 // Note: convention in all put/take/etc is to preset local var
333 // holding count negative to indicate failure unless set.
334 int c = -1;
335 Node<E> node = new Node(e);
336 final ReentrantLock putLock = this.putLock;
337 final AtomicInteger count = this.count;
338 putLock.lockInterruptibly();
339 try {
340 /*
341 * Note that count is used in wait guard even though it is
342 * not protected by lock. This works because count can
343 * only decrease at this point (all other puts are shut
344 * out by lock), and we (or some other waiting put) are
345 * signalled if it ever changes from capacity. Similarly
346 * for all other uses of count in other wait guards.
347 */
348 while (count.get() == capacity) {
349 notFull.await();
350 }
351 enqueue(node);
352 c = count.getAndIncrement();
353 if (c + 1 < capacity)
354 notFull.signal();
355 } finally {
395 return true;
396 }
397
398 /**
399 * Inserts the specified element at the tail of this queue if it is
400 * possible to do so immediately without exceeding the queue's capacity,
401 * returning {@code true} upon success and {@code false} if this queue
402 * is full.
403 * When using a capacity-restricted queue, this method is generally
404 * preferable to method {@link BlockingQueue#add add}, which can fail to
405 * insert an element only by throwing an exception.
406 *
407 * @throws NullPointerException if the specified element is null
408 */
409 public boolean offer(E e) {
410 if (e == null) throw new NullPointerException();
411 final AtomicInteger count = this.count;
412 if (count.get() == capacity)
413 return false;
414 int c = -1;
415 Node<E> node = new Node(e);
416 final ReentrantLock putLock = this.putLock;
417 putLock.lock();
418 try {
419 if (count.get() < capacity) {
420 enqueue(node);
421 c = count.getAndIncrement();
422 if (c + 1 < capacity)
423 notFull.signal();
424 }
425 } finally {
426 putLock.unlock();
427 }
428 if (c == 0)
429 signalNotEmpty();
430 return c >= 0;
431 }
432
433
434 public E take() throws InterruptedException {
435 E x;
711 * @throws UnsupportedOperationException {@inheritDoc}
712 * @throws ClassCastException {@inheritDoc}
713 * @throws NullPointerException {@inheritDoc}
714 * @throws IllegalArgumentException {@inheritDoc}
715 */
716 public int drainTo(Collection<? super E> c) {
717 return drainTo(c, Integer.MAX_VALUE);
718 }
719
720 /**
721 * @throws UnsupportedOperationException {@inheritDoc}
722 * @throws ClassCastException {@inheritDoc}
723 * @throws NullPointerException {@inheritDoc}
724 * @throws IllegalArgumentException {@inheritDoc}
725 */
726 public int drainTo(Collection<? super E> c, int maxElements) {
727 if (c == null)
728 throw new NullPointerException();
729 if (c == this)
730 throw new IllegalArgumentException();
731 boolean signalNotFull = false;
732 final ReentrantLock takeLock = this.takeLock;
733 takeLock.lock();
734 try {
735 int n = Math.min(maxElements, count.get());
736 // count.get provides visibility to first n Nodes
737 Node<E> h = head;
738 int i = 0;
739 try {
740 while (i < n) {
741 Node<E> p = h.next;
742 c.add(p.item);
743 p.item = null;
744 h.next = h;
745 h = p;
746 ++i;
747 }
748 return n;
749 } finally {
750 // Restore invariants even if c.add() threw
|
315 * an element will succeed by inspecting {@code remainingCapacity}
316 * because it may be the case that another thread is about to
317 * insert or remove an element.
318 */
319 public int remainingCapacity() {
320 return capacity - count.get();
321 }
322
323 /**
324 * Inserts the specified element at the tail of this queue, waiting if
325 * necessary for space to become available.
326 *
327 * @throws InterruptedException {@inheritDoc}
328 * @throws NullPointerException {@inheritDoc}
329 */
330 public void put(E e) throws InterruptedException {
331 if (e == null) throw new NullPointerException();
332 // Note: convention in all put/take/etc is to preset local var
333 // holding count negative to indicate failure unless set.
334 int c = -1;
335 Node<E> node = new Node<E>(e);
336 final ReentrantLock putLock = this.putLock;
337 final AtomicInteger count = this.count;
338 putLock.lockInterruptibly();
339 try {
340 /*
341 * Note that count is used in wait guard even though it is
342 * not protected by lock. This works because count can
343 * only decrease at this point (all other puts are shut
344 * out by lock), and we (or some other waiting put) are
345 * signalled if it ever changes from capacity. Similarly
346 * for all other uses of count in other wait guards.
347 */
348 while (count.get() == capacity) {
349 notFull.await();
350 }
351 enqueue(node);
352 c = count.getAndIncrement();
353 if (c + 1 < capacity)
354 notFull.signal();
355 } finally {
395 return true;
396 }
397
398 /**
399 * Inserts the specified element at the tail of this queue if it is
400 * possible to do so immediately without exceeding the queue's capacity,
401 * returning {@code true} upon success and {@code false} if this queue
402 * is full.
403 * When using a capacity-restricted queue, this method is generally
404 * preferable to method {@link BlockingQueue#add add}, which can fail to
405 * insert an element only by throwing an exception.
406 *
407 * @throws NullPointerException if the specified element is null
408 */
409 public boolean offer(E e) {
410 if (e == null) throw new NullPointerException();
411 final AtomicInteger count = this.count;
412 if (count.get() == capacity)
413 return false;
414 int c = -1;
415 Node<E> node = new Node<E>(e);
416 final ReentrantLock putLock = this.putLock;
417 putLock.lock();
418 try {
419 if (count.get() < capacity) {
420 enqueue(node);
421 c = count.getAndIncrement();
422 if (c + 1 < capacity)
423 notFull.signal();
424 }
425 } finally {
426 putLock.unlock();
427 }
428 if (c == 0)
429 signalNotEmpty();
430 return c >= 0;
431 }
432
433
434 public E take() throws InterruptedException {
435 E x;
711 * @throws UnsupportedOperationException {@inheritDoc}
712 * @throws ClassCastException {@inheritDoc}
713 * @throws NullPointerException {@inheritDoc}
714 * @throws IllegalArgumentException {@inheritDoc}
715 */
716 public int drainTo(Collection<? super E> c) {
717 return drainTo(c, Integer.MAX_VALUE);
718 }
719
720 /**
721 * @throws UnsupportedOperationException {@inheritDoc}
722 * @throws ClassCastException {@inheritDoc}
723 * @throws NullPointerException {@inheritDoc}
724 * @throws IllegalArgumentException {@inheritDoc}
725 */
726 public int drainTo(Collection<? super E> c, int maxElements) {
727 if (c == null)
728 throw new NullPointerException();
729 if (c == this)
730 throw new IllegalArgumentException();
731 if (maxElements <= 0)
732 return 0;
733 boolean signalNotFull = false;
734 final ReentrantLock takeLock = this.takeLock;
735 takeLock.lock();
736 try {
737 int n = Math.min(maxElements, count.get());
738 // count.get provides visibility to first n Nodes
739 Node<E> h = head;
740 int i = 0;
741 try {
742 while (i < n) {
743 Node<E> p = h.next;
744 c.add(p.item);
745 p.item = null;
746 h.next = h;
747 h = p;
748 ++i;
749 }
750 return n;
751 } finally {
752 // Restore invariants even if c.add() threw
|