1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Comparator;
  28 import java.util.Iterator;
  29 import java.util.Objects;
  30 import java.util.Optional;
  31 import java.util.Spliterator;
  32 import java.util.Spliterators;
  33 import java.util.function.BiConsumer;
  34 import java.util.function.BiFunction;
  35 import java.util.function.BinaryOperator;
  36 import java.util.function.Consumer;
  37 import java.util.function.DoubleConsumer;
  38 import java.util.function.Function;
  39 import java.util.function.IntConsumer;
  40 import java.util.function.IntFunction;
  41 import java.util.function.LongConsumer;
  42 import java.util.function.Predicate;
  43 import java.util.function.Supplier;
  44 import java.util.function.ToDoubleFunction;
  45 import java.util.function.ToIntFunction;
  46 import java.util.function.ToLongFunction;
  47 
  48 /**
  49  * Abstract base class for an intermediate pipeline stage or pipeline source
  50  * stage implementing whose elements are of type {@code U}.
  51  *
  52  * @param <P_IN> type of elements in the upstream source
  53  * @param <P_OUT> type of elements in produced by this stage
  54  *
  55  * @since 1.8
  56  */
  57 abstract class ReferencePipeline<P_IN, P_OUT>
  58         extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
  59         implements Stream<P_OUT>  {
  60 
  61     /**
  62      * Constructor for the head of a stream pipeline.
  63      *
  64      * @param source {@code Supplier<Spliterator>} describing the stream source
  65      * @param sourceFlags the source flags for the stream source, described in
  66      *        {@link StreamOpFlag}
  67      * @param parallel {@code true} if the pipeline is parallel
  68      */
  69     ReferencePipeline(Supplier<? extends Spliterator<?>> source,
  70                       int sourceFlags, boolean parallel) {
  71         super(source, sourceFlags, parallel);
  72     }
  73 
  74     /**
  75      * Constructor for the head of a stream pipeline.
  76      *
  77      * @param source {@code Spliterator} describing the stream source
  78      * @param sourceFlags The source flags for the stream source, described in
  79      *        {@link StreamOpFlag}
  80      * @param parallel {@code true} if the pipeline is parallel
  81      */
  82     ReferencePipeline(Spliterator<?> source,
  83                       int sourceFlags, boolean parallel) {
  84         super(source, sourceFlags, parallel);
  85     }
  86 
  87     /**
  88      * Constructor for appending an intermediate operation onto an existing
  89      * pipeline.
  90      *
  91      * @param upstream the upstream element source.
  92      */
  93     ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
  94         super(upstream, opFlags);
  95     }
  96 
  97     // Shape-specific methods
  98 
  99     @Override
 100     final StreamShape getOutputShape() {
 101         return StreamShape.REFERENCE;
 102     }
 103 
 104     @Override
 105     final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
 106                                         Spliterator<P_IN> spliterator,
 107                                         boolean flattenTree,
 108                                         IntFunction<P_OUT[]> generator) {
 109         return Nodes.collect(helper, spliterator, flattenTree, generator);
 110     }
 111 
 112     @Override
 113     final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
 114                                      Supplier<Spliterator<P_IN>> supplier,
 115                                      boolean isParallel) {
 116         return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
 117     }
 118 
 119     @Override
 120     final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {
 121         return new StreamSpliterators.DelegatingSpliterator<>(supplier);
 122     }
 123 
 124     @Override
 125     final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
 126         do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
 127     }
 128 
 129     @Override
 130     final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
 131         return Nodes.builder(exactSizeIfKnown, generator);
 132     }
 133 
 134 
 135     // BaseStream
 136 
 137     @Override
 138     public final Iterator<P_OUT> iterator() {
 139         return Spliterators.iterator(spliterator());
 140     }
 141 
 142 
 143     // Stream
 144 
 145     // Stateless intermediate operations from Stream
 146 
 147     @Override
 148     public Stream<P_OUT> unordered() {
 149         if (!isOrdered())
 150             return this;
 151         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) {
 152             @Override
 153             Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
 154                 return sink;
 155             }
 156         };
 157     }
 158 
 159     @Override
 160     public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
 161         Objects.requireNonNull(predicate);
 162         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
 163                                      StreamOpFlag.NOT_SIZED) {
 164             @Override
 165             Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
 166                 return new Sink.ChainedReference<P_OUT>(sink) {
 167                     @Override
 168                     public void begin(long size) {
 169                         downstream.begin(-1);
 170                     }
 171 
 172                     @Override
 173                     public void accept(P_OUT u) {
 174                         if (predicate.test(u))
 175                             downstream.accept(u);
 176                     }
 177                 };
 178             }
 179         };
 180     }
 181 
 182     @Override
 183     public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
 184         Objects.requireNonNull(mapper);
 185         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
 186                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 187             @Override
 188             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
 189                 return new Sink.ChainedReference<P_OUT>(sink) {
 190                     @Override
 191                     public void accept(P_OUT u) {
 192                         downstream.accept(mapper.apply(u));
 193                     }
 194                 };
 195             }
 196         };
 197     }
 198 
 199     @Override
 200     public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) {
 201         Objects.requireNonNull(mapper);
 202         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 203                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 204             @Override
 205             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
 206                 return new Sink.ChainedReference<P_OUT>(sink) {
 207                     @Override
 208                     public void accept(P_OUT u) {
 209                         downstream.accept(mapper.applyAsInt(u));
 210                     }
 211                 };
 212             }
 213         };
 214     }
 215 
 216     @Override
 217     public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {
 218         Objects.requireNonNull(mapper);
 219         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 220                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 221             @Override
 222             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
 223                 return new Sink.ChainedReference<P_OUT>(sink) {
 224                     @Override
 225                     public void accept(P_OUT u) {
 226                         downstream.accept(mapper.applyAsLong(u));
 227                     }
 228                 };
 229             }
 230         };
 231     }
 232 
 233     @Override
 234     public final DoubleStream mapToDouble(ToDoubleFunction<? super P_OUT> mapper) {
 235         Objects.requireNonNull(mapper);
 236         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 237                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 238             @Override
 239             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
 240                 return new Sink.ChainedReference<P_OUT>(sink) {
 241                     @Override
 242                     public void accept(P_OUT u) {
 243                         downstream.accept(mapper.applyAsDouble(u));
 244                     }
 245                 };
 246             }
 247         };
 248     }
 249 
 250     @Override
 251     public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
 252         Objects.requireNonNull(mapper);
 253         // We can do better than this, by polling cancellationRequested when stream is infinite
 254         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
 255                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 256             @Override
 257             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
 258                 return new Sink.ChainedReference<P_OUT>(sink) {
 259                     @Override
 260                     public void begin(long size) {
 261                         downstream.begin(-1);
 262                     }
 263 
 264                     @Override
 265                     public void accept(P_OUT u) {
 266                         try (Stream<? extends R> result = mapper.apply(u)) {
 267                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 268                             if (result != null)
 269                                 result.sequential().forEach(downstream);
 270                         }
 271                     }
 272                 };
 273             }
 274         };
 275     }
 276 
 277     @Override
 278     public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
 279         Objects.requireNonNull(mapper);
 280         // We can do better than this, by polling cancellationRequested when stream is infinite
 281         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 282                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 283             @Override
 284             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
 285                 return new Sink.ChainedReference<P_OUT>(sink) {
 286                     IntConsumer downstreamAsInt = downstream::accept;
 287                     @Override
 288                     public void begin(long size) {
 289                         downstream.begin(-1);
 290                     }
 291 
 292                     @Override
 293                     public void accept(P_OUT u) {
 294                         try (IntStream result = mapper.apply(u)) {
 295                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 296                             if (result != null)
 297                                 result.sequential().forEach(downstreamAsInt);
 298                         }
 299                     }
 300                 };
 301             }
 302         };
 303     }
 304 
 305     @Override
 306     public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
 307         Objects.requireNonNull(mapper);
 308         // We can do better than this, by polling cancellationRequested when stream is infinite
 309         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 310                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 311             @Override
 312             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
 313                 return new Sink.ChainedReference<P_OUT>(sink) {
 314                     DoubleConsumer downstreamAsDouble = downstream::accept;
 315                     @Override
 316                     public void begin(long size) {
 317                         downstream.begin(-1);
 318                     }
 319 
 320                     @Override
 321                     public void accept(P_OUT u) {
 322                         try (DoubleStream result = mapper.apply(u)) {
 323                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 324                             if (result != null)
 325                                 result.sequential().forEach(downstreamAsDouble);
 326                         }
 327                     }
 328                 };
 329             }
 330         };
 331     }
 332 
 333     @Override
 334     public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) {
 335         Objects.requireNonNull(mapper);
 336         // We can do better than this, by polling cancellationRequested when stream is infinite
 337         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 338                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 339             @Override
 340             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
 341                 return new Sink.ChainedReference<P_OUT>(sink) {
 342                     LongConsumer downstreamAsLong = downstream::accept;
 343                     @Override
 344                     public void begin(long size) {
 345                         downstream.begin(-1);
 346                     }
 347 
 348                     @Override
 349                     public void accept(P_OUT u) {
 350                         try (LongStream result = mapper.apply(u)) {
 351                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 352                             if (result != null)
 353                                 result.sequential().forEach(downstreamAsLong);
 354                         }
 355                     }
 356                 };
 357             }
 358         };
 359     }
 360 
 361     @Override
 362     public final Stream<P_OUT> peek(Consumer<? super P_OUT> tee) {
 363         Objects.requireNonNull(tee);
 364         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
 365                                      0) {
 366             @Override
 367             Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
 368                 return new Sink.ChainedReference<P_OUT>(sink) {
 369                     @Override
 370                     public void accept(P_OUT u) {
 371                         tee.accept(u);
 372                         downstream.accept(u);
 373                     }
 374                 };
 375             }
 376         };
 377     }
 378 
 379     // Stateful intermediate operations from Stream
 380 
 381     @Override
 382     public final Stream<P_OUT> distinct() {
 383         return DistinctOps.makeRef(this);
 384     }
 385 
 386     @Override
 387     public final Stream<P_OUT> sorted() {
 388         return SortedOps.makeRef(this);
 389     }
 390 
 391     @Override
 392     public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
 393         return SortedOps.makeRef(this, comparator);
 394     }
 395 
 396     private Stream<P_OUT> slice(long skip, long limit) {
 397         return SliceOps.makeRef(this, skip, limit);
 398     }
 399 
 400     @Override
 401     public final Stream<P_OUT> limit(long maxSize) {
 402         if (maxSize < 0)
 403             throw new IllegalArgumentException(Long.toString(maxSize));
 404         return slice(0, maxSize);
 405     }
 406 
 407     @Override
 408     public final Stream<P_OUT> substream(long startingOffset) {
 409         if (startingOffset < 0)
 410             throw new IllegalArgumentException(Long.toString(startingOffset));
 411         if (startingOffset == 0)
 412             return this;
 413         else
 414             return slice(startingOffset, -1);
 415     }
 416 
 417     @Override
 418     public final Stream<P_OUT> substream(long startingOffset, long endingOffset) {
 419         if (startingOffset < 0 || endingOffset < startingOffset)
 420             throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset));
 421         return slice(startingOffset, endingOffset - startingOffset);
 422     }
 423 
 424     // Terminal operations from Stream
 425 
 426     @Override
 427     public void forEach(Consumer<? super P_OUT> action) {
 428         evaluate(ForEachOps.makeRef(action, false));
 429     }
 430 
 431     @Override
 432     public void forEachOrdered(Consumer<? super P_OUT> action) {
 433         evaluate(ForEachOps.makeRef(action, true));
 434     }
 435 
 436     @Override
 437     @SuppressWarnings("unchecked")
 438     public final <A> A[] toArray(IntFunction<A[]> generator) {
 439         // Since A has no relation to U (not possible to declare that A is an upper bound of U)
 440         // there will be no static type checking.
 441         // Therefore use a raw type and assume A == U rather than propagating the separation of A and U
 442         // throughout the code-base.
 443         // The runtime type of U is never checked for equality with the component type of the runtime type of A[].
 444         // Runtime checking will be performed when an element is stored in A[], thus if A is not a
 445         // super type of U an ArrayStoreException will be thrown.
 446         IntFunction rawGenerator = (IntFunction) generator;
 447         return (A[]) Nodes.flatten(evaluateToArrayNode(rawGenerator), rawGenerator)
 448                               .asArray(rawGenerator);
 449     }
 450 
 451     @Override
 452     public final Object[] toArray() {
 453         return toArray(Object[]::new);
 454     }
 455 
 456     @Override
 457     public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
 458         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
 459     }
 460 
 461     @Override
 462     public final boolean allMatch(Predicate<? super P_OUT> predicate) {
 463         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));
 464     }
 465 
 466     @Override
 467     public final boolean noneMatch(Predicate<? super P_OUT> predicate) {
 468         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE));
 469     }
 470 
 471     @Override
 472     public final Optional<P_OUT> findFirst() {
 473         return evaluate(FindOps.makeRef(true));
 474     }
 475 
 476     @Override
 477     public final Optional<P_OUT> findAny() {
 478         return evaluate(FindOps.makeRef(false));
 479     }
 480 
 481     @Override
 482     public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
 483         return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
 484     }
 485 
 486     @Override
 487     public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
 488         return evaluate(ReduceOps.makeRef(accumulator));
 489     }
 490 
 491     @Override
 492     public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
 493         return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
 494     }
 495 
 496     @Override
 497     public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
 498         A container;
 499         if (isParallel()
 500                 && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
 501                 && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
 502             container = collector.supplier().get();
 503             BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
 504             forEach(u -> accumulator.accept(container, u));
 505         }
 506         else {
 507             container = evaluate(ReduceOps.makeRef(collector));
 508         }
 509         return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
 510                ? (R) container
 511                : collector.finisher().apply(container);
 512     }
 513 
 514     @Override
 515     public final <R> R collect(Supplier<R> resultFactory,
 516                                BiConsumer<R, ? super P_OUT> accumulator,
 517                                BiConsumer<R, R> combiner) {
 518         return evaluate(ReduceOps.makeRef(resultFactory, accumulator, combiner));
 519     }
 520 
 521     @Override
 522     public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
 523         return reduce(BinaryOperator.maxBy(comparator));
 524     }
 525 
 526     @Override
 527     public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) {
 528         return reduce(BinaryOperator.minBy(comparator));
 529 
 530     }
 531 
 532     @Override
 533     public final long count() {
 534         return mapToLong(e -> 1L).sum();
 535     }
 536 
 537 
 538     //
 539 
 540     /**
 541      * Source stage of a ReferencePipeline.
 542      *
 543      * @param <E_IN> type of elements in the upstream source
 544      * @param <E_OUT> type of elements in produced by this stage
 545      * @since 1.8
 546      */
 547     static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
 548         /**
 549          * Constructor for the source stage of a Stream.
 550          *
 551          * @param source {@code Supplier<Spliterator>} describing the stream
 552          *               source
 553          * @param sourceFlags the source flags for the stream source, described
 554          *                    in {@link StreamOpFlag}
 555          */
 556         Head(Supplier<? extends Spliterator<?>> source,
 557              int sourceFlags, boolean parallel) {
 558             super(source, sourceFlags, parallel);
 559         }
 560 
 561         /**
 562          * Constructor for the source stage of a Stream.
 563          *
 564          * @param source {@code Spliterator} describing the stream source
 565          * @param sourceFlags the source flags for the stream source, described
 566          *                    in {@link StreamOpFlag}
 567          */
 568         Head(Spliterator<?> source,
 569              int sourceFlags, boolean parallel) {
 570             super(source, sourceFlags, parallel);
 571         }
 572 
 573         @Override
 574         final boolean opIsStateful() {
 575             throw new UnsupportedOperationException();
 576         }
 577 
 578         @Override
 579         final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
 580             throw new UnsupportedOperationException();
 581         }
 582 
 583         // Optimized sequential terminal operations for the head of the pipeline
 584 
 585         @Override
 586         public void forEach(Consumer<? super E_OUT> action) {
 587             if (!isParallel()) {
 588                 sourceStageSpliterator().forEachRemaining(action);
 589             }
 590             else {
 591                 super.forEach(action);
 592             }
 593         }
 594 
 595         @Override
 596         public void forEachOrdered(Consumer<? super E_OUT> action) {
 597             if (!isParallel()) {
 598                 sourceStageSpliterator().forEachRemaining(action);
 599             }
 600             else {
 601                 super.forEachOrdered(action);
 602             }
 603         }
 604     }
 605 
 606     /**
 607      * Base class for a stateless intermediate stage of a Stream.
 608      *
 609      * @param <E_IN> type of elements in the upstream source
 610      * @param <E_OUT> type of elements in produced by this stage
 611      * @since 1.8
 612      */
 613     abstract static class StatelessOp<E_IN, E_OUT>
 614             extends ReferencePipeline<E_IN, E_OUT> {
 615         /**
 616          * Construct a new Stream by appending a stateless intermediate
 617          * operation to an existing stream.
 618          *
 619          * @param upstream The upstream pipeline stage
 620          * @param inputShape The stream shape for the upstream pipeline stage
 621          * @param opFlags Operation flags for the new stage
 622          */
 623         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
 624                     StreamShape inputShape,
 625                     int opFlags) {
 626             super(upstream, opFlags);
 627             assert upstream.getOutputShape() == inputShape;
 628         }
 629 
 630         @Override
 631         final boolean opIsStateful() {
 632             return false;
 633         }
 634     }
 635 
 636     /**
 637      * Base class for a stateful intermediate stage of a Stream.
 638      *
 639      * @param <E_IN> type of elements in the upstream source
 640      * @param <E_OUT> type of elements in produced by this stage
 641      * @since 1.8
 642      */
 643     abstract static class StatefulOp<E_IN, E_OUT>
 644             extends ReferencePipeline<E_IN, E_OUT> {
 645         /**
 646          * Construct a new Stream by appending a stateful intermediate operation
 647          * to an existing stream.
 648          * @param upstream The upstream pipeline stage
 649          * @param inputShape The stream shape for the upstream pipeline stage
 650          * @param opFlags Operation flags for the new stage
 651          */
 652         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
 653                    StreamShape inputShape,
 654                    int opFlags) {
 655             super(upstream, opFlags);
 656             assert upstream.getOutputShape() == inputShape;
 657         }
 658 
 659         @Override
 660         final boolean opIsStateful() {
 661             return true;
 662         }
 663 
 664         @Override
 665         abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
 666                                                        Spliterator<P_IN> spliterator,
 667                                                        IntFunction<E_OUT[]> generator);
 668     }
 669 }