src/share/classes/java/util/stream/ReferencePipeline.java

Print this page
rev 7962 : 8017513: Support for closeable streams
8022237: j.u.s.BaseStream.onClose() has an issue in implementation or requires spec clarification
8022572: Same exception instances thrown from j.u.stream.Stream.onClose() handlers are not listed as suppressed
Summary: BaseStream implements AutoCloseable; Remove CloseableStream and DelegatingStream
Reviewed-by:
Contributed-by: brian.goetz@oracle.com


 247             }
 248         };
 249     }
 250 
 251     @Override
 252     public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
             // Appends a stateless stage that replaces each upstream element with the elements of the
             // stream the mapper returns for it. Rejects a null mapper up front.
             // Note: in this version the stream returned by the mapper is never closed.
 253         Objects.requireNonNull(mapper);
 254         // We can do better than this, by polling cancellationRequested when stream is infinite
 255         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
 256                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 257             @Override
 258             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
 259                 return new Sink.ChainedReference<P_OUT, R>(sink) {
 260                     @Override
 261                     public void begin(long size) {
                             // The incoming size is ignored; -1 is forwarded instead. Presumably -1 means
                             // "size unknown", matching the NOT_SIZED flag above -- confirm against Sink.begin.
 262                         downstream.begin(-1);
 263                     }
 264 
 265                     @Override
 266                     public void accept(P_OUT u) {

 267                         // We can do better than this too; optimize for depth=0 case and just grab spliterator and forEach it
 268                         Stream<? extends R> result = mapper.apply(u);
                             // A null result from the mapper is skipped; otherwise the mapped stream is
                             // made sequential and every element is handed to the downstream sink.
 269                         if (result != null)
 270                             result.sequential().forEach(downstream);
 271                     }

 272                 };
 273             }
 274         };
 275     }
 276 
 277     @Override
 278     public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
 279         Objects.requireNonNull(mapper);
 280         // We can do better than this, by polling cancellationRequested when stream is infinite
 281         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 282                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 283             @Override
 284             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
 285                 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
 286                     IntConsumer downstreamAsInt = downstream::accept;
 287                     @Override
 288                     public void begin(long size) {
 289                         downstream.begin(-1);
 290                     }
 291 
 292                     @Override
 293                     public void accept(P_OUT u) {

 294                         // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
 295                         IntStream result = mapper.apply(u);
 296                         if (result != null)
 297                             result.sequential().forEach(downstreamAsInt);
 298                     }

 299                 };
 300             }
 301         };
 302     }
 303 
 304     @Override
 305     public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
             // Appends a stateless stage that replaces each upstream element with the doubles of the
             // DoubleStream the mapper returns for it. Rejects a null mapper up front.
             // Note: in this version the stream returned by the mapper is never closed.
 306         Objects.requireNonNull(mapper);
 307         // We can do better than this, by polling cancellationRequested when stream is infinite
 308         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 309                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 310             @Override
 311             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
 312                 return new Sink.ChainedReference<P_OUT, Double>(sink) {
                         // Built once per sink (not per element) so the downstream sink can be passed to
                         // DoubleStream.forEach, which takes a DoubleConsumer.
 313                     DoubleConsumer downstreamAsDouble = downstream::accept;
 314                     @Override
 315                     public void begin(long size) {
                             // The incoming size is ignored; -1 is forwarded instead. Presumably -1 means
                             // "size unknown", matching the NOT_SIZED flag above -- confirm against Sink.begin.
 316                         downstream.begin(-1);
 317                     }
 318 
 319                     @Override
 320                     public void accept(P_OUT u) {

 321                         // We can do better than this too; optimize for depth=0 case and just grab spliterator and forEach it
 322                         DoubleStream result = mapper.apply(u);
                             // A null result from the mapper is skipped; otherwise the mapped stream is
                             // made sequential and every value is handed to the downstream sink.
 323                         if (result != null)
 324                             result.sequential().forEach(downstreamAsDouble);
 325                     }

 326                 };
 327             }
 328         };
 329     }
 330 
 331     @Override
 332     public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) {
             // Appends a stateless stage that replaces each upstream element with the longs of the
             // LongStream the mapper returns for it. Rejects a null mapper up front.
             // Note: in this version the stream returned by the mapper is never closed.
 333         Objects.requireNonNull(mapper);
 334         // We can do better than this, by polling cancellationRequested when stream is infinite
 335         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 336                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 337             @Override
 338             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
 339                 return new Sink.ChainedReference<P_OUT, Long>(sink) {
                         // Built once per sink (not per element) so the downstream sink can be passed to
                         // LongStream.forEach, which takes a LongConsumer.
 340                     LongConsumer downstreamAsLong = downstream::accept;
 341                     @Override
 342                     public void begin(long size) {
                             // The incoming size is ignored; -1 is forwarded instead. Presumably -1 means
                             // "size unknown", matching the NOT_SIZED flag above -- confirm against Sink.begin.
 343                         downstream.begin(-1);
 344                     }
 345 
 346                     @Override
 347                     public void accept(P_OUT u) {

 348                         // We can do better than this too; optimize for depth=0 case and just grab spliterator and forEach it
 349                         LongStream result = mapper.apply(u);
                             // A null result from the mapper is skipped; otherwise the mapped stream is
                             // made sequential and every value is handed to the downstream sink.
 350                         if (result != null)
 351                             result.sequential().forEach(downstreamAsLong);

 352                     }
 353                 };
 354             }
 355         };
 356     }
 357 
 358     @Override
 359     public final Stream<P_OUT> peek(Consumer<? super P_OUT> tee) {
 360         Objects.requireNonNull(tee);
 361         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
 362                                      0) {
 363             @Override
 364             Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
 365                 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
 366                     @Override
 367                     public void accept(P_OUT u) {
 368                         tee.accept(u);
 369                         downstream.accept(u);
 370                     }
 371                 };




 247             }
 248         };
 249     }
 250 
 251     @Override
 252     public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
             // Appends a stateless stage that replaces each upstream element with the elements of the
             // stream the mapper returns for it, closing that stream once it has been drained.
             // Rejects a null mapper up front.
 253         Objects.requireNonNull(mapper);
 254         // We can do better than this, by polling cancellationRequested when stream is infinite
 255         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
 256                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 257             @Override
 258             Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
 259                 return new Sink.ChainedReference<P_OUT, R>(sink) {
 260                     @Override
 261                     public void begin(long size) {
                             // The incoming size is ignored; -1 is forwarded instead. Presumably -1 means
                             // "size unknown", matching the NOT_SIZED flag above -- confirm against Sink.begin.
 262                         downstream.begin(-1);
 263                     }
 264 
 265                     @Override
 266                     public void accept(P_OUT u) {
                             // try-with-resources closes the mapped stream after its elements are forwarded,
                             // including when forwarding throws. A null resource is legal and is simply not
                             // closed, which is why the explicit null check below is still required.
 267                         try (Stream<? extends R> result = mapper.apply(u)) {
 268                             // We can do better than this too; optimize for depth=0 case and just grab spliterator and forEach it

 269                             if (result != null)
 270                                 result.sequential().forEach(downstream);
 271                         }
 272                     }
 273                 };
 274             }
 275         };
 276     }
 277 
 278     @Override
 279     public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
             // Appends a stateless stage that replaces each upstream element with the ints of the
             // IntStream the mapper returns for it, closing that stream once it has been drained.
             // Rejects a null mapper up front.
 280         Objects.requireNonNull(mapper);
 281         // We can do better than this, by polling cancellationRequested when stream is infinite
 282         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 283                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 284             @Override
 285             Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
 286                 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
                         // Built once per sink (not per element) so the downstream sink can be passed to
                         // IntStream.forEach, which takes an IntConsumer.
 287                     IntConsumer downstreamAsInt = downstream::accept;
 288                     @Override
 289                     public void begin(long size) {
                             // The incoming size is ignored; -1 is forwarded instead. Presumably -1 means
                             // "size unknown", matching the NOT_SIZED flag above -- confirm against Sink.begin.
 290                         downstream.begin(-1);
 291                     }
 292 
 293                     @Override
 294                     public void accept(P_OUT u) {
                             // try-with-resources closes the mapped stream after its values are forwarded,
                             // including when forwarding throws. A null resource is legal and is simply not
                             // closed, which is why the explicit null check below is still required.
 295                         try (IntStream result = mapper.apply(u)) {
 296                             // We can do better than this too; optimize for depth=0 case and just grab spliterator and forEach it

 297                             if (result != null)
 298                                 result.sequential().forEach(downstreamAsInt);
 299                         }
 300                     }
 301                 };
 302             }
 303         };
 304     }
 305 
 306     @Override
 307     public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
             // Appends a stateless stage that replaces each upstream element with the doubles of the
             // DoubleStream the mapper returns for it, closing that stream once it has been drained.
             // Rejects a null mapper up front.
 308         Objects.requireNonNull(mapper);
 309         // We can do better than this, by polling cancellationRequested when stream is infinite
 310         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 311                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 312             @Override
 313             Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
 314                 return new Sink.ChainedReference<P_OUT, Double>(sink) {
                         // Built once per sink (not per element) so the downstream sink can be passed to
                         // DoubleStream.forEach, which takes a DoubleConsumer.
 315                     DoubleConsumer downstreamAsDouble = downstream::accept;
 316                     @Override
 317                     public void begin(long size) {
                             // The incoming size is ignored; -1 is forwarded instead. Presumably -1 means
                             // "size unknown", matching the NOT_SIZED flag above -- confirm against Sink.begin.
 318                         downstream.begin(-1);
 319                     }
 320 
 321                     @Override
 322                     public void accept(P_OUT u) {
                             // try-with-resources closes the mapped stream after its values are forwarded,
                             // including when forwarding throws. A null resource is legal and is simply not
                             // closed, which is why the explicit null check below is still required.
 323                         try (DoubleStream result = mapper.apply(u)) {
 324                             // We can do better than this too; optimize for depth=0 case and just grab spliterator and forEach it

 325                             if (result != null)
 326                                 result.sequential().forEach(downstreamAsDouble);
 327                         }
 328                     }
 329                 };
 330             }
 331         };
 332     }
 333 
 334     @Override
 335     public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) {
             // Appends a stateless stage that replaces each upstream element with the longs of the
             // LongStream the mapper returns for it, closing that stream once it has been drained.
             // Rejects a null mapper up front.
 336         Objects.requireNonNull(mapper);
 337         // We can do better than this, by polling cancellationRequested when stream is infinite
 338         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
 339                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
 340             @Override
 341             Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
 342                 return new Sink.ChainedReference<P_OUT, Long>(sink) {
                         // Built once per sink (not per element) so the downstream sink can be passed to
                         // LongStream.forEach, which takes a LongConsumer.
 343                     LongConsumer downstreamAsLong = downstream::accept;
 344                     @Override
 345                     public void begin(long size) {
                             // The incoming size is ignored; -1 is forwarded instead. Presumably -1 means
                             // "size unknown", matching the NOT_SIZED flag above -- confirm against Sink.begin.
 346                         downstream.begin(-1);
 347                     }
 348 
 349                     @Override
 350                     public void accept(P_OUT u) {
                             // try-with-resources closes the mapped stream after its values are forwarded,
                             // including when forwarding throws. A null resource is legal and is simply not
                             // closed, which is why the explicit null check below is still required.
 351                         try (LongStream result = mapper.apply(u)) {
 352                             // We can do better than this too; optimize for depth=0 case and just grab spliterator and forEach it

 353                             if (result != null)
 354                                 result.sequential().forEach(downstreamAsLong);
 355                         }
 356                     }
 357                 };
 358             }
 359         };
 360     }
 361 
 362     @Override
 363     public final Stream<P_OUT> peek(Consumer<? super P_OUT> tee) {
 364         Objects.requireNonNull(tee);
 365         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
 366                                      0) {
 367             @Override
 368             Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
 369                 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
 370                     @Override
 371                     public void accept(P_OUT u) {
 372                         tee.accept(u);
 373                         downstream.accept(u);
 374                     }
 375                 };