79 assert s.hasCharacteristics(Spliterator.SUBSIZED);
80 long sliceFence = calcSliceFence(skip, limit);
81 switch (shape) {
82 case REFERENCE:
83 return new StreamSpliterators
84 .SliceSpliterator.OfRef<>(s, skip, sliceFence);
85 case INT_VALUE:
86 return (Spliterator<P_IN>) new StreamSpliterators
87 .SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence);
88 case LONG_VALUE:
89 return (Spliterator<P_IN>) new StreamSpliterators
90 .SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence);
91 case DOUBLE_VALUE:
92 return (Spliterator<P_IN>) new StreamSpliterators
93 .SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence);
94 default:
95 throw new IllegalStateException("Unknown shape " + shape);
96 }
97 }
98
99 /**
100 * Appends a "slice" operation to the provided stream. The slice operation
101 * may be skip-only, limit-only, or skip-and-limit.
102 *
103 * @param <T> the type of both input and output elements
104 * @param upstream a reference stream with element type T
105 * @param skip the number of elements to skip. Must be >= 0.
106 * @param limit the maximum size of the resulting stream, or -1 if no limit
107 * is to be imposed
108 */
109 public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
110 long skip, long limit) {
111 if (skip < 0)
112 throw new IllegalArgumentException("Skip must be non-negative: " + skip);
113
114 return new ReferencePipeline.StatefulOp<T,T>(upstream, StreamShape.REFERENCE,
115 flags(limit)) {
116 Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s,
117 long skip, long limit, long sizeIfKnown) {
118 if (skip <= sizeIfKnown) {
119 // Use just the limit if the number of elements
120 // to skip is <= the known pipeline size
121 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
122 skip = 0;
123 }
124 return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
125 }
126
127 @Override
128 <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
129 long size = helper.exactOutputSizeIfKnown(spliterator);
130 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
131 return new StreamSpliterators.SliceSpliterator.OfRef<>(
132 helper.wrapSpliterator(spliterator),
133 skip,
134 calcSliceFence(skip, limit));
135 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
136 return unorderedSkipLimitSpliterator(
137 helper.wrapSpliterator(spliterator),
138 skip, limit, size);
139 }
140 else {
141 // @@@ OOMEs will occur for LongStream.longs().filter(i -> true).limit(n)
142 // regardless of the value of n
143 // Need to adjust the target size of splitting for the
144 // SliceTask from say (size / k) to say min(size / k, 1 << 14)
145 // This will limit the size of the buffers created at the leaf nodes
146 // cancellation will be more aggressive cancelling later tasks
147 // if the target slice size has been reached from a given task,
148 // cancellation should also clear local results if any
149 return new SliceTask<>(this, helper, spliterator, i -> (T[]) new Object[i], skip, limit).
150 invoke().spliterator();
151 }
152 }
153
154 @Override
155 <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
156 Spliterator<P_IN> spliterator,
157 IntFunction<T[]> generator) {
158 long size = helper.exactOutputSizeIfKnown(spliterator);
159 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
160 // Because the pipeline is SIZED the slice spliterator
161 // can be created from the source, this requires matching
162 // to shape of the source, and is potentially more efficient
163 // than creating the slice spliterator from the pipeline
164 // wrapping spliterator
165 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
166 return Nodes.collect(helper, s, true, generator);
167 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
168 Spliterator<T> s = unorderedSkipLimitSpliterator(
169 helper.wrapSpliterator(spliterator),
170 skip, limit, size);
171 // Collect using this pipeline, which is empty and therefore
172 // can be used with the pipeline wrapping spliterator
173 // Note that we cannot create a slice spliterator from
174 // the source spliterator if the pipeline is not SIZED
175 return Nodes.collect(this, s, true, generator);
176 }
177 else {
178 return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
179 invoke();
180 }
181 }
182
183 @Override
184 Sink<T> opWrapSink(int flags, Sink<T> sink) {
185 return new Sink.ChainedReference<T>(sink) {
186 long n = skip;
187 long m = limit >= 0 ? limit : Long.MAX_VALUE;
188
189 @Override
190 public void begin(long size) {
191 downstream.begin(calcSize(size, skip, m));
192 }
193
194 @Override
195 public void accept(T t) {
196 if (n == 0) {
197 if (m > 0) {
198 m--;
199 downstream.accept(t);
200 }
201 }
202 else {
203 n--;
204 }
205 }
274 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
275 return Nodes.collectInt(helper, s, true);
276 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
277 Spliterator.OfInt s = unorderedSkipLimitSpliterator(
278 (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
279 skip, limit, size);
280 // Collect using this pipeline, which is empty and therefore
281 // can be used with the pipeline wrapping spliterator
282 // Note that we cannot create a slice spliterator from
283 // the source spliterator if the pipeline is not SIZED
284 return Nodes.collectInt(this, s, true);
285 }
286 else {
287 return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
288 invoke();
289 }
290 }
291
292 @Override
293 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
294 return new Sink.ChainedInt(sink) {
295 long n = skip;
296 long m = limit >= 0 ? limit : Long.MAX_VALUE;
297
298 @Override
299 public void begin(long size) {
300 downstream.begin(calcSize(size, skip, m));
301 }
302
303 @Override
304 public void accept(int t) {
305 if (n == 0) {
306 if (m > 0) {
307 m--;
308 downstream.accept(t);
309 }
310 }
311 else {
312 n--;
313 }
314 }
383 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
384 return Nodes.collectLong(helper, s, true);
385 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
386 Spliterator.OfLong s = unorderedSkipLimitSpliterator(
387 (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
388 skip, limit, size);
389 // Collect using this pipeline, which is empty and therefore
390 // can be used with the pipeline wrapping spliterator
391 // Note that we cannot create a slice spliterator from
392 // the source spliterator if the pipeline is not SIZED
393 return Nodes.collectLong(this, s, true);
394 }
395 else {
396 return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
397 invoke();
398 }
399 }
400
401 @Override
402 Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
403 return new Sink.ChainedLong(sink) {
404 long n = skip;
405 long m = limit >= 0 ? limit : Long.MAX_VALUE;
406
407 @Override
408 public void begin(long size) {
409 downstream.begin(calcSize(size, skip, m));
410 }
411
412 @Override
413 public void accept(long t) {
414 if (n == 0) {
415 if (m > 0) {
416 m--;
417 downstream.accept(t);
418 }
419 }
420 else {
421 n--;
422 }
423 }
492 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
493 return Nodes.collectDouble(helper, s, true);
494 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
495 Spliterator.OfDouble s = unorderedSkipLimitSpliterator(
496 (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
497 skip, limit, size);
498 // Collect using this pipeline, which is empty and therefore
499 // can be used with the pipeline wrapping spliterator
500 // Note that we cannot create a slice spliterator from
501 // the source spliterator if the pipeline is not SIZED
502 return Nodes.collectDouble(this, s, true);
503 }
504 else {
505 return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
506 invoke();
507 }
508 }
509
510 @Override
511 Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
512 return new Sink.ChainedDouble(sink) {
513 long n = skip;
514 long m = limit >= 0 ? limit : Long.MAX_VALUE;
515
516 @Override
517 public void begin(long size) {
518 downstream.begin(calcSize(size, skip, m));
519 }
520
521 @Override
522 public void accept(double t) {
523 if (n == 0) {
524 if (m > 0) {
525 m--;
526 downstream.accept(t);
527 }
528 }
529 else {
530 n--;
531 }
532 }
543 private static int flags(long limit) {
544 return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0);
545 }
546
547 /**
548 * {@code ForkJoinTask} implementing slice computation.
549 *
550 * @param <P_IN> Input element type to the stream pipeline
551 * @param <P_OUT> Output element type from the stream pipeline
552 */
553 @SuppressWarnings("serial")
554 private static final class SliceTask<P_IN, P_OUT>
555 extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, SliceTask<P_IN, P_OUT>> {
556 private final AbstractPipeline<P_OUT, P_OUT, ?> op;
557 private final IntFunction<P_OUT[]> generator;
558 private final long targetOffset, targetSize;
559 private long thisNodeSize;
560
561 private volatile boolean completed;
562
563 SliceTask(AbstractPipeline<?, P_OUT, ?> op,
564 PipelineHelper<P_OUT> helper,
565 Spliterator<P_IN> spliterator,
566 IntFunction<P_OUT[]> generator,
567 long offset, long size) {
568 super(helper, spliterator);
569 this.op = (AbstractPipeline<P_OUT, P_OUT, ?>) op;
570 this.generator = generator;
571 this.targetOffset = offset;
572 this.targetSize = size;
573 }
574
/**
 * Creates a child task that copies its configuration from a parent.
 *
 * @param parent the task whose op, generator, target offset and target
 *        size are copied
 * @param spliterator the spliterator passed to the superclass for this
 *        child
 */
SliceTask(SliceTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
    super(parent, spliterator);
    this.targetSize = parent.targetSize;
    this.targetOffset = parent.targetOffset;
    this.generator = parent.generator;
    this.op = parent.op;
}
582
583 @Override
584 protected SliceTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
585 return new SliceTask<>(this, spliterator);
586 }
587
588 @Override
589 protected final Node<P_OUT> getEmptyResult() {
|
79 assert s.hasCharacteristics(Spliterator.SUBSIZED);
80 long sliceFence = calcSliceFence(skip, limit);
81 switch (shape) {
82 case REFERENCE:
83 return new StreamSpliterators
84 .SliceSpliterator.OfRef<>(s, skip, sliceFence);
85 case INT_VALUE:
86 return (Spliterator<P_IN>) new StreamSpliterators
87 .SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence);
88 case LONG_VALUE:
89 return (Spliterator<P_IN>) new StreamSpliterators
90 .SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence);
91 case DOUBLE_VALUE:
92 return (Spliterator<P_IN>) new StreamSpliterators
93 .SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence);
94 default:
95 throw new IllegalStateException("Unknown shape " + shape);
96 }
97 }
98
99 @SuppressWarnings("unchecked")
100 private static <T> IntFunction<T[]> castingArray() {
101 return size -> (T[]) new Object[size];
102 }
103
104 /**
105 * Appends a "slice" operation to the provided stream. The slice operation
106 * may be skip-only, limit-only, or skip-and-limit.
107 *
108 * @param <T> the type of both input and output elements
109 * @param upstream a reference stream with element type T
110 * @param skip the number of elements to skip. Must be >= 0.
111 * @param limit the maximum size of the resulting stream, or -1 if no limit
112 * is to be imposed
113 */
114 public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
115 long skip, long limit) {
116 if (skip < 0)
117 throw new IllegalArgumentException("Skip must be non-negative: " + skip);
118
119 return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
120 flags(limit)) {
121 Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s,
122 long skip, long limit, long sizeIfKnown) {
123 if (skip <= sizeIfKnown) {
124 // Use just the limit if the number of elements
125 // to skip is <= the known pipeline size
126 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
127 skip = 0;
128 }
129 return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
130 }
131
132 @Override
133 <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
134 long size = helper.exactOutputSizeIfKnown(spliterator);
135 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
136 return new StreamSpliterators.SliceSpliterator.OfRef<>(
137 helper.wrapSpliterator(spliterator),
138 skip,
139 calcSliceFence(skip, limit));
140 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
141 return unorderedSkipLimitSpliterator(
142 helper.wrapSpliterator(spliterator),
143 skip, limit, size);
144 }
145 else {
146 // @@@ OOMEs will occur for LongStream.longs().filter(i -> true).limit(n)
147 // regardless of the value of n
148 // Need to adjust the target size of splitting for the
149 // SliceTask from say (size / k) to say min(size / k, 1 << 14)
150 // This will limit the size of the buffers created at the leaf nodes
151 // cancellation will be more aggressive cancelling later tasks
152 // if the target slice size has been reached from a given task,
153 // cancellation should also clear local results if any
154 return new SliceTask<>(this, helper, spliterator, castingArray(), skip, limit).
155 invoke().spliterator();
156 }
157 }
158
159 @Override
160 <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
161 Spliterator<P_IN> spliterator,
162 IntFunction<T[]> generator) {
163 long size = helper.exactOutputSizeIfKnown(spliterator);
164 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
165 // Because the pipeline is SIZED the slice spliterator
166 // can be created from the source, this requires matching
167 // to shape of the source, and is potentially more efficient
168 // than creating the slice spliterator from the pipeline
169 // wrapping spliterator
170 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
171 return Nodes.collect(helper, s, true, generator);
172 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
173 Spliterator<T> s = unorderedSkipLimitSpliterator(
174 helper.wrapSpliterator(spliterator),
175 skip, limit, size);
176 // Collect using this pipeline, which is empty and therefore
177 // can be used with the pipeline wrapping spliterator
178 // Note that we cannot create a slice spliterator from
179 // the source spliterator if the pipeline is not SIZED
180 return Nodes.collect(this, s, true, generator);
181 }
182 else {
183 return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
184 invoke();
185 }
186 }
187
188 @Override
189 Sink<T> opWrapSink(int flags, Sink<T> sink) {
190 return new Sink.ChainedReference<T, T>(sink) {
191 long n = skip;
192 long m = limit >= 0 ? limit : Long.MAX_VALUE;
193
194 @Override
195 public void begin(long size) {
196 downstream.begin(calcSize(size, skip, m));
197 }
198
199 @Override
200 public void accept(T t) {
201 if (n == 0) {
202 if (m > 0) {
203 m--;
204 downstream.accept(t);
205 }
206 }
207 else {
208 n--;
209 }
210 }
279 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
280 return Nodes.collectInt(helper, s, true);
281 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
282 Spliterator.OfInt s = unorderedSkipLimitSpliterator(
283 (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
284 skip, limit, size);
285 // Collect using this pipeline, which is empty and therefore
286 // can be used with the pipeline wrapping spliterator
287 // Note that we cannot create a slice spliterator from
288 // the source spliterator if the pipeline is not SIZED
289 return Nodes.collectInt(this, s, true);
290 }
291 else {
292 return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
293 invoke();
294 }
295 }
296
297 @Override
298 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
299 return new Sink.ChainedInt<Integer>(sink) {
300 long n = skip;
301 long m = limit >= 0 ? limit : Long.MAX_VALUE;
302
303 @Override
304 public void begin(long size) {
305 downstream.begin(calcSize(size, skip, m));
306 }
307
308 @Override
309 public void accept(int t) {
310 if (n == 0) {
311 if (m > 0) {
312 m--;
313 downstream.accept(t);
314 }
315 }
316 else {
317 n--;
318 }
319 }
388 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
389 return Nodes.collectLong(helper, s, true);
390 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
391 Spliterator.OfLong s = unorderedSkipLimitSpliterator(
392 (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
393 skip, limit, size);
394 // Collect using this pipeline, which is empty and therefore
395 // can be used with the pipeline wrapping spliterator
396 // Note that we cannot create a slice spliterator from
397 // the source spliterator if the pipeline is not SIZED
398 return Nodes.collectLong(this, s, true);
399 }
400 else {
401 return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
402 invoke();
403 }
404 }
405
406 @Override
407 Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
408 return new Sink.ChainedLong<Long>(sink) {
409 long n = skip;
410 long m = limit >= 0 ? limit : Long.MAX_VALUE;
411
412 @Override
413 public void begin(long size) {
414 downstream.begin(calcSize(size, skip, m));
415 }
416
417 @Override
418 public void accept(long t) {
419 if (n == 0) {
420 if (m > 0) {
421 m--;
422 downstream.accept(t);
423 }
424 }
425 else {
426 n--;
427 }
428 }
497 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
498 return Nodes.collectDouble(helper, s, true);
499 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
500 Spliterator.OfDouble s = unorderedSkipLimitSpliterator(
501 (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
502 skip, limit, size);
503 // Collect using this pipeline, which is empty and therefore
504 // can be used with the pipeline wrapping spliterator
505 // Note that we cannot create a slice spliterator from
506 // the source spliterator if the pipeline is not SIZED
507 return Nodes.collectDouble(this, s, true);
508 }
509 else {
510 return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
511 invoke();
512 }
513 }
514
515 @Override
516 Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
517 return new Sink.ChainedDouble<Double>(sink) {
518 long n = skip;
519 long m = limit >= 0 ? limit : Long.MAX_VALUE;
520
521 @Override
522 public void begin(long size) {
523 downstream.begin(calcSize(size, skip, m));
524 }
525
526 @Override
527 public void accept(double t) {
528 if (n == 0) {
529 if (m > 0) {
530 m--;
531 downstream.accept(t);
532 }
533 }
534 else {
535 n--;
536 }
537 }
548 private static int flags(long limit) {
549 return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0);
550 }
551
552 /**
553 * {@code ForkJoinTask} implementing slice computation.
554 *
555 * @param <P_IN> Input element type to the stream pipeline
556 * @param <P_OUT> Output element type from the stream pipeline
557 */
558 @SuppressWarnings("serial")
559 private static final class SliceTask<P_IN, P_OUT>
560 extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, SliceTask<P_IN, P_OUT>> {
561 private final AbstractPipeline<P_OUT, P_OUT, ?> op;
562 private final IntFunction<P_OUT[]> generator;
563 private final long targetOffset, targetSize;
564 private long thisNodeSize;
565
566 private volatile boolean completed;
567
/**
 * Creates the root slice task.
 *
 * @param op the slice operation pipeline retained by this task
 * @param helper the pipeline helper passed to the superclass
 * @param spliterator the source spliterator passed to the superclass
 * @param generator the array factory retained by this task
 * @param offset stored as the target offset
 * @param size stored as the target size
 */
SliceTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
          PipelineHelper<P_OUT> helper,
          Spliterator<P_IN> spliterator,
          IntFunction<P_OUT[]> generator,
          long offset, long size) {
    super(helper, spliterator);
    this.targetSize = size;
    this.targetOffset = offset;
    this.generator = generator;
    this.op = op;
}
579
/**
 * Creates a child task that copies its configuration from a parent.
 *
 * @param parent the task whose op, generator, target offset and target
 *        size are copied
 * @param spliterator the spliterator passed to the superclass for this
 *        child
 */
SliceTask(SliceTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
    super(parent, spliterator);
    this.targetSize = parent.targetSize;
    this.targetOffset = parent.targetOffset;
    this.generator = parent.generator;
    this.op = parent.op;
}
587
588 @Override
589 protected SliceTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
590 return new SliceTask<>(this, spliterator);
591 }
592
593 @Override
594 protected final Node<P_OUT> getEmptyResult() {
|