src/share/classes/java/util/stream/AbstractPipeline.java

Print this page
rev 7633 : 8017513: Support for closeable streams
Reviewed-by:
Contributed-by: brian.goetz@oracle.com


  54  * or terminal operations are permitted on this stream instance.
  55  *
  56  * @implNote
  57  * <p>For sequential streams, and parallel streams without
  58  * <a href="package-summary.html#StreamOps">stateful intermediate
  59  * operations</a>, parallel streams, pipeline evaluation is done in a single
  60  * pass that "jams" all the operations together.  For parallel streams with
  61  * stateful operations, execution is divided into segments, where each
  62  * stateful operations marks the end of a segment, and each segment is
  63  * evaluated separately and the result used as the input to the next
  64  * segment.  In all cases, the source data is not consumed until a terminal
  65  * operation begins.
  66  *
  67  * @param <E_IN>  type of input elements
  68  * @param <E_OUT> type of output elements
  69  * @param <S> type of the subclass implementing {@code BaseStream}
  70  * @since 1.8
  71  */
  72 abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
  73         extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {



  74     /**
  75      * Backlink to the head of the pipeline chain (self if this is the source
  76      * stage).
  77      */
  78     private final AbstractPipeline sourceStage;
  79 
  80     /**
  81      * The "upstream" pipeline, or null if this is the source stage.
  82      */
  83     private final AbstractPipeline previousStage;
  84 
  85     /**
  86      * The operation flags for the intermediate operation represented by this
  87      * pipeline object.
  88      */
  89     protected final int sourceOrOpFlags;
  90 
  91     /**
  92      * The next stage in the pipeline, or null if this is the last stage.
  93      * Effectively final at the point of linking to the next pipeline.


 117     private Spliterator<?> sourceSpliterator;
 118 
 119     /**
 120      * The source supplier. Only valid for the head pipeline. Before the
 121      * pipeline is consumed if non-null then {@code sourceSpliterator} must be
 122      * null. After the pipeline is consumed if non-null then is set to null.
 123      */
 124     private Supplier<? extends Spliterator<?>> sourceSupplier;
 125 
 126     /**
 127      * True if this pipeline has been linked or consumed
 128      */
 129     private boolean linkedOrConsumed;
 130 
 131     /**
 132      * True if there are any stateful ops in the pipeline; only valid for the
 133      * source stage.
 134      */
 135     private boolean sourceAnyStateful;
 136 


 137     /**
 138      * True if pipeline is parallel, otherwise the pipeline is sequential; only
 139      * valid for the source stage.
 140      */
 141     private boolean parallel;
 142 
 143     /**
 144      * Constructor for the head of a stream pipeline.
 145      *
 146      * @param source {@code Supplier<Spliterator>} describing the stream source
 147      * @param sourceFlags The source flags for the stream source, described in
 148      * {@link StreamOpFlag}
 149      * @param parallel True if the pipeline is parallel
 150      */
 151     AbstractPipeline(Supplier<? extends Spliterator<?>> source,
 152                      int sourceFlags, boolean parallel) {
 153         this.previousStage = null;
 154         this.sourceSupplier = source;
 155         this.sourceStage = this;
 156         this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;


 175         this.sourceSpliterator = source;
 176         this.sourceStage = this;
 177         this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
 178         // The following is an optimization of:
 179         // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
 180         this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
 181         this.depth = 0;
 182         this.parallel = parallel;
 183     }
 184 
 185     /**
 186      * Constructor for appending an intermediate operation stage onto an
 187      * existing pipeline.
 188      *
 189      * @param previousStage the upstream pipeline stage
 190      * @param opFlags the operation flags for the new stage, described in
 191      * {@link StreamOpFlag}
 192      */
 193     AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
 194         if (previousStage.linkedOrConsumed)
 195             throw new IllegalStateException("stream has already been operated upon");
 196         previousStage.linkedOrConsumed = true;
 197         previousStage.nextStage = this;
 198 
 199         this.previousStage = previousStage;
 200         this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
 201         this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
 202         this.sourceStage = previousStage.sourceStage;
 203         if (opIsStateful())
 204             sourceStage.sourceAnyStateful = true;
 205         this.depth = previousStage.depth + 1;
 206     }
 207 
 208 
 209     // Terminal evaluation methods
 210 
 211     /**
 212      * Evaluate the pipeline with a terminal operation to produce a result.
 213      *
 214      * @param <R> the type of result
 215      * @param terminalOp the terminal operation to be applied to the pipeline.
 216      * @return the result
 217      */
 218     final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
 219         assert getOutputShape() == terminalOp.inputShape();
 220         if (linkedOrConsumed)
 221             throw new IllegalStateException("stream has already been operated upon");
 222         linkedOrConsumed = true;
 223 
 224         return isParallel()
 225                ? (R) terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
 226                : (R) terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
 227     }
 228 
 229     /**
 230      * Collect the elements output from the pipeline stage.
 231      *
 232      * @param generator the array generator to be used to create array instances
 233      * @return a flat array-backed Node that holds the collected output elements
 234      */
 235     final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
 236         if (linkedOrConsumed)
 237             throw new IllegalStateException("stream has already been operated upon");
 238         linkedOrConsumed = true;
 239 
 240         // If the last intermediate operation is stateful then
 241         // evaluate directly to avoid an extra collection step
 242         if (isParallel() && previousStage != null && opIsStateful()) {
 243             return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
 244         }
 245         else {
 246             return evaluate(sourceSpliterator(0), true, generator);
 247         }
 248     }
 249 
 250     /**
 251      * Gets the source stage spliterator if this pipeline stage is the source
 252      * stage.  The pipeline is consumed after this method is called and
 253      * returns successfully.
 254      *
 255      * @return the source stage spliterator
 256      * @throws IllegalStateException if this pipeline stage is not the source
 257      *         stage.
 258      */
 259     final Spliterator<E_OUT> sourceStageSpliterator() {
 260         if (this != sourceStage)
 261             throw new IllegalStateException();
 262 
 263         if (linkedOrConsumed)
 264             throw new IllegalStateException("stream has already been operated upon");
 265         linkedOrConsumed = true;
 266 
 267         if (sourceStage.sourceSpliterator != null) {
 268             Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
 269             sourceStage.sourceSpliterator = null;
 270             return s;
 271         }
 272         else if (sourceStage.sourceSupplier != null) {
 273             Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
 274             sourceStage.sourceSupplier = null;
 275             return s;
 276         }
 277         else {
 278             throw new IllegalStateException("source already consumed");
 279         }
 280     }
 281 
 282     // BaseStream
 283 
 284     @Override
 285     public final S sequential() {
 286         sourceStage.parallel = false;
 287         return (S) this;
 288     }
 289 
 290     @Override
 291     public final S parallel() {
 292         sourceStage.parallel = true;
 293         return (S) this;
 294     }
 295 





















 296     // Primitive specialization use co-variant overrides, hence is not final
 297     @Override
 298     public Spliterator<E_OUT> spliterator() {
 299         if (linkedOrConsumed)
 300             throw new IllegalStateException("stream has already been operated upon");
 301         linkedOrConsumed = true;
 302 
 303         if (this == sourceStage) {
 304             if (sourceStage.sourceSpliterator != null) {
 305                 Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
 306                 sourceStage.sourceSpliterator = null;
 307                 return s;
 308             }
 309             else if (sourceStage.sourceSupplier != null) {
 310                 Supplier<Spliterator<E_OUT>> s = sourceStage.sourceSupplier;
 311                 sourceStage.sourceSupplier = null;
 312                 return lazySpliterator(s);
 313             }
 314             else {
 315                 throw new IllegalStateException("source already consumed");
 316             }
 317         }
 318         else {
 319             return wrap(this, () -> sourceSpliterator(0), isParallel());
 320         }
 321     }
 322 
 323     @Override
 324     public final boolean isParallel() {
 325         return sourceStage.parallel;
 326     }
 327 
 328 
 329     /**
 330      * Returns the composition of stream flags of the stream source and all
 331      * intermediate operations.
 332      *
 333      * @return the composition of stream flags of the stream source and all
 334      *         intermediate operations
 335      * @see StreamOpFlag


 393 
 394     /**
 395      * Get the source spliterator for this pipeline stage.  For a sequential or
 396      * stateless parallel pipeline, this is the source spliterator.  For a
 397      * stateful parallel pipeline, this is a spliterator describing the results
 398      * of all computations up to and including the most recent stateful
 399      * operation.
 400      */
 401     private Spliterator<?> sourceSpliterator(int terminalFlags) {
 402         // Get the source spliterator of the pipeline
 403         Spliterator<?> spliterator = null;
 404         if (sourceStage.sourceSpliterator != null) {
 405             spliterator = sourceStage.sourceSpliterator;
 406             sourceStage.sourceSpliterator = null;
 407         }
 408         else if (sourceStage.sourceSupplier != null) {
 409             spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
 410             sourceStage.sourceSupplier = null;
 411         }
 412         else {
 413             throw new IllegalStateException("source already consumed");
 414         }
 415 
 416         if (isParallel()) {
 417             // @@@ Merge parallelPrepare with the loop below and use the
 418             //     spliterator characteristics to determine if SIZED
 419             //     should be injected
 420             parallelPrepare(terminalFlags);
 421 
 422             // Adapt the source spliterator, evaluating each stateful op
 423             // in the pipeline up to and including this pipeline stage
 424             for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
 425                  u != e;
 426                  u = p, p = p.nextStage) {
 427 
 428                 if (p.opIsStateful()) {
 429                     spliterator = p.opEvaluateParallelLazy(u, spliterator);
 430                 }
 431             }
 432         }
 433         else if (terminalFlags != 0)  {




  54  * or terminal operations are permitted on this stream instance.
  55  *
  56  * @implNote
  57  * <p>For sequential streams, and parallel streams without
  58  * <a href="package-summary.html#StreamOps">stateful intermediate
  59  * operations</a>, parallel streams, pipeline evaluation is done in a single
  60  * pass that "jams" all the operations together.  For parallel streams with
  61  * stateful operations, execution is divided into segments, where each
  62  * stateful operations marks the end of a segment, and each segment is
  63  * evaluated separately and the result used as the input to the next
  64  * segment.  In all cases, the source data is not consumed until a terminal
  65  * operation begins.
  66  *
  67  * @param <E_IN>  type of input elements
  68  * @param <E_OUT> type of output elements
  69  * @param <S> type of the subclass implementing {@code BaseStream}
  70  * @since 1.8
  71  */
  72 abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
  73         extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
  74     private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";
  75     private static final String MSG_CONSUMED = "source already consumed or closed";
  76 
  77     /**
  78      * Backlink to the head of the pipeline chain (self if this is the source
  79      * stage).
  80      */
  81     private final AbstractPipeline sourceStage;
  82 
  83     /**
  84      * The "upstream" pipeline, or null if this is the source stage.
  85      */
  86     private final AbstractPipeline previousStage;
  87 
  88     /**
  89      * The operation flags for the intermediate operation represented by this
  90      * pipeline object.
  91      */
  92     protected final int sourceOrOpFlags;
  93 
  94     /**
  95      * The next stage in the pipeline, or null if this is the last stage.
  96      * Effectively final at the point of linking to the next pipeline.


 120     private Spliterator<?> sourceSpliterator;
 121 
 122     /**
 123      * The source supplier. Only valid for the head pipeline. Before the
 124      * pipeline is consumed if non-null then {@code sourceSpliterator} must be
 125      * null. After the pipeline is consumed if non-null then is set to null.
 126      */
 127     private Supplier<? extends Spliterator<?>> sourceSupplier;
 128 
 129     /**
 130      * True if this pipeline has been linked or consumed
 131      */
 132     private boolean linkedOrConsumed;
 133 
 134     /**
 135      * True if there are any stateful ops in the pipeline; only valid for the
 136      * source stage.
 137      */
 138     private boolean sourceAnyStateful;
 139 
 140     private Runnable sourceCloseAction;
 141 
 142     /**
 143      * True if pipeline is parallel, otherwise the pipeline is sequential; only
 144      * valid for the source stage.
 145      */
 146     private boolean parallel;
 147 
 148     /**
 149      * Constructor for the head of a stream pipeline.
 150      *
 151      * @param source {@code Supplier<Spliterator>} describing the stream source
 152      * @param sourceFlags The source flags for the stream source, described in
 153      * {@link StreamOpFlag}
 154      * @param parallel True if the pipeline is parallel
 155      */
 156     AbstractPipeline(Supplier<? extends Spliterator<?>> source,
 157                      int sourceFlags, boolean parallel) {
 158         this.previousStage = null;
 159         this.sourceSupplier = source;
 160         this.sourceStage = this;
 161         this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;


 180         this.sourceSpliterator = source;
 181         this.sourceStage = this;
 182         this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
 183         // The following is an optimization of:
 184         // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
 185         this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
 186         this.depth = 0;
 187         this.parallel = parallel;
 188     }
 189 
 190     /**
 191      * Constructor for appending an intermediate operation stage onto an
 192      * existing pipeline.
 193      *
 194      * @param previousStage the upstream pipeline stage
 195      * @param opFlags the operation flags for the new stage, described in
 196      * {@link StreamOpFlag}
 197      */
 198     AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
 199         if (previousStage.linkedOrConsumed)
 200             throw new IllegalStateException(MSG_STREAM_LINKED);
 201         previousStage.linkedOrConsumed = true;
 202         previousStage.nextStage = this;
 203 
 204         this.previousStage = previousStage;
 205         this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
 206         this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
 207         this.sourceStage = previousStage.sourceStage;
 208         if (opIsStateful())
 209             sourceStage.sourceAnyStateful = true;
 210         this.depth = previousStage.depth + 1;
 211     }
 212 
 213 
 214     // Terminal evaluation methods
 215 
 216     /**
 217      * Evaluate the pipeline with a terminal operation to produce a result.
 218      *
 219      * @param <R> the type of result
 220      * @param terminalOp the terminal operation to be applied to the pipeline.
 221      * @return the result
 222      */
 223     final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
 224         assert getOutputShape() == terminalOp.inputShape();
 225         if (linkedOrConsumed)
 226             throw new IllegalStateException(MSG_STREAM_LINKED);
 227         linkedOrConsumed = true;
 228 
 229         return isParallel()
 230                ? (R) terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
 231                : (R) terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
 232     }
 233 
 234     /**
 235      * Collect the elements output from the pipeline stage.
 236      *
 237      * @param generator the array generator to be used to create array instances
 238      * @return a flat array-backed Node that holds the collected output elements
 239      */
 240     final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
 241         if (linkedOrConsumed)
 242             throw new IllegalStateException(MSG_STREAM_LINKED);
 243         linkedOrConsumed = true;
 244 
 245         // If the last intermediate operation is stateful then
 246         // evaluate directly to avoid an extra collection step
 247         if (isParallel() && previousStage != null && opIsStateful()) {
 248             return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
 249         }
 250         else {
 251             return evaluate(sourceSpliterator(0), true, generator);
 252         }
 253     }
 254 
 255     /**
 256      * Gets the source stage spliterator if this pipeline stage is the source
 257      * stage.  The pipeline is consumed after this method is called and
 258      * returns successfully.
 259      *
 260      * @return the source stage spliterator
 261      * @throws IllegalStateException if this pipeline stage is not the source
 262      *         stage.
 263      */
 264     final Spliterator<E_OUT> sourceStageSpliterator() {
 265         if (this != sourceStage)
 266             throw new IllegalStateException();
 267 
 268         if (linkedOrConsumed)
 269             throw new IllegalStateException(MSG_STREAM_LINKED);
 270         linkedOrConsumed = true;
 271 
 272         if (sourceStage.sourceSpliterator != null) {
 273             Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
 274             sourceStage.sourceSpliterator = null;
 275             return s;
 276         }
 277         else if (sourceStage.sourceSupplier != null) {
 278             Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
 279             sourceStage.sourceSupplier = null;
 280             return s;
 281         }
 282         else {
 283             throw new IllegalStateException(MSG_CONSUMED);
 284         }
 285     }
 286 
 287     // BaseStream
 288 
 289     @Override
 290     public final S sequential() {
 291         sourceStage.parallel = false;
 292         return (S) this;
 293     }
 294 
 295     @Override
 296     public final S parallel() {
 297         sourceStage.parallel = true;
 298         return (S) this;
 299     }
 300 
 301     @Override
 302     public void close() {
 303         linkedOrConsumed = true;
 304         sourceSupplier = null;
 305         sourceSpliterator = null;
 306         if (sourceStage.sourceCloseAction != null) {
 307             sourceStage.sourceCloseAction.run();
 308             sourceStage.sourceCloseAction = null;
 309         }
 310     }
 311 
 312     @Override
 313     public S onClose(Runnable closeHandler) {
 314         Runnable existingHandler = sourceStage.sourceCloseAction;
 315         sourceStage.sourceCloseAction =
 316                 (existingHandler == null)
 317                 ? closeHandler
 318                 : Streams.composeWithExceptions(existingHandler, closeHandler);
 319         return (S) this;
 320     }
 321 
 322     // Primitive specialization use co-variant overrides, hence is not final
 323     @Override
 324     public Spliterator<E_OUT> spliterator() {
 325         if (linkedOrConsumed)
 326             throw new IllegalStateException(MSG_STREAM_LINKED);
 327         linkedOrConsumed = true;
 328 
 329         if (this == sourceStage) {
 330             if (sourceStage.sourceSpliterator != null) {
 331                 Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
 332                 sourceStage.sourceSpliterator = null;
 333                 return s;
 334             }
 335             else if (sourceStage.sourceSupplier != null) {
 336                 Supplier<Spliterator<E_OUT>> s = sourceStage.sourceSupplier;
 337                 sourceStage.sourceSupplier = null;
 338                 return lazySpliterator(s);
 339             }
 340             else {
 341                 throw new IllegalStateException(MSG_CONSUMED);
 342             }
 343         }
 344         else {
 345             return wrap(this, () -> sourceSpliterator(0), isParallel());
 346         }
 347     }
 348 
 349     @Override
 350     public final boolean isParallel() {
 351         return sourceStage.parallel;
 352     }
 353 
 354 
 355     /**
 356      * Returns the composition of stream flags of the stream source and all
 357      * intermediate operations.
 358      *
 359      * @return the composition of stream flags of the stream source and all
 360      *         intermediate operations
 361      * @see StreamOpFlag


 419 
 420     /**
 421      * Get the source spliterator for this pipeline stage.  For a sequential or
 422      * stateless parallel pipeline, this is the source spliterator.  For a
 423      * stateful parallel pipeline, this is a spliterator describing the results
 424      * of all computations up to and including the most recent stateful
 425      * operation.
 426      */
 427     private Spliterator<?> sourceSpliterator(int terminalFlags) {
 428         // Get the source spliterator of the pipeline
 429         Spliterator<?> spliterator = null;
 430         if (sourceStage.sourceSpliterator != null) {
 431             spliterator = sourceStage.sourceSpliterator;
 432             sourceStage.sourceSpliterator = null;
 433         }
 434         else if (sourceStage.sourceSupplier != null) {
 435             spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
 436             sourceStage.sourceSupplier = null;
 437         }
 438         else {
 439             throw new IllegalStateException(MSG_CONSUMED);
 440         }
 441 
 442         if (isParallel()) {
 443             // @@@ Merge parallelPrepare with the loop below and use the
 444             //     spliterator characteristics to determine if SIZED
 445             //     should be injected
 446             parallelPrepare(terminalFlags);
 447 
 448             // Adapt the source spliterator, evaluating each stateful op
 449             // in the pipeline up to and including this pipeline stage
 450             for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
 451                  u != e;
 452                  u = p, p = p.nextStage) {
 453 
 454                 if (p.opIsStateful()) {
 455                     spliterator = p.opEvaluateParallelLazy(u, spliterator);
 456                 }
 457             }
 458         }
 459         else if (terminalFlags != 0)  {