/* * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package java.util.stream; /** * An operation in a stream pipeline that takes a stream as input and produces * a stream, possibly of a different type, as output. An intermediate operation * has an input type and an output type (and, an associated input shape and * output shape). An intermediate operation also has a set of operation * flags that describes how it transforms characteristics of the stream * (such as sortedness or size; see {@link StreamOpFlag}). * *

Intermediate operations are implemented in terms of sink transforms * ; given a {@code Sink} for the output type of the operation, produce a * {@code Sink} for the input type of the operation, which, when fed with * values, has the effect of implementing the desired operation on the input * values and feeding them to the output sink. * *

Some intermediate operations are stateful. This means that the * sinks they produce as a result of the above wrapping may maintain state from * processing earlier elements. Stateful intermediate operations must implement * the {@link StatefulOp} interface. Statefulness has an effect on how the * operation can be parallelized. Stateless operations parallelize trivially * because they are homomorphisms under concatenation: * *

 *     statelessOp(a || b) = statelessOp(a) || statelessOp(b)
 * 
* * where {@code ||} denotes concatenation. Stateful operations may still be * parallelizable, but are not amenable to the automatic parallelization of * stateless operations. Accordingly, a stateful operation must provide its own * parallel execution implementation * ({@link StatefulOp#evaluateParallel(PipelineHelper)}). * * @apiNote * As an example, consider the stream pipeline: *
 *     int oldestBob = people.stream().
 *                            filter(p -> p.getFirstName.equals("Bob")).
 *                            map(p -> p.getAge()).max();
 * 
* *

This pipeline has two intermediate operations, filter and map. The * filtering operation has input and output types of {@code Person} (with input * and output shape of {@code REFERENCE}), and the mapping operation has an * input type of {@code Person} and an output type of {@code Integer} (with * shape {@code INT_VALUE}.) When we construct a sink chain, the mapping * operation will be asked to transform a {@code Sink.OfInt} which computes the * maximum value into a {@code Sink} which accepts {@code Person} objects, and * whose behavior is to take the supplied {@code Person}, call {@code getAge()} * on it, and pass the resulting value to the downstream sink. This sink * transform might be implement as: * *

 *     new Sink.ChainedReference(sink) {
 *         public void accept(U u) {
 *             downstream.accept(mappingFunction.applyAsInt(u));
 *         }
 *     }
 * 
* * @param Type of input elements to the operation * @param Type of output elements to the operation * @see TerminalOp * @see StatefulOp * @since 1.8 */ interface IntermediateOp { /** * Gets the shape of the input type of this operation * * @implSpec The default returns {@code StreamShape.REFERENCE} * @return Shape of the input type of this operation */ default StreamShape inputShape() { return StreamShape.REFERENCE; } /** * Gets the shape of the output type of this operation * * @implSpec The default returns {@code StreamShape.REFERENCE} * @return Shape of the output type of this operation */ default StreamShape outputShape() { return StreamShape.REFERENCE; } /** * Gets the operation flags of this operation. * * @implSpec The default returns {@code 0} * @return a bitmap describing the operation flags of this operation * @see StreamOpFlag */ default int getOpFlags() { return 0; } /** * Returns whether this operation is stateful or not. If it is stateful, * then the method {@link #evaluateParallel(PipelineHelper)} must be * overridden. * * @implSpec The default implementation returns {@code false}. * @return {@code true} if this operation is stateful */ default boolean isStateful() { return false; } /** * Accepts a {@code Sink} which will receive the results of this operation, * and return a {@code Sink} which accepts elements of the input type of * this operation and which performs the operation, passing the results to * the provided {@code Sink}. * *

The implementation may use the {@code flags} parameter to optimize the * sink wrapping. For example, if the input is already {@code DISTINCT}, * the implementation for the {@code Stream#distinct()} method could just * return the sink it was passed. * * @param flags The combined stream and operation flags up to, but not * including, this operation. * @param sink elements will be sent to this sink after the processing. * @return a sink which will accept elements and perform the operation upon * each element, passing the results (if any) to the provided * {@code Sink}. */ Sink wrapSink(int flags, Sink sink); /** * Performs a parallel evaluation of the operation using the specified * {@code PipelineHelper}, which describes the stream source and upstream * intermediate operations. Only called on stateful operations. If * {@link #isStateful()} returns true then implementations must override the * default implementation. * * @implSpec The default implementation throws an * {@link UnsupportedOperationException} * * @param helper the pipeline helper * @param the type of elements in the pipeline source * @return a {@code Node} describing the result of the evaluation */ default Node evaluateParallel(PipelineHelper helper) { throw new UnsupportedOperationException("Parallel evaluation is not supported"); } }