247 }
248 };
249 }
250
251 @Override
252 public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
253 Objects.requireNonNull(mapper);
254 // We can do better than this, by polling cancellationRequested when stream is infinite
255 return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
256 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
257 @Override
258 Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
259 return new Sink.ChainedReference<P_OUT, R>(sink) {
260 @Override
261 public void begin(long size) {
262 downstream.begin(-1);
263 }
264
265 @Override
266 public void accept(P_OUT u) {
267 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
268 Stream<? extends R> result = mapper.apply(u);
269 if (result != null)
270 result.sequential().forEach(downstream);
271 }
272 };
273 }
274 };
275 }
276
277 @Override
278 public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
279 Objects.requireNonNull(mapper);
280 // We can do better than this, by polling cancellationRequested when stream is infinite
281 return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
282 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
283 @Override
284 Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
285 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
286 IntConsumer downstreamAsInt = downstream::accept;
287 @Override
288 public void begin(long size) {
289 downstream.begin(-1);
290 }
291
292 @Override
293 public void accept(P_OUT u) {
294 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
295 IntStream result = mapper.apply(u);
296 if (result != null)
297 result.sequential().forEach(downstreamAsInt);
298 }
299 };
300 }
301 };
302 }
303
304 @Override
305 public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
306 Objects.requireNonNull(mapper);
307 // We can do better than this, by polling cancellationRequested when stream is infinite
308 return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
309 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
310 @Override
311 Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
312 return new Sink.ChainedReference<P_OUT, Double>(sink) {
313 DoubleConsumer downstreamAsDouble = downstream::accept;
314 @Override
315 public void begin(long size) {
316 downstream.begin(-1);
317 }
318
319 @Override
320 public void accept(P_OUT u) {
321 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
322 DoubleStream result = mapper.apply(u);
323 if (result != null)
324 result.sequential().forEach(downstreamAsDouble);
325 }
326 };
327 }
328 };
329 }
330
331 @Override
332 public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) {
333 Objects.requireNonNull(mapper);
334 // We can do better than this, by polling cancellationRequested when stream is infinite
335 return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
336 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
337 @Override
338 Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
339 return new Sink.ChainedReference<P_OUT, Long>(sink) {
340 LongConsumer downstreamAsLong = downstream::accept;
341 @Override
342 public void begin(long size) {
343 downstream.begin(-1);
344 }
345
346 @Override
347 public void accept(P_OUT u) {
348 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
349 LongStream result = mapper.apply(u);
350 if (result != null)
351 result.sequential().forEach(downstreamAsLong);
352 }
353 };
354 }
355 };
356 }
357
358 @Override
359 public final Stream<P_OUT> peek(Consumer<? super P_OUT> tee) {
360 Objects.requireNonNull(tee);
361 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
362 0) {
363 @Override
364 Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
365 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
366 @Override
367 public void accept(P_OUT u) {
368 tee.accept(u);
369 downstream.accept(u);
370 }
371 };
|
247 }
248 };
249 }
250
251 @Override
252 public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
253 Objects.requireNonNull(mapper);
254 // We can do better than this, by polling cancellationRequested when stream is infinite
255 return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
256 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
257 @Override
258 Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
259 return new Sink.ChainedReference<P_OUT, R>(sink) {
260 @Override
261 public void begin(long size) {
262 downstream.begin(-1);
263 }
264
265 @Override
266 public void accept(P_OUT u) {
267 try (Stream<? extends R> result = mapper.apply(u)) {
268 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
269 if (result != null)
270 result.sequential().forEach(downstream);
271 }
272 }
273 };
274 }
275 };
276 }
277
278 @Override
279 public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
280 Objects.requireNonNull(mapper);
281 // We can do better than this, by polling cancellationRequested when stream is infinite
282 return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
283 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
284 @Override
285 Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
286 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
287 IntConsumer downstreamAsInt = downstream::accept;
288 @Override
289 public void begin(long size) {
290 downstream.begin(-1);
291 }
292
293 @Override
294 public void accept(P_OUT u) {
295 try (IntStream result = mapper.apply(u)) {
296 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
297 if (result != null)
298 result.sequential().forEach(downstreamAsInt);
299 }
300 }
301 };
302 }
303 };
304 }
305
306 @Override
307 public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
308 Objects.requireNonNull(mapper);
309 // We can do better than this, by polling cancellationRequested when stream is infinite
310 return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
311 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
312 @Override
313 Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
314 return new Sink.ChainedReference<P_OUT, Double>(sink) {
315 DoubleConsumer downstreamAsDouble = downstream::accept;
316 @Override
317 public void begin(long size) {
318 downstream.begin(-1);
319 }
320
321 @Override
322 public void accept(P_OUT u) {
323 try (DoubleStream result = mapper.apply(u)) {
324 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
325 if (result != null)
326 result.sequential().forEach(downstreamAsDouble);
327 }
328 }
329 };
330 }
331 };
332 }
333
334 @Override
335 public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) {
336 Objects.requireNonNull(mapper);
337 // We can do better than this, by polling cancellationRequested when stream is infinite
338 return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
339 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
340 @Override
341 Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
342 return new Sink.ChainedReference<P_OUT, Long>(sink) {
343 LongConsumer downstreamAsLong = downstream::accept;
344 @Override
345 public void begin(long size) {
346 downstream.begin(-1);
347 }
348
349 @Override
350 public void accept(P_OUT u) {
351 try (LongStream result = mapper.apply(u)) {
352 // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
353 if (result != null)
354 result.sequential().forEach(downstreamAsLong);
355 }
356 }
357 };
358 }
359 };
360 }
361
362 @Override
363 public final Stream<P_OUT> peek(Consumer<? super P_OUT> tee) {
364 Objects.requireNonNull(tee);
365 return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
366 0) {
367 @Override
368 Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
369 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
370 @Override
371 public void accept(P_OUT u) {
372 tee.accept(u);
373 downstream.accept(u);
374 }
375 };
|