1 /*
   2  * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Spliterator;
  28 import java.util.concurrent.CountedCompleter;
  29 import java.util.function.Supplier;
  30 
  31 /**
  32  * Utility methods useful in the implementation of stream operations ({@link IntermediateOp}, {@link StatefulOp},
  33  * {@link TerminalOp}).
  34  * @since 1.8
  35  */
  36 class OpUtils {
  37     private OpUtils() {
  38         throw new IllegalStateException("no instances");
  39     }
  40 
  41     /**
  42      * Given a {@link PipelineHelper} describing a stream source and a sequence of intermediate stream operations,
  43      * compute the contents of the stream, in parallel, and pass the elements to the provided {@code Sink}.  Elements
  44      * will be passed to the {@code Sink} in whatever thread and whatever order they become available, independent
  45      * of the stream's encounter order.
  46      *
  47      * @param helper A {@code PipelineHelper} describing the stream source and operations
  48      * @param sink A {@code Sink} into which to deposit resulting elements
  49      * @param <P_IN> The input type of the stream pipeline
  50      * @param <P_OUT> The output type of the stream pipeline
  51      */
  52     public static<P_IN, P_OUT> void parallelForEach(PipelineHelper<P_IN, P_OUT> helper,
  53                                                     Sink<P_IN> sink) {
  54         new ForEachTask<>(helper, sink).invoke();
  55     }
  56 
  57     /**
  58      * Given a {@link PipelineHelper} describing a stream source and a sequence of intermediate stream operations,
  59      * compute the contents of the stream, sequentially, and collect the results into a {@code Node}.  The order
  60      * of output elements will respect the encounter order of the source stream, and all computation will happen
  61      * in the invoking thread.
  62      *
  63      * @param op An {@code IntermediateOp} representing the final operation to be implemented, typically
  64      *           a {@code StatefulOp}
  65      * @param helper A {@code PipelineHelper} describing the stream source and operations
  66      * @param <P_IN> The input type of the stream pipeline
  67      * @param <P_OUT> The output type of the stream pipeline
  68      * @return A {@code Node} containing the output of the stream pipeline
  69      */
  70     static<P_IN, P_OUT> Node<P_OUT> evaluateSequential(IntermediateOp<P_OUT, P_OUT> op,
  71                                                        PipelineHelper<P_IN, P_OUT> helper) {
  72         long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.getOpFlags())
  73                            ? helper.exactOutputSizeIfKnown(helper.sourceSpliterator())
  74                            : -1;
  75         final Node.Builder<P_OUT> nb = helper.makeNodeBuilder(sizeIfKnown);
  76         Sink<P_OUT> opSink = op.wrapSink(helper.getStreamAndOpFlags(), nb);
  77 
  78         if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(op.getOpFlags()))
  79             helper.into(opSink, helper.sourceSpliterator());
  80         else
  81             helper.intoWrappedWithCancel(helper.wrapSink(opSink), helper.sourceSpliterator());
  82         return nb.build();
  83     }
  84 
  85     /**
  86      * Given a {@link PipelineHelper} describing a stream source and a sequence of intermediate stream operations,
  87      * perform a reduction operation on the stream, in parallel, using the supplied {@link AccumulatingSink} type.
  88      * The {@code AccumulatingSink} must represent an associative reducing operation.
  89      *
  90      * @param helper A {@code PipelineHelper} describing the stream source and operations
  91      * @param factory A {@code Supplier} for an {@code AccumulatingSink} which performs the associative reducing
  92      *                operation
  93      * @param <P_IN> The input type of the stream pipeline
  94      * @param <P_OUT> The output type of the stream pipeline
  95      * @param <R> The result type of the reduction operation
  96      * @param <S> The type of the {@code AccumulatingSink}
  97      * @return The result of the reduction operation
  98      */
  99     public static<P_IN, P_OUT, R, S extends AccumulatingSink<P_OUT, R, S>>
 100     R parallelReduce(PipelineHelper<P_IN, P_OUT> helper, Supplier<S> factory) {
 101         S sink = new ReduceTask<>(helper, factory).invoke();
 102         return sink.getAndClearState();
 103     }
 104 
 105     /**
 106      * A type of {@code TerminalSink} that implements an associative reducing operation on elements of type
 107      * {@code T} and producing a result of type {@code R}.
 108      *
 109      * @param <T> The type of input element to the combining operation
 110      * @param <R> The result type
 111      * @param <K> The type of the {@code AccumulatingSink}
 112      */
 113     public interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> extends TerminalSink<T, R> {
 114         public void combine(K other);
 115         public void clearState();
 116     }
 117 
 118 
 119     /** A {@code ForkJoinTask} for performing a parallel for-each operation */
 120     private static class ForEachTask<S, T> extends AbstractTask<S, T, Void, ForEachTask<S, T>> {
 121         // @@@ Extending AbstractTask here is potentially inefficient, since we don't really need to
 122         // keep track of the structure of the computation tree
 123         private final Sink<S> sink;
 124 
 125         private ForEachTask(PipelineHelper<S, T> helper, Sink<S> sink) {
 126             super(helper);
 127             this.sink = sink;
 128         }
 129 
 130         private ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator, Sink<S> sink) {
 131             super(parent, spliterator);
 132             this.sink = sink;
 133         }
 134 
 135         @Override
 136         public boolean suggestSplit() {
 137             boolean suggest = super.suggestSplit();
 138             if (StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags()))
 139                 suggest = suggest && !sink.cancellationRequested();
 140             return suggest;
 141         }
 142 
 143         @Override
 144         protected ForEachTask<S, T> makeChild(Spliterator<S> spliterator) {
 145             return new ForEachTask<>(this, spliterator, sink);
 146         }
 147 
 148         @Override
 149         protected Void doLeaf() {
 150             helper.intoWrapped(sink, spliterator);
 151             return null;
 152         }
 153     }
 154 
 155     /** A {@code ForkJoinTask} for performing a parallel reduce operation */
 156     private static class ReduceTask<P_IN, P_OUT, R, S extends AccumulatingSink<P_OUT, R, S>>
 157             extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
 158         private final Supplier<S> sinkFactory;
 159 
 160         private ReduceTask(PipelineHelper<P_IN, P_OUT> helper, Supplier<S> sinkFactory) {
 161             super(helper);
 162             this.sinkFactory = sinkFactory;
 163         }
 164 
 165         private ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent, Spliterator<P_IN> spliterator) {
 166             super(parent, spliterator);
 167             this.sinkFactory = parent.sinkFactory;
 168         }
 169 
 170         @Override
 171         protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
 172             return new ReduceTask<>(this, spliterator);
 173         }
 174 
 175         @Override
 176         protected S doLeaf() {
 177             return helper.into(sinkFactory.get(), spliterator);
 178         }
 179 
 180         @Override
 181         public void onCompletion(CountedCompleter caller) {
 182             if (!isLeaf()) {
 183                 ReduceTask<P_IN, P_OUT, R, S> child = children;
 184                 S result = child.getLocalResult();
 185                 child = child.nextSibling;
 186                 for (; child != null; child = child.nextSibling) {
 187                     S otherResult = child.getLocalResult();
 188                     result.combine(otherResult);
 189                     otherResult.clearState();
 190                 }
 191                 setLocalResult(result);
 192             }
 193         }
 194     }
 195 }