1 /* 2 * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Spliterator; 28 import java.util.function.IntFunction; 29 30 /** 31 * Helper class for executing 32 * <a href="package-summary.html#StreamPipelines">stream pipelines</a>, 33 * capturing all of the information about a stream pipeline (source, output 34 * shape, stream flags, parallelism, etc) in one place. 35 * 36 * @apiNote 37 * A stream pipeline consists of a source, zero or more intermediate operations, 38 * and a terminal operation. Execution of the stream pipeline begins when the 39 * terminal operation is executed. A {@code PipelineHelper} describes the 40 * portion of a stream pipeline including its source, some or all of its 41 * intermediate operations, and certain information about the terminal (or 42 * stateful) operation which follows the last intermediate operation described 43 * by this {@code PipelineHelper}. The {@code PipelineHelper} is passed to the 44 * {@link TerminalOp#evaluateParallel(PipelineHelper)}, 45 * {@link TerminalOp#evaluateSequential(PipelineHelper)}, and 46 * {@link StatefulOp#evaluateParallel(PipelineHelper)}, methods, which can use 47 * the {@code PipelineHelper} to access the source {@code Spliterator} for the 48 * pipeline, information about the pipeline such as input shape, output shape, 49 * stream flags, and size, and use the helper methods such as 50 * {@link #into(Sink, Spliterator)}, {@link #intoWrapped(Sink, Spliterator)}, 51 * and {@link #wrapSink(Sink)} to execute pipeline operations. 52 * 53 * @param <P_IN> Type of input elements to the pipeline 54 * @param <P_OUT> Type of output elements from the pipeline 55 * @since 1.8 56 */ 57 interface PipelineHelper<P_IN, P_OUT> { 58 59 /** 60 * Gets the {@code StreamShape} describing the input shape of the pipeline 61 * @return The input shape of the pipeline 62 */ 63 StreamShape getInputShape(); 64 65 /** 66 * Gets the {@code StreamShape} describing the output shape of the pipeline 67 * @return The output shape of the pipeline 68 */ 69 StreamShape getOutputShape(); 70 71 /** 72 * Gets the combined stream and operation flags for the output of the 73 * pipeline. This will incorporate stream flags from the stream source, all 74 * the intermediate operations and the terminal operation. 75 * 76 * @return the combined stream and operation flags for the output of the 77 * pipeline 78 * @see StreamOpFlag 79 */ 80 int getStreamAndOpFlags(); 81 82 /** 83 * Gets the operation flags for the terminal operation. 84 * 85 * @return the operation flags for the terminal operation. 86 * @see StreamOpFlag 87 */ 88 // @@@ Specifying this concisely is somewhat complicated since since the actual terminal operation flags 89 // are masked by StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK as the flags propagate upstream through parallel 90 // pipeline chunks 91 int getTerminalOpFlags(); 92 93 /** 94 * Returns whether this pipeline is parallel or sequential 95 * 96 * @return true if the pipeline is a parallel pipeline, otherwise false 97 */ 98 boolean isParallel(); 99 100 /** 101 * Gets the {@code Spliterator} for the source of the pipeline. This 102 * {@code Spliterator} reflects only the source elements, not the actions of 103 * any of the intermediate stages. 104 * 105 * @return the source spliterator 106 */ 107 Spliterator<P_IN> sourceSpliterator(); 108 109 /** 110 * Returns the exact output size of the portion of the output resulting from 111 * applying the pipeline stages described by this {@code PipelineHelper} to 112 * the the portion of the input described by the provided 113 * {@code Spliterator}, if known. If not known or known infinite, will 114 * return {@code -1}. 115 * 116 * @apiNote 117 * The exact output size is known if the {@code Spliterator} has the 118 * {@code SIZED} characteristic, and the operation flags 119 * {@link StreamOpFlag#SIZED} is known on the combined stream and operation 120 * flags. 121 * 122 * @param spliterator the spliterator describing the relevant portion of the 123 * source data 124 * @return the exact size if known, or -1 if infinite or unknown 125 */ 126 long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator); 127 128 /** 129 * Applies the pipeline stages described by this {@code PipelineHelper} to 130 * the provided {@code Spliterator} and send the results to the provided 131 * {@code Sink}. 132 * 133 * @implSpec 134 * The implementation behaves as if: 135 * <pre> 136 * intoWrapped(wrapSink(sink), spliterator); 137 * </pre> 138 * 139 * @param sink the {@code Sink} to receive the results 140 * @param spliterator the spliterator describing the portion of the source 141 * input to process 142 */ 143 <S extends Sink<P_OUT>> S into(S sink, Spliterator<P_IN> spliterator); 144 145 /** 146 * Pushes elements obtained from the {@code Spliterator} into the provided 147 * {@code Sink}. If the stream pipeline is known to have short-circuiting 148 * stages in it (see {@link StreamOpFlag#SHORT_CIRCUIT}), then the elements 149 * are delivered as per {@link #intoWrappedWithCancel(Sink, Spliterator)}. 150 * 151 * @implSpec 152 * This method conforms to the {@code Sink} protocol of calling 153 * {@code Sink.begin} before pushing elements, via {@code Sink.accept}, and 154 * calling {@code Sink.end} after all elements have been pushed. 155 * 156 * @param wrappedSink the destination {@code Sink} 157 * @param spliterator the source {@code Spliterator} 158 */ 159 void intoWrapped(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator); 160 161 /** 162 * Pushes elements obtained from the {@code Spliterator} into the provided 163 * {@code Sink}, checking {@link Sink#cancellationRequested()} after each 164 * element, and stopping if cancellation is requested. 165 * 166 * @implSpec 167 * This method conforms to the {@code Sink} protocol of calling 168 * {@code Sink.begin} before pushing elements, via {@code Sink.accept}, and 169 * calling {@code Sink.end} after all elements have been pushed or if 170 * cancellation is requested. 171 * 172 * @param wrappedSink the destination {@code Sink} 173 * @param spliterator the source {@code Spliterator} 174 */ 175 void intoWrappedWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator); 176 177 /** 178 * Takes a {@code Sink} that accepts elements of the output type of the 179 * {@code PipelineHelper}, and wrap it with a {@code Sink} that accepts 180 * elements of the input type and implements all the intermediate operations 181 * described by this {@code PipelineHelper}, delivering the result into the 182 * provided {@code Sink}. 183 * 184 * @param sink the {@code Sink} to receive the results 185 * @return a {@code Sink} that implements the pipeline stages and sends 186 * results to the provided {@code Sink} 187 */ 188 Sink<P_IN> wrapSink(Sink<P_OUT> sink); 189 190 /** 191 * Constructs a @{link Node.Builder} compatible with the output shape of 192 * this {@code PipelineHelper} 193 * 194 * @param exactSizeIfKnown if >=0 then a builder will be created that has a 195 * fixed capacity of exactly sizeIfKnown elements; if < 0 then the 196 * builder has variable capacity. A fixed capacity builder will fail 197 * if an element is added and the builder has reached capacity. 198 * @return A {@code Node.Builder} compatible with the output shape of this 199 * {@code PipelineHelper} 200 */ 201 Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown); 202 203 /** 204 * Collects all output elements resulting from applying the pipeline stages 205 * to the source {@code Spliterator} into a {@code Node}. 206 * 207 * @implSpec 208 * If the pipeline has no intermediate operations and the source is backed 209 * by a {@code Node} then that {@code Node} will be returned or flattened 210 * and then returned. This reduces copying for a pipeline consisting of a 211 * stateful operation followed by a terminal operation that returns an 212 * array, such as: 213 * <pre>{@code 214 * stream.sorted().toArray(); 215 * }</pre> 216 * 217 * @param flatten if true and the pipeline is a parallel pipeline then the 218 * {@code Node} returned will contain no children, otherwise the 219 * {@code Node} may represent the root in a tree that reflects the 220 * shape of the computation tree. 221 * @return the {@code Node} containing all output elements 222 */ 223 Node<P_OUT> collectOutput(boolean flatten); 224 225 /** 226 * Gets an array factory associated with the output type of this pipeline. 227 * 228 * @return a factory for arrays of the output type of this pipeline. 229 */ 230 IntFunction<P_OUT[]> arrayGenerator(); 231 232 /** 233 * Collects all output elements resulting from the applying the pipeline 234 * stages, plus an additional final stage that is an intermediate operation, 235 * to the source {@code Spliterator} into a {code Node}. The order of 236 * output elements will respect the encounter order of the source stream, 237 * and all computation will happen in the invoking thread. 238 * <p> 239 * Implementations of {@link StatefulOp#evaluateParallel(PipelineHelper)} 240 * can defer to this method if a sequential implementation is acceptable. 241 * 242 * @implSpec 243 * If the intermediate operation injects {@link StreamOpFlag#SHORT_CIRCUIT} 244 * then this implementation must stop collecting output elements when the 245 * sink returned from {@link IntermediateOp#wrapSink(int, Sink)} reports it 246 * is cancelled. 247 * <p> 248 * If the intermediate operation preserves or injects 249 * {@link StreamOpFlag#SIZED} and the output size of the pipeline is known 250 * then this implementation may apply size optimizations since the output 251 * size is known. 252 * 253 * @param op An {@code IntermediateOp} representing the final stage in the 254 * pipeline, typically a {@code StatefulOp} 255 * @return A {@code Node} containing the output of the stream pipeline 256 */ 257 Node<P_OUT> evaluateSequential(IntermediateOp<P_OUT, P_OUT> op); 258 }