/* * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package java.util.stream; import java.util.Comparator; import java.util.Objects; import java.util.Spliterator; import java.util.concurrent.CountedCompleter; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.DoubleConsumer; import java.util.function.DoublePredicate; import java.util.function.IntConsumer; import java.util.function.IntFunction; import java.util.function.IntPredicate; import java.util.function.LongConsumer; import java.util.function.LongPredicate; import java.util.function.Predicate; /** * Factory for instances of a takeWhile and dropWhile operations * that produce subsequences of their input stream. 
* * @since 9 */ final class WhileOps { static final int TAKE_FLAGS = StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SHORT_CIRCUIT; static final int DROP_FLAGS = StreamOpFlag.NOT_SIZED; /** * Appends a "takeWhile" operation to the provided Stream. * * @param the type of both input and output elements * @param upstream a reference stream with element type T * @param predicate the predicate that returns false to halt taking. */ static Stream makeTakeWhileRef(AbstractPipeline upstream, Predicate predicate) { Objects.requireNonNull(predicate); return new ReferencePipeline.StatefulOp(upstream, StreamShape.REFERENCE, TAKE_FLAGS) { @Override Spliterator opEvaluateParallelLazy(PipelineHelper helper, Spliterator spliterator) { if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { return opEvaluateParallel(helper, spliterator, Nodes.castingArray()) .spliterator(); } else { return new UnorderedWhileSpliterator.OfRef.Taking<>( helper.wrapSpliterator(spliterator), false, predicate); } } @Override Node opEvaluateParallel(PipelineHelper helper, Spliterator spliterator, IntFunction generator) { return new TakeWhileTask<>(this, helper, spliterator, generator) .invoke(); } @Override Sink opWrapSink(int flags, Sink sink) { return new Sink.ChainedReference(sink) { boolean take = true; @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(T t) { if (take = predicate.test(t)) { downstream.accept(t); } } @Override public boolean cancellationRequested() { return !take || downstream.cancellationRequested(); } }; } }; } /** * Appends a "takeWhile" operation to the provided IntStream. * * @param upstream a reference stream with element type T * @param predicate the predicate that returns false to halt taking. 
*/ static IntStream makeTakeWhileInt(AbstractPipeline upstream, IntPredicate predicate) { Objects.requireNonNull(predicate); return new IntPipeline.StatefulOp(upstream, StreamShape.INT_VALUE, TAKE_FLAGS) { @Override Spliterator opEvaluateParallelLazy(PipelineHelper helper, Spliterator spliterator) { if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { return opEvaluateParallel(helper, spliterator, Integer[]::new) .spliterator(); } else { return new UnorderedWhileSpliterator.OfInt.Taking( (Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate); } } @Override Node opEvaluateParallel(PipelineHelper helper, Spliterator spliterator, IntFunction generator) { return new TakeWhileTask<>(this, helper, spliterator, generator) .invoke(); } @Override Sink opWrapSink(int flags, Sink sink) { return new Sink.ChainedInt(sink) { boolean take = true; @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(int t) { if (take = predicate.test(t)) { downstream.accept(t); } } @Override public boolean cancellationRequested() { return !take || downstream.cancellationRequested(); } }; } }; } /** * Appends a "takeWhile" operation to the provided LongStream. * * @param upstream a reference stream with element type T * @param predicate the predicate that returns false to halt taking. 
*/ static LongStream makeTakeWhileLong(AbstractPipeline upstream, LongPredicate predicate) { Objects.requireNonNull(predicate); return new LongPipeline.StatefulOp(upstream, StreamShape.LONG_VALUE, TAKE_FLAGS) { @Override Spliterator opEvaluateParallelLazy(PipelineHelper helper, Spliterator spliterator) { if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { return opEvaluateParallel(helper, spliterator, Long[]::new) .spliterator(); } else { return new UnorderedWhileSpliterator.OfLong.Taking( (Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate); } } @Override Node opEvaluateParallel(PipelineHelper helper, Spliterator spliterator, IntFunction generator) { return new TakeWhileTask<>(this, helper, spliterator, generator) .invoke(); } @Override Sink opWrapSink(int flags, Sink sink) { return new Sink.ChainedLong(sink) { boolean take = true; @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(long t) { if (take = predicate.test(t)) { downstream.accept(t); } } @Override public boolean cancellationRequested() { return !take || downstream.cancellationRequested(); } }; } }; } /** * Appends a "takeWhile" operation to the provided DoubleStream. * * @param upstream a reference stream with element type T * @param predicate the predicate that returns false to halt taking. 
*/ static DoubleStream makeTakeWhileDouble(AbstractPipeline upstream, DoublePredicate predicate) { Objects.requireNonNull(predicate); return new DoublePipeline.StatefulOp(upstream, StreamShape.DOUBLE_VALUE, TAKE_FLAGS) { @Override Spliterator opEvaluateParallelLazy(PipelineHelper helper, Spliterator spliterator) { if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { return opEvaluateParallel(helper, spliterator, Double[]::new) .spliterator(); } else { return new UnorderedWhileSpliterator.OfDouble.Taking( (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate); } } @Override Node opEvaluateParallel(PipelineHelper helper, Spliterator spliterator, IntFunction generator) { return new TakeWhileTask<>(this, helper, spliterator, generator) .invoke(); } @Override Sink opWrapSink(int flags, Sink sink) { return new Sink.ChainedDouble(sink) { boolean take = true; @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(double t) { if (take = predicate.test(t)) { downstream.accept(t); } } @Override public boolean cancellationRequested() { return !take || downstream.cancellationRequested(); } }; } }; } /** * A specialization for the dropWhile operation that controls if * elements to be dropped are counted and passed downstream. *

 * This specialization is utilized by the {@link DropWhileTask} for
     * pipelines that are ordered. In such cases elements cannot be dropped
     * until all elements have been collected.
     *
     * @param <T> the type of both input and output elements
     */
    interface DropWhileOp {
        /**
         * Accepts a {@code Sink} which will receive the results of this
         * dropWhile operation, and returns a {@code DropWhileSink} which
         * accepts elements and which performs the dropWhile operation,
         * passing the results to the provided {@code Sink}.
         *
         * @param sink sink to which elements should be sent after processing
         * @param retainAndCountDroppedElements true if elements to be dropped
         * are counted and passed to the sink, otherwise such elements
         * are actually dropped and not passed to the sink.
         * @return a dropWhile sink
         */
        DropWhileSink opWrapSink(Sink sink, boolean retainAndCountDroppedElements);
    }

    /**
     * A specialization for a dropWhile sink.
     *
     * @param <T> the type of both input and output elements
     */
    interface DropWhileSink extends Sink {
        /**
         * @return the count of elements that would have been dropped and
         * instead were passed downstream.
         */
        long getDropCount();
    }

    /**
     * Appends a "dropWhile" operation to the provided Stream.
     *
     * @param <T> the type of both input and output elements
     * @param upstream a reference stream with element type T
     * @param predicate the predicate that returns false to halt dropping.
*/ static Stream makeDropWhileRef(AbstractPipeline upstream, Predicate predicate) { Objects.requireNonNull(predicate); class Op extends ReferencePipeline.StatefulOp implements DropWhileOp { public Op(AbstractPipeline upstream, StreamShape inputShape, int opFlags) { super(upstream, inputShape, opFlags); } @Override Spliterator opEvaluateParallelLazy(PipelineHelper helper, Spliterator spliterator) { if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { return opEvaluateParallel(helper, spliterator, Nodes.castingArray()) .spliterator(); } else { return new UnorderedWhileSpliterator.OfRef.Dropping<>( helper.wrapSpliterator(spliterator), false, predicate); } } @Override Node opEvaluateParallel(PipelineHelper helper, Spliterator spliterator, IntFunction generator) { return new DropWhileTask<>(this, helper, spliterator, generator) .invoke(); } @Override Sink opWrapSink(int flags, Sink sink) { return opWrapSink(sink, false); } public DropWhileSink opWrapSink(Sink sink, boolean retainAndCountDroppedElements) { class OpSink extends Sink.ChainedReference implements DropWhileSink { long dropCount; boolean take; OpSink() { super(sink); } @Override public void accept(T t) { boolean takeElement = take || (take = !predicate.test(t)); // If ordered and element is dropped increment index // for possible future truncation if (retainAndCountDroppedElements && !takeElement) dropCount++; // If ordered need to process element, otherwise // skip if element is dropped if (retainAndCountDroppedElements || takeElement) downstream.accept(t); } @Override public long getDropCount() { return dropCount; } } return new OpSink(); } } return new Op(upstream, StreamShape.REFERENCE, DROP_FLAGS); } /** * Appends a "dropWhile" operation to the provided IntStream. * * @param upstream a reference stream with element type T * @param predicate the predicate that returns false to halt dropping. 
*/ static IntStream makeDropWhileInt(AbstractPipeline upstream, IntPredicate predicate) { Objects.requireNonNull(predicate); class Op extends IntPipeline.StatefulOp implements DropWhileOp { public Op(AbstractPipeline upstream, StreamShape inputShape, int opFlags) { super(upstream, inputShape, opFlags); } @Override Spliterator opEvaluateParallelLazy(PipelineHelper helper, Spliterator spliterator) { if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { return opEvaluateParallel(helper, spliterator, Integer[]::new) .spliterator(); } else { return new UnorderedWhileSpliterator.OfInt.Dropping( (Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate); } } @Override Node opEvaluateParallel(PipelineHelper helper, Spliterator spliterator, IntFunction generator) { return new DropWhileTask<>(this, helper, spliterator, generator) .invoke(); } @Override Sink opWrapSink(int flags, Sink sink) { return opWrapSink(sink, false); } public DropWhileSink opWrapSink(Sink sink, boolean retainAndCountDroppedElements) { class OpSink extends Sink.ChainedInt implements DropWhileSink { long dropCount; boolean take; OpSink() { super(sink); } @Override public void accept(int t) { boolean takeElement = take || (take = !predicate.test(t)); // If ordered and element is dropped increment index // for possible future truncation if (retainAndCountDroppedElements && !takeElement) dropCount++; // If ordered need to process element, otherwise // skip if element is dropped if (retainAndCountDroppedElements || takeElement) downstream.accept(t); } @Override public long getDropCount() { return dropCount; } } return new OpSink(); } } return new Op(upstream, StreamShape.INT_VALUE, DROP_FLAGS); } /** * Appends a "dropWhile" operation to the provided LongStream. * * @param upstream a reference stream with element type T * @param predicate the predicate that returns false to halt dropping. 
*/ static LongStream makeDropWhileLong(AbstractPipeline upstream, LongPredicate predicate) { Objects.requireNonNull(predicate); class Op extends LongPipeline.StatefulOp implements DropWhileOp { public Op(AbstractPipeline upstream, StreamShape inputShape, int opFlags) { super(upstream, inputShape, opFlags); } @Override Spliterator opEvaluateParallelLazy(PipelineHelper helper, Spliterator spliterator) { if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { return opEvaluateParallel(helper, spliterator, Long[]::new) .spliterator(); } else { return new UnorderedWhileSpliterator.OfLong.Dropping( (Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate); } } @Override Node opEvaluateParallel(PipelineHelper helper, Spliterator spliterator, IntFunction generator) { return new DropWhileTask<>(this, helper, spliterator, generator) .invoke(); } @Override Sink opWrapSink(int flags, Sink sink) { return opWrapSink(sink, false); } public DropWhileSink opWrapSink(Sink sink, boolean retainAndCountDroppedElements) { class OpSink extends Sink.ChainedLong implements DropWhileSink { long dropCount; boolean take; OpSink() { super(sink); } @Override public void accept(long t) { boolean takeElement = take || (take = !predicate.test(t)); // If ordered and element is dropped increment index // for possible future truncation if (retainAndCountDroppedElements && !takeElement) dropCount++; // If ordered need to process element, otherwise // skip if element is dropped if (retainAndCountDroppedElements || takeElement) downstream.accept(t); } @Override public long getDropCount() { return dropCount; } } return new OpSink(); } } return new Op(upstream, StreamShape.LONG_VALUE, DROP_FLAGS); } /** * Appends a "dropWhile" operation to the provided DoubleStream. * * @param upstream a reference stream with element type T * @param predicate the predicate that returns false to halt dropping. 
*/ static DoubleStream makeDropWhileDouble(AbstractPipeline upstream, DoublePredicate predicate) { Objects.requireNonNull(predicate); class Op extends DoublePipeline.StatefulOp implements DropWhileOp { public Op(AbstractPipeline upstream, StreamShape inputShape, int opFlags) { super(upstream, inputShape, opFlags); } @Override Spliterator opEvaluateParallelLazy(PipelineHelper helper, Spliterator spliterator) { if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { return opEvaluateParallel(helper, spliterator, Double[]::new) .spliterator(); } else { return new UnorderedWhileSpliterator.OfDouble.Dropping( (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate); } } @Override Node opEvaluateParallel(PipelineHelper helper, Spliterator spliterator, IntFunction generator) { return new DropWhileTask<>(this, helper, spliterator, generator) .invoke(); } @Override Sink opWrapSink(int flags, Sink sink) { return opWrapSink(sink, false); } public DropWhileSink opWrapSink(Sink sink, boolean retainAndCountDroppedElements) { class OpSink extends Sink.ChainedDouble implements DropWhileSink { long dropCount; boolean take; OpSink() { super(sink); } @Override public void accept(double t) { boolean takeElement = take || (take = !predicate.test(t)); // If ordered and element is dropped increment index // for possible future truncation if (retainAndCountDroppedElements && !takeElement) dropCount++; // If ordered need to process element, otherwise // skip if element is dropped if (retainAndCountDroppedElements || takeElement) downstream.accept(t); } @Override public long getDropCount() { return dropCount; } } return new OpSink(); } } return new Op(upstream, StreamShape.DOUBLE_VALUE, DROP_FLAGS); } // /** * A spliterator supporting takeWhile and dropWhile operations over an * underlying spliterator whose covered elements have no encounter order. *

* Concrete subclasses of this spliterator support reference and primitive * types for takeWhile and dropWhile. *

 * For the takeWhile operation if during traversal taking completes then
     * taking is cancelled globally for the splitting and traversal of all
     * related spliterators.
     * Cancellation is governed by a shared {@link AtomicBoolean} instance. A
     * spliterator in the process of taking when cancellation occurs will also
     * be cancelled but not necessarily immediately. To reduce contention on
     * the {@link AtomicBoolean} instance, cancellation may be acted on after
     * a small number of additional elements have been traversed.
     *

 * For the dropWhile operation if during traversal dropping completes for
     * some, but not all elements, then it is cancelled globally for the
     * traversal of all related spliterators (splitting is not cancelled).
     * Cancellation is governed in the same manner as for the takeWhile
     * operation.
     *
     * @param <T> the type of elements returned by this spliterator
     * @param <T_SPLITR> the type of the spliterator
     */
    abstract static class UnorderedWhileSpliterator> implements Spliterator {
        // Power of two constant minus one used for modulus of count
        static final int CANCEL_CHECK_COUNT = (1 << 6) - 1;

        // The underlying spliterator
        final T_SPLITR s;
        // True if no splitting should be performed, if true then
        // this spliterator may be used for an underlying spliterator whose
        // covered elements have an encounter order
        // See use in stream take/dropWhile default methods
        final boolean noSplitting;
        // True when operations are cancelled for all related spliterators
        // For taking, spliterators cannot be split or traversed
        // For dropping, spliterators cannot be traversed
        final AtomicBoolean cancel;
        // True while taking or dropping should be performed when traversing
        boolean takeOrDrop = true;
        // The count of elements traversed, wrapped with CANCEL_CHECK_COUNT
        // by the accept methods of the subclasses
        int count;

        // Creates a root spliterator with its own, fresh cancellation flag
        UnorderedWhileSpliterator(T_SPLITR s, boolean noSplitting) {
            this.s = s;
            this.noSplitting = noSplitting;
            this.cancel = new AtomicBoolean();
        }

        // Creates a spliterator that shares the parent's noSplitting setting
        // and cancellation flag
        UnorderedWhileSpliterator(T_SPLITR s, UnorderedWhileSpliterator parent) {
            this.s = s;
            this.noSplitting = parent.noSplitting;
            this.cancel = parent.cancel;
        }

        @Override
        public long estimateSize() {
            return s.estimateSize();
        }

        @Override
        public int characteristics() {
            // Size is not known
            return s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED);
        }

        @Override
        public long getExactSizeIfKnown() {
            return -1L;
        }

        @Override
        public Comparator getComparator() {
            return s.getComparator();
        }

        @Override
        public T_SPLITR trySplit() {
            // Never split when noSplitting is set; otherwise wrap whatever the
            // underlying spliterator splits off, if anything
            @SuppressWarnings("unchecked")
            T_SPLITR ls = noSplitting ? null : (T_SPLITR) s.trySplit();
            return ls != null ? makeSpliterator(ls) : null;
        }

        // Returns false only when count is zero and the shared cancel flag is
        // set; the flag is not read at all while count is non-zero
        boolean checkCancelOnCount() {
            return count != 0 || !cancel.get();
        }

        // Wraps a split-off underlying spliterator in a spliterator of the
        // same concrete kind as this one
        abstract T_SPLITR makeSpliterator(T_SPLITR s);

        // Reference specialization; acts as the consumer handed to the
        // underlying spliterator so the advanced element is captured in 't'
        abstract static class OfRef extends UnorderedWhileSpliterator> implements Consumer {
            final Predicate p;
            // The element most recently accepted from the underlying spliterator
            T t;

            OfRef(Spliterator s, boolean noSplitting, Predicate p) {
                super(s, noSplitting);
                this.p = p;
            }

            OfRef(Spliterator s, OfRef parent) {
                super(s, parent);
                this.p = parent.p;
            }

            @Override
            public void accept(T t) {
                // Advance the wrapped counter and remember the element
                count = (count + 1) & CANCEL_CHECK_COUNT;
                this.t = t;
            }

            static final class Taking extends OfRef {
                Taking(Spliterator s, boolean noSplitting, Predicate p) {
                    super(s, noSplitting, p);
                }

                Taking(Spliterator s, Taking parent) {
                    super(s, parent);
                }

                @Override
                public boolean tryAdvance(Consumer action) {
                    boolean test = true;
                    if (takeOrDrop &&               // If can take
                        checkCancelOnCount() &&     // and if not cancelled
                        s.tryAdvance(this) &&       // and if advanced one element
                        (test = p.test(t))) {       // and test on element passes
                        action.accept(t);           // then accept element
                        return true;
                    } else {
                        // Taking is finished
                        takeOrDrop = false;
                        // Cancel all further traversal and splitting operations
                        // only if test of element failed (short-circuited)
                        if (!test)
                            cancel.set(true);
                        return false;
                    }
                }

                @Override
                public Spliterator trySplit() {
                    // Do not split if all operations are cancelled
                    return cancel.get() ? null : super.trySplit();
                }

                @Override
                Spliterator makeSpliterator(Spliterator s) {
                    return new Taking<>(s, this);
                }
            }

            static final class Dropping extends OfRef {
                Dropping(Spliterator s, boolean noSplitting, Predicate p) {
                    super(s, noSplitting, p);
                }

                Dropping(Spliterator s, Dropping parent) {
                    super(s, parent);
                }

                @Override
                public boolean tryAdvance(Consumer action) {
                    if (takeOrDrop) {
                        // Dropping is attempted only on the first call
                        takeOrDrop = false;
                        boolean adv;
                        boolean dropped = false;
                        while ((adv = s.tryAdvance(this)) &&  // If advanced one element
                               checkCancelOnCount() &&        // and if not cancelled
                               p.test(t)) {                   // and test on element passes
                            dropped = true;                   // then drop element
                        }

                        // Report advanced element, if any
                        if (adv) {
                            // Cancel all further dropping if one or more elements
                            // were previously dropped
                            if (dropped)
                                cancel.set(true);
                            action.accept(t);
                        }
                        return adv;
                    } else {
                        return s.tryAdvance(action);
                    }
                }

                @Override
                Spliterator makeSpliterator(Spliterator s) {
                    return new Dropping<>(s, this);
                }
            }
        }

        // int specialization; mirrors OfRef
        abstract static class OfInt extends UnorderedWhileSpliterator implements IntConsumer, Spliterator.OfInt {
            final IntPredicate p;
            // The element most recently accepted from the underlying spliterator
            int t;

            OfInt(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) {
                super(s, noSplitting);
                this.p = p;
            }

            OfInt(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) {
                super(s, parent);
                this.p = parent.p;
            }

            @Override
            public void accept(int t) {
                // Advance the wrapped counter and remember the element
                count = (count + 1) & CANCEL_CHECK_COUNT;
                this.t = t;
            }

            static final class Taking extends UnorderedWhileSpliterator.OfInt {
                Taking(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) {
                    super(s, noSplitting, p);
                }

                Taking(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) {
                    super(s, parent);
                }

                @Override
                public boolean tryAdvance(IntConsumer action) {
                    boolean test = true;
                    if (takeOrDrop &&               // If can take
                        checkCancelOnCount() &&     // and if not cancelled
                        s.tryAdvance(this) &&       // and if advanced one element
                        (test = p.test(t))) {       // and test on element passes
                        action.accept(t);           // then accept element
                        return true;
                    } else {
                        // Taking is finished
                        takeOrDrop = false;
                        // Cancel all further traversal and splitting operations
                        // only if test of element failed (short-circuited)
                        if (!test)
                            cancel.set(true);
                        return false;
                    }
                }

                @Override
                public Spliterator.OfInt trySplit() {
                    // Do not split if all operations are cancelled
                    return cancel.get() ? null : super.trySplit();
                }

                @Override
                Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
                    return new Taking(s, this);
                }
            }

            static final class Dropping extends UnorderedWhileSpliterator.OfInt {
                Dropping(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) {
                    super(s, noSplitting, p);
                }

                Dropping(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) {
                    super(s, parent);
                }

                @Override
                public boolean tryAdvance(IntConsumer action) {
                    if (takeOrDrop) {
                        // Dropping is attempted only on the first call
                        takeOrDrop = false;
                        boolean adv;
                        boolean dropped = false;
                        while ((adv = s.tryAdvance(this)) &&  // If advanced one element
                               checkCancelOnCount() &&        // and if not cancelled
                               p.test(t)) {                   // and test on element passes
                            dropped = true;                   // then drop element
                        }

                        // Report advanced element, if any
                        if (adv) {
                            // Cancel all further dropping if one or more elements
                            // were previously dropped
                            if (dropped)
                                cancel.set(true);
                            action.accept(t);
                        }
                        return adv;
                    } else {
                        return s.tryAdvance(action);
                    }
                }

                @Override
                Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
                    return new Dropping(s, this);
                }
            }
        }

        // long specialization; mirrors OfRef
        abstract static class OfLong extends UnorderedWhileSpliterator implements LongConsumer, Spliterator.OfLong {
            final LongPredicate p;
            // The element most recently accepted from the underlying spliterator
            long t;

            OfLong(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
                super(s, noSplitting);
                this.p = p;
            }

            OfLong(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
                super(s, parent);
                this.p = parent.p;
            }

            @Override
            public void accept(long t) {
                // Advance the wrapped counter and remember the element
                count = (count + 1) & CANCEL_CHECK_COUNT;
                this.t = t;
            }

            static final class Taking extends UnorderedWhileSpliterator.OfLong {
                Taking(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
                    super(s, noSplitting, p);
                }

                Taking(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
                    super(s, parent);
                }

                @Override
                public boolean tryAdvance(LongConsumer action) {
                    boolean test = true;
                    if (takeOrDrop &&               // If can take
                        checkCancelOnCount() &&     // and if not cancelled
                        s.tryAdvance(this) &&       // and if advanced one element
                        (test = p.test(t))) {       // and test on element passes
                        action.accept(t);           // then accept element
                        return true;
                    } else {
                        // Taking is finished
                        takeOrDrop = false;
                        // Cancel all further traversal and splitting operations
                        // only if test of element failed (short-circuited)
                        if (!test)
                            cancel.set(true);
                        return false;
                    }
                }

                @Override
                public Spliterator.OfLong trySplit() {
                    // Do not split if all operations are cancelled
                    return cancel.get() ? null : super.trySplit();
                }

                @Override
                Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
                    return new Taking(s, this);
                }
            }

            static final class Dropping extends UnorderedWhileSpliterator.OfLong {
                Dropping(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
                    super(s, noSplitting, p);
                }

                Dropping(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
                    super(s, parent);
                }

                @Override
                public boolean tryAdvance(LongConsumer action) {
                    if (takeOrDrop) {
                        // Dropping is attempted only on the first call
                        takeOrDrop = false;
                        boolean adv;
                        boolean dropped = false;
                        while ((adv = s.tryAdvance(this)) &&  // If advanced one element
                               checkCancelOnCount() &&        // and if not cancelled
                               p.test(t)) {                   // and test on element passes
                            dropped = true;                   // then drop element
                        }

                        // Report advanced element, if any
                        if (adv) {
                            // Cancel all further dropping if one or more elements
                            // were previously dropped
                            if (dropped)
                                cancel.set(true);
                            action.accept(t);
                        }
                        return adv;
                    } else {
                        return s.tryAdvance(action);
                    }
                }

                @Override
                Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
                    return new Dropping(s, this);
                }
            }
        }

        // double specialization; mirrors OfRef
        abstract static class OfDouble extends UnorderedWhileSpliterator implements DoubleConsumer, Spliterator.OfDouble {
            final DoublePredicate p;
            // The element most recently accepted from the underlying spliterator
            double t;

            OfDouble(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
                super(s, noSplitting);
                this.p = p;
            }

            OfDouble(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
                super(s, parent);
                this.p = parent.p;
            }

            @Override
            public void accept(double t) {
                // Advance the wrapped counter and remember the element
                count = (count + 1) & CANCEL_CHECK_COUNT;
                this.t = t;
            }

            static final class Taking extends UnorderedWhileSpliterator.OfDouble {
                Taking(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
                    super(s, noSplitting, p);
                }

                Taking(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
                    super(s, parent);
                }

                @Override
                public boolean tryAdvance(DoubleConsumer action) {
                    boolean test = true;
                    if (takeOrDrop &&               // If can take
                        checkCancelOnCount() &&     // and if not cancelled
                        s.tryAdvance(this) &&       // and if advanced one element
                        (test = p.test(t))) {       // and test on element passes
                        action.accept(t);           // then accept element
                        return true;
                    } else {
                        // Taking is finished
                        takeOrDrop = false;
                        // Cancel all further traversal and splitting operations
                        // only if test of element failed (short-circuited)
                        if (!test)
                            cancel.set(true);
                        return false;
                    }
                }

                @Override
                public Spliterator.OfDouble trySplit() {
                    // Do not split if all operations are cancelled
                    return cancel.get() ? null : super.trySplit();
                }

                @Override
                Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
                    return new Taking(s, this);
                }
            }

            static final class Dropping extends UnorderedWhileSpliterator.OfDouble {
                Dropping(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
                    super(s, noSplitting, p);
                }

                Dropping(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
                    super(s, parent);
                }

                @Override
                public boolean tryAdvance(DoubleConsumer action) {
                    if (takeOrDrop) {
                        // Dropping is attempted only on the first call
                        takeOrDrop = false;
                        boolean adv;
                        boolean dropped = false;
                        while ((adv = s.tryAdvance(this)) &&  // If advanced one element
                               checkCancelOnCount() &&        // and if not cancelled
                               p.test(t)) {                   // and test on element passes
                            dropped = true;                   // then drop element
                        }

                        // Report advanced element, if any
                        if (adv) {
                            // Cancel all further dropping if one or more elements
                            // were previously dropped
                            if (dropped)
                                cancel.set(true);
                            action.accept(t);
                        }
                        return adv;
                    } else {
                        return s.tryAdvance(action);
                    }
                }

                @Override
                Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
                    return new Dropping(s, this);
                }
            }
        }
    }

    //

    /**
     * {@code ForkJoinTask} implementing takeWhile computation.
     *

* If the pipeline has encounter order then all tasks to the right of * a task where traversal was short-circuited are cancelled. * The results of completed (and cancelled) tasks are discarded. * The result of merging a short-circuited left task and right task (which * may or may not be short-circuited) is that left task. *

* If the pipeline has no encounter order then all tasks to the right of * a task where traversal was short-circuited are cancelled. * The results of completed (and possibly cancelled) tasks are not * discarded, as there is no need to throw away computed results. * The result of merging does not change if a left task was * short-circuited. * No attempt is made, once a leaf task stopped taking, for it to cancel * all other tasks, and further more, short-circuit the computation with its * result. * * @param Input element type to the stream pipeline * @param Output element type from the stream pipeline */ @SuppressWarnings("serial") private static final class TakeWhileTask extends AbstractShortCircuitTask, TakeWhileTask> { private final AbstractPipeline op; private final IntFunction generator; private final boolean isOrdered; private long thisNodeSize; // True if a short-circuited private boolean shortCircuited; // True if completed, must be set after the local result private volatile boolean completed; TakeWhileTask(AbstractPipeline op, PipelineHelper helper, Spliterator spliterator, IntFunction generator) { super(helper, spliterator); this.op = op; this.generator = generator; this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags()); } TakeWhileTask(TakeWhileTask parent, Spliterator spliterator) { super(parent, spliterator); this.op = parent.op; this.generator = parent.generator; this.isOrdered = parent.isOrdered; } @Override protected TakeWhileTask makeChild(Spliterator spliterator) { return new TakeWhileTask<>(this, spliterator); } @Override protected final Node getEmptyResult() { return Nodes.emptyNode(op.getOutputShape()); } @Override protected final Node doLeaf() { Node.Builder builder = helper.makeNodeBuilder(-1, generator); Sink s = op.opWrapSink(helper.getStreamAndOpFlags(), builder); if (shortCircuited = helper.copyIntoWithCancel(helper.wrapSink(s), spliterator)) { // Cancel later nodes if the predicate returned false // during traversal 
cancelLaterNodes(); } Node node = builder.build(); thisNodeSize = node.count(); return node; } @Override public final void onCompletion(CountedCompleter caller) { if (!isLeaf()) { Node result; shortCircuited = leftChild.shortCircuited | rightChild.shortCircuited; if (isOrdered && canceled) { thisNodeSize = 0; result = getEmptyResult(); } else if (isOrdered && leftChild.shortCircuited) { // If taking finished on the left node then // use the left node result thisNodeSize = leftChild.thisNodeSize; result = leftChild.getLocalResult(); } else { thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize; result = merge(); } setLocalResult(result); } completed = true; super.onCompletion(caller); } Node merge() { if (leftChild.thisNodeSize == 0) { // If the left node size is 0 then // use the right node result return rightChild.getLocalResult(); } else if (rightChild.thisNodeSize == 0) { // If the right node size is 0 then // use the left node result return leftChild.getLocalResult(); } else { // Combine the left and right nodes return Nodes.conc(op.getOutputShape(), leftChild.getLocalResult(), rightChild.getLocalResult()); } } @Override protected void cancel() { super.cancel(); if (isOrdered && completed) // If the task is completed then clear the result, if any // to aid GC setLocalResult(getEmptyResult()); } } /** * {@code ForkJoinTask} implementing dropWhile computation. *

* If the pipeline has encounter order then each leaf task will not
     * drop elements but will obtain a count of the elements that would have
     * been otherwise dropped. That count is used as an index to track
     * elements to be dropped. Merging will update the index so it corresponds
     * to the index that is the end of the global prefix of elements to be
     * dropped. The root is truncated according to that index.
     * <p>

* If the pipeline has no encounter order then each leaf task will drop
     * elements. Leaf tasks are ordinarily merged. No truncation of the root
     * node is required.
     * No attempt is made, once a leaf task stopped dropping, for it to cancel
     * all other tasks, and furthermore, short-circuit the computation with
     * its result.
     *
     * @param <P_IN> Input element type to the stream pipeline
     * @param <P_OUT> Output element type from the stream pipeline
     */
    @SuppressWarnings("serial")
    private static final class DropWhileTask<P_IN, P_OUT>
            extends AbstractTask<P_IN, P_OUT, Node<P_OUT>, DropWhileTask<P_IN, P_OUT>> {
        // NOTE(review): the generic type arguments in this class were
        // reconstructed (the class is instantiated with the diamond operator
        // and documents two type parameters) -- confirm against the
        // declarations of AbstractTask, DropWhileOp and DropWhileSink.

        // The dropWhile operation; asserted to be a DropWhileOp on creation
        private final AbstractPipeline<P_OUT, P_OUT, ?> op;
        // Array factory passed to the node builder and to truncation
        private final IntFunction<P_OUT[]> generator;
        // True if the ORDERED flag is known for the pipeline
        private final boolean isOrdered;
        // Element count of this task's result node
        private long thisNodeSize;
        // The index from which elements of the node should be taken
        // i.e. the node should be truncated from [index, thisNodeSize)
        // Equivalent to the count of dropped elements
        private long index;

        /**
         * Creates the root task.
         *
         * @param op the dropWhile operation, which must be a
         *        {@code DropWhileOp}
         * @param helper the pipeline helper
         * @param spliterator the spliterator covering the input
         * @param generator the array factory for the result nodes
         */
        DropWhileTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
                      PipelineHelper<P_OUT> helper,
                      Spliterator<P_IN> spliterator,
                      IntFunction<P_OUT[]> generator) {
            super(helper, spliterator);
            assert op instanceof DropWhileOp;
            this.op = op;
            this.generator = generator;
            this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags());
        }

        /**
         * Creates a child task that shares the operation, generator and
         * ordering of its parent.
         *
         * @param parent the parent task
         * @param spliterator the spliterator covering the child's input
         */
        DropWhileTask(DropWhileTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
            super(parent, spliterator);
            this.op = parent.op;
            this.generator = parent.generator;
            this.isOrdered = parent.isOrdered;
        }

        @Override
        protected DropWhileTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
            return new DropWhileTask<>(this, spliterator);
        }

        /**
         * Traverses this leaf's spliterator through the dropWhile sink into
         * a node builder, recording the size of the resulting node and the
         * sink's drop count.
         *
         * @return the node built by this leaf
         */
        @Override
        protected final Node<P_OUT> doLeaf() {
            boolean isChild = !isRoot();
            // If this is not the root and pipeline is ordered and size is
            // known then pre-size the builder
            long sizeIfKnown = isChild && isOrdered && StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags)
                               ? op.exactOutputSizeIfKnown(spliterator)
                               : -1;
            Node.Builder<P_OUT> builder = helper.makeNodeBuilder(sizeIfKnown, generator);
            @SuppressWarnings("unchecked")
            DropWhileOp<P_OUT> dropOp = (DropWhileOp<P_OUT>) op;
            // If this leaf is the root then there is no merging on completion
            // and there is no need to retain dropped elements
            DropWhileSink<P_OUT> s = dropOp.opWrapSink(builder, isOrdered && isChild);
            helper.wrapAndCopyInto(s, spliterator);

            Node<P_OUT> node = builder.build();
            thisNodeSize = node.count();
            index = s.getDropCount();
            return node;
        }

        /**
         * For a non-leaf task, merges the children's results and, when
         * ordered, their drop indices; the root additionally truncates the
         * merged result before publishing it.
         */
        @Override
        public final void onCompletion(CountedCompleter<?> caller) {
            if (!isLeaf()) {
                if (isOrdered) {
                    index = leftChild.index;
                    // If a contiguous sequence of dropped elements
                    // include those of the right node, if any
                    if (index == leftChild.thisNodeSize) {
                        index += rightChild.index;
                    }
                }

                thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize;
                Node<P_OUT> result = merge();
                setLocalResult(isRoot() ? doTruncate(result) : result);
            }

            super.onCompletion(caller);
        }

        /**
         * Merges the children's local results, avoiding a concatenation
         * node when either child is empty.
         *
         * @return the right result if the left is empty, the left result
         *         if the right is empty, otherwise their concatenation
         */
        private Node<P_OUT> merge() {
            if (leftChild.thisNodeSize == 0) {
                // If the left node size is 0 then
                // use the right node result
                return rightChild.getLocalResult();
            } else if (rightChild.thisNodeSize == 0) {
                // If the right node size is 0 then
                // use the left node result
                return leftChild.getLocalResult();
            } else {
                // Combine the left and right nodes
                return Nodes.conc(op.getOutputShape(),
                                  leftChild.getLocalResult(), rightChild.getLocalResult());
            }
        }

        /**
         * Truncates the given node to {@code [index, input.count())} when
         * the pipeline is ordered; otherwise returns it unchanged.
         *
         * @param input the merged root node
         * @return the truncated node, or {@code input} if unordered
         */
        private Node<P_OUT> doTruncate(Node<P_OUT> input) {
            return isOrdered
                   ? input.truncate(index, input.count(), generator)
                   : input;
        }
    }
}