1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Objects; 28 import java.util.Optional; 29 import java.util.OptionalDouble; 30 import java.util.OptionalInt; 31 import java.util.OptionalLong; 32 import java.util.Spliterator; 33 import java.util.concurrent.CountedCompleter; 34 import java.util.function.BiConsumer; 35 import java.util.function.BiFunction; 36 import java.util.function.BinaryOperator; 37 import java.util.function.DoubleBinaryOperator; 38 import java.util.function.IntBinaryOperator; 39 import java.util.function.LongBinaryOperator; 40 import java.util.function.ObjDoubleConsumer; 41 import java.util.function.ObjIntConsumer; 42 import java.util.function.ObjLongConsumer; 43 import java.util.function.Supplier; 44 45 /** 46 * Factory for the creating instances of {@code TerminalOp) that implement 47 * reductions. 48 * 49 * @since 1.8 50 */ 51 final class ReduceOps { 52 53 private ReduceOps() { } 54 55 /** 56 * Constructs a {@code TerminalOp} that implements a functional reduce on 57 * reference values 58 * 59 * @param seed The identity element for the reduction 60 * @param reducer The accumulating function that incorporates an additional 61 * input element into the result 62 * @param combiner The combining function that combines two intermediate 63 * results 64 * @param <T> The type of the input elements 65 * @param <U> The type of the result 66 * @return A {@code TerminalOp} implementing the reduction 67 */ 68 public static<T, U> TerminalOp<T, U> 69 makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) { 70 Objects.requireNonNull(reducer); 71 Objects.requireNonNull(combiner); 72 class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> { 73 @Override 74 public void begin(long size) { 75 state = seed; 76 } 77 78 @Override 79 public void accept(T t) { 80 state = reducer.apply(state, t); 81 } 82 83 @Override 84 public void combine(ReducingSink other) { 85 state = combiner.apply(state, other.state); 86 } 87 } 88 return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) { 89 @Override 90 public ReducingSink makeSink() { 91 return new ReducingSink(); 92 } 93 }; 94 } 95 96 /** 97 * Constructs a {@code TerminalOp} that implements a functional reduce on 98 * reference values producing an optional reference result 99 * 100 * @param operator The reducing function 101 * @param <T> The type of the input elements, and the type of the result 102 * @return A {@code TerminalOp} implementing the reduction 103 */ 104 public static<T> TerminalOp<T, Optional<T>> 105 makeRef(BinaryOperator<T> operator) { 106 Objects.requireNonNull(operator); 107 class ReducingSink 108 implements AccumulatingSink<T, Optional<T>, ReducingSink> { 109 private boolean empty; 110 private T state; 111 112 public void begin(long size) { 113 empty = true; 114 state = null; 115 } 116 117 @Override 118 public void accept(T t) { 119 if (empty) { 120 empty = false; 121 state = t; 122 } else { 123 state = operator.apply(state, t); 124 } 125 } 126 127 @Override 128 public Optional<T> get() { 129 return empty ? Optional.empty() : Optional.of(state); 130 } 131 132 @Override 133 public void combine(ReducingSink other) { 134 if (!other.empty) 135 accept(other.state); 136 } 137 } 138 return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) { 139 @Override 140 public ReducingSink makeSink() { 141 return new ReducingSink(); 142 } 143 }; 144 } 145 146 /** 147 * Constructs a {@code TerminalOp} that implements a mutable reduce on 148 * reference values 149 * 150 * @param collector A {@code Collector} defining the reduction 151 * @param <T> The type of the input elements 152 * @param <R> The type of the result 153 * @return A {@code ReduceOp} implementing the reduction 154 */ 155 public static<T,R> TerminalOp<T, R> 156 makeRef(Collector<? super T,R> collector) { 157 Supplier<R> supplier = Objects.requireNonNull(collector).resultSupplier(); 158 BiFunction<R, ? super T, R> accumulator = collector.accumulator(); 159 BinaryOperator<R> combiner = collector.combiner(); 160 class ReducingSink extends Box<R> 161 implements AccumulatingSink<T, R, ReducingSink> { 162 @Override 163 public void begin(long size) { 164 state = supplier.get(); 165 } 166 167 @Override 168 public void accept(T t) { 169 R newResult = accumulator.apply(state, t); 170 if (state != newResult) 171 state = newResult; 172 } 173 174 @Override 175 public void combine(ReducingSink other) { 176 state = combiner.apply(state, other.state); 177 } 178 } 179 return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) { 180 @Override 181 public ReducingSink makeSink() { 182 return new ReducingSink(); 183 } 184 185 @Override 186 public int getOpFlags() { 187 return collector.characteristics().contains(Collector.Characteristics.UNORDERED) 188 ? StreamOpFlag.NOT_ORDERED 189 : 0; 190 } 191 }; 192 } 193 194 /** 195 * Constructs a {@code TerminalOp} that implements a mutable reduce on 196 * reference values 197 * 198 * @param seedFactory A factory to produce a new base accumulator 199 * @param accumulator A function to incorporate an element into an 200 * accumulator 201 * @param reducer A function to combine an accumulator into another 202 * @param <T> The type of the input elements 203 * @param <R> The type of the result 204 * @return A {@code TerminalOp} implementing the reduction 205 */ 206 public static<T, R> TerminalOp<T, R> 207 makeRef(Supplier<R> seedFactory, 208 BiConsumer<R, ? super T> accumulator, 209 BiConsumer<R,R> reducer) { 210 Objects.requireNonNull(seedFactory); 211 Objects.requireNonNull(accumulator); 212 Objects.requireNonNull(reducer); 213 class ReducingSink extends Box<R> 214 implements AccumulatingSink<T, R, ReducingSink> { 215 @Override 216 public void begin(long size) { 217 state = seedFactory.get(); 218 } 219 220 @Override 221 public void accept(T t) { 222 accumulator.accept(state, t); 223 } 224 225 @Override 226 public void combine(ReducingSink other) { 227 reducer.accept(state, other.state); 228 } 229 } 230 return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) { 231 @Override 232 public ReducingSink makeSink() { 233 return new ReducingSink(); 234 } 235 }; 236 } 237 238 /** 239 * Constructs a {@code TerminalOp} that implements a functional reduce on 240 * {@code int} values 241 * 242 * @param identity The identity for the combining function 243 * @param operator The combining function 244 * @return A {@code TerminalOp} implementing the reduction 245 */ 246 public static TerminalOp<Integer, Integer> 247 makeInt(int identity, IntBinaryOperator operator) { 248 Objects.requireNonNull(operator); 249 class ReducingSink 250 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt { 251 private int state; 252 253 @Override 254 public void begin(long size) { 255 state = identity; 256 } 257 258 @Override 259 public void accept(int t) { 260 state = operator.applyAsInt(state, t); 261 } 262 263 @Override 264 public Integer get() { 265 return state; 266 } 267 268 @Override 269 public void combine(ReducingSink other) { 270 accept(other.state); 271 } 272 } 273 return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) { 274 @Override 275 public ReducingSink makeSink() { 276 return new ReducingSink(); 277 } 278 }; 279 } 280 281 /** 282 * Constructs a {@code TerminalOp} that implements a functional reduce on 283 * {@code int} values, producing an optional integer result 284 * 285 * @param operator The combining function 286 * @return A {@code TerminalOp} implementing the reduction 287 */ 288 public static TerminalOp<Integer, OptionalInt> 289 makeInt(IntBinaryOperator operator) { 290 Objects.requireNonNull(operator); 291 class ReducingSink 292 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt { 293 private boolean empty; 294 private int state; 295 296 public void begin(long size) { 297 empty = true; 298 state = 0; 299 } 300 301 @Override 302 public void accept(int t) { 303 if (empty) { 304 empty = false; 305 state = t; 306 } 307 else { 308 state = operator.applyAsInt(state, t); 309 } 310 } 311 312 @Override 313 public OptionalInt get() { 314 return empty ? OptionalInt.empty() : OptionalInt.of(state); 315 } 316 317 @Override 318 public void combine(ReducingSink other) { 319 if (!other.empty) 320 accept(other.state); 321 } 322 } 323 return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) { 324 @Override 325 public ReducingSink makeSink() { 326 return new ReducingSink(); 327 } 328 }; 329 } 330 331 /** 332 * Constructs a {@code TerminalOp} that implements a mutable reduce on 333 * {@code int} values 334 * 335 * @param supplier 336 * @param accumulator 337 * @param combiner 338 * @param <R> The type of the result 339 * @return A {@code ReduceOp} implementing the reduction 340 */ 341 public static <R> TerminalOp<Integer, R> 342 makeInt(Supplier<R> supplier, 343 ObjIntConsumer<R> accumulator, 344 BinaryOperator<R> combiner) { 345 Objects.requireNonNull(supplier); 346 Objects.requireNonNull(accumulator); 347 Objects.requireNonNull(combiner); 348 class ReducingSink extends Box<R> 349 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt { 350 @Override 351 public void begin(long size) { 352 state = supplier.get(); 353 } 354 355 @Override 356 public void accept(int t) { 357 accumulator.accept(state, t); 358 } 359 360 @Override 361 public void combine(ReducingSink other) { 362 state = combiner.apply(state, other.state); 363 } 364 } 365 return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) { 366 @Override 367 public ReducingSink makeSink() { 368 return new ReducingSink(); 369 } 370 }; 371 } 372 373 /** 374 * Constructs a {@code TerminalOp} that implements a functional reduce on 375 * {@code long} values 376 * 377 * @param identity The identity for the combining function 378 * @param operator The combining function 379 * @return A {@code TerminalOp} implementing the reduction 380 */ 381 public static TerminalOp<Long, Long> 382 makeLong(long identity, LongBinaryOperator operator) { 383 Objects.requireNonNull(operator); 384 class ReducingSink 385 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong { 386 private long state; 387 388 @Override 389 public void begin(long size) { 390 state = identity; 391 } 392 393 @Override 394 public void accept(long t) { 395 state = operator.applyAsLong(state, t); 396 } 397 398 @Override 399 public Long get() { 400 return state; 401 } 402 403 @Override 404 public void combine(ReducingSink other) { 405 accept(other.state); 406 } 407 } 408 return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) { 409 @Override 410 public ReducingSink makeSink() { 411 return new ReducingSink(); 412 } 413 }; 414 } 415 416 /** 417 * Constructs a {@code TerminalOp} that implements a functional reduce on 418 * {@code long} values, producing an optional long result 419 * 420 * @param operator The combining function 421 * @return A {@code TerminalOp} implementing the reduction 422 */ 423 public static TerminalOp<Long, OptionalLong> 424 makeLong(LongBinaryOperator operator) { 425 Objects.requireNonNull(operator); 426 class ReducingSink 427 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong { 428 private boolean empty; 429 private long state; 430 431 public void begin(long size) { 432 empty = true; 433 state = 0; 434 } 435 436 @Override 437 public void accept(long t) { 438 if (empty) { 439 empty = false; 440 state = t; 441 } 442 else { 443 state = operator.applyAsLong(state, t); 444 } 445 } 446 447 @Override 448 public OptionalLong get() { 449 return empty ? OptionalLong.empty() : OptionalLong.of(state); 450 } 451 452 @Override 453 public void combine(ReducingSink other) { 454 if (!other.empty) 455 accept(other.state); 456 } 457 } 458 return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) { 459 @Override 460 public ReducingSink makeSink() { 461 return new ReducingSink(); 462 } 463 }; 464 } 465 466 /** 467 * Constructs a {@code TerminalOp} that implements a mutable reduce on 468 * {@code long} values 469 * 470 * @param supplier 471 * @param accumulator 472 * @param combiner 473 * @param <R> The type of the result 474 * @return A {@code TerminalOp} implementing the reduction 475 */ 476 public static <R> TerminalOp<Long, R> 477 makeLong(Supplier<R> supplier, 478 ObjLongConsumer<R> accumulator, 479 BinaryOperator<R> combiner) { 480 Objects.requireNonNull(supplier); 481 Objects.requireNonNull(accumulator); 482 Objects.requireNonNull(combiner); 483 class ReducingSink extends Box<R> 484 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong { 485 @Override 486 public void begin(long size) { 487 state = supplier.get(); 488 } 489 490 @Override 491 public void accept(long t) { 492 accumulator.accept(state, t); 493 } 494 495 @Override 496 public void combine(ReducingSink other) { 497 state = combiner.apply(state, other.state); 498 } 499 } 500 return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) { 501 @Override 502 public ReducingSink makeSink() { 503 return new ReducingSink(); 504 } 505 }; 506 } 507 508 /** 509 * Constructs a {@code TerminalOp} that implements a functional reduce on 510 * {@code double} values 511 * 512 * @param identity The identity for the combining function 513 * @param operator The combining function 514 * @return A {@code TerminalOp} implementing the reduction 515 */ 516 public static TerminalOp<Double, Double> 517 makeDouble(double identity, DoubleBinaryOperator operator) { 518 Objects.requireNonNull(operator); 519 class ReducingSink 520 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble { 521 private double state; 522 523 @Override 524 public void begin(long size) { 525 state = identity; 526 } 527 528 @Override 529 public void accept(double t) { 530 state = operator.applyAsDouble(state, t); 531 } 532 533 @Override 534 public Double get() { 535 return state; 536 } 537 538 @Override 539 public void combine(ReducingSink other) { 540 accept(other.state); 541 } 542 } 543 return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) { 544 @Override 545 public ReducingSink makeSink() { 546 return new ReducingSink(); 547 } 548 }; 549 } 550 551 /** 552 * Constructs a {@code TerminalOp} that implements a functional reduce on 553 * {@code double} values, producing an optional double result 554 * 555 * @param operator The combining function 556 * @return A {@code TerminalOp} implementing the reduction 557 */ 558 public static TerminalOp<Double, OptionalDouble> 559 makeDouble(DoubleBinaryOperator operator) { 560 Objects.requireNonNull(operator); 561 class ReducingSink 562 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble { 563 private boolean empty; 564 private double state; 565 566 public void begin(long size) { 567 empty = true; 568 state = 0; 569 } 570 571 @Override 572 public void accept(double t) { 573 if (empty) { 574 empty = false; 575 state = t; 576 } 577 else { 578 state = operator.applyAsDouble(state, t); 579 } 580 } 581 582 @Override 583 public OptionalDouble get() { 584 return empty ? OptionalDouble.empty() : OptionalDouble.of(state); 585 } 586 587 @Override 588 public void combine(ReducingSink other) { 589 if (!other.empty) 590 accept(other.state); 591 } 592 } 593 return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) { 594 @Override 595 public ReducingSink makeSink() { 596 return new ReducingSink(); 597 } 598 }; 599 } 600 601 /** 602 * Constructs a {@code TerminalOp} that implements a mutable reduce on 603 * {@code double} values 604 * 605 * @param supplier 606 * @param accumulator 607 * @param combiner 608 * @param <R> The type of the result 609 * @return A {@code TerminalOp} implementing the reduction 610 */ 611 public static <R> TerminalOp<Double, R> 612 makeDouble(Supplier<R> supplier, 613 ObjDoubleConsumer<R> accumulator, 614 BinaryOperator<R> combiner) { 615 Objects.requireNonNull(supplier); 616 Objects.requireNonNull(accumulator); 617 Objects.requireNonNull(combiner); 618 class ReducingSink extends Box<R> 619 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble { 620 @Override 621 public void begin(long size) { 622 state = supplier.get(); 623 } 624 625 @Override 626 public void accept(double t) { 627 accumulator.accept(state, t); 628 } 629 630 @Override 631 public void combine(ReducingSink other) { 632 state = combiner.apply(state, other.state); 633 } 634 } 635 return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) { 636 @Override 637 public ReducingSink makeSink() { 638 return new ReducingSink(); 639 } 640 }; 641 } 642 643 /** 644 * A type of {@code TerminalSink} that implements an associative reducing 645 * operation on elements of type {@code T} and producing a result of type 646 * {@code R}. 647 * 648 * @param <T> The type of input element to the combining operation 649 * @param <R> The result type 650 * @param <K> The type of the {@code AccumulatingSink}. 651 */ 652 private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> 653 extends TerminalSink<T, R> { 654 public void combine(K other); 655 } 656 657 /** 658 * State box for a single state element, used as a base class for 659 * {@code AccumulatingSink} instances 660 * 661 * @param <U> The type of the state element 662 */ 663 private static abstract class Box<U> { 664 U state; 665 666 Box() {} // Avoid creation of special accessor 667 668 public U get() { 669 return state; 670 } 671 } 672 673 /** 674 * A {@code TerminalOp} that evaluates a stream pipeline and sends the 675 * output into an {@code AccumulatingSink}, which performs a reduce 676 * operation. The {@code AccumulatingSink} must represent an associative 677 * reducing operation. 678 * 679 * @param <T> The output type of the stream pipeline 680 * @param <R> The result type of the reducing operation 681 * @param <S> The type of the {@code AccumulatingSink} 682 */ 683 private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>> 684 implements TerminalOp<T, R> { 685 private final StreamShape inputShape; 686 687 /** 688 * Create a {@code ReduceOp} of the specified stream shape which uses 689 * the specified {@code Supplier} to create accumulating sinks 690 * 691 * @param shape The shape of the stream pipeline 692 */ 693 ReduceOp(StreamShape shape) { 694 inputShape = shape; 695 } 696 697 public abstract S makeSink(); 698 699 @Override 700 public StreamShape inputShape() { 701 return inputShape; 702 } 703 704 @Override 705 public <P_IN> R evaluateSequential(PipelineHelper<T> helper, 706 Spliterator<P_IN> spliterator) { 707 return helper.wrapAndCopyInto(makeSink(), spliterator).get(); 708 } 709 710 @Override 711 public <P_IN> R evaluateParallel(PipelineHelper<T> helper, 712 Spliterator<P_IN> spliterator) { 713 return new ReduceTask<>(this, helper, spliterator).invoke().get(); 714 } 715 } 716 717 /** A {@code ForkJoinTask} for performing a parallel reduce operation */ 718 private static final class ReduceTask<P_IN, P_OUT, R, 719 S extends AccumulatingSink<P_OUT, R, S>> 720 extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> { 721 private final ReduceOp<P_OUT, R, S> op; 722 723 ReduceTask(ReduceOp<P_OUT, R, S> op, 724 PipelineHelper<P_OUT> helper, 725 Spliterator<P_IN> spliterator) { 726 super(helper, spliterator); 727 this.op = op; 728 } 729 730 ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent, 731 Spliterator<P_IN> spliterator) { 732 super(parent, spliterator); 733 this.op = parent.op; 734 } 735 736 @Override 737 protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) { 738 return new ReduceTask<>(this, spliterator); 739 } 740 741 @Override 742 protected S doLeaf() { 743 return helper.wrapAndCopyInto(op.makeSink(), spliterator); 744 } 745 746 @Override 747 public void onCompletion(CountedCompleter caller) { 748 if (!isLeaf()) { 749 S leftResult = leftChild.getLocalResult(); 750 leftResult.combine(rightChild.getLocalResult()); 751 setLocalResult(leftResult); 752 } 753 // GC spliterator, left and right child 754 super.onCompletion(caller); 755 } 756 } 757 }