Print this page
Split |
Close |
Expand all |
Collapse all |
--- old/test/java/util/concurrent/BlockingQueue/ProducerConsumerLoops.java
+++ new/test/java/util/concurrent/BlockingQueue/ProducerConsumerLoops.java
1 1 /*
2 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 3 *
4 4 * This code is free software; you can redistribute it and/or modify it
5 5 * under the terms of the GNU General Public License version 2 only, as
6 6 * published by the Free Software Foundation.
7 7 *
8 8 * This code is distributed in the hope that it will be useful, but WITHOUT
9 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
11 11 * version 2 for more details (a copy is included in the LICENSE file that
12 12 * accompanied this code).
13 13 *
14 14 * You should have received a copy of the GNU General Public License version
15 15 * 2 along with this work; if not, write to the Free Software Foundation,
16 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
17 17 *
18 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19 19 * or visit www.oracle.com if you need additional information or have any
20 20 * questions.
21 21 */
22 22
23 23 /*
24 24 * This file is available under and governed by the GNU General Public
25 25 * License version 2 only, as published by the Free Software Foundation.
26 26 * However, the following notice accompanied the original version of this
↓ open down ↓ |
26 lines elided |
↑ open up ↑ |
27 27 * file:
28 28 *
29 29 * Written by Doug Lea with assistance from members of JCP JSR-166
30 30 * Expert Group and released to the public domain, as explained at
31 31 * http://creativecommons.org/licenses/publicdomain
32 32 */
33 33
34 34 /*
35 35 * @test
36 36 * @bug 4486658
37 - * @compile ProducerConsumerLoops.java
37 + * @compile -source 1.5 ProducerConsumerLoops.java
38 38 * @run main/timeout=3600 ProducerConsumerLoops
39 39 * @summary multiple producers and consumers using blocking queues
40 40 */
41 41
42 42 import java.util.concurrent.*;
43 43
44 44 public class ProducerConsumerLoops {
45 45 static final int CAPACITY = 100;
46 46
47 47 static final ExecutorService pool = Executors.newCachedThreadPool();
48 48 static boolean print = false;
49 49 static int producerSum;
50 50 static int consumerSum;
51 51 static synchronized void addProducerSum(int x) {
52 52 producerSum += x;
53 53 }
54 54
55 55 static synchronized void addConsumerSum(int x) {
56 56 consumerSum += x;
57 57 }
58 58
59 59 static synchronized void checkSum() {
60 60 if (producerSum != consumerSum)
61 61 throw new Error("CheckSum mismatch");
62 62 }
63 63
64 64 public static void main(String[] args) throws Exception {
65 65 int maxPairs = 8;
66 66 int iters = 10000;
67 67
68 68 if (args.length > 0)
69 69 maxPairs = Integer.parseInt(args[0]);
70 70
71 71 print = false;
72 72 System.out.println("Warmup...");
73 73 oneTest(1, 10000);
74 74 Thread.sleep(100);
75 75 oneTest(2, 10000);
76 76 Thread.sleep(100);
77 77 print = true;
78 78
79 79 for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) {
↓ open down ↓ |
32 lines elided |
↑ open up ↑ |
80 80 System.out.println("----------------------------------------");
81 81 System.out.println("Pairs: " + i);
82 82 oneTest(i, iters);
83 83 Thread.sleep(100);
84 84 }
85 85 pool.shutdown();
86 86 if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS))
87 87 throw new Error();
88 88 }
89 89
90 - static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
91 - LTQasSQ() { super(); }
92 - public void put(T x) {
93 - try { super.transfer(x); }
94 - catch (InterruptedException ex) { throw new Error(); }
95 - }
96 - private final static long serialVersionUID = 42;
97 - }
98 -
99 - static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
100 - HalfSyncLTQ() { super(); }
101 - public void put(T x) {
102 - if (ThreadLocalRandom.current().nextBoolean())
103 - super.put(x);
104 - else {
105 - try { super.transfer(x); }
106 - catch (InterruptedException ex) { throw new Error(); }
107 - }
108 - }
109 - private final static long serialVersionUID = 42;
110 - }
111 -
112 90 static void oneTest(int pairs, int iters) throws Exception {
113 91 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
114 92 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
115 93 oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
116 94 oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
117 - oneRun(new LTQasSQ<Integer>(), pairs, iters);
118 - oneRun(new HalfSyncLTQ<Integer>(), pairs, iters);
119 95 oneRun(new PriorityBlockingQueue<Integer>(), pairs, iters);
120 96 oneRun(new SynchronousQueue<Integer>(), pairs, iters);
121 97
122 98 if (print)
123 99 System.out.println("fair implementations:");
124 100
125 101 oneRun(new SynchronousQueue<Integer>(true), pairs, iters);
126 102 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), pairs, iters);
127 103 }
128 104
129 - static abstract class Stage implements Runnable {
105 + abstract static class Stage implements Runnable {
130 106 final int iters;
131 107 final BlockingQueue<Integer> queue;
132 108 final CyclicBarrier barrier;
133 - Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
109 + Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
134 110 queue = q;
135 111 barrier = b;
136 112 this.iters = iters;
137 113 }
138 114 }
139 115
140 116 static class Producer extends Stage {
141 117 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
142 118 super(q, b, iters);
143 119 }
144 120
145 121 public void run() {
146 122 try {
147 123 barrier.await();
148 124 int s = 0;
149 125 int l = hashCode();
150 126 for (int i = 0; i < iters; ++i) {
151 127 l = LoopHelpers.compute2(l);
152 128 queue.put(new Integer(l));
153 129 s += LoopHelpers.compute1(l);
154 130 }
155 131 addProducerSum(s);
156 132 barrier.await();
157 133 }
158 134 catch (Exception ie) {
159 135 ie.printStackTrace();
160 136 return;
161 137 }
162 138 }
163 139 }
164 140
165 141 static class Consumer extends Stage {
166 142 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
167 143 super(q, b, iters);
168 144 }
169 145
170 146 public void run() {
171 147 try {
172 148 barrier.await();
173 149 int l = 0;
174 150 int s = 0;
175 151 for (int i = 0; i < iters; ++i) {
176 152 l = LoopHelpers.compute1(queue.take().intValue());
177 153 s += l;
178 154 }
179 155 addConsumerSum(s);
180 156 barrier.await();
181 157 }
182 158 catch (Exception ie) {
183 159 ie.printStackTrace();
184 160 return;
185 161 }
186 162 }
187 163
188 164 }
189 165
190 166 static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
191 167 if (print)
192 168 System.out.printf("%-18s", q.getClass().getSimpleName());
193 169 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
194 170 CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
195 171 for (int i = 0; i < npairs; ++i) {
196 172 pool.execute(new Producer(q, barrier, iters));
197 173 pool.execute(new Consumer(q, barrier, iters));
198 174 }
199 175 barrier.await();
200 176 barrier.await();
201 177 long time = timer.getTime();
202 178 checkSum();
203 179 if (print)
204 180 System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * npairs)) + " ns per transfer");
205 181 }
206 182
207 183 }
↓ open down ↓ |
64 lines elided |
↑ open up ↑ |
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX