1 /*
   2  * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.LongSummaryStatistics;
  28 import java.util.Objects;
  29 import java.util.OptionalDouble;
  30 import java.util.OptionalLong;
  31 import java.util.PrimitiveIterator;
  32 import java.util.Spliterator;
  33 import java.util.Spliterators;
  34 import java.util.function.BiConsumer;
  35 import java.util.function.BinaryOperator;
  36 import java.util.function.IntFunction;
  37 import java.util.function.LongBinaryOperator;
  38 import java.util.function.LongConsumer;
  39 import java.util.function.LongFunction;
  40 import java.util.function.LongPredicate;
  41 import java.util.function.LongToDoubleFunction;
  42 import java.util.function.LongToIntFunction;
  43 import java.util.function.LongUnaryOperator;
  44 import java.util.function.ObjLongConsumer;
  45 import java.util.function.Supplier;
  46 
  47 /**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose elements are of type {@code long}.
  50  *
  51  * @param <E_IN> type of elements in the upstream source
  52  * @since 1.8
  53  */
  54 abstract class LongPipeline<E_IN>
  55         extends AbstractPipeline<E_IN, Long, LongStream>
  56         implements LongStream {
  57 
  58     /**
  59      * Constructor for the head of a stream pipeline.
  60      *
  61      * @param source {@code Supplier<Spliterator>} describing the stream source
  62      * @param sourceFlags the source flags for the stream source, described in
  63      *        {@link StreamOpFlag}
  64      * @param parallel {@code true} if the pipeline is parallel
  65      */
  66     LongPipeline(Supplier<? extends Spliterator<Long>> source,
  67                  int sourceFlags, boolean parallel) {
  68         super(source, sourceFlags, parallel);
  69     }
  70 
  71     /**
  72      * Constructor for the head of a stream pipeline.
  73      *
  74      * @param source {@code Spliterator} describing the stream source
  75      * @param sourceFlags the source flags for the stream source, described in
  76      *        {@link StreamOpFlag}
  77      * @param parallel {@code true} if the pipeline is parallel
  78      */
  79     LongPipeline(Spliterator<Long> source,
  80                  int sourceFlags, boolean parallel) {
  81         super(source, sourceFlags, parallel);
  82     }
  83 
  84     /**
  85      * Constructor for appending an intermediate operation onto an existing pipeline.
  86      *
  87      * @param upstream the upstream element source.
  88      * @param opFlags the operation flags
  89      */
  90     LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
  91         super(upstream, opFlags);
  92     }
  93 
  94     /**
  95      * Adapt a {@code Sink<Long> to an {@code LongConsumer}, ideally simply
  96      * by casting.
  97      */
  98     private static LongConsumer adapt(Sink<Long> sink) {
  99         if (sink instanceof LongConsumer) {
 100             return (LongConsumer) sink;
 101         } else {
 102             if (Tripwire.ENABLED)
 103                 Tripwire.trip(AbstractPipeline.class,
 104                               "using LongStream.adapt(Sink<Long> s)");
 105             return sink::accept;
 106         }
 107     }
 108 
 109     /**
 110      * Adapt a {@code Spliterator<Long>} to a {@code Spliterator.OfLong}.
 111      *
 112      * @implNote
 113      * The implementation attempts to cast to a Spliterator.OfLong, and throws
 114      * an exception if this cast is not possible.
 115      */
 116     private static Spliterator.OfLong adapt(Spliterator<Long> s) {
 117         if (s instanceof Spliterator.OfLong) {
 118             return (Spliterator.OfLong) s;
 119         } else {
 120             if (Tripwire.ENABLED)
 121                 Tripwire.trip(AbstractPipeline.class,
 122                               "using LongStream.adapt(Spliterator<Long> s)");
 123             throw new UnsupportedOperationException("LongStream.adapt(Spliterator<Long> s)");
 124         }
 125     }
 126 
 127 
 128     // Shape-specific methods
 129 
 130     @Override
 131     final StreamShape getOutputShape() {
 132         return StreamShape.LONG_VALUE;
 133     }
 134 
 135     @Override
 136     final <P_IN> Node<Long> evaluateToNode(PipelineHelper<Long> helper,
 137                                            Spliterator<P_IN> spliterator,
 138                                            boolean flattenTree,
 139                                            IntFunction<Long[]> generator) {
 140         return Nodes.collectLong(helper, spliterator, flattenTree);
 141     }
 142 
 143     @Override
 144     final <P_IN> Spliterator<Long> wrap(PipelineHelper<Long> ph,
 145                                         Supplier<Spliterator<P_IN>> supplier,
 146                                         boolean isParallel) {
 147         return new StreamSpliterators.LongWrappingSpliterator<>(ph, supplier, isParallel);
 148     }
 149 
 150     @Override
 151     @SuppressWarnings("unchecked")
 152     final Spliterator.OfLong lazySpliterator(Supplier<? extends Spliterator<Long>> supplier) {
 153         return new StreamSpliterators.DelegatingSpliterator.OfLong((Supplier<Spliterator.OfLong>) supplier);
 154     }
 155 
 156     @Override
 157     final void forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
 158         Spliterator.OfLong spl = adapt(spliterator);
 159         LongConsumer adaptedSink =  adapt(sink);
 160         do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
 161     }
 162 
 163     @Override
 164     final Node.Builder<Long> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator) {
 165         return Nodes.longBuilder(exactSizeIfKnown);
 166     }
 167 
 168 
 169     // LongStream
 170 
 171     @Override
 172     public final PrimitiveIterator.OfLong iterator() {
 173         return Spliterators.iterator(spliterator());
 174     }
 175 
 176     @Override
 177     public final Spliterator.OfLong spliterator() {
 178         return adapt(super.spliterator());
 179     }
 180 
 181     // Stateless intermediate ops from LongStream
 182 
 183     @Override
 184     public final DoubleStream asDoubleStream() {
 185         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 186                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 187             @Override
 188             Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
 189                 return new Sink.ChainedLong<Double>(sink) {
 190                     @Override
 191                     public void accept(long t) {
 192                         downstream.accept((double) t);
 193                     }
 194                 };
 195             }
 196         };
 197     }
 198 
 199     @Override
 200     public final Stream<Long> boxed() {
 201         return mapToObj(Long::valueOf);
 202     }
 203 
 204     @Override
 205     public final LongStream map(LongUnaryOperator mapper) {
 206         Objects.requireNonNull(mapper);
 207         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 208                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 209             @Override
 210             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 211                 return new Sink.ChainedLong<Long>(sink) {
 212                     @Override
 213                     public void accept(long t) {
 214                         downstream.accept(mapper.applyAsLong(t));
 215                     }
 216                 };
 217             }
 218         };
 219     }
 220 
 221     @Override
 222     public final <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) {
 223         Objects.requireNonNull(mapper);
 224         return new ReferencePipeline.StatelessOp<Long, U>(this, StreamShape.LONG_VALUE,
 225                                                           StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 226             @Override
 227             Sink<Long> opWrapSink(int flags, Sink<U> sink) {
 228                 return new Sink.ChainedLong<U>(sink) {
 229                     @Override
 230                     public void accept(long t) {
 231                         downstream.accept(mapper.apply(t));
 232                     }
 233                 };
 234             }
 235         };
 236     }
 237 
 238     @Override
 239     public final IntStream mapToInt(LongToIntFunction mapper) {
 240         Objects.requireNonNull(mapper);
 241         return new IntPipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 242                                                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 243             @Override
 244             Sink<Long> opWrapSink(int flags, Sink<Integer> sink) {
 245                 return new Sink.ChainedLong<Integer>(sink) {
 246                     @Override
 247                     public void accept(long t) {
 248                         downstream.accept(mapper.applyAsInt(t));
 249                     }
 250                 };
 251             }
 252         };
 253     }
 254 
 255     @Override
 256     public final DoubleStream mapToDouble(LongToDoubleFunction mapper) {
 257         Objects.requireNonNull(mapper);
 258         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 259                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 260             @Override
 261             Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
 262                 return new Sink.ChainedLong<Double>(sink) {
 263                     @Override
 264                     public void accept(long t) {
 265                         downstream.accept(mapper.applyAsDouble(t));
 266                     }
 267                 };
 268             }
 269         };
 270     }
 271 
 272     @Override
 273     public final LongStream flatMap(LongFunction<? extends LongStream> mapper) {
 274         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 275                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 276             @Override
 277             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 278                 return new Sink.ChainedLong<Long>(sink) {
 279                     @Override
 280                     public void begin(long size) {
 281                         downstream.begin(-1);
 282                     }
 283 
 284                     @Override
 285                     public void accept(long t) {
 286                         try (LongStream result = mapper.apply(t)) {
 287                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 288                             if (result != null)
 289                                 result.sequential().forEach(i -> downstream.accept(i));
 290                         }
 291                     }
 292                 };
 293             }
 294         };
 295     }
 296 
 297     @Override
 298     public LongStream unordered() {
 299         if (!isOrdered())
 300             return this;
 301         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_ORDERED) {
 302             @Override
 303             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 304                 return sink;
 305             }
 306         };
 307     }
 308 
 309     @Override
 310     public final LongStream filter(LongPredicate predicate) {
 311         Objects.requireNonNull(predicate);
 312         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 313                                      StreamOpFlag.NOT_SIZED) {
 314             @Override
 315             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 316                 return new Sink.ChainedLong<Long>(sink) {
 317                     @Override
 318                     public void begin(long size) {
 319                         downstream.begin(-1);
 320                     }
 321 
 322                     @Override
 323                     public void accept(long t) {
 324                         if (predicate.test(t))
 325                             downstream.accept(t);
 326                     }
 327                 };
 328             }
 329         };
 330     }
 331 
 332     @Override
 333     public final LongStream peek(LongConsumer action) {
 334         Objects.requireNonNull(action);
 335         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 336                                      0) {
 337             @Override
 338             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 339                 return new Sink.ChainedLong<Long>(sink) {
 340                     @Override
 341                     public void accept(long t) {
 342                         action.accept(t);
 343                         downstream.accept(t);
 344                     }
 345                 };
 346             }
 347         };
 348     }
 349 
 350     // Stateful intermediate ops from LongStream
 351 
 352     @Override
 353     public final LongStream limit(long maxSize) {
 354         if (maxSize < 0)
 355             throw new IllegalArgumentException(Long.toString(maxSize));
 356         return SliceOps.makeLong(this, 0, maxSize);
 357     }
 358 
 359     @Override
 360     public final LongStream skip(long n) {
 361         if (n < 0)
 362             throw new IllegalArgumentException(Long.toString(n));
 363         if (n == 0)
 364             return this;
 365         else
 366             return SliceOps.makeLong(this, n, -1);
 367     }
 368 
 369     @Override
 370     public final LongStream sorted() {
 371         return SortedOps.makeLong(this);
 372     }
 373 
 374     @Override
 375     public final LongStream distinct() {
 376         // While functional and quick to implement, this approach is not very efficient.
 377         // An efficient version requires a long-specific map/set implementation.
 378         return boxed().distinct().mapToLong(i -> (long) i);
 379     }
 380 
 381     // Terminal ops from LongStream
 382 
 383     @Override
 384     public void forEach(LongConsumer action) {
 385         evaluate(ForEachOps.makeLong(action, false));
 386     }
 387 
 388     @Override
 389     public void forEachOrdered(LongConsumer action) {
 390         evaluate(ForEachOps.makeLong(action, true));
 391     }
 392 
 393     @Override
 394     public final long sum() {
 395         // use better algorithm to compensate for intermediate overflow?
 396         return reduce(0, Long::sum);
 397     }
 398 
 399     @Override
 400     public final OptionalLong min() {
 401         return reduce(Math::min);
 402     }
 403 
 404     @Override
 405     public final OptionalLong max() {
 406         return reduce(Math::max);
 407     }
 408 
 409     @Override
 410     public final OptionalDouble average() {
 411         long[] avg = collect(() -> new long[2],
 412                              (ll, i) -> {
 413                                  ll[0]++;
 414                                  ll[1] += i;
 415                              },
 416                              (ll, rr) -> {
 417                                  ll[0] += rr[0];
 418                                  ll[1] += rr[1];
 419                              });
 420         return avg[0] > 0
 421                ? OptionalDouble.of((double) avg[1] / avg[0])
 422                : OptionalDouble.empty();
 423     }
 424 
 425     @Override
 426     public final long count() {
 427         return map(e -> 1L).sum();
 428     }
 429 
 430     @Override
 431     public final LongSummaryStatistics summaryStatistics() {
 432         return collect(LongSummaryStatistics::new, LongSummaryStatistics::accept,
 433                        LongSummaryStatistics::combine);
 434     }
 435 
 436     @Override
 437     public final long reduce(long identity, LongBinaryOperator op) {
 438         return evaluate(ReduceOps.makeLong(identity, op));
 439     }
 440 
 441     @Override
 442     public final OptionalLong reduce(LongBinaryOperator op) {
 443         return evaluate(ReduceOps.makeLong(op));
 444     }
 445 
 446     @Override
 447     public final <R> R collect(Supplier<R> supplier,
 448                                ObjLongConsumer<R> accumulator,
 449                                BiConsumer<R, R> combiner) {
 450         BinaryOperator<R> operator = (left, right) -> {
 451             combiner.accept(left, right);
 452             return left;
 453         };
 454         return evaluate(ReduceOps.makeLong(supplier, accumulator, operator));
 455     }
 456 
 457     @Override
 458     public final boolean anyMatch(LongPredicate predicate) {
 459         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ANY));
 460     }
 461 
 462     @Override
 463     public final boolean allMatch(LongPredicate predicate) {
 464         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ALL));
 465     }
 466 
 467     @Override
 468     public final boolean noneMatch(LongPredicate predicate) {
 469         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.NONE));
 470     }
 471 
 472     @Override
 473     public final OptionalLong findFirst() {
 474         return evaluate(FindOps.makeLong(true));
 475     }
 476 
 477     @Override
 478     public final OptionalLong findAny() {
 479         return evaluate(FindOps.makeLong(false));
 480     }
 481 
 482     @Override
 483     public final long[] toArray() {
 484         return Nodes.flattenLong((Node.OfLong) evaluateToArrayNode(Long[]::new))
 485                 .asPrimitiveArray();
 486     }
 487 
 488 
 489     //
 490 
 491     /**
 492      * Source stage of a LongPipeline.
 493      *
 494      * @param <E_IN> type of elements in the upstream source
 495      * @since 1.8
 496      */
 497     static class Head<E_IN> extends LongPipeline<E_IN> {
 498         /**
 499          * Constructor for the source stage of a LongStream.
 500          *
 501          * @param source {@code Supplier<Spliterator>} describing the stream
 502          *               source
 503          * @param sourceFlags the source flags for the stream source, described
 504          *                    in {@link StreamOpFlag}
 505          * @param parallel {@code true} if the pipeline is parallel
 506          */
 507         Head(Supplier<? extends Spliterator<Long>> source,
 508              int sourceFlags, boolean parallel) {
 509             super(source, sourceFlags, parallel);
 510         }
 511 
 512         /**
 513          * Constructor for the source stage of a LongStream.
 514          *
 515          * @param source {@code Spliterator} describing the stream source
 516          * @param sourceFlags the source flags for the stream source, described
 517          *                    in {@link StreamOpFlag}
 518          * @param parallel {@code true} if the pipeline is parallel
 519          */
 520         Head(Spliterator<Long> source,
 521              int sourceFlags, boolean parallel) {
 522             super(source, sourceFlags, parallel);
 523         }
 524 
 525         @Override
 526         final boolean opIsStateful() {
 527             throw new UnsupportedOperationException();
 528         }
 529 
 530         @Override
 531         final Sink<E_IN> opWrapSink(int flags, Sink<Long> sink) {
 532             throw new UnsupportedOperationException();
 533         }
 534 
 535         // Optimized sequential terminal operations for the head of the pipeline
 536 
 537         @Override
 538         public void forEach(LongConsumer action) {
 539             if (!isParallel()) {
 540                 adapt(sourceStageSpliterator()).forEachRemaining(action);
 541             } else {
 542                 super.forEach(action);
 543             }
 544         }
 545 
 546         @Override
 547         public void forEachOrdered(LongConsumer action) {
 548             if (!isParallel()) {
 549                 adapt(sourceStageSpliterator()).forEachRemaining(action);
 550             } else {
 551                 super.forEachOrdered(action);
 552             }
 553         }
 554     }
 555 
 556     /** Base class for a stateless intermediate stage of a LongStream.
 557      *
 558      * @param <E_IN> type of elements in the upstream source
 559      * @since 1.8
 560      */
 561     abstract static class StatelessOp<E_IN> extends LongPipeline<E_IN> {
 562         /**
 563          * Construct a new LongStream by appending a stateless intermediate
 564          * operation to an existing stream.
 565          * @param upstream The upstream pipeline stage
 566          * @param inputShape The stream shape for the upstream pipeline stage
 567          * @param opFlags Operation flags for the new stage
 568          */
 569         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
 570                     StreamShape inputShape,
 571                     int opFlags) {
 572             super(upstream, opFlags);
 573             assert upstream.getOutputShape() == inputShape;
 574         }
 575 
 576         @Override
 577         final boolean opIsStateful() {
 578             return false;
 579         }
 580     }
 581 
 582     /**
 583      * Base class for a stateful intermediate stage of a LongStream.
 584      *
 585      * @param <E_IN> type of elements in the upstream source
 586      * @since 1.8
 587      */
 588     abstract static class StatefulOp<E_IN> extends LongPipeline<E_IN> {
 589         /**
 590          * Construct a new LongStream by appending a stateful intermediate
 591          * operation to an existing stream.
 592          * @param upstream The upstream pipeline stage
 593          * @param inputShape The stream shape for the upstream pipeline stage
 594          * @param opFlags Operation flags for the new stage
 595          */
 596         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
 597                    StreamShape inputShape,
 598                    int opFlags) {
 599             super(upstream, opFlags);
 600             assert upstream.getOutputShape() == inputShape;
 601         }
 602 
 603         @Override
 604         final boolean opIsStateful() {
 605             return true;
 606         }
 607 
 608         @Override
 609         abstract <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
 610                                                       Spliterator<P_IN> spliterator,
 611                                                       IntFunction<Long[]> generator);
 612     }
 613 }