/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.IntSummaryStatistics;
import java.util.Objects;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.IntBinaryOperator;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.IntPredicate;
import java.util.function.IntToDoubleFunction;
import java.util.function.IntToLongFunction;
import java.util.function.IntUnaryOperator;
import java.util.function.ObjIntConsumer;
import java.util.function.Supplier;

/**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose elements are of type {@code int}.
 *
 * @param <E_IN> type of elements in the upstream source
 * @since 1.8
 */
abstract class IntPipeline<E_IN>
        extends AbstractPipeline<E_IN, Integer, IntStream>
        implements IntStream {

    /**
     * Constructor for the head of a stream pipeline.
     *
     * @param source {@code Supplier<Spliterator>} describing the stream source
     * @param sourceFlags The source flags for the stream source, described in
     *        {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    IntPipeline(Supplier<? extends Spliterator<Integer>> source,
                int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

    /**
     * Constructor for the head of a stream pipeline.
     *
     * @param source {@code Spliterator} describing the stream source
     * @param sourceFlags The source flags for the stream source, described in
     *        {@link StreamOpFlag}
     * @param parallel {@code true} if the pipeline is parallel
     */
    IntPipeline(Spliterator<Integer> source,
                int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

    /**
     * Constructor for appending an intermediate operation onto an existing
     * pipeline.
     *
     * @param upstream the upstream element source
     * @param opFlags the operation flags for the new operation
     */
    IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
        super(upstream, opFlags);
    }

    /**
     * Adapt a {@code Sink<Integer>} to an {@code IntConsumer}, ideally simply
     * by casting.
     *
     * @implNote
     * If the sink is not already an {@code IntConsumer}, the tripwire is
     * tripped (when enabled) and a method reference to the sink's
     * {@code accept} method is returned instead.
     */
    private static IntConsumer adapt(Sink<Integer> sink) {
        if (sink instanceof IntConsumer) {
            return (IntConsumer) sink;
        }
        else {
            if (Tripwire.ENABLED)
                Tripwire.trip(AbstractPipeline.class,
                              "using IntStream.adapt(Sink<Integer> s)");
            return sink::accept;
        }
    }

    /**
     * Adapt a {@code Spliterator<Integer>} to a {@code Spliterator.OfInt}.
     *
     * @implNote
     * The implementation attempts to cast to a Spliterator.OfInt, and throws an
     * exception if this cast is not possible.
     *
     * @throws UnsupportedOperationException if {@code s} is not a
     *         {@code Spliterator.OfInt}
     */
    private static Spliterator.OfInt adapt(Spliterator<Integer> s) {
        if (s instanceof Spliterator.OfInt) {
            return (Spliterator.OfInt) s;
        }
        else {
            if (Tripwire.ENABLED)
                Tripwire.trip(AbstractPipeline.class,
                              "using IntStream.adapt(Spliterator<Integer> s)");
            throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)");
        }
    }


    // Shape-specific methods

    @Override
    final StreamShape getOutputShape() {
        return StreamShape.INT_VALUE;
    }

    // The generator argument is not used here; collection is delegated to
    // Nodes.collectInt.
    @Override
    final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper,
                                              Spliterator<P_IN> spliterator,
                                              boolean flattenTree,
                                              IntFunction<Integer[]> generator) {
        return Nodes.collectInt(helper, spliterator, flattenTree);
    }

    @Override
    final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph,
                                           Supplier<Spliterator<P_IN>> supplier,
                                           boolean isParallel) {
        return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel);
    }

    // The supplier is cast (unchecked) to a supplier of Spliterator.OfInt
    // before being handed to the delegating spliterator.
    @Override
    @SuppressWarnings("unchecked")
    final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) {
        return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier);
    }

    // Advances one element at a time, consulting the sink's cancellation
    // state before each advance; stops as soon as cancellation is requested
    // or tryAdvance reports that no element was processed.
    @Override
    final void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) {
        Spliterator.OfInt spl = adapt(spliterator);
        IntConsumer adaptedSink = adapt(sink);
        do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
    }

    // The generator argument is not used here; an int-specific builder is
    // obtained from Nodes.intBuilder.
    @Override
    final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown,
                                                IntFunction<Integer[]> generator) {
        return Nodes.intBuilder(exactSizeIfKnown);
    }


    // IntStream

    @Override
    public final PrimitiveIterator.OfInt iterator() {
        return Spliterators.iterator(spliterator());
    }

    @Override
    public final Spliterator.OfInt spliterator() {
        return adapt(super.spliterator());
    }

    // Stateless intermediate ops from IntStream

    @Override
    public final LongStream asLongStream() {
        return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
                return new Sink.ChainedInt(sink) {
                    @Override
                    @SuppressWarnings("unchecked")
                    public void accept(int t) {
                        // Widen each int to long before passing it downstream
                        downstream.accept((long) t);
                    }
                };
            }
        };
    }

    @Override
    public final DoubleStream asDoubleStream() {
        return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
                                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
                return new Sink.ChainedInt(sink) {
                    @Override
                    @SuppressWarnings("unchecked")
                    public void accept(int t) {
                        // Convert each int to double before passing it downstream
                        downstream.accept((double) t);
                    }
                };
            }
        };
    }

    @Override
    public final Stream<Integer> boxed() {
        return mapToObj(Integer::valueOf);
    }

    @Override
    public final IntStream map(IntUnaryOperator mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
                return new Sink.ChainedInt(sink) {
                    @Override
                    public void accept(int t) {
                        downstream.accept(mapper.applyAsInt(t));
                    }
                };
            }
        };
    }

    @Override
    public final <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) {
        Objects.requireNonNull(mapper);
        return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE,
                                                             StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<Integer> opWrapSink(int flags, Sink<U> sink) {
                return new Sink.ChainedInt(sink) {
                    @Override
                    @SuppressWarnings("unchecked")
                    public void accept(int t) {
                        downstream.accept(mapper.apply(t));
                    }
                };
            }
        };
    }

    @Override
    public final LongStream mapToLong(IntToLongFunction mapper) {
        Objects.requireNonNull(mapper);
        return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
                return new Sink.ChainedInt(sink) {
                    @Override
                    public void accept(int t) {
                        downstream.accept(mapper.applyAsLong(t));
                    }
                };
            }
        };
    }

    @Override
    public final DoubleStream mapToDouble(IntToDoubleFunction mapper) {
        Objects.requireNonNull(mapper);
        return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
                                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
                return new Sink.ChainedInt(sink) {
                    @Override
                    public void accept(int t) {
                        downstream.accept(mapper.applyAsDouble(t));
                    }
                };
            }
        };
    }

    @Override
    public final IntStream flatMap(IntFunction<? extends IntStream> mapper) {
        // Fail fast on a null mapper, as the other mapping operations of this
        // class do, instead of deferring the failure to terminal evaluation.
        Objects.requireNonNull(mapper);
        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
                return new Sink.ChainedInt(sink) {
                    @Override
                    public void begin(long size) {
                        // The upstream size is not forwarded; -1 is passed
                        // instead, in line with this stage's NOT_SIZED flag
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(int t) {
                        // We can do better than this too; optimize for depth=0 case and just grab spliterator and forEach it
                        IntStream result = mapper.apply(t);
                        // A null result from the mapper contributes no elements
                        if (result != null)
                            result.sequential().forEach(i -> downstream.accept(i));
                    }
                };
            }
        };
    }

    @Override
    public IntStream unordered() {
        if (!isOrdered())
            return this;
        // The new stage only contributes the NOT_ORDERED flag; the sink is
        // passed through unchanged.
        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) {
            @Override
            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
                return sink;
            }
        };
    }

    @Override
    public final IntStream filter(IntPredicate predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
                                        StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
                return new Sink.ChainedInt(sink) {
                    @Override
                    public void begin(long size) {
                        // The upstream size is not forwarded; -1 is passed
                        // instead, in line with this stage's NOT_SIZED flag
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(int t) {
                        if (predicate.test(t))
                            downstream.accept(t);
                    }
                };
            }
        };
    }

    @Override
    public final IntStream peek(IntConsumer consumer) {
        Objects.requireNonNull(consumer);
        return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
                                        0) {
            @Override
            Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
                return new Sink.ChainedInt(sink) {
                    @Override
                    public void accept(int t) {
                        // The consumer sees the element before it is passed downstream
                        consumer.accept(t);
                        downstream.accept(t);
                    }
                };
            }
        };
    }

    // Stateful intermediate ops from IntStream

    // Common delegate for limit and substream
    private IntStream slice(long skip, long limit) {
        return SliceOps.makeInt(this, skip, limit);
    }

    @Override
    public final IntStream limit(long maxSize) {
        if (maxSize < 0)
            throw new IllegalArgumentException(Long.toString(maxSize));
        return slice(0, maxSize);
    }

    @Override
    public final IntStream substream(long startingOffset) {
        if (startingOffset < 0)
            throw new IllegalArgumentException(Long.toString(startingOffset));
        if (startingOffset == 0)
            return this;
        else
            return slice(startingOffset, -1);
    }

    @Override
    public final IntStream substream(long startingOffset, long endingOffset) {
        if (startingOffset < 0 || endingOffset < startingOffset)
            throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset));
        return slice(startingOffset, endingOffset - startingOffset);
    }

    @Override
    public final IntStream sorted() {
        return SortedOps.makeInt(this);
    }

    @Override
    public final IntStream distinct() {
        // While functional and quick to implement, this approach is not very efficient.
        // An efficient version requires an int-specific map/set implementation.
        return boxed().distinct().mapToInt(i -> i);
    }

    // Terminal ops from IntStream

    @Override
    public void forEach(IntConsumer action) {
        evaluate(ForEachOps.makeInt(action, false));
    }

    @Override
    public void forEachOrdered(IntConsumer action) {
        evaluate(ForEachOps.makeInt(action, true));
    }

    @Override
    public final int sum() {
        return reduce(0, Integer::sum);
    }

    @Override
    public final OptionalInt min() {
        return reduce(Math::min);
    }

    @Override
    public final OptionalInt max() {
        return reduce(Math::max);
    }

    @Override
    public final long count() {
        // Map every element to 1L in a single stage and sum the ones
        return mapToLong(e -> 1L).sum();
    }

    @Override
    public final OptionalDouble average() {
        // avg[0] holds the element count, avg[1] the running sum
        long[] avg = collect(() -> new long[2],
                             (ll, i) -> {
                                 ll[0]++;
                                 ll[1] += i;
                             },
                             (ll, rr) -> {
                                 ll[0] += rr[0];
                                 ll[1] += rr[1];
                             });
        return avg[0] > 0
               ? OptionalDouble.of((double) avg[1] / avg[0])
               : OptionalDouble.empty();
    }

    @Override
    public final IntSummaryStatistics summaryStatistics() {
        return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept,
                       IntSummaryStatistics::combine);
    }

    @Override
    public final int reduce(int identity, IntBinaryOperator op) {
        return evaluate(ReduceOps.makeInt(identity, op));
    }

    @Override
    public final OptionalInt reduce(IntBinaryOperator op) {
        return evaluate(ReduceOps.makeInt(op));
    }

    @Override
    public final <R> R collect(Supplier<R> resultFactory,
                               ObjIntConsumer<R> accumulator,
                               BiConsumer<R, R> combiner) {
        // Adapt the side-effecting combiner to a BinaryOperator that merges
        // the right container into the left one and returns the left one
        BinaryOperator<R> operator = (left, right) -> {
            combiner.accept(left, right);
            return left;
        };
        return evaluate(ReduceOps.makeInt(resultFactory, accumulator, operator));
    }

    @Override
    public final boolean anyMatch(IntPredicate predicate) {
        return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY));
    }

    @Override
    public final boolean allMatch(IntPredicate predicate) {
        return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL));
    }

    @Override
    public final boolean noneMatch(IntPredicate predicate) {
        return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE));
    }

    @Override
    public final OptionalInt findFirst() {
        return evaluate(FindOps.makeInt(true));
    }

    @Override
    public final OptionalInt findAny() {
        return evaluate(FindOps.makeInt(false));
    }

    @Override
    public final int[] toArray() {
        // Evaluate to a node, flatten it, and expose it as a primitive array
        return Nodes.flattenInt((Node.OfInt) evaluateToArrayNode(Integer[]::new))
                    .asPrimitiveArray();
    }

    // Pipeline stage classes

    /**
     * Source stage of an IntStream.
     *
     * @param <E_IN> type of elements in the upstream source
     * @since 1.8
     */
    static class Head<E_IN> extends IntPipeline<E_IN> {
        /**
         * Constructor for the source stage of an IntStream.
         *
         * @param source {@code Supplier<Spliterator>} describing the stream
         *               source
         * @param sourceFlags the source flags for the stream source, described
         *                    in {@link StreamOpFlag}
         * @param parallel {@code true} if the pipeline is parallel
         */
        Head(Supplier<? extends Spliterator<Integer>> source,
             int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }

        /**
         * Constructor for the source stage of an IntStream.
         *
         * @param source {@code Spliterator} describing the stream source
         * @param sourceFlags the source flags for the stream source, described
         *                    in {@link StreamOpFlag}
         * @param parallel {@code true} if the pipeline is parallel
         */
        Head(Spliterator<Integer> source,
             int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }

        // Not supported on the source stage
        @Override
        final boolean opIsStateful() {
            throw new UnsupportedOperationException();
        }

        // Not supported on the source stage
        @Override
        final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) {
            throw new UnsupportedOperationException();
        }

        // Optimized sequential terminal operations for the head of the pipeline

        @Override
        public void forEach(IntConsumer action) {
            if (!isParallel()) {
                // Sequential: traverse the source spliterator directly
                adapt(sourceStageSpliterator()).forEachRemaining(action);
            }
            else {
                super.forEach(action);
            }
        }

        @Override
        public void forEachOrdered(IntConsumer action) {
            if (!isParallel()) {
                // Sequential: traverse the source spliterator directly
                adapt(sourceStageSpliterator()).forEachRemaining(action);
            }
            else {
                super.forEachOrdered(action);
            }
        }
    }

    /**
     * Base class for a stateless intermediate stage of an IntStream.
     *
     * @param <E_IN> type of elements in the upstream source
     * @since 1.8
     */
    abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> {
        /**
         * Construct a new IntStream by appending a stateless intermediate
         * operation to an existing stream.
         * @param upstream The upstream pipeline stage
         * @param inputShape The stream shape for the upstream pipeline stage
         * @param opFlags Operation flags for the new stage
         */
        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                    StreamShape inputShape,
                    int opFlags) {
            super(upstream, opFlags);
            // inputShape is used only to sanity-check the upstream stage
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return false;
        }
    }

    /**
     * Base class for a stateful intermediate stage of an IntStream.
     *
     * @param <E_IN> type of elements in the upstream source
     * @since 1.8
     */
    abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> {
        /**
         * Construct a new IntStream by appending a stateful intermediate
         * operation to an existing stream.
         * @param upstream The upstream pipeline stage
         * @param inputShape The stream shape for the upstream pipeline stage
         * @param opFlags Operation flags for the new stage
         */
        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
                   StreamShape inputShape,
                   int opFlags) {
            super(upstream, opFlags);
            // inputShape is used only to sanity-check the upstream stage
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return true;
        }

        // Redeclared abstract so that every stateful stage must supply its
        // own parallel evaluation
        @Override
        abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
                                                         Spliterator<P_IN> spliterator,
                                                         IntFunction<Integer[]> generator);
    }
}