17 *
18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19 * or visit www.oracle.com if you need additional information or have any
20 * questions.
21 */
22
23 /*
24 * This file is available under and governed by the GNU General Public
25 * License version 2 only, as published by the Free Software Foundation.
26 * However, the following notice accompanied the original version of this
27 * file:
28 *
29 * Written by Doug Lea with assistance from members of JCP JSR-166
30 * Expert Group and released to the public domain, as explained at
31 * http://creativecommons.org/licenses/publicdomain
32 */
33
34 /*
35 * @test
36 * @bug 4486658
37 * @compile MultipleProducersSingleConsumerLoops.java
38 * @run main/timeout=3600 MultipleProducersSingleConsumerLoops
39 * @summary multiple producers and single consumer using blocking queues
40 */
41
42 import java.util.concurrent.*;
43
44 public class MultipleProducersSingleConsumerLoops {
// Capacity handed to the bounded queues constructed in oneTest.
45 static final int CAPACITY = 100;
// Cached thread pool; main shuts it down and awaits termination after the last run.
46 static final ExecutorService pool = Executors.newCachedThreadPool();
// False during the warmup runs, set to true before the measured runs; oneTest checks it before printing.
47 static boolean print = false;
// Running total that addProducerSum adds into.
48 static int producerSum;
// Running total that addConsumerSum adds into.
49 static int consumerSum;
50
51 static synchronized void addProducerSum(int x) {
52 producerSum += x;
53 }
54
55 static synchronized void addConsumerSum(int x) {
56 consumerSum += x;
57 }
70
71 print = false;
72 System.out.println("Warmup...");
73 oneTest(1, 10000);
74 Thread.sleep(100);
75 oneTest(2, 10000);
76 Thread.sleep(100);
77 print = true;
78
79 for (int i = 1; i <= maxProducers; i += (i+1) >>> 1) {
80 System.out.println("----------------------------------------");
81 System.out.println("Producers:" + i);
82 oneTest(i, iters);
83 Thread.sleep(100);
84 }
85 pool.shutdown();
86 if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS))
87 throw new Error();
88 }
89
90 static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
91 LTQasSQ() { super(); }
92 public void put(T x) {
93 try { super.transfer(x); }
94 catch (InterruptedException ex) { throw new Error(); }
95 }
96 private final static long serialVersionUID = 42;
97 }
98
99 static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
100 HalfSyncLTQ() { super(); }
101 public void put(T x) {
102 if (ThreadLocalRandom.current().nextBoolean())
103 super.put(x);
104 else {
105 try { super.transfer(x); }
106 catch (InterruptedException ex) { throw new Error(); }
107 }
108 }
109 private final static long serialVersionUID = 42;
110 }
111
112 static void oneTest(int producers, int iters) throws Exception {
113 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters);
114 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters);
115 oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), producers, iters);
116 oneRun(new LinkedTransferQueue<Integer>(), producers, iters);
117 oneRun(new LTQasSQ<Integer>(), producers, iters);
118 oneRun(new HalfSyncLTQ<Integer>(), producers, iters);
119
120 // Don't run PBQ since can legitimately run out of memory
121 // if (print)
122 // System.out.print("PriorityBlockingQueue ");
123 // oneRun(new PriorityBlockingQueue<Integer>(), producers, iters);
124
125 oneRun(new SynchronousQueue<Integer>(), producers, iters);
126 if (print)
127 System.out.println("fair implementations:");
128 oneRun(new SynchronousQueue<Integer>(true), producers, iters);
129 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), producers, iters);
130 }
131
132 static abstract class Stage implements Runnable {
133 final int iters;
134 final BlockingQueue<Integer> queue;
135 final CyclicBarrier barrier;
136 Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
137 queue = q;
138 barrier = b;
139 this.iters = iters;
140 }
141 }
142
143 static class Producer extends Stage {
144 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
145 super(q, b, iters);
146 }
147
148 public void run() {
149 try {
150 barrier.await();
151 int s = 0;
152 int l = hashCode();
153 for (int i = 0; i < iters; ++i) {
154 l = LoopHelpers.compute1(l);
155 l = LoopHelpers.compute2(l);
156 queue.put(new Integer(l));
|
17 *
18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19 * or visit www.oracle.com if you need additional information or have any
20 * questions.
21 */
22
23 /*
24 * This file is available under and governed by the GNU General Public
25 * License version 2 only, as published by the Free Software Foundation.
26 * However, the following notice accompanied the original version of this
27 * file:
28 *
29 * Written by Doug Lea with assistance from members of JCP JSR-166
30 * Expert Group and released to the public domain, as explained at
31 * http://creativecommons.org/licenses/publicdomain
32 */
33
34 /*
35 * @test
36 * @bug 4486658
37 * @compile -source 1.5 MultipleProducersSingleConsumerLoops.java
38 * @run main/timeout=3600 MultipleProducersSingleConsumerLoops
39 * @summary multiple producers and single consumer using blocking queues
40 */
41
42 import java.util.concurrent.*;
43
44 public class MultipleProducersSingleConsumerLoops {
// Capacity handed to the bounded queues constructed in oneTest.
45 static final int CAPACITY = 100;
// Cached thread pool; main shuts it down and awaits termination after the last run.
46 static final ExecutorService pool = Executors.newCachedThreadPool();
// False during the warmup runs, set to true before the measured runs; oneTest checks it before printing.
47 static boolean print = false;
// Running total that addProducerSum adds into.
48 static int producerSum;
// Running total that addConsumerSum adds into.
49 static int consumerSum;
50
51 static synchronized void addProducerSum(int x) {
52 producerSum += x;
53 }
54
55 static synchronized void addConsumerSum(int x) {
56 consumerSum += x;
57 }
70
71 print = false;
72 System.out.println("Warmup...");
73 oneTest(1, 10000);
74 Thread.sleep(100);
75 oneTest(2, 10000);
76 Thread.sleep(100);
77 print = true;
78
79 for (int i = 1; i <= maxProducers; i += (i+1) >>> 1) {
80 System.out.println("----------------------------------------");
81 System.out.println("Producers:" + i);
82 oneTest(i, iters);
83 Thread.sleep(100);
84 }
85 pool.shutdown();
86 if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS))
87 throw new Error();
88 }
89
90 static void oneTest(int producers, int iters) throws Exception {
91 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters);
92 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters);
93 oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), producers, iters);
94 oneRun(new LinkedTransferQueue<Integer>(), producers, iters);
95
96 // Don't run PBQ since can legitimately run out of memory
97 // if (print)
98 // System.out.print("PriorityBlockingQueue ");
99 // oneRun(new PriorityBlockingQueue<Integer>(), producers, iters);
100
101 oneRun(new SynchronousQueue<Integer>(), producers, iters);
102 if (print)
103 System.out.println("fair implementations:");
104 oneRun(new SynchronousQueue<Integer>(true), producers, iters);
105 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), producers, iters);
106 }
107
108 abstract static class Stage implements Runnable {
109 final int iters;
110 final BlockingQueue<Integer> queue;
111 final CyclicBarrier barrier;
112 Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
113 queue = q;
114 barrier = b;
115 this.iters = iters;
116 }
117 }
118
119 static class Producer extends Stage {
120 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
121 super(q, b, iters);
122 }
123
124 public void run() {
125 try {
126 barrier.await();
127 int s = 0;
128 int l = hashCode();
129 for (int i = 0; i < iters; ++i) {
130 l = LoopHelpers.compute1(l);
131 l = LoopHelpers.compute2(l);
132 queue.put(new Integer(l));
|