20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/licenses/publicdomain
34 */
35
36 package java.util.concurrent;
37
38 import java.util.AbstractQueue;
39 import java.util.Collection;
40 import java.util.ConcurrentModificationException;
41 import java.util.Iterator;
42 import java.util.NoSuchElementException;
43 import java.util.Queue;
44 import java.util.concurrent.locks.LockSupport;
45
46 /**
47 * An unbounded {@link TransferQueue} based on linked nodes.
48 * This queue orders elements FIFO (first-in-first-out) with respect
49 * to any given producer. The <em>head</em> of the queue is that
50 * element that has been on the queue the longest time for some
51 * producer. The <em>tail</em> of the queue is that element that has
52 * been on the queue the shortest time for some producer.
53 *
54 * <p>Beware that, unlike in most collections, the {@code size}
55 * method is <em>NOT</em> a constant-time operation. Because of the
56 * asynchronous nature of these queues, determining the current number
57 * of elements requires a traversal of the elements.
58 *
59 * <p>This class and its iterator implement all of the
60 * <em>optional</em> methods of the {@link Collection} and {@link
61 * Iterator} interfaces.
62 *
63 * <p>Memory consistency effects: As with other concurrent
571 static <E> E cast(Object item) {
572 // assert item == null || item.getClass() != Node.class;
573 return (E) item;
574 }
575
576 /**
577 * Implements all queuing methods. See above for explanation.
578 *
579 * @param e the item or null for take
580 * @param haveData true if this is a put, else a take
581 * @param how NOW, ASYNC, SYNC, or TIMED
582 * @param nanos timeout in nanosecs, used only if mode is TIMED
583 * @return an item if matched, else e
584 * @throws NullPointerException if haveData mode but e is null
585 */
586 private E xfer(E e, boolean haveData, int how, long nanos) {
587 if (haveData && (e == null))
588 throw new NullPointerException();
589 Node s = null; // the node to append, if needed
590
591 retry: for (;;) { // restart on append race
592
593 for (Node h = head, p = h; p != null;) { // find & match first node
594 boolean isData = p.isData;
595 Object item = p.item;
596 if (item != p && (item != null) == isData) { // unmatched
597 if (isData == haveData) // can't match
598 break;
599 if (p.casItem(item, e)) { // match
600 for (Node q = p; q != h;) {
601 Node n = q.next; // update by 2 unless singleton
602 if (head == h && casHead(h, n == null? q : n)) {
603 h.forgetNext();
604 break;
605 } // advance and retry
606 if ((h = head) == null ||
607 (q = h.next) == null || !q.isMatched())
608 break; // unless slack < 2
609 }
610 LockSupport.unpark(p.waiter);
611 return this.<E>cast(item);
612 }
613 }
614 Node n = p.next;
615 p = (p != n) ? n : (h = head); // Use head if p offlist
616 }
617
618 if (how != NOW) { // No matches available
619 if (s == null)
620 s = new Node(e, haveData);
621 Node pred = tryAppend(s, haveData);
622 if (pred == null)
792 if (n != p)
793 p = n;
794 else {
795 count = 0;
796 p = head;
797 }
798 }
799 return count;
800 }
801
802 final class Itr implements Iterator<E> {
803 private Node nextNode; // next node to return item for
804 private E nextItem; // the corresponding item
805 private Node lastRet; // last returned node, to support remove
806 private Node lastPred; // predecessor to unlink lastRet
807
808 /**
809 * Moves to next node after prev, or first node if prev null.
810 */
811 private void advance(Node prev) {
812 lastPred = lastRet;
813 lastRet = prev;
814 for (Node p = (prev == null) ? head : succ(prev);
815 p != null; p = succ(p)) {
816 Object item = p.item;
817 if (p.isData) {
818 if (item != null && item != p) {
819 nextItem = LinkedTransferQueue.this.<E>cast(item);
820 nextNode = p;
821 return;
822 }
823 }
824 else if (item == null)
825 break;
826 }
827 nextNode = null;
828 }
829
830 Itr() {
831 advance(null);
832 }
833
834 public final boolean hasNext() {
835 return nextNode != null;
836 }
837
838 public final E next() {
839 Node p = nextNode;
840 if (p == null) throw new NoSuchElementException();
841 E e = nextItem;
842 advance(p);
843 return e;
844 }
845
846 public final void remove() {
847 Node p = lastRet;
848 if (p == null) throw new IllegalStateException();
849 if (p.tryMatchData())
850 unsplice(lastPred, p);
851 }
852 }
853
854 /* -------------- Removal methods -------------- */
855
856 /**
857 * Unsplices (now or later) the given deleted/cancelled node with
858 * the given predecessor.
859 *
860 * @param pred a node that was at one time known to be the
861 * predecessor of s, or null or s itself if s is/was at head
862 * @param s the node to be unspliced
863 */
864 final void unsplice(Node pred, Node s) {
865 s.forgetContents(); // forget unneeded fields
866 /*
867 * See above for rationale. Briefly: if pred still points to
868 * s, try to unlink s. If s cannot be unlinked, because it is
869 * trailing node or pred might be unlinked, and neither pred
870 * nor s are head or offlist, add to sweepVotes, and if enough
980 }
981
982 /**
983 * Inserts the specified element at the tail of this queue.
984 * As the queue is unbounded, this method will never block or
985 * return {@code false}.
986 *
987 * @return {@code true} (as specified by
988 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
989 * @throws NullPointerException if the specified element is null
990 */
991 public boolean offer(E e, long timeout, TimeUnit unit) {
992 xfer(e, true, ASYNC, 0);
993 return true;
994 }
995
996 /**
997 * Inserts the specified element at the tail of this queue.
998 * As the queue is unbounded, this method will never return {@code false}.
999 *
1000 * @return {@code true} (as specified by
1001 * {@link BlockingQueue#offer(Object) BlockingQueue.offer})
1002 * @throws NullPointerException if the specified element is null
1003 */
1004 public boolean offer(E e) {
1005 xfer(e, true, ASYNC, 0);
1006 return true;
1007 }
1008
1009 /**
1010 * Inserts the specified element at the tail of this queue.
1011 * As the queue is unbounded, this method will never throw
1012 * {@link IllegalStateException} or return {@code false}.
1013 *
1014 * @return {@code true} (as specified by {@link Collection#add})
1015 * @throws NullPointerException if the specified element is null
1016 */
1017 public boolean add(E e) {
1018 xfer(e, true, ASYNC, 0);
1019 return true;
1020 }
1021
1113
1114 /**
1115 * @throws NullPointerException {@inheritDoc}
1116 * @throws IllegalArgumentException {@inheritDoc}
1117 */
1118 public int drainTo(Collection<? super E> c, int maxElements) {
1119 if (c == null)
1120 throw new NullPointerException();
1121 if (c == this)
1122 throw new IllegalArgumentException();
1123 int n = 0;
1124 E e;
1125 while (n < maxElements && (e = poll()) != null) {
1126 c.add(e);
1127 ++n;
1128 }
1129 return n;
1130 }
1131
1132 /**
1133 * Returns an iterator over the elements in this queue in proper
1134 * sequence, from head to tail.
1135 *
1136 * <p>The returned iterator is a "weakly consistent" iterator that
1137 * will never throw
1138 * {@link ConcurrentModificationException ConcurrentModificationException},
1139 * and guarantees to traverse elements as they existed upon
1140 * construction of the iterator, and may (but is not guaranteed
1141 * to) reflect any modifications subsequent to construction.
1142 *
1143 * @return an iterator over the elements in this queue in proper sequence
1144 */
1145 public Iterator<E> iterator() {
1146 return new Itr();
1147 }
1148
1149 public E peek() {
1150 return firstDataItem();
1151 }
1152
1153 /**
1154 * Returns {@code true} if this queue contains no elements.
1155 *
1156 * @return {@code true} if this queue contains no elements
1157 */
1158 public boolean isEmpty() {
1159 for (Node p = head; p != null; p = succ(p)) {
1160 if (!p.isMatched())
1161 return !p.isData;
1185
1186 public int getWaitingConsumerCount() {
1187 return countOfMode(false);
1188 }
1189
1190 /**
1191 * Removes a single instance of the specified element from this queue,
1192 * if it is present. More formally, removes an element {@code e} such
1193 * that {@code o.equals(e)}, if this queue contains one or more such
1194 * elements.
1195 * Returns {@code true} if this queue contained the specified element
1196 * (or equivalently, if this queue changed as a result of the call).
1197 *
1198 * @param o element to be removed from this queue, if present
1199 * @return {@code true} if this queue changed as a result of the call
1200 */
1201 public boolean remove(Object o) {
1202 return findAndRemove(o);
1203 }
1204
1205 /**
1206 * Always returns {@code Integer.MAX_VALUE} because a
1207 * {@code LinkedTransferQueue} is not capacity constrained.
1208 *
1209 * @return {@code Integer.MAX_VALUE} (as specified by
1210 * {@link BlockingQueue#remainingCapacity()})
1211 */
1212 public int remainingCapacity() {
1213 return Integer.MAX_VALUE;
1214 }
1215
1216 /**
1217 * Saves the state to a stream (that is, serializes it).
1218 *
1219 * @serialData All of the elements (each an {@code E}) in
1220 * the proper order, followed by a null
1221 * @param s the stream
1222 */
1223 private void writeObject(java.io.ObjectOutputStream s)
1224 throws java.io.IOException {
|
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/licenses/publicdomain
34 */
35
36 package java.util.concurrent;
37
38 import java.util.AbstractQueue;
39 import java.util.Collection;
40 import java.util.Iterator;
41 import java.util.NoSuchElementException;
42 import java.util.Queue;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.locks.LockSupport;
45
46 /**
47 * An unbounded {@link TransferQueue} based on linked nodes.
48 * This queue orders elements FIFO (first-in-first-out) with respect
49 * to any given producer. The <em>head</em> of the queue is that
50 * element that has been on the queue the longest time for some
51 * producer. The <em>tail</em> of the queue is that element that has
52 * been on the queue the shortest time for some producer.
53 *
54 * <p>Beware that, unlike in most collections, the {@code size}
55 * method is <em>NOT</em> a constant-time operation. Because of the
56 * asynchronous nature of these queues, determining the current number
57 * of elements requires a traversal of the elements.
58 *
59 * <p>This class and its iterator implement all of the
60 * <em>optional</em> methods of the {@link Collection} and {@link
61 * Iterator} interfaces.
62 *
63 * <p>Memory consistency effects: As with other concurrent
571 static <E> E cast(Object item) {
572 // assert item == null || item.getClass() != Node.class;
573 return (E) item;
574 }
575
576 /**
577 * Implements all queuing methods. See above for explanation.
578 *
579 * @param e the item or null for take
580 * @param haveData true if this is a put, else a take
581 * @param how NOW, ASYNC, SYNC, or TIMED
582 * @param nanos timeout in nanosecs, used only if mode is TIMED
583 * @return an item if matched, else e
584 * @throws NullPointerException if haveData mode but e is null
585 */
586 private E xfer(E e, boolean haveData, int how, long nanos) {
587 if (haveData && (e == null))
588 throw new NullPointerException();
589 Node s = null; // the node to append, if needed
590
591 retry:
592 for (;;) { // restart on append race
593
594 for (Node h = head, p = h; p != null;) { // find & match first node
595 boolean isData = p.isData;
596 Object item = p.item;
597 if (item != p && (item != null) == isData) { // unmatched
598 if (isData == haveData) // can't match
599 break;
600 if (p.casItem(item, e)) { // match
601 for (Node q = p; q != h;) {
602 Node n = q.next; // update by 2 unless singleton
603 if (head == h && casHead(h, n == null ? q : n)) {
604 h.forgetNext();
605 break;
606 } // advance and retry
607 if ((h = head) == null ||
608 (q = h.next) == null || !q.isMatched())
609 break; // unless slack < 2
610 }
611 LockSupport.unpark(p.waiter);
612 return this.<E>cast(item);
613 }
614 }
615 Node n = p.next;
616 p = (p != n) ? n : (h = head); // Use head if p offlist
617 }
618
619 if (how != NOW) { // No matches available
620 if (s == null)
621 s = new Node(e, haveData);
622 Node pred = tryAppend(s, haveData);
623 if (pred == null)
793 if (n != p)
794 p = n;
795 else {
796 count = 0;
797 p = head;
798 }
799 }
800 return count;
801 }
802
803 final class Itr implements Iterator<E> {
804 private Node nextNode; // next node to return item for
805 private E nextItem; // the corresponding item
806 private Node lastRet; // last returned node, to support remove
807 private Node lastPred; // predecessor to unlink lastRet
808
809 /**
810 * Moves to next node after prev, or first node if prev null.
811 */
812 private void advance(Node prev) {
813 /*
814 * To track and avoid buildup of deleted nodes in the face
815 * of calls to both Queue.remove and Itr.remove, we must
816 * include variants of unsplice and sweep upon each
817 * advance: Upon Itr.remove, we may need to catch up links
818 * from lastPred, and upon other removes, we might need to
819 * skip ahead from stale nodes and unsplice deleted ones
820 * found while advancing.
821 */
822
823 Node r, b; // reset lastPred upon possible deletion of lastRet
824 if ((r = lastRet) != null && !r.isMatched())
825 lastPred = r; // next lastPred is old lastRet
826 else if ((b = lastPred) == null || b.isMatched())
827 lastPred = null; // at start of list
828 else {
829 Node s, n; // help with removal of lastPred.next
830 while ((s = b.next) != null &&
831 s != b && s.isMatched() &&
832 (n = s.next) != null && n != s)
833 b.casNext(s, n);
834 }
835
836 this.lastRet = prev;
837
838 for (Node p = prev, s, n;;) {
839 s = (p == null) ? head : p.next;
840 if (s == null)
841 break;
842 else if (s == p) {
843 p = null;
844 continue;
845 }
846 Object item = s.item;
847 if (s.isData) {
848 if (item != null && item != s) {
849 nextItem = LinkedTransferQueue.<E>cast(item);
850 nextNode = s;
851 return;
852 }
853 }
854 else if (item == null)
855 break;
856 // assert s.isMatched();
857 if (p == null)
858 p = s;
859 else if ((n = s.next) == null)
860 break;
861 else if (s == n)
862 p = null;
863 else
864 p.casNext(s, n);
865 }
866 nextNode = null;
867 nextItem = null;
868 }
869
870 Itr() {
871 advance(null);
872 }
873
874 public final boolean hasNext() {
875 return nextNode != null;
876 }
877
878 public final E next() {
879 Node p = nextNode;
880 if (p == null) throw new NoSuchElementException();
881 E e = nextItem;
882 advance(p);
883 return e;
884 }
885
886 public final void remove() {
887 final Node lastRet = this.lastRet;
888 if (lastRet == null)
889 throw new IllegalStateException();
890 this.lastRet = null;
891 if (lastRet.tryMatchData())
892 unsplice(lastPred, lastRet);
893 }
894 }
895
896 /* -------------- Removal methods -------------- */
897
898 /**
899 * Unsplices (now or later) the given deleted/cancelled node with
900 * the given predecessor.
901 *
902 * @param pred a node that was at one time known to be the
903 * predecessor of s, or null or s itself if s is/was at head
904 * @param s the node to be unspliced
905 */
906 final void unsplice(Node pred, Node s) {
907 s.forgetContents(); // forget unneeded fields
908 /*
909 * See above for rationale. Briefly: if pred still points to
910 * s, try to unlink s. If s cannot be unlinked, because it is
911 * trailing node or pred might be unlinked, and neither pred
912 * nor s are head or offlist, add to sweepVotes, and if enough
1022 }
1023
1024 /**
1025 * Inserts the specified element at the tail of this queue.
1026 * As the queue is unbounded, this method will never block or
1027 * return {@code false}.
1028 *
1029 * @return {@code true} (as specified by
1030 * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
1031 * @throws NullPointerException if the specified element is null
1032 */
1033 public boolean offer(E e, long timeout, TimeUnit unit) {
1034 xfer(e, true, ASYNC, 0);
1035 return true;
1036 }
1037
1038 /**
1039 * Inserts the specified element at the tail of this queue.
1040 * As the queue is unbounded, this method will never return {@code false}.
1041 *
1042 * @return {@code true} (as specified by {@link Queue#offer})
1043 * @throws NullPointerException if the specified element is null
1044 */
1045 public boolean offer(E e) {
1046 xfer(e, true, ASYNC, 0);
1047 return true;
1048 }
1049
1050 /**
1051 * Inserts the specified element at the tail of this queue.
1052 * As the queue is unbounded, this method will never throw
1053 * {@link IllegalStateException} or return {@code false}.
1054 *
1055 * @return {@code true} (as specified by {@link Collection#add})
1056 * @throws NullPointerException if the specified element is null
1057 */
1058 public boolean add(E e) {
1059 xfer(e, true, ASYNC, 0);
1060 return true;
1061 }
1062
1154
1155 /**
1156 * @throws NullPointerException {@inheritDoc}
1157 * @throws IllegalArgumentException {@inheritDoc}
1158 */
1159 public int drainTo(Collection<? super E> c, int maxElements) {
1160 if (c == null)
1161 throw new NullPointerException();
1162 if (c == this)
1163 throw new IllegalArgumentException();
1164 int n = 0;
1165 E e;
1166 while (n < maxElements && (e = poll()) != null) {
1167 c.add(e);
1168 ++n;
1169 }
1170 return n;
1171 }
1172
1173 /**
1174 * Returns an iterator over the elements in this queue in proper sequence.
1175 * The elements will be returned in order from first (head) to last (tail).
1176 *
1177 * <p>The returned iterator is a "weakly consistent" iterator that
1178 * will never throw {@link java.util.ConcurrentModificationException
1179 * ConcurrentModificationException}, and guarantees to traverse
1180 * elements as they existed upon construction of the iterator, and
1181 * may (but is not guaranteed to) reflect any modifications
1182 * subsequent to construction.
1183 *
1184 * @return an iterator over the elements in this queue in proper sequence
1185 */
1186 public Iterator<E> iterator() {
1187 return new Itr();
1188 }
1189
1190 public E peek() {
1191 return firstDataItem();
1192 }
1193
1194 /**
1195 * Returns {@code true} if this queue contains no elements.
1196 *
1197 * @return {@code true} if this queue contains no elements
1198 */
1199 public boolean isEmpty() {
1200 for (Node p = head; p != null; p = succ(p)) {
1201 if (!p.isMatched())
1202 return !p.isData;
1226
1227 public int getWaitingConsumerCount() {
1228 return countOfMode(false);
1229 }
1230
1231 /**
1232 * Removes a single instance of the specified element from this queue,
1233 * if it is present. More formally, removes an element {@code e} such
1234 * that {@code o.equals(e)}, if this queue contains one or more such
1235 * elements.
1236 * Returns {@code true} if this queue contained the specified element
1237 * (or equivalently, if this queue changed as a result of the call).
1238 *
1239 * @param o element to be removed from this queue, if present
1240 * @return {@code true} if this queue changed as a result of the call
1241 */
1242 public boolean remove(Object o) {
1243 return findAndRemove(o);
1244 }
1245
1246 /**
1247 * Returns {@code true} if this queue contains the specified element.
1248 * More formally, returns {@code true} if and only if this queue contains
1249 * at least one element {@code e} such that {@code o.equals(e)}.
1250 *
1251 * @param o object to be checked for containment in this queue
1252 * @return {@code true} if this queue contains the specified element
1253 */
1254 public boolean contains(Object o) {
1255 if (o == null) return false;
1256 for (Node p = head; p != null; p = succ(p)) {
1257 Object item = p.item;
1258 if (p.isData) {
1259 if (item != null && item != p && o.equals(item))
1260 return true;
1261 }
1262 else if (item == null)
1263 break;
1264 }
1265 return false;
1266 }
1267
1268 /**
1269 * Always returns {@code Integer.MAX_VALUE} because a
1270 * {@code LinkedTransferQueue} is not capacity constrained.
1271 *
1272 * @return {@code Integer.MAX_VALUE} (as specified by
1273 * {@link BlockingQueue#remainingCapacity()})
1274 */
1275 public int remainingCapacity() {
1276 return Integer.MAX_VALUE;
1277 }
1278
1279 /**
1280 * Saves the state to a stream (that is, serializes it).
1281 *
1282 * @serialData All of the elements (each an {@code E}) in
1283 * the proper order, followed by a null
1284 * @param s the stream
1285 */
1286 private void writeObject(java.io.ObjectOutputStream s)
1287 throws java.io.IOException {
|