src/share/classes/java/util/stream/AbstractPipeline.java

Print this page
rev 7982 : 8017513: Support for closeable streams
8022237: j.u.s.BaseStream.onClose() has an issue in implementation or requires spec clarification
8022572: Same exception instances thrown from j.u.stream.Stream.onClose() handlers are not listed as suppressed
Summary: BaseStream implements AutoCloseable; Remove CloseableStream and DelegatingStream
Reviewed-by: alanb, mduigou, psandoz
Contributed-by: brian.goetz@oracle.com


  54  * or terminal operations are permitted on this stream instance.
  55  *
  56  * @implNote
  57  * <p>For sequential streams, and parallel streams without
  58  * <a href="package-summary.html#StreamOps">stateful intermediate
  59  * operations</a>, parallel streams, pipeline evaluation is done in a single
  60  * pass that "jams" all the operations together.  For parallel streams with
  61  * stateful operations, execution is divided into segments, where each
  62  * stateful operations marks the end of a segment, and each segment is
  63  * evaluated separately and the result used as the input to the next
  64  * segment.  In all cases, the source data is not consumed until a terminal
  65  * operation begins.
  66  *
  67  * @param <E_IN>  type of input elements
  68  * @param <E_OUT> type of output elements
  69  * @param <S> type of the subclass implementing {@code BaseStream}
  70  * @since 1.8
  71  */
  72 abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
  73         extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {



  74     /**
  75      * Backlink to the head of the pipeline chain (self if this is the source
  76      * stage).
  77      */
  78     @SuppressWarnings("rawtypes")
  79     private final AbstractPipeline sourceStage;
  80 
  81     /**
  82      * The "upstream" pipeline, or null if this is the source stage.
  83      */
  84     @SuppressWarnings("rawtypes")
  85     private final AbstractPipeline previousStage;
  86 
  87     /**
  88      * The operation flags for the intermediate operation represented by this
  89      * pipeline object.
  90      */
  91     protected final int sourceOrOpFlags;
  92 
  93     /**


 120     private Spliterator<?> sourceSpliterator;
 121 
 122     /**
 123      * The source supplier. Only valid for the head pipeline. Before the
 124      * pipeline is consumed if non-null then {@code sourceSpliterator} must be
 125      * null. After the pipeline is consumed if non-null then is set to null.
 126      */
 127     private Supplier<? extends Spliterator<?>> sourceSupplier;
 128 
 129     /**
 130      * True if this pipeline has been linked or consumed
 131      */
 132     private boolean linkedOrConsumed;
 133 
 134     /**
 135      * True if there are any stateful ops in the pipeline; only valid for the
 136      * source stage.
 137      */
 138     private boolean sourceAnyStateful;
 139 


 140     /**
 141      * True if pipeline is parallel, otherwise the pipeline is sequential; only
 142      * valid for the source stage.
 143      */
 144     private boolean parallel;
 145 
 146     /**
 147      * Constructor for the head of a stream pipeline.
 148      *
 149      * @param source {@code Supplier<Spliterator>} describing the stream source
 150      * @param sourceFlags The source flags for the stream source, described in
 151      * {@link StreamOpFlag}
 152      * @param parallel True if the pipeline is parallel
 153      */
 154     AbstractPipeline(Supplier<? extends Spliterator<?>> source,
 155                      int sourceFlags, boolean parallel) {
 156         this.previousStage = null;
 157         this.sourceSupplier = source;
 158         this.sourceStage = this;
 159         this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;


 178         this.sourceSpliterator = source;
 179         this.sourceStage = this;
 180         this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
 181         // The following is an optimization of:
 182         // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
 183         this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
 184         this.depth = 0;
 185         this.parallel = parallel;
 186     }
 187 
 188     /**
 189      * Constructor for appending an intermediate operation stage onto an
 190      * existing pipeline.
 191      *
 192      * @param previousStage the upstream pipeline stage
 193      * @param opFlags the operation flags for the new stage, described in
 194      * {@link StreamOpFlag}
 195      */
 196     AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
 197         if (previousStage.linkedOrConsumed)
 198             throw new IllegalStateException("stream has already been operated upon");
 199         previousStage.linkedOrConsumed = true;
 200         previousStage.nextStage = this;
 201 
 202         this.previousStage = previousStage;
 203         this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
 204         this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
 205         this.sourceStage = previousStage.sourceStage;
 206         if (opIsStateful())
 207             sourceStage.sourceAnyStateful = true;
 208         this.depth = previousStage.depth + 1;
 209     }
 210 
 211 
 212     // Terminal evaluation methods
 213 
 214     /**
 215      * Evaluate the pipeline with a terminal operation to produce a result.
 216      *
 217      * @param <R> the type of result
 218      * @param terminalOp the terminal operation to be applied to the pipeline.
 219      * @return the result
 220      */
 221     final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
 222         assert getOutputShape() == terminalOp.inputShape();
 223         if (linkedOrConsumed)
 224             throw new IllegalStateException("stream has already been operated upon");
 225         linkedOrConsumed = true;
 226 
 227         return isParallel()
 228                ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
 229                : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
 230     }
 231 
 232     /**
 233      * Collect the elements output from the pipeline stage.
 234      *
 235      * @param generator the array generator to be used to create array instances
 236      * @return a flat array-backed Node that holds the collected output elements
 237      */
 238     @SuppressWarnings("unchecked")
 239     final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
 240         if (linkedOrConsumed)
 241             throw new IllegalStateException("stream has already been operated upon");
 242         linkedOrConsumed = true;
 243 
 244         // If the last intermediate operation is stateful then
 245         // evaluate directly to avoid an extra collection step
 246         if (isParallel() && previousStage != null && opIsStateful()) {
 247             return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
 248         }
 249         else {
 250             return evaluate(sourceSpliterator(0), true, generator);
 251         }
 252     }
 253 
 254     /**
 255      * Gets the source stage spliterator if this pipeline stage is the source
 256      * stage.  The pipeline is consumed after this method is called and
 257      * returns successfully.
 258      *
 259      * @return the source stage spliterator
 260      * @throws IllegalStateException if this pipeline stage is not the source
 261      *         stage.
 262      */
 263     @SuppressWarnings("unchecked")
 264     final Spliterator<E_OUT> sourceStageSpliterator() {
 265         if (this != sourceStage)
 266             throw new IllegalStateException();
 267 
 268         if (linkedOrConsumed)
 269             throw new IllegalStateException("stream has already been operated upon");
 270         linkedOrConsumed = true;
 271 
 272         if (sourceStage.sourceSpliterator != null) {
 273             @SuppressWarnings("unchecked")
 274             Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
 275             sourceStage.sourceSpliterator = null;
 276             return s;
 277         }
 278         else if (sourceStage.sourceSupplier != null) {
 279             @SuppressWarnings("unchecked")
 280             Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
 281             sourceStage.sourceSupplier = null;
 282             return s;
 283         }
 284         else {
 285             throw new IllegalStateException("source already consumed");
 286         }
 287     }
 288 
 289     // BaseStream
 290 
 291     @Override
 292     @SuppressWarnings("unchecked")
 293     public final S sequential() {
 294         sourceStage.parallel = false;
 295         return (S) this;
 296     }
 297 
 298     @Override
 299     @SuppressWarnings("unchecked")
 300     public final S parallel() {
 301         sourceStage.parallel = true;
 302         return (S) this;
 303     }
 304 























 305     // Primitive specialization use co-variant overrides, hence is not final
 306     @Override
 307     @SuppressWarnings("unchecked")
 308     public Spliterator<E_OUT> spliterator() {
 309         if (linkedOrConsumed)
 310             throw new IllegalStateException("stream has already been operated upon");
 311         linkedOrConsumed = true;
 312 
 313         if (this == sourceStage) {
 314             if (sourceStage.sourceSpliterator != null) {
 315                 @SuppressWarnings("unchecked")
 316                 Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSpliterator;
 317                 sourceStage.sourceSpliterator = null;
 318                 return s;
 319             }
 320             else if (sourceStage.sourceSupplier != null) {
 321                 @SuppressWarnings("unchecked")
 322                 Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;
 323                 sourceStage.sourceSupplier = null;
 324                 return lazySpliterator(s);
 325             }
 326             else {
 327                 throw new IllegalStateException("source already consumed");
 328             }
 329         }
 330         else {
 331             return wrap(this, () -> sourceSpliterator(0), isParallel());
 332         }
 333     }
 334 
 335     @Override
 336     public final boolean isParallel() {
 337         return sourceStage.parallel;
 338     }
 339 
 340 
 341     /**
 342      * Returns the composition of stream flags of the stream source and all
 343      * intermediate operations.
 344      *
 345      * @return the composition of stream flags of the stream source and all
 346      *         intermediate operations
 347      * @see StreamOpFlag


 407     /**
 408      * Get the source spliterator for this pipeline stage.  For a sequential or
 409      * stateless parallel pipeline, this is the source spliterator.  For a
 410      * stateful parallel pipeline, this is a spliterator describing the results
 411      * of all computations up to and including the most recent stateful
 412      * operation.
 413      */
 414     @SuppressWarnings("unchecked")
 415     private Spliterator<?> sourceSpliterator(int terminalFlags) {
 416         // Get the source spliterator of the pipeline
 417         Spliterator<?> spliterator = null;
 418         if (sourceStage.sourceSpliterator != null) {
 419             spliterator = sourceStage.sourceSpliterator;
 420             sourceStage.sourceSpliterator = null;
 421         }
 422         else if (sourceStage.sourceSupplier != null) {
 423             spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
 424             sourceStage.sourceSupplier = null;
 425         }
 426         else {
 427             throw new IllegalStateException("source already consumed");
 428         }
 429 
 430         if (isParallel()) {
 431             // @@@ Merge parallelPrepare with the loop below and use the
 432             //     spliterator characteristics to determine if SIZED
 433             //     should be injected
 434             parallelPrepare(terminalFlags);
 435 
 436             // Adapt the source spliterator, evaluating each stateful op
 437             // in the pipeline up to and including this pipeline stage
 438             for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
 439                  u != e;
 440                  u = p, p = p.nextStage) {
 441 
 442                 if (p.opIsStateful()) {
 443                     spliterator = p.opEvaluateParallelLazy(u, spliterator);
 444                 }
 445             }
 446         }
 447         else if (terminalFlags != 0)  {




  54  * or terminal operations are permitted on this stream instance.
  55  *
  56  * @implNote
  57  * <p>For sequential streams, and parallel streams without
  58  * <a href="package-summary.html#StreamOps">stateful intermediate
  59  * operations</a>, parallel streams, pipeline evaluation is done in a single
  60  * pass that "jams" all the operations together.  For parallel streams with
  61  * stateful operations, execution is divided into segments, where each
  62  * stateful operations marks the end of a segment, and each segment is
  63  * evaluated separately and the result used as the input to the next
  64  * segment.  In all cases, the source data is not consumed until a terminal
  65  * operation begins.
  66  *
  67  * @param <E_IN>  type of input elements
  68  * @param <E_OUT> type of output elements
  69  * @param <S> type of the subclass implementing {@code BaseStream}
  70  * @since 1.8
  71  */
  72 abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
  73         extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
  74     private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";
  75     private static final String MSG_CONSUMED = "source already consumed or closed";
  76 
  77     /**
  78      * Backlink to the head of the pipeline chain (self if this is the source
  79      * stage).
  80      */
  81     @SuppressWarnings("rawtypes")
  82     private final AbstractPipeline sourceStage;
  83 
  84     /**
  85      * The "upstream" pipeline, or null if this is the source stage.
  86      */
  87     @SuppressWarnings("rawtypes")
  88     private final AbstractPipeline previousStage;
  89 
  90     /**
  91      * The operation flags for the intermediate operation represented by this
  92      * pipeline object.
  93      */
  94     protected final int sourceOrOpFlags;
  95 
  96     /**


 123     private Spliterator<?> sourceSpliterator;
 124 
 125     /**
 126      * The source supplier. Only valid for the head pipeline. Before the
 127      * pipeline is consumed if non-null then {@code sourceSpliterator} must be
 128      * null. After the pipeline is consumed if non-null then is set to null.
 129      */
 130     private Supplier<? extends Spliterator<?>> sourceSupplier;
 131 
 132     /**
 133      * True if this pipeline has been linked or consumed
 134      */
 135     private boolean linkedOrConsumed;
 136 
 137     /**
 138      * True if there are any stateful ops in the pipeline; only valid for the
 139      * source stage.
 140      */
 141     private boolean sourceAnyStateful;
 142 
 143     private Runnable sourceCloseAction;
 144 
 145     /**
 146      * True if pipeline is parallel, otherwise the pipeline is sequential; only
 147      * valid for the source stage.
 148      */
 149     private boolean parallel;
 150 
 151     /**
 152      * Constructor for the head of a stream pipeline.
 153      *
 154      * @param source {@code Supplier<Spliterator>} describing the stream source
 155      * @param sourceFlags The source flags for the stream source, described in
 156      * {@link StreamOpFlag}
 157      * @param parallel True if the pipeline is parallel
 158      */
 159     AbstractPipeline(Supplier<? extends Spliterator<?>> source,
 160                      int sourceFlags, boolean parallel) {
 161         this.previousStage = null;
 162         this.sourceSupplier = source;
 163         this.sourceStage = this;
 164         this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;


 183         this.sourceSpliterator = source;
 184         this.sourceStage = this;
 185         this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
 186         // The following is an optimization of:
 187         // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
 188         this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
 189         this.depth = 0;
 190         this.parallel = parallel;
 191     }
 192 
 193     /**
 194      * Constructor for appending an intermediate operation stage onto an
 195      * existing pipeline.
 196      *
 197      * @param previousStage the upstream pipeline stage
 198      * @param opFlags the operation flags for the new stage, described in
 199      * {@link StreamOpFlag}
 200      */
 201     AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
 202         if (previousStage.linkedOrConsumed)
 203             throw new IllegalStateException(MSG_STREAM_LINKED);
 204         previousStage.linkedOrConsumed = true;
 205         previousStage.nextStage = this;
 206 
 207         this.previousStage = previousStage;
 208         this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
 209         this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
 210         this.sourceStage = previousStage.sourceStage;
 211         if (opIsStateful())
 212             sourceStage.sourceAnyStateful = true;
 213         this.depth = previousStage.depth + 1;
 214     }
 215 
 216 
 217     // Terminal evaluation methods
 218 
 219     /**
 220      * Evaluate the pipeline with a terminal operation to produce a result.
 221      *
 222      * @param <R> the type of result
 223      * @param terminalOp the terminal operation to be applied to the pipeline.
 224      * @return the result
 225      */
 226     final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
 227         assert getOutputShape() == terminalOp.inputShape();
 228         if (linkedOrConsumed)
 229             throw new IllegalStateException(MSG_STREAM_LINKED);
 230         linkedOrConsumed = true;
 231 
 232         return isParallel()
 233                ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
 234                : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
 235     }
 236 
 237     /**
 238      * Collect the elements output from the pipeline stage.
 239      *
 240      * @param generator the array generator to be used to create array instances
 241      * @return a flat array-backed Node that holds the collected output elements
 242      */
 243     @SuppressWarnings("unchecked")
 244     final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
 245         if (linkedOrConsumed)
 246             throw new IllegalStateException(MSG_STREAM_LINKED);
 247         linkedOrConsumed = true;
 248 
 249         // If the last intermediate operation is stateful then
 250         // evaluate directly to avoid an extra collection step
 251         if (isParallel() && previousStage != null && opIsStateful()) {
 252             return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
 253         }
 254         else {
 255             return evaluate(sourceSpliterator(0), true, generator);
 256         }
 257     }
 258 
 259     /**
 260      * Gets the source stage spliterator if this pipeline stage is the source
 261      * stage.  The pipeline is consumed after this method is called and
 262      * returns successfully.
 263      *
 264      * @return the source stage spliterator
 265      * @throws IllegalStateException if this pipeline stage is not the source
 266      *         stage.
 267      */
 268     @SuppressWarnings("unchecked")
 269     final Spliterator<E_OUT> sourceStageSpliterator() {
 270         if (this != sourceStage)
 271             throw new IllegalStateException();
 272 
 273         if (linkedOrConsumed)
 274             throw new IllegalStateException(MSG_STREAM_LINKED);
 275         linkedOrConsumed = true;
 276 
 277         if (sourceStage.sourceSpliterator != null) {
 278             @SuppressWarnings("unchecked")
 279             Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
 280             sourceStage.sourceSpliterator = null;
 281             return s;
 282         }
 283         else if (sourceStage.sourceSupplier != null) {
 284             @SuppressWarnings("unchecked")
 285             Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
 286             sourceStage.sourceSupplier = null;
 287             return s;
 288         }
 289         else {
 290             throw new IllegalStateException(MSG_CONSUMED);
 291         }
 292     }
 293 
 294     // BaseStream
 295 
 296     @Override
 297     @SuppressWarnings("unchecked")
 298     public final S sequential() {
 299         sourceStage.parallel = false;
 300         return (S) this;
 301     }
 302 
 303     @Override
 304     @SuppressWarnings("unchecked")
 305     public final S parallel() {
 306         sourceStage.parallel = true;
 307         return (S) this;
 308     }
 309 
 310     @Override
 311     public void close() {
 312         linkedOrConsumed = true;
 313         sourceSupplier = null;
 314         sourceSpliterator = null;
 315         if (sourceStage.sourceCloseAction != null) {
 316             Runnable closeAction = sourceStage.sourceCloseAction;
 317             sourceStage.sourceCloseAction = null;
 318             closeAction.run();
 319         }
 320     }
 321 
 322     @Override
 323     @SuppressWarnings("unchecked")
 324     public S onClose(Runnable closeHandler) {
 325         Runnable existingHandler = sourceStage.sourceCloseAction;
 326         sourceStage.sourceCloseAction =
 327                 (existingHandler == null)
 328                 ? closeHandler
 329                 : Streams.composeWithExceptions(existingHandler, closeHandler);
 330         return (S) this;
 331     }
 332 
 333     // Primitive specialization use co-variant overrides, hence is not final
 334     @Override
 335     @SuppressWarnings("unchecked")
 336     public Spliterator<E_OUT> spliterator() {
 337         if (linkedOrConsumed)
 338             throw new IllegalStateException(MSG_STREAM_LINKED);
 339         linkedOrConsumed = true;
 340 
 341         if (this == sourceStage) {
 342             if (sourceStage.sourceSpliterator != null) {
 343                 @SuppressWarnings("unchecked")
 344                 Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSpliterator;
 345                 sourceStage.sourceSpliterator = null;
 346                 return s;
 347             }
 348             else if (sourceStage.sourceSupplier != null) {
 349                 @SuppressWarnings("unchecked")
 350                 Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;
 351                 sourceStage.sourceSupplier = null;
 352                 return lazySpliterator(s);
 353             }
 354             else {
 355                 throw new IllegalStateException(MSG_CONSUMED);
 356             }
 357         }
 358         else {
 359             return wrap(this, () -> sourceSpliterator(0), isParallel());
 360         }
 361     }
 362 
 363     @Override
 364     public final boolean isParallel() {
 365         return sourceStage.parallel;
 366     }
 367 
 368 
 369     /**
 370      * Returns the composition of stream flags of the stream source and all
 371      * intermediate operations.
 372      *
 373      * @return the composition of stream flags of the stream source and all
 374      *         intermediate operations
 375      * @see StreamOpFlag


 435     /**
 436      * Get the source spliterator for this pipeline stage.  For a sequential or
 437      * stateless parallel pipeline, this is the source spliterator.  For a
 438      * stateful parallel pipeline, this is a spliterator describing the results
 439      * of all computations up to and including the most recent stateful
 440      * operation.
 441      */
 442     @SuppressWarnings("unchecked")
 443     private Spliterator<?> sourceSpliterator(int terminalFlags) {
 444         // Get the source spliterator of the pipeline
 445         Spliterator<?> spliterator = null;
 446         if (sourceStage.sourceSpliterator != null) {
 447             spliterator = sourceStage.sourceSpliterator;
 448             sourceStage.sourceSpliterator = null;
 449         }
 450         else if (sourceStage.sourceSupplier != null) {
 451             spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
 452             sourceStage.sourceSupplier = null;
 453         }
 454         else {
 455             throw new IllegalStateException(MSG_CONSUMED);
 456         }
 457 
 458         if (isParallel()) {
 459             // @@@ Merge parallelPrepare with the loop below and use the
 460             //     spliterator characteristics to determine if SIZED
 461             //     should be injected
 462             parallelPrepare(terminalFlags);
 463 
 464             // Adapt the source spliterator, evaluating each stateful op
 465             // in the pipeline up to and including this pipeline stage
 466             for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
 467                  u != e;
 468                  u = p, p = p.nextStage) {
 469 
 470                 if (p.opIsStateful()) {
 471                     spliterator = p.opEvaluateParallelLazy(u, spliterator);
 472                 }
 473             }
 474         }
 475         else if (terminalFlags != 0)  {