< prev index next >

src/java.base/share/classes/java/util/stream/SortedOps.java

Print this page

        

*** 91,109 **** --- 91,313 ---- */ static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) { return new OfDouble(upstream); } + static class Limiter<T> { + private final T[] data; + private final int limit; + private final Comparator<? super T> comparator; + private int size; + private boolean initial = true; + + /** + * @param limit how many least elements to keep + * @param length length of the input if known, -1 otherwise + * @param comparator + */ + @SuppressWarnings("unchecked") + Limiter(int limit, long length, Comparator<? super T> comparator) { + this.limit = limit; + this.comparator = comparator; + int dataSize = limit * 2; + if(length >= 0 && length < dataSize) + dataSize = (int) length; + this.data = (T[]) new Object[dataSize]; + } + + /** + * Accumulate new element + * + * @param t element to accumulate + * + * @return false if the element is definitely not included into result, so + * any bigger element could be skipped as well, or true if element + * will probably be included into result. + */ + final boolean put(T t) { + int l = limit; + T[] d = data; + if (l == 1) { + // limit == 1 is the special case: exactly one least element is stored, + // no sorting is performed + if (initial) { + initial = false; + size = 1; + } else if (comparator.compare(t, d[0]) >= 0) + return false; + d[0] = t; + return true; + } + if (initial) { + if (size < d.length) { + d[size++] = t; + return true; + } + Arrays.sort(d, comparator); + initial = false; + size = l; + } + if (size == d.length) { + sortTail(d, l, size, comparator); + size = limit; + } + if (comparator.compare(t, d[l - 1]) < 0) { + d[size++] = t; + return true; + } + return false; + } + + /** + * Merge other {@code Limiter} object into this (other object becomes unusable after that). + * + * @param other other object to merge + * @return this object + */ + Limiter<T> putAll(Limiter<T> other) { + int i = 0, l = limit; + T[] od = other.data; + if (!other.initial) { + // sorted part + for (; i < l; i++) { + if (!put(od[i])) + break; + } + i = l; + } + int os = other.size; + for (; i < os; i++) { + put(od[i]); + } + return this; + } + + /** + * Must be called after accumulation is finished. + */ + void sort() { + if (limit == 1) + return; + if (initial) + Arrays.sort(data, 0, size, comparator); + else if (size > limit) + sortTail(data, limit, size, comparator); + if (size > limit) + size = limit; + } + + /* + * Sort data[limit]..data[size] and merge with presorted data[0]..data[limit-1] + * Assuming that data[limit-1] is the biggest element + */ + private static <T> void sortTail(T[] data, int limit, int size, Comparator<? super T> comparator) { + assert size > limit; + Arrays.sort(data, limit, size, comparator); + if (comparator.compare(data[size - 1], data[0]) < 0) { + // Common case: descending sequence + System.arraycopy(data, 0, data, size - limit, 2 * limit - size); + System.arraycopy(data, limit, data, 0, size - limit); + } else { + // Merge presorted 0..limit-1 and limit..size-1 + @SuppressWarnings("unchecked") + T[] buf = (T[]) new Object[limit]; + int i = 0, j = limit, k = 0; + // data[limit-1] is guaranteed to be the worst element, thus no need + // to check it + while (i < limit - 1 && k < limit && j < size) { + buf[k++] = comparator.compare(data[i], data[j]) <= 0 ? data[i++] : data[j++]; + } + if (k < limit) { + System.arraycopy(data, i < limit - 1 ? i : j, data, k, limit - k); + } + System.arraycopy(buf, 0, data, 0, k); + } + } + } + + /** + * Performs sorted().limit(maxSize) operation in single step for maxSize < SORTED_LIMIT_MAX_SIZE + */ + private static final class SortedLimit<T> extends ReferencePipeline.StatefulOp<T, T> { + static final int SORTED_LIMIT_MAX_SIZE = 1000; + + private final Comparator<? super T> comparator; + private final int limit; + + SortedLimit(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator, int limit) { + super(upstream, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED); + this.comparator = comparator; + this.limit = limit; + } + + @Override + public Sink<T> opWrapSink(int flags, Sink<T> sink) { + Objects.requireNonNull(sink); + + return new AbstractRefSortingSink<T>(sink, comparator) { + private Limiter<T> limiter; + + @Override + @SuppressWarnings("unchecked") + public void begin(long size) { + limiter = new Limiter<>(limit, size, comparator); + } + + @Override + @SuppressWarnings("unchecked") + public void end() { + limiter.sort(); + T[] array = limiter.data; + int offset = limiter.size; + downstream.begin(offset); + if (!cancellationWasRequested) { + for(int i = 0; i<offset; i++) + downstream.accept(array[i]); + } + else { + for(int i = 0; i<offset; i++) { + if (downstream.cancellationRequested()) break; + downstream.accept(array[i]); + } + } + downstream.end(); + } + + @Override + public void accept(T t) { + limiter.put(t); + } + }; + } + + @Override + public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, + Spliterator<P_IN> spliterator, + IntFunction<T[]> generator) { + TerminalOp<T, Limiter<T>> op = ReduceOps.<T, Limiter<T>>makeRef( + () -> new Limiter<>(limit, -1, comparator), Limiter::put, Limiter::putAll); + Limiter<T> limiter = op.evaluateParallel(helper, spliterator); + // No reason to use parallelSort here as + // limit <= SORTED_LIMIT_MAX_SIZE < MIN_ARRAY_SORT_GRAN + limiter.sort(); + T[] array = limiter.data; + int length = limiter.size; + return Nodes.node(Arrays.copyOfRange(array, 0, length)); + } + } + /** * Specialized subtype for sorting reference streams */ private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> { /** * Comparator used for sorting */ private final boolean isNaturalSort; private final Comparator<? super T> comparator; + private boolean skip; /** * Sort using natural order of {@literal <T>} which must be * {@code Comparable}. */
*** 128,143 **** this.isNaturalSort = false; this.comparator = Objects.requireNonNull(comparator); } @Override public Sink<T> opWrapSink(int flags, Sink<T> sink) { Objects.requireNonNull(sink); // If the input is already naturally sorted and this operation // also naturally sorted then this is a no-op ! if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) return sink; else if (StreamOpFlag.SIZED.isKnown(flags)) return new SizedRefSortingSink<>(sink, comparator); else return new RefSortingSink<>(sink, comparator); --- 332,357 ---- this.isNaturalSort = false; this.comparator = Objects.requireNonNull(comparator); } @Override + public final Stream<T> limit(long maxSize) { + if(maxSize < 0 || maxSize > SortedLimit.SORTED_LIMIT_MAX_SIZE) + return super.limit(maxSize); + skip = true; + if(maxSize == 0) + return super.limit(maxSize); + return new SortedLimit<>(this, comparator, (int)maxSize); + } + + @Override public Sink<T> opWrapSink(int flags, Sink<T> sink) { Objects.requireNonNull(sink); // If the input is already naturally sorted and this operation // also naturally sorted then this is a no-op ! if (skip || (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)) return sink; else if (StreamOpFlag.SIZED.isKnown(flags)) return new SizedRefSortingSink<>(sink, comparator); else return new RefSortingSink<>(sink, comparator);
*** 147,166 **** public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator, IntFunction<T[]> generator) { // If the input is already naturally sorted and this operation // naturally sorts then collect the output ! if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) { return helper.evaluate(spliterator, false, generator); } else { // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator); Arrays.parallelSort(flattenedData, comparator); return Nodes.node(flattenedData); } } } /** * Specialized subtype for sorting int streams. */ --- 361,392 ---- public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator, IntFunction<T[]> generator) { // If the input is already naturally sorted and this operation // naturally sorts then collect the output ! if (skip || (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort)) { return helper.evaluate(spliterator, false, generator); } else { // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator); Arrays.parallelSort(flattenedData, comparator); return Nodes.node(flattenedData); } } + + @Override + <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { + // If the input is already naturally sorted and this operation + // naturally sorts then perfrom a no-op + if (skip || (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort)) { + return helper.wrapSpliterator(spliterator); + } + else { + return super.opEvaluateParallelLazy(helper, spliterator); + } + } } /** * Specialized subtype for sorting int streams. */
< prev index next >