1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 package org.openjdk.tests.java.util.stream;
  24 
  25 import java.util.ArrayList;
  26 import java.util.Arrays;
  27 import java.util.Collection;
  28 import java.util.Collections;
  29 import java.util.Comparator;
  30 import java.util.HashMap;
  31 import java.util.HashSet;
  32 import java.util.Iterator;
  33 import java.util.List;
  34 import java.util.Map;
  35 import java.util.Optional;
  36 import java.util.Set;
  37 import java.util.StringJoiner;
  38 import java.util.TreeMap;
  39 import java.util.concurrent.ConcurrentHashMap;
  40 import java.util.concurrent.ConcurrentSkipListMap;
  41 import java.util.function.BinaryOperator;
  42 import java.util.function.Function;
  43 import java.util.function.Predicate;
  44 import java.util.function.Supplier;
  45 import java.util.stream.Collector;
  46 import java.util.stream.Collectors;
  47 import java.util.stream.LambdaTestHelpers;
  48 import java.util.stream.OpTestCase;
  49 import java.util.stream.Stream;
  50 import java.util.stream.StreamOpFlagTestHelper;
  51 import java.util.stream.StreamTestDataProvider;
  52 import java.util.stream.TestData;
  53 
  54 import org.testng.annotations.Test;
  55 
  56 import static java.util.stream.Collectors.collectingAndThen;
  57 import static java.util.stream.Collectors.groupingBy;
  58 import static java.util.stream.Collectors.groupingByConcurrent;
  59 import static java.util.stream.Collectors.partitioningBy;
  60 import static java.util.stream.Collectors.reducing;
  61 import static java.util.stream.Collectors.toCollection;
  62 import static java.util.stream.Collectors.toConcurrentMap;
  63 import static java.util.stream.Collectors.toList;
  64 import static java.util.stream.Collectors.toMap;
  65 import static java.util.stream.Collectors.toSet;
  66 import static java.util.stream.LambdaTestHelpers.assertContents;
  67 import static java.util.stream.LambdaTestHelpers.assertContentsUnordered;
  68 import static java.util.stream.LambdaTestHelpers.mDoubler;
  69 
  70 /**
  71  * TabulatorsTest
  72  *
  73  * @author Brian Goetz
  74  */
  75 @SuppressWarnings({"rawtypes", "unchecked"})
  76 public class TabulatorsTest extends OpTestCase {
  77 
  78     private static abstract class TabulationAssertion<T, U> {
  79         abstract void assertValue(U value,
  80                                   Supplier<Stream<T>> source,
  81                                   boolean ordered) throws ReflectiveOperationException;
  82     }
  83 
  84     @SuppressWarnings({"rawtypes", "unchecked"})
  85     static class GroupedMapAssertion<T, K, V, M extends Map<K, ? extends V>> extends TabulationAssertion<T, M> {
  86         private final Class<? extends Map> clazz;
  87         private final Function<T, K> classifier;
  88         private final TabulationAssertion<T,V> downstream;
  89 
  90         protected GroupedMapAssertion(Function<T, K> classifier,
  91                                       Class<? extends Map> clazz,
  92                                       TabulationAssertion<T, V> downstream) {
  93             this.clazz = clazz;
  94             this.classifier = classifier;
  95             this.downstream = downstream;
  96         }
  97 
  98         void assertValue(M map,
  99                          Supplier<Stream<T>> source,
 100                          boolean ordered) throws ReflectiveOperationException {
 101             if (!clazz.isAssignableFrom(map.getClass()))
 102                 fail(String.format("Class mismatch in GroupedMapAssertion: %s, %s", clazz, map.getClass()));
 103             assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(toSet()));
 104             for (Map.Entry<K, ? extends V> entry : map.entrySet()) {
 105                 K key = entry.getKey();
 106                 downstream.assertValue(entry.getValue(),
 107                                        () -> source.get().filter(e -> classifier.apply(e).equals(key)),
 108                                        ordered);
 109             }
 110         }
 111     }
 112 
 113     static class ToMapAssertion<T, K, V, M extends Map<K,V>> extends TabulationAssertion<T, M> {
 114         private final Class<? extends Map> clazz;
 115         private final Function<T, K> keyFn;
 116         private final Function<T, V> valueFn;
 117         private final BinaryOperator<V> mergeFn;
 118 
 119         ToMapAssertion(Function<T, K> keyFn,
 120                        Function<T, V> valueFn,
 121                        BinaryOperator<V> mergeFn,
 122                        Class<? extends Map> clazz) {
 123             this.clazz = clazz;
 124             this.keyFn = keyFn;
 125             this.valueFn = valueFn;
 126             this.mergeFn = mergeFn;
 127         }
 128 
 129         @Override
 130         void assertValue(M map, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
 131             Set<K> uniqueKeys = source.get().map(keyFn).collect(toSet());
 132             assertTrue(clazz.isAssignableFrom(map.getClass()));
 133             assertEquals(uniqueKeys, map.keySet());
 134             source.get().forEach(t -> {
 135                 K key = keyFn.apply(t);
 136                 V v = source.get()
 137                             .filter(e -> key.equals(keyFn.apply(e)))
 138                             .map(valueFn)
 139                             .reduce(mergeFn)
 140                             .get();
 141                 assertEquals(map.get(key), v);
 142             });
 143         }
 144     }
 145 
 146     static class PartitionAssertion<T, D> extends TabulationAssertion<T, Map<Boolean,D>> {
 147         private final Predicate<T> predicate;
 148         private final TabulationAssertion<T,D> downstream;
 149 
 150         protected PartitionAssertion(Predicate<T> predicate,
 151                                      TabulationAssertion<T, D> downstream) {
 152             this.predicate = predicate;
 153             this.downstream = downstream;
 154         }
 155 
 156         void assertValue(Map<Boolean, D> map,
 157                          Supplier<Stream<T>> source,
 158                          boolean ordered) throws ReflectiveOperationException {
 159             if (!Map.class.isAssignableFrom(map.getClass()))
 160                 fail(String.format("Class mismatch in PartitionAssertion: %s", map.getClass()));
 161             assertEquals(2, map.size());
 162             downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered);
 163             downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered);
 164         }
 165     }
 166 
 167     @SuppressWarnings({"rawtypes", "unchecked"})
 168     static class ListAssertion<T> extends TabulationAssertion<T, List<T>> {
 169         @Override
 170         void assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered)
 171                 throws ReflectiveOperationException {
 172             if (!List.class.isAssignableFrom(value.getClass()))
 173                 fail(String.format("Class mismatch in ListAssertion: %s", value.getClass()));
 174             Stream<T> stream = source.get();
 175             List<T> result = new ArrayList<>();
 176             for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
 177                 result.add(it.next());
 178             if (StreamOpFlagTestHelper.isStreamOrdered(stream) && ordered)
 179                 assertContents(value, result);
 180             else
 181                 assertContentsUnordered(value, result);
 182         }
 183     }
 184 
 185     @SuppressWarnings({"rawtypes", "unchecked"})
 186     static class CollectionAssertion<T> extends TabulationAssertion<T, Collection<T>> {
 187         private final Class<? extends Collection> clazz;
 188         private final boolean targetOrdered;
 189 
 190         protected CollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered) {
 191             this.clazz = clazz;
 192             this.targetOrdered = targetOrdered;
 193         }
 194 
 195         @Override
 196         void assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered)
 197                 throws ReflectiveOperationException {
 198             if (!clazz.isAssignableFrom(value.getClass()))
 199                 fail(String.format("Class mismatch in CollectionAssertion: %s, %s", clazz, value.getClass()));
 200             Stream<T> stream = source.get();
 201             Collection<T> result = clazz.newInstance();
 202             for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
 203                 result.add(it.next());
 204             if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered)
 205                 assertContents(value, result);
 206             else
 207                 assertContentsUnordered(value, result);
 208         }
 209     }
 210 
 211     static class ReduceAssertion<T, U> extends TabulationAssertion<T, U> {
 212         private final U identity;
 213         private final Function<T, U> mapper;
 214         private final BinaryOperator<U> reducer;
 215 
 216         ReduceAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer) {
 217             this.identity = identity;
 218             this.mapper = mapper;
 219             this.reducer = reducer;
 220         }
 221 
 222         @Override
 223         void assertValue(U value, Supplier<Stream<T>> source, boolean ordered)
 224                 throws ReflectiveOperationException {
 225             Optional<U> reduced = source.get().map(mapper).reduce(reducer);
 226             if (value == null)
 227                 assertTrue(!reduced.isPresent());
 228             else if (!reduced.isPresent()) {
 229                 assertEquals(value, identity);
 230             }
 231             else {
 232                 assertEquals(value, reduced.get());
 233             }
 234         }
 235     }
 236 
 237     private <T> ResultAsserter<T> mapTabulationAsserter(boolean ordered) {
 238         return (act, exp, ord, par) -> {
 239             if (par && (!ordered || !ord)) {
 240                 TabulatorsTest.nestedMapEqualityAssertion(act, exp);
 241             }
 242             else {
 243                 LambdaTestHelpers.assertContentsEqual(act, exp);
 244             }
 245         };
 246     }
 247 
 248     private<T, M extends Map>
 249     void exerciseMapTabulation(TestData<T, Stream<T>> data,
 250                                Collector<T, ?, ? extends M> collector,
 251                                TabulationAssertion<T, M> assertion)
 252             throws ReflectiveOperationException {
 253         boolean ordered = !collector.characteristics().contains(Collector.Characteristics.UNORDERED);
 254 
 255         M m = withData(data)
 256                 .terminal(s -> s.collect(collector))
 257                 .resultAsserter(mapTabulationAsserter(ordered))
 258                 .exercise();
 259         assertion.assertValue(m, () -> data.stream(), ordered);
 260 
 261         m = withData(data)
 262                 .terminal(s -> s.unordered().collect(collector))
 263                 .resultAsserter(mapTabulationAsserter(ordered))
 264                 .exercise();
 265         assertion.assertValue(m, () -> data.stream(), false);
 266     }
 267 
 268     private static void nestedMapEqualityAssertion(Object o1, Object o2) {
 269         if (o1 instanceof Map) {
 270             Map m1 = (Map) o1;
 271             Map m2 = (Map) o2;
 272             assertContentsUnordered(m1.keySet(), m2.keySet());
 273             for (Object k : m1.keySet())
 274                 nestedMapEqualityAssertion(m1.get(k), m2.get(k));
 275         }
 276         else if (o1 instanceof Collection) {
 277             assertContentsUnordered(((Collection) o1), ((Collection) o2));
 278         }
 279         else
 280             assertEquals(o1, o2);
 281     }
 282 
 283     private<T, R> void assertCollect(TestData.OfRef<T> data,
 284                                      Collector<T, ?, R> collector,
 285                                      Function<Stream<T>, R> streamReduction) {
 286         R check = streamReduction.apply(data.stream());
 287         withData(data).terminal(s -> s.collect(collector)).expectedResult(check).exercise();
 288     }
 289 
 290     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 291     public void testReduce(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 292         assertCollect(data, Collectors.reducing(0, Integer::sum),
 293                       s -> s.reduce(0, Integer::sum));
 294         assertCollect(data, Collectors.reducing(Integer.MAX_VALUE, Integer::min),
 295                       s -> s.min(Integer::compare).orElse(Integer.MAX_VALUE));
 296         assertCollect(data, Collectors.reducing(Integer.MIN_VALUE, Integer::max),
 297                       s -> s.max(Integer::compare).orElse(Integer.MIN_VALUE));
 298 
 299         assertCollect(data, Collectors.reducing(Integer::sum),
 300                       s -> s.reduce(Integer::sum));
 301         assertCollect(data, Collectors.minBy(Comparator.naturalOrder()),
 302                       s -> s.min(Integer::compare));
 303         assertCollect(data, Collectors.maxBy(Comparator.naturalOrder()),
 304                       s -> s.max(Integer::compare));
 305 
 306         assertCollect(data, Collectors.reducing(0, x -> x*2, Integer::sum),
 307                       s -> s.map(x -> x*2).reduce(0, Integer::sum));
 308 
 309         assertCollect(data, Collectors.summingLong(x -> x * 2L),
 310                       s -> s.map(x -> x*2L).reduce(0L, Long::sum));
 311         assertCollect(data, Collectors.summingInt(x -> x * 2),
 312                       s -> s.map(x -> x*2).reduce(0, Integer::sum));
 313         assertCollect(data, Collectors.summingDouble(x -> x * 2.0d),
 314                       s -> s.map(x -> x * 2.0d).reduce(0.0d, Double::sum));
 315 
 316         assertCollect(data, Collectors.averagingInt(x -> x * 2),
 317                       s -> s.mapToInt(x -> x * 2).average().orElse(0));
 318         assertCollect(data, Collectors.averagingLong(x -> x * 2),
 319                       s -> s.mapToLong(x -> x * 2).average().orElse(0));
 320         assertCollect(data, Collectors.averagingDouble(x -> x * 2),
 321                       s -> s.mapToDouble(x -> x * 2).average().orElse(0));
 322 
 323         // Test explicit Collector.of
 324         Collector<Integer, long[], Double> avg2xint = Collector.of(() -> new long[2],
 325                                                                    (a, b) -> {
 326                                                                        a[0] += b * 2;
 327                                                                        a[1]++;
 328                                                                    },
 329                                                                    (a, b) -> {
 330                                                                        a[0] += b[0];
 331                                                                        a[1] += b[1];
 332                                                                        return a;
 333                                                                    },
 334                                                                    a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1]);
 335         assertCollect(data, avg2xint,
 336                       s -> s.mapToInt(x -> x * 2).average().orElse(0));
 337     }
 338 
 339     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 340     public void testJoin(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 341         withData(data)
 342                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining()))
 343                 .expectedResult(join(data, ""))
 344                 .exercise();
 345 
 346         Collector<String, StringBuilder, String> likeJoining = Collector.of(StringBuilder::new, StringBuilder::append, (sb1, sb2) -> sb1.append(sb2.toString()), StringBuilder::toString);
 347         withData(data)
 348                 .terminal(s -> s.map(Object::toString).collect(likeJoining))
 349                 .expectedResult(join(data, ""))
 350                 .exercise();
 351 
 352         withData(data)
 353                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",")))
 354                 .expectedResult(join(data, ","))
 355                 .exercise();
 356 
 357         withData(data)
 358                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",", "[", "]")))
 359                 .expectedResult("[" + join(data, ",") + "]")
 360                 .exercise();
 361 
 362         withData(data)
 363                 .terminal(s -> s.map(Object::toString)
 364                                 .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
 365                                 .toString())
 366                 .expectedResult(join(data, ""))
 367                 .exercise();
 368 
 369         withData(data)
 370                 .terminal(s -> s.map(Object::toString)
 371                                 .collect(() -> new StringJoiner(","),
 372                                          (sj, cs) -> sj.add(cs),
 373                                          (j1, j2) -> j1.merge(j2))
 374                                 .toString())
 375                 .expectedResult(join(data, ","))
 376                 .exercise();
 377 
 378         withData(data)
 379                 .terminal(s -> s.map(Object::toString)
 380                                 .collect(() -> new StringJoiner(",", "[", "]"),
 381                                          (sj, cs) -> sj.add(cs),
 382                                          (j1, j2) -> j1.merge(j2))
 383                                 .toString())
 384                 .expectedResult("[" + join(data, ",") + "]")
 385                 .exercise();
 386     }
 387 
 388     private<T> String join(TestData.OfRef<T> data, String delim) {
 389         StringBuilder sb = new StringBuilder();
 390         boolean first = true;
 391         for (T i : data) {
 392             if (!first)
 393                 sb.append(delim);
 394             sb.append(i.toString());
 395             first = false;
 396         }
 397         return sb.toString();
 398     }
 399 
 400     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 401     public void testSimpleToMap(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 402         Function<Integer, Integer> keyFn = i -> i * 2;
 403         Function<Integer, Integer> valueFn = i -> i * 4;
 404 
 405         List<Integer> dataAsList = Arrays.asList(data.stream().toArray(Integer[]::new));
 406         Set<Integer> dataAsSet = new HashSet<>(dataAsList);
 407 
 408         BinaryOperator<Integer> sum = Integer::sum;
 409         for (BinaryOperator<Integer> op : Arrays.asList((u, v) -> u,
 410                                                         (u, v) -> v,
 411                                                         sum)) {
 412             try {
 413                 exerciseMapTabulation(data, toMap(keyFn, valueFn),
 414                                       new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
 415                 if (dataAsList.size() != dataAsSet.size())
 416                     fail("Expected ISE on input with duplicates");
 417             }
 418             catch (IllegalStateException e) {
 419                 if (dataAsList.size() == dataAsSet.size())
 420                     fail("Expected no ISE on input without duplicates");
 421             }
 422 
 423             exerciseMapTabulation(data, toMap(keyFn, valueFn, op),
 424                                   new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
 425 
 426             exerciseMapTabulation(data, toMap(keyFn, valueFn, op, TreeMap::new),
 427                                   new ToMapAssertion<>(keyFn, valueFn, op, TreeMap.class));
 428         }
 429 
 430         // For concurrent maps, only use commutative merge functions
 431         try {
 432             exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn),
 433                                   new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
 434             if (dataAsList.size() != dataAsSet.size())
 435                 fail("Expected ISE on input with duplicates");
 436         }
 437         catch (IllegalStateException e) {
 438             if (dataAsList.size() == dataAsSet.size())
 439                 fail("Expected no ISE on input without duplicates");
 440         }
 441 
 442         exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn, sum),
 443                               new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
 444 
 445         exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn, sum, ConcurrentSkipListMap::new),
 446                               new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentSkipListMap.class));
 447     }
 448 
 449     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 450     public void testSimpleGroupBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 451         Function<Integer, Integer> classifier = i -> i % 3;
 452 
 453         // Single-level groupBy
 454         exerciseMapTabulation(data, groupingBy(classifier),
 455                               new GroupedMapAssertion<>(classifier, HashMap.class,
 456                                                         new ListAssertion<>()));
 457         exerciseMapTabulation(data, groupingByConcurrent(classifier),
 458                               new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
 459                                                         new ListAssertion<>()));
 460 
 461         // With explicit constructors
 462         exerciseMapTabulation(data,
 463                               groupingBy(classifier, TreeMap::new, toCollection(HashSet::new)),
 464                               new GroupedMapAssertion<>(classifier, TreeMap.class,
 465                                                         new CollectionAssertion<Integer>(HashSet.class, false)));
 466         exerciseMapTabulation(data,
 467                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new,
 468                                                    toCollection(HashSet::new)),
 469                               new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
 470                                                         new CollectionAssertion<Integer>(HashSet.class, false)));
 471     }
 472 
 473     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 474     public void testTwoLevelGroupBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 475         Function<Integer, Integer> classifier = i -> i % 6;
 476         Function<Integer, Integer> classifier2 = i -> i % 23;
 477 
 478         // Two-level groupBy
 479         exerciseMapTabulation(data,
 480                               groupingBy(classifier, groupingBy(classifier2)),
 481                               new GroupedMapAssertion<>(classifier, HashMap.class,
 482                                                         new GroupedMapAssertion<>(classifier2, HashMap.class,
 483                                                                                   new ListAssertion<>())));
 484         // with concurrent as upstream
 485         exerciseMapTabulation(data,
 486                               groupingByConcurrent(classifier, groupingBy(classifier2)),
 487                               new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
 488                                                         new GroupedMapAssertion<>(classifier2, HashMap.class,
 489                                                                                   new ListAssertion<>())));
 490         // with concurrent as downstream
 491         exerciseMapTabulation(data,
 492                               groupingBy(classifier, groupingByConcurrent(classifier2)),
 493                               new GroupedMapAssertion<>(classifier, HashMap.class,
 494                                                         new GroupedMapAssertion<>(classifier2, ConcurrentHashMap.class,
 495                                                                                   new ListAssertion<>())));
 496         // with concurrent as upstream and downstream
 497         exerciseMapTabulation(data,
 498                               groupingByConcurrent(classifier, groupingByConcurrent(classifier2)),
 499                               new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
 500                                                         new GroupedMapAssertion<>(classifier2, ConcurrentHashMap.class,
 501                                                                                   new ListAssertion<>())));
 502 
 503         // With explicit constructors
 504         exerciseMapTabulation(data,
 505                               groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, toCollection(HashSet::new))),
 506                               new GroupedMapAssertion<>(classifier, TreeMap.class,
 507                                                         new GroupedMapAssertion<>(classifier2, TreeMap.class,
 508                                                                                   new CollectionAssertion<Integer>(HashSet.class, false))));
 509         // with concurrent as upstream
 510         exerciseMapTabulation(data,
 511                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingBy(classifier2, TreeMap::new, toList())),
 512                               new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
 513                                                         new GroupedMapAssertion<>(classifier2, TreeMap.class,
 514                                                                                   new ListAssertion<>())));
 515         // with concurrent as downstream
 516         exerciseMapTabulation(data,
 517                               groupingBy(classifier, TreeMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
 518                               new GroupedMapAssertion<>(classifier, TreeMap.class,
 519                                                         new GroupedMapAssertion<>(classifier2, ConcurrentSkipListMap.class,
 520                                                                                   new ListAssertion<>())));
 521         // with concurrent as upstream and downstream
 522         exerciseMapTabulation(data,
 523                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
 524                               new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
 525                                                         new GroupedMapAssertion<>(classifier2, ConcurrentSkipListMap.class,
 526                                                                                   new ListAssertion<>())));
 527     }
 528 
 529     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 530     public void testGroupedReduce(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 531         Function<Integer, Integer> classifier = i -> i % 3;
 532 
 533         // Single-level simple reduce
 534         exerciseMapTabulation(data,
 535                               groupingBy(classifier, reducing(0, Integer::sum)),
 536                               new GroupedMapAssertion<>(classifier, HashMap.class,
 537                                                         new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 538         // with concurrent
 539         exerciseMapTabulation(data,
 540                               groupingByConcurrent(classifier, reducing(0, Integer::sum)),
 541                               new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
 542                                                         new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 543 
 544         // With explicit constructors
 545         exerciseMapTabulation(data,
 546                               groupingBy(classifier, TreeMap::new, reducing(0, Integer::sum)),
 547                               new GroupedMapAssertion<>(classifier, TreeMap.class,
 548                                                         new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 549         // with concurrent
 550         exerciseMapTabulation(data,
 551                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, Integer::sum)),
 552                               new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
 553                                                         new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 554 
 555         // Single-level map-reduce
 556         exerciseMapTabulation(data,
 557                               groupingBy(classifier, reducing(0, mDoubler, Integer::sum)),
 558                               new GroupedMapAssertion<>(classifier, HashMap.class,
 559                                                         new ReduceAssertion<>(0, mDoubler, Integer::sum)));
 560         // with concurrent
 561         exerciseMapTabulation(data,
 562                               groupingByConcurrent(classifier, reducing(0, mDoubler, Integer::sum)),
 563                               new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
 564                                                         new ReduceAssertion<>(0, mDoubler, Integer::sum)));
 565 
 566         // With explicit constructors
 567         exerciseMapTabulation(data,
 568                               groupingBy(classifier, TreeMap::new, reducing(0, mDoubler, Integer::sum)),
 569                               new GroupedMapAssertion<>(classifier, TreeMap.class,
 570                                                         new ReduceAssertion<>(0, mDoubler, Integer::sum)));
 571         // with concurrent
 572         exerciseMapTabulation(data,
 573                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, mDoubler, Integer::sum)),
 574                               new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
 575                                                         new ReduceAssertion<>(0, mDoubler, Integer::sum)));
 576     }
 577 
 578     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 579     public void testSimplePartition(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 580         Predicate<Integer> classifier = i -> i % 3 == 0;
 581 
 582         // Single-level partition to downstream List
 583         exerciseMapTabulation(data,
 584                               partitioningBy(classifier),
 585                               new PartitionAssertion<>(classifier, new ListAssertion<>()));
 586         exerciseMapTabulation(data,
 587                               partitioningBy(classifier, toList()),
 588                               new PartitionAssertion<>(classifier, new ListAssertion<>()));
 589     }
 590 
 591     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 592     public void testTwoLevelPartition(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 593         Predicate<Integer> classifier = i -> i % 3 == 0;
 594         Predicate<Integer> classifier2 = i -> i % 7 == 0;
 595 
 596         // Two level partition
 597         exerciseMapTabulation(data,
 598                               partitioningBy(classifier, partitioningBy(classifier2)),
 599                               new PartitionAssertion<>(classifier,
 600                                                        new PartitionAssertion(classifier2, new ListAssertion<>())));
 601 
 602         // Two level partition with reduce
 603         exerciseMapTabulation(data,
 604                               partitioningBy(classifier, reducing(0, Integer::sum)),
 605                               new PartitionAssertion<>(classifier,
 606                                                        new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 607     }
 608 
 609     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 610     public void testComposeFinisher(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 611         List<Integer> asList = exerciseTerminalOps(data, s -> s.collect(toList()));
 612         List<Integer> asImmutableList = exerciseTerminalOps(data, s -> s.collect(collectingAndThen(toList(), Collections::unmodifiableList)));
 613         assertEquals(asList, asImmutableList);
 614         try {
 615             asImmutableList.add(0);
 616             fail("Expecting immutable result");
 617         }
 618         catch (UnsupportedOperationException ignored) { }
 619     }
 620 
 621 }