1 /*
   2  * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Objects;
  28 import java.util.Optional;
  29 import java.util.OptionalDouble;
  30 import java.util.OptionalInt;
  31 import java.util.OptionalLong;
  32 import java.util.Spliterator;
  33 import java.util.concurrent.CountedCompleter;
  34 import java.util.function.BiConsumer;
  35 import java.util.function.BiFunction;
  36 import java.util.function.BinaryOperator;
  37 import java.util.function.Consumer;
  38 import java.util.function.DoubleBinaryOperator;
  39 import java.util.function.DoubleConsumer;
  40 import java.util.function.IntBinaryOperator;
  41 import java.util.function.IntConsumer;
  42 import java.util.function.LongBinaryOperator;
  43 import java.util.function.LongConsumer;
  44 import java.util.function.ObjDoubleConsumer;
  45 import java.util.function.ObjIntConsumer;
  46 import java.util.function.ObjLongConsumer;
  47 import java.util.function.Supplier;
  48 
  49 /**
  50  * Factory for creating instances of {@code TerminalOp} that implement
  51  * reductions.
  52  *
  53  * @since 1.8
  54  */
  55 final class ReduceOps {
  56 
  57     private ReduceOps() { }
  58 
  59     /**
  60      * Constructs a {@code TerminalOp} that implements a functional reduce on
  61      * reference values.
  62      *
  63      * @param <T> the type of the input elements
  64      * @param <U> the type of the result
  65      * @param seed the identity element for the reduction
  66      * @param reducer the accumulating function that incorporates an additional
  67      *        input element into the result
  68      * @param combiner the combining function that combines two intermediate
  69      *        results
  70      * @return a {@code TerminalOp} implementing the reduction
  71      */
  72     public static <T, U> TerminalOp<T, U>
  73     makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
  74         Objects.requireNonNull(reducer);
  75         Objects.requireNonNull(combiner);
  76         class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
  77             @Override
  78             public void begin(long size) {
  79                 state = seed;
  80             }
  81 
  82             @Override
  83             public void accept(T t) {
  84                 state = reducer.apply(state, t);
  85             }
  86 
  87             @Override
  88             public void combine(ReducingSink other) {
  89                 state = combiner.apply(state, other.state);
  90             }
  91         }
  92         return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
  93             @Override
  94             public ReducingSink makeSink() {
  95                 return new ReducingSink();
  96             }
  97         };
  98     }
  99 
 100     /**
 101      * Constructs a {@code TerminalOp} that implements a functional reduce on
 102      * reference values producing an optional reference result.
 103      *
 104      * @param <T> The type of the input elements, and the type of the result
 105      * @param operator The reducing function
 106      * @return A {@code TerminalOp} implementing the reduction
 107      */
 108     public static <T> TerminalOp<T, Optional<T>>
 109     makeRef(BinaryOperator<T> operator) {
 110         Objects.requireNonNull(operator);
 111         class ReducingSink
 112                 implements AccumulatingSink<T, Optional<T>, ReducingSink> {
 113             private boolean empty;
 114             private T state;
 115 
 116             public void begin(long size) {
 117                 empty = true;
 118                 state = null;
 119             }
 120 
 121             @Override
 122             public void accept(T t) {
 123                 if (empty) {
 124                     empty = false;
 125                     state = t;
 126                 } else {
 127                     state = operator.apply(state, t);
 128                 }
 129             }
 130 
 131             @Override
 132             public Optional<T> get() {
 133                 return empty ? Optional.empty() : Optional.of(state);
 134             }
 135 
 136             @Override
 137             public void combine(ReducingSink other) {
 138                 if (!other.empty)
 139                     accept(other.state);
 140             }
 141         }
 142         return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
 143             @Override
 144             public ReducingSink makeSink() {
 145                 return new ReducingSink();
 146             }
 147         };
 148     }
 149 
 150     /**
 151      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 152      * reference values.
 153      *
 154      * @param <T> the type of the input elements
 155      * @param <I> the type of the intermediate reduction result
 156      * @param collector a {@code Collector} defining the reduction
 157      * @return a {@code ReduceOp} implementing the reduction
 158      */
 159     public static <T, I> TerminalOp<T, I>
 160     makeRef(Collector<? super T, I, ?> collector) {
 161         Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
 162         BiConsumer<I, ? super T> accumulator = collector.accumulator();
 163         BinaryOperator<I> combiner = collector.combiner();
 164         class ReducingSink extends Box<I>
 165                 implements AccumulatingSink<T, I, ReducingSink> {
 166             @Override
 167             public void begin(long size) {
 168                 state = supplier.get();
 169             }
 170 
 171             @Override
 172             public void accept(T t) {
 173                 accumulator.accept(state, t);
 174             }
 175 
 176             @Override
 177             public void combine(ReducingSink other) {
 178                 state = combiner.apply(state, other.state);
 179             }
 180         }
 181         return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
 182             @Override
 183             public ReducingSink makeSink() {
 184                 return new ReducingSink();
 185             }
 186 
 187             @Override
 188             public int getOpFlags() {
 189                 return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
 190                        ? StreamOpFlag.NOT_ORDERED
 191                        : 0;
 192             }
 193         };
 194     }
 195 
 196     /**
 197      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 198      * reference values.
 199      *
 200      * @param <T> the type of the input elements
 201      * @param <R> the type of the result
 202      * @param seedFactory a factory to produce a new base accumulator
 203      * @param accumulator a function to incorporate an element into an
 204      *        accumulator
 205      * @param reducer a function to combine an accumulator into another
 206      * @return a {@code TerminalOp} implementing the reduction
 207      */
 208     public static <T, R> TerminalOp<T, R>
 209     makeRef(Supplier<R> seedFactory,
 210             BiConsumer<R, ? super T> accumulator,
 211             BiConsumer<R,R> reducer) {
 212         Objects.requireNonNull(seedFactory);
 213         Objects.requireNonNull(accumulator);
 214         Objects.requireNonNull(reducer);
 215         class ReducingSink extends Box<R>
 216                 implements AccumulatingSink<T, R, ReducingSink> {
 217             @Override
 218             public void begin(long size) {
 219                 state = seedFactory.get();
 220             }
 221 
 222             @Override
 223             public void accept(T t) {
 224                 accumulator.accept(state, t);
 225             }
 226 
 227             @Override
 228             public void combine(ReducingSink other) {
 229                 reducer.accept(state, other.state);
 230             }
 231         }
 232         return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) {
 233             @Override
 234             public ReducingSink makeSink() {
 235                 return new ReducingSink();
 236             }
 237         };
 238     }
 239 
 240     /**
 241      * Constructs a {@code TerminalOp} that counts the number of stream
 242      * elements.  If the size of the pipeline is known then count is the size
 243      * and there is no need to evaluate the pipeline.  If the size of the
 244      * pipeline is non known then count is produced, via reduction, using a
 245      * {@link CountingSink}.
 246      *
 247      * @param <T> the type of the input elements
 248      * @return a {@code TerminalOp} implementing the counting
 249      */
 250     public static <T> TerminalOp<T, Long>
 251     makeRefCounting() {
 252         return new ReduceOp<T, Long, CountingSink<T>>(StreamShape.REFERENCE) {
 253             @Override
 254             public CountingSink<T> makeSink() { return new CountingSink.OfRef<>(); }
 255 
 256             @Override
 257             public <P_IN> Long evaluateSequential(PipelineHelper<T> helper,
 258                                                   Spliterator<P_IN> spliterator) {
 259                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 260                     return spliterator.getExactSizeIfKnown();
 261                 return super.evaluateSequential(helper, spliterator);
 262             }
 263 
 264             @Override
 265             public <P_IN> Long evaluateParallel(PipelineHelper<T> helper,
 266                                                 Spliterator<P_IN> spliterator) {
 267                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 268                     return spliterator.getExactSizeIfKnown();
 269                 return super.evaluateParallel(helper, spliterator);
 270             }
 271 
 272             @Override
 273             public int getOpFlags() {
 274                 return StreamOpFlag.NOT_ORDERED;
 275             }
 276         };
 277     }
 278 
 279     /**
 280      * Constructs a {@code TerminalOp} that implements a functional reduce on
 281      * {@code int} values.
 282      *
 283      * @param identity the identity for the combining function
 284      * @param operator the combining function
 285      * @return a {@code TerminalOp} implementing the reduction
 286      */
 287     public static TerminalOp<Integer, Integer>
 288     makeInt(int identity, IntBinaryOperator operator) {
 289         Objects.requireNonNull(operator);
 290         class ReducingSink
 291                 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
 292             private int state;
 293 
 294             @Override
 295             public void begin(long size) {
 296                 state = identity;
 297             }
 298 
 299             @Override
 300             public void accept(int t) {
 301                 state = operator.applyAsInt(state, t);
 302             }
 303 
 304             @Override
 305             public Integer get() {
 306                 return state;
 307             }
 308 
 309             @Override
 310             public void combine(ReducingSink other) {
 311                 accept(other.state);
 312             }
 313         }
 314         return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
 315             @Override
 316             public ReducingSink makeSink() {
 317                 return new ReducingSink();
 318             }
 319         };
 320     }
 321 
 322     /**
 323      * Constructs a {@code TerminalOp} that implements a functional reduce on
 324      * {@code int} values, producing an optional integer result.
 325      *
 326      * @param operator the combining function
 327      * @return a {@code TerminalOp} implementing the reduction
 328      */
 329     public static TerminalOp<Integer, OptionalInt>
 330     makeInt(IntBinaryOperator operator) {
 331         Objects.requireNonNull(operator);
 332         class ReducingSink
 333                 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
 334             private boolean empty;
 335             private int state;
 336 
 337             public void begin(long size) {
 338                 empty = true;
 339                 state = 0;
 340             }
 341 
 342             @Override
 343             public void accept(int t) {
 344                 if (empty) {
 345                     empty = false;
 346                     state = t;
 347                 }
 348                 else {
 349                     state = operator.applyAsInt(state, t);
 350                 }
 351             }
 352 
 353             @Override
 354             public OptionalInt get() {
 355                 return empty ? OptionalInt.empty() : OptionalInt.of(state);
 356             }
 357 
 358             @Override
 359             public void combine(ReducingSink other) {
 360                 if (!other.empty)
 361                     accept(other.state);
 362             }
 363         }
 364         return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) {
 365             @Override
 366             public ReducingSink makeSink() {
 367                 return new ReducingSink();
 368             }
 369         };
 370     }
 371 
 372     /**
 373      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 374      * {@code int} values.
 375      *
 376      * @param <R> The type of the result
 377      * @param supplier a factory to produce a new accumulator of the result type
 378      * @param accumulator a function to incorporate an int into an
 379      *        accumulator
 380      * @param combiner a function to combine an accumulator into another
 381      * @return A {@code ReduceOp} implementing the reduction
 382      */
 383     public static <R> TerminalOp<Integer, R>
 384     makeInt(Supplier<R> supplier,
 385             ObjIntConsumer<R> accumulator,
 386             BinaryOperator<R> combiner) {
 387         Objects.requireNonNull(supplier);
 388         Objects.requireNonNull(accumulator);
 389         Objects.requireNonNull(combiner);
 390         class ReducingSink extends Box<R>
 391                 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt {
 392             @Override
 393             public void begin(long size) {
 394                 state = supplier.get();
 395             }
 396 
 397             @Override
 398             public void accept(int t) {
 399                 accumulator.accept(state, t);
 400             }
 401 
 402             @Override
 403             public void combine(ReducingSink other) {
 404                 state = combiner.apply(state, other.state);
 405             }
 406         }
 407         return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) {
 408             @Override
 409             public ReducingSink makeSink() {
 410                 return new ReducingSink();
 411             }
 412         };
 413     }
 414 
 415     /**
 416      * Constructs a {@code TerminalOp} that counts the number of stream
 417      * elements.  If the size of the pipeline is known then count is the size
 418      * and there is no need to evaluate the pipeline.  If the size of the
 419      * pipeline is non known then count is produced, via reduction, using a
 420      * {@link CountingSink}.
 421      *
 422      * @return a {@code TerminalOp} implementing the counting
 423      */
 424     public static TerminalOp<Integer, Long>
 425     makeIntCounting() {
 426         return new ReduceOp<Integer, Long, CountingSink<Integer>>(StreamShape.INT_VALUE) {
 427             @Override
 428             public CountingSink<Integer> makeSink() { return new CountingSink.OfInt(); }
 429 
 430             @Override
 431             public <P_IN> Long evaluateSequential(PipelineHelper<Integer> helper,
 432                                                   Spliterator<P_IN> spliterator) {
 433                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 434                     return spliterator.getExactSizeIfKnown();
 435                 return super.evaluateSequential(helper, spliterator);
 436             }
 437 
 438             @Override
 439             public <P_IN> Long evaluateParallel(PipelineHelper<Integer> helper,
 440                                                 Spliterator<P_IN> spliterator) {
 441                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 442                     return spliterator.getExactSizeIfKnown();
 443                 return super.evaluateParallel(helper, spliterator);
 444             }
 445 
 446             @Override
 447             public int getOpFlags() {
 448                 return StreamOpFlag.NOT_ORDERED;
 449             }
 450         };
 451     }
 452 
 453     /**
 454      * Constructs a {@code TerminalOp} that implements a functional reduce on
 455      * {@code long} values.
 456      *
 457      * @param identity the identity for the combining function
 458      * @param operator the combining function
 459      * @return a {@code TerminalOp} implementing the reduction
 460      */
 461     public static TerminalOp<Long, Long>
 462     makeLong(long identity, LongBinaryOperator operator) {
 463         Objects.requireNonNull(operator);
 464         class ReducingSink
 465                 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
 466             private long state;
 467 
 468             @Override
 469             public void begin(long size) {
 470                 state = identity;
 471             }
 472 
 473             @Override
 474             public void accept(long t) {
 475                 state = operator.applyAsLong(state, t);
 476             }
 477 
 478             @Override
 479             public Long get() {
 480                 return state;
 481             }
 482 
 483             @Override
 484             public void combine(ReducingSink other) {
 485                 accept(other.state);
 486             }
 487         }
 488         return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
 489             @Override
 490             public ReducingSink makeSink() {
 491                 return new ReducingSink();
 492             }
 493         };
 494     }
 495 
 496     /**
 497      * Constructs a {@code TerminalOp} that implements a functional reduce on
 498      * {@code long} values, producing an optional long result.
 499      *
 500      * @param operator the combining function
 501      * @return a {@code TerminalOp} implementing the reduction
 502      */
 503     public static TerminalOp<Long, OptionalLong>
 504     makeLong(LongBinaryOperator operator) {
 505         Objects.requireNonNull(operator);
 506         class ReducingSink
 507                 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong {
 508             private boolean empty;
 509             private long state;
 510 
 511             public void begin(long size) {
 512                 empty = true;
 513                 state = 0;
 514             }
 515 
 516             @Override
 517             public void accept(long t) {
 518                 if (empty) {
 519                     empty = false;
 520                     state = t;
 521                 }
 522                 else {
 523                     state = operator.applyAsLong(state, t);
 524                 }
 525             }
 526 
 527             @Override
 528             public OptionalLong get() {
 529                 return empty ? OptionalLong.empty() : OptionalLong.of(state);
 530             }
 531 
 532             @Override
 533             public void combine(ReducingSink other) {
 534                 if (!other.empty)
 535                     accept(other.state);
 536             }
 537         }
 538         return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) {
 539             @Override
 540             public ReducingSink makeSink() {
 541                 return new ReducingSink();
 542             }
 543         };
 544     }
 545 
 546     /**
 547      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 548      * {@code long} values.
 549      *
 550      * @param <R> the type of the result
 551      * @param supplier a factory to produce a new accumulator of the result type
 552      * @param accumulator a function to incorporate an int into an
 553      *        accumulator
 554      * @param combiner a function to combine an accumulator into another
 555      * @return a {@code TerminalOp} implementing the reduction
 556      */
 557     public static <R> TerminalOp<Long, R>
 558     makeLong(Supplier<R> supplier,
 559              ObjLongConsumer<R> accumulator,
 560              BinaryOperator<R> combiner) {
 561         Objects.requireNonNull(supplier);
 562         Objects.requireNonNull(accumulator);
 563         Objects.requireNonNull(combiner);
 564         class ReducingSink extends Box<R>
 565                 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong {
 566             @Override
 567             public void begin(long size) {
 568                 state = supplier.get();
 569             }
 570 
 571             @Override
 572             public void accept(long t) {
 573                 accumulator.accept(state, t);
 574             }
 575 
 576             @Override
 577             public void combine(ReducingSink other) {
 578                 state = combiner.apply(state, other.state);
 579             }
 580         }
 581         return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) {
 582             @Override
 583             public ReducingSink makeSink() {
 584                 return new ReducingSink();
 585             }
 586         };
 587     }
 588 
 589     /**
 590      * Constructs a {@code TerminalOp} that counts the number of stream
 591      * elements.  If the size of the pipeline is known then count is the size
 592      * and there is no need to evaluate the pipeline.  If the size of the
 593      * pipeline is non known then count is produced, via reduction, using a
 594      * {@link CountingSink}.
 595      *
 596      * @return a {@code TerminalOp} implementing the counting
 597      */
 598     public static TerminalOp<Long, Long>
 599     makeLongCounting() {
 600         return new ReduceOp<Long, Long, CountingSink<Long>>(StreamShape.LONG_VALUE) {
 601             @Override
 602             public CountingSink<Long> makeSink() { return new CountingSink.OfLong(); }
 603 
 604             @Override
 605             public <P_IN> Long evaluateSequential(PipelineHelper<Long> helper,
 606                                                   Spliterator<P_IN> spliterator) {
 607                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 608                     return spliterator.getExactSizeIfKnown();
 609                 return super.evaluateSequential(helper, spliterator);
 610             }
 611 
 612             @Override
 613             public <P_IN> Long evaluateParallel(PipelineHelper<Long> helper,
 614                                                 Spliterator<P_IN> spliterator) {
 615                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 616                     return spliterator.getExactSizeIfKnown();
 617                 return super.evaluateParallel(helper, spliterator);
 618             }
 619 
 620             @Override
 621             public int getOpFlags() {
 622                 return StreamOpFlag.NOT_ORDERED;
 623             }
 624         };
 625     }
 626 
 627     /**
 628      * Constructs a {@code TerminalOp} that implements a functional reduce on
 629      * {@code double} values.
 630      *
 631      * @param identity the identity for the combining function
 632      * @param operator the combining function
 633      * @return a {@code TerminalOp} implementing the reduction
 634      */
 635     public static TerminalOp<Double, Double>
 636     makeDouble(double identity, DoubleBinaryOperator operator) {
 637         Objects.requireNonNull(operator);
 638         class ReducingSink
 639                 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble {
 640             private double state;
 641 
 642             @Override
 643             public void begin(long size) {
 644                 state = identity;
 645             }
 646 
 647             @Override
 648             public void accept(double t) {
 649                 state = operator.applyAsDouble(state, t);
 650             }
 651 
 652             @Override
 653             public Double get() {
 654                 return state;
 655             }
 656 
 657             @Override
 658             public void combine(ReducingSink other) {
 659                 accept(other.state);
 660             }
 661         }
 662         return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) {
 663             @Override
 664             public ReducingSink makeSink() {
 665                 return new ReducingSink();
 666             }
 667         };
 668     }
 669 
 670     /**
 671      * Constructs a {@code TerminalOp} that implements a functional reduce on
 672      * {@code double} values, producing an optional double result.
 673      *
 674      * @param operator the combining function
 675      * @return a {@code TerminalOp} implementing the reduction
 676      */
 677     public static TerminalOp<Double, OptionalDouble>
 678     makeDouble(DoubleBinaryOperator operator) {
 679         Objects.requireNonNull(operator);
 680         class ReducingSink
 681                 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble {
 682             private boolean empty;
 683             private double state;
 684 
 685             public void begin(long size) {
 686                 empty = true;
 687                 state = 0;
 688             }
 689 
 690             @Override
 691             public void accept(double t) {
 692                 if (empty) {
 693                     empty = false;
 694                     state = t;
 695                 }
 696                 else {
 697                     state = operator.applyAsDouble(state, t);
 698                 }
 699             }
 700 
 701             @Override
 702             public OptionalDouble get() {
 703                 return empty ? OptionalDouble.empty() : OptionalDouble.of(state);
 704             }
 705 
 706             @Override
 707             public void combine(ReducingSink other) {
 708                 if (!other.empty)
 709                     accept(other.state);
 710             }
 711         }
 712         return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) {
 713             @Override
 714             public ReducingSink makeSink() {
 715                 return new ReducingSink();
 716             }
 717         };
 718     }
 719 
 720     /**
 721      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 722      * {@code double} values.
 723      *
 724      * @param <R> the type of the result
 725      * @param supplier a factory to produce a new accumulator of the result type
 726      * @param accumulator a function to incorporate an int into an
 727      *        accumulator
 728      * @param combiner a function to combine an accumulator into another
 729      * @return a {@code TerminalOp} implementing the reduction
 730      */
 731     public static <R> TerminalOp<Double, R>
 732     makeDouble(Supplier<R> supplier,
 733                ObjDoubleConsumer<R> accumulator,
 734                BinaryOperator<R> combiner) {
 735         Objects.requireNonNull(supplier);
 736         Objects.requireNonNull(accumulator);
 737         Objects.requireNonNull(combiner);
 738         class ReducingSink extends Box<R>
 739                 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble {
 740             @Override
 741             public void begin(long size) {
 742                 state = supplier.get();
 743             }
 744 
 745             @Override
 746             public void accept(double t) {
 747                 accumulator.accept(state, t);
 748             }
 749 
 750             @Override
 751             public void combine(ReducingSink other) {
 752                 state = combiner.apply(state, other.state);
 753             }
 754         }
 755         return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) {
 756             @Override
 757             public ReducingSink makeSink() {
 758                 return new ReducingSink();
 759             }
 760         };
 761     }
 762 
 763     /**
 764      * Constructs a {@code TerminalOp} that counts the number of stream
 765      * elements.  If the size of the pipeline is known then count is the size
 766      * and there is no need to evaluate the pipeline.  If the size of the
 767      * pipeline is non known then count is produced, via reduction, using a
 768      * {@link CountingSink}.
 769      *
 770      * @return a {@code TerminalOp} implementing the counting
 771      */
 772     public static TerminalOp<Double, Long>
 773     makeDoubleCounting() {
 774         return new ReduceOp<Double, Long, CountingSink<Double>>(StreamShape.DOUBLE_VALUE) {
 775             @Override
 776             public CountingSink<Double> makeSink() { return new CountingSink.OfDouble(); }
 777 
 778             @Override
 779             public <P_IN> Long evaluateSequential(PipelineHelper<Double> helper,
 780                                                   Spliterator<P_IN> spliterator) {
 781                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 782                     return spliterator.getExactSizeIfKnown();
 783                 return super.evaluateSequential(helper, spliterator);
 784             }
 785 
 786             @Override
 787             public <P_IN> Long evaluateParallel(PipelineHelper<Double> helper,
 788                                                 Spliterator<P_IN> spliterator) {
 789                 if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
 790                     return spliterator.getExactSizeIfKnown();
 791                 return super.evaluateParallel(helper, spliterator);
 792             }
 793 
 794             @Override
 795             public int getOpFlags() {
 796                 return StreamOpFlag.NOT_ORDERED;
 797             }
 798         };
 799     }
 800 
 801     /**
 802      * A sink that counts elements
 803      */
 804     abstract static class CountingSink<T>
 805             extends Box<Long>
 806             implements AccumulatingSink<T, Long, CountingSink<T>> {
 807         long count;
 808 
 809         @Override
 810         public void begin(long size) {
 811             count = 0L;
 812         }
 813 
 814         @Override
 815         public Long get() {
 816             return count;
 817         }
 818 
 819         @Override
 820         public void combine(CountingSink<T> other) {
 821             count += other.count;
 822         }
 823 
 824         static final class OfRef<T> extends CountingSink<T> {
 825             @Override
 826             public void accept(T t) {
 827                 count++;
 828             }
 829         }
 830 
 831         static final class OfInt extends CountingSink<Integer> implements Sink.OfInt {
 832             @Override
 833             public void accept(int t) {
 834                 count++;
 835             }
 836         }
 837 
 838         static final class OfLong extends CountingSink<Long> implements Sink.OfLong {
 839             @Override
 840             public void accept(long t) {
 841                 count++;
 842             }
 843         }
 844 
 845         static final class OfDouble extends CountingSink<Double> implements Sink.OfDouble {
 846             @Override
 847             public void accept(double t) {
 848                 count++;
 849             }
 850         }
 851     }
 852 
 853     /**
 854      * A type of {@code TerminalSink} that implements an associative reducing
 855      * operation on elements of type {@code T} and producing a result of type
 856      * {@code R}.
 857      *
 858      * @param <T> the type of input element to the combining operation
 859      * @param <R> the result type
 860      * @param <K> the type of the {@code AccumulatingSink}.
 861      */
 862     private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
 863             extends TerminalSink<T, R> {
 864         void combine(K other);
 865     }
 866 
 867     /**
 868      * State box for a single state element, used as a base class for
 869      * {@code AccumulatingSink} instances
 870      *
 871      * @param <U> The type of the state element
 872      */
 873     private abstract static class Box<U> {
 874         U state;
 875 
 876         Box() {} // Avoid creation of special accessor
 877 
 878         public U get() {
 879             return state;
 880         }
 881     }
 882 
 883     /**
 884      * A {@code TerminalOp} that evaluates a stream pipeline and sends the
 885      * output into an {@code AccumulatingSink}, which performs a reduce
 886      * operation. The {@code AccumulatingSink} must represent an associative
 887      * reducing operation.
 888      *
 889      * @param <T> the output type of the stream pipeline
 890      * @param <R> the result type of the reducing operation
 891      * @param <S> the type of the {@code AccumulatingSink}
 892      */
 893     private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
 894             implements TerminalOp<T, R> {
 895         private final StreamShape inputShape;
 896 
 897         /**
 898          * Create a {@code ReduceOp} of the specified stream shape which uses
 899          * the specified {@code Supplier} to create accumulating sinks.
 900          *
 901          * @param shape The shape of the stream pipeline
 902          */
 903         ReduceOp(StreamShape shape) {
 904             inputShape = shape;
 905         }
 906 
 907         public abstract S makeSink();
 908 
 909         @Override
 910         public StreamShape inputShape() {
 911             return inputShape;
 912         }
 913 
 914         @Override
 915         public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
 916                                            Spliterator<P_IN> spliterator) {
 917             return helper.wrapAndCopyInto(makeSink(), spliterator).get();
 918         }
 919 
 920         @Override
 921         public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
 922                                          Spliterator<P_IN> spliterator) {
 923             return new ReduceTask<>(this, helper, spliterator).invoke().get();
 924         }
 925     }
 926 
 927     /**
 928      * A {@code ForkJoinTask} for performing a parallel reduce operation.
 929      */
 930     @SuppressWarnings("serial")
 931     private static final class ReduceTask<P_IN, P_OUT, R,
 932                                           S extends AccumulatingSink<P_OUT, R, S>>
 933             extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
 934         private final ReduceOp<P_OUT, R, S> op;
 935 
 936         ReduceTask(ReduceOp<P_OUT, R, S> op,
 937                    PipelineHelper<P_OUT> helper,
 938                    Spliterator<P_IN> spliterator) {
 939             super(helper, spliterator);
 940             this.op = op;
 941         }
 942 
 943         ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
 944                    Spliterator<P_IN> spliterator) {
 945             super(parent, spliterator);
 946             this.op = parent.op;
 947         }
 948 
 949         @Override
 950         protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
 951             return new ReduceTask<>(this, spliterator);
 952         }
 953 
 954         @Override
 955         protected S doLeaf() {
 956             return helper.wrapAndCopyInto(op.makeSink(), spliterator);
 957         }
 958 
 959         @Override
 960         public void onCompletion(CountedCompleter<?> caller) {
 961             if (!isLeaf()) {
 962                 S leftResult = leftChild.getLocalResult();
 963                 leftResult.combine(rightChild.getLocalResult());
 964                 setLocalResult(leftResult);
 965             }
 966             // GC spliterator, left and right child
 967             super.onCompletion(caller);
 968         }
 969     }
 970 
 971     static final class FoldLeftWithSeedOp<U, T> extends Box<U> implements Consumer<T> {
 972         private final BiFunction<U, ? super T, U> op;
 973 
 974         FoldLeftWithSeedOp(U seed, BiFunction<U, ? super T, U> op) {
 975             this.state = seed;
 976             this.op = op;
 977         }
 978 
 979         @Override
 980         public void accept(T t) {
 981             state = op.apply(state, t);
 982         }
 983     }
 984 
 985     static final class FoldLeftOp<T> implements Consumer<T> {
 986         private final BinaryOperator<T> op;
 987         private T state;
 988         private boolean empty = true;
 989 
 990         FoldLeftOp(BinaryOperator<T> op) {
 991             this.op = op;
 992         }
 993 
 994         @Override
 995         public void accept(T t) {
 996             if (empty) {
 997                 state = t;
 998                 empty = false;
 999             } else {
1000                 state = op.apply(state, t);
1001             }
1002         }
1003 
1004         Optional<T> get() {
1005             return empty ? Optional.empty() : Optional.of(state);
1006         }
1007     }
1008 
1009     static final class IntFoldLeftOp implements IntConsumer {
1010         private final IntBinaryOperator op;
1011         private int state;
1012         private boolean empty = true;
1013 
1014         IntFoldLeftOp(IntBinaryOperator op) {
1015             this.op = op;
1016         }
1017 
1018         @Override
1019         public void accept(int t) {
1020             if (empty) {
1021                 state = t;
1022                 empty = false;
1023             } else {
1024                 state = op.applyAsInt(state, t);
1025             }
1026         }
1027 
1028         OptionalInt get() {
1029             return empty ? OptionalInt.empty() : OptionalInt.of(state);
1030         }
1031 
1032         int getAsInt() {
1033             if (empty) {
1034                 throw new IllegalStateException();
1035             }
1036             return state;
1037         }
1038     }
1039 
1040     static final class LongFoldLeftOp implements LongConsumer {
1041         private final LongBinaryOperator op;
1042         private long state;
1043         private boolean empty = true;
1044 
1045         LongFoldLeftOp(LongBinaryOperator op) {
1046             this.op = op;
1047         }
1048 
1049         @Override
1050         public void accept(long t) {
1051             if (empty) {
1052                 state = t;
1053                 empty = false;
1054             } else {
1055                 state = op.applyAsLong(state, t);
1056             }
1057         }
1058 
1059         OptionalLong get() {
1060             return empty ? OptionalLong.empty() : OptionalLong.of(state);
1061         }
1062 
1063         long getAsLong() {
1064             if (empty) {
1065                 throw new IllegalStateException();
1066             }
1067             return state;
1068         }
1069     }
1070 
1071     static final class DoubleFoldLeftOp implements DoubleConsumer {
1072         private final DoubleBinaryOperator op;
1073         private double state;
1074         private boolean empty = true;
1075 
1076         DoubleFoldLeftOp(DoubleBinaryOperator op) {
1077             this.op = op;
1078         }
1079 
1080         @Override
1081         public void accept(double t) {
1082             if (empty) {
1083                 state = t;
1084                 empty = false;
1085             } else {
1086                 state = op.applyAsDouble(state, t);
1087             }
1088         }
1089 
1090         OptionalDouble get() {
1091             return empty ? OptionalDouble.empty() : OptionalDouble.of(state);
1092         }
1093 
1094         double getAsDouble() {
1095             if (empty) {
1096                 throw new IllegalStateException();
1097             }
1098             return state;
1099         }
1100     }
1101 }