1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Comparator; 28 import java.util.Spliterator; 29 import java.util.function.BooleanSupplier; 30 import java.util.function.Consumer; 31 import java.util.function.DoubleConsumer; 32 import java.util.function.IntConsumer; 33 import java.util.function.LongConsumer; 34 import java.util.function.Supplier; 35 36 /** 37 * Spliterator implementations for wrapping and delegating spliterators, used 38 * in the implementation of the {@link Stream#spliterator()} method. 39 * 40 * @since 1.8 41 */ 42 class StreamSpliterators { 43 44 /** 45 * Abstract wrapping spliterator that binds to the spliterator of a 46 * pipeline helper on first operation. 47 * 48 * <p>This spliterator is not late-binding and will bind to the source 49 * spliterator when first operated on. 50 * 51 * <p>A wrapping spliterator produced from a sequential stream 52 * cannot be split if there are stateful operations present. 53 */ 54 private static abstract class AbstractWrappingSpliterator<P_IN, P_OUT, 55 T_BUFFER extends AbstractSpinedBuffer> 56 implements Spliterator<P_OUT> { 57 58 // @@@ Detect if stateful operations are present or not 59 // If not then can split otherwise cannot 60 61 /** True if this spliterator supports splitting */ 62 final boolean isParallel; 63 64 final PipelineHelper<P_OUT> ph; 65 66 /** 67 * Supplier for the source spliterator. Client provides either a 68 * spliterator or a supplier. 69 */ 70 private Supplier<Spliterator<P_IN>> spliteratorSupplier; 71 72 /** 73 * Source spliterator. Either provided from client or obtained from 74 * supplier. 75 */ 76 Spliterator<P_IN> spliterator; 77 78 /** 79 * Sink chain for the downstream stages of the pipeline, ultimately 80 * leading to the buffer. Used during partial traversal. 81 */ 82 Sink<P_IN> bufferSink; 83 84 /** 85 * A function that advances one element of the spliterator, pushing 86 * it to bufferSink. Returns whether any elements were processed. 87 * Used during partial traversal. 88 */ 89 BooleanSupplier pusher; 90 91 /** Next element to consume from the buffer, used during partial traversal */ 92 long nextToConsume; 93 94 /** Buffer into which elements are pushed. Used during partial traversal. */ 95 T_BUFFER buffer; 96 97 /** 98 * True if full traversal has occurred (with possible cancelation). 99 * If doing a partial traversal, there may be still elements in buffer. 100 */ 101 boolean finished; 102 103 /** Construct an AbstractWrappingSpliterator from a 104 * {@code Supplier<Spliterator>}. */ 105 AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, 106 Supplier<Spliterator<P_IN>> spliteratorSupplier, 107 boolean parallel) { 108 this.ph = ph; 109 this.spliteratorSupplier = spliteratorSupplier; 110 this.spliterator = null; 111 this.isParallel = parallel; 112 } 113 114 /** Construct an AbstractWrappingSpliterator from a 115 * {@code Spliterator}. */ 116 AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, 117 Spliterator<P_IN> spliterator, 118 boolean parallel) { 119 this.ph = ph; 120 this.spliteratorSupplier = null; 121 this.spliterator = spliterator; 122 this.isParallel = parallel; 123 } 124 125 /** Called before advancing to set up spliterator, if needed */ 126 final void init() { 127 if (spliterator == null) { 128 spliterator = spliteratorSupplier.get(); 129 spliteratorSupplier = null; 130 } 131 } 132 133 /** 134 * Get an element from the source, pushing it into the sink chain, 135 * setting up the buffer if needed 136 * @return whether there are elements to consume from the buffer 137 */ 138 final boolean doAdvance() { 139 if (buffer == null) { 140 if (finished) 141 return false; 142 143 init(); 144 initPartialTraversalState(); 145 nextToConsume = 0; 146 bufferSink.begin(spliterator.getExactSizeIfKnown()); 147 return fillBuffer(); 148 } 149 else { 150 ++nextToConsume; 151 boolean hasNext = nextToConsume < buffer.count(); 152 if (!hasNext) { 153 nextToConsume = 0; 154 buffer.clear(); 155 hasNext = fillBuffer(); 156 } 157 return hasNext; 158 } 159 } 160 161 /** 162 * Invokes the shape-specific constructor with the provided arguments 163 * and returns the result 164 */ 165 abstract AbstractWrappingSpliterator<P_IN, P_OUT, ?> wrap(Spliterator<P_IN> s); 166 167 /** 168 * Initializes buffer, sink chain, and pusher for a shape-specific 169 * implementation 170 */ 171 abstract void initPartialTraversalState(); 172 173 @Override 174 public Spliterator<P_OUT> trySplit() { 175 if (isParallel && !finished) { 176 init(); 177 178 Spliterator<P_IN> split = spliterator.trySplit(); 179 return (split == null) ? null : wrap(split); 180 } 181 else 182 return null; 183 } 184 185 /** 186 * If the buffer is empty, push elements into the sink chain until 187 * the source is empty or cancellation is requested. 188 * @return whether there are elements to consume from the buffer 189 */ 190 private boolean fillBuffer() { 191 while (buffer.count() == 0) { 192 if (bufferSink.cancellationRequested() || !pusher.getAsBoolean()) { 193 if (finished) 194 return false; 195 else { 196 bufferSink.end(); // might trigger more elements 197 finished = true; 198 } 199 } 200 } 201 return true; 202 } 203 204 @Override 205 public final long estimateSize() { 206 init(); 207 return StreamOpFlag.SIZED.isKnown(ph.getStreamAndOpFlags()) 208 ? spliterator.estimateSize() 209 : Long.MAX_VALUE; 210 } 211 212 @Override 213 public final long getExactSizeIfKnown() { 214 init(); 215 return StreamOpFlag.SIZED.isKnown(ph.getStreamAndOpFlags()) 216 ? spliterator.getExactSizeIfKnown() 217 : -1; 218 } 219 220 @Override 221 public final int characteristics() { 222 init(); 223 224 // Get the characteristics from the pipeline 225 int c = StreamOpFlag.toCharacteristics(StreamOpFlag.toStreamFlags(ph.getStreamAndOpFlags())); 226 227 // Mask off the size and uniform characteristics and replace with 228 // those of the spliterator 229 // Note that a non-uniform spliterator can change from something 230 // with an exact size to an estimate for a sub-split, for example 231 // with HashSet where the size is known at the top level spliterator 232 // but for sub-splits only an estimate is known 233 if ((c & Spliterator.SIZED) != 0) { 234 c &= ~(Spliterator.SIZED | Spliterator.SUBSIZED); 235 c |= (spliterator.characteristics() & Spliterator.SIZED & Spliterator.SUBSIZED); 236 } 237 238 return c; 239 } 240 241 @Override 242 public Comparator<? super P_OUT> getComparator() { 243 if (!hasCharacteristics(SORTED)) 244 throw new IllegalStateException(); 245 return null; 246 } 247 248 @Override 249 public final String toString() { 250 return String.format("%s[%s]", getClass().getName(), spliterator); 251 } 252 } 253 254 static final class WrappingSpliterator<P_IN, P_OUT> 255 extends AbstractWrappingSpliterator<P_IN, P_OUT, SpinedBuffer<P_OUT>> { 256 257 WrappingSpliterator(PipelineHelper<P_OUT> ph, 258 Supplier<Spliterator<P_IN>> supplier, 259 boolean parallel) { 260 super(ph, supplier, parallel); 261 } 262 263 WrappingSpliterator(PipelineHelper<P_OUT> ph, 264 Spliterator<P_IN> spliterator, 265 boolean parallel) { 266 super(ph, spliterator, parallel); 267 } 268 269 @Override 270 WrappingSpliterator<P_IN, P_OUT> wrap(Spliterator<P_IN> s) { 271 return new WrappingSpliterator<>(ph, s, isParallel); 272 } 273 274 @Override 275 void initPartialTraversalState() { 276 SpinedBuffer<P_OUT> b = new SpinedBuffer<>(); 277 buffer = b; 278 bufferSink = ph.wrapSink(b::accept); 279 pusher = () -> spliterator.tryAdvance(bufferSink); 280 } 281 282 @Override 283 public boolean tryAdvance(Consumer<? super P_OUT> consumer) { 284 boolean hasNext = doAdvance(); 285 if (hasNext) 286 consumer.accept(buffer.get(nextToConsume)); 287 return hasNext; 288 } 289 290 @Override 291 public void forEachRemaining(Consumer<? super P_OUT> consumer) { 292 if (buffer == null && !finished) { 293 init(); 294 295 ph.wrapAndCopyInto((Sink<P_OUT>) consumer::accept, spliterator); 296 finished = true; 297 } 298 else { 299 while (tryAdvance(consumer)) { } 300 } 301 } 302 } 303 304 static final class IntWrappingSpliterator<P_IN> 305 extends AbstractWrappingSpliterator<P_IN, Integer, SpinedBuffer.OfInt> 306 implements Spliterator.OfInt { 307 308 IntWrappingSpliterator(PipelineHelper<Integer> ph, 309 Supplier<Spliterator<P_IN>> supplier, 310 boolean parallel) { 311 super(ph, supplier, parallel); 312 } 313 314 IntWrappingSpliterator(PipelineHelper<Integer> ph, 315 Spliterator<P_IN> spliterator, 316 boolean parallel) { 317 super(ph, spliterator, parallel); 318 } 319 320 @Override 321 AbstractWrappingSpliterator<P_IN, Integer, ?> wrap(Spliterator<P_IN> s) { 322 return new IntWrappingSpliterator<>(ph, s, isParallel); 323 } 324 325 @Override 326 void initPartialTraversalState() { 327 SpinedBuffer.OfInt b = new SpinedBuffer.OfInt(); 328 buffer = b; 329 bufferSink = ph.wrapSink((Sink.OfInt) b::accept); 330 pusher = () -> spliterator.tryAdvance(bufferSink); 331 } 332 333 @Override 334 public Spliterator.OfInt trySplit() { 335 return (Spliterator.OfInt) super.trySplit(); 336 } 337 338 @Override 339 public boolean tryAdvance(IntConsumer consumer) { 340 boolean hasNext = doAdvance(); 341 if (hasNext) 342 consumer.accept(buffer.get(nextToConsume)); 343 return hasNext; 344 } 345 346 @Override 347 public void forEachRemaining(IntConsumer consumer) { 348 if (buffer == null && !finished) { 349 init(); 350 351 ph.wrapAndCopyInto((Sink.OfInt) consumer::accept, spliterator); 352 finished = true; 353 } 354 else { 355 while (tryAdvance(consumer)) { } 356 } 357 } 358 } 359 360 static final class LongWrappingSpliterator<P_IN> 361 extends AbstractWrappingSpliterator<P_IN, Long, SpinedBuffer.OfLong> 362 implements Spliterator.OfLong { 363 364 LongWrappingSpliterator(PipelineHelper<Long> ph, 365 Supplier<Spliterator<P_IN>> supplier, 366 boolean parallel) { 367 super(ph, supplier, parallel); 368 } 369 370 LongWrappingSpliterator(PipelineHelper<Long> ph, 371 Spliterator<P_IN> spliterator, 372 boolean parallel) { 373 super(ph, spliterator, parallel); 374 } 375 376 @Override 377 AbstractWrappingSpliterator<P_IN, Long, ?> wrap(Spliterator<P_IN> s) { 378 return new LongWrappingSpliterator<>(ph, s, isParallel); 379 } 380 381 @Override 382 void initPartialTraversalState() { 383 SpinedBuffer.OfLong b = new SpinedBuffer.OfLong(); 384 buffer = b; 385 bufferSink = ph.wrapSink((Sink.OfLong) b::accept); 386 pusher = () -> spliterator.tryAdvance(bufferSink); 387 } 388 389 @Override 390 public Spliterator.OfLong trySplit() { 391 return (Spliterator.OfLong) super.trySplit(); 392 } 393 394 @Override 395 public boolean tryAdvance(LongConsumer consumer) { 396 boolean hasNext = doAdvance(); 397 if (hasNext) 398 consumer.accept(buffer.get(nextToConsume)); 399 return hasNext; 400 } 401 402 @Override 403 public void forEachRemaining(LongConsumer consumer) { 404 if (buffer == null && !finished) { 405 init(); 406 407 ph.wrapAndCopyInto((Sink.OfLong) consumer::accept, spliterator); 408 finished = true; 409 } 410 else { 411 while (tryAdvance(consumer)) { } 412 } 413 } 414 } 415 416 static final class DoubleWrappingSpliterator<P_IN> 417 extends AbstractWrappingSpliterator<P_IN, Double, SpinedBuffer.OfDouble> 418 implements Spliterator.OfDouble { 419 420 DoubleWrappingSpliterator(PipelineHelper<Double> ph, 421 Supplier<Spliterator<P_IN>> supplier, 422 boolean parallel) { 423 super(ph, supplier, parallel); 424 } 425 426 DoubleWrappingSpliterator(PipelineHelper<Double> ph, 427 Spliterator<P_IN> spliterator, 428 boolean parallel) { 429 super(ph, spliterator, parallel); 430 } 431 432 @Override 433 AbstractWrappingSpliterator<P_IN, Double, ?> wrap(Spliterator<P_IN> s) { 434 return new DoubleWrappingSpliterator<>(ph, s, isParallel); 435 } 436 437 @Override 438 void initPartialTraversalState() { 439 SpinedBuffer.OfDouble b = new SpinedBuffer.OfDouble(); 440 buffer = b; 441 bufferSink = ph.wrapSink((Sink.OfDouble) b::accept); 442 pusher = () -> spliterator.tryAdvance(bufferSink); 443 } 444 445 @Override 446 public Spliterator.OfDouble trySplit() { 447 return (Spliterator.OfDouble) super.trySplit(); 448 } 449 450 @Override 451 public boolean tryAdvance(DoubleConsumer consumer) { 452 boolean hasNext = doAdvance(); 453 if (hasNext) 454 consumer.accept(buffer.get(nextToConsume)); 455 return hasNext; 456 } 457 458 @Override 459 public void forEachRemaining(DoubleConsumer consumer) { 460 if (buffer == null && !finished) { 461 init(); 462 463 ph.wrapAndCopyInto((Sink.OfDouble) consumer::accept, spliterator); 464 finished = true; 465 } 466 else { 467 while (tryAdvance(consumer)) { } 468 } 469 } 470 } 471 472 /** 473 * Spliterator implementation that delegates to an underlying spliterator, 474 * acquiring the spliterator from a {@code Supplier<Spliterator>} on the 475 * first call to any spliterator method. 476 * @param <T> 477 */ 478 static class DelegatingSpliterator<T> implements Spliterator<T> { 479 private final Supplier<Spliterator<T>> supplier; 480 481 private Spliterator<T> s; 482 483 @SuppressWarnings("unchecked") 484 DelegatingSpliterator(Supplier<? extends Spliterator<T>> supplier) { 485 this.supplier = (Supplier<Spliterator<T>>) supplier; 486 } 487 488 Spliterator<T> get() { 489 if (s == null) { 490 s = supplier.get(); 491 } 492 return s; 493 } 494 495 @Override 496 public Spliterator<T> trySplit() { 497 return get().trySplit(); 498 } 499 500 @Override 501 public boolean tryAdvance(Consumer<? super T> consumer) { 502 return get().tryAdvance(consumer); 503 } 504 505 @Override 506 public void forEachRemaining(Consumer<? super T> consumer) { 507 get().forEachRemaining(consumer); 508 } 509 510 @Override 511 public long estimateSize() { 512 return get().estimateSize(); 513 } 514 515 @Override 516 public int characteristics() { 517 return get().characteristics(); 518 } 519 520 @Override 521 public Comparator<? super T> getComparator() { 522 return get().getComparator(); 523 } 524 525 @Override 526 public long getExactSizeIfKnown() { 527 return get().getExactSizeIfKnown(); 528 } 529 530 @Override 531 public String toString() { 532 return getClass().getName() + "[" + get() + "]"; 533 } 534 535 static final class OfInt extends DelegatingSpliterator<Integer> implements Spliterator.OfInt { 536 private Spliterator.OfInt s; 537 538 OfInt(Supplier<Spliterator.OfInt> supplier) { 539 super(supplier); 540 } 541 542 @Override 543 Spliterator.OfInt get() { 544 if (s == null) { 545 s = (Spliterator.OfInt) super.get(); 546 } 547 return s; 548 } 549 550 @Override 551 public Spliterator.OfInt trySplit() { 552 return get().trySplit(); 553 } 554 555 @Override 556 public boolean tryAdvance(IntConsumer consumer) { 557 return get().tryAdvance(consumer); 558 } 559 560 @Override 561 public void forEachRemaining(IntConsumer consumer) { 562 get().forEachRemaining(consumer); 563 } 564 } 565 566 static final class OfLong extends DelegatingSpliterator<Long> implements Spliterator.OfLong { 567 private Spliterator.OfLong s; 568 569 OfLong(Supplier<Spliterator.OfLong> supplier) { 570 super(supplier); 571 } 572 573 @Override 574 Spliterator.OfLong get() { 575 if (s == null) { 576 s = (Spliterator.OfLong) super.get(); 577 } 578 return s; 579 } 580 581 @Override 582 public Spliterator.OfLong trySplit() { 583 return get().trySplit(); 584 } 585 586 @Override 587 public boolean tryAdvance(LongConsumer consumer) { 588 return get().tryAdvance(consumer); 589 } 590 591 @Override 592 public void forEachRemaining(LongConsumer consumer) { 593 get().forEachRemaining(consumer); 594 } 595 } 596 597 static final class OfDouble extends DelegatingSpliterator<Double> implements Spliterator.OfDouble { 598 private Spliterator.OfDouble s; 599 600 OfDouble(Supplier<Spliterator.OfDouble> supplier) { 601 super(supplier); 602 } 603 604 @Override 605 Spliterator.OfDouble get() { 606 if (s == null) { 607 s = (Spliterator.OfDouble) super.get(); 608 } 609 return s; 610 } 611 612 @Override 613 public Spliterator.OfDouble trySplit() { 614 return get().trySplit(); 615 } 616 617 @Override 618 public boolean tryAdvance(DoubleConsumer consumer) { 619 return get().tryAdvance(consumer); 620 } 621 622 @Override 623 public void forEachRemaining(DoubleConsumer consumer) { 624 get().forEachRemaining(consumer); 625 } 626 } 627 } 628 }