1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 19 * or visit www.oracle.com if you need additional information or have any 20 * questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/publicdomain/zero/1.0/ 32 * Other contributors include Andrew Wright, Jeffrey Hayes, 33 * Pat Fisher, Mike Judd. 34 */ 35 36 import static java.util.concurrent.TimeUnit.MILLISECONDS; 37 38 import java.util.ArrayList; 39 import java.util.Arrays; 40 import java.util.Collection; 41 import java.util.Iterator; 42 import java.util.NoSuchElementException; 43 import java.util.concurrent.BlockingQueue; 44 import java.util.concurrent.CountDownLatch; 45 import java.util.concurrent.Executors; 46 import java.util.concurrent.ExecutorService; 47 import java.util.concurrent.SynchronousQueue; 48 import java.util.concurrent.ThreadLocalRandom; 49 50 import junit.framework.Test; 51 52 public class SynchronousQueueTest extends JSR166TestCase { 53 54 public static class Fair extends BlockingQueueTest { 55 protected BlockingQueue emptyCollection() { 56 return new SynchronousQueue(true); 57 } 58 } 59 60 public static class NonFair extends BlockingQueueTest { 61 protected BlockingQueue emptyCollection() { 62 return new SynchronousQueue(false); 63 } 64 } 65 66 public static void main(String[] args) { 67 main(suite(), args); 68 } 69 70 public static Test suite() { 71 return newTestSuite(SynchronousQueueTest.class, 72 new Fair().testSuite(), 73 new NonFair().testSuite()); 74 } 75 76 /** 77 * Any SynchronousQueue is both empty and full 78 */ 79 public void testEmptyFull() { testEmptyFull(false); } 80 public void testEmptyFull_fair() { testEmptyFull(true); } 81 public void testEmptyFull(boolean fair) { 82 final SynchronousQueue q = new SynchronousQueue(fair); 83 assertTrue(q.isEmpty()); 84 assertEquals(0, q.size()); 85 assertEquals(0, q.remainingCapacity()); 86 assertFalse(q.offer(zero)); 87 } 88 89 /** 90 * offer fails if no active taker 91 */ 92 public void testOffer() { testOffer(false); } 93 public void testOffer_fair() { testOffer(true); } 94 public void testOffer(boolean fair) { 95 SynchronousQueue q = new SynchronousQueue(fair); 96 assertFalse(q.offer(one)); 97 } 98 99 /** 100 * add throws IllegalStateException if no active taker 101 */ 102 public void testAdd() { testAdd(false); } 103 public void testAdd_fair() { testAdd(true); } 104 public void testAdd(boolean fair) { 105 SynchronousQueue q = new SynchronousQueue(fair); 106 assertEquals(0, q.remainingCapacity()); 107 try { 108 q.add(one); 109 shouldThrow(); 110 } catch (IllegalStateException success) {} 111 } 112 113 /** 114 * addAll(this) throws IllegalArgumentException 115 */ 116 public void testAddAll_self() { testAddAll_self(false); } 117 public void testAddAll_self_fair() { testAddAll_self(true); } 118 public void testAddAll_self(boolean fair) { 119 SynchronousQueue q = new SynchronousQueue(fair); 120 try { 121 q.addAll(q); 122 shouldThrow(); 123 } catch (IllegalArgumentException success) {} 124 } 125 126 /** 127 * addAll throws IllegalStateException if no active taker 128 */ 129 public void testAddAll_ISE() { testAddAll_ISE(false); } 130 public void testAddAll_ISE_fair() { testAddAll_ISE(true); } 131 public void testAddAll_ISE(boolean fair) { 132 SynchronousQueue q = new SynchronousQueue(fair); 133 Integer[] ints = new Integer[1]; 134 for (int i = 0; i < ints.length; i++) 135 ints[i] = i; 136 Collection<Integer> coll = Arrays.asList(ints); 137 try { 138 q.addAll(coll); 139 shouldThrow(); 140 } catch (IllegalStateException success) {} 141 } 142 143 /** 144 * put blocks interruptibly if no active taker 145 */ 146 public void testBlockingPut() { testBlockingPut(false); } 147 public void testBlockingPut_fair() { testBlockingPut(true); } 148 public void testBlockingPut(boolean fair) { 149 final SynchronousQueue q = new SynchronousQueue(fair); 150 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 151 Thread t = newStartedThread(new CheckedRunnable() { 152 public void realRun() throws InterruptedException { 153 Thread.currentThread().interrupt(); 154 try { 155 q.put(99); 156 shouldThrow(); 157 } catch (InterruptedException success) {} 158 assertFalse(Thread.interrupted()); 159 160 pleaseInterrupt.countDown(); 161 try { 162 q.put(99); 163 shouldThrow(); 164 } catch (InterruptedException success) {} 165 assertFalse(Thread.interrupted()); 166 }}); 167 168 await(pleaseInterrupt); 169 assertThreadBlocks(t, Thread.State.WAITING); 170 t.interrupt(); 171 awaitTermination(t); 172 assertEquals(0, q.remainingCapacity()); 173 } 174 175 /** 176 * put blocks interruptibly waiting for take 177 */ 178 public void testPutWithTake() { testPutWithTake(false); } 179 public void testPutWithTake_fair() { testPutWithTake(true); } 180 public void testPutWithTake(boolean fair) { 181 final SynchronousQueue q = new SynchronousQueue(fair); 182 final CountDownLatch pleaseTake = new CountDownLatch(1); 183 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 184 Thread t = newStartedThread(new CheckedRunnable() { 185 public void realRun() throws InterruptedException { 186 pleaseTake.countDown(); 187 q.put(one); 188 189 Thread.currentThread().interrupt(); 190 try { 191 q.put(99); 192 shouldThrow(); 193 } catch (InterruptedException success) {} 194 assertFalse(Thread.interrupted()); 195 196 pleaseInterrupt.countDown(); 197 try { 198 q.put(99); 199 shouldThrow(); 200 } catch (InterruptedException success) {} 201 assertFalse(Thread.interrupted()); 202 }}); 203 204 await(pleaseTake); 205 assertEquals(0, q.remainingCapacity()); 206 try { assertSame(one, q.take()); } 207 catch (InterruptedException e) { threadUnexpectedException(e); } 208 209 await(pleaseInterrupt); 210 assertThreadBlocks(t, Thread.State.WAITING); 211 t.interrupt(); 212 awaitTermination(t); 213 assertEquals(0, q.remainingCapacity()); 214 } 215 216 /** 217 * timed offer times out if elements not taken 218 */ 219 public void testTimedOffer() { 220 final boolean fair = ThreadLocalRandom.current().nextBoolean(); 221 final SynchronousQueue q = new SynchronousQueue(fair); 222 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 223 Thread t = newStartedThread(new CheckedRunnable() { 224 public void realRun() throws InterruptedException { 225 long startTime = System.nanoTime(); 226 assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS)); 227 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 228 229 Thread.currentThread().interrupt(); 230 try { 231 q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); 232 shouldThrow(); 233 } catch (InterruptedException success) {} 234 assertFalse(Thread.interrupted()); 235 236 pleaseInterrupt.countDown(); 237 try { 238 q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); 239 shouldThrow(); 240 } catch (InterruptedException success) {} 241 assertFalse(Thread.interrupted()); 242 }}); 243 244 await(pleaseInterrupt); 245 assertThreadBlocks(t, Thread.State.TIMED_WAITING); 246 t.interrupt(); 247 awaitTermination(t); 248 } 249 250 /** 251 * poll return null if no active putter 252 */ 253 public void testPoll() { testPoll(false); } 254 public void testPoll_fair() { testPoll(true); } 255 public void testPoll(boolean fair) { 256 final SynchronousQueue q = new SynchronousQueue(fair); 257 assertNull(q.poll()); 258 } 259 260 /** 261 * timed poll with zero timeout times out if no active putter 262 */ 263 public void testTimedPoll0() { testTimedPoll0(false); } 264 public void testTimedPoll0_fair() { testTimedPoll0(true); } 265 public void testTimedPoll0(boolean fair) { 266 final SynchronousQueue q = new SynchronousQueue(fair); 267 try { assertNull(q.poll(0, MILLISECONDS)); } 268 catch (InterruptedException e) { threadUnexpectedException(e); } 269 } 270 271 /** 272 * timed poll with nonzero timeout times out if no active putter 273 */ 274 public void testTimedPoll() { 275 final boolean fair = ThreadLocalRandom.current().nextBoolean(); 276 final SynchronousQueue q = new SynchronousQueue(fair); 277 final long startTime = System.nanoTime(); 278 try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); } 279 catch (InterruptedException e) { threadUnexpectedException(e); } 280 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 281 } 282 283 /** 284 * timed poll before a delayed offer times out, returning null; 285 * after offer succeeds; on interruption throws 286 */ 287 public void testTimedPollWithOffer() { 288 final boolean fair = ThreadLocalRandom.current().nextBoolean(); 289 final SynchronousQueue q = new SynchronousQueue(fair); 290 final CountDownLatch pleaseOffer = new CountDownLatch(1); 291 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 292 Thread t = newStartedThread(new CheckedRunnable() { 293 public void realRun() throws InterruptedException { 294 long startTime = System.nanoTime(); 295 assertNull(q.poll(timeoutMillis(), MILLISECONDS)); 296 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 297 298 pleaseOffer.countDown(); 299 startTime = System.nanoTime(); 300 assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS)); 301 302 Thread.currentThread().interrupt(); 303 try { 304 q.poll(LONG_DELAY_MS, MILLISECONDS); 305 shouldThrow(); 306 } catch (InterruptedException success) {} 307 assertFalse(Thread.interrupted()); 308 309 pleaseInterrupt.countDown(); 310 try { 311 q.poll(LONG_DELAY_MS, MILLISECONDS); 312 shouldThrow(); 313 } catch (InterruptedException success) {} 314 assertFalse(Thread.interrupted()); 315 316 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 317 }}); 318 319 await(pleaseOffer); 320 long startTime = System.nanoTime(); 321 try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); } 322 catch (InterruptedException e) { threadUnexpectedException(e); } 323 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 324 325 await(pleaseInterrupt); 326 assertThreadBlocks(t, Thread.State.TIMED_WAITING); 327 t.interrupt(); 328 awaitTermination(t); 329 } 330 331 /** 332 * peek() returns null if no active putter 333 */ 334 public void testPeek() { testPeek(false); } 335 public void testPeek_fair() { testPeek(true); } 336 public void testPeek(boolean fair) { 337 final SynchronousQueue q = new SynchronousQueue(fair); 338 assertNull(q.peek()); 339 } 340 341 /** 342 * element() throws NoSuchElementException if no active putter 343 */ 344 public void testElement() { testElement(false); } 345 public void testElement_fair() { testElement(true); } 346 public void testElement(boolean fair) { 347 final SynchronousQueue q = new SynchronousQueue(fair); 348 try { 349 q.element(); 350 shouldThrow(); 351 } catch (NoSuchElementException success) {} 352 } 353 354 /** 355 * remove() throws NoSuchElementException if no active putter 356 */ 357 public void testRemove() { testRemove(false); } 358 public void testRemove_fair() { testRemove(true); } 359 public void testRemove(boolean fair) { 360 final SynchronousQueue q = new SynchronousQueue(fair); 361 try { 362 q.remove(); 363 shouldThrow(); 364 } catch (NoSuchElementException success) {} 365 } 366 367 /** 368 * contains returns false 369 */ 370 public void testContains() { testContains(false); } 371 public void testContains_fair() { testContains(true); } 372 public void testContains(boolean fair) { 373 final SynchronousQueue q = new SynchronousQueue(fair); 374 assertFalse(q.contains(zero)); 375 } 376 377 /** 378 * clear ensures isEmpty 379 */ 380 public void testClear() { testClear(false); } 381 public void testClear_fair() { testClear(true); } 382 public void testClear(boolean fair) { 383 final SynchronousQueue q = new SynchronousQueue(fair); 384 q.clear(); 385 assertTrue(q.isEmpty()); 386 } 387 388 /** 389 * containsAll returns false unless empty 390 */ 391 public void testContainsAll() { testContainsAll(false); } 392 public void testContainsAll_fair() { testContainsAll(true); } 393 public void testContainsAll(boolean fair) { 394 final SynchronousQueue q = new SynchronousQueue(fair); 395 Integer[] empty = new Integer[0]; 396 assertTrue(q.containsAll(Arrays.asList(empty))); 397 Integer[] ints = new Integer[1]; ints[0] = zero; 398 assertFalse(q.containsAll(Arrays.asList(ints))); 399 } 400 401 /** 402 * retainAll returns false 403 */ 404 public void testRetainAll() { testRetainAll(false); } 405 public void testRetainAll_fair() { testRetainAll(true); } 406 public void testRetainAll(boolean fair) { 407 final SynchronousQueue q = new SynchronousQueue(fair); 408 Integer[] empty = new Integer[0]; 409 assertFalse(q.retainAll(Arrays.asList(empty))); 410 Integer[] ints = new Integer[1]; ints[0] = zero; 411 assertFalse(q.retainAll(Arrays.asList(ints))); 412 } 413 414 /** 415 * removeAll returns false 416 */ 417 public void testRemoveAll() { testRemoveAll(false); } 418 public void testRemoveAll_fair() { testRemoveAll(true); } 419 public void testRemoveAll(boolean fair) { 420 final SynchronousQueue q = new SynchronousQueue(fair); 421 Integer[] empty = new Integer[0]; 422 assertFalse(q.removeAll(Arrays.asList(empty))); 423 Integer[] ints = new Integer[1]; ints[0] = zero; 424 assertFalse(q.containsAll(Arrays.asList(ints))); 425 } 426 427 /** 428 * toArray is empty 429 */ 430 public void testToArray() { testToArray(false); } 431 public void testToArray_fair() { testToArray(true); } 432 public void testToArray(boolean fair) { 433 final SynchronousQueue q = new SynchronousQueue(fair); 434 Object[] o = q.toArray(); 435 assertEquals(0, o.length); 436 } 437 438 /** 439 * toArray(Integer array) returns its argument with the first 440 * element (if present) nulled out 441 */ 442 public void testToArray2() { testToArray2(false); } 443 public void testToArray2_fair() { testToArray2(true); } 444 public void testToArray2(boolean fair) { 445 final SynchronousQueue<Integer> q = new SynchronousQueue<>(fair); 446 Integer[] a; 447 448 a = new Integer[0]; 449 assertSame(a, q.toArray(a)); 450 451 a = new Integer[3]; 452 Arrays.fill(a, 42); 453 assertSame(a, q.toArray(a)); 454 assertNull(a[0]); 455 for (int i = 1; i < a.length; i++) 456 assertEquals(42, (int) a[i]); 457 } 458 459 /** 460 * toArray(null) throws NPE 461 */ 462 public void testToArray_null() { testToArray_null(false); } 463 public void testToArray_null_fair() { testToArray_null(true); } 464 public void testToArray_null(boolean fair) { 465 final SynchronousQueue q = new SynchronousQueue(fair); 466 try { 467 Object[] o = q.toArray(null); 468 shouldThrow(); 469 } catch (NullPointerException success) {} 470 } 471 472 /** 473 * iterator does not traverse any elements 474 */ 475 public void testIterator() { testIterator(false); } 476 public void testIterator_fair() { testIterator(true); } 477 public void testIterator(boolean fair) { 478 assertIteratorExhausted(new SynchronousQueue(fair).iterator()); 479 } 480 481 /** 482 * iterator remove throws IllegalStateException 483 */ 484 public void testIteratorRemove() { testIteratorRemove(false); } 485 public void testIteratorRemove_fair() { testIteratorRemove(true); } 486 public void testIteratorRemove(boolean fair) { 487 final SynchronousQueue q = new SynchronousQueue(fair); 488 Iterator it = q.iterator(); 489 try { 490 it.remove(); 491 shouldThrow(); 492 } catch (IllegalStateException success) {} 493 } 494 495 /** 496 * toString returns a non-null string 497 */ 498 public void testToString() { testToString(false); } 499 public void testToString_fair() { testToString(true); } 500 public void testToString(boolean fair) { 501 final SynchronousQueue q = new SynchronousQueue(fair); 502 String s = q.toString(); 503 assertNotNull(s); 504 } 505 506 /** 507 * offer transfers elements across Executor tasks 508 */ 509 public void testOfferInExecutor() { testOfferInExecutor(false); } 510 public void testOfferInExecutor_fair() { testOfferInExecutor(true); } 511 public void testOfferInExecutor(boolean fair) { 512 final SynchronousQueue q = new SynchronousQueue(fair); 513 final CheckedBarrier threadsStarted = new CheckedBarrier(2); 514 final ExecutorService executor = Executors.newFixedThreadPool(2); 515 try (PoolCleaner cleaner = cleaner(executor)) { 516 517 executor.execute(new CheckedRunnable() { 518 public void realRun() throws InterruptedException { 519 assertFalse(q.offer(one)); 520 threadsStarted.await(); 521 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS)); 522 assertEquals(0, q.remainingCapacity()); 523 }}); 524 525 executor.execute(new CheckedRunnable() { 526 public void realRun() throws InterruptedException { 527 threadsStarted.await(); 528 assertSame(one, q.take()); 529 }}); 530 } 531 } 532 533 /** 534 * timed poll retrieves elements across Executor threads 535 */ 536 public void testPollInExecutor() { testPollInExecutor(false); } 537 public void testPollInExecutor_fair() { testPollInExecutor(true); } 538 public void testPollInExecutor(boolean fair) { 539 final SynchronousQueue q = new SynchronousQueue(fair); 540 final CheckedBarrier threadsStarted = new CheckedBarrier(2); 541 final ExecutorService executor = Executors.newFixedThreadPool(2); 542 try (PoolCleaner cleaner = cleaner(executor)) { 543 executor.execute(new CheckedRunnable() { 544 public void realRun() throws InterruptedException { 545 assertNull(q.poll()); 546 threadsStarted.await(); 547 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS)); 548 assertTrue(q.isEmpty()); 549 }}); 550 551 executor.execute(new CheckedRunnable() { 552 public void realRun() throws InterruptedException { 553 threadsStarted.await(); 554 q.put(one); 555 }}); 556 } 557 } 558 559 /** 560 * a deserialized/reserialized queue is usable 561 */ 562 public void testSerialization() { 563 final SynchronousQueue x = new SynchronousQueue(); 564 final SynchronousQueue y = new SynchronousQueue(false); 565 final SynchronousQueue z = new SynchronousQueue(true); 566 assertSerialEquals(x, y); 567 assertNotSerialEquals(x, z); 568 SynchronousQueue[] qs = { x, y, z }; 569 for (SynchronousQueue q : qs) { 570 SynchronousQueue clone = serialClone(q); 571 assertNotSame(q, clone); 572 assertSerialEquals(q, clone); 573 assertTrue(clone.isEmpty()); 574 assertEquals(0, clone.size()); 575 assertEquals(0, clone.remainingCapacity()); 576 assertFalse(clone.offer(zero)); 577 } 578 } 579 580 /** 581 * drainTo(c) of empty queue doesn't transfer elements 582 */ 583 public void testDrainTo() { testDrainTo(false); } 584 public void testDrainTo_fair() { testDrainTo(true); } 585 public void testDrainTo(boolean fair) { 586 final SynchronousQueue q = new SynchronousQueue(fair); 587 ArrayList l = new ArrayList(); 588 q.drainTo(l); 589 assertEquals(0, q.size()); 590 assertEquals(0, l.size()); 591 } 592 593 /** 594 * drainTo empties queue, unblocking a waiting put. 595 */ 596 public void testDrainToWithActivePut() { testDrainToWithActivePut(false); } 597 public void testDrainToWithActivePut_fair() { testDrainToWithActivePut(true); } 598 public void testDrainToWithActivePut(boolean fair) { 599 final SynchronousQueue q = new SynchronousQueue(fair); 600 Thread t = newStartedThread(new CheckedRunnable() { 601 public void realRun() throws InterruptedException { 602 q.put(one); 603 }}); 604 605 ArrayList l = new ArrayList(); 606 long startTime = System.nanoTime(); 607 while (l.isEmpty()) { 608 q.drainTo(l); 609 if (millisElapsedSince(startTime) > LONG_DELAY_MS) 610 fail("timed out"); 611 Thread.yield(); 612 } 613 assertEquals(1, l.size()); 614 assertSame(one, l.get(0)); 615 awaitTermination(t); 616 } 617 618 /** 619 * drainTo(c, n) empties up to n elements of queue into c 620 */ 621 public void testDrainToN() throws InterruptedException { 622 final SynchronousQueue q = new SynchronousQueue(); 623 Thread t1 = newStartedThread(new CheckedRunnable() { 624 public void realRun() throws InterruptedException { 625 q.put(one); 626 }}); 627 628 Thread t2 = newStartedThread(new CheckedRunnable() { 629 public void realRun() throws InterruptedException { 630 q.put(two); 631 }}); 632 633 ArrayList l = new ArrayList(); 634 int drained; 635 while ((drained = q.drainTo(l, 1)) == 0) Thread.yield(); 636 assertEquals(1, drained); 637 assertEquals(1, l.size()); 638 while ((drained = q.drainTo(l, 1)) == 0) Thread.yield(); 639 assertEquals(1, drained); 640 assertEquals(2, l.size()); 641 assertTrue(l.contains(one)); 642 assertTrue(l.contains(two)); 643 awaitTermination(t1); 644 awaitTermination(t2); 645 } 646 647 /** 648 * remove(null), contains(null) always return false 649 */ 650 public void testNeverContainsNull() { 651 Collection<?> q = new SynchronousQueue(); 652 assertFalse(q.contains(null)); 653 assertFalse(q.remove(null)); 654 } 655 656 }