/*
 * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Spliterator;
import java.util.function.IntFunction;

  30 /**
  31  * Helper class for executing  <a href="package-summary.html#StreamPipelines">stream pipelines</a>, capturing
  32  * all of the information about a stream pipeline (source, output shape, stream flags, parallelism, etc)
  33  * in one place.
  34  *
  35  * @apiNote
  36  * A stream pipeline consists of a source, zero or more intermediate operations, and a terminal operation.
  37  * Execution of the stream pipeline begins when the terminal operation is executed.
  38  * A {@code PipelineHelper} describes the portion of a stream pipeline including its source, some or all of its
  39  * intermediate operations, and certain information about the terminal (or stateful) operation which follows
  40  * the last intermediate operation described by this {@code PipelineHelper}.The
  41  * {@code PipelineHelper} is passed to the {@link TerminalOp#evaluateParallel(PipelineHelper)},
  42  * {@link TerminalOp#evaluateSequential(PipelineHelper)}, and {@link StatefulOp#evaluateParallel(PipelineHelper)},
  43  * methods, which can use the {@code PipelineHelper} to access the source {@code Spliterator} for the pipeline,
  44  * information about the pipeline such as input shape, output shape, stream flags, and size, and use the helper
  45  * methods such as {@link #into(Sink, Spliterator)}, {@link #intoWrapped(Sink, Spliterator)},
  46  * and {@link #wrapSink(Sink)} to execute pipeline operations.
  47  *
  48  * @param <P_IN>  Type of input elements to the pipeline
  49  * @param <P_OUT> Type of output elements from the pipeline
  50  * @since 1.8
  51  */
  52 interface PipelineHelper<P_IN, P_OUT> {
  53 
  54     /**
  55      * Get the {@code StreamShape} describing the input shape of the pipeline
  56      * @return The input shape of the pipeline
  57      */
  58     StreamShape getInputShape();
  59 
  60     /**
  61      * Get the {@code StreamShape} describing the output shape of the pipeline
  62      * @return The output shape of the pipeline
  63      */
  64     StreamShape getOutputShape();
  65 
  66     /**
  67      * Get the combined stream and operation flags for the output of the pipeline.  This will incorporate
  68      * stream flags from the stream source, all the intermediate operations and the terminal operation.
  69      *
  70      * @return the combined stream and operation flags for the output of the pipeline
  71      * @see StreamOpFlag
  72      */
  73     int getStreamAndOpFlags();
  74 
  75     /**
  76      * Get the operation flags for the terminal operation.
  77      *
  78      * @return the operation flags for the terminal operation.
  79      * @see StreamOpFlag
  80      */
  81     // @@@ Specifying this concisely is somewhat complicated since since the actual terminal operation flags
  82     //     are masked by StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK as the flags propagate upstream through parallel
  83     //     pipeline chunks
  84     int getTerminalOpFlags();
  85 
  86     /**
  87      * Returns whether this pipeline is parallel or sequential
  88      *
  89      * @return true if the pipeline is a parallel pipeline, otherwise false
  90      */
  91     boolean isParallel();
  92 
  93     /**
  94      * Get the {@code Spliterator} for the source of the pipeline.  This {@code Spliterator} reflects
  95      * only the source elements, not the actions of any of the intermediate stages.
  96      *
  97      * @return the source spliterator
  98      */
  99     Spliterator<P_IN> sourceSpliterator();
 100 
 101     /**
 102      * Returns the exact output size of the portion of the output resulting from applying the pipeline stages
 103      * described by this {@code PipelineHelper} to the the portion of the input described by the provided
 104      * {@code Spliterator}, if known.  If not known or known infinite, will return {@code -1}.
 105      *
 106      * @apiNote
 107      * The exact output size is known if the {@code Spliterator} has the {@code SIZED} characteristic,
 108      * and the operation flags {@link StreamOpFlag#SIZED} is known on the combined stream and operation
 109      * flags.
 110      *
 111      * @param spliterator the spliterator describing the relevant portion of the source data
 112      * @return the exact size if known, or -1 if infinite or unknown
 113      */
 114     long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);
 115 
 116     /**
 117      * Apply the pipeline stages described by this {@code PipelineHelper} to the provided {@code Spliterator} and
 118      * send the results to the provided {@code Sink}.
 119      *
 120      * @implSpec
 121      * The implementation behaves as if:
 122      * <pre>
 123      *     intoWrapped(wrapSink(sink), spliterator);
 124      * </pre>
 125      *
 126      * @param sink the {@code Sink} to receive the results
 127      * @param spliterator the spliterator describing the portion of the source input to process
 128      */
 129     <S extends Sink<P_OUT>> S into(S sink, Spliterator<P_IN> spliterator);
 130 
 131     /**
 132      * Push elements obtained from the {@code Spliterator} into the provided {@code Sink}.  If the stream
 133      * pipeline is known to have short-circuiting stages in it (see {@link StreamOpFlag#SHORT_CIRCUIT}), then the
 134      * elements are delivered as per {@link #intoWrappedWithCancel(Sink, Spliterator)}.
 135      *
 136      * @implSpec
 137      * This method conforms to the {@code Sink} protocol of calling {@code Sink.begin} before pushing
 138      * elements, via {@code Sink.accept}, and calling {@code Sink.end} after all elements have
 139      * been pushed.
 140      *
 141      * @param wrappedSink the destination {@code Sink}
 142      * @param spliterator the source {@code Spliterator}
 143      */
 144     void intoWrapped(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
 145 
 146     /**
 147      * Push elements obtained from the {@code Spliterator} into the provided {@code Sink}, checking
 148      * {@link Sink#cancellationRequested()} after each element, and stopping if cancellation is requested.
 149      *
 150      * @implSpec
 151      * This method conforms to the {@code Sink} protocol of calling {@code Sink.begin} before pushing
 152      * elements, via {@code Sink.accept}, and calling {@code Sink.end} after all elements have
 153      * been pushed or if cancellation is requested.
 154      *
 155      * @param wrappedSink the destination {@code Sink}
 156      * @param spliterator the source {@code Spliterator}
 157      */
 158     void intoWrappedWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
 159 
 160     /**
 161      * Take a {@code Sink} that accepts elements of the output type of the {@code PipelineHelper}, and wrap it with
 162      * a {@code Sink} that accepts elements of the input type and implements all the intermediate operations described
 163      * by this {@code PipelineHelper}, delivering the result into the provided {@code Sink}.
 164      *
 165      * @param sink the {@code Sink} to receive the results
 166      * @return a {@code Sink} that implements the pipeline stages and sends results to the provided {@code Sink}
 167      */
 168     Sink<P_IN> wrapSink(Sink<P_OUT> sink);
 169 
 170     /**
 171      * Construct a @{link Node.Builder} compatible with the output shape of this {@code PipelineHelper}
 172      *
 173      * @param exactSizeIfKnown if >=0 then a builder will be created that has a fixed capacity of exactly
 174      *                         sizeIfKnown elements; if < 0 then the builder has variable capacity.
 175      *                         A fixed capacity builder will fail if an element is added and the builder has reached
 176      *                         capacity.
 177      * @return A {@code Node.Builder} compatible with the output shape of this {@code PipelineHelper}
 178      */
 179     Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown);
 180 
 181     /**
 182      * Collect all output elements resulting from applying the pipeline stages to the source {@code Spliterator}
 183      * into a {@code Node}.
 184      *
 185      * @implSpec
 186      * If the pipeline has no intermediate operations and the source is backed by a {@code Node} then
 187      * that {@code Node} will be returned or flattened and then returned. This reduces copying for a pipeline
 188      * consisting of a stateful operation followed by a terminal operation that returns an array, such as:
 189      * <pre>{@code
 190      *     stream.sorted().toArray();
 191      * }</pre>
 192      *
 193      * @param flatten if true and the pipeline is a parallel pipeline then the {@code Node} returned
 194      *                will contain no children, otherwise the {@code Node} may represent the root in a
 195      *                tree that reflects the shape of the computation tree.
 196      * @return the {@code Node} containing all output elements
 197      */
 198     Node<P_OUT> collectOutput(boolean flatten);
 199 
 200     /**
 201      * Get an array factory associated with the output type of this pipeline.
 202      *
 203      * @return a factory for arrays of the output type of this pipeline.
 204      */
 205     IntFunction<P_OUT[]> arrayGenerator();
 206 }