1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 19 * CA 95054 USA or visit www.sun.com if you need additional information or 20 * have any questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/licenses/publicdomain 32 */ 33 34 /* 35 * @test 36 * @bug 4486658 37 * @compile -source 1.5 MultipleProducersSingleConsumerLoops.java 38 * @run main/timeout=3600 MultipleProducersSingleConsumerLoops 39 * @summary multiple producers and single consumer using blocking queues 40 */ 41 42 import java.util.concurrent.*; 43 44 public class MultipleProducersSingleConsumerLoops { 45 static final int CAPACITY = 100; 46 static final ExecutorService pool = Executors.newCachedThreadPool(); 47 static boolean print = false; 48 static int producerSum; 49 static int consumerSum; 50 51 static synchronized void addProducerSum(int x) { 52 producerSum += x; 53 } 54 55 static synchronized void addConsumerSum(int x) { 56 consumerSum += x; 57 } 58 59 static synchronized void checkSum() { 60 if (producerSum != consumerSum) 61 throw new Error("CheckSum mismatch"); 62 } 63 64 public static void main(String[] args) throws Exception { 65 int maxProducers = 5; 66 int iters = 100000; 67 68 if (args.length > 0) 69 maxProducers = Integer.parseInt(args[0]); 70 71 print = false; 72 System.out.println("Warmup..."); 73 oneTest(1, 10000); 74 Thread.sleep(100); 75 oneTest(2, 10000); 76 Thread.sleep(100); 77 print = true; 78 79 for (int i = 1; i <= maxProducers; i += (i+1) >>> 1) { 80 System.out.println("----------------------------------------"); 81 System.out.println("Producers:" + i); 82 oneTest(i, iters); 83 Thread.sleep(100); 84 } 85 pool.shutdown(); 86 if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) 87 throw new Error(); 88 } 89 90 static final class LTQasSQ<T> extends LinkedTransferQueue<T> { 91 LTQasSQ() { super(); } 92 public void put(T x) { 93 try { super.transfer(x); } 94 catch (InterruptedException ex) { throw new Error(); } 95 } 96 private final static long serialVersionUID = 42; 97 } 98 99 static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> { 100 HalfSyncLTQ() { super(); } 101 public void put(T x) { 102 if (ThreadLocalRandom.current().nextBoolean()) 103 super.put(x); 104 else { 105 try { super.transfer(x); } 106 catch (InterruptedException ex) { throw new Error(); } 107 } 108 } 109 private final static long serialVersionUID = 42; 110 } 111 112 static void oneTest(int producers, int iters) throws Exception { 113 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters); 114 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters); 115 oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), producers, iters); 116 oneRun(new LinkedTransferQueue<Integer>(), producers, iters); 117 oneRun(new LTQasSQ<Integer>(), producers, iters); 118 oneRun(new HalfSyncLTQ<Integer>(), producers, iters); 119 120 // Don't run PBQ since can legitimately run out of memory 121 // if (print) 122 // System.out.print("PriorityBlockingQueue "); 123 // oneRun(new PriorityBlockingQueue<Integer>(), producers, iters); 124 125 oneRun(new SynchronousQueue<Integer>(), producers, iters); 126 if (print) 127 System.out.println("fair implementations:"); 128 oneRun(new SynchronousQueue<Integer>(true), producers, iters); 129 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), producers, iters); 130 } 131 132 static abstract class Stage implements Runnable { 133 final int iters; 134 final BlockingQueue<Integer> queue; 135 final CyclicBarrier barrier; 136 Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) { 137 queue = q; 138 barrier = b; 139 this.iters = iters; 140 } 141 } 142 143 static class Producer extends Stage { 144 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) { 145 super(q, b, iters); 146 } 147 148 public void run() { 149 try { 150 barrier.await(); 151 int s = 0; 152 int l = hashCode(); 153 for (int i = 0; i < iters; ++i) { 154 l = LoopHelpers.compute1(l); 155 l = LoopHelpers.compute2(l); 156 queue.put(new Integer(l)); 157 s += l; 158 } 159 addProducerSum(s); 160 barrier.await(); 161 } 162 catch (Exception ie) { 163 ie.printStackTrace(); 164 return; 165 } 166 } 167 } 168 169 static class Consumer extends Stage { 170 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) { 171 super(q, b, iters); 172 } 173 174 public void run() { 175 try { 176 barrier.await(); 177 int s = 0; 178 for (int i = 0; i < iters; ++i) { 179 s += queue.take().intValue(); 180 } 181 addConsumerSum(s); 182 barrier.await(); 183 } 184 catch (Exception ie) { 185 ie.printStackTrace(); 186 return; 187 } 188 } 189 190 } 191 192 static void oneRun(BlockingQueue<Integer> q, int nproducers, int iters) throws Exception { 193 if (print) 194 System.out.printf("%-18s", q.getClass().getSimpleName()); 195 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); 196 CyclicBarrier barrier = new CyclicBarrier(nproducers + 2, timer); 197 for (int i = 0; i < nproducers; ++i) { 198 pool.execute(new Producer(q, barrier, iters)); 199 } 200 pool.execute(new Consumer(q, barrier, iters * nproducers)); 201 barrier.await(); 202 barrier.await(); 203 long time = timer.getTime(); 204 checkSum(); 205 if (print) 206 System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * nproducers)) + " ns per transfer"); 207 } 208 209 }