/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Comparator;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;

/**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose output elements are of reference type {@code P_OUT}.
 *
 * @param <P_IN> type of elements in the upstream source
 * @param <P_OUT> type of elements produced by this stage
 *
 * @since 1.8
 */
abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT> {

    /**
     * Constructor for the head of a stream pipeline.
     *
     * @param source {@code Supplier<Spliterator>} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     *        {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    ReferencePipeline(Supplier<? extends Spliterator<?>> source,
                      int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

    /**
     * Constructor for the head of a stream pipeline.
     *
     * @param source {@code Spliterator} describing the stream source
     * @param sourceFlags the source flags for the stream source, described in
     *        {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    ReferencePipeline(Spliterator<?> source,
                      int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

    /**
     * Constructor for appending an intermediate operation onto an existing
     * pipeline.
     *
     * @param upstream the upstream element source
     * @param opFlags the operation flags for the new stage
     */
    ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
        super(upstream, opFlags);
    }

    // Shape-specific methods

    @Override
    final StreamShape getOutputShape() {
        return StreamShape.REFERENCE;
    }

    @Override
    final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
                                        Spliterator<P_IN> spliterator,
                                        boolean flattenTree,
                                        IntFunction<P_OUT[]> generator) {
        return Nodes.collect(helper, spliterator, flattenTree, generator);
    }

    @Override
    final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
                                     Supplier<Spliterator<P_IN>> supplier,
                                     boolean isParallel) {
        return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
    }

    @Override
    final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {
        return new StreamSpliterators.DelegatingSpliterator<>(supplier);
    }

    @Override
    final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
        // Push one element at a time; stop as soon as the sink requests
        // cancellation or tryAdvance reports that no element was consumed.
        do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
    }

    @Override
    final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
        return Nodes.builder(exactSizeIfKnown, generator);
    }


    // BaseStream

    @Override
    public final Iterator<P_OUT> iterator() {
        return Spliterators.iterator(spliterator());
    }


    // Stream

    // Stateless intermediate operations from Stream

    @Override
    public Stream<P_OUT> unordered() {
        if (!isOrdered())
            return this;
        // Pass-through stage: elements are untouched, only the NOT_ORDERED
        // flag is contributed to the pipeline.
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return sink;
            }
        };
    }

    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        // This stage is NOT_SIZED, so the upstream size is
                        // not forwarded; -1 is passed downstream instead.
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

    @Override
    @SuppressWarnings("unchecked")
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

    @Override
    public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) {
        Objects.requireNonNull(mapper);
        return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                              StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
                return new Sink.ChainedReference<P_OUT, Integer>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.applyAsInt(u));
                    }
                };
            }
        };
    }

    @Override
    public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {
        Objects.requireNonNull(mapper);
        return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
                return new Sink.ChainedReference<P_OUT, Long>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.applyAsLong(u));
                    }
                };
            }
        };
    }

    @Override
    public final DoubleStream mapToDouble(ToDoubleFunction<? super P_OUT> mapper) {
        Objects.requireNonNull(mapper);
        return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
                return new Sink.ChainedReference<P_OUT, Double>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.applyAsDouble(u));
                    }
                };
            }
        };
    }

    @Override
    public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
        Objects.requireNonNull(mapper);
        // We can do better than this, by polling cancellationRequested when stream is infinite
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void begin(long size) {
                        // NOT_SIZED stage: do not forward the upstream size.
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        // try-with-resources closes the mapped stream once its
                        // elements have been pushed downstream.
                        try (Stream<? extends R> result = mapper.apply(u)) {
                            // We can do better than this too; optimize for depth=0 case and just grab spliterator and forEach it
                            if (result != null)
                                result.sequential().forEach(downstream);
                        }
                    }
                };
            }
        };
    }

    @Override
    public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
        Objects.requireNonNull(mapper);
        // We can do better than this, by polling cancellationRequested when stream is infinite
        return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                              StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
                return new Sink.ChainedReference<P_OUT, Integer>(sink) {
                    // Created once per sink and reused for every mapped stream.
                    IntConsumer downstreamAsInt = downstream::accept;
                    @Override
                    public void begin(long size) {
                        // NOT_SIZED stage: do not forward the upstream size.
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        try (IntStream result = mapper.apply(u)) {
                            // We can do better than this too; optimize for depth=0 case and just grab spliterator and forEach it
                            if (result != null)
                                result.sequential().forEach(downstreamAsInt);
                        }
                    }
                };
            }
        };
    }

    @Override
    public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
        Objects.requireNonNull(mapper);
        // We can do better than this, by polling cancellationRequested when stream is infinite
        return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
                return new Sink.ChainedReference<P_OUT, Double>(sink) {
                    // Created once per sink and reused for every mapped stream.
                    DoubleConsumer downstreamAsDouble = downstream::accept;
                    @Override
                    public void begin(long size) {
                        // NOT_SIZED stage: do not forward the upstream size.
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        try (DoubleStream result = mapper.apply(u)) {
                            // We can do better than this too; optimize for depth=0 case and just grab spliterator and forEach it
                            if (result != null)
                                result.sequential().forEach(downstreamAsDouble);
                        }
                    }
                };
            }
        };
    }

    @Override
    public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) {
        Objects.requireNonNull(mapper);
        // We can do better than this, by polling cancellationRequested when stream is infinite
        return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                                   StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
                return new Sink.ChainedReference<P_OUT, Long>(sink) {
                    // Created once per sink and reused for every mapped stream.
                    LongConsumer downstreamAsLong = downstream::accept;
                    @Override
                    public void begin(long size) {
                        // NOT_SIZED stage: do not forward the upstream size.
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        try (LongStream result = mapper.apply(u)) {
                            // We can do better than this too; optimize for depth=0 case and just grab spliterator and forEach it
                            if (result != null)
                                result.sequential().forEach(downstreamAsLong);
                        }
                    }
                };
            }
        };
    }

    @Override
    public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
        Objects.requireNonNull(action);
        // opFlags == 0: this stage contributes no flag changes.
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     0) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        // The action sees the element before it is forwarded.
                        action.accept(u);
                        downstream.accept(u);
                    }
                };
            }
        };
    }

    // Stateful intermediate operations from Stream

    @Override
    public final Stream<P_OUT> distinct() {
        return DistinctOps.makeRef(this);
    }

    @Override
    public final Stream<P_OUT> sorted() {
        return SortedOps.makeRef(this);
    }

    @Override
    public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
        return SortedOps.makeRef(this, comparator);
    }

    /**
     * Shared implementation of {@code limit} and {@code substream}; delegates
     * to {@code SliceOps.makeRef}.
     *
     * @param skip the number of leading elements to skip
     * @param limit the size argument handed to {@code SliceOps}; callers in
     *        this class pass {@code -1} for the open-ended case
     */
    private Stream<P_OUT> slice(long skip, long limit) {
        return SliceOps.makeRef(this, skip, limit);
    }

    @Override
    public final Stream<P_OUT> limit(long maxSize) {
        if (maxSize < 0)
            throw new IllegalArgumentException(Long.toString(maxSize));
        return slice(0, maxSize);
    }

    @Override
    public final Stream<P_OUT> substream(long startingOffset) {
        if (startingOffset < 0)
            throw new IllegalArgumentException(Long.toString(startingOffset));
        // Skipping nothing needs no extra stage.
        if (startingOffset == 0)
            return this;
        else
            return slice(startingOffset, -1);
    }

    @Override
    public final Stream<P_OUT> substream(long startingOffset, long endingOffset) {
        if (startingOffset < 0 || endingOffset < startingOffset)
            throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset));
        // slice() takes a length, not an end offset.
        return slice(startingOffset, endingOffset - startingOffset);
    }

    // Terminal operations from Stream

    @Override
    public void forEach(Consumer<? super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, false));
    }

    @Override
    public void forEachOrdered(Consumer<? super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, true));
    }

    @Override
    @SuppressWarnings("unchecked")
    public final <A> A[] toArray(IntFunction<A[]> generator) {
        // Since A has no relation to P_OUT (it is not possible to declare that
        // A is an upper bound of P_OUT) there will be no static type checking.
        // Therefore use a raw type and assume A == P_OUT rather than
        // propagating the separation of A and P_OUT throughout the code-base.
        // The runtime type of P_OUT is never checked for equality with the
        // component type of the runtime type of A[].
        // Runtime checking will be performed when an element is stored in A[],
        // thus if A is not a super type of P_OUT an ArrayStoreException will
        // be thrown.
        @SuppressWarnings("rawtypes")
        IntFunction rawGenerator = (IntFunction) generator;
        return (A[]) Nodes.flatten(evaluateToArrayNode(rawGenerator), rawGenerator)
                              .asArray(rawGenerator);
    }

    @Override
    public final Object[] toArray() {
        return toArray(Object[]::new);
    }

    @Override
    public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
        return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
    }

    @Override
    public final boolean allMatch(Predicate<? super P_OUT> predicate) {
        return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));
    }

    @Override
    public final boolean noneMatch(Predicate<? super P_OUT> predicate) {
        return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE));
    }

    @Override
    public final Optional<P_OUT> findFirst() {
        return evaluate(FindOps.makeRef(true));
    }

    @Override
    public final Optional<P_OUT> findAny() {
        return evaluate(FindOps.makeRef(false));
    }

    @Override
    public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
        // The accumulator doubles as the combiner.
        return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
    }

    @Override
    public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
        return evaluate(ReduceOps.makeRef(accumulator));
    }

    @Override
    public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
        return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
    }

    @Override
    @SuppressWarnings("unchecked")
    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            // Concurrent path: one container is obtained from the supplier and
            // every element is accumulated into it through forEach.
            container = collector.supplier().get();
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        }
        else {
            container = evaluate(ReduceOps.makeRef(collector));
        }
        // With IDENTITY_FINISH the finisher is skipped and the container is
        // returned as the result (hence the unchecked cast).
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }

    @Override
    public final <R> R collect(Supplier<R> supplier,
                               BiConsumer<R, ? super P_OUT> accumulator,
                               BiConsumer<R, R> combiner) {
        return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
    }

    @Override
    public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
        return reduce(BinaryOperator.maxBy(comparator));
    }

    @Override
    public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) {
        return reduce(BinaryOperator.minBy(comparator));

    }

    @Override
    public final long count() {
        // Map every element to 1 and sum the ones.
        return mapToLong(e -> 1L).sum();
    }


    // Nested pipeline-stage classes

    /**
     * Source stage of a ReferencePipeline.
     *
     * @param <E_IN> type of elements in the upstream source
     * @param <E_OUT> type of elements produced by this stage
     * @since 1.8
     */
    static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
        /**
         * Constructor for the source stage of a Stream.
         *
         * @param source {@code Supplier<Spliterator>} describing the stream
         *               source
         * @param sourceFlags the source flags for the stream source, described
         *                    in {@link StreamOpFlag}
         * @param parallel {@code true} if the pipeline is parallel
         */
        Head(Supplier<? extends Spliterator<?>> source,
             int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }

        /**
         * Constructor for the source stage of a Stream.
         *
         * @param source {@code Spliterator} describing the stream source
         * @param sourceFlags the source flags for the stream source, described
         *                    in {@link StreamOpFlag}
         * @param parallel {@code true} if the pipeline is parallel
         */
        Head(Spliterator<?> source,
             int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }

        // The op* hooks are not supported on the head stage.

        @Override
        final boolean opIsStateful() {
            throw new UnsupportedOperationException();
        }

        @Override
        final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
            throw new UnsupportedOperationException();
        }

        // Optimized sequential terminal operations for the head of the pipeline

        @Override
        public void forEach(Consumer<? super E_OUT> action) {
            if (!isParallel()) {
                // Sequential: traverse the source spliterator directly
                // instead of going through evaluate().
                sourceStageSpliterator().forEachRemaining(action);
            }
            else {
                super.forEach(action);
            }
        }

        @Override
        public void forEachOrdered(Consumer<? super E_OUT> action) {
            if (!isParallel()) {
                // Sequential: traverse the source spliterator directly
                // instead of going through evaluate().
                sourceStageSpliterator().forEachRemaining(action);
            }
            else {
                super.forEachOrdered(action);
            }
        }
    }

    /**
     * Base class for a stateless intermediate stage of a Stream.
     *
     * @param <E_IN> type of elements in the upstream source
     * @param <E_OUT> type of elements produced by this stage
     * @since 1.8
     */
    abstract static class StatelessOp<E_IN, E_OUT>
            extends ReferencePipeline<E_IN, E_OUT> {
        /**
         * Construct a new Stream by appending a stateless intermediate
         * operation to an existing stream.
         *
         * @param upstream The upstream pipeline stage
         * @param inputShape The stream shape for the upstream pipeline stage
         * @param opFlags Operation flags for the new stage
         */
        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                    StreamShape inputShape,
                    int opFlags) {
            super(upstream, opFlags);
            // inputShape is used only for this sanity check.
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return false;
        }
    }

    /**
     * Base class for a stateful intermediate stage of a Stream.
     *
     * @param <E_IN> type of elements in the upstream source
     * @param <E_OUT> type of elements produced by this stage
     * @since 1.8
     */
    abstract static class StatefulOp<E_IN, E_OUT>
            extends ReferencePipeline<E_IN, E_OUT> {
        /**
         * Construct a new Stream by appending a stateful intermediate operation
         * to an existing stream.
         *
         * @param upstream The upstream pipeline stage
         * @param inputShape The stream shape for the upstream pipeline stage
         * @param opFlags Operation flags for the new stage
         */
        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
                   StreamShape inputShape,
                   int opFlags) {
            super(upstream, opFlags);
            // inputShape is used only for this sanity check.
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return true;
        }

        // Redeclared abstract so that every stateful stage must supply its
        // own parallel evaluation.
        @Override
        abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
                                                       Spliterator<P_IN> spliterator,
                                                       IntFunction<E_OUT[]> generator);
    }
}