1012 public void testMissedSignal_8187947() throws Exception {
1013 if (!atLeastJava9()) return; // backport to jdk8 too hard
1014 final int N = expensiveTests ? (1 << 20) : (1 << 10);
1015 final CountDownLatch finished = new CountDownLatch(1);
1016 final SubmissionPublisher<Boolean> pub = new SubmissionPublisher<>();
1017 class Sub implements Subscriber<Boolean> {
1018 int received;
1019 public void onSubscribe(Subscription s) {
1020 s.request(N);
1021 }
1022 public void onNext(Boolean item) {
1023 if (++received == N)
1024 finished.countDown();
1025 else
1026 CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE));
1027 }
1028 public void onError(Throwable t) { throw new AssertionError(t); }
1029 public void onComplete() {}
1030 }
1031 pub.subscribe(new Sub());
1032 CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE));
1033 await(finished);
1034 }
1035 }
|
1012 public void testMissedSignal_8187947() throws Exception {
1013 if (!atLeastJava9()) return; // backport to jdk8 too hard
1014 final int N = expensiveTests ? (1 << 20) : (1 << 10);
1015 final CountDownLatch finished = new CountDownLatch(1);
1016 final SubmissionPublisher<Boolean> pub = new SubmissionPublisher<>();
1017 class Sub implements Subscriber<Boolean> {
1018 int received;
1019 public void onSubscribe(Subscription s) {
1020 s.request(N);
1021 }
1022 public void onNext(Boolean item) {
1023 if (++received == N)
1024 finished.countDown();
1025 else
1026 CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE));
1027 }
1028 public void onError(Throwable t) { throw new AssertionError(t); }
1029 public void onComplete() {}
1030 }
1031 pub.subscribe(new Sub());
1032 checkTimedGet(
1033 CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)),
1034 null);
1035 await(finished);
1036 }
1037 }
|