1 /* 2 * Copyright (c) 2013, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.DoubleSummaryStatistics; 28 import java.util.Objects; 29 import java.util.OptionalDouble; 30 import java.util.PrimitiveIterator; 31 import java.util.Spliterator; 32 import java.util.Spliterators; 33 import java.util.function.BiConsumer; 34 import java.util.function.BinaryOperator; 35 import java.util.function.DoubleBinaryOperator; 36 import java.util.function.DoubleConsumer; 37 import java.util.function.DoubleFunction; 38 import java.util.function.DoublePredicate; 39 import java.util.function.DoubleToIntFunction; 40 import java.util.function.DoubleToLongFunction; 41 import java.util.function.DoubleUnaryOperator; 42 import java.util.function.IntFunction; 43 import java.util.function.ObjDoubleConsumer; 44 import java.util.function.Supplier; 45 46 /** 47 * Abstract base class for an intermediate pipeline stage or pipeline source 48 * stage implementing whose elements are of type {@code double}. 49 * 50 * @param <E_IN> type of elements in the upstream source 51 * 52 * @since 1.8 53 */ 54 abstract class DoublePipeline<E_IN> 55 extends AbstractPipeline<E_IN, Double, DoubleStream> 56 implements DoubleStream { 57 58 /** 59 * Constructor for the head of a stream pipeline. 60 * 61 * @param source {@code Supplier<Spliterator>} describing the stream source 62 * @param sourceFlags the source flags for the stream source, described in 63 * {@link StreamOpFlag} 64 */ 65 DoublePipeline(Supplier<? extends Spliterator<Double>> source, 66 int sourceFlags, boolean parallel) { 67 super(source, sourceFlags, parallel); 68 } 69 70 /** 71 * Constructor for the head of a stream pipeline. 72 * 73 * @param source {@code Spliterator} describing the stream source 74 * @param sourceFlags the source flags for the stream source, described in 75 * {@link StreamOpFlag} 76 */ 77 DoublePipeline(Spliterator<Double> source, 78 int sourceFlags, boolean parallel) { 79 super(source, sourceFlags, parallel); 80 } 81 82 /** 83 * Constructor for appending an intermediate operation onto an existing 84 * pipeline. 85 * 86 * @param upstream the upstream element source. 87 * @param opFlags the operation flags 88 */ 89 DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) { 90 super(upstream, opFlags); 91 } 92 93 /** 94 * Adapt a {@code Sink<Double> to a {@code DoubleConsumer}, ideally simply 95 * by casting. 96 */ 97 private static DoubleConsumer adapt(Sink<Double> sink) { 98 if (sink instanceof DoubleConsumer) { 99 return (DoubleConsumer) sink; 100 } else { 101 if (Tripwire.ENABLED) 102 Tripwire.trip(AbstractPipeline.class, 103 "using DoubleStream.adapt(Sink<Double> s)"); 104 return sink::accept; 105 } 106 } 107 108 /** 109 * Adapt a {@code Spliterator<Double>} to a {@code Spliterator.OfDouble}. 110 * 111 * @implNote 112 * The implementation attempts to cast to a Spliterator.OfDouble, and throws 113 * an exception if this cast is not possible. 114 */ 115 private static Spliterator.OfDouble adapt(Spliterator<Double> s) { 116 if (s instanceof Spliterator.OfDouble) { 117 return (Spliterator.OfDouble) s; 118 } else { 119 if (Tripwire.ENABLED) 120 Tripwire.trip(AbstractPipeline.class, 121 "using DoubleStream.adapt(Spliterator<Double> s)"); 122 throw new UnsupportedOperationException("DoubleStream.adapt(Spliterator<Double> s)"); 123 } 124 } 125 126 127 // Shape-specific methods 128 129 @Override 130 final StreamShape getOutputShape() { 131 return StreamShape.DOUBLE_VALUE; 132 } 133 134 @Override 135 final <P_IN> Node<Double> evaluateToNode(PipelineHelper<Double> helper, 136 Spliterator<P_IN> spliterator, 137 boolean flattenTree, 138 IntFunction<Double[]> generator) { 139 return Nodes.collectDouble(helper, spliterator, flattenTree); 140 } 141 142 @Override 143 final <P_IN> Spliterator<Double> wrap(PipelineHelper<Double> ph, 144 Supplier<Spliterator<P_IN>> supplier, 145 boolean isParallel) { 146 return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel); 147 } 148 149 @Override 150 @SuppressWarnings("unchecked") 151 final Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) { 152 return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier); 153 } 154 155 @Override 156 final boolean forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) { 157 Spliterator.OfDouble spl = adapt(spliterator); 158 DoubleConsumer adaptedSink = adapt(sink); 159 boolean cancelled; 160 do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink)); 161 return cancelled; 162 } 163 164 @Override 165 final Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) { 166 return Nodes.doubleBuilder(exactSizeIfKnown); 167 } 168 169 private <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper, int opFlags) { 170 return new ReferencePipeline.StatelessOp<Double, U>(this, StreamShape.DOUBLE_VALUE, opFlags) { 171 @Override 172 Sink<Double> opWrapSink(int flags, Sink<U> sink) { 173 return new Sink.ChainedDouble<U>(sink) { 174 @Override 175 public void accept(double t) { 176 downstream.accept(mapper.apply(t)); 177 } 178 }; 179 } 180 }; 181 } 182 183 // DoubleStream 184 185 @Override 186 public final PrimitiveIterator.OfDouble iterator() { 187 return Spliterators.iterator(spliterator()); 188 } 189 190 @Override 191 public final Spliterator.OfDouble spliterator() { 192 return adapt(super.spliterator()); 193 } 194 195 // Stateless intermediate ops from DoubleStream 196 197 @Override 198 public final Stream<Double> boxed() { 199 return mapToObj(Double::valueOf, 0); 200 } 201 202 @Override 203 public final DoubleStream map(DoubleUnaryOperator mapper) { 204 Objects.requireNonNull(mapper); 205 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 206 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 207 @Override 208 Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 209 return new Sink.ChainedDouble<Double>(sink) { 210 @Override 211 public void accept(double t) { 212 downstream.accept(mapper.applyAsDouble(t)); 213 } 214 }; 215 } 216 }; 217 } 218 219 @Override 220 public final <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) { 221 Objects.requireNonNull(mapper); 222 return mapToObj(mapper, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT); 223 } 224 225 @Override 226 public final IntStream mapToInt(DoubleToIntFunction mapper) { 227 Objects.requireNonNull(mapper); 228 return new IntPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 229 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 230 @Override 231 Sink<Double> opWrapSink(int flags, Sink<Integer> sink) { 232 return new Sink.ChainedDouble<Integer>(sink) { 233 @Override 234 public void accept(double t) { 235 downstream.accept(mapper.applyAsInt(t)); 236 } 237 }; 238 } 239 }; 240 } 241 242 @Override 243 public final LongStream mapToLong(DoubleToLongFunction mapper) { 244 Objects.requireNonNull(mapper); 245 return new LongPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 246 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 247 @Override 248 Sink<Double> opWrapSink(int flags, Sink<Long> sink) { 249 return new Sink.ChainedDouble<Long>(sink) { 250 @Override 251 public void accept(double t) { 252 downstream.accept(mapper.applyAsLong(t)); 253 } 254 }; 255 } 256 }; 257 } 258 259 @Override 260 public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) { 261 Objects.requireNonNull(mapper); 262 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 263 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 264 @Override 265 Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 266 return new Sink.ChainedDouble<Double>(sink) { 267 // true if cancellationRequested() has been called 268 boolean cancellationRequestedCalled; 269 270 // cache the consumer to avoid creation on every accepted element 271 DoubleConsumer downstreamAsDouble = downstream::accept; 272 273 @Override 274 public void begin(long size) { 275 downstream.begin(-1); 276 } 277 278 @Override 279 public void accept(double t) { 280 try (DoubleStream result = mapper.apply(t)) { 281 if (result != null) { 282 if (!cancellationRequestedCalled) { 283 result.sequential().forEach(downstreamAsDouble); 284 } 285 else { 286 var s = result.sequential().spliterator(); 287 do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble)); 288 } 289 } 290 } 291 } 292 293 @Override 294 public boolean cancellationRequested() { 295 // If this method is called then an operation within the stream 296 // pipeline is short-circuiting (see AbstractPipeline.copyInto). 297 // Note that we cannot differentiate between an upstream or 298 // downstream operation 299 cancellationRequestedCalled = true; 300 return downstream.cancellationRequested(); 301 } 302 }; 303 } 304 }; 305 } 306 307 @Override 308 public DoubleStream unordered() { 309 if (!isOrdered()) 310 return this; 311 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, StreamOpFlag.NOT_ORDERED) { 312 @Override 313 Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 314 return sink; 315 } 316 }; 317 } 318 319 @Override 320 public final DoubleStream filter(DoublePredicate predicate) { 321 Objects.requireNonNull(predicate); 322 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 323 StreamOpFlag.NOT_SIZED) { 324 @Override 325 Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 326 return new Sink.ChainedDouble<Double>(sink) { 327 @Override 328 public void begin(long size) { 329 downstream.begin(-1); 330 } 331 332 @Override 333 public void accept(double t) { 334 if (predicate.test(t)) 335 downstream.accept(t); 336 } 337 }; 338 } 339 }; 340 } 341 342 @Override 343 public final DoubleStream peek(DoubleConsumer action) { 344 Objects.requireNonNull(action); 345 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 346 0) { 347 @Override 348 Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 349 return new Sink.ChainedDouble<Double>(sink) { 350 @Override 351 public void accept(double t) { 352 action.accept(t); 353 downstream.accept(t); 354 } 355 }; 356 } 357 }; 358 } 359 360 // Stateful intermediate ops from DoubleStream 361 362 @Override 363 public final DoubleStream limit(long maxSize) { 364 if (maxSize < 0) 365 throw new IllegalArgumentException(Long.toString(maxSize)); 366 return SliceOps.makeDouble(this, (long) 0, maxSize); 367 } 368 369 @Override 370 public final DoubleStream skip(long n) { 371 if (n < 0) 372 throw new IllegalArgumentException(Long.toString(n)); 373 if (n == 0) 374 return this; 375 else { 376 long limit = -1; 377 return SliceOps.makeDouble(this, n, limit); 378 } 379 } 380 381 @Override 382 public final DoubleStream takeWhile(DoublePredicate predicate) { 383 return WhileOps.makeTakeWhileDouble(this, predicate); 384 } 385 386 @Override 387 public final DoubleStream dropWhile(DoublePredicate predicate) { 388 return WhileOps.makeDropWhileDouble(this, predicate); 389 } 390 391 @Override 392 public final DoubleStream sorted() { 393 return SortedOps.makeDouble(this); 394 } 395 396 @Override 397 public final DoubleStream distinct() { 398 // While functional and quick to implement, this approach is not very efficient. 399 // An efficient version requires a double-specific map/set implementation. 400 return boxed().distinct().mapToDouble(i -> (double) i); 401 } 402 403 // Terminal ops from DoubleStream 404 405 @Override 406 public void forEach(DoubleConsumer consumer) { 407 evaluate(ForEachOps.makeDouble(consumer, false)); 408 } 409 410 @Override 411 public void forEachOrdered(DoubleConsumer consumer) { 412 evaluate(ForEachOps.makeDouble(consumer, true)); 413 } 414 415 @Override 416 public final double sum() { 417 /* 418 * In the arrays allocated for the collect operation, index 0 419 * holds the high-order bits of the running sum, index 1 holds 420 * the (negative) low-order bits of the sum computed via 421 * compensated summation, and index 2 holds the simple sum used 422 * to compute the proper result if the stream contains infinite 423 * values of the same sign. 424 */ 425 double[] summation = collect(() -> new double[3], 426 (ll, d) -> { 427 Collectors.sumWithCompensation(ll, d); 428 ll[2] += d; 429 }, 430 (ll, rr) -> { 431 Collectors.sumWithCompensation(ll, rr[0]); 432 Collectors.sumWithCompensation(ll, -rr[1]); 433 ll[2] += rr[2]; 434 }); 435 436 return Collectors.computeFinalSum(summation); 437 } 438 439 @Override 440 public final OptionalDouble min() { 441 return reduce(Math::min); 442 } 443 444 @Override 445 public final OptionalDouble max() { 446 return reduce(Math::max); 447 } 448 449 /** 450 * {@inheritDoc} 451 * 452 * @implNote The {@code double} format can represent all 453 * consecutive integers in the range -2<sup>53</sup> to 454 * 2<sup>53</sup>. If the pipeline has more than 2<sup>53</sup> 455 * values, the divisor in the average computation will saturate at 456 * 2<sup>53</sup>, leading to additional numerical errors. 457 */ 458 @Override 459 public final OptionalDouble average() { 460 /* 461 * In the arrays allocated for the collect operation, index 0 462 * holds the high-order bits of the running sum, index 1 holds 463 * the (negative) low-order bits of the sum computed via 464 * compensated summation, index 2 holds the number of values 465 * seen, index 3 holds the simple sum. 466 */ 467 double[] avg = collect(() -> new double[4], 468 (ll, d) -> { 469 ll[2]++; 470 Collectors.sumWithCompensation(ll, d); 471 ll[3] += d; 472 }, 473 (ll, rr) -> { 474 Collectors.sumWithCompensation(ll, rr[0]); 475 Collectors.sumWithCompensation(ll, -rr[1]); 476 ll[2] += rr[2]; 477 ll[3] += rr[3]; 478 }); 479 return avg[2] > 0 480 ? OptionalDouble.of(Collectors.computeFinalSum(avg) / avg[2]) 481 : OptionalDouble.empty(); 482 } 483 484 @Override 485 public final long count() { 486 return evaluate(ReduceOps.makeDoubleCounting()); 487 } 488 489 @Override 490 public final DoubleSummaryStatistics summaryStatistics() { 491 return collect(DoubleSummaryStatistics::new, DoubleSummaryStatistics::accept, 492 DoubleSummaryStatistics::combine); 493 } 494 495 @Override 496 public final double reduce(double identity, DoubleBinaryOperator op) { 497 return evaluate(ReduceOps.makeDouble(identity, op)); 498 } 499 500 @Override 501 public final OptionalDouble reduce(DoubleBinaryOperator op) { 502 return evaluate(ReduceOps.makeDouble(op)); 503 } 504 505 @Override 506 public final <R> R collect(Supplier<R> supplier, 507 ObjDoubleConsumer<R> accumulator, 508 BiConsumer<R, R> combiner) { 509 Objects.requireNonNull(combiner); 510 BinaryOperator<R> operator = (left, right) -> { 511 combiner.accept(left, right); 512 return left; 513 }; 514 return evaluate(ReduceOps.makeDouble(supplier, accumulator, operator)); 515 } 516 517 @Override 518 public final boolean anyMatch(DoublePredicate predicate) { 519 return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ANY)); 520 } 521 522 @Override 523 public final boolean allMatch(DoublePredicate predicate) { 524 return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ALL)); 525 } 526 527 @Override 528 public final boolean noneMatch(DoublePredicate predicate) { 529 return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.NONE)); 530 } 531 532 @Override 533 public final OptionalDouble findFirst() { 534 return evaluate(FindOps.makeDouble(true)); 535 } 536 537 @Override 538 public final OptionalDouble findAny() { 539 return evaluate(FindOps.makeDouble(false)); 540 } 541 542 @Override 543 public final double[] toArray() { 544 return Nodes.flattenDouble((Node.OfDouble) evaluateToArrayNode(Double[]::new)) 545 .asPrimitiveArray(); 546 } 547 548 // 549 550 /** 551 * Source stage of a DoubleStream 552 * 553 * @param <E_IN> type of elements in the upstream source 554 */ 555 static class Head<E_IN> extends DoublePipeline<E_IN> { 556 /** 557 * Constructor for the source stage of a DoubleStream. 558 * 559 * @param source {@code Supplier<Spliterator>} describing the stream 560 * source 561 * @param sourceFlags the source flags for the stream source, described 562 * in {@link StreamOpFlag} 563 * @param parallel {@code true} if the pipeline is parallel 564 */ 565 Head(Supplier<? extends Spliterator<Double>> source, 566 int sourceFlags, boolean parallel) { 567 super(source, sourceFlags, parallel); 568 } 569 570 /** 571 * Constructor for the source stage of a DoubleStream. 572 * 573 * @param source {@code Spliterator} describing the stream source 574 * @param sourceFlags the source flags for the stream source, described 575 * in {@link StreamOpFlag} 576 * @param parallel {@code true} if the pipeline is parallel 577 */ 578 Head(Spliterator<Double> source, 579 int sourceFlags, boolean parallel) { 580 super(source, sourceFlags, parallel); 581 } 582 583 @Override 584 final boolean opIsStateful() { 585 throw new UnsupportedOperationException(); 586 } 587 588 @Override 589 final Sink<E_IN> opWrapSink(int flags, Sink<Double> sink) { 590 throw new UnsupportedOperationException(); 591 } 592 593 // Optimized sequential terminal operations for the head of the pipeline 594 595 @Override 596 public void forEach(DoubleConsumer consumer) { 597 if (!isParallel()) { 598 adapt(sourceStageSpliterator()).forEachRemaining(consumer); 599 } 600 else { 601 super.forEach(consumer); 602 } 603 } 604 605 @Override 606 public void forEachOrdered(DoubleConsumer consumer) { 607 if (!isParallel()) { 608 adapt(sourceStageSpliterator()).forEachRemaining(consumer); 609 } 610 else { 611 super.forEachOrdered(consumer); 612 } 613 } 614 615 } 616 617 /** 618 * Base class for a stateless intermediate stage of a DoubleStream. 619 * 620 * @param <E_IN> type of elements in the upstream source 621 * @since 1.8 622 */ 623 abstract static class StatelessOp<E_IN> extends DoublePipeline<E_IN> { 624 /** 625 * Construct a new DoubleStream by appending a stateless intermediate 626 * operation to an existing stream. 627 * 628 * @param upstream the upstream pipeline stage 629 * @param inputShape the stream shape for the upstream pipeline stage 630 * @param opFlags operation flags for the new stage 631 */ 632 StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, 633 StreamShape inputShape, 634 int opFlags) { 635 super(upstream, opFlags); 636 assert upstream.getOutputShape() == inputShape; 637 } 638 639 @Override 640 final boolean opIsStateful() { 641 return false; 642 } 643 } 644 645 /** 646 * Base class for a stateful intermediate stage of a DoubleStream. 647 * 648 * @param <E_IN> type of elements in the upstream source 649 * @since 1.8 650 */ 651 abstract static class StatefulOp<E_IN> extends DoublePipeline<E_IN> { 652 /** 653 * Construct a new DoubleStream by appending a stateful intermediate 654 * operation to an existing stream. 655 * 656 * @param upstream the upstream pipeline stage 657 * @param inputShape the stream shape for the upstream pipeline stage 658 * @param opFlags operation flags for the new stage 659 */ 660 StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, 661 StreamShape inputShape, 662 int opFlags) { 663 super(upstream, opFlags); 664 assert upstream.getOutputShape() == inputShape; 665 } 666 667 @Override 668 final boolean opIsStateful() { 669 return true; 670 } 671 672 @Override 673 abstract <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, 674 Spliterator<P_IN> spliterator, 675 IntFunction<Double[]> generator); 676 } 677 }