146 * potentially O(n) traversal to be sure that we can remove the
147 * node, but this can run concurrently with other threads
148 * accessing the stack.
149 *
150 * While garbage collection takes care of most node reclamation
151 * issues that otherwise complicate nonblocking algorithms, care
152 * is taken to "forget" references to data, other nodes, and
153 * threads that might be held on to long-term by blocked
154 * threads. In cases where setting to null would otherwise
155 * conflict with main algorithms, this is done by changing a
156 * node's link to now point to the node itself. This doesn't arise
157 * much for Stack nodes (because blocked threads do not hang on to
158 * old head pointers), but references in Queue nodes must be
159 * aggressively forgotten to avoid reachability of everything any
160 * node has ever referred to since arrival.
161 */
162
163 /**
164 * Shared internal API for dual stacks and queues.
165 */
166 static abstract class Transferer {
167 /**
168 * Performs a put or take.
169 *
170 * @param e if non-null, the item to be handed to a consumer;
171 * if null, requests that transfer return an item
172 * offered by producer.
173 * @param timed if this operation should timeout
174 * @param nanos the timeout, in nanoseconds
175 * @return if non-null, the item provided or received; if null,
176 * the operation failed due to timeout or interrupt --
177 * the caller can distinguish which of these occurred
178 * by checking Thread.interrupted.
179 */
180 abstract Object transfer(Object e, boolean timed, long nanos);
181 }
182
183 /** The number of CPUs, for spin control */
184 static final int NCPUS = Runtime.getRuntime().availableProcessors();
185
186 /**
187 * The number of times to spin before blocking in timed waits.
188 * The value is empirically derived -- it works well across a
189 * variety of processors and OSes. Empirically, the best value
190 * seems not to vary with number of CPUs (beyond 2) so is just
191 * a constant.
192 */
193 static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
194
195 /**
196 * The number of times to spin before blocking in untimed waits.
197 * This is greater than timed value because untimed waits spin
198 * faster since they don't need to check times on each spin.
199 */
200 static final int maxUntimedSpins = maxTimedSpins * 16;
201
202 /**
203 * The number of nanoseconds for which it is faster to spin
204 * rather than to use timed park. A rough estimate suffices.
205 */
206 static final long spinForTimeoutThreshold = 1000L;
207
208 /** Dual stack */
209 static final class TransferStack extends Transferer {
210 /*
211 * This extends Scherer-Scott dual stack algorithm, differing,
212 * among other ways, by using "covering" nodes rather than
213 * bit-marked pointers: Fulfilling operations push on marker
224 static final int FULFILLING = 2;
225
226 /** Return true if m has fulfilling bit set */
227 static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
228
229 /** Node class for TransferStacks. */
230 static final class SNode {
231 volatile SNode next; // next node in stack
232 volatile SNode match; // the node matched to this
233 volatile Thread waiter; // to control park/unpark
234 Object item; // data; or null for REQUESTs
235 int mode;
236 // Note: item and mode fields don't need to be volatile
237 // since they are always written before, and read after,
238 // other volatile/atomic operations.
239
240 SNode(Object item) {
241 this.item = item;
242 }
243
244 static final AtomicReferenceFieldUpdater<SNode, SNode>
245 nextUpdater = AtomicReferenceFieldUpdater.newUpdater
246 (SNode.class, SNode.class, "next");
247
248 boolean casNext(SNode cmp, SNode val) {
249 return (cmp == next &&
250 nextUpdater.compareAndSet(this, cmp, val));
251 }
252
253 static final AtomicReferenceFieldUpdater<SNode, SNode>
254 matchUpdater = AtomicReferenceFieldUpdater.newUpdater
255 (SNode.class, SNode.class, "match");
256
257 /**
258 * Tries to match node s to this node, if so, waking up thread.
259 * Fulfillers call tryMatch to identify their waiters.
260 * Waiters block until they have been matched.
261 *
262 * @param s the node to match
263 * @return true if successfully matched to s
264 */
265 boolean tryMatch(SNode s) {
266 if (match == null &&
267 matchUpdater.compareAndSet(this, null, s)) {
268 Thread w = waiter;
269 if (w != null) { // waiters need at most one unpark
270 waiter = null;
271 LockSupport.unpark(w);
272 }
273 return true;
274 }
275 return match == s;
276 }
277
278 /**
279 * Tries to cancel a wait by matching node to itself.
280 */
281 void tryCancel() {
282 matchUpdater.compareAndSet(this, null, this);
283 }
284
285 boolean isCancelled() {
286 return match == this;
287 }
288 }
289
290 /** The head (top) of the stack */
291 volatile SNode head;
292
293 static final AtomicReferenceFieldUpdater<TransferStack, SNode>
294 headUpdater = AtomicReferenceFieldUpdater.newUpdater
295 (TransferStack.class, SNode.class, "head");
296
297 boolean casHead(SNode h, SNode nh) {
298 return h == head && headUpdater.compareAndSet(this, h, nh);
299 }
300
301 /**
302 * Creates or resets fields of a node. Called only from transfer
303 * where the node to push on stack is lazily created and
304 * reused when possible to help reduce intervals between reads
305 * and CASes of head and to avoid surges of garbage when CASes
306 * to push nodes fail due to contention.
307 */
308 static SNode snode(SNode s, Object e, SNode next, int mode) {
309 if (s == null) s = new SNode(e);
310 s.mode = mode;
311 s.next = next;
312 return s;
313 }
314
/**
 * Puts or takes an item.
 *
 * @param e the item to hand over (DATA mode), or null to request
 *          one (REQUEST mode)
 * @param timed whether the call may give up instead of waiting
 * @param nanos the timeout in nanoseconds; with timed set, a value
 *          of zero or less means the call cannot wait at all
 * @return in REQUEST mode the matched node's item, in DATA mode
 *         this call's own item; null if the call could not wait
 *         or its wait was cancelled
 */
Object transfer(Object e, boolean timed, long nanos) {
    /*
     * Loops trying one of the three actions below:
     *
     * 1. If apparently empty or already containing nodes of same
     *    mode, try to push node on stack and wait for a match,
     *    returning it, or null if cancelled.
     *
     * 2. If apparently containing node of complementary mode,
     *    try to push a fulfilling node on to stack, match
     *    with corresponding waiting node, pop both from
     *    stack, and return matched item. The matching or
     *    unlinking might not actually be necessary because of
     *    other threads performing action 3:
     *
     * 3. If top of stack already holds another fulfilling node,
     *    help it out by doing its match and/or pop
     *    operations, and then continue. The code for helping
     *    is essentially the same as for fulfilling, except
     *    that it doesn't return the item.
     */

    SNode s = null; // constructed/reused as needed
    int mode = (e == null)? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        if (h == null || h.mode == mode) {  // empty or same-mode
            if (timed && nanos <= 0) {      // can't wait
                if (h != null && h.isCancelled())
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // Pushed as a waiter; awaitFulfill hands back s
                // itself when the wait was cancelled.
                SNode m = awaitFulfill(s, timed, nanos);
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return mode == REQUEST? m.item : s.item;
            }
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    // Read m's successor before matching so both
                    // s and m can be popped with a single CAS.
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (mode == REQUEST)? m.item : s.item;
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}
394
395 /**
406 * field and then rechecks state at least one more time
407 * before actually parking, thus covering race vs
408 * fulfiller noticing that waiter is non-null so should be
409 * woken.
410 *
411 * When invoked by nodes that appear at the point of call
412 * to be at the head of the stack, calls to park are
413 * preceded by spins to avoid blocking when producers and
414 * consumers are arriving very close in time. This can
415 * happen enough to bother only on multiprocessors.
416 *
417 * The order of checks for returning out of main loop
418 * reflects fact that interrupts have precedence over
419 * normal returns, which have precedence over
420 * timeouts. (So, on timeout, one last check for match is
421 * done before giving up.) Except that calls from untimed
422 * SynchronousQueue.{poll/offer} don't check interrupts
423 * and don't wait at all, so are trapped in transfer
424 * method rather than calling awaitFulfill.
425 */
426 long lastTime = (timed)? System.nanoTime() : 0;
427 Thread w = Thread.currentThread();
428 SNode h = head;
429 int spins = (shouldSpin(s)?
430 (timed? maxTimedSpins : maxUntimedSpins) : 0);
431 for (;;) {
432 if (w.isInterrupted())
433 s.tryCancel();
434 SNode m = s.match;
435 if (m != null)
436 return m;
437 if (timed) {
438 long now = System.nanoTime();
439 nanos -= now - lastTime;
440 lastTime = now;
441 if (nanos <= 0) {
442 s.tryCancel();
443 continue;
444 }
445 }
446 if (spins > 0)
447 spins = shouldSpin(s)? (spins-1) : 0;
448 else if (s.waiter == null)
449 s.waiter = w; // establish waiter so can park next iter
450 else if (!timed)
451 LockSupport.park(this);
452 else if (nanos > spinForTimeoutThreshold)
453 LockSupport.parkNanos(this, nanos);
454 }
455 }
456
457 /**
458 * Returns true if node s is at head or there is an active
459 * fulfiller.
460 */
461 boolean shouldSpin(SNode s) {
462 SNode h = head;
463 return (h == s || h == null || isFulfilling(h.mode));
464 }
465
466 /**
467 * Unlinks s from the stack.
482 */
483
484 SNode past = s.next;
485 if (past != null && past.isCancelled())
486 past = past.next;
487
488 // Absorb cancelled nodes at head
489 SNode p;
490 while ((p = head) != null && p != past && p.isCancelled())
491 casHead(p, p.next);
492
493 // Unsplice embedded nodes
494 while (p != null && p != past) {
495 SNode n = p.next;
496 if (n != null && n.isCancelled())
497 p.casNext(n, n.next);
498 else
499 p = n;
500 }
501 }
502 }
503
504 /** Dual Queue */
505 static final class TransferQueue extends Transferer {
506 /*
507 * This extends Scherer-Scott dual queue algorithm, differing,
508 * among other ways, by using modes within nodes rather than
509 * marked pointers. The algorithm is a little simpler than
510 * that for stacks because fulfillers do not need explicit
511 * nodes, and matching is done by CAS'ing QNode.item field
512 * from non-null to null (for put) or vice versa (for take).
513 */
514
515 /** Node class for TransferQueue. */
516 static final class QNode {
517 volatile QNode next; // next node in queue
518 volatile Object item; // CAS'ed to or from null
519 volatile Thread waiter; // to control park/unpark
520 final boolean isData;
521
522 QNode(Object item, boolean isData) {
523 this.item = item;
524 this.isData = isData;
525 }
526
527 static final AtomicReferenceFieldUpdater<QNode, QNode>
528 nextUpdater = AtomicReferenceFieldUpdater.newUpdater
529 (QNode.class, QNode.class, "next");
530
531 boolean casNext(QNode cmp, QNode val) {
532 return (next == cmp &&
533 nextUpdater.compareAndSet(this, cmp, val));
534 }
535
536 static final AtomicReferenceFieldUpdater<QNode, Object>
537 itemUpdater = AtomicReferenceFieldUpdater.newUpdater
538 (QNode.class, Object.class, "item");
539
540 boolean casItem(Object cmp, Object val) {
541 return (item == cmp &&
542 itemUpdater.compareAndSet(this, cmp, val));
543 }
544
545 /**
546 * Tries to cancel by CAS'ing ref to this as item.
547 */
548 void tryCancel(Object cmp) {
549 itemUpdater.compareAndSet(this, cmp, this);
550 }
551
552 boolean isCancelled() {
553 return item == this;
554 }
555
556 /**
557 * Returns true if this node is known to be off the queue
558 * because its next pointer has been forgotten due to
559 * an advanceHead operation.
560 */
561 boolean isOffList() {
562 return next == this;
563 }
564 }
565
566 /** Head of queue */
567 transient volatile QNode head;
568 /** Tail of queue */
569 transient volatile QNode tail;
570 /**
571 * Reference to a cancelled node that might not yet have been
572 * unlinked from queue because it was the last inserted node
573 * when it cancelled.
574 */
575 transient volatile QNode cleanMe;
576
577 TransferQueue() {
578 QNode h = new QNode(null, false); // initialize to dummy node.
579 head = h;
580 tail = h;
581 }
582
583 static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
584 headUpdater = AtomicReferenceFieldUpdater.newUpdater
585 (TransferQueue.class, QNode.class, "head");
586
587 /**
588 * Tries to cas nh as new head; if successful, unlink
589 * old head's next node to avoid garbage retention.
590 */
591 void advanceHead(QNode h, QNode nh) {
592 if (h == head && headUpdater.compareAndSet(this, h, nh))
593 h.next = h; // forget old next
594 }
595
596 static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
597 tailUpdater = AtomicReferenceFieldUpdater.newUpdater
598 (TransferQueue.class, QNode.class, "tail");
599
600 /**
601 * Tries to cas nt as new tail.
602 */
603 void advanceTail(QNode t, QNode nt) {
604 if (tail == t)
605 tailUpdater.compareAndSet(this, t, nt);
606 }
607
608 static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
609 cleanMeUpdater = AtomicReferenceFieldUpdater.newUpdater
610 (TransferQueue.class, QNode.class, "cleanMe");
611
612 /**
613 * Tries to CAS cleanMe slot.
614 */
615 boolean casCleanMe(QNode cmp, QNode val) {
616 return (cleanMe == cmp &&
617 cleanMeUpdater.compareAndSet(this, cmp, val));
618 }
619
620 /**
621 * Puts or takes an item.
622 */
623 Object transfer(Object e, boolean timed, long nanos) {
624 /* Basic algorithm is to loop trying to take either of
625 * two actions:
626 *
627 * 1. If queue apparently empty or holding same-mode nodes,
628 * try to add node to queue of waiters, wait to be
629 * fulfilled (or cancelled) and return matching item.
630 *
631 * 2. If queue apparently contains waiting items, and this
632 * call is of complementary mode, try to fulfill by CAS'ing
633 * item field of waiting node and dequeuing it, and then
634 * returning matching item.
635 *
636 * In each case, along the way, check for and try to help
637 * advance head and tail on behalf of other stalled/slow
666 if (timed && nanos <= 0) // can't wait
667 return null;
668 if (s == null)
669 s = new QNode(e, isData);
670 if (!t.casNext(null, s)) // failed to link in
671 continue;
672
673 advanceTail(t, s); // swing tail and wait
674 Object x = awaitFulfill(s, e, timed, nanos);
675 if (x == s) { // wait was cancelled
676 clean(t, s);
677 return null;
678 }
679
680 if (!s.isOffList()) { // not already unlinked
681 advanceHead(t, s); // unlink if head
682 if (x != null) // and forget fields
683 s.item = s;
684 s.waiter = null;
685 }
686 return (x != null)? x : e;
687
688 } else { // complementary-mode
689 QNode m = h.next; // node to fulfill
690 if (t != tail || m == null || h != head)
691 continue; // inconsistent read
692
693 Object x = m.item;
694 if (isData == (x != null) || // m already fulfilled
695 x == m || // m cancelled
696 !m.casItem(x, e)) { // lost CAS
697 advanceHead(h, m); // dequeue and retry
698 continue;
699 }
700
701 advanceHead(h, m); // successfully fulfilled
702 LockSupport.unpark(m.waiter);
703 return (x != null)? x : e;
704 }
705 }
706 }
707
708 /**
709 * Spins/blocks until node s is fulfilled.
710 *
711 * @param s the waiting node
712 * @param e the comparison value for checking match
713 * @param timed true if timed wait
714 * @param nanos timeout value
715 * @return matched item, or s if cancelled
716 */
717 Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
718 /* Same idea as TransferStack.awaitFulfill */
719 long lastTime = (timed)? System.nanoTime() : 0;
720 Thread w = Thread.currentThread();
721 int spins = ((head.next == s) ?
722 (timed? maxTimedSpins : maxUntimedSpins) : 0);
723 for (;;) {
724 if (w.isInterrupted())
725 s.tryCancel(e);
726 Object x = s.item;
727 if (x != e)
728 return x;
729 if (timed) {
730 long now = System.nanoTime();
731 nanos -= now - lastTime;
732 lastTime = now;
733 if (nanos <= 0) {
734 s.tryCancel(e);
735 continue;
736 }
737 }
738 if (spins > 0)
739 --spins;
740 else if (s.waiter == null)
741 s.waiter = w;
742 else if (!timed)
782 return;
783 }
784 QNode dp = cleanMe;
785 if (dp != null) { // Try unlinking previous cancelled node
786 QNode d = dp.next;
787 QNode dn;
788 if (d == null || // d is gone or
789 d == dp || // d is off list or
790 !d.isCancelled() || // d not cancelled or
791 (d != t && // d not tail and
792 (dn = d.next) != null && // has successor
793 dn != d && // that is on list
794 dp.casNext(d, dn))) // d unspliced
795 casCleanMe(dp, null);
796 if (dp == pred)
797 return; // s is already saved node
798 } else if (casCleanMe(null, pred))
799 return; // Postpone cleaning s
800 }
801 }
802 }
803
804 /**
805 * The transferer. Set only in constructor, but cannot be declared
806 * as final without further complicating serialization. Since
807 * this is accessed only at most once per public method, there
808 * isn't a noticeable performance penalty for using volatile
809 * instead of final here.
810 */
811 private transient volatile Transferer transferer;
812
813 /**
814 * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
815 */
816 public SynchronousQueue() {
817 this(false);
818 }
819
820 /**
821 * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
822 *
823 * @param fair if true, waiting threads contend in FIFO order for
824 * access; otherwise the order is unspecified.
825 */
826 public SynchronousQueue(boolean fair) {
827 transferer = (fair)? new TransferQueue() : new TransferStack();
828 }
829
830 /**
831 * Adds the specified element to this queue, waiting if necessary for
832 * another thread to receive it.
833 *
834 * @throws InterruptedException {@inheritDoc}
835 * @throws NullPointerException {@inheritDoc}
836 */
837 public void put(E o) throws InterruptedException {
838 if (o == null) throw new NullPointerException();
839 if (transferer.transfer(o, false, 0) == null) {
840 Thread.interrupted();
841 throw new InterruptedException();
842 }
843 }
844
845 /**
846 * Inserts the specified element into this queue, waiting if necessary
847 * up to the specified wait time for another thread to receive it.
1124 waitingProducers = new FifoWaitQueue();
1125 waitingConsumers = new FifoWaitQueue();
1126 }
1127 else {
1128 qlock = new ReentrantLock();
1129 waitingProducers = new LifoWaitQueue();
1130 waitingConsumers = new LifoWaitQueue();
1131 }
1132 s.defaultWriteObject();
1133 }
1134
1135 private void readObject(final java.io.ObjectInputStream s)
1136 throws java.io.IOException, ClassNotFoundException {
1137 s.defaultReadObject();
1138 if (waitingProducers instanceof FifoWaitQueue)
1139 transferer = new TransferQueue();
1140 else
1141 transferer = new TransferStack();
1142 }
1143
1144 }
|
146 * potentially O(n) traversal to be sure that we can remove the
147 * node, but this can run concurrently with other threads
148 * accessing the stack.
149 *
150 * While garbage collection takes care of most node reclamation
151 * issues that otherwise complicate nonblocking algorithms, care
152 * is taken to "forget" references to data, other nodes, and
153 * threads that might be held on to long-term by blocked
154 * threads. In cases where setting to null would otherwise
155 * conflict with main algorithms, this is done by changing a
156 * node's link to now point to the node itself. This doesn't arise
157 * much for Stack nodes (because blocked threads do not hang on to
158 * old head pointers), but references in Queue nodes must be
159 * aggressively forgotten to avoid reachability of everything any
160 * node has ever referred to since arrival.
161 */
162
163 /**
164 * Shared internal API for dual stacks and queues.
165 */
166 abstract static class Transferer {
167 /**
168 * Performs a put or take.
169 *
170 * @param e if non-null, the item to be handed to a consumer;
171 * if null, requests that transfer return an item
172 * offered by producer.
173 * @param timed if this operation should timeout
174 * @param nanos the timeout, in nanoseconds
175 * @return if non-null, the item provided or received; if null,
176 * the operation failed due to timeout or interrupt --
177 * the caller can distinguish which of these occurred
178 * by checking Thread.interrupted.
179 */
180 abstract Object transfer(Object e, boolean timed, long nanos);
181 }
182
183 /** The number of CPUs, for spin control */
184 static final int NCPUS = Runtime.getRuntime().availableProcessors();
185
186 /**
187 * The number of times to spin before blocking in timed waits.
188 * The value is empirically derived -- it works well across a
189 * variety of processors and OSes. Empirically, the best value
190 * seems not to vary with number of CPUs (beyond 2) so is just
191 * a constant.
192 */
193 static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
194
195 /**
196 * The number of times to spin before blocking in untimed waits.
197 * This is greater than timed value because untimed waits spin
198 * faster since they don't need to check times on each spin.
199 */
200 static final int maxUntimedSpins = maxTimedSpins * 16;
201
202 /**
203 * The number of nanoseconds for which it is faster to spin
204 * rather than to use timed park. A rough estimate suffices.
205 */
206 static final long spinForTimeoutThreshold = 1000L;
207
208 /** Dual stack */
209 static final class TransferStack extends Transferer {
210 /*
211 * This extends Scherer-Scott dual stack algorithm, differing,
212 * among other ways, by using "covering" nodes rather than
213 * bit-marked pointers: Fulfilling operations push on marker
224 static final int FULFILLING = 2;
225
226 /** Return true if m has fulfilling bit set */
227 static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
228
229 /** Node class for TransferStacks. */
230 static final class SNode {
231 volatile SNode next; // next node in stack
232 volatile SNode match; // the node matched to this
233 volatile Thread waiter; // to control park/unpark
234 Object item; // data; or null for REQUESTs
235 int mode;
236 // Note: item and mode fields don't need to be volatile
237 // since they are always written before, and read after,
238 // other volatile/atomic operations.
239
240 SNode(Object item) {
241 this.item = item;
242 }
243
244 boolean casNext(SNode cmp, SNode val) {
245 return cmp == next &&
246 UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
247 }
248
249 /**
250 * Tries to match node s to this node, if so, waking up thread.
251 * Fulfillers call tryMatch to identify their waiters.
252 * Waiters block until they have been matched.
253 *
254 * @param s the node to match
255 * @return true if successfully matched to s
256 */
257 boolean tryMatch(SNode s) {
258 if (match == null &&
259 UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
260 Thread w = waiter;
261 if (w != null) { // waiters need at most one unpark
262 waiter = null;
263 LockSupport.unpark(w);
264 }
265 return true;
266 }
267 return match == s;
268 }
269
270 /**
271 * Tries to cancel a wait by matching node to itself.
272 */
273 void tryCancel() {
274 UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
275 }
276
277 boolean isCancelled() {
278 return match == this;
279 }
280
281 // Unsafe mechanics
282 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
283 private static final long nextOffset =
284 objectFieldOffset(UNSAFE, "next", SNode.class);
285 private static final long matchOffset =
286 objectFieldOffset(UNSAFE, "match", SNode.class);
287
288 }
289
290 /** The head (top) of the stack */
291 volatile SNode head;
292
293 boolean casHead(SNode h, SNode nh) {
294 return h == head &&
295 UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
296 }
297
298 /**
299 * Creates or resets fields of a node. Called only from transfer
300 * where the node to push on stack is lazily created and
301 * reused when possible to help reduce intervals between reads
302 * and CASes of head and to avoid surges of garbage when CASes
303 * to push nodes fail due to contention.
304 */
305 static SNode snode(SNode s, Object e, SNode next, int mode) {
306 if (s == null) s = new SNode(e);
307 s.mode = mode;
308 s.next = next;
309 return s;
310 }
311
/**
 * Puts or takes an item.
 *
 * @param e the item to hand over (DATA mode), or null to request
 *          one (REQUEST mode)
 * @param timed whether the call may give up instead of waiting
 * @param nanos the timeout in nanoseconds; with timed set, a value
 *          of zero or less means the call cannot wait at all
 * @return in REQUEST mode the matched node's item, in DATA mode
 *         this call's own item; null if the call could not wait
 *         or its wait was cancelled
 */
Object transfer(Object e, boolean timed, long nanos) {
    /*
     * Loops trying one of the three actions below:
     *
     * 1. If apparently empty or already containing nodes of same
     *    mode, try to push node on stack and wait for a match,
     *    returning it, or null if cancelled.
     *
     * 2. If apparently containing node of complementary mode,
     *    try to push a fulfilling node on to stack, match
     *    with corresponding waiting node, pop both from
     *    stack, and return matched item. The matching or
     *    unlinking might not actually be necessary because of
     *    other threads performing action 3:
     *
     * 3. If top of stack already holds another fulfilling node,
     *    help it out by doing its match and/or pop
     *    operations, and then continue. The code for helping
     *    is essentially the same as for fulfilling, except
     *    that it doesn't return the item.
     */

    SNode s = null; // constructed/reused as needed
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        if (h == null || h.mode == mode) {  // empty or same-mode
            if (timed && nanos <= 0) {      // can't wait
                if (h != null && h.isCancelled())
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // Pushed as a waiter; awaitFulfill hands back s
                // itself when the wait was cancelled.
                SNode m = awaitFulfill(s, timed, nanos);
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (mode == REQUEST) ? m.item : s.item;
            }
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    // Read m's successor before matching so both
                    // s and m can be popped with a single CAS.
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (mode == REQUEST) ? m.item : s.item;
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}
391
392 /**
403 * field and then rechecks state at least one more time
404 * before actually parking, thus covering race vs
405 * fulfiller noticing that waiter is non-null so should be
406 * woken.
407 *
408 * When invoked by nodes that appear at the point of call
409 * to be at the head of the stack, calls to park are
410 * preceded by spins to avoid blocking when producers and
411 * consumers are arriving very close in time. This can
412 * happen enough to bother only on multiprocessors.
413 *
414 * The order of checks for returning out of main loop
415 * reflects fact that interrupts have precedence over
416 * normal returns, which have precedence over
417 * timeouts. (So, on timeout, one last check for match is
418 * done before giving up.) Except that calls from untimed
419 * SynchronousQueue.{poll/offer} don't check interrupts
420 * and don't wait at all, so are trapped in transfer
421 * method rather than calling awaitFulfill.
422 */
423 long lastTime = timed ? System.nanoTime() : 0;
424 Thread w = Thread.currentThread();
425 SNode h = head;
426 int spins = (shouldSpin(s) ?
427 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
428 for (;;) {
429 if (w.isInterrupted())
430 s.tryCancel();
431 SNode m = s.match;
432 if (m != null)
433 return m;
434 if (timed) {
435 long now = System.nanoTime();
436 nanos -= now - lastTime;
437 lastTime = now;
438 if (nanos <= 0) {
439 s.tryCancel();
440 continue;
441 }
442 }
443 if (spins > 0)
444 spins = shouldSpin(s) ? (spins-1) : 0;
445 else if (s.waiter == null)
446 s.waiter = w; // establish waiter so can park next iter
447 else if (!timed)
448 LockSupport.park(this);
449 else if (nanos > spinForTimeoutThreshold)
450 LockSupport.parkNanos(this, nanos);
451 }
452 }
453
454 /**
455 * Returns true if node s is at head or there is an active
456 * fulfiller.
457 */
458 boolean shouldSpin(SNode s) {
459 SNode h = head;
460 return (h == s || h == null || isFulfilling(h.mode));
461 }
462
463 /**
464 * Unlinks s from the stack.
479 */
480
481 SNode past = s.next;
482 if (past != null && past.isCancelled())
483 past = past.next;
484
485 // Absorb cancelled nodes at head
486 SNode p;
487 while ((p = head) != null && p != past && p.isCancelled())
488 casHead(p, p.next);
489
490 // Unsplice embedded nodes
491 while (p != null && p != past) {
492 SNode n = p.next;
493 if (n != null && n.isCancelled())
494 p.casNext(n, n.next);
495 else
496 p = n;
497 }
498 }
499
// Unsafe mechanics
// UNSAFE performs the compareAndSwapObject in casHead; headOffset is
// the location argument passed there for the "head" field.
// NOTE(review): objectFieldOffset is defined outside this view --
// presumably it resolves the named field's offset; confirm.
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
private static final long headOffset =
    objectFieldOffset(UNSAFE, "head", TransferStack.class);
504
505 }
506
507 /** Dual Queue */
508 static final class TransferQueue extends Transferer {
509 /*
510 * This extends Scherer-Scott dual queue algorithm, differing,
511 * among other ways, by using modes within nodes rather than
512 * marked pointers. The algorithm is a little simpler than
513 * that for stacks because fulfillers do not need explicit
514 * nodes, and matching is done by CAS'ing QNode.item field
515 * from non-null to null (for put) or vice versa (for take).
516 */
517
518 /** Node class for TransferQueue. */
519 static final class QNode {
520 volatile QNode next; // next node in queue
521 volatile Object item; // CAS'ed to or from null
522 volatile Thread waiter; // to control park/unpark
523 final boolean isData;
524
525 QNode(Object item, boolean isData) {
526 this.item = item;
527 this.isData = isData;
528 }
529
530 boolean casNext(QNode cmp, QNode val) {
531 return next == cmp &&
532 UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
533 }
534
535 boolean casItem(Object cmp, Object val) {
536 return item == cmp &&
537 UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
538 }
539
540 /**
541 * Tries to cancel by CAS'ing ref to this as item.
542 */
543 void tryCancel(Object cmp) {
544 UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
545 }
546
547 boolean isCancelled() {
548 return item == this;
549 }
550
551 /**
552 * Returns true if this node is known to be off the queue
553 * because its next pointer has been forgotten due to
554 * an advanceHead operation.
555 */
556 boolean isOffList() {
557 return next == this;
558 }
559
560 // Unsafe mechanics
561 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
562 private static final long nextOffset =
563 objectFieldOffset(UNSAFE, "next", QNode.class);
564 private static final long itemOffset =
565 objectFieldOffset(UNSAFE, "item", QNode.class);
566 }
567
/**
 * Head of queue. The constructor points this at a dummy node;
 * advanceHead moves it forward with a CAS.
 */
transient volatile QNode head;
/**
 * Tail of queue. The constructor points this at the same dummy node
 * as head; advanceTail moves it forward with a CAS.
 */
transient volatile QNode tail;
/**
 * Reference to a cancelled node that might not yet have been
 * unlinked from queue because it was the last inserted node
 * when it cancelled. Updated through casCleanMe.
 *
 * NOTE(review): the visible part of clean reads this slot and then
 * follows its next link to reach the cancelled node, which suggests
 * the slot holds the predecessor of that node -- confirm against
 * the full clean method.
 */
transient volatile QNode cleanMe;
578
579 TransferQueue() {
580 QNode h = new QNode(null, false); // initialize to dummy node.
581 head = h;
582 tail = h;
583 }
584
585 /**
586 * Tries to cas nh as new head; if successful, unlink
587 * old head's next node to avoid garbage retention.
588 */
589 void advanceHead(QNode h, QNode nh) {
590 if (h == head &&
591 UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
592 h.next = h; // forget old next
593 }
594
595 /**
596 * Tries to cas nt as new tail.
597 */
598 void advanceTail(QNode t, QNode nt) {
599 if (tail == t)
600 UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
601 }
602
603 /**
604 * Tries to CAS cleanMe slot.
605 */
606 boolean casCleanMe(QNode cmp, QNode val) {
607 return cleanMe == cmp &&
608 UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
609 }
610
611 /**
612 * Puts or takes an item.
613 */
614 Object transfer(Object e, boolean timed, long nanos) {
615 /* Basic algorithm is to loop trying to take either of
616 * two actions:
617 *
618 * 1. If queue apparently empty or holding same-mode nodes,
619 * try to add node to queue of waiters, wait to be
620 * fulfilled (or cancelled) and return matching item.
621 *
622 * 2. If queue apparently contains waiting items, and this
623 * call is of complementary mode, try to fulfill by CAS'ing
624 * item field of waiting node and dequeuing it, and then
625 * returning matching item.
626 *
627 * In each case, along the way, check for and try to help
628 * advance head and tail on behalf of other stalled/slow
657 if (timed && nanos <= 0) // can't wait
658 return null;
659 if (s == null)
660 s = new QNode(e, isData);
661 if (!t.casNext(null, s)) // failed to link in
662 continue;
663
664 advanceTail(t, s); // swing tail and wait
665 Object x = awaitFulfill(s, e, timed, nanos);
666 if (x == s) { // wait was cancelled
667 clean(t, s);
668 return null;
669 }
670
671 if (!s.isOffList()) { // not already unlinked
672 advanceHead(t, s); // unlink if head
673 if (x != null) // and forget fields
674 s.item = s;
675 s.waiter = null;
676 }
677 return (x != null) ? x : e;
678
679 } else { // complementary-mode
680 QNode m = h.next; // node to fulfill
681 if (t != tail || m == null || h != head)
682 continue; // inconsistent read
683
684 Object x = m.item;
685 if (isData == (x != null) || // m already fulfilled
686 x == m || // m cancelled
687 !m.casItem(x, e)) { // lost CAS
688 advanceHead(h, m); // dequeue and retry
689 continue;
690 }
691
692 advanceHead(h, m); // successfully fulfilled
693 LockSupport.unpark(m.waiter);
694 return (x != null) ? x : e;
695 }
696 }
697 }
698
699 /**
700 * Spins/blocks until node s is fulfilled.
701 *
702 * @param s the waiting node
703 * @param e the comparison value for checking match
704 * @param timed true if timed wait
705 * @param nanos timeout value
706 * @return matched item, or s if cancelled
707 */
708 Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
709 /* Same idea as TransferStack.awaitFulfill */
710 long lastTime = timed ? System.nanoTime() : 0;
711 Thread w = Thread.currentThread();
712 int spins = ((head.next == s) ?
713 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
714 for (;;) {
715 if (w.isInterrupted())
716 s.tryCancel(e);
717 Object x = s.item;
718 if (x != e)
719 return x;
720 if (timed) {
721 long now = System.nanoTime();
722 nanos -= now - lastTime;
723 lastTime = now;
724 if (nanos <= 0) {
725 s.tryCancel(e);
726 continue;
727 }
728 }
729 if (spins > 0)
730 --spins;
731 else if (s.waiter == null)
732 s.waiter = w;
733 else if (!timed)
773 return;
774 }
775 QNode dp = cleanMe;
776 if (dp != null) { // Try unlinking previous cancelled node
777 QNode d = dp.next;
778 QNode dn;
779 if (d == null || // d is gone or
780 d == dp || // d is off list or
781 !d.isCancelled() || // d not cancelled or
782 (d != t && // d not tail and
783 (dn = d.next) != null && // has successor
784 dn != d && // that is on list
785 dp.casNext(d, dn))) // d unspliced
786 casCleanMe(dp, null);
787 if (dp == pred)
788 return; // s is already saved node
789 } else if (casCleanMe(null, pred))
790 return; // Postpone cleaning s
791 }
792 }
793
794 // unsafe mechanics
795 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
796 private static final long headOffset =
797 objectFieldOffset(UNSAFE, "head", TransferQueue.class);
798 private static final long tailOffset =
799 objectFieldOffset(UNSAFE, "tail", TransferQueue.class);
800 private static final long cleanMeOffset =
801 objectFieldOffset(UNSAFE, "cleanMe", TransferQueue.class);
802
803 }
804
/**
 * The transferer. Assigned in the constructor and assigned again by
 * readObject after deserialization, so it cannot be declared
 * as final without further complicating serialization. Since
 * this is accessed only at most once per public method, there
 * isn't a noticeable performance penalty for using volatile
 * instead of final here.
 */
private transient volatile Transferer transferer;
813
814 /**
815 * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
816 */
public SynchronousQueue() {
    // Delegate to the fairness-aware constructor with fair == false.
    this(false);
}
820
821 /**
822 * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
823 *
824 * @param fair if true, waiting threads contend in FIFO order for
825 * access; otherwise the order is unspecified.
826 */
public SynchronousQueue(boolean fair) {
    // Fair mode is backed by the dual queue, nonfair mode by the dual stack.
    if (fair) {
        transferer = new TransferQueue();
    } else {
        transferer = new TransferStack();
    }
}
830
831 /**
832 * Adds the specified element to this queue, waiting if necessary for
833 * another thread to receive it.
834 *
835 * @throws InterruptedException {@inheritDoc}
836 * @throws NullPointerException {@inheritDoc}
837 */
838 public void put(E o) throws InterruptedException {
839 if (o == null) throw new NullPointerException();
840 if (transferer.transfer(o, false, 0) == null) {
841 Thread.interrupted();
842 throw new InterruptedException();
843 }
844 }
845
846 /**
847 * Inserts the specified element into this queue, waiting if necessary
848 * up to the specified wait time for another thread to receive it.
1125 waitingProducers = new FifoWaitQueue();
1126 waitingConsumers = new FifoWaitQueue();
1127 }
1128 else {
1129 qlock = new ReentrantLock();
1130 waitingProducers = new LifoWaitQueue();
1131 waitingConsumers = new LifoWaitQueue();
1132 }
1133 s.defaultWriteObject();
1134 }
1135
1136 private void readObject(final java.io.ObjectInputStream s)
1137 throws java.io.IOException, ClassNotFoundException {
1138 s.defaultReadObject();
1139 if (waitingProducers instanceof FifoWaitQueue)
1140 transferer = new TransferQueue();
1141 else
1142 transferer = new TransferStack();
1143 }
1144
1145 // Unsafe mechanics
1146 static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
1147 String field, Class<?> klazz) {
1148 try {
1149 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1150 } catch (NoSuchFieldException e) {
1151 // Convert Exception to corresponding Error
1152 NoSuchFieldError error = new NoSuchFieldError(field);
1153 error.initCause(e);
1154 throw error;
1155 }
1156 }
1157
1158 }
|