--- old/src/java.base/share/classes/java/util/stream/SortedOps.java 2016-03-07 20:55:58.750723900 +0600 +++ new/src/java.base/share/classes/java/util/stream/SortedOps.java 2016-03-07 20:55:58.564200200 +0600 @@ -93,6 +93,209 @@ return new OfDouble(upstream); } + static class Limiter { + private final T[] data; + private final int limit; + private final Comparator comparator; + private int size; + private boolean initial = true; + + /** + * @param limit how many least elements to keep + * @param length length of the input if known, -1 otherwise + * @param comparator + */ + @SuppressWarnings("unchecked") + Limiter(int limit, long length, Comparator comparator) { + this.limit = limit; + this.comparator = comparator; + int dataSize = limit * 2; + if(length >= 0 && length < dataSize) + dataSize = (int) length; + this.data = (T[]) new Object[dataSize]; + } + + /** + * Accumulate new element + * + * @param t element to accumulate + * + * @return false if the element is definitely not included into result, so + * any bigger element could be skipped as well, or true if element + * will probably be included into result. + */ + final boolean put(T t) { + int l = limit; + T[] d = data; + if (l == 1) { + // limit == 1 is the special case: exactly one least element is stored, + // no sorting is performed + if (initial) { + initial = false; + size = 1; + } else if (comparator.compare(t, d[0]) >= 0) + return false; + d[0] = t; + return true; + } + if (initial) { + if (size < d.length) { + d[size++] = t; + return true; + } + Arrays.sort(d, comparator); + initial = false; + size = l; + } + if (size == d.length) { + sortTail(d, l, size, comparator); + size = limit; + } + if (comparator.compare(t, d[l - 1]) < 0) { + d[size++] = t; + return true; + } + return false; + } + + /** + * Merge other {@code Limiter} object into this (other object becomes unusable after that). + * + * @param other other object to merge + * @return this object + */ + Limiter putAll(Limiter other) { + int i = 0, l = limit; + T[] od = other.data; + if (!other.initial) { + // sorted part + for (; i < l; i++) { + if (!put(od[i])) + break; + } + i = l; + } + int os = other.size; + for (; i < os; i++) { + put(od[i]); + } + return this; + } + + /** + * Must be called after accumulation is finished. + */ + void sort() { + if (limit == 1) + return; + if (initial) + Arrays.sort(data, 0, size, comparator); + else if (size > limit) + sortTail(data, limit, size, comparator); + if (size > limit) + size = limit; + } + + /* + * Sort data[limit]..data[size] and merge with presorted data[0]..data[limit-1] + * Assuming that data[limit-1] is the biggest element + */ + private static void sortTail(T[] data, int limit, int size, Comparator comparator) { + assert size > limit; + Arrays.sort(data, limit, size, comparator); + if (comparator.compare(data[size - 1], data[0]) < 0) { + // Common case: descending sequence + System.arraycopy(data, 0, data, size - limit, 2 * limit - size); + System.arraycopy(data, limit, data, 0, size - limit); + } else { + // Merge presorted 0..limit-1 and limit..size-1 + @SuppressWarnings("unchecked") + T[] buf = (T[]) new Object[limit]; + int i = 0, j = limit, k = 0; + // data[limit-1] is guaranteed to be the worst element, thus no need + // to check it + while (i < limit - 1 && k < limit && j < size) { + buf[k++] = comparator.compare(data[i], data[j]) <= 0 ? data[i++] : data[j++]; + } + if (k < limit) { + System.arraycopy(data, i < limit - 1 ? i : j, data, k, limit - k); + } + System.arraycopy(buf, 0, data, 0, k); + } + } + } + + /** + * Performs sorted().limit(maxSize) operation in single step for maxSize < SORTED_LIMIT_MAX_SIZE + */ + private static final class SortedLimit extends ReferencePipeline.StatefulOp { + static final int SORTED_LIMIT_MAX_SIZE = 1000; + + private final Comparator comparator; + private final int limit; + + SortedLimit(AbstractPipeline upstream, Comparator comparator, int limit) { + super(upstream, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED); + this.comparator = comparator; + this.limit = limit; + } + + @Override + public Sink opWrapSink(int flags, Sink sink) { + Objects.requireNonNull(sink); + + return new AbstractRefSortingSink(sink, comparator) { + private Limiter limiter; + + @Override + @SuppressWarnings("unchecked") + public void begin(long size) { + limiter = new Limiter<>(limit, size, comparator); + } + + @Override + @SuppressWarnings("unchecked") + public void end() { + limiter.sort(); + T[] array = limiter.data; + int offset = limiter.size; + downstream.begin(offset); + if (!cancellationWasRequested) { + for(int i = 0; i Node opEvaluateParallel(PipelineHelper helper, + Spliterator spliterator, + IntFunction generator) { + TerminalOp> op = ReduceOps.>makeRef( + () -> new Limiter<>(limit, -1, comparator), Limiter::put, Limiter::putAll); + Limiter limiter = op.evaluateParallel(helper, spliterator); + // No reason to use parallelSort here as + // limit <= SORTED_LIMIT_MAX_SIZE < MIN_ARRAY_SORT_GRAN + limiter.sort(); + T[] array = limiter.data; + int length = limiter.size; + return Nodes.node(Arrays.copyOfRange(array, 0, length)); + } + } + /** * Specialized subtype for sorting reference streams */ @@ -102,6 +305,7 @@ */ private final boolean isNaturalSort; private final Comparator comparator; + private boolean skip; /** * Sort using natural order of {@literal } which must be @@ -130,12 +334,22 @@ } @Override + public final Stream limit(long maxSize) { + if(maxSize < 0 || maxSize > SortedLimit.SORTED_LIMIT_MAX_SIZE) + return super.limit(maxSize); + skip = true; + if(maxSize == 0) + return super.limit(maxSize); + return new SortedLimit<>(this, comparator, (int)maxSize); + } + + @Override public Sink opWrapSink(int flags, Sink sink) { Objects.requireNonNull(sink); // If the input is already naturally sorted and this operation // also naturally sorted then this is a no-op - if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) + if (skip || (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)) return sink; else if (StreamOpFlag.SIZED.isKnown(flags)) return new SizedRefSortingSink<>(sink, comparator); @@ -149,7 +363,7 @@ IntFunction generator) { // If the input is already naturally sorted and this operation // naturally sorts then collect the output - if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) { + if (skip || (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort)) { return helper.evaluate(spliterator, false, generator); } else { @@ -159,6 +373,18 @@ return Nodes.node(flattenedData); } } + + @Override + Spliterator opEvaluateParallelLazy(PipelineHelper helper, Spliterator spliterator) { + // If the input is already naturally sorted and this operation + // naturally sorts then perfrom a no-op + if (skip || (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort)) { + return helper.wrapSpliterator(spliterator); + } + else { + return super.opEvaluateParallelLazy(helper, spliterator); + } + } } /**