test/java/util/concurrent/BlockingQueue/SingleProducerMultipleConsumerLoops.java
Print this page
*** 32,42 ****
*/
/*
* @test
* @bug 4486658
! * @compile SingleProducerMultipleConsumerLoops.java
* @run main/timeout=600 SingleProducerMultipleConsumerLoops
* @summary check ordering for blocking queues with 1 producer and multiple consumers
*/
import java.util.concurrent.*;
--- 32,42 ----
*/
/*
* @test
* @bug 4486658
! * @compile -source 1.5 SingleProducerMultipleConsumerLoops.java
* @run main/timeout=600 SingleProducerMultipleConsumerLoops
* @summary check ordering for blocking queues with 1 producer and multiple consumers
*/
import java.util.concurrent.*;
*** 71,123 ****
pool.shutdown();
if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS))
throw new Error();
}
- static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
- LTQasSQ() { super(); }
- public void put(T x) {
- try { super.transfer(x); }
- catch (InterruptedException ex) { throw new Error(); }
- }
- private final static long serialVersionUID = 42;
- }
-
- static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
- HalfSyncLTQ() { super(); }
- public void put(T x) {
- if (ThreadLocalRandom.current().nextBoolean())
- super.put(x);
- else {
- try { super.transfer(x); }
- catch (InterruptedException ex) { throw new Error(); }
- }
- }
- private final static long serialVersionUID = 42;
- }
-
static void oneTest(int consumers, int iters) throws Exception {
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), consumers, iters);
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), consumers, iters);
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), consumers, iters);
oneRun(new LinkedTransferQueue<Integer>(), consumers, iters);
- oneRun(new LTQasSQ<Integer>(), consumers, iters);
- oneRun(new HalfSyncLTQ<Integer>(), consumers, iters);
oneRun(new PriorityBlockingQueue<Integer>(), consumers, iters);
oneRun(new SynchronousQueue<Integer>(), consumers, iters);
if (print)
System.out.println("fair implementations:");
oneRun(new SynchronousQueue<Integer>(true), consumers, iters);
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), consumers, iters);
}
! static abstract class Stage implements Runnable {
final int iters;
final BlockingQueue<Integer> queue;
final CyclicBarrier barrier;
volatile int result;
! Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
queue = q;
barrier = b;
this.iters = iters;
}
}
--- 71,99 ----
pool.shutdown();
if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS))
throw new Error();
}
static void oneTest(int consumers, int iters) throws Exception {
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), consumers, iters);
oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), consumers, iters);
oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), consumers, iters);
oneRun(new LinkedTransferQueue<Integer>(), consumers, iters);
oneRun(new PriorityBlockingQueue<Integer>(), consumers, iters);
oneRun(new SynchronousQueue<Integer>(), consumers, iters);
if (print)
System.out.println("fair implementations:");
oneRun(new SynchronousQueue<Integer>(true), consumers, iters);
oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), consumers, iters);
}
! abstract static class Stage implements Runnable {
final int iters;
final BlockingQueue<Integer> queue;
final CyclicBarrier barrier;
volatile int result;
! Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
queue = q;
barrier = b;
this.iters = iters;
}
}