src/share/classes/java/util/stream/AbstractPipeline.java
Print this page
rev 7633 : 8017513: Support for closeable streams
Reviewed-by:
Contributed-by: brian.goetz@oracle.com
*** 69,78 ****
--- 69,81 ----
* @param <S> type of the subclass implementing {@code BaseStream}
* @since 1.8
*/
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
+ private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed"; // thrown when a stage's linkedOrConsumed flag is already set
+ private static final String MSG_CONSUMED = "source already consumed or closed"; // thrown when the source stage has neither a spliterator nor a supplier left
+
/**
* Backlink to the head of the pipeline chain (self if this is the source
* stage).
*/
private final AbstractPipeline sourceStage;
*** 132,141 ****
--- 135,146 ----
* True if there are any stateful ops in the pipeline; only valid for the
* source stage.
*/
private boolean sourceAnyStateful;
+ private Runnable sourceCloseAction; // action run by close(); read and written through sourceStage, extended by onClose()
+
/**
* True if pipeline is parallel, otherwise the pipeline is sequential; only
* valid for the source stage.
*/
private boolean parallel;
*** 190,200 ****
* @param opFlags the operation flags for the new stage, described in
* {@link StreamOpFlag}
*/
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
! throw new IllegalStateException("stream has already been operated upon");
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
--- 195,205 ----
* @param opFlags the operation flags for the new stage, described in
* {@link StreamOpFlag}
*/
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
! throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
*** 216,226 ****
* @return the result
*/
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
! throw new IllegalStateException("stream has already been operated upon");
linkedOrConsumed = true;
return isParallel()
? (R) terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: (R) terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
--- 221,231 ----
* @return the result
*/
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
! throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? (R) terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: (R) terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
*** 232,242 ****
* @param generator the array generator to be used to create array instances
* @return a flat array-backed Node that holds the collected output elements
*/
final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
if (linkedOrConsumed)
! throw new IllegalStateException("stream has already been operated upon");
linkedOrConsumed = true;
// If the last intermediate operation is stateful then
// evaluate directly to avoid an extra collection step
if (isParallel() && previousStage != null && opIsStateful()) {
--- 237,247 ----
* @param generator the array generator to be used to create array instances
* @return a flat array-backed Node that holds the collected output elements
*/
final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
if (linkedOrConsumed)
! throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
// If the last intermediate operation is stateful then
// evaluate directly to avoid an extra collection step
if (isParallel() && previousStage != null && opIsStateful()) {
*** 259,269 ****
final Spliterator<E_OUT> sourceStageSpliterator() {
if (this != sourceStage)
throw new IllegalStateException();
if (linkedOrConsumed)
! throw new IllegalStateException("stream has already been operated upon");
linkedOrConsumed = true;
if (sourceStage.sourceSpliterator != null) {
Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
sourceStage.sourceSpliterator = null;
--- 264,274 ----
final Spliterator<E_OUT> sourceStageSpliterator() {
if (this != sourceStage)
throw new IllegalStateException();
if (linkedOrConsumed)
! throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
if (sourceStage.sourceSpliterator != null) {
Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
sourceStage.sourceSpliterator = null;
*** 273,283 ****
Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier = null;
return s;
}
else {
! throw new IllegalStateException("source already consumed");
}
}
// BaseStream
--- 278,288 ----
Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier = null;
return s;
}
else {
! throw new IllegalStateException(MSG_CONSUMED);
}
}
// BaseStream
*** 291,305 ****
public final S parallel() {
sourceStage.parallel = true;
return (S) this;
}
// Primitive specializations use covariant overrides, hence this method is not final
@Override
public Spliterator<E_OUT> spliterator() {
if (linkedOrConsumed)
! throw new IllegalStateException("stream has already been operated upon");
linkedOrConsumed = true;
if (this == sourceStage) {
if (sourceStage.sourceSpliterator != null) {
Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
--- 296,331 ----
public final S parallel() {
sourceStage.parallel = true;
return (S) this;
}
+ /**
+ * Closes this pipeline stage and runs the close action registered on the
+ * source stage, if any.
+ *
+ * <p>This stage is flagged as linked-or-consumed and its own source
+ * supplier and source spliterator references are dropped.  The registered
+ * close action is detached from the source stage <em>before</em> it is
+ * run, so it runs at most once: a later or re-entrant call to
+ * {@code close()} finds no action, even if the action throws.
+ */
+ @Override
+ public void close() {
+ linkedOrConsumed = true;
+ // NOTE(review): these clear this stage's fields, while other methods read
+ // them through sourceStage -- confirm that is intended for non-source stages
+ sourceSupplier = null;
+ sourceSpliterator = null;
+ Runnable closeAction = sourceStage.sourceCloseAction;
+ if (closeAction != null) {
+ // Clear first: run() may throw or call close() again
+ sourceStage.sourceCloseAction = null;
+ closeAction.run();
+ }
+ }
+
+ /**
+ * Registers an additional close handler for this pipeline.
+ *
+ * <p>The handler is stored on the source stage.  If no close action is
+ * registered yet, {@code closeHandler} becomes the close action; otherwise
+ * the existing action and {@code closeHandler} are combined via
+ * {@code Streams.composeWithExceptions} (presumably running the existing
+ * action first -- confirm against that helper).
+ *
+ * @param closeHandler the handler to register
+ * @return this stage, cast to {@code S}
+ */
+ @Override
+ public S onClose(Runnable closeHandler) {
+ Runnable registered = sourceStage.sourceCloseAction;
+ if (registered == null) {
+ sourceStage.sourceCloseAction = closeHandler;
+ }
+ else {
+ sourceStage.sourceCloseAction = Streams.composeWithExceptions(registered, closeHandler);
+ }
+ return (S) this;
+ }
+
// Primitive specializations use covariant overrides, hence this method is not final
@Override
public Spliterator<E_OUT> spliterator() {
if (linkedOrConsumed)
! throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
if (this == sourceStage) {
if (sourceStage.sourceSpliterator != null) {
Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
*** 310,320 ****
Supplier<Spliterator<E_OUT>> s = sourceStage.sourceSupplier;
sourceStage.sourceSupplier = null;
return lazySpliterator(s);
}
else {
! throw new IllegalStateException("source already consumed");
}
}
else {
return wrap(this, () -> sourceSpliterator(0), isParallel());
}
--- 336,346 ----
Supplier<Spliterator<E_OUT>> s = sourceStage.sourceSupplier;
sourceStage.sourceSupplier = null;
return lazySpliterator(s);
}
else {
! throw new IllegalStateException(MSG_CONSUMED);
}
}
else {
return wrap(this, () -> sourceSpliterator(0), isParallel());
}
*** 408,418 ****
else if (sourceStage.sourceSupplier != null) {
spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier = null;
}
else {
! throw new IllegalStateException("source already consumed");
}
if (isParallel()) {
// @@@ Merge parallelPrepare with the loop below and use the
// spliterator characteristics to determine if SIZED
--- 434,444 ----
else if (sourceStage.sourceSupplier != null) {
spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier = null;
}
else {
! throw new IllegalStateException(MSG_CONSUMED);
}
if (isParallel()) {
// @@@ Merge parallelPrepare with the loop below and use the
// spliterator characteristics to determine if SIZED