1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Spliterator;
  28 import java.util.concurrent.CountedCompleter;
  29 import java.util.function.IntFunction;
  30 
  31 /**
 * Factory for instances of short-circuiting stateful intermediate operations
 * that produce subsequences of their input stream.
  34  *
  35  * @since 1.8
  36  */
  37 final class SliceOps {
  38 
  39     // No instances
  40     private SliceOps() { }
  41 
  42     /**
  43      * Calculates the sliced size given the current size, number of elements
  44      * skip, and the number of elements to limit.
  45      *
  46      * @param size the current size
  47      * @param skip the number of elements to skip, assumed to be >= 0
  48      * @param limit the number of elements to limit, assumed to be >= 0, with
  49      *        a value of {@code Long.MAX_VALUE} if there is no limit
  50      * @return the sliced size
  51      */
  52     private static long calcSize(long size, long skip, long limit) {
  53         return size >= 0 ? Math.max(-1, Math.min(size - skip, limit)) : -1;
  54     }
  55 
  56     /**
  57      * Calculates the slice fence, which is one past the index of the slice
  58      * range
  59      * @param skip the number of elements to skip, assumed to be >= 0
  60      * @param limit the number of elements to limit, assumed to be >= 0, with
  61      *        a value of {@code Long.MAX_VALUE} if there is no limit
  62      * @return the slice fence.
  63      */
  64     private static long calcSliceFence(long skip, long limit) {
  65         long sliceFence = limit >= 0 ? skip + limit : Long.MAX_VALUE;
  66         // Check for overflow
  67         return (sliceFence >= 0) ? sliceFence : Long.MAX_VALUE;
  68     }
  69 
  70     /**
  71      * Creates a slice spliterator given a stream shape governing the
  72      * spliterator type.  Requires that the underlying Spliterator
  73      * be SUBSIZED.
  74      */
  75     @SuppressWarnings("unchecked")
  76     private static <P_IN> Spliterator<P_IN> sliceSpliterator(StreamShape shape,
  77                                                              Spliterator<P_IN> s,
  78                                                              long skip, long limit) {
  79         assert s.hasCharacteristics(Spliterator.SUBSIZED);
  80         long sliceFence = calcSliceFence(skip, limit);
  81         switch (shape) {
  82             case REFERENCE:
  83                 return new StreamSpliterators
  84                         .SliceSpliterator.OfRef<>(s, skip, sliceFence);
  85             case INT_VALUE:
  86                 return (Spliterator<P_IN>) new StreamSpliterators
  87                         .SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence);
  88             case LONG_VALUE:
  89                 return (Spliterator<P_IN>) new StreamSpliterators
  90                         .SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence);
  91             case DOUBLE_VALUE:
  92                 return (Spliterator<P_IN>) new StreamSpliterators
  93                         .SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence);
  94             default:
  95                 throw new IllegalStateException("Unknown shape " + shape);
  96         }
  97     }
  98 
  99     /**
 100      * Appends a "slice" operation to the provided stream.  The slice operation
 101      * may be may be skip-only, limit-only, or skip-and-limit.
 102      *
 103      * @param <T> the type of both input and output elements
 104      * @param upstream a reference stream with element type T
 105      * @param skip the number of elements to skip.  Must be >= 0.
 106      * @param limit the maximum size of the resulting stream, or -1 if no limit
 107      *        is to be imposed
 108      */
 109     public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
 110                                        long skip, long limit) {
 111         if (skip < 0)
 112             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
 113 
 114         return new ReferencePipeline.StatefulOp<T,T>(upstream, StreamShape.REFERENCE,
 115                                                      flags(limit)) {
 116             Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s,
 117                                                          long skip, long limit, long sizeIfKnown) {
 118                 if (skip <= sizeIfKnown) {
 119                     // Use just the limit if the number of elements
 120                     // to skip is <= the known pipeline size
 121                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
 122                     skip = 0;
 123                 }
 124                 return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
 125             }
 126 
 127             @Override
 128             <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
 129                 long size = helper.exactOutputSizeIfKnown(spliterator);
 130                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
 131                     return new StreamSpliterators.SliceSpliterator.OfRef<>(
 132                             helper.wrapSpliterator(spliterator),
 133                             skip,
 134                             calcSliceFence(skip, limit));
 135                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 136                     return unorderedSkipLimitSpliterator(
 137                             helper.wrapSpliterator(spliterator),
 138                             skip, limit, size);
 139                 }
 140                 else {
 141                     // @@@ OOMEs will occur for LongStream.longs().filter(i -> true).limit(n)
 142                     //     regardless of the value of n
 143                     //     Need to adjust the target size of splitting for the
 144                     //     SliceTask from say (size / k) to say min(size / k, 1 << 14)
 145                     //     This will limit the size of the buffers created at the leaf nodes
 146                     //     cancellation will be more aggressive cancelling later tasks
 147                     //     if the target slice size has been reached from a given task,
 148                     //     cancellation should also clear local results if any
 149                     return new SliceTask<>(this, helper, spliterator, i -> (T[]) new Object[i], skip, limit).
 150                             invoke().spliterator();
 151                 }
 152             }
 153 
 154             @Override
 155             <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
 156                                               Spliterator<P_IN> spliterator,
 157                                               IntFunction<T[]> generator) {
 158                 long size = helper.exactOutputSizeIfKnown(spliterator);
 159                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
 160                     // Because the pipeline is SIZED the slice spliterator
 161                     // can be created from the source, this requires matching
 162                     // to shape of the source, and is potentially more efficient
 163                     // than creating the slice spliterator from the pipeline
 164                     // wrapping spliterator
 165                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
 166                     return Nodes.collect(helper, s, true, generator);
 167                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 168                     Spliterator<T> s =  unorderedSkipLimitSpliterator(
 169                             helper.wrapSpliterator(spliterator),
 170                             skip, limit, size);
 171                     // Collect using this pipeline, which is empty and therefore
 172                     // can be used with the pipeline wrapping spliterator
 173                     // Note that we cannot create a slice spliterator from
 174                     // the source spliterator if the pipeline is not SIZED
 175                     return Nodes.collect(this, s, true, generator);
 176                 }
 177                 else {
 178                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
 179                             invoke();
 180                 }
 181             }
 182 
 183             @Override
 184             Sink<T> opWrapSink(int flags, Sink<T> sink) {
 185                 return new Sink.ChainedReference<T>(sink) {
 186                     long n = skip;
 187                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
 188 
 189                     @Override
 190                     public void begin(long size) {
 191                         downstream.begin(calcSize(size, skip, m));
 192                     }
 193 
 194                     @Override
 195                     public void accept(T t) {
 196                         if (n == 0) {
 197                             if (m > 0) {
 198                                 m--;
 199                                 downstream.accept(t);
 200                             }
 201                         }
 202                         else {
 203                             n--;
 204                         }
 205                     }
 206 
 207                     @Override
 208                     public boolean cancellationRequested() {
 209                         return m == 0 || downstream.cancellationRequested();
 210                     }
 211                 };
 212             }
 213         };
 214     }
 215 
 216     /**
 217      * Appends a "slice" operation to the provided IntStream.  The slice
 218      * operation may be may be skip-only, limit-only, or skip-and-limit.
 219      *
 220      * @param upstream An IntStream
 221      * @param skip The number of elements to skip.  Must be >= 0.
 222      * @param limit The maximum size of the resulting stream, or -1 if no limit
 223      *        is to be imposed
 224      */
 225     public static IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream,
 226                                     long skip, long limit) {
 227         if (skip < 0)
 228             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
 229 
 230         return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE,
 231                                                    flags(limit)) {
 232             Spliterator.OfInt unorderedSkipLimitSpliterator(
 233                     Spliterator.OfInt s, long skip, long limit, long sizeIfKnown) {
 234                 if (skip <= sizeIfKnown) {
 235                     // Use just the limit if the number of elements
 236                     // to skip is <= the known pipeline size
 237                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
 238                     skip = 0;
 239                 }
 240                 return new StreamSpliterators.UnorderedSliceSpliterator.OfInt(s, skip, limit);
 241             }
 242 
 243             @Override
 244             <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
 245                                                                Spliterator<P_IN> spliterator) {
 246                 long size = helper.exactOutputSizeIfKnown(spliterator);
 247                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
 248                     return new StreamSpliterators.SliceSpliterator.OfInt(
 249                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
 250                             skip,
 251                             calcSliceFence(skip, limit));
 252                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 253                     return unorderedSkipLimitSpliterator(
 254                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
 255                             skip, limit, size);
 256                 }
 257                 else {
 258                     return new SliceTask<>(this, helper, spliterator, Integer[]::new, skip, limit).
 259                             invoke().spliterator();
 260                 }
 261             }
 262 
 263             @Override
 264             <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
 265                                                     Spliterator<P_IN> spliterator,
 266                                                     IntFunction<Integer[]> generator) {
 267                 long size = helper.exactOutputSizeIfKnown(spliterator);
 268                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
 269                     // Because the pipeline is SIZED the slice spliterator
 270                     // can be created from the source, this requires matching
 271                     // to shape of the source, and is potentially more efficient
 272                     // than creating the slice spliterator from the pipeline
 273                     // wrapping spliterator
 274                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
 275                     return Nodes.collectInt(helper, s, true);
 276                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 277                     Spliterator.OfInt s =  unorderedSkipLimitSpliterator(
 278                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
 279                             skip, limit, size);
 280                     // Collect using this pipeline, which is empty and therefore
 281                     // can be used with the pipeline wrapping spliterator
 282                     // Note that we cannot create a slice spliterator from
 283                     // the source spliterator if the pipeline is not SIZED
 284                     return Nodes.collectInt(this, s, true);
 285                 }
 286                 else {
 287                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
 288                             invoke();
 289                 }
 290             }
 291 
 292             @Override
 293             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 294                 return new Sink.ChainedInt(sink) {
 295                     long n = skip;
 296                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
 297 
 298                     @Override
 299                     public void begin(long size) {
 300                         downstream.begin(calcSize(size, skip, m));
 301                     }
 302 
 303                     @Override
 304                     public void accept(int t) {
 305                         if (n == 0) {
 306                             if (m > 0) {
 307                                 m--;
 308                                 downstream.accept(t);
 309                             }
 310                         }
 311                         else {
 312                             n--;
 313                         }
 314                     }
 315 
 316                     @Override
 317                     public boolean cancellationRequested() {
 318                         return m == 0 || downstream.cancellationRequested();
 319                     }
 320                 };
 321             }
 322         };
 323     }
 324 
 325     /**
 326      * Appends a "slice" operation to the provided LongStream.  The slice
 327      * operation may be may be skip-only, limit-only, or skip-and-limit.
 328      *
 329      * @param upstream A LongStream
 330      * @param skip The number of elements to skip.  Must be >= 0.
 331      * @param limit The maximum size of the resulting stream, or -1 if no limit
 332      *        is to be imposed
 333      */
 334     public static LongStream makeLong(AbstractPipeline<?, Long, ?> upstream,
 335                                       long skip, long limit) {
 336         if (skip < 0)
 337             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
 338 
 339         return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE,
 340                                                  flags(limit)) {
 341             Spliterator.OfLong unorderedSkipLimitSpliterator(
 342                     Spliterator.OfLong s, long skip, long limit, long sizeIfKnown) {
 343                 if (skip <= sizeIfKnown) {
 344                     // Use just the limit if the number of elements
 345                     // to skip is <= the known pipeline size
 346                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
 347                     skip = 0;
 348                 }
 349                 return new StreamSpliterators.UnorderedSliceSpliterator.OfLong(s, skip, limit);
 350             }
 351 
 352             @Override
 353             <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
 354                                                             Spliterator<P_IN> spliterator) {
 355                 long size = helper.exactOutputSizeIfKnown(spliterator);
 356                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
 357                     return new StreamSpliterators.SliceSpliterator.OfLong(
 358                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
 359                             skip,
 360                             calcSliceFence(skip, limit));
 361                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 362                     return unorderedSkipLimitSpliterator(
 363                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
 364                             skip, limit, size);
 365                 }
 366                 else {
 367                     return new SliceTask<>(this, helper, spliterator, Long[]::new, skip, limit).
 368                             invoke().spliterator();
 369                 }
 370             }
 371 
 372             @Override
 373             <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
 374                                                  Spliterator<P_IN> spliterator,
 375                                                  IntFunction<Long[]> generator) {
 376                 long size = helper.exactOutputSizeIfKnown(spliterator);
 377                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
 378                     // Because the pipeline is SIZED the slice spliterator
 379                     // can be created from the source, this requires matching
 380                     // to shape of the source, and is potentially more efficient
 381                     // than creating the slice spliterator from the pipeline
 382                     // wrapping spliterator
 383                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
 384                     return Nodes.collectLong(helper, s, true);
 385                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 386                     Spliterator.OfLong s =  unorderedSkipLimitSpliterator(
 387                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
 388                             skip, limit, size);
 389                     // Collect using this pipeline, which is empty and therefore
 390                     // can be used with the pipeline wrapping spliterator
 391                     // Note that we cannot create a slice spliterator from
 392                     // the source spliterator if the pipeline is not SIZED
 393                     return Nodes.collectLong(this, s, true);
 394                 }
 395                 else {
 396                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
 397                             invoke();
 398                 }
 399             }
 400 
 401             @Override
 402             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 403                 return new Sink.ChainedLong(sink) {
 404                     long n = skip;
 405                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
 406 
 407                     @Override
 408                     public void begin(long size) {
 409                         downstream.begin(calcSize(size, skip, m));
 410                     }
 411 
 412                     @Override
 413                     public void accept(long t) {
 414                         if (n == 0) {
 415                             if (m > 0) {
 416                                 m--;
 417                                 downstream.accept(t);
 418                             }
 419                         }
 420                         else {
 421                             n--;
 422                         }
 423                     }
 424 
 425                     @Override
 426                     public boolean cancellationRequested() {
 427                         return m == 0 || downstream.cancellationRequested();
 428                     }
 429                 };
 430             }
 431         };
 432     }
 433 
 434     /**
 435      * Appends a "slice" operation to the provided DoubleStream.  The slice
 436      * operation may be may be skip-only, limit-only, or skip-and-limit.
 437      *
 438      * @param upstream A DoubleStream
 439      * @param skip The number of elements to skip.  Must be >= 0.
 440      * @param limit The maximum size of the resulting stream, or -1 if no limit
 441      *        is to be imposed
 442      */
 443     public static DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream,
 444                                           long skip, long limit) {
 445         if (skip < 0)
 446             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
 447 
 448         return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE,
 449                                                      flags(limit)) {
 450             Spliterator.OfDouble unorderedSkipLimitSpliterator(
 451                     Spliterator.OfDouble s, long skip, long limit, long sizeIfKnown) {
 452                 if (skip <= sizeIfKnown) {
 453                     // Use just the limit if the number of elements
 454                     // to skip is <= the known pipeline size
 455                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
 456                     skip = 0;
 457                 }
 458                 return new StreamSpliterators.UnorderedSliceSpliterator.OfDouble(s, skip, limit);
 459             }
 460 
 461             @Override
 462             <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
 463                                                               Spliterator<P_IN> spliterator) {
 464                 long size = helper.exactOutputSizeIfKnown(spliterator);
 465                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
 466                     return new StreamSpliterators.SliceSpliterator.OfDouble(
 467                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
 468                             skip,
 469                             calcSliceFence(skip, limit));
 470                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 471                     return unorderedSkipLimitSpliterator(
 472                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
 473                             skip, limit, size);
 474                 }
 475                 else {
 476                     return new SliceTask<>(this, helper, spliterator, Double[]::new, skip, limit).
 477                             invoke().spliterator();
 478                 }
 479             }
 480 
 481             @Override
 482             <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
 483                                                    Spliterator<P_IN> spliterator,
 484                                                    IntFunction<Double[]> generator) {
 485                 long size = helper.exactOutputSizeIfKnown(spliterator);
 486                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
 487                     // Because the pipeline is SIZED the slice spliterator
 488                     // can be created from the source, this requires matching
 489                     // to shape of the source, and is potentially more efficient
 490                     // than creating the slice spliterator from the pipeline
 491                     // wrapping spliterator
 492                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
 493                     return Nodes.collectDouble(helper, s, true);
 494                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
 495                     Spliterator.OfDouble s =  unorderedSkipLimitSpliterator(
 496                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
 497                             skip, limit, size);
 498                     // Collect using this pipeline, which is empty and therefore
 499                     // can be used with the pipeline wrapping spliterator
 500                     // Note that we cannot create a slice spliterator from
 501                     // the source spliterator if the pipeline is not SIZED
 502                     return Nodes.collectDouble(this, s, true);
 503                 }
 504                 else {
 505                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
 506                             invoke();
 507                 }
 508             }
 509 
 510             @Override
 511             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 512                 return new Sink.ChainedDouble(sink) {
 513                     long n = skip;
 514                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
 515 
 516                     @Override
 517                     public void begin(long size) {
 518                         downstream.begin(calcSize(size, skip, m));
 519                     }
 520 
 521                     @Override
 522                     public void accept(double t) {
 523                         if (n == 0) {
 524                             if (m > 0) {
 525                                 m--;
 526                                 downstream.accept(t);
 527                             }
 528                         }
 529                         else {
 530                             n--;
 531                         }
 532                     }
 533 
 534                     @Override
 535                     public boolean cancellationRequested() {
 536                         return m == 0 || downstream.cancellationRequested();
 537                     }
 538                 };
 539             }
 540         };
 541     }
 542 
 543     private static int flags(long limit) {
 544         return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0);
 545     }
 546 
 547     /**
 548      * {@code ForkJoinTask} implementing slice computation.
 549      *
 550      * @param <P_IN> Input element type to the stream pipeline
 551      * @param <P_OUT> Output element type from the stream pipeline
 552      */
 553     @SuppressWarnings("serial")
 554     private static final class SliceTask<P_IN, P_OUT>
 555             extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, SliceTask<P_IN, P_OUT>> {
 556         private final AbstractPipeline<P_OUT, P_OUT, ?> op;
 557         private final IntFunction<P_OUT[]> generator;
 558         private final long targetOffset, targetSize;
 559         private long thisNodeSize;
 560 
 561         private volatile boolean completed;
 562 
 563         SliceTask(AbstractPipeline<?, P_OUT, ?> op,
 564                   PipelineHelper<P_OUT> helper,
 565                   Spliterator<P_IN> spliterator,
 566                   IntFunction<P_OUT[]> generator,
 567                   long offset, long size) {
 568             super(helper, spliterator);
 569             this.op = (AbstractPipeline<P_OUT, P_OUT, ?>) op;
 570             this.generator = generator;
 571             this.targetOffset = offset;
 572             this.targetSize = size;
 573         }
 574 
 575         SliceTask(SliceTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
 576             super(parent, spliterator);
 577             this.op = parent.op;
 578             this.generator = parent.generator;
 579             this.targetOffset = parent.targetOffset;
 580             this.targetSize = parent.targetSize;
 581         }
 582 
 583         @Override
 584         protected SliceTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
 585             return new SliceTask<>(this, spliterator);
 586         }
 587 
 588         @Override
 589         protected final Node<P_OUT> getEmptyResult() {
 590             return Nodes.emptyNode(op.getOutputShape());
 591         }
 592 
 593         @Override
 594         protected final Node<P_OUT> doLeaf() {
 595             if (isRoot()) {
 596                 long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags)
 597                                    ? op.exactOutputSizeIfKnown(spliterator)
 598                                    : -1;
 599                 final Node.Builder<P_OUT> nb = op.makeNodeBuilder(sizeIfKnown, generator);
 600                 Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb);
 601                 helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator);
 602                 // There is no need to truncate since the op performs the
 603                 // skipping and limiting of elements
 604                 return nb.build();
 605             }
 606             else {
 607                 Node<P_OUT> node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator),
 608                                                           spliterator).build();
 609                 thisNodeSize = node.count();
 610                 completed = true;
 611                 spliterator = null;
 612                 return node;
 613             }
 614         }
 615 
 616         @Override
 617         public final void onCompletion(CountedCompleter<?> caller) {
 618             if (!isLeaf()) {
 619                 Node<P_OUT> result;
 620                 thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize;
 621                 if (canceled) {
 622                     thisNodeSize = 0;
 623                     result = getEmptyResult();
 624                 }
 625                 else if (thisNodeSize == 0)
 626                     result = getEmptyResult();
 627                 else if (leftChild.thisNodeSize == 0)
 628                     result = rightChild.getLocalResult();
 629                 else {
 630                     result = Nodes.conc(op.getOutputShape(),
 631                                         leftChild.getLocalResult(), rightChild.getLocalResult());
 632                 }
 633                 setLocalResult(isRoot() ? doTruncate(result) : result);
 634                 completed = true;
 635             }
 636             if (targetSize >= 0
 637                 && !isRoot()
 638                 && isLeftCompleted(targetOffset + targetSize))
 639                     cancelLaterNodes();
 640 
 641             super.onCompletion(caller);
 642         }
 643 
 644         @Override
 645         protected void cancel() {
 646             super.cancel();
 647             if (completed)
 648                 setLocalResult(getEmptyResult());
 649         }
 650 
 651         private Node<P_OUT> doTruncate(Node<P_OUT> input) {
 652             long to = targetSize >= 0 ? Math.min(input.count(), targetOffset + targetSize) : thisNodeSize;
 653             return input.truncate(targetOffset, to, generator);
 654         }
 655 
 656         /**
 657          * Determine if the number of completed elements in this node and nodes
 658          * to the left of this node is greater than or equal to the target size.
 659          *
 660          * @param target the target size
 661          * @return true if the number of elements is greater than or equal to
 662          *         the target size, otherwise false.
 663          */
 664         private boolean isLeftCompleted(long target) {
 665             long size = completed ? thisNodeSize : completedSize(target);
 666             if (size >= target)
 667                 return true;
 668             for (SliceTask<P_IN, P_OUT> parent = getParent(), node = this;
 669                  parent != null;
 670                  node = parent, parent = parent.getParent()) {
 671                 if (node == parent.rightChild) {
 672                     SliceTask<P_IN, P_OUT> left = parent.leftChild;
 673                     if (left != null) {
 674                         size += left.completedSize(target);
 675                         if (size >= target)
 676                             return true;
 677                     }
 678                 }
 679             }
 680             return size >= target;
 681         }
 682 
 683         /**
 684          * Compute the number of completed elements in this node.
 685          * <p>
 686          * Computation terminates if all nodes have been processed or the
 687          * number of completed elements is greater than or equal to the target
 688          * size.
 689          *
 690          * @param target the target size
 691          * @return return the number of completed elements
 692          */
 693         private long completedSize(long target) {
 694             if (completed)
 695                 return thisNodeSize;
 696             else {
 697                 SliceTask<P_IN, P_OUT> left = leftChild;
 698                 SliceTask<P_IN, P_OUT> right = rightChild;
 699                 if (left == null || right == null) {
 700                     // must be completed
 701                     return thisNodeSize;
 702                 }
 703                 else {
 704                     long leftSize = left.completedSize(target);
 705                     return (leftSize >= target) ? leftSize : leftSize + right.completedSize(target);
 706                 }
 707             }
 708         }
 709     }
 710 }