src/share/classes/java/util/concurrent/LinkedBlockingQueue.java

Print this page




 172             notEmpty.signal();
 173         } finally {
 174             takeLock.unlock();
 175         }
 176     }
 177 
 178     /**
 179      * Signals one thread waiting on {@code notFull}, i.e. a blocked put.
          * Intended for removal paths (take/poll); the bulk-removal code later
          * in this file also calls it when the count drops from capacity.
 180      */
 181     private void signalNotFull() {
 182         final ReentrantLock putLock = this.putLock;
 183         putLock.lock();
 184         try {
                 // Signal while holding putLock, the lock under which put/offer await notFull.
 185             notFull.signal();
 186         } finally {
 187             putLock.unlock();
 188         }
 189     }
 190 
 191     /**
 192      * Creates a node holding {@code x} and links it at the end of the queue,
          * then advances {@code last} to the new node. The element count is not
          * touched here; callers increment {@code count} themselves.
 193      *
 194      * @param x the item
 195      */
 196     private void enqueue(E x) {
 197         // assert putLock.isHeldByCurrentThread();
 198         // assert last.next == null;
             // Chained assignment: first the old tail's next, then last, is set to the new node.
 199         last = last.next = new Node<E>(x);
 200     }
 201 
 202     /**
 203      * Removes the first element from the queue. The current head is a
          * dummy node with no item; its successor becomes the new dummy head
          * after its item has been extracted and cleared.
 204      *
 205      * @return the item that was stored in the removed first node
 206      */
 207     private E dequeue() {
 208         // assert takeLock.isHeldByCurrentThread();
 209         // assert head.item == null;
 210         Node<E> h = head;
 211         Node<E> first = h.next;
 212         h.next = h; // help GC
 213         head = first;
 214         E x = first.item;
 215         first.item = null; // first now serves as the item-less head node
 216         return x;
 217     }
 218 
 219     /**


 265      * Creates a {@code LinkedBlockingQueue} with a capacity of
 266      * {@link Integer#MAX_VALUE}, initially containing the elements of the
 267      * given collection,
 268      * added in traversal order of the collection's iterator.
 269      *
 270      * @param c the collection of elements to initially contain
 271      * @throws NullPointerException if the specified collection or any
 272      *         of its elements are null
 273      */
 274     public LinkedBlockingQueue(Collection<? extends E> c) {
             // Bulk-loads the elements of c in iteration order while holding putLock.
 275         this(Integer.MAX_VALUE);
 276         final ReentrantLock putLock = this.putLock;
 277         putLock.lock(); // Never contended, but necessary for visibility
 278         try {
 279             int n = 0;
 280             for (E e : c) {
                     // Null elements are rejected, matching the documented NullPointerException.
 281                 if (e == null)
 282                     throw new NullPointerException();
                     // Guard against exceeding the capacity requested above (Integer.MAX_VALUE).
 283                 if (n == capacity)
 284                     throw new IllegalStateException("Queue full");
 285                 enqueue(e);
 286                 ++n;
 287             }
                 // Set the element count once, after all nodes are linked.
 288             count.set(n);
 289         } finally {
 290             putLock.unlock();
 291         }
 292     }
 293 
 294 
 295     // this doc comment is overridden to remove the reference to collections
 296     // greater in size than Integer.MAX_VALUE
 297     /**
 298      * Reports how many elements this queue currently holds.
 299      *
 300      * @return the current value of the element counter
 301      */
 302     public int size() {
 303         return this.count.get();
 304     }
 305 


 315      * an element will succeed by inspecting {@code remainingCapacity}
 316      * because it may be the case that another thread is about to
 317      * insert or remove an element.
 318      */
 319     public int remainingCapacity() {
             // Snapshot only: the counter may change right after it is read.
 320         return this.capacity - this.count.get();
 321     }
 322 
 323     /**
 324      * Inserts the specified element at the tail of this queue, waiting if
 325      * necessary for space to become available.
 326      *
          * @param e the element to insert; a null value is rejected
 327      * @throws InterruptedException {@inheritDoc}
 328      * @throws NullPointerException {@inheritDoc}
 329      */
 330     public void put(E e) throws InterruptedException {
 331         if (e == null) throw new NullPointerException();
 332         // Note: convention in all put/take/etc is to preset local var
 333         // holding count negative to indicate failure unless set.
 334         int c = -1;

 335         final ReentrantLock putLock = this.putLock;
 336         final AtomicInteger count = this.count;
 337         putLock.lockInterruptibly();
 338         try {
 339             /*
 340              * Note that count is used in wait guard even though it is
 341              * not protected by lock. This works because count can
 342              * only decrease at this point (all other puts are shut
 343              * out by lock), and we (or some other waiting put) are
 344              * signalled if it ever changes from capacity. Similarly
 345              * for all other uses of count in other wait guards.
 346              */
 347             while (count.get() == capacity) {
 348                 notFull.await();
 349             }
 350             enqueue(e);
                 // c receives the count as it was before this insertion.
 351             c = count.getAndIncrement();
                 // Still room after this insert: pass the wakeup on to another waiting put.
 352             if (c + 1 < capacity)
 353                 notFull.signal();
 354         } finally {
 355             putLock.unlock();
 356         }
             // The queue was empty before this insert, so signal notEmpty (done outside putLock).
 357         if (c == 0)
 358             signalNotEmpty();
 359     }
 360 
 361     /**
 362      * Inserts the specified element at the tail of this queue, waiting if
 363      * necessary up to the specified wait time for space to become available.
 364      *
          * @param e the element to insert; a null value is rejected
          * @param timeout how long to wait before giving up, in units of {@code unit}
          * @param unit the time unit used to convert {@code timeout} to nanoseconds
 365      * @return {@code true} if successful, or {@code false} if
 366      *         the specified waiting time elapses before space is available.
 367      * @throws InterruptedException {@inheritDoc}
 368      * @throws NullPointerException {@inheritDoc}
 369      */
 370     public boolean offer(E e, long timeout, TimeUnit unit)
 371         throws InterruptedException {
 372 
 373         if (e == null) throw new NullPointerException();
 374         long nanos = unit.toNanos(timeout);
             // c stays -1 unless the element is actually inserted.
 375         int c = -1;
 376         final ReentrantLock putLock = this.putLock;
 377         final AtomicInteger count = this.count;
 378         putLock.lockInterruptibly();
 379         try {
 380             while (count.get() == capacity) {
                     // Time budget exhausted while the queue is still full: give up.
 381                 if (nanos <= 0)
 382                     return false;
                     // awaitNanos reports the time still remaining for the next iteration.
 383                 nanos = notFull.awaitNanos(nanos);
 384             }
 385             enqueue(e);
 386             c = count.getAndIncrement();
                 // Still room after this insert: pass the wakeup on to another waiting put.
 387             if (c + 1 < capacity)
 388                 notFull.signal();
 389         } finally {
 390             putLock.unlock();
 391         }
             // The queue was empty before this insert, so signal notEmpty.
 392         if (c == 0)
 393             signalNotEmpty();
 394         return true;
 395     }
 396 
 397     /**
 398      * Inserts the specified element at the tail of this queue if it is
 399      * possible to do so immediately without exceeding the queue's capacity,
 400      * returning {@code true} upon success and {@code false} if this queue
 401      * is full.
 402      * When using a capacity-restricted queue, this method is generally
 403      * preferable to method {@link BlockingQueue#add add}, which can fail to
 404      * insert an element only by throwing an exception.
 405      *
          * @param e the element to insert
          * @return {@code true} if the element was inserted, {@code false} if the queue was full
 406      * @throws NullPointerException if the specified element is null
 407      */
 408     public boolean offer(E e) {
 409         if (e == null) throw new NullPointerException();
 410         final AtomicInteger count = this.count;
             // Fast path: bail out without taking the lock when the queue looks full.
 411         if (count.get() == capacity)
 412             return false;
 413         int c = -1;

 414         final ReentrantLock putLock = this.putLock;
 415         putLock.lock();
 416         try {
                 // Re-check under the lock before inserting.
 417             if (count.get() < capacity) {
 418                 enqueue(e);
 419                 c = count.getAndIncrement();
                     // Still room after this insert: pass the wakeup on to a waiting put.
 420                 if (c + 1 < capacity)
 421                     notFull.signal();
 422             }
 423         } finally {
 424             putLock.unlock();
 425         }
             // The queue was empty before this insert, so signal notEmpty.
 426         if (c == 0)
 427             signalNotEmpty();
             // c is still -1 when nothing was inserted.
 428         return c >= 0;
 429     }
 430 
 431 
 432     public E take() throws InterruptedException {
 433         E x;
 434         int c = -1;
 435         final AtomicInteger count = this.count;
 436         final ReentrantLock takeLock = this.takeLock;
 437         takeLock.lockInterruptibly();
 438         try {


 543      */
 544     public boolean remove(Object o) {
             // Insertion rejects null elements, so a null argument cannot match.
 545         if (o == null)
                 return false;
 546         fullyLock();
 547         try {
 548             // Walk the list, remembering each node's predecessor for unlinking.
 549             Node<E> pred = head;
 550             Node<E> cur = pred.next;
 551             while (cur != null) {
 552                 if (o.equals(cur.item)) {
 553                     unlink(cur, pred);
 554                     return true;
 555                 }
                     pred = cur;
                     cur = cur.next;
 556             }
                 return false;
 557         } finally {
 558             fullyUnlock();
 559         }
 560     }
 561 
 562     /**





















 563      * Returns an array containing all of the elements in this queue, in
 564      * proper sequence.
 565      *
 566      * <p>The returned array will be "safe" in that no references to it are
 567      * maintained by this queue.  (In other words, this method must allocate
 568      * a new array).  The caller is thus free to modify the returned array.
 569      *
 570      * <p>This method acts as bridge between array-based and collection-based
 571      * APIs.
 572      *
 573      * @return an array containing all of the elements in this queue
 574      */
 575     public Object[] toArray() {
 576         fullyLock();
 577         try {
 578             int size = count.get();
 579             Object[] a = new Object[size];
 580             int k = 0;
 581             for (Node<E> p = head.next; p != null; p = p.next)
 582                 a[k++] = p.item;


 628         try {
 629             int size = count.get();
 630             if (a.length < size)
 631                 a = (T[])java.lang.reflect.Array.newInstance
 632                     (a.getClass().getComponentType(), size);
 633 
 634             int k = 0;
 635             for (Node<E> p = head.next; p != null; p = p.next)
 636                 a[k++] = (T)p.item;
 637             if (a.length > k)
 638                 a[k] = null;
 639             return a;
 640         } finally {
 641             fullyUnlock();
 642         }
 643     }
 644 
 645     public String toString() {
             // Delegates to the inherited rendering while both locks are held.
 646         fullyLock();
 647         try {
 648             final String rendered = super.toString();
                 return rendered;











 649         } finally {
 650             fullyUnlock();
 651         }
 652     }
 653 
 654     /**
 655      * Atomically removes all of the elements from this queue.
 656      * The queue will be empty after this call returns.
 657      */
 658     public void clear() {
 659         fullyLock();
 660         try {
 661             for (Node<E> p, h = head; (p = h.next) != null; h = p) {
 662                 h.next = h;
 663                 p.item = null;
 664             }
 665             head = last;
 666             // assert head.item == null && head.next == null;
 667             if (count.getAndSet(0) == capacity)
 668                 notFull.signal();


 710                     ++i;
 711                 }
 712                 return n;
 713             } finally {
 714                 // Restore invariants even if c.add() threw
 715                 if (i > 0) {
 716                     // assert h.item == null;
 717                     head = h;
 718                     signalNotFull = (count.getAndAdd(-i) == capacity);
 719                 }
 720             }
 721         } finally {
 722             takeLock.unlock();
 723             if (signalNotFull)
 724                 signalNotFull();
 725         }
 726     }
 727 
 728     /**
 729      * Creates an iterator that visits this queue's elements in proper
 730      * sequence.


 731      * <p>The iterator is "weakly consistent": it will never throw
 732      * {@link java.util.ConcurrentModificationException
 733      * ConcurrentModificationException}, it guarantees to traverse the
 734      * elements as they existed when it was constructed, and it may (but
 735      * is not guaranteed to) reflect modifications made after construction.
 736      *
 737      * @return an iterator over the elements in this queue in proper sequence
 738      */
 739     public Iterator<E> iterator() {
 740         final Iterator<E> it = new Itr();
             return it;
 741     }
 742 
 743     private class Itr implements Iterator<E> {
 744         /*
 745          * Basic weakly-consistent iterator.  At all times hold the next
 746          * item to hand out so that if hasNext() reports true, we will
 747          * still have it to return even if lost race with a take etc.
 748          */
 749         private Node<E> current;
 750         private Node<E> lastRet;
 751         private E currentElement;
 752 
 753         Itr() {
 754             fullyLock();
 755             try {




 172             notEmpty.signal();
 173         } finally {
 174             takeLock.unlock();
 175         }
 176     }
 177 
 178     /**
 179      * Signals one thread waiting on {@code notFull}, i.e. a blocked put.
          * Intended for removal paths (take/poll); the bulk-removal code later
          * in this file also calls it when the count drops from capacity.
 180      */
 181     private void signalNotFull() {
 182         final ReentrantLock putLock = this.putLock;
 183         putLock.lock();
 184         try {
                 // Signal while holding putLock, the lock under which put/offer await notFull.
 185             notFull.signal();
 186         } finally {
 187             putLock.unlock();
 188         }
 189     }
 190 
 191     /**
 192      * Links the given node at the end of the queue and advances
          * {@code last} to it. The element count is not touched here;
          * callers increment {@code count} themselves.
 193      *
 194      * @param node the node to append; the commented assertion expects the
          *        current tail to have no successor yet
 195      */
 196     private void enqueue(Node<E> node) {
 197         // assert putLock.isHeldByCurrentThread();
 198         // assert last.next == null;
             // Chained assignment: first the old tail's next, then last, is set to node.
 199         last = last.next = node;
 200     }
 201 
 202     /**
 203      * Removes the first element from the queue. The current head is a
          * dummy node with no item; its successor becomes the new dummy head
          * after its item has been extracted and cleared.
 204      *
 205      * @return the item that was stored in the removed first node
 206      */
 207     private E dequeue() {
 208         // assert takeLock.isHeldByCurrentThread();
 209         // assert head.item == null;
 210         Node<E> h = head;
 211         Node<E> first = h.next;
 212         h.next = h; // help GC
 213         head = first;
 214         E x = first.item;
 215         first.item = null; // first now serves as the item-less head node
 216         return x;
 217     }
 218 
 219     /**


 265      * Creates a {@code LinkedBlockingQueue} with a capacity of
 266      * {@link Integer#MAX_VALUE}, initially containing the elements of the
 267      * given collection,
 268      * added in traversal order of the collection's iterator.
 269      *
 270      * @param c the collection of elements to initially contain
 271      * @throws NullPointerException if the specified collection or any
 272      *         of its elements are null
 273      */
 274     public LinkedBlockingQueue(Collection<? extends E> c) {
             // Bulk-loads the elements of c in iteration order while holding putLock.
 275         this(Integer.MAX_VALUE);
 276         final ReentrantLock putLock = this.putLock;
 277         putLock.lock(); // Never contended, but necessary for visibility
 278         try {
 279             int n = 0;
 280             for (E e : c) {
                     // Null elements are rejected, matching the documented NullPointerException.
 281                 if (e == null)
 282                     throw new NullPointerException();
                     // Guard against exceeding the capacity requested above (Integer.MAX_VALUE).
 283                 if (n == capacity)
 284                     throw new IllegalStateException("Queue full");
 285                 enqueue(new Node<E>(e));
 286                 ++n;
 287             }
                 // Set the element count once, after all nodes are linked.
 288             count.set(n);
 289         } finally {
 290             putLock.unlock();
 291         }
 292     }
 293 
 294 
 295     // this doc comment is overridden to remove the reference to collections
 296     // greater in size than Integer.MAX_VALUE
 297     /**
 298      * Reports how many elements this queue currently holds.
 299      *
 300      * @return the current value of the element counter
 301      */
 302     public int size() {
 303         return this.count.get();
 304     }
 305 


 315      * an element will succeed by inspecting {@code remainingCapacity}
 316      * because it may be the case that another thread is about to
 317      * insert or remove an element.
 318      */
 319     public int remainingCapacity() {
             // Snapshot only: the counter may change right after it is read.
 320         return this.capacity - this.count.get();
 321     }
 322 
 323     /**
 324      * Inserts the specified element at the tail of this queue, waiting if
 325      * necessary for space to become available.
 326      *
          * @param e the element to insert; a null value is rejected
 327      * @throws InterruptedException {@inheritDoc}
 328      * @throws NullPointerException {@inheritDoc}
 329      */
 330     public void put(E e) throws InterruptedException {
 331         if (e == null) throw new NullPointerException();
 332         // Note: convention in all put/take/etc is to preset local var
 333         // holding count negative to indicate failure unless set.
 334         int c = -1;
             // Allocate the node before taking the lock; parameterized, not raw, to avoid an unchecked conversion.
 335         Node<E> node = new Node<E>(e);
 336         final ReentrantLock putLock = this.putLock;
 337         final AtomicInteger count = this.count;
 338         putLock.lockInterruptibly();
 339         try {
 340             /*
 341              * Note that count is used in wait guard even though it is
 342              * not protected by lock. This works because count can
 343              * only decrease at this point (all other puts are shut
 344              * out by lock), and we (or some other waiting put) are
 345              * signalled if it ever changes from capacity. Similarly
 346              * for all other uses of count in other wait guards.
 347              */
 348             while (count.get() == capacity) {
 349                 notFull.await();
 350             }
 351             enqueue(node);
                 // c receives the count as it was before this insertion.
 352             c = count.getAndIncrement();
                 // Still room after this insert: pass the wakeup on to another waiting put.
 353             if (c + 1 < capacity)
 354                 notFull.signal();
 355         } finally {
 356             putLock.unlock();
 357         }
             // The queue was empty before this insert, so signal notEmpty (done outside putLock).
 358         if (c == 0)
 359             signalNotEmpty();
 360     }
 361 
 362     /**
 363      * Inserts the specified element at the tail of this queue, waiting if
 364      * necessary up to the specified wait time for space to become available.
 365      *
          * @param e the element to insert; a null value is rejected
          * @param timeout how long to wait before giving up, in units of {@code unit}
          * @param unit the time unit used to convert {@code timeout} to nanoseconds
 366      * @return {@code true} if successful, or {@code false} if
 367      *         the specified waiting time elapses before space is available.
 368      * @throws InterruptedException {@inheritDoc}
 369      * @throws NullPointerException {@inheritDoc}
 370      */
 371     public boolean offer(E e, long timeout, TimeUnit unit)
 372         throws InterruptedException {
 373 
 374         if (e == null) throw new NullPointerException();
 375         long nanos = unit.toNanos(timeout);
             // c stays -1 unless the element is actually inserted.
 376         int c = -1;
 377         final ReentrantLock putLock = this.putLock;
 378         final AtomicInteger count = this.count;
 379         putLock.lockInterruptibly();
 380         try {
 381             while (count.get() == capacity) {
                     // Time budget exhausted while the queue is still full: give up.
 382                 if (nanos <= 0)
 383                     return false;
                     // awaitNanos reports the time still remaining for the next iteration.
 384                 nanos = notFull.awaitNanos(nanos);
 385             }
                 // The node is created only once space is available, so a timed-out call allocates none.
 386             enqueue(new Node<E>(e));
 387             c = count.getAndIncrement();
                 // Still room after this insert: pass the wakeup on to another waiting put.
 388             if (c + 1 < capacity)
 389                 notFull.signal();
 390         } finally {
 391             putLock.unlock();
 392         }
             // The queue was empty before this insert, so signal notEmpty.
 393         if (c == 0)
 394             signalNotEmpty();
 395         return true;
 396     }
 397 
 398     /**
 399      * Inserts the specified element at the tail of this queue if it is
 400      * possible to do so immediately without exceeding the queue's capacity,
 401      * returning {@code true} upon success and {@code false} if this queue
 402      * is full.
 403      * When using a capacity-restricted queue, this method is generally
 404      * preferable to method {@link BlockingQueue#add add}, which can fail to
 405      * insert an element only by throwing an exception.
 406      *
          * @param e the element to insert
          * @return {@code true} if the element was inserted, {@code false} if the queue was full
 407      * @throws NullPointerException if the specified element is null
 408      */
 409     public boolean offer(E e) {
 410         if (e == null) throw new NullPointerException();
 411         final AtomicInteger count = this.count;
             // Fast path: bail out without taking the lock when the queue looks full.
 412         if (count.get() == capacity)
 413             return false;
 414         int c = -1;
             // Allocate the node before taking the lock; parameterized, not raw, to avoid an unchecked conversion.
 415         Node<E> node = new Node<E>(e);
 416         final ReentrantLock putLock = this.putLock;
 417         putLock.lock();
 418         try {
                 // Re-check under the lock before inserting.
 419             if (count.get() < capacity) {
 420                 enqueue(node);
 421                 c = count.getAndIncrement();
                     // Still room after this insert: pass the wakeup on to a waiting put.
 422                 if (c + 1 < capacity)
 423                     notFull.signal();
 424             }
 425         } finally {
 426             putLock.unlock();
 427         }
             // The queue was empty before this insert, so signal notEmpty.
 428         if (c == 0)
 429             signalNotEmpty();
             // c is still -1 when nothing was inserted.
 430         return c >= 0;
 431     }
 432 
 433 
 434     public E take() throws InterruptedException {
 435         E x;
 436         int c = -1;
 437         final AtomicInteger count = this.count;
 438         final ReentrantLock takeLock = this.takeLock;
 439         takeLock.lockInterruptibly();
 440         try {


 545      */
 546     public boolean remove(Object o) {
             // Insertion rejects null elements, so a null argument cannot match.
 547         if (o == null)
                 return false;
 548         fullyLock();
 549         try {
 550             // Walk the list, remembering each node's predecessor for unlinking.
 551             Node<E> pred = head;
 552             Node<E> cur = pred.next;
 553             while (cur != null) {
 554                 if (o.equals(cur.item)) {
 555                     unlink(cur, pred);
 556                     return true;
 557                 }
                     pred = cur;
                     cur = cur.next;
 558             }
                 return false;
 559         } finally {
 560             fullyUnlock();
 561         }
 562     }
 563 
 564     /**
 565      * Reports whether this queue holds the given element. Put precisely,
 566      * the result is {@code true} exactly when some element {@code e} of
 567      * this queue satisfies {@code o.equals(e)}.
 568      *
 569      * @param o object to be checked for containment in this queue
 570      * @return {@code true} if this queue contains the specified element
 571      */
 572     public boolean contains(Object o) {
 573         if (o == null)
                 return false;
 574         fullyLock();
 575         try {
 576             Node<E> cur = head.next;
 577             while (cur != null) {
 578                 if (o.equals(cur.item))
 579                     return true;
                     cur = cur.next;
                 }
                 return false;
 580         } finally {
 581             fullyUnlock();
 582         }
 583     }
 584 
 585     /**
 586      * Returns an array containing all of the elements in this queue, in
 587      * proper sequence.
 588      *
 589      * <p>The returned array will be "safe" in that no references to it are
 590      * maintained by this queue.  (In other words, this method must allocate
 591      * a new array).  The caller is thus free to modify the returned array.
 592      *
 593      * <p>This method acts as bridge between array-based and collection-based
 594      * APIs.
 595      *
 596      * @return an array containing all of the elements in this queue
 597      */
 598     public Object[] toArray() {
 599         fullyLock();
 600         try {
 601             int size = count.get();
 602             Object[] a = new Object[size];
 603             int k = 0;
 604             for (Node<E> p = head.next; p != null; p = p.next)
 605                 a[k++] = p.item;


 651         try {
 652             int size = count.get();
 653             if (a.length < size)
 654                 a = (T[])java.lang.reflect.Array.newInstance
 655                     (a.getClass().getComponentType(), size);
 656 
 657             int k = 0;
 658             for (Node<E> p = head.next; p != null; p = p.next)
 659                 a[k++] = (T)p.item;
 660             if (a.length > k)
 661                 a[k] = null;
 662             return a;
 663         } finally {
 664             fullyUnlock();
 665         }
 666     }
 667 
 668     public String toString() {
 669         fullyLock();
 670         try {
 671             Node<E> node = head.next;
 672             if (node == null)
 673                 return "[]";
 674 
 675             // Non-empty: emit "[a, b, c]" with ", " between elements.
 676             final StringBuilder out = new StringBuilder().append('[');
 677             while (true) {
 678                 final E item = node.item;
 679                 // An element that is this queue itself gets a fixed label instead.
 680                 if (item == this)
 681                     out.append("(this Collection)");
 682                 else
 683                     out.append(item);
 684                 node = node.next;
                     if (node == null)
                         break;
                     out.append(',').append(' ');
                 }
                 return out.append(']').toString();
 685         } finally {
 686             fullyUnlock();
 687         }
 688     }
 689 
 690     /**
 691      * Atomically removes all of the elements from this queue.
 692      * The queue will be empty after this call returns.
 693      */
 694     public void clear() {
 695         fullyLock();
 696         try {
 697             for (Node<E> p, h = head; (p = h.next) != null; h = p) {
 698                 h.next = h;
 699                 p.item = null;
 700             }
 701             head = last;
 702             // assert head.item == null && head.next == null;
 703             if (count.getAndSet(0) == capacity)
 704                 notFull.signal();


 746                     ++i;
 747                 }
 748                 return n;
 749             } finally {
 750                 // Restore invariants even if c.add() threw
 751                 if (i > 0) {
 752                     // assert h.item == null;
 753                     head = h;
 754                     signalNotFull = (count.getAndAdd(-i) == capacity);
 755                 }
 756             }
 757         } finally {
 758             takeLock.unlock();
 759             if (signalNotFull)
 760                 signalNotFull();
 761         }
 762     }
 763 
 764     /**
 765      * Creates an iterator that visits this queue's elements in proper
 766      * sequence, from first (head) to last (tail).
 767      *
 768      * <p>The iterator is "weakly consistent": it will never throw
 769      * {@link java.util.ConcurrentModificationException
 770      * ConcurrentModificationException}, it guarantees to traverse the
 771      * elements as they existed when it was constructed, and it may
 772      * (but is not guaranteed to) reflect modifications made after
 773      * construction.
 774      *
 775      * @return an iterator over the elements in this queue in proper sequence
 776      */
 777     public Iterator<E> iterator() {
 778         final Iterator<E> it = new Itr();
             return it;
 779     }
 780 
 781     private class Itr implements Iterator<E> {
 782         /*
 783          * Basic weakly-consistent iterator.  At all times hold the next
 784          * item to hand out so that if hasNext() reports true, we will
 785          * still have it to return even if lost race with a take etc.
 786          */
 787         private Node<E> current;
 788         private Node<E> lastRet;
 789         private E currentElement;
 790 
 791         Itr() {
 792             fullyLock();
 793             try {