Print this page


Split Close
Expand all
Collapse all
          --- old/src/share/classes/java/util/concurrent/LinkedTransferQueue.java
          +++ new/src/share/classes/java/util/concurrent/LinkedTransferQueue.java
↓ open down ↓ 29 lines elided ↑ open up ↑
  30   30   *
  31   31   * Written by Doug Lea with assistance from members of JCP JSR-166
  32   32   * Expert Group and released to the public domain, as explained at
  33   33   * http://creativecommons.org/licenses/publicdomain
  34   34   */
  35   35  
  36   36  package java.util.concurrent;
  37   37  
  38   38  import java.util.AbstractQueue;
  39   39  import java.util.Collection;
  40      -import java.util.ConcurrentModificationException;
  41   40  import java.util.Iterator;
  42   41  import java.util.NoSuchElementException;
  43   42  import java.util.Queue;
       43 +import java.util.concurrent.TimeUnit;
  44   44  import java.util.concurrent.locks.LockSupport;
  45   45  
  46   46  /**
  47   47   * An unbounded {@link TransferQueue} based on linked nodes.
  48   48   * This queue orders elements FIFO (first-in-first-out) with respect
  49   49   * to any given producer.  The <em>head</em> of the queue is that
  50   50   * element that has been on the queue the longest time for some
  51   51   * producer.  The <em>tail</em> of the queue is that element that has
  52   52   * been on the queue the shortest time for some producer.
  53   53   *
↓ open down ↓ 389 lines elided ↑ open up ↑
 443  443          volatile Object item;   // initially non-null if isData; CASed to match
 444  444          volatile Node next;
 445  445          volatile Thread waiter; // null until waiting
 446  446  
 447  447          // CAS methods for fields
 448  448          final boolean casNext(Node cmp, Node val) {
 449  449              return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
 450  450          }
 451  451  
 452  452          final boolean casItem(Object cmp, Object val) {
 453      -            //            assert cmp == null || cmp.getClass() != Node.class;
      453 +            // assert cmp == null || cmp.getClass() != Node.class;
 454  454              return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
 455  455          }
 456  456  
 457  457          /**
 458  458           * Constructs a new node.  Uses relaxed write because item can
 459  459           * only be seen after publication via casNext.
 460  460           */
 461  461          Node(Object item, boolean isData) {
 462  462              UNSAFE.putObject(this, itemOffset, item); // relaxed write
 463  463              this.isData = isData;
↓ open down ↓ 45 lines elided ↑ open up ↑
 509  509          final boolean cannotPrecede(boolean haveData) {
 510  510              boolean d = isData;
 511  511              Object x;
 512  512              return d != haveData && (x = item) != this && (x != null) == d;
 513  513          }
 514  514  
 515  515          /**
 516  516           * Tries to artificially match a data node -- used by remove.
 517  517           */
 518  518          final boolean tryMatchData() {
 519      -            //            assert isData;
      519 +            // assert isData;
 520  520              Object x = item;
 521  521              if (x != null && x != this && casItem(x, null)) {
 522  522                  LockSupport.unpark(waiter);
 523  523                  return true;
 524  524              }
 525  525              return false;
 526  526          }
 527  527  
 528  528          // Unsafe mechanics
 529  529          private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
↓ open down ↓ 32 lines elided ↑ open up ↑
 562  562      /*
 563  563       * Possible values for "how" argument in xfer method.
 564  564       */
 565  565      private static final int NOW   = 0; // for untimed poll, tryTransfer
 566  566      private static final int ASYNC = 1; // for offer, put, add
 567  567      private static final int SYNC  = 2; // for transfer, take
 568  568      private static final int TIMED = 3; // for timed poll, tryTransfer
 569  569  
 570  570      @SuppressWarnings("unchecked")
 571  571      static <E> E cast(Object item) {
 572      -        //        assert item == null || item.getClass() != Node.class;
      572 +        // assert item == null || item.getClass() != Node.class;
 573  573          return (E) item;
 574  574      }
 575  575  
 576  576      /**
 577  577       * Implements all queuing methods. See above for explanation.
 578  578       *
 579  579       * @param e the item or null for take
 580  580       * @param haveData true if this is a put, else a take
 581  581       * @param how NOW, ASYNC, SYNC, or TIMED
 582  582       * @param nanos timeout in nanoseconds, used only if how is TIMED
 583  583       * @return an item if matched, else e
 584  584       * @throws NullPointerException if haveData mode but e is null
 585  585       */
 586  586      private E xfer(E e, boolean haveData, int how, long nanos) {
 587  587          if (haveData && (e == null))
 588  588              throw new NullPointerException();
 589  589          Node s = null;                        // the node to append, if needed
 590  590  
 591      -        retry: for (;;) {                     // restart on append race
      591 +        retry:
      592 +        for (;;) {                            // restart on append race
 592  593  
 593  594              for (Node h = head, p = h; p != null;) { // find & match first node
 594  595                  boolean isData = p.isData;
 595  596                  Object item = p.item;
 596  597                  if (item != p && (item != null) == isData) { // unmatched
 597  598                      if (isData == haveData)   // can't match
 598  599                          break;
 599  600                      if (p.casItem(item, e)) { // match
 600  601                          for (Node q = p; q != h;) {
 601  602                              Node n = q.next;  // update by 2 unless singleton
 602      -                            if (head == h && casHead(h, n == null? q : n)) {
      603 +                            if (head == h && casHead(h, n == null ? q : n)) {
 603  604                                  h.forgetNext();
 604  605                                  break;
 605  606                              }                 // advance and retry
 606  607                              if ((h = head)   == null ||
 607  608                                  (q = h.next) == null || !q.isMatched())
 608  609                                  break;        // unless slack < 2
 609  610                          }
 610  611                          LockSupport.unpark(p.waiter);
 611  612                          return this.<E>cast(item);
 612  613                      }
↓ open down ↓ 64 lines elided ↑ open up ↑
 677  678       */
 678  679      private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
 679  680          long lastTime = timed ? System.nanoTime() : 0L;
 680  681          Thread w = Thread.currentThread();
 681  682          int spins = -1; // initialized after first item and cancel checks
 682  683          ThreadLocalRandom randomYields = null; // bound if needed
 683  684  
 684  685          for (;;) {
 685  686              Object item = s.item;
 686  687              if (item != e) {                  // matched
 687      -                //                assert item != s;
      688 +                // assert item != s;
 688  689                  s.forgetContents();           // avoid garbage
 689  690                  return this.<E>cast(item);
 690  691              }
 691  692              if ((w.isInterrupted() || (timed && nanos <= 0)) &&
 692  693                      s.casItem(e, s)) {        // cancel
 693  694                  unsplice(pred, s);
 694  695                  return e;
 695  696              }
 696  697  
 697  698              if (spins < 0) {                  // establish spins at/near front
↓ open down ↓ 104 lines elided ↑ open up ↑
 802  803      final class Itr implements Iterator<E> {
 803  804          private Node nextNode;   // next node to return item for
 804  805          private E nextItem;      // the corresponding item
 805  806          private Node lastRet;    // last returned node, to support remove
 806  807          private Node lastPred;   // predecessor to unlink lastRet
 807  808  
 808  809          /**
 809  810           * Moves to the next node after prev, or to the first node if prev is null.
 810  811           */
 811  812          private void advance(Node prev) {
 812      -            lastPred = lastRet;
 813      -            lastRet = prev;
 814      -            for (Node p = (prev == null) ? head : succ(prev);
 815      -                 p != null; p = succ(p)) {
 816      -                Object item = p.item;
 817      -                if (p.isData) {
 818      -                    if (item != null && item != p) {
 819      -                        nextItem = LinkedTransferQueue.this.<E>cast(item);
 820      -                        nextNode = p;
      813 +            /*
      814 +             * To track and avoid buildup of deleted nodes in the face
      815 +             * of calls to both Queue.remove and Itr.remove, we must
      816 +             * include variants of unsplice and sweep upon each
      817 +             * advance: Upon Itr.remove, we may need to catch up links
      818 +             * from lastPred, and upon other removes, we might need to
      819 +             * skip ahead from stale nodes and unsplice deleted ones
      820 +             * found while advancing.
      821 +             */
      822 +
      823 +            Node r, b; // reset lastPred upon possible deletion of lastRet
      824 +            if ((r = lastRet) != null && !r.isMatched())
      825 +                lastPred = r;    // next lastPred is old lastRet
      826 +            else if ((b = lastPred) == null || b.isMatched())
      827 +                lastPred = null; // at start of list
      828 +            else {
      829 +                Node s, n;       // help with removal of lastPred.next
      830 +                while ((s = b.next) != null &&
      831 +                       s != b && s.isMatched() &&
      832 +                       (n = s.next) != null && n != s)
      833 +                    b.casNext(s, n);
      834 +            }
      835 +
      836 +            this.lastRet = prev;
      837 +
      838 +            for (Node p = prev, s, n;;) {
      839 +                s = (p == null) ? head : p.next;
      840 +                if (s == null)
      841 +                    break;
      842 +                else if (s == p) {
      843 +                    p = null;
      844 +                    continue;
      845 +                }
      846 +                Object item = s.item;
      847 +                if (s.isData) {
      848 +                    if (item != null && item != s) {
      849 +                        nextItem = LinkedTransferQueue.<E>cast(item);
      850 +                        nextNode = s;
 821  851                          return;
 822  852                      }
 823  853                  }
 824  854                  else if (item == null)
 825  855                      break;
      856 +                // assert s.isMatched();
      857 +                if (p == null)
      858 +                    p = s;
      859 +                else if ((n = s.next) == null)
      860 +                    break;
      861 +                else if (s == n)
      862 +                    p = null;
      863 +                else
      864 +                    p.casNext(s, n);
 826  865              }
 827  866              nextNode = null;
      867 +            nextItem = null;
 828  868          }
 829  869  
 830  870          Itr() {
 831  871              advance(null);
 832  872          }
 833  873  
 834  874          public final boolean hasNext() {
 835  875              return nextNode != null;
 836  876          }
 837  877  
 838  878          public final E next() {
 839  879              Node p = nextNode;
 840  880              if (p == null) throw new NoSuchElementException();
 841  881              E e = nextItem;
 842  882              advance(p);
 843  883              return e;
 844  884          }
 845  885  
 846  886          public final void remove() {
 847      -            Node p = lastRet;
 848      -            if (p == null) throw new IllegalStateException();
 849      -            if (p.tryMatchData())
 850      -                unsplice(lastPred, p);
      887 +            final Node lastRet = this.lastRet;
      888 +            if (lastRet == null)
      889 +                throw new IllegalStateException();
      890 +            this.lastRet = null;
      891 +            if (lastRet.tryMatchData())
      892 +                unsplice(lastPred, lastRet);
 851  893          }
 852  894      }
 853  895  
 854  896      /* -------------- Removal methods -------------- */
 855  897  
 856  898      /**
 857  899       * Unsplices (now or later) the given deleted/cancelled node with
 858  900       * the given predecessor.
 859  901       *
 860  902       * @param pred a node that was at one time known to be the
↓ open down ↓ 129 lines elided ↑ open up ↑
 990 1032       */
 991 1033      public boolean offer(E e, long timeout, TimeUnit unit) {
 992 1034          xfer(e, true, ASYNC, 0);
 993 1035          return true;
 994 1036      }
 995 1037  
 996 1038      /**
 997 1039       * Inserts the specified element at the tail of this queue.
 998 1040       * As the queue is unbounded, this method will never return {@code false}.
 999 1041       *
1000      -     * @return {@code true} (as specified by
1001      -     *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
     1042 +     * @return {@code true} (as specified by {@link Queue#offer})
1002 1043       * @throws NullPointerException if the specified element is null
1003 1044       */
1004 1045      public boolean offer(E e) {
1005 1046          xfer(e, true, ASYNC, 0);
1006 1047          return true;
1007 1048      }
1008 1049  
1009 1050      /**
1010 1051       * Inserts the specified element at the tail of this queue.
1011 1052       * As the queue is unbounded, this method will never throw
↓ open down ↓ 111 lines elided ↑ open up ↑
1123 1164          int n = 0;
1124 1165          E e;
1125 1166          while (n < maxElements && (e = poll()) != null) {
1126 1167              c.add(e);
1127 1168              ++n;
1128 1169          }
1129 1170          return n;
1130 1171      }
1131 1172  
1132 1173      /**
1133      -     * Returns an iterator over the elements in this queue in proper
1134      -     * sequence, from head to tail.
     1174 +     * Returns an iterator over the elements in this queue in proper sequence.
     1175 +     * The elements will be returned in order from first (head) to last (tail).
1135 1176       *
1136 1177       * <p>The returned iterator is a "weakly consistent" iterator that
1137      -     * will never throw
1138      -     * {@link ConcurrentModificationException ConcurrentModificationException},
1139      -     * and guarantees to traverse elements as they existed upon
1140      -     * construction of the iterator, and may (but is not guaranteed
1141      -     * to) reflect any modifications subsequent to construction.
     1178 +     * will never throw {@link java.util.ConcurrentModificationException
     1179 +     * ConcurrentModificationException}, and guarantees to traverse
     1180 +     * elements as they existed upon construction of the iterator, and
     1181 +     * may (but is not guaranteed to) reflect any modifications
     1182 +     * subsequent to construction.
1142 1183       *
1143 1184       * @return an iterator over the elements in this queue in proper sequence
1144 1185       */
1145 1186      public Iterator<E> iterator() {
1146 1187          return new Itr();
1147 1188      }
1148 1189  
1149 1190      public E peek() {
1150 1191          return firstDataItem();
1151 1192      }
↓ open down ↓ 43 lines elided ↑ open up ↑
1195 1236       * Returns {@code true} if this queue contained the specified element
1196 1237       * (or equivalently, if this queue changed as a result of the call).
1197 1238       *
1198 1239       * @param o element to be removed from this queue, if present
1199 1240       * @return {@code true} if this queue changed as a result of the call
1200 1241       */
1201 1242      public boolean remove(Object o) {
1202 1243          return findAndRemove(o);
1203 1244      }
1204 1245  
     1246 +    /**
     1247 +     * Returns {@code true} if this queue contains the specified element.
     1248 +     * More formally, returns {@code true} if and only if this queue contains
     1249 +     * at least one element {@code e} such that {@code o.equals(e)}.
     1250 +     *
     1251 +     * @param o object to be checked for containment in this queue
     1252 +     * @return {@code true} if this queue contains the specified element
     1253 +     */
     1254 +    public boolean contains(Object o) {
     1255 +        if (o == null) return false;
     1256 +        for (Node p = head; p != null; p = succ(p)) {
     1257 +            Object item = p.item;
     1258 +            if (p.isData) {
     1259 +                if (item != null && item != p && o.equals(item))
     1260 +                    return true;
     1261 +            }
     1262 +            else if (item == null)
     1263 +                break;
     1264 +        }
     1265 +        return false;
     1266 +    }
     1267 +
1205 1268      /**
1206 1269       * Always returns {@code Integer.MAX_VALUE} because a
1207 1270       * {@code LinkedTransferQueue} is not capacity constrained.
1208 1271       *
1209 1272       * @return {@code Integer.MAX_VALUE} (as specified by
1210 1273       *         {@link BlockingQueue#remainingCapacity()})
1211 1274       */
1212 1275      public int remainingCapacity() {
1213 1276          return Integer.MAX_VALUE;
1214 1277      }
↓ open down ↓ 57 lines elided ↑ open up ↑
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX