1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.IntSummaryStatistics;
  28 import java.util.Objects;
  29 import java.util.OptionalDouble;
  30 import java.util.OptionalInt;
  31 import java.util.PrimitiveIterator;
  32 import java.util.Spliterator;
  33 import java.util.Spliterators;
  34 import java.util.function.BiConsumer;
  35 import java.util.function.BinaryOperator;
  36 import java.util.function.IntBinaryOperator;
  37 import java.util.function.IntConsumer;
  38 import java.util.function.IntFunction;
  39 import java.util.function.IntPredicate;
  40 import java.util.function.IntToDoubleFunction;
  41 import java.util.function.IntToLongFunction;
  42 import java.util.function.IntUnaryOperator;
  43 import java.util.function.ObjIntConsumer;
  44 import java.util.function.Supplier;
  45 
  46 /**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose elements are of type {@code int}.
  49  *
  50  * @param <E_IN> type of elements in the upstream source
  51  * @since 1.8
  52  */
  53 abstract class IntPipeline<E_IN>
  54         extends AbstractPipeline<E_IN, Integer, IntStream>
  55         implements IntStream {
  56 
    /**
     * Constructor for the head of a stream pipeline whose source spliterator
     * is obtained from a supplier.
     *
     * <p>All three arguments are forwarded unchanged to the superclass
     * constructor.
     *
     * @param source {@code Supplier<Spliterator>} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     *        {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    IntPipeline(Supplier<? extends Spliterator<Integer>> source,
                int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }
  69 
    /**
     * Constructor for the head of a stream pipeline whose source is given
     * directly as a spliterator.
     *
     * <p>All three arguments are forwarded unchanged to the superclass
     * constructor.
     *
     * @param source {@code Spliterator} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     *        {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    IntPipeline(Spliterator<Integer> source,
                int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }
  82 
    /**
     * Constructor for appending an intermediate operation onto an existing
     * pipeline.
     *
     * <p>Both arguments are forwarded unchanged to the superclass
     * constructor.
     *
     * @param upstream the upstream element source
     * @param opFlags the operation flags for the new operation
     */
    IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
        super(upstream, opFlags);
    }
  93 
  94     /**
  95      * Adapt a {@code Sink<Integer> to an {@code IntConsumer}, ideally simply
  96      * by casting.
  97      */
  98     private static IntConsumer adapt(Sink<Integer> sink) {
  99         if (sink instanceof IntConsumer) {
 100             return (IntConsumer) sink;
 101         }
 102         else {
 103             if (Tripwire.ENABLED)
 104                 Tripwire.trip(AbstractPipeline.class,
 105                               "using IntStream.adapt(Sink<Integer> s)");
 106             return sink::accept;
 107         }
 108     }
 109 
 110     /**
 111      * Adapt a {@code Spliterator<Integer>} to a {@code Spliterator.OfInt}.
 112      *
 113      * @implNote
 114      * The implementation attempts to cast to a Spliterator.OfInt, and throws an
 115      * exception if this cast is not possible.
 116      */
 117     private static Spliterator.OfInt adapt(Spliterator<Integer> s) {
 118         if (s instanceof Spliterator.OfInt) {
 119             return (Spliterator.OfInt) s;
 120         }
 121         else {
 122             if (Tripwire.ENABLED)
 123                 Tripwire.trip(AbstractPipeline.class,
 124                               "using IntStream.adapt(Spliterator<Integer> s)");
 125             throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)");
 126         }
 127     }
 128 
 129 
 130     // Shape-specific methods
 131 
 132     @Override
 133     final StreamShape getOutputShape() {
 134         return StreamShape.INT_VALUE;
 135     }
 136 
 137     @Override
 138     final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper,
 139                                               Spliterator<P_IN> spliterator,
 140                                               boolean flattenTree,
 141                                               IntFunction<Integer[]> generator) {
 142         return Nodes.collectInt(helper, spliterator, flattenTree);
 143     }
 144 
 145     @Override
 146     final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph,
 147                                            Supplier<Spliterator<P_IN>> supplier,
 148                                            boolean isParallel) {
 149         return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel);
 150     }
 151 
 152     @Override
 153     final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) {
 154         return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier);
 155     }
 156 
 157     @Override
 158     final void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) {
 159         Spliterator.OfInt spl = adapt(spliterator);
 160         IntConsumer adaptedSink = adapt(sink);
 161         do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
 162     }
 163 
 164     @Override
 165     final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown,
 166                                                 IntFunction<Integer[]> generator) {
 167         return Nodes.intBuilder(exactSizeIfKnown);
 168     }
 169 
 170 
 171     // IntStream
 172 
 173     @Override
 174     public final PrimitiveIterator.OfInt iterator() {
 175         return Spliterators.iterator(spliterator());
 176     }
 177 
 178     @Override
 179     public final Spliterator.OfInt spliterator() {
 180         return adapt(super.spliterator());
 181     }
 182 
 183     // Stateless intermediate ops from IntStream
 184 
 185     @Override
 186     public final LongStream asLongStream() {
 187         return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 188                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 189             @Override
 190             Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
 191                 return new Sink.ChainedInt(sink) {
 192                     @Override
 193                     public void accept(int t) {
 194                         downstream.accept((long) t);
 195                     }
 196                 };
 197             }
 198         };
 199     }
 200 
 201     @Override
 202     public final DoubleStream asDoubleStream() {
 203         return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 204                                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 205             @Override
 206             Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
 207                 return new Sink.ChainedInt(sink) {
 208                     @Override
 209                     public void accept(int t) {
 210                         downstream.accept((double) t);
 211                     }
 212                 };
 213             }
 214         };
 215     }
 216 
 217     @Override
 218     public final Stream<Integer> boxed() {
 219         return mapToObj(Integer::valueOf);
 220     }
 221 
 222     @Override
 223     public final IntStream map(IntUnaryOperator mapper) {
 224         Objects.requireNonNull(mapper);
 225         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 226                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 227             @Override
 228             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 229                 return new Sink.ChainedInt(sink) {
 230                     @Override
 231                     public void accept(int t) {
 232                         downstream.accept(mapper.applyAsInt(t));
 233                     }
 234                 };
 235             }
 236         };
 237     }
 238 
 239     @Override
 240     public final <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) {
 241         Objects.requireNonNull(mapper);
 242         return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE,
 243                                                              StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 244             @Override
 245             Sink<Integer> opWrapSink(int flags, Sink<U> sink) {
 246                 return new Sink.ChainedInt(sink) {
 247                     @Override
 248                     public void accept(int t) {
 249                         downstream.accept(mapper.apply(t));
 250                     }
 251                 };
 252             }
 253         };
 254     }
 255 
 256     @Override
 257     public final LongStream mapToLong(IntToLongFunction mapper) {
 258         Objects.requireNonNull(mapper);
 259         return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 260                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 261             @Override
 262             Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
 263                 return new Sink.ChainedInt(sink) {
 264                     @Override
 265                     public void accept(int t) {
 266                         downstream.accept(mapper.applyAsLong(t));
 267                     }
 268                 };
 269             }
 270         };
 271     }
 272 
 273     @Override
 274     public final DoubleStream mapToDouble(IntToDoubleFunction mapper) {
 275         Objects.requireNonNull(mapper);
 276         return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 277                                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 278             @Override
 279             Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
 280                 return new Sink.ChainedInt(sink) {
 281                     @Override
 282                     public void accept(int t) {
 283                         downstream.accept(mapper.applyAsDouble(t));
 284                     }
 285                 };
 286             }
 287         };
 288     }
 289 
 290     @Override
 291     public final IntStream flatMap(IntFunction<? extends IntStream> mapper) {
 292         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 293                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 294             @Override
 295             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 296                 return new Sink.ChainedInt(sink) {
 297                     @Override
 298                     public void begin(long size) {
 299                         downstream.begin(-1);
 300                     }
 301 
 302                     @Override
 303                     public void accept(int t) {
 304                         // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 305                         IntStream result = mapper.apply(t);
 306                         if (result != null)
 307                             result.sequential().forEach(i -> downstream.accept(i));
 308                     }
 309                 };
 310             }
 311         };
 312     }
 313 
 314     @Override
 315     public IntStream unordered() {
 316         if (!isOrdered())
 317             return this;
 318         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) {
 319             @Override
 320             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 321                 return sink;
 322             }
 323         };
 324     }
 325 
 326     @Override
 327     public final IntStream filter(IntPredicate predicate) {
 328         Objects.requireNonNull(predicate);
 329         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 330                                         StreamOpFlag.NOT_SIZED) {
 331             @Override
 332             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 333                 return new Sink.ChainedInt(sink) {
 334                     @Override
 335                     public void begin(long size) {
 336                         downstream.begin(-1);
 337                     }
 338 
 339                     @Override
 340                     public void accept(int t) {
 341                         if (predicate.test(t))
 342                             downstream.accept(t);
 343                     }
 344                 };
 345             }
 346         };
 347     }
 348 
 349     @Override
 350     public final IntStream peek(IntConsumer consumer) {
 351         Objects.requireNonNull(consumer);
 352         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
 353                                         0) {
 354             @Override
 355             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 356                 return new Sink.ChainedInt(sink) {
 357                     @Override
 358                     public void accept(int t) {
 359                         consumer.accept(t);
 360                         downstream.accept(t);
 361                     }
 362                 };
 363             }
 364         };
 365     }
 366 
 367     // Stateful intermediate ops from IntStream
 368 
 369     private IntStream slice(long skip, long limit) {
 370         return SliceOps.makeInt(this, skip, limit);
 371     }
 372 
 373     @Override
 374     public final IntStream limit(long maxSize) {
 375         if (maxSize < 0)
 376             throw new IllegalArgumentException(Long.toString(maxSize));
 377         return slice(0, maxSize);
 378     }
 379 
 380     @Override
 381     public final IntStream substream(long startingOffset) {
 382         if (startingOffset < 0)
 383             throw new IllegalArgumentException(Long.toString(startingOffset));
 384         if (startingOffset == 0)
 385             return this;
 386         else
 387             return slice(startingOffset, -1);
 388     }
 389 
 390     @Override
 391     public final IntStream substream(long startingOffset, long endingOffset) {
 392         if (startingOffset < 0 || endingOffset < startingOffset)
 393             throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset));
 394         return slice(startingOffset, endingOffset - startingOffset);
 395     }
 396 
 397     @Override
 398     public final IntStream sorted() {
 399         return SortedOps.makeInt(this);
 400     }
 401 
 402     @Override
 403     public final IntStream distinct() {
 404         // While functional and quick to implement, this approach is not very efficient.
 405         // An efficient version requires an int-specific map/set implementation.
 406         return boxed().distinct().mapToInt(i -> i);
 407     }
 408 
 409     // Terminal ops from IntStream
 410 
 411     @Override
 412     public void forEach(IntConsumer action) {
 413         evaluate(ForEachOps.makeInt(action, false));
 414     }
 415 
 416     @Override
 417     public void forEachOrdered(IntConsumer action) {
 418         evaluate(ForEachOps.makeInt(action, true));
 419     }
 420 
 421     @Override
 422     public final int sum() {
 423         return reduce(0, Integer::sum);
 424     }
 425 
 426     @Override
 427     public final OptionalInt min() {
 428         return reduce(Math::min);
 429     }
 430 
 431     @Override
 432     public final OptionalInt max() {
 433         return reduce(Math::max);
 434     }
 435 
 436     @Override
 437     public final long count() {
 438         return asLongStream().map(e -> 1L).sum();
 439     }
 440 
 441     @Override
 442     public final OptionalDouble average() {
 443         long[] avg = collect(() -> new long[2],
 444                              (ll, i) -> {
 445                                  ll[0]++;
 446                                  ll[1] += i;
 447                              },
 448                              (ll, rr) -> {
 449                                  ll[0] += rr[0];
 450                                  ll[1] += rr[1];
 451                              });
 452         return avg[0] > 0
 453                ? OptionalDouble.of((double) avg[1] / avg[0])
 454                : OptionalDouble.empty();
 455     }
 456 
 457     @Override
 458     public final IntSummaryStatistics summaryStatistics() {
 459         return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept,
 460                        IntSummaryStatistics::combine);
 461     }
 462 
 463     @Override
 464     public final int reduce(int identity, IntBinaryOperator op) {
 465         return evaluate(ReduceOps.makeInt(identity, op));
 466     }
 467 
 468     @Override
 469     public final OptionalInt reduce(IntBinaryOperator op) {
 470         return evaluate(ReduceOps.makeInt(op));
 471     }
 472 
 473     @Override
 474     public final <R> R collect(Supplier<R> resultFactory,
 475                                ObjIntConsumer<R> accumulator,
 476                                BiConsumer<R, R> combiner) {
 477         BinaryOperator<R> operator = (left, right) -> {
 478             combiner.accept(left, right);
 479             return left;
 480         };
 481         return evaluate(ReduceOps.makeInt(resultFactory, accumulator, operator));
 482     }
 483 
 484     @Override
 485     public final boolean anyMatch(IntPredicate predicate) {
 486         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY));
 487     }
 488 
 489     @Override
 490     public final boolean allMatch(IntPredicate predicate) {
 491         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL));
 492     }
 493 
 494     @Override
 495     public final boolean noneMatch(IntPredicate predicate) {
 496         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE));
 497     }
 498 
 499     @Override
 500     public final OptionalInt findFirst() {
 501         return evaluate(FindOps.makeInt(true));
 502     }
 503 
 504     @Override
 505     public final OptionalInt findAny() {
 506         return evaluate(FindOps.makeInt(false));
 507     }
 508 
 509     @Override
 510     public final int[] toArray() {
 511         return Nodes.flattenInt((Node.OfInt) evaluateToArrayNode(Integer[]::new))
 512                         .asPrimitiveArray();
 513     }
 514 
 515     //
 516 
 517     /**
 518      * Source stage of an IntStream.
 519      *
 520      * @param <E_IN> type of elements in the upstream source
 521      * @since 1.8
 522      */
 523     static class Head<E_IN> extends IntPipeline<E_IN> {
 524         /**
 525          * Constructor for the source stage of an IntStream.
 526          *
 527          * @param source {@code Supplier<Spliterator>} describing the stream
 528          *               source
 529          * @param sourceFlags the source flags for the stream source, described
 530          *                    in {@link StreamOpFlag}
 531          * @param parallel {@code true} if the pipeline is parallel
 532          */
 533         Head(Supplier<? extends Spliterator<Integer>> source,
 534              int sourceFlags, boolean parallel) {
 535             super(source, sourceFlags, parallel);
 536         }
 537 
 538         /**
 539          * Constructor for the source stage of an IntStream.
 540          *
 541          * @param source {@code Spliterator} describing the stream source
 542          * @param sourceFlags the source flags for the stream source, described
 543          *                    in {@link StreamOpFlag}
 544          * @param parallel {@code true} if the pipeline is parallel
 545          */
 546         Head(Spliterator<Integer> source,
 547              int sourceFlags, boolean parallel) {
 548             super(source, sourceFlags, parallel);
 549         }
 550 
 551         @Override
 552         final boolean opIsStateful() {
 553             throw new UnsupportedOperationException();
 554         }
 555 
 556         @Override
 557         final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) {
 558             throw new UnsupportedOperationException();
 559         }
 560 
 561         // Optimized sequential terminal operations for the head of the pipeline
 562 
 563         @Override
 564         public void forEach(IntConsumer action) {
 565             if (!isParallel()) {
 566                 adapt(sourceStageSpliterator()).forEachRemaining(action);
 567             }
 568             else {
 569                 super.forEach(action);
 570             }
 571         }
 572 
 573         @Override
 574         public void forEachOrdered(IntConsumer action) {
 575             if (!isParallel()) {
 576                 adapt(sourceStageSpliterator()).forEachRemaining(action);
 577             }
 578             else {
 579                 super.forEachOrdered(action);
 580             }
 581         }
 582     }
 583 
 584     /**
 585      * Base class for a stateless intermediate stage of an IntStream
 586      *
 587      * @param <E_IN> type of elements in the upstream source
 588      * @since 1.8
 589      */
 590     abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> {
 591         /**
 592          * Construct a new IntStream by appending a stateless intermediate
 593          * operation to an existing stream.
 594          * @param upstream The upstream pipeline stage
 595          * @param inputShape The stream shape for the upstream pipeline stage
 596          * @param opFlags Operation flags for the new stage
 597          */
 598         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
 599                     StreamShape inputShape,
 600                     int opFlags) {
 601             super(upstream, opFlags);
 602             assert upstream.getOutputShape() == inputShape;
 603         }
 604 
 605         @Override
 606         final boolean opIsStateful() {
 607             return false;
 608         }
 609     }
 610 
 611     /**
 612      * Base class for a stateful intermediate stage of an IntStream.
 613      *
 614      * @param <E_IN> type of elements in the upstream source
 615      * @since 1.8
 616      */
 617     abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> {
 618         /**
 619          * Construct a new IntStream by appending a stateful intermediate
 620          * operation to an existing stream.
 621          * @param upstream The upstream pipeline stage
 622          * @param inputShape The stream shape for the upstream pipeline stage
 623          * @param opFlags Operation flags for the new stage
 624          */
 625         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
 626                    StreamShape inputShape,
 627                    int opFlags) {
 628             super(upstream, opFlags);
 629             assert upstream.getOutputShape() == inputShape;
 630         }
 631 
 632         @Override
 633         final boolean opIsStateful() {
 634             return true;
 635         }
 636 
 637         @Override
 638         abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
 639                                                          Spliterator<P_IN> spliterator,
 640                                                          IntFunction<Integer[]> generator);
 641     }
 642 }