src/share/classes/java/util/stream/AbstractPipeline.java

Print this page
rev 7633 : 8017513: Support for closeable streams
Reviewed-by:
Contributed-by: brian.goetz@oracle.com

*** 69,78 **** --- 69,81 ---- * @param <S> type of the subclass implementing {@code BaseStream} * @since 1.8 */ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> { + private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed"; + private static final String MSG_CONSUMED = "source already consumed or closed"; + /** * Backlink to the head of the pipeline chain (self if this is the source * stage). */ private final AbstractPipeline sourceStage;
*** 132,141 **** --- 135,146 ---- * True if there are any stateful ops in the pipeline; only valid for the * source stage. */ private boolean sourceAnyStateful; + private Runnable sourceCloseAction; + /** * True if pipeline is parallel, otherwise the pipeline is sequential; only * valid for the source stage. */ private boolean parallel;
*** 190,200 **** * @param opFlags the operation flags for the new stage, described in * {@link StreamOpFlag} */ AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) ! throw new IllegalStateException("stream has already been operated upon"); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; --- 195,205 ---- * @param opFlags the operation flags for the new stage, described in * {@link StreamOpFlag} */ AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) ! throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
*** 216,226 **** * @return the result */ final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) ! throw new IllegalStateException("stream has already been operated upon"); linkedOrConsumed = true; return isParallel() ? (R) terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : (R) terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); --- 221,231 ---- * @return the result */ final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) ! throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? (R) terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : (R) terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
*** 232,242 **** * @param generator the array generator to be used to create array instances * @return a flat array-backed Node that holds the collected output elements */ final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) { if (linkedOrConsumed) ! throw new IllegalStateException("stream has already been operated upon"); linkedOrConsumed = true; // If the last intermediate operation is stateful then // evaluate directly to avoid an extra collection step if (isParallel() && previousStage != null && opIsStateful()) { --- 237,247 ---- * @param generator the array generator to be used to create array instances * @return a flat array-backed Node that holds the collected output elements */ final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) { if (linkedOrConsumed) ! throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; // If the last intermediate operation is stateful then // evaluate directly to avoid an extra collection step if (isParallel() && previousStage != null && opIsStateful()) {
*** 259,269 **** final Spliterator<E_OUT> sourceStageSpliterator() { if (this != sourceStage) throw new IllegalStateException(); if (linkedOrConsumed) ! throw new IllegalStateException("stream has already been operated upon"); linkedOrConsumed = true; if (sourceStage.sourceSpliterator != null) { Spliterator<E_OUT> s = sourceStage.sourceSpliterator; sourceStage.sourceSpliterator = null; --- 264,274 ---- final Spliterator<E_OUT> sourceStageSpliterator() { if (this != sourceStage) throw new IllegalStateException(); if (linkedOrConsumed) ! throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; if (sourceStage.sourceSpliterator != null) { Spliterator<E_OUT> s = sourceStage.sourceSpliterator; sourceStage.sourceSpliterator = null;
*** 273,283 **** Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get(); sourceStage.sourceSupplier = null; return s; } else { ! throw new IllegalStateException("source already consumed"); } } // BaseStream --- 278,288 ---- Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get(); sourceStage.sourceSupplier = null; return s; } else { ! throw new IllegalStateException(MSG_CONSUMED); } } // BaseStream
*** 291,305 **** public final S parallel() { sourceStage.parallel = true; return (S) this; } // Primitive specialization use co-variant overrides, hence is not final @Override public Spliterator<E_OUT> spliterator() { if (linkedOrConsumed) ! throw new IllegalStateException("stream has already been operated upon"); linkedOrConsumed = true; if (this == sourceStage) { if (sourceStage.sourceSpliterator != null) { Spliterator<E_OUT> s = sourceStage.sourceSpliterator; --- 296,331 ---- public final S parallel() { sourceStage.parallel = true; return (S) this; } + @Override + public void close() { + linkedOrConsumed = true; + sourceSupplier = null; + sourceSpliterator = null; + if (sourceStage.sourceCloseAction != null) { + sourceStage.sourceCloseAction.run(); + sourceStage.sourceCloseAction = null; + } + } + + @Override + public S onClose(Runnable closeHandler) { + Runnable existingHandler = sourceStage.sourceCloseAction; + sourceStage.sourceCloseAction = + (existingHandler == null) + ? closeHandler + : Streams.composeWithExceptions(existingHandler, closeHandler); + return (S) this; + } + // Primitive specialization use co-variant overrides, hence is not final @Override public Spliterator<E_OUT> spliterator() { if (linkedOrConsumed) ! throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; if (this == sourceStage) { if (sourceStage.sourceSpliterator != null) { Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
*** 310,320 **** Supplier<Spliterator<E_OUT>> s = sourceStage.sourceSupplier; sourceStage.sourceSupplier = null; return lazySpliterator(s); } else { ! throw new IllegalStateException("source already consumed"); } } else { return wrap(this, () -> sourceSpliterator(0), isParallel()); } --- 336,346 ---- Supplier<Spliterator<E_OUT>> s = sourceStage.sourceSupplier; sourceStage.sourceSupplier = null; return lazySpliterator(s); } else { ! throw new IllegalStateException(MSG_CONSUMED); } } else { return wrap(this, () -> sourceSpliterator(0), isParallel()); }
*** 408,418 **** else if (sourceStage.sourceSupplier != null) { spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get(); sourceStage.sourceSupplier = null; } else { ! throw new IllegalStateException("source already consumed"); } if (isParallel()) { // @@@ Merge parallelPrepare with the loop below and use the // spliterator characteristics to determine if SIZED --- 434,444 ---- else if (sourceStage.sourceSupplier != null) { spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get(); sourceStage.sourceSupplier = null; } else { ! throw new IllegalStateException(MSG_CONSUMED); } if (isParallel()) { // @@@ Merge parallelPrepare with the loop below and use the // spliterator characteristics to determine if SIZED