1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.ArrayList; 28 import java.util.Arrays; 29 import java.util.Comparator; 30 import java.util.Objects; 31 import java.util.Spliterator; 32 import java.util.concurrent.ForkJoinTask; 33 import java.util.function.IntFunction; 34 35 36 /** 37 * Factory methods for transforming streams into sorted streams. 38 * 39 * @since 1.8 40 */ 41 final class SortedOps { 42 43 private SortedOps() { } 44 45 /** 46 * Appends a "sorted" operation to the provided stream. 47 * 48 * @param <T> the type of both input and output elements 49 * @param upstream a reference stream with element type T 50 */ 51 static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) { 52 return new OfRef<>(upstream); 53 } 54 55 /** 56 * Appends a "sorted" operation to the provided stream. 57 * 58 * @param <T> the type of both input and output elements 59 * @param upstream a reference stream with element type T 60 * @param comparator the comparator to order elements by 61 */ 62 static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream, 63 Comparator<? super T> comparator) { 64 return new OfRef<>(upstream, comparator); 65 } 66 67 /** 68 * Appends a "sorted" operation to the provided stream. 69 * 70 * @param <T> the type of both input and output elements 71 * @param upstream a reference stream with element type T 72 */ 73 static <T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) { 74 return new OfInt(upstream); 75 } 76 77 /** 78 * Appends a "sorted" operation to the provided stream. 79 * 80 * @param <T> the type of both input and output elements 81 * @param upstream a reference stream with element type T 82 */ 83 static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) { 84 return new OfLong(upstream); 85 } 86 87 /** 88 * Appends a "sorted" operation to the provided stream. 89 * 90 * @param <T> the type of both input and output elements 91 * @param upstream a reference stream with element type T 92 */ 93 static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) { 94 return new OfDouble(upstream); 95 } 96 97 /** 98 * Specialized subtype for sorting reference streams 99 */ 100 private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> { 101 /** 102 * Comparator used for sorting 103 */ 104 private final boolean isNaturalSort; 105 private final Comparator<? super T> comparator; 106 107 /** 108 * Sort using natural order of {@literal <T>} which must be 109 * {@code Comparable}. 110 */ 111 OfRef(AbstractPipeline<?, T, ?> upstream) { 112 super(upstream, StreamShape.REFERENCE, 113 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 114 this.isNaturalSort = true; 115 // Will throw CCE when we try to sort if T is not Comparable 116 this.comparator = (Comparator<? super T>) Comparator.naturalOrder(); 117 } 118 119 /** 120 * Sort using the provided comparator. 121 * 122 * @param comparator The comparator to be used to evaluate ordering. 123 */ 124 OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) { 125 super(upstream, StreamShape.REFERENCE, 126 StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED); 127 this.isNaturalSort = false; 128 this.comparator = Objects.requireNonNull(comparator); 129 } 130 131 @Override 132 public Sink<T> opWrapSink(int flags, Sink sink) { 133 Objects.requireNonNull(sink); 134 135 // If the input is already naturally sorted and this operation 136 // also naturally sorted then this is a no-op 137 if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) 138 return sink; 139 else if (StreamOpFlag.SIZED.isKnown(flags)) 140 return new SizedRefSortingSink<>(sink, comparator); 141 else 142 return new RefSortingSink<>(sink, comparator); 143 } 144 145 @Override 146 public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, 147 Spliterator<P_IN> spliterator, 148 IntFunction<T[]> generator) { 149 // If the input is already naturally sorted and this operation 150 // naturally sorts then collect the output 151 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) { 152 return helper.evaluate(spliterator, false, generator); 153 } 154 else { 155 // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort 156 T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator); 157 Arrays.parallelSort(flattenedData, comparator); 158 return Nodes.node(flattenedData); 159 } 160 } 161 } 162 163 /** 164 * Specialized subtype for sorting int streams. 165 */ 166 private static final class OfInt extends IntPipeline.StatefulOp<Integer> { 167 OfInt(AbstractPipeline<?, Integer, ?> upstream) { 168 super(upstream, StreamShape.INT_VALUE, 169 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 170 } 171 172 @Override 173 public Sink<Integer> opWrapSink(int flags, Sink sink) { 174 Objects.requireNonNull(sink); 175 176 if (StreamOpFlag.SORTED.isKnown(flags)) 177 return sink; 178 else if (StreamOpFlag.SIZED.isKnown(flags)) 179 return new SizedIntSortingSink(sink); 180 else 181 return new IntSortingSink(sink); 182 } 183 184 @Override 185 public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, 186 Spliterator<P_IN> spliterator, 187 IntFunction<Integer[]> generator) { 188 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) { 189 return helper.evaluate(spliterator, false, generator); 190 } 191 else { 192 Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator); 193 194 int[] content = n.asPrimitiveArray(); 195 Arrays.parallelSort(content); 196 197 return Nodes.node(content); 198 } 199 } 200 } 201 202 /** 203 * Specialized subtype for sorting long streams. 204 */ 205 private static final class OfLong extends LongPipeline.StatefulOp<Long> { 206 OfLong(AbstractPipeline<?, Long, ?> upstream) { 207 super(upstream, StreamShape.LONG_VALUE, 208 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 209 } 210 211 @Override 212 public Sink<Long> opWrapSink(int flags, Sink sink) { 213 Objects.requireNonNull(sink); 214 215 if (StreamOpFlag.SORTED.isKnown(flags)) 216 return sink; 217 else if (StreamOpFlag.SIZED.isKnown(flags)) 218 return new SizedLongSortingSink(sink); 219 else 220 return new LongSortingSink(sink); 221 } 222 223 @Override 224 public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, 225 Spliterator<P_IN> spliterator, 226 IntFunction<Long[]> generator) { 227 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) { 228 return helper.evaluate(spliterator, false, generator); 229 } 230 else { 231 Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator); 232 233 long[] content = n.asPrimitiveArray(); 234 Arrays.parallelSort(content); 235 236 return Nodes.node(content); 237 } 238 } 239 } 240 241 /** 242 * Specialized subtype for sorting double streams. 243 */ 244 private static final class OfDouble extends DoublePipeline.StatefulOp<Double> { 245 OfDouble(AbstractPipeline<?, Double, ?> upstream) { 246 super(upstream, StreamShape.DOUBLE_VALUE, 247 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 248 } 249 250 @Override 251 public Sink<Double> opWrapSink(int flags, Sink sink) { 252 Objects.requireNonNull(sink); 253 254 if (StreamOpFlag.SORTED.isKnown(flags)) 255 return sink; 256 else if (StreamOpFlag.SIZED.isKnown(flags)) 257 return new SizedDoubleSortingSink(sink); 258 else 259 return new DoubleSortingSink(sink); 260 } 261 262 @Override 263 public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, 264 Spliterator<P_IN> spliterator, 265 IntFunction<Double[]> generator) { 266 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) { 267 return helper.evaluate(spliterator, false, generator); 268 } 269 else { 270 Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator); 271 272 double[] content = n.asPrimitiveArray(); 273 Arrays.parallelSort(content); 274 275 return Nodes.node(content); 276 } 277 } 278 } 279 280 /** 281 * {@link ForkJoinTask} for implementing sort on SIZED reference streams. 282 */ 283 private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T> { 284 private final Comparator<? super T> comparator; 285 private T[] array; 286 private int offset; 287 288 SizedRefSortingSink(Sink sink, Comparator<? super T> comparator) { 289 super(sink); 290 this.comparator = comparator; 291 } 292 293 @Override 294 public void begin(long size) { 295 if (size >= Nodes.MAX_ARRAY_SIZE) 296 throw new IllegalArgumentException("Stream size exceeds max array size"); 297 array = (T[]) new Object[(int) size]; 298 } 299 300 @Override 301 public void end() { 302 // Need to use offset rather than array.length since the downstream 303 // many be short-circuiting 304 // @@@ A better approach is to know if the downstream short-circuits 305 // and check sink.cancellationRequested 306 Arrays.sort(array, 0, offset, comparator); 307 downstream.begin(offset); 308 for (int i = 0; i < offset; i++) 309 downstream.accept(array[i]); 310 downstream.end(); 311 array = null; 312 } 313 314 @Override 315 public void accept(T t) { 316 array[offset++] = t; 317 } 318 } 319 320 /** 321 * {@link Sink} for implementing sort on reference streams. 322 */ 323 private static final class RefSortingSink<T> extends Sink.ChainedReference<T> { 324 private final Comparator<? super T> comparator; 325 private ArrayList<T> list; 326 327 RefSortingSink(Sink sink, Comparator<? super T> comparator) { 328 super(sink); 329 this.comparator = comparator; 330 } 331 332 @Override 333 public void begin(long size) { 334 list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>(); 335 } 336 337 @Override 338 public void end() { 339 list.sort(comparator); 340 downstream.begin(list.size()); 341 list.forEach(downstream::accept); 342 downstream.end(); 343 list = null; 344 } 345 346 @Override 347 public void accept(T t) { 348 list.add(t); 349 } 350 } 351 352 /** 353 * {@link Sink} for implementing sort on SIZED int streams. 354 */ 355 private static final class SizedIntSortingSink extends Sink.ChainedInt { 356 private int[] array; 357 private int offset; 358 359 SizedIntSortingSink(Sink downstream) { 360 super(downstream); 361 } 362 363 @Override 364 public void begin(long size) { 365 if (size >= Nodes.MAX_ARRAY_SIZE) 366 throw new IllegalArgumentException("Stream size exceeds max array size"); 367 array = new int[(int) size]; 368 } 369 370 @Override 371 public void end() { 372 Arrays.sort(array, 0, offset); 373 downstream.begin(offset); 374 for (int i = 0; i < offset; i++) 375 downstream.accept(array[i]); 376 downstream.end(); 377 array = null; 378 } 379 380 @Override 381 public void accept(int t) { 382 array[offset++] = t; 383 } 384 } 385 386 /** 387 * {@link Sink} for implementing sort on int streams. 388 */ 389 private static final class IntSortingSink extends Sink.ChainedInt { 390 private SpinedBuffer.OfInt b; 391 392 IntSortingSink(Sink sink) { 393 super(sink); 394 } 395 396 @Override 397 public void begin(long size) { 398 b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt(); 399 } 400 401 @Override 402 public void end() { 403 int[] ints = b.asPrimitiveArray(); 404 Arrays.sort(ints); 405 downstream.begin(ints.length); 406 for (int anInt : ints) 407 downstream.accept(anInt); 408 downstream.end(); 409 } 410 411 @Override 412 public void accept(int t) { 413 b.accept(t); 414 } 415 } 416 417 /** 418 * {@link Sink} for implementing sort on SIZED long streams. 419 */ 420 private static final class SizedLongSortingSink extends Sink.ChainedLong { 421 private long[] array; 422 private int offset; 423 424 SizedLongSortingSink(Sink downstream) { 425 super(downstream); 426 } 427 428 @Override 429 public void begin(long size) { 430 if (size >= Nodes.MAX_ARRAY_SIZE) 431 throw new IllegalArgumentException("Stream size exceeds max array size"); 432 array = new long[(int) size]; 433 } 434 435 @Override 436 public void end() { 437 Arrays.sort(array, 0, offset); 438 downstream.begin(offset); 439 for (int i = 0; i < offset; i++) 440 downstream.accept(array[i]); 441 downstream.end(); 442 array = null; 443 } 444 445 @Override 446 public void accept(long t) { 447 array[offset++] = t; 448 } 449 } 450 451 /** 452 * {@link Sink} for implementing sort on long streams. 453 */ 454 private static final class LongSortingSink extends Sink.ChainedLong { 455 private SpinedBuffer.OfLong b; 456 457 LongSortingSink(Sink sink) { 458 super(sink); 459 } 460 461 @Override 462 public void begin(long size) { 463 b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong(); 464 } 465 466 @Override 467 public void end() { 468 long[] longs = b.asPrimitiveArray(); 469 Arrays.sort(longs); 470 downstream.begin(longs.length); 471 for (long aLong : longs) 472 downstream.accept(aLong); 473 downstream.end(); 474 } 475 476 @Override 477 public void accept(long t) { 478 b.accept(t); 479 } 480 } 481 482 /** 483 * {@link Sink} for implementing sort on SIZED double streams. 484 */ 485 private static final class SizedDoubleSortingSink extends Sink.ChainedDouble { 486 private double[] array; 487 private int offset; 488 489 SizedDoubleSortingSink(Sink downstream) { 490 super(downstream); 491 } 492 493 @Override 494 public void begin(long size) { 495 if (size >= Nodes.MAX_ARRAY_SIZE) 496 throw new IllegalArgumentException("Stream size exceeds max array size"); 497 array = new double[(int) size]; 498 } 499 500 @Override 501 public void end() { 502 Arrays.sort(array, 0, offset); 503 downstream.begin(offset); 504 for (int i = 0; i < offset; i++) 505 downstream.accept(array[i]); 506 downstream.end(); 507 array = null; 508 } 509 510 @Override 511 public void accept(double t) { 512 array[offset++] = t; 513 } 514 } 515 516 /** 517 * {@link Sink} for implementing sort on double streams. 518 */ 519 private static final class DoubleSortingSink extends Sink.ChainedDouble { 520 private SpinedBuffer.OfDouble b; 521 522 DoubleSortingSink(Sink sink) { 523 super(sink); 524 } 525 526 @Override 527 public void begin(long size) { 528 b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble(); 529 } 530 531 @Override 532 public void end() { 533 double[] doubles = b.asPrimitiveArray(); 534 Arrays.sort(doubles); 535 downstream.begin(doubles.length); 536 for (double aDouble : doubles) 537 downstream.accept(aDouble); 538 downstream.end(); 539 } 540 541 @Override 542 public void accept(double t) { 543 b.accept(t); 544 } 545 } 546 }