1 /*
   2  * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Comparator;
  28 import java.util.Iterator;
  29 import java.util.Objects;
  30 import java.util.Optional;
  31 import java.util.Spliterator;
  32 import java.util.Spliterators;
  33 import java.util.function.BiConsumer;
  34 import java.util.function.BiFunction;
  35 import java.util.function.BinaryOperator;
  36 import java.util.function.Consumer;
  37 import java.util.function.DoubleConsumer;
  38 import java.util.function.Function;
  39 import java.util.function.IntConsumer;
  40 import java.util.function.IntFunction;
  41 import java.util.function.LongConsumer;
  42 import java.util.function.Predicate;
  43 import java.util.function.Supplier;
  44 import java.util.function.ToDoubleFunction;
  45 import java.util.function.ToIntFunction;
  46 import java.util.function.ToLongFunction;
  47 
  48 /**
  49  * Abstract base class for an intermediate pipeline stage or pipeline source
  50  * stage implementing whose elements are of type {@code U}.
  51  *
  52  * @param <P_IN> type of elements in the upstream source
  53  * @param <P_OUT> type of elements in produced by this stage
  54  *
  55  * @since 1.8
  56  */
  57 abstract class ReferencePipeline<P_IN, P_OUT>
  58         extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
  59         implements Stream<P_OUT>  {
  60 
  61     /**
  62      * Constructor for the head of a stream pipeline.
  63      *
  64      * @param source {@code Supplier<Spliterator>} describing the stream source
  65      * @param sourceFlags the source flags for the stream source, described in
  66      *        {@link StreamOpFlag}
  67      * @param parallel {@code true} if the pipeline is parallel
  68      */
  69     ReferencePipeline(Supplier<? extends Spliterator<?>> source,
  70                       int sourceFlags, boolean parallel) {
  71         super(source, sourceFlags, parallel);
  72     }
  73 
  74     /**
  75      * Constructor for the head of a stream pipeline.
  76      *
  77      * @param source {@code Spliterator} describing the stream source
  78      * @param sourceFlags The source flags for the stream source, described in
  79      *        {@link StreamOpFlag}
  80      * @param parallel {@code true} if the pipeline is parallel
  81      */
  82     ReferencePipeline(Spliterator<?> source,
  83                       int sourceFlags, boolean parallel) {
  84         super(source, sourceFlags, parallel);
  85     }
  86 
  87     /**
  88      * Constructor for appending an intermediate operation onto an existing
  89      * pipeline.
  90      *
  91      * @param upstream the upstream element source.
  92      */
  93     ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
  94         super(upstream, opFlags);
  95     }
  96 
  97     // Shape-specific methods
  98 
  99     @Override
 100     final StreamShape getOutputShape() {
 101         return StreamShape.REFERENCE;
 102     }
 103 
 104     @Override
 105     final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
 106                                         Spliterator<P_IN> spliterator,
 107                                         boolean flattenTree,
 108                                         IntFunction<P_OUT[]> generator) {
 109         return Nodes.collect(helper, spliterator, flattenTree, generator);
 110     }
 111 
 112     @Override
 113     final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
 114                                      Supplier<Spliterator<P_IN>> supplier,
 115                                      boolean isParallel) {
 116         return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
 117     }
 118 
 119     @Override
 120     final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {
 121         return new StreamSpliterators.DelegatingSpliterator<>(supplier);
 122     }
 123 
 124     @Override
 125     final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
 126         boolean cancelled;
 127         do { } while (!(cancelled = sink.cancellationRequested()) && spliterator.tryAdvance(sink));
 128         return cancelled;
 129     }
 130 
 131     @Override
 132     final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
 133         return Nodes.builder(exactSizeIfKnown, generator);
 134     }
 135 
 136 
 137     // BaseStream
 138 
 139     @Override
 140     public final Iterator<P_OUT> iterator() {
 141         return Spliterators.iterator(spliterator());
 142     }
 143 
 144 
 145     // Stream
 146 
 147     // Stateless intermediate operations from Stream
 148 
 149     @Override
 150     public Stream<P_OUT> unordered() {
 151         if (!isOrdered())
 152             return this;
 153         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) {
 154             @Override
 155             Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
 156                 return sink;
 157             }
 158         };
 159     }
 160 
 161     @Override
 162     public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
 163         Objects.requireNonNull(predicate);
 164         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
 165                                      StreamOpFlag.NOT_SIZED) {
 166             @Override
 167             Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
 168                 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
 169                     @Override
 170                     public void begin(long size) {
 171                         downstream.begin(-1);
 172                     }
 173 
 174                     @Override
 175                     public void accept(P_OUT u) {
 176                         if (predicate.test(u))
 177                             downstream.accept(u);
 178                     }
 179                 };
 180             }
 181         };
 182     }
 183 
 184     @Override
 185     @SuppressWarnings("unchecked")
 186     public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
 187         Objects.requireNonNull(mapper);
 188         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
 189                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 190             @Override
 191             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
 192                 return new Sink.ChainedReference<P_OUT, R>(sink) {
 193                     @Override
 194                     public void accept(P_OUT u) {
 195                         downstream.accept(mapper.apply(u));
 196                     }
 197                 };
 198             }
 199         };
 200     }
 201 
 202     @Override
 203     public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) {
 204         Objects.requireNonNull(mapper);
 205         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 206                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 207             @Override
 208             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
 209                 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
 210                     @Override
 211                     public void accept(P_OUT u) {
 212                         downstream.accept(mapper.applyAsInt(u));
 213                     }
 214                 };
 215             }
 216         };
 217     }
 218 
 219     @Override
 220     public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {
 221         Objects.requireNonNull(mapper);
 222         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 223                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 224             @Override
 225             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
 226                 return new Sink.ChainedReference<P_OUT, Long>(sink) {
 227                     @Override
 228                     public void accept(P_OUT u) {
 229                         downstream.accept(mapper.applyAsLong(u));
 230                     }
 231                 };
 232             }
 233         };
 234     }
 235 
 236     @Override
 237     public final DoubleStream mapToDouble(ToDoubleFunction<? super P_OUT> mapper) {
 238         Objects.requireNonNull(mapper);
 239         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 240                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 241             @Override
 242             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
 243                 return new Sink.ChainedReference<P_OUT, Double>(sink) {
 244                     @Override
 245                     public void accept(P_OUT u) {
 246                         downstream.accept(mapper.applyAsDouble(u));
 247                     }
 248                 };
 249             }
 250         };
 251     }
 252 
 253     @Override
 254     public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
 255         Objects.requireNonNull(mapper);
 256         // We can do better than this, by polling cancellationRequested when stream is infinite
 257         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
 258                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 259             @Override
 260             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
 261                 return new Sink.ChainedReference<P_OUT, R>(sink) {
 262                     @Override
 263                     public void begin(long size) {
 264                         downstream.begin(-1);
 265                     }
 266 
 267                     @Override
 268                     public void accept(P_OUT u) {
 269                         try (Stream<? extends R> result = mapper.apply(u)) {
 270                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 271                             if (result != null)
 272                                 result.sequential().forEach(downstream);
 273                         }
 274                     }
 275                 };
 276             }
 277         };
 278     }
 279 
 280     @Override
 281     public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
 282         Objects.requireNonNull(mapper);
 283         // We can do better than this, by polling cancellationRequested when stream is infinite
 284         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 285                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 286             @Override
 287             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
 288                 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
 289                     IntConsumer downstreamAsInt = downstream::accept;
 290                     @Override
 291                     public void begin(long size) {
 292                         downstream.begin(-1);
 293                     }
 294 
 295                     @Override
 296                     public void accept(P_OUT u) {
 297                         try (IntStream result = mapper.apply(u)) {
 298                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 299                             if (result != null)
 300                                 result.sequential().forEach(downstreamAsInt);
 301                         }
 302                     }
 303                 };
 304             }
 305         };
 306     }
 307 
 308     @Override
 309     public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
 310         Objects.requireNonNull(mapper);
 311         // We can do better than this, by polling cancellationRequested when stream is infinite
 312         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 313                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 314             @Override
 315             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
 316                 return new Sink.ChainedReference<P_OUT, Double>(sink) {
 317                     DoubleConsumer downstreamAsDouble = downstream::accept;
 318                     @Override
 319                     public void begin(long size) {
 320                         downstream.begin(-1);
 321                     }
 322 
 323                     @Override
 324                     public void accept(P_OUT u) {
 325                         try (DoubleStream result = mapper.apply(u)) {
 326                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 327                             if (result != null)
 328                                 result.sequential().forEach(downstreamAsDouble);
 329                         }
 330                     }
 331                 };
 332             }
 333         };
 334     }
 335 
 336     @Override
 337     public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) {
 338         Objects.requireNonNull(mapper);
 339         // We can do better than this, by polling cancellationRequested when stream is infinite
 340         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 341                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 342             @Override
 343             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
 344                 return new Sink.ChainedReference<P_OUT, Long>(sink) {
 345                     LongConsumer downstreamAsLong = downstream::accept;
 346                     @Override
 347                     public void begin(long size) {
 348                         downstream.begin(-1);
 349                     }
 350 
 351                     @Override
 352                     public void accept(P_OUT u) {
 353                         try (LongStream result = mapper.apply(u)) {
 354                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 355                             if (result != null)
 356                                 result.sequential().forEach(downstreamAsLong);
 357                         }
 358                     }
 359                 };
 360             }
 361         };
 362     }
 363 
 364     @Override
 365     public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
 366         Objects.requireNonNull(action);
 367         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
 368                                      0) {
 369             @Override
 370             Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
 371                 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
 372                     @Override
 373                     public void accept(P_OUT u) {
 374                         action.accept(u);
 375                         downstream.accept(u);
 376                     }
 377                 };
 378             }
 379         };
 380     }
 381 
 382     // Stateful intermediate operations from Stream
 383 
 384     @Override
 385     public final Stream<P_OUT> distinct() {
 386         return DistinctOps.makeRef(this);
 387     }
 388 
 389     @Override
 390     public final Stream<P_OUT> sorted() {
 391         return SortedOps.makeRef(this);
 392     }
 393 
 394     @Override
 395     public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
 396         return SortedOps.makeRef(this, comparator);
 397     }
 398 
 399     @Override
 400     public Stream<P_OUT> limit(long maxSize) {
 401         if (maxSize < 0)
 402             throw new IllegalArgumentException(Long.toString(maxSize));
 403         return SliceOps.makeRef(this, 0, maxSize);
 404     }
 405 
 406     @Override
 407     public final Stream<P_OUT> skip(long n) {
 408         if (n < 0)
 409             throw new IllegalArgumentException(Long.toString(n));
 410         if (n == 0)
 411             return this;
 412         else
 413             return SliceOps.makeRef(this, n, -1);
 414     }
 415 
 416     @Override
 417     public final Stream<P_OUT> takeWhile(Predicate<? super P_OUT> predicate) {
 418         return WhileOps.makeTakeWhileRef(this, predicate);
 419     }
 420 
 421     @Override
 422     public final Stream<P_OUT> dropWhile(Predicate<? super P_OUT> predicate) {
 423         return WhileOps.makeDropWhileRef(this, predicate);
 424     }
 425 
 426     // Terminal operations from Stream
 427 
 428     @Override
 429     public void forEach(Consumer<? super P_OUT> action) {
 430         evaluate(ForEachOps.makeRef(action, false));
 431     }
 432 
 433     @Override
 434     public void forEachOrdered(Consumer<? super P_OUT> action) {
 435         evaluate(ForEachOps.makeRef(action, true));
 436     }
 437 
 438     @Override
 439     @SuppressWarnings("unchecked")
 440     public final <A> A[] toArray(IntFunction<A[]> generator) {
 441         // Since A has no relation to U (not possible to declare that A is an upper bound of U)
 442         // there will be no static type checking.
 443         // Therefore use a raw type and assume A == U rather than propagating the separation of A and U
 444         // throughout the code-base.
 445         // The runtime type of U is never checked for equality with the component type of the runtime type of A[].
 446         // Runtime checking will be performed when an element is stored in A[], thus if A is not a
 447         // super type of U an ArrayStoreException will be thrown.
 448         @SuppressWarnings("rawtypes")
 449         IntFunction rawGenerator = (IntFunction) generator;
 450         return (A[]) Nodes.flatten(evaluateToArrayNode(rawGenerator), rawGenerator)
 451                               .asArray(rawGenerator);
 452     }
 453 
 454     @Override
 455     public final Object[] toArray() {
 456         return toArray(Object[]::new);
 457     }
 458 
 459     @Override
 460     public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
 461         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
 462     }
 463 
 464     @Override
 465     public final boolean allMatch(Predicate<? super P_OUT> predicate) {
 466         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));
 467     }
 468 
 469     @Override
 470     public final boolean noneMatch(Predicate<? super P_OUT> predicate) {
 471         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE));
 472     }
 473 
 474     @Override
 475     public final Optional<P_OUT> findFirst() {
 476         return evaluate(FindOps.makeRef(true));
 477     }
 478 
 479     @Override
 480     public final Optional<P_OUT> findAny() {
 481         return evaluate(FindOps.makeRef(false));
 482     }
 483 
 484     @Override
 485     public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
 486         return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
 487     }
 488 
 489     @Override
 490     public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
 491         return evaluate(ReduceOps.makeRef(accumulator));
 492     }
 493 
 494     @Override
 495     public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
 496         return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
 497     }
 498 
 499     @Override
 500     @SuppressWarnings("unchecked")
 501     public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
 502         A container;
 503         if (isParallel()
 504                 && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
 505                 && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
 506             container = collector.supplier().get();
 507             BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
 508             forEach(u -> accumulator.accept(container, u));
 509         }
 510         else {
 511             container = evaluate(ReduceOps.makeRef(collector));
 512         }
 513         return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
 514                ? (R) container
 515                : collector.finisher().apply(container);
 516     }
 517 
 518     @Override
 519     public final <R> R collect(Supplier<R> supplier,
 520                                BiConsumer<R, ? super P_OUT> accumulator,
 521                                BiConsumer<R, R> combiner) {
 522         return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
 523     }
 524 
 525     @Override
 526     public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
 527         return reduce(BinaryOperator.maxBy(comparator));
 528     }
 529 
 530     @Override
 531     public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) {
 532         return reduce(BinaryOperator.minBy(comparator));
 533 
 534     }
 535 
 536     @Override
 537     public final long count() {
 538         return evaluate(ReduceOps.makeRefCounting());
 539     }
 540 
 541     //
 542 
 543     /**
 544      * Source stage of a ReferencePipeline.
 545      *
 546      * @param <E_IN> type of elements in the upstream source
 547      * @param <E_OUT> type of elements in produced by this stage
 548      * @since 1.8
 549      */
 550     static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
 551         /**
 552          * Constructor for the source stage of a Stream.
 553          *
 554          * @param source {@code Supplier<Spliterator>} describing the stream
 555          *               source
 556          * @param sourceFlags the source flags for the stream source, described
 557          *                    in {@link StreamOpFlag}
 558          */
 559         Head(Supplier<? extends Spliterator<?>> source,
 560              int sourceFlags, boolean parallel) {
 561             super(source, sourceFlags, parallel);
 562         }
 563 
 564         /**
 565          * Constructor for the source stage of a Stream.
 566          *
 567          * @param source {@code Spliterator} describing the stream source
 568          * @param sourceFlags the source flags for the stream source, described
 569          *                    in {@link StreamOpFlag}
 570          */
 571         Head(Spliterator<?> source,
 572              int sourceFlags, boolean parallel) {
 573             super(source, sourceFlags, parallel);
 574         }
 575 
 576         @Override
 577         final boolean opIsStateful() {
 578             throw new UnsupportedOperationException();
 579         }
 580 
 581         @Override
 582         final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
 583             throw new UnsupportedOperationException();
 584         }
 585 
 586         // Optimized sequential terminal operations for the head of the pipeline
 587 
 588         @Override
 589         public void forEach(Consumer<? super E_OUT> action) {
 590             if (!isParallel()) {
 591                 sourceStageSpliterator().forEachRemaining(action);
 592             }
 593             else {
 594                 super.forEach(action);
 595             }
 596         }
 597 
 598         @Override
 599         public void forEachOrdered(Consumer<? super E_OUT> action) {
 600             if (!isParallel()) {
 601                 sourceStageSpliterator().forEachRemaining(action);
 602             }
 603             else {
 604                 super.forEachOrdered(action);
 605             }
 606         }
 607     }
 608 
 609     /**
 610      * Base class for a stateless intermediate stage of a Stream.
 611      *
 612      * @param <E_IN> type of elements in the upstream source
 613      * @param <E_OUT> type of elements in produced by this stage
 614      * @since 1.8
 615      */
 616     abstract static class StatelessOp<E_IN, E_OUT>
 617             extends ReferencePipeline<E_IN, E_OUT> {
 618         /**
 619          * Construct a new Stream by appending a stateless intermediate
 620          * operation to an existing stream.
 621          *
 622          * @param upstream The upstream pipeline stage
 623          * @param inputShape The stream shape for the upstream pipeline stage
 624          * @param opFlags Operation flags for the new stage
 625          */
 626         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
 627                     StreamShape inputShape,
 628                     int opFlags) {
 629             super(upstream, opFlags);
 630             assert upstream.getOutputShape() == inputShape;
 631         }
 632 
 633         @Override
 634         final boolean opIsStateful() {
 635             return false;
 636         }
 637     }
 638 
 639     /**
 640      * Base class for a stateful intermediate stage of a Stream.
 641      *
 642      * @param <E_IN> type of elements in the upstream source
 643      * @param <E_OUT> type of elements in produced by this stage
 644      * @since 1.8
 645      */
 646     abstract static class StatefulOp<E_IN, E_OUT>
 647             extends ReferencePipeline<E_IN, E_OUT> {
 648         /**
 649          * Construct a new Stream by appending a stateful intermediate operation
 650          * to an existing stream.
 651          * @param upstream The upstream pipeline stage
 652          * @param inputShape The stream shape for the upstream pipeline stage
 653          * @param opFlags Operation flags for the new stage
 654          */
 655         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
 656                    StreamShape inputShape,
 657                    int opFlags) {
 658             super(upstream, opFlags);
 659             assert upstream.getOutputShape() == inputShape;
 660         }
 661 
 662         @Override
 663         final boolean opIsStateful() {
 664             return true;
 665         }
 666 
 667         @Override
 668         abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
 669                                                        Spliterator<P_IN> spliterator,
 670                                                        IntFunction<E_OUT[]> generator);
 671     }
 672 }