1 /*
   2  * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Spliterator;
  28 import java.util.function.IntFunction;
  29 
  30 /**
  31  * Helper class for executing
  32  * <a href="package-summary.html#StreamPipelines">stream pipelines</a>,
  33  * capturing all of the information about a stream pipeline (source, output
  34  * shape, stream flags, parallelism, etc) in one place.
  35  *
  36  * @apiNote
  37  * A stream pipeline consists of a source, zero or more intermediate operations,
  38  * and a terminal operation.  Execution of the stream pipeline begins when the
  39  * terminal operation is executed.  A {@code PipelineHelper} describes the
  40  * portion of a stream pipeline including its source, some or all of its
  41  * intermediate operations, and certain information about the terminal (or
  42  * stateful) operation which follows the last intermediate operation described
  43  * by this {@code PipelineHelper}.  The {@code PipelineHelper} is passed to the
  44  * {@link TerminalOp#evaluateParallel(PipelineHelper)},
  45  * {@link TerminalOp#evaluateSequential(PipelineHelper)}, and
  46  * {@link StatefulOp#evaluateParallel(PipelineHelper)} methods, which can use
  47  * the {@code PipelineHelper} to access the source {@code Spliterator} for the
  48  * pipeline, information about the pipeline such as input shape, output shape,
  49  * stream flags, and size, and use the helper methods such as
  50  * {@link #into(Sink, Spliterator)}, {@link #intoWrapped(Sink, Spliterator)},
  51  * and {@link #wrapSink(Sink)} to execute pipeline operations.
  52  *
  53  * @param <P_IN>  Type of input elements to the pipeline
  54  * @param <P_OUT> Type of output elements from the pipeline
  55  * @since 1.8
  56  */
  57 interface PipelineHelper<P_IN, P_OUT> {
  58 
  59     /**
  60      * Gets the {@code StreamShape} describing the input shape of the pipeline.
  61      * @return The input shape of the pipeline
  62      */
  63     StreamShape getInputShape();
  64 
  65     /**
  66      * Gets the {@code StreamShape} describing the output shape of the pipeline.
  67      * @return The output shape of the pipeline
  68      */
  69     StreamShape getOutputShape();
  70 
  71     /**
  72      * Gets the combined stream and operation flags for the output of the
  73      * pipeline.  This will incorporate stream flags from the stream source, all
  74      * the intermediate operations and the terminal operation.
  75      *
  76      * @return the combined stream and operation flags for the output of the
  77      *         pipeline
  78      * @see StreamOpFlag
  79      */
  80     int getStreamAndOpFlags();
  81 
  82     /**
  83      * Gets the operation flags for the terminal operation.
  84      *
  85      * @return the operation flags for the terminal operation.
  86      * @see StreamOpFlag
  87      */
  88     // @@@ Specifying this concisely is somewhat complicated since the actual terminal operation flags
  89     //     are masked by StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK as the flags propagate upstream through parallel
  90     //     pipeline chunks
  91     int getTerminalOpFlags();
  92 
  93     /**
  94      * Returns whether this pipeline is parallel or sequential.
  95      *
  96      * @return true if the pipeline is a parallel pipeline, otherwise false
  97      */
  98     boolean isParallel();
  99 
 100     /**
 101      * Gets the {@code Spliterator} for the source of the pipeline.  This
 102      * {@code Spliterator} reflects only the source elements, not the actions of
 103      * any of the intermediate stages.
 104      *
 105      * @return the source spliterator
 106      */
 107     Spliterator<P_IN> sourceSpliterator();
 108 
 109     /**
 110      * Returns the exact output size of the portion of the output resulting from
 111      * applying the pipeline stages described by this {@code PipelineHelper} to
 112      * the portion of the input described by the provided
 113      * {@code Spliterator}, if known.  If not known or known infinite, will
 114      * return {@code -1}.
 115      *
 116      * @apiNote
 117      * The exact output size is known if the {@code Spliterator} has the
 118      * {@code SIZED} characteristic, and the operation flag
 119      * {@link StreamOpFlag#SIZED} is known on the combined stream and operation
 120      * flags.
 121      *
 122      * @param spliterator the spliterator describing the relevant portion of the
 123      *        source data
 124      * @return the exact size if known, or -1 if infinite or unknown
 125      */
 126     long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);
 127 
 128     /**
 129      * Applies the pipeline stages described by this {@code PipelineHelper} to
 130      * the provided {@code Spliterator} and sends the results to the provided
 131      * {@code Sink}.
 132      *
 133      * @implSpec
 134      * The implementation behaves as if:
 135      * <pre>
 136      *     intoWrapped(wrapSink(sink), spliterator);
 137      * </pre>
 138      *
 139      * @param sink the {@code Sink} to receive the results
 140      * @param spliterator the spliterator describing the portion of the source
 141      *        input to process
 142      */
 143     <S extends Sink<P_OUT>> S into(S sink, Spliterator<P_IN> spliterator);
 144 
 145     /**
 146      * Pushes elements obtained from the {@code Spliterator} into the provided
 147      * {@code Sink}.  If the stream pipeline is known to have short-circuiting
 148      * stages in it (see {@link StreamOpFlag#SHORT_CIRCUIT}), then the elements
 149      * are delivered as per {@link #intoWrappedWithCancel(Sink, Spliterator)}.
 150      *
 151      * @implSpec
 152      * This method conforms to the {@code Sink} protocol of calling
 153      * {@code Sink.begin} before pushing elements, via {@code Sink.accept}, and
 154      * calling {@code Sink.end} after all elements have been pushed.
 155      *
 156      * @param wrappedSink the destination {@code Sink}
 157      * @param spliterator the source {@code Spliterator}
 158      */
 159     void intoWrapped(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
 160 
 161     /**
 162      * Pushes elements obtained from the {@code Spliterator} into the provided
 163      * {@code Sink}, checking {@link Sink#cancellationRequested()} after each
 164      * element, and stopping if cancellation is requested.
 165      *
 166      * @implSpec
 167      * This method conforms to the {@code Sink} protocol of calling
 168      * {@code Sink.begin} before pushing elements, via {@code Sink.accept}, and
 169      * calling {@code Sink.end} after all elements have been pushed or if
 170      * cancellation is requested.
 171      *
 172      * @param wrappedSink the destination {@code Sink}
 173      * @param spliterator the source {@code Spliterator}
 174      */
 175     void intoWrappedWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
 176 
 177     /**
 178      * Takes a {@code Sink} that accepts elements of the output type of the
 179      * {@code PipelineHelper}, and wraps it with a {@code Sink} that accepts
 180      * elements of the input type and implements all the intermediate operations
 181      * described by this {@code PipelineHelper}, delivering the result into the
 182      * provided {@code Sink}.
 183      *
 184      * @param sink the {@code Sink} to receive the results
 185      * @return a {@code Sink} that implements the pipeline stages and sends
 186      *         results to the provided {@code Sink}
 187      */
 188     Sink<P_IN> wrapSink(Sink<P_OUT> sink);
 189 
 190     /**
 191      * Constructs a {@link Node.Builder} compatible with the output shape of
 192      * this {@code PipelineHelper}.
 193      *
 194      * @param exactSizeIfKnown if {@code >= 0} then a builder will be created that has
 195      *        a fixed capacity of exactly {@code exactSizeIfKnown} elements; if {@code < 0}
 196      *        then the builder has variable capacity.  A fixed capacity builder will fail
 197      *        if an element is added and the builder has reached capacity.
 198      * @return A {@code Node.Builder} compatible with the output shape of this
 199      *         {@code PipelineHelper}
 200      */
 201     Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown);
 202 
 203     /**
 204      * Collects all output elements resulting from applying the pipeline stages
 205      * to the source {@code Spliterator} into a {@code Node}.
 206      *
 207      * @implSpec
 208      * If the pipeline has no intermediate operations and the source is backed
 209      * by a {@code Node} then that {@code Node} will be returned or flattened
 210      * and then returned. This reduces copying for a pipeline consisting of a
 211      * stateful operation followed by a terminal operation that returns an
 212      * array, such as:
 213      * <pre>{@code
 214      *     stream.sorted().toArray();
 215      * }</pre>
 216      *
 217      * @param flatten if true and the pipeline is a parallel pipeline then the
 218      *        {@code Node} returned will contain no children, otherwise the
 219      *        {@code Node} may represent the root in a tree that reflects the
 220      *        shape of the computation tree.
 221      * @return the {@code Node} containing all output elements
 222      */
 223     Node<P_OUT> collectOutput(boolean flatten);
 224 
 225     /**
 226      * Gets an array factory associated with the output type of this pipeline.
 227      *
 228      * @return a factory for arrays of the output type of this pipeline.
 229      */
 230     IntFunction<P_OUT[]> arrayGenerator();
 231 
 232     /**
 233      * Collects all output elements resulting from applying the pipeline
 234      * stages, plus an additional final stage that is an intermediate operation,
 235      * to the source {@code Spliterator} into a {@code Node}.  The order of
 236      * output elements will respect the encounter order of the source stream,
 237      * and all computation will happen in the invoking thread.
 238      * <p>
 239      * Implementations of {@link StatefulOp#evaluateParallel(PipelineHelper)}
 240      * can defer to this method if a sequential implementation is acceptable.
 241      *
 242      * @implSpec
 243      * If the intermediate operation injects {@link StreamOpFlag#SHORT_CIRCUIT}
 244      * then this implementation must stop collecting output elements when the
 245      * sink returned from {@link IntermediateOp#wrapSink(int, Sink)} reports it
 246      * is cancelled.
 247      * <p>
 248      * If the intermediate operation preserves or injects
 249      * {@link StreamOpFlag#SIZED} and the output size of the pipeline is known
 250      * then this implementation may apply size optimizations since the output
 251      * size is known.
 252      *
 253      * @param op An {@code IntermediateOp} representing the final stage in the
 254      *        pipeline, typically a {@code StatefulOp}
 255      * @return A {@code Node} containing the output of the stream pipeline
 256      */
 257     Node<P_OUT> evaluateSequential(IntermediateOp<P_OUT, P_OUT> op);
 258 }