1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.ArrayList;
  28 import java.util.Arrays;
  29 import java.util.Comparator;
  30 import java.util.Objects;
  31 import java.util.Spliterator;
  32 import java.util.concurrent.ForkJoinTask;
  33 import java.util.function.IntFunction;
  34 
  35 
  36 /**
  37  * Factory methods for transforming streams into sorted streams.
  38  *
  39  * @since 1.8
  40  */
  41 final class SortedOps {
  42 
  43     private SortedOps() { }
  44 
  45     /**
  46      * Appends a "sorted" operation to the provided stream.
  47      *
  48      * @param <T> the type of both input and output elements
  49      * @param upstream a reference stream with element type T
  50      */
  51     static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
  52         return new OfRef<>(upstream);
  53     }
  54 
  55     /**
  56      * Appends a "sorted" operation to the provided stream.
  57      *
  58      * @param <T> the type of both input and output elements
  59      * @param upstream a reference stream with element type T
  60      * @param comparator the comparator to order elements by
  61      */
  62     static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
  63                                 Comparator<? super T> comparator) {
  64         return new OfRef<>(upstream, comparator);
  65     }
  66 
  67     /**
  68      * Appends a "sorted" operation to the provided stream.
  69      *
  70      * @param <T> the type of both input and output elements
  71      * @param upstream a reference stream with element type T
  72      */
  73     static <T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) {
  74         return new OfInt(upstream);
  75     }
  76 
  77     /**
  78      * Appends a "sorted" operation to the provided stream.
  79      *
  80      * @param <T> the type of both input and output elements
  81      * @param upstream a reference stream with element type T
  82      */
  83     static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) {
  84         return new OfLong(upstream);
  85     }
  86 
  87     /**
  88      * Appends a "sorted" operation to the provided stream.
  89      *
  90      * @param <T> the type of both input and output elements
  91      * @param upstream a reference stream with element type T
  92      */
  93     static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {
  94         return new OfDouble(upstream);
  95     }
  96 
  97     /**
  98      * Specialized subtype for sorting reference streams
  99      */
 100     private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
 101         /**
 102          * Comparator used for sorting
 103          */
 104         private final boolean isNaturalSort;
 105         private final Comparator<? super T> comparator;
 106 
 107         /**
 108          * Sort using natural order of {@literal <T>} which must be
 109          * {@code Comparable}.
 110          */
 111         OfRef(AbstractPipeline<?, T, ?> upstream) {
 112             super(upstream, StreamShape.REFERENCE,
 113                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 114             this.isNaturalSort = true;
 115             // Will throw CCE when we try to sort if T is not Comparable
 116             this.comparator = (Comparator<? super T>) Comparator.naturalOrder();
 117         }
 118 
 119         /**
 120          * Sort using the provided comparator.
 121          *
 122          * @param comparator The comparator to be used to evaluate ordering.
 123          */
 124         OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
 125             super(upstream, StreamShape.REFERENCE,
 126                   StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
 127             this.isNaturalSort = false;
 128             this.comparator = Objects.requireNonNull(comparator);
 129         }
 130 
 131         @Override
 132         public Sink<T> opWrapSink(int flags, Sink sink) {
 133             Objects.requireNonNull(sink);
 134 
 135             // If the input is already naturally sorted and this operation
 136             // also naturally sorted then this is a no-op
 137             if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
 138                 return sink;
 139             else if (StreamOpFlag.SIZED.isKnown(flags))
 140                 return new SizedRefSortingSink<>(sink, comparator);
 141             else
 142                 return new RefSortingSink<>(sink, comparator);
 143         }
 144 
 145         @Override
 146         public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
 147                                                  Spliterator<P_IN> spliterator,
 148                                                  IntFunction<T[]> generator) {
 149             // If the input is already naturally sorted and this operation
 150             // naturally sorts then collect the output
 151             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
 152                 return helper.evaluate(spliterator, false, generator);
 153             }
 154             else {
 155                 // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
 156                 T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
 157                 Arrays.parallelSort(flattenedData, comparator);
 158                 return Nodes.node(flattenedData);
 159             }
 160         }
 161     }
 162 
 163     /**
 164      * Specialized subtype for sorting int streams.
 165      */
 166     private static final class OfInt extends IntPipeline.StatefulOp<Integer> {
 167         OfInt(AbstractPipeline<?, Integer, ?> upstream) {
 168             super(upstream, StreamShape.INT_VALUE,
 169                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 170         }
 171 
 172         @Override
 173         public Sink<Integer> opWrapSink(int flags, Sink sink) {
 174             Objects.requireNonNull(sink);
 175 
 176             if (StreamOpFlag.SORTED.isKnown(flags))
 177                 return sink;
 178             else if (StreamOpFlag.SIZED.isKnown(flags))
 179                 return new SizedIntSortingSink(sink);
 180             else
 181                 return new IntSortingSink(sink);
 182         }
 183 
 184         @Override
 185         public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
 186                                                        Spliterator<P_IN> spliterator,
 187                                                        IntFunction<Integer[]> generator) {
 188             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
 189                 return helper.evaluate(spliterator, false, generator);
 190             }
 191             else {
 192                 Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
 193 
 194                 int[] content = n.asPrimitiveArray();
 195                 Arrays.parallelSort(content);
 196 
 197                 return Nodes.node(content);
 198             }
 199         }
 200     }
 201 
 202     /**
 203      * Specialized subtype for sorting long streams.
 204      */
 205     private static final class OfLong extends LongPipeline.StatefulOp<Long> {
 206         OfLong(AbstractPipeline<?, Long, ?> upstream) {
 207             super(upstream, StreamShape.LONG_VALUE,
 208                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 209         }
 210 
 211         @Override
 212         public Sink<Long> opWrapSink(int flags, Sink sink) {
 213             Objects.requireNonNull(sink);
 214 
 215             if (StreamOpFlag.SORTED.isKnown(flags))
 216                 return sink;
 217             else if (StreamOpFlag.SIZED.isKnown(flags))
 218                 return new SizedLongSortingSink(sink);
 219             else
 220                 return new LongSortingSink(sink);
 221         }
 222 
 223         @Override
 224         public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
 225                                                     Spliterator<P_IN> spliterator,
 226                                                     IntFunction<Long[]> generator) {
 227             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
 228                 return helper.evaluate(spliterator, false, generator);
 229             }
 230             else {
 231                 Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator);
 232 
 233                 long[] content = n.asPrimitiveArray();
 234                 Arrays.parallelSort(content);
 235 
 236                 return Nodes.node(content);
 237             }
 238         }
 239     }
 240 
 241     /**
 242      * Specialized subtype for sorting double streams.
 243      */
 244     private static final class OfDouble extends DoublePipeline.StatefulOp<Double> {
 245         OfDouble(AbstractPipeline<?, Double, ?> upstream) {
 246             super(upstream, StreamShape.DOUBLE_VALUE,
 247                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 248         }
 249 
 250         @Override
 251         public Sink<Double> opWrapSink(int flags, Sink sink) {
 252             Objects.requireNonNull(sink);
 253 
 254             if (StreamOpFlag.SORTED.isKnown(flags))
 255                 return sink;
 256             else if (StreamOpFlag.SIZED.isKnown(flags))
 257                 return new SizedDoubleSortingSink(sink);
 258             else
 259                 return new DoubleSortingSink(sink);
 260         }
 261 
 262         @Override
 263         public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
 264                                                       Spliterator<P_IN> spliterator,
 265                                                       IntFunction<Double[]> generator) {
 266             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
 267                 return helper.evaluate(spliterator, false, generator);
 268             }
 269             else {
 270                 Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);
 271 
 272                 double[] content = n.asPrimitiveArray();
 273                 Arrays.parallelSort(content);
 274 
 275                 return Nodes.node(content);
 276             }
 277         }
 278     }
 279 
 280     /**
 281      * {@link ForkJoinTask} for implementing sort on SIZED reference streams.
 282      */
 283     private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T> {
 284         private final Comparator<? super T> comparator;
 285         private T[] array;
 286         private int offset;
 287 
 288         SizedRefSortingSink(Sink sink, Comparator<? super T> comparator) {
 289             super(sink);
 290             this.comparator = comparator;
 291         }
 292 
 293         @Override
 294         public void begin(long size) {
 295             if (size >= Nodes.MAX_ARRAY_SIZE)
 296                 throw new IllegalArgumentException("Stream size exceeds max array size");
 297             array = (T[]) new Object[(int) size];
 298         }
 299 
 300         @Override
 301         public void end() {
 302             // Need to use offset rather than array.length since the downstream
 303             // many be short-circuiting
 304             // @@@ A better approach is to know if the downstream short-circuits
 305             //     and check sink.cancellationRequested
 306             Arrays.sort(array, 0, offset, comparator);
 307             downstream.begin(offset);
 308             for (int i = 0; i < offset; i++)
 309                 downstream.accept(array[i]);
 310             downstream.end();
 311             array = null;
 312         }
 313 
 314         @Override
 315         public void accept(T t) {
 316             array[offset++] = t;
 317         }
 318     }
 319 
 320     /**
 321      * {@link Sink} for implementing sort on reference streams.
 322      */
 323     private static final class RefSortingSink<T> extends Sink.ChainedReference<T> {
 324         private final Comparator<? super T> comparator;
 325         private ArrayList<T> list;
 326 
 327         RefSortingSink(Sink sink, Comparator<? super T> comparator) {
 328             super(sink);
 329             this.comparator = comparator;
 330         }
 331 
 332         @Override
 333         public void begin(long size) {
 334             list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
 335         }
 336 
 337         @Override
 338         public void end() {
 339             list.sort(comparator);
 340             downstream.begin(list.size());
 341             list.forEach(downstream::accept);
 342             downstream.end();
 343             list = null;
 344         }
 345 
 346         @Override
 347         public void accept(T t) {
 348             list.add(t);
 349         }
 350     }
 351 
 352     /**
 353      * {@link Sink} for implementing sort on SIZED int streams.
 354      */
 355     private static final class SizedIntSortingSink extends Sink.ChainedInt {
 356         private int[] array;
 357         private int offset;
 358 
 359         SizedIntSortingSink(Sink downstream) {
 360             super(downstream);
 361         }
 362 
 363         @Override
 364         public void begin(long size) {
 365             if (size >= Nodes.MAX_ARRAY_SIZE)
 366                 throw new IllegalArgumentException("Stream size exceeds max array size");
 367             array = new int[(int) size];
 368         }
 369 
 370         @Override
 371         public void end() {
 372             Arrays.sort(array, 0, offset);
 373             downstream.begin(offset);
 374             for (int i = 0; i < offset; i++)
 375                 downstream.accept(array[i]);
 376             downstream.end();
 377             array = null;
 378         }
 379 
 380         @Override
 381         public void accept(int t) {
 382             array[offset++] = t;
 383         }
 384     }
 385 
 386     /**
 387      * {@link Sink} for implementing sort on int streams.
 388      */
 389     private static final class IntSortingSink extends Sink.ChainedInt {
 390         private SpinedBuffer.OfInt b;
 391 
 392         IntSortingSink(Sink sink) {
 393             super(sink);
 394         }
 395 
 396         @Override
 397         public void begin(long size) {
 398             b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
 399         }
 400 
 401         @Override
 402         public void end() {
 403             int[] ints = b.asPrimitiveArray();
 404             Arrays.sort(ints);
 405             downstream.begin(ints.length);
 406             for (int anInt : ints)
 407                 downstream.accept(anInt);
 408             downstream.end();
 409         }
 410 
 411         @Override
 412         public void accept(int t) {
 413             b.accept(t);
 414         }
 415     }
 416 
 417     /**
 418      * {@link Sink} for implementing sort on SIZED long streams.
 419      */
 420     private static final class SizedLongSortingSink extends Sink.ChainedLong {
 421         private long[] array;
 422         private int offset;
 423 
 424         SizedLongSortingSink(Sink downstream) {
 425             super(downstream);
 426         }
 427 
 428         @Override
 429         public void begin(long size) {
 430             if (size >= Nodes.MAX_ARRAY_SIZE)
 431                 throw new IllegalArgumentException("Stream size exceeds max array size");
 432             array = new long[(int) size];
 433         }
 434 
 435         @Override
 436         public void end() {
 437             Arrays.sort(array, 0, offset);
 438             downstream.begin(offset);
 439             for (int i = 0; i < offset; i++)
 440                 downstream.accept(array[i]);
 441             downstream.end();
 442             array = null;
 443         }
 444 
 445         @Override
 446         public void accept(long t) {
 447             array[offset++] = t;
 448         }
 449     }
 450 
 451     /**
 452      * {@link Sink} for implementing sort on long streams.
 453      */
 454     private static final class LongSortingSink extends Sink.ChainedLong {
 455         private SpinedBuffer.OfLong b;
 456 
 457         LongSortingSink(Sink sink) {
 458             super(sink);
 459         }
 460 
 461         @Override
 462         public void begin(long size) {
 463             b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
 464         }
 465 
 466         @Override
 467         public void end() {
 468             long[] longs = b.asPrimitiveArray();
 469             Arrays.sort(longs);
 470             downstream.begin(longs.length);
 471             for (long aLong : longs)
 472                 downstream.accept(aLong);
 473             downstream.end();
 474         }
 475 
 476         @Override
 477         public void accept(long t) {
 478             b.accept(t);
 479         }
 480     }
 481 
 482     /**
 483      * {@link Sink} for implementing sort on SIZED double streams.
 484      */
 485     private static final class SizedDoubleSortingSink extends Sink.ChainedDouble {
 486         private double[] array;
 487         private int offset;
 488 
 489         SizedDoubleSortingSink(Sink downstream) {
 490             super(downstream);
 491         }
 492 
 493         @Override
 494         public void begin(long size) {
 495             if (size >= Nodes.MAX_ARRAY_SIZE)
 496                 throw new IllegalArgumentException("Stream size exceeds max array size");
 497             array = new double[(int) size];
 498         }
 499 
 500         @Override
 501         public void end() {
 502             Arrays.sort(array, 0, offset);
 503             downstream.begin(offset);
 504             for (int i = 0; i < offset; i++)
 505                 downstream.accept(array[i]);
 506             downstream.end();
 507             array = null;
 508         }
 509 
 510         @Override
 511         public void accept(double t) {
 512             array[offset++] = t;
 513         }
 514     }
 515 
 516     /**
 517      * {@link Sink} for implementing sort on double streams.
 518      */
 519     private static final class DoubleSortingSink extends Sink.ChainedDouble {
 520         private SpinedBuffer.OfDouble b;
 521 
 522         DoubleSortingSink(Sink sink) {
 523             super(sink);
 524         }
 525 
 526         @Override
 527         public void begin(long size) {
 528             b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
 529         }
 530 
 531         @Override
 532         public void end() {
 533             double[] doubles = b.asPrimitiveArray();
 534             Arrays.sort(doubles);
 535             downstream.begin(doubles.length);
 536             for (double aDouble : doubles)
 537                 downstream.accept(aDouble);
 538             downstream.end();
 539         }
 540 
 541         @Override
 542         public void accept(double t) {
 543             b.accept(t);
 544         }
 545     }
 546 }