1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Objects; 28 import java.util.Optional; 29 import java.util.OptionalDouble; 30 import java.util.OptionalInt; 31 import java.util.OptionalLong; 32 import java.util.Spliterator; 33 import java.util.concurrent.CountedCompleter; 34 import java.util.function.BiConsumer; 35 import java.util.function.BiFunction; 36 import java.util.function.BinaryOperator; 37 import java.util.function.DoubleBinaryOperator; 38 import java.util.function.IntBinaryOperator; 39 import java.util.function.LongBinaryOperator; 40 import java.util.function.ObjDoubleConsumer; 41 import java.util.function.ObjIntConsumer; 42 import java.util.function.ObjLongConsumer; 43 import java.util.function.Supplier; 44 45 /** 46 * Factory for the creating instances of {@code TerminalOp) that implement 47 * reductions. 48 * 49 * @since 1.8 50 */ 51 final class ReduceOps { 52 53 private ReduceOps() { } 54 55 /** 56 * Constructs a {@code TerminalOp} that implements a functional reduce on 57 * reference values. 58 * 59 * @param <T> the type of the input elements 60 * @param <U> the type of the result 61 * @param seed the identity element for the reduction 62 * @param reducer the accumulating function that incorporates an additional 63 * input element into the result 64 * @param combiner the combining function that combines two intermediate 65 * results 66 * @return a {@code TerminalOp} implementing the reduction 67 */ 68 public static <T, U> TerminalOp<T, U> 69 makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) { 70 Objects.requireNonNull(reducer); 71 Objects.requireNonNull(combiner); 72 class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> { 73 @Override 74 public void begin(long size) { 75 state = seed; 76 } 77 78 @Override 79 public void accept(T t) { 80 state = reducer.apply(state, t); 81 } 82 83 @Override 84 public void combine(ReducingSink other) { 85 state = combiner.apply(state, other.state); 86 } 87 } 88 return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) { 89 @Override 90 public ReducingSink makeSink() { 91 return new ReducingSink(); 92 } 93 }; 94 } 95 96 /** 97 * Constructs a {@code TerminalOp} that implements a functional reduce on 98 * reference values producing an optional reference result. 99 * 100 * @param <T> The type of the input elements, and the type of the result 101 * @param operator The reducing function 102 * @return A {@code TerminalOp} implementing the reduction 103 */ 104 public static <T> TerminalOp<T, Optional<T>> 105 makeRef(BinaryOperator<T> operator) { 106 Objects.requireNonNull(operator); 107 class ReducingSink 108 implements AccumulatingSink<T, Optional<T>, ReducingSink> { 109 private boolean empty; 110 private T state; 111 112 public void begin(long size) { 113 empty = true; 114 state = null; 115 } 116 117 @Override 118 public void accept(T t) { 119 if (empty) { 120 empty = false; 121 state = t; 122 } else { 123 state = operator.apply(state, t); 124 } 125 } 126 127 @Override 128 public Optional<T> get() { 129 return empty ? Optional.empty() : Optional.of(state); 130 } 131 132 @Override 133 public void combine(ReducingSink other) { 134 if (!other.empty) 135 accept(other.state); 136 } 137 } 138 return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) { 139 @Override 140 public ReducingSink makeSink() { 141 return new ReducingSink(); 142 } 143 }; 144 } 145 146 /** 147 * Constructs a {@code TerminalOp} that implements a mutable reduce on 148 * reference values. 149 * 150 * @param <T> the type of the input elements 151 * @param <I> the type of the intermediate reduction result 152 * @param <I> the type of the final reduction result 153 * @param collector a {@code Collector} defining the reduction 154 * @return a {@code ReduceOp} implementing the reduction 155 */ 156 public static <T, I, R> TerminalOp<T, I> 157 makeRef(Collector<? super T, I, R> collector) { 158 Supplier<I> supplier = Objects.requireNonNull(collector).supplier(); 159 BiConsumer<I, ? super T> accumulator = collector.accumulator(); 160 BinaryOperator<I> combiner = collector.combiner(); 161 class ReducingSink extends Box<I> 162 implements AccumulatingSink<T, I, ReducingSink> { 163 @Override 164 public void begin(long size) { 165 state = supplier.get(); 166 } 167 168 @Override 169 public void accept(T t) { 170 accumulator.accept(state, t); 171 } 172 173 @Override 174 public void combine(ReducingSink other) { 175 state = combiner.apply(state, other.state); 176 } 177 } 178 return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) { 179 @Override 180 public ReducingSink makeSink() { 181 return new ReducingSink(); 182 } 183 184 @Override 185 public int getOpFlags() { 186 return collector.characteristics().contains(Collector.Characteristics.UNORDERED) 187 ? StreamOpFlag.NOT_ORDERED 188 : 0; 189 } 190 }; 191 } 192 193 /** 194 * Constructs a {@code TerminalOp} that implements a mutable reduce on 195 * reference values. 196 * 197 * @param <T> the type of the input elements 198 * @param <R> the type of the result 199 * @param seedFactory a factory to produce a new base accumulator 200 * @param accumulator a function to incorporate an element into an 201 * accumulator 202 * @param reducer a function to combine an accumulator into another 203 * @return a {@code TerminalOp} implementing the reduction 204 */ 205 public static <T, R> TerminalOp<T, R> 206 makeRef(Supplier<R> seedFactory, 207 BiConsumer<R, ? super T> accumulator, 208 BiConsumer<R,R> reducer) { 209 Objects.requireNonNull(seedFactory); 210 Objects.requireNonNull(accumulator); 211 Objects.requireNonNull(reducer); 212 class ReducingSink extends Box<R> 213 implements AccumulatingSink<T, R, ReducingSink> { 214 @Override 215 public void begin(long size) { 216 state = seedFactory.get(); 217 } 218 219 @Override 220 public void accept(T t) { 221 accumulator.accept(state, t); 222 } 223 224 @Override 225 public void combine(ReducingSink other) { 226 reducer.accept(state, other.state); 227 } 228 } 229 return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) { 230 @Override 231 public ReducingSink makeSink() { 232 return new ReducingSink(); 233 } 234 }; 235 } 236 237 /** 238 * Constructs a {@code TerminalOp} that implements a functional reduce on 239 * {@code int} values. 240 * 241 * @param identity the identity for the combining function 242 * @param operator the combining function 243 * @return a {@code TerminalOp} implementing the reduction 244 */ 245 public static TerminalOp<Integer, Integer> 246 makeInt(int identity, IntBinaryOperator operator) { 247 Objects.requireNonNull(operator); 248 class ReducingSink 249 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt { 250 private int state; 251 252 @Override 253 public void begin(long size) { 254 state = identity; 255 } 256 257 @Override 258 public void accept(int t) { 259 state = operator.applyAsInt(state, t); 260 } 261 262 @Override 263 public Integer get() { 264 return state; 265 } 266 267 @Override 268 public void combine(ReducingSink other) { 269 accept(other.state); 270 } 271 } 272 return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) { 273 @Override 274 public ReducingSink makeSink() { 275 return new ReducingSink(); 276 } 277 }; 278 } 279 280 /** 281 * Constructs a {@code TerminalOp} that implements a functional reduce on 282 * {@code int} values, producing an optional integer result. 283 * 284 * @param operator the combining function 285 * @return a {@code TerminalOp} implementing the reduction 286 */ 287 public static TerminalOp<Integer, OptionalInt> 288 makeInt(IntBinaryOperator operator) { 289 Objects.requireNonNull(operator); 290 class ReducingSink 291 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt { 292 private boolean empty; 293 private int state; 294 295 public void begin(long size) { 296 empty = true; 297 state = 0; 298 } 299 300 @Override 301 public void accept(int t) { 302 if (empty) { 303 empty = false; 304 state = t; 305 } 306 else { 307 state = operator.applyAsInt(state, t); 308 } 309 } 310 311 @Override 312 public OptionalInt get() { 313 return empty ? OptionalInt.empty() : OptionalInt.of(state); 314 } 315 316 @Override 317 public void combine(ReducingSink other) { 318 if (!other.empty) 319 accept(other.state); 320 } 321 } 322 return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) { 323 @Override 324 public ReducingSink makeSink() { 325 return new ReducingSink(); 326 } 327 }; 328 } 329 330 /** 331 * Constructs a {@code TerminalOp} that implements a mutable reduce on 332 * {@code int} values. 333 * 334 * @param <R> The type of the result 335 * @param supplier a factory to produce a new accumulator of the result type 336 * @param accumulator a function to incorporate an int into an 337 * accumulator 338 * @param combiner a function to combine an accumulator into another 339 * @return A {@code ReduceOp} implementing the reduction 340 */ 341 public static <R> TerminalOp<Integer, R> 342 makeInt(Supplier<R> supplier, 343 ObjIntConsumer<R> accumulator, 344 BinaryOperator<R> combiner) { 345 Objects.requireNonNull(supplier); 346 Objects.requireNonNull(accumulator); 347 Objects.requireNonNull(combiner); 348 class ReducingSink extends Box<R> 349 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt { 350 @Override 351 public void begin(long size) { 352 state = supplier.get(); 353 } 354 355 @Override 356 public void accept(int t) { 357 accumulator.accept(state, t); 358 } 359 360 @Override 361 public void combine(ReducingSink other) { 362 state = combiner.apply(state, other.state); 363 } 364 } 365 return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) { 366 @Override 367 public ReducingSink makeSink() { 368 return new ReducingSink(); 369 } 370 }; 371 } 372 373 /** 374 * Constructs a {@code TerminalOp} that implements a functional reduce on 375 * {@code long} values. 376 * 377 * @param identity the identity for the combining function 378 * @param operator the combining function 379 * @return a {@code TerminalOp} implementing the reduction 380 */ 381 public static TerminalOp<Long, Long> 382 makeLong(long identity, LongBinaryOperator operator) { 383 Objects.requireNonNull(operator); 384 class ReducingSink 385 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong { 386 private long state; 387 388 @Override 389 public void begin(long size) { 390 state = identity; 391 } 392 393 @Override 394 public void accept(long t) { 395 state = operator.applyAsLong(state, t); 396 } 397 398 @Override 399 public Long get() { 400 return state; 401 } 402 403 @Override 404 public void combine(ReducingSink other) { 405 accept(other.state); 406 } 407 } 408 return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) { 409 @Override 410 public ReducingSink makeSink() { 411 return new ReducingSink(); 412 } 413 }; 414 } 415 416 /** 417 * Constructs a {@code TerminalOp} that implements a functional reduce on 418 * {@code long} values, producing an optional long result. 419 * 420 * @param operator the combining function 421 * @return a {@code TerminalOp} implementing the reduction 422 */ 423 public static TerminalOp<Long, OptionalLong> 424 makeLong(LongBinaryOperator operator) { 425 Objects.requireNonNull(operator); 426 class ReducingSink 427 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong { 428 private boolean empty; 429 private long state; 430 431 public void begin(long size) { 432 empty = true; 433 state = 0; 434 } 435 436 @Override 437 public void accept(long t) { 438 if (empty) { 439 empty = false; 440 state = t; 441 } 442 else { 443 state = operator.applyAsLong(state, t); 444 } 445 } 446 447 @Override 448 public OptionalLong get() { 449 return empty ? OptionalLong.empty() : OptionalLong.of(state); 450 } 451 452 @Override 453 public void combine(ReducingSink other) { 454 if (!other.empty) 455 accept(other.state); 456 } 457 } 458 return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) { 459 @Override 460 public ReducingSink makeSink() { 461 return new ReducingSink(); 462 } 463 }; 464 } 465 466 /** 467 * Constructs a {@code TerminalOp} that implements a mutable reduce on 468 * {@code long} values. 469 * 470 * @param <R> the type of the result 471 * @param supplier a factory to produce a new accumulator of the result type 472 * @param accumulator a function to incorporate an int into an 473 * accumulator 474 * @param combiner a function to combine an accumulator into another 475 * @return a {@code TerminalOp} implementing the reduction 476 */ 477 public static <R> TerminalOp<Long, R> 478 makeLong(Supplier<R> supplier, 479 ObjLongConsumer<R> accumulator, 480 BinaryOperator<R> combiner) { 481 Objects.requireNonNull(supplier); 482 Objects.requireNonNull(accumulator); 483 Objects.requireNonNull(combiner); 484 class ReducingSink extends Box<R> 485 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong { 486 @Override 487 public void begin(long size) { 488 state = supplier.get(); 489 } 490 491 @Override 492 public void accept(long t) { 493 accumulator.accept(state, t); 494 } 495 496 @Override 497 public void combine(ReducingSink other) { 498 state = combiner.apply(state, other.state); 499 } 500 } 501 return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) { 502 @Override 503 public ReducingSink makeSink() { 504 return new ReducingSink(); 505 } 506 }; 507 } 508 509 /** 510 * Constructs a {@code TerminalOp} that implements a functional reduce on 511 * {@code double} values. 512 * 513 * @param identity the identity for the combining function 514 * @param operator the combining function 515 * @return a {@code TerminalOp} implementing the reduction 516 */ 517 public static TerminalOp<Double, Double> 518 makeDouble(double identity, DoubleBinaryOperator operator) { 519 Objects.requireNonNull(operator); 520 class ReducingSink 521 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble { 522 private double state; 523 524 @Override 525 public void begin(long size) { 526 state = identity; 527 } 528 529 @Override 530 public void accept(double t) { 531 state = operator.applyAsDouble(state, t); 532 } 533 534 @Override 535 public Double get() { 536 return state; 537 } 538 539 @Override 540 public void combine(ReducingSink other) { 541 accept(other.state); 542 } 543 } 544 return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) { 545 @Override 546 public ReducingSink makeSink() { 547 return new ReducingSink(); 548 } 549 }; 550 } 551 552 /** 553 * Constructs a {@code TerminalOp} that implements a functional reduce on 554 * {@code double} values, producing an optional double result. 555 * 556 * @param operator the combining function 557 * @return a {@code TerminalOp} implementing the reduction 558 */ 559 public static TerminalOp<Double, OptionalDouble> 560 makeDouble(DoubleBinaryOperator operator) { 561 Objects.requireNonNull(operator); 562 class ReducingSink 563 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble { 564 private boolean empty; 565 private double state; 566 567 public void begin(long size) { 568 empty = true; 569 state = 0; 570 } 571 572 @Override 573 public void accept(double t) { 574 if (empty) { 575 empty = false; 576 state = t; 577 } 578 else { 579 state = operator.applyAsDouble(state, t); 580 } 581 } 582 583 @Override 584 public OptionalDouble get() { 585 return empty ? OptionalDouble.empty() : OptionalDouble.of(state); 586 } 587 588 @Override 589 public void combine(ReducingSink other) { 590 if (!other.empty) 591 accept(other.state); 592 } 593 } 594 return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) { 595 @Override 596 public ReducingSink makeSink() { 597 return new ReducingSink(); 598 } 599 }; 600 } 601 602 /** 603 * Constructs a {@code TerminalOp} that implements a mutable reduce on 604 * {@code double} values. 605 * 606 * @param <R> the type of the result 607 * @param supplier a factory to produce a new accumulator of the result type 608 * @param accumulator a function to incorporate an int into an 609 * accumulator 610 * @param combiner a function to combine an accumulator into another 611 * @return a {@code TerminalOp} implementing the reduction 612 */ 613 public static <R> TerminalOp<Double, R> 614 makeDouble(Supplier<R> supplier, 615 ObjDoubleConsumer<R> accumulator, 616 BinaryOperator<R> combiner) { 617 Objects.requireNonNull(supplier); 618 Objects.requireNonNull(accumulator); 619 Objects.requireNonNull(combiner); 620 class ReducingSink extends Box<R> 621 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble { 622 @Override 623 public void begin(long size) { 624 state = supplier.get(); 625 } 626 627 @Override 628 public void accept(double t) { 629 accumulator.accept(state, t); 630 } 631 632 @Override 633 public void combine(ReducingSink other) { 634 state = combiner.apply(state, other.state); 635 } 636 } 637 return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) { 638 @Override 639 public ReducingSink makeSink() { 640 return new ReducingSink(); 641 } 642 }; 643 } 644 645 /** 646 * A type of {@code TerminalSink} that implements an associative reducing 647 * operation on elements of type {@code T} and producing a result of type 648 * {@code R}. 649 * 650 * @param <T> the type of input element to the combining operation 651 * @param <R> the result type 652 * @param <K> the type of the {@code AccumulatingSink}. 653 */ 654 private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> 655 extends TerminalSink<T, R> { 656 public void combine(K other); 657 } 658 659 /** 660 * State box for a single state element, used as a base class for 661 * {@code AccumulatingSink} instances 662 * 663 * @param <U> The type of the state element 664 */ 665 private static abstract class Box<U> { 666 U state; 667 668 Box() {} // Avoid creation of special accessor 669 670 public U get() { 671 return state; 672 } 673 } 674 675 /** 676 * A {@code TerminalOp} that evaluates a stream pipeline and sends the 677 * output into an {@code AccumulatingSink}, which performs a reduce 678 * operation. The {@code AccumulatingSink} must represent an associative 679 * reducing operation. 680 * 681 * @param <T> the output type of the stream pipeline 682 * @param <R> the result type of the reducing operation 683 * @param <S> the type of the {@code AccumulatingSink} 684 */ 685 private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>> 686 implements TerminalOp<T, R> { 687 private final StreamShape inputShape; 688 689 /** 690 * Create a {@code ReduceOp} of the specified stream shape which uses 691 * the specified {@code Supplier} to create accumulating sinks. 692 * 693 * @param shape The shape of the stream pipeline 694 */ 695 ReduceOp(StreamShape shape) { 696 inputShape = shape; 697 } 698 699 public abstract S makeSink(); 700 701 @Override 702 public StreamShape inputShape() { 703 return inputShape; 704 } 705 706 @Override 707 public <P_IN> R evaluateSequential(PipelineHelper<T> helper, 708 Spliterator<P_IN> spliterator) { 709 return helper.wrapAndCopyInto(makeSink(), spliterator).get(); 710 } 711 712 @Override 713 public <P_IN> R evaluateParallel(PipelineHelper<T> helper, 714 Spliterator<P_IN> spliterator) { 715 return new ReduceTask<>(this, helper, spliterator).invoke().get(); 716 } 717 } 718 719 /** 720 * A {@code ForkJoinTask} for performing a parallel reduce operation. 721 */ 722 private static final class ReduceTask<P_IN, P_OUT, R, 723 S extends AccumulatingSink<P_OUT, R, S>> 724 extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> { 725 private final ReduceOp<P_OUT, R, S> op; 726 727 ReduceTask(ReduceOp<P_OUT, R, S> op, 728 PipelineHelper<P_OUT> helper, 729 Spliterator<P_IN> spliterator) { 730 super(helper, spliterator); 731 this.op = op; 732 } 733 734 ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent, 735 Spliterator<P_IN> spliterator) { 736 super(parent, spliterator); 737 this.op = parent.op; 738 } 739 740 @Override 741 protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) { 742 return new ReduceTask<>(this, spliterator); 743 } 744 745 @Override 746 protected S doLeaf() { 747 return helper.wrapAndCopyInto(op.makeSink(), spliterator); 748 } 749 750 @Override 751 public void onCompletion(CountedCompleter caller) { 752 if (!isLeaf()) { 753 S leftResult = leftChild.getLocalResult(); 754 leftResult.combine(rightChild.getLocalResult()); 755 setLocalResult(leftResult); 756 } 757 // GC spliterator, left and right child 758 super.onCompletion(caller); 759 } 760 } 761 }