1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Comparator;
  28 import java.util.Comparators;
  29 import java.util.Iterator;
  30 import java.util.Objects;
  31 import java.util.Optional;
  32 import java.util.Spliterator;
  33 import java.util.Spliterators;
  34 import java.util.function.BiConsumer;
  35 import java.util.function.BiFunction;
  36 import java.util.function.BinaryOperator;
  37 import java.util.function.Consumer;
  38 import java.util.function.DoubleConsumer;
  39 import java.util.function.Function;
  40 import java.util.function.IntConsumer;
  41 import java.util.function.IntFunction;
  42 import java.util.function.LongConsumer;
  43 import java.util.function.Predicate;
  44 import java.util.function.Supplier;
  45 import java.util.function.ToDoubleFunction;
  46 import java.util.function.ToIntFunction;
  47 import java.util.function.ToLongFunction;
  48 
  49 /**
  50  * Abstract base class for an intermediate pipeline stage or pipeline source
  51  * stage implementing whose elements are of type {@code U}.
  52  *
  53  * @param <P_IN> type of elements in the upstream source
  54  * @param <P_OUT> type of elements in produced by this stage
  55  *
  56  * @since 1.8
  57  */
  58 abstract class ReferencePipeline<P_IN, P_OUT>
  59         extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
  60         implements Stream<P_OUT>  {
  61 
  62     /**
  63      * Constructor for the head of a stream pipeline.
  64      *
  65      * @param source {@code Supplier<Spliterator>} describing the stream source
  66      * @param sourceFlags the source flags for the stream source, described in
  67      *        {@link StreamOpFlag}
  68      * @param parallel {@code true} if the pipeline is parallel
  69      */
  70     ReferencePipeline(Supplier<? extends Spliterator<?>> source,
  71                       int sourceFlags, boolean parallel) {
  72         super(source, sourceFlags, parallel);
  73     }
  74 
  75     /**
  76      * Constructor for the head of a stream pipeline.
  77      *
  78      * @param source {@code Spliterator} describing the stream source
  79      * @param sourceFlags The source flags for the stream source, described in
  80      *        {@link StreamOpFlag}
  81      * @param parallel {@code true} if the pipeline is parallel
  82      */
  83     ReferencePipeline(Spliterator<?> source,
  84                       int sourceFlags, boolean parallel) {
  85         super(source, sourceFlags, parallel);
  86     }
  87 
  88     /**
  89      * Constructor for appending an intermediate operation onto an existing
  90      * pipeline.
  91      *
  92      * @param upstream the upstream element source.
  93      */
  94     ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
  95         super(upstream, opFlags);
  96     }
  97 
  98     // Shape-specific methods
  99 
 100     @Override
 101     final StreamShape getOutputShape() {
 102         return StreamShape.REFERENCE;
 103     }
 104 
 105     @Override
 106     final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
 107                                         Spliterator<P_IN> spliterator,
 108                                         boolean flattenTree,
 109                                         IntFunction<P_OUT[]> generator) {
 110         return Nodes.collect(helper, spliterator, flattenTree, generator);
 111     }
 112 
 113     @Override
 114     final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
 115                                      Supplier<Spliterator<P_IN>> supplier,
 116                                      boolean isParallel) {
 117         return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
 118     }
 119 
 120     @Override
 121     final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {
 122         return new StreamSpliterators.DelegatingSpliterator<>(supplier);
 123     }
 124 
 125     @Override
 126     final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
 127         do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
 128     }
 129 
 130     @Override
 131     final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
 132         return Nodes.builder(exactSizeIfKnown, generator);
 133     }
 134 
 135 
 136     // BaseStream
 137 
 138     @Override
 139     public final Iterator<P_OUT> iterator() {
 140         return Spliterators.iterator(spliterator());
 141     }
 142 
 143 
 144     // Stream
 145 
 146     // Stateless intermediate operations from Stream
 147 
 148     @Override
 149     public Stream<P_OUT> unordered() {
 150         if (!isOrdered())
 151             return this;
 152         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) {
 153             @Override
 154             Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
 155                 return sink;
 156             }
 157         };
 158     }
 159 
 160     @Override
 161     public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
 162         Objects.requireNonNull(predicate);
 163         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
 164                                      StreamOpFlag.NOT_SIZED) {
 165             @Override
 166             Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
 167                 return new Sink.ChainedReference<P_OUT>(sink) {
 168                     @Override
 169                     public void begin(long size) {
 170                         downstream.begin(-1);
 171                     }
 172 
 173                     @Override
 174                     public void accept(P_OUT u) {
 175                         if (predicate.test(u))
 176                             downstream.accept(u);
 177                     }
 178                 };
 179             }
 180         };
 181     }
 182 
 183     @Override
 184     public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
 185         Objects.requireNonNull(mapper);
 186         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
 187                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 188             @Override
 189             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
 190                 return new Sink.ChainedReference<P_OUT>(sink) {
 191                     @Override
 192                     public void accept(P_OUT u) {
 193                         downstream.accept(mapper.apply(u));
 194                     }
 195                 };
 196             }
 197         };
 198     }
 199 
 200     @Override
 201     public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) {
 202         Objects.requireNonNull(mapper);
 203         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 204                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 205             @Override
 206             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
 207                 return new Sink.ChainedReference<P_OUT>(sink) {
 208                     @Override
 209                     public void accept(P_OUT u) {
 210                         downstream.accept(mapper.applyAsInt(u));
 211                     }
 212                 };
 213             }
 214         };
 215     }
 216 
 217     @Override
 218     public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {
 219         Objects.requireNonNull(mapper);
 220         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 221                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 222             @Override
 223             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
 224                 return new Sink.ChainedReference<P_OUT>(sink) {
 225                     @Override
 226                     public void accept(P_OUT u) {
 227                         downstream.accept(mapper.applyAsLong(u));
 228                     }
 229                 };
 230             }
 231         };
 232     }
 233 
 234     @Override
 235     public final DoubleStream mapToDouble(ToDoubleFunction<? super P_OUT> mapper) {
 236         Objects.requireNonNull(mapper);
 237         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 238                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 239             @Override
 240             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
 241                 return new Sink.ChainedReference<P_OUT>(sink) {
 242                     @Override
 243                     public void accept(P_OUT u) {
 244                         downstream.accept(mapper.applyAsDouble(u));
 245                     }
 246                 };
 247             }
 248         };
 249     }
 250 
 251     @Override
 252     public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
 253         Objects.requireNonNull(mapper);
 254         // We can do better than this, by polling cancellationRequested when stream is infinite
 255         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
 256                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 257             @Override
 258             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
 259                 return new Sink.ChainedReference<P_OUT>(sink) {
 260                     @Override
 261                     public void begin(long size) {
 262                         downstream.begin(-1);
 263                     }
 264 
 265                     @Override
 266                     public void accept(P_OUT u) {
 267                         // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 268                         Stream<? extends R> result = mapper.apply(u);
 269                         if (result != null)
 270                             result.sequential().forEach(downstream);
 271                     }
 272                 };
 273             }
 274         };
 275     }
 276 
 277     @Override
 278     public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
 279         Objects.requireNonNull(mapper);
 280         // We can do better than this, by polling cancellationRequested when stream is infinite
 281         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 282                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 283             @Override
 284             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
 285                 return new Sink.ChainedReference<P_OUT>(sink) {
 286                     IntConsumer downstreamAsInt = downstream::accept;
 287                     @Override
 288                     public void begin(long size) {
 289                         downstream.begin(-1);
 290                     }
 291 
 292                     @Override
 293                     public void accept(P_OUT u) {
 294                         // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 295                         IntStream result = mapper.apply(u);
 296                         if (result != null)
 297                             result.sequential().forEach(downstreamAsInt);
 298                     }
 299                 };
 300             }
 301         };
 302     }
 303 
 304     @Override
 305     public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
 306         Objects.requireNonNull(mapper);
 307         // We can do better than this, by polling cancellationRequested when stream is infinite
 308         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 309                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 310             @Override
 311             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
 312                 return new Sink.ChainedReference<P_OUT>(sink) {
 313                     DoubleConsumer downstreamAsDouble = downstream::accept;
 314                     @Override
 315                     public void begin(long size) {
 316                         downstream.begin(-1);
 317                     }
 318 
 319                     @Override
 320                     public void accept(P_OUT u) {
 321                         // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 322                         DoubleStream result = mapper.apply(u);
 323                         if (result != null)
 324                             result.sequential().forEach(downstreamAsDouble);
 325                     }
 326                 };
 327             }
 328         };
 329     }
 330 
 331     @Override
 332     public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) {
 333         Objects.requireNonNull(mapper);
 334         // We can do better than this, by polling cancellationRequested when stream is infinite
 335         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 336                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 337             @Override
 338             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
 339                 return new Sink.ChainedReference<P_OUT>(sink) {
 340                     LongConsumer downstreamAsLong = downstream::accept;
 341                     @Override
 342                     public void begin(long size) {
 343                         downstream.begin(-1);
 344                     }
 345 
 346                     @Override
 347                     public void accept(P_OUT u) {
 348                         // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 349                         LongStream result = mapper.apply(u);
 350                         if (result != null)
 351                             result.sequential().forEach(downstreamAsLong);
 352                     }
 353                 };
 354             }
 355         };
 356     }
 357 
 358     @Override
 359     public final Stream<P_OUT> peek(Consumer<? super P_OUT> tee) {
 360         Objects.requireNonNull(tee);
 361         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
 362                                      0) {
 363             @Override
 364             Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
 365                 return new Sink.ChainedReference<P_OUT>(sink) {
 366                     @Override
 367                     public void accept(P_OUT u) {
 368                         tee.accept(u);
 369                         downstream.accept(u);
 370                     }
 371                 };
 372             }
 373         };
 374     }
 375 
 376     // Stateful intermediate operations from Stream
 377 
 378     @Override
 379     public final Stream<P_OUT> distinct() {
 380         return DistinctOps.makeRef(this);
 381     }
 382 
 383     @Override
 384     public final Stream<P_OUT> sorted() {
 385         return SortedOps.makeRef(this);
 386     }
 387 
 388     @Override
 389     public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
 390         return SortedOps.makeRef(this, comparator);
 391     }
 392 
 393     private Stream<P_OUT> slice(long skip, long limit) {
 394         return SliceOps.makeRef(this, skip, limit);
 395     }
 396 
 397     @Override
 398     public final Stream<P_OUT> limit(long maxSize) {
 399         if (maxSize < 0)
 400             throw new IllegalArgumentException(Long.toString(maxSize));
 401         return slice(0, maxSize);
 402     }
 403 
 404     @Override
 405     public final Stream<P_OUT> substream(long startingOffset) {
 406         if (startingOffset < 0)
 407             throw new IllegalArgumentException(Long.toString(startingOffset));
 408         if (startingOffset == 0)
 409             return this;
 410         else
 411             return slice(startingOffset, -1);
 412     }
 413 
 414     @Override
 415     public final Stream<P_OUT> substream(long startingOffset, long endingOffset) {
 416         if (startingOffset < 0 || endingOffset < startingOffset)
 417             throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset));
 418         return slice(startingOffset, endingOffset - startingOffset);
 419     }
 420 
 421     // Terminal operations from Stream
 422 
 423     @Override
 424     public void forEach(Consumer<? super P_OUT> action) {
 425         evaluate(ForEachOps.makeRef(action, false));
 426     }
 427 
 428     @Override
 429     public void forEachOrdered(Consumer<? super P_OUT> action) {
 430         evaluate(ForEachOps.makeRef(action, true));
 431     }
 432 
 433     @Override
 434     @SuppressWarnings("unchecked")
 435     public final <A> A[] toArray(IntFunction<A[]> generator) {
 436         // Since A has no relation to U (not possible to declare that A is an upper bound of U)
 437         // there will be no static type checking.
 438         // Therefore use a raw type and assume A == U rather than propagating the separation of A and U
 439         // throughout the code-base.
 440         // The runtime type of U is never checked for equality with the component type of the runtime type of A[].
 441         // Runtime checking will be performed when an element is stored in A[], thus if A is not a
 442         // super type of U an ArrayStoreException will be thrown.
 443         IntFunction rawGenerator = (IntFunction) generator;
 444         return (A[]) Nodes.flatten(evaluateToArrayNode(rawGenerator), rawGenerator)
 445                               .asArray(rawGenerator);
 446     }
 447 
 448     @Override
 449     public final Object[] toArray() {
 450         return toArray(Object[]::new);
 451     }
 452 
 453     @Override
 454     public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
 455         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
 456     }
 457 
 458     @Override
 459     public final boolean allMatch(Predicate<? super P_OUT> predicate) {
 460         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));
 461     }
 462 
 463     @Override
 464     public final boolean noneMatch(Predicate<? super P_OUT> predicate) {
 465         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE));
 466     }
 467 
 468     @Override
 469     public final Optional<P_OUT> findFirst() {
 470         return evaluate(FindOps.makeRef(true));
 471     }
 472 
 473     @Override
 474     public final Optional<P_OUT> findAny() {
 475         return evaluate(FindOps.makeRef(false));
 476     }
 477 
 478     @Override
 479     public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
 480         return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
 481     }
 482 
 483     @Override
 484     public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
 485         return evaluate(ReduceOps.makeRef(accumulator));
 486     }
 487 
 488     @Override
 489     public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
 490         return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
 491     }
 492 
 493     @Override
 494     public final <R> R collect(Collector<? super P_OUT, R> collector) {
 495         if (isParallel()
 496                 && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
 497                 && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
 498             R container = collector.resultSupplier().get();
 499             BiFunction<R, ? super P_OUT, R> accumulator = collector.accumulator();
 500             forEach(u -> accumulator.apply(container, u));
 501             return container;
 502         }
 503         return evaluate(ReduceOps.makeRef(collector));
 504     }
 505 
 506     @Override
 507     public final <R> R collect(Supplier<R> resultFactory,
 508                                BiConsumer<R, ? super P_OUT> accumulator,
 509                                BiConsumer<R, R> combiner) {
 510         return evaluate(ReduceOps.makeRef(resultFactory, accumulator, combiner));
 511     }
 512 
 513     @Override
 514     public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
 515         return reduce(Comparators.greaterOf(comparator));
 516     }
 517 
 518     @Override
 519     public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) {
 520         return reduce(Comparators.lesserOf(comparator));
 521 
 522     }
 523 
 524     @Override
 525     public final long count() {
 526         return mapToLong(e -> 1L).sum();
 527     }
 528 
 529 
 530     //
 531 
 532     /**
 533      * Source stage of a ReferencePipeline.
 534      *
 535      * @param <E_IN> type of elements in the upstream source
 536      * @param <E_OUT> type of elements in produced by this stage
 537      * @since 1.8
 538      */
 539     static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
 540         /**
 541          * Constructor for the source stage of a Stream.
 542          *
 543          * @param source {@code Supplier<Spliterator>} describing the stream
 544          *               source
 545          * @param sourceFlags the source flags for the stream source, described
 546          *                    in {@link StreamOpFlag}
 547          */
 548         Head(Supplier<? extends Spliterator<?>> source,
 549              int sourceFlags, boolean parallel) {
 550             super(source, sourceFlags, parallel);
 551         }
 552 
 553         /**
 554          * Constructor for the source stage of a Stream.
 555          *
 556          * @param source {@code Spliterator} describing the stream source
 557          * @param sourceFlags the source flags for the stream source, described
 558          *                    in {@link StreamOpFlag}
 559          */
 560         Head(Spliterator<?> source,
 561              int sourceFlags, boolean parallel) {
 562             super(source, sourceFlags, parallel);
 563         }
 564 
 565         @Override
 566         final boolean opIsStateful() {
 567             throw new UnsupportedOperationException();
 568         }
 569 
 570         @Override
 571         final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
 572             throw new UnsupportedOperationException();
 573         }
 574 
 575         // Optimized sequential terminal operations for the head of the pipeline
 576 
 577         @Override
 578         public void forEach(Consumer<? super E_OUT> action) {
 579             if (!isParallel()) {
 580                 sourceStageSpliterator().forEachRemaining(action);
 581             }
 582             else {
 583                 super.forEach(action);
 584             }
 585         }
 586 
 587         @Override
 588         public void forEachOrdered(Consumer<? super E_OUT> action) {
 589             if (!isParallel()) {
 590                 sourceStageSpliterator().forEachRemaining(action);
 591             }
 592             else {
 593                 super.forEachOrdered(action);
 594             }
 595         }
 596     }
 597 
 598     /**
 599      * Base class for a stateless intermediate stage of a Stream.
 600      *
 601      * @param <E_IN> type of elements in the upstream source
 602      * @param <E_OUT> type of elements in produced by this stage
 603      * @since 1.8
 604      */
 605     abstract static class StatelessOp<E_IN, E_OUT>
 606             extends ReferencePipeline<E_IN, E_OUT> {
 607         /**
 608          * Construct a new Stream by appending a stateless intermediate
 609          * operation to an existing stream.
 610          *
 611          * @param upstream The upstream pipeline stage
 612          * @param inputShape The stream shape for the upstream pipeline stage
 613          * @param opFlags Operation flags for the new stage
 614          */
 615         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
 616                     StreamShape inputShape,
 617                     int opFlags) {
 618             super(upstream, opFlags);
 619             assert upstream.getOutputShape() == inputShape;
 620         }
 621 
 622         @Override
 623         final boolean opIsStateful() {
 624             return false;
 625         }
 626     }
 627 
 628     /**
 629      * Base class for a stateful intermediate stage of a Stream.
 630      *
 631      * @param <E_IN> type of elements in the upstream source
 632      * @param <E_OUT> type of elements in produced by this stage
 633      * @since 1.8
 634      */
 635     abstract static class StatefulOp<E_IN, E_OUT>
 636             extends ReferencePipeline<E_IN, E_OUT> {
 637         /**
 638          * Construct a new Stream by appending a stateful intermediate operation
 639          * to an existing stream.
 640          * @param upstream The upstream pipeline stage
 641          * @param inputShape The stream shape for the upstream pipeline stage
 642          * @param opFlags Operation flags for the new stage
 643          */
 644         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
 645                    StreamShape inputShape,
 646                    int opFlags) {
 647             super(upstream, opFlags);
 648             assert upstream.getOutputShape() == inputShape;
 649         }
 650 
 651         @Override
 652         final boolean opIsStateful() {
 653             return true;
 654         }
 655 
 656         @Override
 657         abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
 658                                                        Spliterator<P_IN> spliterator,
 659                                                        IntFunction<E_OUT[]> generator);
 660     }
 661 }