/* * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package java.util.stream; import java.util.Objects; import java.util.function.Consumer; import java.util.function.DoubleConsumer; import java.util.function.IntConsumer; import java.util.function.LongConsumer; /** * An extension of {@link Consumer} used to conduct values through the stages of * a stream pipeline, with additional methods to manage size information, * control flow, etc. Before calling the {@code accept()} method on a * {@code Sink} for the first time, you must first call the {@code begin()} * method to inform it that data is coming (optionally informing the sink how * much data is coming), and after all data has been sent, you must call the * {@code end()} method. After calling {@code end()}, you should not call * {@code accept()} without again calling {@code begin()}. {@code Sink} also * offers a mechanism by which the sink can cooperatively signal that it does * not wish to receive any more data (the {@code cancellationRequested()} * method), which a source can poll before sending more data to the * {@code Sink}. * *

A sink may be in one of two states: an initial state and an active state. * It starts out in the initial state; the {@code begin()} method transitions * it to the active state, and the {@code end()} method transitions it back into * the initial state, where it can be re-used. Data-accepting methods (such as * {@code accept()} are only valid in the active state. * * @apiNote * A stream pipeline consists of a source, zero or more intermediate stages * (such as filtering or mapping), and a terminal stage, such as reduction or * for-each. For concreteness, consider the pipeline: * *

{@code
 *     int longestStringLengthStartingWithA
 *         = strings.stream()
 *                  .filter(s -> s.startsWith("A"))
 *                  .mapToInt(String::length)
 *                  .max();
 * }
* *

Here, we have three stages, filtering, mapping, and reducing. The * filtering stage consumes strings and emits a subset of those strings; the * mapping stage consumes strings and emits ints; the reduction stage consumes * those ints and computes the maximal value. * *

A {@code Sink} instance is used to represent each stage of this pipeline, * whether the stage accepts objects, ints, longs, or doubles. Sink has entry * points for {@code accept(Object)}, {@code accept(int)}, etc, so that we do * not need a specialized interface for each primitive specialization. (It * might be called a "kitchen sink" for this omnivorous tendency.) The entry * point to the pipeline is the {@code Sink} for the filtering stage, which * sends some elements "downstream" -- into the {@code Sink} for the mapping * stage, which in turn sends integral values downstream into the {@code Sink} * for the reduction stage. The {@code Sink} implementations associated with a * given stage is expected to know the data type for the next stage, and call * the correct {@code accept} method on its downstream {@code Sink}. Similarly, * each stage must implement the correct {@code accept} method corresponding to * the data type it accepts. * *

The specialized subtypes such as {@link Sink.OfInt} override * {@code accept(Object)} to call the appropriate primitive specialization of * {@code accept}, implement the appropriate primitive specialization of * {@code Consumer}, and re-abstract the appropriate primitive specialization of * {@code accept}. * *

The chaining subtypes such as {@link ChainedInt} not only implement * {@code Sink.OfInt}, but also maintain a {@code downstream} field which * represents the downstream {@code Sink}, and implement the methods * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to * delegate to the downstream {@code Sink}. Most implementations of * intermediate operations will use these chaining wrappers. For example, the * mapping stage in the above example would look like: * *

{@code
 *     IntSink is = new Sink.ChainedReference(sink) {
 *         public void accept(U u) {
 *             downstream.accept(mapper.applyAsInt(u));
 *         }
 *     };
 * }
* *

Here, we implement {@code Sink.ChainedReference}, meaning that we expect * to receive elements of type {@code U} as input, and pass the downstream sink * to the constructor. Because the next stage expects to receive integers, we * must call the {@code accept(int)} method when emitting values to the downstream. * The {@code accept()} method applies the mapping function from {@code U} to * {@code int} and passes the resulting value to the downstream {@code Sink}. * * @param type of elements for value streams * @since 1.8 */ interface Sink extends Consumer { /** * Resets the sink state to receive a fresh data set. This must be called * before sending any data to the sink. After calling {@link #end()}, * you may call this method to reset the sink for another calculation. * @param size The exact size of the data to be pushed downstream, if * known or {@code -1} if unknown or infinite. * *

Prior to this call, the sink must be in the initial state, and after * this call it is in the active state. */ default void begin(long size) {} /** * Indicates that all elements have been pushed. If the {@code Sink} is * stateful, it should send any stored state downstream at this time, and * should clear any accumulated state (and associated resources). * *

Prior to this call, the sink must be in the active state, and after * this call it is returned to the initial state. */ default void end() {} /** * Indicates that this {@code Sink} does not wish to receive any more data. * * @implSpec The default implementation always returns false. * * @return true if cancellation is requested */ default boolean cancellationRequested() { return false; } /** * Accepts an int value. * * @implSpec The default implementation throws IllegalStateException. * * @throws IllegalStateException if this sink does not accept int values */ default void accept(int value) { throw new IllegalStateException("called wrong accept method"); } /** * Accepts a long value. * * @implSpec The default implementation throws IllegalStateException. * * @throws IllegalStateException if this sink does not accept long values */ default void accept(long value) { throw new IllegalStateException("called wrong accept method"); } /** * Accepts a double value. * * @implSpec The default implementation throws IllegalStateException. * * @throws IllegalStateException if this sink does not accept double values */ default void accept(double value) { throw new IllegalStateException("called wrong accept method"); } /** * {@code Sink} that implements {@code Sink}, re-abstracts * {@code accept(int)}, and wires {@code accept(Integer)} to bridge to * {@code accept(int)}. */ interface OfInt extends Sink, IntConsumer { @Override void accept(int value); @Override default void accept(Integer i) { if (Tripwire.ENABLED) Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)"); accept(i.intValue()); } } /** * {@code Sink} that implements {@code Sink}, re-abstracts * {@code accept(long)}, and wires {@code accept(Long)} to bridge to * {@code accept(long)}. */ interface OfLong extends Sink, LongConsumer { @Override void accept(long value); @Override default void accept(Long i) { if (Tripwire.ENABLED) Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)"); accept(i.longValue()); } } /** * {@code Sink} that implements {@code Sink}, re-abstracts * {@code accept(double)}, and wires {@code accept(Double)} to bridge to * {@code accept(double)}. */ interface OfDouble extends Sink, DoubleConsumer { @Override void accept(double value); @Override default void accept(Double i) { if (Tripwire.ENABLED) Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)"); accept(i.doubleValue()); } } /** * Abstract {@code Sink} implementation for creating chains of * sinks. The {@code begin}, {@code end}, and * {@code cancellationRequested} methods are wired to chain to the * downstream {@code Sink}. This implementation takes a downstream * {@code Sink} of unknown input shape and produces a {@code Sink}. The * implementation of the {@code accept()} method must call the correct * {@code accept()} method on the downstream {@code Sink}. */ static abstract class ChainedReference implements Sink { protected final Sink downstream; public ChainedReference(Sink downstream) { this.downstream = Objects.requireNonNull(downstream); } @Override public void begin(long size) { downstream.begin(size); } @Override public void end() { downstream.end(); } @Override public boolean cancellationRequested() { return downstream.cancellationRequested(); } } /** * Abstract {@code Sink} implementation designed for creating chains of * sinks. The {@code begin}, {@code end}, and * {@code cancellationRequested} methods are wired to chain to the * downstream {@code Sink}. This implementation takes a downstream * {@code Sink} of unknown input shape and produces a {@code Sink.OfInt}. * The implementation of the {@code accept()} method must call the correct * {@code accept()} method on the downstream {@code Sink}. */ static abstract class ChainedInt implements Sink.OfInt { protected final Sink downstream; public ChainedInt(Sink downstream) { this.downstream = Objects.requireNonNull(downstream); } @Override public void begin(long size) { downstream.begin(size); } @Override public void end() { downstream.end(); } @Override public boolean cancellationRequested() { return downstream.cancellationRequested(); } } /** * Abstract {@code Sink} implementation designed for creating chains of * sinks. The {@code begin}, {@code end}, and * {@code cancellationRequested} methods are wired to chain to the * downstream {@code Sink}. This implementation takes a downstream * {@code Sink} of unknown input shape and produces a {@code Sink.OfLong}. * The implementation of the {@code accept()} method must call the correct * {@code accept()} method on the downstream {@code Sink}. */ static abstract class ChainedLong implements Sink.OfLong { protected final Sink downstream; public ChainedLong(Sink downstream) { this.downstream = Objects.requireNonNull(downstream); } @Override public void begin(long size) { downstream.begin(size); } @Override public void end() { downstream.end(); } @Override public boolean cancellationRequested() { return downstream.cancellationRequested(); } } /** * Abstract {@code Sink} implementation designed for creating chains of * sinks. The {@code begin}, {@code end}, and * {@code cancellationRequested} methods are wired to chain to the * downstream {@code Sink}. This implementation takes a downstream * {@code Sink} of unknown input shape and produces a {@code Sink.OfDouble}. * The implementation of the {@code accept()} method must call the correct * {@code accept()} method on the downstream {@code Sink}. */ static abstract class ChainedDouble implements Sink.OfDouble { protected final Sink downstream; public ChainedDouble(Sink downstream) { this.downstream = Objects.requireNonNull(downstream); } @Override public void begin(long size) { downstream.begin(size); } @Override public void end() { downstream.end(); } @Override public boolean cancellationRequested() { return downstream.cancellationRequested(); } } }