582 @Override 583 public OptionalDouble get() { 584 return empty ? OptionalDouble.empty() : OptionalDouble.of(state); 585 } 586 587 @Override 588 public void combine(ReducingSink other) { 589 if (!other.empty) 590 accept(other.state); 591 } 592 } 593 return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) { 594 @Override 595 public ReducingSink makeSink() { 596 return new ReducingSink(); 597 } 598 }; 599 } 600 601 /** 602 * Constructs a {@code TerminalOp} that implements a mutable reduce on 603 * {@code double} values. 604 * 605 * @param <R> the type of the result 606 * @param supplier a factory to produce a new accumulator of the result type 607 * @param accumulator a function to incorporate an int into an 608 * accumulator 609 * @param combiner a function to combine an accumulator into another 610 * @return a {@code TerminalOp} implementing the reduction 611 */ 612 public static <R> TerminalOp<Double, R> 613 makeDouble(Supplier<R> supplier, 614 ObjDoubleConsumer<R> accumulator, 615 BinaryOperator<R> combiner) { 616 Objects.requireNonNull(supplier); 617 Objects.requireNonNull(accumulator); 618 Objects.requireNonNull(combiner); 619 class ReducingSink extends Box<R> 620 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble { 621 @Override | 582 @Override 583 public OptionalDouble get() { 584 return empty ? OptionalDouble.empty() : OptionalDouble.of(state); 585 } 586 587 @Override 588 public void combine(ReducingSink other) { 589 if (!other.empty) 590 accept(other.state); 591 } 592 } 593 return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) { 594 @Override 595 public ReducingSink makeSink() { 596 return new ReducingSink(); 597 } 598 }; 599 } 600 601 /** 602 * Constructs a {@code TerminalOp} that implements a functional reduce on 603 * {@code double} values. 604 * 605 * @param identity the identity for the combining function 606 * @param operator the combining function 607 * @return a {@code TerminalOp} implementing the reduction 608 */ 609 public static TerminalOp<Double, Double> 610 makeDoubleCompensatingSummer() { 611 class ReducingSink 612 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble { 613 private double sum; 614 private double sumCompensation; 615 616 /** 617 * Incorporate a new double value using Kahan summation / 618 * compensation summation. 619 */ 620 private void sumWithCompensation(double value) { 621 double tmp = value - sumCompensation; 622 double velvel = sum + tmp; // Little wolf of rounding error 623 sumCompensation = (velvel - sum) - tmp; 624 sum = velvel; 625 } 626 627 @Override 628 public void begin(long size) { 629 sum = 0.0; 630 sumCompensation = 0.0; 631 } 632 633 @Override 634 public void accept(double t) { 635 sumWithCompensation(t); 636 } 637 638 @Override 639 public Double get() { 640 return sum; 641 } 642 643 @Override 644 public void combine(ReducingSink other) { 645 accept(other.sum); 646 accept(other.sumCompensation); 647 } 648 } 649 return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) { 650 @Override 651 public ReducingSink makeSink() { 652 return new ReducingSink(); 653 } 654 }; 655 } 656 657 /** 658 * Constructs a {@code TerminalOp} that implements a mutable reduce on 659 * {@code double} values. 660 * 661 * @param <R> the type of the result 662 * @param supplier a factory to produce a new accumulator of the result type 663 * @param accumulator a function to incorporate an int into an 664 * accumulator 665 * @param combiner a function to combine an accumulator into another 666 * @return a {@code TerminalOp} implementing the reduction 667 */ 668 public static <R> TerminalOp<Double, R> 669 makeDouble(Supplier<R> supplier, 670 ObjDoubleConsumer<R> accumulator, 671 BinaryOperator<R> combiner) { 672 Objects.requireNonNull(supplier); 673 Objects.requireNonNull(accumulator); 674 Objects.requireNonNull(combiner); 675 class ReducingSink extends Box<R> 676 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble { 677 @Override |