1 /*
   2  * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 package org.openjdk.tests.java.util.stream;
  24 
  25 import java.util.ArrayList;
  26 import java.util.Arrays;
  27 import java.util.Collection;
  28 import java.util.Collections;
  29 import java.util.Comparator;
  30 import java.util.HashMap;
  31 import java.util.HashSet;
  32 import java.util.Iterator;
  33 import java.util.List;
  34 import java.util.Map;
  35 import java.util.Optional;
  36 import java.util.Set;
  37 import java.util.StringJoiner;
  38 import java.util.TreeMap;
  39 import java.util.concurrent.ConcurrentHashMap;
  40 import java.util.concurrent.ConcurrentSkipListMap;
  41 import java.util.concurrent.atomic.AtomicInteger;
  42 import java.util.function.BinaryOperator;
  43 import java.util.function.Function;
  44 import java.util.function.Predicate;
  45 import java.util.function.Supplier;
  46 import java.util.stream.Collector;
  47 import java.util.stream.Collectors;
  48 import java.util.stream.LambdaTestHelpers;
  49 import java.util.stream.OpTestCase;
  50 import java.util.stream.Stream;
  51 import java.util.stream.StreamOpFlagTestHelper;
  52 import java.util.stream.StreamTestDataProvider;
  53 import java.util.stream.TestData;
  54 
  55 import org.testng.annotations.Test;
  56 
  57 import static java.util.stream.Collectors.collectingAndThen;
  58 import static java.util.stream.Collectors.flatMapping;
  59 import static java.util.stream.Collectors.groupingBy;
  60 import static java.util.stream.Collectors.groupingByConcurrent;
  61 import static java.util.stream.Collectors.mapping;
  62 import static java.util.stream.Collectors.partitioningBy;
  63 import static java.util.stream.Collectors.reducing;
  64 import static java.util.stream.Collectors.toCollection;
  65 import static java.util.stream.Collectors.toConcurrentMap;
  66 import static java.util.stream.Collectors.toList;
  67 import static java.util.stream.Collectors.toMap;
  68 import static java.util.stream.Collectors.toSet;
  69 import static java.util.stream.LambdaTestHelpers.assertContents;
  70 import static java.util.stream.LambdaTestHelpers.assertContentsUnordered;
  71 import static java.util.stream.LambdaTestHelpers.mDoubler;
  72 
  73 /*
  74  * @test
  75  * @bug 8071600
  76  * @summary Test for collectors.
  77  */
  78 public class CollectorsTest extends OpTestCase {
  79 
  80     private static abstract class CollectorAssertion<T, U> {
  81         abstract void assertValue(U value,
  82                                   Supplier<Stream<T>> source,
  83                                   boolean ordered) throws ReflectiveOperationException;
  84     }
  85 
  86     static class MappingAssertion<T, V, R> extends CollectorAssertion<T, R> {
  87         private final Function<T, V> mapper;
  88         private final CollectorAssertion<V, R> downstream;
  89 
  90         MappingAssertion(Function<T, V> mapper, CollectorAssertion<V, R> downstream) {
  91             this.mapper = mapper;
  92             this.downstream = downstream;
  93         }
  94 
  95         @Override
  96         void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
  97             downstream.assertValue(value,
  98                                    () -> source.get().map(mapper::apply),
  99                                    ordered);
 100         }
 101     }
 102 
 103     static class FlatMappingAssertion<T, V, R> extends CollectorAssertion<T, R> {
 104         private final Function<T, Stream<V>> mapper;
 105         private final CollectorAssertion<V, R> downstream;
 106 
 107         FlatMappingAssertion(Function<T, Stream<V>> mapper,
 108                              CollectorAssertion<V, R> downstream) {
 109             this.mapper = mapper;
 110             this.downstream = downstream;
 111         }
 112 
 113         @Override
 114         void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
 115             downstream.assertValue(value,
 116                                    () -> source.get().flatMap(mapper::apply),
 117                                    ordered);
 118         }
 119     }
 120 
 121     static class GroupingByAssertion<T, K, V, M extends Map<K, ? extends V>> extends CollectorAssertion<T, M> {
 122         private final Class<? extends Map> clazz;
 123         private final Function<T, K> classifier;
 124         private final CollectorAssertion<T,V> downstream;
 125 
 126         GroupingByAssertion(Function<T, K> classifier, Class<? extends Map> clazz,
 127                             CollectorAssertion<T, V> downstream) {
 128             this.clazz = clazz;
 129             this.classifier = classifier;
 130             this.downstream = downstream;
 131         }
 132 
 133         @Override
 134         void assertValue(M map,
 135                          Supplier<Stream<T>> source,
 136                          boolean ordered) throws ReflectiveOperationException {
 137             if (!clazz.isAssignableFrom(map.getClass()))
 138                 fail(String.format("Class mismatch in GroupingByAssertion: %s, %s", clazz, map.getClass()));
 139             assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(toSet()));
 140             for (Map.Entry<K, ? extends V> entry : map.entrySet()) {
 141                 K key = entry.getKey();
 142                 downstream.assertValue(entry.getValue(),
 143                                        () -> source.get().filter(e -> classifier.apply(e).equals(key)),
 144                                        ordered);
 145             }
 146         }
 147     }
 148 
 149     static class ToMapAssertion<T, K, V, M extends Map<K,V>> extends CollectorAssertion<T, M> {
 150         private final Class<? extends Map> clazz;
 151         private final Function<T, K> keyFn;
 152         private final Function<T, V> valueFn;
 153         private final BinaryOperator<V> mergeFn;
 154 
 155         ToMapAssertion(Function<T, K> keyFn,
 156                        Function<T, V> valueFn,
 157                        BinaryOperator<V> mergeFn,
 158                        Class<? extends Map> clazz) {
 159             this.clazz = clazz;
 160             this.keyFn = keyFn;
 161             this.valueFn = valueFn;
 162             this.mergeFn = mergeFn;
 163         }
 164 
 165         @Override
 166         void assertValue(M map, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
 167             if (!clazz.isAssignableFrom(map.getClass()))
 168                 fail(String.format("Class mismatch in ToMapAssertion: %s, %s", clazz, map.getClass()));
 169             Set<K> uniqueKeys = source.get().map(keyFn).collect(toSet());
 170             assertEquals(uniqueKeys, map.keySet());
 171             source.get().forEach(t -> {
 172                 K key = keyFn.apply(t);
 173                 V v = source.get()
 174                             .filter(e -> key.equals(keyFn.apply(e)))
 175                             .map(valueFn)
 176                             .reduce(mergeFn)
 177                             .get();
 178                 assertEquals(map.get(key), v);
 179             });
 180         }
 181     }
 182 
 183     static class PartitioningByAssertion<T, D> extends CollectorAssertion<T, Map<Boolean,D>> {
 184         private final Predicate<T> predicate;
 185         private final CollectorAssertion<T,D> downstream;
 186 
 187         PartitioningByAssertion(Predicate<T> predicate, CollectorAssertion<T, D> downstream) {
 188             this.predicate = predicate;
 189             this.downstream = downstream;
 190         }
 191 
 192         @Override
 193         void assertValue(Map<Boolean, D> map,
 194                          Supplier<Stream<T>> source,
 195                          boolean ordered) throws ReflectiveOperationException {
 196             if (!Map.class.isAssignableFrom(map.getClass()))
 197                 fail(String.format("Class mismatch in PartitioningByAssertion: %s", map.getClass()));
 198             assertEquals(2, map.size());
 199             downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered);
 200             downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered);
 201         }
 202     }
 203 
 204     static class ToListAssertion<T> extends CollectorAssertion<T, List<T>> {
 205         @Override
 206         void assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered)
 207                 throws ReflectiveOperationException {
 208             if (!List.class.isAssignableFrom(value.getClass()))
 209                 fail(String.format("Class mismatch in ToListAssertion: %s", value.getClass()));
 210             Stream<T> stream = source.get();
 211             List<T> result = new ArrayList<>();
 212             for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
 213                 result.add(it.next());
 214             if (StreamOpFlagTestHelper.isStreamOrdered(stream) && ordered)
 215                 assertContents(value, result);
 216             else
 217                 assertContentsUnordered(value, result);
 218         }
 219     }
 220 
 221     static class ToCollectionAssertion<T> extends CollectorAssertion<T, Collection<T>> {
 222         private final Class<? extends Collection> clazz;
 223         private final boolean targetOrdered;
 224 
 225         ToCollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered) {
 226             this.clazz = clazz;
 227             this.targetOrdered = targetOrdered;
 228         }
 229 
 230         @Override
 231         void assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered)
 232                 throws ReflectiveOperationException {
 233             if (!clazz.isAssignableFrom(value.getClass()))
 234                 fail(String.format("Class mismatch in ToCollectionAssertion: %s, %s", clazz, value.getClass()));
 235             Stream<T> stream = source.get();
 236             Collection<T> result = clazz.newInstance();
 237             for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
 238                 result.add(it.next());
 239             if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered)
 240                 assertContents(value, result);
 241             else
 242                 assertContentsUnordered(value, result);
 243         }
 244     }
 245 
 246     static class ReducingAssertion<T, U> extends CollectorAssertion<T, U> {
 247         private final U identity;
 248         private final Function<T, U> mapper;
 249         private final BinaryOperator<U> reducer;
 250 
 251         ReducingAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer) {
 252             this.identity = identity;
 253             this.mapper = mapper;
 254             this.reducer = reducer;
 255         }
 256 
 257         @Override
 258         void assertValue(U value, Supplier<Stream<T>> source, boolean ordered)
 259                 throws ReflectiveOperationException {
 260             Optional<U> reduced = source.get().map(mapper).reduce(reducer);
 261             if (value == null)
 262                 assertTrue(!reduced.isPresent());
 263             else if (!reduced.isPresent()) {
 264                 assertEquals(value, identity);
 265             }
 266             else {
 267                 assertEquals(value, reduced.get());
 268             }
 269         }
 270     }
 271 
 272     private <T> ResultAsserter<T> mapTabulationAsserter(boolean ordered) {
 273         return (act, exp, ord, par) -> {
 274             if (par && (!ordered || !ord)) {
 275                 CollectorsTest.nestedMapEqualityAssertion(act, exp);
 276             }
 277             else {
 278                 LambdaTestHelpers.assertContentsEqual(act, exp);
 279             }
 280         };
 281     }
 282 
 283     private<T, M extends Map>
 284     void exerciseMapCollection(TestData<T, Stream<T>> data,
 285                                Collector<T, ?, ? extends M> collector,
 286                                CollectorAssertion<T, M> assertion)
 287             throws ReflectiveOperationException {
 288         boolean ordered = !collector.characteristics().contains(Collector.Characteristics.UNORDERED);
 289 
 290         M m = withData(data)
 291                 .terminal(s -> s.collect(collector))
 292                 .resultAsserter(mapTabulationAsserter(ordered))
 293                 .exercise();
 294         assertion.assertValue(m, () -> data.stream(), ordered);
 295 
 296         m = withData(data)
 297                 .terminal(s -> s.unordered().collect(collector))
 298                 .resultAsserter(mapTabulationAsserter(ordered))
 299                 .exercise();
 300         assertion.assertValue(m, () -> data.stream(), false);
 301     }
 302 
 303     private static void nestedMapEqualityAssertion(Object o1, Object o2) {
 304         if (o1 instanceof Map) {
 305             Map m1 = (Map) o1;
 306             Map m2 = (Map) o2;
 307             assertContentsUnordered(m1.keySet(), m2.keySet());
 308             for (Object k : m1.keySet())
 309                 nestedMapEqualityAssertion(m1.get(k), m2.get(k));
 310         }
 311         else if (o1 instanceof Collection) {
 312             assertContentsUnordered(((Collection) o1), ((Collection) o2));
 313         }
 314         else
 315             assertEquals(o1, o2);
 316     }
 317 
 318     private<T, R> void assertCollect(TestData.OfRef<T> data,
 319                                      Collector<T, ?, R> collector,
 320                                      Function<Stream<T>, R> streamReduction) {
 321         R check = streamReduction.apply(data.stream());
 322         withData(data).terminal(s -> s.collect(collector)).expectedResult(check).exercise();
 323     }
 324 
 325     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 326     public void testReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 327         assertCollect(data, Collectors.reducing(0, Integer::sum),
 328                       s -> s.reduce(0, Integer::sum));
 329         assertCollect(data, Collectors.reducing(Integer.MAX_VALUE, Integer::min),
 330                       s -> s.min(Integer::compare).orElse(Integer.MAX_VALUE));
 331         assertCollect(data, Collectors.reducing(Integer.MIN_VALUE, Integer::max),
 332                       s -> s.max(Integer::compare).orElse(Integer.MIN_VALUE));
 333 
 334         assertCollect(data, Collectors.reducing(Integer::sum),
 335                       s -> s.reduce(Integer::sum));
 336         assertCollect(data, Collectors.minBy(Comparator.naturalOrder()),
 337                       s -> s.min(Integer::compare));
 338         assertCollect(data, Collectors.maxBy(Comparator.naturalOrder()),
 339                       s -> s.max(Integer::compare));
 340 
 341         assertCollect(data, Collectors.reducing(0, x -> x*2, Integer::sum),
 342                       s -> s.map(x -> x*2).reduce(0, Integer::sum));
 343 
 344         assertCollect(data, Collectors.summingLong(x -> x * 2L),
 345                       s -> s.map(x -> x*2L).reduce(0L, Long::sum));
 346         assertCollect(data, Collectors.summingInt(x -> x * 2),
 347                       s -> s.map(x -> x*2).reduce(0, Integer::sum));
 348         assertCollect(data, Collectors.summingDouble(x -> x * 2.0d),
 349                       s -> s.map(x -> x * 2.0d).reduce(0.0d, Double::sum));
 350 
 351         assertCollect(data, Collectors.averagingInt(x -> x * 2),
 352                       s -> s.mapToInt(x -> x * 2).average().orElse(0));
 353         assertCollect(data, Collectors.averagingLong(x -> x * 2),
 354                       s -> s.mapToLong(x -> x * 2).average().orElse(0));
 355         assertCollect(data, Collectors.averagingDouble(x -> x * 2),
 356                       s -> s.mapToDouble(x -> x * 2).average().orElse(0));
 357 
 358         // Test explicit Collector.of
 359         Collector<Integer, long[], Double> avg2xint = Collector.of(() -> new long[2],
 360                                                                    (a, b) -> {
 361                                                                        a[0] += b * 2;
 362                                                                        a[1]++;
 363                                                                    },
 364                                                                    (a, b) -> {
 365                                                                        a[0] += b[0];
 366                                                                        a[1] += b[1];
 367                                                                        return a;
 368                                                                    },
 369                                                                    a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1]);
 370         assertCollect(data, avg2xint,
 371                       s -> s.mapToInt(x -> x * 2).average().orElse(0));
 372     }
 373 
 374     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 375     public void testJoining(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 376         withData(data)
 377                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining()))
 378                 .expectedResult(join(data, ""))
 379                 .exercise();
 380 
 381         Collector<String, StringBuilder, String> likeJoining = Collector.of(StringBuilder::new, StringBuilder::append, (sb1, sb2) -> sb1.append(sb2.toString()), StringBuilder::toString);
 382         withData(data)
 383                 .terminal(s -> s.map(Object::toString).collect(likeJoining))
 384                 .expectedResult(join(data, ""))
 385                 .exercise();
 386 
 387         withData(data)
 388                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",")))
 389                 .expectedResult(join(data, ","))
 390                 .exercise();
 391 
 392         withData(data)
 393                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",", "[", "]")))
 394                 .expectedResult("[" + join(data, ",") + "]")
 395                 .exercise();
 396 
 397         withData(data)
 398                 .terminal(s -> s.map(Object::toString)
 399                                 .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
 400                                 .toString())
 401                 .expectedResult(join(data, ""))
 402                 .exercise();
 403 
 404         withData(data)
 405                 .terminal(s -> s.map(Object::toString)
 406                                 .collect(() -> new StringJoiner(","),
 407                                          (sj, cs) -> sj.add(cs),
 408                                          (j1, j2) -> j1.merge(j2))
 409                                 .toString())
 410                 .expectedResult(join(data, ","))
 411                 .exercise();
 412 
 413         withData(data)
 414                 .terminal(s -> s.map(Object::toString)
 415                                 .collect(() -> new StringJoiner(",", "[", "]"),
 416                                          (sj, cs) -> sj.add(cs),
 417                                          (j1, j2) -> j1.merge(j2))
 418                                 .toString())
 419                 .expectedResult("[" + join(data, ",") + "]")
 420                 .exercise();
 421     }
 422 
 423     private<T> String join(TestData.OfRef<T> data, String delim) {
 424         StringBuilder sb = new StringBuilder();
 425         boolean first = true;
 426         for (T i : data) {
 427             if (!first)
 428                 sb.append(delim);
 429             sb.append(i.toString());
 430             first = false;
 431         }
 432         return sb.toString();
 433     }
 434 
 435     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 436     public void testSimpleToMap(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 437         Function<Integer, Integer> keyFn = i -> i * 2;
 438         Function<Integer, Integer> valueFn = i -> i * 4;
 439 
 440         List<Integer> dataAsList = Arrays.asList(data.stream().toArray(Integer[]::new));
 441         Set<Integer> dataAsSet = new HashSet<>(dataAsList);
 442 
 443         BinaryOperator<Integer> sum = Integer::sum;
 444         for (BinaryOperator<Integer> op : Arrays.asList((u, v) -> u,
 445                                                         (u, v) -> v,
 446                                                         sum)) {
 447             try {
 448                 exerciseMapCollection(data, toMap(keyFn, valueFn),
 449                                       new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
 450                 if (dataAsList.size() != dataAsSet.size())
 451                     fail("Expected ISE on input with duplicates");
 452             }
 453             catch (IllegalStateException e) {
 454                 if (dataAsList.size() == dataAsSet.size())
 455                     fail("Expected no ISE on input without duplicates");
 456             }
 457 
 458             exerciseMapCollection(data, toMap(keyFn, valueFn, op),
 459                                   new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
 460 
 461             exerciseMapCollection(data, toMap(keyFn, valueFn, op, TreeMap::new),
 462                                   new ToMapAssertion<>(keyFn, valueFn, op, TreeMap.class));
 463         }
 464 
 465         // For concurrent maps, only use commutative merge functions
 466         try {
 467             exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn),
 468                                   new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
 469             if (dataAsList.size() != dataAsSet.size())
 470                 fail("Expected ISE on input with duplicates");
 471         }
 472         catch (IllegalStateException e) {
 473             if (dataAsList.size() == dataAsSet.size())
 474                 fail("Expected no ISE on input without duplicates");
 475         }
 476 
 477         exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum),
 478                               new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
 479 
 480         exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum, ConcurrentSkipListMap::new),
 481                               new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentSkipListMap.class));
 482     }
 483 
 484     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 485     public void testSimpleGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 486         Function<Integer, Integer> classifier = i -> i % 3;
 487 
 488         // Single-level groupBy
 489         exerciseMapCollection(data, groupingBy(classifier),
 490                               new GroupingByAssertion<>(classifier, HashMap.class,
 491                                                         new ToListAssertion<>()));
 492         exerciseMapCollection(data, groupingByConcurrent(classifier),
 493                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
 494                                                         new ToListAssertion<>()));
 495 
 496         // With explicit constructors
 497         exerciseMapCollection(data,
 498                               groupingBy(classifier, TreeMap::new, toCollection(HashSet::new)),
 499                               new GroupingByAssertion<>(classifier, TreeMap.class,
 500                                                         new ToCollectionAssertion<Integer>(HashSet.class, false)));
 501         exerciseMapCollection(data,
 502                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new,
 503                                                    toCollection(HashSet::new)),
 504                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
 505                                                         new ToCollectionAssertion<Integer>(HashSet.class, false)));
 506     }
 507 
 508     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 509     public void testGroupingByWithMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 510         Function<Integer, Integer> classifier = i -> i % 3;
 511         Function<Integer, Integer> mapper = i -> i * 2;
 512 
 513         exerciseMapCollection(data,
 514                               groupingBy(classifier, mapping(mapper, toList())),
 515                               new GroupingByAssertion<>(classifier, HashMap.class,
 516                                                         new MappingAssertion<>(mapper,
 517                                                                                new ToListAssertion<>())));
 518     }
 519 
 520     @Test(groups = { "serialization-hostile" })
 521     public void testFlatMappingClose() {
 522         Function<Integer, Integer> classifier = i -> i;
 523         AtomicInteger ai = new AtomicInteger();
 524         Function<Integer, Stream<Integer>> flatMapper = i -> Stream.of(i, i).onClose(ai::getAndIncrement);
 525         Map<Integer, List<Integer>> m = Stream.of(1, 2).collect(groupingBy(classifier, flatMapping(flatMapper, toList())));
 526         assertEquals(m.size(), ai.get());
 527     }
 528 
 529     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 530     public void testGroupingByWithFlatMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 531         Function<Integer, Integer> classifier = i -> i % 3;
 532         Function<Integer, Stream<Integer>> flatMapperByNull = i -> null;
 533         Function<Integer, Stream<Integer>> flatMapperBy0 = i -> Stream.empty();
 534         Function<Integer, Stream<Integer>> flatMapperBy2 = i -> Stream.of(i, i);
 535 
 536         exerciseMapCollection(data,
 537                               groupingBy(classifier, flatMapping(flatMapperByNull, toList())),
 538                               new GroupingByAssertion<>(classifier, HashMap.class,
 539                                                         new FlatMappingAssertion<>(flatMapperBy0,
 540                                                                                    new ToListAssertion<>())));
 541         exerciseMapCollection(data,
 542                               groupingBy(classifier, flatMapping(flatMapperBy0, toList())),
 543                               new GroupingByAssertion<>(classifier, HashMap.class,
 544                                                         new FlatMappingAssertion<>(flatMapperBy0,
 545                                                                                    new ToListAssertion<>())));
 546         exerciseMapCollection(data,
 547                               groupingBy(classifier, flatMapping(flatMapperBy2, toList())),
 548                               new GroupingByAssertion<>(classifier, HashMap.class,
 549                                                         new FlatMappingAssertion<>(flatMapperBy2,
 550                                                                                    new ToListAssertion<>())));
 551     }
 552 
 553     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 554     public void testTwoLevelGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 555         Function<Integer, Integer> classifier = i -> i % 6;
 556         Function<Integer, Integer> classifier2 = i -> i % 23;
 557 
 558         // Two-level groupBy
 559         exerciseMapCollection(data,
 560                               groupingBy(classifier, groupingBy(classifier2)),
 561                               new GroupingByAssertion<>(classifier, HashMap.class,
 562                                                         new GroupingByAssertion<>(classifier2, HashMap.class,
 563                                                                                   new ToListAssertion<>())));
 564         // with concurrent as upstream
 565         exerciseMapCollection(data,
 566                               groupingByConcurrent(classifier, groupingBy(classifier2)),
 567                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
 568                                                         new GroupingByAssertion<>(classifier2, HashMap.class,
 569                                                                                   new ToListAssertion<>())));
 570         // with concurrent as downstream
 571         exerciseMapCollection(data,
 572                               groupingBy(classifier, groupingByConcurrent(classifier2)),
 573                               new GroupingByAssertion<>(classifier, HashMap.class,
 574                                                         new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class,
 575                                                                                   new ToListAssertion<>())));
 576         // with concurrent as upstream and downstream
 577         exerciseMapCollection(data,
 578                               groupingByConcurrent(classifier, groupingByConcurrent(classifier2)),
 579                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
 580                                                         new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class,
 581                                                                                   new ToListAssertion<>())));
 582 
 583         // With explicit constructors
 584         exerciseMapCollection(data,
 585                               groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, toCollection(HashSet::new))),
 586                               new GroupingByAssertion<>(classifier, TreeMap.class,
 587                                                         new GroupingByAssertion<>(classifier2, TreeMap.class,
 588                                                                                   new ToCollectionAssertion<Integer>(HashSet.class, false))));
 589         // with concurrent as upstream
 590         exerciseMapCollection(data,
 591                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingBy(classifier2, TreeMap::new, toList())),
 592                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
 593                                                         new GroupingByAssertion<>(classifier2, TreeMap.class,
 594                                                                                   new ToListAssertion<>())));
 595         // with concurrent as downstream
 596         exerciseMapCollection(data,
 597                               groupingBy(classifier, TreeMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
 598                               new GroupingByAssertion<>(classifier, TreeMap.class,
 599                                                         new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class,
 600                                                                                   new ToListAssertion<>())));
 601         // with concurrent as upstream and downstream
 602         exerciseMapCollection(data,
 603                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
 604                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
 605                                                         new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class,
 606                                                                                   new ToListAssertion<>())));
 607     }
 608 
 609     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 610     public void testGroupubgByWithReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 611         Function<Integer, Integer> classifier = i -> i % 3;
 612 
 613         // Single-level simple reduce
 614         exerciseMapCollection(data,
 615                               groupingBy(classifier, reducing(0, Integer::sum)),
 616                               new GroupingByAssertion<>(classifier, HashMap.class,
 617                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 618         // with concurrent
 619         exerciseMapCollection(data,
 620                               groupingByConcurrent(classifier, reducing(0, Integer::sum)),
 621                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
 622                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 623 
 624         // With explicit constructors
 625         exerciseMapCollection(data,
 626                               groupingBy(classifier, TreeMap::new, reducing(0, Integer::sum)),
 627                               new GroupingByAssertion<>(classifier, TreeMap.class,
 628                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 629         // with concurrent
 630         exerciseMapCollection(data,
 631                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, Integer::sum)),
 632                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
 633                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 634 
 635         // Single-level map-reduce
 636         exerciseMapCollection(data,
 637                               groupingBy(classifier, reducing(0, mDoubler, Integer::sum)),
 638                               new GroupingByAssertion<>(classifier, HashMap.class,
 639                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
 640         // with concurrent
 641         exerciseMapCollection(data,
 642                               groupingByConcurrent(classifier, reducing(0, mDoubler, Integer::sum)),
 643                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
 644                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
 645 
 646         // With explicit constructors
 647         exerciseMapCollection(data,
 648                               groupingBy(classifier, TreeMap::new, reducing(0, mDoubler, Integer::sum)),
 649                               new GroupingByAssertion<>(classifier, TreeMap.class,
 650                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
 651         // with concurrent
 652         exerciseMapCollection(data,
 653                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, mDoubler, Integer::sum)),
 654                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
 655                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
 656     }
 657 
 658     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 659     public void testSimplePartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 660         Predicate<Integer> classifier = i -> i % 3 == 0;
 661 
 662         // Single-level partition to downstream List
 663         exerciseMapCollection(data,
 664                               partitioningBy(classifier),
 665                               new PartitioningByAssertion<>(classifier, new ToListAssertion<>()));
 666         exerciseMapCollection(data,
 667                               partitioningBy(classifier, toList()),
 668                               new PartitioningByAssertion<>(classifier, new ToListAssertion<>()));
 669     }
 670 
 671     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 672     public void testTwoLevelPartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 673         Predicate<Integer> classifier = i -> i % 3 == 0;
 674         Predicate<Integer> classifier2 = i -> i % 7 == 0;
 675 
 676         // Two level partition
 677         exerciseMapCollection(data,
 678                               partitioningBy(classifier, partitioningBy(classifier2)),
 679                               new PartitioningByAssertion<>(classifier,
 680                                                             new PartitioningByAssertion(classifier2, new ToListAssertion<>())));
 681 
 682         // Two level partition with reduce
 683         exerciseMapCollection(data,
 684                               partitioningBy(classifier, reducing(0, Integer::sum)),
 685                               new PartitioningByAssertion<>(classifier,
 686                                                             new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 687     }
 688 
 689     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 690     public void testComposeFinisher(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 691         List<Integer> asList = exerciseTerminalOps(data, s -> s.collect(toList()));
 692         List<Integer> asImmutableList = exerciseTerminalOps(data, s -> s.collect(collectingAndThen(toList(), Collections::unmodifiableList)));
 693         assertEquals(asList, asImmutableList);
 694         try {
 695             asImmutableList.add(0);
 696             fail("Expecting immutable result");
 697         }
 698         catch (UnsupportedOperationException ignored) { }
 699     }
 700 
 701 }