1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.ArrayList;
  28 import java.util.Arrays;
  29 import java.util.Comparator;
  30 import java.util.Comparators;
  31 import java.util.Objects;
  32 import java.util.Spliterator;
  33 import java.util.concurrent.ForkJoinTask;
  34 import java.util.function.IntFunction;
  35 
  36 
  37 /**
  38  * Factory methods for transforming streams into sorted streams.
  39  *
  40  * @since 1.8
  41  */
  42 final class SortedOps {
  43 
  44     private SortedOps() { }
  45 
  46     /**
  47      * Appends a "sorted" operation to the provided stream.
  48      *
  49      * @param <T> The type of both input and output elements
  50      * @param upstream A reference stream with element type T
  51      */
  52     static<T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
  53         return new OfRef<>(upstream);
  54     }
  55 
  56     /**
  57      * Appends a "sorted" operation to the provided stream.
  58      *
  59      * @param <T> The type of both input and output elements
  60      * @param upstream A reference stream with element type T
  61      * @param comparator the comparator to order elements by
  62      */
  63     static<T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
  64                                 Comparator<? super T> comparator) {
  65         return new OfRef<>(upstream, comparator);
  66     }
  67 
  68     /**
  69      * Appends a "sorted" operation to the provided stream.
  70      *
  71      * @param <T> The type of both input and output elements
  72      * @param upstream A reference stream with element type T
  73      */
  74     static<T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) {
  75         return new OfInt(upstream);
  76     }
  77 
  78     /**
  79      * Appends a "sorted" operation to the provided stream.
  80      *
  81      * @param <T> The type of both input and output elements
  82      * @param upstream A reference stream with element type T
  83      */
  84     static<T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) {
  85         return new OfLong(upstream);
  86     }
  87 
  88     /**
  89      * Appends a "sorted" operation to the provided stream.
  90      *
  91      * @param <T> The type of both input and output elements
  92      * @param upstream A reference stream with element type T
  93      */
  94     static<T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {
  95         return new OfDouble(upstream);
  96     }
  97 
  98     /** Specialized subtype for sorting reference streams */
  99     private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
 100         /** Comparator used for sorting */
 101         private final boolean isNaturalSort;
 102         private final Comparator<? super T> comparator;
 103 
 104         /**
 105          * Sort using natural order of {@literal <T>} which must be
 106          * {@code Comparable}.
 107          */
 108         OfRef(AbstractPipeline<?, T, ?> upstream) {
 109             super(upstream, StreamShape.REFERENCE,
 110                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 111             this.isNaturalSort = true;
 112             // Will throw CCE when we try to sort if T is not Comparable
 113             this.comparator = (Comparator<? super T>) Comparators.naturalOrder();
 114         }
 115 
 116         /**
 117          * Sort using the provided comparator.
 118          *
 119          * @param comparator The comparator to be used to evaluate ordering.
 120          */
 121         OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
 122             super(upstream, StreamShape.REFERENCE,
 123                   StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
 124             this.isNaturalSort = false;
 125             this.comparator = Objects.requireNonNull(comparator);
 126         }
 127 
 128         @Override
 129         public Sink<T> opWrapSink(int flags, Sink sink) {
 130             Objects.requireNonNull(sink);
 131 
 132             // If the input is already naturally sorted and this operation
 133             // also naturally sorted then this is a no-op
 134             if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
 135                 return sink;
 136             else if (StreamOpFlag.SIZED.isKnown(flags))
 137                 return new SizedRefSortingSink<>(sink, comparator);
 138             else
 139                 return new RefSortingSink<>(sink, comparator);
 140         }
 141 
 142         @Override
 143         public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
 144                                                  Spliterator<P_IN> spliterator,
 145                                                  IntFunction<T[]> generator) {
 146             // If the input is already naturally sorted and this operation
 147             // naturally sorts then collect the output
 148             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
 149                 return helper.evaluate(spliterator, false, generator);
 150             }
 151             else {
 152                 // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
 153                 T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
 154                 Arrays.parallelSort(flattenedData, comparator);
 155                 return Nodes.node(flattenedData);
 156             }
 157         }
 158     }
 159 
 160     /** Specialized subtype for sorting int streams */
 161     private static final class OfInt extends IntPipeline.StatefulOp<Integer> {
 162         OfInt(AbstractPipeline<?, Integer, ?> upstream) {
 163             super(upstream, StreamShape.INT_VALUE,
 164                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 165         }
 166 
 167         @Override
 168         public Sink<Integer> opWrapSink(int flags, Sink sink) {
 169             Objects.requireNonNull(sink);
 170 
 171             if (StreamOpFlag.SORTED.isKnown(flags))
 172                 return sink;
 173             else if (StreamOpFlag.SIZED.isKnown(flags))
 174                 return new SizedIntSortingSink(sink);
 175             else
 176                 return new IntSortingSink(sink);
 177         }
 178 
 179         @Override
 180         public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
 181                                                        Spliterator<P_IN> spliterator,
 182                                                        IntFunction<Integer[]> generator) {
 183             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
 184                 return helper.evaluate(spliterator, false, generator);
 185             }
 186             else {
 187                 Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
 188 
 189                 int[] content = n.asIntArray();
 190                 Arrays.parallelSort(content);
 191 
 192                 return Nodes.node(content);
 193             }
 194         }
 195     }
 196 
 197     /** Specialized subtype for sorting long streams */
 198     private static final class OfLong extends LongPipeline.StatefulOp<Long> {
 199         OfLong(AbstractPipeline<?, Long, ?> upstream) {
 200             super(upstream, StreamShape.LONG_VALUE,
 201                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 202         }
 203 
 204         @Override
 205         public Sink<Long> opWrapSink(int flags, Sink sink) {
 206             Objects.requireNonNull(sink);
 207 
 208             if (StreamOpFlag.SORTED.isKnown(flags))
 209                 return sink;
 210             else if (StreamOpFlag.SIZED.isKnown(flags))
 211                 return new SizedLongSortingSink(sink);
 212             else
 213                 return new LongSortingSink(sink);
 214         }
 215 
 216         @Override
 217         public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
 218                                                     Spliterator<P_IN> spliterator,
 219                                                     IntFunction<Long[]> generator) {
 220             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
 221                 return helper.evaluate(spliterator, false, generator);
 222             }
 223             else {
 224                 Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator);
 225 
 226                 long[] content = n.asLongArray();
 227                 Arrays.parallelSort(content);
 228 
 229                 return Nodes.node(content);
 230             }
 231         }
 232     }
 233 
 234     /** Specialized subtype for sorting double streams */
 235     private static final class OfDouble extends DoublePipeline.StatefulOp<Double> {
 236         OfDouble(AbstractPipeline<?, Double, ?> upstream) {
 237             super(upstream, StreamShape.DOUBLE_VALUE,
 238                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 239         }
 240 
 241         @Override
 242         public Sink<Double> opWrapSink(int flags, Sink sink) {
 243             Objects.requireNonNull(sink);
 244 
 245             if (StreamOpFlag.SORTED.isKnown(flags))
 246                 return sink;
 247             else if (StreamOpFlag.SIZED.isKnown(flags))
 248                 return new SizedDoubleSortingSink(sink);
 249             else
 250                 return new DoubleSortingSink(sink);
 251         }
 252 
 253         @Override
 254         public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
 255                                                       Spliterator<P_IN> spliterator,
 256                                                       IntFunction<Double[]> generator) {
 257             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
 258                 return helper.evaluate(spliterator, false, generator);
 259             }
 260             else {
 261                 Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);
 262 
 263                 double[] content = n.asDoubleArray();
 264                 Arrays.parallelSort(content);
 265 
 266                 return Nodes.node(content);
 267             }
 268         }
 269     }
 270 
 271     /** {@link ForkJoinTask} for implementing sort on SIZED reference streams */
 272     private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T> {
 273         private final Comparator<? super T> comparator;
 274         private T[] array;
 275         private int offset;
 276 
 277         SizedRefSortingSink(Sink sink, Comparator<? super T> comparator) {
 278             super(sink);
 279             this.comparator = comparator;
 280         }
 281 
 282         @Override
 283         public void begin(long size) {
 284             if (size >= Nodes.MAX_ARRAY_SIZE)
 285                 throw new IllegalArgumentException("Stream size exceeds max array size");
 286             array = (T[]) new Object[(int) size];
 287         }
 288 
 289         @Override
 290         public void end() {
 291             // Need to use offset rather than array.length since the downstream
 292             // many be short-circuiting
 293             // @@@ A better approach is to know if the downstream short-circuits
 294             //     and check sink.cancellationRequested
 295             Arrays.sort(array, 0, offset, comparator);
 296             downstream.begin(offset);
 297             for (int i = 0; i < offset; i++)
 298                 downstream.accept(array[i]);
 299             downstream.end();
 300             array = null;
 301         }
 302 
 303         @Override
 304         public void accept(T t) {
 305             array[offset++] = t;
 306         }
 307     }
 308 
 309     /** {@link Sink} for implementing sort on reference streams */
 310     private static final class RefSortingSink<T> extends Sink.ChainedReference<T> {
 311         private final Comparator<? super T> comparator;
 312         private ArrayList<T> list;
 313 
 314         RefSortingSink(Sink sink, Comparator<? super T> comparator) {
 315             super(sink);
 316             this.comparator = comparator;
 317         }
 318 
 319         @Override
 320         public void begin(long size) {
 321             list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
 322         }
 323 
 324         @Override
 325         public void end() {
 326             list.sort(comparator);
 327             downstream.begin(list.size());
 328             list.forEach(downstream::accept);
 329             downstream.end();
 330             list = null;
 331         }
 332 
 333         @Override
 334         public void accept(T t) {
 335             list.add(t);
 336         }
 337     }
 338 
 339     /** {@link Sink} for implementing sort on SIZED int streams */
 340     private static final class SizedIntSortingSink extends Sink.ChainedInt {
 341         private int[] array;
 342         private int offset;
 343 
 344         SizedIntSortingSink(Sink downstream) {
 345             super(downstream);
 346         }
 347 
 348         @Override
 349         public void begin(long size) {
 350             if (size >= Nodes.MAX_ARRAY_SIZE)
 351                 throw new IllegalArgumentException("Stream size exceeds max array size");
 352             array = new int[(int) size];
 353         }
 354 
 355         @Override
 356         public void end() {
 357             Arrays.sort(array, 0, offset);
 358             downstream.begin(offset);
 359             for (int i = 0; i < offset; i++)
 360                 downstream.accept(array[i]);
 361             downstream.end();
 362             array = null;
 363         }
 364 
 365         @Override
 366         public void accept(int t) {
 367             array[offset++] = t;
 368         }
 369     }
 370 
 371     /** {@link Sink} for implementing sort on int streams */
 372     private static final class IntSortingSink extends Sink.ChainedInt {
 373         private SpinedBuffer.OfInt b;
 374 
 375         IntSortingSink(Sink sink) {
 376             super(sink);
 377         }
 378 
 379         @Override
 380         public void begin(long size) {
 381             b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
 382         }
 383 
 384         @Override
 385         public void end() {
 386             int[] ints = b.asIntArray();
 387             Arrays.sort(ints);
 388             downstream.begin(ints.length);
 389             for (int anInt : ints)
 390                 downstream.accept(anInt);
 391             downstream.end();
 392         }
 393 
 394         @Override
 395         public void accept(int t) {
 396             b.accept(t);
 397         }
 398     }
 399 
 400     /** {@link Sink} for implementing sort on SIZED long streams */
 401     private static final class SizedLongSortingSink extends Sink.ChainedLong {
 402         private long[] array;
 403         private int offset;
 404 
 405         SizedLongSortingSink(Sink downstream) {
 406             super(downstream);
 407         }
 408 
 409         @Override
 410         public void begin(long size) {
 411             if (size >= Nodes.MAX_ARRAY_SIZE)
 412                 throw new IllegalArgumentException("Stream size exceeds max array size");
 413             array = new long[(int) size];
 414         }
 415 
 416         @Override
 417         public void end() {
 418             Arrays.sort(array, 0, offset);
 419             downstream.begin(offset);
 420             for (int i = 0; i < offset; i++)
 421                 downstream.accept(array[i]);
 422             downstream.end();
 423             array = null;
 424         }
 425 
 426         @Override
 427         public void accept(long t) {
 428             array[offset++] = t;
 429         }
 430     }
 431 
 432     /** {@link Sink} for implementing sort on long streams */
 433     private static final class LongSortingSink extends Sink.ChainedLong {
 434         private SpinedBuffer.OfLong b;
 435 
 436         LongSortingSink(Sink sink) {
 437             super(sink);
 438         }
 439 
 440         @Override
 441         public void begin(long size) {
 442             b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
 443         }
 444 
 445         @Override
 446         public void end() {
 447             long[] longs = b.asLongArray();
 448             Arrays.sort(longs);
 449             downstream.begin(longs.length);
 450             for (long aLong : longs)
 451                 downstream.accept(aLong);
 452             downstream.end();
 453         }
 454 
 455         @Override
 456         public void accept(long t) {
 457             b.accept(t);
 458         }
 459     }
 460 
 461     /** {@link Sink} for implementing sort on SIZED double streams */
 462     private static final class SizedDoubleSortingSink extends Sink.ChainedDouble {
 463         private double[] array;
 464         private int offset;
 465 
 466         SizedDoubleSortingSink(Sink downstream) {
 467             super(downstream);
 468         }
 469 
 470         @Override
 471         public void begin(long size) {
 472             if (size >= Nodes.MAX_ARRAY_SIZE)
 473                 throw new IllegalArgumentException("Stream size exceeds max array size");
 474             array = new double[(int) size];
 475         }
 476 
 477         @Override
 478         public void end() {
 479             Arrays.sort(array, 0, offset);
 480             downstream.begin(offset);
 481             for (int i = 0; i < offset; i++)
 482                 downstream.accept(array[i]);
 483             downstream.end();
 484             array = null;
 485         }
 486 
 487         @Override
 488         public void accept(double t) {
 489             array[offset++] = t;
 490         }
 491     }
 492 
 493     /** {@link Sink} for implementing sort on double streams */
 494     private static final class DoubleSortingSink extends Sink.ChainedDouble {
 495         private SpinedBuffer.OfDouble b;
 496 
 497         DoubleSortingSink(Sink sink) {
 498             super(sink);
 499         }
 500 
 501         @Override
 502         public void begin(long size) {
 503             b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
 504         }
 505 
 506         @Override
 507         public void end() {
 508             double[] doubles = b.asDoubleArray();
 509             Arrays.sort(doubles);
 510             downstream.begin(doubles.length);
 511             for (double aDouble : doubles)
 512                 downstream.accept(aDouble);
 513             downstream.end();
 514         }
 515 
 516         @Override
 517         public void accept(double t) {
 518             b.accept(t);
 519         }
 520     }
 521 }