1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Objects;
  28 import java.util.Optional;
  29 import java.util.OptionalDouble;
  30 import java.util.OptionalInt;
  31 import java.util.OptionalLong;
  32 import java.util.Spliterator;
  33 import java.util.concurrent.CountedCompleter;
  34 import java.util.function.BiConsumer;
  35 import java.util.function.BiFunction;
  36 import java.util.function.BinaryOperator;
  37 import java.util.function.DoubleBinaryOperator;
  38 import java.util.function.IntBinaryOperator;
  39 import java.util.function.LongBinaryOperator;
  40 import java.util.function.ObjDoubleConsumer;
  41 import java.util.function.ObjIntConsumer;
  42 import java.util.function.ObjLongConsumer;
  43 import java.util.function.Supplier;
  44 
/**
 * Factory for creating instances of {@code TerminalOp} that implement
 * reductions.
 *
 * @since 1.8
 */
  51 final class ReduceOps {
  52 
  53     private ReduceOps() { }
  54 
  55     /**
  56      * Constructs a {@code TerminalOp} that implements a functional reduce on
  57      * reference values.
  58      *
  59      * @param <T> the type of the input elements
  60      * @param <U> the type of the result
  61      * @param seed the identity element for the reduction
  62      * @param reducer the accumulating function that incorporates an additional
  63      *        input element into the result
  64      * @param combiner the combining function that combines two intermediate
  65      *        results
  66      * @return a {@code TerminalOp} implementing the reduction
  67      */
  68     public static <T, U> TerminalOp<T, U>
  69     makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
  70         Objects.requireNonNull(reducer);
  71         Objects.requireNonNull(combiner);
  72         class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
  73             @Override
  74             public void begin(long size) {
  75                 state = seed;
  76             }
  77 
  78             @Override
  79             public void accept(T t) {
  80                 state = reducer.apply(state, t);
  81             }
  82 
  83             @Override
  84             public void combine(ReducingSink other) {
  85                 state = combiner.apply(state, other.state);
  86             }
  87         }
  88         return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
  89             @Override
  90             public ReducingSink makeSink() {
  91                 return new ReducingSink();
  92             }
  93         };
  94     }
  95 
  96     /**
  97      * Constructs a {@code TerminalOp} that implements a functional reduce on
  98      * reference values producing an optional reference result.
  99      *
 100      * @param <T> The type of the input elements, and the type of the result
 101      * @param operator The reducing function
 102      * @return A {@code TerminalOp} implementing the reduction
 103      */
 104     public static <T> TerminalOp<T, Optional<T>>
 105     makeRef(BinaryOperator<T> operator) {
 106         Objects.requireNonNull(operator);
 107         class ReducingSink
 108                 implements AccumulatingSink<T, Optional<T>, ReducingSink> {
 109             private boolean empty;
 110             private T state;
 111 
 112             public void begin(long size) {
 113                 empty = true;
 114                 state = null;
 115             }
 116 
 117             @Override
 118             public void accept(T t) {
 119                 if (empty) {
 120                     empty = false;
 121                     state = t;
 122                 } else {
 123                     state = operator.apply(state, t);
 124                 }
 125             }
 126 
 127             @Override
 128             public Optional<T> get() {
 129                 return empty ? Optional.empty() : Optional.of(state);
 130             }
 131 
 132             @Override
 133             public void combine(ReducingSink other) {
 134                 if (!other.empty)
 135                     accept(other.state);
 136             }
 137         }
 138         return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
 139             @Override
 140             public ReducingSink makeSink() {
 141                 return new ReducingSink();
 142             }
 143         };
 144     }
 145 
 146     /**
 147      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 148      * reference values.
 149      *
 150      * @param <T> the type of the input elements
 151      * @param <I> the type of the intermediate reduction result
 152      * @param <I> the type of the final reduction result
 153      * @param collector a {@code Collector} defining the reduction
 154      * @return a {@code ReduceOp} implementing the reduction
 155      */
 156     public static <T, I, R> TerminalOp<T, I>
 157     makeRef(Collector<? super T, I, R> collector) {
 158         Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
 159         BiConsumer<I, ? super T> accumulator = collector.accumulator();
 160         BinaryOperator<I> combiner = collector.combiner();
 161         class ReducingSink extends Box<I>
 162                 implements AccumulatingSink<T, I, ReducingSink> {
 163             @Override
 164             public void begin(long size) {
 165                 state = supplier.get();
 166             }
 167 
 168             @Override
 169             public void accept(T t) {
 170                 accumulator.accept(state, t);
 171             }
 172 
 173             @Override
 174             public void combine(ReducingSink other) {
 175                 state = combiner.apply(state, other.state);
 176             }
 177         }
 178         return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
 179             @Override
 180             public ReducingSink makeSink() {
 181                 return new ReducingSink();
 182             }
 183 
 184             @Override
 185             public int getOpFlags() {
 186                 return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
 187                        ? StreamOpFlag.NOT_ORDERED
 188                        : 0;
 189             }
 190         };
 191     }
 192 
 193     /**
 194      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 195      * reference values.
 196      *
 197      * @param <T> the type of the input elements
 198      * @param <R> the type of the result
 199      * @param seedFactory a factory to produce a new base accumulator
 200      * @param accumulator a function to incorporate an element into an
 201      *        accumulator
 202      * @param reducer a function to combine an accumulator into another
 203      * @return a {@code TerminalOp} implementing the reduction
 204      */
 205     public static <T, R> TerminalOp<T, R>
 206     makeRef(Supplier<R> seedFactory,
 207             BiConsumer<R, ? super T> accumulator,
 208             BiConsumer<R,R> reducer) {
 209         Objects.requireNonNull(seedFactory);
 210         Objects.requireNonNull(accumulator);
 211         Objects.requireNonNull(reducer);
 212         class ReducingSink extends Box<R>
 213                 implements AccumulatingSink<T, R, ReducingSink> {
 214             @Override
 215             public void begin(long size) {
 216                 state = seedFactory.get();
 217             }
 218 
 219             @Override
 220             public void accept(T t) {
 221                 accumulator.accept(state, t);
 222             }
 223 
 224             @Override
 225             public void combine(ReducingSink other) {
 226                 reducer.accept(state, other.state);
 227             }
 228         }
 229         return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) {
 230             @Override
 231             public ReducingSink makeSink() {
 232                 return new ReducingSink();
 233             }
 234         };
 235     }
 236 
 237     /**
 238      * Constructs a {@code TerminalOp} that implements a functional reduce on
 239      * {@code int} values.
 240      *
 241      * @param identity the identity for the combining function
 242      * @param operator the combining function
 243      * @return a {@code TerminalOp} implementing the reduction
 244      */
 245     public static TerminalOp<Integer, Integer>
 246     makeInt(int identity, IntBinaryOperator operator) {
 247         Objects.requireNonNull(operator);
 248         class ReducingSink
 249                 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
 250             private int state;
 251 
 252             @Override
 253             public void begin(long size) {
 254                 state = identity;
 255             }
 256 
 257             @Override
 258             public void accept(int t) {
 259                 state = operator.applyAsInt(state, t);
 260             }
 261 
 262             @Override
 263             public Integer get() {
 264                 return state;
 265             }
 266 
 267             @Override
 268             public void combine(ReducingSink other) {
 269                 accept(other.state);
 270             }
 271         }
 272         return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
 273             @Override
 274             public ReducingSink makeSink() {
 275                 return new ReducingSink();
 276             }
 277         };
 278     }
 279 
 280     /**
 281      * Constructs a {@code TerminalOp} that implements a functional reduce on
 282      * {@code int} values, producing an optional integer result.
 283      *
 284      * @param operator the combining function
 285      * @return a {@code TerminalOp} implementing the reduction
 286      */
 287     public static TerminalOp<Integer, OptionalInt>
 288     makeInt(IntBinaryOperator operator) {
 289         Objects.requireNonNull(operator);
 290         class ReducingSink
 291                 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
 292             private boolean empty;
 293             private int state;
 294 
 295             public void begin(long size) {
 296                 empty = true;
 297                 state = 0;
 298             }
 299 
 300             @Override
 301             public void accept(int t) {
 302                 if (empty) {
 303                     empty = false;
 304                     state = t;
 305                 }
 306                 else {
 307                     state = operator.applyAsInt(state, t);
 308                 }
 309             }
 310 
 311             @Override
 312             public OptionalInt get() {
 313                 return empty ? OptionalInt.empty() : OptionalInt.of(state);
 314             }
 315 
 316             @Override
 317             public void combine(ReducingSink other) {
 318                 if (!other.empty)
 319                     accept(other.state);
 320             }
 321         }
 322         return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) {
 323             @Override
 324             public ReducingSink makeSink() {
 325                 return new ReducingSink();
 326             }
 327         };
 328     }
 329 
 330     /**
 331      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 332      * {@code int} values.
 333      *
 334      * @param <R> The type of the result
 335      * @param supplier a factory to produce a new accumulator of the result type
 336      * @param accumulator a function to incorporate an int into an
 337      *        accumulator
 338      * @param combiner a function to combine an accumulator into another
 339      * @return A {@code ReduceOp} implementing the reduction
 340      */
 341     public static <R> TerminalOp<Integer, R>
 342     makeInt(Supplier<R> supplier,
 343             ObjIntConsumer<R> accumulator,
 344             BinaryOperator<R> combiner) {
 345         Objects.requireNonNull(supplier);
 346         Objects.requireNonNull(accumulator);
 347         Objects.requireNonNull(combiner);
 348         class ReducingSink extends Box<R>
 349                 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt {
 350             @Override
 351             public void begin(long size) {
 352                 state = supplier.get();
 353             }
 354 
 355             @Override
 356             public void accept(int t) {
 357                 accumulator.accept(state, t);
 358             }
 359 
 360             @Override
 361             public void combine(ReducingSink other) {
 362                 state = combiner.apply(state, other.state);
 363             }
 364         }
 365         return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) {
 366             @Override
 367             public ReducingSink makeSink() {
 368                 return new ReducingSink();
 369             }
 370         };
 371     }
 372 
 373     /**
 374      * Constructs a {@code TerminalOp} that implements a functional reduce on
 375      * {@code long} values.
 376      *
 377      * @param identity the identity for the combining function
 378      * @param operator the combining function
 379      * @return a {@code TerminalOp} implementing the reduction
 380      */
 381     public static TerminalOp<Long, Long>
 382     makeLong(long identity, LongBinaryOperator operator) {
 383         Objects.requireNonNull(operator);
 384         class ReducingSink
 385                 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
 386             private long state;
 387 
 388             @Override
 389             public void begin(long size) {
 390                 state = identity;
 391             }
 392 
 393             @Override
 394             public void accept(long t) {
 395                 state = operator.applyAsLong(state, t);
 396             }
 397 
 398             @Override
 399             public Long get() {
 400                 return state;
 401             }
 402 
 403             @Override
 404             public void combine(ReducingSink other) {
 405                 accept(other.state);
 406             }
 407         }
 408         return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
 409             @Override
 410             public ReducingSink makeSink() {
 411                 return new ReducingSink();
 412             }
 413         };
 414     }
 415 
 416     /**
 417      * Constructs a {@code TerminalOp} that implements a functional reduce on
 418      * {@code long} values, producing an optional long result.
 419      *
 420      * @param operator the combining function
 421      * @return a {@code TerminalOp} implementing the reduction
 422      */
 423     public static TerminalOp<Long, OptionalLong>
 424     makeLong(LongBinaryOperator operator) {
 425         Objects.requireNonNull(operator);
 426         class ReducingSink
 427                 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong {
 428             private boolean empty;
 429             private long state;
 430 
 431             public void begin(long size) {
 432                 empty = true;
 433                 state = 0;
 434             }
 435 
 436             @Override
 437             public void accept(long t) {
 438                 if (empty) {
 439                     empty = false;
 440                     state = t;
 441                 }
 442                 else {
 443                     state = operator.applyAsLong(state, t);
 444                 }
 445             }
 446 
 447             @Override
 448             public OptionalLong get() {
 449                 return empty ? OptionalLong.empty() : OptionalLong.of(state);
 450             }
 451 
 452             @Override
 453             public void combine(ReducingSink other) {
 454                 if (!other.empty)
 455                     accept(other.state);
 456             }
 457         }
 458         return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) {
 459             @Override
 460             public ReducingSink makeSink() {
 461                 return new ReducingSink();
 462             }
 463         };
 464     }
 465 
 466     /**
 467      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 468      * {@code long} values.
 469      *
 470      * @param <R> the type of the result
 471      * @param supplier a factory to produce a new accumulator of the result type
 472      * @param accumulator a function to incorporate an int into an
 473      *        accumulator
 474      * @param combiner a function to combine an accumulator into another
 475      * @return a {@code TerminalOp} implementing the reduction
 476      */
 477     public static <R> TerminalOp<Long, R>
 478     makeLong(Supplier<R> supplier,
 479              ObjLongConsumer<R> accumulator,
 480              BinaryOperator<R> combiner) {
 481         Objects.requireNonNull(supplier);
 482         Objects.requireNonNull(accumulator);
 483         Objects.requireNonNull(combiner);
 484         class ReducingSink extends Box<R>
 485                 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong {
 486             @Override
 487             public void begin(long size) {
 488                 state = supplier.get();
 489             }
 490 
 491             @Override
 492             public void accept(long t) {
 493                 accumulator.accept(state, t);
 494             }
 495 
 496             @Override
 497             public void combine(ReducingSink other) {
 498                 state = combiner.apply(state, other.state);
 499             }
 500         }
 501         return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) {
 502             @Override
 503             public ReducingSink makeSink() {
 504                 return new ReducingSink();
 505             }
 506         };
 507     }
 508 
 509     /**
 510      * Constructs a {@code TerminalOp} that implements a functional reduce on
 511      * {@code double} values.
 512      *
 513      * @param identity the identity for the combining function
 514      * @param operator the combining function
 515      * @return a {@code TerminalOp} implementing the reduction
 516      */
 517     public static TerminalOp<Double, Double>
 518     makeDouble(double identity, DoubleBinaryOperator operator) {
 519         Objects.requireNonNull(operator);
 520         class ReducingSink
 521                 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble {
 522             private double state;
 523 
 524             @Override
 525             public void begin(long size) {
 526                 state = identity;
 527             }
 528 
 529             @Override
 530             public void accept(double t) {
 531                 state = operator.applyAsDouble(state, t);
 532             }
 533 
 534             @Override
 535             public Double get() {
 536                 return state;
 537             }
 538 
 539             @Override
 540             public void combine(ReducingSink other) {
 541                 accept(other.state);
 542             }
 543         }
 544         return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) {
 545             @Override
 546             public ReducingSink makeSink() {
 547                 return new ReducingSink();
 548             }
 549         };
 550     }
 551 
 552     /**
 553      * Constructs a {@code TerminalOp} that implements a functional reduce on
 554      * {@code double} values, producing an optional double result.
 555      *
 556      * @param operator the combining function
 557      * @return a {@code TerminalOp} implementing the reduction
 558      */
 559     public static TerminalOp<Double, OptionalDouble>
 560     makeDouble(DoubleBinaryOperator operator) {
 561         Objects.requireNonNull(operator);
 562         class ReducingSink
 563                 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble {
 564             private boolean empty;
 565             private double state;
 566 
 567             public void begin(long size) {
 568                 empty = true;
 569                 state = 0;
 570             }
 571 
 572             @Override
 573             public void accept(double t) {
 574                 if (empty) {
 575                     empty = false;
 576                     state = t;
 577                 }
 578                 else {
 579                     state = operator.applyAsDouble(state, t);
 580                 }
 581             }
 582 
 583             @Override
 584             public OptionalDouble get() {
 585                 return empty ? OptionalDouble.empty() : OptionalDouble.of(state);
 586             }
 587 
 588             @Override
 589             public void combine(ReducingSink other) {
 590                 if (!other.empty)
 591                     accept(other.state);
 592             }
 593         }
 594         return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) {
 595             @Override
 596             public ReducingSink makeSink() {
 597                 return new ReducingSink();
 598             }
 599         };
 600     }
 601 
 602     /**
 603      * Constructs a {@code TerminalOp} that implements a mutable reduce on
 604      * {@code double} values.
 605      *
 606      * @param <R> the type of the result
 607      * @param supplier a factory to produce a new accumulator of the result type
 608      * @param accumulator a function to incorporate an int into an
 609      *        accumulator
 610      * @param combiner a function to combine an accumulator into another
 611      * @return a {@code TerminalOp} implementing the reduction
 612      */
 613     public static <R> TerminalOp<Double, R>
 614     makeDouble(Supplier<R> supplier,
 615                ObjDoubleConsumer<R> accumulator,
 616                BinaryOperator<R> combiner) {
 617         Objects.requireNonNull(supplier);
 618         Objects.requireNonNull(accumulator);
 619         Objects.requireNonNull(combiner);
 620         class ReducingSink extends Box<R>
 621                 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble {
 622             @Override
 623             public void begin(long size) {
 624                 state = supplier.get();
 625             }
 626 
 627             @Override
 628             public void accept(double t) {
 629                 accumulator.accept(state, t);
 630             }
 631 
 632             @Override
 633             public void combine(ReducingSink other) {
 634                 state = combiner.apply(state, other.state);
 635             }
 636         }
 637         return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) {
 638             @Override
 639             public ReducingSink makeSink() {
 640                 return new ReducingSink();
 641             }
 642         };
 643     }
 644 
 645     /**
 646      * A type of {@code TerminalSink} that implements an associative reducing
 647      * operation on elements of type {@code T} and producing a result of type
 648      * {@code R}.
 649      *
 650      * @param <T> the type of input element to the combining operation
 651      * @param <R> the result type
 652      * @param <K> the type of the {@code AccumulatingSink}.
 653      */
 654     private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
 655             extends TerminalSink<T, R> {
 656         public void combine(K other);
 657     }
 658 
 659     /**
 660      * State box for a single state element, used as a base class for
 661      * {@code AccumulatingSink} instances
 662      *
 663      * @param <U> The type of the state element
 664      */
 665     private static abstract class Box<U> {
 666         U state;
 667 
 668         Box() {} // Avoid creation of special accessor
 669 
 670         public U get() {
 671             return state;
 672         }
 673     }
 674 
 675     /**
 676      * A {@code TerminalOp} that evaluates a stream pipeline and sends the
 677      * output into an {@code AccumulatingSink}, which performs a reduce
 678      * operation. The {@code AccumulatingSink} must represent an associative
 679      * reducing operation.
 680      *
 681      * @param <T> the output type of the stream pipeline
 682      * @param <R> the result type of the reducing operation
 683      * @param <S> the type of the {@code AccumulatingSink}
 684      */
 685     private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
 686             implements TerminalOp<T, R> {
 687         private final StreamShape inputShape;
 688 
 689         /**
 690          * Create a {@code ReduceOp} of the specified stream shape which uses
 691          * the specified {@code Supplier} to create accumulating sinks.
 692          *
 693          * @param shape The shape of the stream pipeline
 694          */
 695         ReduceOp(StreamShape shape) {
 696             inputShape = shape;
 697         }
 698 
 699         public abstract S makeSink();
 700 
 701         @Override
 702         public StreamShape inputShape() {
 703             return inputShape;
 704         }
 705 
 706         @Override
 707         public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
 708                                            Spliterator<P_IN> spliterator) {
 709             return helper.wrapAndCopyInto(makeSink(), spliterator).get();
 710         }
 711 
 712         @Override
 713         public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
 714                                          Spliterator<P_IN> spliterator) {
 715             return new ReduceTask<>(this, helper, spliterator).invoke().get();
 716         }
 717     }
 718 
 719     /**
 720      * A {@code ForkJoinTask} for performing a parallel reduce operation.
 721      */
 722     private static final class ReduceTask<P_IN, P_OUT, R,
 723                                           S extends AccumulatingSink<P_OUT, R, S>>
 724             extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
 725         private final ReduceOp<P_OUT, R, S> op;
 726 
 727         ReduceTask(ReduceOp<P_OUT, R, S> op,
 728                    PipelineHelper<P_OUT> helper,
 729                    Spliterator<P_IN> spliterator) {
 730             super(helper, spliterator);
 731             this.op = op;
 732         }
 733 
 734         ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
 735                    Spliterator<P_IN> spliterator) {
 736             super(parent, spliterator);
 737             this.op = parent.op;
 738         }
 739 
 740         @Override
 741         protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
 742             return new ReduceTask<>(this, spliterator);
 743         }
 744 
 745         @Override
 746         protected S doLeaf() {
 747             return helper.wrapAndCopyInto(op.makeSink(), spliterator);
 748         }
 749 
 750         @Override
 751         public void onCompletion(CountedCompleter caller) {
 752             if (!isLeaf()) {
 753                 S leftResult = leftChild.getLocalResult();
 754                 leftResult.combine(rightChild.getLocalResult());
 755                 setLocalResult(leftResult);
 756             }
 757             // GC spliterator, left and right child
 758             super.onCompletion(caller);
 759         }
 760     }
 761 }