1 /* 2 * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.function.Consumer; 28 import java.util.function.DoubleConsumer; 29 import java.util.function.IntConsumer; 30 import java.util.function.LongConsumer; 31 import java.util.Objects; 32 33 /** 34 * A {@code TerminalOp} that evaluates a stream pipeline and sends the output into a {@code TerminalSink}. 35 * @param <T> The output type of the stream pipeline 36 * @since 1.8 37 */ 38 class ForEachOp<T> implements TerminalOp<T, Void> { 39 protected final TerminalSink<T, Void> sink; 40 protected final StreamShape shape; 41 42 /** 43 * Construct a {@code ForEachOp} that sends the stream output to the provided {@code TerminalSink} 44 * @param sink The {@code TerminalSink} to send stream output to 45 * @param shape The output shape of the stream pipeline 46 */ 47 protected ForEachOp(TerminalSink<T, Void> sink, StreamShape shape) { 48 this.shape = Objects.requireNonNull(shape); 49 this.sink = Objects.requireNonNull(sink); 50 } 51 52 /** Specialization of {@code TerminalSink} with void result */ 53 protected interface VoidTerminalSink<T> extends TerminalSink<T, Void> { 54 @Override 55 default public Void getAndClearState() { 56 return null; 57 } 58 59 /** Specialization of {@code TerminalSink} and {@code Sink.OfInt} with void result */ 60 public interface OfInt extends VoidTerminalSink<Integer>, Sink.OfInt { } 61 62 /** Specialization of {@code TerminalSink} and {@code Sink.OfLong} with void result */ 63 public interface OfLong extends VoidTerminalSink<Long>, Sink.OfLong { } 64 65 /** Specialization of {@code TerminalSink} and {@code Sink.OfDouble} with void result */ 66 public interface OfDouble extends VoidTerminalSink<Double>, Sink.OfDouble { } 67 } 68 69 /** 70 * Construct a {@code ForEachOp} that reads from a {@code Stream} and sends the stream output to 71 * the provided {@code Consumer} 72 * @param consumer The {@code Consumer} to send stream output to 73 */ 74 public static <T> ForEachOp<T> make(final Consumer<? super T> consumer) { 75 return new ForEachOp<>((VoidTerminalSink<T>) consumer::accept, StreamShape.REFERENCE); 76 } 77 78 /** 79 * Construct a {@code ForEachOp} that reads from an {@code IntStream} and sends the stream output to 80 * the provided {@code IntConsumer} 81 * @param consumer The {@code IntConsumer} to send stream output to 82 */ 83 public static ForEachOp<Integer> make(final IntConsumer consumer) { 84 return new ForEachOp<>((VoidTerminalSink.OfInt) consumer::accept, StreamShape.INT_VALUE); 85 } 86 87 /** 88 * Construct a {@code ForEachOp} that reads from a {@code LongStream} and sends the stream output to 89 * the provided {@code LongConsumer} 90 * @param consumer The {@code LongConsumer} to send stream output to 91 */ 92 public static ForEachOp<Long> make(final LongConsumer consumer) { 93 return new ForEachOp<>((VoidTerminalSink.OfLong) consumer::accept, StreamShape.LONG_VALUE); 94 } 95 96 /** 97 * Construct a {@code ForEachOp} that reads from a {@code DoubleStream} and sends the stream output to 98 * the provided {@code DoubleConsumer} 99 * @param consumer The {@code DoubleConsumer} to send stream output to 100 */ 101 public static ForEachOp<Double> make(final DoubleConsumer consumer) { 102 return new ForEachOp<>((VoidTerminalSink.OfDouble) consumer::accept, StreamShape.DOUBLE_VALUE); 103 } 104 105 @Override 106 public int getOpFlags() { 107 return StreamOpFlag.NOT_ORDERED; 108 } 109 110 @Override 111 public StreamShape inputShape() { 112 return shape; 113 } 114 115 @Override 116 public <S> Void evaluateSequential(PipelineHelper<S, T> helper) { 117 return helper.into(sink, helper.sourceSpliterator()).getAndClearState(); 118 } 119 120 @Override 121 public <S> Void evaluateParallel(PipelineHelper<S, T> helper) { 122 OpUtils.parallelForEach(helper, helper.wrapSink(sink)); 123 return null; 124 } 125 }