1 /* 2 * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.IntSummaryStatistics; 28 import java.util.Objects; 29 import java.util.OptionalDouble; 30 import java.util.OptionalInt; 31 import java.util.PrimitiveIterator; 32 import java.util.Spliterator; 33 import java.util.Spliterators; 34 import java.util.function.BiConsumer; 35 import java.util.function.BinaryOperator; 36 import java.util.function.IntBinaryOperator; 37 import java.util.function.IntConsumer; 38 import java.util.function.IntFunction; 39 import java.util.function.IntPredicate; 40 import java.util.function.IntToDoubleFunction; 41 import java.util.function.IntToLongFunction; 42 import java.util.function.IntUnaryOperator; 43 import java.util.function.ObjIntConsumer; 44 import java.util.function.Supplier; 45 46 /** 47 * Abstract base class for an intermediate pipeline stage or pipeline source 48 * stage implementing whose elements are of type {@code int}. 49 * 50 * @param <E_IN> type of elements in the upstream source 51 * @since 1.8 52 */ 53 abstract class IntPipeline<E_IN> 54 extends AbstractPipeline<E_IN, Integer, IntStream> 55 implements IntStream { 56 57 /** 58 * Constructor for the head of a stream pipeline. 59 * 60 * @param source {@code Supplier<Spliterator>} describing the stream source 61 * @param sourceFlags The source flags for the stream source, described in 62 * {@link StreamOpFlag} 63 * @param parallel {@code true} if the pipeline is parallel 64 */ 65 IntPipeline(Supplier<? extends Spliterator<Integer>> source, 66 int sourceFlags, boolean parallel) { 67 super(source, sourceFlags, parallel); 68 } 69 70 /** 71 * Constructor for the head of a stream pipeline. 72 * 73 * @param source {@code Spliterator} describing the stream source 74 * @param sourceFlags The source flags for the stream source, described in 75 * {@link StreamOpFlag} 76 * @param parallel {@code true} if the pipeline is parallel 77 */ 78 IntPipeline(Spliterator<Integer> source, 79 int sourceFlags, boolean parallel) { 80 super(source, sourceFlags, parallel); 81 } 82 83 /** 84 * Constructor for appending an intermediate operation onto an existing 85 * pipeline. 86 * 87 * @param upstream the upstream element source 88 * @param opFlags the operation flags for the new operation 89 */ 90 IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) { 91 super(upstream, opFlags); 92 } 93 94 /** 95 * Adapt a {@code Sink<Integer> to an {@code IntConsumer}, ideally simply 96 * by casting. 97 */ 98 private static IntConsumer adapt(Sink<Integer> sink) { 99 if (sink instanceof IntConsumer) { 100 return (IntConsumer) sink; 101 } 102 else { 103 if (Tripwire.ENABLED) 104 Tripwire.trip(AbstractPipeline.class, 105 "using IntStream.adapt(Sink<Integer> s)"); 106 return sink::accept; 107 } 108 } 109 110 /** 111 * Adapt a {@code Spliterator<Integer>} to a {@code Spliterator.OfInt}. 112 * 113 * @implNote 114 * The implementation attempts to cast to a Spliterator.OfInt, and throws an 115 * exception if this cast is not possible. 116 */ 117 private static Spliterator.OfInt adapt(Spliterator<Integer> s) { 118 if (s instanceof Spliterator.OfInt) { 119 return (Spliterator.OfInt) s; 120 } 121 else { 122 if (Tripwire.ENABLED) 123 Tripwire.trip(AbstractPipeline.class, 124 "using IntStream.adapt(Spliterator<Integer> s)"); 125 throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)"); 126 } 127 } 128 129 130 // Shape-specific methods 131 132 @Override 133 final StreamShape getOutputShape() { 134 return StreamShape.INT_VALUE; 135 } 136 137 @Override 138 final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper, 139 Spliterator<P_IN> spliterator, 140 boolean flattenTree, 141 IntFunction<Integer[]> generator) { 142 return Nodes.collectInt(helper, spliterator, flattenTree); 143 } 144 145 @Override 146 final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph, 147 Supplier<Spliterator<P_IN>> supplier, 148 boolean isParallel) { 149 return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel); 150 } 151 152 @Override 153 @SuppressWarnings("unchecked") 154 final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) { 155 return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier); 156 } 157 158 @Override 159 final boolean forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) { 160 Spliterator.OfInt spl = adapt(spliterator); 161 IntConsumer adaptedSink = adapt(sink); 162 boolean cancelled; 163 do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink)); 164 return cancelled; 165 } 166 167 @Override 168 final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown, 169 IntFunction<Integer[]> generator) { 170 return Nodes.intBuilder(exactSizeIfKnown); 171 } 172 173 174 // IntStream 175 176 @Override 177 public final PrimitiveIterator.OfInt iterator() { 178 return Spliterators.iterator(spliterator()); 179 } 180 181 @Override 182 public final Spliterator.OfInt spliterator() { 183 return adapt(super.spliterator()); 184 } 185 186 // Stateless intermediate ops from IntStream 187 188 @Override 189 public final LongStream asLongStream() { 190 return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 191 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 192 @Override 193 Sink<Integer> opWrapSink(int flags, Sink<Long> sink) { 194 return new Sink.ChainedInt<Long>(sink) { 195 @Override 196 public void accept(int t) { 197 downstream.accept((long) t); 198 } 199 }; 200 } 201 }; 202 } 203 204 @Override 205 public final DoubleStream asDoubleStream() { 206 return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 207 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 208 @Override 209 Sink<Integer> opWrapSink(int flags, Sink<Double> sink) { 210 return new Sink.ChainedInt<Double>(sink) { 211 @Override 212 public void accept(int t) { 213 downstream.accept((double) t); 214 } 215 }; 216 } 217 }; 218 } 219 220 @Override 221 public final Stream<Integer> boxed() { 222 return mapToObj(Integer::valueOf); 223 } 224 225 @Override 226 public final IntStream map(IntUnaryOperator mapper) { 227 Objects.requireNonNull(mapper); 228 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 229 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 230 @Override 231 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 232 return new Sink.ChainedInt<Integer>(sink) { 233 @Override 234 public void accept(int t) { 235 downstream.accept(mapper.applyAsInt(t)); 236 } 237 }; 238 } 239 }; 240 } 241 242 @Override 243 public final <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) { 244 Objects.requireNonNull(mapper); 245 return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE, 246 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 247 @Override 248 Sink<Integer> opWrapSink(int flags, Sink<U> sink) { 249 return new Sink.ChainedInt<U>(sink) { 250 @Override 251 public void accept(int t) { 252 downstream.accept(mapper.apply(t)); 253 } 254 }; 255 } 256 }; 257 } 258 259 @Override 260 public final LongStream mapToLong(IntToLongFunction mapper) { 261 Objects.requireNonNull(mapper); 262 return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 263 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 264 @Override 265 Sink<Integer> opWrapSink(int flags, Sink<Long> sink) { 266 return new Sink.ChainedInt<Long>(sink) { 267 @Override 268 public void accept(int t) { 269 downstream.accept(mapper.applyAsLong(t)); 270 } 271 }; 272 } 273 }; 274 } 275 276 @Override 277 public final DoubleStream mapToDouble(IntToDoubleFunction mapper) { 278 Objects.requireNonNull(mapper); 279 return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 280 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 281 @Override 282 Sink<Integer> opWrapSink(int flags, Sink<Double> sink) { 283 return new Sink.ChainedInt<Double>(sink) { 284 @Override 285 public void accept(int t) { 286 downstream.accept(mapper.applyAsDouble(t)); 287 } 288 }; 289 } 290 }; 291 } 292 293 @Override 294 public final IntStream flatMap(IntFunction<? extends IntStream> mapper) { 295 Objects.requireNonNull(mapper); 296 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 297 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 298 @Override 299 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 300 return new Sink.ChainedInt<Integer>(sink) { 301 @Override 302 public void begin(long size) { 303 downstream.begin(-1); 304 } 305 306 @Override 307 public void accept(int t) { 308 try (IntStream result = mapper.apply(t)) { 309 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 310 if (result != null) 311 result.sequential().forEach(i -> downstream.accept(i)); 312 } 313 } 314 }; 315 } 316 }; 317 } 318 319 @Override 320 public IntStream unordered() { 321 if (!isOrdered()) 322 return this; 323 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) { 324 @Override 325 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 326 return sink; 327 } 328 }; 329 } 330 331 @Override 332 public final IntStream filter(IntPredicate predicate) { 333 Objects.requireNonNull(predicate); 334 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 335 StreamOpFlag.NOT_SIZED) { 336 @Override 337 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 338 return new Sink.ChainedInt<Integer>(sink) { 339 @Override 340 public void begin(long size) { 341 downstream.begin(-1); 342 } 343 344 @Override 345 public void accept(int t) { 346 if (predicate.test(t)) 347 downstream.accept(t); 348 } 349 }; 350 } 351 }; 352 } 353 354 @Override 355 public final IntStream peek(IntConsumer action) { 356 Objects.requireNonNull(action); 357 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 358 0) { 359 @Override 360 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 361 return new Sink.ChainedInt<Integer>(sink) { 362 @Override 363 public void accept(int t) { 364 action.accept(t); 365 downstream.accept(t); 366 } 367 }; 368 } 369 }; 370 } 371 372 // Stateful intermediate ops from IntStream 373 374 @Override 375 public final IntStream limit(long maxSize) { 376 if (maxSize < 0) 377 throw new IllegalArgumentException(Long.toString(maxSize)); 378 return SliceOps.makeInt(this, 0, maxSize); 379 } 380 381 @Override 382 public final IntStream skip(long n) { 383 if (n < 0) 384 throw new IllegalArgumentException(Long.toString(n)); 385 if (n == 0) 386 return this; 387 else 388 return SliceOps.makeInt(this, n, -1); 389 } 390 391 @Override 392 public final IntStream takeWhile(IntPredicate predicate) { 393 return WhileOps.makeTakeWhileInt(this, predicate); 394 } 395 396 @Override 397 public final IntStream dropWhile(IntPredicate predicate) { 398 return WhileOps.makeDropWhileInt(this, predicate); 399 } 400 401 @Override 402 public final IntStream sorted() { 403 return SortedOps.makeInt(this); 404 } 405 406 @Override 407 public final IntStream distinct() { 408 // While functional and quick to implement, this approach is not very efficient. 409 // An efficient version requires an int-specific map/set implementation. 410 return boxed().distinct().mapToInt(i -> i); 411 } 412 413 // Terminal ops from IntStream 414 415 @Override 416 public void forEach(IntConsumer action) { 417 evaluate(ForEachOps.makeInt(action, false)); 418 } 419 420 @Override 421 public void forEachOrdered(IntConsumer action) { 422 evaluate(ForEachOps.makeInt(action, true)); 423 } 424 425 @Override 426 public final int sum() { 427 return reduce(0, Integer::sum); 428 } 429 430 @Override 431 public final OptionalInt min() { 432 return reduce(Math::min); 433 } 434 435 @Override 436 public final OptionalInt max() { 437 return reduce(Math::max); 438 } 439 440 @Override 441 public final long count() { 442 return evaluate(ReduceOps.makeIntCounting()); 443 } 444 445 @Override 446 public final OptionalDouble average() { 447 long[] avg = collect(() -> new long[2], 448 (ll, i) -> { 449 ll[0]++; 450 ll[1] += i; 451 }, 452 (ll, rr) -> { 453 ll[0] += rr[0]; 454 ll[1] += rr[1]; 455 }); 456 return avg[0] > 0 457 ? OptionalDouble.of((double) avg[1] / avg[0]) 458 : OptionalDouble.empty(); 459 } 460 461 @Override 462 public final IntSummaryStatistics summaryStatistics() { 463 return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept, 464 IntSummaryStatistics::combine); 465 } 466 467 @Override 468 public final int reduce(int identity, IntBinaryOperator op) { 469 return evaluate(ReduceOps.makeInt(identity, op)); 470 } 471 472 @Override 473 public final OptionalInt reduce(IntBinaryOperator op) { 474 return evaluate(ReduceOps.makeInt(op)); 475 } 476 477 @Override 478 public final <R> R collect(Supplier<R> supplier, 479 ObjIntConsumer<R> accumulator, 480 BiConsumer<R, R> combiner) { 481 Objects.requireNonNull(combiner); 482 BinaryOperator<R> operator = (left, right) -> { 483 combiner.accept(left, right); 484 return left; 485 }; 486 return evaluate(ReduceOps.makeInt(supplier, accumulator, operator)); 487 } 488 489 @Override 490 public final boolean anyMatch(IntPredicate predicate) { 491 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY)); 492 } 493 494 @Override 495 public final boolean allMatch(IntPredicate predicate) { 496 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL)); 497 } 498 499 @Override 500 public final boolean noneMatch(IntPredicate predicate) { 501 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE)); 502 } 503 504 @Override 505 public final OptionalInt findFirst() { 506 return evaluate(FindOps.makeInt(true)); 507 } 508 509 @Override 510 public final OptionalInt findAny() { 511 return evaluate(FindOps.makeInt(false)); 512 } 513 514 @Override 515 public final int[] toArray() { 516 return Nodes.flattenInt((Node.OfInt) evaluateToArrayNode(Integer[]::new)) 517 .asPrimitiveArray(); 518 } 519 520 // 521 522 /** 523 * Source stage of an IntStream. 524 * 525 * @param <E_IN> type of elements in the upstream source 526 * @since 1.8 527 */ 528 static class Head<E_IN> extends IntPipeline<E_IN> { 529 /** 530 * Constructor for the source stage of an IntStream. 531 * 532 * @param source {@code Supplier<Spliterator>} describing the stream 533 * source 534 * @param sourceFlags the source flags for the stream source, described 535 * in {@link StreamOpFlag} 536 * @param parallel {@code true} if the pipeline is parallel 537 */ 538 Head(Supplier<? extends Spliterator<Integer>> source, 539 int sourceFlags, boolean parallel) { 540 super(source, sourceFlags, parallel); 541 } 542 543 /** 544 * Constructor for the source stage of an IntStream. 545 * 546 * @param source {@code Spliterator} describing the stream source 547 * @param sourceFlags the source flags for the stream source, described 548 * in {@link StreamOpFlag} 549 * @param parallel {@code true} if the pipeline is parallel 550 */ 551 Head(Spliterator<Integer> source, 552 int sourceFlags, boolean parallel) { 553 super(source, sourceFlags, parallel); 554 } 555 556 @Override 557 final boolean opIsStateful() { 558 throw new UnsupportedOperationException(); 559 } 560 561 @Override 562 final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) { 563 throw new UnsupportedOperationException(); 564 } 565 566 // Optimized sequential terminal operations for the head of the pipeline 567 568 @Override 569 public void forEach(IntConsumer action) { 570 if (!isParallel()) { 571 adapt(sourceStageSpliterator()).forEachRemaining(action); 572 } 573 else { 574 super.forEach(action); 575 } 576 } 577 578 @Override 579 public void forEachOrdered(IntConsumer action) { 580 if (!isParallel()) { 581 adapt(sourceStageSpliterator()).forEachRemaining(action); 582 } 583 else { 584 super.forEachOrdered(action); 585 } 586 } 587 } 588 589 /** 590 * Base class for a stateless intermediate stage of an IntStream 591 * 592 * @param <E_IN> type of elements in the upstream source 593 * @since 1.8 594 */ 595 abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> { 596 /** 597 * Construct a new IntStream by appending a stateless intermediate 598 * operation to an existing stream. 599 * @param upstream The upstream pipeline stage 600 * @param inputShape The stream shape for the upstream pipeline stage 601 * @param opFlags Operation flags for the new stage 602 */ 603 StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, 604 StreamShape inputShape, 605 int opFlags) { 606 super(upstream, opFlags); 607 assert upstream.getOutputShape() == inputShape; 608 } 609 610 @Override 611 final boolean opIsStateful() { 612 return false; 613 } 614 } 615 616 /** 617 * Base class for a stateful intermediate stage of an IntStream. 618 * 619 * @param <E_IN> type of elements in the upstream source 620 * @since 1.8 621 */ 622 abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> { 623 /** 624 * Construct a new IntStream by appending a stateful intermediate 625 * operation to an existing stream. 626 * @param upstream The upstream pipeline stage 627 * @param inputShape The stream shape for the upstream pipeline stage 628 * @param opFlags Operation flags for the new stage 629 */ 630 StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, 631 StreamShape inputShape, 632 int opFlags) { 633 super(upstream, opFlags); 634 assert upstream.getOutputShape() == inputShape; 635 } 636 637 @Override 638 final boolean opIsStateful() { 639 return true; 640 } 641 642 @Override 643 abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, 644 Spliterator<P_IN> spliterator, 645 IntFunction<Integer[]> generator); 646 } 647 }