* interruption, timeout, or elapsed spin-waits.  This value is
     * placed in holes on cancellation, and used as a return value
     * from waiting methods to indicate failure to set or get hole.
     */
    private static final Object CANCEL = new Object();

    /**
     * Value representing null arguments/returns from public
     * methods.  This disambiguates from internal requirement that
     * holes start out as null to mean they are not yet set.
     */
    private static final Object NULL_ITEM = new Object();

    /**
     * Nodes hold partially exchanged data.  This class
     * opportunistically subclasses AtomicReference to represent the
     * hole.  So get() returns hole, and compareAndSet CAS'es value
     * into hole.  This class cannot be parameterized as "V" because
     * of the use of non-V CANCEL sentinels.
     */
    // Serializable only by inheritance from AtomicReference; no
    // serialVersionUID is declared, so silence the "serial" lint.
    @SuppressWarnings("serial")
    private static final class Node extends AtomicReference<Object> {
        /** The element offered by the Thread creating this node. */
        public final Object item;

        /** The Thread waiting to be signalled; null until waiting. */
        public volatile Thread waiter;

        /**
         * Creates node with given item and empty hole.
         * @param item the item
         */
        public Node(Object item) {
            this.item = item;
        }
    }

    /**
     * A Slot is an AtomicReference with heuristic padding to lessen
     * cache effects of this heavily CAS'ed location.  While the
     * padding adds noticeable space, all slots are created only on
     * demand, and there will be more than one of them only when it
     * would improve throughput more than enough to outweigh using
     * extra space.
     */
    // Serializable only by inheritance from AtomicReference; no
    // serialVersionUID is declared, so silence the "serial" lint.
    @SuppressWarnings("serial")
    private static final class Slot extends AtomicReference<Object> {
        // Improve likelihood of isolation on <= 64 byte cache lines
        long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
    }

    /**
     * Slot array.  Elements are lazily initialized when needed.
     * Declared volatile to enable double-checked lazy construction.
     */
    private volatile Slot[] arena = new Slot[CAPACITY];

    /**
     * The maximum slot index being used.  The value sometimes
     * increases when a thread experiences too many CAS contentions,
     * and sometimes decreases when a spin-wait elapses.  Changes
     * are performed only via compareAndSet, to avoid stale values
     * when a thread happens to stall right before setting.
     */
    private final AtomicInteger max = new AtomicInteger();

    // NOTE(review): original lines 326-598 (everything between the 'max'
    // field and the tail of the Javadoc below, including doExchange) are
    // not present in this excerpt.  The opening paragraphs of the Javadoc
    // below were rebuilt from the parallel text of the timed overload and
    // should be confirmed against the full file.

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@linkplain Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread.  The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     */
    // The NULL_ITEM and CANCEL sentinels are filtered out before the cast;
    // any other value is presumed to be an item offered through exchange(V)
    // (doExchange is outside this excerpt -- confirm there).
    @SuppressWarnings("unchecked")
    public V exchange(V x) throws InterruptedException {
        if (!Thread.interrupted()) {
            // null is mapped to NULL_ITEM on the way in and back to null on the way out.
            Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0);
            if (v == NULL_ITEM)
                return null;
            if (v != CANCEL)
                return (V)v;
            Thread.interrupted(); // Clear interrupt status on IE throw
        }
        throw new InterruptedException();
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@linkplain Thread#interrupt interrupted} or
     * the specified waiting time elapses), and then transfers the given
     * object to it, receiving its object in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread.  The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link
     * TimeoutException} is thrown.  If the time is less than or equal
     * to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     * @throws TimeoutException if the specified waiting time elapses
     *         before another thread enters the exchange
     */
    // The NULL_ITEM and CANCEL sentinels are filtered out before the cast;
    // any other value is presumed to be an item offered through exchange(V)
    // (doExchange is outside this excerpt -- confirm there).
    @SuppressWarnings("unchecked")
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        if (!Thread.interrupted()) {
            Object v = doExchange((x == null) ? NULL_ITEM : x,
                                  true, unit.toNanos(timeout));
            if (v == NULL_ITEM)
                return null;
            if (v != CANCEL)
                return (V)v;
            // CANCEL with no pending interrupt is reported as a timeout;
            // Thread.interrupted() also clears the status for the IE path.
            if (!Thread.interrupted())
                throw new TimeoutException();
        }
        throw new InterruptedException();
    }
}
| 262 * interruption, timeout, or elapsed spin-waits.
This value is
     * placed in holes on cancellation, and used as a return value
     * from waiting methods to indicate failure to set or get hole.
     */
    private static final Object CANCEL = new Object();

    /**
     * Stand-in for {@code null} wherever a public method receives or
     * must return it.  Needed because internally a hole that is still
     * {@code null} means "not yet set".
     */
    private static final Object NULL_ITEM = new Object();

    /**
     * Carrier for one side of an exchange that is still in progress.
     * The inherited AtomicReference value doubles as the "hole":
     * get() reads it and compareAndSet fills it.  The type argument
     * is Object rather than "V" because the hole may also hold the
     * non-V CANCEL sentinel.
     */
    @SuppressWarnings("serial")
    private static final class Node extends AtomicReference<Object> {
        /** Element offered by the thread that created this node. */
        public final Object item;

        /** Thread to be signalled; remains null until it starts waiting. */
        public volatile Thread waiter;

        /**
         * Builds a node around the offered element; the hole starts empty.
         * @param offered the element being offered
         */
        public Node(Object offered) {
            item = offered;
        }
    }

    /**
     * An AtomicReference carrying heuristic padding so that this
     * heavily CAS'ed location suffers less from cache effects.  The
     * padding costs noticeable space, but slots are only created on
     * demand, and more than one exists only when the throughput gain
     * is expected to outweigh the extra space.
     */
    @SuppressWarnings("serial")
    private static final class Slot extends AtomicReference<Object> {
        // Padding: raises the odds of isolation on <= 64 byte cache lines
        long q0, q1, q2, q3, q4, q5, q6, q7;
        long q8, q9, qa, qb, qc, qd, qe;
    }

    /**
     * The slots.  Individual elements are created lazily, on first
     * need; the field is volatile so that double-checked lazy
     * construction works.
     */
    private volatile Slot[] arena = new Slot[CAPACITY];

    /**
     * Highest slot index currently in use.  It is sometimes raised
     * when a thread runs into too many CAS contentions and sometimes
     * lowered when a spin-wait elapses.  Updates go through
     * compareAndSet only, so that a thread stalling just before its
     * write cannot install a stale value.
     */
    private final AtomicInteger max = new AtomicInteger();

    // NOTE(review): original lines 328-600 (everything between the 'max'
    // field and the tail of exchange(V)'s Javadoc, including doExchange)
    // are not present in this excerpt.  The opening paragraphs of the
    // Javadoc below were written from the parallel text of the timed
    // overload and should be confirmed against the full file.

    /**
     * Blocks until another thread reaches this exchange point (unless
     * the current thread is {@linkplain Thread#interrupt interrupted}),
     * hands the given object to that thread and returns the object it
     * offered in turn.
     *
     * <p>When no partner is waiting yet, the current thread is disabled
     * for thread scheduling purposes and lies dormant until one of two
     * things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     */
    @SuppressWarnings("unchecked")
    public V exchange(V x) throws InterruptedException {
        // Interrupted on entry: fail before attempting the exchange.
        if (Thread.interrupted())
            throw new InterruptedException();

        Object offered = (x != null) ? x : NULL_ITEM;
        Object received = doExchange(offered, false, 0);

        if (received == CANCEL) {
            Thread.interrupted(); // Clear interrupt status on IE throw
            throw new InterruptedException();
        }
        // Translate the null stand-in back into a real null.
        return (received == NULL_ITEM) ? null : (V) received;
    }

    /**
     * Blocks until another thread reaches this exchange point (unless
     * the current thread is {@linkplain Thread#interrupt interrupted} or
     * the specified waiting time elapses), hands the given object to
     * that thread and returns the object it offered in turn.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread.  The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>When no partner is waiting yet, the current thread is disabled
     * for thread scheduling purposes and lies dormant until one of three
     * things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link
     * TimeoutException} is thrown.  If the time is less than or equal
     * to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     * @throws TimeoutException if the specified waiting time elapses
     *         before another thread enters the exchange
     */
    @SuppressWarnings("unchecked")
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        // Interrupted on entry: fail before attempting the exchange.
        if (Thread.interrupted())
            throw new InterruptedException();

        Object offered = (x != null) ? x : NULL_ITEM;
        long nanos = unit.toNanos(timeout);
        Object received = doExchange(offered, true, nanos);

        if (received == CANCEL) {
            // A pending interrupt wins (and is cleared by the check);
            // otherwise the cancellation is reported as a timeout.
            if (Thread.interrupted())
                throw new InterruptedException();
            throw new TimeoutException();
        }
        // Translate the null stand-in back into a real null.
        return (received == NULL_ITEM) ? null : (V) received;
    }
}
|