src/share/classes/java/util/concurrent/LinkedBlockingQueue.java

Print this page




 315      * an element will succeed by inspecting {@code remainingCapacity}
 316      * because it may be the case that another thread is about to
 317      * insert or remove an element.
 318      */
 319     public int remainingCapacity() {
             // Read the element count once; other threads may change it right after.
             final int occupied = count.get();
 320         return capacity - occupied;
 321     }
 322 
 323     /**
 324      * Inserts the specified element at the tail of this queue, waiting if
 325      * necessary for space to become available.
 326      *
 327      * @throws InterruptedException {@inheritDoc}
 328      * @throws NullPointerException {@inheritDoc}
 329      */
 330     public void put(E e) throws InterruptedException {
 331         if (e == null) throw new NullPointerException();
 332         // Note: convention in all put/take/etc is to preset local var
 333         // holding count negative to indicate failure unless set.
 334         int c = -1;
 335         Node<E> node = new Node(e);
 336         final ReentrantLock putLock = this.putLock;
 337         final AtomicInteger count = this.count;
 338         putLock.lockInterruptibly();
 339         try {
 340             /*
 341              * Note that count is used in wait guard even though it is
 342              * not protected by lock. This works because count can
 343              * only decrease at this point (all other puts are shut
 344              * out by lock), and we (or some other waiting put) are
 345              * signalled if it ever changes from capacity. Similarly
 346              * for all other uses of count in other wait guards.
 347              */
 348             while (count.get() == capacity) {
 349                 notFull.await();
 350             }
 351             enqueue(node);
 352             c = count.getAndIncrement();
 353             if (c + 1 < capacity)
 354                 notFull.signal();
 355         } finally {


 395         return true;
 396     }
 397 
 398     /**
 399      * Inserts the specified element at the tail of this queue if it is
 400      * possible to do so immediately without exceeding the queue's capacity,
 401      * returning {@code true} upon success and {@code false} if this queue
 402      * is full.
 403      * When using a capacity-restricted queue, this method is generally
 404      * preferable to method {@link BlockingQueue#add add}, which can fail to
 405      * insert an element only by throwing an exception.
 406      *
          * @param e the element to insert
          * @return {@code true} if the element was enqueued, {@code false} if
          *         the queue was observed to be at capacity
 407      * @throws NullPointerException if the specified element is null
 408      */
 409     public boolean offer(E e) {
 410         if (e == null) throw new NullPointerException();
 411         final AtomicInteger count = this.count;
             // Fast path: report failure without taking putLock when the queue is at capacity.
 412         if (count.get() == capacity)
 413             return false;
             // c stays negative unless the element is actually enqueued below.
 414         int c = -1;
             // Parameterized rather than raw Node: avoids an unchecked conversion to Node<E>.
 415         Node<E> node = new Node<E>(e);
 416         final ReentrantLock putLock = this.putLock;
 417         putLock.lock();
 418         try {
                 // Re-check while holding putLock: the unlocked read above may be out of date.
 419             if (count.get() < capacity) {
 420                 enqueue(node);
                     // getAndIncrement returns the count as it was before this insert.
 421                 c = count.getAndIncrement();
                     // Room remains after this insert, so signal notFull for another waiter.
 422                 if (c + 1 < capacity)
 423                     notFull.signal();
 424             }
 425         } finally {
 426             putLock.unlock();
 427         }
             // c == 0: the count was zero before this insert, i.e. the queue was empty.
             // NOTE(review): signalNotEmpty() presumably wakes a waiting taker - confirm.
 428         if (c == 0)
 429             signalNotEmpty();
 430         return c >= 0;
 431     }
 432 
 433 
 434     public E take() throws InterruptedException {
 435         E x;


 711      * @throws UnsupportedOperationException {@inheritDoc}
 712      * @throws ClassCastException            {@inheritDoc}
 713      * @throws NullPointerException          {@inheritDoc}
 714      * @throws IllegalArgumentException      {@inheritDoc}
 715      */
 716     public int drainTo(Collection<? super E> c) {
             // No caller-supplied limit: delegate with the largest possible bound.
             final int unbounded = Integer.MAX_VALUE;
 717         return drainTo(c, unbounded);
 718     }
 719 
 720     /**
 721      * @throws UnsupportedOperationException {@inheritDoc}
 722      * @throws ClassCastException            {@inheritDoc}
 723      * @throws NullPointerException          {@inheritDoc}
 724      * @throws IllegalArgumentException      {@inheritDoc}
 725      */
 726     public int drainTo(Collection<? super E> c, int maxElements) {
 727         if (c == null)
 728             throw new NullPointerException();
 729         if (c == this)
 730             throw new IllegalArgumentException();


 731         boolean signalNotFull = false;
 732         final ReentrantLock takeLock = this.takeLock;
 733         takeLock.lock();
 734         try {
 735             int n = Math.min(maxElements, count.get());
 736             // count.get provides visibility to first n Nodes
 737             Node<E> h = head;
 738             int i = 0;
 739             try {
 740                 while (i < n) {
 741                     Node<E> p = h.next;
 742                     c.add(p.item);
 743                     p.item = null;
 744                     h.next = h;
 745                     h = p;
 746                     ++i;
 747                 }
 748                 return n;
 749             } finally {
 750                 // Restore invariants even if c.add() threw




 315      * an element will succeed by inspecting {@code remainingCapacity}
 316      * because it may be the case that another thread is about to
 317      * insert or remove an element.
 318      */
 319     public int remainingCapacity() {
             // Read the element count once; other threads may change it right after.
             final int occupied = count.get();
 320         return capacity - occupied;
 321     }
 322 
 323     /**
 324      * Inserts the specified element at the tail of this queue, waiting if
 325      * necessary for space to become available.
 326      *
 327      * @throws InterruptedException {@inheritDoc}
 328      * @throws NullPointerException {@inheritDoc}
 329      */
 330     public void put(E e) throws InterruptedException {
 331         if (e == null) throw new NullPointerException();
 332         // Note: convention in all put/take/etc is to preset local var
 333         // holding count negative to indicate failure unless set.
 334         int c = -1;
 335         Node<E> node = new Node<E>(e);
 336         final ReentrantLock putLock = this.putLock;
 337         final AtomicInteger count = this.count;
 338         putLock.lockInterruptibly();
 339         try {
 340             /*
 341              * Note that count is used in wait guard even though it is
 342              * not protected by lock. This works because count can
 343              * only decrease at this point (all other puts are shut
 344              * out by lock), and we (or some other waiting put) are
 345              * signalled if it ever changes from capacity. Similarly
 346              * for all other uses of count in other wait guards.
 347              */
 348             while (count.get() == capacity) {
 349                 notFull.await();
 350             }
 351             enqueue(node);
 352             c = count.getAndIncrement();
 353             if (c + 1 < capacity)
 354                 notFull.signal();
 355         } finally {


 395         return true;
 396     }
 397 
 398     /**
 399      * Inserts the specified element at the tail of this queue if it is
 400      * possible to do so immediately without exceeding the queue's capacity,
 401      * returning {@code true} upon success and {@code false} if this queue
 402      * is full.
 403      * When using a capacity-restricted queue, this method is generally
 404      * preferable to method {@link BlockingQueue#add add}, which can fail to
 405      * insert an element only by throwing an exception.
 406      *
          * @param e the element to insert
          * @return {@code true} if the element was enqueued, {@code false} if
          *         the queue was observed to be at capacity
 407      * @throws NullPointerException if the specified element is null
 408      */
 409     public boolean offer(E e) {
 410         if (e == null) throw new NullPointerException();
 411         final AtomicInteger count = this.count;
             // Fast path: report failure without taking putLock when the queue is at capacity.
 412         if (count.get() == capacity)
 413             return false;
             // c stays negative unless the element is actually enqueued below.
 414         int c = -1;
             // Node is allocated before the lock is taken.
 415         Node<E> node = new Node<E>(e);
 416         final ReentrantLock putLock = this.putLock;
 417         putLock.lock();
 418         try {
                 // Re-check while holding putLock: the unlocked read above may be out of date.
 419             if (count.get() < capacity) {
 420                 enqueue(node);
                     // getAndIncrement returns the count as it was before this insert.
 421                 c = count.getAndIncrement();
                     // Room remains after this insert, so signal notFull for another waiter.
 422                 if (c + 1 < capacity)
 423                     notFull.signal();
 424             }
 425         } finally {
 426             putLock.unlock();
 427         }
             // c == 0: the count was zero before this insert, i.e. the queue was empty.
             // NOTE(review): signalNotEmpty() presumably wakes a waiting taker - confirm.
 428         if (c == 0)
 429             signalNotEmpty();
 430         return c >= 0;
 431     }
 432 
 433 
 434     public E take() throws InterruptedException {
 435         E x;


 711      * @throws UnsupportedOperationException {@inheritDoc}
 712      * @throws ClassCastException            {@inheritDoc}
 713      * @throws NullPointerException          {@inheritDoc}
 714      * @throws IllegalArgumentException      {@inheritDoc}
 715      */
 716     public int drainTo(Collection<? super E> c) {
             // No caller-supplied limit: delegate with the largest possible bound.
             final int unbounded = Integer.MAX_VALUE;
 717         return drainTo(c, unbounded);
 718     }
 719 
 720     /**
 721      * @throws UnsupportedOperationException {@inheritDoc}
 722      * @throws ClassCastException            {@inheritDoc}
 723      * @throws NullPointerException          {@inheritDoc}
 724      * @throws IllegalArgumentException      {@inheritDoc}
 725      */
 726     public int drainTo(Collection<? super E> c, int maxElements) {
 727         if (c == null)
 728             throw new NullPointerException();
 729         if (c == this)
 730             throw new IllegalArgumentException();
 731         if (maxElements <= 0)
 732             return 0;
 733         boolean signalNotFull = false;
 734         final ReentrantLock takeLock = this.takeLock;
 735         takeLock.lock();
 736         try {
 737             int n = Math.min(maxElements, count.get());
 738             // count.get provides visibility to first n Nodes
 739             Node<E> h = head;
 740             int i = 0;
 741             try {
 742                 while (i < n) {
 743                     Node<E> p = h.next;
 744                     c.add(p.item);
 745                     p.item = null;
 746                     h.next = h;
 747                     h = p;
 748                     ++i;
 749                 }
 750                 return n;
 751             } finally {
 752                 // Restore invariants even if c.add() threw