44 import java.util.Queue;
45 import java.util.concurrent.ArrayBlockingQueue;
46 import java.util.concurrent.BlockingQueue;
47 import java.util.concurrent.CountDownLatch;
48 import java.util.concurrent.Executors;
49 import java.util.concurrent.ExecutorService;
50 import java.util.concurrent.ThreadLocalRandom;
51
52 import junit.framework.Test;
53
54 public class ArrayBlockingQueueTest extends JSR166TestCase {
55
56 public static void main(String[] args) {
57 main(suite(), args);
58 }
59
60 public static Test suite() {
61 class Implementation implements CollectionImplementation {
62 public Class<?> klazz() { return ArrayBlockingQueue.class; }
63 public Collection emptyCollection() {
64 boolean fair = ThreadLocalRandom.current().nextBoolean();
65 return populatedQueue(0, SIZE, 2 * SIZE, fair);
66 }
67 public Object makeElement(int i) { return i; }
68 public boolean isConcurrent() { return true; }
69 public boolean permitsNulls() { return false; }
70 }
71
72 return newTestSuite(
73 ArrayBlockingQueueTest.class,
74 new Fair().testSuite(),
75 new NonFair().testSuite(),
76 CollectionTest.testSuite(new Implementation()));
77 }
78
79 public static class Fair extends BlockingQueueTest {
80 protected BlockingQueue emptyCollection() {
81 return populatedQueue(0, SIZE, 2 * SIZE, true);
82 }
83 }
84
350 q.put(i);
351 assertEquals(SIZE, q.size());
352 assertEquals(0, q.remainingCapacity());
353
354 Thread.currentThread().interrupt();
355 try {
356 q.put(99);
357 shouldThrow();
358 } catch (InterruptedException success) {}
359 assertFalse(Thread.interrupted());
360
361 pleaseInterrupt.countDown();
362 try {
363 q.put(99);
364 shouldThrow();
365 } catch (InterruptedException success) {}
366 assertFalse(Thread.interrupted());
367 }});
368
369 await(pleaseInterrupt);
370 assertThreadBlocks(t, Thread.State.WAITING);
371 t.interrupt();
372 awaitTermination(t);
373 assertEquals(SIZE, q.size());
374 assertEquals(0, q.remainingCapacity());
375 }
376
377 /**
378 * put blocks interruptibly waiting for take when full
379 */
380 public void testPutWithTake() throws InterruptedException {
381 final int capacity = 2;
382 final ArrayBlockingQueue q = new ArrayBlockingQueue(capacity);
383 final CountDownLatch pleaseTake = new CountDownLatch(1);
384 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
385 Thread t = newStartedThread(new CheckedRunnable() {
386 public void realRun() throws InterruptedException {
387 for (int i = 0; i < capacity; i++)
388 q.put(i);
389 pleaseTake.countDown();
390 q.put(86);
392 Thread.currentThread().interrupt();
393 try {
394 q.put(99);
395 shouldThrow();
396 } catch (InterruptedException success) {}
397 assertFalse(Thread.interrupted());
398
399 pleaseInterrupt.countDown();
400 try {
401 q.put(99);
402 shouldThrow();
403 } catch (InterruptedException success) {}
404 assertFalse(Thread.interrupted());
405 }});
406
407 await(pleaseTake);
408 assertEquals(0, q.remainingCapacity());
409 assertEquals(0, q.take());
410
411 await(pleaseInterrupt);
412 assertThreadBlocks(t, Thread.State.WAITING);
413 t.interrupt();
414 awaitTermination(t);
415 assertEquals(0, q.remainingCapacity());
416 }
417
418 /**
419 * timed offer times out if full and elements not taken
420 */
421 public void testTimedOffer() {
422 final ArrayBlockingQueue q = new ArrayBlockingQueue(2);
423 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
424 Thread t = newStartedThread(new CheckedRunnable() {
425 public void realRun() throws InterruptedException {
426 q.put(new Object());
427 q.put(new Object());
428 long startTime = System.nanoTime();
429 assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
430 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
431
432 Thread.currentThread().interrupt();
433 try {
434 q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
435 shouldThrow();
436 } catch (InterruptedException success) {}
437 assertFalse(Thread.interrupted());
438
439 pleaseInterrupt.countDown();
440 try {
441 q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
442 shouldThrow();
443 } catch (InterruptedException success) {}
444 assertFalse(Thread.interrupted());
445 }});
446
447 await(pleaseInterrupt);
448 assertThreadBlocks(t, Thread.State.TIMED_WAITING);
449 t.interrupt();
450 awaitTermination(t);
451 }
452
453 /**
454 * take retrieves elements in FIFO order
455 */
456 public void testTake() throws InterruptedException {
457 ArrayBlockingQueue q = populatedQueue(SIZE);
458 for (int i = 0; i < SIZE; ++i) {
459 assertEquals(i, q.take());
460 }
461 }
462
463 /**
464 * Take removes existing elements until empty, then blocks interruptibly
465 */
466 public void testBlockingTake() throws InterruptedException {
467 final ArrayBlockingQueue q = populatedQueue(SIZE);
468 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
469 Thread t = newStartedThread(new CheckedRunnable() {
470 public void realRun() throws InterruptedException {
471 for (int i = 0; i < SIZE; i++) assertEquals(i, q.take());
472
473 Thread.currentThread().interrupt();
474 try {
475 q.take();
476 shouldThrow();
477 } catch (InterruptedException success) {}
478 assertFalse(Thread.interrupted());
479
480 pleaseInterrupt.countDown();
481 try {
482 q.take();
483 shouldThrow();
484 } catch (InterruptedException success) {}
485 assertFalse(Thread.interrupted());
486 }});
487
488 await(pleaseInterrupt);
489 assertThreadBlocks(t, Thread.State.WAITING);
490 t.interrupt();
491 awaitTermination(t);
492 }
493
494 /**
495 * poll succeeds unless empty
496 */
497 public void testPoll() {
498 ArrayBlockingQueue q = populatedQueue(SIZE);
499 for (int i = 0; i < SIZE; ++i) {
500 assertEquals(i, q.poll());
501 }
502 assertNull(q.poll());
503 }
504
505 /**
506 * timed poll with zero timeout succeeds when non-empty, else times out
507 */
508 public void testTimedPoll0() throws InterruptedException {
509 ArrayBlockingQueue q = populatedQueue(SIZE);
522 for (int i = 0; i < SIZE; ++i) {
523 long startTime = System.nanoTime();
524 assertEquals(i, q.poll(LONG_DELAY_MS, MILLISECONDS));
525 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
526 }
527 long startTime = System.nanoTime();
528 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
529 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
530 checkEmpty(q);
531 }
532
533 /**
534 * Interrupted timed poll throws InterruptedException instead of
535 * returning timeout status
536 */
537 public void testInterruptedTimedPoll() throws InterruptedException {
538 final BlockingQueue<Integer> q = populatedQueue(SIZE);
539 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
540 Thread t = newStartedThread(new CheckedRunnable() {
541 public void realRun() throws InterruptedException {
542 long startTime = System.nanoTime();
543 for (int i = 0; i < SIZE; i++)
544 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
545
546 Thread.currentThread().interrupt();
547 try {
548 q.poll(LONG_DELAY_MS, MILLISECONDS);
549 shouldThrow();
550 } catch (InterruptedException success) {}
551 assertFalse(Thread.interrupted());
552
553 pleaseInterrupt.countDown();
554 try {
555 q.poll(LONG_DELAY_MS, MILLISECONDS);
556 shouldThrow();
557 } catch (InterruptedException success) {}
558 assertFalse(Thread.interrupted());
559
560 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
561 }});
562
563 await(pleaseInterrupt);
564 assertThreadBlocks(t, Thread.State.TIMED_WAITING);
565 t.interrupt();
566 awaitTermination(t);
567 checkEmpty(q);
568 }
569
570 /**
571 * peek returns next element, or null if empty
572 */
573 public void testPeek() {
574 ArrayBlockingQueue q = populatedQueue(SIZE);
575 for (int i = 0; i < SIZE; ++i) {
576 assertEquals(i, q.peek());
577 assertEquals(i, q.poll());
578 assertTrue(q.peek() == null ||
579 !q.peek().equals(i));
580 }
581 assertNull(q.peek());
582 }
583
584 /**
|
44 import java.util.Queue;
45 import java.util.concurrent.ArrayBlockingQueue;
46 import java.util.concurrent.BlockingQueue;
47 import java.util.concurrent.CountDownLatch;
48 import java.util.concurrent.Executors;
49 import java.util.concurrent.ExecutorService;
50 import java.util.concurrent.ThreadLocalRandom;
51
52 import junit.framework.Test;
53
54 public class ArrayBlockingQueueTest extends JSR166TestCase {
55
56 public static void main(String[] args) {
57 main(suite(), args);
58 }
59
60 public static Test suite() {
61 class Implementation implements CollectionImplementation {
62 public Class<?> klazz() { return ArrayBlockingQueue.class; }
63 public Collection emptyCollection() {
64 boolean fair = randomBoolean();
65 return populatedQueue(0, SIZE, 2 * SIZE, fair);
66 }
67 public Object makeElement(int i) { return i; }
68 public boolean isConcurrent() { return true; }
69 public boolean permitsNulls() { return false; }
70 }
71
72 return newTestSuite(
73 ArrayBlockingQueueTest.class,
74 new Fair().testSuite(),
75 new NonFair().testSuite(),
76 CollectionTest.testSuite(new Implementation()));
77 }
78
79 public static class Fair extends BlockingQueueTest {
80 protected BlockingQueue emptyCollection() {
81 return populatedQueue(0, SIZE, 2 * SIZE, true);
82 }
83 }
84
350 q.put(i);
351 assertEquals(SIZE, q.size());
352 assertEquals(0, q.remainingCapacity());
353
354 Thread.currentThread().interrupt();
355 try {
356 q.put(99);
357 shouldThrow();
358 } catch (InterruptedException success) {}
359 assertFalse(Thread.interrupted());
360
361 pleaseInterrupt.countDown();
362 try {
363 q.put(99);
364 shouldThrow();
365 } catch (InterruptedException success) {}
366 assertFalse(Thread.interrupted());
367 }});
368
369 await(pleaseInterrupt);
370 if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING);
371 t.interrupt();
372 awaitTermination(t);
373 assertEquals(SIZE, q.size());
374 assertEquals(0, q.remainingCapacity());
375 }
376
377 /**
378 * put blocks interruptibly waiting for take when full
379 */
380 public void testPutWithTake() throws InterruptedException {
381 final int capacity = 2;
382 final ArrayBlockingQueue q = new ArrayBlockingQueue(capacity);
383 final CountDownLatch pleaseTake = new CountDownLatch(1);
384 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
385 Thread t = newStartedThread(new CheckedRunnable() {
386 public void realRun() throws InterruptedException {
387 for (int i = 0; i < capacity; i++)
388 q.put(i);
389 pleaseTake.countDown();
390 q.put(86);
392 Thread.currentThread().interrupt();
393 try {
394 q.put(99);
395 shouldThrow();
396 } catch (InterruptedException success) {}
397 assertFalse(Thread.interrupted());
398
399 pleaseInterrupt.countDown();
400 try {
401 q.put(99);
402 shouldThrow();
403 } catch (InterruptedException success) {}
404 assertFalse(Thread.interrupted());
405 }});
406
407 await(pleaseTake);
408 assertEquals(0, q.remainingCapacity());
409 assertEquals(0, q.take());
410
411 await(pleaseInterrupt);
412 if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING);
413 t.interrupt();
414 awaitTermination(t);
415 assertEquals(0, q.remainingCapacity());
416 }
417
418 /**
419 * timed offer times out if full and elements not taken
420 */
421 public void testTimedOffer() {
422 final ArrayBlockingQueue q = new ArrayBlockingQueue(2);
423 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
424 Thread t = newStartedThread(new CheckedRunnable() {
425 public void realRun() throws InterruptedException {
426 q.put(new Object());
427 q.put(new Object());
428 long startTime = System.nanoTime();
429 assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
430 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
431
432 Thread.currentThread().interrupt();
433 try {
434 q.offer(new Object(), randomTimeout(), randomTimeUnit());
435 shouldThrow();
436 } catch (InterruptedException success) {}
437 assertFalse(Thread.interrupted());
438
439 pleaseInterrupt.countDown();
440 try {
441 q.offer(new Object(), LONGER_DELAY_MS, MILLISECONDS);
442 shouldThrow();
443 } catch (InterruptedException success) {}
444 assertFalse(Thread.interrupted());
445 }});
446
447 await(pleaseInterrupt);
448 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING);
449 t.interrupt();
450 awaitTermination(t);
451 }
452
453 /**
454 * take retrieves elements in FIFO order
455 */
456 public void testTake() throws InterruptedException {
457 ArrayBlockingQueue q = populatedQueue(SIZE);
458 for (int i = 0; i < SIZE; ++i) {
459 assertEquals(i, q.take());
460 }
461 }
462
463 /**
464 * Take removes existing elements until empty, then blocks interruptibly
465 */
466 public void testBlockingTake() throws InterruptedException {
467 final ArrayBlockingQueue q = populatedQueue(SIZE);
468 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
469 Thread t = newStartedThread(new CheckedRunnable() {
470 public void realRun() throws InterruptedException {
471 for (int i = 0; i < SIZE; i++) assertEquals(i, q.take());
472
473 Thread.currentThread().interrupt();
474 try {
475 q.take();
476 shouldThrow();
477 } catch (InterruptedException success) {}
478 assertFalse(Thread.interrupted());
479
480 pleaseInterrupt.countDown();
481 try {
482 q.take();
483 shouldThrow();
484 } catch (InterruptedException success) {}
485 assertFalse(Thread.interrupted());
486 }});
487
488 await(pleaseInterrupt);
489 if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING);
490 t.interrupt();
491 awaitTermination(t);
492 }
493
494 /**
495 * poll succeeds unless empty
496 */
497 public void testPoll() {
498 ArrayBlockingQueue q = populatedQueue(SIZE);
499 for (int i = 0; i < SIZE; ++i) {
500 assertEquals(i, q.poll());
501 }
502 assertNull(q.poll());
503 }
504
505 /**
506 * timed poll with zero timeout succeeds when non-empty, else times out
507 */
508 public void testTimedPoll0() throws InterruptedException {
509 ArrayBlockingQueue q = populatedQueue(SIZE);
522 for (int i = 0; i < SIZE; ++i) {
523 long startTime = System.nanoTime();
524 assertEquals(i, q.poll(LONG_DELAY_MS, MILLISECONDS));
525 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
526 }
527 long startTime = System.nanoTime();
528 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
529 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
530 checkEmpty(q);
531 }
532
533 /**
534 * Interrupted timed poll throws InterruptedException instead of
535 * returning timeout status
536 */
537 public void testInterruptedTimedPoll() throws InterruptedException {
538 final BlockingQueue<Integer> q = populatedQueue(SIZE);
539 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
540 Thread t = newStartedThread(new CheckedRunnable() {
541 public void realRun() throws InterruptedException {
542 for (int i = 0; i < SIZE; i++)
543 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
544
545 Thread.currentThread().interrupt();
546 try {
547 q.poll(randomTimeout(), randomTimeUnit());
548 shouldThrow();
549 } catch (InterruptedException success) {}
550 assertFalse(Thread.interrupted());
551
552 pleaseInterrupt.countDown();
553 try {
554 q.poll(LONGER_DELAY_MS, MILLISECONDS);
555 shouldThrow();
556 } catch (InterruptedException success) {}
557 assertFalse(Thread.interrupted());
558 }});
559
560 await(pleaseInterrupt);
561 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING);
562 t.interrupt();
563 awaitTermination(t);
564 checkEmpty(q);
565 }
566
567 /**
568 * peek returns next element, or null if empty
569 */
570 public void testPeek() {
571 ArrayBlockingQueue q = populatedQueue(SIZE);
572 for (int i = 0; i < SIZE; ++i) {
573 assertEquals(i, q.peek());
574 assertEquals(i, q.poll());
575 assertTrue(q.peek() == null ||
576 !q.peek().equals(i));
577 }
578 assertNull(q.peek());
579 }
580
581 /**
|