Print this page


Split Close
Expand all
Collapse all
          --- old/test/java/util/concurrent/BlockingQueue/MultipleProducersSingleConsumerLoops.java
          +++ new/test/java/util/concurrent/BlockingQueue/MultipleProducersSingleConsumerLoops.java
↓ open down ↓ 26 lines elided ↑ open up ↑
  27   27   * file:
  28   28   *
  29   29   * Written by Doug Lea with assistance from members of JCP JSR-166
  30   30   * Expert Group and released to the public domain, as explained at
  31   31   * http://creativecommons.org/licenses/publicdomain
  32   32   */
  33   33  
  34   34  /*
  35   35   * @test
  36   36   * @bug 4486658
  37      - * @compile MultipleProducersSingleConsumerLoops.java
       37 + * @compile -source 1.5 MultipleProducersSingleConsumerLoops.java
  38   38   * @run main/timeout=3600 MultipleProducersSingleConsumerLoops
  39   39   * @summary  multiple producers and single consumer using blocking queues
  40   40   */
  41   41  
  42   42  import java.util.concurrent.*;
  43   43  
  44   44  public class MultipleProducersSingleConsumerLoops {
  45   45      static final int CAPACITY =      100;
  46   46      static final ExecutorService pool = Executors.newCachedThreadPool();
  47   47      static boolean print = false;
↓ open down ↓ 32 lines elided ↑ open up ↑
  80   80              System.out.println("----------------------------------------");
  81   81              System.out.println("Producers:" + i);
  82   82              oneTest(i, iters);
  83   83              Thread.sleep(100);
  84   84          }
  85   85          pool.shutdown();
  86   86          if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS))
  87   87              throw new Error();
  88   88     }
  89   89  
  90      -    static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
  91      -        LTQasSQ() { super(); }
  92      -        public void put(T x) {
  93      -            try { super.transfer(x); }
  94      -            catch (InterruptedException ex) { throw new Error(); }
  95      -        }
  96      -        private final static long serialVersionUID = 42;
  97      -    }
  98      -
  99      -    static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
 100      -        HalfSyncLTQ() { super(); }
 101      -        public void put(T x) {
 102      -            if (ThreadLocalRandom.current().nextBoolean())
 103      -                super.put(x);
 104      -            else {
 105      -                try { super.transfer(x); }
 106      -                catch (InterruptedException ex) { throw new Error(); }
 107      -            }
 108      -        }
 109      -        private final static long serialVersionUID = 42;
 110      -    }
 111      -
 112   90      static void oneTest(int producers, int iters) throws Exception {
 113   91          oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters);
 114   92          oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters);
 115   93          oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), producers, iters);
 116   94          oneRun(new LinkedTransferQueue<Integer>(), producers, iters);
 117      -        oneRun(new LTQasSQ<Integer>(), producers, iters);
 118      -        oneRun(new HalfSyncLTQ<Integer>(), producers, iters);
 119   95  
 120   96          // Don't run PBQ since can legitimately run out of memory
 121   97          //        if (print)
 122   98          //            System.out.print("PriorityBlockingQueue   ");
 123   99          //        oneRun(new PriorityBlockingQueue<Integer>(), producers, iters);
 124  100  
 125  101          oneRun(new SynchronousQueue<Integer>(), producers, iters);
 126  102          if (print)
 127  103              System.out.println("fair implementations:");
 128  104          oneRun(new SynchronousQueue<Integer>(true), producers, iters);
 129  105          oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), producers, iters);
 130  106      }
 131  107  
 132      -    static abstract class Stage implements Runnable {
      108 +    abstract static class Stage implements Runnable {
 133  109          final int iters;
 134  110          final BlockingQueue<Integer> queue;
 135  111          final CyclicBarrier barrier;
 136      -        Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
      112 +        Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
 137  113              queue = q;
 138  114              barrier = b;
 139  115              this.iters = iters;
 140  116          }
 141  117      }
 142  118  
 143  119      static class Producer extends Stage {
 144  120          Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
 145  121              super(q, b, iters);
 146  122          }
↓ open down ↓ 63 lines elided ↑ open up ↑
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX