1 /*
   2  * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.LongSummaryStatistics;
  28 import java.util.Objects;
  29 import java.util.OptionalDouble;
  30 import java.util.OptionalLong;
  31 import java.util.PrimitiveIterator;
  32 import java.util.Spliterator;
  33 import java.util.Spliterators;
  34 import java.util.function.BiConsumer;
  35 import java.util.function.BinaryOperator;
  36 import java.util.function.IntFunction;
  37 import java.util.function.LongBinaryOperator;
  38 import java.util.function.LongConsumer;
  39 import java.util.function.LongFunction;
  40 import java.util.function.LongPredicate;
  41 import java.util.function.LongToDoubleFunction;
  42 import java.util.function.LongToIntFunction;
  43 import java.util.function.LongUnaryOperator;
  44 import java.util.function.ObjLongConsumer;
  45 import java.util.function.Supplier;
  46 
  47 /**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose elements are of type {@code long}.
  50  *
  51  * @param <E_IN> type of elements in the upstream source
  52  * @since 1.8
  53  */
  54 abstract class LongPipeline<E_IN>
  55         extends AbstractPipeline<E_IN, Long, LongStream>
  56         implements LongStream {
  57 
    /**
     * Constructor for the head of a stream pipeline whose source spliterator
     * is obtained from a supplier.
     *
     * <p>All three arguments are forwarded unchanged to the superclass
     * constructor; nothing else is done at this level.
     *
     * @param source {@code Supplier<Spliterator>} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     *        {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    LongPipeline(Supplier<? extends Spliterator<Long>> source,
                 int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }
  70 
    /**
     * Constructor for the head of a stream pipeline whose source spliterator
     * is supplied directly.
     *
     * <p>All three arguments are forwarded unchanged to the superclass
     * constructor; nothing else is done at this level.
     *
     * @param source {@code Spliterator} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     *        {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    LongPipeline(Spliterator<Long> source,
                 int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }
  83 
    /**
     * Constructor for appending an intermediate operation onto an existing
     * pipeline.
     *
     * <p>Both arguments are forwarded unchanged to the superclass
     * constructor.
     *
     * @param upstream the upstream element source; its output element type is
     *        this stage's input type {@code E_IN}
     * @param opFlags the operation flags
     */
    LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
        super(upstream, opFlags);
    }
  93 
  94     /**
  95      * Adapt a {@code Sink<Long> to an {@code LongConsumer}, ideally simply
  96      * by casting.
  97      */
  98     private static LongConsumer adapt(Sink<Long> sink) {
  99         if (sink instanceof LongConsumer) {
 100             return (LongConsumer) sink;
 101         } else {
 102             if (Tripwire.ENABLED)
 103                 Tripwire.trip(AbstractPipeline.class,
 104                               "using LongStream.adapt(Sink<Long> s)");
 105             return sink::accept;
 106         }
 107     }
 108 
 109     /**
 110      * Adapt a {@code Spliterator<Long>} to a {@code Spliterator.OfLong}.
 111      *
 112      * @implNote
 113      * The implementation attempts to cast to a Spliterator.OfLong, and throws
 114      * an exception if this cast is not possible.
 115      */
 116     private static Spliterator.OfLong adapt(Spliterator<Long> s) {
 117         if (s instanceof Spliterator.OfLong) {
 118             return (Spliterator.OfLong) s;
 119         } else {
 120             if (Tripwire.ENABLED)
 121                 Tripwire.trip(AbstractPipeline.class,
 122                               "using LongStream.adapt(Spliterator<Long> s)");
 123             throw new UnsupportedOperationException("LongStream.adapt(Spliterator<Long> s)");
 124         }
 125     }
 126 
 127 
 128     // Shape-specific methods
 129 
 130     @Override
 131     final StreamShape getOutputShape() {
 132         return StreamShape.LONG_VALUE;
 133     }
 134 
 135     @Override
 136     final <P_IN> Node<Long> evaluateToNode(PipelineHelper<Long> helper,
 137                                            Spliterator<P_IN> spliterator,
 138                                            boolean flattenTree,
 139                                            IntFunction<Long[]> generator) {
 140         return Nodes.collectLong(helper, spliterator, flattenTree);
 141     }
 142 
 143     @Override
 144     final <P_IN> Spliterator<Long> wrap(PipelineHelper<Long> ph,
 145                                         Supplier<Spliterator<P_IN>> supplier,
 146                                         boolean isParallel) {
 147         return new StreamSpliterators.LongWrappingSpliterator<>(ph, supplier, isParallel);
 148     }
 149 
 150     @Override
 151     @SuppressWarnings("unchecked")
 152     final Spliterator.OfLong lazySpliterator(Supplier<? extends Spliterator<Long>> supplier) {
 153         return new StreamSpliterators.DelegatingSpliterator.OfLong((Supplier<Spliterator.OfLong>) supplier);
 154     }
 155 
 156     @Override
 157     final boolean forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
 158         Spliterator.OfLong spl = adapt(spliterator);
 159         LongConsumer adaptedSink =  adapt(sink);
 160         boolean cancelled;
 161         do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink));
 162         return cancelled;
 163     }
 164 
 165     @Override
 166     final Node.Builder<Long> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator) {
 167         return Nodes.longBuilder(exactSizeIfKnown);
 168     }
 169 
 170 
 171     // LongStream
 172 
 173     @Override
 174     public final PrimitiveIterator.OfLong iterator() {
 175         return Spliterators.iterator(spliterator());
 176     }
 177 
 178     @Override
 179     public final Spliterator.OfLong spliterator() {
 180         return adapt(super.spliterator());
 181     }
 182 
 183     // Stateless intermediate ops from LongStream
 184 
 185     @Override
 186     public final DoubleStream asDoubleStream() {
 187         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 188                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 189             @Override
 190             Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
 191                 return new Sink.ChainedLong<Double>(sink) {
 192                     @Override
 193                     public void accept(long t) {
 194                         downstream.accept((double) t);
 195                     }
 196                 };
 197             }
 198         };
 199     }
 200 
 201     @Override
 202     public final Stream<Long> boxed() {
 203         return mapToObj(Long::valueOf);
 204     }
 205 
 206     @Override
 207     public final LongStream map(LongUnaryOperator mapper) {
 208         Objects.requireNonNull(mapper);
 209         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 210                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 211             @Override
 212             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 213                 return new Sink.ChainedLong<Long>(sink) {
 214                     @Override
 215                     public void accept(long t) {
 216                         downstream.accept(mapper.applyAsLong(t));
 217                     }
 218                 };
 219             }
 220         };
 221     }
 222 
 223     @Override
 224     public final <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) {
 225         Objects.requireNonNull(mapper);
 226         return new ReferencePipeline.StatelessOp<Long, U>(this, StreamShape.LONG_VALUE,
 227                                                           StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 228             @Override
 229             Sink<Long> opWrapSink(int flags, Sink<U> sink) {
 230                 return new Sink.ChainedLong<U>(sink) {
 231                     @Override
 232                     public void accept(long t) {
 233                         downstream.accept(mapper.apply(t));
 234                     }
 235                 };
 236             }
 237         };
 238     }
 239 
 240     @Override
 241     public final IntStream mapToInt(LongToIntFunction mapper) {
 242         Objects.requireNonNull(mapper);
 243         return new IntPipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 244                                                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 245             @Override
 246             Sink<Long> opWrapSink(int flags, Sink<Integer> sink) {
 247                 return new Sink.ChainedLong<Integer>(sink) {
 248                     @Override
 249                     public void accept(long t) {
 250                         downstream.accept(mapper.applyAsInt(t));
 251                     }
 252                 };
 253             }
 254         };
 255     }
 256 
 257     @Override
 258     public final DoubleStream mapToDouble(LongToDoubleFunction mapper) {
 259         Objects.requireNonNull(mapper);
 260         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 261                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 262             @Override
 263             Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
 264                 return new Sink.ChainedLong<Double>(sink) {
 265                     @Override
 266                     public void accept(long t) {
 267                         downstream.accept(mapper.applyAsDouble(t));
 268                     }
 269                 };
 270             }
 271         };
 272     }
 273 
 274     @Override
 275     public final LongStream flatMap(LongFunction<? extends LongStream> mapper) {
 276         Objects.requireNonNull(mapper);
 277         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 278                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 279             @Override
 280             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 281                 return new Sink.ChainedLong<Long>(sink) {
 282                     @Override
 283                     public void begin(long size) {
 284                         downstream.begin(-1);
 285                     }
 286 
 287                     @Override
 288                     public void accept(long t) {
 289                         try (LongStream result = mapper.apply(t)) {
 290                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 291                             if (result != null)
 292                                 result.sequential().forEach(i -> downstream.accept(i));
 293                         }
 294                     }
 295                 };
 296             }
 297         };
 298     }
 299 
 300     @Override
 301     public LongStream unordered() {
 302         if (!isOrdered())
 303             return this;
 304         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_ORDERED) {
 305             @Override
 306             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 307                 return sink;
 308             }
 309         };
 310     }
 311 
 312     @Override
 313     public final LongStream filter(LongPredicate predicate) {
 314         Objects.requireNonNull(predicate);
 315         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 316                                      StreamOpFlag.NOT_SIZED) {
 317             @Override
 318             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 319                 return new Sink.ChainedLong<Long>(sink) {
 320                     @Override
 321                     public void begin(long size) {
 322                         downstream.begin(-1);
 323                     }
 324 
 325                     @Override
 326                     public void accept(long t) {
 327                         if (predicate.test(t))
 328                             downstream.accept(t);
 329                     }
 330                 };
 331             }
 332         };
 333     }
 334 
 335     @Override
 336     public final LongStream peek(LongConsumer action) {
 337         Objects.requireNonNull(action);
 338         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
 339                                      0) {
 340             @Override
 341             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 342                 return new Sink.ChainedLong<Long>(sink) {
 343                     @Override
 344                     public void accept(long t) {
 345                         action.accept(t);
 346                         downstream.accept(t);
 347                     }
 348                 };
 349             }
 350         };
 351     }
 352 
 353     // Stateful intermediate ops from LongStream
 354 
 355     @Override
 356     public final LongStream limit(long maxSize) {
 357         if (maxSize < 0)
 358             throw new IllegalArgumentException(Long.toString(maxSize));
 359         return SliceOps.makeLong(this, 0, maxSize);
 360     }
 361 
 362     @Override
 363     public final LongStream skip(long n) {
 364         if (n < 0)
 365             throw new IllegalArgumentException(Long.toString(n));
 366         if (n == 0)
 367             return this;
 368         else
 369             return SliceOps.makeLong(this, n, -1);
 370     }
 371 
 372     @Override
 373     public final LongStream takeWhile(LongPredicate predicate) {
 374         return WhileOps.makeTakeWhileLong(this, predicate);
 375     }
 376 
 377     @Override
 378     public final LongStream dropWhile(LongPredicate predicate) {
 379         return WhileOps.makeDropWhileLong(this, predicate);
 380     }
 381 
 382     @Override
 383     public final LongStream sorted() {
 384         return SortedOps.makeLong(this);
 385     }
 386 
 387     @Override
 388     public final LongStream distinct() {
 389         // While functional and quick to implement, this approach is not very efficient.
 390         // An efficient version requires a long-specific map/set implementation.
 391         return boxed().distinct().mapToLong(i -> (long) i);
 392     }
 393 
 394     // Terminal ops from LongStream
 395 
 396     @Override
 397     public void forEach(LongConsumer action) {
 398         evaluate(ForEachOps.makeLong(action, false));
 399     }
 400 
 401     @Override
 402     public void forEachOrdered(LongConsumer action) {
 403         evaluate(ForEachOps.makeLong(action, true));
 404     }
 405 
 406     @Override
 407     public final long sum() {
 408         // use better algorithm to compensate for intermediate overflow?
 409         return reduce(0, Long::sum);
 410     }
 411 
 412     @Override
 413     public final OptionalLong min() {
 414         return reduce(Math::min);
 415     }
 416 
 417     @Override
 418     public final OptionalLong max() {
 419         return reduce(Math::max);
 420     }
 421 
 422     @Override
 423     public final OptionalDouble average() {
 424         long[] avg = collect(() -> new long[2],
 425                              (ll, i) -> {
 426                                  ll[0]++;
 427                                  ll[1] += i;
 428                              },
 429                              (ll, rr) -> {
 430                                  ll[0] += rr[0];
 431                                  ll[1] += rr[1];
 432                              });
 433         return avg[0] > 0
 434                ? OptionalDouble.of((double) avg[1] / avg[0])
 435                : OptionalDouble.empty();
 436     }
 437 
 438     @Override
 439     public final long count() {
 440         return evaluate(ReduceOps.makeLongCounting());
 441     }
 442 
 443     @Override
 444     public final LongSummaryStatistics summaryStatistics() {
 445         return collect(LongSummaryStatistics::new, LongSummaryStatistics::accept,
 446                        LongSummaryStatistics::combine);
 447     }
 448 
 449     @Override
 450     public final long reduce(long identity, LongBinaryOperator op) {
 451         return evaluate(ReduceOps.makeLong(identity, op));
 452     }
 453 
 454     @Override
 455     public final OptionalLong reduce(LongBinaryOperator op) {
 456         return evaluate(ReduceOps.makeLong(op));
 457     }
 458 
 459     @Override
 460     public final <R> R collect(Supplier<R> supplier,
 461                                ObjLongConsumer<R> accumulator,
 462                                BiConsumer<R, R> combiner) {
 463         Objects.requireNonNull(combiner);
 464         BinaryOperator<R> operator = (left, right) -> {
 465             combiner.accept(left, right);
 466             return left;
 467         };
 468         return evaluate(ReduceOps.makeLong(supplier, accumulator, operator));
 469     }
 470 
 471     @Override
 472     public final boolean anyMatch(LongPredicate predicate) {
 473         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ANY));
 474     }
 475 
 476     @Override
 477     public final boolean allMatch(LongPredicate predicate) {
 478         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ALL));
 479     }
 480 
 481     @Override
 482     public final boolean noneMatch(LongPredicate predicate) {
 483         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.NONE));
 484     }
 485 
 486     @Override
 487     public final OptionalLong findFirst() {
 488         return evaluate(FindOps.makeLong(true));
 489     }
 490 
 491     @Override
 492     public final OptionalLong findAny() {
 493         return evaluate(FindOps.makeLong(false));
 494     }
 495 
 496     @Override
 497     public final long[] toArray() {
 498         return Nodes.flattenLong((Node.OfLong) evaluateToArrayNode(Long[]::new))
 499                 .asPrimitiveArray();
 500     }
 501 
 502 
 503     //
 504 
 505     /**
 506      * Source stage of a LongPipeline.
 507      *
 508      * @param <E_IN> type of elements in the upstream source
 509      * @since 1.8
 510      */
 511     static class Head<E_IN> extends LongPipeline<E_IN> {
 512         /**
 513          * Constructor for the source stage of a LongStream.
 514          *
 515          * @param source {@code Supplier<Spliterator>} describing the stream
 516          *               source
 517          * @param sourceFlags the source flags for the stream source, described
 518          *                    in {@link StreamOpFlag}
 519          * @param parallel {@code true} if the pipeline is parallel
 520          */
 521         Head(Supplier<? extends Spliterator<Long>> source,
 522              int sourceFlags, boolean parallel) {
 523             super(source, sourceFlags, parallel);
 524         }
 525 
 526         /**
 527          * Constructor for the source stage of a LongStream.
 528          *
 529          * @param source {@code Spliterator} describing the stream source
 530          * @param sourceFlags the source flags for the stream source, described
 531          *                    in {@link StreamOpFlag}
 532          * @param parallel {@code true} if the pipeline is parallel
 533          */
 534         Head(Spliterator<Long> source,
 535              int sourceFlags, boolean parallel) {
 536             super(source, sourceFlags, parallel);
 537         }
 538 
 539         @Override
 540         final boolean opIsStateful() {
 541             throw new UnsupportedOperationException();
 542         }
 543 
 544         @Override
 545         final Sink<E_IN> opWrapSink(int flags, Sink<Long> sink) {
 546             throw new UnsupportedOperationException();
 547         }
 548 
 549         // Optimized sequential terminal operations for the head of the pipeline
 550 
 551         @Override
 552         public void forEach(LongConsumer action) {
 553             if (!isParallel()) {
 554                 adapt(sourceStageSpliterator()).forEachRemaining(action);
 555             } else {
 556                 super.forEach(action);
 557             }
 558         }
 559 
 560         @Override
 561         public void forEachOrdered(LongConsumer action) {
 562             if (!isParallel()) {
 563                 adapt(sourceStageSpliterator()).forEachRemaining(action);
 564             } else {
 565                 super.forEachOrdered(action);
 566             }
 567         }
 568     }
 569 
 570     /** Base class for a stateless intermediate stage of a LongStream.
 571      *
 572      * @param <E_IN> type of elements in the upstream source
 573      * @since 1.8
 574      */
 575     abstract static class StatelessOp<E_IN> extends LongPipeline<E_IN> {
 576         /**
 577          * Construct a new LongStream by appending a stateless intermediate
 578          * operation to an existing stream.
 579          * @param upstream The upstream pipeline stage
 580          * @param inputShape The stream shape for the upstream pipeline stage
 581          * @param opFlags Operation flags for the new stage
 582          */
 583         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
 584                     StreamShape inputShape,
 585                     int opFlags) {
 586             super(upstream, opFlags);
 587             assert upstream.getOutputShape() == inputShape;
 588         }
 589 
 590         @Override
 591         final boolean opIsStateful() {
 592             return false;
 593         }
 594     }
 595 
 596     /**
 597      * Base class for a stateful intermediate stage of a LongStream.
 598      *
 599      * @param <E_IN> type of elements in the upstream source
 600      * @since 1.8
 601      */
 602     abstract static class StatefulOp<E_IN> extends LongPipeline<E_IN> {
 603         /**
 604          * Construct a new LongStream by appending a stateful intermediate
 605          * operation to an existing stream.
 606          * @param upstream The upstream pipeline stage
 607          * @param inputShape The stream shape for the upstream pipeline stage
 608          * @param opFlags Operation flags for the new stage
 609          */
 610         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
 611                    StreamShape inputShape,
 612                    int opFlags) {
 613             super(upstream, opFlags);
 614             assert upstream.getOutputShape() == inputShape;
 615         }
 616 
 617         @Override
 618         final boolean opIsStateful() {
 619             return true;
 620         }
 621 
 622         @Override
 623         abstract <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
 624                                                       Spliterator<P_IN> spliterator,
 625                                                       IntFunction<Long[]> generator);
 626     }
 627 }