src/share/classes/java/util/stream/SliceOps.java

Print this page
rev 7923 : 8023681: Fix raw type warning caused by Sink
Reviewed-by:


  79         assert s.hasCharacteristics(Spliterator.SUBSIZED);
  80         long sliceFence = calcSliceFence(skip, limit);
  81         switch (shape) {
  82             case REFERENCE:
  83                 return new StreamSpliterators
  84                         .SliceSpliterator.OfRef<>(s, skip, sliceFence);
  85             case INT_VALUE:
  86                 return (Spliterator<P_IN>) new StreamSpliterators
  87                         .SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence);
  88             case LONG_VALUE:
  89                 return (Spliterator<P_IN>) new StreamSpliterators
  90                         .SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence);
  91             case DOUBLE_VALUE:
  92                 return (Spliterator<P_IN>) new StreamSpliterators
  93                         .SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence);
  94             default:
  95                 throw new IllegalStateException("Unknown shape " + shape);
  96         }
  97     }
  98 





  99     /**
 100      * Appends a "slice" operation to the provided stream.  The slice operation
 101      * may be may be skip-only, limit-only, or skip-and-limit.
 102      *
 103      * @param <T> the type of both input and output elements
 104      * @param upstream a reference stream with element type T
 105      * @param skip the number of elements to skip.  Must be >= 0.
 106      * @param limit the maximum size of the resulting stream, or -1 if no limit
 107      *        is to be imposed
 108      */
 109     public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
 110                                        long skip, long limit) {
 111         if (skip < 0)
 112             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
 113 
 114         return new ReferencePipeline.StatefulOp<T,T>(upstream, StreamShape.REFERENCE,
 115                                                      flags(limit)) {
 116             Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s,
 117                                                          long skip, long limit, long sizeIfKnown) {
 118                 if (skip <= sizeIfKnown) {
 119                     // Use just the limit if the number of elements
 120                     // to skip is <= the known pipeline size
 121                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
 122                     skip = 0;
 123                 }
 124                 return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
 125             }
 126 
 127             @Override
 128             <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
 129                 long size = helper.exactOutputSizeIfKnown(spliterator);
 130                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
 131                     return new StreamSpliterators.SliceSpliterator.OfRef<>(
 132                             helper.wrapSpliterator(spliterator),
 133                             skip,
 134                             calcSliceFence(skip, limit));
 135                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 136                     return unorderedSkipLimitSpliterator(
 137                             helper.wrapSpliterator(spliterator),
 138                             skip, limit, size);
 139                 }
 140                 else {
 141                     // @@@ OOMEs will occur for LongStream.longs().filter(i -> true).limit(n)
 142                     //     regardless of the value of n
 143                     //     Need to adjust the target size of splitting for the
 144                     //     SliceTask from say (size / k) to say min(size / k, 1 << 14)
 145                     //     This will limit the size of the buffers created at the leaf nodes
 146                     //     cancellation will be more aggressive cancelling later tasks
 147                     //     if the target slice size has been reached from a given task,
 148                     //     cancellation should also clear local results if any
 149                     return new SliceTask<>(this, helper, spliterator, i -> (T[]) new Object[i], skip, limit).
 150                             invoke().spliterator();
 151                 }
 152             }
 153 
 154             @Override
 155             <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
 156                                               Spliterator<P_IN> spliterator,
 157                                               IntFunction<T[]> generator) {
 158                 long size = helper.exactOutputSizeIfKnown(spliterator);
 159                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
 160                     // Because the pipeline is SIZED the slice spliterator
 161                     // can be created from the source, this requires matching
 162                     // to shape of the source, and is potentially more efficient
 163                     // than creating the slice spliterator from the pipeline
 164                     // wrapping spliterator
 165                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
 166                     return Nodes.collect(helper, s, true, generator);
 167                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 168                     Spliterator<T> s =  unorderedSkipLimitSpliterator(
 169                             helper.wrapSpliterator(spliterator),
 170                             skip, limit, size);
 171                     // Collect using this pipeline, which is empty and therefore
 172                     // can be used with the pipeline wrapping spliterator
 173                     // Note that we cannot create a slice spliterator from
 174                     // the source spliterator if the pipeline is not SIZED
 175                     return Nodes.collect(this, s, true, generator);
 176                 }
 177                 else {
 178                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
 179                             invoke();
 180                 }
 181             }
 182 
 183             @Override
 184             Sink<T> opWrapSink(int flags, Sink<T> sink) {
 185                 return new Sink.ChainedReference<T>(sink) {
 186                     long n = skip;
 187                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
 188 
 189                     @Override
 190                     public void begin(long size) {
 191                         downstream.begin(calcSize(size, skip, m));
 192                     }
 193 
 194                     @Override
 195                     public void accept(T t) {
 196                         if (n == 0) {
 197                             if (m > 0) {
 198                                 m--;
 199                                 downstream.accept(t);
 200                             }
 201                         }
 202                         else {
 203                             n--;
 204                         }
 205                     }


 274                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
 275                     return Nodes.collectInt(helper, s, true);
 276                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 277                     Spliterator.OfInt s =  unorderedSkipLimitSpliterator(
 278                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
 279                             skip, limit, size);
 280                     // Collect using this pipeline, which is empty and therefore
 281                     // can be used with the pipeline wrapping spliterator
 282                     // Note that we cannot create a slice spliterator from
 283                     // the source spliterator if the pipeline is not SIZED
 284                     return Nodes.collectInt(this, s, true);
 285                 }
 286                 else {
 287                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
 288                             invoke();
 289                 }
 290             }
 291 
 292             @Override
 293             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 294                 return new Sink.ChainedInt(sink) {
 295                     long n = skip;
 296                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
 297 
 298                     @Override
 299                     public void begin(long size) {
 300                         downstream.begin(calcSize(size, skip, m));
 301                     }
 302 
 303                     @Override
 304                     public void accept(int t) {
 305                         if (n == 0) {
 306                             if (m > 0) {
 307                                 m--;
 308                                 downstream.accept(t);
 309                             }
 310                         }
 311                         else {
 312                             n--;
 313                         }
 314                     }


 383                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
 384                     return Nodes.collectLong(helper, s, true);
 385                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 386                     Spliterator.OfLong s =  unorderedSkipLimitSpliterator(
 387                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
 388                             skip, limit, size);
 389                     // Collect using this pipeline, which is empty and therefore
 390                     // can be used with the pipeline wrapping spliterator
 391                     // Note that we cannot create a slice spliterator from
 392                     // the source spliterator if the pipeline is not SIZED
 393                     return Nodes.collectLong(this, s, true);
 394                 }
 395                 else {
 396                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
 397                             invoke();
 398                 }
 399             }
 400 
 401             @Override
 402             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 403                 return new Sink.ChainedLong(sink) {
 404                     long n = skip;
 405                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
 406 
 407                     @Override
 408                     public void begin(long size) {
 409                         downstream.begin(calcSize(size, skip, m));
 410                     }
 411 
 412                     @Override
 413                     public void accept(long t) {
 414                         if (n == 0) {
 415                             if (m > 0) {
 416                                 m--;
 417                                 downstream.accept(t);
 418                             }
 419                         }
 420                         else {
 421                             n--;
 422                         }
 423                     }


 492                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
 493                     return Nodes.collectDouble(helper, s, true);
 494                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 495                     Spliterator.OfDouble s =  unorderedSkipLimitSpliterator(
 496                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
 497                             skip, limit, size);
 498                     // Collect using this pipeline, which is empty and therefore
 499                     // can be used with the pipeline wrapping spliterator
 500                     // Note that we cannot create a slice spliterator from
 501                     // the source spliterator if the pipeline is not SIZED
 502                     return Nodes.collectDouble(this, s, true);
 503                 }
 504                 else {
 505                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
 506                             invoke();
 507                 }
 508             }
 509 
 510             @Override
 511             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 512                 return new Sink.ChainedDouble(sink) {
 513                     long n = skip;
 514                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
 515 
 516                     @Override
 517                     public void begin(long size) {
 518                         downstream.begin(calcSize(size, skip, m));
 519                     }
 520 
 521                     @Override
 522                     public void accept(double t) {
 523                         if (n == 0) {
 524                             if (m > 0) {
 525                                 m--;
 526                                 downstream.accept(t);
 527                             }
 528                         }
 529                         else {
 530                             n--;
 531                         }
 532                     }


 543     private static int flags(long limit) {
 544         return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0);
 545     }
 546 
 547     /**
 548      * {@code ForkJoinTask} implementing slice computation.
 549      *
 550      * @param <P_IN> Input element type to the stream pipeline
 551      * @param <P_OUT> Output element type from the stream pipeline
 552      */
 553     @SuppressWarnings("serial")
 554     private static final class SliceTask<P_IN, P_OUT>
 555             extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, SliceTask<P_IN, P_OUT>> {
 556         private final AbstractPipeline<P_OUT, P_OUT, ?> op;
 557         private final IntFunction<P_OUT[]> generator;
 558         private final long targetOffset, targetSize;
 559         private long thisNodeSize;
 560 
 561         private volatile boolean completed;
 562 
 563         SliceTask(AbstractPipeline<?, P_OUT, ?> op,
 564                   PipelineHelper<P_OUT> helper,
 565                   Spliterator<P_IN> spliterator,
 566                   IntFunction<P_OUT[]> generator,
 567                   long offset, long size) {
 568             super(helper, spliterator);
 569             this.op = (AbstractPipeline<P_OUT, P_OUT, ?>) op;
 570             this.generator = generator;
 571             this.targetOffset = offset;
 572             this.targetSize = size;
 573         }
 574 
 575         SliceTask(SliceTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
 576             super(parent, spliterator);
 577             this.op = parent.op;
 578             this.generator = parent.generator;
 579             this.targetOffset = parent.targetOffset;
 580             this.targetSize = parent.targetSize;
 581         }
 582 
 583         @Override
 584         protected SliceTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
 585             return new SliceTask<>(this, spliterator);
 586         }
 587 
 588         @Override
 589         protected final Node<P_OUT> getEmptyResult() {




  79         assert s.hasCharacteristics(Spliterator.SUBSIZED);
  80         long sliceFence = calcSliceFence(skip, limit);
  81         switch (shape) {
  82             case REFERENCE:
  83                 return new StreamSpliterators
  84                         .SliceSpliterator.OfRef<>(s, skip, sliceFence);
  85             case INT_VALUE:
  86                 return (Spliterator<P_IN>) new StreamSpliterators
  87                         .SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence);
  88             case LONG_VALUE:
  89                 return (Spliterator<P_IN>) new StreamSpliterators
  90                         .SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence);
  91             case DOUBLE_VALUE:
  92                 return (Spliterator<P_IN>) new StreamSpliterators
  93                         .SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence);
  94             default:
  95                 throw new IllegalStateException("Unknown shape " + shape);
  96         }
  97     }
  98 
  99     @SuppressWarnings("unchecked")
 100     private static <T> IntFunction<T[]> castingArray() {
 101         return size -> (T[]) new Object[size];
 102     }
 103 
 104     /**
 105      * Appends a "slice" operation to the provided stream.  The slice operation
 106      * may be may be skip-only, limit-only, or skip-and-limit.
 107      *
 108      * @param <T> the type of both input and output elements
 109      * @param upstream a reference stream with element type T
 110      * @param skip the number of elements to skip.  Must be >= 0.
 111      * @param limit the maximum size of the resulting stream, or -1 if no limit
 112      *        is to be imposed
 113      */
 114     public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
 115                                         long skip, long limit) {
 116         if (skip < 0)
 117             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
 118 
 119         return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
 120                                                       flags(limit)) {
 121             Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s,
 122                                                          long skip, long limit, long sizeIfKnown) {
 123                 if (skip <= sizeIfKnown) {
 124                     // Use just the limit if the number of elements
 125                     // to skip is <= the known pipeline size
 126                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
 127                     skip = 0;
 128                 }
 129                 return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
 130             }
 131 
 132             @Override
 133             <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
 134                 long size = helper.exactOutputSizeIfKnown(spliterator);
 135                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
 136                     return new StreamSpliterators.SliceSpliterator.OfRef<>(
 137                             helper.wrapSpliterator(spliterator),
 138                             skip,
 139                             calcSliceFence(skip, limit));
 140                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 141                     return unorderedSkipLimitSpliterator(
 142                             helper.wrapSpliterator(spliterator),
 143                             skip, limit, size);
 144                 }
 145                 else {
 146                     // @@@ OOMEs will occur for LongStream.longs().filter(i -> true).limit(n)
 147                     //     regardless of the value of n
 148                     //     Need to adjust the target size of splitting for the
 149                     //     SliceTask from say (size / k) to say min(size / k, 1 << 14)
 150                     //     This will limit the size of the buffers created at the leaf nodes
 151                     //     cancellation will be more aggressive cancelling later tasks
 152                     //     if the target slice size has been reached from a given task,
 153                     //     cancellation should also clear local results if any
 154                     return new SliceTask<>(this, helper, spliterator, castingArray(), skip, limit).
 155                             invoke().spliterator();
 156                 }
 157             }
 158 
 159             @Override
 160             <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
 161                                               Spliterator<P_IN> spliterator,
 162                                               IntFunction<T[]> generator) {
 163                 long size = helper.exactOutputSizeIfKnown(spliterator);
 164                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
 165                     // Because the pipeline is SIZED the slice spliterator
 166                     // can be created from the source, this requires matching
 167                     // to shape of the source, and is potentially more efficient
 168                     // than creating the slice spliterator from the pipeline
 169                     // wrapping spliterator
 170                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
 171                     return Nodes.collect(helper, s, true, generator);
 172                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 173                     Spliterator<T> s =  unorderedSkipLimitSpliterator(
 174                             helper.wrapSpliterator(spliterator),
 175                             skip, limit, size);
 176                     // Collect using this pipeline, which is empty and therefore
 177                     // can be used with the pipeline wrapping spliterator
 178                     // Note that we cannot create a slice spliterator from
 179                     // the source spliterator if the pipeline is not SIZED
 180                     return Nodes.collect(this, s, true, generator);
 181                 }
 182                 else {
 183                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
 184                             invoke();
 185                 }
 186             }
 187 
 188             @Override
 189             Sink<T> opWrapSink(int flags, Sink<T> sink) {
 190                 return new Sink.ChainedReference<T, T>(sink) {
 191                     long n = skip;
 192                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
 193 
 194                     @Override
 195                     public void begin(long size) {
 196                         downstream.begin(calcSize(size, skip, m));
 197                     }
 198 
 199                     @Override
 200                     public void accept(T t) {
 201                         if (n == 0) {
 202                             if (m > 0) {
 203                                 m--;
 204                                 downstream.accept(t);
 205                             }
 206                         }
 207                         else {
 208                             n--;
 209                         }
 210                     }


 279                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
 280                     return Nodes.collectInt(helper, s, true);
 281                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 282                     Spliterator.OfInt s =  unorderedSkipLimitSpliterator(
 283                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
 284                             skip, limit, size);
 285                     // Collect using this pipeline, which is empty and therefore
 286                     // can be used with the pipeline wrapping spliterator
 287                     // Note that we cannot create a slice spliterator from
 288                     // the source spliterator if the pipeline is not SIZED
 289                     return Nodes.collectInt(this, s, true);
 290                 }
 291                 else {
 292                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
 293                             invoke();
 294                 }
 295             }
 296 
 297             @Override
 298             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 299                 return new Sink.ChainedInt<Integer>(sink) {
 300                     long n = skip;
 301                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
 302 
 303                     @Override
 304                     public void begin(long size) {
 305                         downstream.begin(calcSize(size, skip, m));
 306                     }
 307 
 308                     @Override
 309                     public void accept(int t) {
 310                         if (n == 0) {
 311                             if (m > 0) {
 312                                 m--;
 313                                 downstream.accept(t);
 314                             }
 315                         }
 316                         else {
 317                             n--;
 318                         }
 319                     }


 388                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
 389                     return Nodes.collectLong(helper, s, true);
 390                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 391                     Spliterator.OfLong s =  unorderedSkipLimitSpliterator(
 392                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
 393                             skip, limit, size);
 394                     // Collect using this pipeline, which is empty and therefore
 395                     // can be used with the pipeline wrapping spliterator
 396                     // Note that we cannot create a slice spliterator from
 397                     // the source spliterator if the pipeline is not SIZED
 398                     return Nodes.collectLong(this, s, true);
 399                 }
 400                 else {
 401                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
 402                             invoke();
 403                 }
 404             }
 405 
 406             @Override
 407             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 408                 return new Sink.ChainedLong<Long>(sink) {
 409                     long n = skip;
 410                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
 411 
 412                     @Override
 413                     public void begin(long size) {
 414                         downstream.begin(calcSize(size, skip, m));
 415                     }
 416 
 417                     @Override
 418                     public void accept(long t) {
 419                         if (n == 0) {
 420                             if (m > 0) {
 421                                 m--;
 422                                 downstream.accept(t);
 423                             }
 424                         }
 425                         else {
 426                             n--;
 427                         }
 428                     }


 497                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
 498                     return Nodes.collectDouble(helper, s, true);
 499                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 500                     Spliterator.OfDouble s =  unorderedSkipLimitSpliterator(
 501                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
 502                             skip, limit, size);
 503                     // Collect using this pipeline, which is empty and therefore
 504                     // can be used with the pipeline wrapping spliterator
 505                     // Note that we cannot create a slice spliterator from
 506                     // the source spliterator if the pipeline is not SIZED
 507                     return Nodes.collectDouble(this, s, true);
 508                 }
 509                 else {
 510                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
 511                             invoke();
 512                 }
 513             }
 514 
 515             @Override
 516             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 517                 return new Sink.ChainedDouble<Double>(sink) {
 518                     long n = skip;
 519                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
 520 
 521                     @Override
 522                     public void begin(long size) {
 523                         downstream.begin(calcSize(size, skip, m));
 524                     }
 525 
 526                     @Override
 527                     public void accept(double t) {
 528                         if (n == 0) {
 529                             if (m > 0) {
 530                                 m--;
 531                                 downstream.accept(t);
 532                             }
 533                         }
 534                         else {
 535                             n--;
 536                         }
 537                     }


 548     private static int flags(long limit) {
 549         return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0);
 550     }
 551 
 552     /**
 553      * {@code ForkJoinTask} implementing slice computation.
 554      *
 555      * @param <P_IN> Input element type to the stream pipeline
 556      * @param <P_OUT> Output element type from the stream pipeline
 557      */
 558     @SuppressWarnings("serial")
 559     private static final class SliceTask<P_IN, P_OUT>
 560             extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, SliceTask<P_IN, P_OUT>> {
 561         private final AbstractPipeline<P_OUT, P_OUT, ?> op;
 562         private final IntFunction<P_OUT[]> generator;
 563         private final long targetOffset, targetSize;
 564         private long thisNodeSize;
 565 
 566         private volatile boolean completed;
 567 
 568         SliceTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
 569                   PipelineHelper<P_OUT> helper,
 570                   Spliterator<P_IN> spliterator,
 571                   IntFunction<P_OUT[]> generator,
 572                   long offset, long size) {
 573             super(helper, spliterator);
 574             this.op = op;
 575             this.generator = generator;
 576             this.targetOffset = offset;
 577             this.targetSize = size;
 578         }
 579 
 580         SliceTask(SliceTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
 581             super(parent, spliterator);
 582             this.op = parent.op;
 583             this.generator = parent.generator;
 584             this.targetOffset = parent.targetOffset;
 585             this.targetSize = parent.targetSize;
 586         }
 587 
 588         @Override
 589         protected SliceTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
 590             return new SliceTask<>(this, spliterator);
 591         }
 592 
 593         @Override
 594         protected final Node<P_OUT> getEmptyResult() {