src/share/classes/java/util/concurrent/SynchronousQueue.java
Print this page
@@ -34,11 +34,10 @@
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
import java.util.concurrent.locks.*;
-import java.util.concurrent.atomic.*;
import java.util.*;
/**
* A {@linkplain BlockingQueue blocking queue} in which each insert
* operation must wait for a corresponding remove operation by another
@@ -161,11 +160,11 @@
*/
/**
* Shared internal API for dual stacks and queues.
*/
- abstract static class Transferer {
+ abstract static class Transferer<E> {
/**
* Performs a put or take.
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
@@ -175,11 +174,11 @@
* @return if non-null, the item provided or received; if null,
* the operation failed due to timeout or interrupt --
* the caller can distinguish which of these occurred
* by checking Thread.interrupted.
*/
- abstract Object transfer(Object e, boolean timed, long nanos);
+ abstract E transfer(E e, boolean timed, long nanos);
}
/** The number of CPUs, for spin control */
static final int NCPUS = Runtime.getRuntime().availableProcessors();
@@ -204,11 +203,11 @@
* rather than to use timed park. A rough estimate suffices.
*/
static final long spinForTimeoutThreshold = 1000L;
/** Dual stack */
- static final class TransferStack extends Transferer {
+ static final class TransferStack<E> extends Transferer<E> {
/*
* This extends Scherer-Scott dual stack algorithm, differing,
* among other ways, by using "covering" nodes rather than
* bit-marked pointers: Fulfilling operations push on marker
* nodes (with FULFILLING bit set in mode) to reserve a spot
@@ -284,11 +283,11 @@
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
- Class k = SNode.class;
+ Class<?> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
@@ -320,11 +319,12 @@
}
/**
* Puts or takes an item.
*/
- Object transfer(Object e, boolean timed, long nanos) {
+ @SuppressWarnings("unchecked")
+ E transfer(E e, boolean timed, long nanos) {
/*
* Basic algorithm is to loop trying one of three actions:
*
* 1. If apparently empty or already containing nodes of same
* mode, try to push node on stack and wait for a match,
@@ -361,11 +361,11 @@
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
- return (mode == REQUEST) ? m.item : s.item;
+ return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
@@ -377,11 +377,11 @@
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
- return (mode == REQUEST) ? m.item : s.item;
+ return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller
@@ -511,21 +511,21 @@
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
- Class k = TransferStack.class;
+ Class<?> k = TransferStack.class; // wildcard-typed replacement for the raw Class on the removed line
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/** Dual Queue */
- static final class TransferQueue extends Transferer {
+ static final class TransferQueue<E> extends Transferer<E> {
/*
* This extends Scherer-Scott dual queue algorithm, differing,
* among other ways, by using modes within nodes rather than
* marked pointers. The algorithm is a little simpler than
* that for stacks because fulfillers do not need explicit
@@ -581,11 +581,11 @@
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
- Class k = QNode.class;
+ Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
@@ -638,11 +638,12 @@
}
/**
* Puts or takes an item.
*/
- Object transfer(Object e, boolean timed, long nanos) {
+ @SuppressWarnings("unchecked")
+ E transfer(E e, boolean timed, long nanos) {
/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
@@ -701,11 +702,11 @@
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
- return (x != null) ? x : e;
+ return (x != null) ? (E)x : e;
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
@@ -718,11 +719,11 @@
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
- return (x != null) ? x : e;
+ return (x != null) ? (E)x : e;
}
}
}
/**
@@ -732,11 +733,11 @@
* @param e the comparison value for checking match
* @param timed true if timed wait
* @param nanos timeout value
* @return matched item, or s if cancelled
*/
- Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
+ Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
long lastTime = timed ? System.nanoTime() : 0;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
@@ -825,11 +826,11 @@
private static final long tailOffset;
private static final long cleanMeOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
- Class k = TransferQueue.class;
+ Class<?> k = TransferQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
cleanMeOffset = UNSAFE.objectFieldOffset
@@ -845,11 +846,11 @@
* as final without further complicating serialization. Since
* this is accessed only at most once per public method, there
* isn't a noticeable performance penalty for using volatile
* instead of final here.
*/
- private transient volatile Transferer transferer;
+ private transient volatile Transferer<E> transferer;
/**
* Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
*/
public SynchronousQueue() {
@@ -861,11 +862,11 @@
*
* @param fair if true, waiting threads contend in FIFO order for
* access; otherwise the order is unspecified.
*/
public SynchronousQueue(boolean fair) {
- transferer = fair ? new TransferQueue() : new TransferStack();
+ transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); // dual queue when fair, dual stack otherwise
}
/**
* Adds the specified element to this queue, waiting if necessary for
* another thread to receive it.
@@ -920,13 +921,13 @@
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
- Object e = transferer.transfer(null, false, 0);
+ E e = transferer.transfer(null, false, 0); // null item = request an element; timed is false
if (e != null)
- return (E)e;
+ return e; // transfer is now declared to return E, so the old cast is dropped
Thread.interrupted();
throw new InterruptedException();
}
/**
@@ -937,13 +938,13 @@
* @return the head of this queue, or <tt>null</tt> if the
* specified waiting time elapses before an element is present.
* @throws InterruptedException {@inheritDoc}
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- Object e = transferer.transfer(null, true, unit.toNanos(timeout));
+ E e = transferer.transfer(null, true, unit.toNanos(timeout)); // timed request; timeout converted to nanoseconds
if (e != null || !Thread.interrupted())
- return (E)e;
+ return e; // null only when nothing was received and the thread was not interrupted
throw new InterruptedException();
}
/**
* Retrieves and removes the head of this queue, if another thread
@@ -951,11 +952,11 @@
*
* @return the head of this queue, or <tt>null</tt> if no
* element is available.
*/
public E poll() {
- return (E)transferer.transfer(null, true, 0);
+ return transferer.transfer(null, true, 0); // timed request with a zero-nanosecond timeout; result is already typed E
}
/**
* Always returns <tt>true</tt>.
* A <tt>SynchronousQueue</tt> has no internal capacity.
@@ -1063,14 +1064,25 @@
* Returns an empty iterator in which <tt>hasNext</tt> always returns
* <tt>false</tt>.
*
* @return an empty iterator
*/
+ @SuppressWarnings("unchecked") // covers the cast of the shared Object-typed instance below
public Iterator<E> iterator() {
- return Collections.emptyIterator();
+ return (Iterator<E>) EmptyIterator.EMPTY_ITERATOR; // that iterator never yields an element
}
+ // Replicated from a previous version of Collections; stands in for the Collections.emptyIterator() call removed above
+ private static class EmptyIterator<E> implements Iterator<E> {
+ static final EmptyIterator<Object> EMPTY_ITERATOR
+ = new EmptyIterator<Object>(); // single shared instance; the class has no instance fields
+
+ public boolean hasNext() { return false; } // there is never a next element
+ public E next() { throw new NoSuchElementException(); } // always fails, matching hasNext()
+ public void remove() { throw new IllegalStateException(); } // next() can never have succeeded beforehand
+ }
+
/**
* Returns a zero-length array.
* @return a zero-length array
*/
public Object[] toArray() {
@@ -1101,12 +1113,11 @@
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
int n = 0;
- E e;
- while ( (e = poll()) != null) {
+ for (E e; (e = poll()) != null;) {
c.add(e);
++n;
}
return n;
}
@@ -1121,12 +1132,11 @@
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
int n = 0;
- E e;
- while (n < maxElements && (e = poll()) != null) {
+ for (E e; n < maxElements && (e = poll()) != null;) {
c.add(e);
++n;
}
return n;
}
@@ -1137,10 +1147,11 @@
* that exist solely to enable serializability across versions.
* These fields are never used, so are initialized only if this
* object is ever serialized or deserialized.
*/
+ @SuppressWarnings("serial") // the empty WaitQueue class declares no serialVersionUID
static class WaitQueue implements java.io.Serializable { }
static class LifoWaitQueue extends WaitQueue {
private static final long serialVersionUID = -3633113410248163686L;
}
static class FifoWaitQueue extends WaitQueue {
@@ -1149,11 +1160,11 @@
private ReentrantLock qlock;
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;
/**
- * Save the state to a stream (that is, serialize it).
+ * Saves the state to a stream (that is, serializes it).
*
* @param s the stream
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
@@ -1173,13 +1184,13 @@
private void readObject(final java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
if (waitingProducers instanceof FifoWaitQueue)
- transferer = new TransferQueue();
+ transferer = new TransferQueue<E>(); // same class the constructor picks when fair is true
else
- transferer = new TransferStack();
+ transferer = new TransferStack<E>(); // same class the constructor picks when fair is false
}
// Unsafe mechanics
static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class<?> klazz) {