1 /* 2 * Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.DoubleSummaryStatistics; 28 import java.util.Objects; 29 import java.util.OptionalDouble; 30 import java.util.PrimitiveIterator; 31 import java.util.Spliterator; 32 import java.util.Spliterators; 33 import java.util.function.BiConsumer; 34 import java.util.function.BinaryOperator; 35 import java.util.function.DoubleBinaryOperator; 36 import java.util.function.DoubleConsumer; 37 import java.util.function.DoubleFunction; 38 import java.util.function.DoublePredicate; 39 import java.util.function.DoubleToIntFunction; 40 import java.util.function.DoubleToLongFunction; 41 import java.util.function.DoubleUnaryOperator; 42 import java.util.function.IntFunction; 43 import java.util.function.LongPredicate; 44 import java.util.function.ObjDoubleConsumer; 45 import java.util.function.Supplier; 46 47 /** 48 * Abstract base class for an intermediate pipeline stage or pipeline source 49 * stage implementing whose elements are of type {@code double}. 50 * 51 * @param <E_IN> type of elements in the upstream source 52 * 53 * @since 1.8 54 */ 55 abstract class DoublePipeline<E_IN> 56 extends AbstractPipeline<E_IN, Double, DoubleStream> 57 implements DoubleStream { 58 59 /** 60 * Constructor for the head of a stream pipeline. 61 * 62 * @param source {@code Supplier<Spliterator>} describing the stream source 63 * @param sourceFlags the source flags for the stream source, described in 64 * {@link StreamOpFlag} 65 */ 66 DoublePipeline(Supplier<? extends Spliterator<Double>> source, 67 int sourceFlags, boolean parallel) { 68 super(source, sourceFlags, parallel); 69 } 70 71 /** 72 * Constructor for the head of a stream pipeline. 73 * 74 * @param source {@code Spliterator} describing the stream source 75 * @param sourceFlags the source flags for the stream source, described in 76 * {@link StreamOpFlag} 77 */ 78 DoublePipeline(Spliterator<Double> source, 79 int sourceFlags, boolean parallel) { 80 super(source, sourceFlags, parallel); 81 } 82 83 /** 84 * Constructor for appending an intermediate operation onto an existing 85 * pipeline. 86 * 87 * @param upstream the upstream element source. 88 * @param opFlags the operation flags 89 */ 90 DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) { 91 super(upstream, opFlags); 92 } 93 94 /** 95 * Adapt a {@code Sink<Double> to a {@code DoubleConsumer}, ideally simply 96 * by casting. 97 */ 98 private static DoubleConsumer adapt(Sink<Double> sink) { 99 if (sink instanceof DoubleConsumer) { 100 return (DoubleConsumer) sink; 101 } else { 102 if (Tripwire.ENABLED) 103 Tripwire.trip(AbstractPipeline.class, 104 "using DoubleStream.adapt(Sink<Double> s)"); 105 return sink::accept; 106 } 107 } 108 109 /** 110 * Adapt a {@code Spliterator<Double>} to a {@code Spliterator.OfDouble}. 111 * 112 * @implNote 113 * The implementation attempts to cast to a Spliterator.OfDouble, and throws 114 * an exception if this cast is not possible. 115 */ 116 private static Spliterator.OfDouble adapt(Spliterator<Double> s) { 117 if (s instanceof Spliterator.OfDouble) { 118 return (Spliterator.OfDouble) s; 119 } else { 120 if (Tripwire.ENABLED) 121 Tripwire.trip(AbstractPipeline.class, 122 "using DoubleStream.adapt(Spliterator<Double> s)"); 123 throw new UnsupportedOperationException("DoubleStream.adapt(Spliterator<Double> s)"); 124 } 125 } 126 127 128 // Shape-specific methods 129 130 @Override 131 final StreamShape getOutputShape() { 132 return StreamShape.DOUBLE_VALUE; 133 } 134 135 @Override 136 final <P_IN> Node<Double> evaluateToNode(PipelineHelper<Double> helper, 137 Spliterator<P_IN> spliterator, 138 boolean flattenTree, 139 IntFunction<Double[]> generator) { 140 return Nodes.collectDouble(helper, spliterator, flattenTree); 141 } 142 143 @Override 144 final <P_IN> Spliterator<Double> wrap(PipelineHelper<Double> ph, 145 Supplier<Spliterator<P_IN>> supplier, 146 boolean isParallel) { 147 return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel); 148 } 149 150 @Override 151 @SuppressWarnings("unchecked") 152 final Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) { 153 return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier); 154 } 155 156 @Override 157 final boolean forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) { 158 Spliterator.OfDouble spl = adapt(spliterator); 159 DoubleConsumer adaptedSink = adapt(sink); 160 boolean cancelled; 161 do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink)); 162 return cancelled; 163 } 164 165 @Override 166 final Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) { 167 return Nodes.doubleBuilder(exactSizeIfKnown); 168 } 169 170 171 // DoubleStream 172 173 @Override 174 public final PrimitiveIterator.OfDouble iterator() { 175 return Spliterators.iterator(spliterator()); 176 } 177 178 @Override 179 public final Spliterator.OfDouble spliterator() { 180 return adapt(super.spliterator()); 181 } 182 183 // Stateless intermediate ops from DoubleStream 184 185 @Override 186 public final Stream<Double> boxed() { 187 return mapToObj(Double::valueOf); 188 } 189 190 @Override 191 public final DoubleStream map(DoubleUnaryOperator mapper) { 192 Objects.requireNonNull(mapper); 193 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 194 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 195 @Override 196 Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 197 return new Sink.ChainedDouble<Double>(sink) { 198 @Override 199 public void accept(double t) { 200 downstream.accept(mapper.applyAsDouble(t)); 201 } 202 }; 203 } 204 }; 205 } 206 207 @Override 208 public final <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) { 209 Objects.requireNonNull(mapper); 210 return new ReferencePipeline.StatelessOp<Double, U>(this, StreamShape.DOUBLE_VALUE, 211 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 212 @Override 213 Sink<Double> opWrapSink(int flags, Sink<U> sink) { 214 return new Sink.ChainedDouble<U>(sink) { 215 @Override 216 public void accept(double t) { 217 downstream.accept(mapper.apply(t)); 218 } 219 }; 220 } 221 }; 222 } 223 224 @Override 225 public final IntStream mapToInt(DoubleToIntFunction mapper) { 226 Objects.requireNonNull(mapper); 227 return new IntPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 228 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 229 @Override 230 Sink<Double> opWrapSink(int flags, Sink<Integer> sink) { 231 return new Sink.ChainedDouble<Integer>(sink) { 232 @Override 233 public void accept(double t) { 234 downstream.accept(mapper.applyAsInt(t)); 235 } 236 }; 237 } 238 }; 239 } 240 241 @Override 242 public final LongStream mapToLong(DoubleToLongFunction mapper) { 243 Objects.requireNonNull(mapper); 244 return new LongPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 245 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 246 @Override 247 Sink<Double> opWrapSink(int flags, Sink<Long> sink) { 248 return new Sink.ChainedDouble<Long>(sink) { 249 @Override 250 public void accept(double t) { 251 downstream.accept(mapper.applyAsLong(t)); 252 } 253 }; 254 } 255 }; 256 } 257 258 @Override 259 public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) { 260 Objects.requireNonNull(mapper); 261 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 262 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 263 @Override 264 Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 265 return new Sink.ChainedDouble<Double>(sink) { 266 @Override 267 public void begin(long size) { 268 downstream.begin(-1); 269 } 270 271 @Override 272 public void accept(double t) { 273 try (DoubleStream result = mapper.apply(t)) { 274 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 275 if (result != null) 276 result.sequential().forEach(i -> downstream.accept(i)); 277 } 278 } 279 }; 280 } 281 }; 282 } 283 284 @Override 285 public DoubleStream unordered() { 286 if (!isOrdered()) 287 return this; 288 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, StreamOpFlag.NOT_ORDERED) { 289 @Override 290 Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 291 return sink; 292 } 293 }; 294 } 295 296 @Override 297 public final DoubleStream filter(DoublePredicate predicate) { 298 Objects.requireNonNull(predicate); 299 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 300 StreamOpFlag.NOT_SIZED) { 301 @Override 302 Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 303 return new Sink.ChainedDouble<Double>(sink) { 304 @Override 305 public void begin(long size) { 306 downstream.begin(-1); 307 } 308 309 @Override 310 public void accept(double t) { 311 if (predicate.test(t)) 312 downstream.accept(t); 313 } 314 }; 315 } 316 }; 317 } 318 319 @Override 320 public final DoubleStream peek(DoubleConsumer action) { 321 Objects.requireNonNull(action); 322 return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, 323 0) { 324 @Override 325 Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 326 return new Sink.ChainedDouble<Double>(sink) { 327 @Override 328 public void accept(double t) { 329 action.accept(t); 330 downstream.accept(t); 331 } 332 }; 333 } 334 }; 335 } 336 337 // Stateful intermediate ops from DoubleStream 338 339 @Override 340 public final DoubleStream limit(long maxSize) { 341 if (maxSize < 0) 342 throw new IllegalArgumentException(Long.toString(maxSize)); 343 return SliceOps.makeDouble(this, (long) 0, maxSize); 344 } 345 346 @Override 347 public final DoubleStream skip(long n) { 348 if (n < 0) 349 throw new IllegalArgumentException(Long.toString(n)); 350 if (n == 0) 351 return this; 352 else { 353 long limit = -1; 354 return SliceOps.makeDouble(this, n, limit); 355 } 356 } 357 358 @Override 359 public final DoubleStream takeWhile(DoublePredicate predicate) { 360 return WhileOps.makeTakeWhileDouble(this, predicate); 361 } 362 363 @Override 364 public final DoubleStream dropWhile(DoublePredicate predicate) { 365 return WhileOps.makeDropWhileDouble(this, predicate); 366 } 367 368 @Override 369 public final DoubleStream sorted() { 370 return SortedOps.makeDouble(this); 371 } 372 373 @Override 374 public final DoubleStream distinct() { 375 // While functional and quick to implement, this approach is not very efficient. 376 // An efficient version requires a double-specific map/set implementation. 377 return boxed().distinct().mapToDouble(i -> (double) i); 378 } 379 380 // Terminal ops from DoubleStream 381 382 @Override 383 public void forEach(DoubleConsumer consumer) { 384 evaluate(ForEachOps.makeDouble(consumer, false)); 385 } 386 387 @Override 388 public void forEachOrdered(DoubleConsumer consumer) { 389 evaluate(ForEachOps.makeDouble(consumer, true)); 390 } 391 392 @Override 393 public final double sum() { 394 /* 395 * In the arrays allocated for the collect operation, index 0 396 * holds the high-order bits of the running sum, index 1 holds 397 * the low-order bits of the sum computed via compensated 398 * summation, and index 2 holds the simple sum used to compute 399 * the proper result if the stream contains infinite values of 400 * the same sign. 401 */ 402 double[] summation = collect(() -> new double[3], 403 (ll, d) -> { 404 Collectors.sumWithCompensation(ll, d); 405 ll[2] += d; 406 }, 407 (ll, rr) -> { 408 Collectors.sumWithCompensation(ll, rr[0]); 409 Collectors.sumWithCompensation(ll, rr[1]); 410 ll[2] += rr[2]; 411 }); 412 413 return Collectors.computeFinalSum(summation); 414 } 415 416 @Override 417 public final OptionalDouble min() { 418 return reduce(Math::min); 419 } 420 421 @Override 422 public final OptionalDouble max() { 423 return reduce(Math::max); 424 } 425 426 /** 427 * {@inheritDoc} 428 * 429 * @implNote The {@code double} format can represent all 430 * consecutive integers in the range -2<sup>53</sup> to 431 * 2<sup>53</sup>. If the pipeline has more than 2<sup>53</sup> 432 * values, the divisor in the average computation will saturate at 433 * 2<sup>53</sup>, leading to additional numerical errors. 434 */ 435 @Override 436 public final OptionalDouble average() { 437 /* 438 * In the arrays allocated for the collect operation, index 0 439 * holds the high-order bits of the running sum, index 1 holds 440 * the low-order bits of the sum computed via compensated 441 * summation, index 2 holds the number of values seen, index 3 442 * holds the simple sum. 443 */ 444 double[] avg = collect(() -> new double[4], 445 (ll, d) -> { 446 ll[2]++; 447 Collectors.sumWithCompensation(ll, d); 448 ll[3] += d; 449 }, 450 (ll, rr) -> { 451 Collectors.sumWithCompensation(ll, rr[0]); 452 Collectors.sumWithCompensation(ll, rr[1]); 453 ll[2] += rr[2]; 454 ll[3] += rr[3]; 455 }); 456 return avg[2] > 0 457 ? OptionalDouble.of(Collectors.computeFinalSum(avg) / avg[2]) 458 : OptionalDouble.empty(); 459 } 460 461 @Override 462 public final long count() { 463 return evaluate(ReduceOps.makeDoubleCounting()); 464 } 465 466 @Override 467 public final DoubleSummaryStatistics summaryStatistics() { 468 return collect(DoubleSummaryStatistics::new, DoubleSummaryStatistics::accept, 469 DoubleSummaryStatistics::combine); 470 } 471 472 @Override 473 public final double reduce(double identity, DoubleBinaryOperator op) { 474 return evaluate(ReduceOps.makeDouble(identity, op)); 475 } 476 477 @Override 478 public final OptionalDouble reduce(DoubleBinaryOperator op) { 479 return evaluate(ReduceOps.makeDouble(op)); 480 } 481 482 @Override 483 public final <R> R collect(Supplier<R> supplier, 484 ObjDoubleConsumer<R> accumulator, 485 BiConsumer<R, R> combiner) { 486 Objects.requireNonNull(combiner); 487 BinaryOperator<R> operator = (left, right) -> { 488 combiner.accept(left, right); 489 return left; 490 }; 491 return evaluate(ReduceOps.makeDouble(supplier, accumulator, operator)); 492 } 493 494 @Override 495 public final boolean anyMatch(DoublePredicate predicate) { 496 return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ANY)); 497 } 498 499 @Override 500 public final boolean allMatch(DoublePredicate predicate) { 501 return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ALL)); 502 } 503 504 @Override 505 public final boolean noneMatch(DoublePredicate predicate) { 506 return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.NONE)); 507 } 508 509 @Override 510 public final OptionalDouble findFirst() { 511 return evaluate(FindOps.makeDouble(true)); 512 } 513 514 @Override 515 public final OptionalDouble findAny() { 516 return evaluate(FindOps.makeDouble(false)); 517 } 518 519 @Override 520 public final double[] toArray() { 521 return Nodes.flattenDouble((Node.OfDouble) evaluateToArrayNode(Double[]::new)) 522 .asPrimitiveArray(); 523 } 524 525 // 526 527 /** 528 * Source stage of a DoubleStream 529 * 530 * @param <E_IN> type of elements in the upstream source 531 */ 532 static class Head<E_IN> extends DoublePipeline<E_IN> { 533 /** 534 * Constructor for the source stage of a DoubleStream. 535 * 536 * @param source {@code Supplier<Spliterator>} describing the stream 537 * source 538 * @param sourceFlags the source flags for the stream source, described 539 * in {@link StreamOpFlag} 540 * @param parallel {@code true} if the pipeline is parallel 541 */ 542 Head(Supplier<? extends Spliterator<Double>> source, 543 int sourceFlags, boolean parallel) { 544 super(source, sourceFlags, parallel); 545 } 546 547 /** 548 * Constructor for the source stage of a DoubleStream. 549 * 550 * @param source {@code Spliterator} describing the stream source 551 * @param sourceFlags the source flags for the stream source, described 552 * in {@link StreamOpFlag} 553 * @param parallel {@code true} if the pipeline is parallel 554 */ 555 Head(Spliterator<Double> source, 556 int sourceFlags, boolean parallel) { 557 super(source, sourceFlags, parallel); 558 } 559 560 @Override 561 final boolean opIsStateful() { 562 throw new UnsupportedOperationException(); 563 } 564 565 @Override 566 final Sink<E_IN> opWrapSink(int flags, Sink<Double> sink) { 567 throw new UnsupportedOperationException(); 568 } 569 570 // Optimized sequential terminal operations for the head of the pipeline 571 572 @Override 573 public void forEach(DoubleConsumer consumer) { 574 if (!isParallel()) { 575 adapt(sourceStageSpliterator()).forEachRemaining(consumer); 576 } 577 else { 578 super.forEach(consumer); 579 } 580 } 581 582 @Override 583 public void forEachOrdered(DoubleConsumer consumer) { 584 if (!isParallel()) { 585 adapt(sourceStageSpliterator()).forEachRemaining(consumer); 586 } 587 else { 588 super.forEachOrdered(consumer); 589 } 590 } 591 592 } 593 594 /** 595 * Base class for a stateless intermediate stage of a DoubleStream. 596 * 597 * @param <E_IN> type of elements in the upstream source 598 * @since 1.8 599 */ 600 abstract static class StatelessOp<E_IN> extends DoublePipeline<E_IN> { 601 /** 602 * Construct a new DoubleStream by appending a stateless intermediate 603 * operation to an existing stream. 604 * 605 * @param upstream the upstream pipeline stage 606 * @param inputShape the stream shape for the upstream pipeline stage 607 * @param opFlags operation flags for the new stage 608 */ 609 StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, 610 StreamShape inputShape, 611 int opFlags) { 612 super(upstream, opFlags); 613 assert upstream.getOutputShape() == inputShape; 614 } 615 616 @Override 617 final boolean opIsStateful() { 618 return false; 619 } 620 } 621 622 /** 623 * Base class for a stateful intermediate stage of a DoubleStream. 624 * 625 * @param <E_IN> type of elements in the upstream source 626 * @since 1.8 627 */ 628 abstract static class StatefulOp<E_IN> extends DoublePipeline<E_IN> { 629 /** 630 * Construct a new DoubleStream by appending a stateful intermediate 631 * operation to an existing stream. 632 * 633 * @param upstream the upstream pipeline stage 634 * @param inputShape the stream shape for the upstream pipeline stage 635 * @param opFlags operation flags for the new stage 636 */ 637 StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, 638 StreamShape inputShape, 639 int opFlags) { 640 super(upstream, opFlags); 641 assert upstream.getOutputShape() == inputShape; 642 } 643 644 @Override 645 final boolean opIsStateful() { 646 return true; 647 } 648 649 @Override 650 abstract <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, 651 Spliterator<P_IN> spliterator, 652 IntFunction<Double[]> generator); 653 } 654 }