src/share/classes/java/util/concurrent/LinkedTransferQueue.java

Print this page




  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/licenses/publicdomain
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.AbstractQueue;
  39 import java.util.Collection;
  40 import java.util.ConcurrentModificationException;
  41 import java.util.Iterator;
  42 import java.util.NoSuchElementException;
  43 import java.util.Queue;

  44 import java.util.concurrent.locks.LockSupport;
  45 
  46 /**
  47  * An unbounded {@link TransferQueue} based on linked nodes.
  48  * This queue orders elements FIFO (first-in-first-out) with respect
  49  * to any given producer.  The <em>head</em> of the queue is that
  50  * element that has been on the queue the longest time for some
  51  * producer.  The <em>tail</em> of the queue is that element that has
  52  * been on the queue the shortest time for some producer.
  53  *
  54  * <p>Beware that, unlike in most collections, the {@code size}
  55  * method is <em>NOT</em> a constant-time operation. Because of the
  56  * asynchronous nature of these queues, determining the current number
  57  * of elements requires a traversal of the elements.
  58  *
  59  * <p>This class and its iterator implement all of the
  60  * <em>optional</em> methods of the {@link Collection} and {@link
  61  * Iterator} interfaces.
  62  *
  63  * <p>Memory consistency effects: As with other concurrent


 571     static <E> E cast(Object item) {
 572         //        assert item == null || item.getClass() != Node.class;
 573         return (E) item;
 574     }
 575 
 576     /**
 577      * Implements all queuing methods. See above for explanation.
 578      *
 579      * @param e the item or null for take
 580      * @param haveData true if this is a put, else a take
 581      * @param how NOW, ASYNC, SYNC, or TIMED
 582      * @param nanos timeout in nanosecs, used only if mode is TIMED
 583      * @return an item if matched, else e
 584      * @throws NullPointerException if haveData mode but e is null
 585      */
 586     private E xfer(E e, boolean haveData, int how, long nanos) {
 587         if (haveData && (e == null))
 588             throw new NullPointerException();
 589         Node s = null;                        // the node to append, if needed
 590 
 591         retry: for (;;) {                     // restart on append race

 592 
 593             for (Node h = head, p = h; p != null;) { // find & match first node
 594                 boolean isData = p.isData;
 595                 Object item = p.item;
 596                 if (item != p && (item != null) == isData) { // unmatched
 597                     if (isData == haveData)   // can't match
 598                         break;
 599                     if (p.casItem(item, e)) { // match
 600                         for (Node q = p; q != h;) {
 601                             Node n = q.next;  // update by 2 unless singleton
 602                             if (head == h && casHead(h, n == null? q : n)) {
 603                                 h.forgetNext();
 604                                 break;
 605                             }                 // advance and retry
 606                             if ((h = head)   == null ||
 607                                 (q = h.next) == null || !q.isMatched())
 608                                 break;        // unless slack < 2
 609                         }
 610                         LockSupport.unpark(p.waiter);
 611                         return this.<E>cast(item);
 612                     }
 613                 }
 614                 Node n = p.next;
 615                 p = (p != n) ? n : (h = head); // Use head if p offlist
 616             }
 617 
 618             if (how != NOW) {                 // No matches available
 619                 if (s == null)
 620                     s = new Node(e, haveData);
 621                 Node pred = tryAppend(s, haveData);
 622                 if (pred == null)


 792             if (n != p)
 793                 p = n;
 794             else {
 795                 count = 0;
 796                 p = head;
 797             }
 798         }
 799         return count;
 800     }
 801 
 802     final class Itr implements Iterator<E> {
 803         private Node nextNode;   // next node to return item for
 804         private E nextItem;      // the corresponding item
 805         private Node lastRet;    // last returned node, to support remove
 806         private Node lastPred;   // predecessor to unlink lastRet
 807 
 808         /**
 809          * Moves to next node after prev, or first node if prev null.
 810          */
 811         private void advance(Node prev) {
 812             lastPred = lastRet;
 813             lastRet = prev;
 814             for (Node p = (prev == null) ? head : succ(prev);
 815                  p != null; p = succ(p)) {
 816                 Object item = p.item;
 817                 if (p.isData) {
 818                     if (item != null && item != p) {
 819                         nextItem = LinkedTransferQueue.this.<E>cast(item);
 820                         nextNode = p;





























 821                         return;
 822                     }
 823                 }
 824                 else if (item == null)
 825                     break;









 826             }
 827             nextNode = null;

 828         }
 829 
 830         Itr() {
 831             advance(null);
 832         }
 833 
 834         public final boolean hasNext() {
 835             return nextNode != null;
 836         }
 837 
 838         public final E next() {
 839             Node p = nextNode;
 840             if (p == null) throw new NoSuchElementException();
 841             E e = nextItem;
 842             advance(p);
 843             return e;
 844         }
 845 
 846         public final void remove() {
 847             Node p = lastRet;
 848             if (p == null) throw new IllegalStateException();
 849             if (p.tryMatchData())
 850                 unsplice(lastPred, p);


 851         }
 852     }
 853 
 854     /* -------------- Removal methods -------------- */
 855 
 856     /**
 857      * Unsplices (now or later) the given deleted/cancelled node with
 858      * the given predecessor.
 859      *
 860      * @param pred a node that was at one time known to be the
 861      * predecessor of s, or null or s itself if s is/was at head
 862      * @param s the node to be unspliced
 863      */
 864     final void unsplice(Node pred, Node s) {
 865         s.forgetContents(); // forget unneeded fields
 866         /*
 867          * See above for rationale. Briefly: if pred still points to
 868          * s, try to unlink s.  If s cannot be unlinked, because it is
 869          * trailing node or pred might be unlinked, and neither pred
 870          * nor s are head or offlist, add to sweepVotes, and if enough


 980     }
 981 
 982     /**
 983      * Inserts the specified element at the tail of this queue.
 984      * As the queue is unbounded, this method will never block or
 985      * return {@code false}.
 986      *
 987      * @return {@code true} (as specified by
 988      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
 989      * @throws NullPointerException if the specified element is null
 990      */
 991     public boolean offer(E e, long timeout, TimeUnit unit) {
 992         xfer(e, true, ASYNC, 0);
 993         return true;
 994     }
 995 
 996     /**
 997      * Inserts the specified element at the tail of this queue.
 998      * As the queue is unbounded, this method will never return {@code false}.
 999      *
1000      * @return {@code true} (as specified by
1001      *         {@link BlockingQueue#offer(Object) BlockingQueue.offer})
1002      * @throws NullPointerException if the specified element is null
1003      */
1004     public boolean offer(E e) {
1005         xfer(e, true, ASYNC, 0);
1006         return true;
1007     }
1008 
1009     /**
1010      * Inserts the specified element at the tail of this queue.
1011      * As the queue is unbounded, this method will never throw
1012      * {@link IllegalStateException} or return {@code false}.
1013      *
1014      * @return {@code true} (as specified by {@link Collection#add})
1015      * @throws NullPointerException if the specified element is null
1016      */
1017     public boolean add(E e) {
1018         xfer(e, true, ASYNC, 0);
1019         return true;
1020     }
1021 


1113 
1114     /**
1115      * @throws NullPointerException     {@inheritDoc}
1116      * @throws IllegalArgumentException {@inheritDoc}
1117      */
1118     public int drainTo(Collection<? super E> c, int maxElements) {
1119         if (c == null)
1120             throw new NullPointerException();
1121         if (c == this)
1122             throw new IllegalArgumentException();
1123         int n = 0;
1124         E e;
1125         while (n < maxElements && (e = poll()) != null) {
1126             c.add(e);
1127             ++n;
1128         }
1129         return n;
1130     }
1131 
1132     /**
1133      * Returns an iterator over the elements in this queue in proper
1134      * sequence, from head to tail.
1135      *
1136      * <p>The returned iterator is a "weakly consistent" iterator that
1137      * will never throw
1138      * {@link ConcurrentModificationException ConcurrentModificationException},
1139      * and guarantees to traverse elements as they existed upon
1140      * construction of the iterator, and may (but is not guaranteed
1141      * to) reflect any modifications subsequent to construction.
1142      *
1143      * @return an iterator over the elements in this queue in proper sequence
1144      */
1145     public Iterator<E> iterator() {
1146         return new Itr();
1147     }
1148 
1149     public E peek() {
1150         return firstDataItem();
1151     }
1152 
1153     /**
1154      * Returns {@code true} if this queue contains no elements.
1155      *
1156      * @return {@code true} if this queue contains no elements
1157      */
1158     public boolean isEmpty() {
1159         for (Node p = head; p != null; p = succ(p)) {
1160             if (!p.isMatched())
1161                 return !p.isData;


1185 
1186     public int getWaitingConsumerCount() {
1187         return countOfMode(false);
1188     }
1189 
1190     /**
1191      * Removes a single instance of the specified element from this queue,
1192      * if it is present.  More formally, removes an element {@code e} such
1193      * that {@code o.equals(e)}, if this queue contains one or more such
1194      * elements.
1195      * Returns {@code true} if this queue contained the specified element
1196      * (or equivalently, if this queue changed as a result of the call).
1197      *
1198      * @param o element to be removed from this queue, if present
1199      * @return {@code true} if this queue changed as a result of the call
1200      */
1201     public boolean remove(Object o) {
1202         return findAndRemove(o);
1203     }
1204 






















1205     /**
1206      * Always returns {@code Integer.MAX_VALUE} because a
1207      * {@code LinkedTransferQueue} is not capacity constrained.
1208      *
1209      * @return {@code Integer.MAX_VALUE} (as specified by
1210      *         {@link BlockingQueue#remainingCapacity()})
1211      */
1212     public int remainingCapacity() {
1213         return Integer.MAX_VALUE;
1214     }
1215 
1216     /**
1217      * Saves the state to a stream (that is, serializes it).
1218      *
1219      * @serialData All of the elements (each an {@code E}) in
1220      * the proper order, followed by a null
1221      * @param s the stream
1222      */
1223     private void writeObject(java.io.ObjectOutputStream s)
1224         throws java.io.IOException {




  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/licenses/publicdomain
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.AbstractQueue;
  39 import java.util.Collection;

  40 import java.util.Iterator;
  41 import java.util.NoSuchElementException;
  42 import java.util.Queue;
  43 import java.util.concurrent.TimeUnit;
  44 import java.util.concurrent.locks.LockSupport;
  45 
  46 /**
  47  * An unbounded {@link TransferQueue} based on linked nodes.
  48  * This queue orders elements FIFO (first-in-first-out) with respect
  49  * to any given producer.  The <em>head</em> of the queue is that
  50  * element that has been on the queue the longest time for some
  51  * producer.  The <em>tail</em> of the queue is that element that has
  52  * been on the queue the shortest time for some producer.
  53  *
  54  * <p>Beware that, unlike in most collections, the {@code size}
  55  * method is <em>NOT</em> a constant-time operation. Because of the
  56  * asynchronous nature of these queues, determining the current number
  57  * of elements requires a traversal of the elements.
  58  *
  59  * <p>This class and its iterator implement all of the
  60  * <em>optional</em> methods of the {@link Collection} and {@link
  61  * Iterator} interfaces.
  62  *
  63  * <p>Memory consistency effects: As with other concurrent


 571     static <E> E cast(Object item) {
 572         // assert item == null || item.getClass() != Node.class;
 573         return (E) item;
 574     }
 575 
 576     /**
 577      * Implements all queuing methods. See above for explanation.
 578      *
 579      * @param e the item or null for take
 580      * @param haveData true if this is a put, else a take
 581      * @param how NOW, ASYNC, SYNC, or TIMED
 582      * @param nanos timeout in nanosecs, used only if mode is TIMED
 583      * @return an item if matched, else e
 584      * @throws NullPointerException if haveData mode but e is null
 585      */
 586     private E xfer(E e, boolean haveData, int how, long nanos) {
 587         if (haveData && (e == null))
 588             throw new NullPointerException();
 589         Node s = null;                        // the node to append, if needed
 590 
 591         retry:
 592         for (;;) {                            // restart on append race
 593 
 594             for (Node h = head, p = h; p != null;) { // find & match first node
 595                 boolean isData = p.isData;
 596                 Object item = p.item;
 597                 if (item != p && (item != null) == isData) { // unmatched
 598                     if (isData == haveData)   // can't match
 599                         break;
 600                     if (p.casItem(item, e)) { // match
 601                         for (Node q = p; q != h;) {
 602                             Node n = q.next;  // update by 2 unless singleton
 603                             if (head == h && casHead(h, n == null ? q : n)) {
 604                                 h.forgetNext();
 605                                 break;
 606                             }                 // advance and retry
 607                             if ((h = head)   == null ||
 608                                 (q = h.next) == null || !q.isMatched())
 609                                 break;        // unless slack < 2
 610                         }
 611                         LockSupport.unpark(p.waiter);
 612                         return this.<E>cast(item);
 613                     }
 614                 }
 615                 Node n = p.next;
 616                 p = (p != n) ? n : (h = head); // Use head if p offlist
 617             }
 618 
 619             if (how != NOW) {                 // No matches available
 620                 if (s == null)
 621                     s = new Node(e, haveData);
 622                 Node pred = tryAppend(s, haveData);
 623                 if (pred == null)


 793             if (n != p)
 794                 p = n;
 795             else {
 796                 count = 0;
 797                 p = head;
 798             }
 799         }
 800         return count;
 801     }
 802 
 803     final class Itr implements Iterator<E> {
 804         private Node nextNode;   // next node to return item for
 805         private E nextItem;      // the corresponding item
 806         private Node lastRet;    // last returned node, to support remove
 807         private Node lastPred;   // predecessor to unlink lastRet
 808 
 809         /**
 810          * Moves to next node after prev, or first node if prev null.
 811          */
 812         private void advance(Node prev) {
 813             /*
 814              * To track and avoid buildup of deleted nodes in the face
 815              * of calls to both Queue.remove and Itr.remove, we must
 816              * include variants of unsplice and sweep upon each
 817              * advance: Upon Itr.remove, we may need to catch up links
 818              * from lastPred, and upon other removes, we might need to
 819              * skip ahead from stale nodes and unsplice deleted ones
 820              * found while advancing.
 821              */
 822 
 823             Node r, b; // reset lastPred upon possible deletion of lastRet
 824             if ((r = lastRet) != null && !r.isMatched())
 825                 lastPred = r;    // next lastPred is old lastRet
 826             else if ((b = lastPred) == null || b.isMatched())
 827                 lastPred = null; // at start of list
 828             else {
 829                 Node s, n;       // help with removal of lastPred.next
 830                 while ((s = b.next) != null &&
 831                        s != b && s.isMatched() &&
 832                        (n = s.next) != null && n != s)
 833                     b.casNext(s, n);
 834             }
 835 
 836             this.lastRet = prev;
 837 
 838             for (Node p = prev, s, n;;) {
 839                 s = (p == null) ? head : p.next;
 840                 if (s == null)
 841                     break;
 842                 else if (s == p) {
 843                     p = null;
 844                     continue;
 845                 }
 846                 Object item = s.item;
 847                 if (s.isData) {
 848                     if (item != null && item != s) {
 849                         nextItem = LinkedTransferQueue.<E>cast(item);
 850                         nextNode = s;
 851                         return;
 852                     }
 853                 }
 854                 else if (item == null)
 855                     break;
 856                 // assert s.isMatched();
 857                 if (p == null)
 858                     p = s;
 859                 else if ((n = s.next) == null)
 860                     break;
 861                 else if (s == n)
 862                     p = null;
 863                 else
 864                     p.casNext(s, n);
 865             }
 866             nextNode = null;
 867             nextItem = null;
 868         }
 869 
 870         Itr() {
 871             advance(null);
 872         }
 873 
 874         public final boolean hasNext() {
 875             return nextNode != null;
 876         }
 877 
 878         public final E next() {
 879             Node p = nextNode;
 880             if (p == null) throw new NoSuchElementException();
 881             E e = nextItem;
 882             advance(p);
 883             return e;
 884         }
 885 
 886         public final void remove() {
 887             final Node lastRet = this.lastRet;
 888             if (lastRet == null)
 889                 throw new IllegalStateException();
 890             this.lastRet = null;
 891             if (lastRet.tryMatchData())
 892                 unsplice(lastPred, lastRet);
 893         }
 894     }
 895 
 896     /* -------------- Removal methods -------------- */
 897 
 898     /**
 899      * Unsplices (now or later) the given deleted/cancelled node with
 900      * the given predecessor.
 901      *
 902      * @param pred a node that was at one time known to be the
 903      * predecessor of s, or null or s itself if s is/was at head
 904      * @param s the node to be unspliced
 905      */
 906     final void unsplice(Node pred, Node s) {
 907         s.forgetContents(); // forget unneeded fields
 908         /*
 909          * See above for rationale. Briefly: if pred still points to
 910          * s, try to unlink s.  If s cannot be unlinked, because it is
 911          * trailing node or pred might be unlinked, and neither pred
 912          * nor s are head or offlist, add to sweepVotes, and if enough


1022     }
1023 
1024     /**
1025      * Inserts the specified element at the tail of this queue.
1026      * As the queue is unbounded, this method will never block or
1027      * return {@code false}.
1028      *
1029      * @return {@code true} (as specified by
1030      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
1031      * @throws NullPointerException if the specified element is null
1032      */
1033     public boolean offer(E e, long timeout, TimeUnit unit) {
1034         xfer(e, true, ASYNC, 0);
1035         return true;
1036     }
1037 
1038     /**
1039      * Inserts the specified element at the tail of this queue.
1040      * As the queue is unbounded, this method will never return {@code false}.
1041      *
1042      * @return {@code true} (as specified by {@link Queue#offer})

1043      * @throws NullPointerException if the specified element is null
1044      */
1045     public boolean offer(E e) {
1046         xfer(e, true, ASYNC, 0);
1047         return true;
1048     }
1049 
1050     /**
1051      * Inserts the specified element at the tail of this queue.
1052      * As the queue is unbounded, this method will never throw
1053      * {@link IllegalStateException} or return {@code false}.
1054      *
1055      * @return {@code true} (as specified by {@link Collection#add})
1056      * @throws NullPointerException if the specified element is null
1057      */
1058     public boolean add(E e) {
1059         xfer(e, true, ASYNC, 0);
1060         return true;
1061     }
1062 


1154 
1155     /**
1156      * @throws NullPointerException     {@inheritDoc}
1157      * @throws IllegalArgumentException {@inheritDoc}
1158      */
1159     public int drainTo(Collection<? super E> c, int maxElements) {
1160         if (c == null)
1161             throw new NullPointerException();
1162         if (c == this)
1163             throw new IllegalArgumentException();
1164         int n = 0;
1165         E e;
1166         while (n < maxElements && (e = poll()) != null) {
1167             c.add(e);
1168             ++n;
1169         }
1170         return n;
1171     }
1172 
1173     /**
1174      * Returns an iterator over the elements in this queue in proper sequence.
1175      * The elements will be returned in order from first (head) to last (tail).
1176      *
1177      * <p>The returned iterator is a "weakly consistent" iterator that
1178      * will never throw {@link java.util.ConcurrentModificationException
1179      * ConcurrentModificationException}, and guarantees to traverse
1180      * elements as they existed upon construction of the iterator, and
1181      * may (but is not guaranteed to) reflect any modifications
1182      * subsequent to construction.
1183      *
1184      * @return an iterator over the elements in this queue in proper sequence
1185      */
1186     public Iterator<E> iterator() {
1187         return new Itr();
1188     }
1189 
1190     public E peek() {
1191         return firstDataItem();
1192     }
1193 
1194     /**
1195      * Returns {@code true} if this queue contains no elements.
1196      *
1197      * @return {@code true} if this queue contains no elements
1198      */
1199     public boolean isEmpty() {
1200         for (Node p = head; p != null; p = succ(p)) {
1201             if (!p.isMatched())
1202                 return !p.isData;


1226 
1227     public int getWaitingConsumerCount() {
1228         return countOfMode(false);
1229     }
1230 
1231     /**
1232      * Removes a single instance of the specified element from this queue,
1233      * if it is present.  More formally, removes an element {@code e} such
1234      * that {@code o.equals(e)}, if this queue contains one or more such
1235      * elements.
1236      * Returns {@code true} if this queue contained the specified element
1237      * (or equivalently, if this queue changed as a result of the call).
1238      *
1239      * @param o element to be removed from this queue, if present
1240      * @return {@code true} if this queue changed as a result of the call
1241      */
1242     public boolean remove(Object o) {
1243         return findAndRemove(o);
1244     }
1245 
1246     /**
1247      * Returns {@code true} if this queue contains the specified element.
1248      * More formally, returns {@code true} if and only if this queue contains
1249      * at least one element {@code e} such that {@code o.equals(e)}.
1250      *
1251      * @param o object to be checked for containment in this queue
1252      * @return {@code true} if this queue contains the specified element
1253      */
1254     public boolean contains(Object o) {
1255         if (o == null) return false;
1256         for (Node p = head; p != null; p = succ(p)) {
1257             Object item = p.item;
1258             if (p.isData) {
1259                 if (item != null && item != p && o.equals(item))
1260                     return true;
1261             }
1262             else if (item == null)
1263                 break;
1264         }
1265         return false;
1266     }
1267 
1268     /**
1269      * Always returns {@code Integer.MAX_VALUE} because a
1270      * {@code LinkedTransferQueue} is not capacity constrained.
1271      *
1272      * @return {@code Integer.MAX_VALUE} (as specified by
1273      *         {@link BlockingQueue#remainingCapacity()})
1274      */
1275     public int remainingCapacity() {
1276         return Integer.MAX_VALUE;
1277     }
1278 
1279     /**
1280      * Saves the state to a stream (that is, serializes it).
1281      *
1282      * @serialData All of the elements (each an {@code E}) in
1283      * the proper order, followed by a null
1284      * @param s the stream
1285      */
1286     private void writeObject(java.io.ObjectOutputStream s)
1287         throws java.io.IOException {