--- /dev/null 2013-02-14 00:07:37.184020992 -0500 +++ new/src/share/classes/java/util/stream/Sink.java 2013-02-21 14:11:06.000000000 -0500 @@ -0,0 +1,323 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.DoubleConsumer; +import java.util.function.IntConsumer; +import java.util.function.LongConsumer; + +/** + * An extension of {@link Consumer} used to conduct values through the stages of a stream pipeline, with additional + * methods to manage size information, control flow, etc. Before calling the {@code accept()} method on a + * {@code Sink} for the first time, you must first call the {@code begin()} method to inform it that data is coming + * (optionally informing the sink how much data is coming), and after all data has been sent, you must call the + * {@code end()} method. After calling {@code end()}, you should not call {@code accept()} without again + * calling {@code begin()}. {@code Sink} also offers a mechanism by which the sink can cooperatively signal that + * it does not wish to receive any more data (the {@code cancellationRequested()} method), which a source can poll + * before sending more data to the {@code Sink}. + * + * @apiNote + * + * A stream pipeline consists of a source, zero or more intermediate stages (such as filtering or mapping), and a + * terminal stage, such as reduction or for-each. For concreteness, consider the pipeline: + * + *
+ *     int longestStringLengthStartingWithA
+ *         = strings.stream()
+ *                  .filter(s -> s.startsWith("A"))
+ *                  .map(String::length)
+ *                  .max();
+ * 
+ * + * Here, we have three stages, filtering, mapping, and reducing. The filtering stage consumes strings and emits + * a subset of those strings; the mapping stage consumes strings and emits ints; the reduction stage consumes those + * ints and computes the maximal value. + * + * A {@code Sink} instance is used to represent each stage of this pipeline, whether the stage accepts objects, + * ints, longs, or doubles. Sink has entry points for {@code accept(Object)}, {@code accept(int)}, etc, so that we + * do not need a specialized interface for each primitive specialization. (It might be called a "kitchen sink" + * for this omnivorous tendency.) The entry point to the pipeline is the {@code Sink} for the filtering stage, + * which sends some elements "downstream" -- into the {@code Sink} for the mapping stage, which in turn sends integral + * values downstream into the {@code Sink} for the reduction stage. The {@code Sink} implementations associated + * with a given stage is expected to know the data type for the next stage, and call the correct {@code accept} method + * on its downstream {@code Sink}. Similarly, each stage must implement the correct {@code accept} method + * corresponding to the data type it accepts. + * + * The specialized subtypes such as {@link Sink.OfInt} bridge {@code accept(Object)} to call the appropriate + * primitive specialization of {@code accept}, implement the appropriate primitive specialization of {@code Consumer}, + * and reabstract the appropriate primitive specialization of {@code accept}. + * + * The chaining subtypes such as {@link ChainedInt} not only implement {@code Sink.OfInt}, but also maintain a + * {@code downstream} field which represents the downstream {@code Sink}, and implement the methods {@code begin()}, + * {@code end()}, and {@code cancellationRequested()} to delegate to the downstream {@code Sink}. + * Most implementations of intermediate operations will use these chaining wrappers. For example, the mapping stage + * in the above example would look like: + * + *
+ *     IntSink is = new Sink.ChainedReference(sink) {
+ *         public void accept(U u) {
+ *             downstream.accept(mapper.applyAsInt(u));
+ *         }
+ *     };
+ * 
+ * + * Here, we implement {@code Sink.ChanedReference}, meaning that we expect to receive elements of type {@code U} + * as input, and pass the downstream sink to the constructor. Because the next stage expects to receive integers, + * we must call the {@code accept(int)} method when emitting values to the downstream. The {@code accept()} method + * applies the mapping function from {@code U} to {@code int} and passes the resulting value to the downstream + * {@code Sink}. + * + * @param Type of elements for value streams + * @since 1.8 + */ +@FunctionalInterface +interface Sink extends Consumer { + /** + * Reset the sink state to receive a fresh data set. This is used when a + * {@code Sink} is being reused by multiple calculations. + * @param size The exact size of the data to be pushed downstream, if + * known or {@code Long.MAX_VALUE} if unknown or infinite. + */ + default void begin(long size) {} + + /** + * Indicate that all elements have been pushed. If the {@code Sink} buffers any + * results from previous values, they should dump their contents downstream and + * clear any stored state. + */ + default void end() {} + + /** + * Used to communicate to upstream sources that this {@code Sink} does not wish to receive + * any more data + * @return + */ + default boolean cancellationRequested() { + return false; + } + + /** + * Accept an int value + * @implSpec The default implementation throws IllegalStateException + * + * @throws IllegalStateException If this sink does not accept int values + */ + default void accept(int value) { + throw new IllegalStateException("called wrong accept method"); + } + + /** + * Accept a long value + * @implSpec The default implementation throws IllegalStateException + * + * @throws IllegalStateException If this sink does not accept long values + */ + default void accept(long value) { + throw new IllegalStateException("called wrong accept method"); + } + + /** + * Accept a double value + * @implSpec The default implementation throws IllegalStateException + * + * @throws IllegalStateException If this sink does not accept double values + */ + default void accept(double value) { + throw new IllegalStateException("called wrong accept method"); + } + + /** + * {@code Sink} that implements {@code Sink}, reabstracts {@code accept(int)}, + * and wires {@code accept(Integer)} to bridge to {@code accept(int)}. + */ + @FunctionalInterface + interface OfInt extends Sink, IntConsumer { + @Override + void accept(int value); + + @Override + default void accept(Integer i) { + if (Tripwire.enabled) + Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)"); + accept(i.intValue()); + } + } + + /** + * {@code Sink} that implements {@code Sink}, reabstracts {@code accept(long)}, + * and wires {@code accept(Long)} to bridge to {@code accept(long)}. + */ + @FunctionalInterface + interface OfLong extends Sink, LongConsumer { + @Override + void accept(long value); + + @Override + default void accept(Long i) { + if (Tripwire.enabled) + Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)"); + accept(i.longValue()); + } + } + + /** + * {@code Sink} that implements {@code Sink}, reabstracts {@code accept(double)}, + * and wires {@code accept(Double)} to bridge to {@code accept(double)}. + */ + @FunctionalInterface + interface OfDouble extends Sink, DoubleConsumer { + @Override + void accept(double value); + + @Override + default void accept(Double i) { + if (Tripwire.enabled) + Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)"); + accept(i.doubleValue()); + } + } + + /** + * {@code Sink} implementation designed for creating chains of sinks. The {@code begin} and + * {@code end}, and {@code cancellationRequested} methods are wired to chain to the downstream + * {@code Sink}. This implementation takes a downstream {@code Sink} of unknown input shape + * and produces a {@code Sink}. The implementation of the {@code accept()} method must + * call the correct {@code accept()} method on the downstream {@code Sink}. + */ + static abstract class ChainedReference implements Sink { + protected final Sink downstream; + + public ChainedReference(Sink downstream) { + this.downstream = Objects.requireNonNull(downstream); + } + + @Override + public void begin(long size) { + downstream.begin(size); + } + + @Override + public void end() { + downstream.end(); + } + + @Override + public boolean cancellationRequested() { + return downstream.cancellationRequested(); + } + } + + /** + * {@code Sink} implementation designed for creating chains of sinks. The {@code begin} and + * {@code end}, and {@code cancellationRequested} methods are wired to chain to the downstream + * {@code Sink}. This implementation takes a downstream {@code Sink} of unknown input shape + * and produces a {@code Sink.OfInt}. The implementation of the {@code accept()} method must + * call the correct {@code accept()} method on the downstream {@code Sink}. + */ + static abstract class ChainedInt implements Sink.OfInt { + protected final Sink downstream; + + public ChainedInt(Sink downstream) { + this.downstream = Objects.requireNonNull(downstream); + } + + @Override + public void begin(long size) { + downstream.begin(size); + } + + @Override + public void end() { + downstream.end(); + } + + @Override + public boolean cancellationRequested() { + return downstream.cancellationRequested(); + } + } + + /** + * {@code Sink} implementation designed for creating chains of sinks. The {@code begin} and + * {@code end}, and {@code cancellationRequested} methods are wired to chain to the downstream + * {@code Sink}. This implementation takes a downstream {@code Sink} of unknown input shape + * and produces a {@code Sink.OfLong}. The implementation of the {@code accept()} method must + * call the correct {@code accept()} method on the downstream {@code Sink}. + */ + static abstract class ChainedLong implements Sink.OfLong { + protected final Sink downstream; + + public ChainedLong(Sink downstream) { + this.downstream = Objects.requireNonNull(downstream); + } + + @Override + public void begin(long size) { + downstream.begin(size); + } + + @Override + public void end() { + downstream.end(); + } + + @Override + public boolean cancellationRequested() { + return downstream.cancellationRequested(); + } + } + + /** + * {@code Sink} implementation designed for creating chains of sinks. The {@code begin} and + * {@code end}, and {@code cancellationRequested} methods are wired to chain to the downstream + * {@code Sink}. This implementation takes a downstream {@code Sink} of unknown input shape + * and produces a {@code Sink.OfDouble}. The implementation of the {@code accept()} method must + * call the correct {@code accept()} method on the downstream {@code Sink}. + */ + static abstract class ChainedDouble implements Sink.OfDouble { + protected final Sink downstream; + + public ChainedDouble(Sink downstream) { + this.downstream = Objects.requireNonNull(downstream); + } + + @Override + public void begin(long size) { + downstream.begin(size); + } + + @Override + public void end() { + downstream.end(); + } + + @Override + public boolean cancellationRequested() { + return downstream.cancellationRequested(); + } + } +}