1 /*
   2  * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.DoubleSummaryStatistics;
  28 import java.util.Objects;
  29 import java.util.OptionalDouble;
  30 import java.util.PrimitiveIterator;
  31 import java.util.Spliterator;
  32 import java.util.Spliterators;
  33 import java.util.function.BiConsumer;
  34 import java.util.function.BinaryOperator;
  35 import java.util.function.DoubleBinaryOperator;
  36 import java.util.function.DoubleConsumer;
  37 import java.util.function.DoubleFunction;
  38 import java.util.function.DoublePredicate;
  39 import java.util.function.DoubleToIntFunction;
  40 import java.util.function.DoubleToLongFunction;
  41 import java.util.function.DoubleUnaryOperator;
  42 import java.util.function.IntFunction;
  43 import java.util.function.ObjDoubleConsumer;
  44 import java.util.function.Supplier;
  45 
  46 /**
  47  * Abstract base class for an intermediate pipeline stage or pipeline source
  48  * stage implementing whose elements are of type {@code double}.
  49  *
  50  * @param <E_IN> type of elements in the upstream source
  51  *
  52  * @since 1.8
  53  */
  54 abstract class DoublePipeline<E_IN>
  55         extends AbstractPipeline<E_IN, Double, DoubleStream>
  56         implements DoubleStream {
  57 
  58     /**
  59      * Constructor for the head of a stream pipeline.
  60      *
  61      * @param source {@code Supplier<Spliterator>} describing the stream source
  62      * @param sourceFlags the source flags for the stream source, described in
  63      * {@link StreamOpFlag}
  64      */
  65     DoublePipeline(Supplier<? extends Spliterator<Double>> source,
  66                    int sourceFlags, boolean parallel) {
  67         super(source, sourceFlags, parallel);
  68     }
  69 
  70     /**
  71      * Constructor for the head of a stream pipeline.
  72      *
  73      * @param source {@code Spliterator} describing the stream source
  74      * @param sourceFlags the source flags for the stream source, described in
  75      * {@link StreamOpFlag}
  76      */
  77     DoublePipeline(Spliterator<Double> source,
  78                    int sourceFlags, boolean parallel) {
  79         super(source, sourceFlags, parallel);
  80     }
  81 
  82     /**
  83      * Constructor for appending an intermediate operation onto an existing
  84      * pipeline.
  85      *
  86      * @param upstream the upstream element source.
  87      * @param opFlags the operation flags
  88      */
  89     DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
  90         super(upstream, opFlags);
  91     }
  92 
  93     /**
  94      * Adapt a {@code Sink<Double> to a {@code DoubleConsumer}, ideally simply
  95      * by casting.
  96      */
  97     private static DoubleConsumer adapt(Sink<Double> sink) {
  98         if (sink instanceof DoubleConsumer) {
  99             return (DoubleConsumer) sink;
 100         } else {
 101             if (Tripwire.ENABLED)
 102                 Tripwire.trip(AbstractPipeline.class,
 103                               "using DoubleStream.adapt(Sink<Double> s)");
 104             return sink::accept;
 105         }
 106     }
 107 
 108     /**
 109      * Adapt a {@code Spliterator<Double>} to a {@code Spliterator.OfDouble}.
 110      *
 111      * @implNote
 112      * The implementation attempts to cast to a Spliterator.OfDouble, and throws
 113      * an exception if this cast is not possible.
 114      */
 115     private static Spliterator.OfDouble adapt(Spliterator<Double> s) {
 116         if (s instanceof Spliterator.OfDouble) {
 117             return (Spliterator.OfDouble) s;
 118         } else {
 119             if (Tripwire.ENABLED)
 120                 Tripwire.trip(AbstractPipeline.class,
 121                               "using DoubleStream.adapt(Spliterator<Double> s)");
 122             throw new UnsupportedOperationException("DoubleStream.adapt(Spliterator<Double> s)");
 123         }
 124     }
 125 
 126 
 127     // Shape-specific methods
 128 
 129     @Override
 130     final StreamShape getOutputShape() {
 131         return StreamShape.DOUBLE_VALUE;
 132     }
 133 
 134     @Override
 135     final <P_IN> Node<Double> evaluateToNode(PipelineHelper<Double> helper,
 136                                              Spliterator<P_IN> spliterator,
 137                                              boolean flattenTree,
 138                                              IntFunction<Double[]> generator) {
 139         return Nodes.collectDouble(helper, spliterator, flattenTree);
 140     }
 141 
 142     @Override
 143     final <P_IN> Spliterator<Double> wrap(PipelineHelper<Double> ph,
 144                                           Supplier<Spliterator<P_IN>> supplier,
 145                                           boolean isParallel) {
 146         return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel);
 147     }
 148 
 149     @Override
 150     @SuppressWarnings("unchecked")
 151     final Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) {
 152         return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier);
 153     }
 154 
 155     @Override
 156     final void forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) {
 157         Spliterator.OfDouble spl = adapt(spliterator);
 158         DoubleConsumer adaptedSink = adapt(sink);
 159         do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
 160     }
 161 
 162     @Override
 163     final  Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) {
 164         return Nodes.doubleBuilder(exactSizeIfKnown);
 165     }
 166 
 167 
 168     // DoubleStream
 169 
 170     @Override
 171     public final PrimitiveIterator.OfDouble iterator() {
 172         return Spliterators.iterator(spliterator());
 173     }
 174 
 175     @Override
 176     public final Spliterator.OfDouble spliterator() {
 177         return adapt(super.spliterator());
 178     }
 179 
 180     // Stateless intermediate ops from DoubleStream
 181 
 182     @Override
 183     public final Stream<Double> boxed() {
 184         return mapToObj(Double::valueOf);
 185     }
 186 
 187     @Override
 188     public final DoubleStream map(DoubleUnaryOperator mapper) {
 189         Objects.requireNonNull(mapper);
 190         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 191                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 192             @Override
 193             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 194                 return new Sink.ChainedDouble<Double>(sink) {
 195                     @Override
 196                     public void accept(double t) {
 197                         downstream.accept(mapper.applyAsDouble(t));
 198                     }
 199                 };
 200             }
 201         };
 202     }
 203 
 204     @Override
 205     public final <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) {
 206         Objects.requireNonNull(mapper);
 207         return new ReferencePipeline.StatelessOp<Double, U>(this, StreamShape.DOUBLE_VALUE,
 208                                                             StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 209             @Override
 210             Sink<Double> opWrapSink(int flags, Sink<U> sink) {
 211                 return new Sink.ChainedDouble<U>(sink) {
 212                     @Override
 213                     public void accept(double t) {
 214                         downstream.accept(mapper.apply(t));
 215                     }
 216                 };
 217             }
 218         };
 219     }
 220 
 221     @Override
 222     public final IntStream mapToInt(DoubleToIntFunction mapper) {
 223         Objects.requireNonNull(mapper);
 224         return new IntPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 225                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 226             @Override
 227             Sink<Double> opWrapSink(int flags, Sink<Integer> sink) {
 228                 return new Sink.ChainedDouble<Integer>(sink) {
 229                     @Override
 230                     public void accept(double t) {
 231                         downstream.accept(mapper.applyAsInt(t));
 232                     }
 233                 };
 234             }
 235         };
 236     }
 237 
 238     @Override
 239     public final LongStream mapToLong(DoubleToLongFunction mapper) {
 240         Objects.requireNonNull(mapper);
 241         return new LongPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 242                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 243             @Override
 244             Sink<Double> opWrapSink(int flags, Sink<Long> sink) {
 245                 return new Sink.ChainedDouble<Long>(sink) {
 246                     @Override
 247                     public void accept(double t) {
 248                         downstream.accept(mapper.applyAsLong(t));
 249                     }
 250                 };
 251             }
 252         };
 253     }
 254 
 255     @Override
 256     public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
 257         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 258                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 259             @Override
 260             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 261                 return new Sink.ChainedDouble<Double>(sink) {
 262                     @Override
 263                     public void begin(long size) {
 264                         downstream.begin(-1);
 265                     }
 266 
 267                     @Override
 268                     public void accept(double t) {
 269                         try (DoubleStream result = mapper.apply(t)) {
 270                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 271                             if (result != null)
 272                                 result.sequential().forEach(i -> downstream.accept(i));
 273                         }
 274                     }
 275                 };
 276             }
 277         };
 278     }
 279 
 280     @Override
 281     public DoubleStream unordered() {
 282         if (!isOrdered())
 283             return this;
 284         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, StreamOpFlag.NOT_ORDERED) {
 285             @Override
 286             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 287                 return sink;
 288             }
 289         };
 290     }
 291 
 292     @Override
 293     public final DoubleStream filter(DoublePredicate predicate) {
 294         Objects.requireNonNull(predicate);
 295         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 296                                        StreamOpFlag.NOT_SIZED) {
 297             @Override
 298             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 299                 return new Sink.ChainedDouble<Double>(sink) {
 300                     @Override
 301                     public void begin(long size) {
 302                         downstream.begin(-1);
 303                     }
 304 
 305                     @Override
 306                     public void accept(double t) {
 307                         if (predicate.test(t))
 308                             downstream.accept(t);
 309                     }
 310                 };
 311             }
 312         };
 313     }
 314 
 315     @Override
 316     public final DoubleStream peek(DoubleConsumer action) {
 317         Objects.requireNonNull(action);
 318         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 319                                        0) {
 320             @Override
 321             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 322                 return new Sink.ChainedDouble<Double>(sink) {
 323                     @Override
 324                     public void accept(double t) {
 325                         action.accept(t);
 326                         downstream.accept(t);
 327                     }
 328                 };
 329             }
 330         };
 331     }
 332 
 333     // Stateful intermediate ops from DoubleStream
 334 
 335     @Override
 336     public final DoubleStream limit(long maxSize) {
 337         if (maxSize < 0)
 338             throw new IllegalArgumentException(Long.toString(maxSize));
 339         return SliceOps.makeDouble(this, (long) 0, maxSize);
 340     }
 341 
 342     @Override
 343     public final DoubleStream skip(long n) {
 344         if (n < 0)
 345             throw new IllegalArgumentException(Long.toString(n));
 346         if (n == 0)
 347             return this;
 348         else {
 349             long limit = -1;
 350             return SliceOps.makeDouble(this, n, limit);
 351         }
 352     }
 353 
 354     @Override
 355     public final DoubleStream sorted() {
 356         return SortedOps.makeDouble(this);
 357     }
 358 
 359     @Override
 360     public final DoubleStream distinct() {
 361         // While functional and quick to implement, this approach is not very efficient.
 362         // An efficient version requires a double-specific map/set implementation.
 363         return boxed().distinct().mapToDouble(i -> (double) i);
 364     }
 365 
 366     // Terminal ops from DoubleStream
 367 
 368     @Override
 369     public void forEach(DoubleConsumer consumer) {
 370         evaluate(ForEachOps.makeDouble(consumer, false));
 371     }
 372 
 373     @Override
 374     public void forEachOrdered(DoubleConsumer consumer) {
 375         evaluate(ForEachOps.makeDouble(consumer, true));
 376     }
 377 
 378     @Override
 379     public final double sum() {
 380         /*
 381          * In the arrays allocated for the collect operation, index 0
 382          * holds the high-order bits of the running sum and index 1
 383          * holds the low-order bits of the sum computed via
 384          * compensated summation.
 385          */
 386         double[] summation = collect(() -> new double[2],
 387                                (ll, d) -> {
 388                                    Collectors.sumWithCompensation(ll, d);
 389                                },
 390                                (ll, rr) -> {
 391                                    Collectors.sumWithCompensation(ll, rr[0]);
 392                                    Collectors.sumWithCompensation(ll, rr[1]);
 393                                });
 394 
 395         // Better error bounds to add both terms as the final sum
 396         return summation[0] + summation[1];
 397     }
 398 
 399     @Override
 400     public final OptionalDouble min() {
 401         return reduce(Math::min);
 402     }
 403 
 404     @Override
 405     public final OptionalDouble max() {
 406         return reduce(Math::max);
 407     }
 408 
 409     /**
 410      * {@inheritDoc}
 411      *
 412      * @implNote The {@code double} format can represent all
 413      * consecutive integers in the range -2<sup>53</sup> to
 414      * 2<sup>53</sup>. If the pipeline has more than 2<sup>53</sup>
 415      * values, the divisor in the average computation will saturate at
 416      * 2<sup>53</sup>, leading to additional numerical errors.
 417      */
 418     @Override
 419     public final OptionalDouble average() {
 420         /*
 421          * In the arrays allocated for the collect operation, index 0
 422          * holds the high-order bits of the running sum, index 1 holds
 423          * the low-order bits of the sum computed via compensated
 424          * summation, and index 2 holds the number of values seen.
 425          */
 426         double[] avg = collect(() -> new double[3],
 427                                (ll, d) -> {
 428                                    ll[2]++;
 429                                    Collectors.sumWithCompensation(ll, d);
 430                                },
 431                                (ll, rr) -> {
 432                                    Collectors.sumWithCompensation(ll, rr[0]);
 433                                    Collectors.sumWithCompensation(ll, rr[1]);
 434                                    ll[2] += rr[2];
 435                                });
 436         return avg[2] > 0
 437             // Better error bounds to add both terms as the final sum to compute average
 438             ? OptionalDouble.of((avg[0] + avg[1]) / avg[2])
 439             : OptionalDouble.empty();
 440     }
 441 
 442     @Override
 443     public final long count() {
 444         return mapToObj(e -> null).mapToInt(e -> 1).sum();
 445     }
 446 
 447     @Override
 448     public final DoubleSummaryStatistics summaryStatistics() {
 449         return collect(DoubleSummaryStatistics::new, DoubleSummaryStatistics::accept,
 450                        DoubleSummaryStatistics::combine);
 451     }
 452 
 453     @Override
 454     public final double reduce(double identity, DoubleBinaryOperator op) {
 455         return evaluate(ReduceOps.makeDouble(identity, op));
 456     }
 457 
 458     @Override
 459     public final OptionalDouble reduce(DoubleBinaryOperator op) {
 460         return evaluate(ReduceOps.makeDouble(op));
 461     }
 462 
 463     @Override
 464     public final <R> R collect(Supplier<R> supplier,
 465                                ObjDoubleConsumer<R> accumulator,
 466                                BiConsumer<R, R> combiner) {
 467         BinaryOperator<R> operator = (left, right) -> {
 468             combiner.accept(left, right);
 469             return left;
 470         };
 471         return evaluate(ReduceOps.makeDouble(supplier, accumulator, operator));
 472     }
 473 
 474     @Override
 475     public final boolean anyMatch(DoublePredicate predicate) {
 476         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ANY));
 477     }
 478 
 479     @Override
 480     public final boolean allMatch(DoublePredicate predicate) {
 481         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ALL));
 482     }
 483 
 484     @Override
 485     public final boolean noneMatch(DoublePredicate predicate) {
 486         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.NONE));
 487     }
 488 
 489     @Override
 490     public final OptionalDouble findFirst() {
 491         return evaluate(FindOps.makeDouble(true));
 492     }
 493 
 494     @Override
 495     public final OptionalDouble findAny() {
 496         return evaluate(FindOps.makeDouble(false));
 497     }
 498 
 499     @Override
 500     public final double[] toArray() {
 501         return Nodes.flattenDouble((Node.OfDouble) evaluateToArrayNode(Double[]::new))
 502                         .asPrimitiveArray();
 503     }
 504 
 505     //
 506 
 507     /**
 508      * Source stage of a DoubleStream
 509      *
 510      * @param <E_IN> type of elements in the upstream source
 511      */
 512     static class Head<E_IN> extends DoublePipeline<E_IN> {
 513         /**
 514          * Constructor for the source stage of a DoubleStream.
 515          *
 516          * @param source {@code Supplier<Spliterator>} describing the stream
 517          *               source
 518          * @param sourceFlags the source flags for the stream source, described
 519          *                    in {@link StreamOpFlag}
 520          * @param parallel {@code true} if the pipeline is parallel
 521          */
 522         Head(Supplier<? extends Spliterator<Double>> source,
 523              int sourceFlags, boolean parallel) {
 524             super(source, sourceFlags, parallel);
 525         }
 526 
 527         /**
 528          * Constructor for the source stage of a DoubleStream.
 529          *
 530          * @param source {@code Spliterator} describing the stream source
 531          * @param sourceFlags the source flags for the stream source, described
 532          *                    in {@link StreamOpFlag}
 533          * @param parallel {@code true} if the pipeline is parallel
 534          */
 535         Head(Spliterator<Double> source,
 536              int sourceFlags, boolean parallel) {
 537             super(source, sourceFlags, parallel);
 538         }
 539 
 540         @Override
 541         final boolean opIsStateful() {
 542             throw new UnsupportedOperationException();
 543         }
 544 
 545         @Override
 546         final Sink<E_IN> opWrapSink(int flags, Sink<Double> sink) {
 547             throw new UnsupportedOperationException();
 548         }
 549 
 550         // Optimized sequential terminal operations for the head of the pipeline
 551 
 552         @Override
 553         public void forEach(DoubleConsumer consumer) {
 554             if (!isParallel()) {
 555                 adapt(sourceStageSpliterator()).forEachRemaining(consumer);
 556             }
 557             else {
 558                 super.forEach(consumer);
 559             }
 560         }
 561 
 562         @Override
 563         public void forEachOrdered(DoubleConsumer consumer) {
 564             if (!isParallel()) {
 565                 adapt(sourceStageSpliterator()).forEachRemaining(consumer);
 566             }
 567             else {
 568                 super.forEachOrdered(consumer);
 569             }
 570         }
 571 
 572     }
 573 
 574     /**
 575      * Base class for a stateless intermediate stage of a DoubleStream.
 576      *
 577      * @param <E_IN> type of elements in the upstream source
 578      * @since 1.8
 579      */
 580     abstract static class StatelessOp<E_IN> extends DoublePipeline<E_IN> {
 581         /**
 582          * Construct a new DoubleStream by appending a stateless intermediate
 583          * operation to an existing stream.
 584          *
 585          * @param upstream the upstream pipeline stage
 586          * @param inputShape the stream shape for the upstream pipeline stage
 587          * @param opFlags operation flags for the new stage
 588          */
 589         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
 590                     StreamShape inputShape,
 591                     int opFlags) {
 592             super(upstream, opFlags);
 593             assert upstream.getOutputShape() == inputShape;
 594         }
 595 
 596         @Override
 597         final boolean opIsStateful() {
 598             return false;
 599         }
 600     }
 601 
 602     /**
 603      * Base class for a stateful intermediate stage of a DoubleStream.
 604      *
 605      * @param <E_IN> type of elements in the upstream source
 606      * @since 1.8
 607      */
 608     abstract static class StatefulOp<E_IN> extends DoublePipeline<E_IN> {
 609         /**
 610          * Construct a new DoubleStream by appending a stateful intermediate
 611          * operation to an existing stream.
 612          *
 613          * @param upstream the upstream pipeline stage
 614          * @param inputShape the stream shape for the upstream pipeline stage
 615          * @param opFlags operation flags for the new stage
 616          */
 617         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
 618                    StreamShape inputShape,
 619                    int opFlags) {
 620             super(upstream, opFlags);
 621             assert upstream.getOutputShape() == inputShape;
 622         }
 623 
 624         @Override
 625         final boolean opIsStateful() {
 626             return true;
 627         }
 628 
 629         @Override
 630         abstract <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
 631                                                         Spliterator<P_IN> spliterator,
 632                                                         IntFunction<Double[]> generator);
 633     }
 634 }