28 *
29 * Written by Doug Lea with assistance from members of JCP JSR-166
30 * Expert Group and released to the public domain, as explained at
31 * http://creativecommons.org/publicdomain/zero/1.0/
32 * Other contributors include Andrew Wright, Jeffrey Hayes,
33 * Pat Fisher, Mike Judd.
34 */
35
36 import static java.util.concurrent.TimeUnit.MILLISECONDS;
37
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.Collection;
41 import java.util.Iterator;
42 import java.util.NoSuchElementException;
43 import java.util.concurrent.BlockingQueue;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.Executors;
46 import java.util.concurrent.ExecutorService;
47 import java.util.concurrent.SynchronousQueue;
48 import java.util.concurrent.ThreadLocalRandom;
49
50 import junit.framework.Test;
51
52 public class SynchronousQueueTest extends JSR166TestCase {
53
54 public static class Fair extends BlockingQueueTest {
55 protected BlockingQueue emptyCollection() {
56 return new SynchronousQueue(true);
57 }
58 }
59
60 public static class NonFair extends BlockingQueueTest {
61 protected BlockingQueue emptyCollection() {
62 return new SynchronousQueue(false);
63 }
64 }
65
66 public static void main(String[] args) {
67 main(suite(), args);
68 }
149 final SynchronousQueue q = new SynchronousQueue(fair);
150 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
151 Thread t = newStartedThread(new CheckedRunnable() {
152 public void realRun() throws InterruptedException {
153 Thread.currentThread().interrupt();
154 try {
155 q.put(99);
156 shouldThrow();
157 } catch (InterruptedException success) {}
158 assertFalse(Thread.interrupted());
159
160 pleaseInterrupt.countDown();
161 try {
162 q.put(99);
163 shouldThrow();
164 } catch (InterruptedException success) {}
165 assertFalse(Thread.interrupted());
166 }});
167
168 await(pleaseInterrupt);
169 assertThreadBlocks(t, Thread.State.WAITING);
170 t.interrupt();
171 awaitTermination(t);
172 assertEquals(0, q.remainingCapacity());
173 }
174
175 /**
176 * put blocks interruptibly waiting for take
177 */
// Entry points: run the shared scenario below once with fair == false and once with fair == true.
178 public void testPutWithTake() { testPutWithTake(false); }
179 public void testPutWithTake_fair() { testPutWithTake(true); }
// Shared scenario. 'fair' is passed straight to the SynchronousQueue constructor.
// Two latches sequence the helper thread and the calling thread:
//   pleaseTake      - counted down by the helper just before its first put
//   pleaseInterrupt - counted down by the helper just before the put that is to be interrupted
180 public void testPutWithTake(boolean fair) {
181 final SynchronousQueue q = new SynchronousQueue(fair);
182 final CountDownLatch pleaseTake = new CountDownLatch(1);
183 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
// NOTE(review): newStartedThread/CheckedRunnable are defined elsewhere; presumably the
// runnable's realRun() executes on a freshly started thread - confirm in JSR166TestCase.
184 Thread t = newStartedThread(new CheckedRunnable() {
185 public void realRun() throws InterruptedException {
// Phase 1: announce the put, then hand 'one' to the take() issued by the calling thread below.
186 pleaseTake.countDown();
187 q.put(one);
188
// Phase 2: the interrupt status is set before the call, so put is expected to throw
// InterruptedException; shouldThrow() is reached only if put returns normally.
189 Thread.currentThread().interrupt();
190 try {
191 q.put(99);
192 shouldThrow();
193 } catch (InterruptedException success) {}
// Thread.interrupted() must report false: the status is expected to be clear after the throw.
194 assertFalse(Thread.interrupted());
195
// Phase 3: signal the calling thread, then start a put that it will interrupt.
196 pleaseInterrupt.countDown();
197 try {
198 q.put(99);
199 shouldThrow();
200 } catch (InterruptedException success) {}
201 assertFalse(Thread.interrupted());
202 }});
203
// Calling thread: wait for the helper's signal, check the reported capacity is 0,
// and take the element offered in phase 1 (it must be the same object, 'one').
204 await(pleaseTake);
205 assertEquals(0, q.remainingCapacity());
206 try { assertSame(one, q.take()); }
207 catch (InterruptedException e) { threadUnexpectedException(e); }
208
// Wait for phase 3, check the helper against Thread.State.WAITING, interrupt it, and
// wait for it to finish. NOTE(review): assertThreadBlocks/awaitTermination are opaque
// helpers; their exact checks are not visible here.
209 await(pleaseInterrupt);
210 assertThreadBlocks(t, Thread.State.WAITING);
211 t.interrupt();
212 awaitTermination(t);
// The reported remaining capacity is still 0 after the interrupted put.
213 assertEquals(0, q.remainingCapacity());
214 }
215
216 /**
217 * timed offer times out if elements not taken
218 */
// Exercises timed offer on a SynchronousQueue that never has a taker:
// a timeout, a call made with the interrupt status already set, and an interrupt while waiting.
219 public void testTimedOffer() {
// Fairness mode is picked at random for each run.
220 final boolean fair = ThreadLocalRandom.current().nextBoolean();
221 final SynchronousQueue q = new SynchronousQueue(fair);
// Counted down by the helper thread just before the offer that is to be interrupted.
222 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
223 Thread t = newStartedThread(new CheckedRunnable() {
224 public void realRun() throws InterruptedException {
// Phase 1: nothing takes from q, so the offer must return false, and not before
// timeoutMillis() milliseconds have elapsed.
225 long startTime = System.nanoTime();
226 assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
227 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
228
// Phase 2: interrupt status set before the call -> offer is expected to throw
// InterruptedException; shouldThrow() is reached only if it returns normally.
229 Thread.currentThread().interrupt();
230 try {
231 q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
232 shouldThrow();
233 } catch (InterruptedException success) {}
// The interrupt status is expected to be clear after the exception.
234 assertFalse(Thread.interrupted());
235
// Phase 3: signal the calling thread, then start an offer that it will interrupt.
236 pleaseInterrupt.countDown();
237 try {
238 q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
239 shouldThrow();
240 } catch (InterruptedException success) {}
241 assertFalse(Thread.interrupted());
242 }});
243
// Calling thread: wait for phase 3, check the helper against Thread.State.TIMED_WAITING
// (the offer is timed), interrupt it, and wait for it to finish.
// NOTE(review): await/assertThreadBlocks/awaitTermination are helpers defined elsewhere.
244 await(pleaseInterrupt);
245 assertThreadBlocks(t, Thread.State.TIMED_WAITING);
246 t.interrupt();
247 awaitTermination(t);
248 }
249
250 /**
251 * poll returns null if no active putter
252 */
253 public void testPoll() { testPoll(false); }
254 public void testPoll_fair() { testPoll(true); }
255 public void testPoll(boolean fair) {
256 final SynchronousQueue q = new SynchronousQueue(fair);
257 assertNull(q.poll());
258 }
259
260 /**
261 * timed poll with zero timeout times out if no active putter
262 */
263 public void testTimedPoll0() { testTimedPoll0(false); }
264 public void testTimedPoll0_fair() { testTimedPoll0(true); }
265 public void testTimedPoll0(boolean fair) {
266 final SynchronousQueue q = new SynchronousQueue(fair);
267 try { assertNull(q.poll(0, MILLISECONDS)); }
268 catch (InterruptedException e) { threadUnexpectedException(e); }
269 }
270
271 /**
272 * timed poll with nonzero timeout times out if no active putter
273 */
274 public void testTimedPoll() {
275 final boolean fair = ThreadLocalRandom.current().nextBoolean();
276 final SynchronousQueue q = new SynchronousQueue(fair);
277 final long startTime = System.nanoTime();
278 try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); }
279 catch (InterruptedException e) { threadUnexpectedException(e); }
280 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
281 }
282
283 /**
284 * timed poll before a delayed offer times out, returning null;
285 * after offer succeeds; on interruption throws
286 */
// Exercises timed poll in four phases on a helper thread (timeout, successful hand-off,
// pre-set interrupt, interrupt while waiting), with the calling thread supplying the offer
// and the interrupt.
287 public void testTimedPollWithOffer() {
// Fairness mode is picked at random for each run.
288 final boolean fair = ThreadLocalRandom.current().nextBoolean();
289 final SynchronousQueue q = new SynchronousQueue(fair);
// pleaseOffer: counted down by the helper just before the poll that should receive 'zero'.
// pleaseInterrupt: counted down by the helper just before the poll that is to be interrupted.
290 final CountDownLatch pleaseOffer = new CountDownLatch(1);
291 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
292 Thread t = newStartedThread(new CheckedRunnable() {
293 public void realRun() throws InterruptedException {
// Phase 1: no offer has been made yet, so the poll must return null, and not before
// timeoutMillis() milliseconds have elapsed.
294 long startTime = System.nanoTime();
295 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
296 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
297
// Phase 2: ask the calling thread to offer, restart the clock, and expect to receive
// the very object ('zero') that it offers.
298 pleaseOffer.countDown();
299 startTime = System.nanoTime();
300 assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
301
// Phase 3: interrupt status set before the call -> poll is expected to throw
// InterruptedException; shouldThrow() is reached only if it returns normally.
302 Thread.currentThread().interrupt();
303 try {
304 q.poll(LONG_DELAY_MS, MILLISECONDS);
305 shouldThrow();
306 } catch (InterruptedException success) {}
// The interrupt status is expected to be clear after the exception.
307 assertFalse(Thread.interrupted());
308
// Phase 4: signal the calling thread, then start a poll that it will interrupt.
309 pleaseInterrupt.countDown();
310 try {
311 q.poll(LONG_DELAY_MS, MILLISECONDS);
312 shouldThrow();
313 } catch (InterruptedException success) {}
314 assertFalse(Thread.interrupted());
315
// Phases 2-4 together (timed from the restart at phase 2) must take less than LONG_DELAY_MS.
316 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
317 }});
318
// Calling thread: once asked, offer 'zero'; the offer must succeed and complete in under
// LONG_DELAY_MS. This startTime is a separate local from the one inside realRun().
319 await(pleaseOffer);
320 long startTime = System.nanoTime();
321 try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); }
322 catch (InterruptedException e) { threadUnexpectedException(e); }
323 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
324
// Wait for phase 4, check the helper against Thread.State.TIMED_WAITING, interrupt it,
// and wait for it to finish. NOTE(review): the helper methods are defined elsewhere.
325 await(pleaseInterrupt);
326 assertThreadBlocks(t, Thread.State.TIMED_WAITING);
327 t.interrupt();
328 awaitTermination(t);
329 }
330
331 /**
332 * peek() returns null if no active putter
333 */
334 public void testPeek() { testPeek(false); }
335 public void testPeek_fair() { testPeek(true); }
336 public void testPeek(boolean fair) {
337 final SynchronousQueue q = new SynchronousQueue(fair);
338 assertNull(q.peek());
339 }
340
341 /**
342 * element() throws NoSuchElementException if no active putter
343 */
344 public void testElement() { testElement(false); }
345 public void testElement_fair() { testElement(true); }
346 public void testElement(boolean fair) {
|
28 *
29 * Written by Doug Lea with assistance from members of JCP JSR-166
30 * Expert Group and released to the public domain, as explained at
31 * http://creativecommons.org/publicdomain/zero/1.0/
32 * Other contributors include Andrew Wright, Jeffrey Hayes,
33 * Pat Fisher, Mike Judd.
34 */
35
36 import static java.util.concurrent.TimeUnit.MILLISECONDS;
37
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.Collection;
41 import java.util.Iterator;
42 import java.util.NoSuchElementException;
43 import java.util.concurrent.BlockingQueue;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.Executors;
46 import java.util.concurrent.ExecutorService;
47 import java.util.concurrent.SynchronousQueue;
48
49 import junit.framework.Test;
50
51 public class SynchronousQueueTest extends JSR166TestCase {
52
53 public static class Fair extends BlockingQueueTest {
54 protected BlockingQueue emptyCollection() {
55 return new SynchronousQueue(true);
56 }
57 }
58
59 public static class NonFair extends BlockingQueueTest {
60 protected BlockingQueue emptyCollection() {
61 return new SynchronousQueue(false);
62 }
63 }
64
65 public static void main(String[] args) {
66 main(suite(), args);
67 }
148 final SynchronousQueue q = new SynchronousQueue(fair);
149 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
150 Thread t = newStartedThread(new CheckedRunnable() {
151 public void realRun() throws InterruptedException {
152 Thread.currentThread().interrupt();
153 try {
154 q.put(99);
155 shouldThrow();
156 } catch (InterruptedException success) {}
157 assertFalse(Thread.interrupted());
158
159 pleaseInterrupt.countDown();
160 try {
161 q.put(99);
162 shouldThrow();
163 } catch (InterruptedException success) {}
164 assertFalse(Thread.interrupted());
165 }});
166
167 await(pleaseInterrupt);
168 if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING);
169 t.interrupt();
170 awaitTermination(t);
171 assertEquals(0, q.remainingCapacity());
172 }
173
174 /**
175 * put blocks interruptibly waiting for take
176 */
// Entry points: run the shared scenario below once with fair == false and once with fair == true.
177 public void testPutWithTake() { testPutWithTake(false); }
178 public void testPutWithTake_fair() { testPutWithTake(true); }
// Shared scenario. 'fair' is passed straight to the SynchronousQueue constructor.
// Two latches sequence the helper thread and the calling thread:
//   pleaseTake      - counted down by the helper just before its first put
//   pleaseInterrupt - counted down by the helper just before the put that is to be interrupted
179 public void testPutWithTake(boolean fair) {
180 final SynchronousQueue q = new SynchronousQueue(fair);
181 final CountDownLatch pleaseTake = new CountDownLatch(1);
182 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
// NOTE(review): newStartedThread/CheckedRunnable are defined elsewhere; presumably the
// runnable's realRun() executes on a freshly started thread - confirm in JSR166TestCase.
183 Thread t = newStartedThread(new CheckedRunnable() {
184 public void realRun() throws InterruptedException {
// Phase 1: announce the put, then hand 'one' to the take() issued by the calling thread below.
185 pleaseTake.countDown();
186 q.put(one);
187
// Phase 2: the interrupt status is set before the call, so put is expected to throw
// InterruptedException; shouldThrow() is reached only if put returns normally.
188 Thread.currentThread().interrupt();
189 try {
190 q.put(99);
191 shouldThrow();
192 } catch (InterruptedException success) {}
// Thread.interrupted() must report false: the status is expected to be clear after the throw.
193 assertFalse(Thread.interrupted());
194
// Phase 3: signal the calling thread, then start a put that it will interrupt.
195 pleaseInterrupt.countDown();
196 try {
197 q.put(99);
198 shouldThrow();
199 } catch (InterruptedException success) {}
200 assertFalse(Thread.interrupted());
201 }});
202
// Calling thread: wait for the helper's signal, check the reported capacity is 0,
// and take the element offered in phase 1 (it must be the same object, 'one').
203 await(pleaseTake);
204 assertEquals(0, q.remainingCapacity());
205 try { assertSame(one, q.take()); }
206 catch (InterruptedException e) { threadUnexpectedException(e); }
207
// Wait for phase 3. The Thread.State.WAITING check runs only when randomBoolean() returns
// true; the interrupt is sent either way, so it may arrive before or after the helper has
// actually entered put. NOTE(review): randomBoolean/assertThreadBlocks are opaque helpers.
208 await(pleaseInterrupt);
209 if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING);
210 t.interrupt();
211 awaitTermination(t);
// The reported remaining capacity is still 0 after the interrupted put.
212 assertEquals(0, q.remainingCapacity());
213 }
214
215 /**
216 * timed offer times out if elements not taken
217 */
// Exercises timed offer on a SynchronousQueue that never has a taker:
// a timeout, a call made with the interrupt status already set, and an interrupt while waiting.
218 public void testTimedOffer() {
// Fairness mode comes from the randomBoolean() helper (defined elsewhere).
219 final boolean fair = randomBoolean();
220 final SynchronousQueue q = new SynchronousQueue(fair);
// Counted down by the helper thread just before the offer that is to be interrupted.
221 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
222 Thread t = newStartedThread(new CheckedRunnable() {
223 public void realRun() throws InterruptedException {
224 long startTime = System.nanoTime();
225
// Phase 1: nothing takes from q, so the offer must return false, and not before
// timeoutMillis() milliseconds have elapsed.
226 assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
227 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
228
// Phase 2: interrupt status set before the call -> offer is expected to throw
// InterruptedException whatever timeout/unit the randomTimeout()/randomTimeUnit()
// helpers supply; shouldThrow() is reached only if it returns normally.
229 Thread.currentThread().interrupt();
230 try {
231 q.offer(new Object(), randomTimeout(), randomTimeUnit());
232 shouldThrow();
233 } catch (InterruptedException success) {}
// The interrupt status is expected to be clear after the exception.
234 assertFalse(Thread.interrupted());
235
// Phase 3: signal the calling thread, then start an offer that it will interrupt.
236 pleaseInterrupt.countDown();
237 try {
238 q.offer(new Object(), LONGER_DELAY_MS, MILLISECONDS);
239 shouldThrow();
240 } catch (InterruptedException success) {}
241 assertFalse(Thread.interrupted());
242 }});
243
// Calling thread: wait for phase 3. The Thread.State.TIMED_WAITING check runs only when
// randomBoolean() returns true; the interrupt is sent either way.
// NOTE(review): await/assertThreadBlocks/awaitTermination are helpers defined elsewhere.
244 await(pleaseInterrupt);
245 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING);
246 t.interrupt();
247 awaitTermination(t);
248 }
249
250 /**
251 * poll returns null if no active putter
252 */
253 public void testPoll() { testPoll(false); }
254 public void testPoll_fair() { testPoll(true); }
255 public void testPoll(boolean fair) {
256 final SynchronousQueue q = new SynchronousQueue(fair);
257 assertNull(q.poll());
258 }
259
260 /**
261 * timed poll with zero timeout times out if no active putter
262 */
263 public void testTimedPoll0() { testTimedPoll0(false); }
264 public void testTimedPoll0_fair() { testTimedPoll0(true); }
265 public void testTimedPoll0(boolean fair) {
266 final SynchronousQueue q = new SynchronousQueue(fair);
267 try { assertNull(q.poll(0, MILLISECONDS)); }
268 catch (InterruptedException e) { threadUnexpectedException(e); }
269 }
270
271 /**
272 * timed poll with nonzero timeout times out if no active putter
273 */
274 public void testTimedPoll() {
275 final boolean fair = randomBoolean();
276 final SynchronousQueue q = new SynchronousQueue(fair);
277 final long startTime = System.nanoTime();
278 try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); }
279 catch (InterruptedException e) { threadUnexpectedException(e); }
280 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
281 }
282
283 /**
284 * timed poll before a delayed offer times out, returning null;
285 * after offer succeeds; on interruption throws
286 */
// Exercises timed poll in four phases on a helper thread (timeout, successful hand-off,
// pre-set interrupt, interrupt while waiting), with the calling thread supplying the offer
// and the interrupt.
287 public void testTimedPollWithOffer() {
// Fairness mode comes from the randomBoolean() helper (defined elsewhere).
288 final boolean fair = randomBoolean();
289 final SynchronousQueue q = new SynchronousQueue(fair);
// pleaseOffer: counted down by the helper just before the poll that should receive 'zero'.
// pleaseInterrupt: counted down by the helper just before the poll that is to be interrupted.
290 final CountDownLatch pleaseOffer = new CountDownLatch(1);
291 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
292 Thread t = newStartedThread(new CheckedRunnable() {
293 public void realRun() throws InterruptedException {
// Phase 1: no offer has been made yet, so the poll must return null, and not before
// timeoutMillis() milliseconds have elapsed.
294 long startTime = System.nanoTime();
295 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
296 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
297
// Phase 2: ask the calling thread to offer, restart the clock, and expect to receive
// the very object ('zero') that it offers.
298 pleaseOffer.countDown();
299 startTime = System.nanoTime();
300 assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
301
// Phase 3: interrupt status set before the call -> poll is expected to throw
// InterruptedException whatever timeout/unit the randomTimeout()/randomTimeUnit()
// helpers supply; shouldThrow() is reached only if it returns normally.
302 Thread.currentThread().interrupt();
303 try {
304 q.poll(randomTimeout(), randomTimeUnit());
305 shouldThrow();
306 } catch (InterruptedException success) {}
// The interrupt status is expected to be clear after the exception.
307 assertFalse(Thread.interrupted());
308
// Phase 4: signal the calling thread, then start a poll that it will interrupt.
309 pleaseInterrupt.countDown();
310 try {
311 q.poll(LONG_DELAY_MS, MILLISECONDS);
312 shouldThrow();
313 } catch (InterruptedException success) {}
314 assertFalse(Thread.interrupted());
315
// Phases 2-4 together (timed from the restart at phase 2) must take less than LONG_DELAY_MS.
316 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
317 }});
318
// Calling thread: once asked, offer 'zero'; the offer must succeed and complete in under
// LONG_DELAY_MS. This startTime is a separate local from the one inside realRun().
319 await(pleaseOffer);
320 long startTime = System.nanoTime();
321 try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); }
322 catch (InterruptedException e) { threadUnexpectedException(e); }
323 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
324
// Wait for phase 4. The Thread.State.TIMED_WAITING check runs only when randomBoolean()
// returns true; the interrupt is sent either way.
// NOTE(review): the helper methods are defined elsewhere.
325 await(pleaseInterrupt);
326 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING);
327 t.interrupt();
328 awaitTermination(t);
329 }
330
331 /**
332 * peek() returns null if no active putter
333 */
334 public void testPeek() { testPeek(false); }
335 public void testPeek_fair() { testPeek(true); }
336 public void testPeek(boolean fair) {
337 final SynchronousQueue q = new SynchronousQueue(fair);
338 assertNull(q.peek());
339 }
340
341 /**
342 * element() throws NoSuchElementException if no active putter
343 */
344 public void testElement() { testElement(false); }
345 public void testElement_fair() { testElement(true); }
346 public void testElement(boolean fair) {
|