1 /*
   2  * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.DoubleSummaryStatistics;
  28 import java.util.Objects;
  29 import java.util.OptionalDouble;
  30 import java.util.PrimitiveIterator;
  31 import java.util.Spliterator;
  32 import java.util.Spliterators;
  33 import java.util.function.BiConsumer;
  34 import java.util.function.BinaryOperator;
  35 import java.util.function.DoubleBinaryOperator;
  36 import java.util.function.DoubleConsumer;
  37 import java.util.function.DoubleFunction;
  38 import java.util.function.DoublePredicate;
  39 import java.util.function.DoubleToIntFunction;
  40 import java.util.function.DoubleToLongFunction;
  41 import java.util.function.DoubleUnaryOperator;
  42 import java.util.function.IntFunction;
  43 import java.util.function.ObjDoubleConsumer;
  44 import java.util.function.Supplier;
  45 
  46 /**
  47  * Abstract base class for an intermediate pipeline stage or pipeline source
  48  * stage implementing whose elements are of type {@code double}.
  49  * @param <E_IN> Type of elements in the upstream source.
  50  * @since 1.8
  51  */
  52 abstract class DoublePipeline<E_IN>
  53         extends AbstractPipeline<E_IN, Double, DoubleStream>
  54         implements DoubleStream {
  55 
  56     /**
  57      * Constructor for the head of a stream pipeline.
  58      *
  59      * @param source {@code Supplier<Spliterator>} describing the stream source
  60      * @param sourceFlags The source flags for the stream source, described in
  61      * {@link StreamOpFlag}
  62      */
  63     DoublePipeline(Supplier<? extends Spliterator<Double>> source,
  64                    int sourceFlags, boolean parallel) {
  65         super(source, sourceFlags, parallel);
  66     }
  67 
  68     /**
  69      * Constructor for the head of a stream pipeline.
  70      *
  71      * @param source {@code Spliterator} describing the stream source
  72      * @param sourceFlags The source flags for the stream source, described in
  73      * {@link StreamOpFlag}
  74      */
  75     DoublePipeline(Spliterator<Double> source,
  76                    int sourceFlags, boolean parallel) {
  77         super(source, sourceFlags, parallel);
  78     }
  79 
  80     /**
  81      * Constructor for appending an intermediate operation onto an existing pipeline.
  82      *
  83      * @param upstream the upstream element source.
  84      * @param opFlags the operation flags
  85      */
  86     DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
  87         super(upstream, opFlags);
  88     }
  89 
  90     /**
  91      * Adapt a {@code Sink<Double> to a {@code DoubleConsumer}, ideally simply
  92      * by casting
  93      */
  94     private static DoubleConsumer adapt(Sink<Double> sink) {
  95         if (sink instanceof DoubleConsumer) {
  96             return (DoubleConsumer) sink;
  97         }
  98         else {
  99             if (Tripwire.ENABLED)
 100                 Tripwire.trip(AbstractPipeline.class,
 101                               "using DoubleStream.adapt(Sink<Double> s)");
 102             return sink::accept;
 103         }
 104     }
 105 
 106     /**
 107      * Adapt a {@code Spliterator<Double>} to a {@code Spliterator.OfDouble}.
 108      *
 109      * @implNote
 110      * The implementation attempts to cast to a Spliterator.OfDouble, and throws an
 111      * exception if this cast is not possible.
 112      */
 113     private static Spliterator.OfDouble adapt(Spliterator<Double> s) {
 114         if (s instanceof Spliterator.OfDouble) {
 115             return (Spliterator.OfDouble) s;
 116         }
 117         else {
 118             if (Tripwire.ENABLED)
 119                 Tripwire.trip(AbstractPipeline.class,
 120                               "using DoubleStream.adapt(Spliterator<Double> s)");
 121             throw new UnsupportedOperationException("DoubleStream.adapt(Spliterator<Double> s)");
 122         }
 123     }
 124 
 125 
 126     // Shape-specific methods
 127 
 128     @Override
 129     final StreamShape getOutputShape() {
 130         return StreamShape.DOUBLE_VALUE;
 131     }
 132 
 133     @Override
 134     final <P_IN> Node<Double> evaluateToNode(PipelineHelper<Double> helper,
 135                                              Spliterator<P_IN> spliterator,
 136                                              boolean flattenTree,
 137                                              IntFunction<Double[]> generator) {
 138         return Nodes.collectDouble(helper, spliterator, flattenTree);
 139     }
 140 
 141     @Override
 142     final <P_IN> Spliterator<Double> wrap(PipelineHelper<Double> ph,
 143                                           Supplier<Spliterator<P_IN>> supplier,
 144                                           boolean isParallel) {
 145         return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel);
 146     }
 147 
 148     @Override
 149     final Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) {
 150         return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier);
 151     }
 152 
 153     @Override
 154     final void forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) {
 155         Spliterator.OfDouble spl = adapt(spliterator);
 156         DoubleConsumer adaptedSink = adapt(sink);
 157         do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
 158     }
 159 
 160     @Override
 161     final  Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) {
 162         return Nodes.doubleBuilder(exactSizeIfKnown);
 163     }
 164 
 165 
 166     // DoubleStream
 167 
 168     @Override
 169     public final PrimitiveIterator.OfDouble iterator() {
 170         return Spliterators.iteratorFromSpliterator(spliterator());
 171     }
 172 
 173     @Override
 174     public final Spliterator.OfDouble spliterator() {
 175         return adapt(super.spliterator());
 176     }
 177 
 178     // Stateless intermediate ops from DoubleStream
 179 
 180     @Override
 181     public final Stream<Double> boxed() {
 182         return mapToObj(Double::valueOf);
 183     }
 184 
 185     @Override
 186     public final DoubleStream map(DoubleUnaryOperator mapper) {
 187         Objects.requireNonNull(mapper);
 188         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 189                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 190             @Override
 191             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 192                 return new Sink.ChainedDouble(sink) {
 193                     @Override
 194                     public void accept(double t) {
 195                         downstream.accept(mapper.applyAsDouble(t));
 196                     }
 197                 };
 198             }
 199         };
 200     }
 201 
 202     @Override
 203     public final <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) {
 204         Objects.requireNonNull(mapper);
 205         return new ReferencePipeline.StatelessOp<Double, U>(this, StreamShape.DOUBLE_VALUE,
 206                                                             StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 207             @Override
 208             Sink<Double> opWrapSink(int flags, Sink<U> sink) {
 209                 return new Sink.ChainedDouble(sink) {
 210                     @Override
 211                     public void accept(double t) {
 212                         downstream.accept(mapper.apply(t));
 213                     }
 214                 };
 215             }
 216         };
 217     }
 218 
 219     @Override
 220     public final IntStream mapToInt(DoubleToIntFunction mapper) {
 221         Objects.requireNonNull(mapper);
 222         return new IntPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 223                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 224             @Override
 225             Sink<Double> opWrapSink(int flags, Sink<Integer> sink) {
 226                 return new Sink.ChainedDouble(sink) {
 227                     @Override
 228                     public void accept(double t) {
 229                         downstream.accept(mapper.applyAsInt(t));
 230                     }
 231                 };
 232             }
 233         };
 234     }
 235 
 236     @Override
 237     public final LongStream mapToLong(DoubleToLongFunction mapper) {
 238         Objects.requireNonNull(mapper);
 239         return new LongPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 240                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
 241             @Override
 242             Sink<Double> opWrapSink(int flags, Sink<Long> sink) {
 243                 return new Sink.ChainedDouble(sink) {
 244                     @Override
 245                     public void accept(double t) {
 246                         downstream.accept(mapper.applyAsLong(t));
 247                     }
 248                 };
 249             }
 250         };
 251     }
 252 
 253     @Override
 254     public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
 255         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 256                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 257             @Override
 258             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 259                 return new Sink.ChainedDouble(sink) {
 260                     public void accept(double t) {
 261                         // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 262                         DoubleStream result = mapper.apply(t);
 263                         if (result != null)
 264                             result.sequential().forEach(i -> downstream.accept(i));
 265                     }
 266                 };
 267             }
 268         };
 269     }
 270 
 271     @Override
 272     public DoubleStream unordered() {
 273         if (!isOrdered())
 274             return this;
 275         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, StreamOpFlag.NOT_ORDERED) {
 276             @Override
 277             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 278                 return sink;
 279             }
 280         };
 281     }
 282 
 283     @Override
 284     public final DoubleStream filter(DoublePredicate predicate) {
 285         Objects.requireNonNull(predicate);
 286         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 287                                        StreamOpFlag.NOT_SIZED) {
 288             @Override
 289             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 290                 return new Sink.ChainedDouble(sink) {
 291                     @Override
 292                     public void accept(double t) {
 293                         if (predicate.test(t))
 294                             downstream.accept(t);
 295                     }
 296                 };
 297             }
 298         };
 299     }
 300 
 301     @Override
 302     public final DoubleStream peek(DoubleConsumer consumer) {
 303         Objects.requireNonNull(consumer);
 304         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
 305                                        0) {
 306             @Override
 307             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 308                 return new Sink.ChainedDouble(sink) {
 309                     @Override
 310                     public void accept(double t) {
 311                         consumer.accept(t);
 312                         downstream.accept(t);
 313                     }
 314                 };
 315             }
 316         };
 317     }
 318 
 319     // Stateful intermediate ops from DoubleStream
 320 
 321     @Override
 322     public final DoubleStream limit(long maxSize) {
 323         if (maxSize < 0)
 324             throw new IllegalArgumentException(Long.toString(maxSize));
 325         return SliceOps.makeDouble(this, (long) 0, maxSize);
 326     }
 327 
 328     @Override
 329     public final DoubleStream substream(long startingOffset) {
 330         if (startingOffset < 0)
 331             throw new IllegalArgumentException(Long.toString(startingOffset));
 332         if (startingOffset == 0)
 333             return this;
 334         else {
 335             long limit = -1;
 336             return SliceOps.makeDouble(this, startingOffset, limit);
 337         }
 338     }
 339 
 340     @Override
 341     public final DoubleStream substream(long startingOffset, long endingOffset) {
 342         if (startingOffset < 0 || endingOffset < startingOffset)
 343             throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset));
 344         return SliceOps.makeDouble(this, startingOffset, endingOffset - startingOffset);
 345     }
 346 
 347     @Override
 348     public final DoubleStream sorted() {
 349         return SortedOps.makeDouble(this);
 350     }
 351 
 352     @Override
 353     public final DoubleStream distinct() {
 354         // While functional and quick to implement, this approach is not very efficient.
 355         // An efficient version requires a double-specific map/set implementation.
 356         return boxed().distinct().mapToDouble(i -> (double) i);
 357     }
 358 
 359     // Terminal ops from DoubleStream
 360 
 361     @Override
 362     public void forEach(DoubleConsumer consumer) {
 363         evaluate(ForEachOps.makeDouble(consumer, false));
 364     }
 365 
 366     @Override
 367     public void forEachOrdered(DoubleConsumer consumer) {
 368         evaluate(ForEachOps.makeDouble(consumer, true));
 369     }
 370 
 371     @Override
 372     public final double sum() {
 373         // TODO: better algorithm to compensate for errors
 374         return reduce(0.0, Double::sum);
 375     }
 376 
 377     @Override
 378     public final OptionalDouble min() {
 379         return reduce(Math::min);
 380     }
 381 
 382     @Override
 383     public final OptionalDouble max() {
 384         return reduce(Math::max);
 385     }
 386 
 387     @Override
 388     public final OptionalDouble average() {
 389         double[] avg = collect(() -> new double[2],
 390                                (ll, i) -> {
 391                                    ll[0]++;
 392                                    ll[1] += i;
 393                                },
 394                                (ll, rr) -> {
 395                                    ll[0] += rr[0];
 396                                    ll[1] += rr[1];
 397                                });
 398         return avg[0] > 0
 399                ? OptionalDouble.of(avg[1] / avg[0])
 400                : OptionalDouble.empty();
 401     }
 402 
 403     @Override
 404     public final long count() {
 405         return mapToObj(e -> null).mapToInt(e -> 1).sum();
 406     }
 407 
 408     @Override
 409     public final DoubleSummaryStatistics summaryStatistics() {
 410         return collect(DoubleSummaryStatistics::new, DoubleSummaryStatistics::accept,
 411                        DoubleSummaryStatistics::combine);
 412     }
 413 
 414     @Override
 415     public final double reduce(double identity, DoubleBinaryOperator op) {
 416         return evaluate(ReduceOps.makeDouble(identity, op));
 417     }
 418 
 419     @Override
 420     public final OptionalDouble reduce(DoubleBinaryOperator op) {
 421         return evaluate(ReduceOps.makeDouble(op));
 422     }
 423 
 424     @Override
 425     public final <R> R collect(Supplier<R> resultFactory,
 426                                ObjDoubleConsumer<R> accumulator,
 427                                BiConsumer<R, R> combiner) {
 428         BinaryOperator<R> operator = (left, right) -> {
 429             combiner.accept(left, right);
 430             return left;
 431         };
 432         return evaluate(ReduceOps.makeDouble(resultFactory, accumulator, operator));
 433     }
 434 
 435     @Override
 436     public final boolean anyMatch(DoublePredicate predicate) {
 437         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ANY));
 438     }
 439 
 440     @Override
 441     public final boolean allMatch(DoublePredicate predicate) {
 442         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ALL));
 443     }
 444 
 445     @Override
 446     public final boolean noneMatch(DoublePredicate predicate) {
 447         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.NONE));
 448     }
 449 
 450     @Override
 451     public final OptionalDouble findFirst() {
 452         return evaluate(FindOps.makeDouble(true));
 453     }
 454 
 455     @Override
 456     public final OptionalDouble findAny() {
 457         return evaluate(FindOps.makeDouble(false));
 458     }
 459 
 460     @Override
 461     public final double[] toArray() {
 462         return Nodes.flattenDouble((Node.OfDouble) evaluateToArrayNode(Double[]::new))
 463                         .asDoubleArray();
 464     }
 465 
 466     //
 467 
 468     /** Source stage of a DoubleStream */
 469     static class Head<E_IN> extends DoublePipeline<E_IN> {
 470         /**
 471          * Constructor for the source stage of a DoubleStream.
 472          *
 473          * @param source {@code Supplier<Spliterator>} describing the stream
 474          *               source
 475          * @param sourceFlags The source flags for the stream source, described
 476          *                    in {@link StreamOpFlag}
 477          * @param parallel True if the pipeline is parallel
 478          */
 479         Head(Supplier<? extends Spliterator<Double>> source,
 480              int sourceFlags, boolean parallel) {
 481             super(source, sourceFlags, parallel);
 482         }
 483 
 484         /**
 485          * Constructor for the source stage of a DoubleStream.
 486          *
 487          * @param source {@code Spliterator} describing the stream source
 488          * @param sourceFlags The source flags for the stream source, described
 489          *                    in {@link StreamOpFlag}
 490          * @param parallel True if the pipeline is parallel
 491          */
 492         Head(Spliterator<Double> source,
 493              int sourceFlags, boolean parallel) {
 494             super(source, sourceFlags, parallel);
 495         }
 496 
 497         @Override
 498         final boolean opIsStateful() {
 499             throw new UnsupportedOperationException();
 500         }
 501 
 502         @Override
 503         final Sink<E_IN> opWrapSink(int flags, Sink<Double> sink) {
 504             throw new UnsupportedOperationException();
 505         }
 506 
 507         // Optimized sequential terminal operations for the head of the pipeline
 508 
 509         @Override
 510         public void forEach(DoubleConsumer consumer) {
 511             if (!isParallel()) {
 512                 adapt(sourceStageSpliterator()).forEachRemaining(consumer);
 513             }
 514             else {
 515                 super.forEach(consumer);
 516             }
 517         }
 518 
 519         @Override
 520         public void forEachOrdered(DoubleConsumer consumer) {
 521             if (!isParallel()) {
 522                 adapt(sourceStageSpliterator()).forEachRemaining(consumer);
 523             }
 524             else {
 525                 super.forEachOrdered(consumer);
 526             }
 527         }
 528 
 529     }
 530 
 531     /** Base class for a stateless intermediate stage of a DoubleStream */
 532     abstract static class StatelessOp<E_IN> extends DoublePipeline<E_IN> {
 533         /**
 534          * Construct a new DoubleStream by appending a stateless intermediate
 535          * operation to an existing stream.
 536          * @param upstream The upstream pipeline stage
 537          * @param inputShape The stream shape for the upstream pipeline stage
 538          * @param opFlags Operation flags for the new stage
 539          */
 540         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
 541                     StreamShape inputShape,
 542                     int opFlags) {
 543             super(upstream, opFlags);
 544             assert upstream.getOutputShape() == inputShape;
 545         }
 546 
 547         @Override
 548         final boolean opIsStateful() {
 549             return false;
 550         }
 551     }
 552 
 553     /** Base class for a stateful intermediate stage of a DoubleStream */
 554     abstract static class StatefulOp<E_IN> extends DoublePipeline<E_IN> {
 555         /**
 556          * Construct a new DoubleStream by appending a stateful intermediate
 557          * operation to an existing stream.
 558          * @param upstream The upstream pipeline stage
 559          * @param inputShape The stream shape for the upstream pipeline stage
 560          * @param opFlags Operation flags for the new stage
 561          */
 562         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
 563                    StreamShape inputShape,
 564                    int opFlags) {
 565             super(upstream, opFlags);
 566             assert upstream.getOutputShape() == inputShape;
 567         }
 568 
 569         @Override
 570         final boolean opIsStateful() {
 571             return true;
 572         }
 573 
 574         @Override
 575         abstract <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
 576                                                         Spliterator<P_IN> spliterator,
 577                                                         IntFunction<Double[]> generator);
 578     }
 579 }