1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.ArrayList;
  28 import java.util.Arrays;
  29 import java.util.Comparator;
  30 import java.util.Comparators;
  31 import java.util.Objects;
  32 import java.util.Spliterator;
  33 import java.util.concurrent.ForkJoinTask;
  34 import java.util.function.IntFunction;
  35 
  36 
  37 /**
  38  * Factory methods for transforming streams into sorted streams.
  39  *
  40  * @since 1.8
  41  */
  42 final class SortedOps {
  43 
  44     private SortedOps() { }
  45 
  46     /**
  47      * Appends a "sorted" operation to the provided stream.
  48      *
  49      * @param <T> the type of both input and output elements
  50      * @param upstream a reference stream with element type T
  51      */
  52     static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
  53         return new OfRef<>(upstream);
  54     }
  55 
  56     /**
  57      * Appends a "sorted" operation to the provided stream.
  58      *
  59      * @param <T> the type of both input and output elements
  60      * @param upstream a reference stream with element type T
  61      * @param comparator the comparator to order elements by
  62      */
  63     static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
  64                                 Comparator<? super T> comparator) {
  65         return new OfRef<>(upstream, comparator);
  66     }
  67 
  68     /**
  69      * Appends a "sorted" operation to the provided stream.
  70      *
  71      * @param <T> the type of both input and output elements
  72      * @param upstream a reference stream with element type T
  73      */
  74     static <T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) {
  75         return new OfInt(upstream);
  76     }
  77 
  78     /**
  79      * Appends a "sorted" operation to the provided stream.
  80      *
  81      * @param <T> the type of both input and output elements
  82      * @param upstream a reference stream with element type T
  83      */
  84     static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) {
  85         return new OfLong(upstream);
  86     }
  87 
  88     /**
  89      * Appends a "sorted" operation to the provided stream.
  90      *
  91      * @param <T> the type of both input and output elements
  92      * @param upstream a reference stream with element type T
  93      */
  94     static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {
  95         return new OfDouble(upstream);
  96     }
  97 
  98     /**
  99      * Specialized subtype for sorting reference streams
 100      */
 101     private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
 102         /**
 103          * Comparator used for sorting
 104          */
 105         private final boolean isNaturalSort;
 106         private final Comparator<? super T> comparator;
 107 
 108         /**
 109          * Sort using natural order of {@literal <T>} which must be
 110          * {@code Comparable}.
 111          */
 112         OfRef(AbstractPipeline<?, T, ?> upstream) {
 113             super(upstream, StreamShape.REFERENCE,
 114                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 115             this.isNaturalSort = true;
 116             // Will throw CCE when we try to sort if T is not Comparable
 117             this.comparator = (Comparator<? super T>) Comparators.naturalOrder();
 118         }
 119 
 120         /**
 121          * Sort using the provided comparator.
 122          *
 123          * @param comparator The comparator to be used to evaluate ordering.
 124          */
 125         OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
 126             super(upstream, StreamShape.REFERENCE,
 127                   StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
 128             this.isNaturalSort = false;
 129             this.comparator = Objects.requireNonNull(comparator);
 130         }
 131 
 132         @Override
 133         public Sink<T> opWrapSink(int flags, Sink sink) {
 134             Objects.requireNonNull(sink);
 135 
 136             // If the input is already naturally sorted and this operation
 137             // also naturally sorted then this is a no-op
 138             if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
 139                 return sink;
 140             else if (StreamOpFlag.SIZED.isKnown(flags))
 141                 return new SizedRefSortingSink<>(sink, comparator);
 142             else
 143                 return new RefSortingSink<>(sink, comparator);
 144         }
 145 
 146         @Override
 147         public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
 148                                                  Spliterator<P_IN> spliterator,
 149                                                  IntFunction<T[]> generator) {
 150             // If the input is already naturally sorted and this operation
 151             // naturally sorts then collect the output
 152             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
 153                 return helper.evaluate(spliterator, false, generator);
 154             }
 155             else {
 156                 // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
 157                 T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
 158                 Arrays.parallelSort(flattenedData, comparator);
 159                 return Nodes.node(flattenedData);
 160             }
 161         }
 162     }
 163 
 164     /**
 165      * Specialized subtype for sorting int streams.
 166      */
 167     private static final class OfInt extends IntPipeline.StatefulOp<Integer> {
 168         OfInt(AbstractPipeline<?, Integer, ?> upstream) {
 169             super(upstream, StreamShape.INT_VALUE,
 170                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 171         }
 172 
 173         @Override
 174         public Sink<Integer> opWrapSink(int flags, Sink sink) {
 175             Objects.requireNonNull(sink);
 176 
 177             if (StreamOpFlag.SORTED.isKnown(flags))
 178                 return sink;
 179             else if (StreamOpFlag.SIZED.isKnown(flags))
 180                 return new SizedIntSortingSink(sink);
 181             else
 182                 return new IntSortingSink(sink);
 183         }
 184 
 185         @Override
 186         public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
 187                                                        Spliterator<P_IN> spliterator,
 188                                                        IntFunction<Integer[]> generator) {
 189             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
 190                 return helper.evaluate(spliterator, false, generator);
 191             }
 192             else {
 193                 Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
 194 
 195                 int[] content = n.asIntArray();
 196                 Arrays.parallelSort(content);
 197 
 198                 return Nodes.node(content);
 199             }
 200         }
 201     }
 202 
 203     /**
 204      * Specialized subtype for sorting long streams.
 205      */
 206     private static final class OfLong extends LongPipeline.StatefulOp<Long> {
 207         OfLong(AbstractPipeline<?, Long, ?> upstream) {
 208             super(upstream, StreamShape.LONG_VALUE,
 209                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 210         }
 211 
 212         @Override
 213         public Sink<Long> opWrapSink(int flags, Sink sink) {
 214             Objects.requireNonNull(sink);
 215 
 216             if (StreamOpFlag.SORTED.isKnown(flags))
 217                 return sink;
 218             else if (StreamOpFlag.SIZED.isKnown(flags))
 219                 return new SizedLongSortingSink(sink);
 220             else
 221                 return new LongSortingSink(sink);
 222         }
 223 
 224         @Override
 225         public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
 226                                                     Spliterator<P_IN> spliterator,
 227                                                     IntFunction<Long[]> generator) {
 228             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
 229                 return helper.evaluate(spliterator, false, generator);
 230             }
 231             else {
 232                 Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator);
 233 
 234                 long[] content = n.asLongArray();
 235                 Arrays.parallelSort(content);
 236 
 237                 return Nodes.node(content);
 238             }
 239         }
 240     }
 241 
 242     /**
 243      * Specialized subtype for sorting double streams.
 244      */
 245     private static final class OfDouble extends DoublePipeline.StatefulOp<Double> {
 246         OfDouble(AbstractPipeline<?, Double, ?> upstream) {
 247             super(upstream, StreamShape.DOUBLE_VALUE,
 248                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 249         }
 250 
 251         @Override
 252         public Sink<Double> opWrapSink(int flags, Sink sink) {
 253             Objects.requireNonNull(sink);
 254 
 255             if (StreamOpFlag.SORTED.isKnown(flags))
 256                 return sink;
 257             else if (StreamOpFlag.SIZED.isKnown(flags))
 258                 return new SizedDoubleSortingSink(sink);
 259             else
 260                 return new DoubleSortingSink(sink);
 261         }
 262 
 263         @Override
 264         public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
 265                                                       Spliterator<P_IN> spliterator,
 266                                                       IntFunction<Double[]> generator) {
 267             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
 268                 return helper.evaluate(spliterator, false, generator);
 269             }
 270             else {
 271                 Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);
 272 
 273                 double[] content = n.asDoubleArray();
 274                 Arrays.parallelSort(content);
 275 
 276                 return Nodes.node(content);
 277             }
 278         }
 279     }
 280 
 281     /**
 282      * {@link ForkJoinTask} for implementing sort on SIZED reference streams.
 283      */
 284     private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T> {
 285         private final Comparator<? super T> comparator;
 286         private T[] array;
 287         private int offset;
 288 
 289         SizedRefSortingSink(Sink sink, Comparator<? super T> comparator) {
 290             super(sink);
 291             this.comparator = comparator;
 292         }
 293 
 294         @Override
 295         public void begin(long size) {
 296             if (size >= Nodes.MAX_ARRAY_SIZE)
 297                 throw new IllegalArgumentException("Stream size exceeds max array size");
 298             array = (T[]) new Object[(int) size];
 299         }
 300 
 301         @Override
 302         public void end() {
 303             // Need to use offset rather than array.length since the downstream
 304             // many be short-circuiting
 305             // @@@ A better approach is to know if the downstream short-circuits
 306             //     and check sink.cancellationRequested
 307             Arrays.sort(array, 0, offset, comparator);
 308             downstream.begin(offset);
 309             for (int i = 0; i < offset; i++)
 310                 downstream.accept(array[i]);
 311             downstream.end();
 312             array = null;
 313         }
 314 
 315         @Override
 316         public void accept(T t) {
 317             array[offset++] = t;
 318         }
 319     }
 320 
 321     /**
 322      * {@link Sink} for implementing sort on reference streams.
 323      */
 324     private static final class RefSortingSink<T> extends Sink.ChainedReference<T> {
 325         private final Comparator<? super T> comparator;
 326         private ArrayList<T> list;
 327 
 328         RefSortingSink(Sink sink, Comparator<? super T> comparator) {
 329             super(sink);
 330             this.comparator = comparator;
 331         }
 332 
 333         @Override
 334         public void begin(long size) {
 335             list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
 336         }
 337 
 338         @Override
 339         public void end() {
 340             list.sort(comparator);
 341             downstream.begin(list.size());
 342             list.forEach(downstream::accept);
 343             downstream.end();
 344             list = null;
 345         }
 346 
 347         @Override
 348         public void accept(T t) {
 349             list.add(t);
 350         }
 351     }
 352 
 353     /**
 354      * {@link Sink} for implementing sort on SIZED int streams.
 355      */
 356     private static final class SizedIntSortingSink extends Sink.ChainedInt {
 357         private int[] array;
 358         private int offset;
 359 
 360         SizedIntSortingSink(Sink downstream) {
 361             super(downstream);
 362         }
 363 
 364         @Override
 365         public void begin(long size) {
 366             if (size >= Nodes.MAX_ARRAY_SIZE)
 367                 throw new IllegalArgumentException("Stream size exceeds max array size");
 368             array = new int[(int) size];
 369         }
 370 
 371         @Override
 372         public void end() {
 373             Arrays.sort(array, 0, offset);
 374             downstream.begin(offset);
 375             for (int i = 0; i < offset; i++)
 376                 downstream.accept(array[i]);
 377             downstream.end();
 378             array = null;
 379         }
 380 
 381         @Override
 382         public void accept(int t) {
 383             array[offset++] = t;
 384         }
 385     }
 386 
 387     /**
 388      * {@link Sink} for implementing sort on int streams.
 389      */
 390     private static final class IntSortingSink extends Sink.ChainedInt {
 391         private SpinedBuffer.OfInt b;
 392 
 393         IntSortingSink(Sink sink) {
 394             super(sink);
 395         }
 396 
 397         @Override
 398         public void begin(long size) {
 399             b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
 400         }
 401 
 402         @Override
 403         public void end() {
 404             int[] ints = b.asIntArray();
 405             Arrays.sort(ints);
 406             downstream.begin(ints.length);
 407             for (int anInt : ints)
 408                 downstream.accept(anInt);
 409             downstream.end();
 410         }
 411 
 412         @Override
 413         public void accept(int t) {
 414             b.accept(t);
 415         }
 416     }
 417 
 418     /**
 419      * {@link Sink} for implementing sort on SIZED long streams.
 420      */
 421     private static final class SizedLongSortingSink extends Sink.ChainedLong {
 422         private long[] array;
 423         private int offset;
 424 
 425         SizedLongSortingSink(Sink downstream) {
 426             super(downstream);
 427         }
 428 
 429         @Override
 430         public void begin(long size) {
 431             if (size >= Nodes.MAX_ARRAY_SIZE)
 432                 throw new IllegalArgumentException("Stream size exceeds max array size");
 433             array = new long[(int) size];
 434         }
 435 
 436         @Override
 437         public void end() {
 438             Arrays.sort(array, 0, offset);
 439             downstream.begin(offset);
 440             for (int i = 0; i < offset; i++)
 441                 downstream.accept(array[i]);
 442             downstream.end();
 443             array = null;
 444         }
 445 
 446         @Override
 447         public void accept(long t) {
 448             array[offset++] = t;
 449         }
 450     }
 451 
 452     /**
 453      * {@link Sink} for implementing sort on long streams.
 454      */
 455     private static final class LongSortingSink extends Sink.ChainedLong {
 456         private SpinedBuffer.OfLong b;
 457 
 458         LongSortingSink(Sink sink) {
 459             super(sink);
 460         }
 461 
 462         @Override
 463         public void begin(long size) {
 464             b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
 465         }
 466 
 467         @Override
 468         public void end() {
 469             long[] longs = b.asLongArray();
 470             Arrays.sort(longs);
 471             downstream.begin(longs.length);
 472             for (long aLong : longs)
 473                 downstream.accept(aLong);
 474             downstream.end();
 475         }
 476 
 477         @Override
 478         public void accept(long t) {
 479             b.accept(t);
 480         }
 481     }
 482 
 483     /**
 484      * {@link Sink} for implementing sort on SIZED double streams.
 485      */
 486     private static final class SizedDoubleSortingSink extends Sink.ChainedDouble {
 487         private double[] array;
 488         private int offset;
 489 
 490         SizedDoubleSortingSink(Sink downstream) {
 491             super(downstream);
 492         }
 493 
 494         @Override
 495         public void begin(long size) {
 496             if (size >= Nodes.MAX_ARRAY_SIZE)
 497                 throw new IllegalArgumentException("Stream size exceeds max array size");
 498             array = new double[(int) size];
 499         }
 500 
 501         @Override
 502         public void end() {
 503             Arrays.sort(array, 0, offset);
 504             downstream.begin(offset);
 505             for (int i = 0; i < offset; i++)
 506                 downstream.accept(array[i]);
 507             downstream.end();
 508             array = null;
 509         }
 510 
 511         @Override
 512         public void accept(double t) {
 513             array[offset++] = t;
 514         }
 515     }
 516 
 517     /**
 518      * {@link Sink} for implementing sort on double streams.
 519      */
 520     private static final class DoubleSortingSink extends Sink.ChainedDouble {
 521         private SpinedBuffer.OfDouble b;
 522 
 523         DoubleSortingSink(Sink sink) {
 524             super(sink);
 525         }
 526 
 527         @Override
 528         public void begin(long size) {
 529             b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
 530         }
 531 
 532         @Override
 533         public void end() {
 534             double[] doubles = b.asDoubleArray();
 535             Arrays.sort(doubles);
 536             downstream.begin(doubles.length);
 537             for (double aDouble : doubles)
 538                 downstream.accept(aDouble);
 539             downstream.end();
 540         }
 541 
 542         @Override
 543         public void accept(double t) {
 544             b.accept(t);
 545         }
 546     }
 547 }