Print this page
Split |
Close |
Expand all |
Collapse all |
--- old/test/java/util/concurrent/BlockingQueue/SingleProducerMultipleConsumerLoops.java
+++ new/test/java/util/concurrent/BlockingQueue/SingleProducerMultipleConsumerLoops.java
1 1 /*
2 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 3 *
4 4 * This code is free software; you can redistribute it and/or modify it
5 5 * under the terms of the GNU General Public License version 2 only, as
6 6 * published by the Free Software Foundation.
7 7 *
8 8 * This code is distributed in the hope that it will be useful, but WITHOUT
9 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
11 11 * version 2 for more details (a copy is included in the LICENSE file that
12 12 * accompanied this code).
13 13 *
14 14 * You should have received a copy of the GNU General Public License version
15 15 * 2 along with this work; if not, write to the Free Software Foundation,
16 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
17 17 *
18 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19 19 * or visit www.oracle.com if you need additional information or have any
20 20 * questions.
21 21 */
22 22
23 23 /*
24 24 * This file is available under and governed by the GNU General Public
25 25 * License version 2 only, as published by the Free Software Foundation.
26 26 * However, the following notice accompanied the original version of this
↓ open down ↓ |
26 lines elided |
↑ open up ↑ |
27 27 * file:
28 28 *
29 29 * Written by Doug Lea with assistance from members of JCP JSR-166
30 30 * Expert Group and released to the public domain, as explained at
31 31 * http://creativecommons.org/licenses/publicdomain
32 32 */
33 33
34 34 /*
35 35 * @test
36 36 * @bug 4486658
37 - * @compile SingleProducerMultipleConsumerLoops.java
37 + * @compile -source 1.5 SingleProducerMultipleConsumerLoops.java
38 38 * @run main/timeout=600 SingleProducerMultipleConsumerLoops
39 39 * @summary check ordering for blocking queues with 1 producer and multiple consumers
40 40 */
41 41
42 42 import java.util.concurrent.*;
43 43
44 44 public class SingleProducerMultipleConsumerLoops {
45 45 static final int CAPACITY = 100;
46 46
47 47 static final ExecutorService pool = Executors.newCachedThreadPool();
48 48 static boolean print = false;
49 49
50 50 public static void main(String[] args) throws Exception {
51 51 int maxConsumers = 5;
52 52 int iters = 10000;
53 53
54 54 if (args.length > 0)
55 55 maxConsumers = Integer.parseInt(args[0]);
56 56
57 57 print = false;
58 58 System.out.println("Warmup...");
59 59 oneTest(1, 10000);
60 60 Thread.sleep(100);
61 61 oneTest(2, 10000);
62 62 Thread.sleep(100);
63 63 print = true;
64 64
65 65 for (int i = 1; i <= maxConsumers; i += (i+1) >>> 1) {
↓ open down ↓ |
18 lines elided |
↑ open up ↑ |
66 66 System.out.println("----------------------------------------");
67 67 System.out.println("Consumers: " + i);
68 68 oneTest(i, iters);
69 69 Thread.sleep(100);
70 70 }
71 71 pool.shutdown();
72 72 if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS))
73 73 throw new Error();
74 74 }
75 75
76 - static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
77 - LTQasSQ() { super(); }
78 - public void put(T x) {
79 - try { super.transfer(x); }
80 - catch (InterruptedException ex) { throw new Error(); }
81 - }
82 - private final static long serialVersionUID = 42;
83 - }
84 -
85 - static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
86 - HalfSyncLTQ() { super(); }
87 - public void put(T x) {
88 - if (ThreadLocalRandom.current().nextBoolean())
89 - super.put(x);
90 - else {
91 - try { super.transfer(x); }
92 - catch (InterruptedException ex) { throw new Error(); }
93 - }
94 - }
95 - private final static long serialVersionUID = 42;
96 - }
97 -
98 76 static void oneTest(int consumers, int iters) throws Exception {
99 77 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), consumers, iters);
100 78 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), consumers, iters);
101 79 oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), consumers, iters);
102 80 oneRun(new LinkedTransferQueue<Integer>(), consumers, iters);
103 - oneRun(new LTQasSQ<Integer>(), consumers, iters);
104 - oneRun(new HalfSyncLTQ<Integer>(), consumers, iters);
105 81 oneRun(new PriorityBlockingQueue<Integer>(), consumers, iters);
106 82 oneRun(new SynchronousQueue<Integer>(), consumers, iters);
107 83 if (print)
108 84 System.out.println("fair implementations:");
109 85 oneRun(new SynchronousQueue<Integer>(true), consumers, iters);
110 86 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), consumers, iters);
111 87 }
112 88
113 - static abstract class Stage implements Runnable {
89 + abstract static class Stage implements Runnable {
114 90 final int iters;
115 91 final BlockingQueue<Integer> queue;
116 92 final CyclicBarrier barrier;
117 93 volatile int result;
118 - Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
94 + Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
119 95 queue = q;
120 96 barrier = b;
121 97 this.iters = iters;
122 98 }
123 99 }
124 100
125 101 static class Producer extends Stage {
126 102 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
127 103 super(q, b, iters);
128 104 }
129 105
130 106 public void run() {
131 107 try {
132 108 barrier.await();
133 109 for (int i = 0; i < iters; ++i) {
134 110 queue.put(new Integer(i));
135 111 }
136 112 barrier.await();
137 113 result = 432;
138 114 }
139 115 catch (Exception ie) {
140 116 ie.printStackTrace();
141 117 return;
142 118 }
143 119 }
144 120 }
145 121
146 122 static class Consumer extends Stage {
147 123 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
148 124 super(q, b, iters);
149 125 }
150 126
151 127 public void run() {
152 128 try {
153 129 barrier.await();
154 130 int l = 0;
155 131 int s = 0;
156 132 int last = -1;
157 133 for (int i = 0; i < iters; ++i) {
158 134 Integer item = queue.take();
159 135 int v = item.intValue();
160 136 if (v < last)
161 137 throw new Error("Out-of-Order transfer");
162 138 last = v;
163 139 l = LoopHelpers.compute1(v);
164 140 s += l;
165 141 }
166 142 barrier.await();
167 143 result = s;
168 144 }
169 145 catch (Exception ie) {
170 146 ie.printStackTrace();
171 147 return;
172 148 }
173 149 }
174 150
175 151 }
176 152
177 153 static void oneRun(BlockingQueue<Integer> q, int nconsumers, int iters) throws Exception {
178 154 if (print)
179 155 System.out.printf("%-18s", q.getClass().getSimpleName());
180 156 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
181 157 CyclicBarrier barrier = new CyclicBarrier(nconsumers + 2, timer);
182 158 pool.execute(new Producer(q, barrier, iters * nconsumers));
183 159 for (int i = 0; i < nconsumers; ++i) {
184 160 pool.execute(new Consumer(q, barrier, iters));
185 161 }
186 162 barrier.await();
187 163 barrier.await();
188 164 long time = timer.getTime();
189 165 if (print)
190 166 System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * nconsumers)) + " ns per transfer");
191 167 }
192 168
193 169 }
↓ open down ↓ |
65 lines elided |
↑ open up ↑ |
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX