/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Comparators;
import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.ForkJoinTask;
import java.util.function.IntFunction;


/**
 * Factory methods for transforming streams into sorted streams.
 *
 * @since 1.8
 */
final class SortedOps {

    /**
     * Message of the exception thrown when a sink is asked to buffer more
     * elements than fit into a single array.
     */
    private static final String BAD_SIZE = "Stream size exceeds max array size";

    private SortedOps() { }

    /**
     * Rejects element counts that cannot be used as an array or buffer
     * capacity.  Must be called before narrowing {@code size} to {@code int}.
     *
     * @param size the element count passed to {@code Sink.begin}
     * @throws IllegalArgumentException if {@code size} is greater than or
     *         equal to {@code Nodes.MAX_ARRAY_SIZE}
     */
    private static void checkMaxArraySize(long size) {
        if (size >= Nodes.MAX_ARRAY_SIZE)
            throw new IllegalArgumentException(BAD_SIZE);
    }

    /**
     * Appends a "sorted" operation to the provided stream.
     *
     * @param <T> the type of both input and output elements
     * @param upstream a reference stream with element type T
     * @return the new stream, sorted by natural order
     */
    static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
        return new OfRef<>(upstream);
    }

    /**
     * Appends a "sorted" operation to the provided stream.
     *
     * @param <T> the type of both input and output elements
     * @param upstream a reference stream with element type T
     * @param comparator the comparator to order elements by
     * @return the new stream, sorted by {@code comparator}
     * @throws NullPointerException if {@code comparator} is null
     */
    static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
                                 Comparator<? super T> comparator) {
        return new OfRef<>(upstream, comparator);
    }

    /**
     * Appends a "sorted" operation to the provided stream.
     *
     * @param <T> unused; does not appear in the parameter or return types
     * @param upstream a pipeline with element type {@code Integer}
     * @return the new sorted stream
     */
    static <T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) {
        return new OfInt(upstream);
    }

    /**
     * Appends a "sorted" operation to the provided stream.
     *
     * @param <T> unused; does not appear in the parameter or return types
     * @param upstream a pipeline with element type {@code Long}
     * @return the new sorted stream
     */
    static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) {
        return new OfLong(upstream);
    }

    /**
     * Appends a "sorted" operation to the provided stream.
     *
     * @param <T> unused; does not appear in the parameter or return types
     * @param upstream a pipeline with element type {@code Double}
     * @return the new sorted stream
     */
    static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {
        return new OfDouble(upstream);
    }

    /**
     * Specialized subtype for sorting reference streams
     */
    private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
        /**
         * {@code true} when this operation was created without an explicit
         * comparator and therefore sorts by natural order
         */
        private final boolean isNaturalSort;

        /**
         * Comparator used for sorting
         */
        private final Comparator<? super T> comparator;

        /**
         * Sort using natural order of {@literal <T>} which must be
         * {@code Comparable}.
         */
        @SuppressWarnings("unchecked")
        OfRef(AbstractPipeline<?, T, ?> upstream) {
            super(upstream, StreamShape.REFERENCE,
                  StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
            this.isNaturalSort = true;
            // Will throw CCE when we try to sort if T is not Comparable
            this.comparator = (Comparator<? super T>) Comparators.naturalOrder();
        }

        /**
         * Sort using the provided comparator.
         *
         * @param comparator The comparator to be used to evaluate ordering.
         * @throws NullPointerException if {@code comparator} is null
         */
        OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
            super(upstream, StreamShape.REFERENCE,
                  StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
            this.isNaturalSort = false;
            this.comparator = Objects.requireNonNull(comparator);
        }

        @Override
        public Sink<T> opWrapSink(int flags, Sink sink) {
            Objects.requireNonNull(sink);

            // If the input is already naturally sorted and this operation
            // also naturally sorted then this is a no-op
            if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
                return sink;
            else if (StreamOpFlag.SIZED.isKnown(flags))
                return new SizedRefSortingSink<>(sink, comparator);
            else
                return new RefSortingSink<>(sink, comparator);
        }

        @Override
        public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                                 Spliterator<P_IN> spliterator,
                                                 IntFunction<T[]> generator) {
            // If the input is already naturally sorted and this operation
            // naturally sorts then collect the output
            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
                return helper.evaluate(spliterator, false, generator);
            }
            else {
                // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
                T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
                Arrays.parallelSort(flattenedData, comparator);
                return Nodes.node(flattenedData);
            }
        }
    }

    /**
     * Specialized subtype for sorting int streams.
     */
    private static final class OfInt extends IntPipeline.StatefulOp<Integer> {
        OfInt(AbstractPipeline<?, Integer, ?> upstream) {
            super(upstream, StreamShape.INT_VALUE,
                  StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
        }

        @Override
        public Sink<Integer> opWrapSink(int flags, Sink sink) {
            Objects.requireNonNull(sink);

            // Already sorted input needs no buffering; pass elements straight through
            if (StreamOpFlag.SORTED.isKnown(flags))
                return sink;
            else if (StreamOpFlag.SIZED.isKnown(flags))
                return new SizedIntSortingSink(sink);
            else
                return new IntSortingSink(sink);
        }

        @Override
        public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
                                                       Spliterator<P_IN> spliterator,
                                                       IntFunction<Integer[]> generator) {
            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
                return helper.evaluate(spliterator, false, generator);
            }
            else {
                Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);

                int[] content = n.asIntArray();
                Arrays.parallelSort(content);

                return Nodes.node(content);
            }
        }
    }

    /**
     * Specialized subtype for sorting long streams.
     */
    private static final class OfLong extends LongPipeline.StatefulOp<Long> {
        OfLong(AbstractPipeline<?, Long, ?> upstream) {
            super(upstream, StreamShape.LONG_VALUE,
                  StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
        }

        @Override
        public Sink<Long> opWrapSink(int flags, Sink sink) {
            Objects.requireNonNull(sink);

            // Already sorted input needs no buffering; pass elements straight through
            if (StreamOpFlag.SORTED.isKnown(flags))
                return sink;
            else if (StreamOpFlag.SIZED.isKnown(flags))
                return new SizedLongSortingSink(sink);
            else
                return new LongSortingSink(sink);
        }

        @Override
        public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
                                                    Spliterator<P_IN> spliterator,
                                                    IntFunction<Long[]> generator) {
            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
                return helper.evaluate(spliterator, false, generator);
            }
            else {
                Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator);

                long[] content = n.asLongArray();
                Arrays.parallelSort(content);

                return Nodes.node(content);
            }
        }
    }

    /**
     * Specialized subtype for sorting double streams.
     */
    private static final class OfDouble extends DoublePipeline.StatefulOp<Double> {
        OfDouble(AbstractPipeline<?, Double, ?> upstream) {
            super(upstream, StreamShape.DOUBLE_VALUE,
                  StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
        }

        @Override
        public Sink<Double> opWrapSink(int flags, Sink sink) {
            Objects.requireNonNull(sink);

            // Already sorted input needs no buffering; pass elements straight through
            if (StreamOpFlag.SORTED.isKnown(flags))
                return sink;
            else if (StreamOpFlag.SIZED.isKnown(flags))
                return new SizedDoubleSortingSink(sink);
            else
                return new DoubleSortingSink(sink);
        }

        @Override
        public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
                                                      Spliterator<P_IN> spliterator,
                                                      IntFunction<Double[]> generator) {
            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
                return helper.evaluate(spliterator, false, generator);
            }
            else {
                Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);

                double[] content = n.asDoubleArray();
                Arrays.parallelSort(content);

                return Nodes.node(content);
            }
        }
    }

    /**
     * {@link Sink} for implementing sort on SIZED reference streams.
     */
    private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T> {
        private final Comparator<? super T> comparator;
        private T[] array;
        private int offset;

        SizedRefSortingSink(Sink sink, Comparator<? super T> comparator) {
            super(sink);
            this.comparator = comparator;
        }

        /**
         * Allocates the buffer that {@link #accept} fills.
         *
         * @throws IllegalArgumentException if {@code size} is greater than or
         *         equal to {@code Nodes.MAX_ARRAY_SIZE}
         */
        @Override
        @SuppressWarnings("unchecked")
        public void begin(long size) {
            SortedOps.checkMaxArraySize(size);
            array = (T[]) new Object[(int) size];
            // Start writing at the front of the fresh array even if this
            // sink already went through an earlier begin/end cycle
            offset = 0;
        }

        @Override
        public void end() {
            // Need to use offset rather than array.length since the downstream
            // may be short-circuiting
            // @@@ A better approach is to know if the downstream short-circuits
            //     and check sink.cancellationRequested
            Arrays.sort(array, 0, offset, comparator);
            downstream.begin(offset);
            for (int i = 0; i < offset; i++)
                downstream.accept(array[i]);
            downstream.end();
            array = null;
        }

        @Override
        public void accept(T t) {
            array[offset++] = t;
        }
    }

    /**
     * {@link Sink} for implementing sort on reference streams.
     */
    private static final class RefSortingSink<T> extends Sink.ChainedReference<T> {
        private final Comparator<? super T> comparator;
        private ArrayList<T> list;

        RefSortingSink(Sink sink, Comparator<? super T> comparator) {
            super(sink);
            this.comparator = comparator;
        }

        /**
         * Creates the list that {@link #accept} appends to, pre-sized when
         * {@code size} is non-negative.
         *
         * @throws IllegalArgumentException if {@code size} is greater than or
         *         equal to {@code Nodes.MAX_ARRAY_SIZE}
         */
        @Override
        public void begin(long size) {
            // Guard the narrowing cast below: a size above Integer.MAX_VALUE
            // would otherwise wrap to a bogus (possibly negative) capacity
            SortedOps.checkMaxArraySize(size);
            list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
        }

        @Override
        public void end() {
            list.sort(comparator);
            downstream.begin(list.size());
            list.forEach(downstream::accept);
            downstream.end();
            list = null;
        }

        @Override
        public void accept(T t) {
            list.add(t);
        }
    }

    /**
     * {@link Sink} for implementing sort on SIZED int streams.
     */
    private static final class SizedIntSortingSink extends Sink.ChainedInt {
        private int[] array;
        private int offset;

        SizedIntSortingSink(Sink downstream) {
            super(downstream);
        }

        /**
         * Allocates the buffer that {@link #accept} fills.
         *
         * @throws IllegalArgumentException if {@code size} is greater than or
         *         equal to {@code Nodes.MAX_ARRAY_SIZE}
         */
        @Override
        public void begin(long size) {
            SortedOps.checkMaxArraySize(size);
            array = new int[(int) size];
            // Start writing at the front of the fresh array even if this
            // sink already went through an earlier begin/end cycle
            offset = 0;
        }

        @Override
        public void end() {
            // Only the first offset slots were written by accept
            Arrays.sort(array, 0, offset);
            downstream.begin(offset);
            for (int i = 0; i < offset; i++)
                downstream.accept(array[i]);
            downstream.end();
            array = null;
        }

        @Override
        public void accept(int t) {
            array[offset++] = t;
        }
    }

    /**
     * {@link Sink} for implementing sort on int streams.
     */
    private static final class IntSortingSink extends Sink.ChainedInt {
        private SpinedBuffer.OfInt b;

        IntSortingSink(Sink sink) {
            super(sink);
        }

        /**
         * Creates the buffer that {@link #accept} appends to, pre-sized when
         * {@code size} is positive.
         *
         * @throws IllegalArgumentException if {@code size} is greater than or
         *         equal to {@code Nodes.MAX_ARRAY_SIZE}
         */
        @Override
        public void begin(long size) {
            // Guard the narrowing cast below against wrap-around
            SortedOps.checkMaxArraySize(size);
            b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
        }

        @Override
        public void end() {
            int[] ints = b.asIntArray();
            Arrays.sort(ints);
            downstream.begin(ints.length);
            for (int anInt : ints)
                downstream.accept(anInt);
            downstream.end();
        }

        @Override
        public void accept(int t) {
            b.accept(t);
        }
    }

    /**
     * {@link Sink} for implementing sort on SIZED long streams.
     */
    private static final class SizedLongSortingSink extends Sink.ChainedLong {
        private long[] array;
        private int offset;

        SizedLongSortingSink(Sink downstream) {
            super(downstream);
        }

        /**
         * Allocates the buffer that {@link #accept} fills.
         *
         * @throws IllegalArgumentException if {@code size} is greater than or
         *         equal to {@code Nodes.MAX_ARRAY_SIZE}
         */
        @Override
        public void begin(long size) {
            SortedOps.checkMaxArraySize(size);
            array = new long[(int) size];
            // Start writing at the front of the fresh array even if this
            // sink already went through an earlier begin/end cycle
            offset = 0;
        }

        @Override
        public void end() {
            // Only the first offset slots were written by accept
            Arrays.sort(array, 0, offset);
            downstream.begin(offset);
            for (int i = 0; i < offset; i++)
                downstream.accept(array[i]);
            downstream.end();
            array = null;
        }

        @Override
        public void accept(long t) {
            array[offset++] = t;
        }
    }

    /**
     * {@link Sink} for implementing sort on long streams.
     */
    private static final class LongSortingSink extends Sink.ChainedLong {
        private SpinedBuffer.OfLong b;

        LongSortingSink(Sink sink) {
            super(sink);
        }

        /**
         * Creates the buffer that {@link #accept} appends to, pre-sized when
         * {@code size} is positive.
         *
         * @throws IllegalArgumentException if {@code size} is greater than or
         *         equal to {@code Nodes.MAX_ARRAY_SIZE}
         */
        @Override
        public void begin(long size) {
            // Guard the narrowing cast below against wrap-around
            SortedOps.checkMaxArraySize(size);
            b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
        }

        @Override
        public void end() {
            long[] longs = b.asLongArray();
            Arrays.sort(longs);
            downstream.begin(longs.length);
            for (long aLong : longs)
                downstream.accept(aLong);
            downstream.end();
        }

        @Override
        public void accept(long t) {
            b.accept(t);
        }
    }

    /**
     * {@link Sink} for implementing sort on SIZED double streams.
     */
    private static final class SizedDoubleSortingSink extends Sink.ChainedDouble {
        private double[] array;
        private int offset;

        SizedDoubleSortingSink(Sink downstream) {
            super(downstream);
        }

        /**
         * Allocates the buffer that {@link #accept} fills.
         *
         * @throws IllegalArgumentException if {@code size} is greater than or
         *         equal to {@code Nodes.MAX_ARRAY_SIZE}
         */
        @Override
        public void begin(long size) {
            SortedOps.checkMaxArraySize(size);
            array = new double[(int) size];
            // Start writing at the front of the fresh array even if this
            // sink already went through an earlier begin/end cycle
            offset = 0;
        }

        @Override
        public void end() {
            // Only the first offset slots were written by accept
            Arrays.sort(array, 0, offset);
            downstream.begin(offset);
            for (int i = 0; i < offset; i++)
                downstream.accept(array[i]);
            downstream.end();
            array = null;
        }

        @Override
        public void accept(double t) {
            array[offset++] = t;
        }
    }

    /**
     * {@link Sink} for implementing sort on double streams.
     */
    private static final class DoubleSortingSink extends Sink.ChainedDouble {
        private SpinedBuffer.OfDouble b;

        DoubleSortingSink(Sink sink) {
            super(sink);
        }

        /**
         * Creates the buffer that {@link #accept} appends to, pre-sized when
         * {@code size} is positive.
         *
         * @throws IllegalArgumentException if {@code size} is greater than or
         *         equal to {@code Nodes.MAX_ARRAY_SIZE}
         */
        @Override
        public void begin(long size) {
            // Guard the narrowing cast below against wrap-around
            SortedOps.checkMaxArraySize(size);
            b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
        }

        @Override
        public void end() {
            double[] doubles = b.asDoubleArray();
            Arrays.sort(doubles);
            downstream.begin(doubles.length);
            for (double aDouble : doubles)
                downstream.accept(aDouble);
            downstream.end();
        }

        @Override
        public void accept(double t) {
            b.accept(t);
        }
    }
}