1 /* 2 * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 /** 28 * An operation in a stream pipeline that takes a stream as input and produces 29 * a stream, possibly of a different type, as output. An intermediate operation 30 * has an input type and an output type (and, an associated input shape and 31 * output shape). An intermediate operation also has a set of <em>operation 32 * flags</em> that describes how it transforms characteristics of the stream 33 * (such as sortedness or size; see {@link StreamOpFlag}). 34 * 35 * <p>Intermediate operations are implemented in terms of <em>sink transforms 36 * </em>; given a {@code Sink} for the output type of the operation, produce a 37 * {@code Sink} for the input type of the operation, which, when fed with 38 * values, has the effect of implementing the desired operation on the input 39 * values and feeding them to the output sink. 40 * 41 * <p>Some intermediate operations are <em>stateful</em>. This means that the 42 * sinks they produce as a result of the above wrapping may maintain state from 43 * processing earlier elements. Stateful intermediate operations must implement 44 * the {@link StatefulOp} interface. Statefulness has an effect on how the 45 * operation can be parallelized. Stateless operations parallelize trivially 46 * because they are homomorphisms under concatenation: 47 * 48 * <pre> 49 * statelessOp(a || b) = statelessOp(a) || statelessOp(b) 50 * </pre> 51 * 52 * where {@code ||} denotes concatenation. Stateful operations may still be 53 * parallelizable, but are not amenable to the automatic parallelization of 54 * stateless operations. Accordingly, a stateful operation must provide its own 55 * parallel execution implementation 56 * ({@link StatefulOp#evaluateParallel(PipelineHelper)}). 57 * 58 * @apiNote 59 * As an example, consider the stream pipeline: 60 * <pre> 61 * int oldestBob = people.stream(). 62 * filter(p -> p.getFirstName.equals("Bob")). 63 * map(p -> p.getAge()).max(); 64 * </pre> 65 * 66 * <p>This pipeline has two intermediate operations, filter and map. The 67 * filtering operation has input and output types of {@code Person} (with input 68 * and output shape of {@code REFERENCE}), and the mapping operation has an 69 * input type of {@code Person} and an output type of {@code Integer} (with 70 * shape {@code INT_VALUE}.) When we construct a sink chain, the mapping 71 * operation will be asked to transform a {@code Sink.OfInt} which computes the 72 * maximum value into a {@code Sink} which accepts {@code Person} objects, and 73 * whose behavior is to take the supplied {@code Person}, call {@code getAge()} 74 * on it, and pass the resulting value to the downstream sink. This sink 75 * transform might be implement as: 76 * 77 * <pre> 78 * new Sink.ChainedReference<U>(sink) { 79 * public void accept(U u) { 80 * downstream.accept(mappingFunction.applyAsInt(u)); 81 * } 82 * } 83 * </pre> 84 * 85 * @param <E_IN> Type of input elements to the operation 86 * @param <E_OUT> Type of output elements to the operation 87 * @see TerminalOp 88 * @see StatefulOp 89 * @since 1.8 90 */ 91 interface IntermediateOp<E_IN, E_OUT> { 92 93 /** 94 * Gets the shape of the input type of this operation 95 * 96 * @implSpec The default returns {@code StreamShape.REFERENCE} 97 * @return Shape of the input type of this operation 98 */ 99 default StreamShape inputShape() { return StreamShape.REFERENCE; } 100 101 /** 102 * Gets the shape of the output type of this operation 103 * 104 * @implSpec The default returns {@code StreamShape.REFERENCE} 105 * @return Shape of the output type of this operation 106 */ 107 default StreamShape outputShape() { return StreamShape.REFERENCE; } 108 109 /** 110 * Gets the operation flags of this operation. 111 * 112 * @implSpec The default returns {@code 0} 113 * @return a bitmap describing the operation flags of this operation 114 * @see StreamOpFlag 115 */ 116 default int getOpFlags() { return 0; } 117 118 /** 119 * Returns whether this operation is stateful or not. If it is stateful, 120 * then the method {@link #evaluateParallel(PipelineHelper)} must be 121 * overridden. 122 * 123 * @implSpec The default implementation returns {@code false}. 124 * @return {@code true} if this operation is stateful 125 */ 126 default boolean isStateful() { return false; } 127 128 /** 129 * Accepts a {@code Sink} which will receive the results of this operation, 130 * and return a {@code Sink} which accepts elements of the input type of 131 * this operation and which performs the operation, passing the results to 132 * the provided {@code Sink}. 133 * 134 * <p>The implementation may use the {@code flags} parameter to optimize the 135 * sink wrapping. For example, if the input is already {@code DISTINCT}, 136 * the implementation for the {@code Stream#distinct()} method could just 137 * return the sink it was passed. 138 * 139 * @param flags The combined stream and operation flags up to, but not 140 * including, this operation. 141 * @param sink elements will be sent to this sink after the processing. 142 * @return a sink which will accept elements and perform the operation upon 143 * each element, passing the results (if any) to the provided 144 * {@code Sink}. 145 */ 146 Sink<E_IN> wrapSink(int flags, Sink<E_OUT> sink); 147 148 /** 149 * Performs a parallel evaluation of the operation using the specified 150 * {@code PipelineHelper}, which describes the stream source and upstream 151 * intermediate operations. Only called on stateful operations. If 152 * {@link #isStateful()} returns true then implementations must override the 153 * default implementation. 154 * 155 * @implSpec The default implementation throws an 156 * {@link UnsupportedOperationException} 157 * 158 * @param helper the pipeline helper 159 * @param <P_IN> the type of elements in the pipeline source 160 * @return a {@code Node} describing the result of the evaluation 161 */ 162 default <P_IN> Node<E_OUT> evaluateParallel(PipelineHelper<P_IN, E_OUT> helper) { 163 throw new UnsupportedOperationException("Parallel evaluation is not supported"); 164 } 165 }