1 /*
   2  * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.function.Consumer;
  28 import java.util.function.DoubleConsumer;
  29 import java.util.function.IntConsumer;
  30 import java.util.function.LongConsumer;
  31 import java.util.Objects;
  32 
  33 /**
  34  * A {@code TerminalOp} that evaluates a stream pipeline and sends the output into a {@code TerminalSink}.
  35  * @param <T> The output type of the stream pipeline
  36  * @since 1.8
  37  */
  38 class ForEachOp<T> implements TerminalOp<T, Void> {
  39     protected final TerminalSink<T, Void> sink;
  40     protected final StreamShape shape;
  41 
  42     /**
  43      * Construct a {@code ForEachOp} that sends the stream output to the provided {@code TerminalSink}
  44      * @param sink The {@code TerminalSink} to send stream output to
  45      * @param shape The output shape of the stream pipeline
  46      */
  47     protected ForEachOp(TerminalSink<T, Void> sink, StreamShape shape) {
  48         this.shape = Objects.requireNonNull(shape);
  49         this.sink = Objects.requireNonNull(sink);
  50     }
  51 
  52     /** Specialization of {@code TerminalSink} with void result */
  53     protected interface VoidTerminalSink<T> extends TerminalSink<T, Void> {
  54         @Override
  55         default public Void getAndClearState() {
  56             return null;
  57         }
  58 
  59         /** Specialization of {@code TerminalSink} and {@code Sink.OfInt} with void result */
  60         public interface OfInt extends VoidTerminalSink<Integer>, Sink.OfInt { }
  61 
  62         /** Specialization of {@code TerminalSink} and {@code Sink.OfLong} with void result */
  63         public interface OfLong extends VoidTerminalSink<Long>, Sink.OfLong { }
  64 
  65         /** Specialization of {@code TerminalSink} and {@code Sink.OfDouble} with void result */
  66         public interface OfDouble extends VoidTerminalSink<Double>, Sink.OfDouble { }
  67     }
  68 
  69     /**
  70      * Construct a {@code ForEachOp} that reads from a {@code Stream} and sends the stream output to
  71      * the provided {@code Consumer}
  72      * @param consumer The {@code Consumer} to send stream output to
  73      */
  74     public static <T> ForEachOp<T> make(final Consumer<? super T> consumer) {
  75         return new ForEachOp<>((VoidTerminalSink<T>) consumer::accept, StreamShape.REFERENCE);
  76     }
  77 
  78     /**
  79      * Construct a {@code ForEachOp} that reads from an {@code IntStream} and sends the stream output to
  80      * the provided {@code IntConsumer}
  81      * @param consumer The {@code IntConsumer} to send stream output to
  82      */
  83     public static ForEachOp<Integer> make(final IntConsumer consumer) {
  84         return new ForEachOp<>((VoidTerminalSink.OfInt) consumer::accept, StreamShape.INT_VALUE);
  85     }
  86 
  87     /**
  88      * Construct a {@code ForEachOp} that reads from a {@code LongStream} and sends the stream output to
  89      * the provided {@code LongConsumer}
  90      * @param consumer The {@code LongConsumer} to send stream output to
  91      */
  92     public static ForEachOp<Long> make(final LongConsumer consumer) {
  93         return new ForEachOp<>((VoidTerminalSink.OfLong) consumer::accept, StreamShape.LONG_VALUE);
  94     }
  95 
  96     /**
  97      * Construct a {@code ForEachOp} that reads from a {@code DoubleStream} and sends the stream output to
  98      * the provided {@code DoubleConsumer}
  99      * @param consumer The {@code DoubleConsumer} to send stream output to
 100      */
 101     public static ForEachOp<Double> make(final DoubleConsumer consumer) {
 102         return new ForEachOp<>((VoidTerminalSink.OfDouble) consumer::accept, StreamShape.DOUBLE_VALUE);
 103     }
 104 
 105     @Override
 106     public int getOpFlags() {
 107         return StreamOpFlag.NOT_ORDERED;
 108     }
 109 
 110     @Override
 111     public StreamShape inputShape() {
 112         return shape;
 113     }
 114 
 115     @Override
 116     public <S> Void evaluateSequential(PipelineHelper<S, T> helper) {
 117         return helper.into(sink, helper.sourceSpliterator()).getAndClearState();
 118     }
 119 
 120     @Override
 121     public <S> Void evaluateParallel(PipelineHelper<S, T> helper) {
 122         OpUtils.parallelForEach(helper, helper.wrapSink(sink));
 123         return null;
 124     }
 125 }