1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 19 * CA 95054 USA or visit www.sun.com if you need additional information or 20 * have any questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/licenses/publicdomain 32 */ 33 34 /* 35 * @test 36 * @bug 4486658 37 * @compile ProducerConsumerLoops.java 38 * @run main/timeout=3600 ProducerConsumerLoops 39 * @summary multiple producers and consumers using blocking queues 40 */ 41 42 import java.util.concurrent.*; 43 44 public class ProducerConsumerLoops { 45 static final int CAPACITY = 100; 46 47 static final ExecutorService pool = Executors.newCachedThreadPool(); 48 static boolean print = false; 49 static int producerSum; 50 static int consumerSum; 51 static synchronized void addProducerSum(int x) { 52 producerSum += x; 53 } 54 55 static synchronized void addConsumerSum(int x) { 56 consumerSum += x; 57 } 58 59 static synchronized void checkSum() { 60 if (producerSum != consumerSum) 61 throw new Error("CheckSum mismatch"); 62 } 63 64 public static void main(String[] args) throws Exception { 65 int maxPairs = 8; 66 int iters = 10000; 67 68 if (args.length > 0) 69 maxPairs = Integer.parseInt(args[0]); 70 71 print = false; 72 System.out.println("Warmup..."); 73 oneTest(1, 10000); 74 Thread.sleep(100); 75 oneTest(2, 10000); 76 Thread.sleep(100); 77 print = true; 78 79 for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) { 80 System.out.println("----------------------------------------"); 81 System.out.println("Pairs: " + i); 82 oneTest(i, iters); 83 Thread.sleep(100); 84 } 85 pool.shutdown(); 86 if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) 87 throw new Error(); 88 } 89 90 static final class LTQasSQ<T> extends LinkedTransferQueue<T> { 91 LTQasSQ() { super(); } 92 public void put(T x) { 93 try { super.transfer(x); } 94 catch (InterruptedException ex) { throw new Error(); } 95 } 96 private final static long serialVersionUID = 42; 97 } 98 99 static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> { 100 HalfSyncLTQ() { super(); } 101 public void put(T x) { 102 if (ThreadLocalRandom.current().nextBoolean()) 103 super.put(x); 104 else { 105 try { super.transfer(x); } 106 catch (InterruptedException ex) { throw new Error(); } 107 } 108 } 109 private final static long serialVersionUID = 42; 110 } 111 112 static void oneTest(int pairs, int iters) throws Exception { 113 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters); 114 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters); 115 oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters); 116 oneRun(new LinkedTransferQueue<Integer>(), pairs, iters); 117 oneRun(new LTQasSQ<Integer>(), pairs, iters); 118 oneRun(new HalfSyncLTQ<Integer>(), pairs, iters); 119 oneRun(new PriorityBlockingQueue<Integer>(), pairs, iters); 120 oneRun(new SynchronousQueue<Integer>(), pairs, iters); 121 122 if (print) 123 System.out.println("fair implementations:"); 124 125 oneRun(new SynchronousQueue<Integer>(true), pairs, iters); 126 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), pairs, iters); 127 } 128 129 static abstract class Stage implements Runnable { 130 final int iters; 131 final BlockingQueue<Integer> queue; 132 final CyclicBarrier barrier; 133 Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) { 134 queue = q; 135 barrier = b; 136 this.iters = iters; 137 } 138 } 139 140 static class Producer extends Stage { 141 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) { 142 super(q, b, iters); 143 } 144 145 public void run() { 146 try { 147 barrier.await(); 148 int s = 0; 149 int l = hashCode(); 150 for (int i = 0; i < iters; ++i) { 151 l = LoopHelpers.compute2(l); 152 queue.put(new Integer(l)); 153 s += LoopHelpers.compute1(l); 154 } 155 addProducerSum(s); 156 barrier.await(); 157 } 158 catch (Exception ie) { 159 ie.printStackTrace(); 160 return; 161 } 162 } 163 } 164 165 static class Consumer extends Stage { 166 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) { 167 super(q, b, iters); 168 } 169 170 public void run() { 171 try { 172 barrier.await(); 173 int l = 0; 174 int s = 0; 175 for (int i = 0; i < iters; ++i) { 176 l = LoopHelpers.compute1(queue.take().intValue()); 177 s += l; 178 } 179 addConsumerSum(s); 180 barrier.await(); 181 } 182 catch (Exception ie) { 183 ie.printStackTrace(); 184 return; 185 } 186 } 187 188 } 189 190 static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception { 191 if (print) 192 System.out.printf("%-18s", q.getClass().getSimpleName()); 193 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); 194 CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer); 195 for (int i = 0; i < npairs; ++i) { 196 pool.execute(new Producer(q, barrier, iters)); 197 pool.execute(new Consumer(q, barrier, iters)); 198 } 199 barrier.await(); 200 barrier.await(); 201 long time = timer.getTime(); 202 checkSum(); 203 if (print) 204 System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * npairs)) + " ns per transfer"); 205 } 206 207 }