src/share/classes/java/util/stream/SortedOps.java

Print this page
rev 7923 : 8023681: Fix raw type warning caused by Sink
Reviewed-by:


 112             super(upstream, StreamShape.REFERENCE,
 113                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 114             this.isNaturalSort = true;
 115             // Will throw CCE when we try to sort if T is not Comparable
 116             this.comparator = (Comparator<? super T>) Comparator.naturalOrder();
 117         }
 118 
 119         /**
 120          * Sort using the provided comparator.
 121          *
 122          * @param comparator The comparator to be used to evaluate ordering.
 123          */
 124         OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
 125             super(upstream, StreamShape.REFERENCE,
 126                   StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
 127             this.isNaturalSort = false;
 128             this.comparator = Objects.requireNonNull(comparator);
 129         }
 130 
 131         @Override
 132         public Sink<T> opWrapSink(int flags, Sink sink) {
 133             Objects.requireNonNull(sink);
 134 
 135             // If the input is already naturally sorted and this operation
 136             // also naturally sorted then this is a no-op
 137             if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
 138                 return sink;
 139             else if (StreamOpFlag.SIZED.isKnown(flags))
 140                 return new SizedRefSortingSink<>(sink, comparator);
 141             else
 142                 return new RefSortingSink<>(sink, comparator);
 143         }
 144 
 145         @Override
 146         public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
 147                                                  Spliterator<P_IN> spliterator,
 148                                                  IntFunction<T[]> generator) {
 149             // If the input is already naturally sorted and this operation
 150             // naturally sorts then collect the output
 151             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
 152                 return helper.evaluate(spliterator, false, generator);


 263         public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
 264                                                       Spliterator<P_IN> spliterator,
 265                                                       IntFunction<Double[]> generator) {
 266             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
 267                 return helper.evaluate(spliterator, false, generator);
 268             }
 269             else {
 270                 Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);
 271 
 272                 double[] content = n.asPrimitiveArray();
 273                 Arrays.parallelSort(content);
 274 
 275                 return Nodes.node(content);
 276             }
 277         }
 278     }
 279 
 280     /**
 281      * {@link ForkJoinTask} for implementing sort on SIZED reference streams.
 282      */
 283     private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T> {
 284         private final Comparator<? super T> comparator;
 285         private T[] array;
 286         private int offset;
 287 
 288         SizedRefSortingSink(Sink<T> sink, Comparator<? super T> comparator) {
 289             super(sink);
 290             this.comparator = comparator;
 291         }
 292 
 293         @Override
 294         public void begin(long size) {
 295             if (size >= Nodes.MAX_ARRAY_SIZE)
 296                 throw new IllegalArgumentException("Stream size exceeds max array size");
 297             array = (T[]) new Object[(int) size];
 298         }
 299 
 300         @Override
 301         public void end() {
 302             // Need to use offset rather than array.length since the downstream
 303             // many be short-circuiting
 304             // @@@ A better approach is to know if the downstream short-circuits
 305             //     and check sink.cancellationRequested
 306             Arrays.sort(array, 0, offset, comparator);
 307             downstream.begin(offset);
 308             for (int i = 0; i < offset; i++)
 309                 downstream.accept(array[i]);
 310             downstream.end();
 311             array = null;
 312         }
 313 
 314         @Override
 315         public void accept(T t) {
 316             array[offset++] = t;
 317         }
 318     }
 319 
 320     /**
 321      * {@link Sink} for implementing sort on reference streams.
 322      */
 323     private static final class RefSortingSink<T> extends Sink.ChainedReference<T> {
 324         private final Comparator<? super T> comparator;
 325         private ArrayList<T> list;
 326 
 327         RefSortingSink(Sink<T> sink, Comparator<? super T> comparator) {
 328             super(sink);
 329             this.comparator = comparator;
 330         }
 331 
 332         @Override
 333         public void begin(long size) {
 334             list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
 335         }
 336 
 337         @Override
 338         public void end() {
 339             list.sort(comparator);
 340             downstream.begin(list.size());
 341             list.forEach(downstream::accept);
 342             downstream.end();
 343             list = null;
 344         }
 345 
 346         @Override
 347         public void accept(T t) {
 348             list.add(t);
 349         }
 350     }
 351 
 352     /**
 353      * {@link Sink} for implementing sort on SIZED int streams.
 354      */
 355     private static final class SizedIntSortingSink extends Sink.ChainedInt {
 356         private int[] array;
 357         private int offset;
 358 
 359         SizedIntSortingSink(Sink downstream) {
 360             super(downstream);
 361         }
 362 
 363         @Override
 364         public void begin(long size) {
 365             if (size >= Nodes.MAX_ARRAY_SIZE)
 366                 throw new IllegalArgumentException("Stream size exceeds max array size");
 367             array = new int[(int) size];
 368         }
 369 
 370         @Override
 371         public void end() {
 372             Arrays.sort(array, 0, offset);
 373             downstream.begin(offset);
 374             for (int i = 0; i < offset; i++)
 375                 downstream.accept(array[i]);
 376             downstream.end();
 377             array = null;
 378         }
 379 
 380         @Override
 381         public void accept(int t) {
 382             array[offset++] = t;
 383         }
 384     }
 385 
 386     /**
 387      * {@link Sink} for implementing sort on int streams.
 388      */
 389     private static final class IntSortingSink extends Sink.ChainedInt {
 390         private SpinedBuffer.OfInt b;
 391 
 392         IntSortingSink(Sink sink) {
 393             super(sink);
 394         }
 395 
 396         @Override
 397         public void begin(long size) {
 398             b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
 399         }
 400 
 401         @Override
 402         public void end() {
 403             int[] ints = b.asPrimitiveArray();
 404             Arrays.sort(ints);
 405             downstream.begin(ints.length);
 406             for (int anInt : ints)
 407                 downstream.accept(anInt);
 408             downstream.end();
 409         }
 410 
 411         @Override
 412         public void accept(int t) {
 413             b.accept(t);
 414         }
 415     }
 416 
 417     /**
 418      * {@link Sink} for implementing sort on SIZED long streams.
 419      */
 420     private static final class SizedLongSortingSink extends Sink.ChainedLong {
 421         private long[] array;
 422         private int offset;
 423 
 424         SizedLongSortingSink(Sink downstream) {
 425             super(downstream);
 426         }
 427 
 428         @Override
 429         public void begin(long size) {
 430             if (size >= Nodes.MAX_ARRAY_SIZE)
 431                 throw new IllegalArgumentException("Stream size exceeds max array size");
 432             array = new long[(int) size];
 433         }
 434 
 435         @Override
 436         public void end() {
 437             Arrays.sort(array, 0, offset);
 438             downstream.begin(offset);
 439             for (int i = 0; i < offset; i++)
 440                 downstream.accept(array[i]);
 441             downstream.end();
 442             array = null;
 443         }
 444 
 445         @Override
 446         public void accept(long t) {
 447             array[offset++] = t;
 448         }
 449     }
 450 
 451     /**
 452      * {@link Sink} for implementing sort on long streams.
 453      */
 454     private static final class LongSortingSink extends Sink.ChainedLong {
 455         private SpinedBuffer.OfLong b;
 456 
 457         LongSortingSink(Sink sink) {
 458             super(sink);
 459         }
 460 
 461         @Override
 462         public void begin(long size) {
 463             b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
 464         }
 465 
 466         @Override
 467         public void end() {
 468             long[] longs = b.asPrimitiveArray();
 469             Arrays.sort(longs);
 470             downstream.begin(longs.length);
 471             for (long aLong : longs)
 472                 downstream.accept(aLong);
 473             downstream.end();
 474         }
 475 
 476         @Override
 477         public void accept(long t) {
 478             b.accept(t);
 479         }
 480     }
 481 
 482     /**
 483      * {@link Sink} for implementing sort on SIZED double streams.
 484      */
 485     private static final class SizedDoubleSortingSink extends Sink.ChainedDouble {
 486         private double[] array;
 487         private int offset;
 488 
 489         SizedDoubleSortingSink(Sink downstream) {
 490             super(downstream);
 491         }
 492 
 493         @Override
 494         public void begin(long size) {
 495             if (size >= Nodes.MAX_ARRAY_SIZE)
 496                 throw new IllegalArgumentException("Stream size exceeds max array size");
 497             array = new double[(int) size];
 498         }
 499 
 500         @Override
 501         public void end() {
 502             Arrays.sort(array, 0, offset);
 503             downstream.begin(offset);
 504             for (int i = 0; i < offset; i++)
 505                 downstream.accept(array[i]);
 506             downstream.end();
 507             array = null;
 508         }
 509 
 510         @Override
 511         public void accept(double t) {
 512             array[offset++] = t;
 513         }
 514     }
 515 
 516     /**
 517      * {@link Sink} for implementing sort on double streams.
 518      */
 519     private static final class DoubleSortingSink extends Sink.ChainedDouble {
 520         private SpinedBuffer.OfDouble b;
 521 
 522         DoubleSortingSink(Sink sink) {
 523             super(sink);
 524         }
 525 
 526         @Override
 527         public void begin(long size) {
 528             b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
 529         }
 530 
 531         @Override
 532         public void end() {
 533             double[] doubles = b.asPrimitiveArray();
 534             Arrays.sort(doubles);
 535             downstream.begin(doubles.length);
 536             for (double aDouble : doubles)
 537                 downstream.accept(aDouble);
 538             downstream.end();
 539         }
 540 
 541         @Override
 542         public void accept(double t) {


 112             super(upstream, StreamShape.REFERENCE,
 113                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 114             this.isNaturalSort = true;
 115             // Will throw CCE when we try to sort if T is not Comparable
 116             this.comparator = (Comparator<? super T>) Comparator.naturalOrder();
 117         }
 118 
 119         /**
 120          * Sort using the provided comparator.
 121          *
 122          * @param comparator The comparator to be used to evaluate ordering.
 123          */
 124         OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
 125             super(upstream, StreamShape.REFERENCE,
 126                   StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
 127             this.isNaturalSort = false;
 128             this.comparator = Objects.requireNonNull(comparator);
 129         }
 130 
 131         @Override
 132         public Sink<T> opWrapSink(int flags, Sink<T> sink) {
 133             Objects.requireNonNull(sink);
 134 
 135             // If the input is already naturally sorted and this operation
 136             // also naturally sorted then this is a no-op
 137             if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
 138                 return sink;
 139             else if (StreamOpFlag.SIZED.isKnown(flags))
 140                 return new SizedRefSortingSink<>(sink, comparator);
 141             else
 142                 return new RefSortingSink<>(sink, comparator);
 143         }
 144 
 145         @Override
 146         public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
 147                                                  Spliterator<P_IN> spliterator,
 148                                                  IntFunction<T[]> generator) {
 149             // If the input is already naturally sorted and this operation
 150             // naturally sorts then collect the output
 151             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
 152                 return helper.evaluate(spliterator, false, generator);


 263         public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
 264                                                       Spliterator<P_IN> spliterator,
 265                                                       IntFunction<Double[]> generator) {
 266             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
 267                 return helper.evaluate(spliterator, false, generator);
 268             }
 269             else {
 270                 Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);
 271 
 272                 double[] content = n.asPrimitiveArray();
 273                 Arrays.parallelSort(content);
 274 
 275                 return Nodes.node(content);
 276             }
 277         }
 278     }
 279 
 280     /**
 281      * {@link ForkJoinTask} for implementing sort on SIZED reference streams.
 282      */
 283     private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T, T> {
 284         private final Comparator<? super T> comparator;
 285         private T[] array;
 286         private int offset;
 287 
 288         SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
 289             super(sink);
 290             this.comparator = comparator;
 291         }
 292 
 293         @Override
 294         public void begin(long size) {
 295             if (size >= Nodes.MAX_ARRAY_SIZE)
 296                 throw new IllegalArgumentException("Stream size exceeds max array size");
 297             array = (T[]) new Object[(int) size];
 298         }
 299 
 300         @Override
 301         public void end() {
 302             // Need to use offset rather than array.length since the downstream
 303             // many be short-circuiting
 304             // @@@ A better approach is to know if the downstream short-circuits
 305             //     and check sink.cancellationRequested
 306             Arrays.sort(array, 0, offset, comparator);
 307             downstream.begin(offset);
 308             for (int i = 0; i < offset; i++)
 309                 downstream.accept(array[i]);
 310             downstream.end();
 311             array = null;
 312         }
 313 
 314         @Override
 315         public void accept(T t) {
 316             array[offset++] = t;
 317         }
 318     }
 319 
 320     /**
 321      * {@link Sink} for implementing sort on reference streams.
 322      */
 323     private static final class RefSortingSink<T> extends Sink.ChainedReference<T, T> {
 324         private final Comparator<? super T> comparator;
 325         private ArrayList<T> list;
 326 
 327         RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
 328             super(sink);
 329             this.comparator = comparator;
 330         }
 331 
 332         @Override
 333         public void begin(long size) {
 334             list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
 335         }
 336 
 337         @Override
 338         public void end() {
 339             list.sort(comparator);
 340             downstream.begin(list.size());
 341             list.forEach(downstream::accept);
 342             downstream.end();
 343             list = null;
 344         }
 345 
 346         @Override
 347         public void accept(T t) {
 348             list.add(t);
 349         }
 350     }
 351 
 352     /**
 353      * {@link Sink} for implementing sort on SIZED int streams.
 354      */
 355     private static final class SizedIntSortingSink extends Sink.ChainedInt<Integer> {
 356         private int[] array;
 357         private int offset;
 358 
 359         SizedIntSortingSink(Sink<? super Integer> downstream) {
 360             super(downstream);
 361         }
 362 
 363         @Override
 364         public void begin(long size) {
 365             if (size >= Nodes.MAX_ARRAY_SIZE)
 366                 throw new IllegalArgumentException("Stream size exceeds max array size");
 367             array = new int[(int) size];
 368         }
 369 
 370         @Override
 371         public void end() {
 372             Arrays.sort(array, 0, offset);
 373             downstream.begin(offset);
 374             for (int i = 0; i < offset; i++)
 375                 downstream.accept(array[i]);
 376             downstream.end();
 377             array = null;
 378         }
 379 
 380         @Override
 381         public void accept(int t) {
 382             array[offset++] = t;
 383         }
 384     }
 385 
 386     /**
 387      * {@link Sink} for implementing sort on int streams.
 388      */
 389     private static final class IntSortingSink extends Sink.ChainedInt<Integer> {
 390         private SpinedBuffer.OfInt b;
 391 
 392         IntSortingSink(Sink<? super Integer> sink) {
 393             super(sink);
 394         }
 395 
 396         @Override
 397         public void begin(long size) {
 398             b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
 399         }
 400 
 401         @Override
 402         public void end() {
 403             int[] ints = b.asPrimitiveArray();
 404             Arrays.sort(ints);
 405             downstream.begin(ints.length);
 406             for (int anInt : ints)
 407                 downstream.accept(anInt);
 408             downstream.end();
 409         }
 410 
 411         @Override
 412         public void accept(int t) {
 413             b.accept(t);
 414         }
 415     }
 416 
 417     /**
 418      * {@link Sink} for implementing sort on SIZED long streams.
 419      */
 420     private static final class SizedLongSortingSink extends Sink.ChainedLong<Long> {
 421         private long[] array;
 422         private int offset;
 423 
 424         SizedLongSortingSink(Sink<? super Long> downstream) {
 425             super(downstream);
 426         }
 427 
 428         @Override
 429         public void begin(long size) {
 430             if (size >= Nodes.MAX_ARRAY_SIZE)
 431                 throw new IllegalArgumentException("Stream size exceeds max array size");
 432             array = new long[(int) size];
 433         }
 434 
 435         @Override
 436         public void end() {
 437             Arrays.sort(array, 0, offset);
 438             downstream.begin(offset);
 439             for (int i = 0; i < offset; i++)
 440                 downstream.accept(array[i]);
 441             downstream.end();
 442             array = null;
 443         }
 444 
 445         @Override
 446         public void accept(long t) {
 447             array[offset++] = t;
 448         }
 449     }
 450 
 451     /**
 452      * {@link Sink} for implementing sort on long streams.
 453      */
 454     private static final class LongSortingSink extends Sink.ChainedLong<Long> {
 455         private SpinedBuffer.OfLong b;
 456 
 457         LongSortingSink(Sink<? super Long> sink) {
 458             super(sink);
 459         }
 460 
 461         @Override
 462         public void begin(long size) {
 463             b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
 464         }
 465 
 466         @Override
 467         public void end() {
 468             long[] longs = b.asPrimitiveArray();
 469             Arrays.sort(longs);
 470             downstream.begin(longs.length);
 471             for (long aLong : longs)
 472                 downstream.accept(aLong);
 473             downstream.end();
 474         }
 475 
 476         @Override
 477         public void accept(long t) {
 478             b.accept(t);
 479         }
 480     }
 481 
 482     /**
 483      * {@link Sink} for implementing sort on SIZED double streams.
 484      */
 485     private static final class SizedDoubleSortingSink extends Sink.ChainedDouble<Double> {
 486         private double[] array;
 487         private int offset;
 488 
 489         SizedDoubleSortingSink(Sink<? super Double> downstream) {
 490             super(downstream);
 491         }
 492 
 493         @Override
 494         public void begin(long size) {
 495             if (size >= Nodes.MAX_ARRAY_SIZE)
 496                 throw new IllegalArgumentException("Stream size exceeds max array size");
 497             array = new double[(int) size];
 498         }
 499 
 500         @Override
 501         public void end() {
 502             Arrays.sort(array, 0, offset);
 503             downstream.begin(offset);
 504             for (int i = 0; i < offset; i++)
 505                 downstream.accept(array[i]);
 506             downstream.end();
 507             array = null;
 508         }
 509 
 510         @Override
 511         public void accept(double t) {
 512             array[offset++] = t;
 513         }
 514     }
 515 
 516     /**
 517      * {@link Sink} for implementing sort on double streams.
 518      */
 519     private static final class DoubleSortingSink extends Sink.ChainedDouble<Double> {
 520         private SpinedBuffer.OfDouble b;
 521 
 522         DoubleSortingSink(Sink<? super Double> sink) {
 523             super(sink);
 524         }
 525 
 526         @Override
 527         public void begin(long size) {
 528             b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
 529         }
 530 
 531         @Override
 532         public void end() {
 533             double[] doubles = b.asPrimitiveArray();
 534             Arrays.sort(doubles);
 535             downstream.begin(doubles.length);
 536             for (double aDouble : doubles)
 537                 downstream.accept(aDouble);
 538             downstream.end();
 539         }
 540 
 541         @Override
 542         public void accept(double t) {