src/share/classes/java/util/concurrent/Exchanger.java

Print this page




 338         Node me = new Node(item);                 // Create in case occupying
 339         int index = hashIndex();                  // Index of current slot
 340         int fails = 0;                            // Number of CAS failures
 341 
 342         for (;;) {
 343             Object y;                             // Contents of current slot
 344             Slot slot = arena[index];
 345             if (slot == null)                     // Lazily initialize slots
 346                 createSlot(index);                // Continue loop to reread
 347             else if ((y = slot.get()) != null &&  // Try to fulfill
 348                      slot.compareAndSet(y, null)) {
 349                 Node you = (Node)y;               // Transfer item
 350                 if (you.compareAndSet(null, item)) {
 351                     LockSupport.unpark(you.waiter);
 352                     return you.item;
 353                 }                                 // Else cancelled; continue
 354             }
 355             else if (y == null &&                 // Try to occupy
 356                      slot.compareAndSet(null, me)) {
 357                 if (index == 0)                   // Blocking wait for slot 0
 358                     return timed? awaitNanos(me, slot, nanos): await(me, slot);


 359                 Object v = spinWait(me, slot);    // Spin wait for non-0
 360                 if (v != CANCEL)
 361                     return v;
 362                 me = new Node(item);              // Throw away cancelled node
 363                 int m = max.get();
 364                 if (m > (index >>>= 1))           // Decrease index
 365                     max.compareAndSet(m, m - 1);  // Maybe shrink table
 366             }
 367             else if (++fails > 1) {               // Allow 2 fails on 1st slot
 368                 int m = max.get();
 369                 if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
 370                     index = m + 1;                // Grow on 3rd failed slot
 371                 else if (--index < 0)
 372                     index = m;                    // Circularly traverse
 373             }
 374         }
 375     }
 376 
 377     /**
 378      * Returns a hash index for the current thread.  Uses a one-step


 580      */
 581     public Exchanger() {
 582     }
 583 
 584     /**
 585      * Waits for another thread to arrive at this exchange point (unless
 586      * the current thread is {@linkplain Thread#interrupt interrupted}),
 587      * and then transfers the given object to it, receiving its object
 588      * in return.
 589      *
 590      * <p>If another thread is already waiting at the exchange point then
 591      * it is resumed for thread scheduling purposes and receives the object
 592      * passed in by the current thread.  The current thread returns immediately,
 593      * receiving the object passed to the exchange by that other thread.
 594      *
 595      * <p>If no other thread is already waiting at the exchange then the
 596      * current thread is disabled for thread scheduling purposes and lies
 597      * dormant until one of two things happens:
 598      * <ul>
 599      * <li>Some other thread enters the exchange; or
 600      * <li>Some other thread {@linkplain Thread#interrupt interrupts} the current
 601      * thread.
 602      * </ul>
 603      * <p>If the current thread:
 604      * <ul>
 605      * <li>has its interrupted status set on entry to this method; or
 606      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
 607      * for the exchange,
 608      * </ul>
 609      * then {@link InterruptedException} is thrown and the current thread's
 610      * interrupted status is cleared.
 611      *
 612      * @param x the object to exchange
 613      * @return the object provided by the other thread
 614      * @throws InterruptedException if the current thread was
 615      *         interrupted while waiting
 616      */
 617     public V exchange(V x) throws InterruptedException {
 618         if (!Thread.interrupted()) {
 619             Object v = doExchange(x == null? NULL_ITEM : x, false, 0);
 620             if (v == NULL_ITEM)
 621                 return null;
 622             if (v != CANCEL)
 623                 return (V)v;
 624             Thread.interrupted(); // Clear interrupt status on IE throw
 625         }
 626         throw new InterruptedException();
 627     }
 628 
 629     /**
 630      * Waits for another thread to arrive at this exchange point (unless
 631      * the current thread is {@linkplain Thread#interrupt interrupted} or
 632      * the specified waiting time elapses), and then transfers the given
 633      * object to it, receiving its object in return.
 634      *
 635      * <p>If another thread is already waiting at the exchange point then
 636      * it is resumed for thread scheduling purposes and receives the object
 637      * passed in by the current thread.  The current thread returns immediately,
 638      * receiving the object passed to the exchange by that other thread.
 639      *


 654      * </ul>
 655      * then {@link InterruptedException} is thrown and the current thread's
 656      * interrupted status is cleared.
 657      *
 658      * <p>If the specified waiting time elapses then {@link
 659      * TimeoutException} is thrown.  If the time is less than or equal
 660      * to zero, the method will not wait at all.
 661      *
 662      * @param x the object to exchange
 663      * @param timeout the maximum time to wait
 664      * @param unit the time unit of the <tt>timeout</tt> argument
 665      * @return the object provided by the other thread
 666      * @throws InterruptedException if the current thread was
 667      *         interrupted while waiting
 668      * @throws TimeoutException if the specified waiting time elapses
 669      *         before another thread enters the exchange
 670      */
 671     public V exchange(V x, long timeout, TimeUnit unit)
 672         throws InterruptedException, TimeoutException {
 673         if (!Thread.interrupted()) {
 674             Object v = doExchange(x == null? NULL_ITEM : x,
 675                                   true, unit.toNanos(timeout));
 676             if (v == NULL_ITEM)
 677                 return null;
 678             if (v != CANCEL)
 679                 return (V)v;
 680             if (!Thread.interrupted())
 681                 throw new TimeoutException();
 682         }
 683         throw new InterruptedException();
 684     }
 685 }


 338         Node me = new Node(item);                 // Create in case occupying
 339         int index = hashIndex();                  // Index of current slot
 340         int fails = 0;                            // Number of CAS failures
 341 
 342         for (;;) {
 343             Object y;                             // Contents of current slot
 344             Slot slot = arena[index];
 345             if (slot == null)                     // Lazily initialize slots
 346                 createSlot(index);                // Continue loop to reread
 347             else if ((y = slot.get()) != null &&  // Try to fulfill
 348                      slot.compareAndSet(y, null)) {
 349                 Node you = (Node)y;               // Transfer item
 350                 if (you.compareAndSet(null, item)) {
 351                     LockSupport.unpark(you.waiter);
 352                     return you.item;
 353                 }                                 // Else cancelled; continue
 354             }
 355             else if (y == null &&                 // Try to occupy
 356                      slot.compareAndSet(null, me)) {
 357                 if (index == 0)                   // Blocking wait for slot 0
 358                     return timed ?
 359                         awaitNanos(me, slot, nanos) :
 360                         await(me, slot);
 361                 Object v = spinWait(me, slot);    // Spin wait for non-0
 362                 if (v != CANCEL)
 363                     return v;
 364                 me = new Node(item);              // Throw away cancelled node
 365                 int m = max.get();
 366                 if (m > (index >>>= 1))           // Decrease index
 367                     max.compareAndSet(m, m - 1);  // Maybe shrink table
 368             }
 369             else if (++fails > 1) {               // Allow 2 fails on 1st slot
 370                 int m = max.get();
 371                 if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
 372                     index = m + 1;                // Grow on 3rd failed slot
 373                 else if (--index < 0)
 374                     index = m;                    // Circularly traverse
 375             }
 376         }
 377     }
 378 
 379     /**
 380      * Returns a hash index for the current thread.  Uses a one-step


 582      */
 583     public Exchanger() {
 584     }
 585 
 586     /**
 587      * Waits for another thread to arrive at this exchange point (unless
 588      * the current thread is {@linkplain Thread#interrupt interrupted}),
 589      * and then transfers the given object to it, receiving its object
 590      * in return.
 591      *
 592      * <p>If another thread is already waiting at the exchange point then
 593      * it is resumed for thread scheduling purposes and receives the object
 594      * passed in by the current thread.  The current thread returns immediately,
 595      * receiving the object passed to the exchange by that other thread.
 596      *
 597      * <p>If no other thread is already waiting at the exchange then the
 598      * current thread is disabled for thread scheduling purposes and lies
 599      * dormant until one of two things happens:
 600      * <ul>
 601      * <li>Some other thread enters the exchange; or
 602      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 603      * the current thread.
 604      * </ul>
 605      * <p>If the current thread:
 606      * <ul>
 607      * <li>has its interrupted status set on entry to this method; or
 608      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
 609      * for the exchange,
 610      * </ul>
 611      * then {@link InterruptedException} is thrown and the current thread's
 612      * interrupted status is cleared.
 613      *
 614      * @param x the object to exchange
 615      * @return the object provided by the other thread
 616      * @throws InterruptedException if the current thread was
 617      *         interrupted while waiting
 618      */
 619     public V exchange(V x) throws InterruptedException {
 620         if (!Thread.interrupted()) {
 621             Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0);
 622             if (v == NULL_ITEM)
 623                 return null;
 624             if (v != CANCEL)
 625                 return (V)v;
 626             Thread.interrupted(); // Clear interrupt status on IE throw
 627         }
 628         throw new InterruptedException();
 629     }
 630 
 631     /**
 632      * Waits for another thread to arrive at this exchange point (unless
 633      * the current thread is {@linkplain Thread#interrupt interrupted} or
 634      * the specified waiting time elapses), and then transfers the given
 635      * object to it, receiving its object in return.
 636      *
 637      * <p>If another thread is already waiting at the exchange point then
 638      * it is resumed for thread scheduling purposes and receives the object
 639      * passed in by the current thread.  The current thread returns immediately,
 640      * receiving the object passed to the exchange by that other thread.
 641      *


 656      * </ul>
 657      * then {@link InterruptedException} is thrown and the current thread's
 658      * interrupted status is cleared.
 659      *
 660      * <p>If the specified waiting time elapses then {@link
 661      * TimeoutException} is thrown.  If the time is less than or equal
 662      * to zero, the method will not wait at all.
 663      *
 664      * @param x the object to exchange
 665      * @param timeout the maximum time to wait
 666      * @param unit the time unit of the <tt>timeout</tt> argument
 667      * @return the object provided by the other thread
 668      * @throws InterruptedException if the current thread was
 669      *         interrupted while waiting
 670      * @throws TimeoutException if the specified waiting time elapses
 671      *         before another thread enters the exchange
 672      */
 673     public V exchange(V x, long timeout, TimeUnit unit)
 674         throws InterruptedException, TimeoutException {
 675         if (!Thread.interrupted()) {
 676             Object v = doExchange((x == null) ? NULL_ITEM : x,
 677                                   true, unit.toNanos(timeout));
 678             if (v == NULL_ITEM)
 679                 return null;
 680             if (v != CANCEL)
 681                 return (V)v;
 682             if (!Thread.interrupted())
 683                 throw new TimeoutException();
 684         }
 685         throw new InterruptedException();
 686     }
 687 }