1 /*
   2  * Copyright (c) 2013, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.DoubleSummaryStatistics;
  28 import java.util.Objects;
  29 import java.util.OptionalDouble;
  30 import java.util.PrimitiveIterator;
  31 import java.util.Spliterator;
  32 import java.util.Spliterators;
  33 import java.util.function.BiConsumer;
  34 import java.util.function.BinaryOperator;
  35 import java.util.function.DoubleBinaryOperator;
  36 import java.util.function.DoubleConsumer;
  37 import java.util.function.DoubleFunction;
  38 import java.util.function.DoublePredicate;
  39 import java.util.function.DoubleToIntFunction;
  40 import java.util.function.DoubleToLongFunction;
  41 import java.util.function.DoubleUnaryOperator;
  42 import java.util.function.IntFunction;
  43 import java.util.function.ObjDoubleConsumer;
  44 import java.util.function.Supplier;
  45 
  46 /**
  47  * Abstract base class for an intermediate pipeline stage or pipeline source
  48  * stage implementing whose elements are of type {@code double}.
  49  *
  50  * @param <E_IN> type of elements in the upstream source
  51  *
  52  * @since 1.8
  53  */
  54 abstract class DoublePipeline<E_IN>
  55         extends AbstractPipeline<E_IN, Double, DoubleStream>
  56         implements DoubleStream {
  57 
  58     /**
  59      * Constructor for the head of a stream pipeline.
  60      *
  61      * @param source {@code Supplier<Spliterator>} describing the stream source
  62      * @param sourceFlags the source flags for the stream source, described in
  63      * {@link StreamOpFlag}
  64      */
  65     DoublePipeline(Supplier<? extends Spliterator<Double>> source,
  66                    int sourceFlags, boolean parallel) {
  67         super(source, sourceFlags, parallel);
  68     }
  69 
  70     /**
  71      * Constructor for the head of a stream pipeline.
  72      *
  73      * @param source {@code Spliterator} describing the stream source
  74      * @param sourceFlags the source flags for the stream source, described in
  75      * {@link StreamOpFlag}
  76      */
  77     DoublePipeline(Spliterator<Double> source,
  78                    int sourceFlags, boolean parallel) {
  79         super(source, sourceFlags, parallel);
  80     }
  81 
  82     /**
  83      * Constructor for appending an intermediate operation onto an existing
  84      * pipeline.
  85      *
  86      * @param upstream the upstream element source.
  87      * @param opFlags the operation flags
  88      */
  89     DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
  90         super(upstream, opFlags);
  91     }
  92 
  93     /**
  94      * Adapt a {@code Sink<Double> to a {@code DoubleConsumer}, ideally simply
  95      * by casting.
  96      */
  97     private static DoubleConsumer adapt(Sink<Double> sink) {
  98         if (sink instanceof DoubleConsumer) {
  99             return (DoubleConsumer) sink;
 100         } else {
 101             if (Tripwire.ENABLED)
 102                 Tripwire.trip(AbstractPipeline.class,
 103                               "using DoubleStream.adapt(Sink<Double> s)");
 104             return sink::accept;
 105         }
 106     }
 107 
 108     /**
 109      * Adapt a {@code Spliterator<Double>} to a {@code Spliterator.OfDouble}.
 110      *
 111      * @implNote
 112      * The implementation attempts to cast to a Spliterator.OfDouble, and throws
 113      * an exception if this cast is not possible.
 114      */
 115     private static Spliterator.OfDouble adapt(Spliterator<Double> s) {
 116         if (s instanceof Spliterator.OfDouble) {
 117             return (Spliterator.OfDouble) s;
 118         } else {
 119             if (Tripwire.ENABLED)
 120                 Tripwire.trip(AbstractPipeline.class,
 121                               "using DoubleStream.adapt(Spliterator<Double> s)");
 122             throw new UnsupportedOperationException("DoubleStream.adapt(Spliterator<Double> s)");
 123         }
 124     }
 125 
 126 
 127     // Shape-specific methods
 128 
 129     @Override
 130     final StreamShape getOutputShape() {
 131         return StreamShape.DOUBLE_VALUE;
 132     }
 133 
 134     @Override
 135     final <P_IN> Node<Double> evaluateToNode(PipelineHelper<Double> helper,
 136                                              Spliterator<P_IN> spliterator,
 137                                              boolean flattenTree,
 138                                              IntFunction<Double[]> generator) {
 139         return Nodes.collectDouble(helper, spliterator, flattenTree);
 140     }
 141 
 142     @Override
 143     final <P_IN> Spliterator<Double> wrap(PipelineHelper<Double> ph,
 144                                           Supplier<Spliterator<P_IN>> supplier,
 145                                           boolean isParallel) {
 146         return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel);
 147     }
 148 
 149     @Override
 150     @SuppressWarnings("unchecked")
 151     final Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) {
 152         return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier);
 153     }
 154 
 155     @Override
 156     final boolean forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) {
 157         Spliterator.OfDouble spl = adapt(spliterator);
 158         DoubleConsumer adaptedSink = adapt(sink);
 159         boolean cancelled;
 160         do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink));
 161         return cancelled;
 162     }
 163 
 164     @Override
 165     final  Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) {
 166         return Nodes.doubleBuilder(exactSizeIfKnown);
 167     }
 168 
 169     private <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper, int opFlags) {
 170         return new ReferencePipeline.StatelessOp<Double, U>(this, StreamShape.DOUBLE_VALUE, opFlags) {
 171             @Override
 172             Sink<Double> opWrapSink(int flags, Sink<U> sink) {
 173                 return new Sink.ChainedDouble<U>(sink) {
 174                     @Override
 175                     public void accept(double t) {
 176                         downstream.accept(mapper.apply(t));
 177                     }
 178                 };
 179             }
 180         };
 181     }
 182 
 183     // DoubleStream
 184 
 185     @Override
 186     public final PrimitiveIterator.OfDouble iterator() {
 187         return Spliterators.iterator(spliterator());
 188     }
 189 
 190     @Override
 191     public final Spliterator.OfDouble spliterator() {
 192         return adapt(super.spliterator());
 193     }
 194 
 195     // Stateless intermediate ops from DoubleStream
 196 
 197     @Override
 198     public final Stream<Double> boxed() {
 199         return mapToObj(Double::valueOf, 0);
 200     }
 201 
 202     @Override
 203     public final DoubleStream map(DoubleUnaryOperator mapper) {
 204         Objects.requireNonNull(mapper);
 205         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 206                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 207             @Override
 208             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 209                 return new Sink.ChainedDouble<Double>(sink) {
 210                     @Override
 211                     public void accept(double t) {
 212                         downstream.accept(mapper.applyAsDouble(t));
 213                     }
 214                 };
 215             }
 216         };
 217     }
 218 
 219     @Override
 220     public final <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) {
 221         Objects.requireNonNull(mapper);
 222         return mapToObj(mapper, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT);
 223     }
 224 
 225     @Override
 226     public final IntStream mapToInt(DoubleToIntFunction mapper) {
 227         Objects.requireNonNull(mapper);
 228         return new IntPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 229                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 230             @Override
 231             Sink<Double> opWrapSink(int flags, Sink<Integer> sink) {
 232                 return new Sink.ChainedDouble<Integer>(sink) {
 233                     @Override
 234                     public void accept(double t) {
 235                         downstream.accept(mapper.applyAsInt(t));
 236                     }
 237                 };
 238             }
 239         };
 240     }
 241 
 242     @Override
 243     public final LongStream mapToLong(DoubleToLongFunction mapper) {
 244         Objects.requireNonNull(mapper);
 245         return new LongPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 246                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 247             @Override
 248             Sink<Double> opWrapSink(int flags, Sink<Long> sink) {
 249                 return new Sink.ChainedDouble<Long>(sink) {
 250                     @Override
 251                     public void accept(double t) {
 252                         downstream.accept(mapper.applyAsLong(t));
 253                     }
 254                 };
 255             }
 256         };
 257     }
 258 
 259     @Override
 260     public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
 261         Objects.requireNonNull(mapper);
 262         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 263                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 264             @Override
 265             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 266                 return new Sink.ChainedDouble<Double>(sink) {
 267                     // true if cancellationRequested() has been called
 268                     boolean cancellationRequestedCalled;
 269 
 270                     // cache the consumer to avoid creation on every accepted element
 271                     DoubleConsumer downstreamAsDouble = downstream::accept;
 272 
 273                     @Override
 274                     public void begin(long size) {
 275                         downstream.begin(-1);
 276                     }
 277 
 278                     @Override
 279                     public void accept(double t) {
 280                         try (DoubleStream result = mapper.apply(t)) {
 281                             if (result != null) {
 282                                 if (!cancellationRequestedCalled) {
 283                                     result.sequential().forEach(downstreamAsDouble);
 284                                 }
 285                                 else {
 286                                     var s = result.sequential().spliterator();
 287                                     do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble));
 288                                 }
 289                             }
 290                         }
 291                     }
 292 
 293                     @Override
 294                     public boolean cancellationRequested() {
 295                         // If this method is called then an operation within the stream
 296                         // pipeline is short-circuiting (see AbstractPipeline.copyInto).
 297                         // Note that we cannot differentiate between an upstream or
 298                         // downstream operation
 299                         cancellationRequestedCalled = true;
 300                         return downstream.cancellationRequested();
 301                     }
 302                 };
 303             }
 304         };
 305     }
 306 
 307     @Override
 308     public DoubleStream unordered() {
 309         if (!isOrdered())
 310             return this;
 311         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, StreamOpFlag.NOT_ORDERED) {
 312             @Override
 313             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 314                 return sink;
 315             }
 316         };
 317     }
 318 
 319     @Override
 320     public final DoubleStream filter(DoublePredicate predicate) {
 321         Objects.requireNonNull(predicate);
 322         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 323                                        StreamOpFlag.NOT_SIZED) {
 324             @Override
 325             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 326                 return new Sink.ChainedDouble<Double>(sink) {
 327                     @Override
 328                     public void begin(long size) {
 329                         downstream.begin(-1);
 330                     }
 331 
 332                     @Override
 333                     public void accept(double t) {
 334                         if (predicate.test(t))
 335                             downstream.accept(t);
 336                     }
 337                 };
 338             }
 339         };
 340     }
 341 
 342     @Override
 343     public final DoubleStream peek(DoubleConsumer action) {
 344         Objects.requireNonNull(action);
 345         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 346                                        0) {
 347             @Override
 348             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 349                 return new Sink.ChainedDouble<Double>(sink) {
 350                     @Override
 351                     public void accept(double t) {
 352                         action.accept(t);
 353                         downstream.accept(t);
 354                     }
 355                 };
 356             }
 357         };
 358     }
 359 
 360     // Stateful intermediate ops from DoubleStream
 361 
 362     @Override
 363     public final DoubleStream limit(long maxSize) {
 364         if (maxSize < 0)
 365             throw new IllegalArgumentException(Long.toString(maxSize));
 366         return SliceOps.makeDouble(this, (long) 0, maxSize);
 367     }
 368 
 369     @Override
 370     public final DoubleStream skip(long n) {
 371         if (n < 0)
 372             throw new IllegalArgumentException(Long.toString(n));
 373         if (n == 0)
 374             return this;
 375         else {
 376             long limit = -1;
 377             return SliceOps.makeDouble(this, n, limit);
 378         }
 379     }
 380 
 381     @Override
 382     public final DoubleStream takeWhile(DoublePredicate predicate) {
 383         return WhileOps.makeTakeWhileDouble(this, predicate);
 384     }
 385 
 386     @Override
 387     public final DoubleStream dropWhile(DoublePredicate predicate) {
 388         return WhileOps.makeDropWhileDouble(this, predicate);
 389     }
 390 
 391     @Override
 392     public final DoubleStream sorted() {
 393         return SortedOps.makeDouble(this);
 394     }
 395 
 396     @Override
 397     public final DoubleStream distinct() {
 398         // While functional and quick to implement, this approach is not very efficient.
 399         // An efficient version requires a double-specific map/set implementation.
 400         return boxed().distinct().mapToDouble(i -> (double) i);
 401     }
 402 
 403     // Terminal ops from DoubleStream
 404 
 405     @Override
 406     public void forEach(DoubleConsumer consumer) {
 407         evaluate(ForEachOps.makeDouble(consumer, false));
 408     }
 409 
 410     @Override
 411     public void forEachOrdered(DoubleConsumer consumer) {
 412         evaluate(ForEachOps.makeDouble(consumer, true));
 413     }
 414 
 415     @Override
 416     public final double sum() {
 417         /*
 418          * In the arrays allocated for the collect operation, index 0
 419          * holds the high-order bits of the running sum, index 1 holds
 420          * the (negative) low-order bits of the sum computed via
 421          * compensated summation, and index 2 holds the simple sum used
 422          * to compute the proper result if the stream contains infinite
 423          * values of the same sign.
 424          */
 425         double[] summation = collect(() -> new double[3],
 426                                (ll, d) -> {
 427                                    Collectors.sumWithCompensation(ll, d);
 428                                    ll[2] += d;
 429                                },
 430                                (ll, rr) -> {
 431                                    Collectors.sumWithCompensation(ll, rr[0]);
 432                                    Collectors.sumWithCompensation(ll, -rr[1]);
 433                                    ll[2] += rr[2];
 434                                });
 435 
 436         return Collectors.computeFinalSum(summation);
 437     }
 438 
 439     @Override
 440     public final OptionalDouble min() {
 441         return reduce(Math::min);
 442     }
 443 
 444     @Override
 445     public final OptionalDouble max() {
 446         return reduce(Math::max);
 447     }
 448 
 449     /**
 450      * {@inheritDoc}
 451      *
 452      * @implNote The {@code double} format can represent all
 453      * consecutive integers in the range -2<sup>53</sup> to
 454      * 2<sup>53</sup>. If the pipeline has more than 2<sup>53</sup>
 455      * values, the divisor in the average computation will saturate at
 456      * 2<sup>53</sup>, leading to additional numerical errors.
 457      */
 458     @Override
 459     public final OptionalDouble average() {
 460         /*
 461          * In the arrays allocated for the collect operation, index 0
 462          * holds the high-order bits of the running sum, index 1 holds
 463          * the (negative) low-order bits of the sum computed via
 464          * compensated summation, index 2 holds the number of values
 465          * seen, index 3 holds the simple sum.
 466          */
 467         double[] avg = collect(() -> new double[4],
 468                                (ll, d) -> {
 469                                    ll[2]++;
 470                                    Collectors.sumWithCompensation(ll, d);
 471                                    ll[3] += d;
 472                                },
 473                                (ll, rr) -> {
 474                                    Collectors.sumWithCompensation(ll, rr[0]);
 475                                    Collectors.sumWithCompensation(ll, -rr[1]);
 476                                    ll[2] += rr[2];
 477                                    ll[3] += rr[3];
 478                                });
 479         return avg[2] > 0
 480             ? OptionalDouble.of(Collectors.computeFinalSum(avg) / avg[2])
 481             : OptionalDouble.empty();
 482     }
 483 
 484     @Override
 485     public final long count() {
 486         return evaluate(ReduceOps.makeDoubleCounting());
 487     }
 488 
 489     @Override
 490     public final DoubleSummaryStatistics summaryStatistics() {
 491         return collect(DoubleSummaryStatistics::new, DoubleSummaryStatistics::accept,
 492                        DoubleSummaryStatistics::combine);
 493     }
 494 
 495     @Override
 496     public final double reduce(double identity, DoubleBinaryOperator op) {
 497         return evaluate(ReduceOps.makeDouble(identity, op));
 498     }
 499 
 500     @Override
 501     public final OptionalDouble reduce(DoubleBinaryOperator op) {
 502         return evaluate(ReduceOps.makeDouble(op));
 503     }
 504 
 505     @Override
 506     public final <R> R collect(Supplier<R> supplier,
 507                                ObjDoubleConsumer<R> accumulator,
 508                                BiConsumer<R, R> combiner) {
 509         Objects.requireNonNull(combiner);
 510         BinaryOperator<R> operator = (left, right) -> {
 511             combiner.accept(left, right);
 512             return left;
 513         };
 514         return evaluate(ReduceOps.makeDouble(supplier, accumulator, operator));
 515     }
 516 
 517     @Override
 518     public final boolean anyMatch(DoublePredicate predicate) {
 519         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ANY));
 520     }
 521 
 522     @Override
 523     public final boolean allMatch(DoublePredicate predicate) {
 524         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ALL));
 525     }
 526 
 527     @Override
 528     public final boolean noneMatch(DoublePredicate predicate) {
 529         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.NONE));
 530     }
 531 
 532     @Override
 533     public final OptionalDouble findFirst() {
 534         return evaluate(FindOps.makeDouble(true));
 535     }
 536 
 537     @Override
 538     public final OptionalDouble findAny() {
 539         return evaluate(FindOps.makeDouble(false));
 540     }
 541 
 542     @Override
 543     public final double[] toArray() {
 544         return Nodes.flattenDouble((Node.OfDouble) evaluateToArrayNode(Double[]::new))
 545                         .asPrimitiveArray();
 546     }
 547 
 548     //
 549 
 550     /**
 551      * Source stage of a DoubleStream
 552      *
 553      * @param <E_IN> type of elements in the upstream source
 554      */
 555     static class Head<E_IN> extends DoublePipeline<E_IN> {
 556         /**
 557          * Constructor for the source stage of a DoubleStream.
 558          *
 559          * @param source {@code Supplier<Spliterator>} describing the stream
 560          *               source
 561          * @param sourceFlags the source flags for the stream source, described
 562          *                    in {@link StreamOpFlag}
 563          * @param parallel {@code true} if the pipeline is parallel
 564          */
 565         Head(Supplier<? extends Spliterator<Double>> source,
 566              int sourceFlags, boolean parallel) {
 567             super(source, sourceFlags, parallel);
 568         }
 569 
 570         /**
 571          * Constructor for the source stage of a DoubleStream.
 572          *
 573          * @param source {@code Spliterator} describing the stream source
 574          * @param sourceFlags the source flags for the stream source, described
 575          *                    in {@link StreamOpFlag}
 576          * @param parallel {@code true} if the pipeline is parallel
 577          */
 578         Head(Spliterator<Double> source,
 579              int sourceFlags, boolean parallel) {
 580             super(source, sourceFlags, parallel);
 581         }
 582 
 583         @Override
 584         final boolean opIsStateful() {
 585             throw new UnsupportedOperationException();
 586         }
 587 
 588         @Override
 589         final Sink<E_IN> opWrapSink(int flags, Sink<Double> sink) {
 590             throw new UnsupportedOperationException();
 591         }
 592 
 593         // Optimized sequential terminal operations for the head of the pipeline
 594 
 595         @Override
 596         public void forEach(DoubleConsumer consumer) {
 597             if (!isParallel()) {
 598                 adapt(sourceStageSpliterator()).forEachRemaining(consumer);
 599             }
 600             else {
 601                 super.forEach(consumer);
 602             }
 603         }
 604 
 605         @Override
 606         public void forEachOrdered(DoubleConsumer consumer) {
 607             if (!isParallel()) {
 608                 adapt(sourceStageSpliterator()).forEachRemaining(consumer);
 609             }
 610             else {
 611                 super.forEachOrdered(consumer);
 612             }
 613         }
 614 
 615     }
 616 
 617     /**
 618      * Base class for a stateless intermediate stage of a DoubleStream.
 619      *
 620      * @param <E_IN> type of elements in the upstream source
 621      * @since 1.8
 622      */
 623     abstract static class StatelessOp<E_IN> extends DoublePipeline<E_IN> {
 624         /**
 625          * Construct a new DoubleStream by appending a stateless intermediate
 626          * operation to an existing stream.
 627          *
 628          * @param upstream the upstream pipeline stage
 629          * @param inputShape the stream shape for the upstream pipeline stage
 630          * @param opFlags operation flags for the new stage
 631          */
 632         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
 633                     StreamShape inputShape,
 634                     int opFlags) {
 635             super(upstream, opFlags);
 636             assert upstream.getOutputShape() == inputShape;
 637         }
 638 
 639         @Override
 640         final boolean opIsStateful() {
 641             return false;
 642         }
 643     }
 644 
 645     /**
 646      * Base class for a stateful intermediate stage of a DoubleStream.
 647      *
 648      * @param <E_IN> type of elements in the upstream source
 649      * @since 1.8
 650      */
 651     abstract static class StatefulOp<E_IN> extends DoublePipeline<E_IN> {
 652         /**
 653          * Construct a new DoubleStream by appending a stateful intermediate
 654          * operation to an existing stream.
 655          *
 656          * @param upstream the upstream pipeline stage
 657          * @param inputShape the stream shape for the upstream pipeline stage
 658          * @param opFlags operation flags for the new stage
 659          */
 660         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
 661                    StreamShape inputShape,
 662                    int opFlags) {
 663             super(upstream, opFlags);
 664             assert upstream.getOutputShape() == inputShape;
 665         }
 666 
 667         @Override
 668         final boolean opIsStateful() {
 669             return true;
 670         }
 671 
 672         @Override
 673         abstract <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
 674                                                         Spliterator<P_IN> spliterator,
 675                                                         IntFunction<Double[]> generator);
 676     }
 677 }