< prev index next >

src/java.base/share/classes/java/util/stream/SortedOps.java

Print this page




  76     /**
  77      * Appends a "sorted" operation to the provided stream.
  78      *
  79      * @param <T> the type of both input and output elements
  80      * @param upstream a reference stream with element type T
  81      */
  82     static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) {
  83         return new OfLong(upstream);
  84     }
  85 
  86     /**
  87      * Appends a "sorted" operation to the provided stream.
  88      *
  89      * @param <T> the type of both input and output elements
  90      * @param upstream a reference stream with element type T
  91      */
  92     static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {
  93         return new OfDouble(upstream);
  94     }
  95 











































































































































































































  96     /**
  97      * Specialized subtype for sorting reference streams
  98      */
  99     private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
 100         /**
 101          * Comparator used for sorting
 102          */
 103         private final boolean isNaturalSort;
 104         private final Comparator<? super T> comparator;

 105 
 106         /**
 107          * Sort using natural order of {@literal <T>} which must be
 108          * {@code Comparable}.
 109          */
 110         OfRef(AbstractPipeline<?, T, ?> upstream) {
 111             super(upstream, StreamShape.REFERENCE,
 112                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 113             this.isNaturalSort = true;
 114             // Will throw CCE when we try to sort if T is not Comparable
 115             @SuppressWarnings("unchecked")
 116             Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
 117             this.comparator = comp;
 118         }
 119 
 120         /**
 121          * Sort using the provided comparator.
 122          *
 123          * @param comparator The comparator to be used to evaluate ordering.
 124          */
 125         OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
 126             super(upstream, StreamShape.REFERENCE,
 127                   StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
 128             this.isNaturalSort = false;
 129             this.comparator = Objects.requireNonNull(comparator);
 130         }
 131 
 132         @Override










 133         public Sink<T> opWrapSink(int flags, Sink<T> sink) {
 134             Objects.requireNonNull(sink);
 135 
 136             // If the input is already naturally sorted and this operation
 137             // also naturally sorted then this is a no-op
 138             if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
 139                 return sink;
 140             else if (StreamOpFlag.SIZED.isKnown(flags))
 141                 return new SizedRefSortingSink<>(sink, comparator);
 142             else
 143                 return new RefSortingSink<>(sink, comparator);
 144         }
 145 
 146         @Override
 147         public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
 148                                                  Spliterator<P_IN> spliterator,
 149                                                  IntFunction<T[]> generator) {
 150             // If the input is already naturally sorted and this operation
 151             // naturally sorts then collect the output
 152             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
 153                 return helper.evaluate(spliterator, false, generator);
 154             }
 155             else {
 156                 // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
 157                 T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
 158                 Arrays.parallelSort(flattenedData, comparator);
 159                 return Nodes.node(flattenedData);












 160             }
 161         }
 162     }
 163 
 164     /**
 165      * Specialized subtype for sorting int streams.
 166      */
 167     private static final class OfInt extends IntPipeline.StatefulOp<Integer> {
 168         OfInt(AbstractPipeline<?, Integer, ?> upstream) {
 169             super(upstream, StreamShape.INT_VALUE,
 170                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 171         }
 172 
 173         @Override
 174         public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 175             Objects.requireNonNull(sink);
 176 
 177             if (StreamOpFlag.SORTED.isKnown(flags))
 178                 return sink;
 179             else if (StreamOpFlag.SIZED.isKnown(flags))




  76     /**
  77      * Appends a "sorted" operation to the provided stream.
  78      *
  79      * @param <T> the type of both input and output elements
  80      * @param upstream a reference stream with element type T
  81      */
  82     static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) {
  83         return new OfLong(upstream);
  84     }
  85 
  86     /**
  87      * Appends a "sorted" operation to the provided stream.
  88      *
  89      * @param <T> the type of both input and output elements
  90      * @param upstream a reference stream with element type T
  91      */
  92     static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {
  93         return new OfDouble(upstream);
  94     }
  95 
  96     static class Limiter<T> {
  97         private final T[] data;
  98         private final int limit;
  99         private final Comparator<? super T> comparator;
 100         private int size;
 101         private boolean initial = true;
 102     
 103         /**
 104          * @param limit how many least elements to keep
 105          * @param length length of the input if known, -1 otherwise
 106          * @param comparator
 107          */
 108         @SuppressWarnings("unchecked")
 109         Limiter(int limit, long length, Comparator<? super T> comparator) {
 110             this.limit = limit;
 111             this.comparator = comparator;
 112             int dataSize = limit * 2;
 113             if(length >= 0 && length < dataSize)
 114                 dataSize = (int) length;
 115             this.data = (T[]) new Object[dataSize];
 116         }
 117     
 118         /**
 119          * Accumulate new element
 120          * 
 121          * @param t element to accumulate
 122          * 
 123          * @return false if the element is definitely not included into result, so
 124          *         any bigger element could be skipped as well, or true if element
 125          *         will probably be included into result.
 126          */
 127         final boolean put(T t) {
 128             int l = limit;
 129             T[] d = data;
 130             if (l == 1) {
 131                 // limit == 1 is the special case: exactly one least element is stored, 
 132                 // no sorting is performed
 133                 if (initial) {
 134                     initial = false;
 135                     size = 1;
 136                 } else if (comparator.compare(t, d[0]) >= 0)
 137                     return false;
 138                 d[0] = t;
 139                 return true;
 140             }
 141             if (initial) {
 142                 if (size < d.length) {
 143                     d[size++] = t;
 144                     return true;
 145                 }
 146                 Arrays.sort(d, comparator);
 147                 initial = false;
 148                 size = l;
 149             }
 150             if (size == d.length) {
 151                 sortTail(d, l, size, comparator);
 152                 size = limit;
 153             }
 154             if (comparator.compare(t, d[l - 1]) < 0) {
 155                 d[size++] = t;
 156                 return true;
 157             }
 158             return false;
 159         }
 160     
 161         /**
 162          * Merge other {@code Limiter} object into this (other object becomes unusable after that).
 163          * 
 164          * @param other other object to merge
 165          * @return this object
 166          */
 167         Limiter<T> putAll(Limiter<T> other) {
 168             int i = 0, l = limit;
 169             T[] od = other.data;
 170             if (!other.initial) {
 171                 // sorted part
 172                 for (; i < l; i++) {
 173                     if (!put(od[i]))
 174                         break;
 175                 }
 176                 i = l;
 177             }
 178             int os = other.size;
 179             for (; i < os; i++) {
 180                 put(od[i]);
 181             }
 182             return this;
 183         }
 184     
 185         /**
 186          * Must be called after accumulation is finished.
 187          */
 188         void sort() {
 189             if (limit == 1)
 190                 return;
 191             if (initial)
 192                 Arrays.sort(data, 0, size, comparator);
 193             else if (size > limit)
 194                 sortTail(data, limit, size, comparator);
 195             if (size > limit)
 196                 size = limit;
 197         }
 198     
 199         /*
 200          * Sort data[limit]..data[size] and merge with presorted data[0]..data[limit-1]
 201          * Assuming that data[limit-1] is the biggest element
 202          */
 203         private static <T> void sortTail(T[] data, int limit, int size, Comparator<? super T> comparator) {
 204             assert size > limit;
 205             Arrays.sort(data, limit, size, comparator);
 206             if (comparator.compare(data[size - 1], data[0]) < 0) {
 207                 // Common case: descending sequence
 208                 System.arraycopy(data, 0, data, size - limit, 2 * limit - size);
 209                 System.arraycopy(data, limit, data, 0, size - limit);
 210             } else {
 211                 // Merge presorted 0..limit-1 and limit..size-1
 212                 @SuppressWarnings("unchecked")
 213                 T[] buf = (T[]) new Object[limit];
 214                 int i = 0, j = limit, k = 0;
 215                 // data[limit-1] is guaranteed to be the worst element, thus no need
 216                 // to check it
 217                 while (i < limit - 1 && k < limit && j < size) {
 218                     buf[k++] = comparator.compare(data[i], data[j]) <= 0 ? data[i++] : data[j++];   
 219                 }
 220                 if (k < limit) {
 221                     System.arraycopy(data, i < limit - 1 ? i : j, data, k, limit - k);
 222                 }
 223                 System.arraycopy(buf, 0, data, 0, k);
 224             }
 225         }
 226     }
 227 
 228     /**
 229      * Performs sorted().limit(maxSize) operation in single step for maxSize < SORTED_LIMIT_MAX_SIZE
 230      */
 231     private static final class SortedLimit<T> extends ReferencePipeline.StatefulOp<T, T> {
 232         static final int SORTED_LIMIT_MAX_SIZE = 1000;
 233 
 234         private final Comparator<? super T> comparator;
 235         private final int limit;
 236 
 237         SortedLimit(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator, int limit) {
 238             super(upstream, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED);
 239             this.comparator = comparator;
 240             this.limit = limit;
 241         }
 242 
 243         @Override
 244         public Sink<T> opWrapSink(int flags, Sink<T> sink) {
 245             Objects.requireNonNull(sink);
 246 
 247             return new AbstractRefSortingSink<T>(sink, comparator) {
 248                 private Limiter<T> limiter;
 249 
 250                 @Override
 251                 @SuppressWarnings("unchecked")
 252                 public void begin(long size) {
 253                     limiter = new Limiter<>(limit, size, comparator);
 254                 }
 255 
 256                 @Override
 257                 @SuppressWarnings("unchecked")
 258                 public void end() {
 259                     limiter.sort();
 260                     T[] array = limiter.data;
 261                     int offset = limiter.size;
 262                     downstream.begin(offset);
 263                     if (!cancellationWasRequested) {
 264                         for(int i = 0; i<offset; i++)
 265                             downstream.accept(array[i]);
 266                     }
 267                     else {
 268                         for(int i = 0; i<offset; i++) {
 269                             if (downstream.cancellationRequested()) break;
 270                             downstream.accept(array[i]);
 271                         }
 272                     }
 273                     downstream.end();
 274                 }
 275 
 276                 @Override
 277                 public void accept(T t) {
 278                     limiter.put(t);
 279                 }
 280             };
 281         }
 282 
 283         @Override
 284         public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
 285                                                  Spliterator<P_IN> spliterator,
 286                                                  IntFunction<T[]> generator) {
 287             TerminalOp<T, Limiter<T>> op = ReduceOps.<T, Limiter<T>>makeRef(
 288                 () -> new Limiter<>(limit, -1, comparator), Limiter::put, Limiter::putAll);
 289             Limiter<T> limiter = op.evaluateParallel(helper, spliterator);
 290             // No reason to use parallelSort here as 
 291             // limit <= SORTED_LIMIT_MAX_SIZE < MIN_ARRAY_SORT_GRAN
 292             limiter.sort();
 293             T[] array = limiter.data;
 294             int length = limiter.size;
 295             return Nodes.node(Arrays.copyOfRange(array, 0, length));
 296         }
 297     }
 298 
 299     /**
 300      * Specialized subtype for sorting reference streams
 301      */
 302     private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
 303         /**
 304          * Comparator used for sorting
 305          */
 306         private final boolean isNaturalSort;
 307         private final Comparator<? super T> comparator;
 308         private boolean skip;
 309 
 310         /**
 311          * Sort using natural order of {@literal <T>} which must be
 312          * {@code Comparable}.
 313          */
 314         OfRef(AbstractPipeline<?, T, ?> upstream) {
 315             super(upstream, StreamShape.REFERENCE,
 316                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 317             this.isNaturalSort = true;
 318             // Will throw CCE when we try to sort if T is not Comparable
 319             @SuppressWarnings("unchecked")
 320             Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
 321             this.comparator = comp;
 322         }
 323 
 324         /**
 325          * Sort using the provided comparator.
 326          *
 327          * @param comparator The comparator to be used to evaluate ordering.
 328          */
 329         OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
 330             super(upstream, StreamShape.REFERENCE,
 331                   StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
 332             this.isNaturalSort = false;
 333             this.comparator = Objects.requireNonNull(comparator);
 334         }
 335 
 336         @Override
 337         public final Stream<T> limit(long maxSize) {
 338             if(maxSize < 0 || maxSize > SortedLimit.SORTED_LIMIT_MAX_SIZE)
 339                 return super.limit(maxSize);
 340             skip = true;
 341             if(maxSize == 0)
 342                 return super.limit(maxSize);
 343             return new SortedLimit<>(this, comparator, (int)maxSize);
 344         }
 345 
 346         @Override
 347         public Sink<T> opWrapSink(int flags, Sink<T> sink) {
 348             Objects.requireNonNull(sink);
 349 
 350             // If the input is already naturally sorted and this operation
 351             // also naturally sorted then this is a no-op
 352             if (skip || (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort))
 353                 return sink;
 354             else if (StreamOpFlag.SIZED.isKnown(flags))
 355                 return new SizedRefSortingSink<>(sink, comparator);
 356             else
 357                 return new RefSortingSink<>(sink, comparator);
 358         }
 359 
 360         @Override
 361         public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
 362                                                  Spliterator<P_IN> spliterator,
 363                                                  IntFunction<T[]> generator) {
 364             // If the input is already naturally sorted and this operation
 365             // naturally sorts then collect the output
 366             if (skip || (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort)) {
 367                 return helper.evaluate(spliterator, false, generator);
 368             }
 369             else {
 370                 // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
 371                 T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
 372                 Arrays.parallelSort(flattenedData, comparator);
 373                 return Nodes.node(flattenedData);
 374             }
 375         }
 376 
 377         @Override
 378         <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
 379             // If the input is already naturally sorted and this operation
 380             // naturally sorts then perfrom a no-op
 381             if (skip || (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort)) {
 382                 return helper.wrapSpliterator(spliterator);
 383             }
 384             else {
 385                 return super.opEvaluateParallelLazy(helper, spliterator);
 386             }
 387         }
 388     }
 389 
 390     /**
 391      * Specialized subtype for sorting int streams.
 392      */
 393     private static final class OfInt extends IntPipeline.StatefulOp<Integer> {
 394         OfInt(AbstractPipeline<?, Integer, ?> upstream) {
 395             super(upstream, StreamShape.INT_VALUE,
 396                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 397         }
 398 
 399         @Override
 400         public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 401             Objects.requireNonNull(sink);
 402 
 403             if (StreamOpFlag.SORTED.isKnown(flags))
 404                 return sink;
 405             else if (StreamOpFlag.SIZED.isKnown(flags))


< prev index next >