--- /dev/null 2013-02-14 00:07:37.184020992 -0500 +++ new/src/share/classes/java/util/stream/IntermediateOp.java 2013-02-21 14:11:05.000000000 -0500 @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +/** + * An operation in a stream pipeline that takes a stream as input and produces a stream, possibly of a different + * type, as output. An intermediate operation has an input type and an output type (and, an associated input + * shape and output shape). An intermediate operation also has a set of operation flags that describes + * how the operation transforms characteristics of the stream (such as sortedness or size; see + * {@link StreamOpFlag}). + * + *

Intermediate operations are implemented in terms of sink transforms; given a {@code Sink} for the + * output type of the operation, produce a {@code Sink} for the input type of the operation, which, when fed with + * values, has the effect of implementing the desired operation on the input values and feeding them to the output + * sink. + * + *

Some intermediate operations are stateful. This means that the sinks they produce as a result of + * the above wrapping may maintain state from processing earlier elements. Stateful intermediate operations must + * implement the {@link StatefulOp} interface. + * + * @apiNote + * As an example, consider the stream pipeline: + *

+ *     int oldestBob = people.stream().filter(p -> p.getFirstName.equals("Bob")).map(p -> p.getAge()).max();
+ * 
+ * + *

This pipeline has two intermediate operations, filter and map. The filtering operation has input and output types + * of {@code Person} (with input and output shape of {@code REFERENCE}), and the mapping operation has an input + * type of {@code Person} and an output type of {@code Integer} (with shape {@code INT_VALUE}.) When we construct + * a sink chain, the mapping operation will be asked to transform a {@code Sink.OfInt} which computes the maximum + * value into a {@code Sink} which accepts {@code Person} objects, and whose behavior is to take the supplied + * {@code Person}, call {@code getAge()} on it, and pass the resulting value to the downstream sink. This sink + * transform might be implement as: + * + *

+ *     new Sink.ChainedReference(sink) {
+ *         public void accept(U u) {
+ *             downstream.accept(mappingFunction.applyAsInt(u));
+ *         }
+ *     }
+ * 
+ * + * @param Type of input elements to the operation + * @param Type of output elements to the operation + * @see TerminalOp + * @see StatefulOp + * @since 1.8 + */ +interface IntermediateOp { + + /** + * Get the shape of the input type of this operation + * @implSpec The default returns {@code StreamShape.REFERENCE} + * @return Shape of the input type of this operation + */ + default StreamShape inputShape() { return StreamShape.REFERENCE; } + + /** + * Get the shape of the output type of this operation + * @implSpec The default returns {@code StreamShape.REFERENCE} + * @return Shape of the output type of this operation + */ + default StreamShape outputShape() { return StreamShape.REFERENCE; } + + /** + * Get the operation flags of this operation. + * + * @implSpec The default returns {@code 0} + * @return a bitmap describing the operation flags of this operation + * @see StreamOpFlag + */ + default int getOpFlags() { return 0; } + + /** + * Return whether this operation is stateful or not. If it is stateful, then the method + * {@link #evaluateParallel(PipelineHelper)} must be overridden. + * + * @implSpec The default implementation returns {@code false}. + * + * @return {@code true} then operation is stateful + */ + default boolean isStateful() { return false; } + + /** + * Accept a {@code Sink} which will receive the results of this operation, and return a {@code Sink} + * which accepts elements of the input type of this operation and which performs the operation, passing + * the results to the provided {@code Sink}. + * + *

The implementation may use the {@code flags} parameter to optimize the sink wrapping. For example, if + * the input is already {@code DISTINCT}, the implementation for the {@code Stream#distinct()} method could + * just return the sink it was passed. + * + * @param flags The combined stream and operation flags up to, but not including, this operation. + * @param sink elements will be sent to this sink after the processing. + * @return a sink which will accept elements and perform the operation upon + * each element, passing the results (if any) to the provided {@code Sink}. + */ + Sink wrapSink(int flags, Sink sink); + + /** + * Perform a parallel evaluation of the operation using the specified {@code PipelineHelper}, which describes + * the stream source and upstream intermediate operations. Only called on stateful operations. + * If {@link #isStateful()} returns true then implementations must override the default implementation. + * + * @implSpec The default implementation throws an {@link UnsupportedOperationException} + * + * @param helper the pipeline helper + * @param the type of elements in the pipeline source + * @return a {@code Node} describing the result of the evaluation + */ + default Node evaluateParallel(PipelineHelper helper) { + throw new UnsupportedOperationException("Parallel evaluation is not supported"); + } +}