1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Spliterator; 28 import java.util.concurrent.CountedCompleter; 29 import java.util.function.IntFunction; 30 31 /** 32 * Factory for instances of a short-circuiting stateful intermediate operations 33 * that produce subsequences of their input stream. 34 * 35 * @since 1.8 36 */ 37 final class SliceOps { 38 39 // No instances 40 private SliceOps() { } 41 42 /** 43 * Calculates the sliced size given the current size, number of elements 44 * skip, and the number of elements to limit. 45 * 46 * @param size the current size 47 * @param skip the number of elements to skip, assumed to be >= 0 48 * @param limit the number of elements to limit, assumed to be >= 0, with 49 * a value of {@code Long.MAX_VALUE} if there is no limit 50 * @return the sliced size 51 */ 52 private static long calcSize(long size, long skip, long limit) { 53 return size >= 0 ? Math.max(-1, Math.min(size - skip, limit)) : -1; 54 } 55 56 /** 57 * Calculates the slice fence, which is one past the index of the slice 58 * range 59 * @param skip the number of elements to skip, assumed to be >= 0 60 * @param limit the number of elements to limit, assumed to be >= 0, with 61 * a value of {@code Long.MAX_VALUE} if there is no limit 62 * @return the slice fence. 63 */ 64 private static long calcSliceFence(long skip, long limit) { 65 long sliceFence = limit >= 0 ? skip + limit : Long.MAX_VALUE; 66 // Check for overflow 67 return (sliceFence >= 0) ? sliceFence : Long.MAX_VALUE; 68 } 69 70 /** 71 * Creates a slice spliterator given a stream shape governing the 72 * spliterator type. Requires that the underlying Spliterator 73 * be SUBSIZED. 74 */ 75 @SuppressWarnings("unchecked") 76 private static <P_IN> Spliterator<P_IN> sliceSpliterator(StreamShape shape, 77 Spliterator<P_IN> s, 78 long skip, long limit) { 79 assert s.hasCharacteristics(Spliterator.SUBSIZED); 80 long sliceFence = calcSliceFence(skip, limit); 81 switch (shape) { 82 case REFERENCE: 83 return new StreamSpliterators 84 .SliceSpliterator.OfRef<>(s, skip, sliceFence); 85 case INT_VALUE: 86 return (Spliterator<P_IN>) new StreamSpliterators 87 .SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence); 88 case LONG_VALUE: 89 return (Spliterator<P_IN>) new StreamSpliterators 90 .SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence); 91 case DOUBLE_VALUE: 92 return (Spliterator<P_IN>) new StreamSpliterators 93 .SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence); 94 default: 95 throw new IllegalStateException("Unknown shape " + shape); 96 } 97 } 98 99 /** 100 * Appends a "slice" operation to the provided stream. The slice operation 101 * may be may be skip-only, limit-only, or skip-and-limit. 102 * 103 * @param <T> the type of both input and output elements 104 * @param upstream a reference stream with element type T 105 * @param skip the number of elements to skip. Must be >= 0. 106 * @param limit the maximum size of the resulting stream, or -1 if no limit 107 * is to be imposed 108 */ 109 public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream, 110 long skip, long limit) { 111 if (skip < 0) 112 throw new IllegalArgumentException("Skip must be non-negative: " + skip); 113 114 return new ReferencePipeline.StatefulOp<T,T>(upstream, StreamShape.REFERENCE, 115 flags(limit)) { 116 Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s, 117 long skip, long limit, long sizeIfKnown) { 118 if (skip <= sizeIfKnown) { 119 // Use just the limit if the number of elements 120 // to skip is <= the known pipeline size 121 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; 122 skip = 0; 123 } 124 return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit); 125 } 126 127 @Override 128 <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { 129 long size = helper.exactOutputSizeIfKnown(spliterator); 130 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 131 return new StreamSpliterators.SliceSpliterator.OfRef<>( 132 helper.wrapSpliterator(spliterator), 133 skip, 134 calcSliceFence(skip, limit)); 135 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 136 return unorderedSkipLimitSpliterator( 137 helper.wrapSpliterator(spliterator), 138 skip, limit, size); 139 } 140 else { 141 // @@@ OOMEs will occur for LongStream.longs().filter(i -> true).limit(n) 142 // regardless of the value of n 143 // Need to adjust the target size of splitting for the 144 // SliceTask from say (size / k) to say min(size / k, 1 << 14) 145 // This will limit the size of the buffers created at the leaf nodes 146 // cancellation will be more aggressive cancelling later tasks 147 // if the target slice size has been reached from a given task, 148 // cancellation should also clear local results if any 149 return new SliceTask<>(this, helper, spliterator, i -> (T[]) new Object[i], skip, limit). 150 invoke().spliterator(); 151 } 152 } 153 154 @Override 155 <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, 156 Spliterator<P_IN> spliterator, 157 IntFunction<T[]> generator) { 158 long size = helper.exactOutputSizeIfKnown(spliterator); 159 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 160 // Because the pipeline is SIZED the slice spliterator 161 // can be created from the source, this requires matching 162 // to shape of the source, and is potentially more efficient 163 // than creating the slice spliterator from the pipeline 164 // wrapping spliterator 165 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); 166 return Nodes.collect(helper, s, true, generator); 167 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 168 Spliterator<T> s = unorderedSkipLimitSpliterator( 169 helper.wrapSpliterator(spliterator), 170 skip, limit, size); 171 // Collect using this pipeline, which is empty and therefore 172 // can be used with the pipeline wrapping spliterator 173 // Note that we cannot create a slice spliterator from 174 // the source spliterator if the pipeline is not SIZED 175 return Nodes.collect(this, s, true, generator); 176 } 177 else { 178 return new SliceTask<>(this, helper, spliterator, generator, skip, limit). 179 invoke(); 180 } 181 } 182 183 @Override 184 Sink<T> opWrapSink(int flags, Sink<T> sink) { 185 return new Sink.ChainedReference<T>(sink) { 186 long n = skip; 187 long m = limit >= 0 ? limit : Long.MAX_VALUE; 188 189 @Override 190 public void begin(long size) { 191 downstream.begin(calcSize(size, skip, m)); 192 } 193 194 @Override 195 public void accept(T t) { 196 if (n == 0) { 197 if (m > 0) { 198 m--; 199 downstream.accept(t); 200 } 201 } 202 else { 203 n--; 204 } 205 } 206 207 @Override 208 public boolean cancellationRequested() { 209 return m == 0 || downstream.cancellationRequested(); 210 } 211 }; 212 } 213 }; 214 } 215 216 /** 217 * Appends a "slice" operation to the provided IntStream. The slice 218 * operation may be may be skip-only, limit-only, or skip-and-limit. 219 * 220 * @param upstream An IntStream 221 * @param skip The number of elements to skip. Must be >= 0. 222 * @param limit The maximum size of the resulting stream, or -1 if no limit 223 * is to be imposed 224 */ 225 public static IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream, 226 long skip, long limit) { 227 if (skip < 0) 228 throw new IllegalArgumentException("Skip must be non-negative: " + skip); 229 230 return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE, 231 flags(limit)) { 232 Spliterator.OfInt unorderedSkipLimitSpliterator( 233 Spliterator.OfInt s, long skip, long limit, long sizeIfKnown) { 234 if (skip <= sizeIfKnown) { 235 // Use just the limit if the number of elements 236 // to skip is <= the known pipeline size 237 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; 238 skip = 0; 239 } 240 return new StreamSpliterators.UnorderedSliceSpliterator.OfInt(s, skip, limit); 241 } 242 243 @Override 244 <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper, 245 Spliterator<P_IN> spliterator) { 246 long size = helper.exactOutputSizeIfKnown(spliterator); 247 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 248 return new StreamSpliterators.SliceSpliterator.OfInt( 249 (Spliterator.OfInt) helper.wrapSpliterator(spliterator), 250 skip, 251 calcSliceFence(skip, limit)); 252 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 253 return unorderedSkipLimitSpliterator( 254 (Spliterator.OfInt) helper.wrapSpliterator(spliterator), 255 skip, limit, size); 256 } 257 else { 258 return new SliceTask<>(this, helper, spliterator, Integer[]::new, skip, limit). 259 invoke().spliterator(); 260 } 261 } 262 263 @Override 264 <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, 265 Spliterator<P_IN> spliterator, 266 IntFunction<Integer[]> generator) { 267 long size = helper.exactOutputSizeIfKnown(spliterator); 268 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 269 // Because the pipeline is SIZED the slice spliterator 270 // can be created from the source, this requires matching 271 // to shape of the source, and is potentially more efficient 272 // than creating the slice spliterator from the pipeline 273 // wrapping spliterator 274 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); 275 return Nodes.collectInt(helper, s, true); 276 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 277 Spliterator.OfInt s = unorderedSkipLimitSpliterator( 278 (Spliterator.OfInt) helper.wrapSpliterator(spliterator), 279 skip, limit, size); 280 // Collect using this pipeline, which is empty and therefore 281 // can be used with the pipeline wrapping spliterator 282 // Note that we cannot create a slice spliterator from 283 // the source spliterator if the pipeline is not SIZED 284 return Nodes.collectInt(this, s, true); 285 } 286 else { 287 return new SliceTask<>(this, helper, spliterator, generator, skip, limit). 288 invoke(); 289 } 290 } 291 292 @Override 293 Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 294 return new Sink.ChainedInt(sink) { 295 long n = skip; 296 long m = limit >= 0 ? limit : Long.MAX_VALUE; 297 298 @Override 299 public void begin(long size) { 300 downstream.begin(calcSize(size, skip, m)); 301 } 302 303 @Override 304 public void accept(int t) { 305 if (n == 0) { 306 if (m > 0) { 307 m--; 308 downstream.accept(t); 309 } 310 } 311 else { 312 n--; 313 } 314 } 315 316 @Override 317 public boolean cancellationRequested() { 318 return m == 0 || downstream.cancellationRequested(); 319 } 320 }; 321 } 322 }; 323 } 324 325 /** 326 * Appends a "slice" operation to the provided LongStream. The slice 327 * operation may be may be skip-only, limit-only, or skip-and-limit. 328 * 329 * @param upstream A LongStream 330 * @param skip The number of elements to skip. Must be >= 0. 331 * @param limit The maximum size of the resulting stream, or -1 if no limit 332 * is to be imposed 333 */ 334 public static LongStream makeLong(AbstractPipeline<?, Long, ?> upstream, 335 long skip, long limit) { 336 if (skip < 0) 337 throw new IllegalArgumentException("Skip must be non-negative: " + skip); 338 339 return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE, 340 flags(limit)) { 341 Spliterator.OfLong unorderedSkipLimitSpliterator( 342 Spliterator.OfLong s, long skip, long limit, long sizeIfKnown) { 343 if (skip <= sizeIfKnown) { 344 // Use just the limit if the number of elements 345 // to skip is <= the known pipeline size 346 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; 347 skip = 0; 348 } 349 return new StreamSpliterators.UnorderedSliceSpliterator.OfLong(s, skip, limit); 350 } 351 352 @Override 353 <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper, 354 Spliterator<P_IN> spliterator) { 355 long size = helper.exactOutputSizeIfKnown(spliterator); 356 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 357 return new StreamSpliterators.SliceSpliterator.OfLong( 358 (Spliterator.OfLong) helper.wrapSpliterator(spliterator), 359 skip, 360 calcSliceFence(skip, limit)); 361 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 362 return unorderedSkipLimitSpliterator( 363 (Spliterator.OfLong) helper.wrapSpliterator(spliterator), 364 skip, limit, size); 365 } 366 else { 367 return new SliceTask<>(this, helper, spliterator, Long[]::new, skip, limit). 368 invoke().spliterator(); 369 } 370 } 371 372 @Override 373 <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, 374 Spliterator<P_IN> spliterator, 375 IntFunction<Long[]> generator) { 376 long size = helper.exactOutputSizeIfKnown(spliterator); 377 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 378 // Because the pipeline is SIZED the slice spliterator 379 // can be created from the source, this requires matching 380 // to shape of the source, and is potentially more efficient 381 // than creating the slice spliterator from the pipeline 382 // wrapping spliterator 383 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); 384 return Nodes.collectLong(helper, s, true); 385 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 386 Spliterator.OfLong s = unorderedSkipLimitSpliterator( 387 (Spliterator.OfLong) helper.wrapSpliterator(spliterator), 388 skip, limit, size); 389 // Collect using this pipeline, which is empty and therefore 390 // can be used with the pipeline wrapping spliterator 391 // Note that we cannot create a slice spliterator from 392 // the source spliterator if the pipeline is not SIZED 393 return Nodes.collectLong(this, s, true); 394 } 395 else { 396 return new SliceTask<>(this, helper, spliterator, generator, skip, limit). 397 invoke(); 398 } 399 } 400 401 @Override 402 Sink<Long> opWrapSink(int flags, Sink<Long> sink) { 403 return new Sink.ChainedLong(sink) { 404 long n = skip; 405 long m = limit >= 0 ? limit : Long.MAX_VALUE; 406 407 @Override 408 public void begin(long size) { 409 downstream.begin(calcSize(size, skip, m)); 410 } 411 412 @Override 413 public void accept(long t) { 414 if (n == 0) { 415 if (m > 0) { 416 m--; 417 downstream.accept(t); 418 } 419 } 420 else { 421 n--; 422 } 423 } 424 425 @Override 426 public boolean cancellationRequested() { 427 return m == 0 || downstream.cancellationRequested(); 428 } 429 }; 430 } 431 }; 432 } 433 434 /** 435 * Appends a "slice" operation to the provided DoubleStream. The slice 436 * operation may be may be skip-only, limit-only, or skip-and-limit. 437 * 438 * @param upstream A DoubleStream 439 * @param skip The number of elements to skip. Must be >= 0. 440 * @param limit The maximum size of the resulting stream, or -1 if no limit 441 * is to be imposed 442 */ 443 public static DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream, 444 long skip, long limit) { 445 if (skip < 0) 446 throw new IllegalArgumentException("Skip must be non-negative: " + skip); 447 448 return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE, 449 flags(limit)) { 450 Spliterator.OfDouble unorderedSkipLimitSpliterator( 451 Spliterator.OfDouble s, long skip, long limit, long sizeIfKnown) { 452 if (skip <= sizeIfKnown) { 453 // Use just the limit if the number of elements 454 // to skip is <= the known pipeline size 455 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; 456 skip = 0; 457 } 458 return new StreamSpliterators.UnorderedSliceSpliterator.OfDouble(s, skip, limit); 459 } 460 461 @Override 462 <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper, 463 Spliterator<P_IN> spliterator) { 464 long size = helper.exactOutputSizeIfKnown(spliterator); 465 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 466 return new StreamSpliterators.SliceSpliterator.OfDouble( 467 (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), 468 skip, 469 calcSliceFence(skip, limit)); 470 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 471 return unorderedSkipLimitSpliterator( 472 (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), 473 skip, limit, size); 474 } 475 else { 476 return new SliceTask<>(this, helper, spliterator, Double[]::new, skip, limit). 477 invoke().spliterator(); 478 } 479 } 480 481 @Override 482 <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, 483 Spliterator<P_IN> spliterator, 484 IntFunction<Double[]> generator) { 485 long size = helper.exactOutputSizeIfKnown(spliterator); 486 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 487 // Because the pipeline is SIZED the slice spliterator 488 // can be created from the source, this requires matching 489 // to shape of the source, and is potentially more efficient 490 // than creating the slice spliterator from the pipeline 491 // wrapping spliterator 492 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); 493 return Nodes.collectDouble(helper, s, true); 494 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 495 Spliterator.OfDouble s = unorderedSkipLimitSpliterator( 496 (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), 497 skip, limit, size); 498 // Collect using this pipeline, which is empty and therefore 499 // can be used with the pipeline wrapping spliterator 500 // Note that we cannot create a slice spliterator from 501 // the source spliterator if the pipeline is not SIZED 502 return Nodes.collectDouble(this, s, true); 503 } 504 else { 505 return new SliceTask<>(this, helper, spliterator, generator, skip, limit). 506 invoke(); 507 } 508 } 509 510 @Override 511 Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 512 return new Sink.ChainedDouble(sink) { 513 long n = skip; 514 long m = limit >= 0 ? limit : Long.MAX_VALUE; 515 516 @Override 517 public void begin(long size) { 518 downstream.begin(calcSize(size, skip, m)); 519 } 520 521 @Override 522 public void accept(double t) { 523 if (n == 0) { 524 if (m > 0) { 525 m--; 526 downstream.accept(t); 527 } 528 } 529 else { 530 n--; 531 } 532 } 533 534 @Override 535 public boolean cancellationRequested() { 536 return m == 0 || downstream.cancellationRequested(); 537 } 538 }; 539 } 540 }; 541 } 542 543 private static int flags(long limit) { 544 return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0); 545 } 546 547 /** 548 * {@code ForkJoinTask} implementing slice computation. 549 * 550 * @param <P_IN> Input element type to the stream pipeline 551 * @param <P_OUT> Output element type from the stream pipeline 552 */ 553 @SuppressWarnings("serial") 554 private static final class SliceTask<P_IN, P_OUT> 555 extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, SliceTask<P_IN, P_OUT>> { 556 private final AbstractPipeline<P_OUT, P_OUT, ?> op; 557 private final IntFunction<P_OUT[]> generator; 558 private final long targetOffset, targetSize; 559 private long thisNodeSize; 560 561 private volatile boolean completed; 562 563 SliceTask(AbstractPipeline<?, P_OUT, ?> op, 564 PipelineHelper<P_OUT> helper, 565 Spliterator<P_IN> spliterator, 566 IntFunction<P_OUT[]> generator, 567 long offset, long size) { 568 super(helper, spliterator); 569 this.op = (AbstractPipeline<P_OUT, P_OUT, ?>) op; 570 this.generator = generator; 571 this.targetOffset = offset; 572 this.targetSize = size; 573 } 574 575 SliceTask(SliceTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) { 576 super(parent, spliterator); 577 this.op = parent.op; 578 this.generator = parent.generator; 579 this.targetOffset = parent.targetOffset; 580 this.targetSize = parent.targetSize; 581 } 582 583 @Override 584 protected SliceTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) { 585 return new SliceTask<>(this, spliterator); 586 } 587 588 @Override 589 protected final Node<P_OUT> getEmptyResult() { 590 return Nodes.emptyNode(op.getOutputShape()); 591 } 592 593 @Override 594 protected final Node<P_OUT> doLeaf() { 595 if (isRoot()) { 596 long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags) 597 ? op.exactOutputSizeIfKnown(spliterator) 598 : -1; 599 final Node.Builder<P_OUT> nb = op.makeNodeBuilder(sizeIfKnown, generator); 600 Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb); 601 helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator); 602 // There is no need to truncate since the op performs the 603 // skipping and limiting of elements 604 return nb.build(); 605 } 606 else { 607 Node<P_OUT> node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator), 608 spliterator).build(); 609 thisNodeSize = node.count(); 610 completed = true; 611 spliterator = null; 612 return node; 613 } 614 } 615 616 @Override 617 public final void onCompletion(CountedCompleter<?> caller) { 618 if (!isLeaf()) { 619 Node<P_OUT> result; 620 thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize; 621 if (canceled) { 622 thisNodeSize = 0; 623 result = getEmptyResult(); 624 } 625 else if (thisNodeSize == 0) 626 result = getEmptyResult(); 627 else if (leftChild.thisNodeSize == 0) 628 result = rightChild.getLocalResult(); 629 else { 630 result = Nodes.conc(op.getOutputShape(), 631 leftChild.getLocalResult(), rightChild.getLocalResult()); 632 } 633 setLocalResult(isRoot() ? doTruncate(result) : result); 634 completed = true; 635 } 636 if (targetSize >= 0 637 && !isRoot() 638 && isLeftCompleted(targetOffset + targetSize)) 639 cancelLaterNodes(); 640 641 super.onCompletion(caller); 642 } 643 644 @Override 645 protected void cancel() { 646 super.cancel(); 647 if (completed) 648 setLocalResult(getEmptyResult()); 649 } 650 651 private Node<P_OUT> doTruncate(Node<P_OUT> input) { 652 long to = targetSize >= 0 ? Math.min(input.count(), targetOffset + targetSize) : thisNodeSize; 653 return input.truncate(targetOffset, to, generator); 654 } 655 656 /** 657 * Determine if the number of completed elements in this node and nodes 658 * to the left of this node is greater than or equal to the target size. 659 * 660 * @param target the target size 661 * @return true if the number of elements is greater than or equal to 662 * the target size, otherwise false. 663 */ 664 private boolean isLeftCompleted(long target) { 665 long size = completed ? thisNodeSize : completedSize(target); 666 if (size >= target) 667 return true; 668 for (SliceTask<P_IN, P_OUT> parent = getParent(), node = this; 669 parent != null; 670 node = parent, parent = parent.getParent()) { 671 if (node == parent.rightChild) { 672 SliceTask<P_IN, P_OUT> left = parent.leftChild; 673 if (left != null) { 674 size += left.completedSize(target); 675 if (size >= target) 676 return true; 677 } 678 } 679 } 680 return size >= target; 681 } 682 683 /** 684 * Compute the number of completed elements in this node. 685 * <p> 686 * Computation terminates if all nodes have been processed or the 687 * number of completed elements is greater than or equal to the target 688 * size. 689 * 690 * @param target the target size 691 * @return return the number of completed elements 692 */ 693 private long completedSize(long target) { 694 if (completed) 695 return thisNodeSize; 696 else { 697 SliceTask<P_IN, P_OUT> left = leftChild; 698 SliceTask<P_IN, P_OUT> right = rightChild; 699 if (left == null || right == null) { 700 // must be completed 701 return thisNodeSize; 702 } 703 else { 704 long leftSize = left.completedSize(target); 705 return (leftSize >= target) ? leftSize : leftSize + right.completedSize(target); 706 } 707 } 708 } 709 } 710 }