1 /*
   2  * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 /**
  28  * An operation in a stream pipeline that takes a stream as input and produces
  29  * a stream, possibly of a different type, as output.  An intermediate operation
  30  * has an input type and an output type (and, an associated input shape and
  31  * output shape).  An intermediate operation also has a set of <em>operation
  32  * flags</em> that describes how it transforms characteristics of the stream
  33  * (such as sortedness or size; see {@link StreamOpFlag}).
  34  *
  35  * <p>Intermediate operations are implemented in terms of <em>sink transforms
  36  * </em>; given a {@code Sink} for the output type of the operation, produce a
  37  * {@code Sink} for the input type of the operation, which, when fed with
  38  * values, has the effect of implementing the desired operation on the input
  39  * values and feeding them to the output sink.
  40  *
  41  * <p>Some intermediate operations are <em>stateful</em>.  This means that the
  42  * sinks they produce as a result of the above wrapping may maintain state from
  43  * processing earlier elements.  Stateful intermediate operations must implement
  44  * the {@link StatefulOp} interface.  Statefulness has an effect on how the
  45  * operation can be parallelized.  Stateless operations parallelize trivially
  46  * because they are homomorphisms under concatenation:
  47  *
  48  * <pre>
  49  *     statelessOp(a || b) = statelessOp(a) || statelessOp(b)
  50  * </pre>
  51  *
  52  * where {@code ||} denotes concatenation.  Stateful operations may still be
  53  * parallelizable, but are not amenable to the automatic parallelization of
  54  * stateless operations.  Accordingly, a stateful operation must provide its own
  55  * parallel execution implementation
  56  * ({@link StatefulOp#evaluateParallel(PipelineHelper)}).
  57  *
  58  * @apiNote
  59  * As an example, consider the stream pipeline:
  60  * <pre>
  61  *     int oldestBob = people.stream().
  62  *                            filter(p -> p.getFirstName.equals("Bob")).
  63  *                            map(p -> p.getAge()).max();
  64  * </pre>
  65  *
  66  * <p>This pipeline has two intermediate operations, filter and map.  The
  67  * filtering operation has input and output types of {@code Person} (with input
  68  * and output shape of {@code REFERENCE}), and the mapping operation has an
  69  * input type of {@code Person} and an output type of {@code Integer} (with
  70  * shape {@code INT_VALUE}.)  When we construct a sink chain, the mapping
  71  * operation will be asked to transform a {@code Sink.OfInt} which computes the
  72  * maximum value into a {@code Sink} which accepts {@code Person} objects, and
  73  * whose behavior is to take the supplied {@code Person}, call {@code getAge()}
  74  * on it, and pass the resulting value to the downstream sink.  This sink
  75  * transform might be implement as:
  76  *
  77  * <pre>
  78  *     new Sink.ChainedReference<U>(sink) {
  79  *         public void accept(U u) {
  80  *             downstream.accept(mappingFunction.applyAsInt(u));
  81  *         }
  82  *     }
  83  * </pre>
  84  *
  85  * @param <E_IN>  Type of input elements to the operation
  86  * @param <E_OUT> Type of output elements to the operation
  87  * @see TerminalOp
  88  * @see StatefulOp
  89  * @since 1.8
  90  */
  91 interface IntermediateOp<E_IN, E_OUT> {
  92 
  93     /**
  94      * Gets the shape of the input type of this operation
  95      *
  96      * @implSpec The default returns {@code StreamShape.REFERENCE}
  97      * @return Shape of the input type of this operation
  98      */
  99     default StreamShape inputShape() { return StreamShape.REFERENCE; }
 100 
 101     /**
 102      * Gets the shape of the output type of this operation
 103      *
 104      * @implSpec The default returns {@code StreamShape.REFERENCE}
 105      * @return Shape of the output type of this operation
 106      */
 107     default StreamShape outputShape() { return StreamShape.REFERENCE; }
 108 
 109     /**
 110      * Gets the operation flags of this operation.
 111      *
 112      * @implSpec The default returns {@code 0}
 113      * @return a bitmap describing the operation flags of this operation
 114      * @see StreamOpFlag
 115      */
 116     default int getOpFlags() { return 0; }
 117 
 118     /**
 119      * Returns whether this operation is stateful or not.  If it is stateful,
 120      * then the method {@link #evaluateParallel(PipelineHelper)} must be
 121      * overridden.
 122      *
 123      * @implSpec The default implementation returns {@code false}.
 124      * @return {@code true} if this operation is stateful
 125      */
 126     default boolean isStateful() { return false; }
 127 
 128     /**
 129      * Accepts a {@code Sink} which will receive the results of this operation,
 130      * and return a {@code Sink} which accepts elements of the input type of
 131      * this operation and which performs the operation, passing the results to
 132      * the provided {@code Sink}.
 133      *
 134      * <p>The implementation may use the {@code flags} parameter to optimize the
 135      * sink wrapping.  For example, if the input is already {@code DISTINCT},
 136      * the implementation for the {@code Stream#distinct()} method could just
 137      * return the sink it was passed.
 138      *
 139      * @param flags The combined stream and operation flags up to, but not
 140      *        including, this operation.
 141      * @param sink elements will be sent to this sink after the processing.
 142      * @return a sink which will accept elements and perform the operation upon
 143      *         each element, passing the results (if any) to the provided
 144      *         {@code Sink}.
 145      */
 146     Sink<E_IN> wrapSink(int flags, Sink<E_OUT> sink);
 147 
 148     /**
 149      * Performs a parallel evaluation of the operation using the specified
 150      * {@code PipelineHelper}, which describes the stream source and upstream
 151      * intermediate operations.  Only called on stateful operations.  If
 152      * {@link #isStateful()} returns true then implementations must override the
 153      * default implementation.
 154      *
 155      * @implSpec The default implementation throws an
 156      * {@link UnsupportedOperationException}
 157      *
 158      * @param helper the pipeline helper
 159      * @param <P_IN> the type of elements in the pipeline source
 160      * @return a {@code Node} describing the result of the evaluation
 161      */
 162     default <P_IN> Node<E_OUT> evaluateParallel(PipelineHelper<P_IN, E_OUT> helper) {
 163         throw new UnsupportedOperationException("Parallel evaluation is not supported");
 164     }
 165 }