< prev index next >
src/java.base/share/classes/java/util/stream/SortedOps.java
Print this page
*** 91,109 ****
--- 91,313 ----
*/
static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {
return new OfDouble(upstream);
}
+ static class Limiter<T> {
+ private final T[] data;
+ private final int limit;
+ private final Comparator<? super T> comparator;
+ private int size;
+ private boolean initial = true;
+
+ /**
+ * @param limit how many least elements to keep
+ * @param length length of the input if known, -1 otherwise
+ * @param comparator
+ */
+ @SuppressWarnings("unchecked")
+ Limiter(int limit, long length, Comparator<? super T> comparator) {
+ this.limit = limit;
+ this.comparator = comparator;
+ int dataSize = limit * 2;
+ if(length >= 0 && length < dataSize)
+ dataSize = (int) length;
+ this.data = (T[]) new Object[dataSize];
+ }
+
+ /**
+ * Accumulate new element
+ *
+ * @param t element to accumulate
+ *
+ * @return false if the element is definitely not included into result, so
+ * any bigger element could be skipped as well, or true if element
+ * will probably be included into result.
+ */
+ final boolean put(T t) {
+ int l = limit;
+ T[] d = data;
+ if (l == 1) {
+ // limit == 1 is the special case: exactly one least element is stored,
+ // no sorting is performed
+ if (initial) {
+ initial = false;
+ size = 1;
+ } else if (comparator.compare(t, d[0]) >= 0)
+ return false;
+ d[0] = t;
+ return true;
+ }
+ if (initial) {
+ if (size < d.length) {
+ d[size++] = t;
+ return true;
+ }
+ Arrays.sort(d, comparator);
+ initial = false;
+ size = l;
+ }
+ if (size == d.length) {
+ sortTail(d, l, size, comparator);
+ size = limit;
+ }
+ if (comparator.compare(t, d[l - 1]) < 0) {
+ d[size++] = t;
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Merge other {@code Limiter} object into this (other object becomes unusable after that).
+ *
+ * @param other other object to merge
+ * @return this object
+ */
+ Limiter<T> putAll(Limiter<T> other) {
+ int i = 0, l = limit;
+ T[] od = other.data;
+ if (!other.initial) {
+ // sorted part
+ for (; i < l; i++) {
+ if (!put(od[i]))
+ break;
+ }
+ i = l;
+ }
+ int os = other.size;
+ for (; i < os; i++) {
+ put(od[i]);
+ }
+ return this;
+ }
+
+ /**
+ * Must be called after accumulation is finished.
+ */
+ void sort() {
+ if (limit == 1)
+ return;
+ if (initial)
+ Arrays.sort(data, 0, size, comparator);
+ else if (size > limit)
+ sortTail(data, limit, size, comparator);
+ if (size > limit)
+ size = limit;
+ }
+
+ /*
+ * Sort data[limit]..data[size] and merge with presorted data[0]..data[limit-1]
+ * Assuming that data[limit-1] is the biggest element
+ */
+ private static <T> void sortTail(T[] data, int limit, int size, Comparator<? super T> comparator) {
+ assert size > limit;
+ Arrays.sort(data, limit, size, comparator);
+ if (comparator.compare(data[size - 1], data[0]) < 0) {
+ // Common case: descending sequence
+ System.arraycopy(data, 0, data, size - limit, 2 * limit - size);
+ System.arraycopy(data, limit, data, 0, size - limit);
+ } else {
+ // Merge presorted 0..limit-1 and limit..size-1
+ @SuppressWarnings("unchecked")
+ T[] buf = (T[]) new Object[limit];
+ int i = 0, j = limit, k = 0;
+ // data[limit-1] is guaranteed to be the worst element, thus no need
+ // to check it
+ while (i < limit - 1 && k < limit && j < size) {
+ buf[k++] = comparator.compare(data[i], data[j]) <= 0 ? data[i++] : data[j++];
+ }
+ if (k < limit) {
+ System.arraycopy(data, i < limit - 1 ? i : j, data, k, limit - k);
+ }
+ System.arraycopy(buf, 0, data, 0, k);
+ }
+ }
+ }
+
+ /**
+ * Performs sorted().limit(maxSize) operation in single step for maxSize < SORTED_LIMIT_MAX_SIZE
+ */
+ private static final class SortedLimit<T> extends ReferencePipeline.StatefulOp<T, T> {
+ static final int SORTED_LIMIT_MAX_SIZE = 1000;
+
+ private final Comparator<? super T> comparator;
+ private final int limit;
+
+ SortedLimit(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator, int limit) {
+ super(upstream, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED);
+ this.comparator = comparator;
+ this.limit = limit;
+ }
+
+ @Override
+ public Sink<T> opWrapSink(int flags, Sink<T> sink) {
+ Objects.requireNonNull(sink);
+
+ return new AbstractRefSortingSink<T>(sink, comparator) {
+ private Limiter<T> limiter;
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void begin(long size) {
+ limiter = new Limiter<>(limit, size, comparator);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void end() {
+ limiter.sort();
+ T[] array = limiter.data;
+ int offset = limiter.size;
+ downstream.begin(offset);
+ if (!cancellationWasRequested) {
+ for(int i = 0; i<offset; i++)
+ downstream.accept(array[i]);
+ }
+ else {
+ for(int i = 0; i<offset; i++) {
+ if (downstream.cancellationRequested()) break;
+ downstream.accept(array[i]);
+ }
+ }
+ downstream.end();
+ }
+
+ @Override
+ public void accept(T t) {
+ limiter.put(t);
+ }
+ };
+ }
+
+ @Override
+ public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
+ Spliterator<P_IN> spliterator,
+ IntFunction<T[]> generator) {
+ TerminalOp<T, Limiter<T>> op = ReduceOps.<T, Limiter<T>>makeRef(
+ () -> new Limiter<>(limit, -1, comparator), Limiter::put, Limiter::putAll);
+ Limiter<T> limiter = op.evaluateParallel(helper, spliterator);
+ // No reason to use parallelSort here as
+ // limit <= SORTED_LIMIT_MAX_SIZE < MIN_ARRAY_SORT_GRAN
+ limiter.sort();
+ T[] array = limiter.data;
+ int length = limiter.size;
+ return Nodes.node(Arrays.copyOfRange(array, 0, length));
+ }
+ }
+
/**
* Specialized subtype for sorting reference streams
*/
private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
/**
* Comparator used for sorting
*/
private final boolean isNaturalSort;
private final Comparator<? super T> comparator;
+ private boolean skip;
/**
* Sort using natural order of {@literal <T>} which must be
* {@code Comparable}.
*/
*** 128,143 ****
this.isNaturalSort = false;
this.comparator = Objects.requireNonNull(comparator);
}
@Override
public Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
// If the input is already naturally sorted and this operation
// also naturally sorted then this is a no-op
! if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
return new SizedRefSortingSink<>(sink, comparator);
else
return new RefSortingSink<>(sink, comparator);
--- 332,357 ----
this.isNaturalSort = false;
this.comparator = Objects.requireNonNull(comparator);
}
@Override
+ public final Stream<T> limit(long maxSize) {
+ if(maxSize < 0 || maxSize > SortedLimit.SORTED_LIMIT_MAX_SIZE)
+ return super.limit(maxSize);
+ skip = true;
+ if(maxSize == 0)
+ return super.limit(maxSize);
+ return new SortedLimit<>(this, comparator, (int)maxSize);
+ }
+
+ @Override
public Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
// If the input is already naturally sorted and this operation
// also naturally sorted then this is a no-op
! if (skip || (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort))
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
return new SizedRefSortingSink<>(sink, comparator);
else
return new RefSortingSink<>(sink, comparator);
*** 147,166 ****
public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator,
IntFunction<T[]> generator) {
// If the input is already naturally sorted and this operation
// naturally sorts then collect the output
! if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
return helper.evaluate(spliterator, false, generator);
}
else {
// @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
Arrays.parallelSort(flattenedData, comparator);
return Nodes.node(flattenedData);
}
}
}
/**
* Specialized subtype for sorting int streams.
*/
--- 361,392 ----
public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator,
IntFunction<T[]> generator) {
// If the input is already naturally sorted and this operation
// naturally sorts then collect the output
! if (skip || (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort)) {
return helper.evaluate(spliterator, false, generator);
}
else {
// @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
Arrays.parallelSort(flattenedData, comparator);
return Nodes.node(flattenedData);
}
}
+
+ @Override
+ <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
+ // If the input is already naturally sorted and this operation
+ // naturally sorts then perfrom a no-op
+ if (skip || (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort)) {
+ return helper.wrapSpliterator(spliterator);
+ }
+ else {
+ return super.opEvaluateParallelLazy(helper, spliterator);
+ }
+ }
}
/**
* Specialized subtype for sorting int streams.
*/
< prev index next >