17 *
18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19 * or visit www.oracle.com if you need additional information or have any
20 * questions.
21 */
22
23 /*
24 * This file is available under and governed by the GNU General Public
25 * License version 2 only, as published by the Free Software Foundation.
26 * However, the following notice accompanied the original version of this
27 * file:
28 *
29 * Written by Doug Lea with assistance from members of JCP JSR-166
30 * Expert Group and released to the public domain, as explained at
31 * http://creativecommons.org/licenses/publicdomain
32 */
33
34 /*
35 * @test
36 * @bug 4486658
37 * @compile ProducerConsumerLoops.java
38 * @run main/timeout=3600 ProducerConsumerLoops
39 * @summary multiple producers and consumers using blocking queues
40 */
41
42 import java.util.concurrent.*;
43
44 public class ProducerConsumerLoops {
// Capacity argument passed to the bounded queue constructors in oneTest.
45 static final int CAPACITY = 100;
46
// Shared cached thread pool; main shuts it down and awaits its termination.
47 static final ExecutorService pool = Executors.newCachedThreadPool();
// Output switch: main keeps it false during the warmup calls and sets it
// to true afterwards; oneTest prints its banner only when it is true.
48 static boolean print = false;
// Totals accumulated through addProducerSum and addConsumerSum.
49 static int producerSum;
50 static int consumerSum;
51 static synchronized void addProducerSum(int x) {
52 producerSum += x;
53 }
54
55 static synchronized void addConsumerSum(int x) {
56 consumerSum += x;
57 }
70
71 print = false;
72 System.out.println("Warmup...");
73 oneTest(1, 10000);
74 Thread.sleep(100);
75 oneTest(2, 10000);
76 Thread.sleep(100);
77 print = true;
78
79 for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) {
80 System.out.println("----------------------------------------");
81 System.out.println("Pairs: " + i);
82 oneTest(i, iters);
83 Thread.sleep(100);
84 }
85 pool.shutdown();
86 if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS))
87 throw new Error();
88 }
89
90 static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
91 LTQasSQ() { super(); }
92 public void put(T x) {
93 try { super.transfer(x); }
94 catch (InterruptedException ex) { throw new Error(); }
95 }
96 private final static long serialVersionUID = 42;
97 }
98
99 static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
100 HalfSyncLTQ() { super(); }
101 public void put(T x) {
102 if (ThreadLocalRandom.current().nextBoolean())
103 super.put(x);
104 else {
105 try { super.transfer(x); }
106 catch (InterruptedException ex) { throw new Error(); }
107 }
108 }
109 private final static long serialVersionUID = 42;
110 }
111
112 static void oneTest(int pairs, int iters) throws Exception {
113 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
114 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
115 oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
116 oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
117 oneRun(new LTQasSQ<Integer>(), pairs, iters);
118 oneRun(new HalfSyncLTQ<Integer>(), pairs, iters);
119 oneRun(new PriorityBlockingQueue<Integer>(), pairs, iters);
120 oneRun(new SynchronousQueue<Integer>(), pairs, iters);
121
122 if (print)
123 System.out.println("fair implementations:");
124
125 oneRun(new SynchronousQueue<Integer>(true), pairs, iters);
126 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), pairs, iters);
127 }
128
129 static abstract class Stage implements Runnable {
130 final int iters;
131 final BlockingQueue<Integer> queue;
132 final CyclicBarrier barrier;
133 Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
134 queue = q;
135 barrier = b;
136 this.iters = iters;
137 }
138 }
139
140 static class Producer extends Stage {
141 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
142 super(q, b, iters);
143 }
144
145 public void run() {
146 try {
147 barrier.await();
148 int s = 0;
149 int l = hashCode();
150 for (int i = 0; i < iters; ++i) {
151 l = LoopHelpers.compute2(l);
152 queue.put(new Integer(l));
153 s += LoopHelpers.compute1(l);
|
17 *
18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19 * or visit www.oracle.com if you need additional information or have any
20 * questions.
21 */
22
23 /*
24 * This file is available under and governed by the GNU General Public
25 * License version 2 only, as published by the Free Software Foundation.
26 * However, the following notice accompanied the original version of this
27 * file:
28 *
29 * Written by Doug Lea with assistance from members of JCP JSR-166
30 * Expert Group and released to the public domain, as explained at
31 * http://creativecommons.org/licenses/publicdomain
32 */
33
34 /*
35 * @test
36 * @bug 4486658
37 * @compile -source 1.5 ProducerConsumerLoops.java
38 * @run main/timeout=3600 ProducerConsumerLoops
39 * @summary multiple producers and consumers using blocking queues
40 */
41
42 import java.util.concurrent.*;
43
44 public class ProducerConsumerLoops {
// Capacity argument passed to the bounded queue constructors in oneTest.
45 static final int CAPACITY = 100;
46
// Shared cached thread pool; main shuts it down and awaits its termination.
47 static final ExecutorService pool = Executors.newCachedThreadPool();
// Output switch: main keeps it false during the warmup calls and sets it
// to true afterwards; oneTest prints its banner only when it is true.
48 static boolean print = false;
// Totals accumulated through addProducerSum and addConsumerSum.
49 static int producerSum;
50 static int consumerSum;
51 static synchronized void addProducerSum(int x) {
52 producerSum += x;
53 }
54
55 static synchronized void addConsumerSum(int x) {
56 consumerSum += x;
57 }
70
71 print = false;
72 System.out.println("Warmup...");
73 oneTest(1, 10000);
74 Thread.sleep(100);
75 oneTest(2, 10000);
76 Thread.sleep(100);
77 print = true;
78
79 for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) {
80 System.out.println("----------------------------------------");
81 System.out.println("Pairs: " + i);
82 oneTest(i, iters);
83 Thread.sleep(100);
84 }
85 pool.shutdown();
86 if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS))
87 throw new Error();
88 }
89
90 static void oneTest(int pairs, int iters) throws Exception {
91 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
92 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
93 oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
94 oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
95 oneRun(new PriorityBlockingQueue<Integer>(), pairs, iters);
96 oneRun(new SynchronousQueue<Integer>(), pairs, iters);
97
98 if (print)
99 System.out.println("fair implementations:");
100
101 oneRun(new SynchronousQueue<Integer>(true), pairs, iters);
102 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), pairs, iters);
103 }
104
105 abstract static class Stage implements Runnable {
106 final int iters;
107 final BlockingQueue<Integer> queue;
108 final CyclicBarrier barrier;
109 Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
110 queue = q;
111 barrier = b;
112 this.iters = iters;
113 }
114 }
115
116 static class Producer extends Stage {
117 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
118 super(q, b, iters);
119 }
120
121 public void run() {
122 try {
123 barrier.await();
124 int s = 0;
125 int l = hashCode();
126 for (int i = 0; i < iters; ++i) {
127 l = LoopHelpers.compute2(l);
128 queue.put(new Integer(l));
129 s += LoopHelpers.compute1(l);
|