1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.ArrayList;
  28 import java.util.Arrays;
  29 import java.util.Comparator;
  30 import java.util.Objects;
  31 import java.util.Spliterator;
  32 import java.util.function.IntFunction;
  33 
  34 
  35 /**
  36  * Factory methods for transforming streams into sorted streams.
  37  *
  38  * @since 1.8
  39  */
  40 final class SortedOps {
  41 
  42     private SortedOps() { }
  43 
  44     /**
  45      * Appends a "sorted" operation to the provided stream.
  46      *
  47      * @param <T> the type of both input and output elements
  48      * @param upstream a reference stream with element type T
  49      */
  50     static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
  51         return new OfRef<>(upstream);
  52     }
  53 
  54     /**
  55      * Appends a "sorted" operation to the provided stream.
  56      *
  57      * @param <T> the type of both input and output elements
  58      * @param upstream a reference stream with element type T
  59      * @param comparator the comparator to order elements by
  60      */
  61     static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
  62                                 Comparator<? super T> comparator) {
  63         return new OfRef<>(upstream, comparator);
  64     }
  65 
  66     /**
  67      * Appends a "sorted" operation to the provided stream.
  68      *
  69      * @param <T> the type of both input and output elements
  70      * @param upstream a reference stream with element type T
  71      */
  72     static <T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) {
  73         return new OfInt(upstream);
  74     }
  75 
  76     /**
  77      * Appends a "sorted" operation to the provided stream.
  78      *
  79      * @param <T> the type of both input and output elements
  80      * @param upstream a reference stream with element type T
  81      */
  82     static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) {
  83         return new OfLong(upstream);
  84     }
  85 
  86     /**
  87      * Appends a "sorted" operation to the provided stream.
  88      *
  89      * @param <T> the type of both input and output elements
  90      * @param upstream a reference stream with element type T
  91      */
  92     static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {
  93         return new OfDouble(upstream);
  94     }
  95 
  96     static class Limiter<T> {
  97         private final T[] data;
  98         private final int limit;
  99         private final Comparator<? super T> comparator;
 100         private int size;
 101         private boolean initial = true;
 102     
 103         /**
 104          * @param limit how many least elements to keep
 105          * @param length length of the input if known, -1 otherwise
 106          * @param comparator
 107          */
 108         @SuppressWarnings("unchecked")
 109         Limiter(int limit, long length, Comparator<? super T> comparator) {
 110             this.limit = limit;
 111             this.comparator = comparator;
 112             int dataSize = limit * 2;
 113             if(length >= 0 && length < dataSize)
 114                 dataSize = (int) length;
 115             this.data = (T[]) new Object[dataSize];
 116         }
 117     
 118         /**
 119          * Accumulate new element
 120          * 
 121          * @param t element to accumulate
 122          * 
 123          * @return false if the element is definitely not included into result, so
 124          *         any bigger element could be skipped as well, or true if element
 125          *         will probably be included into result.
 126          */
 127         final boolean put(T t) {
 128             int l = limit;
 129             T[] d = data;
 130             if (l == 1) {
 131                 // limit == 1 is the special case: exactly one least element is stored, 
 132                 // no sorting is performed
 133                 if (initial) {
 134                     initial = false;
 135                     size = 1;
 136                 } else if (comparator.compare(t, d[0]) >= 0)
 137                     return false;
 138                 d[0] = t;
 139                 return true;
 140             }
 141             if (initial) {
 142                 if (size < d.length) {
 143                     d[size++] = t;
 144                     return true;
 145                 }
 146                 Arrays.sort(d, comparator);
 147                 initial = false;
 148                 size = l;
 149             }
 150             if (size == d.length) {
 151                 sortTail(d, l, size, comparator);
 152                 size = limit;
 153             }
 154             if (comparator.compare(t, d[l - 1]) < 0) {
 155                 d[size++] = t;
 156                 return true;
 157             }
 158             return false;
 159         }
 160     
 161         /**
 162          * Merge other {@code Limiter} object into this (other object becomes unusable after that).
 163          * 
 164          * @param other other object to merge
 165          * @return this object
 166          */
 167         Limiter<T> putAll(Limiter<T> other) {
 168             int i = 0, l = limit;
 169             T[] od = other.data;
 170             if (!other.initial) {
 171                 // sorted part
 172                 for (; i < l; i++) {
 173                     if (!put(od[i]))
 174                         break;
 175                 }
 176                 i = l;
 177             }
 178             int os = other.size;
 179             for (; i < os; i++) {
 180                 put(od[i]);
 181             }
 182             return this;
 183         }
 184     
 185         /**
 186          * Must be called after accumulation is finished.
 187          */
 188         void sort() {
 189             if (limit == 1)
 190                 return;
 191             if (initial)
 192                 Arrays.sort(data, 0, size, comparator);
 193             else if (size > limit)
 194                 sortTail(data, limit, size, comparator);
 195             if (size > limit)
 196                 size = limit;
 197         }
 198     
 199         /*
 200          * Sort data[limit]..data[size] and merge with presorted data[0]..data[limit-1]
 201          * Assuming that data[limit-1] is the biggest element
 202          */
 203         private static <T> void sortTail(T[] data, int limit, int size, Comparator<? super T> comparator) {
 204             assert size > limit;
 205             Arrays.sort(data, limit, size, comparator);
 206             if (comparator.compare(data[size - 1], data[0]) < 0) {
 207                 // Common case: descending sequence
 208                 System.arraycopy(data, 0, data, size - limit, 2 * limit - size);
 209                 System.arraycopy(data, limit, data, 0, size - limit);
 210             } else {
 211                 // Merge presorted 0..limit-1 and limit..size-1
 212                 @SuppressWarnings("unchecked")
 213                 T[] buf = (T[]) new Object[limit];
 214                 int i = 0, j = limit, k = 0;
 215                 // data[limit-1] is guaranteed to be the worst element, thus no need
 216                 // to check it
 217                 while (i < limit - 1 && k < limit && j < size) {
 218                     buf[k++] = comparator.compare(data[i], data[j]) <= 0 ? data[i++] : data[j++];   
 219                 }
 220                 if (k < limit) {
 221                     System.arraycopy(data, i < limit - 1 ? i : j, data, k, limit - k);
 222                 }
 223                 System.arraycopy(buf, 0, data, 0, k);
 224             }
 225         }
 226     }
 227 
 228     /**
 229      * Performs sorted().limit(maxSize) operation in single step for maxSize < SORTED_LIMIT_MAX_SIZE
 230      */
 231     private static final class SortedLimit<T> extends ReferencePipeline.StatefulOp<T, T> {
 232         static final int SORTED_LIMIT_MAX_SIZE = 1000;
 233 
 234         private final Comparator<? super T> comparator;
 235         private final int limit;
 236 
 237         SortedLimit(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator, int limit) {
 238             super(upstream, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED);
 239             this.comparator = comparator;
 240             this.limit = limit;
 241         }
 242 
 243         @Override
 244         public Sink<T> opWrapSink(int flags, Sink<T> sink) {
 245             Objects.requireNonNull(sink);
 246 
 247             return new AbstractRefSortingSink<T>(sink, comparator) {
 248                 private Limiter<T> limiter;
 249 
 250                 @Override
 251                 @SuppressWarnings("unchecked")
 252                 public void begin(long size) {
 253                     limiter = new Limiter<>(limit, size, comparator);
 254                 }
 255 
 256                 @Override
 257                 @SuppressWarnings("unchecked")
 258                 public void end() {
 259                     limiter.sort();
 260                     T[] array = limiter.data;
 261                     int offset = limiter.size;
 262                     downstream.begin(offset);
 263                     if (!cancellationWasRequested) {
 264                         for(int i = 0; i<offset; i++)
 265                             downstream.accept(array[i]);
 266                     }
 267                     else {
 268                         for(int i = 0; i<offset; i++) {
 269                             if (downstream.cancellationRequested()) break;
 270                             downstream.accept(array[i]);
 271                         }
 272                     }
 273                     downstream.end();
 274                 }
 275 
 276                 @Override
 277                 public void accept(T t) {
 278                     limiter.put(t);
 279                 }
 280             };
 281         }
 282 
 283         @Override
 284         public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
 285                                                  Spliterator<P_IN> spliterator,
 286                                                  IntFunction<T[]> generator) {
 287             TerminalOp<T, Limiter<T>> op = ReduceOps.<T, Limiter<T>>makeRef(
 288                 () -> new Limiter<>(limit, -1, comparator), Limiter::put, Limiter::putAll);
 289             Limiter<T> limiter = op.evaluateParallel(helper, spliterator);
 290             // No reason to use parallelSort here as 
 291             // limit <= SORTED_LIMIT_MAX_SIZE < MIN_ARRAY_SORT_GRAN
 292             limiter.sort();
 293             T[] array = limiter.data;
 294             int length = limiter.size;
 295             return Nodes.node(Arrays.copyOfRange(array, 0, length));
 296         }
 297     }
 298 
 299     /**
 300      * Specialized subtype for sorting reference streams
 301      */
 302     private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
 303         /**
 304          * Comparator used for sorting
 305          */
 306         private final boolean isNaturalSort;
 307         private final Comparator<? super T> comparator;
 308         private boolean skip;
 309 
 310         /**
 311          * Sort using natural order of {@literal <T>} which must be
 312          * {@code Comparable}.
 313          */
 314         OfRef(AbstractPipeline<?, T, ?> upstream) {
 315             super(upstream, StreamShape.REFERENCE,
 316                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 317             this.isNaturalSort = true;
 318             // Will throw CCE when we try to sort if T is not Comparable
 319             @SuppressWarnings("unchecked")
 320             Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
 321             this.comparator = comp;
 322         }
 323 
 324         /**
 325          * Sort using the provided comparator.
 326          *
 327          * @param comparator The comparator to be used to evaluate ordering.
 328          */
 329         OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
 330             super(upstream, StreamShape.REFERENCE,
 331                   StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
 332             this.isNaturalSort = false;
 333             this.comparator = Objects.requireNonNull(comparator);
 334         }
 335 
 336         @Override
 337         public final Stream<T> limit(long maxSize) {
 338             if(maxSize < 0 || maxSize > SortedLimit.SORTED_LIMIT_MAX_SIZE)
 339                 return super.limit(maxSize);
 340             skip = true;
 341             if(maxSize == 0)
 342                 return super.limit(maxSize);
 343             return new SortedLimit<>(this, comparator, (int)maxSize);
 344         }
 345 
 346         @Override
 347         public Sink<T> opWrapSink(int flags, Sink<T> sink) {
 348             Objects.requireNonNull(sink);
 349 
 350             // If the input is already naturally sorted and this operation
 351             // also naturally sorted then this is a no-op
 352             if (skip || (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort))
 353                 return sink;
 354             else if (StreamOpFlag.SIZED.isKnown(flags))
 355                 return new SizedRefSortingSink<>(sink, comparator);
 356             else
 357                 return new RefSortingSink<>(sink, comparator);
 358         }
 359 
 360         @Override
 361         public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
 362                                                  Spliterator<P_IN> spliterator,
 363                                                  IntFunction<T[]> generator) {
 364             // If the input is already naturally sorted and this operation
 365             // naturally sorts then collect the output
 366             if (skip || (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort)) {
 367                 return helper.evaluate(spliterator, false, generator);
 368             }
 369             else {
 370                 // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
 371                 T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
 372                 Arrays.parallelSort(flattenedData, comparator);
 373                 return Nodes.node(flattenedData);
 374             }
 375         }
 376 
 377         @Override
 378         <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
 379             // If the input is already naturally sorted and this operation
 380             // naturally sorts then perfrom a no-op
 381             if (skip || (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort)) {
 382                 return helper.wrapSpliterator(spliterator);
 383             }
 384             else {
 385                 return super.opEvaluateParallelLazy(helper, spliterator);
 386             }
 387         }
 388     }
 389 
 390     /**
 391      * Specialized subtype for sorting int streams.
 392      */
 393     private static final class OfInt extends IntPipeline.StatefulOp<Integer> {
 394         OfInt(AbstractPipeline<?, Integer, ?> upstream) {
 395             super(upstream, StreamShape.INT_VALUE,
 396                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 397         }
 398 
 399         @Override
 400         public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
 401             Objects.requireNonNull(sink);
 402 
 403             if (StreamOpFlag.SORTED.isKnown(flags))
 404                 return sink;
 405             else if (StreamOpFlag.SIZED.isKnown(flags))
 406                 return new SizedIntSortingSink(sink);
 407             else
 408                 return new IntSortingSink(sink);
 409         }
 410 
 411         @Override
 412         public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
 413                                                        Spliterator<P_IN> spliterator,
 414                                                        IntFunction<Integer[]> generator) {
 415             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
 416                 return helper.evaluate(spliterator, false, generator);
 417             }
 418             else {
 419                 Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
 420 
 421                 int[] content = n.asPrimitiveArray();
 422                 Arrays.parallelSort(content);
 423 
 424                 return Nodes.node(content);
 425             }
 426         }
 427     }
 428 
 429     /**
 430      * Specialized subtype for sorting long streams.
 431      */
 432     private static final class OfLong extends LongPipeline.StatefulOp<Long> {
 433         OfLong(AbstractPipeline<?, Long, ?> upstream) {
 434             super(upstream, StreamShape.LONG_VALUE,
 435                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 436         }
 437 
 438         @Override
 439         public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
 440             Objects.requireNonNull(sink);
 441 
 442             if (StreamOpFlag.SORTED.isKnown(flags))
 443                 return sink;
 444             else if (StreamOpFlag.SIZED.isKnown(flags))
 445                 return new SizedLongSortingSink(sink);
 446             else
 447                 return new LongSortingSink(sink);
 448         }
 449 
 450         @Override
 451         public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
 452                                                     Spliterator<P_IN> spliterator,
 453                                                     IntFunction<Long[]> generator) {
 454             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
 455                 return helper.evaluate(spliterator, false, generator);
 456             }
 457             else {
 458                 Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator);
 459 
 460                 long[] content = n.asPrimitiveArray();
 461                 Arrays.parallelSort(content);
 462 
 463                 return Nodes.node(content);
 464             }
 465         }
 466     }
 467 
 468     /**
 469      * Specialized subtype for sorting double streams.
 470      */
 471     private static final class OfDouble extends DoublePipeline.StatefulOp<Double> {
 472         OfDouble(AbstractPipeline<?, Double, ?> upstream) {
 473             super(upstream, StreamShape.DOUBLE_VALUE,
 474                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
 475         }
 476 
 477         @Override
 478         public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
 479             Objects.requireNonNull(sink);
 480 
 481             if (StreamOpFlag.SORTED.isKnown(flags))
 482                 return sink;
 483             else if (StreamOpFlag.SIZED.isKnown(flags))
 484                 return new SizedDoubleSortingSink(sink);
 485             else
 486                 return new DoubleSortingSink(sink);
 487         }
 488 
 489         @Override
 490         public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
 491                                                       Spliterator<P_IN> spliterator,
 492                                                       IntFunction<Double[]> generator) {
 493             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
 494                 return helper.evaluate(spliterator, false, generator);
 495             }
 496             else {
 497                 Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);
 498 
 499                 double[] content = n.asPrimitiveArray();
 500                 Arrays.parallelSort(content);
 501 
 502                 return Nodes.node(content);
 503             }
 504         }
 505     }
 506 
 507     /**
 508      * Abstract {@link Sink} for implementing sort on reference streams.
 509      *
 510      * <p>
 511      * Note: documentation below applies to reference and all primitive sinks.
 512      * <p>
 513      * Sorting sinks first accept all elements, buffering then into an array
 514      * or a re-sizable data structure, if the size of the pipeline is known or
 515      * unknown respectively.  At the end of the sink protocol those elements are
 516      * sorted and then pushed downstream.
 517      * This class records if {@link #cancellationRequested} is called.  If so it
 518      * can be inferred that the source pushing source elements into the pipeline
 519      * knows that the pipeline is short-circuiting.  In such cases sub-classes
 520      * pushing elements downstream will preserve the short-circuiting protocol
 521      * by calling {@code downstream.cancellationRequested()} and checking the
 522      * result is {@code false} before an element is pushed.
 523      * <p>
 524      * Note that the above behaviour is an optimization for sorting with
 525      * sequential streams.  It is not an error that more elements, than strictly
 526      * required to produce a result, may flow through the pipeline.  This can
 527      * occur, in general (not restricted to just sorting), for short-circuiting
 528      * parallel pipelines.
 529      */
 530     private abstract static class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> {
 531         protected final Comparator<? super T> comparator;
 532         // @@@ could be a lazy final value, if/when support is added
 533         protected boolean cancellationWasRequested;
 534 
 535         AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
 536             super(downstream);
 537             this.comparator = comparator;
 538         }
 539 
 540         /**
 541          * Records is cancellation is requested so short-circuiting behaviour
 542          * can be preserved when the sorted elements are pushed downstream.
 543          *
 544          * @return false, as this sink never short-circuits.
 545          */
 546         @Override
 547         public final boolean cancellationRequested() {
 548             cancellationWasRequested = true;
 549             return false;
 550         }
 551     }
 552 
 553     /**
 554      * {@link Sink} for implementing sort on SIZED reference streams.
 555      */
 556     private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
 557         private T[] array;
 558         private int offset;
 559 
 560         SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
 561             super(sink, comparator);
 562         }
 563 
 564         @Override
 565         @SuppressWarnings("unchecked")
 566         public void begin(long size) {
 567             if (size >= Nodes.MAX_ARRAY_SIZE)
 568                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
 569             array = (T[]) new Object[(int) size];
 570         }
 571 
 572         @Override
 573         public void end() {
 574             Arrays.sort(array, 0, offset, comparator);
 575             downstream.begin(offset);
 576             if (!cancellationWasRequested) {
 577                 for (int i = 0; i < offset; i++)
 578                     downstream.accept(array[i]);
 579             }
 580             else {
 581                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
 582                     downstream.accept(array[i]);
 583             }
 584             downstream.end();
 585             array = null;
 586         }
 587 
 588         @Override
 589         public void accept(T t) {
 590             array[offset++] = t;
 591         }
 592     }
 593 
 594     /**
 595      * {@link Sink} for implementing sort on reference streams.
 596      */
 597     private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
 598         private ArrayList<T> list;
 599 
 600         RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
 601             super(sink, comparator);
 602         }
 603 
 604         @Override
 605         public void begin(long size) {
 606             if (size >= Nodes.MAX_ARRAY_SIZE)
 607                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
 608             list = (size >= 0) ? new ArrayList<>((int) size) : new ArrayList<>();
 609         }
 610 
 611         @Override
 612         public void end() {
 613             list.sort(comparator);
 614             downstream.begin(list.size());
 615             if (!cancellationWasRequested) {
 616                 list.forEach(downstream::accept);
 617             }
 618             else {
 619                 for (T t : list) {
 620                     if (downstream.cancellationRequested()) break;
 621                     downstream.accept(t);
 622                 }
 623             }
 624             downstream.end();
 625             list = null;
 626         }
 627 
 628         @Override
 629         public void accept(T t) {
 630             list.add(t);
 631         }
 632     }
 633 
 634     /**
 635      * Abstract {@link Sink} for implementing sort on int streams.
 636      */
 637     private abstract static class AbstractIntSortingSink extends Sink.ChainedInt<Integer> {
 638         protected boolean cancellationWasRequested;
 639 
 640         AbstractIntSortingSink(Sink<? super Integer> downstream) {
 641             super(downstream);
 642         }
 643 
 644         @Override
 645         public final boolean cancellationRequested() {
 646             cancellationWasRequested = true;
 647             return false;
 648         }
 649     }
 650 
 651     /**
 652      * {@link Sink} for implementing sort on SIZED int streams.
 653      */
 654     private static final class SizedIntSortingSink extends AbstractIntSortingSink {
 655         private int[] array;
 656         private int offset;
 657 
 658         SizedIntSortingSink(Sink<? super Integer> downstream) {
 659             super(downstream);
 660         }
 661 
 662         @Override
 663         public void begin(long size) {
 664             if (size >= Nodes.MAX_ARRAY_SIZE)
 665                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
 666             array = new int[(int) size];
 667         }
 668 
 669         @Override
 670         public void end() {
 671             Arrays.sort(array, 0, offset);
 672             downstream.begin(offset);
 673             if (!cancellationWasRequested) {
 674                 for (int i = 0; i < offset; i++)
 675                     downstream.accept(array[i]);
 676             }
 677             else {
 678                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
 679                     downstream.accept(array[i]);
 680             }
 681             downstream.end();
 682             array = null;
 683         }
 684 
 685         @Override
 686         public void accept(int t) {
 687             array[offset++] = t;
 688         }
 689     }
 690 
 691     /**
 692      * {@link Sink} for implementing sort on int streams.
 693      */
 694     private static final class IntSortingSink extends AbstractIntSortingSink {
 695         private SpinedBuffer.OfInt b;
 696 
 697         IntSortingSink(Sink<? super Integer> sink) {
 698             super(sink);
 699         }
 700 
 701         @Override
 702         public void begin(long size) {
 703             if (size >= Nodes.MAX_ARRAY_SIZE)
 704                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
 705             b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
 706         }
 707 
 708         @Override
 709         public void end() {
 710             int[] ints = b.asPrimitiveArray();
 711             Arrays.sort(ints);
 712             downstream.begin(ints.length);
 713             if (!cancellationWasRequested) {
 714                 for (int anInt : ints)
 715                     downstream.accept(anInt);
 716             }
 717             else {
 718                 for (int anInt : ints) {
 719                     if (downstream.cancellationRequested()) break;
 720                     downstream.accept(anInt);
 721                 }
 722             }
 723             downstream.end();
 724         }
 725 
 726         @Override
 727         public void accept(int t) {
 728             b.accept(t);
 729         }
 730     }
 731 
 732     /**
 733      * Abstract {@link Sink} for implementing sort on long streams.
 734      */
 735     private abstract static class AbstractLongSortingSink extends Sink.ChainedLong<Long> {
 736         protected boolean cancellationWasRequested;
 737 
 738         AbstractLongSortingSink(Sink<? super Long> downstream) {
 739             super(downstream);
 740         }
 741 
 742         @Override
 743         public final boolean cancellationRequested() {
 744             cancellationWasRequested = true;
 745             return false;
 746         }
 747     }
 748 
 749     /**
 750      * {@link Sink} for implementing sort on SIZED long streams.
 751      */
 752     private static final class SizedLongSortingSink extends AbstractLongSortingSink {
 753         private long[] array;
 754         private int offset;
 755 
 756         SizedLongSortingSink(Sink<? super Long> downstream) {
 757             super(downstream);
 758         }
 759 
 760         @Override
 761         public void begin(long size) {
 762             if (size >= Nodes.MAX_ARRAY_SIZE)
 763                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
 764             array = new long[(int) size];
 765         }
 766 
 767         @Override
 768         public void end() {
 769             Arrays.sort(array, 0, offset);
 770             downstream.begin(offset);
 771             if (!cancellationWasRequested) {
 772                 for (int i = 0; i < offset; i++)
 773                     downstream.accept(array[i]);
 774             }
 775             else {
 776                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
 777                     downstream.accept(array[i]);
 778             }
 779             downstream.end();
 780             array = null;
 781         }
 782 
 783         @Override
 784         public void accept(long t) {
 785             array[offset++] = t;
 786         }
 787     }
 788 
 789     /**
 790      * {@link Sink} for implementing sort on long streams.
 791      */
 792     private static final class LongSortingSink extends AbstractLongSortingSink {
 793         private SpinedBuffer.OfLong b;
 794 
 795         LongSortingSink(Sink<? super Long> sink) {
 796             super(sink);
 797         }
 798 
 799         @Override
 800         public void begin(long size) {
 801             if (size >= Nodes.MAX_ARRAY_SIZE)
 802                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
 803             b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
 804         }
 805 
 806         @Override
 807         public void end() {
 808             long[] longs = b.asPrimitiveArray();
 809             Arrays.sort(longs);
 810             downstream.begin(longs.length);
 811             if (!cancellationWasRequested) {
 812                 for (long aLong : longs)
 813                     downstream.accept(aLong);
 814             }
 815             else {
 816                 for (long aLong : longs) {
 817                     if (downstream.cancellationRequested()) break;
 818                     downstream.accept(aLong);
 819                 }
 820             }
 821             downstream.end();
 822         }
 823 
 824         @Override
 825         public void accept(long t) {
 826             b.accept(t);
 827         }
 828     }
 829 
 830     /**
 831      * Abstract {@link Sink} for implementing sort on long streams.
 832      */
 833     private abstract static class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> {
 834         protected boolean cancellationWasRequested;
 835 
 836         AbstractDoubleSortingSink(Sink<? super Double> downstream) {
 837             super(downstream);
 838         }
 839 
 840         @Override
 841         public final boolean cancellationRequested() {
 842             cancellationWasRequested = true;
 843             return false;
 844         }
 845     }
 846 
 847     /**
 848      * {@link Sink} for implementing sort on SIZED double streams.
 849      */
 850     private static final class SizedDoubleSortingSink extends AbstractDoubleSortingSink {
 851         private double[] array;
 852         private int offset;
 853 
 854         SizedDoubleSortingSink(Sink<? super Double> downstream) {
 855             super(downstream);
 856         }
 857 
 858         @Override
 859         public void begin(long size) {
 860             if (size >= Nodes.MAX_ARRAY_SIZE)
 861                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
 862             array = new double[(int) size];
 863         }
 864 
 865         @Override
 866         public void end() {
 867             Arrays.sort(array, 0, offset);
 868             downstream.begin(offset);
 869             if (!cancellationWasRequested) {
 870                 for (int i = 0; i < offset; i++)
 871                     downstream.accept(array[i]);
 872             }
 873             else {
 874                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
 875                     downstream.accept(array[i]);
 876             }
 877             downstream.end();
 878             array = null;
 879         }
 880 
 881         @Override
 882         public void accept(double t) {
 883             array[offset++] = t;
 884         }
 885     }
 886 
 887     /**
 888      * {@link Sink} for implementing sort on double streams.
 889      */
 890     private static final class DoubleSortingSink extends AbstractDoubleSortingSink {
 891         private SpinedBuffer.OfDouble b;
 892 
 893         DoubleSortingSink(Sink<? super Double> sink) {
 894             super(sink);
 895         }
 896 
 897         @Override
 898         public void begin(long size) {
 899             if (size >= Nodes.MAX_ARRAY_SIZE)
 900                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
 901             b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
 902         }
 903 
 904         @Override
 905         public void end() {
 906             double[] doubles = b.asPrimitiveArray();
 907             Arrays.sort(doubles);
 908             downstream.begin(doubles.length);
 909             if (!cancellationWasRequested) {
 910                 for (double aDouble : doubles)
 911                     downstream.accept(aDouble);
 912             }
 913             else {
 914                 for (double aDouble : doubles) {
 915                     if (downstream.cancellationRequested()) break;
 916                     downstream.accept(aDouble);
 917                 }
 918             }
 919             downstream.end();
 920         }
 921 
 922         @Override
 923         public void accept(double t) {
 924             b.accept(t);
 925         }
 926     }
 927 }