338 Node me = new Node(item); // Create in case occupying
339 int index = hashIndex(); // Index of current slot
340 int fails = 0; // Number of CAS failures
341
342 for (;;) {
343 Object y; // Contents of current slot
344 Slot slot = arena[index];
345 if (slot == null) // Lazily initialize slots
346 createSlot(index); // Continue loop to reread
347 else if ((y = slot.get()) != null && // Try to fulfill
348 slot.compareAndSet(y, null)) {
349 Node you = (Node)y; // Transfer item
350 if (you.compareAndSet(null, item)) {
351 LockSupport.unpark(you.waiter);
352 return you.item;
353 } // Else cancelled; continue
354 }
355 else if (y == null && // Try to occupy
356 slot.compareAndSet(null, me)) {
357 if (index == 0) // Blocking wait for slot 0
358 return timed? awaitNanos(me, slot, nanos): await(me, slot);
359 Object v = spinWait(me, slot); // Spin wait for non-0
360 if (v != CANCEL)
361 return v;
362 me = new Node(item); // Throw away cancelled node
363 int m = max.get();
364 if (m > (index >>>= 1)) // Decrease index
365 max.compareAndSet(m, m - 1); // Maybe shrink table
366 }
367 else if (++fails > 1) { // Allow 2 fails on 1st slot
368 int m = max.get();
369 if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
370 index = m + 1; // Grow on 3rd failed slot
371 else if (--index < 0)
372 index = m; // Circularly traverse
373 }
374 }
375 }
376
377 /**
378 * Returns a hash index for the current thread. Uses a one-step
580 */
581 public Exchanger() {
582 }
583
584 /**
585 * Waits for another thread to arrive at this exchange point (unless
586 * the current thread is {@linkplain Thread#interrupt interrupted}),
587 * and then transfers the given object to it, receiving its object
588 * in return.
589 *
590 * <p>If another thread is already waiting at the exchange point then
591 * it is resumed for thread scheduling purposes and receives the object
592 * passed in by the current thread. The current thread returns immediately,
593 * receiving the object passed to the exchange by that other thread.
594 *
595 * <p>If no other thread is already waiting at the exchange then the
596 * current thread is disabled for thread scheduling purposes and lies
597 * dormant until one of two things happens:
598 * <ul>
599 * <li>Some other thread enters the exchange; or
600 * <li>Some other thread {@linkplain Thread#interrupt interrupts} the current
601 * thread.
602 * </ul>
603 * <p>If the current thread:
604 * <ul>
605 * <li>has its interrupted status set on entry to this method; or
606 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
607 * for the exchange,
608 * </ul>
609 * then {@link InterruptedException} is thrown and the current thread's
610 * interrupted status is cleared.
611 *
612 * @param x the object to exchange
613 * @return the object provided by the other thread
614 * @throws InterruptedException if the current thread was
615 * interrupted while waiting
616 */
617 public V exchange(V x) throws InterruptedException {
618 if (!Thread.interrupted()) {
619 Object v = doExchange(x == null? NULL_ITEM : x, false, 0);
620 if (v == NULL_ITEM)
621 return null;
622 if (v != CANCEL)
623 return (V)v;
624 Thread.interrupted(); // Clear interrupt status on IE throw
625 }
626 throw new InterruptedException();
627 }
628
629 /**
630 * Waits for another thread to arrive at this exchange point (unless
631 * the current thread is {@linkplain Thread#interrupt interrupted} or
632 * the specified waiting time elapses), and then transfers the given
633 * object to it, receiving its object in return.
634 *
635 * <p>If another thread is already waiting at the exchange point then
636 * it is resumed for thread scheduling purposes and receives the object
637 * passed in by the current thread. The current thread returns immediately,
638 * receiving the object passed to the exchange by that other thread.
639 *
654 * </ul>
655 * then {@link InterruptedException} is thrown and the current thread's
656 * interrupted status is cleared.
657 *
658 * <p>If the specified waiting time elapses then {@link
659 * TimeoutException} is thrown. If the time is less than or equal
660 * to zero, the method will not wait at all.
661 *
662 * @param x the object to exchange
663 * @param timeout the maximum time to wait
664 * @param unit the time unit of the <tt>timeout</tt> argument
665 * @return the object provided by the other thread
666 * @throws InterruptedException if the current thread was
667 * interrupted while waiting
668 * @throws TimeoutException if the specified waiting time elapses
669 * before another thread enters the exchange
670 */
671 public V exchange(V x, long timeout, TimeUnit unit)
672 throws InterruptedException, TimeoutException {
673 if (!Thread.interrupted()) {
674 Object v = doExchange(x == null? NULL_ITEM : x,
675 true, unit.toNanos(timeout));
676 if (v == NULL_ITEM)
677 return null;
678 if (v != CANCEL)
679 return (V)v;
680 if (!Thread.interrupted())
681 throw new TimeoutException();
682 }
683 throw new InterruptedException();
684 }
685 }
|
338 Node me = new Node(item); // Create in case occupying
339 int index = hashIndex(); // Index of current slot
340 int fails = 0; // Number of CAS failures
341
342 for (;;) {
343 Object y; // Contents of current slot
344 Slot slot = arena[index];
345 if (slot == null) // Lazily initialize slots
346 createSlot(index); // Continue loop to reread
347 else if ((y = slot.get()) != null && // Try to fulfill
348 slot.compareAndSet(y, null)) {
349 Node you = (Node)y; // Transfer item
350 if (you.compareAndSet(null, item)) {
351 LockSupport.unpark(you.waiter);
352 return you.item;
353 } // Else cancelled; continue
354 }
355 else if (y == null && // Try to occupy
356 slot.compareAndSet(null, me)) {
357 if (index == 0) // Blocking wait for slot 0
358 return timed ?
359 awaitNanos(me, slot, nanos) :
360 await(me, slot);
361 Object v = spinWait(me, slot); // Spin wait for non-0
362 if (v != CANCEL)
363 return v;
364 me = new Node(item); // Throw away cancelled node
365 int m = max.get();
366 if (m > (index >>>= 1)) // Decrease index
367 max.compareAndSet(m, m - 1); // Maybe shrink table
368 }
369 else if (++fails > 1) { // Allow 2 fails on 1st slot
370 int m = max.get();
371 if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
372 index = m + 1; // Grow on 3rd failed slot
373 else if (--index < 0)
374 index = m; // Circularly traverse
375 }
376 }
377 }
378
379 /**
380 * Returns a hash index for the current thread. Uses a one-step
582 */
583 public Exchanger() {
584 }
585
586 /**
587 * Waits for another thread to arrive at this exchange point (unless
588 * the current thread is {@linkplain Thread#interrupt interrupted}),
589 * and then transfers the given object to it, receiving its object
590 * in return.
591 *
592 * <p>If another thread is already waiting at the exchange point then
593 * it is resumed for thread scheduling purposes and receives the object
594 * passed in by the current thread. The current thread returns immediately,
595 * receiving the object passed to the exchange by that other thread.
596 *
597 * <p>If no other thread is already waiting at the exchange then the
598 * current thread is disabled for thread scheduling purposes and lies
599 * dormant until one of two things happens:
600 * <ul>
601 * <li>Some other thread enters the exchange; or
602 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
603 * the current thread.
604 * </ul>
605 * <p>If the current thread:
606 * <ul>
607 * <li>has its interrupted status set on entry to this method; or
608 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
609 * for the exchange,
610 * </ul>
611 * then {@link InterruptedException} is thrown and the current thread's
612 * interrupted status is cleared.
613 *
614 * @param x the object to exchange
615 * @return the object provided by the other thread
616 * @throws InterruptedException if the current thread was
617 * interrupted while waiting
618 */
619 public V exchange(V x) throws InterruptedException {
620 if (!Thread.interrupted()) {
621 Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0);
622 if (v == NULL_ITEM)
623 return null;
624 if (v != CANCEL)
625 return (V)v;
626 Thread.interrupted(); // Clear interrupt status on IE throw
627 }
628 throw new InterruptedException();
629 }
630
631 /**
632 * Waits for another thread to arrive at this exchange point (unless
633 * the current thread is {@linkplain Thread#interrupt interrupted} or
634 * the specified waiting time elapses), and then transfers the given
635 * object to it, receiving its object in return.
636 *
637 * <p>If another thread is already waiting at the exchange point then
638 * it is resumed for thread scheduling purposes and receives the object
639 * passed in by the current thread. The current thread returns immediately,
640 * receiving the object passed to the exchange by that other thread.
641 *
656 * </ul>
657 * then {@link InterruptedException} is thrown and the current thread's
658 * interrupted status is cleared.
659 *
660 * <p>If the specified waiting time elapses then {@link
661 * TimeoutException} is thrown. If the time is less than or equal
662 * to zero, the method will not wait at all.
663 *
664 * @param x the object to exchange
665 * @param timeout the maximum time to wait
666 * @param unit the time unit of the <tt>timeout</tt> argument
667 * @return the object provided by the other thread
668 * @throws InterruptedException if the current thread was
669 * interrupted while waiting
670 * @throws TimeoutException if the specified waiting time elapses
671 * before another thread enters the exchange
672 */
673 public V exchange(V x, long timeout, TimeUnit unit)
674 throws InterruptedException, TimeoutException {
675 if (!Thread.interrupted()) {
676 Object v = doExchange((x == null) ? NULL_ITEM : x,
677 true, unit.toNanos(timeout));
678 if (v == NULL_ITEM)
679 return null;
680 if (v != CANCEL)
681 return (V)v;
682 if (!Thread.interrupted())
683 throw new TimeoutException();
684 }
685 throw new InterruptedException();
686 }
687 }
|