1 /*
   2  * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Objects;
  28 import java.util.function.Consumer;
  29 import java.util.function.DoubleConsumer;
  30 import java.util.function.IntConsumer;
  31 import java.util.function.LongConsumer;
  32 
  33 /**
  34  * An extension of {@link Consumer} used to conduct values through the stages of a stream pipeline, with additional
  35  * methods to manage size information, control flow, etc.  Before calling the {@code accept()} method on a
  36  * {@code Sink} for the first time, you must first call the {@code begin()} method to inform it that data is coming
  37  * (optionally informing the sink how much data is coming), and after all data has been sent, you must call the
  38  * {@code end()} method.  After calling {@code end()}, you should not call {@code accept()} without again
  39  * calling {@code begin()}.  {@code Sink} also offers a mechanism by which the sink can cooperatively signal that
  40  * it does not wish to receive any more data (the {@code cancellationRequested()} method), which a source can poll
  41  * before sending more data to the {@code Sink}.
  42  *
  43  * @apiNote
  44  *
  45  * A stream pipeline consists of a source, zero or more intermediate stages (such as filtering or mapping), and a
  46  * terminal stage, such as reduction or for-each.  For concreteness, consider the pipeline:
  47  *
 * <pre>{@code
 *     int longestStringLengthStartingWithA
 *         = strings.stream()
 *                  .filter(s -> s.startsWith("A"))
 *                  .mapToInt(String::length)
 *                  .max()
 *                  .getAsInt();
 * }</pre>
  55  *
  56  * Here, we have three stages, filtering, mapping, and reducing.  The filtering stage consumes strings and emits
  57  * a subset of those strings; the mapping stage consumes strings and emits ints; the reduction stage consumes those
  58  * ints and computes the maximal value.
  59  *
  60  * A {@code Sink} instance is used to represent each stage of this pipeline, whether the stage accepts objects,
  61  * ints, longs, or doubles.  Sink has entry points for {@code accept(Object)}, {@code accept(int)}, etc, so that we
  62  * do not need a specialized interface for each primitive specialization.  (It might be called a "kitchen sink"
  63  * for this omnivorous tendency.)  The entry point to the pipeline is the {@code Sink} for the filtering stage,
  64  * which sends some elements "downstream" -- into the {@code Sink} for the mapping stage, which in turn sends integral
 * values downstream into the {@code Sink} for the reduction stage.  The {@code Sink} implementation associated
 * with a given stage is expected to know the data type for the next stage, and to call the correct {@code accept}
 * method on its downstream {@code Sink}.  Similarly, each stage must implement the correct {@code accept} method
  68  * corresponding to the data type it accepts.
  69  *
  70  * The specialized subtypes such as {@link Sink.OfInt} bridge {@code accept(Object)} to call the appropriate
  71  * primitive specialization of {@code accept}, implement the appropriate primitive specialization of {@code Consumer},
  72  * and reabstract the appropriate primitive specialization of {@code accept}.
  73  *
  74  * The chaining subtypes such as {@link ChainedInt} not only implement {@code Sink.OfInt}, but also maintain a
  75  * {@code downstream} field which represents the downstream {@code Sink}, and implement the methods {@code begin()},
  76  * {@code end()}, and {@code cancellationRequested()} to delegate to the downstream {@code Sink}.
  77  * Most implementations of intermediate operations will use these chaining wrappers.  For example, the mapping stage
  78  * in the above example would look like:
  79  *
 * <pre>{@code
 *     Sink<U> mappingSink = new Sink.ChainedReference<U>(sink) {
 *         public void accept(U u) {
 *             downstream.accept(mapper.applyAsInt(u));
 *         }
 *     };
 * }</pre>
  87  *
 * Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect to receive elements of type {@code U}
  89  * as input, and pass the downstream sink to the constructor.  Because the next stage expects to receive integers,
  90  * we must call the {@code accept(int)} method when emitting values to the downstream.  The {@code accept()} method
  91  * applies the mapping function from {@code U} to {@code int} and passes the resulting value to the downstream
  92  * {@code Sink}.
  93  *
  94  * @param <T> Type of elements for value streams
  95  * @since 1.8
  96  */
  97 @FunctionalInterface
  98 interface Sink<T> extends Consumer<T> {
  99     /**
 100      * Reset the sink state to receive a fresh data set. This is used when a
 101      * {@code Sink} is being reused by multiple calculations.
 102      * @param size The exact size of the data to be pushed downstream, if
 103      * known or {@code Long.MAX_VALUE} if unknown or infinite.
 104      */
 105     default void begin(long size) {}
 106 
 107     /**
 108      * Indicate that all elements have been pushed.  If the {@code Sink} buffers any
 109      * results from previous values, they should dump their contents downstream and
 110      * clear any stored state.
 111      */
 112     default void end() {}
 113 
 114     /**
 115      * Used to communicate to upstream sources that this {@code Sink} does not wish to receive
 116      * any more data
 117      * @return
 118      */
 119     default boolean cancellationRequested() {
 120         return false;
 121     }
 122 
 123     /**
 124      * Accept an int value
 125      * @implSpec The default implementation throws IllegalStateException
 126      *
 127      * @throws IllegalStateException If this sink does not accept int values
 128      */
 129     default void accept(int value) {
 130         throw new IllegalStateException("called wrong accept method");
 131     }
 132 
 133     /**
 134      * Accept a long value
 135      * @implSpec The default implementation throws IllegalStateException
 136      *
 137      * @throws IllegalStateException If this sink does not accept long values
 138      */
 139     default void accept(long value) {
 140         throw new IllegalStateException("called wrong accept method");
 141     }
 142 
 143     /**
 144      * Accept a double value
 145      * @implSpec The default implementation throws IllegalStateException
 146      *
 147      * @throws IllegalStateException If this sink does not accept double values
 148      */
 149     default void accept(double value) {
 150         throw new IllegalStateException("called wrong accept method");
 151     }
 152 
 153     /**
 154      * {@code Sink} that implements {@code Sink<Integer>}, reabstracts {@code accept(int)},
 155      * and wires {@code accept(Integer)} to bridge to {@code accept(int)}.
 156      */
 157     @FunctionalInterface
 158     interface OfInt extends Sink<Integer>, IntConsumer {
 159         @Override
 160         void accept(int value);
 161 
 162         @Override
 163         default void accept(Integer i) {
 164             if (Tripwire.enabled)
 165                 Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
 166             accept(i.intValue());
 167         }
 168     }
 169 
 170     /**
 171      * {@code Sink} that implements {@code Sink<Long>}, reabstracts {@code accept(long)},
 172      * and wires {@code accept(Long)} to bridge to {@code accept(long)}.
 173      */
 174     @FunctionalInterface
 175     interface OfLong extends Sink<Long>, LongConsumer {
 176         @Override
 177         void accept(long value);
 178 
 179         @Override
 180         default void accept(Long i) {
 181             if (Tripwire.enabled)
 182                 Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
 183             accept(i.longValue());
 184         }
 185     }
 186 
 187     /**
 188      * {@code Sink} that implements {@code Sink<Double>}, reabstracts {@code accept(double)},
 189      * and wires {@code accept(Double)} to bridge to {@code accept(double)}.
 190      */
 191     @FunctionalInterface
 192     interface OfDouble extends Sink<Double>, DoubleConsumer {
 193         @Override
 194         void accept(double value);
 195 
 196         @Override
 197         default void accept(Double i) {
 198             if (Tripwire.enabled)
 199                 Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
 200             accept(i.doubleValue());
 201         }
 202     }
 203 
 204     /**
 205      * {@code Sink} implementation designed for creating chains of sinks.  The {@code begin} and
 206      * {@code end}, and {@code cancellationRequested} methods are wired to chain to the downstream
 207      * {@code Sink}.  This implementation takes a downstream {@code Sink} of unknown input shape
 208      * and produces a {@code Sink<T>}.  The implementation of the {@code accept()} method must
 209      * call the correct {@code accept()} method on the downstream {@code Sink}.
 210      */
 211     static abstract class ChainedReference<T> implements Sink<T> {
 212         protected final Sink downstream;
 213 
 214         public ChainedReference(Sink downstream) {
 215             this.downstream = Objects.requireNonNull(downstream);
 216         }
 217 
 218         @Override
 219         public void begin(long size) {
 220             downstream.begin(size);
 221         }
 222 
 223         @Override
 224         public void end() {
 225             downstream.end();
 226         }
 227 
 228         @Override
 229         public boolean cancellationRequested() {
 230             return downstream.cancellationRequested();
 231         }
 232     }
 233 
 234     /**
 235      * {@code Sink} implementation designed for creating chains of sinks.  The {@code begin} and
 236      * {@code end}, and {@code cancellationRequested} methods are wired to chain to the downstream
 237      * {@code Sink}.  This implementation takes a downstream {@code Sink} of unknown input shape
 238      * and produces a {@code Sink.OfInt}.  The implementation of the {@code accept()} method must
 239      * call the correct {@code accept()} method on the downstream {@code Sink}.
 240      */
 241     static abstract class ChainedInt implements Sink.OfInt {
 242         protected final Sink downstream;
 243 
 244         public ChainedInt(Sink downstream) {
 245             this.downstream = Objects.requireNonNull(downstream);
 246         }
 247 
 248         @Override
 249         public void begin(long size) {
 250             downstream.begin(size);
 251         }
 252 
 253         @Override
 254         public void end() {
 255             downstream.end();
 256         }
 257 
 258         @Override
 259         public boolean cancellationRequested() {
 260             return downstream.cancellationRequested();
 261         }
 262     }
 263 
 264     /**
 265      * {@code Sink} implementation designed for creating chains of sinks.  The {@code begin} and
 266      * {@code end}, and {@code cancellationRequested} methods are wired to chain to the downstream
 267      * {@code Sink}.  This implementation takes a downstream {@code Sink} of unknown input shape
 268      * and produces a {@code Sink.OfLong}.  The implementation of the {@code accept()} method must
 269      * call the correct {@code accept()} method on the downstream {@code Sink}.
 270      */
 271     static abstract class ChainedLong implements Sink.OfLong {
 272         protected final Sink downstream;
 273 
 274         public ChainedLong(Sink downstream) {
 275             this.downstream = Objects.requireNonNull(downstream);
 276         }
 277 
 278         @Override
 279         public void begin(long size) {
 280             downstream.begin(size);
 281         }
 282 
 283         @Override
 284         public void end() {
 285             downstream.end();
 286         }
 287 
 288         @Override
 289         public boolean cancellationRequested() {
 290             return downstream.cancellationRequested();
 291         }
 292     }
 293 
 294     /**
 295      * {@code Sink} implementation designed for creating chains of sinks.  The {@code begin} and
 296      * {@code end}, and {@code cancellationRequested} methods are wired to chain to the downstream
 297      * {@code Sink}.  This implementation takes a downstream {@code Sink} of unknown input shape
 298      * and produces a {@code Sink.OfDouble}.  The implementation of the {@code accept()} method must
 299      * call the correct {@code accept()} method on the downstream {@code Sink}.
 300      */
 301     static abstract class ChainedDouble implements Sink.OfDouble {
 302         protected final Sink downstream;
 303 
 304         public ChainedDouble(Sink downstream) {
 305             this.downstream = Objects.requireNonNull(downstream);
 306         }
 307 
 308         @Override
 309         public void begin(long size) {
 310             downstream.begin(size);
 311         }
 312 
 313         @Override
 314         public void end() {
 315             downstream.end();
 316         }
 317 
 318         @Override
 319         public boolean cancellationRequested() {
 320             return downstream.cancellationRequested();
 321         }
 322     }
 323 }