1 /*
   2  * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 package org.openjdk.tests.java.util.stream;
  24 
  25 import java.util.ArrayList;
  26 import java.util.Arrays;
  27 import java.util.Collection;
  28 import java.util.Collections;
  29 import java.util.Comparator;
  30 import java.util.HashMap;
  31 import java.util.HashSet;
  32 import java.util.Iterator;
  33 import java.util.List;
  34 import java.util.Map;
  35 import java.util.Optional;
  36 import java.util.Set;
  37 import java.util.StringJoiner;
  38 import java.util.TreeMap;
  39 import java.util.concurrent.ConcurrentHashMap;
  40 import java.util.concurrent.ConcurrentSkipListMap;
  41 import java.util.concurrent.atomic.AtomicInteger;
  42 import java.util.function.BinaryOperator;
  43 import java.util.function.Function;
  44 import java.util.function.Predicate;
  45 import java.util.function.Supplier;
  46 import java.util.stream.Collector;
  47 import java.util.stream.Collectors;
  48 import java.util.stream.LambdaTestHelpers;
  49 import java.util.stream.OpTestCase;
  50 import java.util.stream.Stream;
  51 import java.util.stream.StreamOpFlagTestHelper;
  52 import java.util.stream.StreamTestDataProvider;
  53 import java.util.stream.TestData;
  54 
  55 import org.testng.annotations.Test;
  56 
  57 import static java.util.stream.Collectors.collectingAndThen;
  58 import static java.util.stream.Collectors.flatMapping;
  59 import static java.util.stream.Collectors.filtering;
  60 import static java.util.stream.Collectors.groupingBy;
  61 import static java.util.stream.Collectors.groupingByConcurrent;
  62 import static java.util.stream.Collectors.mapping;
  63 import static java.util.stream.Collectors.partitioningBy;
  64 import static java.util.stream.Collectors.reducing;
  65 import static java.util.stream.Collectors.toCollection;
  66 import static java.util.stream.Collectors.toConcurrentMap;
  67 import static java.util.stream.Collectors.toList;
  68 import static java.util.stream.Collectors.toMap;
  69 import static java.util.stream.Collectors.toSet;
  70 import static java.util.stream.LambdaTestHelpers.assertContents;
  71 import static java.util.stream.LambdaTestHelpers.assertContentsUnordered;
  72 import static java.util.stream.LambdaTestHelpers.mDoubler;
  73 
  74 /*
  75  * @test
  76  * @bug 8071600 8144675
  77  * @summary Test for collectors.
  78  */
  79 public class CollectorsTest extends OpTestCase {
  80 
  81     private abstract static class CollectorAssertion<T, U> {
  82         abstract void assertValue(U value,
  83                                   Supplier<Stream<T>> source,
  84                                   boolean ordered) throws ReflectiveOperationException;
  85     }
  86 
  87     static class MappingAssertion<T, V, R> extends CollectorAssertion<T, R> {
  88         private final Function<T, V> mapper;
  89         private final CollectorAssertion<V, R> downstream;
  90 
  91         MappingAssertion(Function<T, V> mapper, CollectorAssertion<V, R> downstream) {
  92             this.mapper = mapper;
  93             this.downstream = downstream;
  94         }
  95 
  96         @Override
  97         void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
  98             downstream.assertValue(value,
  99                                    () -> source.get().map(mapper::apply),
 100                                    ordered);
 101         }
 102     }
 103 
 104     static class FlatMappingAssertion<T, V, R> extends CollectorAssertion<T, R> {
 105         private final Function<T, Stream<V>> mapper;
 106         private final CollectorAssertion<V, R> downstream;
 107 
 108         FlatMappingAssertion(Function<T, Stream<V>> mapper,
 109                              CollectorAssertion<V, R> downstream) {
 110             this.mapper = mapper;
 111             this.downstream = downstream;
 112         }
 113 
 114         @Override
 115         void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
 116             downstream.assertValue(value,
 117                                    () -> source.get().flatMap(mapper::apply),
 118                                    ordered);
 119         }
 120     }
 121 
 122     static class FilteringAssertion<T, R> extends CollectorAssertion<T, R> {
 123         private final Predicate<T> filter;
 124         private final CollectorAssertion<T, R> downstream;
 125 
 126         public FilteringAssertion(Predicate<T> filter, CollectorAssertion<T, R> downstream) {
 127             this.filter = filter;
 128             this.downstream = downstream;
 129         }
 130 
 131         @Override
 132         void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
 133             downstream.assertValue(value,
 134                                    () -> source.get().filter(filter),
 135                                    ordered);
 136         }
 137     }
 138 
 139     static class GroupingByAssertion<T, K, V, M extends Map<K, ? extends V>> extends CollectorAssertion<T, M> {
 140         private final Class<? extends Map> clazz;
 141         private final Function<T, K> classifier;
 142         private final CollectorAssertion<T,V> downstream;
 143 
 144         GroupingByAssertion(Function<T, K> classifier, Class<? extends Map> clazz,
 145                             CollectorAssertion<T, V> downstream) {
 146             this.clazz = clazz;
 147             this.classifier = classifier;
 148             this.downstream = downstream;
 149         }
 150 
 151         @Override
 152         void assertValue(M map,
 153                          Supplier<Stream<T>> source,
 154                          boolean ordered) throws ReflectiveOperationException {
 155             if (!clazz.isAssignableFrom(map.getClass()))
 156                 fail(String.format("Class mismatch in GroupingByAssertion: %s, %s", clazz, map.getClass()));
 157             assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(toSet()));
 158             for (Map.Entry<K, ? extends V> entry : map.entrySet()) {
 159                 K key = entry.getKey();
 160                 downstream.assertValue(entry.getValue(),
 161                                        () -> source.get().filter(e -> classifier.apply(e).equals(key)),
 162                                        ordered);
 163             }
 164         }
 165     }
 166 
 167     static class ToMapAssertion<T, K, V, M extends Map<K,V>> extends CollectorAssertion<T, M> {
 168         private final Class<? extends Map> clazz;
 169         private final Function<T, K> keyFn;
 170         private final Function<T, V> valueFn;
 171         private final BinaryOperator<V> mergeFn;
 172 
 173         ToMapAssertion(Function<T, K> keyFn,
 174                        Function<T, V> valueFn,
 175                        BinaryOperator<V> mergeFn,
 176                        Class<? extends Map> clazz) {
 177             this.clazz = clazz;
 178             this.keyFn = keyFn;
 179             this.valueFn = valueFn;
 180             this.mergeFn = mergeFn;
 181         }
 182 
 183         @Override
 184         void assertValue(M map, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
 185             if (!clazz.isAssignableFrom(map.getClass()))
 186                 fail(String.format("Class mismatch in ToMapAssertion: %s, %s", clazz, map.getClass()));
 187             Set<K> uniqueKeys = source.get().map(keyFn).collect(toSet());
 188             assertEquals(uniqueKeys, map.keySet());
 189             source.get().forEach(t -> {
 190                 K key = keyFn.apply(t);
 191                 V v = source.get()
 192                             .filter(e -> key.equals(keyFn.apply(e)))
 193                             .map(valueFn)
 194                             .reduce(mergeFn)
 195                             .get();
 196                 assertEquals(map.get(key), v);
 197             });
 198         }
 199     }
 200 
 201     static class PartitioningByAssertion<T, D> extends CollectorAssertion<T, Map<Boolean,D>> {
 202         private final Predicate<T> predicate;
 203         private final CollectorAssertion<T,D> downstream;
 204 
 205         PartitioningByAssertion(Predicate<T> predicate, CollectorAssertion<T, D> downstream) {
 206             this.predicate = predicate;
 207             this.downstream = downstream;
 208         }
 209 
 210         @Override
 211         void assertValue(Map<Boolean, D> map,
 212                          Supplier<Stream<T>> source,
 213                          boolean ordered) throws ReflectiveOperationException {
 214             if (!Map.class.isAssignableFrom(map.getClass()))
 215                 fail(String.format("Class mismatch in PartitioningByAssertion: %s", map.getClass()));
 216             assertEquals(2, map.size());
 217             downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered);
 218             downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered);
 219         }
 220     }
 221 
 222     static class ToListAssertion<T> extends CollectorAssertion<T, List<T>> {
 223         @Override
 224         void assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered)
 225                 throws ReflectiveOperationException {
 226             if (!List.class.isAssignableFrom(value.getClass()))
 227                 fail(String.format("Class mismatch in ToListAssertion: %s", value.getClass()));
 228             Stream<T> stream = source.get();
 229             List<T> result = new ArrayList<>();
 230             for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
 231                 result.add(it.next());
 232             if (StreamOpFlagTestHelper.isStreamOrdered(stream) && ordered)
 233                 assertContents(value, result);
 234             else
 235                 assertContentsUnordered(value, result);
 236         }
 237     }
 238 
 239     static class ToCollectionAssertion<T> extends CollectorAssertion<T, Collection<T>> {
 240         private final Class<? extends Collection> clazz;
 241         private final boolean targetOrdered;
 242 
 243         ToCollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered) {
 244             this.clazz = clazz;
 245             this.targetOrdered = targetOrdered;
 246         }
 247 
 248         @Override
 249         void assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered)
 250                 throws ReflectiveOperationException {
 251             if (!clazz.isAssignableFrom(value.getClass()))
 252                 fail(String.format("Class mismatch in ToCollectionAssertion: %s, %s", clazz, value.getClass()));
 253             Stream<T> stream = source.get();
 254             Collection<T> result = clazz.newInstance();
 255             for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
 256                 result.add(it.next());
 257             if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered)
 258                 assertContents(value, result);
 259             else
 260                 assertContentsUnordered(value, result);
 261         }
 262     }
 263 
 264     static class ReducingAssertion<T, U> extends CollectorAssertion<T, U> {
 265         private final U identity;
 266         private final Function<T, U> mapper;
 267         private final BinaryOperator<U> reducer;
 268 
 269         ReducingAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer) {
 270             this.identity = identity;
 271             this.mapper = mapper;
 272             this.reducer = reducer;
 273         }
 274 
 275         @Override
 276         void assertValue(U value, Supplier<Stream<T>> source, boolean ordered)
 277                 throws ReflectiveOperationException {
 278             Optional<U> reduced = source.get().map(mapper).reduce(reducer);
 279             if (value == null)
 280                 assertTrue(!reduced.isPresent());
 281             else if (!reduced.isPresent()) {
 282                 assertEquals(value, identity);
 283             }
 284             else {
 285                 assertEquals(value, reduced.get());
 286             }
 287         }
 288     }
 289 
 290     private <T> ResultAsserter<T> mapTabulationAsserter(boolean ordered) {
 291         return (act, exp, ord, par) -> {
 292             if (par && (!ordered || !ord)) {
 293                 CollectorsTest.nestedMapEqualityAssertion(act, exp);
 294             }
 295             else {
 296                 LambdaTestHelpers.assertContentsEqual(act, exp);
 297             }
 298         };
 299     }
 300 
 301     private<T, M extends Map>
 302     void exerciseMapCollection(TestData<T, Stream<T>> data,
 303                                Collector<T, ?, ? extends M> collector,
 304                                CollectorAssertion<T, M> assertion)
 305             throws ReflectiveOperationException {
 306         boolean ordered = !collector.characteristics().contains(Collector.Characteristics.UNORDERED);
 307 
 308         M m = withData(data)
 309                 .terminal(s -> s.collect(collector))
 310                 .resultAsserter(mapTabulationAsserter(ordered))
 311                 .exercise();
 312         assertion.assertValue(m, () -> data.stream(), ordered);
 313 
 314         m = withData(data)
 315                 .terminal(s -> s.unordered().collect(collector))
 316                 .resultAsserter(mapTabulationAsserter(ordered))
 317                 .exercise();
 318         assertion.assertValue(m, () -> data.stream(), false);
 319     }
 320 
 321     private static void nestedMapEqualityAssertion(Object o1, Object o2) {
 322         if (o1 instanceof Map) {
 323             Map m1 = (Map) o1;
 324             Map m2 = (Map) o2;
 325             assertContentsUnordered(m1.keySet(), m2.keySet());
 326             for (Object k : m1.keySet())
 327                 nestedMapEqualityAssertion(m1.get(k), m2.get(k));
 328         }
 329         else if (o1 instanceof Collection) {
 330             assertContentsUnordered(((Collection) o1), ((Collection) o2));
 331         }
 332         else
 333             assertEquals(o1, o2);
 334     }
 335 
 336     private<T, R> void assertCollect(TestData.OfRef<T> data,
 337                                      Collector<T, ?, R> collector,
 338                                      Function<Stream<T>, R> streamReduction) {
 339         R check = streamReduction.apply(data.stream());
 340         withData(data).terminal(s -> s.collect(collector)).expectedResult(check).exercise();
 341     }
 342 
 343     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 344     public void testReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 345         assertCollect(data, Collectors.reducing(0, Integer::sum),
 346                       s -> s.reduce(0, Integer::sum));
 347         assertCollect(data, Collectors.reducing(Integer.MAX_VALUE, Integer::min),
 348                       s -> s.min(Integer::compare).orElse(Integer.MAX_VALUE));
 349         assertCollect(data, Collectors.reducing(Integer.MIN_VALUE, Integer::max),
 350                       s -> s.max(Integer::compare).orElse(Integer.MIN_VALUE));
 351 
 352         assertCollect(data, Collectors.reducing(Integer::sum),
 353                       s -> s.reduce(Integer::sum));
 354         assertCollect(data, Collectors.minBy(Comparator.naturalOrder()),
 355                       s -> s.min(Integer::compare));
 356         assertCollect(data, Collectors.maxBy(Comparator.naturalOrder()),
 357                       s -> s.max(Integer::compare));
 358 
 359         assertCollect(data, Collectors.reducing(0, x -> x*2, Integer::sum),
 360                       s -> s.map(x -> x*2).reduce(0, Integer::sum));
 361 
 362         assertCollect(data, Collectors.summingLong(x -> x * 2L),
 363                       s -> s.map(x -> x*2L).reduce(0L, Long::sum));
 364         assertCollect(data, Collectors.summingInt(x -> x * 2),
 365                       s -> s.map(x -> x*2).reduce(0, Integer::sum));
 366         assertCollect(data, Collectors.summingDouble(x -> x * 2.0d),
 367                       s -> s.map(x -> x * 2.0d).reduce(0.0d, Double::sum));
 368 
 369         assertCollect(data, Collectors.averagingInt(x -> x * 2),
 370                       s -> s.mapToInt(x -> x * 2).average().orElse(0));
 371         assertCollect(data, Collectors.averagingLong(x -> x * 2),
 372                       s -> s.mapToLong(x -> x * 2).average().orElse(0));
 373         assertCollect(data, Collectors.averagingDouble(x -> x * 2),
 374                       s -> s.mapToDouble(x -> x * 2).average().orElse(0));
 375 
 376         // Test explicit Collector.of
 377         Collector<Integer, long[], Double> avg2xint = Collector.of(() -> new long[2],
 378                                                                    (a, b) -> {
 379                                                                        a[0] += b * 2;
 380                                                                        a[1]++;
 381                                                                    },
 382                                                                    (a, b) -> {
 383                                                                        a[0] += b[0];
 384                                                                        a[1] += b[1];
 385                                                                        return a;
 386                                                                    },
 387                                                                    a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1]);
 388         assertCollect(data, avg2xint,
 389                       s -> s.mapToInt(x -> x * 2).average().orElse(0));
 390     }
 391 
 392     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 393     public void testJoining(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 394         withData(data)
 395                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining()))
 396                 .expectedResult(join(data, ""))
 397                 .exercise();
 398 
 399         Collector<String, StringBuilder, String> likeJoining = Collector.of(StringBuilder::new, StringBuilder::append, (sb1, sb2) -> sb1.append(sb2.toString()), StringBuilder::toString);
 400         withData(data)
 401                 .terminal(s -> s.map(Object::toString).collect(likeJoining))
 402                 .expectedResult(join(data, ""))
 403                 .exercise();
 404 
 405         withData(data)
 406                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",")))
 407                 .expectedResult(join(data, ","))
 408                 .exercise();
 409 
 410         withData(data)
 411                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",", "[", "]")))
 412                 .expectedResult("[" + join(data, ",") + "]")
 413                 .exercise();
 414 
 415         withData(data)
 416                 .terminal(s -> s.map(Object::toString)
 417                                 .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
 418                                 .toString())
 419                 .expectedResult(join(data, ""))
 420                 .exercise();
 421 
 422         withData(data)
 423                 .terminal(s -> s.map(Object::toString)
 424                                 .collect(() -> new StringJoiner(","),
 425                                          (sj, cs) -> sj.add(cs),
 426                                          (j1, j2) -> j1.merge(j2))
 427                                 .toString())
 428                 .expectedResult(join(data, ","))
 429                 .exercise();
 430 
 431         withData(data)
 432                 .terminal(s -> s.map(Object::toString)
 433                                 .collect(() -> new StringJoiner(",", "[", "]"),
 434                                          (sj, cs) -> sj.add(cs),
 435                                          (j1, j2) -> j1.merge(j2))
 436                                 .toString())
 437                 .expectedResult("[" + join(data, ",") + "]")
 438                 .exercise();
 439     }
 440 
 441     private<T> String join(TestData.OfRef<T> data, String delim) {
 442         StringBuilder sb = new StringBuilder();
 443         boolean first = true;
 444         for (T i : data) {
 445             if (!first)
 446                 sb.append(delim);
 447             sb.append(i.toString());
 448             first = false;
 449         }
 450         return sb.toString();
 451     }
 452 
 453     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 454     public void testSimpleToMap(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 455         Function<Integer, Integer> keyFn = i -> i * 2;
 456         Function<Integer, Integer> valueFn = i -> i * 4;
 457 
 458         List<Integer> dataAsList = Arrays.asList(data.stream().toArray(Integer[]::new));
 459         Set<Integer> dataAsSet = new HashSet<>(dataAsList);
 460 
 461         BinaryOperator<Integer> sum = Integer::sum;
 462         for (BinaryOperator<Integer> op : Arrays.asList((u, v) -> u,
 463                                                         (u, v) -> v,
 464                                                         sum)) {
 465             try {
 466                 exerciseMapCollection(data, toMap(keyFn, valueFn),
 467                                       new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
 468                 if (dataAsList.size() != dataAsSet.size())
 469                     fail("Expected ISE on input with duplicates");
 470             }
 471             catch (IllegalStateException e) {
 472                 if (dataAsList.size() == dataAsSet.size())
 473                     fail("Expected no ISE on input without duplicates");
 474             }
 475 
 476             exerciseMapCollection(data, toMap(keyFn, valueFn, op),
 477                                   new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
 478 
 479             exerciseMapCollection(data, toMap(keyFn, valueFn, op, TreeMap::new),
 480                                   new ToMapAssertion<>(keyFn, valueFn, op, TreeMap.class));
 481         }
 482 
 483         // For concurrent maps, only use commutative merge functions
 484         try {
 485             exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn),
 486                                   new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
 487             if (dataAsList.size() != dataAsSet.size())
 488                 fail("Expected ISE on input with duplicates");
 489         }
 490         catch (IllegalStateException e) {
 491             if (dataAsList.size() == dataAsSet.size())
 492                 fail("Expected no ISE on input without duplicates");
 493         }
 494 
 495         exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum),
 496                               new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
 497 
 498         exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum, ConcurrentSkipListMap::new),
 499                               new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentSkipListMap.class));
 500     }
 501 
 502     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 503     public void testSimpleGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 504         Function<Integer, Integer> classifier = i -> i % 3;
 505 
 506         // Single-level groupBy
 507         exerciseMapCollection(data, groupingBy(classifier),
 508                               new GroupingByAssertion<>(classifier, HashMap.class,
 509                                                         new ToListAssertion<>()));
 510         exerciseMapCollection(data, groupingByConcurrent(classifier),
 511                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
 512                                                         new ToListAssertion<>()));
 513 
 514         // With explicit constructors
 515         exerciseMapCollection(data,
 516                               groupingBy(classifier, TreeMap::new, toCollection(HashSet::new)),
 517                               new GroupingByAssertion<>(classifier, TreeMap.class,
 518                                                         new ToCollectionAssertion<Integer>(HashSet.class, false)));
 519         exerciseMapCollection(data,
 520                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new,
 521                                                    toCollection(HashSet::new)),
 522                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
 523                                                         new ToCollectionAssertion<Integer>(HashSet.class, false)));
 524     }
 525 
 526     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 527     public void testGroupingByWithMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 528         Function<Integer, Integer> classifier = i -> i % 3;
 529         Function<Integer, Integer> mapper = i -> i * 2;
 530 
 531         exerciseMapCollection(data,
 532                               groupingBy(classifier, mapping(mapper, toList())),
 533                               new GroupingByAssertion<>(classifier, HashMap.class,
 534                                                         new MappingAssertion<>(mapper,
 535                                                                                new ToListAssertion<>())));
 536     }
 537 
 538     @Test(groups = { "serialization-hostile" })
 539     public void testFlatMappingClose() {
 540         Function<Integer, Integer> classifier = i -> i;
 541         AtomicInteger ai = new AtomicInteger();
 542         Function<Integer, Stream<Integer>> flatMapper = i -> Stream.of(i, i).onClose(ai::getAndIncrement);
 543         Map<Integer, List<Integer>> m = Stream.of(1, 2).collect(groupingBy(classifier, flatMapping(flatMapper, toList())));
 544         assertEquals(m.size(), ai.get());
 545     }
 546 
 547     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 548     public void testGroupingByWithFlatMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 549         Function<Integer, Integer> classifier = i -> i % 3;
 550         Function<Integer, Stream<Integer>> flatMapperByNull = i -> null;
 551         Function<Integer, Stream<Integer>> flatMapperBy0 = i -> Stream.empty();
 552         Function<Integer, Stream<Integer>> flatMapperBy2 = i -> Stream.of(i, i);
 553 
 554         exerciseMapCollection(data,
 555                               groupingBy(classifier, flatMapping(flatMapperByNull, toList())),
 556                               new GroupingByAssertion<>(classifier, HashMap.class,
 557                                                         new FlatMappingAssertion<>(flatMapperBy0,
 558                                                                                    new ToListAssertion<>())));
 559         exerciseMapCollection(data,
 560                               groupingBy(classifier, flatMapping(flatMapperBy0, toList())),
 561                               new GroupingByAssertion<>(classifier, HashMap.class,
 562                                                         new FlatMappingAssertion<>(flatMapperBy0,
 563                                                                                    new ToListAssertion<>())));
 564         exerciseMapCollection(data,
 565                               groupingBy(classifier, flatMapping(flatMapperBy2, toList())),
 566                               new GroupingByAssertion<>(classifier, HashMap.class,
 567                                                         new FlatMappingAssertion<>(flatMapperBy2,
 568                                                                                    new ToListAssertion<>())));
 569     }
 570 
 571     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 572     public void testGroupingByWithFiltering(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 573         Function<Integer, Integer> classifier = i -> i % 3;
 574         Predicate<Integer> filteringByMod2 = i -> i % 2 == 0;
 575         Predicate<Integer> filteringByUnder100 = i -> i % 2 < 100;
 576         Predicate<Integer> filteringByTrue = i -> true;
 577         Predicate<Integer> filteringByFalse = i -> false;
 578 
 579         exerciseMapCollection(data,
 580                               groupingBy(classifier, filtering(filteringByMod2, toList())),
 581                               new GroupingByAssertion<>(classifier, HashMap.class,
 582                                                         new FilteringAssertion<>(filteringByMod2,
 583                                                                                    new ToListAssertion<>())));
 584         exerciseMapCollection(data,
 585                               groupingBy(classifier, filtering(filteringByUnder100, toList())),
 586                               new GroupingByAssertion<>(classifier, HashMap.class,
 587                                                         new FilteringAssertion<>(filteringByUnder100,
 588                                                                                    new ToListAssertion<>())));
 589         exerciseMapCollection(data,
 590                               groupingBy(classifier, filtering(filteringByTrue, toList())),
 591                               new GroupingByAssertion<>(classifier, HashMap.class,
 592                                                         new FilteringAssertion<>(filteringByTrue,
 593                                                                                    new ToListAssertion<>())));
 594         exerciseMapCollection(data,
 595                               groupingBy(classifier, filtering(filteringByFalse, toList())),
 596                               new GroupingByAssertion<>(classifier, HashMap.class,
 597                                                         new FilteringAssertion<>(filteringByFalse,
 598                                                                                    new ToListAssertion<>())));
 599     }
 600 
 601     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 602     public void testTwoLevelGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 603         Function<Integer, Integer> classifier = i -> i % 6;
 604         Function<Integer, Integer> classifier2 = i -> i % 23;
 605 
 606         // Two-level groupBy
 607         exerciseMapCollection(data,
 608                               groupingBy(classifier, groupingBy(classifier2)),
 609                               new GroupingByAssertion<>(classifier, HashMap.class,
 610                                                         new GroupingByAssertion<>(classifier2, HashMap.class,
 611                                                                                   new ToListAssertion<>())));
 612         // with concurrent as upstream
 613         exerciseMapCollection(data,
 614                               groupingByConcurrent(classifier, groupingBy(classifier2)),
 615                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
 616                                                         new GroupingByAssertion<>(classifier2, HashMap.class,
 617                                                                                   new ToListAssertion<>())));
 618         // with concurrent as downstream
 619         exerciseMapCollection(data,
 620                               groupingBy(classifier, groupingByConcurrent(classifier2)),
 621                               new GroupingByAssertion<>(classifier, HashMap.class,
 622                                                         new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class,
 623                                                                                   new ToListAssertion<>())));
 624         // with concurrent as upstream and downstream
 625         exerciseMapCollection(data,
 626                               groupingByConcurrent(classifier, groupingByConcurrent(classifier2)),
 627                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
 628                                                         new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class,
 629                                                                                   new ToListAssertion<>())));
 630 
 631         // With explicit constructors
 632         exerciseMapCollection(data,
 633                               groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, toCollection(HashSet::new))),
 634                               new GroupingByAssertion<>(classifier, TreeMap.class,
 635                                                         new GroupingByAssertion<>(classifier2, TreeMap.class,
 636                                                                                   new ToCollectionAssertion<Integer>(HashSet.class, false))));
 637         // with concurrent as upstream
 638         exerciseMapCollection(data,
 639                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingBy(classifier2, TreeMap::new, toList())),
 640                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
 641                                                         new GroupingByAssertion<>(classifier2, TreeMap.class,
 642                                                                                   new ToListAssertion<>())));
 643         // with concurrent as downstream
 644         exerciseMapCollection(data,
 645                               groupingBy(classifier, TreeMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
 646                               new GroupingByAssertion<>(classifier, TreeMap.class,
 647                                                         new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class,
 648                                                                                   new ToListAssertion<>())));
 649         // with concurrent as upstream and downstream
 650         exerciseMapCollection(data,
 651                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
 652                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
 653                                                         new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class,
 654                                                                                   new ToListAssertion<>())));
 655     }
 656 
 657     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 658     public void testGroupubgByWithReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 659         Function<Integer, Integer> classifier = i -> i % 3;
 660 
 661         // Single-level simple reduce
 662         exerciseMapCollection(data,
 663                               groupingBy(classifier, reducing(0, Integer::sum)),
 664                               new GroupingByAssertion<>(classifier, HashMap.class,
 665                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 666         // with concurrent
 667         exerciseMapCollection(data,
 668                               groupingByConcurrent(classifier, reducing(0, Integer::sum)),
 669                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
 670                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 671 
 672         // With explicit constructors
 673         exerciseMapCollection(data,
 674                               groupingBy(classifier, TreeMap::new, reducing(0, Integer::sum)),
 675                               new GroupingByAssertion<>(classifier, TreeMap.class,
 676                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 677         // with concurrent
 678         exerciseMapCollection(data,
 679                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, Integer::sum)),
 680                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
 681                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 682 
 683         // Single-level map-reduce
 684         exerciseMapCollection(data,
 685                               groupingBy(classifier, reducing(0, mDoubler, Integer::sum)),
 686                               new GroupingByAssertion<>(classifier, HashMap.class,
 687                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
 688         // with concurrent
 689         exerciseMapCollection(data,
 690                               groupingByConcurrent(classifier, reducing(0, mDoubler, Integer::sum)),
 691                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
 692                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
 693 
 694         // With explicit constructors
 695         exerciseMapCollection(data,
 696                               groupingBy(classifier, TreeMap::new, reducing(0, mDoubler, Integer::sum)),
 697                               new GroupingByAssertion<>(classifier, TreeMap.class,
 698                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
 699         // with concurrent
 700         exerciseMapCollection(data,
 701                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, mDoubler, Integer::sum)),
 702                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
 703                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
 704     }
 705 
 706     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 707     public void testSimplePartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 708         Predicate<Integer> classifier = i -> i % 3 == 0;
 709 
 710         // Single-level partition to downstream List
 711         exerciseMapCollection(data,
 712                               partitioningBy(classifier),
 713                               new PartitioningByAssertion<>(classifier, new ToListAssertion<>()));
 714         exerciseMapCollection(data,
 715                               partitioningBy(classifier, toList()),
 716                               new PartitioningByAssertion<>(classifier, new ToListAssertion<>()));
 717     }
 718 
 719     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 720     public void testTwoLevelPartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 721         Predicate<Integer> classifier = i -> i % 3 == 0;
 722         Predicate<Integer> classifier2 = i -> i % 7 == 0;
 723 
 724         // Two level partition
 725         exerciseMapCollection(data,
 726                               partitioningBy(classifier, partitioningBy(classifier2)),
 727                               new PartitioningByAssertion<>(classifier,
 728                                                             new PartitioningByAssertion(classifier2, new ToListAssertion<>())));
 729 
 730         // Two level partition with reduce
 731         exerciseMapCollection(data,
 732                               partitioningBy(classifier, reducing(0, Integer::sum)),
 733                               new PartitioningByAssertion<>(classifier,
 734                                                             new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 735     }
 736 
 737     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 738     public void testComposeFinisher(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 739         List<Integer> asList = exerciseTerminalOps(data, s -> s.collect(toList()));
 740         List<Integer> asImmutableList = exerciseTerminalOps(data, s -> s.collect(collectingAndThen(toList(), Collections::unmodifiableList)));
 741         assertEquals(asList, asImmutableList);
 742         try {
 743             asImmutableList.add(0);
 744             fail("Expecting immutable result");
 745         }
 746         catch (UnsupportedOperationException ignored) { }
 747     }
 748 
 749 }