src/share/classes/java/util/concurrent/SynchronousQueue.java

Print this page

        

*** 34,44 **** * http://creativecommons.org/publicdomain/zero/1.0/ */ package java.util.concurrent; import java.util.concurrent.locks.*; - import java.util.concurrent.atomic.*; import java.util.*; /** * A {@linkplain BlockingQueue blocking queue} in which each insert * operation must wait for a corresponding remove operation by another --- 34,43 ----
*** 161,171 **** */ /** * Shared internal API for dual stacks and queues. */ ! abstract static class Transferer { /** * Performs a put or take. * * @param e if non-null, the item to be handed to a consumer; * if null, requests that transfer return an item --- 160,170 ---- */ /** * Shared internal API for dual stacks and queues. */ ! abstract static class Transferer<E> { /** * Performs a put or take. * * @param e if non-null, the item to be handed to a consumer; * if null, requests that transfer return an item
*** 175,185 **** * @return if non-null, the item provided or received; if null, * the operation failed due to timeout or interrupt -- * the caller can distinguish which of these occurred * by checking Thread.interrupted. */ ! abstract Object transfer(Object e, boolean timed, long nanos); } /** The number of CPUs, for spin control */ static final int NCPUS = Runtime.getRuntime().availableProcessors(); --- 174,184 ---- * @return if non-null, the item provided or received; if null, * the operation failed due to timeout or interrupt -- * the caller can distinguish which of these occurred * by checking Thread.interrupted. */ ! abstract E transfer(E e, boolean timed, long nanos); } /** The number of CPUs, for spin control */ static final int NCPUS = Runtime.getRuntime().availableProcessors();
*** 204,214 **** * rather than to use timed park. A rough estimate suffices. */ static final long spinForTimeoutThreshold = 1000L; /** Dual stack */ ! static final class TransferStack extends Transferer { /* * This extends Scherer-Scott dual stack algorithm, differing, * among other ways, by using "covering" nodes rather than * bit-marked pointers: Fulfilling operations push on marker * nodes (with FULFILLING bit set in mode) to reserve a spot --- 203,213 ---- * rather than to use timed park. A rough estimate suffices. */ static final long spinForTimeoutThreshold = 1000L; /** Dual stack */ ! static final class TransferStack<E> extends Transferer<E> { /* * This extends Scherer-Scott dual stack algorithm, differing, * among other ways, by using "covering" nodes rather than * bit-marked pointers: Fulfilling operations push on marker * nodes (with FULFILLING bit set in mode) to reserve a spot
*** 284,294 **** private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); ! Class k = SNode.class; matchOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("match")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { --- 283,293 ---- private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); ! Class<?> k = SNode.class; matchOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("match")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) {
*** 320,330 **** } /** * Puts or takes an item. */ ! Object transfer(Object e, boolean timed, long nanos) { /* * Basic algorithm is to loop trying one of three actions: * * 1. If apparently empty or already containing nodes of same * mode, try to push node on stack and wait for a match, --- 319,330 ---- } /** * Puts or takes an item. */ ! @SuppressWarnings("unchecked") ! E transfer(E e, boolean timed, long nanos) { /* * Basic algorithm is to loop trying one of three actions: * * 1. If apparently empty or already containing nodes of same * mode, try to push node on stack and wait for a match,
*** 361,371 **** clean(s); return null; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller ! return (mode == REQUEST) ? m.item : s.item; } } else if (!isFulfilling(h.mode)) { // try to fulfill if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { --- 361,371 ---- clean(s); return null; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller ! return (E) ((mode == REQUEST) ? m.item : s.item); } } else if (!isFulfilling(h.mode)) { // try to fulfill if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
*** 377,387 **** break; // restart main loop } SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m ! return (mode == REQUEST) ? m.item : s.item; } else // lost match s.casNext(m, mn); // help unlink } } } else { // help a fulfiller --- 377,387 ---- break; // restart main loop } SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m ! return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink } } } else { // help a fulfiller
*** 511,531 **** private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); ! Class k = TransferStack.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); } catch (Exception e) { throw new Error(e); } } } /** Dual Queue */ ! static final class TransferQueue extends Transferer { /* * This extends Scherer-Scott dual queue algorithm, differing, * among other ways, by using modes within nodes rather than * marked pointers. The algorithm is a little simpler than * that for stacks because fulfillers do not need explicit --- 511,531 ---- private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); ! Class<?> k = TransferStack.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); } catch (Exception e) { throw new Error(e); } } } /** Dual Queue */ ! static final class TransferQueue<E> extends Transferer<E> { /* * This extends Scherer-Scott dual queue algorithm, differing, * among other ways, by using modes within nodes rather than * marked pointers. The algorithm is a little simpler than * that for stacks because fulfillers do not need explicit
*** 581,591 **** private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); ! Class k = QNode.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { --- 581,591 ---- private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); ! Class<?> k = QNode.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) {
*** 638,648 **** } /** * Puts or takes an item. */ ! Object transfer(Object e, boolean timed, long nanos) { /* Basic algorithm is to loop trying to take either of * two actions: * * 1. If queue apparently empty or holding same-mode nodes, * try to add node to queue of waiters, wait to be --- 638,649 ---- } /** * Puts or takes an item. */ ! @SuppressWarnings("unchecked") ! E transfer(E e, boolean timed, long nanos) { /* Basic algorithm is to loop trying to take either of * two actions: * * 1. If queue apparently empty or holding same-mode nodes, * try to add node to queue of waiters, wait to be
*** 701,711 **** advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } ! return (x != null) ? x : e; } else { // complementary-mode QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read --- 702,712 ---- advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } ! return (x != null) ? (E)x : e; } else { // complementary-mode QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read
*** 718,728 **** continue; } advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); ! return (x != null) ? x : e; } } } /** --- 719,729 ---- continue; } advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); ! return (x != null) ? (E)x : e; } } } /**
*** 732,742 **** * @param e the comparison value for checking match * @param timed true if timed wait * @param nanos timeout value * @return matched item, or s if cancelled */ ! Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) { /* Same idea as TransferStack.awaitFulfill */ long lastTime = timed ? System.nanoTime() : 0; Thread w = Thread.currentThread(); int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); --- 733,743 ---- * @param e the comparison value for checking match * @param timed true if timed wait * @param nanos timeout value * @return matched item, or s if cancelled */ ! Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { /* Same idea as TransferStack.awaitFulfill */ long lastTime = timed ? System.nanoTime() : 0; Thread w = Thread.currentThread(); int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
*** 825,835 **** private static final long tailOffset; private static final long cleanMeOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); ! Class k = TransferQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); cleanMeOffset = UNSAFE.objectFieldOffset --- 826,836 ---- private static final long tailOffset; private static final long cleanMeOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); ! Class<?> k = TransferQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); cleanMeOffset = UNSAFE.objectFieldOffset
*** 845,855 **** * as final without further complicating serialization. Since * this is accessed only at most once per public method, there * isn't a noticeable performance penalty for using volatile * instead of final here. */ ! private transient volatile Transferer transferer; /** * Creates a <tt>SynchronousQueue</tt> with nonfair access policy. */ public SynchronousQueue() { --- 846,856 ---- * as final without further complicating serialization. Since * this is accessed only at most once per public method, there * isn't a noticeable performance penalty for using volatile * instead of final here. */ ! private transient volatile Transferer<E> transferer; /** * Creates a <tt>SynchronousQueue</tt> with nonfair access policy. */ public SynchronousQueue() {
*** 861,871 **** * * @param fair if true, waiting threads contend in FIFO order for * access; otherwise the order is unspecified. */ public SynchronousQueue(boolean fair) { ! transferer = fair ? new TransferQueue() : new TransferStack(); } /** * Adds the specified element to this queue, waiting if necessary for * another thread to receive it. --- 862,872 ---- * * @param fair if true, waiting threads contend in FIFO order for * access; otherwise the order is unspecified. */ public SynchronousQueue(boolean fair) { ! transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); } /** * Adds the specified element to this queue, waiting if necessary for * another thread to receive it.
*** 920,932 **** * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { ! Object e = transferer.transfer(null, false, 0); if (e != null) ! return (E)e; Thread.interrupted(); throw new InterruptedException(); } /** --- 921,933 ---- * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { ! E e = transferer.transfer(null, false, 0); if (e != null) ! return e; Thread.interrupted(); throw new InterruptedException(); } /**
*** 937,949 **** * @return the head of this queue, or <tt>null</tt> if the * specified waiting time elapses before an element is present. * @throws InterruptedException {@inheritDoc} */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { ! Object e = transferer.transfer(null, true, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) ! return (E)e; throw new InterruptedException(); } /** * Retrieves and removes the head of this queue, if another thread --- 938,950 ---- * @return the head of this queue, or <tt>null</tt> if the * specified waiting time elapses before an element is present. * @throws InterruptedException {@inheritDoc} */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { ! E e = transferer.transfer(null, true, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) ! return e; throw new InterruptedException(); } /** * Retrieves and removes the head of this queue, if another thread
*** 951,961 **** * * @return the head of this queue, or <tt>null</tt> if no * element is available. */ public E poll() { ! return (E)transferer.transfer(null, true, 0); } /** * Always returns <tt>true</tt>. * A <tt>SynchronousQueue</tt> has no internal capacity. --- 952,962 ---- * * @return the head of this queue, or <tt>null</tt> if no * element is available. */ public E poll() { ! return transferer.transfer(null, true, 0); } /** * Always returns <tt>true</tt>. * A <tt>SynchronousQueue</tt> has no internal capacity.
*** 1063,1076 **** * Returns an empty iterator in which <tt>hasNext</tt> always returns * <tt>false</tt>. * * @return an empty iterator */ public Iterator<E> iterator() { ! return Collections.emptyIterator(); } /** * Returns a zero-length array. * @return a zero-length array */ public Object[] toArray() { --- 1064,1088 ---- * Returns an empty iterator in which <tt>hasNext</tt> always returns * <tt>false</tt>. * * @return an empty iterator */ + @SuppressWarnings("unchecked") public Iterator<E> iterator() { ! return (Iterator<E>) EmptyIterator.EMPTY_ITERATOR; } + // Replicated from a previous version of Collections + private static class EmptyIterator<E> implements Iterator<E> { + static final EmptyIterator<Object> EMPTY_ITERATOR + = new EmptyIterator<Object>(); + + public boolean hasNext() { return false; } + public E next() { throw new NoSuchElementException(); } + public void remove() { throw new IllegalStateException(); } + } + /** * Returns a zero-length array. * @return a zero-length array */ public Object[] toArray() {
*** 1101,1112 **** if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); int n = 0; ! E e; ! while ( (e = poll()) != null) { c.add(e); ++n; } return n; } --- 1113,1123 ---- if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); int n = 0; ! for (E e; (e = poll()) != null;) { c.add(e); ++n; } return n; }
*** 1121,1132 **** if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); int n = 0; ! E e; ! while (n < maxElements && (e = poll()) != null) { c.add(e); ++n; } return n; } --- 1132,1142 ---- if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); int n = 0; ! for (E e; n < maxElements && (e = poll()) != null;) { c.add(e); ++n; } return n; }
*** 1137,1146 **** --- 1147,1157 ---- * that exist solely to enable serializability across versions. * These fields are never used, so are initialized only if this * object is ever serialized or deserialized. */ + @SuppressWarnings("serial") static class WaitQueue implements java.io.Serializable { } static class LifoWaitQueue extends WaitQueue { private static final long serialVersionUID = -3633113410248163686L; } static class FifoWaitQueue extends WaitQueue {
*** 1149,1159 **** private ReentrantLock qlock; private WaitQueue waitingProducers; private WaitQueue waitingConsumers; /** ! * Save the state to a stream (that is, serialize it). * * @param s the stream */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { --- 1160,1170 ---- private ReentrantLock qlock; private WaitQueue waitingProducers; private WaitQueue waitingConsumers; /** ! * Saves the state to a stream (that is, serializes it). * * @param s the stream */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException {
*** 1173,1185 **** private void readObject(final java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); if (waitingProducers instanceof FifoWaitQueue) ! transferer = new TransferQueue(); else ! transferer = new TransferStack(); } // Unsafe mechanics static long objectFieldOffset(sun.misc.Unsafe UNSAFE, String field, Class<?> klazz) { --- 1184,1196 ---- private void readObject(final java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); if (waitingProducers instanceof FifoWaitQueue) ! transferer = new TransferQueue<E>(); else ! transferer = new TransferStack<E>(); } // Unsafe mechanics static long objectFieldOffset(sun.misc.Unsafe UNSAFE, String field, Class<?> klazz) {