1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Comparator; 28 import java.util.Iterator; 29 import java.util.Objects; 30 import java.util.Optional; 31 import java.util.Spliterator; 32 import java.util.Spliterators; 33 import java.util.function.BiConsumer; 34 import java.util.function.BiFunction; 35 import java.util.function.BinaryOperator; 36 import java.util.function.Consumer; 37 import java.util.function.DoubleConsumer; 38 import java.util.function.Function; 39 import java.util.function.IntConsumer; 40 import java.util.function.IntFunction; 41 import java.util.function.LongConsumer; 42 import java.util.function.Predicate; 43 import java.util.function.Supplier; 44 import java.util.function.ToDoubleFunction; 45 import java.util.function.ToIntFunction; 46 import java.util.function.ToLongFunction; 47 48 /** 49 * Abstract base class for an intermediate pipeline stage or pipeline source 50 * stage implementing whose elements are of type {@code U}. 51 * 52 * @param <P_IN> type of elements in the upstream source 53 * @param <P_OUT> type of elements in produced by this stage 54 * 55 * @since 1.8 56 */ 57 abstract class ReferencePipeline<P_IN, P_OUT> 58 extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>> 59 implements Stream<P_OUT> { 60 61 /** 62 * Constructor for the head of a stream pipeline. 63 * 64 * @param source {@code Supplier<Spliterator>} describing the stream source 65 * @param sourceFlags the source flags for the stream source, described in 66 * {@link StreamOpFlag} 67 * @param parallel {@code true} if the pipeline is parallel 68 */ 69 ReferencePipeline(Supplier<? extends Spliterator<?>> source, 70 int sourceFlags, boolean parallel) { 71 super(source, sourceFlags, parallel); 72 } 73 74 /** 75 * Constructor for the head of a stream pipeline. 76 * 77 * @param source {@code Spliterator} describing the stream source 78 * @param sourceFlags The source flags for the stream source, described in 79 * {@link StreamOpFlag} 80 * @param parallel {@code true} if the pipeline is parallel 81 */ 82 ReferencePipeline(Spliterator<?> source, 83 int sourceFlags, boolean parallel) { 84 super(source, sourceFlags, parallel); 85 } 86 87 /** 88 * Constructor for appending an intermediate operation onto an existing 89 * pipeline. 90 * 91 * @param upstream the upstream element source. 92 */ 93 ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) { 94 super(upstream, opFlags); 95 } 96 97 // Shape-specific methods 98 99 @Override 100 final StreamShape getOutputShape() { 101 return StreamShape.REFERENCE; 102 } 103 104 @Override 105 final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper, 106 Spliterator<P_IN> spliterator, 107 boolean flattenTree, 108 IntFunction<P_OUT[]> generator) { 109 return Nodes.collect(helper, spliterator, flattenTree, generator); 110 } 111 112 @Override 113 final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph, 114 Supplier<Spliterator<P_IN>> supplier, 115 boolean isParallel) { 116 return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel); 117 } 118 119 @Override 120 final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) { 121 return new StreamSpliterators.DelegatingSpliterator<>(supplier); 122 } 123 124 @Override 125 final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) { 126 do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink)); 127 } 128 129 @Override 130 final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) { 131 return Nodes.builder(exactSizeIfKnown, generator); 132 } 133 134 135 // BaseStream 136 137 @Override 138 public final Iterator<P_OUT> iterator() { 139 return Spliterators.iterator(spliterator()); 140 } 141 142 143 // Stream 144 145 // Stateless intermediate operations from Stream 146 147 @Override 148 public Stream<P_OUT> unordered() { 149 if (!isOrdered()) 150 return this; 151 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) { 152 @Override 153 Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { 154 return sink; 155 } 156 }; 157 } 158 159 @Override 160 public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) { 161 Objects.requireNonNull(predicate); 162 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, 163 StreamOpFlag.NOT_SIZED) { 164 @Override 165 Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { 166 return new Sink.ChainedReference<P_OUT>(sink) { 167 @Override 168 public void begin(long size) { 169 downstream.begin(-1); 170 } 171 172 @Override 173 public void accept(P_OUT u) { 174 if (predicate.test(u)) 175 downstream.accept(u); 176 } 177 }; 178 } 179 }; 180 } 181 182 @Override 183 public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { 184 Objects.requireNonNull(mapper); 185 return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, 186 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 187 @Override 188 Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { 189 return new Sink.ChainedReference<P_OUT>(sink) { 190 @Override 191 public void accept(P_OUT u) { 192 downstream.accept(mapper.apply(u)); 193 } 194 }; 195 } 196 }; 197 } 198 199 @Override 200 public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) { 201 Objects.requireNonNull(mapper); 202 return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 203 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 204 @Override 205 Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) { 206 return new Sink.ChainedReference<P_OUT>(sink) { 207 @Override 208 public void accept(P_OUT u) { 209 downstream.accept(mapper.applyAsInt(u)); 210 } 211 }; 212 } 213 }; 214 } 215 216 @Override 217 public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) { 218 Objects.requireNonNull(mapper); 219 return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 220 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 221 @Override 222 Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) { 223 return new Sink.ChainedReference<P_OUT>(sink) { 224 @Override 225 public void accept(P_OUT u) { 226 downstream.accept(mapper.applyAsLong(u)); 227 } 228 }; 229 } 230 }; 231 } 232 233 @Override 234 public final DoubleStream mapToDouble(ToDoubleFunction<? super P_OUT> mapper) { 235 Objects.requireNonNull(mapper); 236 return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 237 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 238 @Override 239 Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) { 240 return new Sink.ChainedReference<P_OUT>(sink) { 241 @Override 242 public void accept(P_OUT u) { 243 downstream.accept(mapper.applyAsDouble(u)); 244 } 245 }; 246 } 247 }; 248 } 249 250 @Override 251 public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) { 252 Objects.requireNonNull(mapper); 253 // We can do better than this, by polling cancellationRequested when stream is infinite 254 return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, 255 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 256 @Override 257 Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { 258 return new Sink.ChainedReference<P_OUT>(sink) { 259 @Override 260 public void begin(long size) { 261 downstream.begin(-1); 262 } 263 264 @Override 265 public void accept(P_OUT u) { 266 try (Stream<? extends R> result = mapper.apply(u)) { 267 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 268 if (result != null) 269 result.sequential().forEach(downstream); 270 } 271 } 272 }; 273 } 274 }; 275 } 276 277 @Override 278 public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) { 279 Objects.requireNonNull(mapper); 280 // We can do better than this, by polling cancellationRequested when stream is infinite 281 return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 282 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 283 @Override 284 Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) { 285 return new Sink.ChainedReference<P_OUT>(sink) { 286 IntConsumer downstreamAsInt = downstream::accept; 287 @Override 288 public void begin(long size) { 289 downstream.begin(-1); 290 } 291 292 @Override 293 public void accept(P_OUT u) { 294 try (IntStream result = mapper.apply(u)) { 295 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 296 if (result != null) 297 result.sequential().forEach(downstreamAsInt); 298 } 299 } 300 }; 301 } 302 }; 303 } 304 305 @Override 306 public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) { 307 Objects.requireNonNull(mapper); 308 // We can do better than this, by polling cancellationRequested when stream is infinite 309 return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 310 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 311 @Override 312 Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) { 313 return new Sink.ChainedReference<P_OUT>(sink) { 314 DoubleConsumer downstreamAsDouble = downstream::accept; 315 @Override 316 public void begin(long size) { 317 downstream.begin(-1); 318 } 319 320 @Override 321 public void accept(P_OUT u) { 322 try (DoubleStream result = mapper.apply(u)) { 323 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 324 if (result != null) 325 result.sequential().forEach(downstreamAsDouble); 326 } 327 } 328 }; 329 } 330 }; 331 } 332 333 @Override 334 public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) { 335 Objects.requireNonNull(mapper); 336 // We can do better than this, by polling cancellationRequested when stream is infinite 337 return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE, 338 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 339 @Override 340 Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) { 341 return new Sink.ChainedReference<P_OUT>(sink) { 342 LongConsumer downstreamAsLong = downstream::accept; 343 @Override 344 public void begin(long size) { 345 downstream.begin(-1); 346 } 347 348 @Override 349 public void accept(P_OUT u) { 350 try (LongStream result = mapper.apply(u)) { 351 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 352 if (result != null) 353 result.sequential().forEach(downstreamAsLong); 354 } 355 } 356 }; 357 } 358 }; 359 } 360 361 @Override 362 public final Stream<P_OUT> peek(Consumer<? super P_OUT> tee) { 363 Objects.requireNonNull(tee); 364 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, 365 0) { 366 @Override 367 Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { 368 return new Sink.ChainedReference<P_OUT>(sink) { 369 @Override 370 public void accept(P_OUT u) { 371 tee.accept(u); 372 downstream.accept(u); 373 } 374 }; 375 } 376 }; 377 } 378 379 // Stateful intermediate operations from Stream 380 381 @Override 382 public final Stream<P_OUT> distinct() { 383 return DistinctOps.makeRef(this); 384 } 385 386 @Override 387 public final Stream<P_OUT> sorted() { 388 return SortedOps.makeRef(this); 389 } 390 391 @Override 392 public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) { 393 return SortedOps.makeRef(this, comparator); 394 } 395 396 private Stream<P_OUT> slice(long skip, long limit) { 397 return SliceOps.makeRef(this, skip, limit); 398 } 399 400 @Override 401 public final Stream<P_OUT> limit(long maxSize) { 402 if (maxSize < 0) 403 throw new IllegalArgumentException(Long.toString(maxSize)); 404 return slice(0, maxSize); 405 } 406 407 @Override 408 public final Stream<P_OUT> substream(long startingOffset) { 409 if (startingOffset < 0) 410 throw new IllegalArgumentException(Long.toString(startingOffset)); 411 if (startingOffset == 0) 412 return this; 413 else 414 return slice(startingOffset, -1); 415 } 416 417 @Override 418 public final Stream<P_OUT> substream(long startingOffset, long endingOffset) { 419 if (startingOffset < 0 || endingOffset < startingOffset) 420 throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset)); 421 return slice(startingOffset, endingOffset - startingOffset); 422 } 423 424 // Terminal operations from Stream 425 426 @Override 427 public void forEach(Consumer<? super P_OUT> action) { 428 evaluate(ForEachOps.makeRef(action, false)); 429 } 430 431 @Override 432 public void forEachOrdered(Consumer<? super P_OUT> action) { 433 evaluate(ForEachOps.makeRef(action, true)); 434 } 435 436 @Override 437 @SuppressWarnings("unchecked") 438 public final <A> A[] toArray(IntFunction<A[]> generator) { 439 // Since A has no relation to U (not possible to declare that A is an upper bound of U) 440 // there will be no static type checking. 441 // Therefore use a raw type and assume A == U rather than propagating the separation of A and U 442 // throughout the code-base. 443 // The runtime type of U is never checked for equality with the component type of the runtime type of A[]. 444 // Runtime checking will be performed when an element is stored in A[], thus if A is not a 445 // super type of U an ArrayStoreException will be thrown. 446 IntFunction rawGenerator = (IntFunction) generator; 447 return (A[]) Nodes.flatten(evaluateToArrayNode(rawGenerator), rawGenerator) 448 .asArray(rawGenerator); 449 } 450 451 @Override 452 public final Object[] toArray() { 453 return toArray(Object[]::new); 454 } 455 456 @Override 457 public final boolean anyMatch(Predicate<? super P_OUT> predicate) { 458 return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY)); 459 } 460 461 @Override 462 public final boolean allMatch(Predicate<? super P_OUT> predicate) { 463 return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL)); 464 } 465 466 @Override 467 public final boolean noneMatch(Predicate<? super P_OUT> predicate) { 468 return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE)); 469 } 470 471 @Override 472 public final Optional<P_OUT> findFirst() { 473 return evaluate(FindOps.makeRef(true)); 474 } 475 476 @Override 477 public final Optional<P_OUT> findAny() { 478 return evaluate(FindOps.makeRef(false)); 479 } 480 481 @Override 482 public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) { 483 return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator)); 484 } 485 486 @Override 487 public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) { 488 return evaluate(ReduceOps.makeRef(accumulator)); 489 } 490 491 @Override 492 public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) { 493 return evaluate(ReduceOps.makeRef(identity, accumulator, combiner)); 494 } 495 496 @Override 497 public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) { 498 A container; 499 if (isParallel() 500 && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) 501 && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { 502 container = collector.supplier().get(); 503 BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator(); 504 forEach(u -> accumulator.accept(container, u)); 505 } 506 else { 507 container = evaluate(ReduceOps.makeRef(collector)); 508 } 509 return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) 510 ? (R) container 511 : collector.finisher().apply(container); 512 } 513 514 @Override 515 public final <R> R collect(Supplier<R> resultFactory, 516 BiConsumer<R, ? super P_OUT> accumulator, 517 BiConsumer<R, R> combiner) { 518 return evaluate(ReduceOps.makeRef(resultFactory, accumulator, combiner)); 519 } 520 521 @Override 522 public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) { 523 return reduce(BinaryOperator.maxBy(comparator)); 524 } 525 526 @Override 527 public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) { 528 return reduce(BinaryOperator.minBy(comparator)); 529 530 } 531 532 @Override 533 public final long count() { 534 return mapToLong(e -> 1L).sum(); 535 } 536 537 538 // 539 540 /** 541 * Source stage of a ReferencePipeline. 542 * 543 * @param <E_IN> type of elements in the upstream source 544 * @param <E_OUT> type of elements in produced by this stage 545 * @since 1.8 546 */ 547 static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { 548 /** 549 * Constructor for the source stage of a Stream. 550 * 551 * @param source {@code Supplier<Spliterator>} describing the stream 552 * source 553 * @param sourceFlags the source flags for the stream source, described 554 * in {@link StreamOpFlag} 555 */ 556 Head(Supplier<? extends Spliterator<?>> source, 557 int sourceFlags, boolean parallel) { 558 super(source, sourceFlags, parallel); 559 } 560 561 /** 562 * Constructor for the source stage of a Stream. 563 * 564 * @param source {@code Spliterator} describing the stream source 565 * @param sourceFlags the source flags for the stream source, described 566 * in {@link StreamOpFlag} 567 */ 568 Head(Spliterator<?> source, 569 int sourceFlags, boolean parallel) { 570 super(source, sourceFlags, parallel); 571 } 572 573 @Override 574 final boolean opIsStateful() { 575 throw new UnsupportedOperationException(); 576 } 577 578 @Override 579 final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) { 580 throw new UnsupportedOperationException(); 581 } 582 583 // Optimized sequential terminal operations for the head of the pipeline 584 585 @Override 586 public void forEach(Consumer<? super E_OUT> action) { 587 if (!isParallel()) { 588 sourceStageSpliterator().forEachRemaining(action); 589 } 590 else { 591 super.forEach(action); 592 } 593 } 594 595 @Override 596 public void forEachOrdered(Consumer<? super E_OUT> action) { 597 if (!isParallel()) { 598 sourceStageSpliterator().forEachRemaining(action); 599 } 600 else { 601 super.forEachOrdered(action); 602 } 603 } 604 } 605 606 /** 607 * Base class for a stateless intermediate stage of a Stream. 608 * 609 * @param <E_IN> type of elements in the upstream source 610 * @param <E_OUT> type of elements in produced by this stage 611 * @since 1.8 612 */ 613 abstract static class StatelessOp<E_IN, E_OUT> 614 extends ReferencePipeline<E_IN, E_OUT> { 615 /** 616 * Construct a new Stream by appending a stateless intermediate 617 * operation to an existing stream. 618 * 619 * @param upstream The upstream pipeline stage 620 * @param inputShape The stream shape for the upstream pipeline stage 621 * @param opFlags Operation flags for the new stage 622 */ 623 StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, 624 StreamShape inputShape, 625 int opFlags) { 626 super(upstream, opFlags); 627 assert upstream.getOutputShape() == inputShape; 628 } 629 630 @Override 631 final boolean opIsStateful() { 632 return false; 633 } 634 } 635 636 /** 637 * Base class for a stateful intermediate stage of a Stream. 638 * 639 * @param <E_IN> type of elements in the upstream source 640 * @param <E_OUT> type of elements in produced by this stage 641 * @since 1.8 642 */ 643 abstract static class StatefulOp<E_IN, E_OUT> 644 extends ReferencePipeline<E_IN, E_OUT> { 645 /** 646 * Construct a new Stream by appending a stateful intermediate operation 647 * to an existing stream. 648 * @param upstream The upstream pipeline stage 649 * @param inputShape The stream shape for the upstream pipeline stage 650 * @param opFlags Operation flags for the new stage 651 */ 652 StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, 653 StreamShape inputShape, 654 int opFlags) { 655 super(upstream, opFlags); 656 assert upstream.getOutputShape() == inputShape; 657 } 658 659 @Override 660 final boolean opIsStateful() { 661 return true; 662 } 663 664 @Override 665 abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper, 666 Spliterator<P_IN> spliterator, 667 IntFunction<E_OUT[]> generator); 668 } 669 }