/*
 * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.DoubleSummaryStatistics;
import java.util.Objects;
import java.util.OptionalDouble;
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.DoubleBinaryOperator;
import java.util.function.DoubleConsumer;
import java.util.function.DoubleFunction;
import java.util.function.DoublePredicate;
import java.util.function.DoubleToIntFunction;
import java.util.function.DoubleToLongFunction;
import java.util.function.DoubleUnaryOperator;
import java.util.function.IntFunction;
import java.util.function.ObjDoubleConsumer;
import java.util.function.Supplier;

/**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose elements are of type {@code double}.
 *
 * @param <E_IN> type of elements in the upstream source
 *
 * @since 1.8
 */
abstract class DoublePipeline<E_IN>
        extends AbstractPipeline<E_IN, Double, DoubleStream>
        implements DoubleStream {

    /**
     * Constructor for the head of a stream pipeline.
     *
     * @param source {@code Supplier<Spliterator>} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     *        {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    DoublePipeline(Supplier<? extends Spliterator<Double>> source,
                   int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

    /**
     * Constructor for the head of a stream pipeline.
     *
     * @param source {@code Spliterator} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     *        {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    DoublePipeline(Spliterator<Double> source,
                   int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

    /**
     * Constructor for appending an intermediate operation onto an existing
     * pipeline.
     *
     * @param upstream the upstream element source.
     * @param opFlags the operation flags
     */
    DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
        super(upstream, opFlags);
    }

    /**
     * Adapt a {@code Sink<Double>} to a {@code DoubleConsumer}, ideally simply
     * by casting.
     *
     * <p>When the sink is not itself a {@code DoubleConsumer}, the tripwire is
     * tripped (if enabled) and a method reference to the sink's
     * {@code accept} is returned instead.
     */
    private static DoubleConsumer adapt(Sink<Double> sink) {
        if (sink instanceof DoubleConsumer) {
            return (DoubleConsumer) sink;
        } else {
            if (Tripwire.ENABLED)
                Tripwire.trip(AbstractPipeline.class,
                              "using DoubleStream.adapt(Sink<Double> s)");
            return sink::accept;
        }
    }

    /**
     * Adapt a {@code Spliterator<Double>} to a {@code Spliterator.OfDouble}.
     *
     * @implNote
     * The implementation attempts to cast to a Spliterator.OfDouble, and throws
     * an exception if this cast is not possible.
     *
     * @throws UnsupportedOperationException if {@code s} is not a
     *         {@code Spliterator.OfDouble}
     */
    private static Spliterator.OfDouble adapt(Spliterator<Double> s) {
        if (s instanceof Spliterator.OfDouble) {
            return (Spliterator.OfDouble) s;
        } else {
            if (Tripwire.ENABLED)
                Tripwire.trip(AbstractPipeline.class,
                              "using DoubleStream.adapt(Spliterator<Double> s)");
            throw new UnsupportedOperationException("DoubleStream.adapt(Spliterator<Double> s)");
        }
    }


    // Shape-specific methods

    @Override
    final StreamShape getOutputShape() {
        return StreamShape.DOUBLE_VALUE;
    }

    // The generator argument is not used; the result is collected by
    // Nodes.collectDouble.
    @Override
    final <P_IN> Node<Double> evaluateToNode(PipelineHelper<Double> helper,
                                             Spliterator<P_IN> spliterator,
                                             boolean flattenTree,
                                             IntFunction<Double[]> generator) {
        return Nodes.collectDouble(helper, spliterator, flattenTree);
    }

    @Override
    final <P_IN> Spliterator<Double> wrap(PipelineHelper<Double> ph,
                                          Supplier<Spliterator<P_IN>> supplier,
                                          boolean isParallel) {
        return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel);
    }

    // The supplier is cast, unchecked, to a supplier of Spliterator.OfDouble;
    // the warning is suppressed at method scope only.
    @Override
    @SuppressWarnings("unchecked")
    final Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) {
        return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier);
    }

    @Override
    final void forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) {
        Spliterator.OfDouble spl = adapt(spliterator);
        DoubleConsumer adaptedSink = adapt(sink);
        // Advance one element at a time, stopping as soon as the sink requests
        // cancellation or the spliterator has no more elements.
        do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
    }

    // The generator argument is not used; the builder comes from
    // Nodes.doubleBuilder.
    @Override
    final Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) {
        return Nodes.doubleBuilder(exactSizeIfKnown);
    }


    // DoubleStream

    @Override
    public final PrimitiveIterator.OfDouble iterator() {
        return Spliterators.iterator(spliterator());
    }

    @Override
    public final Spliterator.OfDouble spliterator() {
        return adapt(super.spliterator());
    }

    // Stateless intermediate ops from DoubleStream

    @Override
    public final Stream<Double> boxed() {
        return mapToObj(Double::valueOf);
    }

    @Override
    public final DoubleStream map(DoubleUnaryOperator mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
                return new Sink.ChainedDouble(sink) {
                    @Override
                    public void accept(double t) {
                        downstream.accept(mapper.applyAsDouble(t));
                    }
                };
            }
        };
    }

    @Override
    public final <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) {
        Objects.requireNonNull(mapper);
        return new ReferencePipeline.StatelessOp<Double, U>(this, StreamShape.DOUBLE_VALUE,
                                                            StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<Double> opWrapSink(int flags, Sink<U> sink) {
                return new Sink.ChainedDouble(sink) {
                    @Override
                    public void accept(double t) {
                        downstream.accept(mapper.apply(t));
                    }
                };
            }
        };
    }

    @Override
    public final IntStream mapToInt(DoubleToIntFunction mapper) {
        Objects.requireNonNull(mapper);
        return new IntPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
                                                   StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<Double> opWrapSink(int flags, Sink<Integer> sink) {
                return new Sink.ChainedDouble(sink) {
                    @Override
                    public void accept(double t) {
                        downstream.accept(mapper.applyAsInt(t));
                    }
                };
            }
        };
    }

    @Override
    public final LongStream mapToLong(DoubleToLongFunction mapper) {
        Objects.requireNonNull(mapper);
        return new LongPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<Double> opWrapSink(int flags, Sink<Long> sink) {
                return new Sink.ChainedDouble(sink) {
                    @Override
                    public void accept(double t) {
                        downstream.accept(mapper.applyAsLong(t));
                    }
                };
            }
        };
    }

    @Override
    public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
        // Reject a null mapper eagerly, as map/mapToObj/mapToInt/mapToLong do,
        // instead of failing only once the first element is processed.
        Objects.requireNonNull(mapper);
        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
                return new Sink.ChainedDouble(sink) {
                    @Override
                    public void begin(long size) {
                        // The upstream size is not forwarded; -1 is passed
                        // downstream instead (this stage is flagged NOT_SIZED).
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(double t) {
                        // We can do better than this too; optimize for depth=0 case and just grab spliterator and forEach it
                        DoubleStream result = mapper.apply(t);
                        // A null result from the mapper contributes no elements.
                        if (result != null)
                            result.sequential().forEach(i -> downstream.accept(i));
                    }
                };
            }
        };
    }

    @Override
    public DoubleStream unordered() {
        if (!isOrdered())
            return this;
        // Pass-through stage whose only effect is to carry the NOT_ORDERED flag.
        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, StreamOpFlag.NOT_ORDERED) {
            @Override
            Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
                return sink;
            }
        };
    }

    @Override
    public final DoubleStream filter(DoublePredicate predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
                                       StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
                return new Sink.ChainedDouble(sink) {
                    @Override
                    public void begin(long size) {
                        // The upstream size is not forwarded; -1 is passed
                        // downstream instead (this stage is flagged NOT_SIZED).
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(double t) {
                        if (predicate.test(t))
                            downstream.accept(t);
                    }
                };
            }
        };
    }

    @Override
    public final DoubleStream peek(DoubleConsumer consumer) {
        Objects.requireNonNull(consumer);
        return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
                                       0) {
            @Override
            Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
                return new Sink.ChainedDouble(sink) {
                    @Override
                    public void accept(double t) {
                        // The consumer sees each element before it is passed on.
                        consumer.accept(t);
                        downstream.accept(t);
                    }
                };
            }
        };
    }

    // Stateful intermediate ops from DoubleStream

    @Override
    public final DoubleStream limit(long maxSize) {
        if (maxSize < 0)
            throw new IllegalArgumentException(Long.toString(maxSize));
        return SliceOps.makeDouble(this, 0L, maxSize);
    }

    @Override
    public final DoubleStream substream(long startingOffset) {
        if (startingOffset < 0)
            throw new IllegalArgumentException(Long.toString(startingOffset));
        if (startingOffset == 0)
            return this;
        else {
            // NOTE(review): -1 is presumably SliceOps' marker for "no limit" - confirm.
            long limit = -1;
            return SliceOps.makeDouble(this, startingOffset, limit);
        }
    }

    @Override
    public final DoubleStream substream(long startingOffset, long endingOffset) {
        if (startingOffset < 0 || endingOffset < startingOffset)
            throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset));
        // SliceOps takes (offset, length), so convert the end offset to a length.
        return SliceOps.makeDouble(this, startingOffset, endingOffset - startingOffset);
    }

    @Override
    public final DoubleStream sorted() {
        return SortedOps.makeDouble(this);
    }

    @Override
    public final DoubleStream distinct() {
        // While functional and quick to implement, this approach is not very efficient.
        // An efficient version requires a double-specific map/set implementation.
        return boxed().distinct().mapToDouble(i -> (double) i);
    }

    // Terminal ops from DoubleStream

    @Override
    public void forEach(DoubleConsumer consumer) {
        evaluate(ForEachOps.makeDouble(consumer, false));
    }

    @Override
    public void forEachOrdered(DoubleConsumer consumer) {
        evaluate(ForEachOps.makeDouble(consumer, true));
    }

    @Override
    public final double sum() {
        // TODO: better algorithm to compensate for errors
        return reduce(0.0, Double::sum);
    }

    @Override
    public final OptionalDouble min() {
        return reduce(Math::min);
    }

    @Override
    public final OptionalDouble max() {
        return reduce(Math::max);
    }

    @Override
    public final OptionalDouble average() {
        // avg[0] holds the element count, avg[1] the running sum.
        double[] avg = collect(() -> new double[2],
                               (ll, i) -> {
                                   ll[0]++;
                                   ll[1] += i;
                               },
                               (ll, rr) -> {
                                   ll[0] += rr[0];
                                   ll[1] += rr[1];
                               });
        return avg[0] > 0
               ? OptionalDouble.of(avg[1] / avg[0])
               : OptionalDouble.empty();
    }

    @Override
    public final long count() {
        // Count in long arithmetic: summing ints would overflow once the
        // stream holds more than Integer.MAX_VALUE elements, and there is no
        // need to route the elements through a reference stream first.
        return mapToLong(e -> 1L).sum();
    }

    @Override
    public final DoubleSummaryStatistics summaryStatistics() {
        return collect(DoubleSummaryStatistics::new, DoubleSummaryStatistics::accept,
                       DoubleSummaryStatistics::combine);
    }

    @Override
    public final double reduce(double identity, DoubleBinaryOperator op) {
        return evaluate(ReduceOps.makeDouble(identity, op));
    }

    @Override
    public final OptionalDouble reduce(DoubleBinaryOperator op) {
        return evaluate(ReduceOps.makeDouble(op));
    }

    @Override
    public final <R> R collect(Supplier<R> resultFactory,
                               ObjDoubleConsumer<R> accumulator,
                               BiConsumer<R, R> combiner) {
        // Adapt the side-effecting combiner to the BinaryOperator shape that
        // ReduceOps expects: merge right into left and hand back left.
        BinaryOperator<R> operator = (left, right) -> {
            combiner.accept(left, right);
            return left;
        };
        return evaluate(ReduceOps.makeDouble(resultFactory, accumulator, operator));
    }

    @Override
    public final boolean anyMatch(DoublePredicate predicate) {
        return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ANY));
    }

    @Override
    public final boolean allMatch(DoublePredicate predicate) {
        return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ALL));
    }

    @Override
    public final boolean noneMatch(DoublePredicate predicate) {
        return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.NONE));
    }

    @Override
    public final OptionalDouble findFirst() {
        return evaluate(FindOps.makeDouble(true));
    }

    @Override
    public final OptionalDouble findAny() {
        return evaluate(FindOps.makeDouble(false));
    }

    @Override
    public final double[] toArray() {
        return Nodes.flattenDouble((Node.OfDouble) evaluateToArrayNode(Double[]::new))
                    .asPrimitiveArray();
    }

    //

    /**
     * Source stage of a DoubleStream
     *
     * @param <E_IN> type of elements in the upstream source
     */
    static class Head<E_IN> extends DoublePipeline<E_IN> {
        /**
         * Constructor for the source stage of a DoubleStream.
         *
         * @param source {@code Supplier<Spliterator>} describing the stream
         *               source
         * @param sourceFlags the source flags for the stream source, described
         *                    in {@link StreamOpFlag}
         * @param parallel {@code true} if the pipeline is parallel
         */
        Head(Supplier<? extends Spliterator<Double>> source,
             int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }

        /**
         * Constructor for the source stage of a DoubleStream.
         *
         * @param source {@code Spliterator} describing the stream source
         * @param sourceFlags the source flags for the stream source, described
         *                    in {@link StreamOpFlag}
         * @param parallel {@code true} if the pipeline is parallel
         */
        Head(Spliterator<Double> source,
             int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }

        // Not supported on the source stage.
        @Override
        final boolean opIsStateful() {
            throw new UnsupportedOperationException();
        }

        // Not supported on the source stage.
        @Override
        final Sink<E_IN> opWrapSink(int flags, Sink<Double> sink) {
            throw new UnsupportedOperationException();
        }

        // Optimized sequential terminal operations for the head of the pipeline

        @Override
        public void forEach(DoubleConsumer consumer) {
            if (!isParallel()) {
                // Sequential: feed the source spliterator straight to the consumer.
                adapt(sourceStageSpliterator()).forEachRemaining(consumer);
            }
            else {
                super.forEach(consumer);
            }
        }

        @Override
        public void forEachOrdered(DoubleConsumer consumer) {
            if (!isParallel()) {
                // Sequential: feed the source spliterator straight to the consumer.
                adapt(sourceStageSpliterator()).forEachRemaining(consumer);
            }
            else {
                super.forEachOrdered(consumer);
            }
        }

    }

    /**
     * Base class for a stateless intermediate stage of a DoubleStream.
     *
     * @param <E_IN> type of elements in the upstream source
     * @since 1.8
     */
    abstract static class StatelessOp<E_IN> extends DoublePipeline<E_IN> {
        /**
         * Construct a new DoubleStream by appending a stateless intermediate
         * operation to an existing stream.
         *
         * @param upstream the upstream pipeline stage
         * @param inputShape the stream shape for the upstream pipeline stage
         * @param opFlags operation flags for the new stage
         */
        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                    StreamShape inputShape,
                    int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return false;
        }
    }

    /**
     * Base class for a stateful intermediate stage of a DoubleStream.
     *
     * @param <E_IN> type of elements in the upstream source
     * @since 1.8
     */
    abstract static class StatefulOp<E_IN> extends DoublePipeline<E_IN> {
        /**
         * Construct a new DoubleStream by appending a stateful intermediate
         * operation to an existing stream.
         *
         * @param upstream the upstream pipeline stage
         * @param inputShape the stream shape for the upstream pipeline stage
         * @param opFlags operation flags for the new stage
         */
        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
                   StreamShape inputShape,
                   int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return true;
        }

        @Override
        abstract <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
                                                        Spliterator<P_IN> spliterator,
                                                        IntFunction<Double[]> generator);
    }
}