Print this page


Split Close
Expand all
Collapse all
          --- old/src/share/classes/java/util/concurrent/SynchronousQueue.java
          +++ new/src/share/classes/java/util/concurrent/SynchronousQueue.java
↓ open down ↓ 155 lines elided ↑ open up ↑
 156  156       * node's link to now point to the node itself. This doesn't arise
 157  157       * much for Stack nodes (because blocked threads do not hang on to
 158  158       * old head pointers), but references in Queue nodes must be
 159  159       * aggressively forgotten to avoid reachability of everything any
 160  160       * node has ever referred to since arrival.
 161  161       */
 162  162  
 163  163      /**
 164  164       * Shared internal API for dual stacks and queues.
 165  165       */
 166      -    static abstract class Transferer {
      166 +    abstract static class Transferer {
 167  167          /**
 168  168           * Performs a put or take.
 169  169           *
 170  170           * @param e if non-null, the item to be handed to a consumer;
 171  171           *          if null, requests that transfer return an item
 172  172           *          offered by producer.
 173  173           * @param timed if this operation should timeout
 174  174           * @param nanos the timeout, in nanoseconds
 175  175           * @return if non-null, the item provided or received; if null,
 176  176           *         the operation failed due to timeout or interrupt --
↓ open down ↓ 6 lines elided ↑ open up ↑
 183  183      /** The number of CPUs, for spin control */
 184  184      static final int NCPUS = Runtime.getRuntime().availableProcessors();
 185  185  
 186  186      /**
 187  187       * The number of times to spin before blocking in timed waits.
 188  188       * The value is empirically derived -- it works well across a
 189  189       * variety of processors and OSes. Empirically, the best value
 190  190       * seems not to vary with number of CPUs (beyond 2) so is just
 191  191       * a constant.
 192  192       */
 193      -    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
      193 +    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
 194  194  
 195  195      /**
 196  196       * The number of times to spin before blocking in untimed waits.
 197  197       * This is greater than timed value because untimed waits spin
 198  198       * faster since they don't need to check times on each spin.
 199  199       */
 200  200      static final int maxUntimedSpins = maxTimedSpins * 16;
 201  201  
 202  202      /**
 203  203       * The number of nanoseconds for which it is faster to spin
↓ open down ↓ 30 lines elided ↑ open up ↑
 234  234              Object item;                // data; or null for REQUESTs
 235  235              int mode;
 236  236              // Note: item and mode fields don't need to be volatile
 237  237              // since they are always written before, and read after,
 238  238              // other volatile/atomic operations.
 239  239  
 240  240              SNode(Object item) {
 241  241                  this.item = item;
 242  242              }
 243  243  
 244      -            static final AtomicReferenceFieldUpdater<SNode, SNode>
 245      -                nextUpdater = AtomicReferenceFieldUpdater.newUpdater
 246      -                (SNode.class, SNode.class, "next");
 247      -
 248  244              boolean casNext(SNode cmp, SNode val) {
 249      -                return (cmp == next &&
 250      -                        nextUpdater.compareAndSet(this, cmp, val));
      245 +                return cmp == next &&
      246 +                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
 251  247              }
 252  248  
 253      -            static final AtomicReferenceFieldUpdater<SNode, SNode>
 254      -                matchUpdater = AtomicReferenceFieldUpdater.newUpdater
 255      -                (SNode.class, SNode.class, "match");
 256      -
 257  249              /**
 258  250               * Tries to match node s to this node, if so, waking up thread.
 259  251               * Fulfillers call tryMatch to identify their waiters.
 260  252               * Waiters block until they have been matched.
 261  253               *
 262  254               * @param s the node to match
 263  255               * @return true if successfully matched to s
 264  256               */
 265  257              boolean tryMatch(SNode s) {
 266  258                  if (match == null &&
 267      -                    matchUpdater.compareAndSet(this, null, s)) {
      259 +                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
 268  260                      Thread w = waiter;
 269  261                      if (w != null) {    // waiters need at most one unpark
 270  262                          waiter = null;
 271  263                          LockSupport.unpark(w);
 272  264                      }
 273  265                      return true;
 274  266                  }
 275  267                  return match == s;
 276  268              }
 277  269  
 278  270              /**
 279  271               * Tries to cancel a wait by matching node to itself.
 280  272               */
 281  273              void tryCancel() {
 282      -                matchUpdater.compareAndSet(this, null, this);
      274 +                UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
 283  275              }
 284  276  
 285  277              boolean isCancelled() {
 286  278                  return match == this;
 287  279              }
      280 +
      281 +            // Unsafe mechanics
      282 +            private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
      283 +            private static final long nextOffset =
      284 +                objectFieldOffset(UNSAFE, "next", SNode.class);
      285 +            private static final long matchOffset =
      286 +                objectFieldOffset(UNSAFE, "match", SNode.class);
      287 +
 288  288          }
 289  289  
 290  290          /** The head (top) of the stack */
 291  291          volatile SNode head;
 292  292  
 293      -        static final AtomicReferenceFieldUpdater<TransferStack, SNode>
 294      -            headUpdater = AtomicReferenceFieldUpdater.newUpdater
 295      -            (TransferStack.class,  SNode.class, "head");
 296      -
 297  293          boolean casHead(SNode h, SNode nh) {
 298      -            return h == head && headUpdater.compareAndSet(this, h, nh);
      294 +            return h == head &&
      295 +                UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
 299  296          }
 300  297  
 301  298          /**
 302  299           * Creates or resets fields of a node. Called only from transfer
 303  300           * where the node to push on stack is lazily created and
 304  301           * reused when possible to help reduce intervals between reads
 305  302           * and CASes of head and to avoid surges of garbage when CASes
 306  303           * to push nodes fail due to contention.
 307  304           */
 308  305          static SNode snode(SNode s, Object e, SNode next, int mode) {
↓ open down ↓ 22 lines elided ↑ open up ↑
 331  328               *    other threads performing action 3:
 332  329               *
 333  330               * 3. If top of stack already holds another fulfilling node,
 334  331               *    help it out by doing its match and/or pop
 335  332               *    operations, and then continue. The code for helping
 336  333               *    is essentially the same as for fulfilling, except
 337  334               *    that it doesn't return the item.
 338  335               */
 339  336  
 340  337              SNode s = null; // constructed/reused as needed
 341      -            int mode = (e == null)? REQUEST : DATA;
      338 +            int mode = (e == null) ? REQUEST : DATA;
 342  339  
 343  340              for (;;) {
 344  341                  SNode h = head;
 345  342                  if (h == null || h.mode == mode) {  // empty or same-mode
 346  343                      if (timed && nanos <= 0) {      // can't wait
 347  344                          if (h != null && h.isCancelled())
 348  345                              casHead(h, h.next);     // pop cancelled node
 349  346                          else
 350  347                              return null;
 351  348                      } else if (casHead(h, s = snode(s, e, h, mode))) {
 352  349                          SNode m = awaitFulfill(s, timed, nanos);
 353  350                          if (m == s) {               // wait was cancelled
 354  351                              clean(s);
 355  352                              return null;
 356  353                          }
 357  354                          if ((h = head) != null && h.next == s)
 358  355                              casHead(h, s.next);     // help s's fulfiller
 359      -                        return mode == REQUEST? m.item : s.item;
      356 +                        return (mode == REQUEST) ? m.item : s.item;
 360  357                      }
 361  358                  } else if (!isFulfilling(h.mode)) { // try to fulfill
 362  359                      if (h.isCancelled())            // already cancelled
 363  360                          casHead(h, h.next);         // pop and retry
 364  361                      else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
 365  362                          for (;;) { // loop until matched or waiters disappear
 366  363                              SNode m = s.next;       // m is s's match
 367  364                              if (m == null) {        // all waiters are gone
 368  365                                  casHead(s, null);   // pop fulfill node
 369  366                                  s = null;           // use new node next time
 370  367                                  break;              // restart main loop
 371  368                              }
 372  369                              SNode mn = m.next;
 373  370                              if (m.tryMatch(s)) {
 374  371                                  casHead(s, mn);     // pop both s and m
 375      -                                return (mode == REQUEST)? m.item : s.item;
      372 +                                return (mode == REQUEST) ? m.item : s.item;
 376  373                              } else                  // lost match
 377  374                                  s.casNext(m, mn);   // help unlink
 378  375                          }
 379  376                      }
 380  377                  } else {                            // help a fulfiller
 381  378                      SNode m = h.next;               // m is h's match
 382  379                      if (m == null)                  // waiter is gone
 383  380                          casHead(h, null);           // pop fulfilling node
 384  381                      else {
 385  382                          SNode mn = m.next;
↓ open down ↓ 30 lines elided ↑ open up ↑
 416  413               *
 417  414               * The order of checks for returning out of main loop
 418  415               * reflects fact that interrupts have precedence over
 419  416               * normal returns, which have precedence over
 420  417               * timeouts. (So, on timeout, one last check for match is
 421  418               * done before giving up.) Except that calls from untimed
 422  419               * SynchronousQueue.{poll/offer} don't check interrupts
 423  420               * and don't wait at all, so are trapped in transfer
 424  421               * method rather than calling awaitFulfill.
 425  422               */
 426      -            long lastTime = (timed)? System.nanoTime() : 0;
      423 +            long lastTime = timed ? System.nanoTime() : 0;
 427  424              Thread w = Thread.currentThread();
 428  425              SNode h = head;
 429      -            int spins = (shouldSpin(s)?
 430      -                         (timed? maxTimedSpins : maxUntimedSpins) : 0);
      426 +            int spins = (shouldSpin(s) ?
      427 +                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
 431  428              for (;;) {
 432  429                  if (w.isInterrupted())
 433  430                      s.tryCancel();
 434  431                  SNode m = s.match;
 435  432                  if (m != null)
 436  433                      return m;
 437  434                  if (timed) {
 438  435                      long now = System.nanoTime();
 439  436                      nanos -= now - lastTime;
 440  437                      lastTime = now;
 441  438                      if (nanos <= 0) {
 442  439                          s.tryCancel();
 443  440                          continue;
 444  441                      }
 445  442                  }
 446  443                  if (spins > 0)
 447      -                    spins = shouldSpin(s)? (spins-1) : 0;
      444 +                    spins = shouldSpin(s) ? (spins-1) : 0;
 448  445                  else if (s.waiter == null)
 449  446                      s.waiter = w; // establish waiter so can park next iter
 450  447                  else if (!timed)
 451  448                      LockSupport.park(this);
 452  449                  else if (nanos > spinForTimeoutThreshold)
 453  450                      LockSupport.parkNanos(this, nanos);
 454  451              }
 455  452          }
 456  453  
 457  454          /**
↓ open down ↓ 34 lines elided ↑ open up ↑
 492  489  
 493  490              // Unsplice embedded nodes
 494  491              while (p != null && p != past) {
 495  492                  SNode n = p.next;
 496  493                  if (n != null && n.isCancelled())
 497  494                      p.casNext(n, n.next);
 498  495                  else
 499  496                      p = n;
 500  497              }
 501  498          }
      499 +
      500 +        // Unsafe mechanics
      501 +        private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
      502 +        private static final long headOffset =
      503 +            objectFieldOffset(UNSAFE, "head", TransferStack.class);
      504 +
 502  505      }
 503  506  
 504  507      /** Dual Queue */
 505  508      static final class TransferQueue extends Transferer {
 506  509          /*
 507  510           * This extends Scherer-Scott dual queue algorithm, differing,
 508  511           * among other ways, by using modes within nodes rather than
 509  512           * marked pointers. The algorithm is a little simpler than
 510  513           * that for stacks because fulfillers do not need explicit
 511  514           * nodes, and matching is done by CAS'ing QNode.item field
↓ open down ↓ 5 lines elided ↑ open up ↑
 517  520              volatile QNode next;          // next node in queue
 518  521              volatile Object item;         // CAS'ed to or from null
 519  522              volatile Thread waiter;       // to control park/unpark
 520  523              final boolean isData;
 521  524  
 522  525              QNode(Object item, boolean isData) {
 523  526                  this.item = item;
 524  527                  this.isData = isData;
 525  528              }
 526  529  
 527      -            static final AtomicReferenceFieldUpdater<QNode, QNode>
 528      -                nextUpdater = AtomicReferenceFieldUpdater.newUpdater
 529      -                (QNode.class, QNode.class, "next");
 530      -
 531  530              boolean casNext(QNode cmp, QNode val) {
 532      -                return (next == cmp &&
 533      -                        nextUpdater.compareAndSet(this, cmp, val));
      531 +                return next == cmp &&
      532 +                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
 534  533              }
 535  534  
 536      -            static final AtomicReferenceFieldUpdater<QNode, Object>
 537      -                itemUpdater = AtomicReferenceFieldUpdater.newUpdater
 538      -                (QNode.class, Object.class, "item");
 539      -
 540  535              boolean casItem(Object cmp, Object val) {
 541      -                return (item == cmp &&
 542      -                        itemUpdater.compareAndSet(this, cmp, val));
      536 +                return item == cmp &&
      537 +                    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
 543  538              }
 544  539  
 545  540              /**
 546  541               * Tries to cancel by CAS'ing ref to this as item.
 547  542               */
 548  543              void tryCancel(Object cmp) {
 549      -                itemUpdater.compareAndSet(this, cmp, this);
      544 +                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
 550  545              }
 551  546  
 552  547              boolean isCancelled() {
 553  548                  return item == this;
 554  549              }
 555  550  
 556  551              /**
 557  552               * Returns true if this node is known to be off the queue
 558  553               * because its next pointer has been forgotten due to
 559  554               * an advanceHead operation.
 560  555               */
 561  556              boolean isOffList() {
 562  557                  return next == this;
 563  558              }
      559 +
      560 +            // Unsafe mechanics
      561 +            private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
      562 +            private static final long nextOffset =
      563 +                objectFieldOffset(UNSAFE, "next", QNode.class);
      564 +            private static final long itemOffset =
      565 +                objectFieldOffset(UNSAFE, "item", QNode.class);
 564  566          }
 565  567  
 566  568          /** Head of queue */
 567  569          transient volatile QNode head;
 568  570          /** Tail of queue */
 569  571          transient volatile QNode tail;
 570  572          /**
 571  573           * Reference to a cancelled node that might not yet have been
 572  574           * unlinked from queue because it was the last inserted node
 573  575           * when it cancelled.
 574  576           */
 575  577          transient volatile QNode cleanMe;
 576  578  
 577  579          TransferQueue() {
 578  580              QNode h = new QNode(null, false); // initialize to dummy node.
 579  581              head = h;
 580  582              tail = h;
 581  583          }
 582  584  
 583      -        static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
 584      -            headUpdater = AtomicReferenceFieldUpdater.newUpdater
 585      -            (TransferQueue.class,  QNode.class, "head");
 586      -
 587  585          /**
 588  586           * Tries to cas nh as new head; if successful, unlink
 589  587           * old head's next node to avoid garbage retention.
 590  588           */
 591  589          void advanceHead(QNode h, QNode nh) {
 592      -            if (h == head && headUpdater.compareAndSet(this, h, nh))
      590 +            if (h == head &&
      591 +                UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
 593  592                  h.next = h; // forget old next
 594  593          }
 595  594  
 596      -        static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
 597      -            tailUpdater = AtomicReferenceFieldUpdater.newUpdater
 598      -            (TransferQueue.class, QNode.class, "tail");
 599      -
 600  595          /**
 601  596           * Tries to cas nt as new tail.
 602  597           */
 603  598          void advanceTail(QNode t, QNode nt) {
 604  599              if (tail == t)
 605      -                tailUpdater.compareAndSet(this, t, nt);
      600 +                UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
 606  601          }
 607  602  
 608      -        static final AtomicReferenceFieldUpdater<TransferQueue, QNode>
 609      -            cleanMeUpdater = AtomicReferenceFieldUpdater.newUpdater
 610      -            (TransferQueue.class, QNode.class, "cleanMe");
 611      -
 612  603          /**
 613  604           * Tries to CAS cleanMe slot.
 614  605           */
 615  606          boolean casCleanMe(QNode cmp, QNode val) {
 616      -            return (cleanMe == cmp &&
 617      -                    cleanMeUpdater.compareAndSet(this, cmp, val));
      607 +            return cleanMe == cmp &&
      608 +                UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
 618  609          }
 619  610  
 620  611          /**
 621  612           * Puts or takes an item.
 622  613           */
 623  614          Object transfer(Object e, boolean timed, long nanos) {
 624  615              /* Basic algorithm is to loop trying to take either of
 625  616               * two actions:
 626  617               *
 627  618               * 1. If queue apparently empty or holding same-mode nodes,
↓ open down ↓ 48 lines elided ↑ open up ↑
 676  667                          clean(t, s);
 677  668                          return null;
 678  669                      }
 679  670  
 680  671                      if (!s.isOffList()) {           // not already unlinked
 681  672                          advanceHead(t, s);          // unlink if head
 682  673                          if (x != null)              // and forget fields
 683  674                              s.item = s;
 684  675                          s.waiter = null;
 685  676                      }
 686      -                    return (x != null)? x : e;
      677 +                    return (x != null) ? x : e;
 687  678  
 688  679                  } else {                            // complementary-mode
 689  680                      QNode m = h.next;               // node to fulfill
 690  681                      if (t != tail || m == null || h != head)
 691  682                          continue;                   // inconsistent read
 692  683  
 693  684                      Object x = m.item;
 694  685                      if (isData == (x != null) ||    // m already fulfilled
 695  686                          x == m ||                   // m cancelled
 696  687                          !m.casItem(x, e)) {         // lost CAS
 697  688                          advanceHead(h, m);          // dequeue and retry
 698  689                          continue;
 699  690                      }
 700  691  
 701  692                      advanceHead(h, m);              // successfully fulfilled
 702  693                      LockSupport.unpark(m.waiter);
 703      -                    return (x != null)? x : e;
      694 +                    return (x != null) ? x : e;
 704  695                  }
 705  696              }
 706  697          }
 707  698  
 708  699          /**
 709  700           * Spins/blocks until node s is fulfilled.
 710  701           *
 711  702           * @param s the waiting node
 712  703           * @param e the comparison value for checking match
 713  704           * @param timed true if timed wait
 714  705           * @param nanos timeout value
 715  706           * @return matched item, or s if cancelled
 716  707           */
 717  708          Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
 718  709              /* Same idea as TransferStack.awaitFulfill */
 719      -            long lastTime = (timed)? System.nanoTime() : 0;
      710 +            long lastTime = timed ? System.nanoTime() : 0;
 720  711              Thread w = Thread.currentThread();
 721  712              int spins = ((head.next == s) ?
 722      -                         (timed? maxTimedSpins : maxUntimedSpins) : 0);
      713 +                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
 723  714              for (;;) {
 724  715                  if (w.isInterrupted())
 725  716                      s.tryCancel(e);
 726  717                  Object x = s.item;
 727  718                  if (x != e)
 728  719                      return x;
 729  720                  if (timed) {
 730  721                      long now = System.nanoTime();
 731  722                      nanos -= now - lastTime;
 732  723                      lastTime = now;
↓ open down ↓ 59 lines elided ↑ open up ↑
 792  783                           (dn = d.next) != null &&  //   has successor
 793  784                           dn != d &&                //   that is on list
 794  785                           dp.casNext(d, dn)))       // d unspliced
 795  786                          casCleanMe(dp, null);
 796  787                      if (dp == pred)
 797  788                          return;      // s is already saved node
 798  789                  } else if (casCleanMe(null, pred))
 799  790                      return;          // Postpone cleaning s
 800  791              }
 801  792          }
      793 +
      794 +        // unsafe mechanics
      795 +        private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
      796 +        private static final long headOffset =
      797 +            objectFieldOffset(UNSAFE, "head", TransferQueue.class);
      798 +        private static final long tailOffset =
      799 +            objectFieldOffset(UNSAFE, "tail", TransferQueue.class);
      800 +        private static final long cleanMeOffset =
      801 +            objectFieldOffset(UNSAFE, "cleanMe", TransferQueue.class);
      802 +
 802  803      }
 803  804  
 804  805      /**
 805  806       * The transferer. Set only in constructor, but cannot be declared
 806  807       * as final without further complicating serialization.  Since
 807  808       * this is accessed only at most once per public method, there
 808  809       * isn't a noticeable performance penalty for using volatile
 809  810       * instead of final here.
 810  811       */
 811  812      private transient volatile Transferer transferer;
↓ open down ↓ 5 lines elided ↑ open up ↑
 817  818          this(false);
 818  819      }
 819  820  
 820  821      /**
 821  822       * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
 822  823       *
 823  824       * @param fair if true, waiting threads contend in FIFO order for
 824  825       *        access; otherwise the order is unspecified.
 825  826       */
 826  827      public SynchronousQueue(boolean fair) {
 827      -        transferer = (fair)? new TransferQueue() : new TransferStack();
      828 +        transferer = fair ? new TransferQueue() : new TransferStack();
 828  829      }
 829  830  
 830  831      /**
 831  832       * Adds the specified element to this queue, waiting if necessary for
 832  833       * another thread to receive it.
 833  834       *
 834  835       * @throws InterruptedException {@inheritDoc}
 835  836       * @throws NullPointerException {@inheritDoc}
 836  837       */
 837  838      public void put(E o) throws InterruptedException {
↓ open down ↓ 296 lines elided ↑ open up ↑
1134 1135  
1135 1136      private void readObject(final java.io.ObjectInputStream s)
1136 1137          throws java.io.IOException, ClassNotFoundException {
1137 1138          s.defaultReadObject();
1138 1139          if (waitingProducers instanceof FifoWaitQueue)
1139 1140              transferer = new TransferQueue();
1140 1141          else
1141 1142              transferer = new TransferStack();
1142 1143      }
1143 1144  
     1145 +    // Unsafe mechanics
     1146 +    static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
     1147 +                                  String field, Class<?> klazz) {
     1148 +        try {
     1149 +            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
     1150 +        } catch (NoSuchFieldException e) {
     1151 +            // Convert Exception to corresponding Error
     1152 +            NoSuchFieldError error = new NoSuchFieldError(field);
     1153 +            error.initCause(e);
     1154 +            throw error;
     1155 +        }
     1156 +    }
     1157 +
1144 1158  }
    
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX