968 */
969 public void testConsume() {
970 AtomicInteger sum = new AtomicInteger();
971 SubmissionPublisher<Integer> p = basicPublisher();
972 CompletableFuture<Void> f =
973 p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
974 int n = 20;
975 for (int i = 1; i <= n; ++i)
976 p.submit(i);
977 p.close();
978 f.join();
979 assertEquals((n * (n + 1)) / 2, sum.get());
980 }
981
982 /**
983 * consume(null) throws NPE
984 */
985 public void testConsumeNPE() {
986 SubmissionPublisher<Integer> p = basicPublisher();
987 try {
988 CompletableFuture<Void> f = p.consume(null);
989 shouldThrow();
990 } catch (NullPointerException success) {}
991 }
992
993 /**
994 * consume eventually stops processing published items if cancelled
995 */
996 public void testCancelledConsume() {
997 AtomicInteger count = new AtomicInteger();
998 SubmissionPublisher<Integer> p = basicPublisher();
999 CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
1000 f.cancel(true);
1001 int n = 1000000; // arbitrary limit
1002 for (int i = 1; i <= n; ++i)
1003 p.submit(i);
1004 assertTrue(count.get() < n);
1005 }
1006
1007 /**
1008 * Tests scenario for
|
968 */
969 public void testConsume() {
970 AtomicInteger sum = new AtomicInteger();
971 SubmissionPublisher<Integer> p = basicPublisher();
972 CompletableFuture<Void> f =
973 p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
974 int n = 20;
975 for (int i = 1; i <= n; ++i)
976 p.submit(i);
977 p.close();
978 f.join();
979 assertEquals((n * (n + 1)) / 2, sum.get());
980 }
981
982 /**
983 * consume(null) throws NPE
984 */
985 public void testConsumeNPE() {
986 SubmissionPublisher<Integer> p = basicPublisher();
987 try {
988 CompletableFuture<Void> unused = p.consume(null);
989 shouldThrow();
990 } catch (NullPointerException success) {}
991 }
992
993 /**
994 * consume eventually stops processing published items if cancelled
995 */
996 public void testCancelledConsume() {
997 AtomicInteger count = new AtomicInteger();
998 SubmissionPublisher<Integer> p = basicPublisher();
999 CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
1000 f.cancel(true);
1001 int n = 1000000; // arbitrary limit
1002 for (int i = 1; i <= n; ++i)
1003 p.submit(i);
1004 assertTrue(count.get() < n);
1005 }
1006
1007 /**
1008 * Tests scenario for
|