54 * or terminal operations are permitted on this stream instance.
55 *
56 * @implNote
57 * <p>For sequential streams, and parallel streams without
58 * <a href="package-summary.html#StreamOps">stateful intermediate
59 * operations</a>, pipeline evaluation is done in a single
60 * pass that "jams" all the operations together. For parallel streams with
61 * stateful operations, execution is divided into segments, where each
62 * stateful operation marks the end of a segment, and each segment is
63 * evaluated separately and the result used as the input to the next
64 * segment. In all cases, the source data is not consumed until a terminal
65 * operation begins.
66 *
67 * @param <E_IN> type of input elements
68 * @param <E_OUT> type of output elements
69 * @param <S> type of the subclass implementing {@code BaseStream}
70 * @since 1.8
71 */
72 abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
73 extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
74 /**
75 * Backlink to the head of the pipeline chain (self if this is the source
76 * stage).
77 */
78 private final AbstractPipeline sourceStage;
79
80 /**
81 * The "upstream" pipeline, or null if this is the source stage.
82 */
83 private final AbstractPipeline previousStage;
84
85 /**
86 * The operation flags for the intermediate operation represented by this
87 * pipeline object.
88 */
89 protected final int sourceOrOpFlags;
90
91 /**
92 * The next stage in the pipeline, or null if this is the last stage.
93 * Effectively final at the point of linking to the next pipeline.
117 private Spliterator<?> sourceSpliterator;
118
119 /**
120 * The source supplier. Only valid for the head pipeline. Before the
121 * pipeline is consumed, if non-null then {@code sourceSpliterator} must be
122 * null. After the pipeline is consumed, if non-null then it is set to null.
123 */
124 private Supplier<? extends Spliterator<?>> sourceSupplier;
125
126 /**
127 * True if this pipeline has been linked or consumed
128 */
129 private boolean linkedOrConsumed;
130
131 /**
132 * True if there are any stateful ops in the pipeline; only valid for the
133 * source stage.
134 */
135 private boolean sourceAnyStateful;
136
137 /**
138 * True if pipeline is parallel, otherwise the pipeline is sequential; only
139 * valid for the source stage.
140 */
141 private boolean parallel;
142
143 /**
144 * Constructor for the head of a stream pipeline.
145 *
146 * @param source {@code Supplier<Spliterator>} describing the stream source
147 * @param sourceFlags The source flags for the stream source, described in
148 * {@link StreamOpFlag}
149 * @param parallel True if the pipeline is parallel
150 */
151 AbstractPipeline(Supplier<? extends Spliterator<?>> source,
152 int sourceFlags, boolean parallel) {
153 this.previousStage = null;
154 this.sourceSupplier = source;
155 this.sourceStage = this;
156 this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
175 this.sourceSpliterator = source;
176 this.sourceStage = this;
177 this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
178 // The following is an optimization of:
179 // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
180 this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
181 this.depth = 0;
182 this.parallel = parallel;
183 }
184
/**
 * Creates a pipeline stage for an intermediate operation and links it
 * behind an existing stage.
 *
 * <p>The upstream stage is marked as linked and its {@code nextStage} is
 * pointed at the new stage; the new stage inherits the upstream stage's
 * source stage and sits one level deeper in the chain.
 *
 * @param previousStage the upstream pipeline stage
 * @param opFlags the operation flags for the new stage, described in
 *        {@link StreamOpFlag}
 * @throws IllegalStateException if {@code previousStage} has already been
 *         linked or consumed
 */
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed) {
        throw new IllegalStateException("stream has already been operated upon");
    }

    // Claim the upstream stage and hook this stage in behind it.
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;

    this.sourceStage = previousStage.sourceStage;
    this.previousStage = previousStage;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;

    // The head stage keeps track of whether any stage is stateful.
    if (opIsStateful()) {
        sourceStage.sourceAnyStateful = true;
    }
    this.depth = previousStage.depth + 1;
}
207
208
209 // Terminal evaluation methods
210
211 /**
212 * Evaluate the pipeline with a terminal operation to produce a result.
213 *
214 * @param <R> the type of result
215 * @param terminalOp the terminal operation to be applied to the pipeline.
216 * @return the result
217 */
218 final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
219 assert getOutputShape() == terminalOp.inputShape();
220 if (linkedOrConsumed)
221 throw new IllegalStateException("stream has already been operated upon");
222 linkedOrConsumed = true;
223
224 return isParallel()
225 ? (R) terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
226 : (R) terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
227 }
228
229 /**
230 * Collect the elements output from the pipeline stage.
231 *
232 * @param generator the array generator to be used to create array instances
233 * @return a flat array-backed Node that holds the collected output elements
234 */
235 final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
236 if (linkedOrConsumed)
237 throw new IllegalStateException("stream has already been operated upon");
238 linkedOrConsumed = true;
239
240 // If the last intermediate operation is stateful then
241 // evaluate directly to avoid an extra collection step
242 if (isParallel() && previousStage != null && opIsStateful()) {
243 return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
244 }
245 else {
246 return evaluate(sourceSpliterator(0), true, generator);
247 }
248 }
249
250 /**
251 * Gets the source stage spliterator if this pipeline stage is the source
252 * stage. The pipeline is consumed after this method is called and
253 * returns successfully.
254 *
255 * @return the source stage spliterator
256 * @throws IllegalStateException if this pipeline stage is not the source
257 * stage.
258 */
259 final Spliterator<E_OUT> sourceStageSpliterator() {
260 if (this != sourceStage)
261 throw new IllegalStateException();
262
263 if (linkedOrConsumed)
264 throw new IllegalStateException("stream has already been operated upon");
265 linkedOrConsumed = true;
266
267 if (sourceStage.sourceSpliterator != null) {
268 Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
269 sourceStage.sourceSpliterator = null;
270 return s;
271 }
272 else if (sourceStage.sourceSupplier != null) {
273 Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
274 sourceStage.sourceSupplier = null;
275 return s;
276 }
277 else {
278 throw new IllegalStateException("source already consumed");
279 }
280 }
281
282 // BaseStream
283
284 @Override
285 public final S sequential() {
286 sourceStage.parallel = false;
287 return (S) this;
288 }
289
290 @Override
291 public final S parallel() {
292 sourceStage.parallel = true;
293 return (S) this;
294 }
295
296 // Primitive specialization use co-variant overrides, hence is not final
297 @Override
298 public Spliterator<E_OUT> spliterator() {
299 if (linkedOrConsumed)
300 throw new IllegalStateException("stream has already been operated upon");
301 linkedOrConsumed = true;
302
303 if (this == sourceStage) {
304 if (sourceStage.sourceSpliterator != null) {
305 Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
306 sourceStage.sourceSpliterator = null;
307 return s;
308 }
309 else if (sourceStage.sourceSupplier != null) {
310 Supplier<Spliterator<E_OUT>> s = sourceStage.sourceSupplier;
311 sourceStage.sourceSupplier = null;
312 return lazySpliterator(s);
313 }
314 else {
315 throw new IllegalStateException("source already consumed");
316 }
317 }
318 else {
319 return wrap(this, () -> sourceSpliterator(0), isParallel());
320 }
321 }
322
323 @Override
324 public final boolean isParallel() {
325 return sourceStage.parallel;
326 }
327
328
329 /**
330 * Returns the composition of stream flags of the stream source and all
331 * intermediate operations.
332 *
333 * @return the composition of stream flags of the stream source and all
334 * intermediate operations
335 * @see StreamOpFlag
393
394 /**
395 * Get the source spliterator for this pipeline stage. For a sequential or
396 * stateless parallel pipeline, this is the source spliterator. For a
397 * stateful parallel pipeline, this is a spliterator describing the results
398 * of all computations up to and including the most recent stateful
399 * operation.
400 */
401 private Spliterator<?> sourceSpliterator(int terminalFlags) {
402 // Get the source spliterator of the pipeline
403 Spliterator<?> spliterator = null;
404 if (sourceStage.sourceSpliterator != null) {
405 spliterator = sourceStage.sourceSpliterator;
406 sourceStage.sourceSpliterator = null;
407 }
408 else if (sourceStage.sourceSupplier != null) {
409 spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
410 sourceStage.sourceSupplier = null;
411 }
412 else {
413 throw new IllegalStateException("source already consumed");
414 }
415
416 if (isParallel()) {
417 // @@@ Merge parallelPrepare with the loop below and use the
418 // spliterator characteristics to determine if SIZED
419 // should be injected
420 parallelPrepare(terminalFlags);
421
422 // Adapt the source spliterator, evaluating each stateful op
423 // in the pipeline up to and including this pipeline stage
424 for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
425 u != e;
426 u = p, p = p.nextStage) {
427
428 if (p.opIsStateful()) {
429 spliterator = p.opEvaluateParallelLazy(u, spliterator);
430 }
431 }
432 }
433 else if (terminalFlags != 0) {
|
54 * or terminal operations are permitted on this stream instance.
55 *
56 * @implNote
57 * <p>For sequential streams, and parallel streams without
58 * <a href="package-summary.html#StreamOps">stateful intermediate
59 * operations</a>, pipeline evaluation is done in a single
60 * pass that "jams" all the operations together. For parallel streams with
61 * stateful operations, execution is divided into segments, where each
62 * stateful operation marks the end of a segment, and each segment is
63 * evaluated separately and the result used as the input to the next
64 * segment. In all cases, the source data is not consumed until a terminal
65 * operation begins.
66 *
67 * @param <E_IN> type of input elements
68 * @param <E_OUT> type of output elements
69 * @param <S> type of the subclass implementing {@code BaseStream}
70 * @since 1.8
71 */
72 abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
73 extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
74 private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";
75 private static final String MSG_CONSUMED = "source already consumed or closed";
76
77 /**
78 * Backlink to the head of the pipeline chain (self if this is the source
79 * stage).
80 */
81 private final AbstractPipeline sourceStage;
82
83 /**
84 * The "upstream" pipeline, or null if this is the source stage.
85 */
86 private final AbstractPipeline previousStage;
87
88 /**
89 * The operation flags for the intermediate operation represented by this
90 * pipeline object.
91 */
92 protected final int sourceOrOpFlags;
93
94 /**
95 * The next stage in the pipeline, or null if this is the last stage.
96 * Effectively final at the point of linking to the next pipeline.
120 private Spliterator<?> sourceSpliterator;
121
122 /**
123 * The source supplier. Only valid for the head pipeline. Before the
124 * pipeline is consumed, if non-null then {@code sourceSpliterator} must be
125 * null. After the pipeline is consumed, if non-null then it is set to null.
126 */
127 private Supplier<? extends Spliterator<?>> sourceSupplier;
128
129 /**
130 * True if this pipeline has been linked or consumed
131 */
132 private boolean linkedOrConsumed;
133
134 /**
135 * True if there are any stateful ops in the pipeline; only valid for the
136 * source stage.
137 */
138 private boolean sourceAnyStateful;
139
140 private Runnable sourceCloseAction;
141
142 /**
143 * True if pipeline is parallel, otherwise the pipeline is sequential; only
144 * valid for the source stage.
145 */
146 private boolean parallel;
147
148 /**
149 * Constructor for the head of a stream pipeline.
150 *
151 * @param source {@code Supplier<Spliterator>} describing the stream source
152 * @param sourceFlags The source flags for the stream source, described in
153 * {@link StreamOpFlag}
154 * @param parallel True if the pipeline is parallel
155 */
156 AbstractPipeline(Supplier<? extends Spliterator<?>> source,
157 int sourceFlags, boolean parallel) {
158 this.previousStage = null;
159 this.sourceSupplier = source;
160 this.sourceStage = this;
161 this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
180 this.sourceSpliterator = source;
181 this.sourceStage = this;
182 this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
183 // The following is an optimization of:
184 // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
185 this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
186 this.depth = 0;
187 this.parallel = parallel;
188 }
189
/**
 * Creates a pipeline stage for an intermediate operation and links it
 * behind an existing stage.
 *
 * <p>The upstream stage is marked as linked and its {@code nextStage} is
 * pointed at the new stage; the new stage inherits the upstream stage's
 * source stage and sits one level deeper in the chain.
 *
 * @param previousStage the upstream pipeline stage
 * @param opFlags the operation flags for the new stage, described in
 *        {@link StreamOpFlag}
 * @throws IllegalStateException if {@code previousStage} has already been
 *         linked or consumed
 */
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed) {
        throw new IllegalStateException(MSG_STREAM_LINKED);
    }

    // Claim the upstream stage and hook this stage in behind it.
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;

    this.sourceStage = previousStage.sourceStage;
    this.previousStage = previousStage;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;

    // The head stage keeps track of whether any stage is stateful.
    if (opIsStateful()) {
        sourceStage.sourceAnyStateful = true;
    }
    this.depth = previousStage.depth + 1;
}
212
213
214 // Terminal evaluation methods
215
216 /**
217 * Evaluate the pipeline with a terminal operation to produce a result.
218 *
219 * @param <R> the type of result
220 * @param terminalOp the terminal operation to be applied to the pipeline.
221 * @return the result
222 */
223 final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
224 assert getOutputShape() == terminalOp.inputShape();
225 if (linkedOrConsumed)
226 throw new IllegalStateException(MSG_STREAM_LINKED);
227 linkedOrConsumed = true;
228
229 return isParallel()
230 ? (R) terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
231 : (R) terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
232 }
233
234 /**
235 * Collect the elements output from the pipeline stage.
236 *
237 * @param generator the array generator to be used to create array instances
238 * @return a flat array-backed Node that holds the collected output elements
239 */
240 final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
241 if (linkedOrConsumed)
242 throw new IllegalStateException(MSG_STREAM_LINKED);
243 linkedOrConsumed = true;
244
245 // If the last intermediate operation is stateful then
246 // evaluate directly to avoid an extra collection step
247 if (isParallel() && previousStage != null && opIsStateful()) {
248 return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
249 }
250 else {
251 return evaluate(sourceSpliterator(0), true, generator);
252 }
253 }
254
255 /**
256 * Gets the source stage spliterator if this pipeline stage is the source
257 * stage. The pipeline is consumed after this method is called and
258 * returns successfully.
259 *
260 * @return the source stage spliterator
261 * @throws IllegalStateException if this pipeline stage is not the source
262 * stage.
263 */
264 final Spliterator<E_OUT> sourceStageSpliterator() {
265 if (this != sourceStage)
266 throw new IllegalStateException();
267
268 if (linkedOrConsumed)
269 throw new IllegalStateException(MSG_STREAM_LINKED);
270 linkedOrConsumed = true;
271
272 if (sourceStage.sourceSpliterator != null) {
273 Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
274 sourceStage.sourceSpliterator = null;
275 return s;
276 }
277 else if (sourceStage.sourceSupplier != null) {
278 Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
279 sourceStage.sourceSupplier = null;
280 return s;
281 }
282 else {
283 throw new IllegalStateException(MSG_CONSUMED);
284 }
285 }
286
287 // BaseStream
288
289 @Override
290 public final S sequential() {
291 sourceStage.parallel = false;
292 return (S) this;
293 }
294
295 @Override
296 public final S parallel() {
297 sourceStage.parallel = true;
298 return (S) this;
299 }
300
301 @Override
302 public void close() {
303 linkedOrConsumed = true;
304 sourceSupplier = null;
305 sourceSpliterator = null;
306 if (sourceStage.sourceCloseAction != null) {
307 sourceStage.sourceCloseAction.run();
308 sourceStage.sourceCloseAction = null;
309 }
310 }
311
312 @Override
313 public S onClose(Runnable closeHandler) {
314 Runnable existingHandler = sourceStage.sourceCloseAction;
315 sourceStage.sourceCloseAction =
316 (existingHandler == null)
317 ? closeHandler
318 : Streams.composeWithExceptions(existingHandler, closeHandler);
319 return (S) this;
320 }
321
322 // Primitive specialization use co-variant overrides, hence is not final
323 @Override
324 public Spliterator<E_OUT> spliterator() {
325 if (linkedOrConsumed)
326 throw new IllegalStateException(MSG_STREAM_LINKED);
327 linkedOrConsumed = true;
328
329 if (this == sourceStage) {
330 if (sourceStage.sourceSpliterator != null) {
331 Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
332 sourceStage.sourceSpliterator = null;
333 return s;
334 }
335 else if (sourceStage.sourceSupplier != null) {
336 Supplier<Spliterator<E_OUT>> s = sourceStage.sourceSupplier;
337 sourceStage.sourceSupplier = null;
338 return lazySpliterator(s);
339 }
340 else {
341 throw new IllegalStateException(MSG_CONSUMED);
342 }
343 }
344 else {
345 return wrap(this, () -> sourceSpliterator(0), isParallel());
346 }
347 }
348
349 @Override
350 public final boolean isParallel() {
351 return sourceStage.parallel;
352 }
353
354
355 /**
356 * Returns the composition of stream flags of the stream source and all
357 * intermediate operations.
358 *
359 * @return the composition of stream flags of the stream source and all
360 * intermediate operations
361 * @see StreamOpFlag
419
420 /**
421 * Get the source spliterator for this pipeline stage. For a sequential or
422 * stateless parallel pipeline, this is the source spliterator. For a
423 * stateful parallel pipeline, this is a spliterator describing the results
424 * of all computations up to and including the most recent stateful
425 * operation.
426 */
427 private Spliterator<?> sourceSpliterator(int terminalFlags) {
428 // Get the source spliterator of the pipeline
429 Spliterator<?> spliterator = null;
430 if (sourceStage.sourceSpliterator != null) {
431 spliterator = sourceStage.sourceSpliterator;
432 sourceStage.sourceSpliterator = null;
433 }
434 else if (sourceStage.sourceSupplier != null) {
435 spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
436 sourceStage.sourceSupplier = null;
437 }
438 else {
439 throw new IllegalStateException(MSG_CONSUMED);
440 }
441
442 if (isParallel()) {
443 // @@@ Merge parallelPrepare with the loop below and use the
444 // spliterator characteristics to determine if SIZED
445 // should be injected
446 parallelPrepare(terminalFlags);
447
448 // Adapt the source spliterator, evaluating each stateful op
449 // in the pipeline up to and including this pipeline stage
450 for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
451 u != e;
452 u = p, p = p.nextStage) {
453
454 if (p.opIsStateful()) {
455 spliterator = p.opEvaluateParallelLazy(u, spliterator);
456 }
457 }
458 }
459 else if (terminalFlags != 0) {
|