1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.
   7  *
   8  * This code is distributed in the hope that it will be useful, but WITHOUT
   9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  10  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  11  * version 2 for more details (a copy is included in the LICENSE file that
  12  * accompanied this code).
  13  *
  14  * You should have received a copy of the GNU General Public License version
  15  * 2 along with this work; if not, write to the Free Software Foundation,
  16  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  17  *
  18  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  19  * or visit www.oracle.com if you need additional information or have any
  20  * questions.
  21  */
  22 
  23 /*
  24  * This file is available under and governed by the GNU General Public
  25  * License version 2 only, as published by the Free Software Foundation.
  26  * However, the following notice accompanied the original version of this
  27  * file:
  28  *
  29  * Written by Doug Lea and Martin Buchholz with assistance from
  30  * members of JCP JSR-166 Expert Group and released to the public
  31  * domain, as explained at
  32  * http://creativecommons.org/publicdomain/zero/1.0/
  33  */
  34 
  35 import java.util.concurrent.CompletableFuture;
  36 import java.util.concurrent.CountDownLatch;
  37 import java.util.concurrent.Executor;
  38 import java.util.concurrent.Executors;
  39 import java.util.concurrent.Flow;
  40 import java.util.concurrent.ForkJoinPool;
  41 import java.util.concurrent.SubmissionPublisher;
  42 import java.util.concurrent.atomic.AtomicInteger;
  43 import junit.framework.Test;
  44 import junit.framework.TestSuite;
  45 
  46 import static java.util.concurrent.Flow.Subscriber;
  47 import static java.util.concurrent.Flow.Subscription;
  48 import static java.util.concurrent.TimeUnit.MILLISECONDS;
  49 
  50 public class SubmissionPublisherTest extends JSR166TestCase {
  51 
  52     public static void main(String[] args) {
  53         main(suite(), args);
  54     }
  55     public static Test suite() {
  56         return new TestSuite(SubmissionPublisherTest.class);
  57     }
  58 
  59     final Executor basicExecutor = basicPublisher().getExecutor();
  60 
  61     static SubmissionPublisher<Integer> basicPublisher() {
  62         return new SubmissionPublisher<Integer>();
  63     }
  64 
  65     static class SPException extends RuntimeException {}
  66 
  67     class TestSubscriber implements Subscriber<Integer> {
  68         volatile Subscription sn;
  69         int last;  // Requires that onNexts are in numeric order
  70         volatile int nexts;
  71         volatile int errors;
  72         volatile int completes;
  73         volatile boolean throwOnCall = false;
  74         volatile boolean request = true;
  75         volatile Throwable lastError;
  76 
  77         public synchronized void onSubscribe(Subscription s) {
  78             threadAssertTrue(sn == null);
  79             sn = s;
  80             notifyAll();
  81             if (throwOnCall)
  82                 throw new SPException();
  83             if (request)
  84                 sn.request(1L);
  85         }
  86         public synchronized void onNext(Integer t) {
  87             ++nexts;
  88             notifyAll();
  89             int current = t.intValue();
  90             threadAssertTrue(current >= last);
  91             last = current;
  92             if (request)
  93                 sn.request(1L);
  94             if (throwOnCall)
  95                 throw new SPException();
  96         }
  97         public synchronized void onError(Throwable t) {
  98             threadAssertTrue(completes == 0);
  99             threadAssertTrue(errors == 0);
 100             lastError = t;
 101             ++errors;
 102             notifyAll();
 103         }
 104         public synchronized void onComplete() {
 105             threadAssertTrue(completes == 0);
 106             ++completes;
 107             notifyAll();
 108         }
 109 
 110         synchronized void awaitSubscribe() {
 111             while (sn == null) {
 112                 try {
 113                     wait();
 114                 } catch (Exception ex) {
 115                     threadUnexpectedException(ex);
 116                     break;
 117                 }
 118             }
 119         }
 120         synchronized void awaitNext(int n) {
 121             while (nexts < n) {
 122                 try {
 123                     wait();
 124                 } catch (Exception ex) {
 125                     threadUnexpectedException(ex);
 126                     break;
 127                 }
 128             }
 129         }
 130         synchronized void awaitComplete() {
 131             while (completes == 0 && errors == 0) {
 132                 try {
 133                     wait();
 134                 } catch (Exception ex) {
 135                     threadUnexpectedException(ex);
 136                     break;
 137                 }
 138             }
 139         }
 140         synchronized void awaitError() {
 141             while (errors == 0) {
 142                 try {
 143                     wait();
 144                 } catch (Exception ex) {
 145                     threadUnexpectedException(ex);
 146                     break;
 147                 }
 148             }
 149         }
 150 
 151     }
 152 
 153     /**
 154      * A new SubmissionPublisher has no subscribers, a non-null
 155      * executor, a power-of-two capacity, is not closed, and reports
 156      * zero demand and lag
 157      */
 158     void checkInitialState(SubmissionPublisher<?> p) {
 159         assertFalse(p.hasSubscribers());
 160         assertEquals(0, p.getNumberOfSubscribers());
 161         assertTrue(p.getSubscribers().isEmpty());
 162         assertFalse(p.isClosed());
 163         assertNull(p.getClosedException());
 164         int n = p.getMaxBufferCapacity();
 165         assertTrue((n & (n - 1)) == 0); // power of two
 166         assertNotNull(p.getExecutor());
 167         assertEquals(0, p.estimateMinimumDemand());
 168         assertEquals(0, p.estimateMaximumLag());
 169     }
 170 
 171     /**
 172      * A default-constructed SubmissionPublisher has no subscribers,
 173      * is not closed, has default buffer size, and uses the
 174      * defaultExecutor
 175      */
 176     public void testConstructor1() {
 177         SubmissionPublisher<Integer> p = new SubmissionPublisher<>();
 178         checkInitialState(p);
 179         assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
 180         Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
 181         if (ForkJoinPool.getCommonPoolParallelism() > 1)
 182             assertSame(e, c);
 183         else
 184             assertNotSame(e, c);
 185     }
 186 
 187     /**
 188      * A new SubmissionPublisher has no subscribers, is not closed,
 189      * has the given buffer size, and uses the given executor
 190      */
 191     public void testConstructor2() {
 192         Executor e = Executors.newFixedThreadPool(1);
 193         SubmissionPublisher<Integer> p = new SubmissionPublisher<>(e, 8);
 194         checkInitialState(p);
 195         assertSame(p.getExecutor(), e);
 196         assertEquals(8, p.getMaxBufferCapacity());
 197     }
 198 
 199     /**
 200      * A null Executor argument to SubmissionPublisher constructor
 201      * throws NullPointerException
 202      */
 203     public void testConstructor3() {
 204         try {
 205             new SubmissionPublisher<Integer>(null, 8);
 206             shouldThrow();
 207         } catch (NullPointerException success) {}
 208     }
 209 
 210     /**
 211      * A negative capacity argument to SubmissionPublisher constructor
 212      * throws IllegalArgumentException
 213      */
 214     public void testConstructor4() {
 215         Executor e = Executors.newFixedThreadPool(1);
 216         try {
 217             new SubmissionPublisher<Integer>(e, -1);
 218             shouldThrow();
 219         } catch (IllegalArgumentException success) {}
 220     }
 221 
 222     /**
 223      * A closed publisher reports isClosed with no closedException and
 224      * throws IllegalStateException upon attempted submission; a
 225      * subsequent close or closeExceptionally has no additional
 226      * effect.
 227      */
 228     public void testClose() {
 229         SubmissionPublisher<Integer> p = basicPublisher();
 230         checkInitialState(p);
 231         p.close();
 232         assertTrue(p.isClosed());
 233         assertNull(p.getClosedException());
 234         try {
 235             p.submit(1);
 236             shouldThrow();
 237         } catch (IllegalStateException success) {}
 238         Throwable ex = new SPException();
 239         p.closeExceptionally(ex);
 240         assertTrue(p.isClosed());
 241         assertNull(p.getClosedException());
 242     }
 243 
 244     /**
 245      * A publisher closedExceptionally reports isClosed with the
 246      * closedException and throws IllegalStateException upon attempted
 247      * submission; a subsequent close or closeExceptionally has no
 248      * additional effect.
 249      */
 250     public void testCloseExceptionally() {
 251         SubmissionPublisher<Integer> p = basicPublisher();
 252         checkInitialState(p);
 253         Throwable ex = new SPException();
 254         p.closeExceptionally(ex);
 255         assertTrue(p.isClosed());
 256         assertSame(p.getClosedException(), ex);
 257         try {
 258             p.submit(1);
 259             shouldThrow();
 260         } catch (IllegalStateException success) {}
 261         p.close();
 262         assertTrue(p.isClosed());
 263         assertSame(p.getClosedException(), ex);
 264     }
 265 
 266     /**
 267      * Upon subscription, the subscriber's onSubscribe is called, no
 268      * other Subscriber methods are invoked, the publisher
 269      * hasSubscribers, isSubscribed is true, and existing
 270      * subscriptions are unaffected.
 271      */
 272     public void testSubscribe1() {
 273         TestSubscriber s = new TestSubscriber();
 274         SubmissionPublisher<Integer> p = basicPublisher();
 275         p.subscribe(s);
 276         assertTrue(p.hasSubscribers());
 277         assertEquals(1, p.getNumberOfSubscribers());
 278         assertTrue(p.getSubscribers().contains(s));
 279         assertTrue(p.isSubscribed(s));
 280         s.awaitSubscribe();
 281         assertNotNull(s.sn);
 282         assertEquals(0, s.nexts);
 283         assertEquals(0, s.errors);
 284         assertEquals(0, s.completes);
 285         TestSubscriber s2 = new TestSubscriber();
 286         p.subscribe(s2);
 287         assertTrue(p.hasSubscribers());
 288         assertEquals(2, p.getNumberOfSubscribers());
 289         assertTrue(p.getSubscribers().contains(s));
 290         assertTrue(p.getSubscribers().contains(s2));
 291         assertTrue(p.isSubscribed(s));
 292         assertTrue(p.isSubscribed(s2));
 293         s2.awaitSubscribe();
 294         assertNotNull(s2.sn);
 295         assertEquals(0, s2.nexts);
 296         assertEquals(0, s2.errors);
 297         assertEquals(0, s2.completes);
 298         p.close();
 299     }
 300 
 301     /**
 302      * If closed, upon subscription, the subscriber's onComplete
 303      * method is invoked
 304      */
 305     public void testSubscribe2() {
 306         TestSubscriber s = new TestSubscriber();
 307         SubmissionPublisher<Integer> p = basicPublisher();
 308         p.close();
 309         p.subscribe(s);
 310         s.awaitComplete();
 311         assertEquals(0, s.nexts);
 312         assertEquals(0, s.errors);
 313         assertEquals(1, s.completes, 1);
 314     }
 315 
 316     /**
 317      * If closedExceptionally, upon subscription, the subscriber's
 318      * onError method is invoked
 319      */
 320     public void testSubscribe3() {
 321         TestSubscriber s = new TestSubscriber();
 322         SubmissionPublisher<Integer> p = basicPublisher();
 323         Throwable ex = new SPException();
 324         p.closeExceptionally(ex);
 325         assertTrue(p.isClosed());
 326         assertSame(p.getClosedException(), ex);
 327         p.subscribe(s);
 328         s.awaitError();
 329         assertEquals(0, s.nexts);
 330         assertEquals(1, s.errors);
 331     }
 332 
 333     /**
 334      * Upon attempted resubscription, the subscriber's onError is
 335      * called and the subscription is cancelled.
 336      */
 337     public void testSubscribe4() {
 338         TestSubscriber s = new TestSubscriber();
 339         SubmissionPublisher<Integer> p = basicPublisher();
 340         p.subscribe(s);
 341         assertTrue(p.hasSubscribers());
 342         assertEquals(1, p.getNumberOfSubscribers());
 343         assertTrue(p.getSubscribers().contains(s));
 344         assertTrue(p.isSubscribed(s));
 345         s.awaitSubscribe();
 346         assertNotNull(s.sn);
 347         assertEquals(0, s.nexts);
 348         assertEquals(0, s.errors);
 349         assertEquals(0, s.completes);
 350         p.subscribe(s);
 351         s.awaitError();
 352         assertEquals(0, s.nexts);
 353         assertEquals(1, s.errors);
 354         assertFalse(p.isSubscribed(s));
 355     }
 356 
 357     /**
 358      * An exception thrown in onSubscribe causes onError
 359      */
 360     public void testSubscribe5() {
 361         TestSubscriber s = new TestSubscriber();
 362         SubmissionPublisher<Integer> p = basicPublisher();
 363         s.throwOnCall = true;
 364         p.subscribe(s);
 365         s.awaitError();
 366         assertEquals(0, s.nexts);
 367         assertEquals(1, s.errors);
 368         assertEquals(0, s.completes);
 369     }
 370 
 371     /**
 372      * subscribe(null) throws NPE
 373      */
 374     public void testSubscribe6() {
 375         SubmissionPublisher<Integer> p = basicPublisher();
 376         try {
 377             p.subscribe(null);
 378             shouldThrow();
 379         } catch (NullPointerException success) {}
 380         checkInitialState(p);
 381     }
 382 
 383     /**
 384      * Closing a publisher causes onComplete to subscribers
 385      */
 386     public void testCloseCompletes() {
 387         SubmissionPublisher<Integer> p = basicPublisher();
 388         TestSubscriber s1 = new TestSubscriber();
 389         TestSubscriber s2 = new TestSubscriber();
 390         p.subscribe(s1);
 391         p.subscribe(s2);
 392         p.submit(1);
 393         p.close();
 394         assertTrue(p.isClosed());
 395         assertNull(p.getClosedException());
 396         s1.awaitComplete();
 397         assertEquals(1, s1.nexts);
 398         assertEquals(1, s1.completes);
 399         s2.awaitComplete();
 400         assertEquals(1, s2.nexts);
 401         assertEquals(1, s2.completes);
 402     }
 403 
 404     /**
 405      * Closing a publisher exceptionally causes onError to subscribers
 406      * after they are subscribed
 407      */
 408     public void testCloseExceptionallyError() {
 409         SubmissionPublisher<Integer> p = basicPublisher();
 410         TestSubscriber s1 = new TestSubscriber();
 411         TestSubscriber s2 = new TestSubscriber();
 412         p.subscribe(s1);
 413         p.subscribe(s2);
 414         p.submit(1);
 415         p.closeExceptionally(new SPException());
 416         assertTrue(p.isClosed());
 417         s1.awaitSubscribe();
 418         s1.awaitError();
 419         assertTrue(s1.nexts <= 1);
 420         assertEquals(1, s1.errors);
 421         s2.awaitSubscribe();
 422         s2.awaitError();
 423         assertTrue(s2.nexts <= 1);
 424         assertEquals(1, s2.errors);
 425     }
 426 
 427     /**
 428      * Cancelling a subscription eventually causes no more onNexts to be issued
 429      */
 430     public void testCancel() {
 431         SubmissionPublisher<Integer> p =
 432             new SubmissionPublisher<>(basicExecutor, 4); // must be < 20
 433         TestSubscriber s1 = new TestSubscriber();
 434         TestSubscriber s2 = new TestSubscriber();
 435         p.subscribe(s1);
 436         p.subscribe(s2);
 437         s1.awaitSubscribe();
 438         p.submit(1);
 439         s1.sn.cancel();
 440         for (int i = 2; i <= 20; ++i)
 441             p.submit(i);
 442         p.close();
 443         s2.awaitComplete();
 444         assertEquals(20, s2.nexts);
 445         assertEquals(1, s2.completes);
 446         assertTrue(s1.nexts < 20);
 447         assertFalse(p.isSubscribed(s1));
 448     }
 449 
 450     /**
 451      * Throwing an exception in onNext causes onError
 452      */
 453     public void testThrowOnNext() {
 454         SubmissionPublisher<Integer> p = basicPublisher();
 455         TestSubscriber s1 = new TestSubscriber();
 456         TestSubscriber s2 = new TestSubscriber();
 457         p.subscribe(s1);
 458         p.subscribe(s2);
 459         s1.awaitSubscribe();
 460         p.submit(1);
 461         s1.throwOnCall = true;
 462         p.submit(2);
 463         p.close();
 464         s2.awaitComplete();
 465         assertEquals(2, s2.nexts);
 466         s1.awaitComplete();
 467         assertEquals(1, s1.errors);
 468     }
 469 
 470     /**
 471      * If a handler is supplied in constructor, it is invoked when
 472      * subscriber throws an exception in onNext
 473      */
 474     public void testThrowOnNextHandler() {
 475         AtomicInteger calls = new AtomicInteger();
 476         SubmissionPublisher<Integer> p = new SubmissionPublisher<>(
 477             basicExecutor, 8, (s, e) -> calls.getAndIncrement());
 478         TestSubscriber s1 = new TestSubscriber();
 479         TestSubscriber s2 = new TestSubscriber();
 480         p.subscribe(s1);
 481         p.subscribe(s2);
 482         s1.awaitSubscribe();
 483         p.submit(1);
 484         s1.throwOnCall = true;
 485         p.submit(2);
 486         p.close();
 487         s2.awaitComplete();
 488         assertEquals(2, s2.nexts);
 489         assertEquals(1, s2.completes);
 490         s1.awaitError();
 491         assertEquals(1, s1.errors);
 492         assertEquals(1, calls.get());
 493     }
 494 
 495     /**
 496      * onNext items are issued in the same order to each subscriber
 497      */
 498     public void testOrder() {
 499         SubmissionPublisher<Integer> p = basicPublisher();
 500         TestSubscriber s1 = new TestSubscriber();
 501         TestSubscriber s2 = new TestSubscriber();
 502         p.subscribe(s1);
 503         p.subscribe(s2);
 504         for (int i = 1; i <= 20; ++i)
 505             p.submit(i);
 506         p.close();
 507         s2.awaitComplete();
 508         s1.awaitComplete();
 509         assertEquals(20, s2.nexts);
 510         assertEquals(1, s2.completes);
 511         assertEquals(20, s1.nexts);
 512         assertEquals(1, s1.completes);
 513     }
 514 
 515     /**
 516      * onNext is issued only if requested
 517      */
 518     public void testRequest1() {
 519         SubmissionPublisher<Integer> p = basicPublisher();
 520         TestSubscriber s1 = new TestSubscriber();
 521         s1.request = false;
 522         p.subscribe(s1);
 523         s1.awaitSubscribe();
 524         assertEquals(0, p.estimateMinimumDemand());
 525         TestSubscriber s2 = new TestSubscriber();
 526         p.subscribe(s2);
 527         p.submit(1);
 528         p.submit(2);
 529         s2.awaitNext(1);
 530         assertEquals(0, s1.nexts);
 531         s1.sn.request(3);
 532         p.submit(3);
 533         p.close();
 534         s2.awaitComplete();
 535         assertEquals(3, s2.nexts);
 536         assertEquals(1, s2.completes);
 537         s1.awaitComplete();
 538         assertTrue(s1.nexts > 0);
 539         assertEquals(1, s1.completes);
 540     }
 541 
 542     /**
 543      * onNext is not issued when requests become zero
 544      */
 545     public void testRequest2() {
 546         SubmissionPublisher<Integer> p = basicPublisher();
 547         TestSubscriber s1 = new TestSubscriber();
 548         TestSubscriber s2 = new TestSubscriber();
 549         p.subscribe(s1);
 550         p.subscribe(s2);
 551         s2.awaitSubscribe();
 552         s1.awaitSubscribe();
 553         s1.request = false;
 554         p.submit(1);
 555         p.submit(2);
 556         p.close();
 557         s2.awaitComplete();
 558         assertEquals(2, s2.nexts);
 559         assertEquals(1, s2.completes);
 560         s1.awaitNext(1);
 561         assertEquals(1, s1.nexts);
 562     }
 563 
 564     /**
 565      * Non-positive request causes error
 566      */
 567     public void testRequest3() {
 568         SubmissionPublisher<Integer> p = basicPublisher();
 569         TestSubscriber s1 = new TestSubscriber();
 570         TestSubscriber s2 = new TestSubscriber();
 571         TestSubscriber s3 = new TestSubscriber();
 572         p.subscribe(s1);
 573         p.subscribe(s2);
 574         p.subscribe(s3);
 575         s3.awaitSubscribe();
 576         s2.awaitSubscribe();
 577         s1.awaitSubscribe();
 578         s1.sn.request(-1L);
 579         s3.sn.request(0L);
 580         p.submit(1);
 581         p.submit(2);
 582         p.close();
 583         s2.awaitComplete();
 584         assertEquals(2, s2.nexts);
 585         assertEquals(1, s2.completes);
 586         s1.awaitError();
 587         assertEquals(1, s1.errors);
 588         assertTrue(s1.lastError instanceof IllegalArgumentException);
 589         s3.awaitError();
 590         assertEquals(1, s3.errors);
 591         assertTrue(s3.lastError instanceof IllegalArgumentException);
 592     }
 593 
 594     /**
 595      * estimateMinimumDemand reports 0 until request, nonzero after
 596      * request
 597      */
 598     public void testEstimateMinimumDemand() {
 599         TestSubscriber s = new TestSubscriber();
 600         SubmissionPublisher<Integer> p = basicPublisher();
 601         s.request = false;
 602         p.subscribe(s);
 603         s.awaitSubscribe();
 604         assertEquals(0, p.estimateMinimumDemand());
 605         s.sn.request(1);
 606         assertEquals(1, p.estimateMinimumDemand());
 607     }
 608 
 609     /**
 610      * submit to a publisher with no subscribers returns lag 0
 611      */
 612     public void testEmptySubmit() {
 613         SubmissionPublisher<Integer> p = basicPublisher();
 614         assertEquals(0, p.submit(1));
 615     }
 616 
 617     /**
 618      * submit(null) throws NPE
 619      */
 620     public void testNullSubmit() {
 621         SubmissionPublisher<Integer> p = basicPublisher();
 622         try {
 623             p.submit(null);
 624             shouldThrow();
 625         } catch (NullPointerException success) {}
 626     }
 627 
 628     /**
 629      * submit returns number of lagged items, compatible with result
 630      * of estimateMaximumLag.
 631      */
 632     public void testLaggedSubmit() {
 633         SubmissionPublisher<Integer> p = basicPublisher();
 634         TestSubscriber s1 = new TestSubscriber();
 635         s1.request = false;
 636         TestSubscriber s2 = new TestSubscriber();
 637         s2.request = false;
 638         p.subscribe(s1);
 639         p.subscribe(s2);
 640         s2.awaitSubscribe();
 641         s1.awaitSubscribe();
 642         assertEquals(1, p.submit(1));
 643         assertTrue(p.estimateMaximumLag() >= 1);
 644         assertTrue(p.submit(2) >= 2);
 645         assertTrue(p.estimateMaximumLag() >= 2);
 646         s1.sn.request(4);
 647         assertTrue(p.submit(3) >= 3);
 648         assertTrue(p.estimateMaximumLag() >= 3);
 649         s2.sn.request(4);
 650         p.submit(4);
 651         p.close();
 652         s2.awaitComplete();
 653         assertEquals(4, s2.nexts);
 654         s1.awaitComplete();
 655         assertEquals(4, s2.nexts);
 656     }
 657 
 658     /**
 659      * submit eventually issues requested items when buffer capacity is 1
 660      */
 661     public void testCap1Submit() {
 662         SubmissionPublisher<Integer> p
 663             = new SubmissionPublisher<>(basicExecutor, 1);
 664         TestSubscriber s1 = new TestSubscriber();
 665         TestSubscriber s2 = new TestSubscriber();
 666         p.subscribe(s1);
 667         p.subscribe(s2);
 668         for (int i = 1; i <= 20; ++i) {
 669             assertTrue(p.submit(i) >= 0);
 670         }
 671         p.close();
 672         s2.awaitComplete();
 673         s1.awaitComplete();
 674         assertEquals(20, s2.nexts);
 675         assertEquals(1, s2.completes);
 676         assertEquals(20, s1.nexts);
 677         assertEquals(1, s1.completes);
 678     }
 679 
 680     static boolean noopHandle(AtomicInteger count) {
 681         count.getAndIncrement();
 682         return false;
 683     }
 684 
 685     static boolean reqHandle(AtomicInteger count, Subscriber s) {
 686         count.getAndIncrement();
 687         ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
 688         return true;
 689     }
 690 
 691     /**
 692      * offer to a publisher with no subscribers returns lag 0
 693      */
 694     public void testEmptyOffer() {
 695         SubmissionPublisher<Integer> p = basicPublisher();
 696         assertEquals(0, p.offer(1, null));
 697     }
 698 
 699     /**
 700      * offer(null) throws NPE
 701      */
 702     public void testNullOffer() {
 703         SubmissionPublisher<Integer> p = basicPublisher();
 704         try {
 705             p.offer(null, null);
 706             shouldThrow();
 707         } catch (NullPointerException success) {}
 708     }
 709 
 710     /**
 711      * offer returns number of lagged items if not saturated
 712      */
 713     public void testLaggedOffer() {
 714         SubmissionPublisher<Integer> p = basicPublisher();
 715         TestSubscriber s1 = new TestSubscriber();
 716         s1.request = false;
 717         TestSubscriber s2 = new TestSubscriber();
 718         s2.request = false;
 719         p.subscribe(s1);
 720         p.subscribe(s2);
 721         s2.awaitSubscribe();
 722         s1.awaitSubscribe();
 723         assertTrue(p.offer(1, null) >= 1);
 724         assertTrue(p.offer(2, null) >= 2);
 725         s1.sn.request(4);
 726         assertTrue(p.offer(3, null) >= 3);
 727         s2.sn.request(4);
 728         p.offer(4, null);
 729         p.close();
 730         s2.awaitComplete();
 731         assertEquals(4, s2.nexts);
 732         s1.awaitComplete();
 733         assertEquals(4, s2.nexts);
 734     }
 735 
 736     /**
 737      * offer reports drops if saturated
 738      */
 739     public void testDroppedOffer() {
 740         SubmissionPublisher<Integer> p
 741             = new SubmissionPublisher<>(basicExecutor, 4);
 742         TestSubscriber s1 = new TestSubscriber();
 743         s1.request = false;
 744         TestSubscriber s2 = new TestSubscriber();
 745         s2.request = false;
 746         p.subscribe(s1);
 747         p.subscribe(s2);
 748         s2.awaitSubscribe();
 749         s1.awaitSubscribe();
 750         for (int i = 1; i <= 4; ++i)
 751             assertTrue(p.offer(i, null) >= 0);
 752         p.offer(5, null);
 753         assertTrue(p.offer(6, null) < 0);
 754         s1.sn.request(64);
 755         assertTrue(p.offer(7, null) < 0);
 756         s2.sn.request(64);
 757         p.close();
 758         s2.awaitComplete();
 759         assertTrue(s2.nexts >= 4);
 760         s1.awaitComplete();
 761         assertTrue(s1.nexts >= 4);
 762     }
 763 
 764     /**
 765      * offer invokes drop handler if saturated
 766      */
 767     public void testHandledDroppedOffer() {
 768         AtomicInteger calls = new AtomicInteger();
 769         SubmissionPublisher<Integer> p
 770             = new SubmissionPublisher<>(basicExecutor, 4);
 771         TestSubscriber s1 = new TestSubscriber();
 772         s1.request = false;
 773         TestSubscriber s2 = new TestSubscriber();
 774         s2.request = false;
 775         p.subscribe(s1);
 776         p.subscribe(s2);
 777         s2.awaitSubscribe();
 778         s1.awaitSubscribe();
 779         for (int i = 1; i <= 4; ++i)
 780             assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
 781         p.offer(4, (s, x) -> noopHandle(calls));
 782         assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
 783         s1.sn.request(64);
 784         assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
 785         s2.sn.request(64);
 786         p.close();
 787         s2.awaitComplete();
 788         s1.awaitComplete();
 789         assertTrue(calls.get() >= 4);
 790     }
 791 
 792     /**
 793      * offer succeeds if drop handler forces request
 794      */
 795     public void testRecoveredHandledDroppedOffer() {
 796         AtomicInteger calls = new AtomicInteger();
 797         SubmissionPublisher<Integer> p
 798             = new SubmissionPublisher<>(basicExecutor, 4);
 799         TestSubscriber s1 = new TestSubscriber();
 800         s1.request = false;
 801         TestSubscriber s2 = new TestSubscriber();
 802         s2.request = false;
 803         p.subscribe(s1);
 804         p.subscribe(s2);
 805         s2.awaitSubscribe();
 806         s1.awaitSubscribe();
 807         int n = 0;
 808         for (int i = 1; i <= 8; ++i) {
 809             int d = p.offer(i, (s, x) -> reqHandle(calls, s));
 810             n = n + 2 + (d < 0 ? d : 0);
 811         }
 812         p.close();
 813         s2.awaitComplete();
 814         s1.awaitComplete();
 815         assertEquals(n, s1.nexts + s2.nexts);
 816         assertTrue(calls.get() >= 2);
 817     }
 818 
 819     /**
 820      * Timed offer to a publisher with no subscribers returns lag 0
 821      */
 822     public void testEmptyTimedOffer() {
 823         SubmissionPublisher<Integer> p = basicPublisher();
 824         long startTime = System.nanoTime();
 825         assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
 826         assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
 827     }
 828 
 829     /**
 830      * Timed offer with null item or TimeUnit throws NPE
 831      */
 832     public void testNullTimedOffer() {
 833         SubmissionPublisher<Integer> p = basicPublisher();
 834         long startTime = System.nanoTime();
 835         try {
 836             p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
 837             shouldThrow();
 838         } catch (NullPointerException success) {}
 839         try {
 840             p.offer(1, LONG_DELAY_MS, null, null);
 841             shouldThrow();
 842         } catch (NullPointerException success) {}
 843         assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
 844     }
 845 
 846     /**
 847      * Timed offer returns number of lagged items if not saturated
 848      */
 849     public void testLaggedTimedOffer() {
 850         SubmissionPublisher<Integer> p = basicPublisher();
 851         TestSubscriber s1 = new TestSubscriber();
 852         s1.request = false;
 853         TestSubscriber s2 = new TestSubscriber();
 854         s2.request = false;
 855         p.subscribe(s1);
 856         p.subscribe(s2);
 857         s2.awaitSubscribe();
 858         s1.awaitSubscribe();
 859         long startTime = System.nanoTime();
 860         assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
 861         assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
 862         s1.sn.request(4);
 863         assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
 864         s2.sn.request(4);
 865         p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
 866         p.close();
 867         s2.awaitComplete();
 868         assertEquals(4, s2.nexts);
 869         s1.awaitComplete();
 870         assertEquals(4, s2.nexts);
 871         assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
 872     }
 873 
 874     /**
 875      * Timed offer reports drops if saturated
 876      */
 877     public void testDroppedTimedOffer() {
 878         SubmissionPublisher<Integer> p
 879             = new SubmissionPublisher<>(basicExecutor, 4);
 880         TestSubscriber s1 = new TestSubscriber();
 881         s1.request = false;
 882         TestSubscriber s2 = new TestSubscriber();
 883         s2.request = false;
 884         p.subscribe(s1);
 885         p.subscribe(s2);
 886         s2.awaitSubscribe();
 887         s1.awaitSubscribe();
 888         long delay = timeoutMillis();
 889         for (int i = 1; i <= 4; ++i)
 890             assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
 891         long startTime = System.nanoTime();
 892         assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
 893         s1.sn.request(64);
 894         assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
 895         // 2 * delay should elapse but check only 1 * delay to allow timer slop
 896         assertTrue(millisElapsedSince(startTime) >= delay);
 897         s2.sn.request(64);
 898         p.close();
 899         s2.awaitComplete();
 900         assertTrue(s2.nexts >= 2);
 901         s1.awaitComplete();
 902         assertTrue(s1.nexts >= 2);
 903     }
 904 
 905     /**
 906      * Timed offer invokes drop handler if saturated
 907      */
 908     public void testHandledDroppedTimedOffer() {
 909         AtomicInteger calls = new AtomicInteger();
 910         SubmissionPublisher<Integer> p
 911             = new SubmissionPublisher<>(basicExecutor, 4);
 912         TestSubscriber s1 = new TestSubscriber();
 913         s1.request = false;
 914         TestSubscriber s2 = new TestSubscriber();
 915         s2.request = false;
 916         p.subscribe(s1);
 917         p.subscribe(s2);
 918         s2.awaitSubscribe();
 919         s1.awaitSubscribe();
 920         long delay = timeoutMillis();
 921         for (int i = 1; i <= 4; ++i)
 922             assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
 923         long startTime = System.nanoTime();
 924         assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
 925         s1.sn.request(64);
 926         assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
 927         assertTrue(millisElapsedSince(startTime) >= delay);
 928         s2.sn.request(64);
 929         p.close();
 930         s2.awaitComplete();
 931         s1.awaitComplete();
 932         assertTrue(calls.get() >= 2);
 933     }
 934 
 935     /**
 936      * Timed offer succeeds if drop handler forces request
 937      */
 938     public void testRecoveredHandledDroppedTimedOffer() {
 939         AtomicInteger calls = new AtomicInteger();
 940         SubmissionPublisher<Integer> p
 941             = new SubmissionPublisher<>(basicExecutor, 4);
 942         TestSubscriber s1 = new TestSubscriber();
 943         s1.request = false;
 944         TestSubscriber s2 = new TestSubscriber();
 945         s2.request = false;
 946         p.subscribe(s1);
 947         p.subscribe(s2);
 948         s2.awaitSubscribe();
 949         s1.awaitSubscribe();
 950         int n = 0;
 951         long delay = timeoutMillis();
 952         long startTime = System.nanoTime();
 953         for (int i = 1; i <= 6; ++i) {
 954             int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
 955             n = n + 2 + (d < 0 ? d : 0);
 956         }
 957         assertTrue(millisElapsedSince(startTime) >= delay);
 958         p.close();
 959         s2.awaitComplete();
 960         s1.awaitComplete();
 961         assertEquals(n, s1.nexts + s2.nexts);
 962         assertTrue(calls.get() >= 2);
 963     }
 964 
 965     /**
 966      * consume returns a CompletableFuture that is done when
 967      * publisher completes
 968      */
 969     public void testConsume() {
 970         AtomicInteger sum = new AtomicInteger();
 971         SubmissionPublisher<Integer> p = basicPublisher();
 972         CompletableFuture<Void> f =
 973             p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
 974         int n = 20;
 975         for (int i = 1; i <= n; ++i)
 976             p.submit(i);
 977         p.close();
 978         f.join();
 979         assertEquals((n * (n + 1)) / 2, sum.get());
 980     }
 981 
 982     /**
 983      * consume(null) throws NPE
 984      */
 985     public void testConsumeNPE() {
 986         SubmissionPublisher<Integer> p = basicPublisher();
 987         try {
 988             CompletableFuture<Void> f = p.consume(null);
 989             shouldThrow();
 990         } catch (NullPointerException success) {}
 991     }
 992 
 993     /**
 994      * consume eventually stops processing published items if cancelled
 995      */
 996     public void testCancelledConsume() {
 997         AtomicInteger count = new AtomicInteger();
 998         SubmissionPublisher<Integer> p = basicPublisher();
 999         CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
1000         f.cancel(true);
1001         int n = 1000000; // arbitrary limit
1002         for (int i = 1; i <= n; ++i)
1003             p.submit(i);
1004         assertTrue(count.get() < n);
1005     }
1006 
1007     /**
1008      * Tests scenario for
1009      * JDK-8187947: A race condition in SubmissionPublisher
1010      * cvs update -D '2017-11-25' src/main/java/util/concurrent/SubmissionPublisher.java && ant -Djsr166.expensiveTests=true -Djsr166.tckTestClass=SubmissionPublisherTest -Djsr166.methodFilter=testMissedSignal tck; cvs update -A src/main/java/util/concurrent/SubmissionPublisher.java
1011      */
1012     public void testMissedSignal_8187947() throws Exception {
1013         if (!atLeastJava9()) return; // backport to jdk8 too hard
1014         final int N =
1015             ((ForkJoinPool.getCommonPoolParallelism() < 2) // JDK-8212899
1016              ? (1 << 5)
1017              : (1 << 10))
1018             * (expensiveTests ? (1 << 10) : 1);
1019         final CountDownLatch finished = new CountDownLatch(1);
1020         final SubmissionPublisher<Boolean> pub = new SubmissionPublisher<>();
1021         class Sub implements Subscriber<Boolean> {
1022             int received;
1023             public void onSubscribe(Subscription s) {
1024                 s.request(N);
1025             }
1026             public void onNext(Boolean item) {
1027                 if (++received == N)
1028                     finished.countDown();
1029                 else
1030                     CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE));
1031             }
1032             public void onError(Throwable t) { throw new AssertionError(t); }
1033             public void onComplete() {}
1034         }
1035         pub.subscribe(new Sub());
1036         checkTimedGet(
1037             CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)),
1038             null);
1039         await(finished);
1040     }
1041 }