1 /* 2 * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Objects; 28 import java.util.Optional; 29 import java.util.OptionalDouble; 30 import java.util.OptionalInt; 31 import java.util.OptionalLong; 32 import java.util.Spliterator; 33 import java.util.concurrent.CountedCompleter; 34 import java.util.function.BiConsumer; 35 import java.util.function.BiFunction; 36 import java.util.function.BinaryOperator; 37 import java.util.function.Consumer; 38 import java.util.function.DoubleBinaryOperator; 39 import java.util.function.DoubleConsumer; 40 import java.util.function.IntBinaryOperator; 41 import java.util.function.IntConsumer; 42 import java.util.function.LongBinaryOperator; 43 import java.util.function.LongConsumer; 44 import java.util.function.ObjDoubleConsumer; 45 import java.util.function.ObjIntConsumer; 46 import java.util.function.ObjLongConsumer; 47 import java.util.function.Supplier; 48 49 /** 50 * Factory for creating instances of {@code TerminalOp} that implement 51 * reductions. 52 * 53 * @since 1.8 54 */ 55 final class ReduceOps { 56 57 private ReduceOps() { } 58 59 /** 60 * Constructs a {@code TerminalOp} that implements a functional reduce on 61 * reference values. 62 * 63 * @param <T> the type of the input elements 64 * @param <U> the type of the result 65 * @param seed the identity element for the reduction 66 * @param reducer the accumulating function that incorporates an additional 67 * input element into the result 68 * @param combiner the combining function that combines two intermediate 69 * results 70 * @return a {@code TerminalOp} implementing the reduction 71 */ 72 public static <T, U> TerminalOp<T, U> 73 makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) { 74 Objects.requireNonNull(reducer); 75 Objects.requireNonNull(combiner); 76 class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> { 77 @Override 78 public void begin(long size) { 79 state = seed; 80 } 81 82 @Override 83 public void accept(T t) { 84 state = reducer.apply(state, t); 85 } 86 87 @Override 88 public void combine(ReducingSink other) { 89 state = combiner.apply(state, other.state); 90 } 91 } 92 return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) { 93 @Override 94 public ReducingSink makeSink() { 95 return new ReducingSink(); 96 } 97 }; 98 } 99 100 /** 101 * Constructs a {@code TerminalOp} that implements a functional reduce on 102 * reference values producing an optional reference result. 103 * 104 * @param <T> The type of the input elements, and the type of the result 105 * @param operator The reducing function 106 * @return A {@code TerminalOp} implementing the reduction 107 */ 108 public static <T> TerminalOp<T, Optional<T>> 109 makeRef(BinaryOperator<T> operator) { 110 Objects.requireNonNull(operator); 111 class ReducingSink 112 implements AccumulatingSink<T, Optional<T>, ReducingSink> { 113 private boolean empty; 114 private T state; 115 116 public void begin(long size) { 117 empty = true; 118 state = null; 119 } 120 121 @Override 122 public void accept(T t) { 123 if (empty) { 124 empty = false; 125 state = t; 126 } else { 127 state = operator.apply(state, t); 128 } 129 } 130 131 @Override 132 public Optional<T> get() { 133 return empty ? Optional.empty() : Optional.of(state); 134 } 135 136 @Override 137 public void combine(ReducingSink other) { 138 if (!other.empty) 139 accept(other.state); 140 } 141 } 142 return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) { 143 @Override 144 public ReducingSink makeSink() { 145 return new ReducingSink(); 146 } 147 }; 148 } 149 150 /** 151 * Constructs a {@code TerminalOp} that implements a mutable reduce on 152 * reference values. 153 * 154 * @param <T> the type of the input elements 155 * @param <I> the type of the intermediate reduction result 156 * @param collector a {@code Collector} defining the reduction 157 * @return a {@code ReduceOp} implementing the reduction 158 */ 159 public static <T, I> TerminalOp<T, I> 160 makeRef(Collector<? super T, I, ?> collector) { 161 Supplier<I> supplier = Objects.requireNonNull(collector).supplier(); 162 BiConsumer<I, ? super T> accumulator = collector.accumulator(); 163 BinaryOperator<I> combiner = collector.combiner(); 164 class ReducingSink extends Box<I> 165 implements AccumulatingSink<T, I, ReducingSink> { 166 @Override 167 public void begin(long size) { 168 state = supplier.get(); 169 } 170 171 @Override 172 public void accept(T t) { 173 accumulator.accept(state, t); 174 } 175 176 @Override 177 public void combine(ReducingSink other) { 178 state = combiner.apply(state, other.state); 179 } 180 } 181 return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) { 182 @Override 183 public ReducingSink makeSink() { 184 return new ReducingSink(); 185 } 186 187 @Override 188 public int getOpFlags() { 189 return collector.characteristics().contains(Collector.Characteristics.UNORDERED) 190 ? StreamOpFlag.NOT_ORDERED 191 : 0; 192 } 193 }; 194 } 195 196 /** 197 * Constructs a {@code TerminalOp} that implements a mutable reduce on 198 * reference values. 199 * 200 * @param <T> the type of the input elements 201 * @param <R> the type of the result 202 * @param seedFactory a factory to produce a new base accumulator 203 * @param accumulator a function to incorporate an element into an 204 * accumulator 205 * @param reducer a function to combine an accumulator into another 206 * @return a {@code TerminalOp} implementing the reduction 207 */ 208 public static <T, R> TerminalOp<T, R> 209 makeRef(Supplier<R> seedFactory, 210 BiConsumer<R, ? super T> accumulator, 211 BiConsumer<R,R> reducer) { 212 Objects.requireNonNull(seedFactory); 213 Objects.requireNonNull(accumulator); 214 Objects.requireNonNull(reducer); 215 class ReducingSink extends Box<R> 216 implements AccumulatingSink<T, R, ReducingSink> { 217 @Override 218 public void begin(long size) { 219 state = seedFactory.get(); 220 } 221 222 @Override 223 public void accept(T t) { 224 accumulator.accept(state, t); 225 } 226 227 @Override 228 public void combine(ReducingSink other) { 229 reducer.accept(state, other.state); 230 } 231 } 232 return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) { 233 @Override 234 public ReducingSink makeSink() { 235 return new ReducingSink(); 236 } 237 }; 238 } 239 240 /** 241 * Constructs a {@code TerminalOp} that counts the number of stream 242 * elements. If the size of the pipeline is known then count is the size 243 * and there is no need to evaluate the pipeline. If the size of the 244 * pipeline is non known then count is produced, via reduction, using a 245 * {@link CountingSink}. 246 * 247 * @param <T> the type of the input elements 248 * @return a {@code TerminalOp} implementing the counting 249 */ 250 public static <T> TerminalOp<T, Long> 251 makeRefCounting() { 252 return new ReduceOp<T, Long, CountingSink<T>>(StreamShape.REFERENCE) { 253 @Override 254 public CountingSink<T> makeSink() { return new CountingSink.OfRef<>(); } 255 256 @Override 257 public <P_IN> Long evaluateSequential(PipelineHelper<T> helper, 258 Spliterator<P_IN> spliterator) { 259 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 260 return spliterator.getExactSizeIfKnown(); 261 return super.evaluateSequential(helper, spliterator); 262 } 263 264 @Override 265 public <P_IN> Long evaluateParallel(PipelineHelper<T> helper, 266 Spliterator<P_IN> spliterator) { 267 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 268 return spliterator.getExactSizeIfKnown(); 269 return super.evaluateParallel(helper, spliterator); 270 } 271 272 @Override 273 public int getOpFlags() { 274 return StreamOpFlag.NOT_ORDERED; 275 } 276 }; 277 } 278 279 /** 280 * Constructs a {@code TerminalOp} that implements a functional reduce on 281 * {@code int} values. 282 * 283 * @param identity the identity for the combining function 284 * @param operator the combining function 285 * @return a {@code TerminalOp} implementing the reduction 286 */ 287 public static TerminalOp<Integer, Integer> 288 makeInt(int identity, IntBinaryOperator operator) { 289 Objects.requireNonNull(operator); 290 class ReducingSink 291 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt { 292 private int state; 293 294 @Override 295 public void begin(long size) { 296 state = identity; 297 } 298 299 @Override 300 public void accept(int t) { 301 state = operator.applyAsInt(state, t); 302 } 303 304 @Override 305 public Integer get() { 306 return state; 307 } 308 309 @Override 310 public void combine(ReducingSink other) { 311 accept(other.state); 312 } 313 } 314 return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) { 315 @Override 316 public ReducingSink makeSink() { 317 return new ReducingSink(); 318 } 319 }; 320 } 321 322 /** 323 * Constructs a {@code TerminalOp} that implements a functional reduce on 324 * {@code int} values, producing an optional integer result. 325 * 326 * @param operator the combining function 327 * @return a {@code TerminalOp} implementing the reduction 328 */ 329 public static TerminalOp<Integer, OptionalInt> 330 makeInt(IntBinaryOperator operator) { 331 Objects.requireNonNull(operator); 332 class ReducingSink 333 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt { 334 private boolean empty; 335 private int state; 336 337 public void begin(long size) { 338 empty = true; 339 state = 0; 340 } 341 342 @Override 343 public void accept(int t) { 344 if (empty) { 345 empty = false; 346 state = t; 347 } 348 else { 349 state = operator.applyAsInt(state, t); 350 } 351 } 352 353 @Override 354 public OptionalInt get() { 355 return empty ? OptionalInt.empty() : OptionalInt.of(state); 356 } 357 358 @Override 359 public void combine(ReducingSink other) { 360 if (!other.empty) 361 accept(other.state); 362 } 363 } 364 return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) { 365 @Override 366 public ReducingSink makeSink() { 367 return new ReducingSink(); 368 } 369 }; 370 } 371 372 /** 373 * Constructs a {@code TerminalOp} that implements a mutable reduce on 374 * {@code int} values. 375 * 376 * @param <R> The type of the result 377 * @param supplier a factory to produce a new accumulator of the result type 378 * @param accumulator a function to incorporate an int into an 379 * accumulator 380 * @param combiner a function to combine an accumulator into another 381 * @return A {@code ReduceOp} implementing the reduction 382 */ 383 public static <R> TerminalOp<Integer, R> 384 makeInt(Supplier<R> supplier, 385 ObjIntConsumer<R> accumulator, 386 BinaryOperator<R> combiner) { 387 Objects.requireNonNull(supplier); 388 Objects.requireNonNull(accumulator); 389 Objects.requireNonNull(combiner); 390 class ReducingSink extends Box<R> 391 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt { 392 @Override 393 public void begin(long size) { 394 state = supplier.get(); 395 } 396 397 @Override 398 public void accept(int t) { 399 accumulator.accept(state, t); 400 } 401 402 @Override 403 public void combine(ReducingSink other) { 404 state = combiner.apply(state, other.state); 405 } 406 } 407 return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) { 408 @Override 409 public ReducingSink makeSink() { 410 return new ReducingSink(); 411 } 412 }; 413 } 414 415 /** 416 * Constructs a {@code TerminalOp} that counts the number of stream 417 * elements. If the size of the pipeline is known then count is the size 418 * and there is no need to evaluate the pipeline. If the size of the 419 * pipeline is non known then count is produced, via reduction, using a 420 * {@link CountingSink}. 421 * 422 * @return a {@code TerminalOp} implementing the counting 423 */ 424 public static TerminalOp<Integer, Long> 425 makeIntCounting() { 426 return new ReduceOp<Integer, Long, CountingSink<Integer>>(StreamShape.INT_VALUE) { 427 @Override 428 public CountingSink<Integer> makeSink() { return new CountingSink.OfInt(); } 429 430 @Override 431 public <P_IN> Long evaluateSequential(PipelineHelper<Integer> helper, 432 Spliterator<P_IN> spliterator) { 433 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 434 return spliterator.getExactSizeIfKnown(); 435 return super.evaluateSequential(helper, spliterator); 436 } 437 438 @Override 439 public <P_IN> Long evaluateParallel(PipelineHelper<Integer> helper, 440 Spliterator<P_IN> spliterator) { 441 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 442 return spliterator.getExactSizeIfKnown(); 443 return super.evaluateParallel(helper, spliterator); 444 } 445 446 @Override 447 public int getOpFlags() { 448 return StreamOpFlag.NOT_ORDERED; 449 } 450 }; 451 } 452 453 /** 454 * Constructs a {@code TerminalOp} that implements a functional reduce on 455 * {@code long} values. 456 * 457 * @param identity the identity for the combining function 458 * @param operator the combining function 459 * @return a {@code TerminalOp} implementing the reduction 460 */ 461 public static TerminalOp<Long, Long> 462 makeLong(long identity, LongBinaryOperator operator) { 463 Objects.requireNonNull(operator); 464 class ReducingSink 465 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong { 466 private long state; 467 468 @Override 469 public void begin(long size) { 470 state = identity; 471 } 472 473 @Override 474 public void accept(long t) { 475 state = operator.applyAsLong(state, t); 476 } 477 478 @Override 479 public Long get() { 480 return state; 481 } 482 483 @Override 484 public void combine(ReducingSink other) { 485 accept(other.state); 486 } 487 } 488 return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) { 489 @Override 490 public ReducingSink makeSink() { 491 return new ReducingSink(); 492 } 493 }; 494 } 495 496 /** 497 * Constructs a {@code TerminalOp} that implements a functional reduce on 498 * {@code long} values, producing an optional long result. 499 * 500 * @param operator the combining function 501 * @return a {@code TerminalOp} implementing the reduction 502 */ 503 public static TerminalOp<Long, OptionalLong> 504 makeLong(LongBinaryOperator operator) { 505 Objects.requireNonNull(operator); 506 class ReducingSink 507 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong { 508 private boolean empty; 509 private long state; 510 511 public void begin(long size) { 512 empty = true; 513 state = 0; 514 } 515 516 @Override 517 public void accept(long t) { 518 if (empty) { 519 empty = false; 520 state = t; 521 } 522 else { 523 state = operator.applyAsLong(state, t); 524 } 525 } 526 527 @Override 528 public OptionalLong get() { 529 return empty ? OptionalLong.empty() : OptionalLong.of(state); 530 } 531 532 @Override 533 public void combine(ReducingSink other) { 534 if (!other.empty) 535 accept(other.state); 536 } 537 } 538 return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) { 539 @Override 540 public ReducingSink makeSink() { 541 return new ReducingSink(); 542 } 543 }; 544 } 545 546 /** 547 * Constructs a {@code TerminalOp} that implements a mutable reduce on 548 * {@code long} values. 549 * 550 * @param <R> the type of the result 551 * @param supplier a factory to produce a new accumulator of the result type 552 * @param accumulator a function to incorporate an int into an 553 * accumulator 554 * @param combiner a function to combine an accumulator into another 555 * @return a {@code TerminalOp} implementing the reduction 556 */ 557 public static <R> TerminalOp<Long, R> 558 makeLong(Supplier<R> supplier, 559 ObjLongConsumer<R> accumulator, 560 BinaryOperator<R> combiner) { 561 Objects.requireNonNull(supplier); 562 Objects.requireNonNull(accumulator); 563 Objects.requireNonNull(combiner); 564 class ReducingSink extends Box<R> 565 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong { 566 @Override 567 public void begin(long size) { 568 state = supplier.get(); 569 } 570 571 @Override 572 public void accept(long t) { 573 accumulator.accept(state, t); 574 } 575 576 @Override 577 public void combine(ReducingSink other) { 578 state = combiner.apply(state, other.state); 579 } 580 } 581 return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) { 582 @Override 583 public ReducingSink makeSink() { 584 return new ReducingSink(); 585 } 586 }; 587 } 588 589 /** 590 * Constructs a {@code TerminalOp} that counts the number of stream 591 * elements. If the size of the pipeline is known then count is the size 592 * and there is no need to evaluate the pipeline. If the size of the 593 * pipeline is non known then count is produced, via reduction, using a 594 * {@link CountingSink}. 595 * 596 * @return a {@code TerminalOp} implementing the counting 597 */ 598 public static TerminalOp<Long, Long> 599 makeLongCounting() { 600 return new ReduceOp<Long, Long, CountingSink<Long>>(StreamShape.LONG_VALUE) { 601 @Override 602 public CountingSink<Long> makeSink() { return new CountingSink.OfLong(); } 603 604 @Override 605 public <P_IN> Long evaluateSequential(PipelineHelper<Long> helper, 606 Spliterator<P_IN> spliterator) { 607 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 608 return spliterator.getExactSizeIfKnown(); 609 return super.evaluateSequential(helper, spliterator); 610 } 611 612 @Override 613 public <P_IN> Long evaluateParallel(PipelineHelper<Long> helper, 614 Spliterator<P_IN> spliterator) { 615 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 616 return spliterator.getExactSizeIfKnown(); 617 return super.evaluateParallel(helper, spliterator); 618 } 619 620 @Override 621 public int getOpFlags() { 622 return StreamOpFlag.NOT_ORDERED; 623 } 624 }; 625 } 626 627 /** 628 * Constructs a {@code TerminalOp} that implements a functional reduce on 629 * {@code double} values. 630 * 631 * @param identity the identity for the combining function 632 * @param operator the combining function 633 * @return a {@code TerminalOp} implementing the reduction 634 */ 635 public static TerminalOp<Double, Double> 636 makeDouble(double identity, DoubleBinaryOperator operator) { 637 Objects.requireNonNull(operator); 638 class ReducingSink 639 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble { 640 private double state; 641 642 @Override 643 public void begin(long size) { 644 state = identity; 645 } 646 647 @Override 648 public void accept(double t) { 649 state = operator.applyAsDouble(state, t); 650 } 651 652 @Override 653 public Double get() { 654 return state; 655 } 656 657 @Override 658 public void combine(ReducingSink other) { 659 accept(other.state); 660 } 661 } 662 return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) { 663 @Override 664 public ReducingSink makeSink() { 665 return new ReducingSink(); 666 } 667 }; 668 } 669 670 /** 671 * Constructs a {@code TerminalOp} that implements a functional reduce on 672 * {@code double} values, producing an optional double result. 673 * 674 * @param operator the combining function 675 * @return a {@code TerminalOp} implementing the reduction 676 */ 677 public static TerminalOp<Double, OptionalDouble> 678 makeDouble(DoubleBinaryOperator operator) { 679 Objects.requireNonNull(operator); 680 class ReducingSink 681 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble { 682 private boolean empty; 683 private double state; 684 685 public void begin(long size) { 686 empty = true; 687 state = 0; 688 } 689 690 @Override 691 public void accept(double t) { 692 if (empty) { 693 empty = false; 694 state = t; 695 } 696 else { 697 state = operator.applyAsDouble(state, t); 698 } 699 } 700 701 @Override 702 public OptionalDouble get() { 703 return empty ? OptionalDouble.empty() : OptionalDouble.of(state); 704 } 705 706 @Override 707 public void combine(ReducingSink other) { 708 if (!other.empty) 709 accept(other.state); 710 } 711 } 712 return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) { 713 @Override 714 public ReducingSink makeSink() { 715 return new ReducingSink(); 716 } 717 }; 718 } 719 720 /** 721 * Constructs a {@code TerminalOp} that implements a mutable reduce on 722 * {@code double} values. 723 * 724 * @param <R> the type of the result 725 * @param supplier a factory to produce a new accumulator of the result type 726 * @param accumulator a function to incorporate an int into an 727 * accumulator 728 * @param combiner a function to combine an accumulator into another 729 * @return a {@code TerminalOp} implementing the reduction 730 */ 731 public static <R> TerminalOp<Double, R> 732 makeDouble(Supplier<R> supplier, 733 ObjDoubleConsumer<R> accumulator, 734 BinaryOperator<R> combiner) { 735 Objects.requireNonNull(supplier); 736 Objects.requireNonNull(accumulator); 737 Objects.requireNonNull(combiner); 738 class ReducingSink extends Box<R> 739 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble { 740 @Override 741 public void begin(long size) { 742 state = supplier.get(); 743 } 744 745 @Override 746 public void accept(double t) { 747 accumulator.accept(state, t); 748 } 749 750 @Override 751 public void combine(ReducingSink other) { 752 state = combiner.apply(state, other.state); 753 } 754 } 755 return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) { 756 @Override 757 public ReducingSink makeSink() { 758 return new ReducingSink(); 759 } 760 }; 761 } 762 763 /** 764 * Constructs a {@code TerminalOp} that counts the number of stream 765 * elements. If the size of the pipeline is known then count is the size 766 * and there is no need to evaluate the pipeline. If the size of the 767 * pipeline is non known then count is produced, via reduction, using a 768 * {@link CountingSink}. 769 * 770 * @return a {@code TerminalOp} implementing the counting 771 */ 772 public static TerminalOp<Double, Long> 773 makeDoubleCounting() { 774 return new ReduceOp<Double, Long, CountingSink<Double>>(StreamShape.DOUBLE_VALUE) { 775 @Override 776 public CountingSink<Double> makeSink() { return new CountingSink.OfDouble(); } 777 778 @Override 779 public <P_IN> Long evaluateSequential(PipelineHelper<Double> helper, 780 Spliterator<P_IN> spliterator) { 781 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 782 return spliterator.getExactSizeIfKnown(); 783 return super.evaluateSequential(helper, spliterator); 784 } 785 786 @Override 787 public <P_IN> Long evaluateParallel(PipelineHelper<Double> helper, 788 Spliterator<P_IN> spliterator) { 789 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags())) 790 return spliterator.getExactSizeIfKnown(); 791 return super.evaluateParallel(helper, spliterator); 792 } 793 794 @Override 795 public int getOpFlags() { 796 return StreamOpFlag.NOT_ORDERED; 797 } 798 }; 799 } 800 801 /** 802 * A sink that counts elements 803 */ 804 abstract static class CountingSink<T> 805 extends Box<Long> 806 implements AccumulatingSink<T, Long, CountingSink<T>> { 807 long count; 808 809 @Override 810 public void begin(long size) { 811 count = 0L; 812 } 813 814 @Override 815 public Long get() { 816 return count; 817 } 818 819 @Override 820 public void combine(CountingSink<T> other) { 821 count += other.count; 822 } 823 824 static final class OfRef<T> extends CountingSink<T> { 825 @Override 826 public void accept(T t) { 827 count++; 828 } 829 } 830 831 static final class OfInt extends CountingSink<Integer> implements Sink.OfInt { 832 @Override 833 public void accept(int t) { 834 count++; 835 } 836 } 837 838 static final class OfLong extends CountingSink<Long> implements Sink.OfLong { 839 @Override 840 public void accept(long t) { 841 count++; 842 } 843 } 844 845 static final class OfDouble extends CountingSink<Double> implements Sink.OfDouble { 846 @Override 847 public void accept(double t) { 848 count++; 849 } 850 } 851 } 852 853 /** 854 * A type of {@code TerminalSink} that implements an associative reducing 855 * operation on elements of type {@code T} and producing a result of type 856 * {@code R}. 857 * 858 * @param <T> the type of input element to the combining operation 859 * @param <R> the result type 860 * @param <K> the type of the {@code AccumulatingSink}. 861 */ 862 private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> 863 extends TerminalSink<T, R> { 864 void combine(K other); 865 } 866 867 /** 868 * State box for a single state element, used as a base class for 869 * {@code AccumulatingSink} instances 870 * 871 * @param <U> The type of the state element 872 */ 873 private abstract static class Box<U> { 874 U state; 875 876 Box() {} // Avoid creation of special accessor 877 878 public U get() { 879 return state; 880 } 881 } 882 883 /** 884 * A {@code TerminalOp} that evaluates a stream pipeline and sends the 885 * output into an {@code AccumulatingSink}, which performs a reduce 886 * operation. The {@code AccumulatingSink} must represent an associative 887 * reducing operation. 888 * 889 * @param <T> the output type of the stream pipeline 890 * @param <R> the result type of the reducing operation 891 * @param <S> the type of the {@code AccumulatingSink} 892 */ 893 private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>> 894 implements TerminalOp<T, R> { 895 private final StreamShape inputShape; 896 897 /** 898 * Create a {@code ReduceOp} of the specified stream shape which uses 899 * the specified {@code Supplier} to create accumulating sinks. 900 * 901 * @param shape The shape of the stream pipeline 902 */ 903 ReduceOp(StreamShape shape) { 904 inputShape = shape; 905 } 906 907 public abstract S makeSink(); 908 909 @Override 910 public StreamShape inputShape() { 911 return inputShape; 912 } 913 914 @Override 915 public <P_IN> R evaluateSequential(PipelineHelper<T> helper, 916 Spliterator<P_IN> spliterator) { 917 return helper.wrapAndCopyInto(makeSink(), spliterator).get(); 918 } 919 920 @Override 921 public <P_IN> R evaluateParallel(PipelineHelper<T> helper, 922 Spliterator<P_IN> spliterator) { 923 return new ReduceTask<>(this, helper, spliterator).invoke().get(); 924 } 925 } 926 927 /** 928 * A {@code ForkJoinTask} for performing a parallel reduce operation. 929 */ 930 @SuppressWarnings("serial") 931 private static final class ReduceTask<P_IN, P_OUT, R, 932 S extends AccumulatingSink<P_OUT, R, S>> 933 extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> { 934 private final ReduceOp<P_OUT, R, S> op; 935 936 ReduceTask(ReduceOp<P_OUT, R, S> op, 937 PipelineHelper<P_OUT> helper, 938 Spliterator<P_IN> spliterator) { 939 super(helper, spliterator); 940 this.op = op; 941 } 942 943 ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent, 944 Spliterator<P_IN> spliterator) { 945 super(parent, spliterator); 946 this.op = parent.op; 947 } 948 949 @Override 950 protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) { 951 return new ReduceTask<>(this, spliterator); 952 } 953 954 @Override 955 protected S doLeaf() { 956 return helper.wrapAndCopyInto(op.makeSink(), spliterator); 957 } 958 959 @Override 960 public void onCompletion(CountedCompleter<?> caller) { 961 if (!isLeaf()) { 962 S leftResult = leftChild.getLocalResult(); 963 leftResult.combine(rightChild.getLocalResult()); 964 setLocalResult(leftResult); 965 } 966 // GC spliterator, left and right child 967 super.onCompletion(caller); 968 } 969 } 970 971 static final class FoldLeftWithSeedOp<U, T> extends Box<U> implements Consumer<T> { 972 private final BiFunction<U, ? super T, U> op; 973 974 FoldLeftWithSeedOp(U seed, BiFunction<U, ? super T, U> op) { 975 this.state = seed; 976 this.op = op; 977 } 978 979 @Override 980 public void accept(T t) { 981 state = op.apply(state, t); 982 } 983 } 984 985 static final class FoldLeftOp<T> implements Consumer<T> { 986 private final BinaryOperator<T> op; 987 private T state; 988 private boolean empty = true; 989 990 FoldLeftOp(BinaryOperator<T> op) { 991 this.op = op; 992 } 993 994 @Override 995 public void accept(T t) { 996 if (empty) { 997 state = t; 998 empty = false; 999 } else { 1000 state = op.apply(state, t); 1001 } 1002 } 1003 1004 Optional<T> get() { 1005 return empty ? Optional.empty() : Optional.of(state); 1006 } 1007 } 1008 1009 static final class IntFoldLeftOp implements IntConsumer { 1010 private final IntBinaryOperator op; 1011 private int state; 1012 private boolean empty = true; 1013 1014 IntFoldLeftOp(IntBinaryOperator op) { 1015 this.op = op; 1016 } 1017 1018 @Override 1019 public void accept(int t) { 1020 if (empty) { 1021 state = t; 1022 empty = false; 1023 } else { 1024 state = op.applyAsInt(state, t); 1025 } 1026 } 1027 1028 OptionalInt get() { 1029 return empty ? OptionalInt.empty() : OptionalInt.of(state); 1030 } 1031 1032 int getAsInt() { 1033 if (empty) { 1034 throw new IllegalStateException(); 1035 } 1036 return state; 1037 } 1038 } 1039 1040 static final class LongFoldLeftOp implements LongConsumer { 1041 private final LongBinaryOperator op; 1042 private long state; 1043 private boolean empty = true; 1044 1045 LongFoldLeftOp(LongBinaryOperator op) { 1046 this.op = op; 1047 } 1048 1049 @Override 1050 public void accept(long t) { 1051 if (empty) { 1052 state = t; 1053 empty = false; 1054 } else { 1055 state = op.applyAsLong(state, t); 1056 } 1057 } 1058 1059 OptionalLong get() { 1060 return empty ? OptionalLong.empty() : OptionalLong.of(state); 1061 } 1062 1063 long getAsLong() { 1064 if (empty) { 1065 throw new IllegalStateException(); 1066 } 1067 return state; 1068 } 1069 } 1070 1071 static final class DoubleFoldLeftOp implements DoubleConsumer { 1072 private final DoubleBinaryOperator op; 1073 private double state; 1074 private boolean empty = true; 1075 1076 DoubleFoldLeftOp(DoubleBinaryOperator op) { 1077 this.op = op; 1078 } 1079 1080 @Override 1081 public void accept(double t) { 1082 if (empty) { 1083 state = t; 1084 empty = false; 1085 } else { 1086 state = op.applyAsDouble(state, t); 1087 } 1088 } 1089 1090 OptionalDouble get() { 1091 return empty ? OptionalDouble.empty() : OptionalDouble.of(state); 1092 } 1093 1094 double getAsDouble() { 1095 if (empty) { 1096 throw new IllegalStateException(); 1097 } 1098 return state; 1099 } 1100 } 1101 }