Print this page
Split |
Close |
Expand all |
Collapse all |
--- old/test/java/util/concurrent/BlockingQueue/MultipleProducersSingleConsumerLoops.java
+++ new/test/java/util/concurrent/BlockingQueue/MultipleProducersSingleConsumerLoops.java
1 1 /*
2 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 3 *
4 4 * This code is free software; you can redistribute it and/or modify it
5 5 * under the terms of the GNU General Public License version 2 only, as
6 6 * published by the Free Software Foundation.
7 7 *
8 8 * This code is distributed in the hope that it will be useful, but WITHOUT
9 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
11 11 * version 2 for more details (a copy is included in the LICENSE file that
12 12 * accompanied this code).
13 13 *
14 14 * You should have received a copy of the GNU General Public License version
15 15 * 2 along with this work; if not, write to the Free Software Foundation,
16 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
17 17 *
18 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19 19 * or visit www.oracle.com if you need additional information or have any
20 20 * questions.
21 21 */
22 22
23 23 /*
24 24 * This file is available under and governed by the GNU General Public
25 25 * License version 2 only, as published by the Free Software Foundation.
26 26 * However, the following notice accompanied the original version of this
↓ open down ↓ |
26 lines elided |
↑ open up ↑ |
27 27 * file:
28 28 *
29 29 * Written by Doug Lea with assistance from members of JCP JSR-166
30 30 * Expert Group and released to the public domain, as explained at
31 31 * http://creativecommons.org/licenses/publicdomain
32 32 */
33 33
34 34 /*
35 35 * @test
36 36 * @bug 4486658
37 - * @compile MultipleProducersSingleConsumerLoops.java
37 + * @compile -source 1.5 MultipleProducersSingleConsumerLoops.java
38 38 * @run main/timeout=3600 MultipleProducersSingleConsumerLoops
39 39 * @summary multiple producers and single consumer using blocking queues
40 40 */
41 41
42 42 import java.util.concurrent.*;
43 43
44 44 public class MultipleProducersSingleConsumerLoops {
45 45 static final int CAPACITY = 100;
46 46 static final ExecutorService pool = Executors.newCachedThreadPool();
47 47 static boolean print = false;
48 48 static int producerSum;
49 49 static int consumerSum;
50 50
51 51 static synchronized void addProducerSum(int x) {
52 52 producerSum += x;
53 53 }
54 54
55 55 static synchronized void addConsumerSum(int x) {
56 56 consumerSum += x;
57 57 }
58 58
59 59 static synchronized void checkSum() {
60 60 if (producerSum != consumerSum)
61 61 throw new Error("CheckSum mismatch");
62 62 }
63 63
64 64 public static void main(String[] args) throws Exception {
65 65 int maxProducers = 5;
66 66 int iters = 100000;
67 67
68 68 if (args.length > 0)
69 69 maxProducers = Integer.parseInt(args[0]);
70 70
71 71 print = false;
72 72 System.out.println("Warmup...");
73 73 oneTest(1, 10000);
74 74 Thread.sleep(100);
75 75 oneTest(2, 10000);
76 76 Thread.sleep(100);
77 77 print = true;
78 78
79 79 for (int i = 1; i <= maxProducers; i += (i+1) >>> 1) {
↓ open down ↓ |
32 lines elided |
↑ open up ↑ |
80 80 System.out.println("----------------------------------------");
81 81 System.out.println("Producers:" + i);
82 82 oneTest(i, iters);
83 83 Thread.sleep(100);
84 84 }
85 85 pool.shutdown();
86 86 if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS))
87 87 throw new Error();
88 88 }
89 89
90 - static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
91 - LTQasSQ() { super(); }
92 - public void put(T x) {
93 - try { super.transfer(x); }
94 - catch (InterruptedException ex) { throw new Error(); }
95 - }
96 - private final static long serialVersionUID = 42;
97 - }
98 -
99 - static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
100 - HalfSyncLTQ() { super(); }
101 - public void put(T x) {
102 - if (ThreadLocalRandom.current().nextBoolean())
103 - super.put(x);
104 - else {
105 - try { super.transfer(x); }
106 - catch (InterruptedException ex) { throw new Error(); }
107 - }
108 - }
109 - private final static long serialVersionUID = 42;
110 - }
111 -
112 90 static void oneTest(int producers, int iters) throws Exception {
113 91 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters);
114 92 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters);
115 93 oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), producers, iters);
116 94 oneRun(new LinkedTransferQueue<Integer>(), producers, iters);
117 - oneRun(new LTQasSQ<Integer>(), producers, iters);
118 - oneRun(new HalfSyncLTQ<Integer>(), producers, iters);
119 95
120 96 // Don't run PBQ since can legitimately run out of memory
121 97 // if (print)
122 98 // System.out.print("PriorityBlockingQueue ");
123 99 // oneRun(new PriorityBlockingQueue<Integer>(), producers, iters);
124 100
125 101 oneRun(new SynchronousQueue<Integer>(), producers, iters);
126 102 if (print)
127 103 System.out.println("fair implementations:");
128 104 oneRun(new SynchronousQueue<Integer>(true), producers, iters);
129 105 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), producers, iters);
130 106 }
131 107
132 - static abstract class Stage implements Runnable {
108 + abstract static class Stage implements Runnable {
133 109 final int iters;
134 110 final BlockingQueue<Integer> queue;
135 111 final CyclicBarrier barrier;
136 - Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
112 + Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
137 113 queue = q;
138 114 barrier = b;
139 115 this.iters = iters;
140 116 }
141 117 }
142 118
143 119 static class Producer extends Stage {
144 120 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
145 121 super(q, b, iters);
146 122 }
147 123
148 124 public void run() {
149 125 try {
150 126 barrier.await();
151 127 int s = 0;
152 128 int l = hashCode();
153 129 for (int i = 0; i < iters; ++i) {
154 130 l = LoopHelpers.compute1(l);
155 131 l = LoopHelpers.compute2(l);
156 132 queue.put(new Integer(l));
157 133 s += l;
158 134 }
159 135 addProducerSum(s);
160 136 barrier.await();
161 137 }
162 138 catch (Exception ie) {
163 139 ie.printStackTrace();
164 140 return;
165 141 }
166 142 }
167 143 }
168 144
169 145 static class Consumer extends Stage {
170 146 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
171 147 super(q, b, iters);
172 148 }
173 149
174 150 public void run() {
175 151 try {
176 152 barrier.await();
177 153 int s = 0;
178 154 for (int i = 0; i < iters; ++i) {
179 155 s += queue.take().intValue();
180 156 }
181 157 addConsumerSum(s);
182 158 barrier.await();
183 159 }
184 160 catch (Exception ie) {
185 161 ie.printStackTrace();
186 162 return;
187 163 }
188 164 }
189 165
190 166 }
191 167
192 168 static void oneRun(BlockingQueue<Integer> q, int nproducers, int iters) throws Exception {
193 169 if (print)
194 170 System.out.printf("%-18s", q.getClass().getSimpleName());
195 171 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
196 172 CyclicBarrier barrier = new CyclicBarrier(nproducers + 2, timer);
197 173 for (int i = 0; i < nproducers; ++i) {
198 174 pool.execute(new Producer(q, barrier, iters));
199 175 }
200 176 pool.execute(new Consumer(q, barrier, iters * nproducers));
201 177 barrier.await();
202 178 barrier.await();
203 179 long time = timer.getTime();
204 180 checkSum();
205 181 if (print)
206 182 System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * nproducers)) + " ns per transfer");
207 183 }
208 184
209 185 }
↓ open down ↓ |
63 lines elided |
↑ open up ↑ |
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX