1 /*
   2  * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.DoubleSummaryStatistics;
  28 import java.util.Objects;
  29 import java.util.OptionalDouble;
  30 import java.util.PrimitiveIterator;
  31 import java.util.Spliterator;
  32 import java.util.Spliterators;
  33 import java.util.function.BiConsumer;
  34 import java.util.function.BinaryOperator;
  35 import java.util.function.DoubleBinaryOperator;
  36 import java.util.function.DoubleConsumer;
  37 import java.util.function.DoubleFunction;
  38 import java.util.function.DoublePredicate;
  39 import java.util.function.DoubleToIntFunction;
  40 import java.util.function.DoubleToLongFunction;
  41 import java.util.function.DoubleUnaryOperator;
  42 import java.util.function.IntFunction;
  43 import java.util.function.LongPredicate;
  44 import java.util.function.ObjDoubleConsumer;
  45 import java.util.function.Supplier;
  46 
  47 /**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose elements are of type {@code double}.
  50  *
  51  * @param <E_IN> type of elements in the upstream source
  52  *
  53  * @since 1.8
  54  */
  55 abstract class DoublePipeline<E_IN>
  56         extends AbstractPipeline<E_IN, Double, DoubleStream>
  57         implements DoubleStream {
  58 
  59     /**
  60      * Constructor for the head of a stream pipeline.
  61      *
  62      * @param source {@code Supplier<Spliterator>} describing the stream source
  63      * @param sourceFlags the source flags for the stream source, described in
  64      * {@link StreamOpFlag}
  65      */
  66     DoublePipeline(Supplier<? extends Spliterator<Double>> source,
  67                    int sourceFlags, boolean parallel) {
  68         super(source, sourceFlags, parallel);
  69     }
  70 
  71     /**
  72      * Constructor for the head of a stream pipeline.
  73      *
  74      * @param source {@code Spliterator} describing the stream source
  75      * @param sourceFlags the source flags for the stream source, described in
  76      * {@link StreamOpFlag}
  77      */
  78     DoublePipeline(Spliterator<Double> source,
  79                    int sourceFlags, boolean parallel) {
  80         super(source, sourceFlags, parallel);
  81     }
  82 
  83     /**
  84      * Constructor for appending an intermediate operation onto an existing
  85      * pipeline.
  86      *
  87      * @param upstream the upstream element source.
  88      * @param opFlags the operation flags
  89      */
  90     DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
  91         super(upstream, opFlags);
  92     }
  93 
  94     /**
  95      * Adapt a {@code Sink<Double> to a {@code DoubleConsumer}, ideally simply
  96      * by casting.
  97      */
  98     private static DoubleConsumer adapt(Sink<Double> sink) {
  99         if (sink instanceof DoubleConsumer) {
 100             return (DoubleConsumer) sink;
 101         } else {
 102             if (Tripwire.ENABLED)
 103                 Tripwire.trip(AbstractPipeline.class,
 104                               "using DoubleStream.adapt(Sink<Double> s)");
 105             return sink::accept;
 106         }
 107     }
 108 
 109     /**
 110      * Adapt a {@code Spliterator<Double>} to a {@code Spliterator.OfDouble}.
 111      *
 112      * @implNote
 113      * The implementation attempts to cast to a Spliterator.OfDouble, and throws
 114      * an exception if this cast is not possible.
 115      */
 116     private static Spliterator.OfDouble adapt(Spliterator<Double> s) {
 117         if (s instanceof Spliterator.OfDouble) {
 118             return (Spliterator.OfDouble) s;
 119         } else {
 120             if (Tripwire.ENABLED)
 121                 Tripwire.trip(AbstractPipeline.class,
 122                               "using DoubleStream.adapt(Spliterator<Double> s)");
 123             throw new UnsupportedOperationException("DoubleStream.adapt(Spliterator<Double> s)");
 124         }
 125     }
 126 
 127 
 128     // Shape-specific methods
 129 
 130     @Override
 131     final StreamShape getOutputShape() {
 132         return StreamShape.DOUBLE_VALUE;
 133     }
 134 
 135     @Override
 136     final <P_IN> Node<Double> evaluateToNode(PipelineHelper<Double> helper,
 137                                              Spliterator<P_IN> spliterator,
 138                                              boolean flattenTree,
 139                                              IntFunction<Double[]> generator) {
 140         return Nodes.collectDouble(helper, spliterator, flattenTree);
 141     }
 142 
 143     @Override
 144     final <P_IN> Spliterator<Double> wrap(PipelineHelper<Double> ph,
 145                                           Supplier<Spliterator<P_IN>> supplier,
 146                                           boolean isParallel) {
 147         return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel);
 148     }
 149 
 150     @Override
 151     @SuppressWarnings("unchecked")
 152     final Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) {
 153         return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier);
 154     }
 155 
 156     @Override
 157     final boolean forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) {
 158         Spliterator.OfDouble spl = adapt(spliterator);
 159         DoubleConsumer adaptedSink = adapt(sink);
 160         boolean cancelled;
 161         do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink));
 162         return cancelled;
 163     }
 164 
 165     @Override
 166     final  Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) {
 167         return Nodes.doubleBuilder(exactSizeIfKnown);
 168     }
 169 
 170 
 171     // DoubleStream
 172 
 173     @Override
 174     public final PrimitiveIterator.OfDouble iterator() {
 175         return Spliterators.iterator(spliterator());
 176     }
 177 
 178     @Override
 179     public final Spliterator.OfDouble spliterator() {
 180         return adapt(super.spliterator());
 181     }
 182 
 183     // Stateless intermediate ops from DoubleStream
 184 
 185     @Override
 186     public final Stream<Double> boxed() {
 187         return mapToObj(Double::valueOf);
 188     }
 189 
 190     @Override
 191     public final DoubleStream map(DoubleUnaryOperator mapper) {
 192         Objects.requireNonNull(mapper);
 193         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 194                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 195             @Override
 196             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 197                 return new Sink.ChainedDouble<Double>(sink) {
 198                     @Override
 199                     public void accept(double t) {
 200                         downstream.accept(mapper.applyAsDouble(t));
 201                     }
 202                 };
 203             }
 204         };
 205     }
 206 
 207     @Override
 208     public final <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) {
 209         Objects.requireNonNull(mapper);
 210         return new ReferencePipeline.StatelessOp<Double, U>(this, StreamShape.DOUBLE_VALUE,
 211                                                             StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 212             @Override
 213             Sink<Double> opWrapSink(int flags, Sink<U> sink) {
 214                 return new Sink.ChainedDouble<U>(sink) {
 215                     @Override
 216                     public void accept(double t) {
 217                         downstream.accept(mapper.apply(t));
 218                     }
 219                 };
 220             }
 221         };
 222     }
 223 
 224     @Override
 225     public final IntStream mapToInt(DoubleToIntFunction mapper) {
 226         Objects.requireNonNull(mapper);
 227         return new IntPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 228                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 229             @Override
 230             Sink<Double> opWrapSink(int flags, Sink<Integer> sink) {
 231                 return new Sink.ChainedDouble<Integer>(sink) {
 232                     @Override
 233                     public void accept(double t) {
 234                         downstream.accept(mapper.applyAsInt(t));
 235                     }
 236                 };
 237             }
 238         };
 239     }
 240 
 241     @Override
 242     public final LongStream mapToLong(DoubleToLongFunction mapper) {
 243         Objects.requireNonNull(mapper);
 244         return new LongPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 245                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 246             @Override
 247             Sink<Double> opWrapSink(int flags, Sink<Long> sink) {
 248                 return new Sink.ChainedDouble<Long>(sink) {
 249                     @Override
 250                     public void accept(double t) {
 251                         downstream.accept(mapper.applyAsLong(t));
 252                     }
 253                 };
 254             }
 255         };
 256     }
 257 
 258     @Override
 259     public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
 260         Objects.requireNonNull(mapper);
 261         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 262                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 263             @Override
 264             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 265                 return new Sink.ChainedDouble<Double>(sink) {
 266                     @Override
 267                     public void begin(long size) {
 268                         downstream.begin(-1);
 269                     }
 270 
 271                     @Override
 272                     public void accept(double t) {
 273                         try (DoubleStream result = mapper.apply(t)) {
 274                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 275                             if (result != null)
 276                                 result.sequential().forEach(i -> downstream.accept(i));
 277                         }
 278                     }
 279                 };
 280             }
 281         };
 282     }
 283 
 284     @Override
 285     public DoubleStream unordered() {
 286         if (!isOrdered())
 287             return this;
 288         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, StreamOpFlag.NOT_ORDERED) {
 289             @Override
 290             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 291                 return sink;
 292             }
 293         };
 294     }
 295 
 296     @Override
 297     public final DoubleStream filter(DoublePredicate predicate) {
 298         Objects.requireNonNull(predicate);
 299         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 300                                        StreamOpFlag.NOT_SIZED) {
 301             @Override
 302             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 303                 return new Sink.ChainedDouble<Double>(sink) {
 304                     @Override
 305                     public void begin(long size) {
 306                         downstream.begin(-1);
 307                     }
 308 
 309                     @Override
 310                     public void accept(double t) {
 311                         if (predicate.test(t))
 312                             downstream.accept(t);
 313                     }
 314                 };
 315             }
 316         };
 317     }
 318 
 319     @Override
 320     public final DoubleStream peek(DoubleConsumer action) {
 321         Objects.requireNonNull(action);
 322         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 323                                        0) {
 324             @Override
 325             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 326                 return new Sink.ChainedDouble<Double>(sink) {
 327                     @Override
 328                     public void accept(double t) {
 329                         action.accept(t);
 330                         downstream.accept(t);
 331                     }
 332                 };
 333             }
 334         };
 335     }
 336 
 337     // Stateful intermediate ops from DoubleStream
 338 
 339     @Override
 340     public final DoubleStream limit(long maxSize) {
 341         if (maxSize < 0)
 342             throw new IllegalArgumentException(Long.toString(maxSize));
 343         return SliceOps.makeDouble(this, (long) 0, maxSize);
 344     }
 345 
 346     @Override
 347     public final DoubleStream skip(long n) {
 348         if (n < 0)
 349             throw new IllegalArgumentException(Long.toString(n));
 350         if (n == 0)
 351             return this;
 352         else {
 353             long limit = -1;
 354             return SliceOps.makeDouble(this, n, limit);
 355         }
 356     }
 357 
 358     @Override
 359     public final DoubleStream takeWhile(DoublePredicate predicate) {
 360         return WhileOps.makeTakeWhileDouble(this, predicate);
 361     }
 362 
 363     @Override
 364     public final DoubleStream dropWhile(DoublePredicate predicate) {
 365         return WhileOps.makeDropWhileDouble(this, predicate);
 366     }
 367 
 368     @Override
 369     public final DoubleStream sorted() {
 370         return SortedOps.makeDouble(this);
 371     }
 372 
 373     @Override
 374     public final DoubleStream distinct() {
 375         // While functional and quick to implement, this approach is not very efficient.
 376         // An efficient version requires a double-specific map/set implementation.
 377         return boxed().distinct().mapToDouble(i -> (double) i);
 378     }
 379 
 380     // Terminal ops from DoubleStream
 381 
 382     @Override
 383     public void forEach(DoubleConsumer consumer) {
 384         evaluate(ForEachOps.makeDouble(consumer, false));
 385     }
 386 
 387     @Override
 388     public void forEachOrdered(DoubleConsumer consumer) {
 389         evaluate(ForEachOps.makeDouble(consumer, true));
 390     }
 391 
 392     @Override
 393     public final double sum() {
 394         /*
 395          * In the arrays allocated for the collect operation, index 0
 396          * holds the high-order bits of the running sum, index 1 holds
 397          * the low-order bits of the sum computed via compensated
 398          * summation, and index 2 holds the simple sum used to compute
 399          * the proper result if the stream contains infinite values of
 400          * the same sign.
 401          */
 402         double[] summation = collect(() -> new double[3],
 403                                (ll, d) -> {
 404                                    Collectors.sumWithCompensation(ll, d);
 405                                    ll[2] += d;
 406                                },
 407                                (ll, rr) -> {
 408                                    Collectors.sumWithCompensation(ll, rr[0]);
 409                                    Collectors.sumWithCompensation(ll, rr[1]);
 410                                    ll[2] += rr[2];
 411                                });
 412 
 413         return Collectors.computeFinalSum(summation);
 414     }
 415 
 416     @Override
 417     public final OptionalDouble min() {
 418         return reduce(Math::min);
 419     }
 420 
 421     @Override
 422     public final OptionalDouble max() {
 423         return reduce(Math::max);
 424     }
 425 
 426     /**
 427      * {@inheritDoc}
 428      *
 429      * @implNote The {@code double} format can represent all
 430      * consecutive integers in the range -2<sup>53</sup> to
 431      * 2<sup>53</sup>. If the pipeline has more than 2<sup>53</sup>
 432      * values, the divisor in the average computation will saturate at
 433      * 2<sup>53</sup>, leading to additional numerical errors.
 434      */
 435     @Override
 436     public final OptionalDouble average() {
 437         /*
 438          * In the arrays allocated for the collect operation, index 0
 439          * holds the high-order bits of the running sum, index 1 holds
 440          * the low-order bits of the sum computed via compensated
 441          * summation, index 2 holds the number of values seen, index 3
 442          * holds the simple sum.
 443          */
 444         double[] avg = collect(() -> new double[4],
 445                                (ll, d) -> {
 446                                    ll[2]++;
 447                                    Collectors.sumWithCompensation(ll, d);
 448                                    ll[3] += d;
 449                                },
 450                                (ll, rr) -> {
 451                                    Collectors.sumWithCompensation(ll, rr[0]);
 452                                    Collectors.sumWithCompensation(ll, rr[1]);
 453                                    ll[2] += rr[2];
 454                                    ll[3] += rr[3];
 455                                });
 456         return avg[2] > 0
 457             ? OptionalDouble.of(Collectors.computeFinalSum(avg) / avg[2])
 458             : OptionalDouble.empty();
 459     }
 460 
 461     @Override
 462     public final long count() {
 463         return evaluate(ReduceOps.makeDoubleCounting());
 464     }
 465 
 466     @Override
 467     public final DoubleSummaryStatistics summaryStatistics() {
 468         return collect(DoubleSummaryStatistics::new, DoubleSummaryStatistics::accept,
 469                        DoubleSummaryStatistics::combine);
 470     }
 471 
 472     @Override
 473     public final double reduce(double identity, DoubleBinaryOperator op) {
 474         return evaluate(ReduceOps.makeDouble(identity, op));
 475     }
 476 
 477     @Override
 478     public final OptionalDouble reduce(DoubleBinaryOperator op) {
 479         return evaluate(ReduceOps.makeDouble(op));
 480     }
 481 
 482     @Override
 483     public final <R> R collect(Supplier<R> supplier,
 484                                ObjDoubleConsumer<R> accumulator,
 485                                BiConsumer<R, R> combiner) {
 486         Objects.requireNonNull(combiner);
 487         BinaryOperator<R> operator = (left, right) -> {
 488             combiner.accept(left, right);
 489             return left;
 490         };
 491         return evaluate(ReduceOps.makeDouble(supplier, accumulator, operator));
 492     }
 493 
 494     @Override
 495     public final boolean anyMatch(DoublePredicate predicate) {
 496         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ANY));
 497     }
 498 
 499     @Override
 500     public final boolean allMatch(DoublePredicate predicate) {
 501         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ALL));
 502     }
 503 
 504     @Override
 505     public final boolean noneMatch(DoublePredicate predicate) {
 506         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.NONE));
 507     }
 508 
 509     @Override
 510     public final OptionalDouble findFirst() {
 511         return evaluate(FindOps.makeDouble(true));
 512     }
 513 
 514     @Override
 515     public final OptionalDouble findAny() {
 516         return evaluate(FindOps.makeDouble(false));
 517     }
 518 
 519     @Override
 520     public final double[] toArray() {
 521         return Nodes.flattenDouble((Node.OfDouble) evaluateToArrayNode(Double[]::new))
 522                         .asPrimitiveArray();
 523     }
 524 
 525     //
 526 
 527     /**
 528      * Source stage of a DoubleStream
 529      *
 530      * @param <E_IN> type of elements in the upstream source
 531      */
 532     static class Head<E_IN> extends DoublePipeline<E_IN> {
 533         /**
 534          * Constructor for the source stage of a DoubleStream.
 535          *
 536          * @param source {@code Supplier<Spliterator>} describing the stream
 537          *               source
 538          * @param sourceFlags the source flags for the stream source, described
 539          *                    in {@link StreamOpFlag}
 540          * @param parallel {@code true} if the pipeline is parallel
 541          */
 542         Head(Supplier<? extends Spliterator<Double>> source,
 543              int sourceFlags, boolean parallel) {
 544             super(source, sourceFlags, parallel);
 545         }
 546 
 547         /**
 548          * Constructor for the source stage of a DoubleStream.
 549          *
 550          * @param source {@code Spliterator} describing the stream source
 551          * @param sourceFlags the source flags for the stream source, described
 552          *                    in {@link StreamOpFlag}
 553          * @param parallel {@code true} if the pipeline is parallel
 554          */
 555         Head(Spliterator<Double> source,
 556              int sourceFlags, boolean parallel) {
 557             super(source, sourceFlags, parallel);
 558         }
 559 
 560         @Override
 561         final boolean opIsStateful() {
 562             throw new UnsupportedOperationException();
 563         }
 564 
 565         @Override
 566         final Sink<E_IN> opWrapSink(int flags, Sink<Double> sink) {
 567             throw new UnsupportedOperationException();
 568         }
 569 
 570         // Optimized sequential terminal operations for the head of the pipeline
 571 
 572         @Override
 573         public void forEach(DoubleConsumer consumer) {
 574             if (!isParallel()) {
 575                 adapt(sourceStageSpliterator()).forEachRemaining(consumer);
 576             }
 577             else {
 578                 super.forEach(consumer);
 579             }
 580         }
 581 
 582         @Override
 583         public void forEachOrdered(DoubleConsumer consumer) {
 584             if (!isParallel()) {
 585                 adapt(sourceStageSpliterator()).forEachRemaining(consumer);
 586             }
 587             else {
 588                 super.forEachOrdered(consumer);
 589             }
 590         }
 591 
 592     }
 593 
 594     /**
 595      * Base class for a stateless intermediate stage of a DoubleStream.
 596      *
 597      * @param <E_IN> type of elements in the upstream source
 598      * @since 1.8
 599      */
 600     abstract static class StatelessOp<E_IN> extends DoublePipeline<E_IN> {
 601         /**
 602          * Construct a new DoubleStream by appending a stateless intermediate
 603          * operation to an existing stream.
 604          *
 605          * @param upstream the upstream pipeline stage
 606          * @param inputShape the stream shape for the upstream pipeline stage
 607          * @param opFlags operation flags for the new stage
 608          */
 609         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
 610                     StreamShape inputShape,
 611                     int opFlags) {
 612             super(upstream, opFlags);
 613             assert upstream.getOutputShape() == inputShape;
 614         }
 615 
 616         @Override
 617         final boolean opIsStateful() {
 618             return false;
 619         }
 620     }
 621 
 622     /**
 623      * Base class for a stateful intermediate stage of a DoubleStream.
 624      *
 625      * @param <E_IN> type of elements in the upstream source
 626      * @since 1.8
 627      */
 628     abstract static class StatefulOp<E_IN> extends DoublePipeline<E_IN> {
 629         /**
 630          * Construct a new DoubleStream by appending a stateful intermediate
 631          * operation to an existing stream.
 632          *
 633          * @param upstream the upstream pipeline stage
 634          * @param inputShape the stream shape for the upstream pipeline stage
 635          * @param opFlags operation flags for the new stage
 636          */
 637         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
 638                    StreamShape inputShape,
 639                    int opFlags) {
 640             super(upstream, opFlags);
 641             assert upstream.getOutputShape() == inputShape;
 642         }
 643 
 644         @Override
 645         final boolean opIsStateful() {
 646             return true;
 647         }
 648 
 649         @Override
 650         abstract <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
 651                                                         Spliterator<P_IN> spliterator,
 652                                                         IntFunction<Double[]> generator);
 653     }
 654 }