1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea, Bill Scherer, and Michael Scott with
  32  * assistance from members of JCP JSR-166 Expert Group and released to
  33  * the public domain, as explained at
  34  * http://creativecommons.org/publicdomain/zero/1.0/
  35  */
  36 
  37 package java.util.concurrent;
  38 
  39 /**
  40  * A synchronization point at which threads can pair and swap elements
  41  * within pairs.  Each thread presents some object on entry to the
  42  * {@link #exchange exchange} method, matches with a partner thread,
  43  * and receives its partner's object on return.  An Exchanger may be
  44  * viewed as a bidirectional form of a {@link SynchronousQueue}.
  45  * Exchangers may be useful in applications such as genetic algorithms
  46  * and pipeline designs.
  47  *
  48  * <p><b>Sample Usage:</b>
  49  * Here are the highlights of a class that uses an {@code Exchanger}
  50  * to swap buffers between threads so that the thread filling the
  51  * buffer gets a freshly emptied one when it needs it, handing off the
  52  * filled one to the thread emptying the buffer.
  53  * <pre> {@code
  54  * class FillAndEmpty {
  55  *   Exchanger<DataBuffer> exchanger = new Exchanger<>();
  56  *   DataBuffer initialEmptyBuffer = ... a made-up type
  57  *   DataBuffer initialFullBuffer = ...
  58  *
  59  *   class FillingLoop implements Runnable {
  60  *     public void run() {
  61  *       DataBuffer currentBuffer = initialEmptyBuffer;
  62  *       try {
  63  *         while (currentBuffer != null) {
  64  *           addToBuffer(currentBuffer);
  65  *           if (currentBuffer.isFull())
  66  *             currentBuffer = exchanger.exchange(currentBuffer);
  67  *         }
  68  *       } catch (InterruptedException ex) { ... handle ... }
  69  *     }
  70  *   }
  71  *
  72  *   class EmptyingLoop implements Runnable {
  73  *     public void run() {
  74  *       DataBuffer currentBuffer = initialFullBuffer;
  75  *       try {
  76  *         while (currentBuffer != null) {
  77  *           takeFromBuffer(currentBuffer);
  78  *           if (currentBuffer.isEmpty())
  79  *             currentBuffer = exchanger.exchange(currentBuffer);
  80  *         }
  81  *       } catch (InterruptedException ex) { ... handle ...}
  82  *     }
  83  *   }
  84  *
  85  *   void start() {
  86  *     new Thread(new FillingLoop()).start();
  87  *     new Thread(new EmptyingLoop()).start();
  88  *   }
  89  * }}</pre>
  90  *
  91  * <p>Memory consistency effects: For each pair of threads that
  92  * successfully exchange objects via an {@code Exchanger}, actions
  93  * prior to the {@code exchange()} in each thread
  94  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
  95  * those subsequent to a return from the corresponding {@code exchange()}
  96  * in the other thread.
  97  *
  98  * @since 1.5
  99  * @author Doug Lea and Bill Scherer and Michael Scott
 100  * @param <V> The type of objects that may be exchanged
 101  */
 102 public class Exchanger<V> {
 103 
 104     /*
 105      * Overview: The core algorithm is, for an exchange "slot",
 106      * and a participant (caller) with an item:
 107      *
 108      * for (;;) {
 109      *   if (slot is empty) {                       // offer
 110      *     place item in a Node;
 111      *     if (can CAS slot from empty to node) {
 112      *       wait for release;
 113      *       return matching item in node;
 114      *     }
 115      *   }
 116      *   else if (can CAS slot from node to empty) { // release
 117      *     get the item in node;
 118      *     set matching item in node;
 119      *     release waiting thread;
 120      *   }
 121      *   // else retry on CAS failure
 122      * }
 123      *
 124      * This is among the simplest forms of a "dual data structure" --
 125      * see Scott and Scherer's DISC 04 paper and
 126      * http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html
 127      *
 128      * This works great in principle. But in practice, like many
 129      * algorithms centered on atomic updates to a single location, it
 130      * scales horribly when there are more than a few participants
 131      * using the same Exchanger. So the implementation instead uses a
 132      * form of elimination arena, that spreads out this contention by
 133      * arranging that some threads typically use different slots,
 134      * while still ensuring that eventually, any two parties will be
 135      * able to exchange items. That is, we cannot completely partition
 136      * across threads, but instead give threads arena indices that
 137      * will on average grow under contention and shrink under lack of
 138      * contention. We approach this by defining the Nodes that we need
 139      * anyway as ThreadLocals, and include in them per-thread index
 140      * and related bookkeeping state. (We can safely reuse per-thread
 141      * nodes rather than creating them fresh each time because slots
 142      * alternate between pointing to a node vs null, so cannot
 143      * encounter ABA problems. However, we do need some care in
 144      * resetting them between uses.)
 145      *
 146      * Implementing an effective arena requires allocating a bunch of
 147      * space, so we only do so upon detecting contention (except on
 148      * uniprocessors, where they wouldn't help, so aren't used).
 149      * Otherwise, exchanges use the single-slot slotExchange method.
 150      * On contention, not only must the slots be in different
 151      * locations, but the locations must not encounter memory
 152      * contention due to being on the same cache line (or more
 153      * generally, the same coherence unit).  Because, as of this
 154      * writing, there is no way to determine cacheline size, we define
 155      * a value that is enough for common platforms.  Additionally,
 156      * extra care elsewhere is taken to avoid other false/unintended
 157      * sharing and to enhance locality, including adding padding (via
 158      * sun.misc.Contended) to Nodes, embedding "bound" as an Exchanger
 159      * field, and reworking some park/unpark mechanics compared to
 160      * LockSupport versions.
 161      *
 162      * The arena starts out with only one used slot. We expand the
 163      * effective arena size by tracking collisions; i.e., failed CASes
 164      * while trying to exchange. By nature of the above algorithm, the
 165      * only kinds of collision that reliably indicate contention are
 166      * when two attempted releases collide -- one of two attempted
 167      * offers can legitimately fail to CAS without indicating
 168      * contention by more than one other thread. (Note: it is possible
 169      * but not worthwhile to more precisely detect contention by
 170      * reading slot values after CAS failures.)  When a thread has
 171      * collided at each slot within the current arena bound, it tries
 172      * to expand the arena size by one. We track collisions within
 173      * bounds by using a version (sequence) number on the "bound"
 174      * field, and conservatively reset collision counts when a
 175      * participant notices that bound has been updated (in either
 176      * direction).
 177      *
 178      * The effective arena size is reduced (when there is more than
 179      * one slot) by giving up on waiting after a while and trying to
 180      * decrement the arena size on expiration. The value of "a while"
 181      * is an empirical matter.  We implement by piggybacking on the
 182      * use of spin->yield->block that is essential for reasonable
 183      * waiting performance anyway -- in a busy exchanger, offers are
 184      * usually almost immediately released, in which case context
 185      * switching on multiprocessors is extremely slow/wasteful.  Arena
 186      * waits just omit the blocking part, and instead cancel. The spin
 187      * count is empirically chosen to be a value that avoids blocking
 188      * 99% of the time under maximum sustained exchange rates on a
 189      * range of test machines. Spins and yields entail some limited
 190      * randomness (using a cheap xorshift) to avoid regular patterns
 191      * that can induce unproductive grow/shrink cycles. (Using a
 192      * pseudorandom also helps regularize spin cycle duration by
 193      * making branches unpredictable.)  Also, during an offer, a
 194      * waiter can "know" that it will be released when its slot has
 195      * changed, but cannot yet proceed until match is set.  In the
 196      * mean time it cannot cancel the offer, so instead spins/yields.
 197      * Note: It is possible to avoid this secondary check by changing
 198      * the linearization point to be a CAS of the match field (as done
 199      * in one case in the Scott & Scherer DISC paper), which also
 200      * increases asynchrony a bit, at the expense of poorer collision
 201      * detection and inability to always reuse per-thread nodes. So
 202      * the current scheme is typically a better tradeoff.
 203      *
 204      * On collisions, indices traverse the arena cyclically in reverse
 205      * order, restarting at the maximum index (which will tend to be
 206      * sparsest) when bounds change. (On expirations, indices instead
 207      * are halved until reaching 0.) It is possible (and has been
 208      * tried) to use randomized, prime-value-stepped, or double-hash
 209      * style traversal instead of simple cyclic traversal to reduce
 210      * bunching.  But empirically, whatever benefits these may have
 211      * don't overcome their added overhead: We are managing operations
 212      * that occur very quickly unless there is sustained contention,
 213      * so simpler/faster control policies work better than more
 214      * accurate but slower ones.
 215      *
 216      * Because we use expiration for arena size control, we cannot
 217      * throw TimeoutExceptions in the timed version of the public
 218      * exchange method until the arena size has shrunken to zero (or
 219      * the arena isn't enabled). This may delay response to timeout
 220      * but is still within spec.
 221      *
 222      * Essentially all of the implementation is in methods
 223      * slotExchange and arenaExchange. These have similar overall
 224      * structure, but differ in too many details to combine. The
 225      * slotExchange method uses the single Exchanger field "slot"
 226      * rather than arena array elements. However, it still needs
 227      * minimal collision detection to trigger arena construction.
 228      * (The messiest part is making sure interrupt status and
 229      * InterruptedExceptions come out right during transitions when
 230      * both methods may be called. This is done by using null return
 231      * as a sentinel to recheck interrupt status.)
 232      *
 233      * As is too common in this sort of code, methods are monolithic
 234      * because most of the logic relies on reads of fields that are
     * maintained as local variables so can't be nicely factored
     * (mainly, here, bulky spin->yield->block/cancel code), and
 237      * heavily dependent on intrinsics (Unsafe) to use inlined
 238      * embedded CAS and related memory access operations (that tend
 239      * not to be as readily inlined by dynamic compilers when they are
 240      * hidden behind other methods that would more nicely name and
 241      * encapsulate the intended effects). This includes the use of
 242      * putOrderedX to clear fields of the per-thread Nodes between
 243      * uses. Note that field Node.item is not declared as volatile
 244      * even though it is read by releasing threads, because they only
 245      * do so after CAS operations that must precede access, and all
 246      * uses by the owning thread are otherwise acceptably ordered by
 247      * other operations. (Because the actual points of atomicity are
 248      * slot CASes, it would also be legal for the write to Node.match
 249      * in a release to be weaker than a full volatile write. However,
 250      * this is not done because it could allow further postponement of
 251      * the write, delaying progress.)
 252      */
 253 
    /**
     * The byte distance (as a shift value) between any two used slots
     * in the arena.  1 << ASHIFT should be at least cacheline size.
     * With the current value of 7, used slots are 1 << 7 == 128 bytes
     * apart; slot i is addressed at raw offset (i << ASHIFT) + ABASE.
     */
    private static final int ASHIFT = 7;

    /**
     * The maximum supported arena index. The maximum allocatable
     * arena size is MMASK + 1. Must be a power of two minus one, less
     * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices
     * for the expected scaling limits of the main algorithms.
     * Also used as the bit mask that extracts the index part of the
     * "bound" field (bound & MMASK).
     */
    private static final int MMASK = 0xff;

    /**
     * Unit for sequence/version bits of bound field. Each successful
     * change to the bound also adds SEQ. Being MMASK + 1, it is the
     * lowest bit above the index part selected by MMASK, so growing
     * adds SEQ + 1 and shrinking adds SEQ - 1 to the bound.
     */
    private static final int SEQ = MMASK + 1;

    /** The number of CPUs, for sizing and spin control */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * The maximum slot index of the arena: The number of slots that
     * can in principle hold all threads without contention, or at
     * most the maximum indexable value. Evaluates to MMASK when NCPU
     * is at least MMASK << 1, and to NCPU >>> 1 otherwise.
     */
    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;

    /**
     * The bound for spins while waiting for a match (1 << 10 == 1024).
     * The actual number of iterations will on average be about twice
     * this value due to randomization. Note: Spinning is disabled
     * when NCPU==1.
     */
    private static final int SPINS = 1 << 10;

    /**
     * Value representing null arguments/returns from public
     * methods. Needed because the API originally didn't disallow null
     * arguments, which it should have. The public exchange methods
     * substitute this object for a null argument and translate it
     * back to null on return.
     */
    private static final Object NULL_ITEM = new Object();

    /**
     * Sentinel value returned by internal exchange methods upon
     * timeout, to avoid need for separate timed versions of these
     * methods. The timed public exchange method converts it into a
     * TimeoutException.
     */
    private static final Object TIMED_OUT = new Object();
 304 
    /**
     * Nodes hold partially exchanged data, plus other per-thread
     * bookkeeping. Padded via @sun.misc.Contended to reduce memory
     * contention.
     *
     * <p>Each thread obtains its Node from the {@code participant}
     * thread-local. While offering, the owning thread stores its item
     * here and installs the node in a slot; a releasing thread that
     * removes the node from the slot reads {@code item}, writes
     * {@code match}, and unparks the thread recorded in
     * {@code parked}, if any. Only {@code match} and {@code parked}
     * are declared volatile.
     */
    @sun.misc.Contended static final class Node {
        int index;              // Arena index
        int bound;              // Last recorded value of Exchanger.bound
        int collides;           // Number of CAS failures at current bound
        int hash;               // Pseudo-random for spins
        Object item;            // This thread's current item
        volatile Object match;  // Item provided by releasing thread
        volatile Thread parked; // Set to this thread when parked, else null
    }
 319 
 320     /** The corresponding thread local class */
 321     static final class Participant extends ThreadLocal<Node> {
 322         public Node initialValue() { return new Node(); }
 323     }
 324 
    /**
     * Per-thread state. Created once in the constructor; both
     * slotExchange and arenaExchange fetch the calling thread's Node
     * from it on entry.
     */
    private final Participant participant;

    /**
     * Elimination array; null until enabled (within slotExchange).
     * Element accesses use emulation of volatile gets and CAS.
     * A non-null value makes the public exchange methods bypass
     * slotExchange and go directly to arenaExchange.
     */
    private volatile Node[] arena;

    /**
     * Slot used until contention detected. Holds the Node of the
     * single waiting thread, or null when nobody is waiting.
     */
    private volatile Node slot;

    /**
     * The index of the largest valid arena position, OR'ed with SEQ
     * number in high bits, incremented on each update.  The initial
     * update from 0 to SEQ is used to ensure that the arena array is
     * constructed only once. The index part is read as bound & MMASK.
     */
    private volatile int bound;
 348 
    /**
     * Exchange function when arenas enabled. See above for explanation.
     *
     * <p>Each pass of the outer loop inspects the arena slot at the
     * current index and does one of three things: releases a waiting
     * node found there; offers this thread's own node into an empty,
     * in-bounds slot and waits for a match; or records a collision and
     * moves to another index (possibly growing the bound).
     *
     * <p>Blocking (parking) and timeout detection only happen when the
     * index part of the bound read for this offer is zero; otherwise
     * an unmatched wait is cancelled after spinning, the bound is
     * tentatively shrunk, and the loop restarts at a halved index.
     *
     * @param item the (non-null) item to exchange
     * @param timed true if the wait is timed
     * @param ns if timed, the maximum wait time, else 0L
     * @return the other thread's item; or null if interrupted; or
     * TIMED_OUT if timed and timed out
     */
    private final Object arenaExchange(Object item, boolean timed, long ns) {
        Node[] a = arena;
        Node p = participant.get();
        for (int i = p.index;;) {                      // access slot at i
            int b, m, c; long j;                       // j is raw array offset
            Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
            // A waiter is present: try to claim it by nulling the slot.
            if (q != null && U.compareAndSwapObject(a, j, q, null)) {
                Object v = q.item;                     // release
                q.match = item;                        // volatile write hands over our item
                Thread w = q.parked;
                if (w != null)
                    U.unpark(w);
                return v;
            }
            // This condition is always evaluated when the release above
            // did not happen, so b (bound) and m (its index part) are
            // assigned for use in every remaining branch.
            else if (i <= (m = (b = bound) & MMASK) && q == null) {
                p.item = item;                         // offer
                if (U.compareAndSwapObject(a, j, null, p)) {
                    // A deadline is only tracked when m == 0.
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    Thread t = Thread.currentThread(); // wait
                    for (int h = p.hash, spins = SPINS;;) {
                        Object v = p.match;
                        if (v != null) {
                            // Matched: reset node fields and save the hash.
                            U.putOrderedObject(p, MATCH, null);
                            p.item = null;             // clear for next use
                            p.hash = h;
                            return v;
                        }
                        else if (spins > 0) {
                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            if (h == 0)                // initialize hash
                                h = SPINS | (int)t.getId();
                            else if (h < 0 &&          // approx 50% true
                                     (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield();        // two yields per wait
                        }
                        // Spins exhausted but our node has left the slot:
                        // keep spinning until match becomes visible.
                        else if (U.getObjectVolatile(a, j) != p)
                            spins = SPINS;       // releaser hasn't set match yet
                        // Park only if not interrupted, m == 0, and (when
                        // timed) time remains; ns is refreshed here.
                        else if (!t.isInterrupted() && m == 0 &&
                                 (!timed ||
                                  (ns = end - System.nanoTime()) > 0L)) {
                            U.putObject(t, BLOCKER, this); // emulate LockSupport
                            p.parked = t;              // minimize window
                            // Recheck that the node is still in the slot
                            // after publishing parked, before blocking.
                            if (U.getObjectVolatile(a, j) == p)
                                U.park(false, ns);
                            p.parked = null;
                            U.putObject(t, BLOCKER, null);
                        }
                        // Give up on this offer: succeed only if we can
                        // remove our own node from the slot.
                        else if (U.getObjectVolatile(a, j) == p &&
                                 U.compareAndSwapObject(a, j, p, null)) {
                            if (m != 0)                // try to shrink
                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                            p.item = null;
                            p.hash = h;
                            i = p.index >>>= 1;        // descend
                            // Thread.interrupted() clears the status; the
                            // null return tells the caller to throw.
                            if (Thread.interrupted())
                                return null;
                            if (timed && m == 0 && ns <= 0L)
                                return TIMED_OUT;
                            break;                     // expired; restart
                        }
                    }
                }
                else
                    p.item = null;                     // clear offer
            }
            else {
                // Collision: failed release CAS, or index beyond bound.
                if (p.bound != b) {                    // stale; reset
                    p.bound = b;
                    p.collides = 0;
                    i = (i != m || m == 0) ? m : m - 1;
                }
                // Keep traversing while fewer than m collisions have been
                // recorded, the bound is already FULL, or the CAS that
                // would grow the bound fails.
                else if ((c = p.collides) < m || m == FULL ||
                         !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                    p.collides = c + 1;
                    i = (i == 0) ? m : i - 1;          // cyclically traverse
                }
                else
                    i = m + 1;                         // grow
                p.index = i;
            }
        }
    }
 440 
    /**
     * Exchange function used until arenas enabled. See above for explanation.
     *
     * <p>Operates on the single {@code slot} field. If a node is
     * already waiting there, this thread tries to release it;
     * otherwise it installs its own node and waits by spinning,
     * yielding, and finally parking. A failed release CAS is treated
     * as contention and triggers one-time creation of the arena when
     * more than one CPU is available.
     *
     * @param item the item to exchange
     * @param timed true if the wait is timed
     * @param ns if timed, the maximum wait time, else 0L
     * @return the other thread's item; or null if either the arena
     * was enabled or the thread was interrupted before completion; or
     * TIMED_OUT if timed and timed out
     */
    private final Object slotExchange(Object item, boolean timed, long ns) {
        Node p = participant.get();
        Thread t = Thread.currentThread();
        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
            return null;

        for (Node q;;) {
            if ((q = slot) != null) {
                // Someone is waiting: claim the node by nulling the slot.
                if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    Object v = q.item;
                    q.match = item;     // volatile write hands over our item
                    Thread w = q.parked;
                    if (w != null)
                        U.unpark(w);
                    return v;
                }
                // create arena on contention, but continue until slot null
                // (the CAS of bound from 0 to SEQ lets only one thread
                // allocate the array)
                if (NCPU > 1 && bound == 0 &&
                    U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            else if (arena != null)
                return null; // caller must reroute to arenaExchange
            else {
                // Slot empty and no arena: offer our own node.
                p.item = item;
                if (U.compareAndSwapObject(this, SLOT, null, p))
                    break;
                p.item = null;
            }
        }

        // await release
        int h = p.hash;
        long end = timed ? System.nanoTime() + ns : 0L;
        int spins = (NCPU > 1) ? SPINS : 1;
        Object v;
        while ((v = p.match) == null) {
            if (spins > 0) {
                // Same xorshift-based randomized spin/yield as arenaExchange.
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    Thread.yield();
            }
            // Our node has left the slot but match is not visible yet:
            // spin again rather than block or cancel.
            else if (slot != p)
                spins = SPINS;
            // Park only if not interrupted, the arena has not been
            // created, and (when timed) time remains; ns is refreshed here.
            else if (!t.isInterrupted() && arena == null &&
                     (!timed || (ns = end - System.nanoTime()) > 0L)) {
                U.putObject(t, BLOCKER, this);
                p.parked = t;
                // Recheck the slot after publishing parked, before blocking.
                if (slot == p)
                    U.park(false, ns);
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            }
            // Cancel by removing our node. TIMED_OUT is reported only for
            // a timed wait whose time ran out without an interrupt; every
            // other cancellation returns null for the caller to sort out.
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        // Reset the per-thread node for its next use and save the hash.
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        return v;
    }
 516 
 517     /**
 518      * Creates a new Exchanger.
 519      */
 520     public Exchanger() {
 521         participant = new Participant();
 522     }
 523 
 524     /**
 525      * Waits for another thread to arrive at this exchange point (unless
 526      * the current thread is {@linkplain Thread#interrupt interrupted}),
 527      * and then transfers the given object to it, receiving its object
 528      * in return.
 529      *
 530      * <p>If another thread is already waiting at the exchange point then
 531      * it is resumed for thread scheduling purposes and receives the object
 532      * passed in by the current thread.  The current thread returns immediately,
 533      * receiving the object passed to the exchange by that other thread.
 534      *
 535      * <p>If no other thread is already waiting at the exchange then the
 536      * current thread is disabled for thread scheduling purposes and lies
 537      * dormant until one of two things happens:
 538      * <ul>
 539      * <li>Some other thread enters the exchange; or
 540      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 541      * the current thread.
 542      * </ul>
 543      * <p>If the current thread:
 544      * <ul>
 545      * <li>has its interrupted status set on entry to this method; or
 546      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
 547      * for the exchange,
 548      * </ul>
 549      * then {@link InterruptedException} is thrown and the current thread's
 550      * interrupted status is cleared.
 551      *
 552      * @param x the object to exchange
 553      * @return the object provided by the other thread
 554      * @throws InterruptedException if the current thread was
 555      *         interrupted while waiting
 556      */
 557     @SuppressWarnings("unchecked")
 558     public V exchange(V x) throws InterruptedException {
 559         Object v;
 560         Object item = (x == null) ? NULL_ITEM : x; // translate null args
 561         if ((arena != null ||
 562              (v = slotExchange(item, false, 0L)) == null) &&
 563             ((Thread.interrupted() || // disambiguates null return
 564               (v = arenaExchange(item, false, 0L)) == null)))
 565             throw new InterruptedException();
 566         return (v == NULL_ITEM) ? null : (V)v;
 567     }
 568 
 569     /**
 570      * Waits for another thread to arrive at this exchange point (unless
 571      * the current thread is {@linkplain Thread#interrupt interrupted} or
 572      * the specified waiting time elapses), and then transfers the given
 573      * object to it, receiving its object in return.
 574      *
 575      * <p>If another thread is already waiting at the exchange point then
 576      * it is resumed for thread scheduling purposes and receives the object
 577      * passed in by the current thread.  The current thread returns immediately,
 578      * receiving the object passed to the exchange by that other thread.
 579      *
 580      * <p>If no other thread is already waiting at the exchange then the
 581      * current thread is disabled for thread scheduling purposes and lies
 582      * dormant until one of three things happens:
 583      * <ul>
 584      * <li>Some other thread enters the exchange; or
 585      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
 586      * the current thread; or
 587      * <li>The specified waiting time elapses.
 588      * </ul>
 589      * <p>If the current thread:
 590      * <ul>
 591      * <li>has its interrupted status set on entry to this method; or
 592      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
 593      * for the exchange,
 594      * </ul>
 595      * then {@link InterruptedException} is thrown and the current thread's
 596      * interrupted status is cleared.
 597      *
 598      * <p>If the specified waiting time elapses then {@link
 599      * TimeoutException} is thrown.  If the time is less than or equal
 600      * to zero, the method will not wait at all.
 601      *
 602      * @param x the object to exchange
 603      * @param timeout the maximum time to wait
 604      * @param unit the time unit of the {@code timeout} argument
 605      * @return the object provided by the other thread
 606      * @throws InterruptedException if the current thread was
 607      *         interrupted while waiting
 608      * @throws TimeoutException if the specified waiting time elapses
 609      *         before another thread enters the exchange
 610      */
 611     @SuppressWarnings("unchecked")
 612     public V exchange(V x, long timeout, TimeUnit unit)
 613         throws InterruptedException, TimeoutException {
 614         Object v;
 615         Object item = (x == null) ? NULL_ITEM : x;
 616         long ns = unit.toNanos(timeout);
 617         if ((arena != null ||
 618              (v = slotExchange(item, true, ns)) == null) &&
 619             ((Thread.interrupted() ||
 620               (v = arenaExchange(item, true, ns)) == null)))
 621             throw new InterruptedException();
 622         if (v == TIMED_OUT)
 623             throw new TimeoutException();
 624         return (v == NULL_ITEM) ? null : (V)v;
 625     }
 626 
 627     // Unsafe mechanics
 628     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
 629     private static final long BOUND;
 630     private static final long SLOT;
 631     private static final long MATCH;
 632     private static final long BLOCKER;
 633     private static final int ABASE;
 634     static {
 635         try {
 636             BOUND = U.objectFieldOffset
 637                 (Exchanger.class.getDeclaredField("bound"));
 638             SLOT = U.objectFieldOffset
 639                 (Exchanger.class.getDeclaredField("slot"));
 640 
 641             MATCH = U.objectFieldOffset
 642                 (Node.class.getDeclaredField("match"));
 643 
 644             BLOCKER = U.objectFieldOffset
 645                 (Thread.class.getDeclaredField("parkBlocker"));
 646 
 647             int scale = U.arrayIndexScale(Node[].class);
 648             if ((scale & (scale - 1)) != 0 || scale > (1 << ASHIFT))
 649                 throw new Error("Unsupported array scale");
 650             // ABASE absorbs padding in front of element 0
 651             ABASE = U.arrayBaseOffset(Node[].class) + (1 << ASHIFT);
 652         } catch (ReflectiveOperationException e) {
 653             throw new Error(e);
 654         }
 655     }
 656 
 657 }