1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 19 * or visit www.oracle.com if you need additional information or have any 20 * questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/publicdomain/zero/1.0/ 32 */ 33 34 /* 35 * @test 36 * @bug 4486658 37 * @summary Checks for responsiveness of blocking queues to cancellation. 38 * @library /lib/testlibrary/ 39 */ 40 41 import static java.util.concurrent.TimeUnit.MILLISECONDS; 42 43 import java.util.ArrayList; 44 import java.util.List; 45 import java.util.concurrent.ArrayBlockingQueue; 46 import java.util.concurrent.BlockingQueue; 47 import java.util.concurrent.Callable; 48 import java.util.concurrent.CancellationException; 49 import java.util.concurrent.CountDownLatch; 50 import java.util.concurrent.CyclicBarrier; 51 import java.util.concurrent.ExecutorService; 52 import java.util.concurrent.Executors; 53 import java.util.concurrent.Future; 54 import java.util.concurrent.LinkedBlockingDeque; 55 import java.util.concurrent.LinkedBlockingQueue; 56 import java.util.concurrent.SynchronousQueue; 57 import java.util.concurrent.ThreadLocalRandom; 58 import java.util.concurrent.TimeUnit; 59 import jdk.testlibrary.Utils; 60 61 public class CancelledProducerConsumerLoops { 62 static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000); 63 static ExecutorService pool; 64 65 public static void main(String[] args) throws Exception { 66 final int maxPairs = (args.length > 0) ? Integer.parseInt(args[0]) : 5; 67 68 pool = Executors.newCachedThreadPool(); 69 for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) { 70 final List<BlockingQueue<Integer>> queues = new ArrayList<>(); 71 queues.add(new ArrayBlockingQueue<Integer>(100)); 72 queues.add(new LinkedBlockingQueue<Integer>(100)); 73 queues.add(new LinkedBlockingDeque<Integer>(100)); 74 queues.add(new SynchronousQueue<Integer>()); 75 // unbounded queue implementations are prone to OOME: 76 // PriorityBlockingQueue, LinkedTransferQueue 77 for (BlockingQueue<Integer> queue : queues) 78 new CancelledProducerConsumerLoops(i, queue).run(); 79 } 80 pool.shutdown(); 81 if (! pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS)) 82 throw new AssertionError("timed out"); 83 pool = null; 84 } 85 86 final int npairs; 87 final BlockingQueue<Integer> queue; 88 final CountDownLatch producersInterrupted; 89 final CountDownLatch consumersInterrupted; 90 final LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); 91 final CyclicBarrier barrier; 92 volatile boolean done = false; 93 94 CancelledProducerConsumerLoops(int npairs, BlockingQueue<Integer> queue) { 95 this.npairs = npairs; 96 this.queue = queue; 97 this.producersInterrupted = new CountDownLatch(npairs - 1); 98 this.consumersInterrupted = new CountDownLatch(npairs - 1); 99 this.barrier = new CyclicBarrier(npairs * 2 + 1, timer); 100 } 101 102 void run() throws Exception { 103 Future<?>[] prods = new Future<?>[npairs]; 104 Future<?>[] cons = new Future<?>[npairs]; 105 106 for (int i = 0; i < npairs; i++) { 107 prods[i] = pool.submit(new Producer()); 108 cons[i] = pool.submit(new Consumer()); 109 } 110 barrier.await(); 111 Thread.sleep(ThreadLocalRandom.current().nextInt(5)); 112 113 for (int i = 1; i < npairs; i++) { 114 if (!prods[i].cancel(true) || 115 !cons[i].cancel(true)) 116 throw new AssertionError("completed before done"); 117 } 118 119 for (int i = 1; i < npairs; i++) { 120 assertCancelled(prods[i]); 121 assertCancelled(cons[i]); 122 } 123 124 if (!producersInterrupted.await(LONG_DELAY_MS, MILLISECONDS)) 125 throw new AssertionError("timed out"); 126 if (!consumersInterrupted.await(LONG_DELAY_MS, MILLISECONDS)) 127 throw new AssertionError("timed out"); 128 if (prods[0].isDone() || prods[0].isCancelled()) 129 throw new AssertionError("completed too early"); 130 131 done = true; 132 133 if (! (prods[0].get(LONG_DELAY_MS, MILLISECONDS) instanceof Integer)) 134 throw new AssertionError("expected Integer"); 135 if (! (cons[0].get(LONG_DELAY_MS, MILLISECONDS) instanceof Integer)) 136 throw new AssertionError("expected Integer"); 137 } 138 139 void assertCancelled(Future<?> future) throws Exception { 140 if (!future.isDone()) 141 throw new AssertionError("not done"); 142 if (!future.isCancelled()) 143 throw new AssertionError("not cancelled"); 144 try { 145 future.get(LONG_DELAY_MS, MILLISECONDS); 146 throw new AssertionError("should throw CancellationException"); 147 } catch (CancellationException success) {} 148 } 149 150 class Producer implements Callable<Integer> { 151 public Integer call() throws Exception { 152 barrier.await(); 153 int sum = 0; 154 try { 155 int x = 4321; 156 while (!done) { 157 if (Thread.interrupted()) throw new InterruptedException(); 158 x = LoopHelpers.compute1(x); 159 sum += LoopHelpers.compute2(x); 160 queue.offer(new Integer(x), 1, TimeUnit.MILLISECONDS); 161 } 162 } catch (InterruptedException cancelled) { 163 producersInterrupted.countDown(); 164 } 165 return sum; 166 } 167 } 168 169 class Consumer implements Callable<Integer> { 170 public Integer call() throws Exception { 171 barrier.await(); 172 int sum = 0; 173 try { 174 while (!done) { 175 Integer x = queue.poll(1, TimeUnit.MILLISECONDS); 176 if (x != null) 177 sum += LoopHelpers.compute1(x.intValue()); 178 } 179 } catch (InterruptedException cancelled) { 180 consumersInterrupted.countDown(); 181 } 182 return sum; 183 } 184 } 185 }