1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.IntSummaryStatistics;
  28 import java.util.Objects;
  29 import java.util.OptionalDouble;
  30 import java.util.OptionalInt;
  31 import java.util.PrimitiveIterator;
  32 import java.util.Spliterator;
  33 import java.util.Spliterators;
  34 import java.util.function.BiConsumer;
  35 import java.util.function.BinaryOperator;
  36 import java.util.function.IntBinaryOperator;
  37 import java.util.function.IntConsumer;
  38 import java.util.function.IntFunction;
  39 import java.util.function.IntPredicate;
  40 import java.util.function.IntToDoubleFunction;
  41 import java.util.function.IntToLongFunction;
  42 import java.util.function.IntUnaryOperator;
  43 import java.util.function.ObjIntConsumer;
  44 import java.util.function.Supplier;
  45 
  46 /**
  47  * Abstract base class for an intermediate pipeline stage or pipeline source
  48  * stage implementing whose elements are of type {@code int}.
  49  *
  50  * @param <E_IN> type of elements in the upstream source
  51  * @since 1.8
  52  */
  53 abstract class IntPipeline<E_IN>
  54         extends AbstractPipeline<E_IN, Integer, IntStream>
  55         implements IntStream {
  56 
  57     /**
  58      * Constructor for the head of a stream pipeline.
  59      *
  60      * @param source {@code Supplier<Spliterator>} describing the stream source
  61      * @param sourceFlags The source flags for the stream source, described in
  62      *        {@link StreamOpFlag}
  63      * @param parallel {@code true} if the pipeline is parallel
  64      */
  65     IntPipeline(Supplier<? extends Spliterator<Integer>> source,
  66                 int sourceFlags, boolean parallel) {
  67         super(source, sourceFlags, parallel);
  68     }
  69 
  70     /**
  71      * Constructor for the head of a stream pipeline.
  72      *
  73      * @param source {@code Spliterator} describing the stream source
  74      * @param sourceFlags The source flags for the stream source, described in
  75      *        {@link StreamOpFlag}
  76      * @param parallel {@code true} if the pipeline is parallel
  77      */
  78     IntPipeline(Spliterator<Integer> source,
  79                 int sourceFlags, boolean parallel) {
  80         super(source, sourceFlags, parallel);
  81     }
  82 
  83     /**
  84      * Constructor for appending an intermediate operation onto an existing
  85      * pipeline.
  86      *
  87      * @param upstream the upstream element source
  88      * @param opFlags the operation flags for the new operation
  89      */
  90     IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
  91         super(upstream, opFlags);
  92     }
  93 
  94     /**
  95      * Adapt a {@code Sink<Integer> to an {@code IntConsumer}, ideally simply
  96      * by casting.
  97      */
  98     private static IntConsumer adapt(Sink<Integer> sink) {
  99         if (sink instanceof IntConsumer) {
 100             return (IntConsumer) sink;
 101         }
 102         else {
 103             if (Tripwire.ENABLED)
 104                 Tripwire.trip(AbstractPipeline.class,
 105                               "using IntStream.adapt(Sink<Integer> s)");
 106             return sink::accept;
 107         }
 108     }
 109 
 110     /**
 111      * Adapt a {@code Spliterator<Integer>} to a {@code Spliterator.OfInt}.
 112      *
 113      * @implNote
 114      * The implementation attempts to cast to a Spliterator.OfInt, and throws an
 115      * exception if this cast is not possible.
 116      */
 117     private static Spliterator.OfInt adapt(Spliterator<Integer> s) {
 118         if (s instanceof Spliterator.OfInt) {
 119             return (Spliterator.OfInt) s;
 120         }
 121         else {
 122             if (Tripwire.ENABLED)
 123                 Tripwire.trip(AbstractPipeline.class,
 124                               "using IntStream.adapt(Spliterator<Integer> s)");
 125             throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)");
 126         }
 127     }
 128 
 129 
 130     // Shape-specific methods
 131 
 132     @Override
 133     final StreamShape getOutputShape() {
 134         return StreamShape.INT_VALUE;
 135     }
 136 
 137     @Override
 138     final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper,
 139                                               Spliterator<P_IN> spliterator,
 140                                               boolean flattenTree,
 141                                               IntFunction<Integer[]> generator) {
 142         return Nodes.collectInt(helper, spliterator, flattenTree);
 143     }
 144 
 145     @Override
 146     final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph,
 147                                            Supplier<Spliterator<P_IN>> supplier,
 148                                            boolean isParallel) {
 149         return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel);
 150     }
 151 
 152     @Override
 153     @SuppressWarnings("unchecked")
 154     final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) {
 155         return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier);
 156     }
 157 
 158     @Override
 159     final void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) {
 160         Spliterator.OfInt spl = adapt(spliterator);
 161         IntConsumer adaptedSink = adapt(sink);
 162         do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
 163     }
 164 
 165     @Override
 166     final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown,
 167                                                 IntFunction<Integer[]> generator) {
 168         return Nodes.intBuilder(exactSizeIfKnown);
 169     }
 170 
 171 
 172     // IntStream
 173 
 174     @Override
 175     public final PrimitiveIterator.OfInt iterator() {
 176         return Spliterators.iterator(spliterator());
 177     }
 178 
 179     @Override
 180     public final Spliterator.OfInt spliterator() {
 181         return adapt(super.spliterator());
 182     }
 183 
 184     // Stateless intermediate ops from IntStream
 185 
 186     @Override
 187     public final LongStream asLongStream() {
 188         return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 189                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 190             @Override
 191             Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
 192                 return new Sink.ChainedInt<Long>(sink) {
 193                     @Override
 194                     public void accept(int t) {
 195                         downstream.accept((long) t);
 196                     }
 197                 };
 198             }
 199         };
 200     }
 201 
 202     @Override
 203     public final DoubleStream asDoubleStream() {
 204         return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 205                                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 206             @Override
 207             Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
 208                 return new Sink.ChainedInt<Double>(sink) {
 209                     @Override
 210                     public void accept(int t) {
 211                         downstream.accept((double) t);
 212                     }
 213                 };
 214             }
 215         };
 216     }
 217 
 218     @Override
 219     public final Stream<Integer> boxed() {
 220         return mapToObj(Integer::valueOf);
 221     }
 222 
 223     @Override
 224     public final IntStream map(IntUnaryOperator mapper) {
 225         Objects.requireNonNull(mapper);
 226         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 227                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 228             @Override
 229             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 230                 return new Sink.ChainedInt<Integer>(sink) {
 231                     @Override
 232                     public void accept(int t) {
 233                         downstream.accept(mapper.applyAsInt(t));
 234                     }
 235                 };
 236             }
 237         };
 238     }
 239 
 240     @Override
 241     public final <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) {
 242         Objects.requireNonNull(mapper);
 243         return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE,
 244                                                              StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 245             @Override
 246             Sink<Integer> opWrapSink(int flags, Sink<U> sink) {
 247                 return new Sink.ChainedInt<U>(sink) {
 248                     @Override
 249                     public void accept(int t) {
 250                         downstream.accept(mapper.apply(t));
 251                     }
 252                 };
 253             }
 254         };
 255     }
 256 
 257     @Override
 258     public final LongStream mapToLong(IntToLongFunction mapper) {
 259         Objects.requireNonNull(mapper);
 260         return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 261                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 262             @Override
 263             Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
 264                 return new Sink.ChainedInt<Long>(sink) {
 265                     @Override
 266                     public void accept(int t) {
 267                         downstream.accept(mapper.applyAsLong(t));
 268                     }
 269                 };
 270             }
 271         };
 272     }
 273 
 274     @Override
 275     public final DoubleStream mapToDouble(IntToDoubleFunction mapper) {
 276         Objects.requireNonNull(mapper);
 277         return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 278                                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 279             @Override
 280             Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
 281                 return new Sink.ChainedInt<Double>(sink) {
 282                     @Override
 283                     public void accept(int t) {
 284                         downstream.accept(mapper.applyAsDouble(t));
 285                     }
 286                 };
 287             }
 288         };
 289     }
 290 
 291     @Override
 292     public final IntStream flatMap(IntFunction<? extends IntStream> mapper) {
 293         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 294                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 295             @Override
 296             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 297                 return new Sink.ChainedInt<Integer>(sink) {
 298                     @Override
 299                     public void begin(long size) {
 300                         downstream.begin(-1);
 301                     }
 302 
 303                     @Override
 304                     public void accept(int t) {
 305                         try (IntStream result = mapper.apply(t)) {
 306                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 307                             if (result != null)
 308                                 result.sequential().forEach(i -> downstream.accept(i));
 309                         }
 310                     }
 311                 };
 312             }
 313         };
 314     }
 315 
 316     @Override
 317     public IntStream unordered() {
 318         if (!isOrdered())
 319             return this;
 320         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) {
 321             @Override
 322             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 323                 return sink;
 324             }
 325         };
 326     }
 327 
 328     @Override
 329     public final IntStream filter(IntPredicate predicate) {
 330         Objects.requireNonNull(predicate);
 331         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 332                                         StreamOpFlag.NOT_SIZED) {
 333             @Override
 334             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 335                 return new Sink.ChainedInt<Integer>(sink) {
 336                     @Override
 337                     public void begin(long size) {
 338                         downstream.begin(-1);
 339                     }
 340 
 341                     @Override
 342                     public void accept(int t) {
 343                         if (predicate.test(t))
 344                             downstream.accept(t);
 345                     }
 346                 };
 347             }
 348         };
 349     }
 350 
 351     @Override
 352     public final IntStream peek(IntConsumer consumer) {
 353         Objects.requireNonNull(consumer);
 354         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 355                                         0) {
 356             @Override
 357             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 358                 return new Sink.ChainedInt<Integer>(sink) {
 359                     @Override
 360                     public void accept(int t) {
 361                         consumer.accept(t);
 362                         downstream.accept(t);
 363                     }
 364                 };
 365             }
 366         };
 367     }
 368 
 369     // Stateful intermediate ops from IntStream
 370 
 371     private IntStream slice(long skip, long limit) {
 372         return SliceOps.makeInt(this, skip, limit);
 373     }
 374 
 375     @Override
 376     public final IntStream limit(long maxSize) {
 377         if (maxSize < 0)
 378             throw new IllegalArgumentException(Long.toString(maxSize));
 379         return slice(0, maxSize);
 380     }
 381 
 382     @Override
 383     public final IntStream substream(long startingOffset) {
 384         if (startingOffset < 0)
 385             throw new IllegalArgumentException(Long.toString(startingOffset));
 386         if (startingOffset == 0)
 387             return this;
 388         else
 389             return slice(startingOffset, -1);
 390     }
 391 
 392     @Override
 393     public final IntStream substream(long startingOffset, long endingOffset) {
 394         if (startingOffset < 0 || endingOffset < startingOffset)
 395             throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset));
 396         return slice(startingOffset, endingOffset - startingOffset);
 397     }
 398 
 399     @Override
 400     public final IntStream sorted() {
 401         return SortedOps.makeInt(this);
 402     }
 403 
 404     @Override
 405     public final IntStream distinct() {
 406         // While functional and quick to implement, this approach is not very efficient.
 407         // An efficient version requires an int-specific map/set implementation.
 408         return boxed().distinct().mapToInt(i -> i);
 409     }
 410 
 411     // Terminal ops from IntStream
 412 
 413     @Override
 414     public void forEach(IntConsumer action) {
 415         evaluate(ForEachOps.makeInt(action, false));
 416     }
 417 
 418     @Override
 419     public void forEachOrdered(IntConsumer action) {
 420         evaluate(ForEachOps.makeInt(action, true));
 421     }
 422 
 423     @Override
 424     public final int sum() {
 425         return reduce(0, Integer::sum);
 426     }
 427 
 428     @Override
 429     public final OptionalInt min() {
 430         return reduce(Math::min);
 431     }
 432 
 433     @Override
 434     public final OptionalInt max() {
 435         return reduce(Math::max);
 436     }
 437 
 438     @Override
 439     public final long count() {
 440         return asLongStream().map(e -> 1L).sum();
 441     }
 442 
 443     @Override
 444     public final OptionalDouble average() {
 445         long[] avg = collect(() -> new long[2],
 446                              (ll, i) -> {
 447                                  ll[0]++;
 448                                  ll[1] += i;
 449                              },
 450                              (ll, rr) -> {
 451                                  ll[0] += rr[0];
 452                                  ll[1] += rr[1];
 453                              });
 454         return avg[0] > 0
 455                ? OptionalDouble.of((double) avg[1] / avg[0])
 456                : OptionalDouble.empty();
 457     }
 458 
 459     @Override
 460     public final IntSummaryStatistics summaryStatistics() {
 461         return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept,
 462                        IntSummaryStatistics::combine);
 463     }
 464 
 465     @Override
 466     public final int reduce(int identity, IntBinaryOperator op) {
 467         return evaluate(ReduceOps.makeInt(identity, op));
 468     }
 469 
 470     @Override
 471     public final OptionalInt reduce(IntBinaryOperator op) {
 472         return evaluate(ReduceOps.makeInt(op));
 473     }
 474 
 475     @Override
 476     public final <R> R collect(Supplier<R> resultFactory,
 477                                ObjIntConsumer<R> accumulator,
 478                                BiConsumer<R, R> combiner) {
 479         BinaryOperator<R> operator = (left, right) -> {
 480             combiner.accept(left, right);
 481             return left;
 482         };
 483         return evaluate(ReduceOps.makeInt(resultFactory, accumulator, operator));
 484     }
 485 
 486     @Override
 487     public final boolean anyMatch(IntPredicate predicate) {
 488         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY));
 489     }
 490 
 491     @Override
 492     public final boolean allMatch(IntPredicate predicate) {
 493         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL));
 494     }
 495 
 496     @Override
 497     public final boolean noneMatch(IntPredicate predicate) {
 498         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE));
 499     }
 500 
 501     @Override
 502     public final OptionalInt findFirst() {
 503         return evaluate(FindOps.makeInt(true));
 504     }
 505 
 506     @Override
 507     public final OptionalInt findAny() {
 508         return evaluate(FindOps.makeInt(false));
 509     }
 510 
 511     @Override
 512     public final int[] toArray() {
 513         return Nodes.flattenInt((Node.OfInt) evaluateToArrayNode(Integer[]::new))
 514                         .asPrimitiveArray();
 515     }
 516 
 517     //
 518 
 519     /**
 520      * Source stage of an IntStream.
 521      *
 522      * @param <E_IN> type of elements in the upstream source
 523      * @since 1.8
 524      */
 525     static class Head<E_IN> extends IntPipeline<E_IN> {
 526         /**
 527          * Constructor for the source stage of an IntStream.
 528          *
 529          * @param source {@code Supplier<Spliterator>} describing the stream
 530          *               source
 531          * @param sourceFlags the source flags for the stream source, described
 532          *                    in {@link StreamOpFlag}
 533          * @param parallel {@code true} if the pipeline is parallel
 534          */
 535         Head(Supplier<? extends Spliterator<Integer>> source,
 536              int sourceFlags, boolean parallel) {
 537             super(source, sourceFlags, parallel);
 538         }
 539 
 540         /**
 541          * Constructor for the source stage of an IntStream.
 542          *
 543          * @param source {@code Spliterator} describing the stream source
 544          * @param sourceFlags the source flags for the stream source, described
 545          *                    in {@link StreamOpFlag}
 546          * @param parallel {@code true} if the pipeline is parallel
 547          */
 548         Head(Spliterator<Integer> source,
 549              int sourceFlags, boolean parallel) {
 550             super(source, sourceFlags, parallel);
 551         }
 552 
 553         @Override
 554         final boolean opIsStateful() {
 555             throw new UnsupportedOperationException();
 556         }
 557 
 558         @Override
 559         final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) {
 560             throw new UnsupportedOperationException();
 561         }
 562 
 563         // Optimized sequential terminal operations for the head of the pipeline
 564 
 565         @Override
 566         public void forEach(IntConsumer action) {
 567             if (!isParallel()) {
 568                 adapt(sourceStageSpliterator()).forEachRemaining(action);
 569             }
 570             else {
 571                 super.forEach(action);
 572             }
 573         }
 574 
 575         @Override
 576         public void forEachOrdered(IntConsumer action) {
 577             if (!isParallel()) {
 578                 adapt(sourceStageSpliterator()).forEachRemaining(action);
 579             }
 580             else {
 581                 super.forEachOrdered(action);
 582             }
 583         }
 584     }
 585 
 586     /**
 587      * Base class for a stateless intermediate stage of an IntStream
 588      *
 589      * @param <E_IN> type of elements in the upstream source
 590      * @since 1.8
 591      */
 592     abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> {
 593         /**
 594          * Construct a new IntStream by appending a stateless intermediate
 595          * operation to an existing stream.
 596          * @param upstream The upstream pipeline stage
 597          * @param inputShape The stream shape for the upstream pipeline stage
 598          * @param opFlags Operation flags for the new stage
 599          */
 600         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
 601                     StreamShape inputShape,
 602                     int opFlags) {
 603             super(upstream, opFlags);
 604             assert upstream.getOutputShape() == inputShape;
 605         }
 606 
 607         @Override
 608         final boolean opIsStateful() {
 609             return false;
 610         }
 611     }
 612 
 613     /**
 614      * Base class for a stateful intermediate stage of an IntStream.
 615      *
 616      * @param <E_IN> type of elements in the upstream source
 617      * @since 1.8
 618      */
 619     abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> {
 620         /**
 621          * Construct a new IntStream by appending a stateful intermediate
 622          * operation to an existing stream.
 623          * @param upstream The upstream pipeline stage
 624          * @param inputShape The stream shape for the upstream pipeline stage
 625          * @param opFlags Operation flags for the new stage
 626          */
 627         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
 628                    StreamShape inputShape,
 629                    int opFlags) {
 630             super(upstream, opFlags);
 631             assert upstream.getOutputShape() == inputShape;
 632         }
 633 
 634         @Override
 635         final boolean opIsStateful() {
 636             return true;
 637         }
 638 
 639         @Override
 640         abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
 641                                                          Spliterator<P_IN> spliterator,
 642                                                          IntFunction<Integer[]> generator);
 643     }
 644 }