1 /*
   2  * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.LongSummaryStatistics;
  28 import java.util.Objects;
  29 import java.util.OptionalDouble;
  30 import java.util.OptionalLong;
  31 import java.util.PrimitiveIterator;
  32 import java.util.Spliterator;
  33 import java.util.Spliterators;
  34 import java.util.function.BiConsumer;
  35 import java.util.function.BinaryOperator;
  36 import java.util.function.IntFunction;
  37 import java.util.function.LongBinaryOperator;
  38 import java.util.function.LongConsumer;
  39 import java.util.function.LongFunction;
  40 import java.util.function.LongPredicate;
  41 import java.util.function.LongToDoubleFunction;
  42 import java.util.function.LongToIntFunction;
  43 import java.util.function.LongUnaryOperator;
  44 import java.util.function.ObjLongConsumer;
  45 import java.util.function.Supplier;
  46 
/**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose elements are of type {@code long}.
 *
 * @param <E_IN> type of elements in the upstream source
 * @since 1.8
 */
  54 abstract class LongPipeline<E_IN>
  55         extends AbstractPipeline<E_IN, Long, LongStream>
  56         implements LongStream {
  57 
  58     /**
  59      * Constructor for the head of a stream pipeline.
  60      *
  61      * @param source {@code Supplier<Spliterator>} describing the stream source
  62      * @param sourceFlags the source flags for the stream source, described in
  63      *        {@link StreamOpFlag}
  64      * @param parallel {@code true} if the pipeline is parallel
  65      */
  66     LongPipeline(Supplier<? extends Spliterator<Long>> source,
  67                  int sourceFlags, boolean parallel) {
  68         super(source, sourceFlags, parallel);
  69     }
  70 
  71     /**
  72      * Constructor for the head of a stream pipeline.
  73      *
  74      * @param source {@code Spliterator} describing the stream source
  75      * @param sourceFlags the source flags for the stream source, described in
  76      *        {@link StreamOpFlag}
  77      * @param parallel {@code true} if the pipeline is parallel
  78      */
  79     LongPipeline(Spliterator<Long> source,
  80                  int sourceFlags, boolean parallel) {
  81         super(source, sourceFlags, parallel);
  82     }
  83 
  84     /**
  85      * Constructor for appending an intermediate operation onto an existing pipeline.
  86      *
  87      * @param upstream the upstream element source.
  88      * @param opFlags the operation flags
  89      */
  90     LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
  91         super(upstream, opFlags);
  92     }
  93 
  94     /**
  95      * Adapt a {@code Sink<Long> to an {@code LongConsumer}, ideally simply
  96      * by casting.
  97      */
  98     private static LongConsumer adapt(Sink<Long> sink) {
  99         if (sink instanceof LongConsumer) {
 100             return (LongConsumer) sink;
 101         } else {
 102             if (Tripwire.ENABLED)
 103                 Tripwire.trip(AbstractPipeline.class,
 104                               "using LongStream.adapt(Sink<Long> s)");
 105             return sink::accept;
 106         }
 107     }
 108 
 109     /**
 110      * Adapt a {@code Spliterator<Long>} to a {@code Spliterator.OfLong}.
 111      *
 112      * @implNote
 113      * The implementation attempts to cast to a Spliterator.OfLong, and throws
 114      * an exception if this cast is not possible.
 115      */
 116     private static Spliterator.OfLong adapt(Spliterator<Long> s) {
 117         if (s instanceof Spliterator.OfLong) {
 118             return (Spliterator.OfLong) s;
 119         } else {
 120             if (Tripwire.ENABLED)
 121                 Tripwire.trip(AbstractPipeline.class,
 122                               "using LongStream.adapt(Spliterator<Long> s)");
 123             throw new UnsupportedOperationException("LongStream.adapt(Spliterator<Long> s)");
 124         }
 125     }
 126 
 127 
 128     // Shape-specific methods
 129 
 130     @Override
 131     final StreamShape getOutputShape() {
 132         return StreamShape.LONG_VALUE;
 133     }
 134 
 135     @Override
 136     final <P_IN> Node<Long> evaluateToNode(PipelineHelper<Long> helper,
 137                                            Spliterator<P_IN> spliterator,
 138                                            boolean flattenTree,
 139                                            IntFunction<Long[]> generator) {
 140         return Nodes.collectLong(helper, spliterator, flattenTree);
 141     }
 142 
 143     @Override
 144     final <P_IN> Spliterator<Long> wrap(PipelineHelper<Long> ph,
 145                                         Supplier<Spliterator<P_IN>> supplier,
 146                                         boolean isParallel) {
 147         return new StreamSpliterators.LongWrappingSpliterator<>(ph, supplier, isParallel);
 148     }
 149 
 150     @Override
 151     final Spliterator.OfLong lazySpliterator(Supplier<? extends Spliterator<Long>> supplier) {
 152         return new StreamSpliterators.DelegatingSpliterator.OfLong((Supplier<Spliterator.OfLong>) supplier);
 153     }
 154 
 155     @Override
 156     final void forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
 157         Spliterator.OfLong spl = adapt(spliterator);
 158         LongConsumer adaptedSink =  adapt(sink);
 159         do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
 160     }
 161 
 162     @Override
 163     final Node.Builder<Long> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator) {
 164         return Nodes.longBuilder(exactSizeIfKnown);
 165     }
 166 
 167 
 168     // LongStream
 169 
 170     @Override
 171     public final PrimitiveIterator.OfLong iterator() {
 172         return Spliterators.iterator(spliterator());
 173     }
 174 
 175     @Override
 176     public final Spliterator.OfLong spliterator() {
 177         return adapt(super.spliterator());
 178     }
 179 
 180     // Stateless intermediate ops from LongStream
 181 
 182     @Override
 183     public final DoubleStream asDoubleStream() {
 184         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 185                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 186             @Override
 187             Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
 188                 return new Sink.ChainedLong(sink) {
 189                     @Override
 190                     public void accept(long t) {
 191                         downstream.accept((double) t);
 192                     }
 193                 };
 194             }
 195         };
 196     }
 197 
 198     @Override
 199     public final Stream<Long> boxed() {
 200         return mapToObj(Long::valueOf);
 201     }
 202 
 203     @Override
 204     public final LongStream map(LongUnaryOperator mapper) {
 205         Objects.requireNonNull(mapper);
 206         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 207                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 208             @Override
 209             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 210                 return new Sink.ChainedLong(sink) {
 211                     @Override
 212                     public void accept(long t) {
 213                         downstream.accept(mapper.applyAsLong(t));
 214                     }
 215                 };
 216             }
 217         };
 218     }
 219 
 220     @Override
 221     public final <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) {
 222         Objects.requireNonNull(mapper);
 223         return new ReferencePipeline.StatelessOp<Long, U>(this, StreamShape.LONG_VALUE,
 224                                                           StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 225             @Override
 226             Sink<Long> opWrapSink(int flags, Sink<U> sink) {
 227                 return new Sink.ChainedLong(sink) {
 228                     @Override
 229                     public void accept(long t) {
 230                         downstream.accept(mapper.apply(t));
 231                     }
 232                 };
 233             }
 234         };
 235     }
 236 
 237     @Override
 238     public final IntStream mapToInt(LongToIntFunction mapper) {
 239         Objects.requireNonNull(mapper);
 240         return new IntPipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 241                                                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 242             @Override
 243             Sink<Long> opWrapSink(int flags, Sink<Integer> sink) {
 244                 return new Sink.ChainedLong(sink) {
 245                     @Override
 246                     public void accept(long t) {
 247                         downstream.accept(mapper.applyAsInt(t));
 248                     }
 249                 };
 250             }
 251         };
 252     }
 253 
 254     @Override
 255     public final DoubleStream mapToDouble(LongToDoubleFunction mapper) {
 256         Objects.requireNonNull(mapper);
 257         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 258                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 259             @Override
 260             Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
 261                 return new Sink.ChainedLong(sink) {
 262                     @Override
 263                     public void accept(long t) {
 264                         downstream.accept(mapper.applyAsDouble(t));
 265                     }
 266                 };
 267             }
 268         };
 269     }
 270 
 271     @Override
 272     public final LongStream flatMap(LongFunction<? extends LongStream> mapper) {
 273         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 274                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 275             @Override
 276             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 277                 return new Sink.ChainedLong(sink) {
 278                     @Override
 279                     public void begin(long size) {
 280                         downstream.begin(-1);
 281                     }
 282 
 283                     @Override
 284                     public void accept(long t) {
 285                         try (LongStream result = mapper.apply(t)) {
 286                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 287                             if (result != null)
 288                                 result.sequential().forEach(i -> downstream.accept(i));
 289                         }
 290                     }
 291                 };
 292             }
 293         };
 294     }
 295 
 296     @Override
 297     public LongStream unordered() {
 298         if (!isOrdered())
 299             return this;
 300         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_ORDERED) {
 301             @Override
 302             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 303                 return sink;
 304             }
 305         };
 306     }
 307 
 308     @Override
 309     public final LongStream filter(LongPredicate predicate) {
 310         Objects.requireNonNull(predicate);
 311         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 312                                      StreamOpFlag.NOT_SIZED) {
 313             @Override
 314             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 315                 return new Sink.ChainedLong(sink) {
 316                     @Override
 317                     public void begin(long size) {
 318                         downstream.begin(-1);
 319                     }
 320 
 321                     @Override
 322                     public void accept(long t) {
 323                         if (predicate.test(t))
 324                             downstream.accept(t);
 325                     }
 326                 };
 327             }
 328         };
 329     }
 330 
 331     @Override
 332     public final LongStream peek(LongConsumer consumer) {
 333         Objects.requireNonNull(consumer);
 334         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 335                                      0) {
 336             @Override
 337             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 338                 return new Sink.ChainedLong(sink) {
 339                     @Override
 340                     public void accept(long t) {
 341                         consumer.accept(t);
 342                         downstream.accept(t);
 343                     }
 344                 };
 345             }
 346         };
 347     }
 348 
 349     // Stateful intermediate ops from LongStream
 350 
 351     private LongStream slice(long skip, long limit) {
 352         return SliceOps.makeLong(this, skip, limit);
 353     }
 354 
 355     @Override
 356     public final LongStream limit(long maxSize) {
 357         if (maxSize < 0)
 358             throw new IllegalArgumentException(Long.toString(maxSize));
 359         return slice(0, maxSize);
 360     }
 361 
 362     @Override
 363     public final LongStream substream(long startingOffset) {
 364         if (startingOffset < 0)
 365             throw new IllegalArgumentException(Long.toString(startingOffset));
 366         if (startingOffset == 0)
 367             return this;
 368         else
 369             return slice(startingOffset, -1);
 370     }
 371 
 372     @Override
 373     public final LongStream substream(long startingOffset, long endingOffset) {
 374         if (startingOffset < 0 || endingOffset < startingOffset)
 375             throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset));
 376         return slice(startingOffset, endingOffset - startingOffset);
 377     }
 378 
 379     @Override
 380     public final LongStream sorted() {
 381         return SortedOps.makeLong(this);
 382     }
 383 
 384     @Override
 385     public final LongStream distinct() {
 386         // While functional and quick to implement, this approach is not very efficient.
 387         // An efficient version requires a long-specific map/set implementation.
 388         return boxed().distinct().mapToLong(i -> (long) i);
 389     }
 390 
 391     // Terminal ops from LongStream
 392 
 393     @Override
 394     public void forEach(LongConsumer action) {
 395         evaluate(ForEachOps.makeLong(action, false));
 396     }
 397 
 398     @Override
 399     public void forEachOrdered(LongConsumer action) {
 400         evaluate(ForEachOps.makeLong(action, true));
 401     }
 402 
 403     @Override
 404     public final long sum() {
 405         // use better algorithm to compensate for intermediate overflow?
 406         return reduce(0, Long::sum);
 407     }
 408 
 409     @Override
 410     public final OptionalLong min() {
 411         return reduce(Math::min);
 412     }
 413 
 414     @Override
 415     public final OptionalLong max() {
 416         return reduce(Math::max);
 417     }
 418 
 419     @Override
 420     public final OptionalDouble average() {
 421         long[] avg = collect(() -> new long[2],
 422                              (ll, i) -> {
 423                                  ll[0]++;
 424                                  ll[1] += i;
 425                              },
 426                              (ll, rr) -> {
 427                                  ll[0] += rr[0];
 428                                  ll[1] += rr[1];
 429                              });
 430         return avg[0] > 0
 431                ? OptionalDouble.of((double) avg[1] / avg[0])
 432                : OptionalDouble.empty();
 433     }
 434 
 435     @Override
 436     public final long count() {
 437         return map(e -> 1L).sum();
 438     }
 439 
 440     @Override
 441     public final LongSummaryStatistics summaryStatistics() {
 442         return collect(LongSummaryStatistics::new, LongSummaryStatistics::accept,
 443                        LongSummaryStatistics::combine);
 444     }
 445 
 446     @Override
 447     public final long reduce(long identity, LongBinaryOperator op) {
 448         return evaluate(ReduceOps.makeLong(identity, op));
 449     }
 450 
 451     @Override
 452     public final OptionalLong reduce(LongBinaryOperator op) {
 453         return evaluate(ReduceOps.makeLong(op));
 454     }
 455 
 456     @Override
 457     public final <R> R collect(Supplier<R> resultFactory,
 458                                ObjLongConsumer<R> accumulator,
 459                                BiConsumer<R, R> combiner) {
 460         BinaryOperator<R> operator = (left, right) -> {
 461             combiner.accept(left, right);
 462             return left;
 463         };
 464         return evaluate(ReduceOps.makeLong(resultFactory, accumulator, operator));
 465     }
 466 
 467     @Override
 468     public final boolean anyMatch(LongPredicate predicate) {
 469         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ANY));
 470     }
 471 
 472     @Override
 473     public final boolean allMatch(LongPredicate predicate) {
 474         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ALL));
 475     }
 476 
 477     @Override
 478     public final boolean noneMatch(LongPredicate predicate) {
 479         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.NONE));
 480     }
 481 
 482     @Override
 483     public final OptionalLong findFirst() {
 484         return evaluate(FindOps.makeLong(true));
 485     }
 486 
 487     @Override
 488     public final OptionalLong findAny() {
 489         return evaluate(FindOps.makeLong(false));
 490     }
 491 
 492     @Override
 493     public final long[] toArray() {
 494         return Nodes.flattenLong((Node.OfLong) evaluateToArrayNode(Long[]::new))
 495                 .asPrimitiveArray();
 496     }
 497 
 498 
 499     //
 500 
 501     /**
 502      * Source stage of a LongPipeline.
 503      *
 504      * @param <E_IN> type of elements in the upstream source
 505      * @since 1.8
 506      */
 507     static class Head<E_IN> extends LongPipeline<E_IN> {
 508         /**
 509          * Constructor for the source stage of a LongStream.
 510          *
 511          * @param source {@code Supplier<Spliterator>} describing the stream
 512          *               source
 513          * @param sourceFlags the source flags for the stream source, described
 514          *                    in {@link StreamOpFlag}
 515          * @param parallel {@code true} if the pipeline is parallel
 516          */
 517         Head(Supplier<? extends Spliterator<Long>> source,
 518              int sourceFlags, boolean parallel) {
 519             super(source, sourceFlags, parallel);
 520         }
 521 
 522         /**
 523          * Constructor for the source stage of a LongStream.
 524          *
 525          * @param source {@code Spliterator} describing the stream source
 526          * @param sourceFlags the source flags for the stream source, described
 527          *                    in {@link StreamOpFlag}
 528          * @param parallel {@code true} if the pipeline is parallel
 529          */
 530         Head(Spliterator<Long> source,
 531              int sourceFlags, boolean parallel) {
 532             super(source, sourceFlags, parallel);
 533         }
 534 
 535         @Override
 536         final boolean opIsStateful() {
 537             throw new UnsupportedOperationException();
 538         }
 539 
 540         @Override
 541         final Sink<E_IN> opWrapSink(int flags, Sink<Long> sink) {
 542             throw new UnsupportedOperationException();
 543         }
 544 
 545         // Optimized sequential terminal operations for the head of the pipeline
 546 
 547         @Override
 548         public void forEach(LongConsumer action) {
 549             if (!isParallel()) {
 550                 adapt(sourceStageSpliterator()).forEachRemaining(action);
 551             } else {
 552                 super.forEach(action);
 553             }
 554         }
 555 
 556         @Override
 557         public void forEachOrdered(LongConsumer action) {
 558             if (!isParallel()) {
 559                 adapt(sourceStageSpliterator()).forEachRemaining(action);
 560             } else {
 561                 super.forEachOrdered(action);
 562             }
 563         }
 564     }
 565 
 566     /** Base class for a stateless intermediate stage of a LongStream.
 567      *
 568      * @param <E_IN> type of elements in the upstream source
 569      * @since 1.8
 570      */
 571     abstract static class StatelessOp<E_IN> extends LongPipeline<E_IN> {
 572         /**
 573          * Construct a new LongStream by appending a stateless intermediate
 574          * operation to an existing stream.
 575          * @param upstream The upstream pipeline stage
 576          * @param inputShape The stream shape for the upstream pipeline stage
 577          * @param opFlags Operation flags for the new stage
 578          */
 579         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
 580                     StreamShape inputShape,
 581                     int opFlags) {
 582             super(upstream, opFlags);
 583             assert upstream.getOutputShape() == inputShape;
 584         }
 585 
 586         @Override
 587         final boolean opIsStateful() {
 588             return false;
 589         }
 590     }
 591 
 592     /**
 593      * Base class for a stateful intermediate stage of a LongStream.
 594      *
 595      * @param <E_IN> type of elements in the upstream source
 596      * @since 1.8
 597      */
 598     abstract static class StatefulOp<E_IN> extends LongPipeline<E_IN> {
 599         /**
 600          * Construct a new LongStream by appending a stateful intermediate
 601          * operation to an existing stream.
 602          * @param upstream The upstream pipeline stage
 603          * @param inputShape The stream shape for the upstream pipeline stage
 604          * @param opFlags Operation flags for the new stage
 605          */
 606         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
 607                    StreamShape inputShape,
 608                    int opFlags) {
 609             super(upstream, opFlags);
 610             assert upstream.getOutputShape() == inputShape;
 611         }
 612 
 613         @Override
 614         final boolean opIsStateful() {
 615             return true;
 616         }
 617 
 618         @Override
 619         abstract <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
 620                                                       Spliterator<P_IN> spliterator,
 621                                                       IntFunction<Long[]> generator);
 622     }
 623 }