1 /*
   2  * Copyright (c) 2013, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.LongSummaryStatistics;
  28 import java.util.Objects;
  29 import java.util.OptionalDouble;
  30 import java.util.OptionalLong;
  31 import java.util.PrimitiveIterator;
  32 import java.util.Spliterator;
  33 import java.util.Spliterators;
  34 import java.util.function.BiConsumer;
  35 import java.util.function.BinaryOperator;
  36 import java.util.function.IntFunction;
  37 import java.util.function.LongBinaryOperator;
  38 import java.util.function.LongConsumer;
  39 import java.util.function.LongFunction;
  40 import java.util.function.LongPredicate;
  41 import java.util.function.LongToDoubleFunction;
  42 import java.util.function.LongToIntFunction;
  43 import java.util.function.LongUnaryOperator;
  44 import java.util.function.ObjLongConsumer;
  45 import java.util.function.Supplier;
  46 
  47 /**
  48  * Abstract base class for an intermediate pipeline stage or pipeline source
  49  * stage implementing whose elements are of type {@code long}.
  50  *
  51  * @param <E_IN> type of elements in the upstream source
  52  * @since 1.8
  53  */
  54 abstract class LongPipeline<E_IN>
  55         extends AbstractPipeline<E_IN, Long, LongStream>
  56         implements LongStream {
  57 
  58     /**
  59      * Constructor for the head of a stream pipeline.
  60      *
  61      * @param source {@code Supplier<Spliterator>} describing the stream source
  62      * @param sourceFlags the source flags for the stream source, described in
  63      *        {@link StreamOpFlag}
  64      * @param parallel {@code true} if the pipeline is parallel
  65      */
  66     LongPipeline(Supplier<? extends Spliterator<Long>> source,
  67                  int sourceFlags, boolean parallel) {
  68         super(source, sourceFlags, parallel);
  69     }
  70 
  71     /**
  72      * Constructor for the head of a stream pipeline.
  73      *
  74      * @param source {@code Spliterator} describing the stream source
  75      * @param sourceFlags the source flags for the stream source, described in
  76      *        {@link StreamOpFlag}
  77      * @param parallel {@code true} if the pipeline is parallel
  78      */
  79     LongPipeline(Spliterator<Long> source,
  80                  int sourceFlags, boolean parallel) {
  81         super(source, sourceFlags, parallel);
  82     }
  83 
  84     /**
  85      * Constructor for appending an intermediate operation onto an existing pipeline.
  86      *
  87      * @param upstream the upstream element source.
  88      * @param opFlags the operation flags
  89      */
  90     LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
  91         super(upstream, opFlags);
  92     }
  93 
  94     /**
  95      * Adapt a {@code Sink<Long> to an {@code LongConsumer}, ideally simply
  96      * by casting.
  97      */
  98     private static LongConsumer adapt(Sink<Long> sink) {
  99         if (sink instanceof LongConsumer) {
 100             return (LongConsumer) sink;
 101         } else {
 102             if (Tripwire.ENABLED)
 103                 Tripwire.trip(AbstractPipeline.class,
 104                               "using LongStream.adapt(Sink<Long> s)");
 105             return sink::accept;
 106         }
 107     }
 108 
 109     /**
 110      * Adapt a {@code Spliterator<Long>} to a {@code Spliterator.OfLong}.
 111      *
 112      * @implNote
 113      * The implementation attempts to cast to a Spliterator.OfLong, and throws
 114      * an exception if this cast is not possible.
 115      */
 116     private static Spliterator.OfLong adapt(Spliterator<Long> s) {
 117         if (s instanceof Spliterator.OfLong) {
 118             return (Spliterator.OfLong) s;
 119         } else {
 120             if (Tripwire.ENABLED)
 121                 Tripwire.trip(AbstractPipeline.class,
 122                               "using LongStream.adapt(Spliterator<Long> s)");
 123             throw new UnsupportedOperationException("LongStream.adapt(Spliterator<Long> s)");
 124         }
 125     }
 126 
 127 
 128     // Shape-specific methods
 129 
 130     @Override
 131     final StreamShape getOutputShape() {
 132         return StreamShape.LONG_VALUE;
 133     }
 134 
 135     @Override
 136     final <P_IN> Node<Long> evaluateToNode(PipelineHelper<Long> helper,
 137                                            Spliterator<P_IN> spliterator,
 138                                            boolean flattenTree,
 139                                            IntFunction<Long[]> generator) {
 140         return Nodes.collectLong(helper, spliterator, flattenTree);
 141     }
 142 
 143     @Override
 144     final <P_IN> Spliterator<Long> wrap(PipelineHelper<Long> ph,
 145                                         Supplier<Spliterator<P_IN>> supplier,
 146                                         boolean isParallel) {
 147         return new StreamSpliterators.LongWrappingSpliterator<>(ph, supplier, isParallel);
 148     }
 149 
 150     @Override
 151     @SuppressWarnings("unchecked")
 152     final Spliterator.OfLong lazySpliterator(Supplier<? extends Spliterator<Long>> supplier) {
 153         return new StreamSpliterators.DelegatingSpliterator.OfLong((Supplier<Spliterator.OfLong>) supplier);
 154     }
 155 
 156     @Override
 157     final boolean forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
 158         Spliterator.OfLong spl = adapt(spliterator);
 159         LongConsumer adaptedSink =  adapt(sink);
 160         boolean cancelled;
 161         do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink));
 162         return cancelled;
 163     }
 164 
 165     @Override
 166     final Node.Builder<Long> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator) {
 167         return Nodes.longBuilder(exactSizeIfKnown);
 168     }
 169 
 170     private final <U> Stream<U> mapToObj(LongFunction<? extends U> mapper, int opFlags) {
 171         return new ReferencePipeline.StatelessOp<Long, U>(this, StreamShape.LONG_VALUE, opFlags) {
 172             @Override
 173             Sink<Long> opWrapSink(int flags, Sink<U> sink) {
 174                 return new Sink.ChainedLong<U>(sink) {
 175                     @Override
 176                     public void accept(long t) {
 177                         downstream.accept(mapper.apply(t));
 178                     }
 179                 };
 180             }
 181         };
 182     }
 183 
 184     // LongStream
 185 
 186     @Override
 187     public final PrimitiveIterator.OfLong iterator() {
 188         return Spliterators.iterator(spliterator());
 189     }
 190 
 191     @Override
 192     public final Spliterator.OfLong spliterator() {
 193         return adapt(super.spliterator());
 194     }
 195 
 196     // Stateless intermediate ops from LongStream
 197 
 198     @Override
 199     public final DoubleStream asDoubleStream() {
 200         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_DISTINCT) {
 201             @Override
 202             Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
 203                 return new Sink.ChainedLong<Double>(sink) {
 204                     @Override
 205                     public void accept(long t) {
 206                         downstream.accept((double) t);
 207                     }
 208                 };
 209             }
 210         };
 211     }
 212 
 213     @Override
 214     public final Stream<Long> boxed() {
 215         return mapToObj(Long::valueOf, 0);
 216     }
 217 
 218     @Override
 219     public final LongStream map(LongUnaryOperator mapper) {
 220         Objects.requireNonNull(mapper);
 221         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 222                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 223             @Override
 224             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 225                 return new Sink.ChainedLong<Long>(sink) {
 226                     @Override
 227                     public void accept(long t) {
 228                         downstream.accept(mapper.applyAsLong(t));
 229                     }
 230                 };
 231             }
 232         };
 233     }
 234 
 235     @Override
 236     public final <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) {
 237         Objects.requireNonNull(mapper);
 238         return mapToObj(mapper, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT);
 239     }
 240 
 241     @Override
 242     public final IntStream mapToInt(LongToIntFunction mapper) {
 243         Objects.requireNonNull(mapper);
 244         return new IntPipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 245                                                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 246             @Override
 247             Sink<Long> opWrapSink(int flags, Sink<Integer> sink) {
 248                 return new Sink.ChainedLong<Integer>(sink) {
 249                     @Override
 250                     public void accept(long t) {
 251                         downstream.accept(mapper.applyAsInt(t));
 252                     }
 253                 };
 254             }
 255         };
 256     }
 257 
 258     @Override
 259     public final DoubleStream mapToDouble(LongToDoubleFunction mapper) {
 260         Objects.requireNonNull(mapper);
 261         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 262                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 263             @Override
 264             Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
 265                 return new Sink.ChainedLong<Double>(sink) {
 266                     @Override
 267                     public void accept(long t) {
 268                         downstream.accept(mapper.applyAsDouble(t));
 269                     }
 270                 };
 271             }
 272         };
 273     }
 274 
 275     @Override
 276     public final LongStream flatMap(LongFunction<? extends LongStream> mapper) {
 277         Objects.requireNonNull(mapper);
 278         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 279                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 280             @Override
 281             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 282                 return new Sink.ChainedLong<Long>(sink) {
 283                     @Override
 284                     public void begin(long size) {
 285                         downstream.begin(-1);
 286                     }
 287 
 288                     @Override
 289                     public void accept(long t) {
 290                         try (LongStream result = mapper.apply(t)) {
 291                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 292                             if (result != null)
 293                                 result.sequential().forEach(i -> downstream.accept(i));
 294                         }
 295                     }
 296                 };
 297             }
 298         };
 299     }
 300 
 301     @Override
 302     public LongStream unordered() {
 303         if (!isOrdered())
 304             return this;
 305         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_ORDERED) {
 306             @Override
 307             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 308                 return sink;
 309             }
 310         };
 311     }
 312 
 313     @Override
 314     public final LongStream filter(LongPredicate predicate) {
 315         Objects.requireNonNull(predicate);
 316         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 317                                      StreamOpFlag.NOT_SIZED) {
 318             @Override
 319             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 320                 return new Sink.ChainedLong<Long>(sink) {
 321                     @Override
 322                     public void begin(long size) {
 323                         downstream.begin(-1);
 324                     }
 325 
 326                     @Override
 327                     public void accept(long t) {
 328                         if (predicate.test(t))
 329                             downstream.accept(t);
 330                     }
 331                 };
 332             }
 333         };
 334     }
 335 
 336     @Override
 337     public final LongStream peek(LongConsumer action) {
 338         Objects.requireNonNull(action);
 339         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 340                                      0) {
 341             @Override
 342             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 343                 return new Sink.ChainedLong<Long>(sink) {
 344                     @Override
 345                     public void accept(long t) {
 346                         action.accept(t);
 347                         downstream.accept(t);
 348                     }
 349                 };
 350             }
 351         };
 352     }
 353 
 354     // Stateful intermediate ops from LongStream
 355 
 356     @Override
 357     public final LongStream limit(long maxSize) {
 358         if (maxSize < 0)
 359             throw new IllegalArgumentException(Long.toString(maxSize));
 360         return SliceOps.makeLong(this, 0, maxSize);
 361     }
 362 
 363     @Override
 364     public final LongStream skip(long n) {
 365         if (n < 0)
 366             throw new IllegalArgumentException(Long.toString(n));
 367         if (n == 0)
 368             return this;
 369         else
 370             return SliceOps.makeLong(this, n, -1);
 371     }
 372 
 373     @Override
 374     public final LongStream takeWhile(LongPredicate predicate) {
 375         return WhileOps.makeTakeWhileLong(this, predicate);
 376     }
 377 
 378     @Override
 379     public final LongStream dropWhile(LongPredicate predicate) {
 380         return WhileOps.makeDropWhileLong(this, predicate);
 381     }
 382 
 383     @Override
 384     public final LongStream sorted() {
 385         return SortedOps.makeLong(this);
 386     }
 387 
 388     @Override
 389     public final LongStream distinct() {
 390         // While functional and quick to implement, this approach is not very efficient.
 391         // An efficient version requires a long-specific map/set implementation.
 392         return boxed().distinct().mapToLong(i -> (long) i);
 393     }
 394 
 395     // Terminal ops from LongStream
 396 
 397     @Override
 398     public void forEach(LongConsumer action) {
 399         evaluate(ForEachOps.makeLong(action, false));
 400     }
 401 
 402     @Override
 403     public void forEachOrdered(LongConsumer action) {
 404         evaluate(ForEachOps.makeLong(action, true));
 405     }
 406 
 407     @Override
 408     public final long sum() {
 409         // use better algorithm to compensate for intermediate overflow?
 410         return reduce(0, Long::sum);
 411     }
 412 
 413     @Override
 414     public final OptionalLong min() {
 415         return reduce(Math::min);
 416     }
 417 
 418     @Override
 419     public final OptionalLong max() {
 420         return reduce(Math::max);
 421     }
 422 
 423     @Override
 424     public final OptionalDouble average() {
 425         long[] avg = collect(() -> new long[2],
 426                              (ll, i) -> {
 427                                  ll[0]++;
 428                                  ll[1] += i;
 429                              },
 430                              (ll, rr) -> {
 431                                  ll[0] += rr[0];
 432                                  ll[1] += rr[1];
 433                              });
 434         return avg[0] > 0
 435                ? OptionalDouble.of((double) avg[1] / avg[0])
 436                : OptionalDouble.empty();
 437     }
 438 
 439     @Override
 440     public final long count() {
 441         return evaluate(ReduceOps.makeLongCounting());
 442     }
 443 
 444     @Override
 445     public final LongSummaryStatistics summaryStatistics() {
 446         return collect(LongSummaryStatistics::new, LongSummaryStatistics::accept,
 447                        LongSummaryStatistics::combine);
 448     }
 449 
 450     @Override
 451     public final long reduce(long identity, LongBinaryOperator op) {
 452         return evaluate(ReduceOps.makeLong(identity, op));
 453     }
 454 
 455     @Override
 456     public final OptionalLong reduce(LongBinaryOperator op) {
 457         return evaluate(ReduceOps.makeLong(op));
 458     }
 459 
 460     @Override
 461     public final <R> R collect(Supplier<R> supplier,
 462                                ObjLongConsumer<R> accumulator,
 463                                BiConsumer<R, R> combiner) {
 464         Objects.requireNonNull(combiner);
 465         BinaryOperator<R> operator = (left, right) -> {
 466             combiner.accept(left, right);
 467             return left;
 468         };
 469         return evaluate(ReduceOps.makeLong(supplier, accumulator, operator));
 470     }
 471 
 472     @Override
 473     public final boolean anyMatch(LongPredicate predicate) {
 474         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ANY));
 475     }
 476 
 477     @Override
 478     public final boolean allMatch(LongPredicate predicate) {
 479         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ALL));
 480     }
 481 
 482     @Override
 483     public final boolean noneMatch(LongPredicate predicate) {
 484         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.NONE));
 485     }
 486 
 487     @Override
 488     public final OptionalLong findFirst() {
 489         return evaluate(FindOps.makeLong(true));
 490     }
 491 
 492     @Override
 493     public final OptionalLong findAny() {
 494         return evaluate(FindOps.makeLong(false));
 495     }
 496 
 497     @Override
 498     public final long[] toArray() {
 499         return Nodes.flattenLong((Node.OfLong) evaluateToArrayNode(Long[]::new))
 500                 .asPrimitiveArray();
 501     }
 502 
 503 
 504     //
 505 
 506     /**
 507      * Source stage of a LongPipeline.
 508      *
 509      * @param <E_IN> type of elements in the upstream source
 510      * @since 1.8
 511      */
 512     static class Head<E_IN> extends LongPipeline<E_IN> {
 513         /**
 514          * Constructor for the source stage of a LongStream.
 515          *
 516          * @param source {@code Supplier<Spliterator>} describing the stream
 517          *               source
 518          * @param sourceFlags the source flags for the stream source, described
 519          *                    in {@link StreamOpFlag}
 520          * @param parallel {@code true} if the pipeline is parallel
 521          */
 522         Head(Supplier<? extends Spliterator<Long>> source,
 523              int sourceFlags, boolean parallel) {
 524             super(source, sourceFlags, parallel);
 525         }
 526 
 527         /**
 528          * Constructor for the source stage of a LongStream.
 529          *
 530          * @param source {@code Spliterator} describing the stream source
 531          * @param sourceFlags the source flags for the stream source, described
 532          *                    in {@link StreamOpFlag}
 533          * @param parallel {@code true} if the pipeline is parallel
 534          */
 535         Head(Spliterator<Long> source,
 536              int sourceFlags, boolean parallel) {
 537             super(source, sourceFlags, parallel);
 538         }
 539 
 540         @Override
 541         final boolean opIsStateful() {
 542             throw new UnsupportedOperationException();
 543         }
 544 
 545         @Override
 546         final Sink<E_IN> opWrapSink(int flags, Sink<Long> sink) {
 547             throw new UnsupportedOperationException();
 548         }
 549 
 550         // Optimized sequential terminal operations for the head of the pipeline
 551 
 552         @Override
 553         public void forEach(LongConsumer action) {
 554             if (!isParallel()) {
 555                 adapt(sourceStageSpliterator()).forEachRemaining(action);
 556             } else {
 557                 super.forEach(action);
 558             }
 559         }
 560 
 561         @Override
 562         public void forEachOrdered(LongConsumer action) {
 563             if (!isParallel()) {
 564                 adapt(sourceStageSpliterator()).forEachRemaining(action);
 565             } else {
 566                 super.forEachOrdered(action);
 567             }
 568         }
 569     }
 570 
 571     /** Base class for a stateless intermediate stage of a LongStream.
 572      *
 573      * @param <E_IN> type of elements in the upstream source
 574      * @since 1.8
 575      */
 576     abstract static class StatelessOp<E_IN> extends LongPipeline<E_IN> {
 577         /**
 578          * Construct a new LongStream by appending a stateless intermediate
 579          * operation to an existing stream.
 580          * @param upstream The upstream pipeline stage
 581          * @param inputShape The stream shape for the upstream pipeline stage
 582          * @param opFlags Operation flags for the new stage
 583          */
 584         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
 585                     StreamShape inputShape,
 586                     int opFlags) {
 587             super(upstream, opFlags);
 588             assert upstream.getOutputShape() == inputShape;
 589         }
 590 
 591         @Override
 592         final boolean opIsStateful() {
 593             return false;
 594         }
 595     }
 596 
 597     /**
 598      * Base class for a stateful intermediate stage of a LongStream.
 599      *
 600      * @param <E_IN> type of elements in the upstream source
 601      * @since 1.8
 602      */
 603     abstract static class StatefulOp<E_IN> extends LongPipeline<E_IN> {
 604         /**
 605          * Construct a new LongStream by appending a stateful intermediate
 606          * operation to an existing stream.
 607          * @param upstream The upstream pipeline stage
 608          * @param inputShape The stream shape for the upstream pipeline stage
 609          * @param opFlags Operation flags for the new stage
 610          */
 611         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
 612                    StreamShape inputShape,
 613                    int opFlags) {
 614             super(upstream, opFlags);
 615             assert upstream.getOutputShape() == inputShape;
 616         }
 617 
 618         @Override
 619         final boolean opIsStateful() {
 620             return true;
 621         }
 622 
 623         @Override
 624         abstract <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
 625                                                       Spliterator<P_IN> spliterator,
 626                                                       IntFunction<Long[]> generator);
 627     }
 628 }