src/share/classes/java/util/stream/AbstractPipeline.java

Print this page
rev 7982 : 8017513: Support for closeable streams
8022237: j.u.s.BaseStream.onClose() has an issue in implementation or requires spec clarification
8022572: Same exception instances thrown from j.u.stream.Stream.onClose() handlers are not listed as suppressed
Summary: BaseStream implements AutoCloseable; Remove CloseableStream and DelegatingStream
Reviewed-by: alanb, mduigou, psandoz
Contributed-by: brian.goetz@oracle.com

*** 69,78 **** --- 69,81 ---- * @param <S> type of the subclass implementing {@code BaseStream} * @since 1.8 */ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> { + private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed"; + private static final String MSG_CONSUMED = "source already consumed or closed"; + /** * Backlink to the head of the pipeline chain (self if this is the source * stage). */ @SuppressWarnings("rawtypes")
*** 135,144 **** --- 138,149 ---- * True if there are any stateful ops in the pipeline; only valid for the * source stage. */ private boolean sourceAnyStateful; + private Runnable sourceCloseAction; + /** * True if pipeline is parallel, otherwise the pipeline is sequential; only * valid for the source stage. */ private boolean parallel;
*** 193,203 **** * @param opFlags the operation flags for the new stage, described in * {@link StreamOpFlag} */ AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) ! throw new IllegalStateException("stream has already been operated upon"); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; --- 198,208 ---- * @param opFlags the operation flags for the new stage, described in * {@link StreamOpFlag} */ AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) ! throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
*** 219,229 **** * @return the result */ final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) ! throw new IllegalStateException("stream has already been operated upon"); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); --- 224,234 ---- * @return the result */ final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) ! throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
*** 236,246 **** * @return a flat array-backed Node that holds the collected output elements */ @SuppressWarnings("unchecked") final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) { if (linkedOrConsumed) ! throw new IllegalStateException("stream has already been operated upon"); linkedOrConsumed = true; // If the last intermediate operation is stateful then // evaluate directly to avoid an extra collection step if (isParallel() && previousStage != null && opIsStateful()) { --- 241,251 ---- * @return a flat array-backed Node that holds the collected output elements */ @SuppressWarnings("unchecked") final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) { if (linkedOrConsumed) ! throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; // If the last intermediate operation is stateful then // evaluate directly to avoid an extra collection step if (isParallel() && previousStage != null && opIsStateful()) {
*** 264,274 **** final Spliterator<E_OUT> sourceStageSpliterator() { if (this != sourceStage) throw new IllegalStateException(); if (linkedOrConsumed) ! throw new IllegalStateException("stream has already been operated upon"); linkedOrConsumed = true; if (sourceStage.sourceSpliterator != null) { @SuppressWarnings("unchecked") Spliterator<E_OUT> s = sourceStage.sourceSpliterator; --- 269,279 ---- final Spliterator<E_OUT> sourceStageSpliterator() { if (this != sourceStage) throw new IllegalStateException(); if (linkedOrConsumed) ! throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; if (sourceStage.sourceSpliterator != null) { @SuppressWarnings("unchecked") Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
*** 280,290 **** Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get(); sourceStage.sourceSupplier = null; return s; } else { ! throw new IllegalStateException("source already consumed"); } } // BaseStream --- 285,295 ---- Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get(); sourceStage.sourceSupplier = null; return s; } else { ! throw new IllegalStateException(MSG_CONSUMED); } } // BaseStream
*** 300,315 **** public final S parallel() { sourceStage.parallel = true; return (S) this; } // Primitive specialization use co-variant overrides, hence is not final @Override @SuppressWarnings("unchecked") public Spliterator<E_OUT> spliterator() { if (linkedOrConsumed) ! throw new IllegalStateException("stream has already been operated upon"); linkedOrConsumed = true; if (this == sourceStage) { if (sourceStage.sourceSpliterator != null) { @SuppressWarnings("unchecked") --- 305,343 ---- public final S parallel() { sourceStage.parallel = true; return (S) this; } + @Override + public void close() { + linkedOrConsumed = true; + sourceSupplier = null; + sourceSpliterator = null; + if (sourceStage.sourceCloseAction != null) { + Runnable closeAction = sourceStage.sourceCloseAction; + sourceStage.sourceCloseAction = null; + closeAction.run(); + } + } + + @Override + @SuppressWarnings("unchecked") + public S onClose(Runnable closeHandler) { + Runnable existingHandler = sourceStage.sourceCloseAction; + sourceStage.sourceCloseAction = + (existingHandler == null) + ? closeHandler + : Streams.composeWithExceptions(existingHandler, closeHandler); + return (S) this; + } + // Primitive specialization use co-variant overrides, hence is not final @Override @SuppressWarnings("unchecked") public Spliterator<E_OUT> spliterator() { if (linkedOrConsumed) ! throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; if (this == sourceStage) { if (sourceStage.sourceSpliterator != null) { @SuppressWarnings("unchecked")
*** 322,332 **** Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier; sourceStage.sourceSupplier = null; return lazySpliterator(s); } else { ! throw new IllegalStateException("source already consumed"); } } else { return wrap(this, () -> sourceSpliterator(0), isParallel()); } --- 350,360 ---- Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier; sourceStage.sourceSupplier = null; return lazySpliterator(s); } else { ! throw new IllegalStateException(MSG_CONSUMED); } } else { return wrap(this, () -> sourceSpliterator(0), isParallel()); }
*** 422,432 **** else if (sourceStage.sourceSupplier != null) { spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get(); sourceStage.sourceSupplier = null; } else { ! throw new IllegalStateException("source already consumed"); } if (isParallel()) { // @@@ Merge parallelPrepare with the loop below and use the // spliterator characteristics to determine if SIZED --- 450,460 ---- else if (sourceStage.sourceSupplier != null) { spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get(); sourceStage.sourceSupplier = null; } else { ! throw new IllegalStateException(MSG_CONSUMED); } if (isParallel()) { // @@@ Merge parallelPrepare with the loop below and use the // spliterator characteristics to determine if SIZED