1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 19 * CA 95054 USA or visit www.sun.com if you need additional information or 20 * have any questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/licenses/publicdomain 32 */ 33 34 /* 35 * @test 36 * @bug 4486658 37 * @compile CancelledProducerConsumerLoops.java 38 * @run main/timeout=7000 CancelledProducerConsumerLoops 39 * @summary Checks for responsiveness of blocking queues to cancellation. 40 * Runs under the assumption that ITERS computations require more than 41 * TIMEOUT msecs to complete. 42 */ 43 44 import java.util.concurrent.*; 45 46 public class CancelledProducerConsumerLoops { 47 static final int CAPACITY = 100; 48 static final long TIMEOUT = 100; 49 50 static final ExecutorService pool = Executors.newCachedThreadPool(); 51 static boolean print = false; 52 53 public static void main(String[] args) throws Exception { 54 int maxPairs = 8; 55 int iters = 1000000; 56 57 if (args.length > 0) 58 maxPairs = Integer.parseInt(args[0]); 59 60 print = true; 61 62 for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) { 63 System.out.println("Pairs:" + i); 64 try { 65 oneTest(i, iters); 66 } 67 catch (BrokenBarrierException bb) { 68 // OK, ignore 69 } 70 Thread.sleep(100); 71 } 72 pool.shutdown(); 73 if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) 74 throw new Error(); 75 } 76 77 static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception { 78 if (print) 79 System.out.printf("%-18s", q.getClass().getSimpleName()); 80 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); 81 CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer); 82 Future<?>[] prods = new Future<?>[npairs]; 83 Future<?>[] cons = new Future<?>[npairs]; 84 85 for (int i = 0; i < npairs; ++i) { 86 prods[i] = pool.submit(new Producer(q, barrier, iters)); 87 cons[i] = pool.submit(new Consumer(q, barrier, iters)); 88 } 89 barrier.await(); 90 Thread.sleep(TIMEOUT); 91 boolean tooLate = false; 92 93 for (int i = 1; i < npairs; ++i) { 94 if (!prods[i].cancel(true)) 95 tooLate = true; 96 if (!cons[i].cancel(true)) 97 tooLate = true; 98 } 99 100 Object p0 = prods[0].get(); 101 Object c0 = cons[0].get(); 102 103 if (!tooLate) { 104 for (int i = 1; i < npairs; ++i) { 105 if (!prods[i].isDone() || !prods[i].isCancelled()) 106 throw new Error("Only one producer thread should complete"); 107 if (!cons[i].isDone() || !cons[i].isCancelled()) 108 throw new Error("Only one consumer thread should complete"); 109 } 110 } 111 else 112 System.out.print("(cancelled too late) "); 113 114 long endTime = System.nanoTime(); 115 long time = endTime - timer.startTime; 116 if (print) { 117 double secs = (double)(time) / 1000000000.0; 118 System.out.println("\t " + secs + "s run time"); 119 } 120 } 121 122 static final class LTQasSQ<T> extends LinkedTransferQueue<T> { 123 LTQasSQ() { super(); } 124 public void put(T x) { 125 try { super.transfer(x); } 126 catch (InterruptedException ex) { throw new Error(); } 127 } 128 private final static long serialVersionUID = 42; 129 } 130 131 static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> { 132 HalfSyncLTQ() { super(); } 133 public void put(T x) { 134 if (ThreadLocalRandom.current().nextBoolean()) 135 super.put(x); 136 else { 137 try { super.transfer(x); } 138 catch (InterruptedException ex) { throw new Error(); } 139 } 140 } 141 private final static long serialVersionUID = 42; 142 } 143 144 static void oneTest(int pairs, int iters) throws Exception { 145 146 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters); 147 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters); 148 oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters); 149 oneRun(new LinkedTransferQueue<Integer>(), pairs, iters); 150 oneRun(new LTQasSQ<Integer>(), pairs, iters); 151 oneRun(new HalfSyncLTQ<Integer>(), pairs, iters); 152 oneRun(new SynchronousQueue<Integer>(), pairs, iters / 8); 153 154 /* PriorityBlockingQueue is unbounded 155 oneRun(new PriorityBlockingQueue<Integer>(iters / 2 * pairs), pairs, iters / 4); 156 */ 157 } 158 159 static abstract class Stage implements Callable<Integer> { 160 final BlockingQueue<Integer> queue; 161 final CyclicBarrier barrier; 162 final int iters; 163 Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) { 164 queue = q; 165 barrier = b; 166 this.iters = iters; 167 } 168 } 169 170 static class Producer extends Stage { 171 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) { 172 super(q, b, iters); 173 } 174 175 public Integer call() throws Exception { 176 barrier.await(); 177 int s = 0; 178 int l = 4321; 179 for (int i = 0; i < iters; ++i) { 180 l = LoopHelpers.compute1(l); 181 s += LoopHelpers.compute2(l); 182 if (!queue.offer(new Integer(l), 1, TimeUnit.SECONDS)) 183 break; 184 } 185 return new Integer(s); 186 } 187 } 188 189 static class Consumer extends Stage { 190 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) { 191 super(q, b, iters); 192 } 193 194 public Integer call() throws Exception { 195 barrier.await(); 196 int l = 0; 197 int s = 0; 198 for (int i = 0; i < iters; ++i) { 199 Integer x = queue.poll(1, TimeUnit.SECONDS); 200 if (x == null) 201 break; 202 l = LoopHelpers.compute1(x.intValue()); 203 s += l; 204 } 205 return new Integer(s); 206 } 207 } 208 }