1 /*
   2  * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.LongSummaryStatistics;
  28 import java.util.Objects;
  29 import java.util.OptionalDouble;
  30 import java.util.OptionalLong;
  31 import java.util.PrimitiveIterator;
  32 import java.util.Spliterator;
  33 import java.util.Spliterators;
  34 import java.util.function.BiConsumer;
  35 import java.util.function.BinaryOperator;
  36 import java.util.function.IntFunction;
  37 import java.util.function.LongBinaryOperator;
  38 import java.util.function.LongConsumer;
  39 import java.util.function.LongFunction;
  40 import java.util.function.LongPredicate;
  41 import java.util.function.LongToDoubleFunction;
  42 import java.util.function.LongToIntFunction;
  43 import java.util.function.LongUnaryOperator;
  44 import java.util.function.ObjLongConsumer;
  45 import java.util.function.Supplier;
  46 
  47 /**
  48  * Abstract base class for an intermediate pipeline stage or pipeline source
  49  * stage implementing whose elements are of type {@code long}.
  50  * @param <E_IN> Type of elements in the upstream source.
  51  * @since 1.8
  52  */
  53 abstract class LongPipeline<E_IN>
  54         extends AbstractPipeline<E_IN, Long, LongStream>
  55         implements LongStream {
  56 
  57     /**
  58      * Constructor for the head of a stream pipeline.
  59      *
  60      * @param source {@code Supplier<Spliterator>} describing the stream source
  61      * @param sourceFlags The source flags for the stream source, described in
  62      *        {@link StreamOpFlag}
  63      * @param parallel True if the pipeline is parallel
  64      */
  65     LongPipeline(Supplier<? extends Spliterator<Long>> source,
  66                  int sourceFlags, boolean parallel) {
  67         super(source, sourceFlags, parallel);
  68     }
  69 
  70     /**
  71      * Constructor for the head of a stream pipeline.
  72      *
  73      * @param source {@code Spliterator} describing the stream source
  74      * @param sourceFlags The source flags for the stream source, described in
  75      *        {@link StreamOpFlag}
  76      * @param parallel True if the pipeline is parallel
  77      */
  78     LongPipeline(Spliterator<Long> source,
  79                  int sourceFlags, boolean parallel) {
  80         super(source, sourceFlags, parallel);
  81     }
  82 
  83     /**
  84      * Constructor for appending an intermediate operation onto an existing pipeline.
  85      *
  86      * @param upstream the upstream element source.
  87      * @param opFlags the operation flags
  88      */
  89     LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
  90         super(upstream, opFlags);
  91     }
  92 
  93     /**
  94      * Adapt a {@code Sink<Long> to an {@code LongConsumer}, ideally simply
  95      * by casting
  96      */
  97     private static LongConsumer adapt(Sink<Long> sink) {
  98         if (sink instanceof LongConsumer) {
  99             return (LongConsumer) sink;
 100         }
 101         else {
 102             if (Tripwire.ENABLED)
 103                 Tripwire.trip(AbstractPipeline.class,
 104                               "using LongStream.adapt(Sink<Long> s)");
 105             return sink::accept;
 106         }
 107     }
 108 
 109     /**
 110      * Adapt a {@code Spliterator<Long>} to a {@code Spliterator.OfLong}.
 111      *
 112      * @implNote
 113      * The implementation attempts to cast to a Spliterator.OfLong, and throws an
 114      * exception if this cast is not possible.
 115      */
 116     private static Spliterator.OfLong adapt(Spliterator<Long> s) {
 117         if (s instanceof Spliterator.OfLong) {
 118             return (Spliterator.OfLong) s;
 119         }
 120         else {
 121             if (Tripwire.ENABLED)
 122                 Tripwire.trip(AbstractPipeline.class,
 123                               "using LongStream.adapt(Spliterator<Long> s)");
 124             throw new UnsupportedOperationException("LongStream.adapt(Spliterator<Long> s)");
 125         }
 126     }
 127 
 128 
 129     // Shape-specific methods
 130 
 131     @Override
 132     final StreamShape getOutputShape() {
 133         return StreamShape.LONG_VALUE;
 134     }
 135 
 136     @Override
 137     final <P_IN> Node<Long> evaluateToNode(PipelineHelper<Long> helper,
 138                                            Spliterator<P_IN> spliterator,
 139                                            boolean flattenTree,
 140                                            IntFunction<Long[]> generator) {
 141         return Nodes.collectLong(helper, spliterator, flattenTree);
 142     }
 143 
 144     @Override
 145     final <P_IN> Spliterator<Long> wrap(PipelineHelper<Long> ph,
 146                                         Supplier<Spliterator<P_IN>> supplier,
 147                                         boolean isParallel) {
 148         return new StreamSpliterators.LongWrappingSpliterator<>(ph, supplier, isParallel);
 149     }
 150 
 151     @Override
 152     final Spliterator.OfLong lazySpliterator(Supplier<? extends Spliterator<Long>> supplier) {
 153         return new StreamSpliterators.DelegatingSpliterator.OfLong((Supplier<Spliterator.OfLong>) supplier);
 154     }
 155 
 156     @Override
 157     final void forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
 158         Spliterator.OfLong spl = adapt(spliterator);
 159         LongConsumer adaptedSink =  adapt(sink);
 160         do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
 161     }
 162 
 163     @Override
 164     final Node.Builder<Long> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator) {
 165         return Nodes.longBuilder(exactSizeIfKnown);
 166     }
 167 
 168 
 169     // LongStream
 170 
 171     @Override
 172     public final PrimitiveIterator.OfLong iterator() {
 173         return Spliterators.iteratorFromSpliterator(spliterator());
 174     }
 175 
 176     @Override
 177     public final Spliterator.OfLong spliterator() {
 178         return adapt(super.spliterator());
 179     }
 180 
 181     // Stateless intermediate ops from LongStream
 182 
 183     @Override
 184     public final DoubleStream doubles() {
 185         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 186                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 187             @Override
 188             Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
 189                 return new Sink.ChainedLong(sink) {
 190                     @Override
 191                     public void accept(long t) {
 192                         downstream.accept((double) t);
 193                     }
 194                 };
 195             }
 196         };
 197     }
 198 
 199     @Override
 200     public final Stream<Long> boxed() {
 201         return mapToObj(Long::valueOf);
 202     }
 203 
 204     @Override
 205     public final LongStream map(LongUnaryOperator mapper) {
 206         Objects.requireNonNull(mapper);
 207         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 208                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 209             @Override
 210             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 211                 return new Sink.ChainedLong(sink) {
 212                     @Override
 213                     public void accept(long t) {
 214                         downstream.accept(mapper.applyAsLong(t));
 215                     }
 216                 };
 217             }
 218         };
 219     }
 220 
 221     @Override
 222     public final <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) {
 223         Objects.requireNonNull(mapper);
 224         return new ReferencePipeline.StatelessOp<Long, U>(this, StreamShape.LONG_VALUE,
 225                                                           StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 226             @Override
 227             Sink<Long> opWrapSink(int flags, Sink<U> sink) {
 228                 return new Sink.ChainedLong(sink) {
 229                     @Override
 230                     public void accept(long t) {
 231                         downstream.accept(mapper.apply(t));
 232                     }
 233                 };
 234             }
 235         };
 236     }
 237 
 238     @Override
 239     public final IntStream mapToInt(LongToIntFunction mapper) {
 240         Objects.requireNonNull(mapper);
 241         return new IntPipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 242                                                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 243             @Override
 244             Sink<Long> opWrapSink(int flags, Sink<Integer> sink) {
 245                 return new Sink.ChainedLong(sink) {
 246                     @Override
 247                     public void accept(long t) {
 248                         downstream.accept(mapper.applyAsInt(t));
 249                     }
 250                 };
 251             }
 252         };
 253     }
 254 
 255     @Override
 256     public final DoubleStream mapToDouble(LongToDoubleFunction mapper) {
 257         Objects.requireNonNull(mapper);
 258         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 259                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 260             @Override
 261             Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
 262                 return new Sink.ChainedLong(sink) {
 263                     @Override
 264                     public void accept(long t) {
 265                         downstream.accept(mapper.applyAsDouble(t));
 266                     }
 267                 };
 268             }
 269         };
 270     }
 271 
 272     @Override
 273     public final LongStream flatMap(LongFunction<? extends LongStream> mapper) {
 274         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 275                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 276             @Override
 277             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 278                 return new Sink.ChainedLong(sink) {
 279                     public void accept(long t) {
 280                         // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 281                         LongStream result = mapper.apply(t);
 282                         if (result != null)
 283                             result.sequential().forEach(i -> downstream.accept(i));
 284                     }
 285                 };
 286             }
 287         };
 288     }
 289 
 290     @Override
 291     public LongStream unordered() {
 292         if (!isOrdered())
 293             return this;
 294         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_ORDERED) {
 295             @Override
 296             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 297                 return sink;
 298             }
 299         };
 300     }
 301 
 302     @Override
 303     public final LongStream filter(LongPredicate predicate) {
 304         Objects.requireNonNull(predicate);
 305         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 306                                      StreamOpFlag.NOT_SIZED) {
 307             @Override
 308             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 309                 return new Sink.ChainedLong(sink) {
 310                     @Override
 311                     public void accept(long t) {
 312                         if (predicate.test(t))
 313                             downstream.accept(t);
 314                     }
 315                 };
 316             }
 317         };
 318     }
 319 
 320     @Override
 321     public final LongStream peek(LongConsumer consumer) {
 322         Objects.requireNonNull(consumer);
 323         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 324                                      0) {
 325             @Override
 326             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 327                 return new Sink.ChainedLong(sink) {
 328                     @Override
 329                     public void accept(long t) {
 330                         consumer.accept(t);
 331                         downstream.accept(t);
 332                     }
 333                 };
 334             }
 335         };
 336     }
 337 
 338     // Stateful intermediate ops from LongStream
 339 
 340     private LongStream slice(long skip, long limit) {
 341         return SliceOps.makeLong(this, skip, limit);
 342     }
 343 
 344     @Override
 345     public final LongStream limit(long maxSize) {
 346         if (maxSize < 0)
 347             throw new IllegalArgumentException(Long.toString(maxSize));
 348         return slice(0, maxSize);
 349     }
 350 
 351     @Override
 352     public final LongStream substream(long startingOffset) {
 353         if (startingOffset < 0)
 354             throw new IllegalArgumentException(Long.toString(startingOffset));
 355         if (startingOffset == 0)
 356             return this;
 357         else
 358             return slice(startingOffset, -1);
 359     }
 360 
 361     @Override
 362     public final LongStream substream(long startingOffset, long endingOffset) {
 363         if (startingOffset < 0 || endingOffset < startingOffset)
 364             throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset));
 365         return slice(startingOffset, endingOffset - startingOffset);
 366     }
 367 
 368     @Override
 369     public final LongStream sorted() {
 370         return SortedOps.makeLong(this);
 371     }
 372 
 373     @Override
 374     public final LongStream distinct() {
 375         // While functional and quick to implement, this approach is not very efficient.
 376         // An efficient version requires a long-specific map/set implementation.
 377         return boxed().distinct().mapToLong(i -> (long) i);
 378     }
 379 
 380     // Terminal ops from LongStream
 381 
 382     @Override
 383     public void forEach(LongConsumer action) {
 384         evaluate(ForEachOps.makeLong(action, false));
 385     }
 386 
 387     @Override
 388     public void forEachOrdered(LongConsumer action) {
 389         evaluate(ForEachOps.makeLong(action, true));
 390     }
 391 
 392     @Override
 393     public final long sum() {
 394         // use better algorithm to compensate for intermediate overflow?
 395         return reduce(0, Long::sum);
 396     }
 397 
 398     @Override
 399     public final OptionalLong min() {
 400         return reduce(Math::min);
 401     }
 402 
 403     @Override
 404     public final OptionalLong max() {
 405         return reduce(Math::max);
 406     }
 407 
 408     @Override
 409     public final OptionalDouble average() {
 410         long[] avg = collect(() -> new long[2],
 411                              (ll, i) -> {
 412                                  ll[0]++;
 413                                  ll[1] += i;
 414                              },
 415                              (ll, rr) -> {
 416                                  ll[0] += rr[0];
 417                                  ll[1] += rr[1];
 418                              });
 419         return avg[0] > 0
 420                ? OptionalDouble.of((double) avg[1] / avg[0])
 421                : OptionalDouble.empty();
 422     }
 423 
 424     @Override
 425     public final long count() {
 426         return map(e -> 1L).sum();
 427     }
 428 
 429     @Override
 430     public final LongSummaryStatistics summaryStatistics() {
 431         return collect(LongSummaryStatistics::new, LongSummaryStatistics::accept,
 432                        LongSummaryStatistics::combine);
 433     }
 434 
 435     @Override
 436     public final long reduce(long identity, LongBinaryOperator op) {
 437         return evaluate(ReduceOps.makeLong(identity, op));
 438     }
 439 
 440     @Override
 441     public final OptionalLong reduce(LongBinaryOperator op) {
 442         return evaluate(ReduceOps.makeLong(op));
 443     }
 444 
 445     @Override
 446     public final <R> R collect(Supplier<R> resultFactory,
 447                                ObjLongConsumer<R> accumulator,
 448                                BiConsumer<R, R> combiner) {
 449         BinaryOperator<R> operator = (left, right) -> {
 450             combiner.accept(left, right);
 451             return left;
 452         };
 453         return evaluate(ReduceOps.makeLong(resultFactory, accumulator, operator));
 454     }
 455 
 456     @Override
 457     public final boolean anyMatch(LongPredicate predicate) {
 458         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ANY));
 459     }
 460 
 461     @Override
 462     public final boolean allMatch(LongPredicate predicate) {
 463         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ALL));
 464     }
 465 
 466     @Override
 467     public final boolean noneMatch(LongPredicate predicate) {
 468         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.NONE));
 469     }
 470 
 471     @Override
 472     public final OptionalLong findFirst() {
 473         return evaluate(FindOps.makeLong(true));
 474     }
 475 
 476     @Override
 477     public final OptionalLong findAny() {
 478         return evaluate(FindOps.makeLong(false));
 479     }
 480 
 481     @Override
 482     public final long[] toArray() {
 483         return Nodes.flattenLong((Node.OfLong) evaluateToArrayNode(Long[]::new)).asLongArray();
 484     }
 485 
 486 
 487     //
 488 
 489     /** Source stage of a LongPipeline */
 490     static class Head<E_IN> extends LongPipeline<E_IN> {
 491         /**
 492          * Constructor for the source stage of a LongStream.
 493          *
 494          * @param source {@code Supplier<Spliterator>} describing the stream
 495          *               source
 496          * @param sourceFlags The source flags for the stream source, described
 497          *                    in {@link StreamOpFlag}
 498          * @param parallel True if the pipeline is parallel
 499          */
 500         Head(Supplier<? extends Spliterator<Long>> source,
 501              int sourceFlags, boolean parallel) {
 502             super(source, sourceFlags, parallel);
 503         }
 504 
 505         /**
 506          * Constructor for the source stage of a LongStream.
 507          *
 508          * @param source {@code Spliterator} describing the stream source
 509          * @param sourceFlags The source flags for the stream source, described
 510          *                    in {@link StreamOpFlag}
 511          * @param parallel True if the pipeline is parallel
 512          */
 513         Head(Spliterator<Long> source,
 514              int sourceFlags, boolean parallel) {
 515             super(source, sourceFlags, parallel);
 516         }
 517 
 518         @Override
 519         final boolean opIsStateful() {
 520             throw new UnsupportedOperationException();
 521         }
 522 
 523         @Override
 524         final Sink<E_IN> opWrapSink(int flags, Sink<Long> sink) {
 525             throw new UnsupportedOperationException();
 526         }
 527 
 528         // Optimized sequential terminal operations for the head of the pipeline
 529 
 530         @Override
 531         public void forEach(LongConsumer action) {
 532             if (!isParallel()) {
 533                 adapt(sourceStageSpliterator()).forEachRemaining(action);
 534             }
 535             else {
 536                 super.forEach(action);
 537             }
 538         }
 539 
 540         @Override
 541         public void forEachOrdered(LongConsumer action) {
 542             if (!isParallel()) {
 543                 adapt(sourceStageSpliterator()).forEachRemaining(action);
 544             }
 545             else {
 546                 super.forEachOrdered(action);
 547             }
 548         }
 549     }
 550 
 551     /** Base class for a stateless intermediate stage of a LongStream */
 552     abstract static class StatelessOp<E_IN> extends LongPipeline<E_IN> {
 553         /**
 554          * Construct a new LongStream by appending a stateless intermediate
 555          * operation to an existing stream.
 556          * @param upstream The upstream pipeline stage
 557          * @param inputShape The stream shape for the upstream pipeline stage
 558          * @param opFlags Operation flags for the new stage
 559          */
 560         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
 561                     StreamShape inputShape,
 562                     int opFlags) {
 563             super(upstream, opFlags);
 564             assert upstream.getOutputShape() == inputShape;
 565         }
 566 
 567         @Override
 568         final boolean opIsStateful() {
 569             return false;
 570         }
 571     }
 572 
 573     /** Base class for a stateful intermediate stage of a LongStream */
 574     abstract static class StatefulOp<E_IN> extends LongPipeline<E_IN> {
 575         /**
 576          * Construct a new LongStream by appending a stateful intermediate
 577          * operation to an existing stream.
 578          * @param upstream The upstream pipeline stage
 579          * @param inputShape The stream shape for the upstream pipeline stage
 580          * @param opFlags Operation flags for the new stage
 581          */
 582         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
 583                    StreamShape inputShape,
 584                    int opFlags) {
 585             super(upstream, opFlags);
 586             assert upstream.getOutputShape() == inputShape;
 587         }
 588 
 589         @Override
 590         final boolean opIsStateful() {
 591             return true;
 592         }
 593 
 594         @Override
 595         abstract <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
 596                                                       Spliterator<P_IN> spliterator,
 597                                                       IntFunction<Long[]> generator);
 598     }
 599 }