1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 19 * or visit www.oracle.com if you need additional information or have any 20 * questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/publicdomain/zero/1.0/ 32 */ 33 34 /* 35 * @test 36 * @bug 4486658 37 * @compile -source 1.5 CancelledProducerConsumerLoops.java 38 * @run main/timeout=7000 CancelledProducerConsumerLoops 39 * @summary Checks for responsiveness of blocking queues to cancellation. 40 * Runs under the assumption that ITERS computations require more than 41 * TIMEOUT msecs to complete. 42 */ 43 44 import java.util.concurrent.*; 45 46 public class CancelledProducerConsumerLoops { 47 static final int CAPACITY = 100; 48 static final long TIMEOUT = 100; 49 50 static final ExecutorService pool = Executors.newCachedThreadPool(); 51 static boolean print = false; 52 53 public static void main(String[] args) throws Exception { 54 int maxPairs = 8; 55 int iters = 1000000; 56 57 if (args.length > 0) 58 maxPairs = Integer.parseInt(args[0]); 59 60 print = true; 61 62 for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) { 63 System.out.println("Pairs:" + i); 64 try { 65 oneTest(i, iters); 66 } 67 catch (BrokenBarrierException bb) { 68 // OK, ignore 69 } 70 Thread.sleep(100); 71 } 72 pool.shutdown(); 73 if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) 74 throw new Error(); 75 } 76 77 static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception { 78 if (print) 79 System.out.printf("%-18s", q.getClass().getSimpleName()); 80 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); 81 CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer); 82 Future<?>[] prods = new Future<?>[npairs]; 83 Future<?>[] cons = new Future<?>[npairs]; 84 85 for (int i = 0; i < npairs; ++i) { 86 prods[i] = pool.submit(new Producer(q, barrier, iters)); 87 cons[i] = pool.submit(new Consumer(q, barrier, iters)); 88 } 89 barrier.await(); 90 Thread.sleep(TIMEOUT); 91 boolean tooLate = false; 92 93 for (int i = 1; i < npairs; ++i) { 94 if (!prods[i].cancel(true)) 95 tooLate = true; 96 if (!cons[i].cancel(true)) 97 tooLate = true; 98 } 99 100 Object p0 = prods[0].get(); 101 Object c0 = cons[0].get(); 102 103 if (!tooLate) { 104 for (int i = 1; i < npairs; ++i) { 105 if (!prods[i].isDone() || !prods[i].isCancelled()) 106 throw new Error("Only one producer thread should complete"); 107 if (!cons[i].isDone() || !cons[i].isCancelled()) 108 throw new Error("Only one consumer thread should complete"); 109 } 110 } 111 else 112 System.out.print("(cancelled too late) "); 113 114 long endTime = System.nanoTime(); 115 long time = endTime - timer.startTime; 116 if (print) { 117 double secs = (double)(time) / 1000000000.0; 118 System.out.println("\t " + secs + "s run time"); 119 } 120 } 121 122 static void oneTest(int pairs, int iters) throws Exception { 123 124 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters); 125 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters); 126 oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters); 127 oneRun(new SynchronousQueue<Integer>(), pairs, iters / 8); 128 129 /* unbounded queue implementations are prone to OOME 130 oneRun(new PriorityBlockingQueue<Integer>(iters / 2 * pairs), pairs, iters / 4); 131 oneRun(new LinkedTransferQueue<Integer>(), pairs, iters); 132 */ 133 } 134 135 abstract static class Stage implements Callable<Integer> { 136 final BlockingQueue<Integer> queue; 137 final CyclicBarrier barrier; 138 final int iters; 139 Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) { 140 queue = q; 141 barrier = b; 142 this.iters = iters; 143 } 144 } 145 146 static class Producer extends Stage { 147 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) { 148 super(q, b, iters); 149 } 150 151 public Integer call() throws Exception { 152 barrier.await(); 153 int s = 0; 154 int l = 4321; 155 for (int i = 0; i < iters; ++i) { 156 l = LoopHelpers.compute1(l); 157 s += LoopHelpers.compute2(l); 158 if (!queue.offer(new Integer(l), 1, TimeUnit.SECONDS)) 159 break; 160 } 161 return new Integer(s); 162 } 163 } 164 165 static class Consumer extends Stage { 166 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) { 167 super(q, b, iters); 168 } 169 170 public Integer call() throws Exception { 171 barrier.await(); 172 int l = 0; 173 int s = 0; 174 for (int i = 0; i < iters; ++i) { 175 Integer x = queue.poll(1, TimeUnit.SECONDS); 176 if (x == null) 177 break; 178 l = LoopHelpers.compute1(x.intValue()); 179 s += l; 180 } 181 return new Integer(s); 182 } 183 } 184 }