1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Objects;
  28 import java.util.function.Consumer;
  29 import java.util.function.DoubleConsumer;
  30 import java.util.function.IntConsumer;
  31 import java.util.function.LongConsumer;
  32 
  33 /**
  34  * An extension of {@link Consumer} used to conduct values through the stages of
  35  * a stream pipeline, with additional methods to manage size information,
  36  * control flow, etc.  Before calling the {@code accept()} method on a
  37  * {@code Sink} for the first time, you must first call the {@code begin()}
  38  * method to inform it that data is coming (optionally informing the sink how
  39  * much data is coming), and after all data has been sent, you must call the
  40  * {@code end()} method.  After calling {@code end()}, you should not call
  41  * {@code accept()} without again calling {@code begin()}.  {@code Sink} also
  42  * offers a mechanism by which the sink can cooperatively signal that it does
  43  * not wish to receive any more data (the {@code cancellationRequested()}
  44  * method), which a source can poll before sending more data to the
  45  * {@code Sink}.
  46  *
  47  * <p>A sink may be in one of two states: an initial state and an active state.
  48  * It starts out in the initial state; the {@code begin()} method transitions
  49  * it to the active state, and the {@code end()} method transitions it back into
  50  * the initial state, where it can be re-used.  Data-accepting methods (such as
  51  * {@code accept()} are only valid in the active state.
  52  *
  53  * @apiNote
  54  * A stream pipeline consists of a source, zero or more intermediate stages
  55  * (such as filtering or mapping), and a terminal stage, such as reduction or
  56  * for-each.  For concreteness, consider the pipeline:
  57  *
  58  * <pre>{@code
  59  *     int longestStringLengthStartingWithA
  60  *         = strings.stream()
  61  *                  .filter(s -> s.startsWith("A"))
  62  *                  .mapToInt(String::length)
  63  *                  .max();
  64  * }</pre>
  65  *
  66  * <p>Here, we have three stages, filtering, mapping, and reducing.  The
  67  * filtering stage consumes strings and emits a subset of those strings; the
  68  * mapping stage consumes strings and emits ints; the reduction stage consumes
  69  * those ints and computes the maximal value.
  70  *
  71  * <p>A {@code Sink} instance is used to represent each stage of this pipeline,
  72  * whether the stage accepts objects, ints, longs, or doubles.  Sink has entry
  73  * points for {@code accept(Object)}, {@code accept(int)}, etc, so that we do
  74  * not need a specialized interface for each primitive specialization.  (It
  75  * might be called a "kitchen sink" for this omnivorous tendency.)  The entry
  76  * point to the pipeline is the {@code Sink} for the filtering stage, which
  77  * sends some elements "downstream" -- into the {@code Sink} for the mapping
  78  * stage, which in turn sends integral values downstream into the {@code Sink}
  79  * for the reduction stage. The {@code Sink} implementations associated with a
  80  * given stage is expected to know the data type for the next stage, and call
  81  * the correct {@code accept} method on its downstream {@code Sink}.  Similarly,
  82  * each stage must implement the correct {@code accept} method corresponding to
  83  * the data type it accepts.
  84  *
  85  * <p>The specialized subtypes such as {@link Sink.OfInt} override
  86  * {@code accept(Object)} to call the appropriate primitive specialization of
  87  * {@code accept}, implement the appropriate primitive specialization of
  88  * {@code Consumer}, and re-abstract the appropriate primitive specialization of
  89  * {@code accept}.
  90  *
  91  * <p>The chaining subtypes such as {@link ChainedInt} not only implement
  92  * {@code Sink.OfInt}, but also maintain a {@code downstream} field which
  93  * represents the downstream {@code Sink}, and implement the methods
  94  * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to
  95  * delegate to the downstream {@code Sink}.  Most implementations of
  96  * intermediate operations will use these chaining wrappers.  For example, the
  97  * mapping stage in the above example would look like:
  98  *
  99  * <pre>{@code
 100  *     IntSink is = new Sink.ChainedReference<U>(sink) {
 101  *         public void accept(U u) {
 102  *             downstream.accept(mapper.applyAsInt(u));
 103  *         }
 104  *     };
 105  * }</pre>
 106  *
 107  * <p>Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect
 108  * to receive elements of type {@code U} as input, and pass the downstream sink
 109  * to the constructor.  Because the next stage expects to receive integers, we
 110  * must call the {@code accept(int)} method when emitting values to the downstream.
 111  * The {@code accept()} method applies the mapping function from {@code U} to
 112  * {@code int} and passes the resulting value to the downstream {@code Sink}.
 113  *
 114  * @param <T> type of elements for value streams
 115  * @since 1.8
 116  */
 117 interface Sink<T> extends Consumer<T> {
 118     /**
 119      * Resets the sink state to receive a fresh data set.  This must be called
 120      * before sending any data to the sink.  After calling {@link #end()},
 121      * you may call this method to reset the sink for another calculation.
 122      * @param size The exact size of the data to be pushed downstream, if
 123      * known or {@code -1} if unknown or infinite.
 124      *
 125      * <p>Prior to this call, the sink must be in the initial state, and after
 126      * this call it is in the active state.
 127      */
 128     default void begin(long size) {}
 129 
 130     /**
 131      * Indicates that all elements have been pushed.  If the {@code Sink} is
 132      * stateful, it should send any stored state downstream at this time, and
 133      * should clear any accumulated state (and associated resources).
 134      *
 135      * <p>Prior to this call, the sink must be in the active state, and after
 136      * this call it is returned to the initial state.
 137      */
 138     default void end() {}
 139 
 140     /**
 141      * Indicates that this {@code Sink} does not wish to receive any more data.
 142      *
 143      * @implSpec The default implementation always returns false.
 144      *
 145      * @return true if cancellation is requested
 146      */
 147     default boolean cancellationRequested() {
 148         return false;
 149     }
 150 
 151     /**
 152      * Accepts an int value.
 153      *
 154      * @implSpec The default implementation throws IllegalStateException.
 155      *
 156      * @throws IllegalStateException if this sink does not accept int values
 157      */
 158     default void accept(int value) {
 159         throw new IllegalStateException("called wrong accept method");
 160     }
 161 
 162     /**
 163      * Accepts a long value.
 164      *
 165      * @implSpec The default implementation throws IllegalStateException.
 166      *
 167      * @throws IllegalStateException if this sink does not accept long values
 168      */
 169     default void accept(long value) {
 170         throw new IllegalStateException("called wrong accept method");
 171     }
 172 
 173     /**
 174      * Accepts a double value.
 175      *
 176      * @implSpec The default implementation throws IllegalStateException.
 177      *
 178      * @throws IllegalStateException if this sink does not accept double values
 179      */
 180     default void accept(double value) {
 181         throw new IllegalStateException("called wrong accept method");
 182     }
 183 
 184     /**
 185      * {@code Sink} that implements {@code Sink<Integer>}, re-abstracts
 186      * {@code accept(int)}, and wires {@code accept(Integer)} to bridge to
 187      * {@code accept(int)}.
 188      */
 189     interface OfInt extends Sink<Integer>, IntConsumer {
 190         @Override
 191         void accept(int value);
 192 
 193         @Override
 194         default void accept(Integer i) {
 195             if (Tripwire.ENABLED)
 196                 Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
 197             accept(i.intValue());
 198         }
 199     }
 200 
 201     /**
 202      * {@code Sink} that implements {@code Sink<Long>}, re-abstracts
 203      * {@code accept(long)}, and wires {@code accept(Long)} to bridge to
 204      * {@code accept(long)}.
 205      */
 206     interface OfLong extends Sink<Long>, LongConsumer {
 207         @Override
 208         void accept(long value);
 209 
 210         @Override
 211         default void accept(Long i) {
 212             if (Tripwire.ENABLED)
 213                 Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
 214             accept(i.longValue());
 215         }
 216     }
 217 
 218     /**
 219      * {@code Sink} that implements {@code Sink<Double>}, re-abstracts
 220      * {@code accept(double)}, and wires {@code accept(Double)} to bridge to
 221      * {@code accept(double)}.
 222      */
 223     interface OfDouble extends Sink<Double>, DoubleConsumer {
 224         @Override
 225         void accept(double value);
 226 
 227         @Override
 228         default void accept(Double i) {
 229             if (Tripwire.ENABLED)
 230                 Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
 231             accept(i.doubleValue());
 232         }
 233     }
 234 
 235     /**
 236      * Abstract {@code Sink} implementation for creating chains of
 237      * sinks.  The {@code begin}, {@code end}, and
 238      * {@code cancellationRequested} methods are wired to chain to the
 239      * downstream {@code Sink}.  This implementation takes a downstream
 240      * {@code Sink} of unknown input shape and produces a {@code Sink<T>}.  The
 241      * implementation of the {@code accept()} method must call the correct
 242      * {@code accept()} method on the downstream {@code Sink}.
 243      */
 244     static abstract class ChainedReference<T> implements Sink<T> {
 245         @SuppressWarnings("rawtypes")
 246         protected final Sink downstream;
 247 
 248         public ChainedReference(Sink downstream) {
 249             this.downstream = Objects.requireNonNull(downstream);
 250         }
 251 
 252         @Override
 253         public void begin(long size) {
 254             downstream.begin(size);
 255         }
 256 
 257         @Override
 258         public void end() {
 259             downstream.end();
 260         }
 261 
 262         @Override
 263         public boolean cancellationRequested() {
 264             return downstream.cancellationRequested();
 265         }
 266     }
 267 
 268     /**
 269      * Abstract {@code Sink} implementation designed for creating chains of
 270      * sinks.  The {@code begin}, {@code end}, and
 271      * {@code cancellationRequested} methods are wired to chain to the
 272      * downstream {@code Sink}.  This implementation takes a downstream
 273      * {@code Sink} of unknown input shape and produces a {@code Sink.OfInt}.
 274      * The implementation of the {@code accept()} method must call the correct
 275      * {@code accept()} method on the downstream {@code Sink}.
 276      */
 277     static abstract class ChainedInt implements Sink.OfInt {
 278         @SuppressWarnings("rawtypes")
 279         protected final Sink downstream;
 280 
 281         public ChainedInt(Sink downstream) {
 282             this.downstream = Objects.requireNonNull(downstream);
 283         }
 284 
 285         @Override
 286         public void begin(long size) {
 287             downstream.begin(size);
 288         }
 289 
 290         @Override
 291         public void end() {
 292             downstream.end();
 293         }
 294 
 295         @Override
 296         public boolean cancellationRequested() {
 297             return downstream.cancellationRequested();
 298         }
 299     }
 300 
 301     /**
 302      * Abstract {@code Sink} implementation designed for creating chains of
 303      * sinks.  The {@code begin}, {@code end}, and
 304      * {@code cancellationRequested} methods are wired to chain to the
 305      * downstream {@code Sink}.  This implementation takes a downstream
 306      * {@code Sink} of unknown input shape and produces a {@code Sink.OfLong}.
 307      * The implementation of the {@code accept()} method must call the correct
 308      * {@code accept()} method on the downstream {@code Sink}.
 309      */
 310     static abstract class ChainedLong implements Sink.OfLong {
 311         @SuppressWarnings("rawtypes")
 312         protected final Sink downstream;
 313 
 314         public ChainedLong(Sink downstream) {
 315             this.downstream = Objects.requireNonNull(downstream);
 316         }
 317 
 318         @Override
 319         public void begin(long size) {
 320             downstream.begin(size);
 321         }
 322 
 323         @Override
 324         public void end() {
 325             downstream.end();
 326         }
 327 
 328         @Override
 329         public boolean cancellationRequested() {
 330             return downstream.cancellationRequested();
 331         }
 332     }
 333 
 334     /**
 335      * Abstract {@code Sink} implementation designed for creating chains of
 336      * sinks.  The {@code begin}, {@code end}, and
 337      * {@code cancellationRequested} methods are wired to chain to the
 338      * downstream {@code Sink}.  This implementation takes a downstream
 339      * {@code Sink} of unknown input shape and produces a {@code Sink.OfDouble}.
 340      * The implementation of the {@code accept()} method must call the correct
 341      * {@code accept()} method on the downstream {@code Sink}.
 342      */
 343     static abstract class ChainedDouble implements Sink.OfDouble {
 344         @SuppressWarnings("rawtypes")
 345         protected final Sink downstream;
 346 
 347         public ChainedDouble(Sink downstream) {
 348             this.downstream = Objects.requireNonNull(downstream);
 349         }
 350 
 351         @Override
 352         public void begin(long size) {
 353             downstream.begin(size);
 354         }
 355 
 356         @Override
 357         public void end() {
 358             downstream.end();
 359         }
 360 
 361         @Override
 362         public boolean cancellationRequested() {
 363             return downstream.cancellationRequested();
 364         }
 365     }
 366 }