54 * or terminal operations are permitted on this stream instance.
55 *
56 * @implNote
57 * <p>For sequential streams, and parallel streams without
58 * <a href="package-summary.html#StreamOps">stateful intermediate
59 * operations</a>, pipeline evaluation is done in a single
60 * pass that "jams" all the operations together. For parallel streams with
61 * stateful operations, execution is divided into segments, where each
62 * stateful operation marks the end of a segment, and each segment is
63 * evaluated separately and the result used as the input to the next
64 * segment. In all cases, the source data is not consumed until a terminal
65 * operation begins.
66 *
67 * @param <E_IN> type of input elements
68 * @param <E_OUT> type of output elements
69 * @param <S> type of the subclass implementing {@code BaseStream}
70 * @since 1.8
71 */
72 abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
73 extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
74 /**
75 * Backlink to the head of the pipeline chain (self if this is the source
76 * stage).
77 */
78 @SuppressWarnings("rawtypes")
79 private final AbstractPipeline sourceStage;
80
81 /**
82 * The "upstream" pipeline, or null if this is the source stage.
83 */
84 @SuppressWarnings("rawtypes")
85 private final AbstractPipeline previousStage;
86
87 /**
88 * The operation flags for the intermediate operation represented by this
89 * pipeline object.
90 */
91 protected final int sourceOrOpFlags;
92
93 /**
120 private Spliterator<?> sourceSpliterator;
121
122 /**
123 * The source supplier. Only valid for the head pipeline. Before the
124 * pipeline is consumed if non-null then {@code sourceSpliterator} must be
125 * null. After the pipeline is consumed if non-null then is set to null.
126 */
127 private Supplier<? extends Spliterator<?>> sourceSupplier;
128
129 /**
130 * True if this pipeline has been linked or consumed
131 */
132 private boolean linkedOrConsumed;
133
134 /**
135 * True if there are any stateful ops in the pipeline; only valid for the
136 * source stage.
137 */
138 private boolean sourceAnyStateful;
139
140 /**
141 * True if pipeline is parallel, otherwise the pipeline is sequential; only
142 * valid for the source stage.
143 */
144 private boolean parallel;
145
146 /**
147 * Constructor for the head of a stream pipeline.
148 *
149 * @param source {@code Supplier<Spliterator>} describing the stream source
150 * @param sourceFlags The source flags for the stream source, described in
151 * {@link StreamOpFlag}
152 * @param parallel True if the pipeline is parallel
153 */
154 AbstractPipeline(Supplier<? extends Spliterator<?>> source,
155 int sourceFlags, boolean parallel) {
156 this.previousStage = null;
157 this.sourceSupplier = source;
158 this.sourceStage = this;
159 this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
178 this.sourceSpliterator = source;
179 this.sourceStage = this;
180 this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
181 // The following is an optimization of:
182 // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
183 this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
184 this.depth = 0;
185 this.parallel = parallel;
186 }
187
188 /**
189 * Constructor for appending an intermediate operation stage onto an
190 * existing pipeline.
191 *
192 * @param previousStage the upstream pipeline stage
193 * @param opFlags the operation flags for the new stage, described in
194 * {@link StreamOpFlag}
195 */
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    // An upstream stage may be linked to a downstream stage only once.
    if (previousStage.linkedOrConsumed) {
        throw new IllegalStateException("stream has already been operated upon");
    }
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;

    // Wire this stage into the chain and inherit the head of the pipeline.
    this.sourceStage = previousStage.sourceStage;
    this.previousStage = previousStage;

    // Keep only the bits valid for an operation, then fold them into the
    // flags accumulated by the upstream stages.
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);

    // Record on the head stage that the pipeline contains a stateful op.
    if (opIsStateful()) {
        sourceStage.sourceAnyStateful = true;
    }
    this.depth = previousStage.depth + 1;
}
210
211
212 // Terminal evaluation methods
213
214 /**
215 * Evaluate the pipeline with a terminal operation to produce a result.
216 *
217 * @param <R> the type of result
218 * @param terminalOp the terminal operation to be applied to the pipeline.
219 * @return the result
220 */
221 final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
222 assert getOutputShape() == terminalOp.inputShape();
223 if (linkedOrConsumed)
224 throw new IllegalStateException("stream has already been operated upon");
225 linkedOrConsumed = true;
226
227 return isParallel()
228 ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
229 : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
230 }
231
232 /**
233 * Collect the elements output from the pipeline stage.
234 *
235 * @param generator the array generator to be used to create array instances
236 * @return a flat array-backed Node that holds the collected output elements
237 */
238 @SuppressWarnings("unchecked")
239 final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
240 if (linkedOrConsumed)
241 throw new IllegalStateException("stream has already been operated upon");
242 linkedOrConsumed = true;
243
244 // If the last intermediate operation is stateful then
245 // evaluate directly to avoid an extra collection step
246 if (isParallel() && previousStage != null && opIsStateful()) {
247 return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
248 }
249 else {
250 return evaluate(sourceSpliterator(0), true, generator);
251 }
252 }
253
254 /**
255 * Gets the source stage spliterator if this pipeline stage is the source
256 * stage. The pipeline is consumed after this method is called and
257 * returns successfully.
258 *
259 * @return the source stage spliterator
260 * @throws IllegalStateException if this pipeline stage is not the source
261 * stage.
262 */
263 @SuppressWarnings("unchecked")
264 final Spliterator<E_OUT> sourceStageSpliterator() {
265 if (this != sourceStage)
266 throw new IllegalStateException();
267
268 if (linkedOrConsumed)
269 throw new IllegalStateException("stream has already been operated upon");
270 linkedOrConsumed = true;
271
272 if (sourceStage.sourceSpliterator != null) {
273 @SuppressWarnings("unchecked")
274 Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
275 sourceStage.sourceSpliterator = null;
276 return s;
277 }
278 else if (sourceStage.sourceSupplier != null) {
279 @SuppressWarnings("unchecked")
280 Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
281 sourceStage.sourceSupplier = null;
282 return s;
283 }
284 else {
285 throw new IllegalStateException("source already consumed");
286 }
287 }
288
289 // BaseStream
290
291 @Override
292 @SuppressWarnings("unchecked")
293 public final S sequential() {
294 sourceStage.parallel = false;
295 return (S) this;
296 }
297
298 @Override
299 @SuppressWarnings("unchecked")
300 public final S parallel() {
301 sourceStage.parallel = true;
302 return (S) this;
303 }
304
305 // Primitive specializations use covariant overrides, hence this method is not final
306 @Override
307 @SuppressWarnings("unchecked")
308 public Spliterator<E_OUT> spliterator() {
309 if (linkedOrConsumed)
310 throw new IllegalStateException("stream has already been operated upon");
311 linkedOrConsumed = true;
312
313 if (this == sourceStage) {
314 if (sourceStage.sourceSpliterator != null) {
315 @SuppressWarnings("unchecked")
316 Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSpliterator;
317 sourceStage.sourceSpliterator = null;
318 return s;
319 }
320 else if (sourceStage.sourceSupplier != null) {
321 @SuppressWarnings("unchecked")
322 Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;
323 sourceStage.sourceSupplier = null;
324 return lazySpliterator(s);
325 }
326 else {
327 throw new IllegalStateException("source already consumed");
328 }
329 }
330 else {
331 return wrap(this, () -> sourceSpliterator(0), isParallel());
332 }
333 }
334
335 @Override
336 public final boolean isParallel() {
337 return sourceStage.parallel;
338 }
339
340
341 /**
342 * Returns the composition of stream flags of the stream source and all
343 * intermediate operations.
344 *
345 * @return the composition of stream flags of the stream source and all
346 * intermediate operations
347 * @see StreamOpFlag
407 /**
408 * Get the source spliterator for this pipeline stage. For a sequential or
409 * stateless parallel pipeline, this is the source spliterator. For a
410 * stateful parallel pipeline, this is a spliterator describing the results
411 * of all computations up to and including the most recent stateful
412 * operation.
413 */
414 @SuppressWarnings("unchecked")
415 private Spliterator<?> sourceSpliterator(int terminalFlags) {
416 // Get the source spliterator of the pipeline
417 Spliterator<?> spliterator = null;
418 if (sourceStage.sourceSpliterator != null) {
419 spliterator = sourceStage.sourceSpliterator;
420 sourceStage.sourceSpliterator = null;
421 }
422 else if (sourceStage.sourceSupplier != null) {
423 spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
424 sourceStage.sourceSupplier = null;
425 }
426 else {
427 throw new IllegalStateException("source already consumed");
428 }
429
430 if (isParallel()) {
431 // @@@ Merge parallelPrepare with the loop below and use the
432 // spliterator characteristics to determine if SIZED
433 // should be injected
434 parallelPrepare(terminalFlags);
435
436 // Adapt the source spliterator, evaluating each stateful op
437 // in the pipeline up to and including this pipeline stage
438 for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
439 u != e;
440 u = p, p = p.nextStage) {
441
442 if (p.opIsStateful()) {
443 spliterator = p.opEvaluateParallelLazy(u, spliterator);
444 }
445 }
446 }
447 else if (terminalFlags != 0) {
|
54 * or terminal operations are permitted on this stream instance.
55 *
56 * @implNote
57 * <p>For sequential streams, and parallel streams without
58 * <a href="package-summary.html#StreamOps">stateful intermediate
59 * operations</a>, pipeline evaluation is done in a single
60 * pass that "jams" all the operations together. For parallel streams with
61 * stateful operations, execution is divided into segments, where each
62 * stateful operation marks the end of a segment, and each segment is
63 * evaluated separately and the result used as the input to the next
64 * segment. In all cases, the source data is not consumed until a terminal
65 * operation begins.
66 *
67 * @param <E_IN> type of input elements
68 * @param <E_OUT> type of output elements
69 * @param <S> type of the subclass implementing {@code BaseStream}
70 * @since 1.8
71 */
72 abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
73 extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
74 private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";
75 private static final String MSG_CONSUMED = "source already consumed or closed";
76
77 /**
78 * Backlink to the head of the pipeline chain (self if this is the source
79 * stage).
80 */
81 @SuppressWarnings("rawtypes")
82 private final AbstractPipeline sourceStage;
83
84 /**
85 * The "upstream" pipeline, or null if this is the source stage.
86 */
87 @SuppressWarnings("rawtypes")
88 private final AbstractPipeline previousStage;
89
90 /**
91 * The operation flags for the intermediate operation represented by this
92 * pipeline object.
93 */
94 protected final int sourceOrOpFlags;
95
96 /**
123 private Spliterator<?> sourceSpliterator;
124
125 /**
126 * The source supplier. Only valid for the head pipeline. Before the
127 * pipeline is consumed if non-null then {@code sourceSpliterator} must be
128 * null. After the pipeline is consumed if non-null then is set to null.
129 */
130 private Supplier<? extends Spliterator<?>> sourceSupplier;
131
132 /**
133 * True if this pipeline has been linked or consumed
134 */
135 private boolean linkedOrConsumed;
136
137 /**
138 * True if there are any stateful ops in the pipeline; only valid for the
139 * source stage.
140 */
141 private boolean sourceAnyStateful;
142
143 private Runnable sourceCloseAction;
144
145 /**
146 * True if pipeline is parallel, otherwise the pipeline is sequential; only
147 * valid for the source stage.
148 */
149 private boolean parallel;
150
151 /**
152 * Constructor for the head of a stream pipeline.
153 *
154 * @param source {@code Supplier<Spliterator>} describing the stream source
155 * @param sourceFlags The source flags for the stream source, described in
156 * {@link StreamOpFlag}
157 * @param parallel True if the pipeline is parallel
158 */
159 AbstractPipeline(Supplier<? extends Spliterator<?>> source,
160 int sourceFlags, boolean parallel) {
161 this.previousStage = null;
162 this.sourceSupplier = source;
163 this.sourceStage = this;
164 this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
183 this.sourceSpliterator = source;
184 this.sourceStage = this;
185 this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
186 // The following is an optimization of:
187 // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
188 this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
189 this.depth = 0;
190 this.parallel = parallel;
191 }
192
193 /**
194 * Constructor for appending an intermediate operation stage onto an
195 * existing pipeline.
196 *
197 * @param previousStage the upstream pipeline stage
198 * @param opFlags the operation flags for the new stage, described in
199 * {@link StreamOpFlag}
200 */
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    // An upstream stage may be linked to a downstream stage only once.
    if (previousStage.linkedOrConsumed) {
        throw new IllegalStateException(MSG_STREAM_LINKED);
    }
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;

    // Wire this stage into the chain and inherit the head of the pipeline.
    this.sourceStage = previousStage.sourceStage;
    this.previousStage = previousStage;

    // Keep only the bits valid for an operation, then fold them into the
    // flags accumulated by the upstream stages.
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);

    // Record on the head stage that the pipeline contains a stateful op.
    if (opIsStateful()) {
        sourceStage.sourceAnyStateful = true;
    }
    this.depth = previousStage.depth + 1;
}
215
216
217 // Terminal evaluation methods
218
219 /**
220 * Evaluate the pipeline with a terminal operation to produce a result.
221 *
222 * @param <R> the type of result
223 * @param terminalOp the terminal operation to be applied to the pipeline.
224 * @return the result
225 */
226 final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
227 assert getOutputShape() == terminalOp.inputShape();
228 if (linkedOrConsumed)
229 throw new IllegalStateException(MSG_STREAM_LINKED);
230 linkedOrConsumed = true;
231
232 return isParallel()
233 ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
234 : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
235 }
236
237 /**
238 * Collect the elements output from the pipeline stage.
239 *
240 * @param generator the array generator to be used to create array instances
241 * @return a flat array-backed Node that holds the collected output elements
242 */
243 @SuppressWarnings("unchecked")
244 final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
245 if (linkedOrConsumed)
246 throw new IllegalStateException(MSG_STREAM_LINKED);
247 linkedOrConsumed = true;
248
249 // If the last intermediate operation is stateful then
250 // evaluate directly to avoid an extra collection step
251 if (isParallel() && previousStage != null && opIsStateful()) {
252 return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
253 }
254 else {
255 return evaluate(sourceSpliterator(0), true, generator);
256 }
257 }
258
259 /**
260 * Gets the source stage spliterator if this pipeline stage is the source
261 * stage. The pipeline is consumed after this method is called and
262 * returns successfully.
263 *
264 * @return the source stage spliterator
265 * @throws IllegalStateException if this pipeline stage is not the source
266 * stage.
267 */
268 @SuppressWarnings("unchecked")
269 final Spliterator<E_OUT> sourceStageSpliterator() {
270 if (this != sourceStage)
271 throw new IllegalStateException();
272
273 if (linkedOrConsumed)
274 throw new IllegalStateException(MSG_STREAM_LINKED);
275 linkedOrConsumed = true;
276
277 if (sourceStage.sourceSpliterator != null) {
278 @SuppressWarnings("unchecked")
279 Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
280 sourceStage.sourceSpliterator = null;
281 return s;
282 }
283 else if (sourceStage.sourceSupplier != null) {
284 @SuppressWarnings("unchecked")
285 Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
286 sourceStage.sourceSupplier = null;
287 return s;
288 }
289 else {
290 throw new IllegalStateException(MSG_CONSUMED);
291 }
292 }
293
294 // BaseStream
295
296 @Override
297 @SuppressWarnings("unchecked")
298 public final S sequential() {
299 sourceStage.parallel = false;
300 return (S) this;
301 }
302
303 @Override
304 @SuppressWarnings("unchecked")
305 public final S parallel() {
306 sourceStage.parallel = true;
307 return (S) this;
308 }
309
310 @Override
311 public void close() {
312 linkedOrConsumed = true;
313 sourceSupplier = null;
314 sourceSpliterator = null;
315 if (sourceStage.sourceCloseAction != null) {
316 Runnable closeAction = sourceStage.sourceCloseAction;
317 sourceStage.sourceCloseAction = null;
318 closeAction.run();
319 }
320 }
321
322 @Override
323 @SuppressWarnings("unchecked")
324 public S onClose(Runnable closeHandler) {
325 Runnable existingHandler = sourceStage.sourceCloseAction;
326 sourceStage.sourceCloseAction =
327 (existingHandler == null)
328 ? closeHandler
329 : Streams.composeWithExceptions(existingHandler, closeHandler);
330 return (S) this;
331 }
332
333 // Primitive specializations use covariant overrides, hence this method is not final
334 @Override
335 @SuppressWarnings("unchecked")
336 public Spliterator<E_OUT> spliterator() {
337 if (linkedOrConsumed)
338 throw new IllegalStateException(MSG_STREAM_LINKED);
339 linkedOrConsumed = true;
340
341 if (this == sourceStage) {
342 if (sourceStage.sourceSpliterator != null) {
343 @SuppressWarnings("unchecked")
344 Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSpliterator;
345 sourceStage.sourceSpliterator = null;
346 return s;
347 }
348 else if (sourceStage.sourceSupplier != null) {
349 @SuppressWarnings("unchecked")
350 Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;
351 sourceStage.sourceSupplier = null;
352 return lazySpliterator(s);
353 }
354 else {
355 throw new IllegalStateException(MSG_CONSUMED);
356 }
357 }
358 else {
359 return wrap(this, () -> sourceSpliterator(0), isParallel());
360 }
361 }
362
363 @Override
364 public final boolean isParallel() {
365 return sourceStage.parallel;
366 }
367
368
369 /**
370 * Returns the composition of stream flags of the stream source and all
371 * intermediate operations.
372 *
373 * @return the composition of stream flags of the stream source and all
374 * intermediate operations
375 * @see StreamOpFlag
435 /**
436 * Get the source spliterator for this pipeline stage. For a sequential or
437 * stateless parallel pipeline, this is the source spliterator. For a
438 * stateful parallel pipeline, this is a spliterator describing the results
439 * of all computations up to and including the most recent stateful
440 * operation.
441 */
442 @SuppressWarnings("unchecked")
443 private Spliterator<?> sourceSpliterator(int terminalFlags) {
444 // Get the source spliterator of the pipeline
445 Spliterator<?> spliterator = null;
446 if (sourceStage.sourceSpliterator != null) {
447 spliterator = sourceStage.sourceSpliterator;
448 sourceStage.sourceSpliterator = null;
449 }
450 else if (sourceStage.sourceSupplier != null) {
451 spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
452 sourceStage.sourceSupplier = null;
453 }
454 else {
455 throw new IllegalStateException(MSG_CONSUMED);
456 }
457
458 if (isParallel()) {
459 // @@@ Merge parallelPrepare with the loop below and use the
460 // spliterator characteristics to determine if SIZED
461 // should be injected
462 parallelPrepare(terminalFlags);
463
464 // Adapt the source spliterator, evaluating each stateful op
465 // in the pipeline up to and including this pipeline stage
466 for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
467 u != e;
468 u = p, p = p.nextStage) {
469
470 if (p.opIsStateful()) {
471 spliterator = p.opEvaluateParallelLazy(u, spliterator);
472 }
473 }
474 }
475 else if (terminalFlags != 0) {
|