1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.ArrayList; 28 import java.util.Arrays; 29 import java.util.Comparator; 30 import java.util.Comparators; 31 import java.util.Objects; 32 import java.util.Spliterator; 33 import java.util.concurrent.ForkJoinTask; 34 import java.util.function.IntFunction; 35 36 37 /** 38 * Factory methods for transforming streams into sorted streams. 39 * 40 * @since 1.8 41 */ 42 final class SortedOps { 43 44 private SortedOps() { } 45 46 /** 47 * Appends a "sorted" operation to the provided stream. 48 * 49 * @param <T> The type of both input and output elements 50 * @param upstream A reference stream with element type T 51 */ 52 static<T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) { 53 return new OfRef<>(upstream); 54 } 55 56 /** 57 * Appends a "sorted" operation to the provided stream. 58 * 59 * @param <T> The type of both input and output elements 60 * @param upstream A reference stream with element type T 61 * @param comparator the comparator to order elements by 62 */ 63 static<T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream, 64 Comparator<? super T> comparator) { 65 return new OfRef<>(upstream, comparator); 66 } 67 68 /** 69 * Appends a "sorted" operation to the provided stream. 70 * 71 * @param <T> The type of both input and output elements 72 * @param upstream A reference stream with element type T 73 */ 74 static<T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) { 75 return new OfInt(upstream); 76 } 77 78 /** 79 * Appends a "sorted" operation to the provided stream. 80 * 81 * @param <T> The type of both input and output elements 82 * @param upstream A reference stream with element type T 83 */ 84 static<T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) { 85 return new OfLong(upstream); 86 } 87 88 /** 89 * Appends a "sorted" operation to the provided stream. 90 * 91 * @param <T> The type of both input and output elements 92 * @param upstream A reference stream with element type T 93 */ 94 static<T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) { 95 return new OfDouble(upstream); 96 } 97 98 /** Specialized subtype for sorting reference streams */ 99 private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> { 100 /** Comparator used for sorting */ 101 private final boolean isNaturalSort; 102 private final Comparator<? super T> comparator; 103 104 /** 105 * Sort using natural order of {@literal <T>} which must be 106 * {@code Comparable}. 107 */ 108 OfRef(AbstractPipeline<?, T, ?> upstream) { 109 super(upstream, StreamShape.REFERENCE, 110 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 111 this.isNaturalSort = true; 112 // Will throw CCE when we try to sort if T is not Comparable 113 this.comparator = (Comparator<? super T>) Comparators.naturalOrder(); 114 } 115 116 /** 117 * Sort using the provided comparator. 118 * 119 * @param comparator The comparator to be used to evaluate ordering. 120 */ 121 OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) { 122 super(upstream, StreamShape.REFERENCE, 123 StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED); 124 this.isNaturalSort = false; 125 this.comparator = Objects.requireNonNull(comparator); 126 } 127 128 @Override 129 public Sink<T> opWrapSink(int flags, Sink sink) { 130 Objects.requireNonNull(sink); 131 132 // If the input is already naturally sorted and this operation 133 // also naturally sorted then this is a no-op 134 if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) 135 return sink; 136 else if (StreamOpFlag.SIZED.isKnown(flags)) 137 return new SizedRefSortingSink<>(sink, comparator); 138 else 139 return new RefSortingSink<>(sink, comparator); 140 } 141 142 @Override 143 public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, 144 Spliterator<P_IN> spliterator, 145 IntFunction<T[]> generator) { 146 // If the input is already naturally sorted and this operation 147 // naturally sorts then collect the output 148 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) { 149 return helper.evaluate(spliterator, false, generator); 150 } 151 else { 152 // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort 153 T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator); 154 Arrays.parallelSort(flattenedData, comparator); 155 return Nodes.node(flattenedData); 156 } 157 } 158 } 159 160 /** Specialized subtype for sorting int streams */ 161 private static final class OfInt extends IntPipeline.StatefulOp<Integer> { 162 OfInt(AbstractPipeline<?, Integer, ?> upstream) { 163 super(upstream, StreamShape.INT_VALUE, 164 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 165 } 166 167 @Override 168 public Sink<Integer> opWrapSink(int flags, Sink sink) { 169 Objects.requireNonNull(sink); 170 171 if (StreamOpFlag.SORTED.isKnown(flags)) 172 return sink; 173 else if (StreamOpFlag.SIZED.isKnown(flags)) 174 return new SizedIntSortingSink(sink); 175 else 176 return new IntSortingSink(sink); 177 } 178 179 @Override 180 public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, 181 Spliterator<P_IN> spliterator, 182 IntFunction<Integer[]> generator) { 183 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) { 184 return helper.evaluate(spliterator, false, generator); 185 } 186 else { 187 Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator); 188 189 int[] content = n.asIntArray(); 190 Arrays.parallelSort(content); 191 192 return Nodes.node(content); 193 } 194 } 195 } 196 197 /** Specialized subtype for sorting long streams */ 198 private static final class OfLong extends LongPipeline.StatefulOp<Long> { 199 OfLong(AbstractPipeline<?, Long, ?> upstream) { 200 super(upstream, StreamShape.LONG_VALUE, 201 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 202 } 203 204 @Override 205 public Sink<Long> opWrapSink(int flags, Sink sink) { 206 Objects.requireNonNull(sink); 207 208 if (StreamOpFlag.SORTED.isKnown(flags)) 209 return sink; 210 else if (StreamOpFlag.SIZED.isKnown(flags)) 211 return new SizedLongSortingSink(sink); 212 else 213 return new LongSortingSink(sink); 214 } 215 216 @Override 217 public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, 218 Spliterator<P_IN> spliterator, 219 IntFunction<Long[]> generator) { 220 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) { 221 return helper.evaluate(spliterator, false, generator); 222 } 223 else { 224 Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator); 225 226 long[] content = n.asLongArray(); 227 Arrays.parallelSort(content); 228 229 return Nodes.node(content); 230 } 231 } 232 } 233 234 /** Specialized subtype for sorting double streams */ 235 private static final class OfDouble extends DoublePipeline.StatefulOp<Double> { 236 OfDouble(AbstractPipeline<?, Double, ?> upstream) { 237 super(upstream, StreamShape.DOUBLE_VALUE, 238 StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED); 239 } 240 241 @Override 242 public Sink<Double> opWrapSink(int flags, Sink sink) { 243 Objects.requireNonNull(sink); 244 245 if (StreamOpFlag.SORTED.isKnown(flags)) 246 return sink; 247 else if (StreamOpFlag.SIZED.isKnown(flags)) 248 return new SizedDoubleSortingSink(sink); 249 else 250 return new DoubleSortingSink(sink); 251 } 252 253 @Override 254 public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, 255 Spliterator<P_IN> spliterator, 256 IntFunction<Double[]> generator) { 257 if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) { 258 return helper.evaluate(spliterator, false, generator); 259 } 260 else { 261 Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator); 262 263 double[] content = n.asDoubleArray(); 264 Arrays.parallelSort(content); 265 266 return Nodes.node(content); 267 } 268 } 269 } 270 271 /** {@link ForkJoinTask} for implementing sort on SIZED reference streams */ 272 private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T> { 273 private final Comparator<? super T> comparator; 274 private T[] array; 275 private int offset; 276 277 SizedRefSortingSink(Sink sink, Comparator<? super T> comparator) { 278 super(sink); 279 this.comparator = comparator; 280 } 281 282 @Override 283 public void begin(long size) { 284 if (size >= Nodes.MAX_ARRAY_SIZE) 285 throw new IllegalArgumentException("Stream size exceeds max array size"); 286 array = (T[]) new Object[(int) size]; 287 } 288 289 @Override 290 public void end() { 291 // Need to use offset rather than array.length since the downstream 292 // many be short-circuiting 293 // @@@ A better approach is to know if the downstream short-circuits 294 // and check sink.cancellationRequested 295 Arrays.sort(array, 0, offset, comparator); 296 downstream.begin(offset); 297 for (int i = 0; i < offset; i++) 298 downstream.accept(array[i]); 299 downstream.end(); 300 array = null; 301 } 302 303 @Override 304 public void accept(T t) { 305 array[offset++] = t; 306 } 307 } 308 309 /** {@link Sink} for implementing sort on reference streams */ 310 private static final class RefSortingSink<T> extends Sink.ChainedReference<T> { 311 private final Comparator<? super T> comparator; 312 private ArrayList<T> list; 313 314 RefSortingSink(Sink sink, Comparator<? super T> comparator) { 315 super(sink); 316 this.comparator = comparator; 317 } 318 319 @Override 320 public void begin(long size) { 321 list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>(); 322 } 323 324 @Override 325 public void end() { 326 list.sort(comparator); 327 downstream.begin(list.size()); 328 list.forEach(downstream::accept); 329 downstream.end(); 330 list = null; 331 } 332 333 @Override 334 public void accept(T t) { 335 list.add(t); 336 } 337 } 338 339 /** {@link Sink} for implementing sort on SIZED int streams */ 340 private static final class SizedIntSortingSink extends Sink.ChainedInt { 341 private int[] array; 342 private int offset; 343 344 SizedIntSortingSink(Sink downstream) { 345 super(downstream); 346 } 347 348 @Override 349 public void begin(long size) { 350 if (size >= Nodes.MAX_ARRAY_SIZE) 351 throw new IllegalArgumentException("Stream size exceeds max array size"); 352 array = new int[(int) size]; 353 } 354 355 @Override 356 public void end() { 357 Arrays.sort(array, 0, offset); 358 downstream.begin(offset); 359 for (int i = 0; i < offset; i++) 360 downstream.accept(array[i]); 361 downstream.end(); 362 array = null; 363 } 364 365 @Override 366 public void accept(int t) { 367 array[offset++] = t; 368 } 369 } 370 371 /** {@link Sink} for implementing sort on int streams */ 372 private static final class IntSortingSink extends Sink.ChainedInt { 373 private SpinedBuffer.OfInt b; 374 375 IntSortingSink(Sink sink) { 376 super(sink); 377 } 378 379 @Override 380 public void begin(long size) { 381 b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt(); 382 } 383 384 @Override 385 public void end() { 386 int[] ints = b.asIntArray(); 387 Arrays.sort(ints); 388 downstream.begin(ints.length); 389 for (int anInt : ints) 390 downstream.accept(anInt); 391 downstream.end(); 392 } 393 394 @Override 395 public void accept(int t) { 396 b.accept(t); 397 } 398 } 399 400 /** {@link Sink} for implementing sort on SIZED long streams */ 401 private static final class SizedLongSortingSink extends Sink.ChainedLong { 402 private long[] array; 403 private int offset; 404 405 SizedLongSortingSink(Sink downstream) { 406 super(downstream); 407 } 408 409 @Override 410 public void begin(long size) { 411 if (size >= Nodes.MAX_ARRAY_SIZE) 412 throw new IllegalArgumentException("Stream size exceeds max array size"); 413 array = new long[(int) size]; 414 } 415 416 @Override 417 public void end() { 418 Arrays.sort(array, 0, offset); 419 downstream.begin(offset); 420 for (int i = 0; i < offset; i++) 421 downstream.accept(array[i]); 422 downstream.end(); 423 array = null; 424 } 425 426 @Override 427 public void accept(long t) { 428 array[offset++] = t; 429 } 430 } 431 432 /** {@link Sink} for implementing sort on long streams */ 433 private static final class LongSortingSink extends Sink.ChainedLong { 434 private SpinedBuffer.OfLong b; 435 436 LongSortingSink(Sink sink) { 437 super(sink); 438 } 439 440 @Override 441 public void begin(long size) { 442 b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong(); 443 } 444 445 @Override 446 public void end() { 447 long[] longs = b.asLongArray(); 448 Arrays.sort(longs); 449 downstream.begin(longs.length); 450 for (long aLong : longs) 451 downstream.accept(aLong); 452 downstream.end(); 453 } 454 455 @Override 456 public void accept(long t) { 457 b.accept(t); 458 } 459 } 460 461 /** {@link Sink} for implementing sort on SIZED double streams */ 462 private static final class SizedDoubleSortingSink extends Sink.ChainedDouble { 463 private double[] array; 464 private int offset; 465 466 SizedDoubleSortingSink(Sink downstream) { 467 super(downstream); 468 } 469 470 @Override 471 public void begin(long size) { 472 if (size >= Nodes.MAX_ARRAY_SIZE) 473 throw new IllegalArgumentException("Stream size exceeds max array size"); 474 array = new double[(int) size]; 475 } 476 477 @Override 478 public void end() { 479 Arrays.sort(array, 0, offset); 480 downstream.begin(offset); 481 for (int i = 0; i < offset; i++) 482 downstream.accept(array[i]); 483 downstream.end(); 484 array = null; 485 } 486 487 @Override 488 public void accept(double t) { 489 array[offset++] = t; 490 } 491 } 492 493 /** {@link Sink} for implementing sort on double streams */ 494 private static final class DoubleSortingSink extends Sink.ChainedDouble { 495 private SpinedBuffer.OfDouble b; 496 497 DoubleSortingSink(Sink sink) { 498 super(sink); 499 } 500 501 @Override 502 public void begin(long size) { 503 b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble(); 504 } 505 506 @Override 507 public void end() { 508 double[] doubles = b.asDoubleArray(); 509 Arrays.sort(doubles); 510 downstream.begin(doubles.length); 511 for (double aDouble : doubles) 512 downstream.accept(aDouble); 513 downstream.end(); 514 } 515 516 @Override 517 public void accept(double t) { 518 b.accept(t); 519 } 520 } 521 }