test/java/util/concurrent/BlockingQueue/SingleProducerMultipleConsumerLoops.java

Print this page

        

*** 32,42 ****
   */
  
  /*
   * @test
   * @bug 4486658
!  * @compile SingleProducerMultipleConsumerLoops.java
   * @run main/timeout=600 SingleProducerMultipleConsumerLoops
   * @summary check ordering for blocking queues with 1 producer and multiple consumers
   */
  
  import java.util.concurrent.*;
--- 32,42 ----
   */
  
  /*
   * @test
   * @bug 4486658
!  * @compile -source 1.5 SingleProducerMultipleConsumerLoops.java
   * @run main/timeout=600 SingleProducerMultipleConsumerLoops
   * @summary check ordering for blocking queues with 1 producer and multiple consumers
   */
  
  import java.util.concurrent.*;
*** 71,123 ****
          pool.shutdown();
          if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS))
              throw new Error();
     }
  
-     static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
-         LTQasSQ() { super(); }
-         public void put(T x) {
-             try { super.transfer(x); }
-             catch (InterruptedException ex) { throw new Error(); }
-         }
-         private final static long serialVersionUID = 42;
-     }
- 
-     static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
-         HalfSyncLTQ() { super(); }
-         public void put(T x) {
-             if (ThreadLocalRandom.current().nextBoolean())
-                 super.put(x);
-             else {
-                 try { super.transfer(x); }
-                 catch (InterruptedException ex) { throw new Error(); }
-             }
-         }
-         private final static long serialVersionUID = 42;
-     }
- 
      static void oneTest(int consumers, int iters) throws Exception {
          oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), consumers, iters);
          oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), consumers, iters);
          oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), consumers, iters);
          oneRun(new LinkedTransferQueue<Integer>(), consumers, iters);
-         oneRun(new LTQasSQ<Integer>(), consumers, iters);
-         oneRun(new HalfSyncLTQ<Integer>(), consumers, iters);
          oneRun(new PriorityBlockingQueue<Integer>(), consumers, iters);
          oneRun(new SynchronousQueue<Integer>(), consumers, iters);
          if (print)
              System.out.println("fair implementations:");
          oneRun(new SynchronousQueue<Integer>(true), consumers, iters);
          oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), consumers, iters);
      }
  
!     static abstract class Stage implements Runnable {
          final int iters;
          final BlockingQueue<Integer> queue;
          final CyclicBarrier barrier;
          volatile int result;
!         Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
              queue = q;
              barrier = b;
              this.iters = iters;
          }
      }
--- 71,99 ----
          pool.shutdown();
          if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS))
              throw new Error();
     }
  
      static void oneTest(int consumers, int iters) throws Exception {
          oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), consumers, iters);
          oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), consumers, iters);
          oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), consumers, iters);
          oneRun(new LinkedTransferQueue<Integer>(), consumers, iters);
          oneRun(new PriorityBlockingQueue<Integer>(), consumers, iters);
          oneRun(new SynchronousQueue<Integer>(), consumers, iters);
          if (print)
              System.out.println("fair implementations:");
          oneRun(new SynchronousQueue<Integer>(true), consumers, iters);
          oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), consumers, iters);
      }
  
!     abstract static class Stage implements Runnable {
          final int iters;
          final BlockingQueue<Integer> queue;
          final CyclicBarrier barrier;
          volatile int result;
!         Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
              queue = q;
              barrier = b;
              this.iters = iters;
          }
      }