1 /*
   2  * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 package org.openjdk.tests.java.util.stream;
  24 
  25 import java.util.ArrayList;
  26 import java.util.Arrays;
  27 import java.util.Collection;
  28 import java.util.Collections;
  29 import java.util.Comparator;
  30 import java.util.HashMap;
  31 import java.util.HashSet;
  32 import java.util.IntSummaryStatistics;
  33 import java.util.Iterator;
  34 import java.util.List;
  35 import java.util.Map;
  36 import java.util.Optional;
  37 import java.util.Set;
  38 import java.util.StringJoiner;
  39 import java.util.TreeMap;
  40 import java.util.concurrent.ConcurrentHashMap;
  41 import java.util.concurrent.ConcurrentSkipListMap;
  42 import java.util.concurrent.atomic.AtomicInteger;
  43 import java.util.function.BiFunction;
  44 import java.util.function.BinaryOperator;
  45 import java.util.function.Function;
  46 import java.util.function.Predicate;
  47 import java.util.function.Supplier;
  48 import java.util.stream.Collector;
  49 import java.util.stream.Collectors;
  50 import java.util.stream.LambdaTestHelpers;
  51 import java.util.stream.OpTestCase;
  52 import java.util.stream.Stream;
  53 import java.util.stream.StreamOpFlagTestHelper;
  54 import java.util.stream.StreamTestDataProvider;
  55 import java.util.stream.TestData;
  56 
  57 import org.testng.annotations.Test;
  58 
  59 import static java.util.stream.Collectors.collectingAndThen;
  60 import static java.util.stream.Collectors.flatMapping;
  61 import static java.util.stream.Collectors.filtering;
  62 import static java.util.stream.Collectors.groupingBy;
  63 import static java.util.stream.Collectors.groupingByConcurrent;
  64 import static java.util.stream.Collectors.mapping;
  65 import static java.util.stream.Collectors.partitioningBy;
  66 import static java.util.stream.Collectors.reducing;
  67 import static java.util.stream.Collectors.toCollection;
  68 import static java.util.stream.Collectors.toConcurrentMap;
  69 import static java.util.stream.Collectors.toList;
  70 import static java.util.stream.Collectors.toMap;
  71 import static java.util.stream.Collectors.toSet;
  72 import static java.util.stream.LambdaTestHelpers.assertContents;
  73 import static java.util.stream.LambdaTestHelpers.assertContentsUnordered;
  74 import static java.util.stream.LambdaTestHelpers.mDoubler;
  75 
  76 /*
  77  * @test
  78  * @bug 8071600 8144675
  79  * @summary Test for collectors.
  80  */
  81 public class CollectorsTest extends OpTestCase {
  82 
  83     private abstract static class CollectorAssertion<T, U> {
  84         abstract void assertValue(U value,
  85                                   Supplier<Stream<T>> source,
  86                                   boolean ordered) throws ReflectiveOperationException;
  87     }
  88 
  89     static class MappingAssertion<T, V, R> extends CollectorAssertion<T, R> {
  90         private final Function<T, V> mapper;
  91         private final CollectorAssertion<V, R> downstream;
  92 
  93         MappingAssertion(Function<T, V> mapper, CollectorAssertion<V, R> downstream) {
  94             this.mapper = mapper;
  95             this.downstream = downstream;
  96         }
  97 
  98         @Override
  99         void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
 100             downstream.assertValue(value,
 101                                    () -> source.get().map(mapper::apply),
 102                                    ordered);
 103         }
 104     }
 105 
 106     static class FlatMappingAssertion<T, V, R> extends CollectorAssertion<T, R> {
 107         private final Function<T, Stream<V>> mapper;
 108         private final CollectorAssertion<V, R> downstream;
 109 
 110         FlatMappingAssertion(Function<T, Stream<V>> mapper,
 111                              CollectorAssertion<V, R> downstream) {
 112             this.mapper = mapper;
 113             this.downstream = downstream;
 114         }
 115 
 116         @Override
 117         void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
 118             downstream.assertValue(value,
 119                                    () -> source.get().flatMap(mapper::apply),
 120                                    ordered);
 121         }
 122     }
 123 
 124     static class FilteringAssertion<T, R> extends CollectorAssertion<T, R> {
 125         private final Predicate<T> filter;
 126         private final CollectorAssertion<T, R> downstream;
 127 
 128         public FilteringAssertion(Predicate<T> filter, CollectorAssertion<T, R> downstream) {
 129             this.filter = filter;
 130             this.downstream = downstream;
 131         }
 132 
 133         @Override
 134         void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
 135             downstream.assertValue(value,
 136                                    () -> source.get().filter(filter),
 137                                    ordered);
 138         }
 139     }
 140 
 141     static class GroupingByAssertion<T, K, V, M extends Map<K, ? extends V>> extends CollectorAssertion<T, M> {
 142         private final Class<? extends Map> clazz;
 143         private final Function<T, K> classifier;
 144         private final CollectorAssertion<T,V> downstream;
 145 
 146         GroupingByAssertion(Function<T, K> classifier, Class<? extends Map> clazz,
 147                             CollectorAssertion<T, V> downstream) {
 148             this.clazz = clazz;
 149             this.classifier = classifier;
 150             this.downstream = downstream;
 151         }
 152 
 153         @Override
 154         void assertValue(M map,
 155                          Supplier<Stream<T>> source,
 156                          boolean ordered) throws ReflectiveOperationException {
 157             if (!clazz.isAssignableFrom(map.getClass()))
 158                 fail(String.format("Class mismatch in GroupingByAssertion: %s, %s", clazz, map.getClass()));
 159             assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(toSet()));
 160             for (Map.Entry<K, ? extends V> entry : map.entrySet()) {
 161                 K key = entry.getKey();
 162                 downstream.assertValue(entry.getValue(),
 163                                        () -> source.get().filter(e -> classifier.apply(e).equals(key)),
 164                                        ordered);
 165             }
 166         }
 167     }
 168 
 169     static class ToMapAssertion<T, K, V, M extends Map<K,V>> extends CollectorAssertion<T, M> {
 170         private final Class<? extends Map> clazz;
 171         private final Function<T, K> keyFn;
 172         private final Function<T, V> valueFn;
 173         private final BinaryOperator<V> mergeFn;
 174 
 175         ToMapAssertion(Function<T, K> keyFn,
 176                        Function<T, V> valueFn,
 177                        BinaryOperator<V> mergeFn,
 178                        Class<? extends Map> clazz) {
 179             this.clazz = clazz;
 180             this.keyFn = keyFn;
 181             this.valueFn = valueFn;
 182             this.mergeFn = mergeFn;
 183         }
 184 
 185         @Override
 186         void assertValue(M map, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
 187             if (!clazz.isAssignableFrom(map.getClass()))
 188                 fail(String.format("Class mismatch in ToMapAssertion: %s, %s", clazz, map.getClass()));
 189             Set<K> uniqueKeys = source.get().map(keyFn).collect(toSet());
 190             assertEquals(uniqueKeys, map.keySet());
 191             source.get().forEach(t -> {
 192                 K key = keyFn.apply(t);
 193                 V v = source.get()
 194                             .filter(e -> key.equals(keyFn.apply(e)))
 195                             .map(valueFn)
 196                             .reduce(mergeFn)
 197                             .get();
 198                 assertEquals(map.get(key), v);
 199             });
 200         }
 201     }
 202 
 203     static class PartitioningByAssertion<T, D> extends CollectorAssertion<T, Map<Boolean,D>> {
 204         private final Predicate<T> predicate;
 205         private final CollectorAssertion<T,D> downstream;
 206 
 207         PartitioningByAssertion(Predicate<T> predicate, CollectorAssertion<T, D> downstream) {
 208             this.predicate = predicate;
 209             this.downstream = downstream;
 210         }
 211 
 212         @Override
 213         void assertValue(Map<Boolean, D> map,
 214                          Supplier<Stream<T>> source,
 215                          boolean ordered) throws ReflectiveOperationException {
 216             if (!Map.class.isAssignableFrom(map.getClass()))
 217                 fail(String.format("Class mismatch in PartitioningByAssertion: %s", map.getClass()));
 218             assertEquals(2, map.size());
 219             downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered);
 220             downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered);
 221         }
 222     }
 223 
 224     static class ToListAssertion<T> extends CollectorAssertion<T, List<T>> {
 225         @Override
 226         void assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered)
 227                 throws ReflectiveOperationException {
 228             if (!List.class.isAssignableFrom(value.getClass()))
 229                 fail(String.format("Class mismatch in ToListAssertion: %s", value.getClass()));
 230             Stream<T> stream = source.get();
 231             List<T> result = new ArrayList<>();
 232             for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
 233                 result.add(it.next());
 234             if (StreamOpFlagTestHelper.isStreamOrdered(stream) && ordered)
 235                 assertContents(value, result);
 236             else
 237                 assertContentsUnordered(value, result);
 238         }
 239     }
 240 
 241     static class ToCollectionAssertion<T> extends CollectorAssertion<T, Collection<T>> {
 242         private final Class<? extends Collection> clazz;
 243         private final boolean targetOrdered;
 244 
 245         ToCollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered) {
 246             this.clazz = clazz;
 247             this.targetOrdered = targetOrdered;
 248         }
 249 
 250         @Override
 251         void assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered)
 252                 throws ReflectiveOperationException {
 253             if (!clazz.isAssignableFrom(value.getClass()))
 254                 fail(String.format("Class mismatch in ToCollectionAssertion: %s, %s", clazz, value.getClass()));
 255             Stream<T> stream = source.get();
 256             Collection<T> result = clazz.newInstance();
 257             for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
 258                 result.add(it.next());
 259             if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered)
 260                 assertContents(value, result);
 261             else
 262                 assertContentsUnordered(value, result);
 263         }
 264     }
 265 
 266     static class ReducingAssertion<T, U> extends CollectorAssertion<T, U> {
 267         private final U identity;
 268         private final Function<T, U> mapper;
 269         private final BinaryOperator<U> reducer;
 270 
 271         ReducingAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer) {
 272             this.identity = identity;
 273             this.mapper = mapper;
 274             this.reducer = reducer;
 275         }
 276 
 277         @Override
 278         void assertValue(U value, Supplier<Stream<T>> source, boolean ordered)
 279                 throws ReflectiveOperationException {
 280             Optional<U> reduced = source.get().map(mapper).reduce(reducer);
 281             if (value == null)
 282                 assertTrue(!reduced.isPresent());
 283             else if (!reduced.isPresent()) {
 284                 assertEquals(value, identity);
 285             }
 286             else {
 287                 assertEquals(value, reduced.get());
 288             }
 289         }
 290     }
 291 
 292     static class PairingAssertion<T, R1, R2, RR> extends CollectorAssertion<T, RR> {
 293         private final Collector<T, ?, R1> c1;
 294         private final Collector<T, ?, R2> c2;
 295         private final BiFunction<? super R1, ? super R2, ? extends RR> finisher;
 296 
 297         PairingAssertion(Collector<T, ?, R1> c1, Collector<T, ?, R2> c2,
 298                          BiFunction<? super R1, ? super R2, ? extends RR> finisher) {
 299             this.c1 = c1;
 300             this.c2 = c2;
 301             this.finisher = finisher;
 302         }
 303 
 304         @Override
 305         void assertValue(RR value, Supplier<Stream<T>> source, boolean ordered) {
 306             R1 r1 = source.get().collect(c1);
 307             R2 r2 = source.get().collect(c2);
 308             RR expected = finisher.apply(r1, r2);
 309             assertEquals(value, expected);
 310         }
 311     }
 312 
 313     private <T> ResultAsserter<T> mapTabulationAsserter(boolean ordered) {
 314         return (act, exp, ord, par) -> {
 315             if (par && (!ordered || !ord)) {
 316                 CollectorsTest.nestedMapEqualityAssertion(act, exp);
 317             }
 318             else {
 319                 LambdaTestHelpers.assertContentsEqual(act, exp);
 320             }
 321         };
 322     }
 323 
 324     private<T, M extends Map>
 325     void exerciseMapCollection(TestData<T, Stream<T>> data,
 326                                Collector<T, ?, ? extends M> collector,
 327                                CollectorAssertion<T, M> assertion)
 328             throws ReflectiveOperationException {
 329         boolean ordered = !collector.characteristics().contains(Collector.Characteristics.UNORDERED);
 330 
 331         M m = withData(data)
 332                 .terminal(s -> s.collect(collector))
 333                 .resultAsserter(mapTabulationAsserter(ordered))
 334                 .exercise();
 335         assertion.assertValue(m, () -> data.stream(), ordered);
 336 
 337         m = withData(data)
 338                 .terminal(s -> s.unordered().collect(collector))
 339                 .resultAsserter(mapTabulationAsserter(ordered))
 340                 .exercise();
 341         assertion.assertValue(m, () -> data.stream(), false);
 342     }
 343 
 344     private static void nestedMapEqualityAssertion(Object o1, Object o2) {
 345         if (o1 instanceof Map) {
 346             Map m1 = (Map) o1;
 347             Map m2 = (Map) o2;
 348             assertContentsUnordered(m1.keySet(), m2.keySet());
 349             for (Object k : m1.keySet())
 350                 nestedMapEqualityAssertion(m1.get(k), m2.get(k));
 351         }
 352         else if (o1 instanceof Collection) {
 353             assertContentsUnordered(((Collection) o1), ((Collection) o2));
 354         }
 355         else
 356             assertEquals(o1, o2);
 357     }
 358 
 359     private<T, R> void assertCollect(TestData.OfRef<T> data,
 360                                      Collector<T, ?, R> collector,
 361                                      Function<Stream<T>, R> streamReduction) {
 362         R check = streamReduction.apply(data.stream());
 363         withData(data).terminal(s -> s.collect(collector)).expectedResult(check).exercise();
 364     }
 365 
 366     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 367     public void testReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 368         assertCollect(data, Collectors.reducing(0, Integer::sum),
 369                       s -> s.reduce(0, Integer::sum));
 370         assertCollect(data, Collectors.reducing(Integer.MAX_VALUE, Integer::min),
 371                       s -> s.min(Integer::compare).orElse(Integer.MAX_VALUE));
 372         assertCollect(data, Collectors.reducing(Integer.MIN_VALUE, Integer::max),
 373                       s -> s.max(Integer::compare).orElse(Integer.MIN_VALUE));
 374 
 375         assertCollect(data, Collectors.reducing(Integer::sum),
 376                       s -> s.reduce(Integer::sum));
 377         assertCollect(data, Collectors.minBy(Comparator.naturalOrder()),
 378                       s -> s.min(Integer::compare));
 379         assertCollect(data, Collectors.maxBy(Comparator.naturalOrder()),
 380                       s -> s.max(Integer::compare));
 381 
 382         assertCollect(data, Collectors.reducing(0, x -> x*2, Integer::sum),
 383                       s -> s.map(x -> x*2).reduce(0, Integer::sum));
 384 
 385         assertCollect(data, Collectors.summingLong(x -> x * 2L),
 386                       s -> s.map(x -> x*2L).reduce(0L, Long::sum));
 387         assertCollect(data, Collectors.summingInt(x -> x * 2),
 388                       s -> s.map(x -> x*2).reduce(0, Integer::sum));
 389         assertCollect(data, Collectors.summingDouble(x -> x * 2.0d),
 390                       s -> s.map(x -> x * 2.0d).reduce(0.0d, Double::sum));
 391 
 392         assertCollect(data, Collectors.averagingInt(x -> x * 2),
 393                       s -> s.mapToInt(x -> x * 2).average().orElse(0));
 394         assertCollect(data, Collectors.averagingLong(x -> x * 2),
 395                       s -> s.mapToLong(x -> x * 2).average().orElse(0));
 396         assertCollect(data, Collectors.averagingDouble(x -> x * 2),
 397                       s -> s.mapToDouble(x -> x * 2).average().orElse(0));
 398 
 399         // Test explicit Collector.of
 400         Collector<Integer, long[], Double> avg2xint = Collector.of(() -> new long[2],
 401                                                                    (a, b) -> {
 402                                                                        a[0] += b * 2;
 403                                                                        a[1]++;
 404                                                                    },
 405                                                                    (a, b) -> {
 406                                                                        a[0] += b[0];
 407                                                                        a[1] += b[1];
 408                                                                        return a;
 409                                                                    },
 410                                                                    a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1]);
 411         assertCollect(data, avg2xint,
 412                       s -> s.mapToInt(x -> x * 2).average().orElse(0));
 413     }
 414 
 415     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 416     public void testJoining(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 417         withData(data)
 418                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining()))
 419                 .expectedResult(join(data, ""))
 420                 .exercise();
 421 
 422         Collector<String, StringBuilder, String> likeJoining = Collector.of(StringBuilder::new, StringBuilder::append, (sb1, sb2) -> sb1.append(sb2.toString()), StringBuilder::toString);
 423         withData(data)
 424                 .terminal(s -> s.map(Object::toString).collect(likeJoining))
 425                 .expectedResult(join(data, ""))
 426                 .exercise();
 427 
 428         withData(data)
 429                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",")))
 430                 .expectedResult(join(data, ","))
 431                 .exercise();
 432 
 433         withData(data)
 434                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",", "[", "]")))
 435                 .expectedResult("[" + join(data, ",") + "]")
 436                 .exercise();
 437 
 438         withData(data)
 439                 .terminal(s -> s.map(Object::toString)
 440                                 .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
 441                                 .toString())
 442                 .expectedResult(join(data, ""))
 443                 .exercise();
 444 
 445         withData(data)
 446                 .terminal(s -> s.map(Object::toString)
 447                                 .collect(() -> new StringJoiner(","),
 448                                          (sj, cs) -> sj.add(cs),
 449                                          (j1, j2) -> j1.merge(j2))
 450                                 .toString())
 451                 .expectedResult(join(data, ","))
 452                 .exercise();
 453 
 454         withData(data)
 455                 .terminal(s -> s.map(Object::toString)
 456                                 .collect(() -> new StringJoiner(",", "[", "]"),
 457                                          (sj, cs) -> sj.add(cs),
 458                                          (j1, j2) -> j1.merge(j2))
 459                                 .toString())
 460                 .expectedResult("[" + join(data, ",") + "]")
 461                 .exercise();
 462     }
 463 
 464     private<T> String join(TestData.OfRef<T> data, String delim) {
 465         StringBuilder sb = new StringBuilder();
 466         boolean first = true;
 467         for (T i : data) {
 468             if (!first)
 469                 sb.append(delim);
 470             sb.append(i.toString());
 471             first = false;
 472         }
 473         return sb.toString();
 474     }
 475 
 476     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 477     public void testSimpleToMap(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 478         Function<Integer, Integer> keyFn = i -> i * 2;
 479         Function<Integer, Integer> valueFn = i -> i * 4;
 480 
 481         List<Integer> dataAsList = Arrays.asList(data.stream().toArray(Integer[]::new));
 482         Set<Integer> dataAsSet = new HashSet<>(dataAsList);
 483 
 484         BinaryOperator<Integer> sum = Integer::sum;
 485         for (BinaryOperator<Integer> op : Arrays.asList((u, v) -> u,
 486                                                         (u, v) -> v,
 487                                                         sum)) {
 488             try {
 489                 exerciseMapCollection(data, toMap(keyFn, valueFn),
 490                                       new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
 491                 if (dataAsList.size() != dataAsSet.size())
 492                     fail("Expected ISE on input with duplicates");
 493             }
 494             catch (IllegalStateException e) {
 495                 if (dataAsList.size() == dataAsSet.size())
 496                     fail("Expected no ISE on input without duplicates");
 497             }
 498 
 499             exerciseMapCollection(data, toMap(keyFn, valueFn, op),
 500                                   new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
 501 
 502             exerciseMapCollection(data, toMap(keyFn, valueFn, op, TreeMap::new),
 503                                   new ToMapAssertion<>(keyFn, valueFn, op, TreeMap.class));
 504         }
 505 
 506         // For concurrent maps, only use commutative merge functions
 507         try {
 508             exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn),
 509                                   new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
 510             if (dataAsList.size() != dataAsSet.size())
 511                 fail("Expected ISE on input with duplicates");
 512         }
 513         catch (IllegalStateException e) {
 514             if (dataAsList.size() == dataAsSet.size())
 515                 fail("Expected no ISE on input without duplicates");
 516         }
 517 
 518         exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum),
 519                               new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
 520 
 521         exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum, ConcurrentSkipListMap::new),
 522                               new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentSkipListMap.class));
 523     }
 524 
 525     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 526     public void testSimpleGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 527         Function<Integer, Integer> classifier = i -> i % 3;
 528 
 529         // Single-level groupBy
 530         exerciseMapCollection(data, groupingBy(classifier),
 531                               new GroupingByAssertion<>(classifier, HashMap.class,
 532                                                         new ToListAssertion<>()));
 533         exerciseMapCollection(data, groupingByConcurrent(classifier),
 534                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
 535                                                         new ToListAssertion<>()));
 536 
 537         // With explicit constructors
 538         exerciseMapCollection(data,
 539                               groupingBy(classifier, TreeMap::new, toCollection(HashSet::new)),
 540                               new GroupingByAssertion<>(classifier, TreeMap.class,
 541                                                         new ToCollectionAssertion<Integer>(HashSet.class, false)));
 542         exerciseMapCollection(data,
 543                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new,
 544                                                    toCollection(HashSet::new)),
 545                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
 546                                                         new ToCollectionAssertion<Integer>(HashSet.class, false)));
 547     }
 548 
 549     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 550     public void testGroupingByWithMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 551         Function<Integer, Integer> classifier = i -> i % 3;
 552         Function<Integer, Integer> mapper = i -> i * 2;
 553 
 554         exerciseMapCollection(data,
 555                               groupingBy(classifier, mapping(mapper, toList())),
 556                               new GroupingByAssertion<>(classifier, HashMap.class,
 557                                                         new MappingAssertion<>(mapper,
 558                                                                                new ToListAssertion<>())));
 559     }
 560 
 561     @Test(groups = { "serialization-hostile" })
 562     public void testFlatMappingClose() {
 563         Function<Integer, Integer> classifier = i -> i;
 564         AtomicInteger ai = new AtomicInteger();
 565         Function<Integer, Stream<Integer>> flatMapper = i -> Stream.of(i, i).onClose(ai::getAndIncrement);
 566         Map<Integer, List<Integer>> m = Stream.of(1, 2).collect(groupingBy(classifier, flatMapping(flatMapper, toList())));
 567         assertEquals(m.size(), ai.get());
 568     }
 569 
 570     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 571     public void testGroupingByWithFlatMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 572         Function<Integer, Integer> classifier = i -> i % 3;
 573         Function<Integer, Stream<Integer>> flatMapperByNull = i -> null;
 574         Function<Integer, Stream<Integer>> flatMapperBy0 = i -> Stream.empty();
 575         Function<Integer, Stream<Integer>> flatMapperBy2 = i -> Stream.of(i, i);
 576 
 577         exerciseMapCollection(data,
 578                               groupingBy(classifier, flatMapping(flatMapperByNull, toList())),
 579                               new GroupingByAssertion<>(classifier, HashMap.class,
 580                                                         new FlatMappingAssertion<>(flatMapperBy0,
 581                                                                                    new ToListAssertion<>())));
 582         exerciseMapCollection(data,
 583                               groupingBy(classifier, flatMapping(flatMapperBy0, toList())),
 584                               new GroupingByAssertion<>(classifier, HashMap.class,
 585                                                         new FlatMappingAssertion<>(flatMapperBy0,
 586                                                                                    new ToListAssertion<>())));
 587         exerciseMapCollection(data,
 588                               groupingBy(classifier, flatMapping(flatMapperBy2, toList())),
 589                               new GroupingByAssertion<>(classifier, HashMap.class,
 590                                                         new FlatMappingAssertion<>(flatMapperBy2,
 591                                                                                    new ToListAssertion<>())));
 592     }
 593 
 594     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 595     public void testGroupingByWithFiltering(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 596         Function<Integer, Integer> classifier = i -> i % 3;
 597         Predicate<Integer> filteringByMod2 = i -> i % 2 == 0;
 598         Predicate<Integer> filteringByUnder100 = i -> i % 2 < 100;
 599         Predicate<Integer> filteringByTrue = i -> true;
 600         Predicate<Integer> filteringByFalse = i -> false;
 601 
 602         exerciseMapCollection(data,
 603                               groupingBy(classifier, filtering(filteringByMod2, toList())),
 604                               new GroupingByAssertion<>(classifier, HashMap.class,
 605                                                         new FilteringAssertion<>(filteringByMod2,
 606                                                                                    new ToListAssertion<>())));
 607         exerciseMapCollection(data,
 608                               groupingBy(classifier, filtering(filteringByUnder100, toList())),
 609                               new GroupingByAssertion<>(classifier, HashMap.class,
 610                                                         new FilteringAssertion<>(filteringByUnder100,
 611                                                                                    new ToListAssertion<>())));
 612         exerciseMapCollection(data,
 613                               groupingBy(classifier, filtering(filteringByTrue, toList())),
 614                               new GroupingByAssertion<>(classifier, HashMap.class,
 615                                                         new FilteringAssertion<>(filteringByTrue,
 616                                                                                    new ToListAssertion<>())));
 617         exerciseMapCollection(data,
 618                               groupingBy(classifier, filtering(filteringByFalse, toList())),
 619                               new GroupingByAssertion<>(classifier, HashMap.class,
 620                                                         new FilteringAssertion<>(filteringByFalse,
 621                                                                                    new ToListAssertion<>())));
 622     }
 623 
 624     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 625     public void testTwoLevelGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 626         Function<Integer, Integer> classifier = i -> i % 6;
 627         Function<Integer, Integer> classifier2 = i -> i % 23;
 628 
 629         // Two-level groupBy
 630         exerciseMapCollection(data,
 631                               groupingBy(classifier, groupingBy(classifier2)),
 632                               new GroupingByAssertion<>(classifier, HashMap.class,
 633                                                         new GroupingByAssertion<>(classifier2, HashMap.class,
 634                                                                                   new ToListAssertion<>())));
 635         // with concurrent as upstream
 636         exerciseMapCollection(data,
 637                               groupingByConcurrent(classifier, groupingBy(classifier2)),
 638                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
 639                                                         new GroupingByAssertion<>(classifier2, HashMap.class,
 640                                                                                   new ToListAssertion<>())));
 641         // with concurrent as downstream
 642         exerciseMapCollection(data,
 643                               groupingBy(classifier, groupingByConcurrent(classifier2)),
 644                               new GroupingByAssertion<>(classifier, HashMap.class,
 645                                                         new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class,
 646                                                                                   new ToListAssertion<>())));
 647         // with concurrent as upstream and downstream
 648         exerciseMapCollection(data,
 649                               groupingByConcurrent(classifier, groupingByConcurrent(classifier2)),
 650                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
 651                                                         new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class,
 652                                                                                   new ToListAssertion<>())));
 653 
 654         // With explicit constructors
 655         exerciseMapCollection(data,
 656                               groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, toCollection(HashSet::new))),
 657                               new GroupingByAssertion<>(classifier, TreeMap.class,
 658                                                         new GroupingByAssertion<>(classifier2, TreeMap.class,
 659                                                                                   new ToCollectionAssertion<Integer>(HashSet.class, false))));
 660         // with concurrent as upstream
 661         exerciseMapCollection(data,
 662                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingBy(classifier2, TreeMap::new, toList())),
 663                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
 664                                                         new GroupingByAssertion<>(classifier2, TreeMap.class,
 665                                                                                   new ToListAssertion<>())));
 666         // with concurrent as downstream
 667         exerciseMapCollection(data,
 668                               groupingBy(classifier, TreeMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
 669                               new GroupingByAssertion<>(classifier, TreeMap.class,
 670                                                         new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class,
 671                                                                                   new ToListAssertion<>())));
 672         // with concurrent as upstream and downstream
 673         exerciseMapCollection(data,
 674                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
 675                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
 676                                                         new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class,
 677                                                                                   new ToListAssertion<>())));
 678     }
 679 
 680     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 681     public void testGroupubgByWithReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 682         Function<Integer, Integer> classifier = i -> i % 3;
 683 
 684         // Single-level simple reduce
 685         exerciseMapCollection(data,
 686                               groupingBy(classifier, reducing(0, Integer::sum)),
 687                               new GroupingByAssertion<>(classifier, HashMap.class,
 688                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 689         // with concurrent
 690         exerciseMapCollection(data,
 691                               groupingByConcurrent(classifier, reducing(0, Integer::sum)),
 692                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
 693                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 694 
 695         // With explicit constructors
 696         exerciseMapCollection(data,
 697                               groupingBy(classifier, TreeMap::new, reducing(0, Integer::sum)),
 698                               new GroupingByAssertion<>(classifier, TreeMap.class,
 699                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 700         // with concurrent
 701         exerciseMapCollection(data,
 702                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, Integer::sum)),
 703                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
 704                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 705 
 706         // Single-level map-reduce
 707         exerciseMapCollection(data,
 708                               groupingBy(classifier, reducing(0, mDoubler, Integer::sum)),
 709                               new GroupingByAssertion<>(classifier, HashMap.class,
 710                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
 711         // with concurrent
 712         exerciseMapCollection(data,
 713                               groupingByConcurrent(classifier, reducing(0, mDoubler, Integer::sum)),
 714                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
 715                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
 716 
 717         // With explicit constructors
 718         exerciseMapCollection(data,
 719                               groupingBy(classifier, TreeMap::new, reducing(0, mDoubler, Integer::sum)),
 720                               new GroupingByAssertion<>(classifier, TreeMap.class,
 721                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
 722         // with concurrent
 723         exerciseMapCollection(data,
 724                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, mDoubler, Integer::sum)),
 725                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
 726                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
 727     }
 728 
 729     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 730     public void testSimplePartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 731         Predicate<Integer> classifier = i -> i % 3 == 0;
 732 
 733         // Single-level partition to downstream List
 734         exerciseMapCollection(data,
 735                               partitioningBy(classifier),
 736                               new PartitioningByAssertion<>(classifier, new ToListAssertion<>()));
 737         exerciseMapCollection(data,
 738                               partitioningBy(classifier, toList()),
 739                               new PartitioningByAssertion<>(classifier, new ToListAssertion<>()));
 740     }
 741 
 742     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 743     public void testTwoLevelPartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 744         Predicate<Integer> classifier = i -> i % 3 == 0;
 745         Predicate<Integer> classifier2 = i -> i % 7 == 0;
 746 
 747         // Two level partition
 748         exerciseMapCollection(data,
 749                               partitioningBy(classifier, partitioningBy(classifier2)),
 750                               new PartitioningByAssertion<>(classifier,
 751                                                             new PartitioningByAssertion(classifier2, new ToListAssertion<>())));
 752 
 753         // Two level partition with reduce
 754         exerciseMapCollection(data,
 755                               partitioningBy(classifier, reducing(0, Integer::sum)),
 756                               new PartitioningByAssertion<>(classifier,
 757                                                             new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
 758     }
 759 
 760     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 761     public void testComposeFinisher(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 762         List<Integer> asList = exerciseTerminalOps(data, s -> s.collect(toList()));
 763         List<Integer> asImmutableList = exerciseTerminalOps(data, s -> s.collect(collectingAndThen(toList(), Collections::unmodifiableList)));
 764         assertEquals(asList, asImmutableList);
 765         try {
 766             asImmutableList.add(0);
 767             fail("Expecting immutable result");
 768         }
 769         catch (UnsupportedOperationException ignored) { }
 770     }
 771 
 772     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
 773     public void testPairing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
 774         Collector<Integer, ?, Long> summing = Collectors.summingLong(Integer::valueOf);
 775         Collector<Integer, ?, Long> counting = Collectors.counting();
 776         Collector<Integer, ?, Integer> min = collectingAndThen(Collectors.<Integer>minBy(Comparator.naturalOrder()),
 777                 opt -> opt.orElse(Integer.MAX_VALUE));
 778         Collector<Integer, ?, Integer> max = collectingAndThen(Collectors.<Integer>maxBy(Comparator.naturalOrder()),
 779                 opt -> opt.orElse(Integer.MIN_VALUE));
 780         Collector<Integer, ?, String> joining = mapping(String::valueOf, Collectors.joining(", ", "[", "]"));
 781 
 782         Collector<Integer, ?, Map.Entry<Long, Long>> sumAndCount = Collectors.pairing(summing, counting, Map::entry);
 783         Collector<Integer, ?, Map.Entry<Integer, Integer>> minAndMax = Collectors.pairing(min, max, Map::entry);
 784         Collector<Integer, ?, Double> averaging = Collectors.pairing(summing, counting,
 785                 (sum, count) -> ((double)sum) / count);
 786         Collector<Integer, ?, String> summaryStatistics = Collectors.pairing(sumAndCount, minAndMax,
 787                 (sumCountEntry, minMaxEntry) -> new IntSummaryStatistics(
 788                         sumCountEntry.getValue(), minMaxEntry.getKey(),
 789                         minMaxEntry.getValue(), sumCountEntry.getKey()).toString());
 790         Collector<Integer, ?, String> countAndContent = Collectors.pairing(counting, joining,
 791                 (count, content) -> count+": "+content);
 792 
 793         assertCollect(data, sumAndCount, stream -> {
 794             List<Integer> list = stream.collect(toList());
 795             return Map.entry(list.stream().mapToLong(Integer::intValue).sum(), (long) list.size());
 796         });
 797         assertCollect(data, averaging, stream -> stream.mapToInt(Integer::intValue).average().orElse(Double.NaN));
 798         assertCollect(data, summaryStatistics,
 799                 stream -> stream.mapToInt(Integer::intValue).summaryStatistics().toString());
 800         assertCollect(data, countAndContent, stream -> {
 801             List<Integer> list = stream.collect(toList());
 802             return list.size()+": "+list;
 803         });
 804 
 805         Function<Integer, Integer> classifier = i -> i % 3;
 806         exerciseMapCollection(data, groupingBy(classifier, sumAndCount),
 807                 new GroupingByAssertion<>(classifier, Map.class,
 808                         new PairingAssertion<>(summing, counting, Map::entry)));
 809     }
 810 }