1 /*
   2  * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.IntSummaryStatistics;
  28 import java.util.Objects;
  29 import java.util.OptionalDouble;
  30 import java.util.OptionalInt;
  31 import java.util.PrimitiveIterator;
  32 import java.util.Spliterator;
  33 import java.util.Spliterators;
  34 import java.util.function.BiConsumer;
  35 import java.util.function.BinaryOperator;
  36 import java.util.function.IntBinaryOperator;
  37 import java.util.function.IntConsumer;
  38 import java.util.function.IntFunction;
  39 import java.util.function.IntPredicate;
  40 import java.util.function.IntToDoubleFunction;
  41 import java.util.function.IntToLongFunction;
  42 import java.util.function.IntUnaryOperator;
  43 import java.util.function.ObjIntConsumer;
  44 import java.util.function.Supplier;
  45 
  46 /**
  47  * Abstract base class for an intermediate pipeline stage or pipeline source
  48  * stage implementing whose elements are of type {@code int}.
  49  *
  50  * @param <E_IN> type of elements in the upstream source
  51  * @since 1.8
  52  */
  53 abstract class IntPipeline<E_IN>
  54         extends AbstractPipeline<E_IN, Integer, IntStream>
  55         implements IntStream {
  56 
  57     /**
  58      * Constructor for the head of a stream pipeline.
  59      *
  60      * @param source {@code Supplier<Spliterator>} describing the stream source
  61      * @param sourceFlags The source flags for the stream source, described in
  62      *        {@link StreamOpFlag}
  63      * @param parallel {@code true} if the pipeline is parallel
  64      */
  65     IntPipeline(Supplier<? extends Spliterator<Integer>> source,
  66                 int sourceFlags, boolean parallel) {
  67         super(source, sourceFlags, parallel);
  68     }
  69 
  70     /**
  71      * Constructor for the head of a stream pipeline.
  72      *
  73      * @param source {@code Spliterator} describing the stream source
  74      * @param sourceFlags The source flags for the stream source, described in
  75      *        {@link StreamOpFlag}
  76      * @param parallel {@code true} if the pipeline is parallel
  77      */
  78     IntPipeline(Spliterator<Integer> source,
  79                 int sourceFlags, boolean parallel) {
  80         super(source, sourceFlags, parallel);
  81     }
  82 
  83     /**
  84      * Constructor for appending an intermediate operation onto an existing
  85      * pipeline.
  86      *
  87      * @param upstream the upstream element source
  88      * @param opFlags the operation flags for the new operation
  89      */
  90     IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
  91         super(upstream, opFlags);
  92     }
  93 
  94     /**
  95      * Adapt a {@code Sink<Integer> to an {@code IntConsumer}, ideally simply
  96      * by casting.
  97      */
  98     private static IntConsumer adapt(Sink<Integer> sink) {
  99         if (sink instanceof IntConsumer) {
 100             return (IntConsumer) sink;
 101         }
 102         else {
 103             if (Tripwire.ENABLED)
 104                 Tripwire.trip(AbstractPipeline.class,
 105                               "using IntStream.adapt(Sink<Integer> s)");
 106             return sink::accept;
 107         }
 108     }
 109 
 110     /**
 111      * Adapt a {@code Spliterator<Integer>} to a {@code Spliterator.OfInt}.
 112      *
 113      * @implNote
 114      * The implementation attempts to cast to a Spliterator.OfInt, and throws an
 115      * exception if this cast is not possible.
 116      */
 117     private static Spliterator.OfInt adapt(Spliterator<Integer> s) {
 118         if (s instanceof Spliterator.OfInt) {
 119             return (Spliterator.OfInt) s;
 120         }
 121         else {
 122             if (Tripwire.ENABLED)
 123                 Tripwire.trip(AbstractPipeline.class,
 124                               "using IntStream.adapt(Spliterator<Integer> s)");
 125             throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)");
 126         }
 127     }
 128 
 129 
 130     // Shape-specific methods
 131 
 132     @Override
 133     final StreamShape getOutputShape() {
 134         return StreamShape.INT_VALUE;
 135     }
 136 
 137     @Override
 138     final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper,
 139                                               Spliterator<P_IN> spliterator,
 140                                               boolean flattenTree,
 141                                               IntFunction<Integer[]> generator) {
 142         return Nodes.collectInt(helper, spliterator, flattenTree);
 143     }
 144 
 145     @Override
 146     final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph,
 147                                            Supplier<Spliterator<P_IN>> supplier,
 148                                            boolean isParallel) {
 149         return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel);
 150     }
 151 
 152     @Override
 153     @SuppressWarnings("unchecked")
 154     final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) {
 155         return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier);
 156     }
 157 
 158     @Override
 159     final boolean forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) {
 160         Spliterator.OfInt spl = adapt(spliterator);
 161         IntConsumer adaptedSink = adapt(sink);
 162         boolean cancelled;
 163         do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink));
 164         return cancelled;
 165     }
 166 
 167     @Override
 168     final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown,
 169                                                 IntFunction<Integer[]> generator) {
 170         return Nodes.intBuilder(exactSizeIfKnown);
 171     }
 172 
 173 
 174     // IntStream
 175 
 176     @Override
 177     public final PrimitiveIterator.OfInt iterator() {
 178         return Spliterators.iterator(spliterator());
 179     }
 180 
 181     @Override
 182     public final Spliterator.OfInt spliterator() {
 183         return adapt(super.spliterator());
 184     }
 185 
 186     // Stateless intermediate ops from IntStream
 187 
 188     @Override
 189     public final LongStream asLongStream() {
 190         return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 191                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 192             @Override
 193             Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
 194                 return new Sink.ChainedInt<Long>(sink) {
 195                     @Override
 196                     public void accept(int t) {
 197                         downstream.accept((long) t);
 198                     }
 199                 };
 200             }
 201         };
 202     }
 203 
 204     @Override
 205     public final DoubleStream asDoubleStream() {
 206         return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 207                                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 208             @Override
 209             Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
 210                 return new Sink.ChainedInt<Double>(sink) {
 211                     @Override
 212                     public void accept(int t) {
 213                         downstream.accept((double) t);
 214                     }
 215                 };
 216             }
 217         };
 218     }
 219 
 220     @Override
 221     public final Stream<Integer> boxed() {
 222         return mapToObj(Integer::valueOf);
 223     }
 224 
 225     @Override
 226     public final IntStream map(IntUnaryOperator mapper) {
 227         Objects.requireNonNull(mapper);
 228         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 229                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 230             @Override
 231             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 232                 return new Sink.ChainedInt<Integer>(sink) {
 233                     @Override
 234                     public void accept(int t) {
 235                         downstream.accept(mapper.applyAsInt(t));
 236                     }
 237                 };
 238             }
 239         };
 240     }
 241 
 242     @Override
 243     public final <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) {
 244         Objects.requireNonNull(mapper);
 245         return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE,
 246                                                              StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 247             @Override
 248             Sink<Integer> opWrapSink(int flags, Sink<U> sink) {
 249                 return new Sink.ChainedInt<U>(sink) {
 250                     @Override
 251                     public void accept(int t) {
 252                         downstream.accept(mapper.apply(t));
 253                     }
 254                 };
 255             }
 256         };
 257     }
 258 
 259     @Override
 260     public final LongStream mapToLong(IntToLongFunction mapper) {
 261         Objects.requireNonNull(mapper);
 262         return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 263                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 264             @Override
 265             Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
 266                 return new Sink.ChainedInt<Long>(sink) {
 267                     @Override
 268                     public void accept(int t) {
 269                         downstream.accept(mapper.applyAsLong(t));
 270                     }
 271                 };
 272             }
 273         };
 274     }
 275 
 276     @Override
 277     public final DoubleStream mapToDouble(IntToDoubleFunction mapper) {
 278         Objects.requireNonNull(mapper);
 279         return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 280                                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 281             @Override
 282             Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
 283                 return new Sink.ChainedInt<Double>(sink) {
 284                     @Override
 285                     public void accept(int t) {
 286                         downstream.accept(mapper.applyAsDouble(t));
 287                     }
 288                 };
 289             }
 290         };
 291     }
 292 
 293     @Override
 294     public final IntStream flatMap(IntFunction<? extends IntStream> mapper) {
 295         Objects.requireNonNull(mapper);
 296         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 297                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 298             @Override
 299             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 300                 return new Sink.ChainedInt<Integer>(sink) {
 301                     @Override
 302                     public void begin(long size) {
 303                         downstream.begin(-1);
 304                     }
 305 
 306                     @Override
 307                     public void accept(int t) {
 308                         try (IntStream result = mapper.apply(t)) {
 309                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 310                             if (result != null)
 311                                 result.sequential().forEach(i -> downstream.accept(i));
 312                         }
 313                     }
 314                 };
 315             }
 316         };
 317     }
 318 
 319     @Override
 320     public IntStream unordered() {
 321         if (!isOrdered())
 322             return this;
 323         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) {
 324             @Override
 325             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 326                 return sink;
 327             }
 328         };
 329     }
 330 
 331     @Override
 332     public final IntStream filter(IntPredicate predicate) {
 333         Objects.requireNonNull(predicate);
 334         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 335                                         StreamOpFlag.NOT_SIZED) {
 336             @Override
 337             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 338                 return new Sink.ChainedInt<Integer>(sink) {
 339                     @Override
 340                     public void begin(long size) {
 341                         downstream.begin(-1);
 342                     }
 343 
 344                     @Override
 345                     public void accept(int t) {
 346                         if (predicate.test(t))
 347                             downstream.accept(t);
 348                     }
 349                 };
 350             }
 351         };
 352     }
 353 
 354     @Override
 355     public final IntStream peek(IntConsumer action) {
 356         Objects.requireNonNull(action);
 357         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 358                                         0) {
 359             @Override
 360             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 361                 return new Sink.ChainedInt<Integer>(sink) {
 362                     @Override
 363                     public void accept(int t) {
 364                         action.accept(t);
 365                         downstream.accept(t);
 366                     }
 367                 };
 368             }
 369         };
 370     }
 371 
 372     // Stateful intermediate ops from IntStream
 373 
 374     @Override
 375     public final IntStream limit(long maxSize) {
 376         if (maxSize < 0)
 377             throw new IllegalArgumentException(Long.toString(maxSize));
 378         return SliceOps.makeInt(this, 0, maxSize);
 379     }
 380 
 381     @Override
 382     public final IntStream skip(long n) {
 383         if (n < 0)
 384             throw new IllegalArgumentException(Long.toString(n));
 385         if (n == 0)
 386             return this;
 387         else
 388             return SliceOps.makeInt(this, n, -1);
 389     }
 390 
 391     @Override
 392     public final IntStream takeWhile(IntPredicate predicate) {
 393         return WhileOps.makeTakeWhileInt(this, predicate);
 394     }
 395 
 396     @Override
 397     public final IntStream dropWhile(IntPredicate predicate) {
 398         return WhileOps.makeDropWhileInt(this, predicate);
 399     }
 400 
 401     @Override
 402     public final IntStream sorted() {
 403         return SortedOps.makeInt(this);
 404     }
 405 
 406     @Override
 407     public final IntStream distinct() {
 408         // While functional and quick to implement, this approach is not very efficient.
 409         // An efficient version requires an int-specific map/set implementation.
 410         return boxed().distinct().mapToInt(i -> i);
 411     }
 412 
 413     // Terminal ops from IntStream
 414 
 415     @Override
 416     public void forEach(IntConsumer action) {
 417         evaluate(ForEachOps.makeInt(action, false));
 418     }
 419 
 420     @Override
 421     public void forEachOrdered(IntConsumer action) {
 422         evaluate(ForEachOps.makeInt(action, true));
 423     }
 424 
 425     @Override
 426     public final int sum() {
 427         return reduce(0, Integer::sum);
 428     }
 429 
 430     @Override
 431     public final OptionalInt min() {
 432         return reduce(Math::min);
 433     }
 434 
 435     @Override
 436     public final OptionalInt max() {
 437         return reduce(Math::max);
 438     }
 439 
 440     @Override
 441     public final long count() {
 442         return evaluate(ReduceOps.makeIntCounting());
 443     }
 444 
 445     @Override
 446     public final OptionalDouble average() {
 447         long[] avg = collect(() -> new long[2],
 448                              (ll, i) -> {
 449                                  ll[0]++;
 450                                  ll[1] += i;
 451                              },
 452                              (ll, rr) -> {
 453                                  ll[0] += rr[0];
 454                                  ll[1] += rr[1];
 455                              });
 456         return avg[0] > 0
 457                ? OptionalDouble.of((double) avg[1] / avg[0])
 458                : OptionalDouble.empty();
 459     }
 460 
 461     @Override
 462     public final IntSummaryStatistics summaryStatistics() {
 463         return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept,
 464                        IntSummaryStatistics::combine);
 465     }
 466 
 467     @Override
 468     public final int reduce(int identity, IntBinaryOperator op) {
 469         return evaluate(ReduceOps.makeInt(identity, op));
 470     }
 471 
 472     @Override
 473     public final OptionalInt reduce(IntBinaryOperator op) {
 474         return evaluate(ReduceOps.makeInt(op));
 475     }
 476 
 477     @Override
 478     public final <R> R collect(Supplier<R> supplier,
 479                                ObjIntConsumer<R> accumulator,
 480                                BiConsumer<R, R> combiner) {
 481         Objects.requireNonNull(combiner);
 482         BinaryOperator<R> operator = (left, right) -> {
 483             combiner.accept(left, right);
 484             return left;
 485         };
 486         return evaluate(ReduceOps.makeInt(supplier, accumulator, operator));
 487     }
 488 
 489     @Override
 490     public final boolean anyMatch(IntPredicate predicate) {
 491         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY));
 492     }
 493 
 494     @Override
 495     public final boolean allMatch(IntPredicate predicate) {
 496         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL));
 497     }
 498 
 499     @Override
 500     public final boolean noneMatch(IntPredicate predicate) {
 501         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE));
 502     }
 503 
 504     @Override
 505     public final OptionalInt findFirst() {
 506         return evaluate(FindOps.makeInt(true));
 507     }
 508 
 509     @Override
 510     public final OptionalInt findAny() {
 511         return evaluate(FindOps.makeInt(false));
 512     }
 513 
 514     @Override
 515     public final int[] toArray() {
 516         return Nodes.flattenInt((Node.OfInt) evaluateToArrayNode(Integer[]::new))
 517                         .asPrimitiveArray();
 518     }
 519 
 520     //
 521 
 522     /**
 523      * Source stage of an IntStream.
 524      *
 525      * @param <E_IN> type of elements in the upstream source
 526      * @since 1.8
 527      */
 528     static class Head<E_IN> extends IntPipeline<E_IN> {
 529         /**
 530          * Constructor for the source stage of an IntStream.
 531          *
 532          * @param source {@code Supplier<Spliterator>} describing the stream
 533          *               source
 534          * @param sourceFlags the source flags for the stream source, described
 535          *                    in {@link StreamOpFlag}
 536          * @param parallel {@code true} if the pipeline is parallel
 537          */
 538         Head(Supplier<? extends Spliterator<Integer>> source,
 539              int sourceFlags, boolean parallel) {
 540             super(source, sourceFlags, parallel);
 541         }
 542 
 543         /**
 544          * Constructor for the source stage of an IntStream.
 545          *
 546          * @param source {@code Spliterator} describing the stream source
 547          * @param sourceFlags the source flags for the stream source, described
 548          *                    in {@link StreamOpFlag}
 549          * @param parallel {@code true} if the pipeline is parallel
 550          */
 551         Head(Spliterator<Integer> source,
 552              int sourceFlags, boolean parallel) {
 553             super(source, sourceFlags, parallel);
 554         }
 555 
 556         @Override
 557         final boolean opIsStateful() {
 558             throw new UnsupportedOperationException();
 559         }
 560 
 561         @Override
 562         final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) {
 563             throw new UnsupportedOperationException();
 564         }
 565 
 566         // Optimized sequential terminal operations for the head of the pipeline
 567 
 568         @Override
 569         public void forEach(IntConsumer action) {
 570             if (!isParallel()) {
 571                 adapt(sourceStageSpliterator()).forEachRemaining(action);
 572             }
 573             else {
 574                 super.forEach(action);
 575             }
 576         }
 577 
 578         @Override
 579         public void forEachOrdered(IntConsumer action) {
 580             if (!isParallel()) {
 581                 adapt(sourceStageSpliterator()).forEachRemaining(action);
 582             }
 583             else {
 584                 super.forEachOrdered(action);
 585             }
 586         }
 587     }
 588 
 589     /**
 590      * Base class for a stateless intermediate stage of an IntStream
 591      *
 592      * @param <E_IN> type of elements in the upstream source
 593      * @since 1.8
 594      */
 595     abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> {
 596         /**
 597          * Construct a new IntStream by appending a stateless intermediate
 598          * operation to an existing stream.
 599          * @param upstream The upstream pipeline stage
 600          * @param inputShape The stream shape for the upstream pipeline stage
 601          * @param opFlags Operation flags for the new stage
 602          */
 603         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
 604                     StreamShape inputShape,
 605                     int opFlags) {
 606             super(upstream, opFlags);
 607             assert upstream.getOutputShape() == inputShape;
 608         }
 609 
 610         @Override
 611         final boolean opIsStateful() {
 612             return false;
 613         }
 614     }
 615 
 616     /**
 617      * Base class for a stateful intermediate stage of an IntStream.
 618      *
 619      * @param <E_IN> type of elements in the upstream source
 620      * @since 1.8
 621      */
 622     abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> {
 623         /**
 624          * Construct a new IntStream by appending a stateful intermediate
 625          * operation to an existing stream.
 626          * @param upstream The upstream pipeline stage
 627          * @param inputShape The stream shape for the upstream pipeline stage
 628          * @param opFlags Operation flags for the new stage
 629          */
 630         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
 631                    StreamShape inputShape,
 632                    int opFlags) {
 633             super(upstream, opFlags);
 634             assert upstream.getOutputShape() == inputShape;
 635         }
 636 
 637         @Override
 638         final boolean opIsStateful() {
 639             return true;
 640         }
 641 
 642         @Override
 643         abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
 644                                                          Spliterator<P_IN> spliterator,
 645                                                          IntFunction<Integer[]> generator);
 646     }
 647 }