src/share/classes/java/util/concurrent/SynchronousQueue.java

Print this page

        

@@ -34,11 +34,10 @@
  * http://creativecommons.org/publicdomain/zero/1.0/
  */
 
 package java.util.concurrent;
 import java.util.concurrent.locks.*;
-import java.util.concurrent.atomic.*;
 import java.util.*;
 
 /**
  * A {@linkplain BlockingQueue blocking queue} in which each insert
  * operation must wait for a corresponding remove operation by another

@@ -161,11 +160,11 @@
      */
 
     /**
      * Shared internal API for dual stacks and queues.
      */
-    abstract static class Transferer {
+    abstract static class Transferer<E> {
         /**
          * Performs a put or take.
          *
          * @param e if non-null, the item to be handed to a consumer;
          *          if null, requests that transfer return an item

@@ -175,11 +174,11 @@
          * @return if non-null, the item provided or received; if null,
          *         the operation failed due to timeout or interrupt --
          *         the caller can distinguish which of these occurred
          *         by checking Thread.interrupted.
          */
-        abstract Object transfer(Object e, boolean timed, long nanos);
+        abstract E transfer(E e, boolean timed, long nanos);
     }
 
     /** The number of CPUs, for spin control */
     static final int NCPUS = Runtime.getRuntime().availableProcessors();
 

@@ -204,11 +203,11 @@
      * rather than to use timed park. A rough estimate suffices.
      */
     static final long spinForTimeoutThreshold = 1000L;
 
     /** Dual stack */
-    static final class TransferStack extends Transferer {
+    static final class TransferStack<E> extends Transferer<E> {
         /*
          * This extends Scherer-Scott dual stack algorithm, differing,
          * among other ways, by using "covering" nodes rather than
          * bit-marked pointers: Fulfilling operations push on marker
          * nodes (with FULFILLING bit set in mode) to reserve a spot

@@ -284,11 +283,11 @@
             private static final long nextOffset;
 
             static {
                 try {
                     UNSAFE = sun.misc.Unsafe.getUnsafe();
-                    Class k = SNode.class;
+                    Class<?> k = SNode.class;
                     matchOffset = UNSAFE.objectFieldOffset
                         (k.getDeclaredField("match"));
                     nextOffset = UNSAFE.objectFieldOffset
                         (k.getDeclaredField("next"));
                 } catch (Exception e) {

@@ -320,11 +319,12 @@
         }
 
         /**
          * Puts or takes an item.
          */
-        Object transfer(Object e, boolean timed, long nanos) {
+        @SuppressWarnings("unchecked")
+        E transfer(E e, boolean timed, long nanos) {
             /*
              * Basic algorithm is to loop trying one of three actions:
              *
              * 1. If apparently empty or already containing nodes of same
              *    mode, try to push node on stack and wait for a match,

@@ -361,11 +361,11 @@
                             clean(s);
                             return null;
                         }
                         if ((h = head) != null && h.next == s)
                             casHead(h, s.next);     // help s's fulfiller
-                        return (mode == REQUEST) ? m.item : s.item;
+                        return (E) ((mode == REQUEST) ? m.item : s.item);
                     }
                 } else if (!isFulfilling(h.mode)) { // try to fulfill
                     if (h.isCancelled())            // already cancelled
                         casHead(h, h.next);         // pop and retry
                     else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {

@@ -377,11 +377,11 @@
                                 break;              // restart main loop
                             }
                             SNode mn = m.next;
                             if (m.tryMatch(s)) {
                                 casHead(s, mn);     // pop both s and m
-                                return (mode == REQUEST) ? m.item : s.item;
+                                return (E) ((mode == REQUEST) ? m.item : s.item);
                             } else                  // lost match
                                 s.casNext(m, mn);   // help unlink
                         }
                     }
                 } else {                            // help a fulfiller

@@ -511,21 +511,21 @@
         private static final sun.misc.Unsafe UNSAFE;
         private static final long headOffset;
         static {
             try {
                 UNSAFE = sun.misc.Unsafe.getUnsafe();
-                Class k = TransferStack.class;
+                Class<?> k = TransferStack.class;
                 headOffset = UNSAFE.objectFieldOffset
                     (k.getDeclaredField("head"));
             } catch (Exception e) {
                 throw new Error(e);
             }
         }
     }
 
     /** Dual Queue */
-    static final class TransferQueue extends Transferer {
+    static final class TransferQueue<E> extends Transferer<E> {
         /*
          * This extends Scherer-Scott dual queue algorithm, differing,
          * among other ways, by using modes within nodes rather than
          * marked pointers. The algorithm is a little simpler than
          * that for stacks because fulfillers do not need explicit

@@ -581,11 +581,11 @@
             private static final long nextOffset;
 
             static {
                 try {
                     UNSAFE = sun.misc.Unsafe.getUnsafe();
-                    Class k = QNode.class;
+                    Class<?> k = QNode.class;
                     itemOffset = UNSAFE.objectFieldOffset
                         (k.getDeclaredField("item"));
                     nextOffset = UNSAFE.objectFieldOffset
                         (k.getDeclaredField("next"));
                 } catch (Exception e) {

@@ -638,11 +638,12 @@
         }
 
         /**
          * Puts or takes an item.
          */
-        Object transfer(Object e, boolean timed, long nanos) {
+        @SuppressWarnings("unchecked")
+        E transfer(E e, boolean timed, long nanos) {
             /* Basic algorithm is to loop trying to take either of
              * two actions:
              *
              * 1. If queue apparently empty or holding same-mode nodes,
              *    try to add node to queue of waiters, wait to be

@@ -701,11 +702,11 @@
                         advanceHead(t, s);          // unlink if head
                         if (x != null)              // and forget fields
                             s.item = s;
                         s.waiter = null;
                     }
-                    return (x != null) ? x : e;
+                    return (x != null) ? (E)x : e;
 
                 } else {                            // complementary-mode
                     QNode m = h.next;               // node to fulfill
                     if (t != tail || m == null || h != head)
                         continue;                   // inconsistent read

@@ -718,11 +719,11 @@
                         continue;
                     }
 
                     advanceHead(h, m);              // successfully fulfilled
                     LockSupport.unpark(m.waiter);
-                    return (x != null) ? x : e;
+                    return (x != null) ? (E)x : e;
                 }
             }
         }
 
         /**

@@ -732,11 +733,11 @@
          * @param e the comparison value for checking match
          * @param timed true if timed wait
          * @param nanos timeout value
          * @return matched item, or s if cancelled
          */
-        Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
+        Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
             /* Same idea as TransferStack.awaitFulfill */
             long lastTime = timed ? System.nanoTime() : 0;
             Thread w = Thread.currentThread();
             int spins = ((head.next == s) ?
                          (timed ? maxTimedSpins : maxUntimedSpins) : 0);

@@ -825,11 +826,11 @@
         private static final long tailOffset;
         private static final long cleanMeOffset;
         static {
             try {
                 UNSAFE = sun.misc.Unsafe.getUnsafe();
-                Class k = TransferQueue.class;
+                Class<?> k = TransferQueue.class;
                 headOffset = UNSAFE.objectFieldOffset
                     (k.getDeclaredField("head"));
                 tailOffset = UNSAFE.objectFieldOffset
                     (k.getDeclaredField("tail"));
                 cleanMeOffset = UNSAFE.objectFieldOffset

@@ -845,11 +846,11 @@
      * as final without further complicating serialization.  Since
      * this is accessed only at most once per public method, there
      * isn't a noticeable performance penalty for using volatile
      * instead of final here.
      */
-    private transient volatile Transferer transferer;
+    private transient volatile Transferer<E> transferer;
 
     /**
      * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
      */
     public SynchronousQueue() {

@@ -861,11 +862,11 @@
      *
      * @param fair if true, waiting threads contend in FIFO order for
      *        access; otherwise the order is unspecified.
      */
     public SynchronousQueue(boolean fair) {
-        transferer = fair ? new TransferQueue() : new TransferStack();
+        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
     }
 
     /**
      * Adds the specified element to this queue, waiting if necessary for
      * another thread to receive it.

@@ -920,13 +921,13 @@
      *
      * @return the head of this queue
      * @throws InterruptedException {@inheritDoc}
      */
     public E take() throws InterruptedException {
-        Object e = transferer.transfer(null, false, 0);
+        E e = transferer.transfer(null, false, 0);
         if (e != null)
-            return (E)e;
+            return e;
         Thread.interrupted();
         throw new InterruptedException();
     }
 
     /**

@@ -937,13 +938,13 @@
      * @return the head of this queue, or <tt>null</tt> if the
      *         specified waiting time elapses before an element is present.
      * @throws InterruptedException {@inheritDoc}
      */
     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
-        Object e = transferer.transfer(null, true, unit.toNanos(timeout));
+        E e = transferer.transfer(null, true, unit.toNanos(timeout));
         if (e != null || !Thread.interrupted())
-            return (E)e;
+            return e;
         throw new InterruptedException();
     }
 
     /**
      * Retrieves and removes the head of this queue, if another thread

@@ -951,11 +952,11 @@
      *
      * @return the head of this queue, or <tt>null</tt> if no
      *         element is available.
      */
     public E poll() {
-        return (E)transferer.transfer(null, true, 0);
+        return transferer.transfer(null, true, 0);
     }
 
     /**
      * Always returns <tt>true</tt>.
      * A <tt>SynchronousQueue</tt> has no internal capacity.

@@ -1063,14 +1064,25 @@
      * Returns an empty iterator in which <tt>hasNext</tt> always returns
      * <tt>false</tt>.
      *
      * @return an empty iterator
      */
+    @SuppressWarnings("unchecked")
     public Iterator<E> iterator() {
-        return Collections.emptyIterator();
+        return (Iterator<E>) EmptyIterator.EMPTY_ITERATOR;
     }
 
+    // Replicated from a previous version of java.util.Collections; used by iterator()
+    private static class EmptyIterator<E> implements Iterator<E> {
+        static final EmptyIterator<Object> EMPTY_ITERATOR
+            = new EmptyIterator<Object>();
+
+        public boolean hasNext() { return false; }
+        public E next() { throw new NoSuchElementException(); }
+        public void remove() { throw new IllegalStateException(); }
+    }
+
     /**
      * Returns a zero-length array.
      * @return a zero-length array
      */
     public Object[] toArray() {

@@ -1101,12 +1113,11 @@
         if (c == null)
             throw new NullPointerException();
         if (c == this)
             throw new IllegalArgumentException();
         int n = 0;
-        E e;
-        while ( (e = poll()) != null) {
+        for (E e; (e = poll()) != null;) {
             c.add(e);
             ++n;
         }
         return n;
     }

@@ -1121,12 +1132,11 @@
         if (c == null)
             throw new NullPointerException();
         if (c == this)
             throw new IllegalArgumentException();
         int n = 0;
-        E e;
-        while (n < maxElements && (e = poll()) != null) {
+        for (E e; n < maxElements && (e = poll()) != null;) {
             c.add(e);
             ++n;
         }
         return n;
     }

@@ -1137,10 +1147,11 @@
      * that exist solely to enable serializability across versions.
      * These fields are never used, so are initialized only if this
      * object is ever serialized or deserialized.
      */
 
+    @SuppressWarnings("serial")
     static class WaitQueue implements java.io.Serializable { }
     static class LifoWaitQueue extends WaitQueue {
         private static final long serialVersionUID = -3633113410248163686L;
     }
     static class FifoWaitQueue extends WaitQueue {

@@ -1149,11 +1160,11 @@
     private ReentrantLock qlock;
     private WaitQueue waitingProducers;
     private WaitQueue waitingConsumers;
 
     /**
-     * Save the state to a stream (that is, serialize it).
+     * Saves the state to a stream (that is, serializes it).
      *
      * @param s the stream
      */
     private void writeObject(java.io.ObjectOutputStream s)
         throws java.io.IOException {

@@ -1173,13 +1184,13 @@
 
     private void readObject(final java.io.ObjectInputStream s)
         throws java.io.IOException, ClassNotFoundException {
         s.defaultReadObject();
         if (waitingProducers instanceof FifoWaitQueue)
-            transferer = new TransferQueue();
+            transferer = new TransferQueue<E>();
         else
-            transferer = new TransferStack();
+            transferer = new TransferStack<E>();
     }
 
     // Unsafe mechanics
     static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
                                   String field, Class<?> klazz) {