1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 19 * or visit www.oracle.com if you need additional information or have any 20 * questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/publicdomain/zero/1.0/ 32 * Other contributors include Andrew Wright, Jeffrey Hayes, 33 * Pat Fisher, Mike Judd. 34 */ 35 36 import static java.util.concurrent.TimeUnit.MILLISECONDS; 37 38 import java.util.ArrayList; 39 import java.util.Arrays; 40 import java.util.Collection; 41 import java.util.Iterator; 42 import java.util.NoSuchElementException; 43 import java.util.concurrent.BlockingQueue; 44 import java.util.concurrent.CountDownLatch; 45 import java.util.concurrent.Executors; 46 import java.util.concurrent.ExecutorService; 47 import java.util.concurrent.SynchronousQueue; 48 49 import junit.framework.Test; 50 51 public class SynchronousQueueTest extends JSR166TestCase { 52 53 public static class Fair extends BlockingQueueTest { 54 protected BlockingQueue emptyCollection() { 55 return new SynchronousQueue(true); 56 } 57 } 58 59 public static class NonFair extends BlockingQueueTest { 60 protected BlockingQueue emptyCollection() { 61 return new SynchronousQueue(false); 62 } 63 } 64 65 public static void main(String[] args) { 66 main(suite(), args); 67 } 68 69 public static Test suite() { 70 return newTestSuite(SynchronousQueueTest.class, 71 new Fair().testSuite(), 72 new NonFair().testSuite()); 73 } 74 75 /** 76 * Any SynchronousQueue is both empty and full 77 */ 78 public void testEmptyFull() { testEmptyFull(false); } 79 public void testEmptyFull_fair() { testEmptyFull(true); } 80 public void testEmptyFull(boolean fair) { 81 final SynchronousQueue q = new SynchronousQueue(fair); 82 assertTrue(q.isEmpty()); 83 assertEquals(0, q.size()); 84 assertEquals(0, q.remainingCapacity()); 85 assertFalse(q.offer(zero)); 86 } 87 88 /** 89 * offer fails if no active taker 90 */ 91 public void testOffer() { testOffer(false); } 92 public void testOffer_fair() { testOffer(true); } 93 public void testOffer(boolean fair) { 94 SynchronousQueue q = new SynchronousQueue(fair); 95 assertFalse(q.offer(one)); 96 } 97 98 /** 99 * add throws IllegalStateException if no active taker 100 */ 101 public void testAdd() { testAdd(false); } 102 public void testAdd_fair() { testAdd(true); } 103 public void testAdd(boolean fair) { 104 SynchronousQueue q = new SynchronousQueue(fair); 105 assertEquals(0, q.remainingCapacity()); 106 try { 107 q.add(one); 108 shouldThrow(); 109 } catch (IllegalStateException success) {} 110 } 111 112 /** 113 * addAll(this) throws IllegalArgumentException 114 */ 115 public void testAddAll_self() { testAddAll_self(false); } 116 public void testAddAll_self_fair() { testAddAll_self(true); } 117 public void testAddAll_self(boolean fair) { 118 SynchronousQueue q = new SynchronousQueue(fair); 119 try { 120 q.addAll(q); 121 shouldThrow(); 122 } catch (IllegalArgumentException success) {} 123 } 124 125 /** 126 * addAll throws IllegalStateException if no active taker 127 */ 128 public void testAddAll_ISE() { testAddAll_ISE(false); } 129 public void testAddAll_ISE_fair() { testAddAll_ISE(true); } 130 public void testAddAll_ISE(boolean fair) { 131 SynchronousQueue q = new SynchronousQueue(fair); 132 Integer[] ints = new Integer[1]; 133 for (int i = 0; i < ints.length; i++) 134 ints[i] = i; 135 Collection<Integer> coll = Arrays.asList(ints); 136 try { 137 q.addAll(coll); 138 shouldThrow(); 139 } catch (IllegalStateException success) {} 140 } 141 142 /** 143 * put blocks interruptibly if no active taker 144 */ 145 public void testBlockingPut() { testBlockingPut(false); } 146 public void testBlockingPut_fair() { testBlockingPut(true); } 147 public void testBlockingPut(boolean fair) { 148 final SynchronousQueue q = new SynchronousQueue(fair); 149 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 150 Thread t = newStartedThread(new CheckedRunnable() { 151 public void realRun() throws InterruptedException { 152 Thread.currentThread().interrupt(); 153 try { 154 q.put(99); 155 shouldThrow(); 156 } catch (InterruptedException success) {} 157 assertFalse(Thread.interrupted()); 158 159 pleaseInterrupt.countDown(); 160 try { 161 q.put(99); 162 shouldThrow(); 163 } catch (InterruptedException success) {} 164 assertFalse(Thread.interrupted()); 165 }}); 166 167 await(pleaseInterrupt); 168 if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); 169 t.interrupt(); 170 awaitTermination(t); 171 assertEquals(0, q.remainingCapacity()); 172 } 173 174 /** 175 * put blocks interruptibly waiting for take 176 */ 177 public void testPutWithTake() { testPutWithTake(false); } 178 public void testPutWithTake_fair() { testPutWithTake(true); } 179 public void testPutWithTake(boolean fair) { 180 final SynchronousQueue q = new SynchronousQueue(fair); 181 final CountDownLatch pleaseTake = new CountDownLatch(1); 182 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 183 Thread t = newStartedThread(new CheckedRunnable() { 184 public void realRun() throws InterruptedException { 185 pleaseTake.countDown(); 186 q.put(one); 187 188 Thread.currentThread().interrupt(); 189 try { 190 q.put(99); 191 shouldThrow(); 192 } catch (InterruptedException success) {} 193 assertFalse(Thread.interrupted()); 194 195 pleaseInterrupt.countDown(); 196 try { 197 q.put(99); 198 shouldThrow(); 199 } catch (InterruptedException success) {} 200 assertFalse(Thread.interrupted()); 201 }}); 202 203 await(pleaseTake); 204 assertEquals(0, q.remainingCapacity()); 205 try { assertSame(one, q.take()); } 206 catch (InterruptedException e) { threadUnexpectedException(e); } 207 208 await(pleaseInterrupt); 209 if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); 210 t.interrupt(); 211 awaitTermination(t); 212 assertEquals(0, q.remainingCapacity()); 213 } 214 215 /** 216 * timed offer times out if elements not taken 217 */ 218 public void testTimedOffer() { 219 final boolean fair = randomBoolean(); 220 final SynchronousQueue q = new SynchronousQueue(fair); 221 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 222 Thread t = newStartedThread(new CheckedRunnable() { 223 public void realRun() throws InterruptedException { 224 long startTime = System.nanoTime(); 225 226 assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS)); 227 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 228 229 Thread.currentThread().interrupt(); 230 try { 231 q.offer(new Object(), randomTimeout(), randomTimeUnit()); 232 shouldThrow(); 233 } catch (InterruptedException success) {} 234 assertFalse(Thread.interrupted()); 235 236 pleaseInterrupt.countDown(); 237 try { 238 q.offer(new Object(), LONGER_DELAY_MS, MILLISECONDS); 239 shouldThrow(); 240 } catch (InterruptedException success) {} 241 assertFalse(Thread.interrupted()); 242 }}); 243 244 await(pleaseInterrupt); 245 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); 246 t.interrupt(); 247 awaitTermination(t); 248 } 249 250 /** 251 * poll return null if no active putter 252 */ 253 public void testPoll() { testPoll(false); } 254 public void testPoll_fair() { testPoll(true); } 255 public void testPoll(boolean fair) { 256 final SynchronousQueue q = new SynchronousQueue(fair); 257 assertNull(q.poll()); 258 } 259 260 /** 261 * timed poll with zero timeout times out if no active putter 262 */ 263 public void testTimedPoll0() { testTimedPoll0(false); } 264 public void testTimedPoll0_fair() { testTimedPoll0(true); } 265 public void testTimedPoll0(boolean fair) { 266 final SynchronousQueue q = new SynchronousQueue(fair); 267 try { assertNull(q.poll(0, MILLISECONDS)); } 268 catch (InterruptedException e) { threadUnexpectedException(e); } 269 } 270 271 /** 272 * timed poll with nonzero timeout times out if no active putter 273 */ 274 public void testTimedPoll() { 275 final boolean fair = randomBoolean(); 276 final SynchronousQueue q = new SynchronousQueue(fair); 277 final long startTime = System.nanoTime(); 278 try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); } 279 catch (InterruptedException e) { threadUnexpectedException(e); } 280 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 281 } 282 283 /** 284 * timed poll before a delayed offer times out, returning null; 285 * after offer succeeds; on interruption throws 286 */ 287 public void testTimedPollWithOffer() { 288 final boolean fair = randomBoolean(); 289 final SynchronousQueue q = new SynchronousQueue(fair); 290 final CountDownLatch pleaseOffer = new CountDownLatch(1); 291 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 292 Thread t = newStartedThread(new CheckedRunnable() { 293 public void realRun() throws InterruptedException { 294 long startTime = System.nanoTime(); 295 assertNull(q.poll(timeoutMillis(), MILLISECONDS)); 296 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 297 298 pleaseOffer.countDown(); 299 startTime = System.nanoTime(); 300 assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS)); 301 302 Thread.currentThread().interrupt(); 303 try { 304 q.poll(randomTimeout(), randomTimeUnit()); 305 shouldThrow(); 306 } catch (InterruptedException success) {} 307 assertFalse(Thread.interrupted()); 308 309 pleaseInterrupt.countDown(); 310 try { 311 q.poll(LONG_DELAY_MS, MILLISECONDS); 312 shouldThrow(); 313 } catch (InterruptedException success) {} 314 assertFalse(Thread.interrupted()); 315 316 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 317 }}); 318 319 await(pleaseOffer); 320 long startTime = System.nanoTime(); 321 try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); } 322 catch (InterruptedException e) { threadUnexpectedException(e); } 323 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 324 325 await(pleaseInterrupt); 326 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); 327 t.interrupt(); 328 awaitTermination(t); 329 } 330 331 /** 332 * peek() returns null if no active putter 333 */ 334 public void testPeek() { testPeek(false); } 335 public void testPeek_fair() { testPeek(true); } 336 public void testPeek(boolean fair) { 337 final SynchronousQueue q = new SynchronousQueue(fair); 338 assertNull(q.peek()); 339 } 340 341 /** 342 * element() throws NoSuchElementException if no active putter 343 */ 344 public void testElement() { testElement(false); } 345 public void testElement_fair() { testElement(true); } 346 public void testElement(boolean fair) { 347 final SynchronousQueue q = new SynchronousQueue(fair); 348 try { 349 q.element(); 350 shouldThrow(); 351 } catch (NoSuchElementException success) {} 352 } 353 354 /** 355 * remove() throws NoSuchElementException if no active putter 356 */ 357 public void testRemove() { testRemove(false); } 358 public void testRemove_fair() { testRemove(true); } 359 public void testRemove(boolean fair) { 360 final SynchronousQueue q = new SynchronousQueue(fair); 361 try { 362 q.remove(); 363 shouldThrow(); 364 } catch (NoSuchElementException success) {} 365 } 366 367 /** 368 * contains returns false 369 */ 370 public void testContains() { testContains(false); } 371 public void testContains_fair() { testContains(true); } 372 public void testContains(boolean fair) { 373 final SynchronousQueue q = new SynchronousQueue(fair); 374 assertFalse(q.contains(zero)); 375 } 376 377 /** 378 * clear ensures isEmpty 379 */ 380 public void testClear() { testClear(false); } 381 public void testClear_fair() { testClear(true); } 382 public void testClear(boolean fair) { 383 final SynchronousQueue q = new SynchronousQueue(fair); 384 q.clear(); 385 assertTrue(q.isEmpty()); 386 } 387 388 /** 389 * containsAll returns false unless empty 390 */ 391 public void testContainsAll() { testContainsAll(false); } 392 public void testContainsAll_fair() { testContainsAll(true); } 393 public void testContainsAll(boolean fair) { 394 final SynchronousQueue q = new SynchronousQueue(fair); 395 Integer[] empty = new Integer[0]; 396 assertTrue(q.containsAll(Arrays.asList(empty))); 397 Integer[] ints = new Integer[1]; ints[0] = zero; 398 assertFalse(q.containsAll(Arrays.asList(ints))); 399 } 400 401 /** 402 * retainAll returns false 403 */ 404 public void testRetainAll() { testRetainAll(false); } 405 public void testRetainAll_fair() { testRetainAll(true); } 406 public void testRetainAll(boolean fair) { 407 final SynchronousQueue q = new SynchronousQueue(fair); 408 Integer[] empty = new Integer[0]; 409 assertFalse(q.retainAll(Arrays.asList(empty))); 410 Integer[] ints = new Integer[1]; ints[0] = zero; 411 assertFalse(q.retainAll(Arrays.asList(ints))); 412 } 413 414 /** 415 * removeAll returns false 416 */ 417 public void testRemoveAll() { testRemoveAll(false); } 418 public void testRemoveAll_fair() { testRemoveAll(true); } 419 public void testRemoveAll(boolean fair) { 420 final SynchronousQueue q = new SynchronousQueue(fair); 421 Integer[] empty = new Integer[0]; 422 assertFalse(q.removeAll(Arrays.asList(empty))); 423 Integer[] ints = new Integer[1]; ints[0] = zero; 424 assertFalse(q.containsAll(Arrays.asList(ints))); 425 } 426 427 /** 428 * toArray is empty 429 */ 430 public void testToArray() { testToArray(false); } 431 public void testToArray_fair() { testToArray(true); } 432 public void testToArray(boolean fair) { 433 final SynchronousQueue q = new SynchronousQueue(fair); 434 Object[] o = q.toArray(); 435 assertEquals(0, o.length); 436 } 437 438 /** 439 * toArray(Integer array) returns its argument with the first 440 * element (if present) nulled out 441 */ 442 public void testToArray2() { testToArray2(false); } 443 public void testToArray2_fair() { testToArray2(true); } 444 public void testToArray2(boolean fair) { 445 final SynchronousQueue<Integer> q = new SynchronousQueue<>(fair); 446 Integer[] a; 447 448 a = new Integer[0]; 449 assertSame(a, q.toArray(a)); 450 451 a = new Integer[3]; 452 Arrays.fill(a, 42); 453 assertSame(a, q.toArray(a)); 454 assertNull(a[0]); 455 for (int i = 1; i < a.length; i++) 456 assertEquals(42, (int) a[i]); 457 } 458 459 /** 460 * toArray(null) throws NPE 461 */ 462 public void testToArray_null() { testToArray_null(false); } 463 public void testToArray_null_fair() { testToArray_null(true); } 464 public void testToArray_null(boolean fair) { 465 final SynchronousQueue q = new SynchronousQueue(fair); 466 try { 467 Object[] unused = q.toArray((Object[])null); 468 shouldThrow(); 469 } catch (NullPointerException success) {} 470 } 471 472 /** 473 * iterator does not traverse any elements 474 */ 475 public void testIterator() { testIterator(false); } 476 public void testIterator_fair() { testIterator(true); } 477 public void testIterator(boolean fair) { 478 assertIteratorExhausted(new SynchronousQueue(fair).iterator()); 479 } 480 481 /** 482 * iterator remove throws IllegalStateException 483 */ 484 public void testIteratorRemove() { testIteratorRemove(false); } 485 public void testIteratorRemove_fair() { testIteratorRemove(true); } 486 public void testIteratorRemove(boolean fair) { 487 final SynchronousQueue q = new SynchronousQueue(fair); 488 Iterator it = q.iterator(); 489 try { 490 it.remove(); 491 shouldThrow(); 492 } catch (IllegalStateException success) {} 493 } 494 495 /** 496 * toString returns a non-null string 497 */ 498 public void testToString() { testToString(false); } 499 public void testToString_fair() { testToString(true); } 500 public void testToString(boolean fair) { 501 final SynchronousQueue q = new SynchronousQueue(fair); 502 String s = q.toString(); 503 assertNotNull(s); 504 } 505 506 /** 507 * offer transfers elements across Executor tasks 508 */ 509 public void testOfferInExecutor() { testOfferInExecutor(false); } 510 public void testOfferInExecutor_fair() { testOfferInExecutor(true); } 511 public void testOfferInExecutor(boolean fair) { 512 final SynchronousQueue q = new SynchronousQueue(fair); 513 final CheckedBarrier threadsStarted = new CheckedBarrier(2); 514 final ExecutorService executor = Executors.newFixedThreadPool(2); 515 try (PoolCleaner cleaner = cleaner(executor)) { 516 517 executor.execute(new CheckedRunnable() { 518 public void realRun() throws InterruptedException { 519 assertFalse(q.offer(one)); 520 threadsStarted.await(); 521 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS)); 522 assertEquals(0, q.remainingCapacity()); 523 }}); 524 525 executor.execute(new CheckedRunnable() { 526 public void realRun() throws InterruptedException { 527 threadsStarted.await(); 528 assertSame(one, q.take()); 529 }}); 530 } 531 } 532 533 /** 534 * timed poll retrieves elements across Executor threads 535 */ 536 public void testPollInExecutor() { testPollInExecutor(false); } 537 public void testPollInExecutor_fair() { testPollInExecutor(true); } 538 public void testPollInExecutor(boolean fair) { 539 final SynchronousQueue q = new SynchronousQueue(fair); 540 final CheckedBarrier threadsStarted = new CheckedBarrier(2); 541 final ExecutorService executor = Executors.newFixedThreadPool(2); 542 try (PoolCleaner cleaner = cleaner(executor)) { 543 executor.execute(new CheckedRunnable() { 544 public void realRun() throws InterruptedException { 545 assertNull(q.poll()); 546 threadsStarted.await(); 547 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS)); 548 assertTrue(q.isEmpty()); 549 }}); 550 551 executor.execute(new CheckedRunnable() { 552 public void realRun() throws InterruptedException { 553 threadsStarted.await(); 554 q.put(one); 555 }}); 556 } 557 } 558 559 /** 560 * a deserialized/reserialized queue is usable 561 */ 562 public void testSerialization() { 563 final SynchronousQueue x = new SynchronousQueue(); 564 final SynchronousQueue y = new SynchronousQueue(false); 565 final SynchronousQueue z = new SynchronousQueue(true); 566 assertSerialEquals(x, y); 567 assertNotSerialEquals(x, z); 568 SynchronousQueue[] qs = { x, y, z }; 569 for (SynchronousQueue q : qs) { 570 SynchronousQueue clone = serialClone(q); 571 assertNotSame(q, clone); 572 assertSerialEquals(q, clone); 573 assertTrue(clone.isEmpty()); 574 assertEquals(0, clone.size()); 575 assertEquals(0, clone.remainingCapacity()); 576 assertFalse(clone.offer(zero)); 577 } 578 } 579 580 /** 581 * drainTo(c) of empty queue doesn't transfer elements 582 */ 583 public void testDrainTo() { testDrainTo(false); } 584 public void testDrainTo_fair() { testDrainTo(true); } 585 public void testDrainTo(boolean fair) { 586 final SynchronousQueue q = new SynchronousQueue(fair); 587 ArrayList l = new ArrayList(); 588 q.drainTo(l); 589 assertEquals(0, q.size()); 590 assertEquals(0, l.size()); 591 } 592 593 /** 594 * drainTo empties queue, unblocking a waiting put. 595 */ 596 public void testDrainToWithActivePut() { testDrainToWithActivePut(false); } 597 public void testDrainToWithActivePut_fair() { testDrainToWithActivePut(true); } 598 public void testDrainToWithActivePut(boolean fair) { 599 final SynchronousQueue q = new SynchronousQueue(fair); 600 Thread t = newStartedThread(new CheckedRunnable() { 601 public void realRun() throws InterruptedException { 602 q.put(one); 603 }}); 604 605 ArrayList l = new ArrayList(); 606 long startTime = System.nanoTime(); 607 while (l.isEmpty()) { 608 q.drainTo(l); 609 if (millisElapsedSince(startTime) > LONG_DELAY_MS) 610 fail("timed out"); 611 Thread.yield(); 612 } 613 assertEquals(1, l.size()); 614 assertSame(one, l.get(0)); 615 awaitTermination(t); 616 } 617 618 /** 619 * drainTo(c, n) empties up to n elements of queue into c 620 */ 621 public void testDrainToN() throws InterruptedException { 622 final SynchronousQueue q = new SynchronousQueue(); 623 Thread t1 = newStartedThread(new CheckedRunnable() { 624 public void realRun() throws InterruptedException { 625 q.put(one); 626 }}); 627 628 Thread t2 = newStartedThread(new CheckedRunnable() { 629 public void realRun() throws InterruptedException { 630 q.put(two); 631 }}); 632 633 ArrayList l = new ArrayList(); 634 int drained; 635 while ((drained = q.drainTo(l, 1)) == 0) Thread.yield(); 636 assertEquals(1, drained); 637 assertEquals(1, l.size()); 638 while ((drained = q.drainTo(l, 1)) == 0) Thread.yield(); 639 assertEquals(1, drained); 640 assertEquals(2, l.size()); 641 assertTrue(l.contains(one)); 642 assertTrue(l.contains(two)); 643 awaitTermination(t1); 644 awaitTermination(t2); 645 } 646 647 /** 648 * remove(null), contains(null) always return false 649 */ 650 public void testNeverContainsNull() { 651 Collection<?> q = new SynchronousQueue(); 652 assertFalse(q.contains(null)); 653 assertFalse(q.remove(null)); 654 } 655 656 }