1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 package org.openjdk.tests.java.util.stream;
  24 
  25 import java.util.ArrayList;
  26 import java.util.Arrays;
  27 import java.util.Collection;
  28 import java.util.Comparator;
  29 import java.util.HashMap;
  30 import java.util.HashSet;
  31 import java.util.Iterator;
  32 import java.util.List;
  33 import java.util.Map;
  34 import java.util.Optional;
  35 import java.util.Set;
  36 import java.util.TreeMap;
  37 import java.util.concurrent.ConcurrentHashMap;
  38 import java.util.concurrent.ConcurrentSkipListMap;
  39 import java.util.function.BinaryOperator;
  40 import java.util.function.Function;
  41 import java.util.function.Predicate;
  42 import java.util.function.Supplier;
  43 import java.util.stream.Collector;
  44 import java.util.stream.Collectors;
  45 import java.util.stream.LambdaTestHelpers;
  46 import java.util.stream.OpTestCase;
  47 import java.util.stream.Stream;
  48 import java.util.stream.StreamOpFlagTestHelper;
  49 import java.util.stream.StreamTestDataProvider;
  50 import java.util.stream.TestData;
  51 
  52 import org.testng.annotations.Test;
  53 
  54 import static java.util.stream.Collectors.groupingBy;
  55 import static java.util.stream.Collectors.groupingByConcurrent;
  56 import static java.util.stream.Collectors.partitioningBy;
  57 import static java.util.stream.Collectors.reducing;
  58 import static java.util.stream.Collectors.toCollection;
  59 import static java.util.stream.Collectors.toConcurrentMap;
  60 import static java.util.stream.Collectors.toList;
  61 import static java.util.stream.Collectors.toMap;
  62 import static java.util.stream.Collectors.toSet;
  63 import static java.util.stream.LambdaTestHelpers.assertContents;
  64 import static java.util.stream.LambdaTestHelpers.assertContentsUnordered;
  65 import static java.util.stream.LambdaTestHelpers.mDoubler;
  66 
  67 /**
  68  * TabulatorsTest
  69  *
  70  * @author Brian Goetz
  71  */
  72 @SuppressWarnings({"rawtypes", "unchecked"})
  73 public class TabulatorsTest extends OpTestCase {
  74 
  75     private static abstract class TabulationAssertion<T, U> {
  76         abstract void assertValue(U value,
  77                                   Supplier<Stream<T>> source,
  78                                   boolean ordered) throws ReflectiveOperationException;
  79     }
  80 
  81     @SuppressWarnings({"rawtypes", "unchecked"})
  82     static class GroupedMapAssertion<T, K, V, M extends Map<K, ? extends V>> extends TabulationAssertion<T, M> {
  83         private final Class<? extends Map> clazz;
  84         private final Function<T, K> classifier;
  85         private final TabulationAssertion<T,V> downstream;
  86 
  87         protected GroupedMapAssertion(Function<T, K> classifier,
  88                                       Class<? extends Map> clazz,
  89                                       TabulationAssertion<T, V> downstream) {
  90             this.clazz = clazz;
  91             this.classifier = classifier;
  92             this.downstream = downstream;
  93         }
  94 
  95         void assertValue(M map,
  96                          Supplier<Stream<T>> source,
  97                          boolean ordered) throws ReflectiveOperationException {
  98             if (!clazz.isAssignableFrom(map.getClass()))
  99                 fail(String.format("Class mismatch in GroupedMapAssertion: %s, %s", clazz, map.getClass()));
 100             assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(toSet()));
 101             for (Map.Entry<K, ? extends V> entry : map.entrySet()) {
 102                 K key = entry.getKey();
 103                 downstream.assertValue(entry.getValue(),
 104                                        () -> source.get().filter(e -> classifier.apply(e).equals(key)),
 105                                        ordered);
 106             }
 107         }
 108     }
 109 
 110     static class ToMapAssertion<T, K, V, M extends Map<K,V>> extends TabulationAssertion<T, M> {
 111         private final Class<? extends Map> clazz;
 112         private final Function<T, K> keyFn;
 113         private final Function<T, V> valueFn;
 114         private final BinaryOperator<V> mergeFn;
 115 
 116         ToMapAssertion(Function<T, K> keyFn,
 117                        Function<T, V> valueFn,
 118                        BinaryOperator<V> mergeFn,
 119                        Class<? extends Map> clazz) {
 120             this.clazz = clazz;
 121             this.keyFn = keyFn;
 122             this.valueFn = valueFn;
 123             this.mergeFn = mergeFn;
 124         }
 125 
 126         @Override
 127         void assertValue(M map, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
 128             Set<K> uniqueKeys = source.get().map(keyFn).collect(toSet());
 129             assertTrue(clazz.isAssignableFrom(map.getClass()));
 130             assertEquals(uniqueKeys, map.keySet());
 131             source.get().forEach(t -> {
 132                 K key = keyFn.apply(t);
 133                 V v = source.get()
 134                             .filter(e -> key.equals(keyFn.apply(e)))
 135                             .map(valueFn)
 136                             .reduce(mergeFn)
 137                             .get();
 138                 assertEquals(map.get(key), v);
 139             });
 140         }
 141     }
 142 
 143     static class PartitionAssertion<T, D> extends TabulationAssertion<T, Map<Boolean,D>> {
 144         private final Predicate<T> predicate;
 145         private final TabulationAssertion<T,D> downstream;
 146 
 147         protected PartitionAssertion(Predicate<T> predicate,
 148                                      TabulationAssertion<T, D> downstream) {
 149             this.predicate = predicate;
 150             this.downstream = downstream;
 151         }
 152 
 153         void assertValue(Map<Boolean, D> map,
 154                          Supplier<Stream<T>> source,
 155                          boolean ordered) throws ReflectiveOperationException {
 156             if (!Map.class.isAssignableFrom(map.getClass()))
 157                 fail(String.format("Class mismatch in PartitionAssertion: %s", map.getClass()));
 158             assertEquals(2, map.size());
 159             downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered);
 160             downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered);
 161         }
 162     }
 163 
 164     @SuppressWarnings({"rawtypes", "unchecked"})
 165     static class ListAssertion<T> extends TabulationAssertion<T, List<T>> {
 166         @Override
 167         void assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered)
 168                 throws ReflectiveOperationException {
 169             if (!List.class.isAssignableFrom(value.getClass()))
 170                 fail(String.format("Class mismatch in ListAssertion: %s", value.getClass()));
 171             Stream<T> stream = source.get();
 172             List<T> result = new ArrayList<>();
 173             for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
 174                 result.add(it.next());
 175             if (StreamOpFlagTestHelper.isStreamOrdered(stream) && ordered)
 176                 assertContents(value, result);
 177             else
 178                 assertContentsUnordered(value, result);
 179         }
 180     }
 181 
 182     @SuppressWarnings({"rawtypes", "unchecked"})
 183     static class CollectionAssertion<T> extends TabulationAssertion<T, Collection<T>> {
 184         private final Class<? extends Collection> clazz;
 185         private final boolean targetOrdered;
 186 
 187         protected CollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered) {
 188             this.clazz = clazz;
 189             this.targetOrdered = targetOrdered;
 190         }
 191 
 192         @Override
 193         void assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered)
 194                 throws ReflectiveOperationException {
 195             if (!clazz.isAssignableFrom(value.getClass()))
 196                 fail(String.format("Class mismatch in CollectionAssertion: %s, %s", clazz, value.getClass()));
 197             Stream<T> stream = source.get();
 198             Collection<T> result = clazz.newInstance();
 199             for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
 200                 result.add(it.next());
 201             if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered)
 202                 assertContents(value, result);
 203             else
 204                 assertContentsUnordered(value, result);
 205         }
 206     }
 207 
 208     static class ReduceAssertion<T, U> extends TabulationAssertion<T, U> {
 209         private final U identity;
 210         private final Function<T, U> mapper;
 211         private final BinaryOperator<U> reducer;
 212 
 213         ReduceAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer) {
 214             this.identity = identity;
 215             this.mapper = mapper;
 216             this.reducer = reducer;
 217         }
 218 
 219         @Override
 220         void assertValue(U value, Supplier<Stream<T>> source, boolean ordered)
 221                 throws ReflectiveOperationException {
 222             Optional<U> reduced = source.get().map(mapper).reduce(reducer);
 223             if (value == null)
 224                 assertTrue(!reduced.isPresent());
 225             else if (!reduced.isPresent()) {
 226                 assertEquals(value, identity);
 227             }
 228             else {
 229                 assertEquals(value, reduced.get());
 230             }
 231         }
 232     }
 233 
 234     private <T> ResultAsserter<T> mapTabulationAsserter(boolean ordered) {
 235         return (act, exp, ord, par) -> {
 236             if (par && (!ordered || !ord)) {
 237                 TabulatorsTest.nestedMapEqualityAssertion(act, exp);
 238             }
 239             else {
 240                 LambdaTestHelpers.assertContentsEqual(act, exp);
 241             }
 242         };
 243     }
 244 
 245     private<T, M extends Map>
 246     void exerciseMapTabulation(TestData<T, Stream<T>> data,
 247                                Collector<T, ?, ? extends M> collector,
 248                                TabulationAssertion<T, M> assertion)
 249             throws ReflectiveOperationException {
 250         boolean ordered = !collector.characteristics().contains(Collector.Characteristics.UNORDERED);
 251 
 252         M m = withData(data)
 253                 .terminal(s -> s.collect(collector))
 254                 .resultAsserter(mapTabulationAsserter(ordered))
 255                 .exercise();
 256         assertion.assertValue(m, () -> data.stream(), ordered);
 257 
 258         m = withData(data)
 259                 .terminal(s -> s.unordered().collect(collector))
 260                 .resultAsserter(mapTabulationAsserter(ordered))
 261                 .exercise();
 262         assertion.assertValue(m, () -> data.stream(), false);
 263     }
 264 
 265     private static void nestedMapEqualityAssertion(Object o1, Object o2) {
 266         if (o1 instanceof Map) {
 267             Map m1 = (Map) o1;
 268             Map m2 = (Map) o2;
 269             assertContentsUnordered(m1.keySet(), m2.keySet());
 270             for (Object k : m1.keySet())
 271                 nestedMapEqualityAssertion(m1.get(k), m2.get(k));
 272         }
 273         else if (o1 instanceof Collection) {
 274             assertContentsUnordered(((Collection) o1), ((Collection) o2));
 275         }
 276         else
 277             assertEquals(o1, o2);
 278     }
 279 
 280     private<T, R> void assertCollect(TestData.OfRef<T> data,
 281                                      Collector<T, ?, R> collector,
 282                                      Function<Stream<T>, R> streamReduction) {
 283         R check = streamReduction.apply(data.stream());
 284         withData(data).terminal(s -> s.collect(collector)).expectedResult(check).exercise();
 285     }
 286 
 287     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 288     public void testReduce(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 289         assertCollect(data, Collectors.reducing(0, Integer::sum),
 290                       s -> s.reduce(0, Integer::sum));
 291         assertCollect(data, Collectors.reducing(Integer.MAX_VALUE, Integer::min),
 292                       s -> s.min(Integer::compare).orElse(Integer.MAX_VALUE));
 293         assertCollect(data, Collectors.reducing(Integer.MIN_VALUE, Integer::max),
 294                       s -> s.max(Integer::compare).orElse(Integer.MIN_VALUE));
 295 
 296         assertCollect(data, Collectors.reducing(Integer::sum),
 297                       s -> s.reduce(Integer::sum));
 298         assertCollect(data, Collectors.minBy(Comparator.naturalOrder()),
 299                       s -> s.min(Integer::compare));
 300         assertCollect(data, Collectors.maxBy(Comparator.naturalOrder()),
 301                       s -> s.max(Integer::compare));
 302 
 303         assertCollect(data, Collectors.reducing(0, x -> x*2, Integer::sum),
 304                       s -> s.map(x -> x*2).reduce(0, Integer::sum));
 305 
 306         assertCollect(data, Collectors.summingLong(x -> x * 2L),
 307                       s -> s.map(x -> x*2L).reduce(0L, Long::sum));
 308         assertCollect(data, Collectors.summingInt(x -> x * 2),
 309                       s -> s.map(x -> x*2).reduce(0, Integer::sum));
 310         assertCollect(data, Collectors.summingDouble(x -> x * 2.0d),
 311                       s -> s.map(x -> x * 2.0d).reduce(0.0d, Double::sum));
 312 
 313         assertCollect(data, Collectors.averagingInt(x -> x * 2),
 314                       s -> s.mapToInt(x -> x * 2).average().orElse(0));
 315         assertCollect(data, Collectors.averagingLong(x -> x * 2),
 316                       s -> s.mapToLong(x -> x * 2).average().orElse(0));
 317         assertCollect(data, Collectors.averagingDouble(x -> x * 2),
 318                       s -> s.mapToDouble(x -> x * 2).average().orElse(0));
 319 
 320         // Test explicit Collector.of
 321         Collector<Integer, long[], Double> avg2xint = Collector.of(() -> new long[2],
 322                                                                    (a, b) -> {
 323                                                                        a[0] += b * 2;
 324                                                                        a[1]++;
 325                                                                    },
 326                                                                    (a, b) -> {
 327                                                                        a[0] += b[0];
 328                                                                        a[1] += b[1];
 329                                                                        return a;
 330                                                                    },
 331                                                                    a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1]);
 332         assertCollect(data, avg2xint,
 333                       s -> s.mapToInt(x -> x * 2).average().orElse(0));
 334     }
 335 
 336     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 337     public void testJoin(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 338         withData(data)
 339                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining()))
 340                 .expectedResult(join(data, ""))
 341                 .exercise();
 342 
 343         Collector<String, StringBuilder, String> likeJoining = Collector.of(StringBuilder::new, StringBuilder::append, (sb1, sb2) -> sb1.append(sb2.toString()), StringBuilder::toString);
 344         withData(data)
 345                 .terminal(s -> s.map(Object::toString).collect(likeJoining))
 346                 .expectedResult(join(data, ""))
 347                 .exercise();
 348 
 349         withData(data)
 350                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",")))
 351                 .expectedResult(join(data, ","))
 352                 .exercise();
 353 
 354         withData(data)
 355                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",", "[", "]")))
 356                 .expectedResult("[" + join(data, ",") + "]")
 357                 .exercise();
 358     }
 359 
 360     private<T> String join(TestData.OfRef<T> data, String delim) {
 361         StringBuilder sb = new StringBuilder();
 362         boolean first = true;
 363         for (T i : data) {
 364             if (!first)
 365                 sb.append(delim);
 366             sb.append(i.toString());
 367             first = false;
 368         }
 369         return sb.toString();
 370     }
 371 
 372     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 373     public void testSimpleToMap(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 374         Function<Integer, Integer> keyFn = i -> i * 2;
 375         Function<Integer, Integer> valueFn = i -> i * 4;
 376 
 377         List<Integer> dataAsList = Arrays.asList(data.stream().toArray(Integer[]::new));
 378         Set<Integer> dataAsSet = new HashSet<>(dataAsList);
 379 
 380         BinaryOperator<Integer> sum = Integer::sum;
 381         for (BinaryOperator<Integer> op : Arrays.asList((u, v) -> u,
 382                                                         (u, v) -> v,
 383                                                         sum)) {
 384             try {
 385                 exerciseMapTabulation(data, toMap(keyFn, valueFn),
 386                                       new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
 387                 if (dataAsList.size() != dataAsSet.size())
 388                     fail("Expected ISE on input with duplicates");
 389             }
 390             catch (IllegalStateException e) {
 391                 if (dataAsList.size() == dataAsSet.size())
 392                     fail("Expected no ISE on input without duplicates");
 393             }
 394 
 395             exerciseMapTabulation(data, toMap(keyFn, valueFn, op),
 396                                   new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
 397 
 398             exerciseMapTabulation(data, toMap(keyFn, valueFn, op, TreeMap::new),
 399                                   new ToMapAssertion<>(keyFn, valueFn, op, TreeMap.class));
 400         }
 401 
 402         // For concurrent maps, only use commutative merge functions
 403         try {
 404             exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn),
 405                                   new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
 406             if (dataAsList.size() != dataAsSet.size())
 407                 fail("Expected ISE on input with duplicates");
 408         }
 409         catch (IllegalStateException e) {
 410             if (dataAsList.size() == dataAsSet.size())
 411                 fail("Expected no ISE on input without duplicates");
 412         }
 413 
 414         exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn, sum),
 415                               new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
 416 
 417         exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn, sum, ConcurrentSkipListMap::new),
 418                               new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentSkipListMap.class));
 419     }
 420 
 421     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 422     public void testSimpleGroupBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 423         Function<Integer, Integer> classifier = i -> i % 3;
 424 
 425         // Single-level groupBy
 426         exerciseMapTabulation(data, groupingBy(classifier),
 427                               new GroupedMapAssertion<>(classifier, HashMap.class,
 428                                                         new ListAssertion<>()));
 429         exerciseMapTabulation(data, groupingByConcurrent(classifier),
 430                               new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
 431                                                         new ListAssertion<>()));
 432 
 433         // With explicit constructors
 434         exerciseMapTabulation(data,
 435                               groupingBy(classifier, TreeMap::new, toCollection(HashSet::new)),
 436                               new GroupedMapAssertion<>(classifier, TreeMap.class,
 437                                                         new CollectionAssertion<Integer>(HashSet.class, false)));
 438         exerciseMapTabulation(data,
 439                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new,
 440                                                    toCollection(HashSet::new)),
 441                               new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
 442                                                         new CollectionAssertion<Integer>(HashSet.class, false)));
 443     }
 444 
 445     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 446     public void testTwoLevelGroupBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 447         Function<Integer, Integer> classifier = i -> i % 6;
 448         Function<Integer, Integer> classifier2 = i -> i % 23;
 449 
 450         // Two-level groupBy
 451         exerciseMapTabulation(data,
 452                               groupingBy(classifier, groupingBy(classifier2)),
 453                               new GroupedMapAssertion<>(classifier, HashMap.class,
 454                                                         new GroupedMapAssertion<>(classifier2, HashMap.class,
 455                                                                                   new ListAssertion<>())));
 456         // with concurrent as upstream
 457         exerciseMapTabulation(data,
 458                               groupingByConcurrent(classifier, groupingBy(classifier2)),
 459                               new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
 460                                                         new GroupedMapAssertion<>(classifier2, HashMap.class,
 461                                                                                   new ListAssertion<>())));
 462         // with concurrent as downstream
 463         exerciseMapTabulation(data,
 464                               groupingBy(classifier, groupingByConcurrent(classifier2)),
 465                               new GroupedMapAssertion<>(classifier, HashMap.class,
 466                                                         new GroupedMapAssertion<>(classifier2, ConcurrentHashMap.class,
 467                                                                                   new ListAssertion<>())));
 468         // with concurrent as upstream and downstream
 469         exerciseMapTabulation(data,
 470                               groupingByConcurrent(classifier, groupingByConcurrent(classifier2)),
 471                               new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
 472                                                         new GroupedMapAssertion<>(classifier2, ConcurrentHashMap.class,
 473                                                                                   new ListAssertion<>())));
 474 
 475         // With explicit constructors
 476         exerciseMapTabulation(data,
 477                               groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, toCollection(HashSet::new))),
 478                               new GroupedMapAssertion<>(classifier, TreeMap.class,
 479                                                         new GroupedMapAssertion<>(classifier2, TreeMap.class,
 480                                                                                   new CollectionAssertion<Integer>(HashSet.class, false))));
 481         // with concurrent as upstream
 482         exerciseMapTabulation(data,
 483                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingBy(classifier2, TreeMap::new, toList())),
 484                               new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
 485                                                         new GroupedMapAssertion<>(classifier2, TreeMap.class,
 486                                                                                   new ListAssertion<>())));
 487         // with concurrent as downstream
 488         exerciseMapTabulation(data,
 489                               groupingBy(classifier, TreeMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
 490                               new GroupedMapAssertion<>(classifier, TreeMap.class,
 491                                                         new GroupedMapAssertion<>(classifier2, ConcurrentSkipListMap.class,
 492                                                                                   new ListAssertion<>())));
 493         // with concurrent as upstream and downstream
 494         exerciseMapTabulation(data,
 495                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
 496                               new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
 497                                                         new GroupedMapAssertion<>(classifier2, ConcurrentSkipListMap.class,
 498                                                                                   new ListAssertion<>())));
 499     }
 500 
 501     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 502     public void testGroupedReduce(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 503         Function<Integer, Integer> classifier = i -> i % 3;
 504 
 505         // Single-level simple reduce
 506         exerciseMapTabulation(data,
 507                               groupingBy(classifier, reducing(0, Integer::sum)),
 508                               new GroupedMapAssertion<>(classifier, HashMap.class,
 509                                                         new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 510         // with concurrent
 511         exerciseMapTabulation(data,
 512                               groupingByConcurrent(classifier, reducing(0, Integer::sum)),
 513                               new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
 514                                                         new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 515 
 516         // With explicit constructors
 517         exerciseMapTabulation(data,
 518                               groupingBy(classifier, TreeMap::new, reducing(0, Integer::sum)),
 519                               new GroupedMapAssertion<>(classifier, TreeMap.class,
 520                                                         new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 521         // with concurrent
 522         exerciseMapTabulation(data,
 523                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, Integer::sum)),
 524                               new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
 525                                                         new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 526 
 527         // Single-level map-reduce
 528         exerciseMapTabulation(data,
 529                               groupingBy(classifier, reducing(0, mDoubler, Integer::sum)),
 530                               new GroupedMapAssertion<>(classifier, HashMap.class,
 531                                                         new ReduceAssertion<>(0, mDoubler, Integer::sum)));
 532         // with concurrent
 533         exerciseMapTabulation(data,
 534                               groupingByConcurrent(classifier, reducing(0, mDoubler, Integer::sum)),
 535                               new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
 536                                                         new ReduceAssertion<>(0, mDoubler, Integer::sum)));
 537 
 538         // With explicit constructors
 539         exerciseMapTabulation(data,
 540                               groupingBy(classifier, TreeMap::new, reducing(0, mDoubler, Integer::sum)),
 541                               new GroupedMapAssertion<>(classifier, TreeMap.class,
 542                                                         new ReduceAssertion<>(0, mDoubler, Integer::sum)));
 543         // with concurrent
 544         exerciseMapTabulation(data,
 545                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, mDoubler, Integer::sum)),
 546                               new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
 547                                                         new ReduceAssertion<>(0, mDoubler, Integer::sum)));
 548     }
 549 
 550     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 551     public void testSimplePartition(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 552         Predicate<Integer> classifier = i -> i % 3 == 0;
 553 
 554         // Single-level partition to downstream List
 555         exerciseMapTabulation(data,
 556                               partitioningBy(classifier),
 557                               new PartitionAssertion<>(classifier, new ListAssertion<>()));
 558         exerciseMapTabulation(data,
 559                               partitioningBy(classifier, toList()),
 560                               new PartitionAssertion<>(classifier, new ListAssertion<>()));
 561     }
 562 
 563     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 564     public void testTwoLevelPartition(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 565         Predicate<Integer> classifier = i -> i % 3 == 0;
 566         Predicate<Integer> classifier2 = i -> i % 7 == 0;
 567 
 568         // Two level partition
 569         exerciseMapTabulation(data,
 570                               partitioningBy(classifier, partitioningBy(classifier2)),
 571                               new PartitionAssertion<>(classifier,
 572                                                        new PartitionAssertion(classifier2, new ListAssertion<>())));
 573 
 574         // Two level partition with reduce
 575         exerciseMapTabulation(data,
 576                               partitioningBy(classifier, reducing(0, Integer::sum)),
 577                               new PartitionAssertion<>(classifier,
 578                                                        new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 579     }
 580 }