1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 package org.openjdk.tests.java.util.stream; 24 25 import java.util.ArrayList; 26 import java.util.Arrays; 27 import java.util.Collection; 28 import java.util.Comparator; 29 import java.util.HashMap; 30 import java.util.HashSet; 31 import java.util.Iterator; 32 import java.util.List; 33 import java.util.Map; 34 import java.util.Optional; 35 import java.util.Set; 36 import java.util.TreeMap; 37 import java.util.concurrent.ConcurrentHashMap; 38 import java.util.concurrent.ConcurrentSkipListMap; 39 import java.util.function.BinaryOperator; 40 import java.util.function.Function; 41 import java.util.function.Predicate; 42 import java.util.function.Supplier; 43 import java.util.stream.Collector; 44 import java.util.stream.Collectors; 45 import java.util.stream.LambdaTestHelpers; 46 import java.util.stream.OpTestCase; 47 import java.util.stream.Stream; 48 import java.util.stream.StreamOpFlagTestHelper; 49 import java.util.stream.StreamTestDataProvider; 50 import java.util.stream.TestData; 51 52 import org.testng.annotations.Test; 53 54 import static java.util.stream.Collectors.groupingBy; 55 import static java.util.stream.Collectors.groupingByConcurrent; 56 import static java.util.stream.Collectors.partitioningBy; 57 import static java.util.stream.Collectors.reducing; 58 import static java.util.stream.Collectors.toCollection; 59 import static java.util.stream.Collectors.toConcurrentMap; 60 import static java.util.stream.Collectors.toList; 61 import static java.util.stream.Collectors.toMap; 62 import static java.util.stream.Collectors.toSet; 63 import static java.util.stream.LambdaTestHelpers.assertContents; 64 import static java.util.stream.LambdaTestHelpers.assertContentsUnordered; 65 import static java.util.stream.LambdaTestHelpers.mDoubler; 66 67 /** 68 * TabulatorsTest 69 * 70 * @author Brian Goetz 71 */ 72 @SuppressWarnings({"rawtypes", "unchecked"}) 73 public class TabulatorsTest extends OpTestCase { 74 75 private static abstract class TabulationAssertion<T, U> { 76 abstract void assertValue(U value, 77 Supplier<Stream<T>> source, 78 boolean ordered) throws ReflectiveOperationException; 79 } 80 81 @SuppressWarnings({"rawtypes", "unchecked"}) 82 static class GroupedMapAssertion<T, K, V, M extends Map<K, ? extends V>> extends TabulationAssertion<T, M> { 83 private final Class<? extends Map> clazz; 84 private final Function<T, K> classifier; 85 private final TabulationAssertion<T,V> downstream; 86 87 protected GroupedMapAssertion(Function<T, K> classifier, 88 Class<? extends Map> clazz, 89 TabulationAssertion<T, V> downstream) { 90 this.clazz = clazz; 91 this.classifier = classifier; 92 this.downstream = downstream; 93 } 94 95 void assertValue(M map, 96 Supplier<Stream<T>> source, 97 boolean ordered) throws ReflectiveOperationException { 98 if (!clazz.isAssignableFrom(map.getClass())) 99 fail(String.format("Class mismatch in GroupedMapAssertion: %s, %s", clazz, map.getClass())); 100 assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(toSet())); 101 for (Map.Entry<K, ? extends V> entry : map.entrySet()) { 102 K key = entry.getKey(); 103 downstream.assertValue(entry.getValue(), 104 () -> source.get().filter(e -> classifier.apply(e).equals(key)), 105 ordered); 106 } 107 } 108 } 109 110 static class ToMapAssertion<T, K, V, M extends Map<K,V>> extends TabulationAssertion<T, M> { 111 private final Class<? extends Map> clazz; 112 private final Function<T, K> keyFn; 113 private final Function<T, V> valueFn; 114 private final BinaryOperator<V> mergeFn; 115 116 ToMapAssertion(Function<T, K> keyFn, 117 Function<T, V> valueFn, 118 BinaryOperator<V> mergeFn, 119 Class<? extends Map> clazz) { 120 this.clazz = clazz; 121 this.keyFn = keyFn; 122 this.valueFn = valueFn; 123 this.mergeFn = mergeFn; 124 } 125 126 @Override 127 void assertValue(M map, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException { 128 Set<K> uniqueKeys = source.get().map(keyFn).collect(toSet()); 129 assertTrue(clazz.isAssignableFrom(map.getClass())); 130 assertEquals(uniqueKeys, map.keySet()); 131 source.get().forEach(t -> { 132 K key = keyFn.apply(t); 133 V v = source.get() 134 .filter(e -> key.equals(keyFn.apply(e))) 135 .map(valueFn) 136 .reduce(mergeFn) 137 .get(); 138 assertEquals(map.get(key), v); 139 }); 140 } 141 } 142 143 static class PartitionAssertion<T, D> extends TabulationAssertion<T, Map<Boolean,D>> { 144 private final Predicate<T> predicate; 145 private final TabulationAssertion<T,D> downstream; 146 147 protected PartitionAssertion(Predicate<T> predicate, 148 TabulationAssertion<T, D> downstream) { 149 this.predicate = predicate; 150 this.downstream = downstream; 151 } 152 153 void assertValue(Map<Boolean, D> map, 154 Supplier<Stream<T>> source, 155 boolean ordered) throws ReflectiveOperationException { 156 if (!Map.class.isAssignableFrom(map.getClass())) 157 fail(String.format("Class mismatch in PartitionAssertion: %s", map.getClass())); 158 assertEquals(2, map.size()); 159 downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered); 160 downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered); 161 } 162 } 163 164 @SuppressWarnings({"rawtypes", "unchecked"}) 165 static class ListAssertion<T> extends TabulationAssertion<T, List<T>> { 166 @Override 167 void assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered) 168 throws ReflectiveOperationException { 169 if (!List.class.isAssignableFrom(value.getClass())) 170 fail(String.format("Class mismatch in ListAssertion: %s", value.getClass())); 171 Stream<T> stream = source.get(); 172 List<T> result = new ArrayList<>(); 173 for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add 174 result.add(it.next()); 175 if (StreamOpFlagTestHelper.isStreamOrdered(stream) && ordered) 176 assertContents(value, result); 177 else 178 assertContentsUnordered(value, result); 179 } 180 } 181 182 @SuppressWarnings({"rawtypes", "unchecked"}) 183 static class CollectionAssertion<T> extends TabulationAssertion<T, Collection<T>> { 184 private final Class<? extends Collection> clazz; 185 private final boolean targetOrdered; 186 187 protected CollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered) { 188 this.clazz = clazz; 189 this.targetOrdered = targetOrdered; 190 } 191 192 @Override 193 void assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered) 194 throws ReflectiveOperationException { 195 if (!clazz.isAssignableFrom(value.getClass())) 196 fail(String.format("Class mismatch in CollectionAssertion: %s, %s", clazz, value.getClass())); 197 Stream<T> stream = source.get(); 198 Collection<T> result = clazz.newInstance(); 199 for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add 200 result.add(it.next()); 201 if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered) 202 assertContents(value, result); 203 else 204 assertContentsUnordered(value, result); 205 } 206 } 207 208 static class ReduceAssertion<T, U> extends TabulationAssertion<T, U> { 209 private final U identity; 210 private final Function<T, U> mapper; 211 private final BinaryOperator<U> reducer; 212 213 ReduceAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer) { 214 this.identity = identity; 215 this.mapper = mapper; 216 this.reducer = reducer; 217 } 218 219 @Override 220 void assertValue(U value, Supplier<Stream<T>> source, boolean ordered) 221 throws ReflectiveOperationException { 222 Optional<U> reduced = source.get().map(mapper).reduce(reducer); 223 if (value == null) 224 assertTrue(!reduced.isPresent()); 225 else if (!reduced.isPresent()) { 226 assertEquals(value, identity); 227 } 228 else { 229 assertEquals(value, reduced.get()); 230 } 231 } 232 } 233 234 private <T> ResultAsserter<T> mapTabulationAsserter(boolean ordered) { 235 return (act, exp, ord, par) -> { 236 if (par && (!ordered || !ord)) { 237 TabulatorsTest.nestedMapEqualityAssertion(act, exp); 238 } 239 else { 240 LambdaTestHelpers.assertContentsEqual(act, exp); 241 } 242 }; 243 } 244 245 private<T, M extends Map> 246 void exerciseMapTabulation(TestData<T, Stream<T>> data, 247 Collector<T, ?, ? extends M> collector, 248 TabulationAssertion<T, M> assertion) 249 throws ReflectiveOperationException { 250 boolean ordered = !collector.characteristics().contains(Collector.Characteristics.UNORDERED); 251 252 M m = withData(data) 253 .terminal(s -> s.collect(collector)) 254 .resultAsserter(mapTabulationAsserter(ordered)) 255 .exercise(); 256 assertion.assertValue(m, () -> data.stream(), ordered); 257 258 m = withData(data) 259 .terminal(s -> s.unordered().collect(collector)) 260 .resultAsserter(mapTabulationAsserter(ordered)) 261 .exercise(); 262 assertion.assertValue(m, () -> data.stream(), false); 263 } 264 265 private static void nestedMapEqualityAssertion(Object o1, Object o2) { 266 if (o1 instanceof Map) { 267 Map m1 = (Map) o1; 268 Map m2 = (Map) o2; 269 assertContentsUnordered(m1.keySet(), m2.keySet()); 270 for (Object k : m1.keySet()) 271 nestedMapEqualityAssertion(m1.get(k), m2.get(k)); 272 } 273 else if (o1 instanceof Collection) { 274 assertContentsUnordered(((Collection) o1), ((Collection) o2)); 275 } 276 else 277 assertEquals(o1, o2); 278 } 279 280 private<T, R> void assertCollect(TestData.OfRef<T> data, 281 Collector<T, ?, R> collector, 282 Function<Stream<T>, R> streamReduction) { 283 R check = streamReduction.apply(data.stream()); 284 withData(data).terminal(s -> s.collect(collector)).expectedResult(check).exercise(); 285 } 286 287 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 288 public void testReduce(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 289 assertCollect(data, Collectors.reducing(0, Integer::sum), 290 s -> s.reduce(0, Integer::sum)); 291 assertCollect(data, Collectors.reducing(Integer.MAX_VALUE, Integer::min), 292 s -> s.min(Integer::compare).orElse(Integer.MAX_VALUE)); 293 assertCollect(data, Collectors.reducing(Integer.MIN_VALUE, Integer::max), 294 s -> s.max(Integer::compare).orElse(Integer.MIN_VALUE)); 295 296 assertCollect(data, Collectors.reducing(Integer::sum), 297 s -> s.reduce(Integer::sum)); 298 assertCollect(data, Collectors.minBy(Comparator.naturalOrder()), 299 s -> s.min(Integer::compare)); 300 assertCollect(data, Collectors.maxBy(Comparator.naturalOrder()), 301 s -> s.max(Integer::compare)); 302 303 assertCollect(data, Collectors.reducing(0, x -> x*2, Integer::sum), 304 s -> s.map(x -> x*2).reduce(0, Integer::sum)); 305 306 assertCollect(data, Collectors.summingLong(x -> x * 2L), 307 s -> s.map(x -> x*2L).reduce(0L, Long::sum)); 308 assertCollect(data, Collectors.summingInt(x -> x * 2), 309 s -> s.map(x -> x*2).reduce(0, Integer::sum)); 310 assertCollect(data, Collectors.summingDouble(x -> x * 2.0d), 311 s -> s.map(x -> x * 2.0d).reduce(0.0d, Double::sum)); 312 313 assertCollect(data, Collectors.averagingInt(x -> x * 2), 314 s -> s.mapToInt(x -> x * 2).average().orElse(0)); 315 assertCollect(data, Collectors.averagingLong(x -> x * 2), 316 s -> s.mapToLong(x -> x * 2).average().orElse(0)); 317 assertCollect(data, Collectors.averagingDouble(x -> x * 2), 318 s -> s.mapToDouble(x -> x * 2).average().orElse(0)); 319 320 // Test explicit Collector.of 321 Collector<Integer, long[], Double> avg2xint = Collector.of(() -> new long[2], 322 (a, b) -> { 323 a[0] += b * 2; 324 a[1]++; 325 }, 326 (a, b) -> { 327 a[0] += b[0]; 328 a[1] += b[1]; 329 return a; 330 }, 331 a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1]); 332 assertCollect(data, avg2xint, 333 s -> s.mapToInt(x -> x * 2).average().orElse(0)); 334 } 335 336 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 337 public void testJoin(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 338 withData(data) 339 .terminal(s -> s.map(Object::toString).collect(Collectors.joining())) 340 .expectedResult(join(data, "")) 341 .exercise(); 342 343 Collector<String, StringBuilder, String> likeJoining = Collector.of(StringBuilder::new, StringBuilder::append, (sb1, sb2) -> sb1.append(sb2.toString()), StringBuilder::toString); 344 withData(data) 345 .terminal(s -> s.map(Object::toString).collect(likeJoining)) 346 .expectedResult(join(data, "")) 347 .exercise(); 348 349 withData(data) 350 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(","))) 351 .expectedResult(join(data, ",")) 352 .exercise(); 353 354 withData(data) 355 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",", "[", "]"))) 356 .expectedResult("[" + join(data, ",") + "]") 357 .exercise(); 358 } 359 360 private<T> String join(TestData.OfRef<T> data, String delim) { 361 StringBuilder sb = new StringBuilder(); 362 boolean first = true; 363 for (T i : data) { 364 if (!first) 365 sb.append(delim); 366 sb.append(i.toString()); 367 first = false; 368 } 369 return sb.toString(); 370 } 371 372 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 373 public void testSimpleToMap(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 374 Function<Integer, Integer> keyFn = i -> i * 2; 375 Function<Integer, Integer> valueFn = i -> i * 4; 376 377 List<Integer> dataAsList = Arrays.asList(data.stream().toArray(Integer[]::new)); 378 Set<Integer> dataAsSet = new HashSet<>(dataAsList); 379 380 BinaryOperator<Integer> sum = Integer::sum; 381 for (BinaryOperator<Integer> op : Arrays.asList((u, v) -> u, 382 (u, v) -> v, 383 sum)) { 384 try { 385 exerciseMapTabulation(data, toMap(keyFn, valueFn), 386 new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class)); 387 if (dataAsList.size() != dataAsSet.size()) 388 fail("Expected ISE on input with duplicates"); 389 } 390 catch (IllegalStateException e) { 391 if (dataAsList.size() == dataAsSet.size()) 392 fail("Expected no ISE on input without duplicates"); 393 } 394 395 exerciseMapTabulation(data, toMap(keyFn, valueFn, op), 396 new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class)); 397 398 exerciseMapTabulation(data, toMap(keyFn, valueFn, op, TreeMap::new), 399 new ToMapAssertion<>(keyFn, valueFn, op, TreeMap.class)); 400 } 401 402 // For concurrent maps, only use commutative merge functions 403 try { 404 exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn), 405 new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class)); 406 if (dataAsList.size() != dataAsSet.size()) 407 fail("Expected ISE on input with duplicates"); 408 } 409 catch (IllegalStateException e) { 410 if (dataAsList.size() == dataAsSet.size()) 411 fail("Expected no ISE on input without duplicates"); 412 } 413 414 exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn, sum), 415 new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class)); 416 417 exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn, sum, ConcurrentSkipListMap::new), 418 new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentSkipListMap.class)); 419 } 420 421 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 422 public void testSimpleGroupBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 423 Function<Integer, Integer> classifier = i -> i % 3; 424 425 // Single-level groupBy 426 exerciseMapTabulation(data, groupingBy(classifier), 427 new GroupedMapAssertion<>(classifier, HashMap.class, 428 new ListAssertion<>())); 429 exerciseMapTabulation(data, groupingByConcurrent(classifier), 430 new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class, 431 new ListAssertion<>())); 432 433 // With explicit constructors 434 exerciseMapTabulation(data, 435 groupingBy(classifier, TreeMap::new, toCollection(HashSet::new)), 436 new GroupedMapAssertion<>(classifier, TreeMap.class, 437 new CollectionAssertion<Integer>(HashSet.class, false))); 438 exerciseMapTabulation(data, 439 groupingByConcurrent(classifier, ConcurrentSkipListMap::new, 440 toCollection(HashSet::new)), 441 new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class, 442 new CollectionAssertion<Integer>(HashSet.class, false))); 443 } 444 445 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 446 public void testTwoLevelGroupBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 447 Function<Integer, Integer> classifier = i -> i % 6; 448 Function<Integer, Integer> classifier2 = i -> i % 23; 449 450 // Two-level groupBy 451 exerciseMapTabulation(data, 452 groupingBy(classifier, groupingBy(classifier2)), 453 new GroupedMapAssertion<>(classifier, HashMap.class, 454 new GroupedMapAssertion<>(classifier2, HashMap.class, 455 new ListAssertion<>()))); 456 // with concurrent as upstream 457 exerciseMapTabulation(data, 458 groupingByConcurrent(classifier, groupingBy(classifier2)), 459 new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class, 460 new GroupedMapAssertion<>(classifier2, HashMap.class, 461 new ListAssertion<>()))); 462 // with concurrent as downstream 463 exerciseMapTabulation(data, 464 groupingBy(classifier, groupingByConcurrent(classifier2)), 465 new GroupedMapAssertion<>(classifier, HashMap.class, 466 new GroupedMapAssertion<>(classifier2, ConcurrentHashMap.class, 467 new ListAssertion<>()))); 468 // with concurrent as upstream and downstream 469 exerciseMapTabulation(data, 470 groupingByConcurrent(classifier, groupingByConcurrent(classifier2)), 471 new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class, 472 new GroupedMapAssertion<>(classifier2, ConcurrentHashMap.class, 473 new ListAssertion<>()))); 474 475 // With explicit constructors 476 exerciseMapTabulation(data, 477 groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, toCollection(HashSet::new))), 478 new GroupedMapAssertion<>(classifier, TreeMap.class, 479 new GroupedMapAssertion<>(classifier2, TreeMap.class, 480 new CollectionAssertion<Integer>(HashSet.class, false)))); 481 // with concurrent as upstream 482 exerciseMapTabulation(data, 483 groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingBy(classifier2, TreeMap::new, toList())), 484 new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class, 485 new GroupedMapAssertion<>(classifier2, TreeMap.class, 486 new ListAssertion<>()))); 487 // with concurrent as downstream 488 exerciseMapTabulation(data, 489 groupingBy(classifier, TreeMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())), 490 new GroupedMapAssertion<>(classifier, TreeMap.class, 491 new GroupedMapAssertion<>(classifier2, ConcurrentSkipListMap.class, 492 new ListAssertion<>()))); 493 // with concurrent as upstream and downstream 494 exerciseMapTabulation(data, 495 groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())), 496 new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class, 497 new GroupedMapAssertion<>(classifier2, ConcurrentSkipListMap.class, 498 new ListAssertion<>()))); 499 } 500 501 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 502 public void testGroupedReduce(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 503 Function<Integer, Integer> classifier = i -> i % 3; 504 505 // Single-level simple reduce 506 exerciseMapTabulation(data, 507 groupingBy(classifier, reducing(0, Integer::sum)), 508 new GroupedMapAssertion<>(classifier, HashMap.class, 509 new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); 510 // with concurrent 511 exerciseMapTabulation(data, 512 groupingByConcurrent(classifier, reducing(0, Integer::sum)), 513 new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class, 514 new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); 515 516 // With explicit constructors 517 exerciseMapTabulation(data, 518 groupingBy(classifier, TreeMap::new, reducing(0, Integer::sum)), 519 new GroupedMapAssertion<>(classifier, TreeMap.class, 520 new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); 521 // with concurrent 522 exerciseMapTabulation(data, 523 groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, Integer::sum)), 524 new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class, 525 new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); 526 527 // Single-level map-reduce 528 exerciseMapTabulation(data, 529 groupingBy(classifier, reducing(0, mDoubler, Integer::sum)), 530 new GroupedMapAssertion<>(classifier, HashMap.class, 531 new ReduceAssertion<>(0, mDoubler, Integer::sum))); 532 // with concurrent 533 exerciseMapTabulation(data, 534 groupingByConcurrent(classifier, reducing(0, mDoubler, Integer::sum)), 535 new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class, 536 new ReduceAssertion<>(0, mDoubler, Integer::sum))); 537 538 // With explicit constructors 539 exerciseMapTabulation(data, 540 groupingBy(classifier, TreeMap::new, reducing(0, mDoubler, Integer::sum)), 541 new GroupedMapAssertion<>(classifier, TreeMap.class, 542 new ReduceAssertion<>(0, mDoubler, Integer::sum))); 543 // with concurrent 544 exerciseMapTabulation(data, 545 groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, mDoubler, Integer::sum)), 546 new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class, 547 new ReduceAssertion<>(0, mDoubler, Integer::sum))); 548 } 549 550 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 551 public void testSimplePartition(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 552 Predicate<Integer> classifier = i -> i % 3 == 0; 553 554 // Single-level partition to downstream List 555 exerciseMapTabulation(data, 556 partitioningBy(classifier), 557 new PartitionAssertion<>(classifier, new ListAssertion<>())); 558 exerciseMapTabulation(data, 559 partitioningBy(classifier, toList()), 560 new PartitionAssertion<>(classifier, new ListAssertion<>())); 561 } 562 563 @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class) 564 public void testTwoLevelPartition(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException { 565 Predicate<Integer> classifier = i -> i % 3 == 0; 566 Predicate<Integer> classifier2 = i -> i % 7 == 0; 567 568 // Two level partition 569 exerciseMapTabulation(data, 570 partitioningBy(classifier, partitioningBy(classifier2)), 571 new PartitionAssertion<>(classifier, 572 new PartitionAssertion(classifier2, new ListAssertion<>()))); 573 574 // Two level partition with reduce 575 exerciseMapTabulation(data, 576 partitioningBy(classifier, reducing(0, Integer::sum)), 577 new PartitionAssertion<>(classifier, 578 new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum))); 579 } 580 }