/*
 * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;

/**
 * An extension of {@link Consumer} used to conduct values through the stages of a stream pipeline, with additional
 * methods to manage size information, control flow, etc. Before calling the {@code accept()} method on a
 * {@code Sink} for the first time, you must first call the {@code begin()} method to inform it that data is coming
 * (optionally informing the sink how much data is coming), and after all data has been sent, you must call the
 * {@code end()} method.
 * After calling {@code end()}, you should not call {@code accept()} without again
 * calling {@code begin()}. {@code Sink} also offers a mechanism by which the sink can cooperatively signal that
 * it does not wish to receive any more data (the {@code cancellationRequested()} method), which a source can poll
 * before sending more data to the {@code Sink}.
 *
 * @apiNote
 *
 * A stream pipeline consists of a source, zero or more intermediate stages (such as filtering or mapping), and a
 * terminal stage, such as reduction or for-each. For concreteness, consider the pipeline:
 *
 * <pre>{@code
 *     OptionalInt longestStringLengthStartingWithA
 *         = strings.stream()
 *                  .filter(s -> s.startsWith("A"))
 *                  .mapToInt(String::length)
 *                  .max();
 * }</pre>
 *
 * Here, we have three stages: filtering, mapping, and reducing. The filtering stage consumes strings and emits
 * a subset of those strings; the mapping stage consumes strings and emits ints; the reduction stage consumes those
 * ints and computes the maximal value.
 *
 * A {@code Sink} instance is used to represent each stage of this pipeline, whether the stage accepts objects,
 * ints, longs, or doubles. {@code Sink} has entry points for {@code accept(Object)}, {@code accept(int)}, etc., so
 * that we do not need a specialized interface for each primitive specialization. (It might be called a "kitchen sink"
 * for this omnivorous tendency.) The entry point to the pipeline is the {@code Sink} for the filtering stage,
 * which sends some elements "downstream" into the {@code Sink} for the mapping stage, which in turn sends integral
 * values downstream into the {@code Sink} for the reduction stage. The {@code Sink} implementation associated
 * with a given stage is expected to know the data type for the next stage, and call the correct {@code accept} method
 * on its downstream {@code Sink}.
 * Similarly, each stage must implement the correct {@code accept} method
 * corresponding to the data type it accepts.
 *
 * The specialized subtypes such as {@link Sink.OfInt} bridge {@code accept(Object)} to call the appropriate
 * primitive specialization of {@code accept}, implement the appropriate primitive specialization of {@code Consumer},
 * and reabstract the appropriate primitive specialization of {@code accept}.
 *
 * The chaining subtypes such as {@link ChainedInt} not only implement {@code Sink.OfInt}, but also maintain a
 * {@code downstream} field which represents the downstream {@code Sink}, and implement the methods {@code begin()},
 * {@code end()}, and {@code cancellationRequested()} to delegate to the downstream {@code Sink}.
 * Most implementations of intermediate operations will use these chaining wrappers. For example, the mapping stage
 * in the above example would look like:
 *
 * <pre>{@code
 *     Sink<U> is = new Sink.ChainedReference<U>(sink) {
 *         public void accept(U u) {
 *             downstream.accept(mapper.applyAsInt(u));
 *         }
 *     };
 * }</pre>
 *
 * Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect to receive elements of type {@code U}
 * as input, and pass the downstream sink to the constructor. Because the next stage expects to receive integers,
 * we must call the {@code accept(int)} method when emitting values to the downstream. The {@code accept()} method
 * applies the mapping function from {@code U} to {@code int} and passes the resulting value to the downstream
 * {@code Sink}.
 *
 * @param <T> Type of elements for value streams
 * @since 1.8
 */
@FunctionalInterface
interface Sink<T> extends Consumer<T> {
    /**
     * Reset the sink state to receive a fresh data set. This is used when a
     * {@code Sink} is being reused by multiple calculations.
     * @param size The exact size of the data to be pushed downstream, if
     * known, or {@code Long.MAX_VALUE} if unknown or infinite.
     */
    default void begin(long size) {}

    /**
     * Indicate that all elements have been pushed. If the {@code Sink} buffers any
     * results from previous values, it should dump its contents downstream and
     * clear any stored state.
     */
    default void end() {}

    /**
     * Used to communicate to upstream sources that this {@code Sink} does not wish to receive
     * any more data.
     *
     * @return {@code true} if cancellation is requested
     */
    default boolean cancellationRequested() {
        return false;
    }

    /**
     * Accept an int value.
     *
     * @implSpec The default implementation throws IllegalStateException.
     *
     * @throws IllegalStateException If this sink does not accept int values
     */
    default void accept(int value) {
        throw new IllegalStateException("called wrong accept method");
    }

    /**
     * Accept a long value.
     *
     * @implSpec The default implementation throws IllegalStateException.
     *
     * @throws IllegalStateException If this sink does not accept long values
     */
    default void accept(long value) {
        throw new IllegalStateException("called wrong accept method");
    }

    /**
     * Accept a double value.
     *
     * @implSpec The default implementation throws IllegalStateException.
     *
     * @throws IllegalStateException If this sink does not accept double values
     */
    default void accept(double value) {
        throw new IllegalStateException("called wrong accept method");
    }

    /**
     * {@code Sink} that implements {@code Sink<Integer>}, reabstracts {@code accept(int)},
     * and wires {@code accept(Integer)} to bridge to {@code accept(int)}.
     */
    @FunctionalInterface
    interface OfInt extends Sink<Integer>, IntConsumer {
        @Override
        void accept(int value);

        @Override
        default void accept(Integer i) {
            if (Tripwire.enabled)
                Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
            accept(i.intValue());
        }
    }

    /**
     * {@code Sink} that implements {@code Sink<Long>}, reabstracts {@code accept(long)},
     * and wires {@code accept(Long)} to bridge to {@code accept(long)}.
     */
    @FunctionalInterface
    interface OfLong extends Sink<Long>, LongConsumer {
        @Override
        void accept(long value);

        @Override
        default void accept(Long i) {
            if (Tripwire.enabled)
                Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
            accept(i.longValue());
        }
    }

    /**
     * {@code Sink} that implements {@code Sink<Double>}, reabstracts {@code accept(double)},
     * and wires {@code accept(Double)} to bridge to {@code accept(double)}.
     */
    @FunctionalInterface
    interface OfDouble extends Sink<Double>, DoubleConsumer {
        @Override
        void accept(double value);

        @Override
        default void accept(Double i) {
            if (Tripwire.enabled)
                Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
            accept(i.doubleValue());
        }
    }

    /**
     * {@code Sink} implementation designed for creating chains of sinks. The {@code begin},
     * {@code end}, and {@code cancellationRequested} methods are wired to chain to the downstream
     * {@code Sink}. This implementation takes a downstream {@code Sink} of unknown input shape
     * and produces a {@code Sink<T>}. The implementation of the {@code accept()} method must
     * call the correct {@code accept()} method on the downstream {@code Sink}.
     */
    static abstract class ChainedReference<T> implements Sink<T> {
        protected final Sink downstream;

        public ChainedReference(Sink downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }

    /**
     * {@code Sink} implementation designed for creating chains of sinks. The {@code begin},
     * {@code end}, and {@code cancellationRequested} methods are wired to chain to the downstream
     * {@code Sink}. This implementation takes a downstream {@code Sink} of unknown input shape
     * and produces a {@code Sink.OfInt}. The implementation of the {@code accept()} method must
     * call the correct {@code accept()} method on the downstream {@code Sink}.
     */
    static abstract class ChainedInt implements Sink.OfInt {
        protected final Sink downstream;

        public ChainedInt(Sink downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }

    /**
     * {@code Sink} implementation designed for creating chains of sinks. The {@code begin},
     * {@code end}, and {@code cancellationRequested} methods are wired to chain to the downstream
     * {@code Sink}. This implementation takes a downstream {@code Sink} of unknown input shape
     * and produces a {@code Sink.OfLong}. The implementation of the {@code accept()} method must
     * call the correct {@code accept()} method on the downstream {@code Sink}.
     */
    static abstract class ChainedLong implements Sink.OfLong {
        protected final Sink downstream;

        public ChainedLong(Sink downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }

    /**
     * {@code Sink} implementation designed for creating chains of sinks. The {@code begin},
     * {@code end}, and {@code cancellationRequested} methods are wired to chain to the downstream
     * {@code Sink}. This implementation takes a downstream {@code Sink} of unknown input shape
     * and produces a {@code Sink.OfDouble}. The implementation of the {@code accept()} method must
     * call the correct {@code accept()} method on the downstream {@code Sink}.
     */
    static abstract class ChainedDouble implements Sink.OfDouble {
        protected final Sink downstream;

        public ChainedDouble(Sink downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }
}
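
Usage sketch (not part of the file above). Because `Sink` is package-private, the protocol it documents cannot be exercised directly from outside `java.util.stream`. The following is a minimal stand-alone illustration of the same begin / accept / end / cancellationRequested contract for the filter, map, max pipeline described in the class comment. The names `SinkProtocolSketch`, `MiniSink`, `Chained`, `MaxIntSink`, and `longestLengthStartingWithA` are hypothetical and exist only for this example; the typed `downstream` field and the choice to have the filtering stage report `Long.MAX_VALUE` as its size are assumptions made here, not something the file prescribes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.OptionalInt;

public class SinkProtocolSketch {

    /** Hypothetical, cut-down stand-in for Sink: same begin/accept/end/cancel protocol. */
    interface MiniSink<T> {
        default void begin(long size) {}
        void accept(T value);
        default void accept(int value) {
            throw new IllegalStateException("called wrong accept method");
        }
        default void end() {}
        default boolean cancellationRequested() { return false; }
    }

    /** Chaining base: forwards begin, end, and cancellation polling to the downstream sink. */
    abstract static class Chained<T, R> implements MiniSink<T> {
        protected final MiniSink<R> downstream;

        Chained(MiniSink<R> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }
        @Override public void begin(long size) { downstream.begin(size); }
        @Override public void end() { downstream.end(); }
        @Override public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }

    /** Terminal stage: keeps the largest int seen since the last begin(). */
    static final class MaxIntSink implements MiniSink<Integer> {
        private boolean seen;
        private int max;

        @Override public void begin(long size) { seen = false; max = Integer.MIN_VALUE; }
        @Override public void accept(int value) { seen = true; max = Math.max(max, value); }
        @Override public void accept(Integer value) { accept(value.intValue()); }

        OptionalInt result() {
            return seen ? OptionalInt.of(max) : OptionalInt.empty();
        }
    }

    public static OptionalInt longestLengthStartingWithA(List<String> strings) {
        MaxIntSink reduce = new MaxIntSink();

        // Mapping stage: receives Strings, emits ints through accept(int).
        MiniSink<String> map = new Chained<String, Integer>(reduce) {
            @Override public void accept(String s) { downstream.accept(s.length()); }
        };

        // Filtering stage: may drop elements, so it reports the size as unknown.
        MiniSink<String> filter = new Chained<String, String>(map) {
            @Override public void begin(long size) { downstream.begin(Long.MAX_VALUE); }
            @Override public void accept(String s) {
                if (s.startsWith("A")) downstream.accept(s);
            }
        };

        // Source side of the protocol: begin, push while not cancelled, end.
        filter.begin(strings.size());
        for (String s : strings) {
            if (filter.cancellationRequested()) break;
            filter.accept(s);
        }
        filter.end();
        return reduce.result();
    }

    public static void main(String[] args) {
        List<String> strings = Arrays.asList("Apple", "Avocado", "Banana", "Ant");
        System.out.println(longestLengthStartingWithA(strings)); // prints OptionalInt[7]
    }
}
```

The mapping stage calls `accept(int)` on its downstream rather than boxing, which mirrors the rule in the class comment that each stage must know the data type the next stage expects.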