1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.ArrayList; 28 import java.util.Arrays; 29 import java.util.Comparator; 30 import java.util.Objects; 31 import java.util.Spliterator; 32 import java.util.function.IntFunction; 33 34 35 /** 36 * Factory methods for transforming streams into sorted streams. 37 * 38 * @since 1.8 39 */ 40 final class SortedOps { 41 42 private SortedOps() { } 43 44 /** 45 * Appends a "sorted" operation to the provided stream. 46 * 47 * @param <T> the type of both input and output elements 48 * @param upstream a reference stream with element type T 49 */ 50 static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) { 51 return new OfRef<>(upstream); 52 } 53 54 /** 55 * Appends a "sorted" operation to the provided stream. 56 * 57 * @param <T> the type of both input and output elements 58 * @param upstream a reference stream with element type T 59 * @param comparator the comparator to order elements by 60 */ 61 static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream, 62 Comparator<? super T> comparator) { 63 return new OfRef<>(upstream, comparator); 64 } 65 66 /** 67 * Appends a "sorted" operation to the provided stream. 68 * 69 * @param <T> the type of both input and output elements 70 * @param upstream a reference stream with element type T 71 */ 72 static <T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) { 73 return new OfInt(upstream); 74 } 75 76 /** 77 * Appends a "sorted" operation to the provided stream. 78 * 79 * @param <T> the type of both input and output elements 80 * @param upstream a reference stream with element type T 81 */ 82 static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) { 83 return new OfLong(upstream); 84 } 85 86 /** 87 * Appends a "sorted" operation to the provided stream. 88 * 89 * @param <T> the type of both input and output elements 90 * @param upstream a reference stream with element type T 91 */ 92 static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) { 93 return new OfDouble(upstream); 94 } 95 96 static class Limiter<T> { 97 private final T[] data; 98 private final int limit; 99 private final Comparator<? super T> comparator; 100 private int size; 101 private boolean initial = true; 102 103 /** 104 * @param limit how many least elements to keep 105 * @param length length of the input if known, -1 otherwise 106 * @param comparator 107 */ 108 @SuppressWarnings("unchecked") 109 Limiter(int limit, long length, Comparator<? super T> comparator) { 110 this.limit = limit; 111 this.comparator = comparator; 112 int dataSize = limit * 2; 113 if(length >= 0 && length < dataSize) 114 dataSize = (int) length; 115 this.data = (T[]) new Object[dataSize]; 116 } 117 118 /** 119 * Accumulate new element 120 * 121 * @param t element to accumulate 122 * 123 * @return false if the element is definitely not included into result, so 124 * any bigger element could be skipped as well, or true if element 125 * will probably be included into result. 126 */ 127 final boolean put(T t) { 128 int l = limit; 129 T[] d = data; 130 if (l == 1) { 131 // limit == 1 is the special case: exactly one least element is stored, 132 // no sorting is performed 133 if (initial) { 134 initial = false; 135 size = 1; 136 } else if (comparator.compare(t, d[0]) >= 0) 137 return false; 138 d[0] = t; 139 return true; 140 } 141 if (initial) { 142 if (size < d.length) { 143 d[size++] = t; 144 return true; 145 } 146 Arrays.sort(d, comparator); 147 initial = false; 148 size = l; 149 } 150 if (size == d.length) { 151 sortTail(d, l, size, comparator); 152 size = limit; 153 } 154 if (comparator.compare(t, d[l - 1]) < 0) { 155 d[size++] = t; 156 return true; 157 } 158 return false; 159 } 160 161 /** 162 * Merge other {@code Limiter} object into this (other object becomes unusable after that). 163 * 164 * @param other other object to merge 165 * @return this object 166 */ 167 Limiter<T> putAll(Limiter<T> other) { 168 int i = 0, l = limit; 169 T[] od = other.data; 170 if (!other.initial) { 171 // sorted part 172 for (; i < l; i++) { 173 if (!put(od[i])) 174 break; 175 } 176 i = l; 177 } 178 int os = other.size; 179 for (; i < os; i++) { 180 put(od[i]); 181 } 182 return this; 183 } 184 185 /** 186 * Must be called after accumulation is finished. 187 */ 188 void sort() { 189 if (limit == 1) 190 return; 191 if (initial) 192 Arrays.sort(data, 0, size, comparator); 193 else if (size > limit) 194 sortTail(data, limit, size, comparator); 195 if (size > limit) 196 size = limit; 197 } 198 199 /* 200 * Sort data[limit]..data[size] and merge with presorted data[0]..data[limit-1] 201 * Assuming that data[limit-1] is the biggest element 202 */ 203 private static <T> void sortTail(T[] data, int limit, int size, Comparator<? super T> comparator) { 204 assert size > limit; 205 Arrays.sort(data, limit, size, comparator); 206 if (comparator.compare(data[size - 1], data[0]) < 0) { 207 // Common case: descending sequence 208 System.arraycopy(data, 0, data, size - limit, 2 * limit - size); 209 System.arraycopy(data, limit, data, 0, size - limit); 210 } else { 211 // Merge presorted 0..limit-1 and limit..size-1 212 @SuppressWarnings("unchecked") 213 T[] buf = (T[]) new Object[limit]; 214 int i = 0, j = limit, k = 0; 215 // data[limit-1] is guaranteed to be the worst element, thus no need 216 // to check it 217 while (i < limit - 1 && k < limit && j < size) { 218 buf[k++] = comparator.compare(data[i], data[j]) <= 0 ? data[i++] : data[j++]; 219 } 220 if (k < limit) { 221 System.arraycopy(data, i < limit - 1 ? i : j, data, k, limit - k); 222 } 223 System.arraycopy(buf, 0, data, 0, k); 224 } 225 } 226 } 227 228 /** 229 * Performs sorted().limit(maxSize) operation in single step for maxSize < SORTED_LIMIT_MAX_SIZE 230 */ 231 private static final class SortedLimit<T> extends ReferencePipeline.StatefulOp<T, T> { 232 static final int SORTED_LIMIT_MAX_SIZE = 1000; 233 234 private final Comparator<? super T> comparator; 235 private final int limit; 236 237 SortedLimit(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator, int limit) { 238 super(upstream, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED); 239 this.comparator = comparator; 240 this.limit = limit; 241 } 242 243 @Override 244 public Sink<T> opWrapSink(int flags, Sink<T> sink) { 245 Objects.requireNonNull(sink); 246 247 return new AbstractRefSortingSink<T>(sink, comparator) { 248 private Limiter<T> limiter; 249 250 @Override 251 @SuppressWarnings("unchecked") 252 public void begin(long size) { 253 limiter = new Limiter<>(limit, size, comparator); 254 } 255 256 @Override 257 @SuppressWarnings("unchecked") 258 public void end() { 259 limiter.sort(); 260 T[] array = limiter.data; 261 int offset = limiter.size; 262 downstream.begin(offset); 263 if (!cancellationWasRequested) { 264 for(int i = 0; i<offset; i++) 265 downstream.accept(array[i]); 266 } 267 else { 268 for(int i = 0; i<offset; i++) { 269 if (downstream.cancellationRequested()) break; 270 downstream.accept(array[i]); 271 } 272 } 273 downstream.end(); 274 } 275 276 @Override 277 public void accept(T t) { 278 limiter.put(t); 279 } 280 }; 281 } 282 283 @Override 284 public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, 285 Spliterator<P_IN> spliterator, 286 IntFunction<T[]> generator) { 287 TerminalOp<T, Limiter<T>> op = ReduceOps.<T, Limiter<T>>makeRef( 288 () -> new Limiter<>(limit, -1, comparator), Limiter::put, Limiter::putAll); 289 Limiter<T> limiter = op.evaluateParallel(helper, spliterator); 290 // No reason to use parallelSort here as 291 // limit <= SORTED_LIMIT_MAX_SIZE < MIN_ARRAY_SORT_GRAN 292 limiter.sort(); 293 T[] array = limiter.data; 294 int length = limiter.size; 295 return Nodes.node(Arrays.copyOfRange(array, 0, length)); 296 } 297 } 298 299 /** 300 * Specialized subtype for sorting reference streams 301 */ 302 private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> { 303 /** 304 * Comparator used for sorting 305 */ 306 private final boolean isNaturalSort; 307 private final Comparator<? super T> comparator; 308 private boolean skip; 309 310 /** 311 * Sort using natural order of {@literal <T>} which must be 312 * {@code Comparable}. 313 */ 314 OfRef(AbstractPipeline<?, T, ?> upstream) { 315 super(upstream, StreamShape.REFERENCE, 316 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 317 this.isNaturalSort = true; 318 // Will throw CCE when we try to sort if T is not Comparable 319 @SuppressWarnings("unchecked") 320 Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder(); 321 this.comparator = comp; 322 } 323 324 /** 325 * Sort using the provided comparator. 326 * 327 * @param comparator The comparator to be used to evaluate ordering. 328 */ 329 OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) { 330 super(upstream, StreamShape.REFERENCE, 331 StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED); 332 this.isNaturalSort = false; 333 this.comparator = Objects.requireNonNull(comparator); 334 } 335 336 @Override 337 public final Stream<T> limit(long maxSize) { 338 if(maxSize < 0 || maxSize > SortedLimit.SORTED_LIMIT_MAX_SIZE) 339 return super.limit(maxSize); 340 skip = true; 341 if(maxSize == 0) 342 return super.limit(maxSize); 343 return new SortedLimit<>(this, comparator, (int)maxSize); 344 } 345 346 @Override 347 public Sink<T> opWrapSink(int flags, Sink<T> sink) { 348 Objects.requireNonNull(sink); 349 350 // If the input is already naturally sorted and this operation 351 // also naturally sorted then this is a no-op 352 if (skip || (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)) 353 return sink; 354 else if (StreamOpFlag.SIZED.isKnown(flags)) 355 return new SizedRefSortingSink<>(sink, comparator); 356 else 357 return new RefSortingSink<>(sink, comparator); 358 } 359 360 @Override 361 public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, 362 Spliterator<P_IN> spliterator, 363 IntFunction<T[]> generator) { 364 // If the input is already naturally sorted and this operation 365 // naturally sorts then collect the output 366 if (skip || (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort)) { 367 return helper.evaluate(spliterator, false, generator); 368 } 369 else { 370 // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort 371 T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator); 372 Arrays.parallelSort(flattenedData, comparator); 373 return Nodes.node(flattenedData); 374 } 375 } 376 377 @Override 378 <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { 379 // If the input is already naturally sorted and this operation 380 // naturally sorts then perfrom a no-op 381 if (skip || (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort)) { 382 return helper.wrapSpliterator(spliterator); 383 } 384 else { 385 return super.opEvaluateParallelLazy(helper, spliterator); 386 } 387 } 388 } 389 390 /** 391 * Specialized subtype for sorting int streams. 392 */ 393 private static final class OfInt extends IntPipeline.StatefulOp<Integer> { 394 OfInt(AbstractPipeline<?, Integer, ?> upstream) { 395 super(upstream, StreamShape.INT_VALUE, 396 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 397 } 398 399 @Override 400 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 401 Objects.requireNonNull(sink); 402 403 if (StreamOpFlag.SORTED.isKnown(flags)) 404 return sink; 405 else if (StreamOpFlag.SIZED.isKnown(flags)) 406 return new SizedIntSortingSink(sink); 407 else 408 return new IntSortingSink(sink); 409 } 410 411 @Override 412 public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, 413 Spliterator<P_IN> spliterator, 414 IntFunction<Integer[]> generator) { 415 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) { 416 return helper.evaluate(spliterator, false, generator); 417 } 418 else { 419 Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator); 420 421 int[] content = n.asPrimitiveArray(); 422 Arrays.parallelSort(content); 423 424 return Nodes.node(content); 425 } 426 } 427 } 428 429 /** 430 * Specialized subtype for sorting long streams. 431 */ 432 private static final class OfLong extends LongPipeline.StatefulOp<Long> { 433 OfLong(AbstractPipeline<?, Long, ?> upstream) { 434 super(upstream, StreamShape.LONG_VALUE, 435 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 436 } 437 438 @Override 439 public Sink<Long> opWrapSink(int flags, Sink<Long> sink) { 440 Objects.requireNonNull(sink); 441 442 if (StreamOpFlag.SORTED.isKnown(flags)) 443 return sink; 444 else if (StreamOpFlag.SIZED.isKnown(flags)) 445 return new SizedLongSortingSink(sink); 446 else 447 return new LongSortingSink(sink); 448 } 449 450 @Override 451 public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, 452 Spliterator<P_IN> spliterator, 453 IntFunction<Long[]> generator) { 454 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) { 455 return helper.evaluate(spliterator, false, generator); 456 } 457 else { 458 Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator); 459 460 long[] content = n.asPrimitiveArray(); 461 Arrays.parallelSort(content); 462 463 return Nodes.node(content); 464 } 465 } 466 } 467 468 /** 469 * Specialized subtype for sorting double streams. 470 */ 471 private static final class OfDouble extends DoublePipeline.StatefulOp<Double> { 472 OfDouble(AbstractPipeline<?, Double, ?> upstream) { 473 super(upstream, StreamShape.DOUBLE_VALUE, 474 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 475 } 476 477 @Override 478 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 479 Objects.requireNonNull(sink); 480 481 if (StreamOpFlag.SORTED.isKnown(flags)) 482 return sink; 483 else if (StreamOpFlag.SIZED.isKnown(flags)) 484 return new SizedDoubleSortingSink(sink); 485 else 486 return new DoubleSortingSink(sink); 487 } 488 489 @Override 490 public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, 491 Spliterator<P_IN> spliterator, 492 IntFunction<Double[]> generator) { 493 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) { 494 return helper.evaluate(spliterator, false, generator); 495 } 496 else { 497 Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator); 498 499 double[] content = n.asPrimitiveArray(); 500 Arrays.parallelSort(content); 501 502 return Nodes.node(content); 503 } 504 } 505 } 506 507 /** 508 * Abstract {@link Sink} for implementing sort on reference streams. 509 * 510 * <p> 511 * Note: documentation below applies to reference and all primitive sinks. 512 * <p> 513 * Sorting sinks first accept all elements, buffering then into an array 514 * or a re-sizable data structure, if the size of the pipeline is known or 515 * unknown respectively. At the end of the sink protocol those elements are 516 * sorted and then pushed downstream. 517 * This class records if {@link #cancellationRequested} is called. If so it 518 * can be inferred that the source pushing source elements into the pipeline 519 * knows that the pipeline is short-circuiting. In such cases sub-classes 520 * pushing elements downstream will preserve the short-circuiting protocol 521 * by calling {@code downstream.cancellationRequested()} and checking the 522 * result is {@code false} before an element is pushed. 523 * <p> 524 * Note that the above behaviour is an optimization for sorting with 525 * sequential streams. It is not an error that more elements, than strictly 526 * required to produce a result, may flow through the pipeline. This can 527 * occur, in general (not restricted to just sorting), for short-circuiting 528 * parallel pipelines. 529 */ 530 private abstract static class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> { 531 protected final Comparator<? super T> comparator; 532 // @@@ could be a lazy final value, if/when support is added 533 protected boolean cancellationWasRequested; 534 535 AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) { 536 super(downstream); 537 this.comparator = comparator; 538 } 539 540 /** 541 * Records is cancellation is requested so short-circuiting behaviour 542 * can be preserved when the sorted elements are pushed downstream. 543 * 544 * @return false, as this sink never short-circuits. 545 */ 546 @Override 547 public final boolean cancellationRequested() { 548 cancellationWasRequested = true; 549 return false; 550 } 551 } 552 553 /** 554 * {@link Sink} for implementing sort on SIZED reference streams. 555 */ 556 private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> { 557 private T[] array; 558 private int offset; 559 560 SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { 561 super(sink, comparator); 562 } 563 564 @Override 565 @SuppressWarnings("unchecked") 566 public void begin(long size) { 567 if (size >= Nodes.MAX_ARRAY_SIZE) 568 throw new IllegalArgumentException(Nodes.BAD_SIZE); 569 array = (T[]) new Object[(int) size]; 570 } 571 572 @Override 573 public void end() { 574 Arrays.sort(array, 0, offset, comparator); 575 downstream.begin(offset); 576 if (!cancellationWasRequested) { 577 for (int i = 0; i < offset; i++) 578 downstream.accept(array[i]); 579 } 580 else { 581 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) 582 downstream.accept(array[i]); 583 } 584 downstream.end(); 585 array = null; 586 } 587 588 @Override 589 public void accept(T t) { 590 array[offset++] = t; 591 } 592 } 593 594 /** 595 * {@link Sink} for implementing sort on reference streams. 596 */ 597 private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> { 598 private ArrayList<T> list; 599 600 RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { 601 super(sink, comparator); 602 } 603 604 @Override 605 public void begin(long size) { 606 if (size >= Nodes.MAX_ARRAY_SIZE) 607 throw new IllegalArgumentException(Nodes.BAD_SIZE); 608 list = (size >= 0) ? new ArrayList<>((int) size) : new ArrayList<>(); 609 } 610 611 @Override 612 public void end() { 613 list.sort(comparator); 614 downstream.begin(list.size()); 615 if (!cancellationWasRequested) { 616 list.forEach(downstream::accept); 617 } 618 else { 619 for (T t : list) { 620 if (downstream.cancellationRequested()) break; 621 downstream.accept(t); 622 } 623 } 624 downstream.end(); 625 list = null; 626 } 627 628 @Override 629 public void accept(T t) { 630 list.add(t); 631 } 632 } 633 634 /** 635 * Abstract {@link Sink} for implementing sort on int streams. 636 */ 637 private abstract static class AbstractIntSortingSink extends Sink.ChainedInt<Integer> { 638 protected boolean cancellationWasRequested; 639 640 AbstractIntSortingSink(Sink<? super Integer> downstream) { 641 super(downstream); 642 } 643 644 @Override 645 public final boolean cancellationRequested() { 646 cancellationWasRequested = true; 647 return false; 648 } 649 } 650 651 /** 652 * {@link Sink} for implementing sort on SIZED int streams. 653 */ 654 private static final class SizedIntSortingSink extends AbstractIntSortingSink { 655 private int[] array; 656 private int offset; 657 658 SizedIntSortingSink(Sink<? super Integer> downstream) { 659 super(downstream); 660 } 661 662 @Override 663 public void begin(long size) { 664 if (size >= Nodes.MAX_ARRAY_SIZE) 665 throw new IllegalArgumentException(Nodes.BAD_SIZE); 666 array = new int[(int) size]; 667 } 668 669 @Override 670 public void end() { 671 Arrays.sort(array, 0, offset); 672 downstream.begin(offset); 673 if (!cancellationWasRequested) { 674 for (int i = 0; i < offset; i++) 675 downstream.accept(array[i]); 676 } 677 else { 678 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) 679 downstream.accept(array[i]); 680 } 681 downstream.end(); 682 array = null; 683 } 684 685 @Override 686 public void accept(int t) { 687 array[offset++] = t; 688 } 689 } 690 691 /** 692 * {@link Sink} for implementing sort on int streams. 693 */ 694 private static final class IntSortingSink extends AbstractIntSortingSink { 695 private SpinedBuffer.OfInt b; 696 697 IntSortingSink(Sink<? super Integer> sink) { 698 super(sink); 699 } 700 701 @Override 702 public void begin(long size) { 703 if (size >= Nodes.MAX_ARRAY_SIZE) 704 throw new IllegalArgumentException(Nodes.BAD_SIZE); 705 b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt(); 706 } 707 708 @Override 709 public void end() { 710 int[] ints = b.asPrimitiveArray(); 711 Arrays.sort(ints); 712 downstream.begin(ints.length); 713 if (!cancellationWasRequested) { 714 for (int anInt : ints) 715 downstream.accept(anInt); 716 } 717 else { 718 for (int anInt : ints) { 719 if (downstream.cancellationRequested()) break; 720 downstream.accept(anInt); 721 } 722 } 723 downstream.end(); 724 } 725 726 @Override 727 public void accept(int t) { 728 b.accept(t); 729 } 730 } 731 732 /** 733 * Abstract {@link Sink} for implementing sort on long streams. 734 */ 735 private abstract static class AbstractLongSortingSink extends Sink.ChainedLong<Long> { 736 protected boolean cancellationWasRequested; 737 738 AbstractLongSortingSink(Sink<? super Long> downstream) { 739 super(downstream); 740 } 741 742 @Override 743 public final boolean cancellationRequested() { 744 cancellationWasRequested = true; 745 return false; 746 } 747 } 748 749 /** 750 * {@link Sink} for implementing sort on SIZED long streams. 751 */ 752 private static final class SizedLongSortingSink extends AbstractLongSortingSink { 753 private long[] array; 754 private int offset; 755 756 SizedLongSortingSink(Sink<? super Long> downstream) { 757 super(downstream); 758 } 759 760 @Override 761 public void begin(long size) { 762 if (size >= Nodes.MAX_ARRAY_SIZE) 763 throw new IllegalArgumentException(Nodes.BAD_SIZE); 764 array = new long[(int) size]; 765 } 766 767 @Override 768 public void end() { 769 Arrays.sort(array, 0, offset); 770 downstream.begin(offset); 771 if (!cancellationWasRequested) { 772 for (int i = 0; i < offset; i++) 773 downstream.accept(array[i]); 774 } 775 else { 776 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) 777 downstream.accept(array[i]); 778 } 779 downstream.end(); 780 array = null; 781 } 782 783 @Override 784 public void accept(long t) { 785 array[offset++] = t; 786 } 787 } 788 789 /** 790 * {@link Sink} for implementing sort on long streams. 791 */ 792 private static final class LongSortingSink extends AbstractLongSortingSink { 793 private SpinedBuffer.OfLong b; 794 795 LongSortingSink(Sink<? super Long> sink) { 796 super(sink); 797 } 798 799 @Override 800 public void begin(long size) { 801 if (size >= Nodes.MAX_ARRAY_SIZE) 802 throw new IllegalArgumentException(Nodes.BAD_SIZE); 803 b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong(); 804 } 805 806 @Override 807 public void end() { 808 long[] longs = b.asPrimitiveArray(); 809 Arrays.sort(longs); 810 downstream.begin(longs.length); 811 if (!cancellationWasRequested) { 812 for (long aLong : longs) 813 downstream.accept(aLong); 814 } 815 else { 816 for (long aLong : longs) { 817 if (downstream.cancellationRequested()) break; 818 downstream.accept(aLong); 819 } 820 } 821 downstream.end(); 822 } 823 824 @Override 825 public void accept(long t) { 826 b.accept(t); 827 } 828 } 829 830 /** 831 * Abstract {@link Sink} for implementing sort on long streams. 832 */ 833 private abstract static class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> { 834 protected boolean cancellationWasRequested; 835 836 AbstractDoubleSortingSink(Sink<? super Double> downstream) { 837 super(downstream); 838 } 839 840 @Override 841 public final boolean cancellationRequested() { 842 cancellationWasRequested = true; 843 return false; 844 } 845 } 846 847 /** 848 * {@link Sink} for implementing sort on SIZED double streams. 849 */ 850 private static final class SizedDoubleSortingSink extends AbstractDoubleSortingSink { 851 private double[] array; 852 private int offset; 853 854 SizedDoubleSortingSink(Sink<? super Double> downstream) { 855 super(downstream); 856 } 857 858 @Override 859 public void begin(long size) { 860 if (size >= Nodes.MAX_ARRAY_SIZE) 861 throw new IllegalArgumentException(Nodes.BAD_SIZE); 862 array = new double[(int) size]; 863 } 864 865 @Override 866 public void end() { 867 Arrays.sort(array, 0, offset); 868 downstream.begin(offset); 869 if (!cancellationWasRequested) { 870 for (int i = 0; i < offset; i++) 871 downstream.accept(array[i]); 872 } 873 else { 874 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) 875 downstream.accept(array[i]); 876 } 877 downstream.end(); 878 array = null; 879 } 880 881 @Override 882 public void accept(double t) { 883 array[offset++] = t; 884 } 885 } 886 887 /** 888 * {@link Sink} for implementing sort on double streams. 889 */ 890 private static final class DoubleSortingSink extends AbstractDoubleSortingSink { 891 private SpinedBuffer.OfDouble b; 892 893 DoubleSortingSink(Sink<? super Double> sink) { 894 super(sink); 895 } 896 897 @Override 898 public void begin(long size) { 899 if (size >= Nodes.MAX_ARRAY_SIZE) 900 throw new IllegalArgumentException(Nodes.BAD_SIZE); 901 b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble(); 902 } 903 904 @Override 905 public void end() { 906 double[] doubles = b.asPrimitiveArray(); 907 Arrays.sort(doubles); 908 downstream.begin(doubles.length); 909 if (!cancellationWasRequested) { 910 for (double aDouble : doubles) 911 downstream.accept(aDouble); 912 } 913 else { 914 for (double aDouble : doubles) { 915 if (downstream.cancellationRequested()) break; 916 downstream.accept(aDouble); 917 } 918 } 919 downstream.end(); 920 } 921 922 @Override 923 public void accept(double t) { 924 b.accept(t); 925 } 926 } 927 }