src/share/classes/java/util/concurrent/Exchanger.java

Print this page




 262      * interruption, timeout, or elapsed spin-waits.  This value is
 263      * placed in holes on cancellation, and used as a return value
 264      * from waiting methods to indicate failure to set or get hole.
 265      */
 266     private static final Object CANCEL = new Object();
 267 
 268     /**
 269      * Value representing null arguments/returns from public
 270      * methods.  This disambiguates from internal requirement that
 271      * holes start out as null to mean they are not yet set.
 272      */
 273     private static final Object NULL_ITEM = new Object();
 274 
 275     /**
 276      * Nodes hold partially exchanged data.  This class
 277      * opportunistically subclasses AtomicReference to represent the
 278      * hole.  So get() returns hole, and compareAndSet CAS'es value
 279      * into hole.  This class cannot be parameterized as "V" because
 280      * of the use of non-V CANCEL sentinels.
 281      */

 282     private static final class Node extends AtomicReference<Object> {
 283         /** The element offered by the Thread creating this node. */
 284         public final Object item;
 285 
 286         /** The Thread waiting to be signalled; null until waiting. */
 287         public volatile Thread waiter;
 288 
 289         /**
 290          * Creates node with given item and empty hole.
 291          * @param item the item
 292          */
 293         public Node(Object item) {
 294             this.item = item;
 295         }
 296     }
 297 
 298     /**
 299      * A Slot is an AtomicReference with heuristic padding to lessen
 300      * cache effects of this heavily CAS'ed location.  While the
 301      * padding adds noticeable space, all slots are created only on
 302      * demand, and there will be more than one of them only when it
 303      * would improve throughput more than enough to outweigh using
 304      * extra space.
 305      */

 306     private static final class Slot extends AtomicReference<Object> {
 307         // Improve likelihood of isolation on <= 64 byte cache lines
 308         long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
 309     }
 310 
 311     /**
 312      * Slot array.  Elements are lazily initialized when needed.
 313      * Declared volatile to enable double-checked lazy construction.
 314      */
 315     private volatile Slot[] arena = new Slot[CAPACITY];
 316 
 317     /**
 318      * The maximum slot index being used.  The value sometimes
 319      * increases when a thread experiences too many CAS contentions,
 320      * and sometimes decreases when a spin-wait elapses.  Changes
 321      * are performed only via compareAndSet, to avoid stale values
 322      * when a thread happens to stall right before setting.
 323      */
 324     private final AtomicInteger max = new AtomicInteger();
 325 


 599      * dormant until one of two things happens:
 600      * <ul>
 601      * <li>Some other thread enters the exchange; or
 602      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 603      * the current thread.
 604      * </ul>
 605      * <p>If the current thread:
 606      * <ul>
 607      * <li>has its interrupted status set on entry to this method; or
 608      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
 609      * for the exchange,
 610      * </ul>
 611      * then {@link InterruptedException} is thrown and the current thread's
 612      * interrupted status is cleared.
 613      *
 614      * @param x the object to exchange
 615      * @return the object provided by the other thread
 616      * @throws InterruptedException if the current thread was
 617      *         interrupted while waiting
 618      */

 619     public V exchange(V x) throws InterruptedException {
 620         if (!Thread.interrupted()) {
 621             Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0);
 622             if (v == NULL_ITEM)
 623                 return null;
 624             if (v != CANCEL)
 625                 return (V)v;
 626             Thread.interrupted(); // Clear interrupt status on IE throw
 627         }
 628         throw new InterruptedException();
 629     }
 630 
 631     /**
 632      * Waits for another thread to arrive at this exchange point (unless
 633      * the current thread is {@linkplain Thread#interrupt interrupted} or
 634      * the specified waiting time elapses), and then transfers the given
 635      * object to it, receiving its object in return.
 636      *
 637      * <p>If another thread is already waiting at the exchange point then
 638      * it is resumed for thread scheduling purposes and receives the object
 639      * passed in by the current thread.  The current thread returns immediately,
 640      * receiving the object passed to the exchange by that other thread.
 641      *
 642      * <p>If no other thread is already waiting at the exchange then the
 643      * current thread is disabled for thread scheduling purposes and lies
 644      * dormant until one of three things happens:
 645      * <ul>


 653      * <li>has its interrupted status set on entry to this method; or
 654      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
 655      * for the exchange,
 656      * </ul>
 657      * then {@link InterruptedException} is thrown and the current thread's
 658      * interrupted status is cleared.
 659      *
 660      * <p>If the specified waiting time elapses then {@link
 661      * TimeoutException} is thrown.  If the time is less than or equal
 662      * to zero, the method will not wait at all.
 663      *
 664      * @param x the object to exchange
 665      * @param timeout the maximum time to wait
 666      * @param unit the time unit of the <tt>timeout</tt> argument
 667      * @return the object provided by the other thread
 668      * @throws InterruptedException if the current thread was
 669      *         interrupted while waiting
 670      * @throws TimeoutException if the specified waiting time elapses
 671      *         before another thread enters the exchange
 672      */

 673     public V exchange(V x, long timeout, TimeUnit unit)
 674         throws InterruptedException, TimeoutException {
 675         if (!Thread.interrupted()) {
 676             Object v = doExchange((x == null) ? NULL_ITEM : x,
 677                                   true, unit.toNanos(timeout));
 678             if (v == NULL_ITEM)
 679                 return null;
 680             if (v != CANCEL)
 681                 return (V)v;
 682             if (!Thread.interrupted())
 683                 throw new TimeoutException();
 684         }
 685         throw new InterruptedException();
 686     }
 687 }


 262      * interruption, timeout, or elapsed spin-waits.  This value is
 263      * placed in holes on cancellation, and used as a return value
 264      * from waiting methods to indicate failure to set or get hole.
 265      */
 266     private static final Object CANCEL = new Object();
 267 
 268     /**
 269      * Value representing null arguments/returns from public
 270      * methods.  This disambiguates from internal requirement that
 271      * holes start out as null to mean they are not yet set.
 272      */
 273     private static final Object NULL_ITEM = new Object();
 274 
 275     /**
 276      * Nodes hold partially exchanged data.  This class
 277      * opportunistically subclasses AtomicReference to represent the
 278      * hole.  So get() returns hole, and compareAndSet CAS'es value
 279      * into hole.  This class cannot be parameterized as "V" because
 280      * of the use of non-V CANCEL sentinels.
 281      */
 282     @SuppressWarnings("serial")
 283     private static final class Node extends AtomicReference<Object> {
 284         /** The element offered by the Thread creating this node. */
 285         public final Object item;
 286 
 287         /** The Thread waiting to be signalled; null until waiting. */
 288         public volatile Thread waiter;
 289 
 290         /**
 291          * Creates node with given item and empty hole.
 292          * @param item the item
 293          */
 294         public Node(Object item) {
 295             this.item = item;
 296         }
 297     }
 298 
 299     /**
 300      * A Slot is an AtomicReference with heuristic padding to lessen
 301      * cache effects of this heavily CAS'ed location.  While the
 302      * padding adds noticeable space, all slots are created only on
 303      * demand, and there will be more than one of them only when it
 304      * would improve throughput more than enough to outweigh using
 305      * extra space.
 306      */
 307     @SuppressWarnings("serial")
 308     private static final class Slot extends AtomicReference<Object> {
 309         // Improve likelihood of isolation on <= 64 byte cache lines
 310         long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
 311     }
 312 
 313     /**
 314      * Slot array.  Elements are lazily initialized when needed.
 315      * Declared volatile to enable double-checked lazy construction.
 316      */
 317     private volatile Slot[] arena = new Slot[CAPACITY];
 318 
 319     /**
 320      * The maximum slot index being used.  The value sometimes
 321      * increases when a thread experiences too many CAS contentions,
 322      * and sometimes decreases when a spin-wait elapses.  Changes
 323      * are performed only via compareAndSet, to avoid stale values
 324      * when a thread happens to stall right before setting.
 325      */
 326     private final AtomicInteger max = new AtomicInteger();
 327 


 601      * dormant until one of two things happens:
 602      * <ul>
 603      * <li>Some other thread enters the exchange; or
 604      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 605      * the current thread.
 606      * </ul>
 607      * <p>If the current thread:
 608      * <ul>
 609      * <li>has its interrupted status set on entry to this method; or
 610      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
 611      * for the exchange,
 612      * </ul>
 613      * then {@link InterruptedException} is thrown and the current thread's
 614      * interrupted status is cleared.
 615      *
 616      * @param x the object to exchange
 617      * @return the object provided by the other thread
 618      * @throws InterruptedException if the current thread was
 619      *         interrupted while waiting
 620      */
 621     @SuppressWarnings("unchecked")
 622     public V exchange(V x) throws InterruptedException {
 623         if (!Thread.interrupted()) {
 624             Object o = doExchange((x == null) ? NULL_ITEM : x, false, 0);
 625             if (o == NULL_ITEM)
 626                 return null;
 627             if (o != CANCEL)
 628                 return (V)o;
 629             Thread.interrupted(); // Clear interrupt status on IE throw
 630         }
 631         throw new InterruptedException();
 632     }
 633 
 634     /**
 635      * Waits for another thread to arrive at this exchange point (unless
 636      * the current thread is {@linkplain Thread#interrupt interrupted} or
 637      * the specified waiting time elapses), and then transfers the given
 638      * object to it, receiving its object in return.
 639      *
 640      * <p>If another thread is already waiting at the exchange point then
 641      * it is resumed for thread scheduling purposes and receives the object
 642      * passed in by the current thread.  The current thread returns immediately,
 643      * receiving the object passed to the exchange by that other thread.
 644      *
 645      * <p>If no other thread is already waiting at the exchange then the
 646      * current thread is disabled for thread scheduling purposes and lies
 647      * dormant until one of three things happens:
 648      * <ul>


 656      * <li>has its interrupted status set on entry to this method; or
 657      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
 658      * for the exchange,
 659      * </ul>
 660      * then {@link InterruptedException} is thrown and the current thread's
 661      * interrupted status is cleared.
 662      *
 663      * <p>If the specified waiting time elapses then {@link
 664      * TimeoutException} is thrown.  If the time is less than or equal
 665      * to zero, the method will not wait at all.
 666      *
 667      * @param x the object to exchange
 668      * @param timeout the maximum time to wait
 669      * @param unit the time unit of the <tt>timeout</tt> argument
 670      * @return the object provided by the other thread
 671      * @throws InterruptedException if the current thread was
 672      *         interrupted while waiting
 673      * @throws TimeoutException if the specified waiting time elapses
 674      *         before another thread enters the exchange
 675      */
 676     @SuppressWarnings("unchecked")
 677     public V exchange(V x, long timeout, TimeUnit unit)
 678         throws InterruptedException, TimeoutException {
 679         if (!Thread.interrupted()) {
 680             Object o = doExchange((x == null) ? NULL_ITEM : x,
 681                                   true, unit.toNanos(timeout));
 682             if (o == NULL_ITEM)
 683                 return null;
 684             if (o != CANCEL)
 685                 return (V)o;
 686             if (!Thread.interrupted())
 687                 throw new TimeoutException();
 688         }
 689         throw new InterruptedException();
 690     }
 691 }