1 /*
   2  * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.DoubleSummaryStatistics;
  28 import java.util.Objects;
  29 import java.util.OptionalDouble;
  30 import java.util.PrimitiveIterator;
  31 import java.util.Spliterator;
  32 import java.util.Spliterators;
  33 import java.util.function.BiConsumer;
  34 import java.util.function.BinaryOperator;
  35 import java.util.function.DoubleBinaryOperator;
  36 import java.util.function.DoubleConsumer;
  37 import java.util.function.DoubleFunction;
  38 import java.util.function.DoublePredicate;
  39 import java.util.function.DoubleToIntFunction;
  40 import java.util.function.DoubleToLongFunction;
  41 import java.util.function.DoubleUnaryOperator;
  42 import java.util.function.IntFunction;
  43 import java.util.function.ObjDoubleConsumer;
  44 import java.util.function.Supplier;
  45 
  46 /**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose elements are of type {@code double}.
  49  *
  50  * @param <E_IN> type of elements in the upstream source
  51  *
  52  * @since 1.8
  53  */
  54 abstract class DoublePipeline<E_IN>
  55         extends AbstractPipeline<E_IN, Double, DoubleStream>
  56         implements DoubleStream {
  57 
    /**
     * Constructor for the head of a stream pipeline.
     *
     * <p>All arguments are forwarded unchanged to the superclass constructor.
     *
     * @param source {@code Supplier<Spliterator>} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     * {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    DoublePipeline(Supplier<? extends Spliterator<Double>> source,
                   int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }
  69 
    /**
     * Constructor for the head of a stream pipeline.
     *
     * <p>All arguments are forwarded unchanged to the superclass constructor.
     *
     * @param source {@code Spliterator} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     * {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    DoublePipeline(Spliterator<Double> source,
                   int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }
  81 
    /**
     * Constructor for appending an intermediate operation onto an existing
     * pipeline.
     *
     * <p>Both arguments are forwarded unchanged to the superclass constructor.
     *
     * @param upstream the upstream element source.
     * @param opFlags the operation flags
     */
    DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
        super(upstream, opFlags);
    }
  92 
  93     /**
  94      * Adapt a {@code Sink<Double> to a {@code DoubleConsumer}, ideally simply
  95      * by casting.
  96      */
  97     private static DoubleConsumer adapt(Sink<Double> sink) {
  98         if (sink instanceof DoubleConsumer) {
  99             return (DoubleConsumer) sink;
 100         } else {
 101             if (Tripwire.ENABLED)
 102                 Tripwire.trip(AbstractPipeline.class,
 103                               "using DoubleStream.adapt(Sink<Double> s)");
 104             return sink::accept;
 105         }
 106     }
 107 
 108     /**
 109      * Adapt a {@code Spliterator<Double>} to a {@code Spliterator.OfDouble}.
 110      *
 111      * @implNote
 112      * The implementation attempts to cast to a Spliterator.OfDouble, and throws
 113      * an exception if this cast is not possible.
 114      */
 115     private static Spliterator.OfDouble adapt(Spliterator<Double> s) {
 116         if (s instanceof Spliterator.OfDouble) {
 117             return (Spliterator.OfDouble) s;
 118         } else {
 119             if (Tripwire.ENABLED)
 120                 Tripwire.trip(AbstractPipeline.class,
 121                               "using DoubleStream.adapt(Spliterator<Double> s)");
 122             throw new UnsupportedOperationException("DoubleStream.adapt(Spliterator<Double> s)");
 123         }
 124     }
 125 
 126 
 127     // Shape-specific methods
 128 
    /**
     * Reports the output shape of this pipeline stage.
     *
     * @return always {@code StreamShape.DOUBLE_VALUE}
     */
    @Override
    final StreamShape getOutputShape() {
        return StreamShape.DOUBLE_VALUE;
    }
 133 
    /**
     * Collects pipeline output into a {@code Node} by delegating to
     * {@code Nodes.collectDouble}.
     *
     * @param helper the pipeline helper, forwarded to {@code Nodes.collectDouble}
     * @param spliterator the spliterator, forwarded to {@code Nodes.collectDouble}
     * @param flattenTree forwarded unchanged to {@code Nodes.collectDouble}
     * @param generator not used by this implementation
     * @return the node returned by {@code Nodes.collectDouble}
     */
    @Override
    final <P_IN> Node<Double> evaluateToNode(PipelineHelper<Double> helper,
                                             Spliterator<P_IN> spliterator,
                                             boolean flattenTree,
                                             IntFunction<Double[]> generator) {
        return Nodes.collectDouble(helper, spliterator, flattenTree);
    }
 141 
    /**
     * Creates a {@code StreamSpliterators.DoubleWrappingSpliterator} from the
     * given pipeline helper, spliterator supplier and parallel flag.
     *
     * @param ph the pipeline helper passed to the wrapping spliterator
     * @param supplier the supplier of the spliterator to wrap
     * @param isParallel passed unchanged to the wrapping spliterator
     * @return the newly created wrapping spliterator
     */
    @Override
    final <P_IN> Spliterator<Double> wrap(PipelineHelper<Double> ph,
                                          Supplier<Spliterator<P_IN>> supplier,
                                          boolean isParallel) {
        return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel);
    }
 148 
 149     @Override
 150     final Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) {
 151         return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier);
 152     }
 153 
 154     @Override
 155     final void forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) {
 156         Spliterator.OfDouble spl = adapt(spliterator);
 157         DoubleConsumer adaptedSink = adapt(sink);
 158         do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
 159     }
 160 
    /**
     * Creates a node builder by delegating to {@code Nodes.doubleBuilder}.
     *
     * @param exactSizeIfKnown forwarded unchanged to {@code Nodes.doubleBuilder}
     * @param generator not used by this implementation
     * @return the builder returned by {@code Nodes.doubleBuilder}
     */
    @Override
    final  Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) {
        return Nodes.doubleBuilder(exactSizeIfKnown);
    }
 165 
 166 
 167     // DoubleStream
 168 
 169     @Override
 170     public final PrimitiveIterator.OfDouble iterator() {
 171         return Spliterators.iterator(spliterator());
 172     }
 173 
 174     @Override
 175     public final Spliterator.OfDouble spliterator() {
 176         return adapt(super.spliterator());
 177     }
 178 
 179     // Stateless intermediate ops from DoubleStream
 180 
 181     @Override
 182     public final Stream<Double> boxed() {
 183         return mapToObj(Double::valueOf);
 184     }
 185 
 186     @Override
 187     public final DoubleStream map(DoubleUnaryOperator mapper) {
 188         Objects.requireNonNull(mapper);
 189         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 190                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 191             @Override
 192             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 193                 return new Sink.ChainedDouble(sink) {
 194                     @Override
 195                     public void accept(double t) {
 196                         downstream.accept(mapper.applyAsDouble(t));
 197                     }
 198                 };
 199             }
 200         };
 201     }
 202 
 203     @Override
 204     public final <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) {
 205         Objects.requireNonNull(mapper);
 206         return new ReferencePipeline.StatelessOp<Double, U>(this, StreamShape.DOUBLE_VALUE,
 207                                                             StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 208             @Override
 209             Sink<Double> opWrapSink(int flags, Sink<U> sink) {
 210                 return new Sink.ChainedDouble(sink) {
 211                     @Override
 212                     public void accept(double t) {
 213                         downstream.accept(mapper.apply(t));
 214                     }
 215                 };
 216             }
 217         };
 218     }
 219 
 220     @Override
 221     public final IntStream mapToInt(DoubleToIntFunction mapper) {
 222         Objects.requireNonNull(mapper);
 223         return new IntPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 224                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 225             @Override
 226             Sink<Double> opWrapSink(int flags, Sink<Integer> sink) {
 227                 return new Sink.ChainedDouble(sink) {
 228                     @Override
 229                     public void accept(double t) {
 230                         downstream.accept(mapper.applyAsInt(t));
 231                     }
 232                 };
 233             }
 234         };
 235     }
 236 
 237     @Override
 238     public final LongStream mapToLong(DoubleToLongFunction mapper) {
 239         Objects.requireNonNull(mapper);
 240         return new LongPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 241                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 242             @Override
 243             Sink<Double> opWrapSink(int flags, Sink<Long> sink) {
 244                 return new Sink.ChainedDouble(sink) {
 245                     @Override
 246                     public void accept(double t) {
 247                         downstream.accept(mapper.applyAsLong(t));
 248                     }
 249                 };
 250             }
 251         };
 252     }
 253 
 254     @Override
 255     public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
 256         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 257                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 258             @Override
 259             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 260                 return new Sink.ChainedDouble(sink) {
 261                     @Override
 262                     public void begin(long size) {
 263                         downstream.begin(-1);
 264                     }
 265 
 266                     @Override
 267                     public void accept(double t) {
 268                         // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 269                         DoubleStream result = mapper.apply(t);
 270                         if (result != null)
 271                             result.sequential().forEach(i -> downstream.accept(i));
 272                     }
 273                 };
 274             }
 275         };
 276     }
 277 
 278     @Override
 279     public DoubleStream unordered() {
 280         if (!isOrdered())
 281             return this;
 282         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, StreamOpFlag.NOT_ORDERED) {
 283             @Override
 284             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 285                 return sink;
 286             }
 287         };
 288     }
 289 
 290     @Override
 291     public final DoubleStream filter(DoublePredicate predicate) {
 292         Objects.requireNonNull(predicate);
 293         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 294                                        StreamOpFlag.NOT_SIZED) {
 295             @Override
 296             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 297                 return new Sink.ChainedDouble(sink) {
 298                     @Override
 299                     public void begin(long size) {
 300                         downstream.begin(-1);
 301                     }
 302 
 303                     @Override
 304                     public void accept(double t) {
 305                         if (predicate.test(t))
 306                             downstream.accept(t);
 307                     }
 308                 };
 309             }
 310         };
 311     }
 312 
 313     @Override
 314     public final DoubleStream peek(DoubleConsumer consumer) {
 315         Objects.requireNonNull(consumer);
 316         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 317                                        0) {
 318             @Override
 319             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 320                 return new Sink.ChainedDouble(sink) {
 321                     @Override
 322                     public void accept(double t) {
 323                         consumer.accept(t);
 324                         downstream.accept(t);
 325                     }
 326                 };
 327             }
 328         };
 329     }
 330 
 331     // Stateful intermediate ops from DoubleStream
 332 
 333     @Override
 334     public final DoubleStream limit(long maxSize) {
 335         if (maxSize < 0)
 336             throw new IllegalArgumentException(Long.toString(maxSize));
 337         return SliceOps.makeDouble(this, (long) 0, maxSize);
 338     }
 339 
 340     @Override
 341     public final DoubleStream substream(long startingOffset) {
 342         if (startingOffset < 0)
 343             throw new IllegalArgumentException(Long.toString(startingOffset));
 344         if (startingOffset == 0)
 345             return this;
 346         else {
 347             long limit = -1;
 348             return SliceOps.makeDouble(this, startingOffset, limit);
 349         }
 350     }
 351 
 352     @Override
 353     public final DoubleStream substream(long startingOffset, long endingOffset) {
 354         if (startingOffset < 0 || endingOffset < startingOffset)
 355             throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset));
 356         return SliceOps.makeDouble(this, startingOffset, endingOffset - startingOffset);
 357     }
 358 
    /**
     * Appends the sorting stage created by {@code SortedOps.makeDouble} to
     * this pipeline.
     *
     * @return the stream returned by {@code SortedOps.makeDouble(this)}
     */
    @Override
    public final DoubleStream sorted() {
        return SortedOps.makeDouble(this);
    }
 363 
 364     @Override
 365     public final DoubleStream distinct() {
 366         // While functional and quick to implement, this approach is not very efficient.
 367         // An efficient version requires a double-specific map/set implementation.
 368         return boxed().distinct().mapToDouble(i -> (double) i);
 369     }
 370 
 371     // Terminal ops from DoubleStream
 372 
    /**
     * Evaluates this pipeline with the terminal operation created by
     * {@code ForEachOps.makeDouble(consumer, false)}.
     *
     * @param consumer the action passed to {@code ForEachOps.makeDouble}
     */
    @Override
    public void forEach(DoubleConsumer consumer) {
        // NOTE(review): the false flag presumably means "not ordered"
        // (forEachOrdered passes true) - confirm against ForEachOps.
        evaluate(ForEachOps.makeDouble(consumer, false));
    }
 377 
    /**
     * Evaluates this pipeline with the terminal operation created by
     * {@code ForEachOps.makeDouble(consumer, true)}.
     *
     * @param consumer the action passed to {@code ForEachOps.makeDouble}
     */
    @Override
    public void forEachOrdered(DoubleConsumer consumer) {
        // NOTE(review): the true flag presumably means "ordered"
        // (forEach passes false) - confirm against ForEachOps.
        evaluate(ForEachOps.makeDouble(consumer, true));
    }
 382 
 383     @Override
 384     public final double sum() {
 385         // TODO: better algorithm to compensate for errors
 386         return reduce(0.0, Double::sum);
 387     }
 388 
 389     @Override
 390     public final OptionalDouble min() {
 391         return reduce(Math::min);
 392     }
 393 
 394     @Override
 395     public final OptionalDouble max() {
 396         return reduce(Math::max);
 397     }
 398 
 399     @Override
 400     public final OptionalDouble average() {
 401         double[] avg = collect(() -> new double[2],
 402                                (ll, i) -> {
 403                                    ll[0]++;
 404                                    ll[1] += i;
 405                                },
 406                                (ll, rr) -> {
 407                                    ll[0] += rr[0];
 408                                    ll[1] += rr[1];
 409                                });
 410         return avg[0] > 0
 411                ? OptionalDouble.of(avg[1] / avg[0])
 412                : OptionalDouble.empty();
 413     }
 414 
 415     @Override
 416     public final long count() {
 417         return mapToObj(e -> null).mapToInt(e -> 1).sum();
 418     }
 419 
 420     @Override
 421     public final DoubleSummaryStatistics summaryStatistics() {
 422         return collect(DoubleSummaryStatistics::new, DoubleSummaryStatistics::accept,
 423                        DoubleSummaryStatistics::combine);
 424     }
 425 
    /**
     * Evaluates this pipeline with the terminal operation created by
     * {@code ReduceOps.makeDouble(identity, op)}.
     *
     * @param identity the identity value passed to {@code ReduceOps.makeDouble}
     * @param op the combining function passed to {@code ReduceOps.makeDouble}
     * @return the result of evaluating the reduction
     */
    @Override
    public final double reduce(double identity, DoubleBinaryOperator op) {
        return evaluate(ReduceOps.makeDouble(identity, op));
    }
 430 
    /**
     * Evaluates this pipeline with the terminal operation created by
     * {@code ReduceOps.makeDouble(op)}.
     *
     * @param op the combining function passed to {@code ReduceOps.makeDouble}
     * @return the result of evaluating the reduction
     */
    @Override
    public final OptionalDouble reduce(DoubleBinaryOperator op) {
        return evaluate(ReduceOps.makeDouble(op));
    }
 435 
 436     @Override
 437     public final <R> R collect(Supplier<R> resultFactory,
 438                                ObjDoubleConsumer<R> accumulator,
 439                                BiConsumer<R, R> combiner) {
 440         BinaryOperator<R> operator = (left, right) -> {
 441             combiner.accept(left, right);
 442             return left;
 443         };
 444         return evaluate(ReduceOps.makeDouble(resultFactory, accumulator, operator));
 445     }
 446 
    /**
     * Evaluates this pipeline with the match operation created by
     * {@code MatchOps.makeDouble} for {@code MatchOps.MatchKind.ANY}.
     *
     * @param predicate the predicate passed to {@code MatchOps.makeDouble}
     * @return the result of evaluating the match operation
     */
    @Override
    public final boolean anyMatch(DoublePredicate predicate) {
        return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ANY));
    }
 451 
    /**
     * Evaluates this pipeline with the match operation created by
     * {@code MatchOps.makeDouble} for {@code MatchOps.MatchKind.ALL}.
     *
     * @param predicate the predicate passed to {@code MatchOps.makeDouble}
     * @return the result of evaluating the match operation
     */
    @Override
    public final boolean allMatch(DoublePredicate predicate) {
        return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ALL));
    }
 456 
    /**
     * Evaluates this pipeline with the match operation created by
     * {@code MatchOps.makeDouble} for {@code MatchOps.MatchKind.NONE}.
     *
     * @param predicate the predicate passed to {@code MatchOps.makeDouble}
     * @return the result of evaluating the match operation
     */
    @Override
    public final boolean noneMatch(DoublePredicate predicate) {
        return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.NONE));
    }
 461 
    /**
     * Evaluates this pipeline with the terminal operation created by
     * {@code FindOps.makeDouble(true)}.
     *
     * @return the result of evaluating the find operation
     */
    @Override
    public final OptionalDouble findFirst() {
        // NOTE(review): the true flag presumably requests the first element
        // (findAny passes false) - confirm against FindOps.
        return evaluate(FindOps.makeDouble(true));
    }
 466 
    /**
     * Evaluates this pipeline with the terminal operation created by
     * {@code FindOps.makeDouble(false)}.
     *
     * @return the result of evaluating the find operation
     */
    @Override
    public final OptionalDouble findAny() {
        // NOTE(review): the false flag presumably allows any element to be
        // returned (findFirst passes true) - confirm against FindOps.
        return evaluate(FindOps.makeDouble(false));
    }
 471 
 472     @Override
 473     public final double[] toArray() {
 474         return Nodes.flattenDouble((Node.OfDouble) evaluateToArrayNode(Double[]::new))
 475                         .asPrimitiveArray();
 476     }
 477 
 478     //
 479 
 480     /**
 481      * Source stage of a DoubleStream
 482      *
 483      * @param <E_IN> type of elements in the upstream source
 484      */
 485     static class Head<E_IN> extends DoublePipeline<E_IN> {
 486         /**
 487          * Constructor for the source stage of a DoubleStream.
 488          *
 489          * @param source {@code Supplier<Spliterator>} describing the stream
 490          *               source
 491          * @param sourceFlags the source flags for the stream source, described
 492          *                    in {@link StreamOpFlag}
 493          * @param parallel {@code true} if the pipeline is parallel
 494          */
 495         Head(Supplier<? extends Spliterator<Double>> source,
 496              int sourceFlags, boolean parallel) {
 497             super(source, sourceFlags, parallel);
 498         }
 499 
 500         /**
 501          * Constructor for the source stage of a DoubleStream.
 502          *
 503          * @param source {@code Spliterator} describing the stream source
 504          * @param sourceFlags the source flags for the stream source, described
 505          *                    in {@link StreamOpFlag}
 506          * @param parallel {@code true} if the pipeline is parallel
 507          */
 508         Head(Spliterator<Double> source,
 509              int sourceFlags, boolean parallel) {
 510             super(source, sourceFlags, parallel);
 511         }
 512 
 513         @Override
 514         final boolean opIsStateful() {
 515             throw new UnsupportedOperationException();
 516         }
 517 
 518         @Override
 519         final Sink<E_IN> opWrapSink(int flags, Sink<Double> sink) {
 520             throw new UnsupportedOperationException();
 521         }
 522 
 523         // Optimized sequential terminal operations for the head of the pipeline
 524 
 525         @Override
 526         public void forEach(DoubleConsumer consumer) {
 527             if (!isParallel()) {
 528                 adapt(sourceStageSpliterator()).forEachRemaining(consumer);
 529             }
 530             else {
 531                 super.forEach(consumer);
 532             }
 533         }
 534 
 535         @Override
 536         public void forEachOrdered(DoubleConsumer consumer) {
 537             if (!isParallel()) {
 538                 adapt(sourceStageSpliterator()).forEachRemaining(consumer);
 539             }
 540             else {
 541                 super.forEachOrdered(consumer);
 542             }
 543         }
 544 
 545     }
 546 
    /**
     * Base class for a stateless intermediate stage of a DoubleStream.
     *
     * <p>{@code opIsStateful()} always returns {@code false} for subclasses.
     *
     * @param <E_IN> type of elements in the upstream source
     * @since 1.8
     */
    abstract static class StatelessOp<E_IN> extends DoublePipeline<E_IN> {
        /**
         * Construct a new DoubleStream by appending a stateless intermediate
         * operation to an existing stream.
         *
         * @param upstream the upstream pipeline stage
         * @param inputShape the stream shape for the upstream pipeline stage
         * @param opFlags operation flags for the new stage
         */
        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                    StreamShape inputShape,
                    int opFlags) {
            super(upstream, opFlags);
            // inputShape is used only for this sanity check (active when
            // assertions are enabled).
            assert upstream.getOutputShape() == inputShape;
        }

        /**
         * Identifies this stage as stateless.
         *
         * @return always {@code false}
         */
        @Override
        final boolean opIsStateful() {
            return false;
        }
    }
 574 
    /**
     * Base class for a stateful intermediate stage of a DoubleStream.
     *
     * <p>{@code opIsStateful()} always returns {@code true} for subclasses,
     * and subclasses must implement {@code opEvaluateParallel}.
     *
     * @param <E_IN> type of elements in the upstream source
     * @since 1.8
     */
    abstract static class StatefulOp<E_IN> extends DoublePipeline<E_IN> {
        /**
         * Construct a new DoubleStream by appending a stateful intermediate
         * operation to an existing stream.
         *
         * @param upstream the upstream pipeline stage
         * @param inputShape the stream shape for the upstream pipeline stage
         * @param opFlags operation flags for the new stage
         */
        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
                   StreamShape inputShape,
                   int opFlags) {
            super(upstream, opFlags);
            // inputShape is used only for this sanity check (active when
            // assertions are enabled).
            assert upstream.getOutputShape() == inputShape;
        }

        /**
         * Identifies this stage as stateful.
         *
         * @return always {@code true}
         */
        @Override
        final boolean opIsStateful() {
            return true;
        }

        /**
         * Re-declared as abstract so that every stateful stage must supply
         * its own parallel evaluation.
         *
         * @param helper the pipeline helper
         * @param spliterator the source spliterator
         * @param generator the array generator
         * @return a node holding the output of this stage
         */
        @Override
        abstract <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
                                                        Spliterator<P_IN> spliterator,
                                                        IntFunction<Double[]> generator);
    }
 607 }