1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.IntSummaryStatistics; 28 import java.util.Objects; 29 import java.util.OptionalDouble; 30 import java.util.OptionalInt; 31 import java.util.PrimitiveIterator; 32 import java.util.Spliterator; 33 import java.util.Spliterators; 34 import java.util.function.BiConsumer; 35 import java.util.function.BinaryOperator; 36 import java.util.function.IntBinaryOperator; 37 import java.util.function.IntConsumer; 38 import java.util.function.IntFunction; 39 import java.util.function.IntPredicate; 40 import java.util.function.IntToDoubleFunction; 41 import java.util.function.IntToLongFunction; 42 import java.util.function.IntUnaryOperator; 43 import java.util.function.ObjIntConsumer; 44 import java.util.function.Supplier; 45 46 /** 47 * Abstract base class for an intermediate pipeline stage or pipeline source 48 * stage implementing whose elements are of type {@code int}. 49 * 50 * @param <E_IN> type of elements in the upstream source 51 * @since 1.8 52 */ 53 abstract class IntPipeline<E_IN> 54 extends AbstractPipeline<E_IN, Integer, IntStream> 55 implements IntStream { 56 57 /** 58 * Constructor for the head of a stream pipeline. 59 * 60 * @param source {@code Supplier<Spliterator>} describing the stream source 61 * @param sourceFlags The source flags for the stream source, described in 62 * {@link StreamOpFlag} 63 * @param parallel {@code true} if the pipeline is parallel 64 */ 65 IntPipeline(Supplier<? extends Spliterator<Integer>> source, 66 int sourceFlags, boolean parallel) { 67 super(source, sourceFlags, parallel); 68 } 69 70 /** 71 * Constructor for the head of a stream pipeline. 72 * 73 * @param source {@code Spliterator} describing the stream source 74 * @param sourceFlags The source flags for the stream source, described in 75 * {@link StreamOpFlag} 76 * @param parallel {@code true} if the pipeline is parallel 77 */ 78 IntPipeline(Spliterator<Integer> source, 79 int sourceFlags, boolean parallel) { 80 super(source, sourceFlags, parallel); 81 } 82 83 /** 84 * Constructor for appending an intermediate operation onto an existing 85 * pipeline. 86 * 87 * @param upstream the upstream element source 88 * @param opFlags the operation flags for the new operation 89 */ 90 IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) { 91 super(upstream, opFlags); 92 } 93 94 /** 95 * Adapt a {@code Sink<Integer> to an {@code IntConsumer}, ideally simply 96 * by casting. 97 */ 98 private static IntConsumer adapt(Sink<Integer> sink) { 99 if (sink instanceof IntConsumer) { 100 return (IntConsumer) sink; 101 } 102 else { 103 if (Tripwire.ENABLED) 104 Tripwire.trip(AbstractPipeline.class, 105 "using IntStream.adapt(Sink<Integer> s)"); 106 return sink::accept; 107 } 108 } 109 110 /** 111 * Adapt a {@code Spliterator<Integer>} to a {@code Spliterator.OfInt}. 112 * 113 * @implNote 114 * The implementation attempts to cast to a Spliterator.OfInt, and throws an 115 * exception if this cast is not possible. 116 */ 117 private static Spliterator.OfInt adapt(Spliterator<Integer> s) { 118 if (s instanceof Spliterator.OfInt) { 119 return (Spliterator.OfInt) s; 120 } 121 else { 122 if (Tripwire.ENABLED) 123 Tripwire.trip(AbstractPipeline.class, 124 "using IntStream.adapt(Spliterator<Integer> s)"); 125 throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)"); 126 } 127 } 128 129 130 // Shape-specific methods 131 132 @Override 133 final StreamShape getOutputShape() { 134 return StreamShape.INT_VALUE; 135 } 136 137 @Override 138 final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper, 139 Spliterator<P_IN> spliterator, 140 boolean flattenTree, 141 IntFunction<Integer[]> generator) { 142 return Nodes.collectInt(helper, spliterator, flattenTree); 143 } 144 145 @Override 146 final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph, 147 Supplier<Spliterator<P_IN>> supplier, 148 boolean isParallel) { 149 return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel); 150 } 151 152 @Override 153 final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) { 154 return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier); 155 } 156 157 @Override 158 final void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) { 159 Spliterator.OfInt spl = adapt(spliterator); 160 IntConsumer adaptedSink = adapt(sink); 161 do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink)); 162 } 163 164 @Override 165 final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown, 166 IntFunction<Integer[]> generator) { 167 return Nodes.intBuilder(exactSizeIfKnown); 168 } 169 170 171 // IntStream 172 173 @Override 174 public final PrimitiveIterator.OfInt iterator() { 175 return Spliterators.iterator(spliterator()); 176 } 177 178 @Override 179 public final Spliterator.OfInt spliterator() { 180 return adapt(super.spliterator()); 181 } 182 183 // Stateless intermediate ops from IntStream 184 185 @Override 186 public final LongStream asLongStream() { 187 return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 188 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 189 @Override 190 Sink<Integer> opWrapSink(int flags, Sink<Long> sink) { 191 return new Sink.ChainedInt(sink) { 192 @Override 193 public void accept(int t) { 194 downstream.accept((long) t); 195 } 196 }; 197 } 198 }; 199 } 200 201 @Override 202 public final DoubleStream asDoubleStream() { 203 return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 204 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 205 @Override 206 Sink<Integer> opWrapSink(int flags, Sink<Double> sink) { 207 return new Sink.ChainedInt(sink) { 208 @Override 209 public void accept(int t) { 210 downstream.accept((double) t); 211 } 212 }; 213 } 214 }; 215 } 216 217 @Override 218 public final Stream<Integer> boxed() { 219 return mapToObj(Integer::valueOf); 220 } 221 222 @Override 223 public final IntStream map(IntUnaryOperator mapper) { 224 Objects.requireNonNull(mapper); 225 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 226 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 227 @Override 228 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 229 return new Sink.ChainedInt(sink) { 230 @Override 231 public void accept(int t) { 232 downstream.accept(mapper.applyAsInt(t)); 233 } 234 }; 235 } 236 }; 237 } 238 239 @Override 240 public final <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) { 241 Objects.requireNonNull(mapper); 242 return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE, 243 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 244 @Override 245 Sink<Integer> opWrapSink(int flags, Sink<U> sink) { 246 return new Sink.ChainedInt(sink) { 247 @Override 248 public void accept(int t) { 249 downstream.accept(mapper.apply(t)); 250 } 251 }; 252 } 253 }; 254 } 255 256 @Override 257 public final LongStream mapToLong(IntToLongFunction mapper) { 258 Objects.requireNonNull(mapper); 259 return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 260 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 261 @Override 262 Sink<Integer> opWrapSink(int flags, Sink<Long> sink) { 263 return new Sink.ChainedInt(sink) { 264 @Override 265 public void accept(int t) { 266 downstream.accept(mapper.applyAsLong(t)); 267 } 268 }; 269 } 270 }; 271 } 272 273 @Override 274 public final DoubleStream mapToDouble(IntToDoubleFunction mapper) { 275 Objects.requireNonNull(mapper); 276 return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE, 277 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { 278 @Override 279 Sink<Integer> opWrapSink(int flags, Sink<Double> sink) { 280 return new Sink.ChainedInt(sink) { 281 @Override 282 public void accept(int t) { 283 downstream.accept(mapper.applyAsDouble(t)); 284 } 285 }; 286 } 287 }; 288 } 289 290 @Override 291 public final IntStream flatMap(IntFunction<? extends IntStream> mapper) { 292 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 293 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { 294 @Override 295 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 296 return new Sink.ChainedInt(sink) { 297 @Override 298 public void begin(long size) { 299 downstream.begin(-1); 300 } 301 302 @Override 303 public void accept(int t) { 304 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it 305 IntStream result = mapper.apply(t); 306 if (result != null) 307 result.sequential().forEach(i -> downstream.accept(i)); 308 } 309 }; 310 } 311 }; 312 } 313 314 @Override 315 public IntStream unordered() { 316 if (!isOrdered()) 317 return this; 318 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) { 319 @Override 320 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 321 return sink; 322 } 323 }; 324 } 325 326 @Override 327 public final IntStream filter(IntPredicate predicate) { 328 Objects.requireNonNull(predicate); 329 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 330 StreamOpFlag.NOT_SIZED) { 331 @Override 332 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 333 return new Sink.ChainedInt(sink) { 334 @Override 335 public void begin(long size) { 336 downstream.begin(-1); 337 } 338 339 @Override 340 public void accept(int t) { 341 if (predicate.test(t)) 342 downstream.accept(t); 343 } 344 }; 345 } 346 }; 347 } 348 349 @Override 350 public final IntStream peek(IntConsumer consumer) { 351 Objects.requireNonNull(consumer); 352 return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, 353 0) { 354 @Override 355 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 356 return new Sink.ChainedInt(sink) { 357 @Override 358 public void accept(int t) { 359 consumer.accept(t); 360 downstream.accept(t); 361 } 362 }; 363 } 364 }; 365 } 366 367 // Stateful intermediate ops from IntStream 368 369 private IntStream slice(long skip, long limit) { 370 return SliceOps.makeInt(this, skip, limit); 371 } 372 373 @Override 374 public final IntStream limit(long maxSize) { 375 if (maxSize < 0) 376 throw new IllegalArgumentException(Long.toString(maxSize)); 377 return slice(0, maxSize); 378 } 379 380 @Override 381 public final IntStream substream(long startingOffset) { 382 if (startingOffset < 0) 383 throw new IllegalArgumentException(Long.toString(startingOffset)); 384 if (startingOffset == 0) 385 return this; 386 else 387 return slice(startingOffset, -1); 388 } 389 390 @Override 391 public final IntStream substream(long startingOffset, long endingOffset) { 392 if (startingOffset < 0 || endingOffset < startingOffset) 393 throw new IllegalArgumentException(String.format("substream(%d, %d)", startingOffset, endingOffset)); 394 return slice(startingOffset, endingOffset - startingOffset); 395 } 396 397 @Override 398 public final IntStream sorted() { 399 return SortedOps.makeInt(this); 400 } 401 402 @Override 403 public final IntStream distinct() { 404 // While functional and quick to implement, this approach is not very efficient. 405 // An efficient version requires an int-specific map/set implementation. 406 return boxed().distinct().mapToInt(i -> i); 407 } 408 409 // Terminal ops from IntStream 410 411 @Override 412 public void forEach(IntConsumer action) { 413 evaluate(ForEachOps.makeInt(action, false)); 414 } 415 416 @Override 417 public void forEachOrdered(IntConsumer action) { 418 evaluate(ForEachOps.makeInt(action, true)); 419 } 420 421 @Override 422 public final int sum() { 423 return reduce(0, Integer::sum); 424 } 425 426 @Override 427 public final OptionalInt min() { 428 return reduce(Math::min); 429 } 430 431 @Override 432 public final OptionalInt max() { 433 return reduce(Math::max); 434 } 435 436 @Override 437 public final long count() { 438 return asLongStream().map(e -> 1L).sum(); 439 } 440 441 @Override 442 public final OptionalDouble average() { 443 long[] avg = collect(() -> new long[2], 444 (ll, i) -> { 445 ll[0]++; 446 ll[1] += i; 447 }, 448 (ll, rr) -> { 449 ll[0] += rr[0]; 450 ll[1] += rr[1]; 451 }); 452 return avg[0] > 0 453 ? OptionalDouble.of((double) avg[1] / avg[0]) 454 : OptionalDouble.empty(); 455 } 456 457 @Override 458 public final IntSummaryStatistics summaryStatistics() { 459 return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept, 460 IntSummaryStatistics::combine); 461 } 462 463 @Override 464 public final int reduce(int identity, IntBinaryOperator op) { 465 return evaluate(ReduceOps.makeInt(identity, op)); 466 } 467 468 @Override 469 public final OptionalInt reduce(IntBinaryOperator op) { 470 return evaluate(ReduceOps.makeInt(op)); 471 } 472 473 @Override 474 public final <R> R collect(Supplier<R> resultFactory, 475 ObjIntConsumer<R> accumulator, 476 BiConsumer<R, R> combiner) { 477 BinaryOperator<R> operator = (left, right) -> { 478 combiner.accept(left, right); 479 return left; 480 }; 481 return evaluate(ReduceOps.makeInt(resultFactory, accumulator, operator)); 482 } 483 484 @Override 485 public final boolean anyMatch(IntPredicate predicate) { 486 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY)); 487 } 488 489 @Override 490 public final boolean allMatch(IntPredicate predicate) { 491 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL)); 492 } 493 494 @Override 495 public final boolean noneMatch(IntPredicate predicate) { 496 return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE)); 497 } 498 499 @Override 500 public final OptionalInt findFirst() { 501 return evaluate(FindOps.makeInt(true)); 502 } 503 504 @Override 505 public final OptionalInt findAny() { 506 return evaluate(FindOps.makeInt(false)); 507 } 508 509 @Override 510 public final int[] toArray() { 511 return Nodes.flattenInt((Node.OfInt) evaluateToArrayNode(Integer[]::new)) 512 .asPrimitiveArray(); 513 } 514 515 // 516 517 /** 518 * Source stage of an IntStream. 519 * 520 * @param <E_IN> type of elements in the upstream source 521 * @since 1.8 522 */ 523 static class Head<E_IN> extends IntPipeline<E_IN> { 524 /** 525 * Constructor for the source stage of an IntStream. 526 * 527 * @param source {@code Supplier<Spliterator>} describing the stream 528 * source 529 * @param sourceFlags the source flags for the stream source, described 530 * in {@link StreamOpFlag} 531 * @param parallel {@code true} if the pipeline is parallel 532 */ 533 Head(Supplier<? extends Spliterator<Integer>> source, 534 int sourceFlags, boolean parallel) { 535 super(source, sourceFlags, parallel); 536 } 537 538 /** 539 * Constructor for the source stage of an IntStream. 540 * 541 * @param source {@code Spliterator} describing the stream source 542 * @param sourceFlags the source flags for the stream source, described 543 * in {@link StreamOpFlag} 544 * @param parallel {@code true} if the pipeline is parallel 545 */ 546 Head(Spliterator<Integer> source, 547 int sourceFlags, boolean parallel) { 548 super(source, sourceFlags, parallel); 549 } 550 551 @Override 552 final boolean opIsStateful() { 553 throw new UnsupportedOperationException(); 554 } 555 556 @Override 557 final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) { 558 throw new UnsupportedOperationException(); 559 } 560 561 // Optimized sequential terminal operations for the head of the pipeline 562 563 @Override 564 public void forEach(IntConsumer action) { 565 if (!isParallel()) { 566 adapt(sourceStageSpliterator()).forEachRemaining(action); 567 } 568 else { 569 super.forEach(action); 570 } 571 } 572 573 @Override 574 public void forEachOrdered(IntConsumer action) { 575 if (!isParallel()) { 576 adapt(sourceStageSpliterator()).forEachRemaining(action); 577 } 578 else { 579 super.forEachOrdered(action); 580 } 581 } 582 } 583 584 /** 585 * Base class for a stateless intermediate stage of an IntStream 586 * 587 * @param <E_IN> type of elements in the upstream source 588 * @since 1.8 589 */ 590 abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> { 591 /** 592 * Construct a new IntStream by appending a stateless intermediate 593 * operation to an existing stream. 594 * @param upstream The upstream pipeline stage 595 * @param inputShape The stream shape for the upstream pipeline stage 596 * @param opFlags Operation flags for the new stage 597 */ 598 StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, 599 StreamShape inputShape, 600 int opFlags) { 601 super(upstream, opFlags); 602 assert upstream.getOutputShape() == inputShape; 603 } 604 605 @Override 606 final boolean opIsStateful() { 607 return false; 608 } 609 } 610 611 /** 612 * Base class for a stateful intermediate stage of an IntStream. 613 * 614 * @param <E_IN> type of elements in the upstream source 615 * @since 1.8 616 */ 617 abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> { 618 /** 619 * Construct a new IntStream by appending a stateful intermediate 620 * operation to an existing stream. 621 * @param upstream The upstream pipeline stage 622 * @param inputShape The stream shape for the upstream pipeline stage 623 * @param opFlags Operation flags for the new stage 624 */ 625 StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, 626 StreamShape inputShape, 627 int opFlags) { 628 super(upstream, opFlags); 629 assert upstream.getOutputShape() == inputShape; 630 } 631 632 @Override 633 final boolean opIsStateful() { 634 return true; 635 } 636 637 @Override 638 abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, 639 Spliterator<P_IN> spliterator, 640 IntFunction<Integer[]> generator); 641 } 642 }