Print this page
Split |
Close |
Expand all |
Collapse all |
--- old/test/java/util/concurrent/BlockingQueue/CancelledProducerConsumerLoops.java
+++ new/test/java/util/concurrent/BlockingQueue/CancelledProducerConsumerLoops.java
1 1 /*
2 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 3 *
4 4 * This code is free software; you can redistribute it and/or modify it
5 5 * under the terms of the GNU General Public License version 2 only, as
6 6 * published by the Free Software Foundation.
7 7 *
8 8 * This code is distributed in the hope that it will be useful, but WITHOUT
9 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
11 11 * version 2 for more details (a copy is included in the LICENSE file that
12 12 * accompanied this code).
13 13 *
14 14 * You should have received a copy of the GNU General Public License version
15 15 * 2 along with this work; if not, write to the Free Software Foundation,
16 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
17 17 *
18 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19 19 * or visit www.oracle.com if you need additional information or have any
20 20 * questions.
21 21 */
22 22
23 23 /*
24 24 * This file is available under and governed by the GNU General Public
25 25 * License version 2 only, as published by the Free Software Foundation.
26 26 * However, the following notice accompanied the original version of this
↓ open down ↓ |
26 lines elided |
↑ open up ↑ |
27 27 * file:
28 28 *
29 29 * Written by Doug Lea with assistance from members of JCP JSR-166
30 30 * Expert Group and released to the public domain, as explained at
31 31 * http://creativecommons.org/licenses/publicdomain
32 32 */
33 33
34 34 /*
35 35 * @test
36 36 * @bug 4486658
37 - * @compile CancelledProducerConsumerLoops.java
37 + * @compile -source 1.5 CancelledProducerConsumerLoops.java
38 38 * @run main/timeout=7000 CancelledProducerConsumerLoops
39 39 * @summary Checks for responsiveness of blocking queues to cancellation.
40 40 * Runs under the assumption that ITERS computations require more than
41 41 * TIMEOUT msecs to complete.
42 42 */
43 43
44 44 import java.util.concurrent.*;
45 45
46 46 public class CancelledProducerConsumerLoops {
47 47 static final int CAPACITY = 100;
48 48 static final long TIMEOUT = 100;
49 49
50 50 static final ExecutorService pool = Executors.newCachedThreadPool();
51 51 static boolean print = false;
52 52
53 53 public static void main(String[] args) throws Exception {
54 54 int maxPairs = 8;
55 55 int iters = 1000000;
56 56
57 57 if (args.length > 0)
58 58 maxPairs = Integer.parseInt(args[0]);
59 59
60 60 print = true;
61 61
62 62 for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) {
63 63 System.out.println("Pairs:" + i);
64 64 try {
65 65 oneTest(i, iters);
66 66 }
67 67 catch (BrokenBarrierException bb) {
68 68 // OK, ignore
69 69 }
70 70 Thread.sleep(100);
71 71 }
72 72 pool.shutdown();
73 73 if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS))
74 74 throw new Error();
75 75 }
76 76
77 77 static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
78 78 if (print)
79 79 System.out.printf("%-18s", q.getClass().getSimpleName());
80 80 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
81 81 CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
82 82 Future<?>[] prods = new Future<?>[npairs];
83 83 Future<?>[] cons = new Future<?>[npairs];
84 84
85 85 for (int i = 0; i < npairs; ++i) {
86 86 prods[i] = pool.submit(new Producer(q, barrier, iters));
87 87 cons[i] = pool.submit(new Consumer(q, barrier, iters));
88 88 }
89 89 barrier.await();
90 90 Thread.sleep(TIMEOUT);
91 91 boolean tooLate = false;
92 92
93 93 for (int i = 1; i < npairs; ++i) {
94 94 if (!prods[i].cancel(true))
95 95 tooLate = true;
96 96 if (!cons[i].cancel(true))
97 97 tooLate = true;
98 98 }
99 99
100 100 Object p0 = prods[0].get();
101 101 Object c0 = cons[0].get();
102 102
103 103 if (!tooLate) {
104 104 for (int i = 1; i < npairs; ++i) {
105 105 if (!prods[i].isDone() || !prods[i].isCancelled())
106 106 throw new Error("Only one producer thread should complete");
107 107 if (!cons[i].isDone() || !cons[i].isCancelled())
108 108 throw new Error("Only one consumer thread should complete");
109 109 }
110 110 }
111 111 else
↓ open down ↓ |
64 lines elided |
↑ open up ↑ |
112 112 System.out.print("(cancelled too late) ");
113 113
114 114 long endTime = System.nanoTime();
115 115 long time = endTime - timer.startTime;
116 116 if (print) {
117 117 double secs = (double)(time) / 1000000000.0;
118 118 System.out.println("\t " + secs + "s run time");
119 119 }
120 120 }
121 121
122 - static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
123 - LTQasSQ() { super(); }
124 - public void put(T x) {
125 - try { super.transfer(x); }
126 - catch (InterruptedException ex) { throw new Error(); }
127 - }
128 - private final static long serialVersionUID = 42;
129 - }
130 -
131 - static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
132 - HalfSyncLTQ() { super(); }
133 - public void put(T x) {
134 - if (ThreadLocalRandom.current().nextBoolean())
135 - super.put(x);
136 - else {
137 - try { super.transfer(x); }
138 - catch (InterruptedException ex) { throw new Error(); }
139 - }
140 - }
141 - private final static long serialVersionUID = 42;
142 - }
143 -
144 122 static void oneTest(int pairs, int iters) throws Exception {
145 123
146 124 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
147 125 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
148 126 oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
127 + oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
149 128 oneRun(new SynchronousQueue<Integer>(), pairs, iters / 8);
150 129
151 - /* TODO: unbounded queue implementations are prone to OOME
130 + /* PriorityBlockingQueue is unbounded
152 131 oneRun(new PriorityBlockingQueue<Integer>(iters / 2 * pairs), pairs, iters / 4);
153 - oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
154 - oneRun(new LTQasSQ<Integer>(), pairs, iters);
155 - oneRun(new HalfSyncLTQ<Integer>(), pairs, iters);
156 132 */
157 133 }
158 134
159 - static abstract class Stage implements Callable<Integer> {
135 + abstract static class Stage implements Callable<Integer> {
160 136 final BlockingQueue<Integer> queue;
161 137 final CyclicBarrier barrier;
162 138 final int iters;
163 - Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
139 + Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
164 140 queue = q;
165 141 barrier = b;
166 142 this.iters = iters;
167 143 }
168 144 }
169 145
170 146 static class Producer extends Stage {
171 147 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
172 148 super(q, b, iters);
173 149 }
174 150
175 151 public Integer call() throws Exception {
176 152 barrier.await();
177 153 int s = 0;
178 154 int l = 4321;
179 155 for (int i = 0; i < iters; ++i) {
180 156 l = LoopHelpers.compute1(l);
181 157 s += LoopHelpers.compute2(l);
182 158 if (!queue.offer(new Integer(l), 1, TimeUnit.SECONDS))
183 159 break;
184 160 }
185 161 return new Integer(s);
186 162 }
187 163 }
188 164
189 165 static class Consumer extends Stage {
190 166 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
191 167 super(q, b, iters);
192 168 }
193 169
194 170 public Integer call() throws Exception {
195 171 barrier.await();
196 172 int l = 0;
197 173 int s = 0;
198 174 for (int i = 0; i < iters; ++i) {
199 175 Integer x = queue.poll(1, TimeUnit.SECONDS);
200 176 if (x == null)
201 177 break;
202 178 l = LoopHelpers.compute1(x.intValue());
203 179 s += l;
204 180 }
205 181 return new Integer(s);
206 182 }
207 183 }
208 184 }
↓ open down ↓ |
35 lines elided |
↑ open up ↑ |
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX