1 /* 2 * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Spliterator; 28 import java.util.function.IntFunction; 29 30 /** 31 * Helper class for executing <a href="package-summary.html#StreamPipelines">stream pipelines</a>, capturing 32 * all of the information about a stream pipeline (source, output shape, stream flags, parallelism, etc) 33 * in one place. 34 * 35 * @apiNote 36 * A stream pipeline consists of a source, zero or more intermediate operations, and a terminal operation. 37 * Execution of the stream pipeline begins when the terminal operation is executed. 38 * A {@code PipelineHelper} describes the portion of a stream pipeline including its source, some or all of its 39 * intermediate operations, and certain information about the terminal (or stateful) operation which follows 40 * the last intermediate operation described by this {@code PipelineHelper}.The 41 * {@code PipelineHelper} is passed to the {@link TerminalOp#evaluateParallel(PipelineHelper)}, 42 * {@link TerminalOp#evaluateSequential(PipelineHelper)}, and {@link StatefulOp#evaluateParallel(PipelineHelper)}, 43 * methods, which can use the {@code PipelineHelper} to access the source {@code Spliterator} for the pipeline, 44 * information about the pipeline such as input shape, output shape, stream flags, and size, and use the helper 45 * methods such as {@link #into(Sink, Spliterator)}, {@link #intoWrapped(Sink, Spliterator)}, 46 * and {@link #wrapSink(Sink)} to execute pipeline operations. 47 * 48 * @param <P_IN> Type of input elements to the pipeline 49 * @param <P_OUT> Type of output elements from the pipeline 50 * @since 1.8 51 */ 52 interface PipelineHelper<P_IN, P_OUT> { 53 54 /** 55 * Get the {@code StreamShape} describing the input shape of the pipeline 56 * @return The input shape of the pipeline 57 */ 58 StreamShape getInputShape(); 59 60 /** 61 * Get the {@code StreamShape} describing the output shape of the pipeline 62 * @return The output shape of the pipeline 63 */ 64 StreamShape getOutputShape(); 65 66 /** 67 * Get the combined stream and operation flags for the output of the pipeline. This will incorporate 68 * stream flags from the stream source, all the intermediate operations and the terminal operation. 69 * 70 * @return the combined stream and operation flags for the output of the pipeline 71 * @see StreamOpFlag 72 */ 73 int getStreamAndOpFlags(); 74 75 /** 76 * Get the operation flags for the terminal operation. 77 * 78 * @return the operation flags for the terminal operation. 79 * @see StreamOpFlag 80 */ 81 // @@@ Specifying this concisely is somewhat complicated since since the actual terminal operation flags 82 // are masked by StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK as the flags propagate upstream through parallel 83 // pipeline chunks 84 int getTerminalOpFlags(); 85 86 /** 87 * Returns whether this pipeline is parallel or sequential 88 * 89 * @return true if the pipeline is a parallel pipeline, otherwise false 90 */ 91 boolean isParallel(); 92 93 /** 94 * Get the {@code Spliterator} for the source of the pipeline. This {@code Spliterator} reflects 95 * only the source elements, not the actions of any of the intermediate stages. 96 * 97 * @return the source spliterator 98 */ 99 Spliterator<P_IN> sourceSpliterator(); 100 101 /** 102 * Returns the exact output size of the portion of the output resulting from applying the pipeline stages 103 * described by this {@code PipelineHelper} to the the portion of the input described by the provided 104 * {@code Spliterator}, if known. If not known or known infinite, will return {@code -1}. 105 * 106 * @apiNote 107 * The exact output size is known if the {@code Spliterator} has the {@code SIZED} characteristic, 108 * and the operation flags {@link StreamOpFlag#SIZED} is known on the combined stream and operation 109 * flags. 110 * 111 * @param spliterator the spliterator describing the relevant portion of the source data 112 * @return the exact size if known, or -1 if infinite or unknown 113 */ 114 long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator); 115 116 /** 117 * Apply the pipeline stages described by this {@code PipelineHelper} to the provided {@code Spliterator} and 118 * send the results to the provided {@code Sink}. 119 * 120 * @implSpec 121 * The implementation behaves as if: 122 * <pre> 123 * intoWrapped(wrapSink(sink), spliterator); 124 * </pre> 125 * 126 * @param sink the {@code Sink} to receive the results 127 * @param spliterator the spliterator describing the portion of the source input to process 128 */ 129 <S extends Sink<P_OUT>> S into(S sink, Spliterator<P_IN> spliterator); 130 131 /** 132 * Push elements obtained from the {@code Spliterator} into the provided {@code Sink}. If the stream 133 * pipeline is known to have short-circuiting stages in it (see {@link StreamOpFlag#SHORT_CIRCUIT}), then the 134 * elements are delivered as per {@link #intoWrappedWithCancel(Sink, Spliterator)}. 135 * 136 * @implSpec 137 * This method conforms to the {@code Sink} protocol of calling {@code Sink.begin} before pushing 138 * elements, via {@code Sink.accept}, and calling {@code Sink.end} after all elements have 139 * been pushed. 140 * 141 * @param wrappedSink the destination {@code Sink} 142 * @param spliterator the source {@code Spliterator} 143 */ 144 void intoWrapped(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator); 145 146 /** 147 * Push elements obtained from the {@code Spliterator} into the provided {@code Sink}, checking 148 * {@link Sink#cancellationRequested()} after each element, and stopping if cancellation is requested. 149 * 150 * @implSpec 151 * This method conforms to the {@code Sink} protocol of calling {@code Sink.begin} before pushing 152 * elements, via {@code Sink.accept}, and calling {@code Sink.end} after all elements have 153 * been pushed or if cancellation is requested. 154 * 155 * @param wrappedSink the destination {@code Sink} 156 * @param spliterator the source {@code Spliterator} 157 */ 158 void intoWrappedWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator); 159 160 /** 161 * Take a {@code Sink} that accepts elements of the output type of the {@code PipelineHelper}, and wrap it with 162 * a {@code Sink} that accepts elements of the input type and implements all the intermediate operations described 163 * by this {@code PipelineHelper}, delivering the result into the provided {@code Sink}. 164 * 165 * @param sink the {@code Sink} to receive the results 166 * @return a {@code Sink} that implements the pipeline stages and sends results to the provided {@code Sink} 167 */ 168 Sink<P_IN> wrapSink(Sink<P_OUT> sink); 169 170 /** 171 * Construct a @{link Node.Builder} compatible with the output shape of this {@code PipelineHelper} 172 * 173 * @param exactSizeIfKnown if >=0 then a builder will be created that has a fixed capacity of exactly 174 * sizeIfKnown elements; if < 0 then the builder has variable capacity. 175 * A fixed capacity builder will fail if an element is added and the builder has reached 176 * capacity. 177 * @return A {@code Node.Builder} compatible with the output shape of this {@code PipelineHelper} 178 */ 179 Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown); 180 181 /** 182 * Collect all output elements resulting from applying the pipeline stages to the source {@code Spliterator} 183 * into a {@code Node}. 184 * 185 * @implSpec 186 * If the pipeline has no intermediate operations and the source is backed by a {@code Node} then 187 * that {@code Node} will be returned or flattened and then returned. This reduces copying for a pipeline 188 * consisting of a stateful operation followed by a terminal operation that returns an array, such as: 189 * <pre>{@code 190 * stream.sorted().toArray(); 191 * }</pre> 192 * 193 * @param flatten if true and the pipeline is a parallel pipeline then the {@code Node} returned 194 * will contain no children, otherwise the {@code Node} may represent the root in a 195 * tree that reflects the shape of the computation tree. 196 * @return the {@code Node} containing all output elements 197 */ 198 Node<P_OUT> collectOutput(boolean flatten); 199 200 /** 201 * Get an array factory associated with the output type of this pipeline. 202 * 203 * @return a factory for arrays of the output type of this pipeline. 204 */ 205 IntFunction<P_OUT[]> arrayGenerator(); 206 }