76 /** 77 * Appends a "sorted" operation to the provided stream. 78 * 79 * @param <T> the type of both input and output elements 80 * @param upstream a reference stream with element type T 81 */ 82 static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) { 83 return new OfLong(upstream); 84 } 85 86 /** 87 * Appends a "sorted" operation to the provided stream. 88 * 89 * @param <T> the type of both input and output elements 90 * @param upstream a reference stream with element type T 91 */ 92 static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) { 93 return new OfDouble(upstream); 94 } 95 96 /** 97 * Specialized subtype for sorting reference streams 98 */ 99 private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> { 100 /** 101 * Comparator used for sorting 102 */ 103 private final boolean isNaturalSort; 104 private final Comparator<? super T> comparator; 105 106 /** 107 * Sort using natural order of {@literal <T>} which must be 108 * {@code Comparable}. 109 */ 110 OfRef(AbstractPipeline<?, T, ?> upstream) { 111 super(upstream, StreamShape.REFERENCE, 112 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 113 this.isNaturalSort = true; 114 // Will throw CCE when we try to sort if T is not Comparable 115 @SuppressWarnings("unchecked") 116 Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder(); 117 this.comparator = comp; 118 } 119 120 /** 121 * Sort using the provided comparator. 122 * 123 * @param comparator The comparator to be used to evaluate ordering. 124 */ 125 OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) { 126 super(upstream, StreamShape.REFERENCE, 127 StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED); 128 this.isNaturalSort = false; 129 this.comparator = Objects.requireNonNull(comparator); 130 } 131 132 @Override 133 public Sink<T> opWrapSink(int flags, Sink<T> sink) { 134 Objects.requireNonNull(sink); 135 136 // If the input is already naturally sorted and this operation 137 // also naturally sorted then this is a no-op 138 if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) 139 return sink; 140 else if (StreamOpFlag.SIZED.isKnown(flags)) 141 return new SizedRefSortingSink<>(sink, comparator); 142 else 143 return new RefSortingSink<>(sink, comparator); 144 } 145 146 @Override 147 public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, 148 Spliterator<P_IN> spliterator, 149 IntFunction<T[]> generator) { 150 // If the input is already naturally sorted and this operation 151 // naturally sorts then collect the output 152 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) { 153 return helper.evaluate(spliterator, false, generator); 154 } 155 else { 156 // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort 157 T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator); 158 Arrays.parallelSort(flattenedData, comparator); 159 return Nodes.node(flattenedData); 160 } 161 } 162 } 163 164 /** 165 * Specialized subtype for sorting int streams. 166 */ 167 private static final class OfInt extends IntPipeline.StatefulOp<Integer> { 168 OfInt(AbstractPipeline<?, Integer, ?> upstream) { 169 super(upstream, StreamShape.INT_VALUE, 170 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 171 } 172 173 @Override 174 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 175 Objects.requireNonNull(sink); 176 177 if (StreamOpFlag.SORTED.isKnown(flags)) 178 return sink; 179 else if (StreamOpFlag.SIZED.isKnown(flags)) | 76 /** 77 * Appends a "sorted" operation to the provided stream. 78 * 79 * @param <T> the type of both input and output elements 80 * @param upstream a reference stream with element type T 81 */ 82 static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) { 83 return new OfLong(upstream); 84 } 85 86 /** 87 * Appends a "sorted" operation to the provided stream. 88 * 89 * @param <T> the type of both input and output elements 90 * @param upstream a reference stream with element type T 91 */ 92 static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) { 93 return new OfDouble(upstream); 94 } 95 96 static class Limiter<T> { 97 private final T[] data; 98 private final int limit; 99 private final Comparator<? super T> comparator; 100 private int size; 101 private boolean initial = true; 102 103 /** 104 * @param limit how many least elements to keep 105 * @param length length of the input if known, -1 otherwise 106 * @param comparator 107 */ 108 @SuppressWarnings("unchecked") 109 Limiter(int limit, long length, Comparator<? super T> comparator) { 110 this.limit = limit; 111 this.comparator = comparator; 112 int dataSize = limit * 2; 113 if(length >= 0 && length < dataSize) 114 dataSize = (int) length; 115 this.data = (T[]) new Object[dataSize]; 116 } 117 118 /** 119 * Accumulate new element 120 * 121 * @param t element to accumulate 122 * 123 * @return false if the element is definitely not included into result, so 124 * any bigger element could be skipped as well, or true if element 125 * will probably be included into result. 126 */ 127 final boolean put(T t) { 128 int l = limit; 129 T[] d = data; 130 if (l == 1) { 131 // limit == 1 is the special case: exactly one least element is stored, 132 // no sorting is performed 133 if (initial) { 134 initial = false; 135 size = 1; 136 } else if (comparator.compare(t, d[0]) >= 0) 137 return false; 138 d[0] = t; 139 return true; 140 } 141 if (initial) { 142 if (size < d.length) { 143 d[size++] = t; 144 return true; 145 } 146 Arrays.sort(d, comparator); 147 initial = false; 148 size = l; 149 } 150 if (size == d.length) { 151 sortTail(d, l, size, comparator); 152 size = limit; 153 } 154 if (comparator.compare(t, d[l - 1]) < 0) { 155 d[size++] = t; 156 return true; 157 } 158 return false; 159 } 160 161 /** 162 * Merge other {@code Limiter} object into this (other object becomes unusable after that). 163 * 164 * @param other other object to merge 165 * @return this object 166 */ 167 Limiter<T> putAll(Limiter<T> other) { 168 int i = 0, l = limit; 169 T[] od = other.data; 170 if (!other.initial) { 171 // sorted part 172 for (; i < l; i++) { 173 if (!put(od[i])) 174 break; 175 } 176 i = l; 177 } 178 int os = other.size; 179 for (; i < os; i++) { 180 put(od[i]); 181 } 182 return this; 183 } 184 185 /** 186 * Must be called after accumulation is finished. 187 */ 188 void sort() { 189 if (limit == 1) 190 return; 191 if (initial) 192 Arrays.sort(data, 0, size, comparator); 193 else if (size > limit) 194 sortTail(data, limit, size, comparator); 195 if (size > limit) 196 size = limit; 197 } 198 199 /* 200 * Sort data[limit]..data[size] and merge with presorted data[0]..data[limit-1] 201 * Assuming that data[limit-1] is the biggest element 202 */ 203 private static <T> void sortTail(T[] data, int limit, int size, Comparator<? super T> comparator) { 204 assert size > limit; 205 Arrays.sort(data, limit, size, comparator); 206 if (comparator.compare(data[size - 1], data[0]) < 0) { 207 // Common case: descending sequence 208 System.arraycopy(data, 0, data, size - limit, 2 * limit - size); 209 System.arraycopy(data, limit, data, 0, size - limit); 210 } else { 211 // Merge presorted 0..limit-1 and limit..size-1 212 @SuppressWarnings("unchecked") 213 T[] buf = (T[]) new Object[limit]; 214 int i = 0, j = limit, k = 0; 215 // data[limit-1] is guaranteed to be the worst element, thus no need 216 // to check it 217 while (i < limit - 1 && k < limit && j < size) { 218 buf[k++] = comparator.compare(data[i], data[j]) <= 0 ? data[i++] : data[j++]; 219 } 220 if (k < limit) { 221 System.arraycopy(data, i < limit - 1 ? i : j, data, k, limit - k); 222 } 223 System.arraycopy(buf, 0, data, 0, k); 224 } 225 } 226 } 227 228 /** 229 * Performs sorted().limit(maxSize) operation in single step for maxSize < SORTED_LIMIT_MAX_SIZE 230 */ 231 private static final class SortedLimit<T> extends ReferencePipeline.StatefulOp<T, T> { 232 static final int SORTED_LIMIT_MAX_SIZE = 1000; 233 234 private final Comparator<? super T> comparator; 235 private final int limit; 236 237 SortedLimit(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator, int limit) { 238 super(upstream, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED); 239 this.comparator = comparator; 240 this.limit = limit; 241 } 242 243 @Override 244 public Sink<T> opWrapSink(int flags, Sink<T> sink) { 245 Objects.requireNonNull(sink); 246 247 return new AbstractRefSortingSink<T>(sink, comparator) { 248 private Limiter<T> limiter; 249 250 @Override 251 @SuppressWarnings("unchecked") 252 public void begin(long size) { 253 limiter = new Limiter<>(limit, size, comparator); 254 } 255 256 @Override 257 @SuppressWarnings("unchecked") 258 public void end() { 259 limiter.sort(); 260 T[] array = limiter.data; 261 int offset = limiter.size; 262 downstream.begin(offset); 263 if (!cancellationWasRequested) { 264 for(int i = 0; i<offset; i++) 265 downstream.accept(array[i]); 266 } 267 else { 268 for(int i = 0; i<offset; i++) { 269 if (downstream.cancellationRequested()) break; 270 downstream.accept(array[i]); 271 } 272 } 273 downstream.end(); 274 } 275 276 @Override 277 public void accept(T t) { 278 limiter.put(t); 279 } 280 }; 281 } 282 283 @Override 284 public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, 285 Spliterator<P_IN> spliterator, 286 IntFunction<T[]> generator) { 287 TerminalOp<T, Limiter<T>> op = ReduceOps.<T, Limiter<T>>makeRef( 288 () -> new Limiter<>(limit, -1, comparator), Limiter::put, Limiter::putAll); 289 Limiter<T> limiter = op.evaluateParallel(helper, spliterator); 290 // No reason to use parallelSort here as 291 // limit <= SORTED_LIMIT_MAX_SIZE < MIN_ARRAY_SORT_GRAN 292 limiter.sort(); 293 T[] array = limiter.data; 294 int length = limiter.size; 295 return Nodes.node(Arrays.copyOfRange(array, 0, length)); 296 } 297 } 298 299 /** 300 * Specialized subtype for sorting reference streams 301 */ 302 private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> { 303 /** 304 * Comparator used for sorting 305 */ 306 private final boolean isNaturalSort; 307 private final Comparator<? super T> comparator; 308 private boolean skip; 309 310 /** 311 * Sort using natural order of {@literal <T>} which must be 312 * {@code Comparable}. 313 */ 314 OfRef(AbstractPipeline<?, T, ?> upstream) { 315 super(upstream, StreamShape.REFERENCE, 316 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 317 this.isNaturalSort = true; 318 // Will throw CCE when we try to sort if T is not Comparable 319 @SuppressWarnings("unchecked") 320 Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder(); 321 this.comparator = comp; 322 } 323 324 /** 325 * Sort using the provided comparator. 326 * 327 * @param comparator The comparator to be used to evaluate ordering. 328 */ 329 OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) { 330 super(upstream, StreamShape.REFERENCE, 331 StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED); 332 this.isNaturalSort = false; 333 this.comparator = Objects.requireNonNull(comparator); 334 } 335 336 @Override 337 public final Stream<T> limit(long maxSize) { 338 if(maxSize < 0 || maxSize > SortedLimit.SORTED_LIMIT_MAX_SIZE) 339 return super.limit(maxSize); 340 skip = true; 341 if(maxSize == 0) 342 return super.limit(maxSize); 343 return new SortedLimit<>(this, comparator, (int)maxSize); 344 } 345 346 @Override 347 public Sink<T> opWrapSink(int flags, Sink<T> sink) { 348 Objects.requireNonNull(sink); 349 350 // If the input is already naturally sorted and this operation 351 // also naturally sorted then this is a no-op 352 if (skip || (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)) 353 return sink; 354 else if (StreamOpFlag.SIZED.isKnown(flags)) 355 return new SizedRefSortingSink<>(sink, comparator); 356 else 357 return new RefSortingSink<>(sink, comparator); 358 } 359 360 @Override 361 public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, 362 Spliterator<P_IN> spliterator, 363 IntFunction<T[]> generator) { 364 // If the input is already naturally sorted and this operation 365 // naturally sorts then collect the output 366 if (skip || (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort)) { 367 return helper.evaluate(spliterator, false, generator); 368 } 369 else { 370 // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort 371 T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator); 372 Arrays.parallelSort(flattenedData, comparator); 373 return Nodes.node(flattenedData); 374 } 375 } 376 377 @Override 378 <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { 379 // If the input is already naturally sorted and this operation 380 // naturally sorts then perfrom a no-op 381 if (skip || (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort)) { 382 return helper.wrapSpliterator(spliterator); 383 } 384 else { 385 return super.opEvaluateParallelLazy(helper, spliterator); 386 } 387 } 388 } 389 390 /** 391 * Specialized subtype for sorting int streams. 392 */ 393 private static final class OfInt extends IntPipeline.StatefulOp<Integer> { 394 OfInt(AbstractPipeline<?, Integer, ?> upstream) { 395 super(upstream, StreamShape.INT_VALUE, 396 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 397 } 398 399 @Override 400 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 401 Objects.requireNonNull(sink); 402 403 if (StreamOpFlag.SORTED.isKnown(flags)) 404 return sink; 405 else if (StreamOpFlag.SIZED.isKnown(flags)) |