src/share/classes/java/util/stream/AbstractPipeline.java

Print this page
rev 7962 : 8017513: Support for closeable streams
8022237: j.u.s.BaseStream.onClose() has an issue in implementation or requires spec clarification
8022572: Same exception instances thrown from j.u.stream.Stream.onClose() handlers are not listed as suppressed
Summary: BaseStream implements AutoCloseable; Remove CloseableStream and DelegatingStream
Reviewed-by:
Contributed-by: brian.goetz@oracle.com

@@ -69,10 +69,13 @@
  * @param <S> type of the subclass implementing {@code BaseStream}
  * @since 1.8
  */
 abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
         extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
+    private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";
+    private static final String MSG_CONSUMED = "source already consumed or closed";
+
     /**
      * Backlink to the head of the pipeline chain (self if this is the source
      * stage).
      */
     @SuppressWarnings("rawtypes")

@@ -135,10 +138,12 @@
      * True if there are any stateful ops in the pipeline; only valid for the
      * source stage.
      */
     private boolean sourceAnyStateful;
 
+    private Runnable sourceCloseAction;
+
     /**
      * True if pipeline is parallel, otherwise the pipeline is sequential; only
      * valid for the source stage.
      */
     private boolean parallel;

@@ -193,11 +198,11 @@
      * @param opFlags the operation flags for the new stage, described in
      * {@link StreamOpFlag}
      */
     AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
         if (previousStage.linkedOrConsumed)
-            throw new IllegalStateException("stream has already been operated upon");
+            throw new IllegalStateException(MSG_STREAM_LINKED);
         previousStage.linkedOrConsumed = true;
         previousStage.nextStage = this;
 
         this.previousStage = previousStage;
         this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;

@@ -219,11 +224,11 @@
      * @return the result
      */
     final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
         assert getOutputShape() == terminalOp.inputShape();
         if (linkedOrConsumed)
-            throw new IllegalStateException("stream has already been operated upon");
+            throw new IllegalStateException(MSG_STREAM_LINKED);
         linkedOrConsumed = true;
 
         return isParallel()
                ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
                : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));

@@ -236,11 +241,11 @@
      * @return a flat array-backed Node that holds the collected output elements
      */
     @SuppressWarnings("unchecked")
     final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
         if (linkedOrConsumed)
-            throw new IllegalStateException("stream has already been operated upon");
+            throw new IllegalStateException(MSG_STREAM_LINKED);
         linkedOrConsumed = true;
 
         // If the last intermediate operation is stateful then
         // evaluate directly to avoid an extra collection step
         if (isParallel() && previousStage != null && opIsStateful()) {

@@ -264,11 +269,11 @@
     final Spliterator<E_OUT> sourceStageSpliterator() {
         if (this != sourceStage)
             throw new IllegalStateException();
 
         if (linkedOrConsumed)
-            throw new IllegalStateException("stream has already been operated upon");
+            throw new IllegalStateException(MSG_STREAM_LINKED);
         linkedOrConsumed = true;
 
         if (sourceStage.sourceSpliterator != null) {
             @SuppressWarnings("unchecked")
             Spliterator<E_OUT> s = sourceStage.sourceSpliterator;

@@ -280,11 +285,11 @@
             Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
             sourceStage.sourceSupplier = null;
             return s;
         }
         else {
-            throw new IllegalStateException("source already consumed");
+            throw new IllegalStateException(MSG_CONSUMED);
         }
     }
 
     // BaseStream
 

@@ -300,16 +305,39 @@
     public final S parallel() {
         sourceStage.parallel = true;
         return (S) this;
     }
 
+    @Override
+    public void close() {
+        linkedOrConsumed = true;
+        sourceSupplier = null;
+        sourceSpliterator = null;
+        if (sourceStage.sourceCloseAction != null) {
+            Runnable closeAction = sourceStage.sourceCloseAction;
+            sourceStage.sourceCloseAction = null;
+            closeAction.run();
+        }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public S onClose(Runnable closeHandler) {
+        Runnable existingHandler = sourceStage.sourceCloseAction;
+        sourceStage.sourceCloseAction =
+                (existingHandler == null)
+                ? closeHandler
+                : Streams.composeWithExceptions(existingHandler, closeHandler);
+        return (S) this;
+    }
+
     // Primitive specialization use co-variant overrides, hence is not final
     @Override
     @SuppressWarnings("unchecked")
     public Spliterator<E_OUT> spliterator() {
         if (linkedOrConsumed)
-            throw new IllegalStateException("stream has already been operated upon");
+            throw new IllegalStateException(MSG_STREAM_LINKED);
         linkedOrConsumed = true;
 
         if (this == sourceStage) {
             if (sourceStage.sourceSpliterator != null) {
                 @SuppressWarnings("unchecked")

@@ -322,11 +350,11 @@
                 Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;
                 sourceStage.sourceSupplier = null;
                 return lazySpliterator(s);
             }
             else {
-                throw new IllegalStateException("source already consumed");
+                throw new IllegalStateException(MSG_CONSUMED);
             }
         }
         else {
             return wrap(this, () -> sourceSpliterator(0), isParallel());
         }

@@ -422,11 +450,11 @@
         else if (sourceStage.sourceSupplier != null) {
             spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
             sourceStage.sourceSupplier = null;
         }
         else {
-            throw new IllegalStateException("source already consumed");
+            throw new IllegalStateException(MSG_CONSUMED);
         }
 
         if (isParallel()) {
             // @@@ Merge parallelPrepare with the loop below and use the
             //     spliterator characteristics to determine if SIZED