1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Comparator; 28 import java.util.Comparators; 29 import java.util.Iterator; 30 import java.util.Objects; 31 import java.util.Optional; 32 import java.util.Spliterator; 33 import java.util.Spliterators; 34 import java.util.function.BiConsumer; 35 import java.util.function.BiFunction; 36 import java.util.function.BinaryOperator; 37 import java.util.function.Consumer; 38 import java.util.function.DoubleConsumer; 39 import java.util.function.Function; 40 import java.util.function.IntConsumer; 41 import java.util.function.IntFunction; 42 import java.util.function.LongConsumer; 43 import java.util.function.Predicate; 44 import java.util.function.Supplier; 45 import java.util.function.ToDoubleFunction; 46 import java.util.function.ToIntFunction; 47 import java.util.function.ToLongFunction; 48 49 /** 50 * Abstract base class for an intermediate pipeline stage or pipeline source 51 * stage implementing whose elements are of type {@code U}. 52 * 53 * @param <P_IN> type of elements in the upstream source 54 * @param <P_OUT> type of elements in produced by this stage 55 * 56 * @since 1.8 57 */ 58 abstract class ReferencePipeline<P_IN, P_OUT> 59 extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>> 60 implements Stream<P_OUT> { 61 62 /** 63 * Constructor for the head of a stream pipeline. 64 * 65 * @param source {@code Supplier<Spliterator>} describing the stream source 66 * @param sourceFlags the source flags for the stream source, described in 67 * {@link StreamOpFlag} 68 * @param parallel {@code true} if the pipeline is parallel 69 */ 70 ReferencePipeline(Supplier<? extends Spliterator<?>> source, 71 int sourceFlags, boolean parallel) { 72 super(source, sourceFlags, parallel); 73 } 74 75 /** 76 * Constructor for the head of a stream pipeline. 77 * 78 * @param source {@code Spliterator} describing the stream source 79 * @param sourceFlags The source flags for the stream source, described in 80 * {@link StreamOpFlag} 81 * @param parallel {@code true} if the pipeline is parallel 82 */ 83 ReferencePipeline(Spliterator<?> source, 84 int sourceFlags, boolean parallel) { 85 super(source, sourceFlags, parallel); 86 } 87 88 /** 89 * Constructor for appending an intermediate operation onto an existing 90 * pipeline. 91 * 92 * @param upstream the upstream element source. 93 */ 94 ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) { 95 super(upstream, opFlags); 96 } 97 98 // Shape-specific methods 99 100 @Override 101 final StreamShape getOutputShape() { 102 return StreamShape.REFERENCE; 103 } 104 105 @Override 106 final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper, 107 Spliterator<P_IN> spliterator, 108 boolean flattenTree, 109 IntFunction<P_OUT[]> generator) { 110 return Nodes.collect(helper, spliterator, flattenTree, generator); 111 } 112 113 @Override 114 final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph, 115 Supplier<Spliterator<P_IN>> supplier, 116 boolean isParallel) { 117 return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel); 118 } 119 120 @Override 121 final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) { 122 return new StreamSpliterators.DelegatingSpliterator<>(supplier); 123 } 124 125 @Override 126 final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) { 127 do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink)); 128 } 129 130 @Override 131 final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) { 132 return Nodes.builder(exactSizeIfKnown, generator); 133 } 134 135 136 // BaseStream 137 138 @Override 139 public final Iterator<P_OUT> iterator() { 140 return Spliterators.iterator(spliterator()); 141 } 142 143 144 // Stream 145 146 // Stateless intermediate operations from Stream 147 148 @Override 149 public Stream<P_OUT> unordered() { 150 if (!isOrdered()) 151 return this; 152 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) { 153 @Override 154 Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { 155 return sink; 156 } 157 }; 158 } 159 160 @Override 161 public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) { 162 Objects.requireNonNull(predicate); 163 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, 164 StreamOpFlag.NOT_SIZED) { 165 @Override 166 Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { 167 return new Sink.ChainedReference<P_OUT>(sink) { 168 @Override 169 public void begin(long size) { 170 downstream.begin(-1); 171 } 172 173 @Override 174 public void accept(P_OUT u) { 175 if (predicate.test(u)) 176 downstream.accept(u); 177 } 178 }; 179 } 180 }; 181 } 182 183 @Override 184 public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { 185 Objects.requireNonNull(mapper); 186 return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, 187 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 188 @Override 189 Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { 190 return new Sink.ChainedReference<P_OUT>(sink) { 191 @Override 192 public void accept(P_OUT u) { 193 downstream.accept(mapper.apply(u)); 194 } 195 }; 196 } 197 }; 198 } 199 200 @Override 201 public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) { 202 Objects.requireNonNull(mapper); 203 return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 204 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 205 @Override 206 Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) { 207 return new Sink.ChainedReference<P_OUT>(sink) { 208 @Override 209 public void accept(P_OUT u) { 210 downstream.accept(mapper.applyAsInt(u)); 211 } 212 }; 213 } 214 }; 215 } 216 217 @Override 218 public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) { 219 Objects.requireNonNull(mapper); 220 return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 221 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 222 @Override 223 Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) { 224 return new Sink.ChainedReference<P_OUT>(sink) { 225 @Override 226 public void accept(P_OUT u) { 227 downstream.accept(mapper.applyAsLong(u)); 228 } 229 }; 230 } 231 }; 232 } 233 234 @Override 235 public final DoubleStream mapToDouble(ToDoubleFunction<? super P_OUT> mapper) { 236 Objects.requireNonNull(mapper); 237 return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 238 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 239 @Override 240 Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) { 241 return new Sink.ChainedReference<P_OUT>(sink) { 242 @Override 243 public void accept(P_OUT u) { 244 downstream.accept(mapper.applyAsDouble(u)); 245 } 246 }; 247 } 248 }; 249 } 250 251 @Override 252 public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) { 253 Objects.requireNonNull(mapper); 254 // We can do better than this, by polling cancellationRequested when stream is infinite 255 return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, 256 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 257 @Override 258 Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { 259 return new Sink.ChainedReference<P_OUT>(sink) { 260 @Override 261 public void begin(long size) { 262 downstream.begin(-1); 263 } 264 265 @Override 266 public void accept(P_OUT u) { 267 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 268 Stream<? extends R> result = mapper.apply(u); 269 if (result != null) 270 result.sequential().forEach(downstream); 271 } 272 }; 273 } 274 }; 275 } 276 277 @Override 278 public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) { 279 Objects.requireNonNull(mapper); 280 // We can do better than this, by polling cancellationRequested when stream is infinite 281 return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 282 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 283 @Override 284 Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) { 285 return new Sink.ChainedReference<P_OUT>(sink) { 286 IntConsumer downstreamAsInt = downstream::accept; 287 @Override 288 public void begin(long size) { 289 downstream.begin(-1); 290 } 291 292 @Override 293 public void accept(P_OUT u) { 294 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 295 IntStream result = mapper.apply(u); 296 if (result != null) 297 result.sequential().forEach(downstreamAsInt); 298 } 299 }; 300 } 301 }; 302 } 303 304 @Override 305 public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) { 306 Objects.requireNonNull(mapper); 307 // We can do better than this, by polling cancellationRequested when stream is infinite 308 return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 309 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 310 @Override 311 Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) { 312 return new Sink.ChainedReference<P_OUT>(sink) { 313 DoubleConsumer downstreamAsDouble = downstream::accept; 314 @Override 315 public void begin(long size) { 316 downstream.begin(-1); 317 } 318 319 @Override 320 public void accept(P_OUT u) { 321 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 322 DoubleStream result = mapper.apply(u); 323 if (result != null) 324 result.sequential().forEach(downstreamAsDouble); 325 } 326 }; 327 } 328 }; 329 } 330 331 @Override 332 public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) { 333 Objects.requireNonNull(mapper); 334 // We can do better than this, by polling cancellationRequested when stream is infinite 335 return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 336 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 337 @Override 338 Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) { 339 return new Sink.ChainedReference<P_OUT>(sink) { 340 LongConsumer downstreamAsLong = downstream::accept; 341 @Override 342 public void begin(long size) { 343 downstream.begin(-1); 344 } 345 346 @Override 347 public void accept(P_OUT u) { 348 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 349 LongStream result = mapper.apply(u); 350 if (result != null) 351 result.sequential().forEach(downstreamAsLong); 352 } 353 }; 354 } 355 }; 356 } 357 358 @Override 359 public final Stream<P_OUT> peek(Consumer<? super P_OUT> tee) { 360 Objects.requireNonNull(tee); 361 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, 362 0) { 363 @Override 364 Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { 365 return new Sink.ChainedReference<P_OUT>(sink) { 366 @Override 367 public void accept(P_OUT u) { 368 tee.accept(u); 369 downstream.accept(u); 370 } 371 }; 372 } 373 }; 374 } 375 376 // Stateful intermediate operations from Stream 377 378 @Override 379 public final Stream<P_OUT> distinct() { 380 return DistinctOps.makeRef(this); 381 } 382 383 @Override 384 public final Stream<P_OUT> sorted() { 385 return SortedOps.makeRef(this); 386 } 387 388 @Override 389 public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) { 390 return SortedOps.makeRef(this, comparator); 391 } 392 393 private Stream<P_OUT> slice(long skip, long limit) { 394 return SliceOps.makeRef(this, skip, limit); 395 } 396 397 @Override 398 public final Stream<P_OUT> limit(long maxSize) { 399 if (maxSize < 0) 400 throw new IllegalArgumentException(Long.toString(maxSize)); 401 return slice(0, maxSize); 402 } 403 404 @Override 405 public final Stream<P_OUT> substream(long startingOffset) { 406 if (startingOffset < 0) 407 throw new IllegalArgumentException(Long.toString(startingOffset)); 408 if (startingOffset == 0) 409 return this; 410 else 411 return slice(startingOffset, -1); 412 } 413 414 @Override 415 public final Stream<P_OUT> substream(long startingOffset, long endingOffset) { 416 if (startingOffset < 0 || endingOffset < startingOffset) 417 throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset)); 418 return slice(startingOffset, endingOffset - startingOffset); 419 } 420 421 // Terminal operations from Stream 422 423 @Override 424 public void forEach(Consumer<? super P_OUT> action) { 425 evaluate(ForEachOps.makeRef(action, false)); 426 } 427 428 @Override 429 public void forEachOrdered(Consumer<? super P_OUT> action) { 430 evaluate(ForEachOps.makeRef(action, true)); 431 } 432 433 @Override 434 @SuppressWarnings("unchecked") 435 public final <A> A[] toArray(IntFunction<A[]> generator) { 436 // Since A has no relation to U (not possible to declare that A is an upper bound of U) 437 // there will be no static type checking. 438 // Therefore use a raw type and assume A == U rather than propagating the separation of A and U 439 // throughout the code-base. 440 // The runtime type of U is never checked for equality with the component type of the runtime type of A[]. 441 // Runtime checking will be performed when an element is stored in A[], thus if A is not a 442 // super type of U an ArrayStoreException will be thrown. 443 IntFunction rawGenerator = (IntFunction) generator; 444 return (A[]) Nodes.flatten(evaluateToArrayNode(rawGenerator), rawGenerator) 445 .asArray(rawGenerator); 446 } 447 448 @Override 449 public final Object[] toArray() { 450 return toArray(Object[]::new); 451 } 452 453 @Override 454 public final boolean anyMatch(Predicate<? super P_OUT> predicate) { 455 return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY)); 456 } 457 458 @Override 459 public final boolean allMatch(Predicate<? super P_OUT> predicate) { 460 return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL)); 461 } 462 463 @Override 464 public final boolean noneMatch(Predicate<? super P_OUT> predicate) { 465 return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE)); 466 } 467 468 @Override 469 public final Optional<P_OUT> findFirst() { 470 return evaluate(FindOps.makeRef(true)); 471 } 472 473 @Override 474 public final Optional<P_OUT> findAny() { 475 return evaluate(FindOps.makeRef(false)); 476 } 477 478 @Override 479 public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) { 480 return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator)); 481 } 482 483 @Override 484 public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) { 485 return evaluate(ReduceOps.makeRef(accumulator)); 486 } 487 488 @Override 489 public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) { 490 return evaluate(ReduceOps.makeRef(identity, accumulator, combiner)); 491 } 492 493 @Override 494 public final <R> R collect(Collector<? super P_OUT, R> collector) { 495 if (isParallel() 496 && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) 497 && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { 498 R container = collector.resultSupplier().get(); 499 BiFunction<R, ? super P_OUT, R> accumulator = collector.accumulator(); 500 forEach(u -> accumulator.apply(container, u)); 501 return container; 502 } 503 return evaluate(ReduceOps.makeRef(collector)); 504 } 505 506 @Override 507 public final <R> R collect(Supplier<R> resultFactory, 508 BiConsumer<R, ? super P_OUT> accumulator, 509 BiConsumer<R, R> combiner) { 510 return evaluate(ReduceOps.makeRef(resultFactory, accumulator, combiner)); 511 } 512 513 @Override 514 public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) { 515 return reduce(Comparators.greaterOf(comparator)); 516 } 517 518 @Override 519 public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) { 520 return reduce(Comparators.lesserOf(comparator)); 521 522 } 523 524 @Override 525 public final long count() { 526 return mapToLong(e -> 1L).sum(); 527 } 528 529 530 // 531 532 /** 533 * Source stage of a ReferencePipeline. 534 * 535 * @param <E_IN> type of elements in the upstream source 536 * @param <E_OUT> type of elements in produced by this stage 537 * @since 1.8 538 */ 539 static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { 540 /** 541 * Constructor for the source stage of a Stream. 542 * 543 * @param source {@code Supplier<Spliterator>} describing the stream 544 * source 545 * @param sourceFlags the source flags for the stream source, described 546 * in {@link StreamOpFlag} 547 */ 548 Head(Supplier<? extends Spliterator<?>> source, 549 int sourceFlags, boolean parallel) { 550 super(source, sourceFlags, parallel); 551 } 552 553 /** 554 * Constructor for the source stage of a Stream. 555 * 556 * @param source {@code Spliterator} describing the stream source 557 * @param sourceFlags the source flags for the stream source, described 558 * in {@link StreamOpFlag} 559 */ 560 Head(Spliterator<?> source, 561 int sourceFlags, boolean parallel) { 562 super(source, sourceFlags, parallel); 563 } 564 565 @Override 566 final boolean opIsStateful() { 567 throw new UnsupportedOperationException(); 568 } 569 570 @Override 571 final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) { 572 throw new UnsupportedOperationException(); 573 } 574 575 // Optimized sequential terminal operations for the head of the pipeline 576 577 @Override 578 public void forEach(Consumer<? super E_OUT> action) { 579 if (!isParallel()) { 580 sourceStageSpliterator().forEachRemaining(action); 581 } 582 else { 583 super.forEach(action); 584 } 585 } 586 587 @Override 588 public void forEachOrdered(Consumer<? super E_OUT> action) { 589 if (!isParallel()) { 590 sourceStageSpliterator().forEachRemaining(action); 591 } 592 else { 593 super.forEachOrdered(action); 594 } 595 } 596 } 597 598 /** 599 * Base class for a stateless intermediate stage of a Stream. 600 * 601 * @param <E_IN> type of elements in the upstream source 602 * @param <E_OUT> type of elements in produced by this stage 603 * @since 1.8 604 */ 605 abstract static class StatelessOp<E_IN, E_OUT> 606 extends ReferencePipeline<E_IN, E_OUT> { 607 /** 608 * Construct a new Stream by appending a stateless intermediate 609 * operation to an existing stream. 610 * 611 * @param upstream The upstream pipeline stage 612 * @param inputShape The stream shape for the upstream pipeline stage 613 * @param opFlags Operation flags for the new stage 614 */ 615 StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, 616 StreamShape inputShape, 617 int opFlags) { 618 super(upstream, opFlags); 619 assert upstream.getOutputShape() == inputShape; 620 } 621 622 @Override 623 final boolean opIsStateful() { 624 return false; 625 } 626 } 627 628 /** 629 * Base class for a stateful intermediate stage of a Stream. 630 * 631 * @param <E_IN> type of elements in the upstream source 632 * @param <E_OUT> type of elements in produced by this stage 633 * @since 1.8 634 */ 635 abstract static class StatefulOp<E_IN, E_OUT> 636 extends ReferencePipeline<E_IN, E_OUT> { 637 /** 638 * Construct a new Stream by appending a stateful intermediate operation 639 * to an existing stream. 640 * @param upstream The upstream pipeline stage 641 * @param inputShape The stream shape for the upstream pipeline stage 642 * @param opFlags Operation flags for the new stage 643 */ 644 StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, 645 StreamShape inputShape, 646 int opFlags) { 647 super(upstream, opFlags); 648 assert upstream.getOutputShape() == inputShape; 649 } 650 651 @Override 652 final boolean opIsStateful() { 653 return true; 654 } 655 656 @Override 657 abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper, 658 Spliterator<P_IN> spliterator, 659 IntFunction<E_OUT[]> generator); 660 } 661 }