Print this page


Split Close
Expand all
Collapse all
          --- old/src/share/classes/java/util/concurrent/PriorityBlockingQueue.java
          +++ new/src/share/classes/java/util/concurrent/PriorityBlockingQueue.java
↓ open down ↓ 27 lines elided ↑ open up ↑
  28   28   * However, the following notice accompanied the original version of this
  29   29   * file:
  30   30   *
  31   31   * Written by Doug Lea with assistance from members of JCP JSR-166
  32   32   * Expert Group and released to the public domain, as explained at
  33   33   * http://creativecommons.org/publicdomain/zero/1.0/
  34   34   */
  35   35  
  36   36  package java.util.concurrent;
  37   37  
  38      -import java.util.concurrent.locks.*;
       38 +import java.util.concurrent.locks.Condition;
       39 +import java.util.concurrent.locks.ReentrantLock;
  39   40  import java.util.*;
  40   41  
  41   42  /**
  42   43   * An unbounded {@linkplain BlockingQueue blocking queue} that uses
  43   44   * the same ordering rules as class {@link PriorityQueue} and supplies
  44   45   * blocking retrieval operations.  While this queue is logically
  45   46   * unbounded, attempted additions may fail due to resource exhaustion
  46   47   * (causing {@code OutOfMemoryError}). This class does not permit
  47   48   * {@code null} elements.  A priority queue relying on {@linkplain
  48   49   * Comparable natural ordering} also does not permit insertion of
↓ open down ↓ 55 lines elided ↑ open up ↑
 104  105       * operations protected with a single lock. However, allocation
 105  106       * during resizing uses a simple spinlock (used only while not
 106  107       * holding main lock) in order to allow takes to operate
 107  108       * concurrently with allocation.  This avoids repeated
 108  109       * postponement of waiting consumers and consequent element
 109  110       * build-up. The need to back away from lock during allocation
 110  111       * makes it impossible to simply wrap delegated
 111  112       * java.util.PriorityQueue operations within a lock, as was done
 112  113       * in a previous version of this class. To maintain
 113  114       * interoperability, a plain PriorityQueue is still used during
 114      -     * serialization, which maintains compatibility at the espense of
      115 +     * serialization, which maintains compatibility at the expense of
 115  116       * transiently doubling overhead.
 116  117       */
 117  118  
 118  119      /**
 119  120       * Default array capacity.
 120  121       */
 121  122      private static final int DEFAULT_INITIAL_CAPACITY = 11;
 122  123  
 123  124      /**
 124  125       * The maximum size of array to allocate.
↓ open down ↓ 176 lines elided ↑ open up ↑
 301  302          lock.lock();
 302  303          if (newArray != null && queue == array) {
 303  304              queue = newArray;
 304  305              System.arraycopy(array, 0, newArray, 0, oldCap);
 305  306          }
 306  307      }
 307  308  
 308  309      /**
 309  310       * Mechanics for poll().  Call only while holding lock.
 310  311       */
 311      -    private E extract() {
 312      -        E result;
      312 +    private E dequeue() {
 313  313          int n = size - 1;
 314  314          if (n < 0)
 315      -            result = null;
      315 +            return null;
 316  316          else {
 317  317              Object[] array = queue;
 318      -            result = (E) array[0];
      318 +            E result = (E) array[0];
 319  319              E x = (E) array[n];
 320  320              array[n] = null;
 321  321              Comparator<? super E> cmp = comparator;
 322  322              if (cmp == null)
 323  323                  siftDownComparable(0, x, array, n);
 324  324              else
 325  325                  siftDownUsingComparator(0, x, array, n, cmp);
 326  326              size = n;
      327 +            return result;
 327  328          }
 328      -        return result;
 329  329      }
 330  330  
 331  331      /**
 332  332       * Inserts item x at position k, maintaining heap invariant by
 333  333       * promoting x up the tree until it is greater than or equal to
 334  334       * its parent, or is the root.
 335  335       *
 336  336       * To simplify and speed up coercions and comparisons. the
 337  337       * Comparable and Comparator versions are separated into different
 338  338       * methods that are otherwise identical. (Similarly for siftDown.)
↓ open down ↓ 36 lines elided ↑ open up ↑
 375  375       * demoting x down the tree repeatedly until it is less than or
 376  376       * equal to its children or is a leaf.
 377  377       *
 378  378       * @param k the position to fill
 379  379       * @param x the item to insert
 380  380       * @param array the heap array
 381  381       * @param n heap size
 382  382       */
 383  383      private static <T> void siftDownComparable(int k, T x, Object[] array,
 384  384                                                 int n) {
 385      -        Comparable<? super T> key = (Comparable<? super T>)x;
 386      -        int half = n >>> 1;           // loop while a non-leaf
 387      -        while (k < half) {
 388      -            int child = (k << 1) + 1; // assume left child is least
 389      -            Object c = array[child];
 390      -            int right = child + 1;
 391      -            if (right < n &&
 392      -                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
 393      -                c = array[child = right];
 394      -            if (key.compareTo((T) c) <= 0)
 395      -                break;
 396      -            array[k] = c;
 397      -            k = child;
      385 +        if (n > 0) {
      386 +            Comparable<? super T> key = (Comparable<? super T>)x;
      387 +            int half = n >>> 1;           // loop while a non-leaf
      388 +            while (k < half) {
      389 +                int child = (k << 1) + 1; // assume left child is least
      390 +                Object c = array[child];
      391 +                int right = child + 1;
      392 +                if (right < n &&
      393 +                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
      394 +                    c = array[child = right];
      395 +                if (key.compareTo((T) c) <= 0)
      396 +                    break;
      397 +                array[k] = c;
      398 +                k = child;
      399 +            }
      400 +            array[k] = key;
 398  401          }
 399      -        array[k] = key;
 400  402      }
 401  403  
 402  404      private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
 403  405                                                      int n,
 404  406                                                      Comparator<? super T> cmp) {
 405      -        int half = n >>> 1;
 406      -        while (k < half) {
 407      -            int child = (k << 1) + 1;
 408      -            Object c = array[child];
 409      -            int right = child + 1;
 410      -            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
 411      -                c = array[child = right];
 412      -            if (cmp.compare(x, (T) c) <= 0)
 413      -                break;
 414      -            array[k] = c;
 415      -            k = child;
      407 +        if (n > 0) {
      408 +            int half = n >>> 1;
      409 +            while (k < half) {
      410 +                int child = (k << 1) + 1;
      411 +                Object c = array[child];
      412 +                int right = child + 1;
      413 +                if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
      414 +                    c = array[child = right];
      415 +                if (cmp.compare(x, (T) c) <= 0)
      416 +                    break;
      417 +                array[k] = c;
      418 +                k = child;
      419 +            }
      420 +            array[k] = x;
 416  421          }
 417      -        array[k] = x;
 418  422      }
 419  423  
 420  424      /**
 421  425       * Establishes the heap invariant (described above) in the entire tree,
 422  426       * assuming nothing about the order of the elements prior to the call.
 423  427       */
 424  428      private void heapify() {
 425  429          Object[] array = queue;
 426  430          int n = size;
 427  431          int half = (n >>> 1) - 1;
↓ open down ↓ 85 lines elided ↑ open up ↑
 513  517       *         priority queue's ordering
 514  518       * @throws NullPointerException if the specified element is null
 515  519       */
 516  520      public boolean offer(E e, long timeout, TimeUnit unit) {
 517  521          return offer(e); // never need to block
 518  522      }
 519  523  
 520  524      public E poll() {
 521  525          final ReentrantLock lock = this.lock;
 522  526          lock.lock();
 523      -        E result;
 524  527          try {
 525      -            result = extract();
      528 +            return dequeue();
 526  529          } finally {
 527  530              lock.unlock();
 528  531          }
 529      -        return result;
 530  532      }
 531  533  
 532  534      public E take() throws InterruptedException {
 533  535          final ReentrantLock lock = this.lock;
 534  536          lock.lockInterruptibly();
 535  537          E result;
 536  538          try {
 537      -            while ( (result = extract()) == null)
      539 +            while ( (result = dequeue()) == null)
 538  540                  notEmpty.await();
 539  541          } finally {
 540  542              lock.unlock();
 541  543          }
 542  544          return result;
 543  545      }
 544  546  
 545  547      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 546  548          long nanos = unit.toNanos(timeout);
 547  549          final ReentrantLock lock = this.lock;
 548  550          lock.lockInterruptibly();
 549  551          E result;
 550  552          try {
 551      -            while ( (result = extract()) == null && nanos > 0)
      553 +            while ( (result = dequeue()) == null && nanos > 0)
 552  554                  nanos = notEmpty.awaitNanos(nanos);
 553  555          } finally {
 554  556              lock.unlock();
 555  557          }
 556  558          return result;
 557  559      }
 558  560  
 559  561      public E peek() {
 560  562          final ReentrantLock lock = this.lock;
 561  563          lock.lock();
 562      -        E result;
 563  564          try {
 564      -            result = size > 0 ? (E) queue[0] : null;
      565 +            return (size == 0) ? null : (E) queue[0];
 565  566          } finally {
 566  567              lock.unlock();
 567  568          }
 568      -        return result;
 569  569      }
 570  570  
 571  571      /**
 572  572       * Returns the comparator used to order the elements in this queue,
 573  573       * or {@code null} if this queue uses the {@linkplain Comparable
 574  574       * natural ordering} of its elements.
 575  575       *
 576  576       * @return the comparator used to order the elements in this queue,
 577  577       *         or {@code null} if this queue uses the natural
 578  578       *         ordering of its elements
↓ open down ↓ 63 lines elided ↑ open up ↑
 642  642       * if it is present.  More formally, removes an element {@code e} such
 643  643       * that {@code o.equals(e)}, if this queue contains one or more such
 644  644       * elements.  Returns {@code true} if and only if this queue contained
 645  645       * the specified element (or equivalently, if this queue changed as a
 646  646       * result of the call).
 647  647       *
 648  648       * @param o element to be removed from this queue, if present
 649  649       * @return {@code true} if this queue changed as a result of the call
 650  650       */
 651  651      public boolean remove(Object o) {
 652      -        boolean removed = false;
 653  652          final ReentrantLock lock = this.lock;
 654  653          lock.lock();
 655  654          try {
 656  655              int i = indexOf(o);
 657      -            if (i != -1) {
 658      -                removeAt(i);
 659      -                removed = true;
 660      -            }
      656 +            if (i == -1)
      657 +                return false;
      658 +            removeAt(i);
      659 +            return true;
 661  660          } finally {
 662  661              lock.unlock();
 663  662          }
 664      -        return removed;
 665  663      }
 666  664  
 667      -
 668  665      /**
 669  666       * Identity-based version for use in Itr.remove
 670  667       */
 671      -    private void removeEQ(Object o) {
      668 +    void removeEQ(Object o) {
 672  669          final ReentrantLock lock = this.lock;
 673  670          lock.lock();
 674  671          try {
 675  672              Object[] array = queue;
 676      -            int n = size;
 677      -            for (int i = 0; i < n; i++) {
      673 +            for (int i = 0, n = size; i < n; i++) {
 678  674                  if (o == array[i]) {
 679  675                      removeAt(i);
 680  676                      break;
 681  677                  }
 682  678              }
 683  679          } finally {
 684  680              lock.unlock();
 685  681          }
 686  682      }
 687  683  
 688  684      /**
 689  685       * Returns {@code true} if this queue contains the specified element.
 690  686       * More formally, returns {@code true} if and only if this queue contains
 691  687       * at least one element {@code e} such that {@code o.equals(e)}.
 692  688       *
 693  689       * @param o object to be checked for containment in this queue
 694  690       * @return {@code true} if this queue contains the specified element
 695  691       */
 696  692      public boolean contains(Object o) {
 697      -        int index;
 698  693          final ReentrantLock lock = this.lock;
 699  694          lock.lock();
 700  695          try {
 701      -            index = indexOf(o);
      696 +            return indexOf(o) != -1;
 702  697          } finally {
 703  698              lock.unlock();
 704  699          }
 705      -        return index != -1;
 706  700      }
 707  701  
 708  702      /**
 709  703       * Returns an array containing all of the elements in this queue.
 710  704       * The returned array elements are in no particular order.
 711  705       *
 712  706       * <p>The returned array will be "safe" in that no references to it are
 713  707       * maintained by this queue.  (In other words, this method must allocate
 714  708       * a new array).  The caller is thus free to modify the returned array.
 715  709       *
↓ open down ↓ 5 lines elided ↑ open up ↑
 721  715      public Object[] toArray() {
 722  716          final ReentrantLock lock = this.lock;
 723  717          lock.lock();
 724  718          try {
 725  719              return Arrays.copyOf(queue, size);
 726  720          } finally {
 727  721              lock.unlock();
 728  722          }
 729  723      }
 730  724  
 731      -
 732  725      public String toString() {
 733  726          final ReentrantLock lock = this.lock;
 734  727          lock.lock();
 735  728          try {
 736  729              int n = size;
 737  730              if (n == 0)
 738  731                  return "[]";
 739  732              StringBuilder sb = new StringBuilder();
 740  733              sb.append('[');
 741  734              for (int i = 0; i < n; ++i) {
 742      -                E e = (E)queue[i];
      735 +                Object e = queue[i];
 743  736                  sb.append(e == this ? "(this Collection)" : e);
 744  737                  if (i != n - 1)
 745  738                      sb.append(',').append(' ');
 746  739              }
 747  740              return sb.append(']').toString();
 748  741          } finally {
 749  742              lock.unlock();
 750  743          }
 751  744      }
 752  745  
 753  746      /**
 754  747       * @throws UnsupportedOperationException {@inheritDoc}
 755  748       * @throws ClassCastException            {@inheritDoc}
 756  749       * @throws NullPointerException          {@inheritDoc}
 757  750       * @throws IllegalArgumentException      {@inheritDoc}
 758  751       */
 759  752      public int drainTo(Collection<? super E> c) {
 760      -        if (c == null)
 761      -            throw new NullPointerException();
 762      -        if (c == this)
 763      -            throw new IllegalArgumentException();
 764      -        final ReentrantLock lock = this.lock;
 765      -        lock.lock();
 766      -        try {
 767      -            int n = 0;
 768      -            E e;
 769      -            while ( (e = extract()) != null) {
 770      -                c.add(e);
 771      -                ++n;
 772      -            }
 773      -            return n;
 774      -        } finally {
 775      -            lock.unlock();
 776      -        }
      753 +        return drainTo(c, Integer.MAX_VALUE);
 777  754      }
 778  755  
 779  756      /**
 780  757       * @throws UnsupportedOperationException {@inheritDoc}
 781  758       * @throws ClassCastException            {@inheritDoc}
 782  759       * @throws NullPointerException          {@inheritDoc}
 783  760       * @throws IllegalArgumentException      {@inheritDoc}
 784  761       */
 785  762      public int drainTo(Collection<? super E> c, int maxElements) {
 786  763          if (c == null)
 787  764              throw new NullPointerException();
 788  765          if (c == this)
 789  766              throw new IllegalArgumentException();
 790  767          if (maxElements <= 0)
 791  768              return 0;
 792  769          final ReentrantLock lock = this.lock;
 793  770          lock.lock();
 794  771          try {
 795      -            int n = 0;
 796      -            E e;
 797      -            while (n < maxElements && (e = extract()) != null) {
 798      -                c.add(e);
 799      -                ++n;
      772 +            int n = Math.min(size, maxElements);
      773 +            for (int i = 0; i < n; i++) {
      774 +                c.add((E) queue[0]); // In this order, in case add() throws.
      775 +                dequeue();
 800  776              }
 801  777              return n;
 802  778          } finally {
 803  779              lock.unlock();
 804  780          }
 805  781      }
 806  782  
 807  783      /**
 808  784       * Atomically removes all of the elements from this queue.
 809  785       * The queue will be empty after this call returns.
↓ open down ↓ 27 lines elided ↑ open up ↑
 837  813       *
 838  814       * <p>Like the {@link #toArray()} method, this method acts as bridge between
 839  815       * array-based and collection-based APIs.  Further, this method allows
 840  816       * precise control over the runtime type of the output array, and may,
 841  817       * under certain circumstances, be used to save allocation costs.
 842  818       *
 843  819       * <p>Suppose {@code x} is a queue known to contain only strings.
 844  820       * The following code can be used to dump the queue into a newly
 845  821       * allocated array of {@code String}:
 846  822       *
 847      -     * <pre>
 848      -     *     String[] y = x.toArray(new String[0]);</pre>
      823 +     *  <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
 849  824       *
 850  825       * Note that {@code toArray(new Object[0])} is identical in function to
 851  826       * {@code toArray()}.
 852  827       *
 853  828       * @param a the array into which the elements of the queue are to
 854  829       *          be stored, if it is big enough; otherwise, a new array of the
 855  830       *          same runtime type is allocated for this purpose
 856  831       * @return an array containing all of the elements in this queue
 857  832       * @throws ArrayStoreException if the runtime type of the specified array
 858  833       *         is not a supertype of the runtime type of every element in
↓ open down ↓ 32 lines elided ↑ open up ↑
 891  866       */
 892  867      public Iterator<E> iterator() {
 893  868          return new Itr(toArray());
 894  869      }
 895  870  
 896  871      /**
 897  872       * Snapshot iterator that works off copy of underlying q array.
 898  873       */
 899  874      final class Itr implements Iterator<E> {
 900  875          final Object[] array; // Array of all elements
 901      -        int cursor;           // index of next element to return;
      876 +        int cursor;           // index of next element to return
 902  877          int lastRet;          // index of last element, or -1 if no such
 903  878  
 904  879          Itr(Object[] array) {
 905  880              lastRet = -1;
 906  881              this.array = array;
 907  882          }
 908  883  
 909  884          public boolean hasNext() {
 910  885              return cursor < array.length;
 911  886          }
↓ open down ↓ 7 lines elided ↑ open up ↑
 919  894  
 920  895          public void remove() {
 921  896              if (lastRet < 0)
 922  897                  throw new IllegalStateException();
 923  898              removeEQ(array[lastRet]);
 924  899              lastRet = -1;
 925  900          }
 926  901      }
 927  902  
 928  903      /**
 929      -     * Saves the state to a stream (that is, serializes it).  For
 930      -     * compatibility with previous version of this class,
 931      -     * elements are first copied to a java.util.PriorityQueue,
 932      -     * which is then serialized.
      904 +     * Saves this queue to a stream (that is, serializes it).
      905 +     *
      906 +     * For compatibility with previous version of this class, elements
      907 +     * are first copied to a java.util.PriorityQueue, which is then
      908 +     * serialized.
 933  909       */
 934  910      private void writeObject(java.io.ObjectOutputStream s)
 935  911          throws java.io.IOException {
 936  912          lock.lock();
 937  913          try {
 938      -            int n = size; // avoid zero capacity argument
 939      -            q = new PriorityQueue<E>(n == 0 ? 1 : n, comparator);
      914 +            // avoid zero capacity argument
      915 +            q = new PriorityQueue<E>(Math.max(size, 1), comparator);
 940  916              q.addAll(this);
 941  917              s.defaultWriteObject();
 942  918          } finally {
 943  919              q = null;
 944  920              lock.unlock();
 945  921          }
 946  922      }
 947  923  
 948  924      /**
 949      -     * Reconstitutes the {@code PriorityBlockingQueue} instance from a stream
 950      -     * (that is, deserializes it).
 951      -     *
 952      -     * @param s the stream
      925 +     * Reconstitutes this queue from a stream (that is, deserializes it).
 953  926       */
 954  927      private void readObject(java.io.ObjectInputStream s)
 955  928          throws java.io.IOException, ClassNotFoundException {
 956  929          try {
 957  930              s.defaultReadObject();
 958  931              this.queue = new Object[q.size()];
 959  932              comparator = q.comparator();
 960  933              addAll(q);
 961  934          } finally {
 962  935              q = null;
↓ open down ↓ 17 lines elided ↑ open up ↑
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX