112 super(upstream, StreamShape.REFERENCE,
113 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
114 this.isNaturalSort = true;
115 // Will throw CCE when we try to sort if T is not Comparable
116 this.comparator = (Comparator<? super T>) Comparator.naturalOrder();
117 }
118
/**
 * Creates a sorting stage that orders elements with an explicit comparator.
 *
 * @param upstream the upstream pipeline stage, handed to the superclass constructor
 * @param comparator the comparator used to evaluate ordering; must not be null
 * @throws NullPointerException if {@code comparator} is null
 */
OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
    super(upstream, StreamShape.REFERENCE,
          StreamOpFlag.NOT_SORTED | StreamOpFlag.IS_ORDERED);
    // A caller-supplied comparator is never treated as the natural ordering.
    this.comparator = Objects.requireNonNull(comparator);
    this.isNaturalSort = false;
}
130
131 @Override
132 public Sink<T> opWrapSink(int flags, Sink sink) {
133 Objects.requireNonNull(sink);
134
135 // If the input is already naturally sorted and this operation
136 // also naturally sorted then this is a no-op
137 if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
138 return sink;
139 else if (StreamOpFlag.SIZED.isKnown(flags))
140 return new SizedRefSortingSink<>(sink, comparator);
141 else
142 return new RefSortingSink<>(sink, comparator);
143 }
144
145 @Override
146 public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
147 Spliterator<P_IN> spliterator,
148 IntFunction<T[]> generator) {
149 // If the input is already naturally sorted and this operation
150 // naturally sorts then collect the output
151 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
152 return helper.evaluate(spliterator, false, generator);
263 public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
264 Spliterator<P_IN> spliterator,
265 IntFunction<Double[]> generator) {
266 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
267 return helper.evaluate(spliterator, false, generator);
268 }
269 else {
270 Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);
271
272 double[] content = n.asPrimitiveArray();
273 Arrays.parallelSort(content);
274
275 return Nodes.node(content);
276 }
277 }
278 }
279
280 /**
281 * {@link ForkJoinTask} for implementing sort on SIZED reference streams.
282 */
283 private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T> {
284 private final Comparator<? super T> comparator;
285 private T[] array;
286 private int offset;
287
288 SizedRefSortingSink(Sink<T> sink, Comparator<? super T> comparator) {
289 super(sink);
290 this.comparator = comparator;
291 }
292
293 @Override
294 public void begin(long size) {
295 if (size >= Nodes.MAX_ARRAY_SIZE)
296 throw new IllegalArgumentException("Stream size exceeds max array size");
297 array = (T[]) new Object[(int) size];
298 }
299
300 @Override
301 public void end() {
302 // Need to use offset rather than array.length since the downstream
303 // many be short-circuiting
304 // @@@ A better approach is to know if the downstream short-circuits
305 // and check sink.cancellationRequested
306 Arrays.sort(array, 0, offset, comparator);
307 downstream.begin(offset);
308 for (int i = 0; i < offset; i++)
309 downstream.accept(array[i]);
310 downstream.end();
311 array = null;
312 }
313
314 @Override
315 public void accept(T t) {
316 array[offset++] = t;
317 }
318 }
319
320 /**
321 * {@link Sink} for implementing sort on reference streams.
322 */
323 private static final class RefSortingSink<T> extends Sink.ChainedReference<T> {
324 private final Comparator<? super T> comparator;
325 private ArrayList<T> list;
326
327 RefSortingSink(Sink<T> sink, Comparator<? super T> comparator) {
328 super(sink);
329 this.comparator = comparator;
330 }
331
332 @Override
333 public void begin(long size) {
334 list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
335 }
336
337 @Override
338 public void end() {
339 list.sort(comparator);
340 downstream.begin(list.size());
341 list.forEach(downstream::accept);
342 downstream.end();
343 list = null;
344 }
345
346 @Override
347 public void accept(T t) {
348 list.add(t);
349 }
350 }
351
352 /**
353 * {@link Sink} for implementing sort on SIZED int streams.
354 */
355 private static final class SizedIntSortingSink extends Sink.ChainedInt {
356 private int[] array;
357 private int offset;
358
359 SizedIntSortingSink(Sink downstream) {
360 super(downstream);
361 }
362
363 @Override
364 public void begin(long size) {
365 if (size >= Nodes.MAX_ARRAY_SIZE)
366 throw new IllegalArgumentException("Stream size exceeds max array size");
367 array = new int[(int) size];
368 }
369
370 @Override
371 public void end() {
372 Arrays.sort(array, 0, offset);
373 downstream.begin(offset);
374 for (int i = 0; i < offset; i++)
375 downstream.accept(array[i]);
376 downstream.end();
377 array = null;
378 }
379
380 @Override
381 public void accept(int t) {
382 array[offset++] = t;
383 }
384 }
385
386 /**
387 * {@link Sink} for implementing sort on int streams.
388 */
389 private static final class IntSortingSink extends Sink.ChainedInt {
390 private SpinedBuffer.OfInt b;
391
392 IntSortingSink(Sink sink) {
393 super(sink);
394 }
395
396 @Override
397 public void begin(long size) {
398 b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
399 }
400
401 @Override
402 public void end() {
403 int[] ints = b.asPrimitiveArray();
404 Arrays.sort(ints);
405 downstream.begin(ints.length);
406 for (int anInt : ints)
407 downstream.accept(anInt);
408 downstream.end();
409 }
410
411 @Override
412 public void accept(int t) {
413 b.accept(t);
414 }
415 }
416
417 /**
418 * {@link Sink} for implementing sort on SIZED long streams.
419 */
420 private static final class SizedLongSortingSink extends Sink.ChainedLong {
421 private long[] array;
422 private int offset;
423
424 SizedLongSortingSink(Sink downstream) {
425 super(downstream);
426 }
427
428 @Override
429 public void begin(long size) {
430 if (size >= Nodes.MAX_ARRAY_SIZE)
431 throw new IllegalArgumentException("Stream size exceeds max array size");
432 array = new long[(int) size];
433 }
434
435 @Override
436 public void end() {
437 Arrays.sort(array, 0, offset);
438 downstream.begin(offset);
439 for (int i = 0; i < offset; i++)
440 downstream.accept(array[i]);
441 downstream.end();
442 array = null;
443 }
444
445 @Override
446 public void accept(long t) {
447 array[offset++] = t;
448 }
449 }
450
451 /**
452 * {@link Sink} for implementing sort on long streams.
453 */
454 private static final class LongSortingSink extends Sink.ChainedLong {
455 private SpinedBuffer.OfLong b;
456
457 LongSortingSink(Sink sink) {
458 super(sink);
459 }
460
461 @Override
462 public void begin(long size) {
463 b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
464 }
465
466 @Override
467 public void end() {
468 long[] longs = b.asPrimitiveArray();
469 Arrays.sort(longs);
470 downstream.begin(longs.length);
471 for (long aLong : longs)
472 downstream.accept(aLong);
473 downstream.end();
474 }
475
476 @Override
477 public void accept(long t) {
478 b.accept(t);
479 }
480 }
481
482 /**
483 * {@link Sink} for implementing sort on SIZED double streams.
484 */
485 private static final class SizedDoubleSortingSink extends Sink.ChainedDouble {
486 private double[] array;
487 private int offset;
488
489 SizedDoubleSortingSink(Sink downstream) {
490 super(downstream);
491 }
492
493 @Override
494 public void begin(long size) {
495 if (size >= Nodes.MAX_ARRAY_SIZE)
496 throw new IllegalArgumentException("Stream size exceeds max array size");
497 array = new double[(int) size];
498 }
499
500 @Override
501 public void end() {
502 Arrays.sort(array, 0, offset);
503 downstream.begin(offset);
504 for (int i = 0; i < offset; i++)
505 downstream.accept(array[i]);
506 downstream.end();
507 array = null;
508 }
509
510 @Override
511 public void accept(double t) {
512 array[offset++] = t;
513 }
514 }
515
516 /**
517 * {@link Sink} for implementing sort on double streams.
518 */
519 private static final class DoubleSortingSink extends Sink.ChainedDouble {
520 private SpinedBuffer.OfDouble b;
521
522 DoubleSortingSink(Sink sink) {
523 super(sink);
524 }
525
526 @Override
527 public void begin(long size) {
528 b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
529 }
530
531 @Override
532 public void end() {
533 double[] doubles = b.asPrimitiveArray();
534 Arrays.sort(doubles);
535 downstream.begin(doubles.length);
536 for (double aDouble : doubles)
537 downstream.accept(aDouble);
538 downstream.end();
539 }
540
541 @Override
542 public void accept(double t) {
|
112 super(upstream, StreamShape.REFERENCE,
113 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
114 this.isNaturalSort = true;
115 // Will throw CCE when we try to sort if T is not Comparable
116 this.comparator = (Comparator<? super T>) Comparator.naturalOrder();
117 }
118
/**
 * Creates a sorting stage that orders elements with an explicit comparator.
 *
 * @param upstream the upstream pipeline stage, handed to the superclass constructor
 * @param comparator the comparator used to evaluate ordering; must not be null
 * @throws NullPointerException if {@code comparator} is null
 */
OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
    super(upstream, StreamShape.REFERENCE,
          StreamOpFlag.NOT_SORTED | StreamOpFlag.IS_ORDERED);
    // A caller-supplied comparator is never treated as the natural ordering.
    this.comparator = Objects.requireNonNull(comparator);
    this.isNaturalSort = false;
}
130
131 @Override
132 public Sink<T> opWrapSink(int flags, Sink<T> sink) {
133 Objects.requireNonNull(sink);
134
135 // If the input is already naturally sorted and this operation
136 // also naturally sorted then this is a no-op
137 if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
138 return sink;
139 else if (StreamOpFlag.SIZED.isKnown(flags))
140 return new SizedRefSortingSink<>(sink, comparator);
141 else
142 return new RefSortingSink<>(sink, comparator);
143 }
144
145 @Override
146 public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
147 Spliterator<P_IN> spliterator,
148 IntFunction<T[]> generator) {
149 // If the input is already naturally sorted and this operation
150 // naturally sorts then collect the output
151 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
152 return helper.evaluate(spliterator, false, generator);
263 public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
264 Spliterator<P_IN> spliterator,
265 IntFunction<Double[]> generator) {
266 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
267 return helper.evaluate(spliterator, false, generator);
268 }
269 else {
270 Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);
271
272 double[] content = n.asPrimitiveArray();
273 Arrays.parallelSort(content);
274
275 return Nodes.node(content);
276 }
277 }
278 }
279
280 /**
281 * {@link ForkJoinTask} for implementing sort on SIZED reference streams.
282 */
283 private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T, T> {
284 private final Comparator<? super T> comparator;
285 private T[] array;
286 private int offset;
287
288 SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
289 super(sink);
290 this.comparator = comparator;
291 }
292
293 @Override
294 public void begin(long size) {
295 if (size >= Nodes.MAX_ARRAY_SIZE)
296 throw new IllegalArgumentException("Stream size exceeds max array size");
297 array = (T[]) new Object[(int) size];
298 }
299
300 @Override
301 public void end() {
302 // Need to use offset rather than array.length since the downstream
303 // many be short-circuiting
304 // @@@ A better approach is to know if the downstream short-circuits
305 // and check sink.cancellationRequested
306 Arrays.sort(array, 0, offset, comparator);
307 downstream.begin(offset);
308 for (int i = 0; i < offset; i++)
309 downstream.accept(array[i]);
310 downstream.end();
311 array = null;
312 }
313
314 @Override
315 public void accept(T t) {
316 array[offset++] = t;
317 }
318 }
319
320 /**
321 * {@link Sink} for implementing sort on reference streams.
322 */
323 private static final class RefSortingSink<T> extends Sink.ChainedReference<T, T> {
324 private final Comparator<? super T> comparator;
325 private ArrayList<T> list;
326
327 RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
328 super(sink);
329 this.comparator = comparator;
330 }
331
332 @Override
333 public void begin(long size) {
334 list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
335 }
336
337 @Override
338 public void end() {
339 list.sort(comparator);
340 downstream.begin(list.size());
341 list.forEach(downstream::accept);
342 downstream.end();
343 list = null;
344 }
345
346 @Override
347 public void accept(T t) {
348 list.add(t);
349 }
350 }
351
352 /**
353 * {@link Sink} for implementing sort on SIZED int streams.
354 */
355 private static final class SizedIntSortingSink extends Sink.ChainedInt<Integer> {
356 private int[] array;
357 private int offset;
358
359 SizedIntSortingSink(Sink<? super Integer> downstream) {
360 super(downstream);
361 }
362
363 @Override
364 public void begin(long size) {
365 if (size >= Nodes.MAX_ARRAY_SIZE)
366 throw new IllegalArgumentException("Stream size exceeds max array size");
367 array = new int[(int) size];
368 }
369
370 @Override
371 public void end() {
372 Arrays.sort(array, 0, offset);
373 downstream.begin(offset);
374 for (int i = 0; i < offset; i++)
375 downstream.accept(array[i]);
376 downstream.end();
377 array = null;
378 }
379
380 @Override
381 public void accept(int t) {
382 array[offset++] = t;
383 }
384 }
385
386 /**
387 * {@link Sink} for implementing sort on int streams.
388 */
389 private static final class IntSortingSink extends Sink.ChainedInt<Integer> {
390 private SpinedBuffer.OfInt b;
391
392 IntSortingSink(Sink<? super Integer> sink) {
393 super(sink);
394 }
395
396 @Override
397 public void begin(long size) {
398 b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
399 }
400
401 @Override
402 public void end() {
403 int[] ints = b.asPrimitiveArray();
404 Arrays.sort(ints);
405 downstream.begin(ints.length);
406 for (int anInt : ints)
407 downstream.accept(anInt);
408 downstream.end();
409 }
410
411 @Override
412 public void accept(int t) {
413 b.accept(t);
414 }
415 }
416
417 /**
418 * {@link Sink} for implementing sort on SIZED long streams.
419 */
420 private static final class SizedLongSortingSink extends Sink.ChainedLong<Long> {
421 private long[] array;
422 private int offset;
423
424 SizedLongSortingSink(Sink<? super Long> downstream) {
425 super(downstream);
426 }
427
428 @Override
429 public void begin(long size) {
430 if (size >= Nodes.MAX_ARRAY_SIZE)
431 throw new IllegalArgumentException("Stream size exceeds max array size");
432 array = new long[(int) size];
433 }
434
435 @Override
436 public void end() {
437 Arrays.sort(array, 0, offset);
438 downstream.begin(offset);
439 for (int i = 0; i < offset; i++)
440 downstream.accept(array[i]);
441 downstream.end();
442 array = null;
443 }
444
445 @Override
446 public void accept(long t) {
447 array[offset++] = t;
448 }
449 }
450
451 /**
452 * {@link Sink} for implementing sort on long streams.
453 */
454 private static final class LongSortingSink extends Sink.ChainedLong<Long> {
455 private SpinedBuffer.OfLong b;
456
457 LongSortingSink(Sink<? super Long> sink) {
458 super(sink);
459 }
460
461 @Override
462 public void begin(long size) {
463 b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
464 }
465
466 @Override
467 public void end() {
468 long[] longs = b.asPrimitiveArray();
469 Arrays.sort(longs);
470 downstream.begin(longs.length);
471 for (long aLong : longs)
472 downstream.accept(aLong);
473 downstream.end();
474 }
475
476 @Override
477 public void accept(long t) {
478 b.accept(t);
479 }
480 }
481
482 /**
483 * {@link Sink} for implementing sort on SIZED double streams.
484 */
485 private static final class SizedDoubleSortingSink extends Sink.ChainedDouble<Double> {
486 private double[] array;
487 private int offset;
488
489 SizedDoubleSortingSink(Sink<? super Double> downstream) {
490 super(downstream);
491 }
492
493 @Override
494 public void begin(long size) {
495 if (size >= Nodes.MAX_ARRAY_SIZE)
496 throw new IllegalArgumentException("Stream size exceeds max array size");
497 array = new double[(int) size];
498 }
499
500 @Override
501 public void end() {
502 Arrays.sort(array, 0, offset);
503 downstream.begin(offset);
504 for (int i = 0; i < offset; i++)
505 downstream.accept(array[i]);
506 downstream.end();
507 array = null;
508 }
509
510 @Override
511 public void accept(double t) {
512 array[offset++] = t;
513 }
514 }
515
516 /**
517 * {@link Sink} for implementing sort on double streams.
518 */
519 private static final class DoubleSortingSink extends Sink.ChainedDouble<Double> {
520 private SpinedBuffer.OfDouble b;
521
522 DoubleSortingSink(Sink<? super Double> sink) {
523 super(sink);
524 }
525
526 @Override
527 public void begin(long size) {
528 b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
529 }
530
531 @Override
532 public void end() {
533 double[] doubles = b.asPrimitiveArray();
534 Arrays.sort(doubles);
535 downstream.begin(doubles.length);
536 for (double aDouble : doubles)
537 downstream.accept(aDouble);
538 downstream.end();
539 }
540
541 @Override
542 public void accept(double t) {
|