19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea, Bill Scherer, and Michael Scott with
32 * assistance from members of JCP JSR-166 Expert Group and released to
33 * the public domain, as explained at
34 * http://creativecommons.org/publicdomain/zero/1.0/
35 */
36
37 package java.util.concurrent;
38 import java.util.concurrent.locks.*;
39 import java.util.concurrent.atomic.*;
40 import java.util.*;
41
42 /**
43 * A {@linkplain BlockingQueue blocking queue} in which each insert
44 * operation must wait for a corresponding remove operation by another
45 * thread, and vice versa. A synchronous queue does not have any
46 * internal capacity, not even a capacity of one. You cannot
47 * <tt>peek</tt> at a synchronous queue because an element is only
48 * present when you try to remove it; you cannot insert an element
49 * (using any method) unless another thread is trying to remove it;
50 * you cannot iterate as there is nothing to iterate. The
51 * <em>head</em> of the queue is the element that the first queued
52 * inserting thread is trying to add to the queue; if there is no such
53 * queued thread then no element is available for removal and
54 * <tt>poll()</tt> will return <tt>null</tt>. For purposes of other
55 * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
56 * <tt>SynchronousQueue</tt> acts as an empty collection. This queue
57 * does not permit <tt>null</tt> elements.
58 *
59 * <p>Synchronous queues are similar to rendezvous channels used in
146 * potentially O(n) traversal to be sure that we can remove the
147 * node, but this can run concurrently with other threads
148 * accessing the stack.
149 *
150 * While garbage collection takes care of most node reclamation
151 * issues that otherwise complicate nonblocking algorithms, care
152 * is taken to "forget" references to data, other nodes, and
153 * threads that might be held on to long-term by blocked
154 * threads. In cases where setting to null would otherwise
155 * conflict with main algorithms, this is done by changing a
156 * node's link to now point to the node itself. This doesn't arise
157 * much for Stack nodes (because blocked threads do not hang on to
158 * old head pointers), but references in Queue nodes must be
159 * aggressively forgotten to avoid reachability of everything any
160 * node has ever referred to since arrival.
161 */
162
163 /**
164 * Shared internal API for dual stacks and queues.
165 */
    abstract static class Transferer {
        /**
         * Performs a put or take.
         *
         * <p>A single method handles both operations because in a dual
         * data structure puts and takes are symmetric: a null argument
         * means "take", a non-null argument means "put".
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract Object transfer(Object e, boolean timed, long nanos);
    }
182
183 /** The number of CPUs, for spin control */
184 static final int NCPUS = Runtime.getRuntime().availableProcessors();
185
186 /**
187 * The number of times to spin before blocking in timed waits.
188 * The value is empirically derived -- it works well across a
189 * variety of processors and OSes. Empirically, the best value
190 * seems not to vary with number of CPUs (beyond 2) so is just
191 * a constant.
192 */
193 static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
194
195 /**
196 * The number of times to spin before blocking in untimed waits.
197 * This is greater than timed value because untimed waits spin
198 * faster since they don't need to check times on each spin.
199 */
200 static final int maxUntimedSpins = maxTimedSpins * 16;
201
202 /**
203 * The number of nanoseconds for which it is faster to spin
204 * rather than to use timed park. A rough estimate suffices.
205 */
206 static final long spinForTimeoutThreshold = 1000L;
207
208 /** Dual stack */
209 static final class TransferStack extends Transferer {
210 /*
211 * This extends Scherer-Scott dual stack algorithm, differing,
212 * among other ways, by using "covering" nodes rather than
213 * bit-marked pointers: Fulfilling operations push on marker
214 * nodes (with FULFILLING bit set in mode) to reserve a spot
215 * to match a waiting node.
216 */
217
218 /* Modes for SNodes, ORed together in node fields */
219 /** Node represents an unfulfilled consumer */
220 static final int REQUEST = 0;
221 /** Node represents an unfulfilled producer */
222 static final int DATA = 1;
223 /** Node is fulfilling another unfulfilled DATA or REQUEST */
224 static final int FULFILLING = 2;
225
226 /** Return true if m has fulfilling bit set */
227 static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
228
229 /** Node class for TransferStacks. */
269
            /**
             * Tries to cancel a wait by matching node to itself.
             * CASes the match field from null to this; a node whose
             * match points to itself is treated as cancelled
             * (see isCancelled). Losing the CAS means the node was
             * fulfilled first, which takes precedence.
             */
            void tryCancel() {
                UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
            }
276
            /** Returns true if this node's wait was cancelled (self-matched). */
            boolean isCancelled() {
                return match == this;
            }
280
281 // Unsafe mechanics
282 private static final sun.misc.Unsafe UNSAFE;
283 private static final long matchOffset;
284 private static final long nextOffset;
285
286 static {
287 try {
288 UNSAFE = sun.misc.Unsafe.getUnsafe();
289 Class k = SNode.class;
290 matchOffset = UNSAFE.objectFieldOffset
291 (k.getDeclaredField("match"));
292 nextOffset = UNSAFE.objectFieldOffset
293 (k.getDeclaredField("next"));
294 } catch (Exception e) {
295 throw new Error(e);
296 }
297 }
298 }
299
300 /** The head (top) of the stack */
301 volatile SNode head;
302
        /**
         * CASes the stack head from h to nh. The plain volatile read of
         * head first skips a useless (and more expensive) CAS when the
         * head has already been changed by another thread.
         */
        boolean casHead(SNode h, SNode nh) {
            return h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
        }
307
308 /**
309 * Creates or resets fields of a node. Called only from transfer
310 * where the node to push on stack is lazily created and
311 * reused when possible to help reduce intervals between reads
312 * and CASes of head and to avoid surges of garbage when CASes
313 * to push nodes fail due to contention.
314 */
315 static SNode snode(SNode s, Object e, SNode next, int mode) {
316 if (s == null) s = new SNode(e);
317 s.mode = mode;
318 s.next = next;
319 return s;
320 }
321
        /**
         * Puts or takes an item.
         *
         * @param e if non-null, the item to hand to a consumer (put);
         *          if null, requests an item from a producer (take)
         * @param timed true if this operation may time out
         * @param nanos the timeout in nanoseconds (used only if timed)
         * @return the item provided or received, or null on timeout or
         *         interrupt (caller distinguishes via Thread.interrupted)
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /*
             * Basic algorithm is to loop trying one of three actions:
             *
             * 1. If apparently empty or already containing nodes of same
             *    mode, try to push node on stack and wait for a match,
             *    returning it, or null if cancelled.
             *
             * 2. If apparently containing node of complementary mode,
             *    try to push a fulfilling node on to stack, match
             *    with corresponding waiting node, pop both from
             *    stack, and return matched item. The matching or
             *    unlinking might not actually be necessary because of
             *    other threads performing action 3:
             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue. The code for helping
             *    is essentially the same as for fulfilling, except
             *    that it doesn't return the item.
             */

            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        // Our fulfiller may not have popped us yet; help it.
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        // REQUEST returns the producer's item; DATA returns
                        // our own item as the non-null success marker.
                        return (mode == REQUEST) ? m.item : s.item;
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (mode == REQUEST) ? m.item : s.item;
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }
401
402 /**
496 SNode p;
497 while ((p = head) != null && p != past && p.isCancelled())
498 casHead(p, p.next);
499
500 // Unsplice embedded nodes
501 while (p != null && p != past) {
502 SNode n = p.next;
503 if (n != null && n.isCancelled())
504 p.casNext(n, n.next);
505 else
506 p = n;
507 }
508 }
509
510 // Unsafe mechanics
511 private static final sun.misc.Unsafe UNSAFE;
512 private static final long headOffset;
513 static {
514 try {
515 UNSAFE = sun.misc.Unsafe.getUnsafe();
516 Class k = TransferStack.class;
517 headOffset = UNSAFE.objectFieldOffset
518 (k.getDeclaredField("head"));
519 } catch (Exception e) {
520 throw new Error(e);
521 }
522 }
523 }
524
525 /** Dual Queue */
526 static final class TransferQueue extends Transferer {
527 /*
528 * This extends Scherer-Scott dual queue algorithm, differing,
529 * among other ways, by using modes within nodes rather than
530 * marked pointers. The algorithm is a little simpler than
531 * that for stacks because fulfillers do not need explicit
532 * nodes, and matching is done by CAS'ing QNode.item field
533 * from non-null to null (for put) or vice versa (for take).
534 */
535
536 /** Node class for TransferQueue. */
537 static final class QNode {
538 volatile QNode next; // next node in queue
539 volatile Object item; // CAS'ed to or from null
540 volatile Thread waiter; // to control park/unpark
541 final boolean isData;
542
            QNode(Object item, boolean isData) {
                this.item = item;       // null when this node is a request (take)
                this.isData = isData;   // fixed for the node's lifetime
            }
566 return item == this;
567 }
568
            /**
             * Returns true if this node is known to be off the queue
             * because its next pointer has been forgotten due to
             * an advanceHead operation.
             */
            boolean isOffList() {
                return next == this;    // self-link marks an unlinked node
            }
577
578 // Unsafe mechanics
579 private static final sun.misc.Unsafe UNSAFE;
580 private static final long itemOffset;
581 private static final long nextOffset;
582
583 static {
584 try {
585 UNSAFE = sun.misc.Unsafe.getUnsafe();
586 Class k = QNode.class;
587 itemOffset = UNSAFE.objectFieldOffset
588 (k.getDeclaredField("item"));
589 nextOffset = UNSAFE.objectFieldOffset
590 (k.getDeclaredField("next"));
591 } catch (Exception e) {
592 throw new Error(e);
593 }
594 }
595 }
596
597 /** Head of queue */
598 transient volatile QNode head;
599 /** Tail of queue */
600 transient volatile QNode tail;
601 /**
602 * Reference to a cancelled node that might not yet have been
603 * unlinked from queue because it was the last inserted node
604 * when it cancelled.
605 */
606 transient volatile QNode cleanMe;
623
624 /**
625 * Tries to cas nt as new tail.
626 */
627 void advanceTail(QNode t, QNode nt) {
628 if (tail == t)
629 UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
630 }
631
        /**
         * Tries to CAS cleanMe slot.
         * The plain volatile pre-read of cleanMe skips a useless CAS
         * when the slot has already been changed by another thread.
         */
        boolean casCleanMe(QNode cmp, QNode val) {
            return cleanMe == cmp &&
                UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
        }
639
640 /**
641 * Puts or takes an item.
642 */
643 Object transfer(Object e, boolean timed, long nanos) {
644 /* Basic algorithm is to loop trying to take either of
645 * two actions:
646 *
647 * 1. If queue apparently empty or holding same-mode nodes,
648 * try to add node to queue of waiters, wait to be
649 * fulfilled (or cancelled) and return matching item.
650 *
651 * 2. If queue apparently contains waiting items, and this
652 * call is of complementary mode, try to fulfill by CAS'ing
653 * item field of waiting node and dequeuing it, and then
654 * returning matching item.
655 *
656 * In each case, along the way, check for and try to help
657 * advance head and tail on behalf of other stalled/slow
658 * threads.
659 *
660 * The loop starts off with a null check guarding against
661 * seeing uninitialized head or tail values. This never
662 * happens in current SynchronousQueue, but could if
663 * callers held non-volatile/final ref to the
686 if (timed && nanos <= 0) // can't wait
687 return null;
688 if (s == null)
689 s = new QNode(e, isData);
690 if (!t.casNext(null, s)) // failed to link in
691 continue;
692
693 advanceTail(t, s); // swing tail and wait
694 Object x = awaitFulfill(s, e, timed, nanos);
695 if (x == s) { // wait was cancelled
696 clean(t, s);
697 return null;
698 }
699
700 if (!s.isOffList()) { // not already unlinked
701 advanceHead(t, s); // unlink if head
702 if (x != null) // and forget fields
703 s.item = s;
704 s.waiter = null;
705 }
706 return (x != null) ? x : e;
707
708 } else { // complementary-mode
709 QNode m = h.next; // node to fulfill
710 if (t != tail || m == null || h != head)
711 continue; // inconsistent read
712
713 Object x = m.item;
714 if (isData == (x != null) || // m already fulfilled
715 x == m || // m cancelled
716 !m.casItem(x, e)) { // lost CAS
717 advanceHead(h, m); // dequeue and retry
718 continue;
719 }
720
721 advanceHead(h, m); // successfully fulfilled
722 LockSupport.unpark(m.waiter);
723 return (x != null) ? x : e;
724 }
725 }
726 }
727
728 /**
729 * Spins/blocks until node s is fulfilled.
730 *
731 * @param s the waiting node
732 * @param e the comparison value for checking match
733 * @param timed true if timed wait
734 * @param nanos timeout value
735 * @return matched item, or s if cancelled
736 */
737 Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
738 /* Same idea as TransferStack.awaitFulfill */
739 long lastTime = timed ? System.nanoTime() : 0;
740 Thread w = Thread.currentThread();
741 int spins = ((head.next == s) ?
742 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
743 for (;;) {
744 if (w.isInterrupted())
745 s.tryCancel(e);
746 Object x = s.item;
747 if (x != e)
748 return x;
749 if (timed) {
750 long now = System.nanoTime();
751 nanos -= now - lastTime;
752 lastTime = now;
753 if (nanos <= 0) {
754 s.tryCancel(e);
755 continue;
756 }
757 }
810 !d.isCancelled() || // d not cancelled or
811 (d != t && // d not tail and
812 (dn = d.next) != null && // has successor
813 dn != d && // that is on list
814 dp.casNext(d, dn))) // d unspliced
815 casCleanMe(dp, null);
816 if (dp == pred)
817 return; // s is already saved node
818 } else if (casCleanMe(null, pred))
819 return; // Postpone cleaning s
820 }
821 }
822
823 private static final sun.misc.Unsafe UNSAFE;
824 private static final long headOffset;
825 private static final long tailOffset;
826 private static final long cleanMeOffset;
827 static {
828 try {
829 UNSAFE = sun.misc.Unsafe.getUnsafe();
830 Class k = TransferQueue.class;
831 headOffset = UNSAFE.objectFieldOffset
832 (k.getDeclaredField("head"));
833 tailOffset = UNSAFE.objectFieldOffset
834 (k.getDeclaredField("tail"));
835 cleanMeOffset = UNSAFE.objectFieldOffset
836 (k.getDeclaredField("cleanMe"));
837 } catch (Exception e) {
838 throw new Error(e);
839 }
840 }
841 }
842
843 /**
844 * The transferer. Set only in constructor, but cannot be declared
845 * as final without further complicating serialization. Since
846 * this is accessed only at most once per public method, there
847 * isn't a noticeable performance penalty for using volatile
848 * instead of final here.
849 */
850 private transient volatile Transferer transferer;
851
    /**
     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
     */
    public SynchronousQueue() {
        this(false);    // nonfair: backed by the dual stack (LIFO)
    }
858
859 /**
860 * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
861 *
862 * @param fair if true, waiting threads contend in FIFO order for
863 * access; otherwise the order is unspecified.
864 */
865 public SynchronousQueue(boolean fair) {
866 transferer = fair ? new TransferQueue() : new TransferStack();
867 }
868
869 /**
870 * Adds the specified element to this queue, waiting if necessary for
871 * another thread to receive it.
872 *
873 * @throws InterruptedException {@inheritDoc}
874 * @throws NullPointerException {@inheritDoc}
875 */
876 public void put(E o) throws InterruptedException {
877 if (o == null) throw new NullPointerException();
878 if (transferer.transfer(o, false, 0) == null) {
879 Thread.interrupted();
880 throw new InterruptedException();
881 }
882 }
883
884 /**
885 * Inserts the specified element into this queue, waiting if necessary
886 * up to the specified wait time for another thread to receive it.
905 * waiting to receive it.
906 *
907 * @param e the element to add
908 * @return <tt>true</tt> if the element was added to this queue, else
909 * <tt>false</tt>
910 * @throws NullPointerException if the specified element is null
911 */
912 public boolean offer(E e) {
913 if (e == null) throw new NullPointerException();
914 return transferer.transfer(e, true, 0) != null;
915 }
916
917 /**
918 * Retrieves and removes the head of this queue, waiting if necessary
919 * for another thread to insert it.
920 *
921 * @return the head of this queue
922 * @throws InterruptedException {@inheritDoc}
923 */
924 public E take() throws InterruptedException {
925 Object e = transferer.transfer(null, false, 0);
926 if (e != null)
927 return (E)e;
928 Thread.interrupted();
929 throw new InterruptedException();
930 }
931
932 /**
933 * Retrieves and removes the head of this queue, waiting
934 * if necessary up to the specified wait time, for another thread
935 * to insert it.
936 *
937 * @return the head of this queue, or <tt>null</tt> if the
938 * specified waiting time elapses before an element is present.
939 * @throws InterruptedException {@inheritDoc}
940 */
941 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
942 Object e = transferer.transfer(null, true, unit.toNanos(timeout));
943 if (e != null || !Thread.interrupted())
944 return (E)e;
945 throw new InterruptedException();
946 }
947
948 /**
949 * Retrieves and removes the head of this queue, if another thread
950 * is currently making an element available.
951 *
952 * @return the head of this queue, or <tt>null</tt> if no
953 * element is available.
954 */
955 public E poll() {
956 return (E)transferer.transfer(null, true, 0);
957 }
958
    /**
     * Always returns <tt>true</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return <tt>true</tt>
     */
    public boolean isEmpty() {
        return true;    // no internal capacity, so always "empty"
    }
968
969 /**
970 * Always returns zero.
971 * A <tt>SynchronousQueue</tt> has no internal capacity.
972 *
973 * @return zero.
974 */
975 public int size() {
976 return 0;
1048 return false;
1049 }
1050
    /**
     * Always returns <tt>null</tt>.
     * A <tt>SynchronousQueue</tt> does not return elements
     * unless actively waited on.
     *
     * @return <tt>null</tt>
     */
    public E peek() {
        return null;    // elements exist only during an active transfer
    }
1061
    /**
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
     * <tt>false</tt>.
     *
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        return Collections.emptyIterator();     // queue never holds elements
    }
1071
    /**
     * Returns a zero-length array.
     * @return a zero-length array
     */
    public Object[] toArray() {
        return new Object[0];   // acts as an empty collection
    }
1079
1080 /**
1081 * Sets the zeroeth element of the specified array to <tt>null</tt>
1082 * (if the array has non-zero length) and returns it.
1083 *
1084 * @param a the array
1085 * @return the specified array
1086 * @throws NullPointerException if the specified array is null
1087 */
1088 public <T> T[] toArray(T[] a) {
1089 if (a.length > 0)
1090 a[0] = null;
1091 return a;
1092 }
1093
1094 /**
1095 * @throws UnsupportedOperationException {@inheritDoc}
1096 * @throws ClassCastException {@inheritDoc}
1097 * @throws NullPointerException {@inheritDoc}
1098 * @throws IllegalArgumentException {@inheritDoc}
1099 */
1100 public int drainTo(Collection<? super E> c) {
1101 if (c == null)
1102 throw new NullPointerException();
1103 if (c == this)
1104 throw new IllegalArgumentException();
1105 int n = 0;
1106 E e;
1107 while ( (e = poll()) != null) {
1108 c.add(e);
1109 ++n;
1110 }
1111 return n;
1112 }
1113
1114 /**
1115 * @throws UnsupportedOperationException {@inheritDoc}
1116 * @throws ClassCastException {@inheritDoc}
1117 * @throws NullPointerException {@inheritDoc}
1118 * @throws IllegalArgumentException {@inheritDoc}
1119 */
1120 public int drainTo(Collection<? super E> c, int maxElements) {
1121 if (c == null)
1122 throw new NullPointerException();
1123 if (c == this)
1124 throw new IllegalArgumentException();
1125 int n = 0;
1126 E e;
1127 while (n < maxElements && (e = poll()) != null) {
1128 c.add(e);
1129 ++n;
1130 }
1131 return n;
1132 }
1133
1134 /*
1135 * To cope with serialization strategy in the 1.5 version of
1136 * SynchronousQueue, we declare some unused classes and fields
1137 * that exist solely to enable serializability across versions.
1138 * These fields are never used, so are initialized only if this
1139 * object is ever serialized or deserialized.
1140 */
1141
    /** Superclass of the serialization placeholder queues below. */
    static class WaitQueue implements java.io.Serializable { }
    /** Placeholder for the 1.5 nonfair (LIFO) wait queue; never used at runtime. */
    static class LifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3633113410248163686L;
    }
    /** Placeholder for the 1.5 fair (FIFO) wait queue; never used at runtime. */
    static class FifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3623113410248163686L;
    }
1149 private ReentrantLock qlock;
1150 private WaitQueue waitingProducers;
1151 private WaitQueue waitingConsumers;
1152
1153 /**
1154 * Save the state to a stream (that is, serialize it).
1155 *
1156 * @param s the stream
1157 */
1158 private void writeObject(java.io.ObjectOutputStream s)
1159 throws java.io.IOException {
1160 boolean fair = transferer instanceof TransferQueue;
1161 if (fair) {
1162 qlock = new ReentrantLock(true);
1163 waitingProducers = new FifoWaitQueue();
1164 waitingConsumers = new FifoWaitQueue();
1165 }
1166 else {
1167 qlock = new ReentrantLock();
1168 waitingProducers = new LifoWaitQueue();
1169 waitingConsumers = new LifoWaitQueue();
1170 }
1171 s.defaultWriteObject();
1172 }
1173
1174 private void readObject(final java.io.ObjectInputStream s)
1175 throws java.io.IOException, ClassNotFoundException {
1176 s.defaultReadObject();
1177 if (waitingProducers instanceof FifoWaitQueue)
1178 transferer = new TransferQueue();
1179 else
1180 transferer = new TransferStack();
1181 }
1182
    // Unsafe mechanics

    /**
     * Returns the Unsafe field offset of the named field of klazz,
     * converting a NoSuchFieldException into the corresponding
     * NoSuchFieldError (preserving the cause) so that callers in
     * static initializers fail with an Error rather than a checked
     * exception.
     */
    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
                                  String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
1195
1196 }
|
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea, Bill Scherer, and Michael Scott with
32 * assistance from members of JCP JSR-166 Expert Group and released to
33 * the public domain, as explained at
34 * http://creativecommons.org/publicdomain/zero/1.0/
35 */
36
37 package java.util.concurrent;
38 import java.util.concurrent.locks.*;
39 import java.util.*;
40
41 /**
42 * A {@linkplain BlockingQueue blocking queue} in which each insert
43 * operation must wait for a corresponding remove operation by another
44 * thread, and vice versa. A synchronous queue does not have any
45 * internal capacity, not even a capacity of one. You cannot
46 * <tt>peek</tt> at a synchronous queue because an element is only
47 * present when you try to remove it; you cannot insert an element
48 * (using any method) unless another thread is trying to remove it;
49 * you cannot iterate as there is nothing to iterate. The
50 * <em>head</em> of the queue is the element that the first queued
51 * inserting thread is trying to add to the queue; if there is no such
52 * queued thread then no element is available for removal and
53 * <tt>poll()</tt> will return <tt>null</tt>. For purposes of other
54 * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
55 * <tt>SynchronousQueue</tt> acts as an empty collection. This queue
56 * does not permit <tt>null</tt> elements.
57 *
58 * <p>Synchronous queues are similar to rendezvous channels used in
145 * potentially O(n) traversal to be sure that we can remove the
146 * node, but this can run concurrently with other threads
147 * accessing the stack.
148 *
149 * While garbage collection takes care of most node reclamation
150 * issues that otherwise complicate nonblocking algorithms, care
151 * is taken to "forget" references to data, other nodes, and
152 * threads that might be held on to long-term by blocked
153 * threads. In cases where setting to null would otherwise
154 * conflict with main algorithms, this is done by changing a
155 * node's link to now point to the node itself. This doesn't arise
156 * much for Stack nodes (because blocked threads do not hang on to
157 * old head pointers), but references in Queue nodes must be
158 * aggressively forgotten to avoid reachability of everything any
159 * node has ever referred to since arrival.
160 */
161
162 /**
163 * Shared internal API for dual stacks and queues.
164 */
    abstract static class Transferer<E> {
        /**
         * Performs a put or take.
         *
         * <p>A single method handles both operations because in a dual
         * data structure puts and takes are symmetric: a null argument
         * means "take", a non-null argument means "put".
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract E transfer(E e, boolean timed, long nanos);
    }
181
182 /** The number of CPUs, for spin control */
183 static final int NCPUS = Runtime.getRuntime().availableProcessors();
184
185 /**
186 * The number of times to spin before blocking in timed waits.
187 * The value is empirically derived -- it works well across a
188 * variety of processors and OSes. Empirically, the best value
189 * seems not to vary with number of CPUs (beyond 2) so is just
190 * a constant.
191 */
192 static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
193
194 /**
195 * The number of times to spin before blocking in untimed waits.
196 * This is greater than timed value because untimed waits spin
197 * faster since they don't need to check times on each spin.
198 */
199 static final int maxUntimedSpins = maxTimedSpins * 16;
200
201 /**
202 * The number of nanoseconds for which it is faster to spin
203 * rather than to use timed park. A rough estimate suffices.
204 */
205 static final long spinForTimeoutThreshold = 1000L;
206
207 /** Dual stack */
208 static final class TransferStack<E> extends Transferer<E> {
209 /*
210 * This extends Scherer-Scott dual stack algorithm, differing,
211 * among other ways, by using "covering" nodes rather than
212 * bit-marked pointers: Fulfilling operations push on marker
213 * nodes (with FULFILLING bit set in mode) to reserve a spot
214 * to match a waiting node.
215 */
216
217 /* Modes for SNodes, ORed together in node fields */
218 /** Node represents an unfulfilled consumer */
219 static final int REQUEST = 0;
220 /** Node represents an unfulfilled producer */
221 static final int DATA = 1;
222 /** Node is fulfilling another unfulfilled DATA or REQUEST */
223 static final int FULFILLING = 2;
224
225 /** Return true if m has fulfilling bit set */
226 static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
227
228 /** Node class for TransferStacks. */
268
            /**
             * Tries to cancel a wait by matching node to itself.
             * CASes the match field from null to this; a node whose
             * match points to itself is treated as cancelled
             * (see isCancelled). Losing the CAS means the node was
             * fulfilled first, which takes precedence.
             */
            void tryCancel() {
                UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
            }
275
            /** Returns true if this node's wait was cancelled (self-matched). */
            boolean isCancelled() {
                return match == this;
            }
279
280 // Unsafe mechanics
281 private static final sun.misc.Unsafe UNSAFE;
282 private static final long matchOffset;
283 private static final long nextOffset;
284
            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class<?> k = SNode.class;
                    // Cache field offsets once for the CAS helpers above.
                    matchOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("match"));
                    nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
                } catch (Exception e) {
                    // Reflection failure here is unrecoverable: fail
                    // class initialization loudly.
                    throw new Error(e);
                }
            }
297 }
298
299 /** The head (top) of the stack */
300 volatile SNode head;
301
        /**
         * CASes the stack head from h to nh. The plain volatile read of
         * head first skips a useless (and more expensive) CAS when the
         * head has already been changed by another thread.
         */
        boolean casHead(SNode h, SNode nh) {
            return h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
        }
306
307 /**
308 * Creates or resets fields of a node. Called only from transfer
309 * where the node to push on stack is lazily created and
310 * reused when possible to help reduce intervals between reads
311 * and CASes of head and to avoid surges of garbage when CASes
312 * to push nodes fail due to contention.
313 */
314 static SNode snode(SNode s, Object e, SNode next, int mode) {
315 if (s == null) s = new SNode(e);
316 s.mode = mode;
317 s.next = next;
318 return s;
319 }
320
        /**
         * Puts or takes an item.
         *
         * @param e if non-null, the item to hand to a consumer (put);
         *          if null, requests an item from a producer (take)
         * @param timed true if this operation may time out
         * @param nanos the timeout in nanoseconds (used only if timed)
         * @return the item provided or received, or null on timeout or
         *         interrupt (caller distinguishes via Thread.interrupted)
         */
        @SuppressWarnings("unchecked")
        E transfer(E e, boolean timed, long nanos) {
            /*
             * Basic algorithm is to loop trying one of three actions:
             *
             * 1. If apparently empty or already containing nodes of same
             *    mode, try to push node on stack and wait for a match,
             *    returning it, or null if cancelled.
             *
             * 2. If apparently containing node of complementary mode,
             *    try to push a fulfilling node on to stack, match
             *    with corresponding waiting node, pop both from
             *    stack, and return matched item. The matching or
             *    unlinking might not actually be necessary because of
             *    other threads performing action 3:
             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue. The code for helping
             *    is essentially the same as for fulfilling, except
             *    that it doesn't return the item.
             */

            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        // Our fulfiller may not have popped us yet; help it.
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        // REQUEST returns the producer's item; DATA returns
                        // our own item as the non-null success marker.
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }
401
402 /**
496 SNode p;
497 while ((p = head) != null && p != past && p.isCancelled())
498 casHead(p, p.next);
499
500 // Unsplice embedded nodes
501 while (p != null && p != past) {
502 SNode n = p.next;
503 if (n != null && n.isCancelled())
504 p.casNext(n, n.next);
505 else
506 p = n;
507 }
508 }
509
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long headOffset;   // field offset of "head", for CAS
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = TransferStack.class;
                headOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("head"));
            } catch (Exception e) {
                // Reflection failure here is unrecoverable; fail class init.
                throw new Error(e);
            }
        }
523 }
524
525 /** Dual Queue */
526 static final class TransferQueue<E> extends Transferer<E> {
527 /*
528 * This extends Scherer-Scott dual queue algorithm, differing,
529 * among other ways, by using modes within nodes rather than
530 * marked pointers. The algorithm is a little simpler than
531 * that for stacks because fulfillers do not need explicit
532 * nodes, and matching is done by CAS'ing QNode.item field
533 * from non-null to null (for put) or vice versa (for take).
534 */
535
536 /** Node class for TransferQueue. */
537 static final class QNode {
538 volatile QNode next; // next node in queue
539 volatile Object item; // CAS'ed to or from null
540 volatile Thread waiter; // to control park/unpark
541 final boolean isData;
542
            /** Creates a node holding item; isData distinguishes puts from takes. */
            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }
566 return item == this;
567 }
568
569 /**
570 * Returns true if this node is known to be off the queue
571 * because its next pointer has been forgotten due to
572 * an advanceHead operation.
573 */
574 boolean isOffList() {
575 return next == this;
576 }
577
            // Unsafe mechanics
            private static final sun.misc.Unsafe UNSAFE;
            private static final long itemOffset;   // field offset of "item", for CAS
            private static final long nextOffset;   // field offset of "next", for CAS

            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class<?> k = QNode.class;
                    itemOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("item"));
                    nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
                } catch (Exception e) {
                    // Reflection failure here is unrecoverable; fail class init.
                    throw new Error(e);
                }
            }
595 }
596
597 /** Head of queue */
598 transient volatile QNode head;
599 /** Tail of queue */
600 transient volatile QNode tail;
601 /**
602 * Reference to a cancelled node that might not yet have been
603 * unlinked from queue because it was the last inserted node
604 * when it cancelled.
605 */
606 transient volatile QNode cleanMe;
623
624 /**
625 * Tries to cas nt as new tail.
626 */
627 void advanceTail(QNode t, QNode nt) {
628 if (tail == t)
629 UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
630 }
631
632 /**
633 * Tries to CAS cleanMe slot.
634 */
635 boolean casCleanMe(QNode cmp, QNode val) {
636 return cleanMe == cmp &&
637 UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
638 }
639
640 /**
641 * Puts or takes an item.
642 */
643 @SuppressWarnings("unchecked")
644 E transfer(E e, boolean timed, long nanos) {
645 /* Basic algorithm is to loop trying to take either of
646 * two actions:
647 *
648 * 1. If queue apparently empty or holding same-mode nodes,
649 * try to add node to queue of waiters, wait to be
650 * fulfilled (or cancelled) and return matching item.
651 *
652 * 2. If queue apparently contains waiting items, and this
653 * call is of complementary mode, try to fulfill by CAS'ing
654 * item field of waiting node and dequeuing it, and then
655 * returning matching item.
656 *
657 * In each case, along the way, check for and try to help
658 * advance head and tail on behalf of other stalled/slow
659 * threads.
660 *
661 * The loop starts off with a null check guarding against
662 * seeing uninitialized head or tail values. This never
663 * happens in current SynchronousQueue, but could if
664 * callers held non-volatile/final ref to the
687 if (timed && nanos <= 0) // can't wait
688 return null;
689 if (s == null)
690 s = new QNode(e, isData);
691 if (!t.casNext(null, s)) // failed to link in
692 continue;
693
694 advanceTail(t, s); // swing tail and wait
695 Object x = awaitFulfill(s, e, timed, nanos);
696 if (x == s) { // wait was cancelled
697 clean(t, s);
698 return null;
699 }
700
701 if (!s.isOffList()) { // not already unlinked
702 advanceHead(t, s); // unlink if head
703 if (x != null) // and forget fields
704 s.item = s;
705 s.waiter = null;
706 }
707 return (x != null) ? (E)x : e;
708
709 } else { // complementary-mode
710 QNode m = h.next; // node to fulfill
711 if (t != tail || m == null || h != head)
712 continue; // inconsistent read
713
714 Object x = m.item;
715 if (isData == (x != null) || // m already fulfilled
716 x == m || // m cancelled
717 !m.casItem(x, e)) { // lost CAS
718 advanceHead(h, m); // dequeue and retry
719 continue;
720 }
721
722 advanceHead(h, m); // successfully fulfilled
723 LockSupport.unpark(m.waiter);
724 return (x != null) ? (E)x : e;
725 }
726 }
727 }
728
729 /**
730 * Spins/blocks until node s is fulfilled.
731 *
732 * @param s the waiting node
733 * @param e the comparison value for checking match
734 * @param timed true if timed wait
735 * @param nanos timeout value
736 * @return matched item, or s if cancelled
737 */
738 Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
739 /* Same idea as TransferStack.awaitFulfill */
740 long lastTime = timed ? System.nanoTime() : 0;
741 Thread w = Thread.currentThread();
742 int spins = ((head.next == s) ?
743 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
744 for (;;) {
745 if (w.isInterrupted())
746 s.tryCancel(e);
747 Object x = s.item;
748 if (x != e)
749 return x;
750 if (timed) {
751 long now = System.nanoTime();
752 nanos -= now - lastTime;
753 lastTime = now;
754 if (nanos <= 0) {
755 s.tryCancel(e);
756 continue;
757 }
758 }
811 !d.isCancelled() || // d not cancelled or
812 (d != t && // d not tail and
813 (dn = d.next) != null && // has successor
814 dn != d && // that is on list
815 dp.casNext(d, dn))) // d unspliced
816 casCleanMe(dp, null);
817 if (dp == pred)
818 return; // s is already saved node
819 } else if (casCleanMe(null, pred))
820 return; // Postpone cleaning s
821 }
822 }
823
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long headOffset;    // field offset of "head", for CAS
        private static final long tailOffset;    // field offset of "tail", for CAS
        private static final long cleanMeOffset; // field offset of "cleanMe", for CAS
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = TransferQueue.class;
                headOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("head"));
                tailOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("tail"));
                cleanMeOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("cleanMe"));
            } catch (Exception e) {
                // Reflection failure here is unrecoverable; fail class init.
                throw new Error(e);
            }
        }
842 }
843
844 /**
845 * The transferer. Set only in constructor, but cannot be declared
846 * as final without further complicating serialization. Since
847 * this is accessed only at most once per public method, there
848 * isn't a noticeable performance penalty for using volatile
849 * instead of final here.
850 */
851 private transient volatile Transferer<E> transferer;
852
853 /**
854 * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
855 */
856 public SynchronousQueue() {
857 this(false);
858 }
859
860 /**
861 * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
862 *
863 * @param fair if true, waiting threads contend in FIFO order for
864 * access; otherwise the order is unspecified.
865 */
866 public SynchronousQueue(boolean fair) {
867 transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
868 }
869
870 /**
871 * Adds the specified element to this queue, waiting if necessary for
872 * another thread to receive it.
873 *
874 * @throws InterruptedException {@inheritDoc}
875 * @throws NullPointerException {@inheritDoc}
876 */
877 public void put(E o) throws InterruptedException {
878 if (o == null) throw new NullPointerException();
879 if (transferer.transfer(o, false, 0) == null) {
880 Thread.interrupted();
881 throw new InterruptedException();
882 }
883 }
884
885 /**
886 * Inserts the specified element into this queue, waiting if necessary
887 * up to the specified wait time for another thread to receive it.
906 * waiting to receive it.
907 *
908 * @param e the element to add
909 * @return <tt>true</tt> if the element was added to this queue, else
910 * <tt>false</tt>
911 * @throws NullPointerException if the specified element is null
912 */
913 public boolean offer(E e) {
914 if (e == null) throw new NullPointerException();
915 return transferer.transfer(e, true, 0) != null;
916 }
917
918 /**
919 * Retrieves and removes the head of this queue, waiting if necessary
920 * for another thread to insert it.
921 *
922 * @return the head of this queue
923 * @throws InterruptedException {@inheritDoc}
924 */
925 public E take() throws InterruptedException {
926 E e = transferer.transfer(null, false, 0);
927 if (e != null)
928 return e;
929 Thread.interrupted();
930 throw new InterruptedException();
931 }
932
933 /**
934 * Retrieves and removes the head of this queue, waiting
935 * if necessary up to the specified wait time, for another thread
936 * to insert it.
937 *
938 * @return the head of this queue, or <tt>null</tt> if the
939 * specified waiting time elapses before an element is present.
940 * @throws InterruptedException {@inheritDoc}
941 */
942 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
943 E e = transferer.transfer(null, true, unit.toNanos(timeout));
944 if (e != null || !Thread.interrupted())
945 return e;
946 throw new InterruptedException();
947 }
948
949 /**
950 * Retrieves and removes the head of this queue, if another thread
951 * is currently making an element available.
952 *
953 * @return the head of this queue, or <tt>null</tt> if no
954 * element is available.
955 */
956 public E poll() {
957 return transferer.transfer(null, true, 0);
958 }
959
960 /**
961 * Always returns <tt>true</tt>.
962 * A <tt>SynchronousQueue</tt> has no internal capacity.
963 *
964 * @return <tt>true</tt>
965 */
966 public boolean isEmpty() {
967 return true;
968 }
969
970 /**
971 * Always returns zero.
972 * A <tt>SynchronousQueue</tt> has no internal capacity.
973 *
974 * @return zero.
975 */
976 public int size() {
977 return 0;
1049 return false;
1050 }
1051
1052 /**
1053 * Always returns <tt>null</tt>.
1054 * A <tt>SynchronousQueue</tt> does not return elements
1055 * unless actively waited on.
1056 *
1057 * @return <tt>null</tt>
1058 */
1059 public E peek() {
1060 return null;
1061 }
1062
1063 /**
1064 * Returns an empty iterator in which <tt>hasNext</tt> always returns
1065 * <tt>false</tt>.
1066 *
1067 * @return an empty iterator
1068 */
1069 @SuppressWarnings("unchecked")
1070 public Iterator<E> iterator() {
1071 return (Iterator<E>) EmptyIterator.EMPTY_ITERATOR;
1072 }
1073
    // Replicated from a previous version of Collections
    /** Stateless iterator over nothing; a single instance is shared. */
    private static class EmptyIterator<E> implements Iterator<E> {
        static final EmptyIterator<Object> EMPTY_ITERATOR
            = new EmptyIterator<Object>();

        public boolean hasNext() { return false; }
        public E next() { throw new NoSuchElementException(); }
        // IllegalStateException, not UnsupportedOperationException:
        // next() can never have been called, matching the Iterator spec.
        public void remove() { throw new IllegalStateException(); }
    }
1083
1084 /**
1085 * Returns a zero-length array.
1086 * @return a zero-length array
1087 */
1088 public Object[] toArray() {
1089 return new Object[0];
1090 }
1091
1092 /**
1093 * Sets the zeroeth element of the specified array to <tt>null</tt>
1094 * (if the array has non-zero length) and returns it.
1095 *
1096 * @param a the array
1097 * @return the specified array
1098 * @throws NullPointerException if the specified array is null
1099 */
1100 public <T> T[] toArray(T[] a) {
1101 if (a.length > 0)
1102 a[0] = null;
1103 return a;
1104 }
1105
1106 /**
1107 * @throws UnsupportedOperationException {@inheritDoc}
1108 * @throws ClassCastException {@inheritDoc}
1109 * @throws NullPointerException {@inheritDoc}
1110 * @throws IllegalArgumentException {@inheritDoc}
1111 */
1112 public int drainTo(Collection<? super E> c) {
1113 if (c == null)
1114 throw new NullPointerException();
1115 if (c == this)
1116 throw new IllegalArgumentException();
1117 int n = 0;
1118 for (E e; (e = poll()) != null;) {
1119 c.add(e);
1120 ++n;
1121 }
1122 return n;
1123 }
1124
1125 /**
1126 * @throws UnsupportedOperationException {@inheritDoc}
1127 * @throws ClassCastException {@inheritDoc}
1128 * @throws NullPointerException {@inheritDoc}
1129 * @throws IllegalArgumentException {@inheritDoc}
1130 */
1131 public int drainTo(Collection<? super E> c, int maxElements) {
1132 if (c == null)
1133 throw new NullPointerException();
1134 if (c == this)
1135 throw new IllegalArgumentException();
1136 int n = 0;
1137 for (E e; n < maxElements && (e = poll()) != null;) {
1138 c.add(e);
1139 ++n;
1140 }
1141 return n;
1142 }
1143
1144 /*
1145 * To cope with serialization strategy in the 1.5 version of
1146 * SynchronousQueue, we declare some unused classes and fields
1147 * that exist solely to enable serializability across versions.
1148 * These fields are never used, so are initialized only if this
1149 * object is ever serialized or deserialized.
1150 */
1151
    // Never populated at runtime; these exist only so the serialized
    // form stays compatible with the 1.5 implementation.
    @SuppressWarnings("serial")
    static class WaitQueue implements java.io.Serializable { }
    static class LifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3633113410248163686L;
    }
    static class FifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3623113410248163686L;
    }
1160 private ReentrantLock qlock;
1161 private WaitQueue waitingProducers;
1162 private WaitQueue waitingConsumers;
1163
1164 /**
1165 * Saves the state to a stream (that is, serializes it).
1166 *
1167 * @param s the stream
1168 */
1169 private void writeObject(java.io.ObjectOutputStream s)
1170 throws java.io.IOException {
1171 boolean fair = transferer instanceof TransferQueue;
1172 if (fair) {
1173 qlock = new ReentrantLock(true);
1174 waitingProducers = new FifoWaitQueue();
1175 waitingConsumers = new FifoWaitQueue();
1176 }
1177 else {
1178 qlock = new ReentrantLock();
1179 waitingProducers = new LifoWaitQueue();
1180 waitingConsumers = new LifoWaitQueue();
1181 }
1182 s.defaultWriteObject();
1183 }
1184
1185 private void readObject(final java.io.ObjectInputStream s)
1186 throws java.io.IOException, ClassNotFoundException {
1187 s.defaultReadObject();
1188 if (waitingProducers instanceof FifoWaitQueue)
1189 transferer = new TransferQueue<E>();
1190 else
1191 transferer = new TransferStack<E>();
1192 }
1193
    // Unsafe mechanics
    /**
     * Returns the Unsafe field offset of the named field of klazz,
     * converting a checked reflection failure into the corresponding
     * unchecked Error (missing fields indicate a broken build).
     */
    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
                                  String field, Class<?> klazz) {
        try {
            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error, preserving cause
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
1206
1207 }
|