--- /dev/null 2013-02-14 00:07:37.184020992 -0500
+++ new/src/share/classes/java/util/stream/Sink.java 2013-02-21 14:11:06.000000000 -0500
@@ -0,0 +1,323 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.function.DoubleConsumer;
+import java.util.function.IntConsumer;
+import java.util.function.LongConsumer;
+
+/**
+ * An extension of {@link Consumer} used to conduct values through the stages of a stream pipeline, with additional
+ * methods to manage size information, control flow, etc. Before calling the {@code accept()} method on a
+ * {@code Sink} for the first time, you must first call the {@code begin()} method to inform it that data is coming
+ * (optionally informing the sink how much data is coming), and after all data has been sent, you must call the
+ * {@code end()} method. After calling {@code end()}, you should not call {@code accept()} without again
+ * calling {@code begin()}. {@code Sink} also offers a mechanism by which the sink can cooperatively signal that
+ * it does not wish to receive any more data (the {@code cancellationRequested()} method), which a source can poll
+ * before sending more data to the {@code Sink}.
+ *
+ * @apiNote
+ *
+ * A stream pipeline consists of a source, zero or more intermediate stages (such as filtering or mapping), and a
+ * terminal stage, such as reduction or for-each. For concreteness, consider the pipeline:
+ *
+ *
+ * int longestStringLengthStartingWithA
+ * = strings.stream()
+ * .filter(s -> s.startsWith("A"))
+ * .map(String::length)
+ * .max();
+ *
+ *
+ * Here, we have three stages, filtering, mapping, and reducing. The filtering stage consumes strings and emits
+ * a subset of those strings; the mapping stage consumes strings and emits ints; the reduction stage consumes those
+ * ints and computes the maximal value.
+ *
+ * A {@code Sink} instance is used to represent each stage of this pipeline, whether the stage accepts objects,
+ * ints, longs, or doubles. Sink has entry points for {@code accept(Object)}, {@code accept(int)}, etc, so that we
+ * do not need a specialized interface for each primitive specialization. (It might be called a "kitchen sink"
+ * for this omnivorous tendency.) The entry point to the pipeline is the {@code Sink} for the filtering stage,
+ * which sends some elements "downstream" -- into the {@code Sink} for the mapping stage, which in turn sends integral
+ * values downstream into the {@code Sink} for the reduction stage. The {@code Sink} implementations associated
+ * with a given stage is expected to know the data type for the next stage, and call the correct {@code accept} method
+ * on its downstream {@code Sink}. Similarly, each stage must implement the correct {@code accept} method
+ * corresponding to the data type it accepts.
+ *
+ * The specialized subtypes such as {@link Sink.OfInt} bridge {@code accept(Object)} to call the appropriate
+ * primitive specialization of {@code accept}, implement the appropriate primitive specialization of {@code Consumer},
+ * and reabstract the appropriate primitive specialization of {@code accept}.
+ *
+ * The chaining subtypes such as {@link ChainedInt} not only implement {@code Sink.OfInt}, but also maintain a
+ * {@code downstream} field which represents the downstream {@code Sink}, and implement the methods {@code begin()},
+ * {@code end()}, and {@code cancellationRequested()} to delegate to the downstream {@code Sink}.
+ * Most implementations of intermediate operations will use these chaining wrappers. For example, the mapping stage
+ * in the above example would look like:
+ *
+ *
+ * IntSink is = new Sink.ChainedReference(sink) {
+ * public void accept(U u) {
+ * downstream.accept(mapper.applyAsInt(u));
+ * }
+ * };
+ *
+ *
+ * Here, we implement {@code Sink.ChanedReference}, meaning that we expect to receive elements of type {@code U}
+ * as input, and pass the downstream sink to the constructor. Because the next stage expects to receive integers,
+ * we must call the {@code accept(int)} method when emitting values to the downstream. The {@code accept()} method
+ * applies the mapping function from {@code U} to {@code int} and passes the resulting value to the downstream
+ * {@code Sink}.
+ *
+ * @param Type of elements for value streams
+ * @since 1.8
+ */
+@FunctionalInterface
+interface Sink extends Consumer {
+ /**
+ * Reset the sink state to receive a fresh data set. This is used when a
+ * {@code Sink} is being reused by multiple calculations.
+ * @param size The exact size of the data to be pushed downstream, if
+ * known or {@code Long.MAX_VALUE} if unknown or infinite.
+ */
+ default void begin(long size) {}
+
+ /**
+ * Indicate that all elements have been pushed. If the {@code Sink} buffers any
+ * results from previous values, they should dump their contents downstream and
+ * clear any stored state.
+ */
+ default void end() {}
+
+ /**
+ * Used to communicate to upstream sources that this {@code Sink} does not wish to receive
+ * any more data
+ * @return
+ */
+ default boolean cancellationRequested() {
+ return false;
+ }
+
+ /**
+ * Accept an int value
+ * @implSpec The default implementation throws IllegalStateException
+ *
+ * @throws IllegalStateException If this sink does not accept int values
+ */
+ default void accept(int value) {
+ throw new IllegalStateException("called wrong accept method");
+ }
+
+ /**
+ * Accept a long value
+ * @implSpec The default implementation throws IllegalStateException
+ *
+ * @throws IllegalStateException If this sink does not accept long values
+ */
+ default void accept(long value) {
+ throw new IllegalStateException("called wrong accept method");
+ }
+
+ /**
+ * Accept a double value
+ * @implSpec The default implementation throws IllegalStateException
+ *
+ * @throws IllegalStateException If this sink does not accept double values
+ */
+ default void accept(double value) {
+ throw new IllegalStateException("called wrong accept method");
+ }
+
+ /**
+ * {@code Sink} that implements {@code Sink}, reabstracts {@code accept(int)},
+ * and wires {@code accept(Integer)} to bridge to {@code accept(int)}.
+ */
+ @FunctionalInterface
+ interface OfInt extends Sink, IntConsumer {
+ @Override
+ void accept(int value);
+
+ @Override
+ default void accept(Integer i) {
+ if (Tripwire.enabled)
+ Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
+ accept(i.intValue());
+ }
+ }
+
+ /**
+ * {@code Sink} that implements {@code Sink}, reabstracts {@code accept(long)},
+ * and wires {@code accept(Long)} to bridge to {@code accept(long)}.
+ */
+ @FunctionalInterface
+ interface OfLong extends Sink, LongConsumer {
+ @Override
+ void accept(long value);
+
+ @Override
+ default void accept(Long i) {
+ if (Tripwire.enabled)
+ Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
+ accept(i.longValue());
+ }
+ }
+
+ /**
+ * {@code Sink} that implements {@code Sink}, reabstracts {@code accept(double)},
+ * and wires {@code accept(Double)} to bridge to {@code accept(double)}.
+ */
+ @FunctionalInterface
+ interface OfDouble extends Sink, DoubleConsumer {
+ @Override
+ void accept(double value);
+
+ @Override
+ default void accept(Double i) {
+ if (Tripwire.enabled)
+ Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
+ accept(i.doubleValue());
+ }
+ }
+
+ /**
+ * {@code Sink} implementation designed for creating chains of sinks. The {@code begin} and
+ * {@code end}, and {@code cancellationRequested} methods are wired to chain to the downstream
+ * {@code Sink}. This implementation takes a downstream {@code Sink} of unknown input shape
+ * and produces a {@code Sink}. The implementation of the {@code accept()} method must
+ * call the correct {@code accept()} method on the downstream {@code Sink}.
+ */
+ static abstract class ChainedReference implements Sink {
+ protected final Sink downstream;
+
+ public ChainedReference(Sink downstream) {
+ this.downstream = Objects.requireNonNull(downstream);
+ }
+
+ @Override
+ public void begin(long size) {
+ downstream.begin(size);
+ }
+
+ @Override
+ public void end() {
+ downstream.end();
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return downstream.cancellationRequested();
+ }
+ }
+
+ /**
+ * {@code Sink} implementation designed for creating chains of sinks. The {@code begin} and
+ * {@code end}, and {@code cancellationRequested} methods are wired to chain to the downstream
+ * {@code Sink}. This implementation takes a downstream {@code Sink} of unknown input shape
+ * and produces a {@code Sink.OfInt}. The implementation of the {@code accept()} method must
+ * call the correct {@code accept()} method on the downstream {@code Sink}.
+ */
+ static abstract class ChainedInt implements Sink.OfInt {
+ protected final Sink downstream;
+
+ public ChainedInt(Sink downstream) {
+ this.downstream = Objects.requireNonNull(downstream);
+ }
+
+ @Override
+ public void begin(long size) {
+ downstream.begin(size);
+ }
+
+ @Override
+ public void end() {
+ downstream.end();
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return downstream.cancellationRequested();
+ }
+ }
+
+ /**
+ * {@code Sink} implementation designed for creating chains of sinks. The {@code begin} and
+ * {@code end}, and {@code cancellationRequested} methods are wired to chain to the downstream
+ * {@code Sink}. This implementation takes a downstream {@code Sink} of unknown input shape
+ * and produces a {@code Sink.OfLong}. The implementation of the {@code accept()} method must
+ * call the correct {@code accept()} method on the downstream {@code Sink}.
+ */
+ static abstract class ChainedLong implements Sink.OfLong {
+ protected final Sink downstream;
+
+ public ChainedLong(Sink downstream) {
+ this.downstream = Objects.requireNonNull(downstream);
+ }
+
+ @Override
+ public void begin(long size) {
+ downstream.begin(size);
+ }
+
+ @Override
+ public void end() {
+ downstream.end();
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return downstream.cancellationRequested();
+ }
+ }
+
+ /**
+ * {@code Sink} implementation designed for creating chains of sinks. The {@code begin} and
+ * {@code end}, and {@code cancellationRequested} methods are wired to chain to the downstream
+ * {@code Sink}. This implementation takes a downstream {@code Sink} of unknown input shape
+ * and produces a {@code Sink.OfDouble}. The implementation of the {@code accept()} method must
+ * call the correct {@code accept()} method on the downstream {@code Sink}.
+ */
+ static abstract class ChainedDouble implements Sink.OfDouble {
+ protected final Sink downstream;
+
+ public ChainedDouble(Sink downstream) {
+ this.downstream = Objects.requireNonNull(downstream);
+ }
+
+ @Override
+ public void begin(long size) {
+ downstream.begin(size);
+ }
+
+ @Override
+ public void end() {
+ downstream.end();
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return downstream.cancellationRequested();
+ }
+ }
+}