1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Objects;
  28 import java.util.function.Consumer;
  29 import java.util.function.DoubleConsumer;
  30 import java.util.function.IntConsumer;
  31 import java.util.function.LongConsumer;
  32 
  33 /**
  34  * An extension of {@link Consumer} used to conduct values through the stages of
  35  * a stream pipeline, with additional methods to manage size information,
  36  * control flow, etc.  Before calling the {@code accept()} method on a
  37  * {@code Sink} for the first time, you must first call the {@code begin()}
  38  * method to inform it that data is coming (optionally informing the sink how
  39  * much data is coming), and after all data has been sent, you must call the
  40  * {@code end()} method.  After calling {@code end()}, you should not call
  41  * {@code accept()} without again calling {@code begin()}.  {@code Sink} also
  42  * offers a mechanism by which the sink can cooperatively signal that it does
  43  * not wish to receive any more data (the {@code cancellationRequested()}
  44  * method), which a source can poll before sending more data to the
  45  * {@code Sink}.
  46  *
  47  * <p>A sink may be in one of two states: an initial state and an active state.
  48  * It starts out in the initial state; the {@code begin()} method transitions
  49  * it to the active state, and the {@code end()} method transitions it back into
  50  * the initial state, where it can be re-used.  Data-accepting methods (such as
 * {@code accept()}) are only valid in the active state.
  52  *
  53  * @apiNote
  54  * A stream pipeline consists of a source, zero or more intermediate stages
  55  * (such as filtering or mapping), and a terminal stage, such as reduction or
  56  * for-each.  For concreteness, consider the pipeline:
  57  *
  58  * <pre>{@code
 *     OptionalInt longestStringLengthStartingWithA
  60  *         = strings.stream()
  61  *                  .filter(s -> s.startsWith("A"))
  62  *                  .mapToInt(String::length)
  63  *                  .max();
  64  * }</pre>
  65  *
  66  * <p>Here, we have three stages, filtering, mapping, and reducing.  The
  67  * filtering stage consumes strings and emits a subset of those strings; the
  68  * mapping stage consumes strings and emits ints; the reduction stage consumes
  69  * those ints and computes the maximal value.
  70  *
  71  * <p>A {@code Sink} instance is used to represent each stage of this pipeline,
  72  * whether the stage accepts objects, ints, longs, or doubles.  Sink has entry
  73  * points for {@code accept(Object)}, {@code accept(int)}, etc, so that we do
  74  * not need a specialized interface for each primitive specialization.  (It
  75  * might be called a "kitchen sink" for this omnivorous tendency.)  The entry
  76  * point to the pipeline is the {@code Sink} for the filtering stage, which
  77  * sends some elements "downstream" -- into the {@code Sink} for the mapping
  78  * stage, which in turn sends integral values downstream into the {@code Sink}
 * for the reduction stage. The {@code Sink} implementation associated with a
  80  * given stage is expected to know the data type for the next stage, and call
  81  * the correct {@code accept} method on its downstream {@code Sink}.  Similarly,
  82  * each stage must implement the correct {@code accept} method corresponding to
  83  * the data type it accepts.
  84  *
  85  * <p>The specialized subtypes such as {@link Sink.OfInt} override
  86  * {@code accept(Object)} to call the appropriate primitive specialization of
  87  * {@code accept}, implement the appropriate primitive specialization of
  88  * {@code Consumer}, and re-abstract the appropriate primitive specialization of
  89  * {@code accept}.
  90  *
  91  * <p>The chaining subtypes such as {@link ChainedInt} not only implement
  92  * {@code Sink.OfInt}, but also maintain a {@code downstream} field which
  93  * represents the downstream {@code Sink}, and implement the methods
  94  * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to
  95  * delegate to the downstream {@code Sink}.  Most implementations of
  96  * intermediate operations will use these chaining wrappers.  For example, the
  97  * mapping stage in the above example would look like:
  98  *
  99  * <pre>{@code
 *     Sink<U> is = new Sink.ChainedReference<U, Integer>(sink) {
 *         public void accept(U u) {
 *             downstream.accept(mapper.applyAsInt(u));
 *         }
 *     };
 * }</pre>
 *
 * <p>Here, we implement {@code Sink.ChainedReference<U, Integer>}, meaning that
 * we expect to receive elements of type {@code U} as input, and pass the
 * downstream sink to the constructor.  Because the next stage expects to
 * receive integers, we must call the {@code accept(int)} method when emitting
 * values to the downstream.  The {@code accept()} method applies the mapping
 * function from {@code U} to {@code int} and passes the resulting value to the
 * downstream {@code Sink}.
 113  *
 114  * @param <T> type of elements for value streams
 115  * @since 1.8
 116  */
 117 interface Sink<T> extends Consumer<T> {
 118     /**
 119      * Resets the sink state to receive a fresh data set.  This must be called
 120      * before sending any data to the sink.  After calling {@link #end()},
 121      * you may call this method to reset the sink for another calculation.
 122      * @param size The exact size of the data to be pushed downstream, if
 123      * known or {@code -1} if unknown or infinite.
 124      *
 125      * <p>Prior to this call, the sink must be in the initial state, and after
 126      * this call it is in the active state.
 127      */
 128     default void begin(long size) {}
 129 
 130     /**
 131      * Indicates that all elements have been pushed.  If the {@code Sink} is
 132      * stateful, it should send any stored state downstream at this time, and
 133      * should clear any accumulated state (and associated resources).
 134      *
 135      * <p>Prior to this call, the sink must be in the active state, and after
 136      * this call it is returned to the initial state.
 137      */
 138     default void end() {}
 139 
 140     /**
 141      * Indicates that this {@code Sink} does not wish to receive any more data.
 142      *
 143      * @implSpec The default implementation always returns false.
 144      *
 145      * @return true if cancellation is requested
 146      */
 147     default boolean cancellationRequested() {
 148         return false;
 149     }
 150 
 151     /**
 152      * Accepts an int value.
 153      *
 154      * @implSpec The default implementation throws IllegalStateException.
 155      *
 156      * @throws IllegalStateException if this sink does not accept int values
 157      */
 158     default void accept(int value) {
 159         throw new IllegalStateException("called wrong accept method");
 160     }
 161 
 162     /**
 163      * Accepts a long value.
 164      *
 165      * @implSpec The default implementation throws IllegalStateException.
 166      *
 167      * @throws IllegalStateException if this sink does not accept long values
 168      */
 169     default void accept(long value) {
 170         throw new IllegalStateException("called wrong accept method");
 171     }
 172 
 173     /**
 174      * Accepts a double value.
 175      *
 176      * @implSpec The default implementation throws IllegalStateException.
 177      *
 178      * @throws IllegalStateException if this sink does not accept double values
 179      */
 180     default void accept(double value) {
 181         throw new IllegalStateException("called wrong accept method");
 182     }
 183 
 184     /**
 185      * {@code Sink} that implements {@code Sink<Integer>}, re-abstracts
 186      * {@code accept(int)}, and wires {@code accept(Integer)} to bridge to
 187      * {@code accept(int)}.
 188      */
 189     interface OfInt extends Sink<Integer>, IntConsumer {
 190         @Override
 191         void accept(int value);
 192 
 193         @Override
 194         default void accept(Integer i) {
 195             if (Tripwire.ENABLED)
 196                 Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
 197             accept(i.intValue());
 198         }
 199     }
 200 
 201     /**
 202      * {@code Sink} that implements {@code Sink<Long>}, re-abstracts
 203      * {@code accept(long)}, and wires {@code accept(Long)} to bridge to
 204      * {@code accept(long)}.
 205      */
 206     interface OfLong extends Sink<Long>, LongConsumer {
 207         @Override
 208         void accept(long value);
 209 
 210         @Override
 211         default void accept(Long i) {
 212             if (Tripwire.ENABLED)
 213                 Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
 214             accept(i.longValue());
 215         }
 216     }
 217 
 218     /**
 219      * {@code Sink} that implements {@code Sink<Double>}, re-abstracts
 220      * {@code accept(double)}, and wires {@code accept(Double)} to bridge to
 221      * {@code accept(double)}.
 222      */
 223     interface OfDouble extends Sink<Double>, DoubleConsumer {
 224         @Override
 225         void accept(double value);
 226 
 227         @Override
 228         default void accept(Double i) {
 229             if (Tripwire.ENABLED)
 230                 Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
 231             accept(i.doubleValue());
 232         }
 233     }
 234 
 235     /**
 236      * Abstract {@code Sink} implementation for creating chains of
 237      * sinks.  The {@code begin}, {@code end}, and
 238      * {@code cancellationRequested} methods are wired to chain to the
 239      * downstream {@code Sink}.  This implementation takes a downstream
 240      * {@code Sink} of unknown input shape and produces a {@code Sink<T>}.  The
 241      * implementation of the {@code accept()} method must call the correct
 242      * {@code accept()} method on the downstream {@code Sink}.
 243      */
 244     static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
 245         protected final Sink<? super E_OUT> downstream;
 246 
 247         public ChainedReference(Sink<? super E_OUT> downstream) {
 248             this.downstream = Objects.requireNonNull(downstream);
 249         }
 250 
 251         @Override
 252         public void begin(long size) {
 253             downstream.begin(size);
 254         }
 255 
 256         @Override
 257         public void end() {
 258             downstream.end();
 259         }
 260 
 261         @Override
 262         public boolean cancellationRequested() {
 263             return downstream.cancellationRequested();
 264         }
 265     }
 266 
 267     /**
 268      * Abstract {@code Sink} implementation designed for creating chains of
 269      * sinks.  The {@code begin}, {@code end}, and
 270      * {@code cancellationRequested} methods are wired to chain to the
 271      * downstream {@code Sink}.  This implementation takes a downstream
 272      * {@code Sink} of unknown input shape and produces a {@code Sink.OfInt}.
 273      * The implementation of the {@code accept()} method must call the correct
 274      * {@code accept()} method on the downstream {@code Sink}.
 275      */
 276     static abstract class ChainedInt<E_OUT> implements Sink.OfInt {
 277         protected final Sink<? super E_OUT> downstream;
 278 
 279         public ChainedInt(Sink<? super E_OUT> downstream) {
 280             this.downstream = Objects.requireNonNull(downstream);
 281         }
 282 
 283         @Override
 284         public void begin(long size) {
 285             downstream.begin(size);
 286         }
 287 
 288         @Override
 289         public void end() {
 290             downstream.end();
 291         }
 292 
 293         @Override
 294         public boolean cancellationRequested() {
 295             return downstream.cancellationRequested();
 296         }
 297     }
 298 
 299     /**
 300      * Abstract {@code Sink} implementation designed for creating chains of
 301      * sinks.  The {@code begin}, {@code end}, and
 302      * {@code cancellationRequested} methods are wired to chain to the
 303      * downstream {@code Sink}.  This implementation takes a downstream
 304      * {@code Sink} of unknown input shape and produces a {@code Sink.OfLong}.
 305      * The implementation of the {@code accept()} method must call the correct
 306      * {@code accept()} method on the downstream {@code Sink}.
 307      */
 308     static abstract class ChainedLong<E_OUT> implements Sink.OfLong {
 309         protected final Sink<? super E_OUT> downstream;
 310 
 311         public ChainedLong(Sink<? super E_OUT> downstream) {
 312             this.downstream = Objects.requireNonNull(downstream);
 313         }
 314 
 315         @Override
 316         public void begin(long size) {
 317             downstream.begin(size);
 318         }
 319 
 320         @Override
 321         public void end() {
 322             downstream.end();
 323         }
 324 
 325         @Override
 326         public boolean cancellationRequested() {
 327             return downstream.cancellationRequested();
 328         }
 329     }
 330 
 331     /**
 332      * Abstract {@code Sink} implementation designed for creating chains of
 333      * sinks.  The {@code begin}, {@code end}, and
 334      * {@code cancellationRequested} methods are wired to chain to the
 335      * downstream {@code Sink}.  This implementation takes a downstream
 336      * {@code Sink} of unknown input shape and produces a {@code Sink.OfDouble}.
 337      * The implementation of the {@code accept()} method must call the correct
 338      * {@code accept()} method on the downstream {@code Sink}.
 339      */
 340     static abstract class ChainedDouble<E_OUT> implements Sink.OfDouble {
 341         protected final Sink<? super E_OUT> downstream;
 342 
 343         public ChainedDouble(Sink<? super E_OUT> downstream) {
 344             this.downstream = Objects.requireNonNull(downstream);
 345         }
 346 
 347         @Override
 348         public void begin(long size) {
 349             downstream.begin(size);
 350         }
 351 
 352         @Override
 353         public void end() {
 354             downstream.end();
 355         }
 356 
 357         @Override
 358         public boolean cancellationRequested() {
 359             return downstream.cancellationRequested();
 360         }
 361     }
 362 }