1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 19 * or visit www.oracle.com if you need additional information or have any 20 * questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea and Martin Buchholz with assistance from 30 * members of JCP JSR-166 Expert Group and released to the public 31 * domain, as explained at 32 * http://creativecommons.org/publicdomain/zero/1.0/ 33 */ 34 35 import java.util.concurrent.CompletableFuture; 36 import java.util.concurrent.CountDownLatch; 37 import java.util.concurrent.Executor; 38 import java.util.concurrent.Executors; 39 import java.util.concurrent.Flow; 40 import java.util.concurrent.ForkJoinPool; 41 import java.util.concurrent.SubmissionPublisher; 42 import java.util.concurrent.atomic.AtomicInteger; 43 import junit.framework.Test; 44 import junit.framework.TestSuite; 45 46 import static java.util.concurrent.Flow.Subscriber; 47 import static java.util.concurrent.Flow.Subscription; 48 import static java.util.concurrent.TimeUnit.MILLISECONDS; 49 50 public class SubmissionPublisherTest extends JSR166TestCase { 51 52 public static void main(String[] args) { 53 main(suite(), args); 54 } 55 public static Test suite() { 56 return new TestSuite(SubmissionPublisherTest.class); 57 } 58 59 final Executor basicExecutor = basicPublisher().getExecutor(); 60 61 static SubmissionPublisher<Integer> basicPublisher() { 62 return new SubmissionPublisher<Integer>(); 63 } 64 65 static class SPException extends RuntimeException {} 66 67 class TestSubscriber implements Subscriber<Integer> { 68 volatile Subscription sn; 69 int last; // Requires that onNexts are in numeric order 70 volatile int nexts; 71 volatile int errors; 72 volatile int completes; 73 volatile boolean throwOnCall = false; 74 volatile boolean request = true; 75 volatile Throwable lastError; 76 77 public synchronized void onSubscribe(Subscription s) { 78 threadAssertTrue(sn == null); 79 sn = s; 80 notifyAll(); 81 if (throwOnCall) 82 throw new SPException(); 83 if (request) 84 sn.request(1L); 85 } 86 public synchronized void onNext(Integer t) { 87 ++nexts; 88 notifyAll(); 89 int current = t.intValue(); 90 threadAssertTrue(current >= last); 91 last = current; 92 if (request) 93 sn.request(1L); 94 if (throwOnCall) 95 throw new SPException(); 96 } 97 public synchronized void onError(Throwable t) { 98 threadAssertTrue(completes == 0); 99 threadAssertTrue(errors == 0); 100 lastError = t; 101 ++errors; 102 notifyAll(); 103 } 104 public synchronized void onComplete() { 105 threadAssertTrue(completes == 0); 106 ++completes; 107 notifyAll(); 108 } 109 110 synchronized void awaitSubscribe() { 111 while (sn == null) { 112 try { 113 wait(); 114 } catch (Exception ex) { 115 threadUnexpectedException(ex); 116 break; 117 } 118 } 119 } 120 synchronized void awaitNext(int n) { 121 while (nexts < n) { 122 try { 123 wait(); 124 } catch (Exception ex) { 125 threadUnexpectedException(ex); 126 break; 127 } 128 } 129 } 130 synchronized void awaitComplete() { 131 while (completes == 0 && errors == 0) { 132 try { 133 wait(); 134 } catch (Exception ex) { 135 threadUnexpectedException(ex); 136 break; 137 } 138 } 139 } 140 synchronized void awaitError() { 141 while (errors == 0) { 142 try { 143 wait(); 144 } catch (Exception ex) { 145 threadUnexpectedException(ex); 146 break; 147 } 148 } 149 } 150 151 } 152 153 /** 154 * A new SubmissionPublisher has no subscribers, a non-null 155 * executor, a power-of-two capacity, is not closed, and reports 156 * zero demand and lag 157 */ 158 void checkInitialState(SubmissionPublisher<?> p) { 159 assertFalse(p.hasSubscribers()); 160 assertEquals(0, p.getNumberOfSubscribers()); 161 assertTrue(p.getSubscribers().isEmpty()); 162 assertFalse(p.isClosed()); 163 assertNull(p.getClosedException()); 164 int n = p.getMaxBufferCapacity(); 165 assertTrue((n & (n - 1)) == 0); // power of two 166 assertNotNull(p.getExecutor()); 167 assertEquals(0, p.estimateMinimumDemand()); 168 assertEquals(0, p.estimateMaximumLag()); 169 } 170 171 /** 172 * A default-constructed SubmissionPublisher has no subscribers, 173 * is not closed, has default buffer size, and uses the 174 * defaultExecutor 175 */ 176 public void testConstructor1() { 177 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(); 178 checkInitialState(p); 179 assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize()); 180 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool(); 181 if (ForkJoinPool.getCommonPoolParallelism() > 1) 182 assertSame(e, c); 183 else 184 assertNotSame(e, c); 185 } 186 187 /** 188 * A new SubmissionPublisher has no subscribers, is not closed, 189 * has the given buffer size, and uses the given executor 190 */ 191 public void testConstructor2() { 192 Executor e = Executors.newFixedThreadPool(1); 193 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(e, 8); 194 checkInitialState(p); 195 assertSame(p.getExecutor(), e); 196 assertEquals(8, p.getMaxBufferCapacity()); 197 } 198 199 /** 200 * A null Executor argument to SubmissionPublisher constructor 201 * throws NullPointerException 202 */ 203 public void testConstructor3() { 204 try { 205 new SubmissionPublisher<Integer>(null, 8); 206 shouldThrow(); 207 } catch (NullPointerException success) {} 208 } 209 210 /** 211 * A negative capacity argument to SubmissionPublisher constructor 212 * throws IllegalArgumentException 213 */ 214 public void testConstructor4() { 215 Executor e = Executors.newFixedThreadPool(1); 216 try { 217 new SubmissionPublisher<Integer>(e, -1); 218 shouldThrow(); 219 } catch (IllegalArgumentException success) {} 220 } 221 222 /** 223 * A closed publisher reports isClosed with no closedException and 224 * throws IllegalStateException upon attempted submission; a 225 * subsequent close or closeExceptionally has no additional 226 * effect. 227 */ 228 public void testClose() { 229 SubmissionPublisher<Integer> p = basicPublisher(); 230 checkInitialState(p); 231 p.close(); 232 assertTrue(p.isClosed()); 233 assertNull(p.getClosedException()); 234 try { 235 p.submit(1); 236 shouldThrow(); 237 } catch (IllegalStateException success) {} 238 Throwable ex = new SPException(); 239 p.closeExceptionally(ex); 240 assertTrue(p.isClosed()); 241 assertNull(p.getClosedException()); 242 } 243 244 /** 245 * A publisher closedExceptionally reports isClosed with the 246 * closedException and throws IllegalStateException upon attempted 247 * submission; a subsequent close or closeExceptionally has no 248 * additional effect. 249 */ 250 public void testCloseExceptionally() { 251 SubmissionPublisher<Integer> p = basicPublisher(); 252 checkInitialState(p); 253 Throwable ex = new SPException(); 254 p.closeExceptionally(ex); 255 assertTrue(p.isClosed()); 256 assertSame(p.getClosedException(), ex); 257 try { 258 p.submit(1); 259 shouldThrow(); 260 } catch (IllegalStateException success) {} 261 p.close(); 262 assertTrue(p.isClosed()); 263 assertSame(p.getClosedException(), ex); 264 } 265 266 /** 267 * Upon subscription, the subscriber's onSubscribe is called, no 268 * other Subscriber methods are invoked, the publisher 269 * hasSubscribers, isSubscribed is true, and existing 270 * subscriptions are unaffected. 271 */ 272 public void testSubscribe1() { 273 TestSubscriber s = new TestSubscriber(); 274 SubmissionPublisher<Integer> p = basicPublisher(); 275 p.subscribe(s); 276 assertTrue(p.hasSubscribers()); 277 assertEquals(1, p.getNumberOfSubscribers()); 278 assertTrue(p.getSubscribers().contains(s)); 279 assertTrue(p.isSubscribed(s)); 280 s.awaitSubscribe(); 281 assertNotNull(s.sn); 282 assertEquals(0, s.nexts); 283 assertEquals(0, s.errors); 284 assertEquals(0, s.completes); 285 TestSubscriber s2 = new TestSubscriber(); 286 p.subscribe(s2); 287 assertTrue(p.hasSubscribers()); 288 assertEquals(2, p.getNumberOfSubscribers()); 289 assertTrue(p.getSubscribers().contains(s)); 290 assertTrue(p.getSubscribers().contains(s2)); 291 assertTrue(p.isSubscribed(s)); 292 assertTrue(p.isSubscribed(s2)); 293 s2.awaitSubscribe(); 294 assertNotNull(s2.sn); 295 assertEquals(0, s2.nexts); 296 assertEquals(0, s2.errors); 297 assertEquals(0, s2.completes); 298 p.close(); 299 } 300 301 /** 302 * If closed, upon subscription, the subscriber's onComplete 303 * method is invoked 304 */ 305 public void testSubscribe2() { 306 TestSubscriber s = new TestSubscriber(); 307 SubmissionPublisher<Integer> p = basicPublisher(); 308 p.close(); 309 p.subscribe(s); 310 s.awaitComplete(); 311 assertEquals(0, s.nexts); 312 assertEquals(0, s.errors); 313 assertEquals(1, s.completes, 1); 314 } 315 316 /** 317 * If closedExceptionally, upon subscription, the subscriber's 318 * onError method is invoked 319 */ 320 public void testSubscribe3() { 321 TestSubscriber s = new TestSubscriber(); 322 SubmissionPublisher<Integer> p = basicPublisher(); 323 Throwable ex = new SPException(); 324 p.closeExceptionally(ex); 325 assertTrue(p.isClosed()); 326 assertSame(p.getClosedException(), ex); 327 p.subscribe(s); 328 s.awaitError(); 329 assertEquals(0, s.nexts); 330 assertEquals(1, s.errors); 331 } 332 333 /** 334 * Upon attempted resubscription, the subscriber's onError is 335 * called and the subscription is cancelled. 336 */ 337 public void testSubscribe4() { 338 TestSubscriber s = new TestSubscriber(); 339 SubmissionPublisher<Integer> p = basicPublisher(); 340 p.subscribe(s); 341 assertTrue(p.hasSubscribers()); 342 assertEquals(1, p.getNumberOfSubscribers()); 343 assertTrue(p.getSubscribers().contains(s)); 344 assertTrue(p.isSubscribed(s)); 345 s.awaitSubscribe(); 346 assertNotNull(s.sn); 347 assertEquals(0, s.nexts); 348 assertEquals(0, s.errors); 349 assertEquals(0, s.completes); 350 p.subscribe(s); 351 s.awaitError(); 352 assertEquals(0, s.nexts); 353 assertEquals(1, s.errors); 354 assertFalse(p.isSubscribed(s)); 355 } 356 357 /** 358 * An exception thrown in onSubscribe causes onError 359 */ 360 public void testSubscribe5() { 361 TestSubscriber s = new TestSubscriber(); 362 SubmissionPublisher<Integer> p = basicPublisher(); 363 s.throwOnCall = true; 364 p.subscribe(s); 365 s.awaitError(); 366 assertEquals(0, s.nexts); 367 assertEquals(1, s.errors); 368 assertEquals(0, s.completes); 369 } 370 371 /** 372 * subscribe(null) throws NPE 373 */ 374 public void testSubscribe6() { 375 SubmissionPublisher<Integer> p = basicPublisher(); 376 try { 377 p.subscribe(null); 378 shouldThrow(); 379 } catch (NullPointerException success) {} 380 checkInitialState(p); 381 } 382 383 /** 384 * Closing a publisher causes onComplete to subscribers 385 */ 386 public void testCloseCompletes() { 387 SubmissionPublisher<Integer> p = basicPublisher(); 388 TestSubscriber s1 = new TestSubscriber(); 389 TestSubscriber s2 = new TestSubscriber(); 390 p.subscribe(s1); 391 p.subscribe(s2); 392 p.submit(1); 393 p.close(); 394 assertTrue(p.isClosed()); 395 assertNull(p.getClosedException()); 396 s1.awaitComplete(); 397 assertEquals(1, s1.nexts); 398 assertEquals(1, s1.completes); 399 s2.awaitComplete(); 400 assertEquals(1, s2.nexts); 401 assertEquals(1, s2.completes); 402 } 403 404 /** 405 * Closing a publisher exceptionally causes onError to subscribers 406 * after they are subscribed 407 */ 408 public void testCloseExceptionallyError() { 409 SubmissionPublisher<Integer> p = basicPublisher(); 410 TestSubscriber s1 = new TestSubscriber(); 411 TestSubscriber s2 = new TestSubscriber(); 412 p.subscribe(s1); 413 p.subscribe(s2); 414 p.submit(1); 415 p.closeExceptionally(new SPException()); 416 assertTrue(p.isClosed()); 417 s1.awaitSubscribe(); 418 s1.awaitError(); 419 assertTrue(s1.nexts <= 1); 420 assertEquals(1, s1.errors); 421 s2.awaitSubscribe(); 422 s2.awaitError(); 423 assertTrue(s2.nexts <= 1); 424 assertEquals(1, s2.errors); 425 } 426 427 /** 428 * Cancelling a subscription eventually causes no more onNexts to be issued 429 */ 430 public void testCancel() { 431 SubmissionPublisher<Integer> p = 432 new SubmissionPublisher<>(basicExecutor, 4); // must be < 20 433 TestSubscriber s1 = new TestSubscriber(); 434 TestSubscriber s2 = new TestSubscriber(); 435 p.subscribe(s1); 436 p.subscribe(s2); 437 s1.awaitSubscribe(); 438 p.submit(1); 439 s1.sn.cancel(); 440 for (int i = 2; i <= 20; ++i) 441 p.submit(i); 442 p.close(); 443 s2.awaitComplete(); 444 assertEquals(20, s2.nexts); 445 assertEquals(1, s2.completes); 446 assertTrue(s1.nexts < 20); 447 assertFalse(p.isSubscribed(s1)); 448 } 449 450 /** 451 * Throwing an exception in onNext causes onError 452 */ 453 public void testThrowOnNext() { 454 SubmissionPublisher<Integer> p = basicPublisher(); 455 TestSubscriber s1 = new TestSubscriber(); 456 TestSubscriber s2 = new TestSubscriber(); 457 p.subscribe(s1); 458 p.subscribe(s2); 459 s1.awaitSubscribe(); 460 p.submit(1); 461 s1.throwOnCall = true; 462 p.submit(2); 463 p.close(); 464 s2.awaitComplete(); 465 assertEquals(2, s2.nexts); 466 s1.awaitComplete(); 467 assertEquals(1, s1.errors); 468 } 469 470 /** 471 * If a handler is supplied in constructor, it is invoked when 472 * subscriber throws an exception in onNext 473 */ 474 public void testThrowOnNextHandler() { 475 AtomicInteger calls = new AtomicInteger(); 476 SubmissionPublisher<Integer> p = new SubmissionPublisher<>( 477 basicExecutor, 8, (s, e) -> calls.getAndIncrement()); 478 TestSubscriber s1 = new TestSubscriber(); 479 TestSubscriber s2 = new TestSubscriber(); 480 p.subscribe(s1); 481 p.subscribe(s2); 482 s1.awaitSubscribe(); 483 p.submit(1); 484 s1.throwOnCall = true; 485 p.submit(2); 486 p.close(); 487 s2.awaitComplete(); 488 assertEquals(2, s2.nexts); 489 assertEquals(1, s2.completes); 490 s1.awaitError(); 491 assertEquals(1, s1.errors); 492 assertEquals(1, calls.get()); 493 } 494 495 /** 496 * onNext items are issued in the same order to each subscriber 497 */ 498 public void testOrder() { 499 SubmissionPublisher<Integer> p = basicPublisher(); 500 TestSubscriber s1 = new TestSubscriber(); 501 TestSubscriber s2 = new TestSubscriber(); 502 p.subscribe(s1); 503 p.subscribe(s2); 504 for (int i = 1; i <= 20; ++i) 505 p.submit(i); 506 p.close(); 507 s2.awaitComplete(); 508 s1.awaitComplete(); 509 assertEquals(20, s2.nexts); 510 assertEquals(1, s2.completes); 511 assertEquals(20, s1.nexts); 512 assertEquals(1, s1.completes); 513 } 514 515 /** 516 * onNext is issued only if requested 517 */ 518 public void testRequest1() { 519 SubmissionPublisher<Integer> p = basicPublisher(); 520 TestSubscriber s1 = new TestSubscriber(); 521 s1.request = false; 522 p.subscribe(s1); 523 s1.awaitSubscribe(); 524 assertEquals(0, p.estimateMinimumDemand()); 525 TestSubscriber s2 = new TestSubscriber(); 526 p.subscribe(s2); 527 p.submit(1); 528 p.submit(2); 529 s2.awaitNext(1); 530 assertEquals(0, s1.nexts); 531 s1.sn.request(3); 532 p.submit(3); 533 p.close(); 534 s2.awaitComplete(); 535 assertEquals(3, s2.nexts); 536 assertEquals(1, s2.completes); 537 s1.awaitComplete(); 538 assertTrue(s1.nexts > 0); 539 assertEquals(1, s1.completes); 540 } 541 542 /** 543 * onNext is not issued when requests become zero 544 */ 545 public void testRequest2() { 546 SubmissionPublisher<Integer> p = basicPublisher(); 547 TestSubscriber s1 = new TestSubscriber(); 548 TestSubscriber s2 = new TestSubscriber(); 549 p.subscribe(s1); 550 p.subscribe(s2); 551 s2.awaitSubscribe(); 552 s1.awaitSubscribe(); 553 s1.request = false; 554 p.submit(1); 555 p.submit(2); 556 p.close(); 557 s2.awaitComplete(); 558 assertEquals(2, s2.nexts); 559 assertEquals(1, s2.completes); 560 s1.awaitNext(1); 561 assertEquals(1, s1.nexts); 562 } 563 564 /** 565 * Non-positive request causes error 566 */ 567 public void testRequest3() { 568 SubmissionPublisher<Integer> p = basicPublisher(); 569 TestSubscriber s1 = new TestSubscriber(); 570 TestSubscriber s2 = new TestSubscriber(); 571 TestSubscriber s3 = new TestSubscriber(); 572 p.subscribe(s1); 573 p.subscribe(s2); 574 p.subscribe(s3); 575 s3.awaitSubscribe(); 576 s2.awaitSubscribe(); 577 s1.awaitSubscribe(); 578 s1.sn.request(-1L); 579 s3.sn.request(0L); 580 p.submit(1); 581 p.submit(2); 582 p.close(); 583 s2.awaitComplete(); 584 assertEquals(2, s2.nexts); 585 assertEquals(1, s2.completes); 586 s1.awaitError(); 587 assertEquals(1, s1.errors); 588 assertTrue(s1.lastError instanceof IllegalArgumentException); 589 s3.awaitError(); 590 assertEquals(1, s3.errors); 591 assertTrue(s3.lastError instanceof IllegalArgumentException); 592 } 593 594 /** 595 * estimateMinimumDemand reports 0 until request, nonzero after 596 * request 597 */ 598 public void testEstimateMinimumDemand() { 599 TestSubscriber s = new TestSubscriber(); 600 SubmissionPublisher<Integer> p = basicPublisher(); 601 s.request = false; 602 p.subscribe(s); 603 s.awaitSubscribe(); 604 assertEquals(0, p.estimateMinimumDemand()); 605 s.sn.request(1); 606 assertEquals(1, p.estimateMinimumDemand()); 607 } 608 609 /** 610 * submit to a publisher with no subscribers returns lag 0 611 */ 612 public void testEmptySubmit() { 613 SubmissionPublisher<Integer> p = basicPublisher(); 614 assertEquals(0, p.submit(1)); 615 } 616 617 /** 618 * submit(null) throws NPE 619 */ 620 public void testNullSubmit() { 621 SubmissionPublisher<Integer> p = basicPublisher(); 622 try { 623 p.submit(null); 624 shouldThrow(); 625 } catch (NullPointerException success) {} 626 } 627 628 /** 629 * submit returns number of lagged items, compatible with result 630 * of estimateMaximumLag. 631 */ 632 public void testLaggedSubmit() { 633 SubmissionPublisher<Integer> p = basicPublisher(); 634 TestSubscriber s1 = new TestSubscriber(); 635 s1.request = false; 636 TestSubscriber s2 = new TestSubscriber(); 637 s2.request = false; 638 p.subscribe(s1); 639 p.subscribe(s2); 640 s2.awaitSubscribe(); 641 s1.awaitSubscribe(); 642 assertEquals(1, p.submit(1)); 643 assertTrue(p.estimateMaximumLag() >= 1); 644 assertTrue(p.submit(2) >= 2); 645 assertTrue(p.estimateMaximumLag() >= 2); 646 s1.sn.request(4); 647 assertTrue(p.submit(3) >= 3); 648 assertTrue(p.estimateMaximumLag() >= 3); 649 s2.sn.request(4); 650 p.submit(4); 651 p.close(); 652 s2.awaitComplete(); 653 assertEquals(4, s2.nexts); 654 s1.awaitComplete(); 655 assertEquals(4, s2.nexts); 656 } 657 658 /** 659 * submit eventually issues requested items when buffer capacity is 1 660 */ 661 public void testCap1Submit() { 662 SubmissionPublisher<Integer> p 663 = new SubmissionPublisher<>(basicExecutor, 1); 664 TestSubscriber s1 = new TestSubscriber(); 665 TestSubscriber s2 = new TestSubscriber(); 666 p.subscribe(s1); 667 p.subscribe(s2); 668 for (int i = 1; i <= 20; ++i) { 669 assertTrue(p.submit(i) >= 0); 670 } 671 p.close(); 672 s2.awaitComplete(); 673 s1.awaitComplete(); 674 assertEquals(20, s2.nexts); 675 assertEquals(1, s2.completes); 676 assertEquals(20, s1.nexts); 677 assertEquals(1, s1.completes); 678 } 679 680 static boolean noopHandle(AtomicInteger count) { 681 count.getAndIncrement(); 682 return false; 683 } 684 685 static boolean reqHandle(AtomicInteger count, Subscriber s) { 686 count.getAndIncrement(); 687 ((TestSubscriber)s).sn.request(Long.MAX_VALUE); 688 return true; 689 } 690 691 /** 692 * offer to a publisher with no subscribers returns lag 0 693 */ 694 public void testEmptyOffer() { 695 SubmissionPublisher<Integer> p = basicPublisher(); 696 assertEquals(0, p.offer(1, null)); 697 } 698 699 /** 700 * offer(null) throws NPE 701 */ 702 public void testNullOffer() { 703 SubmissionPublisher<Integer> p = basicPublisher(); 704 try { 705 p.offer(null, null); 706 shouldThrow(); 707 } catch (NullPointerException success) {} 708 } 709 710 /** 711 * offer returns number of lagged items if not saturated 712 */ 713 public void testLaggedOffer() { 714 SubmissionPublisher<Integer> p = basicPublisher(); 715 TestSubscriber s1 = new TestSubscriber(); 716 s1.request = false; 717 TestSubscriber s2 = new TestSubscriber(); 718 s2.request = false; 719 p.subscribe(s1); 720 p.subscribe(s2); 721 s2.awaitSubscribe(); 722 s1.awaitSubscribe(); 723 assertTrue(p.offer(1, null) >= 1); 724 assertTrue(p.offer(2, null) >= 2); 725 s1.sn.request(4); 726 assertTrue(p.offer(3, null) >= 3); 727 s2.sn.request(4); 728 p.offer(4, null); 729 p.close(); 730 s2.awaitComplete(); 731 assertEquals(4, s2.nexts); 732 s1.awaitComplete(); 733 assertEquals(4, s2.nexts); 734 } 735 736 /** 737 * offer reports drops if saturated 738 */ 739 public void testDroppedOffer() { 740 SubmissionPublisher<Integer> p 741 = new SubmissionPublisher<>(basicExecutor, 4); 742 TestSubscriber s1 = new TestSubscriber(); 743 s1.request = false; 744 TestSubscriber s2 = new TestSubscriber(); 745 s2.request = false; 746 p.subscribe(s1); 747 p.subscribe(s2); 748 s2.awaitSubscribe(); 749 s1.awaitSubscribe(); 750 for (int i = 1; i <= 4; ++i) 751 assertTrue(p.offer(i, null) >= 0); 752 p.offer(5, null); 753 assertTrue(p.offer(6, null) < 0); 754 s1.sn.request(64); 755 assertTrue(p.offer(7, null) < 0); 756 s2.sn.request(64); 757 p.close(); 758 s2.awaitComplete(); 759 assertTrue(s2.nexts >= 4); 760 s1.awaitComplete(); 761 assertTrue(s1.nexts >= 4); 762 } 763 764 /** 765 * offer invokes drop handler if saturated 766 */ 767 public void testHandledDroppedOffer() { 768 AtomicInteger calls = new AtomicInteger(); 769 SubmissionPublisher<Integer> p 770 = new SubmissionPublisher<>(basicExecutor, 4); 771 TestSubscriber s1 = new TestSubscriber(); 772 s1.request = false; 773 TestSubscriber s2 = new TestSubscriber(); 774 s2.request = false; 775 p.subscribe(s1); 776 p.subscribe(s2); 777 s2.awaitSubscribe(); 778 s1.awaitSubscribe(); 779 for (int i = 1; i <= 4; ++i) 780 assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0); 781 p.offer(4, (s, x) -> noopHandle(calls)); 782 assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0); 783 s1.sn.request(64); 784 assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0); 785 s2.sn.request(64); 786 p.close(); 787 s2.awaitComplete(); 788 s1.awaitComplete(); 789 assertTrue(calls.get() >= 4); 790 } 791 792 /** 793 * offer succeeds if drop handler forces request 794 */ 795 public void testRecoveredHandledDroppedOffer() { 796 AtomicInteger calls = new AtomicInteger(); 797 SubmissionPublisher<Integer> p 798 = new SubmissionPublisher<>(basicExecutor, 4); 799 TestSubscriber s1 = new TestSubscriber(); 800 s1.request = false; 801 TestSubscriber s2 = new TestSubscriber(); 802 s2.request = false; 803 p.subscribe(s1); 804 p.subscribe(s2); 805 s2.awaitSubscribe(); 806 s1.awaitSubscribe(); 807 int n = 0; 808 for (int i = 1; i <= 8; ++i) { 809 int d = p.offer(i, (s, x) -> reqHandle(calls, s)); 810 n = n + 2 + (d < 0 ? d : 0); 811 } 812 p.close(); 813 s2.awaitComplete(); 814 s1.awaitComplete(); 815 assertEquals(n, s1.nexts + s2.nexts); 816 assertTrue(calls.get() >= 2); 817 } 818 819 /** 820 * Timed offer to a publisher with no subscribers returns lag 0 821 */ 822 public void testEmptyTimedOffer() { 823 SubmissionPublisher<Integer> p = basicPublisher(); 824 long startTime = System.nanoTime(); 825 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null)); 826 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); 827 } 828 829 /** 830 * Timed offer with null item or TimeUnit throws NPE 831 */ 832 public void testNullTimedOffer() { 833 SubmissionPublisher<Integer> p = basicPublisher(); 834 long startTime = System.nanoTime(); 835 try { 836 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null); 837 shouldThrow(); 838 } catch (NullPointerException success) {} 839 try { 840 p.offer(1, LONG_DELAY_MS, null, null); 841 shouldThrow(); 842 } catch (NullPointerException success) {} 843 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); 844 } 845 846 /** 847 * Timed offer returns number of lagged items if not saturated 848 */ 849 public void testLaggedTimedOffer() { 850 SubmissionPublisher<Integer> p = basicPublisher(); 851 TestSubscriber s1 = new TestSubscriber(); 852 s1.request = false; 853 TestSubscriber s2 = new TestSubscriber(); 854 s2.request = false; 855 p.subscribe(s1); 856 p.subscribe(s2); 857 s2.awaitSubscribe(); 858 s1.awaitSubscribe(); 859 long startTime = System.nanoTime(); 860 assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1); 861 assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2); 862 s1.sn.request(4); 863 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3); 864 s2.sn.request(4); 865 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null); 866 p.close(); 867 s2.awaitComplete(); 868 assertEquals(4, s2.nexts); 869 s1.awaitComplete(); 870 assertEquals(4, s2.nexts); 871 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); 872 } 873 874 /** 875 * Timed offer reports drops if saturated 876 */ 877 public void testDroppedTimedOffer() { 878 SubmissionPublisher<Integer> p 879 = new SubmissionPublisher<>(basicExecutor, 4); 880 TestSubscriber s1 = new TestSubscriber(); 881 s1.request = false; 882 TestSubscriber s2 = new TestSubscriber(); 883 s2.request = false; 884 p.subscribe(s1); 885 p.subscribe(s2); 886 s2.awaitSubscribe(); 887 s1.awaitSubscribe(); 888 long delay = timeoutMillis(); 889 for (int i = 1; i <= 4; ++i) 890 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0); 891 long startTime = System.nanoTime(); 892 assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0); 893 s1.sn.request(64); 894 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0); 895 // 2 * delay should elapse but check only 1 * delay to allow timer slop 896 assertTrue(millisElapsedSince(startTime) >= delay); 897 s2.sn.request(64); 898 p.close(); 899 s2.awaitComplete(); 900 assertTrue(s2.nexts >= 2); 901 s1.awaitComplete(); 902 assertTrue(s1.nexts >= 2); 903 } 904 905 /** 906 * Timed offer invokes drop handler if saturated 907 */ 908 public void testHandledDroppedTimedOffer() { 909 AtomicInteger calls = new AtomicInteger(); 910 SubmissionPublisher<Integer> p 911 = new SubmissionPublisher<>(basicExecutor, 4); 912 TestSubscriber s1 = new TestSubscriber(); 913 s1.request = false; 914 TestSubscriber s2 = new TestSubscriber(); 915 s2.request = false; 916 p.subscribe(s1); 917 p.subscribe(s2); 918 s2.awaitSubscribe(); 919 s1.awaitSubscribe(); 920 long delay = timeoutMillis(); 921 for (int i = 1; i <= 4; ++i) 922 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0); 923 long startTime = System.nanoTime(); 924 assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); 925 s1.sn.request(64); 926 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); 927 assertTrue(millisElapsedSince(startTime) >= delay); 928 s2.sn.request(64); 929 p.close(); 930 s2.awaitComplete(); 931 s1.awaitComplete(); 932 assertTrue(calls.get() >= 2); 933 } 934 935 /** 936 * Timed offer succeeds if drop handler forces request 937 */ 938 public void testRecoveredHandledDroppedTimedOffer() { 939 AtomicInteger calls = new AtomicInteger(); 940 SubmissionPublisher<Integer> p 941 = new SubmissionPublisher<>(basicExecutor, 4); 942 TestSubscriber s1 = new TestSubscriber(); 943 s1.request = false; 944 TestSubscriber s2 = new TestSubscriber(); 945 s2.request = false; 946 p.subscribe(s1); 947 p.subscribe(s2); 948 s2.awaitSubscribe(); 949 s1.awaitSubscribe(); 950 int n = 0; 951 long delay = timeoutMillis(); 952 long startTime = System.nanoTime(); 953 for (int i = 1; i <= 6; ++i) { 954 int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s)); 955 n = n + 2 + (d < 0 ? d : 0); 956 } 957 assertTrue(millisElapsedSince(startTime) >= delay); 958 p.close(); 959 s2.awaitComplete(); 960 s1.awaitComplete(); 961 assertEquals(n, s1.nexts + s2.nexts); 962 assertTrue(calls.get() >= 2); 963 } 964 965 /** 966 * consume returns a CompletableFuture that is done when 967 * publisher completes 968 */ 969 public void testConsume() { 970 AtomicInteger sum = new AtomicInteger(); 971 SubmissionPublisher<Integer> p = basicPublisher(); 972 CompletableFuture<Void> f = 973 p.consume((Integer x) -> sum.getAndAdd(x.intValue())); 974 int n = 20; 975 for (int i = 1; i <= n; ++i) 976 p.submit(i); 977 p.close(); 978 f.join(); 979 assertEquals((n * (n + 1)) / 2, sum.get()); 980 } 981 982 /** 983 * consume(null) throws NPE 984 */ 985 public void testConsumeNPE() { 986 SubmissionPublisher<Integer> p = basicPublisher(); 987 try { 988 CompletableFuture<Void> f = p.consume(null); 989 shouldThrow(); 990 } catch (NullPointerException success) {} 991 } 992 993 /** 994 * consume eventually stops processing published items if cancelled 995 */ 996 public void testCancelledConsume() { 997 AtomicInteger count = new AtomicInteger(); 998 SubmissionPublisher<Integer> p = basicPublisher(); 999 CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement()); 1000 f.cancel(true); 1001 int n = 1000000; // arbitrary limit 1002 for (int i = 1; i <= n; ++i) 1003 p.submit(i); 1004 assertTrue(count.get() < n); 1005 } 1006 1007 /** 1008 * Tests scenario for 1009 * JDK-8187947: A race condition in SubmissionPublisher 1010 * cvs update -D '2017-11-25' src/main/java/util/concurrent/SubmissionPublisher.java && ant -Djsr166.expensiveTests=true -Djsr166.tckTestClass=SubmissionPublisherTest -Djsr166.methodFilter=testMissedSignal tck; cvs update -A src/main/java/util/concurrent/SubmissionPublisher.java 1011 */ 1012 public void testMissedSignal_8187947() throws Exception { 1013 if (!atLeastJava9()) return; // backport to jdk8 too hard 1014 final int N = 1015 ((ForkJoinPool.getCommonPoolParallelism() < 2) // JDK-8212899 1016 ? (1 << 5) 1017 : (1 << 10)) 1018 * (expensiveTests ? (1 << 10) : 1); 1019 final CountDownLatch finished = new CountDownLatch(1); 1020 final SubmissionPublisher<Boolean> pub = new SubmissionPublisher<>(); 1021 class Sub implements Subscriber<Boolean> { 1022 int received; 1023 public void onSubscribe(Subscription s) { 1024 s.request(N); 1025 } 1026 public void onNext(Boolean item) { 1027 if (++received == N) 1028 finished.countDown(); 1029 else 1030 CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)); 1031 } 1032 public void onError(Throwable t) { throw new AssertionError(t); } 1033 public void onComplete() {} 1034 } 1035 pub.subscribe(new Sub()); 1036 checkTimedGet( 1037 CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)), 1038 null); 1039 await(finished); 1040 } 1041 }