--- old/src/share/classes/java/util/stream/AbstractPipeline.java 2013-07-10 14:11:41.943135649 -0700 +++ new/src/share/classes/java/util/stream/AbstractPipeline.java 2013-07-10 14:11:41.759135653 -0700 @@ -71,6 +71,9 @@ */ abstract class AbstractPipeline> extends PipelineHelper implements BaseStream { + private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed"; + private static final String MSG_CONSUMED = "source already consumed or closed"; + /** * Backlink to the head of the pipeline chain (self if this is the source * stage). @@ -134,6 +137,8 @@ */ private boolean sourceAnyStateful; + private Runnable sourceCloseAction; + /** * True if pipeline is parallel, otherwise the pipeline is sequential; only * valid for the source stage. @@ -192,7 +197,7 @@ */ AbstractPipeline(AbstractPipeline previousStage, int opFlags) { if (previousStage.linkedOrConsumed) - throw new IllegalStateException("stream has already been operated upon"); + throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; @@ -218,7 +223,7 @@ final R evaluate(TerminalOp terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) - throw new IllegalStateException("stream has already been operated upon"); + throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() @@ -234,7 +239,7 @@ */ final Node evaluateToArrayNode(IntFunction generator) { if (linkedOrConsumed) - throw new IllegalStateException("stream has already been operated upon"); + throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; // If the last intermediate operation is stateful then @@ -261,7 +266,7 @@ throw new IllegalStateException(); if (linkedOrConsumed) - throw new IllegalStateException("stream has already been operated upon"); + throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; if (sourceStage.sourceSpliterator != null) { @@ -275,7 +280,7 @@ return s; } else { - throw new IllegalStateException("source already consumed"); + throw new IllegalStateException(MSG_CONSUMED); } } @@ -293,11 +298,32 @@ return (S) this; } + @Override + public void close() { + linkedOrConsumed = true; + sourceSupplier = null; + sourceSpliterator = null; + if (sourceStage.sourceCloseAction != null) { + sourceStage.sourceCloseAction.run(); + sourceStage.sourceCloseAction = null; + } + } + + @Override + public S onClose(Runnable closeHandler) { + Runnable existingHandler = sourceStage.sourceCloseAction; + sourceStage.sourceCloseAction = + (existingHandler == null) + ? closeHandler + : Streams.composeWithExceptions(existingHandler, closeHandler); + return (S) this; + } + // Primitive specialization use co-variant overrides, hence is not final @Override public Spliterator spliterator() { if (linkedOrConsumed) - throw new IllegalStateException("stream has already been operated upon"); + throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; if (this == sourceStage) { @@ -312,7 +338,7 @@ return lazySpliterator(s); } else { - throw new IllegalStateException("source already consumed"); + throw new IllegalStateException(MSG_CONSUMED); } } else { @@ -410,7 +436,7 @@ sourceStage.sourceSupplier = null; } else { - throw new IllegalStateException("source already consumed"); + throw new IllegalStateException(MSG_CONSUMED); } if (isParallel()) {