1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Comparator;
  28 import java.util.Spliterator;
  29 import java.util.function.BooleanSupplier;
  30 import java.util.function.Consumer;
  31 import java.util.function.DoubleConsumer;
  32 import java.util.function.IntConsumer;
  33 import java.util.function.LongConsumer;
  34 import java.util.function.Supplier;
  35 
  36 /**
  37  * Spliterator implementations for wrapping and delegating spliterators, used
  38  * in the implementation of the {@link Stream#spliterator()} method.
  39  *
  40  * @since 1.8
  41  */
  42 class StreamSpliterators {
  43 
  44     /**
  45      * Abstract wrapping spliterator that binds to the spliterator of a
  46      * pipeline helper on first operation.
  47      *
  48      * <p>This spliterator is not late-binding and will bind to the source
  49      * spliterator when first operated on.
  50      *
  51      * <p>A wrapping spliterator produced from a sequential stream
  52      * cannot be split if there are stateful operations present.
  53      */
  54     private static abstract class AbstractWrappingSpliterator<P_IN, P_OUT,
  55                                                               T_BUFFER extends AbstractSpinedBuffer>
  56             implements Spliterator<P_OUT> {
  57 
  58         // @@@ Detect if stateful operations are present or not
  59         //     If not then can split otherwise cannot
  60 
  61         /** True if this spliterator supports splitting */
  62         final boolean isParallel;
  63 
  64         final PipelineHelper<P_OUT> ph;
  65 
  66         /**
  67          * Supplier for the source spliterator.  Client provides either a
  68          * spliterator or a supplier.
  69          */
  70         private Supplier<Spliterator<P_IN>> spliteratorSupplier;
  71 
  72         /**
  73          * Source spliterator.  Either provided from client or obtained from
  74          * supplier.
  75          */
  76         Spliterator<P_IN> spliterator;
  77 
  78         /**
  79          * Sink chain for the downstream stages of the pipeline, ultimately
  80          * leading to the buffer. Used during partial traversal.
  81          */
  82         Sink<P_IN> bufferSink;
  83 
  84         /**
  85          * A function that advances one element of the spliterator, pushing
  86          * it to bufferSink.  Returns whether any elements were processed.
  87          * Used during partial traversal.
  88          */
  89         BooleanSupplier pusher;
  90 
  91         /** Next element to consume from the buffer, used during partial traversal */
  92         long nextToConsume;
  93 
  94         /** Buffer into which elements are pushed.  Used during partial traversal. */
  95         T_BUFFER buffer;
  96 
  97         /**
  98          * True if full traversal has occurred (with possible cancelation).
  99          * If doing a partial traversal, there may be still elements in buffer.
 100          */
 101         boolean finished;
 102 
 103         /** Construct an AbstractWrappingSpliterator from a
 104          * {@code Supplier<Spliterator>}. */
 105         AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph,
 106                                     Supplier<Spliterator<P_IN>> spliteratorSupplier,
 107                                     boolean parallel) {
 108             this.ph = ph;
 109             this.spliteratorSupplier = spliteratorSupplier;
 110             this.spliterator = null;
 111             this.isParallel = parallel;
 112         }
 113 
 114         /** Construct an AbstractWrappingSpliterator from a
 115          * {@code Spliterator}. */
 116         AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph,
 117                                     Spliterator<P_IN> spliterator,
 118                                     boolean parallel) {
 119             this.ph = ph;
 120             this.spliteratorSupplier = null;
 121             this.spliterator = spliterator;
 122             this.isParallel = parallel;
 123         }
 124 
 125         /** Called before advancing to set up spliterator, if needed */
 126         final void init() {
 127             if (spliterator == null) {
 128                 spliterator = spliteratorSupplier.get();
 129                 spliteratorSupplier = null;
 130             }
 131         }
 132 
 133         /**
 134          * Get an element from the source, pushing it into the sink chain,
 135          * setting up the buffer if needed
 136          * @return whether there are elements to consume from the buffer
 137          */
 138         final boolean doAdvance() {
 139             if (buffer == null) {
 140                 if (finished)
 141                     return false;
 142 
 143                 init();
 144                 initPartialTraversalState();
 145                 nextToConsume = 0;
 146                 bufferSink.begin(spliterator.getExactSizeIfKnown());
 147                 return fillBuffer();
 148             }
 149             else {
 150                 ++nextToConsume;
 151                 boolean hasNext = nextToConsume < buffer.count();
 152                 if (!hasNext) {
 153                     nextToConsume = 0;
 154                     buffer.clear();
 155                     hasNext = fillBuffer();
 156                 }
 157                 return hasNext;
 158             }
 159         }
 160 
 161         /**
 162          * Invokes the shape-specific constructor with the provided arguments
 163          * and returns the result
 164          */
 165         abstract AbstractWrappingSpliterator<P_IN, P_OUT, ?> wrap(Spliterator<P_IN> s);
 166 
 167         /**
 168          * Initializes buffer, sink chain, and pusher for a shape-specific
 169          * implementation
 170          */
 171         abstract void initPartialTraversalState();
 172 
 173         @Override
 174         public Spliterator<P_OUT> trySplit() {
 175             if (isParallel && !finished) {
 176                 init();
 177 
 178                 Spliterator<P_IN> split = spliterator.trySplit();
 179                 return (split == null) ? null : wrap(split);
 180             }
 181             else
 182                 return null;
 183         }
 184 
 185         /**
 186          * If the buffer is empty, push elements into the sink chain until
 187          * the source is empty or cancellation is requested.
 188          * @return whether there are elements to consume from the buffer
 189          */
 190         private boolean fillBuffer() {
 191             while (buffer.count() == 0) {
 192                 if (bufferSink.cancellationRequested() || !pusher.getAsBoolean()) {
 193                     if (finished)
 194                         return false;
 195                     else {
 196                         bufferSink.end(); // might trigger more elements
 197                         finished = true;
 198                     }
 199                 }
 200             }
 201             return true;
 202         }
 203 
 204         @Override
 205         public final long estimateSize() {
 206             init();
 207             return StreamOpFlag.SIZED.isKnown(ph.getStreamAndOpFlags())
 208                    ? spliterator.estimateSize()
 209                    : Long.MAX_VALUE;
 210         }
 211 
 212         @Override
 213         public final long getExactSizeIfKnown() {
 214             init();
 215             return StreamOpFlag.SIZED.isKnown(ph.getStreamAndOpFlags())
 216                    ? spliterator.getExactSizeIfKnown()
 217                    : -1;
 218         }
 219 
 220         @Override
 221         public final int characteristics() {
 222             init();
 223 
 224             // Get the characteristics from the pipeline
 225             int c = StreamOpFlag.toCharacteristics(StreamOpFlag.toStreamFlags(ph.getStreamAndOpFlags()));
 226 
 227             // Mask off the size and uniform characteristics and replace with
 228             // those of the spliterator
 229             // Note that a non-uniform spliterator can change from something
 230             // with an exact size to an estimate for a sub-split, for example
 231             // with HashSet where the size is known at the top level spliterator
 232             // but for sub-splits only an estimate is known
 233             if ((c & Spliterator.SIZED) != 0) {
 234                 c &= ~(Spliterator.SIZED | Spliterator.SUBSIZED);
 235                 c |= (spliterator.characteristics() & Spliterator.SIZED & Spliterator.SUBSIZED);
 236             }
 237 
 238             return c;
 239         }
 240 
 241         @Override
 242         public Comparator<? super P_OUT> getComparator() {
 243             if (!hasCharacteristics(SORTED))
 244                 throw new IllegalStateException();
 245             return null;
 246         }
 247 
 248         @Override
 249         public final String toString() {
 250             return String.format("%s[%s]", getClass().getName(), spliterator);
 251         }
 252     }
 253 
 254     static final class WrappingSpliterator<P_IN, P_OUT>
 255             extends AbstractWrappingSpliterator<P_IN, P_OUT, SpinedBuffer<P_OUT>> {
 256 
 257         WrappingSpliterator(PipelineHelper<P_OUT> ph,
 258                             Supplier<Spliterator<P_IN>> supplier,
 259                             boolean parallel) {
 260             super(ph, supplier, parallel);
 261         }
 262 
 263         WrappingSpliterator(PipelineHelper<P_OUT> ph,
 264                             Spliterator<P_IN> spliterator,
 265                             boolean parallel) {
 266             super(ph, spliterator, parallel);
 267         }
 268 
 269         @Override
 270         WrappingSpliterator<P_IN, P_OUT> wrap(Spliterator<P_IN> s) {
 271             return new WrappingSpliterator<>(ph, s, isParallel);
 272         }
 273 
 274         @Override
 275         void initPartialTraversalState() {
 276             SpinedBuffer<P_OUT> b = new SpinedBuffer<>();
 277             buffer = b;
 278             bufferSink = ph.wrapSink(b::accept);
 279             pusher = () -> spliterator.tryAdvance(bufferSink);
 280         }
 281 
 282         @Override
 283         public boolean tryAdvance(Consumer<? super P_OUT> consumer) {
 284             boolean hasNext = doAdvance();
 285             if (hasNext)
 286                 consumer.accept(buffer.get(nextToConsume));
 287             return hasNext;
 288         }
 289 
 290         @Override
 291         public void forEachRemaining(Consumer<? super P_OUT> consumer) {
 292             if (buffer == null && !finished) {
 293                 init();
 294 
 295                 ph.wrapAndCopyInto((Sink<P_OUT>) consumer::accept, spliterator);
 296                 finished = true;
 297             }
 298             else {
 299                 while (tryAdvance(consumer)) { }
 300             }
 301         }
 302     }
 303 
 304     static final class IntWrappingSpliterator<P_IN>
 305             extends AbstractWrappingSpliterator<P_IN, Integer, SpinedBuffer.OfInt>
 306             implements Spliterator.OfInt {
 307 
 308         IntWrappingSpliterator(PipelineHelper<Integer> ph,
 309                                Supplier<Spliterator<P_IN>> supplier,
 310                                boolean parallel) {
 311             super(ph, supplier, parallel);
 312         }
 313 
 314         IntWrappingSpliterator(PipelineHelper<Integer> ph,
 315                                Spliterator<P_IN> spliterator,
 316                                boolean parallel) {
 317             super(ph, spliterator, parallel);
 318         }
 319 
 320         @Override
 321         AbstractWrappingSpliterator<P_IN, Integer, ?> wrap(Spliterator<P_IN> s) {
 322             return new IntWrappingSpliterator<>(ph, s, isParallel);
 323         }
 324 
 325         @Override
 326         void initPartialTraversalState() {
 327             SpinedBuffer.OfInt b = new SpinedBuffer.OfInt();
 328             buffer = b;
 329             bufferSink = ph.wrapSink((Sink.OfInt) b::accept);
 330             pusher = () -> spliterator.tryAdvance(bufferSink);
 331         }
 332 
 333         @Override
 334         public Spliterator.OfInt trySplit() {
 335             return (Spliterator.OfInt) super.trySplit();
 336         }
 337 
 338         @Override
 339         public boolean tryAdvance(IntConsumer consumer) {
 340             boolean hasNext = doAdvance();
 341             if (hasNext)
 342                 consumer.accept(buffer.get(nextToConsume));
 343             return hasNext;
 344         }
 345 
 346         @Override
 347         public void forEachRemaining(IntConsumer consumer) {
 348             if (buffer == null && !finished) {
 349                 init();
 350 
 351                 ph.wrapAndCopyInto((Sink.OfInt) consumer::accept, spliterator);
 352                 finished = true;
 353             }
 354             else {
 355                 while (tryAdvance(consumer)) { }
 356             }
 357         }
 358     }
 359 
 360     static final class LongWrappingSpliterator<P_IN>
 361             extends AbstractWrappingSpliterator<P_IN, Long, SpinedBuffer.OfLong>
 362             implements Spliterator.OfLong {
 363 
 364         LongWrappingSpliterator(PipelineHelper<Long> ph,
 365                                 Supplier<Spliterator<P_IN>> supplier,
 366                                 boolean parallel) {
 367             super(ph, supplier, parallel);
 368         }
 369 
 370         LongWrappingSpliterator(PipelineHelper<Long> ph,
 371                                 Spliterator<P_IN> spliterator,
 372                                 boolean parallel) {
 373             super(ph, spliterator, parallel);
 374         }
 375 
 376         @Override
 377         AbstractWrappingSpliterator<P_IN, Long, ?> wrap(Spliterator<P_IN> s) {
 378             return new LongWrappingSpliterator<>(ph, s, isParallel);
 379         }
 380 
 381         @Override
 382         void initPartialTraversalState() {
 383             SpinedBuffer.OfLong b = new SpinedBuffer.OfLong();
 384             buffer = b;
 385             bufferSink = ph.wrapSink((Sink.OfLong) b::accept);
 386             pusher = () -> spliterator.tryAdvance(bufferSink);
 387         }
 388 
 389         @Override
 390         public Spliterator.OfLong trySplit() {
 391             return (Spliterator.OfLong) super.trySplit();
 392         }
 393 
 394         @Override
 395         public boolean tryAdvance(LongConsumer consumer) {
 396             boolean hasNext = doAdvance();
 397             if (hasNext)
 398                 consumer.accept(buffer.get(nextToConsume));
 399             return hasNext;
 400         }
 401 
 402         @Override
 403         public void forEachRemaining(LongConsumer consumer) {
 404             if (buffer == null && !finished) {
 405                 init();
 406 
 407                 ph.wrapAndCopyInto((Sink.OfLong) consumer::accept, spliterator);
 408                 finished = true;
 409             }
 410             else {
 411                 while (tryAdvance(consumer)) { }
 412             }
 413         }
 414     }
 415 
 416     static final class DoubleWrappingSpliterator<P_IN>
 417             extends AbstractWrappingSpliterator<P_IN, Double, SpinedBuffer.OfDouble>
 418             implements Spliterator.OfDouble {
 419 
 420         DoubleWrappingSpliterator(PipelineHelper<Double> ph,
 421                                   Supplier<Spliterator<P_IN>> supplier,
 422                                   boolean parallel) {
 423             super(ph, supplier, parallel);
 424         }
 425 
 426         DoubleWrappingSpliterator(PipelineHelper<Double> ph,
 427                                   Spliterator<P_IN> spliterator,
 428                                   boolean parallel) {
 429             super(ph, spliterator, parallel);
 430         }
 431 
 432         @Override
 433         AbstractWrappingSpliterator<P_IN, Double, ?> wrap(Spliterator<P_IN> s) {
 434             return new DoubleWrappingSpliterator<>(ph, s, isParallel);
 435         }
 436 
 437         @Override
 438         void initPartialTraversalState() {
 439             SpinedBuffer.OfDouble b = new SpinedBuffer.OfDouble();
 440             buffer = b;
 441             bufferSink = ph.wrapSink((Sink.OfDouble) b::accept);
 442             pusher = () -> spliterator.tryAdvance(bufferSink);
 443         }
 444 
 445         @Override
 446         public Spliterator.OfDouble trySplit() {
 447             return (Spliterator.OfDouble) super.trySplit();
 448         }
 449 
 450         @Override
 451         public boolean tryAdvance(DoubleConsumer consumer) {
 452             boolean hasNext = doAdvance();
 453             if (hasNext)
 454                 consumer.accept(buffer.get(nextToConsume));
 455             return hasNext;
 456         }
 457 
 458         @Override
 459         public void forEachRemaining(DoubleConsumer consumer) {
 460             if (buffer == null && !finished) {
 461                 init();
 462 
 463                 ph.wrapAndCopyInto((Sink.OfDouble) consumer::accept, spliterator);
 464                 finished = true;
 465             }
 466             else {
 467                 while (tryAdvance(consumer)) { }
 468             }
 469         }
 470     }
 471 
 472     /**
 473      * Spliterator implementation that delegates to an underlying spliterator,
 474      * acquiring the spliterator from a {@code Supplier<Spliterator>} on the
 475      * first call to any spliterator method.
 476      * @param <T>
 477      */
 478     static class DelegatingSpliterator<T> implements Spliterator<T> {
 479         private final Supplier<Spliterator<T>> supplier;
 480 
 481         private Spliterator<T> s;
 482 
 483         @SuppressWarnings("unchecked")
 484         DelegatingSpliterator(Supplier<? extends Spliterator<T>> supplier) {
 485             this.supplier = (Supplier<Spliterator<T>>) supplier;
 486         }
 487 
 488         Spliterator<T> get() {
 489             if (s == null) {
 490                 s = supplier.get();
 491             }
 492             return s;
 493         }
 494 
 495         @Override
 496         public Spliterator<T> trySplit() {
 497             return get().trySplit();
 498         }
 499 
 500         @Override
 501         public boolean tryAdvance(Consumer<? super T> consumer) {
 502             return get().tryAdvance(consumer);
 503         }
 504 
 505         @Override
 506         public void forEachRemaining(Consumer<? super T> consumer) {
 507             get().forEachRemaining(consumer);
 508         }
 509 
 510         @Override
 511         public long estimateSize() {
 512             return get().estimateSize();
 513         }
 514 
 515         @Override
 516         public int characteristics() {
 517             return get().characteristics();
 518         }
 519 
 520         @Override
 521         public Comparator<? super T> getComparator() {
 522             return get().getComparator();
 523         }
 524 
 525         @Override
 526         public long getExactSizeIfKnown() {
 527             return get().getExactSizeIfKnown();
 528         }
 529 
 530         @Override
 531         public String toString() {
 532             return getClass().getName() + "[" + get() + "]";
 533         }
 534 
 535         static final class OfInt extends DelegatingSpliterator<Integer> implements Spliterator.OfInt {
 536             private Spliterator.OfInt s;
 537 
 538             OfInt(Supplier<Spliterator.OfInt> supplier) {
 539                 super(supplier);
 540             }
 541 
 542             @Override
 543             Spliterator.OfInt get() {
 544                 if (s == null) {
 545                     s = (Spliterator.OfInt) super.get();
 546                 }
 547                 return s;
 548             }
 549 
 550             @Override
 551             public Spliterator.OfInt trySplit() {
 552                 return get().trySplit();
 553             }
 554 
 555             @Override
 556             public boolean tryAdvance(IntConsumer consumer) {
 557                 return get().tryAdvance(consumer);
 558             }
 559 
 560             @Override
 561             public void forEachRemaining(IntConsumer consumer) {
 562                 get().forEachRemaining(consumer);
 563             }
 564         }
 565 
 566         static final class OfLong extends DelegatingSpliterator<Long> implements Spliterator.OfLong {
 567             private Spliterator.OfLong s;
 568 
 569             OfLong(Supplier<Spliterator.OfLong> supplier) {
 570                 super(supplier);
 571             }
 572 
 573             @Override
 574             Spliterator.OfLong get() {
 575                 if (s == null) {
 576                     s = (Spliterator.OfLong) super.get();
 577                 }
 578                 return s;
 579             }
 580 
 581             @Override
 582             public Spliterator.OfLong trySplit() {
 583                 return get().trySplit();
 584             }
 585 
 586             @Override
 587             public boolean tryAdvance(LongConsumer consumer) {
 588                 return get().tryAdvance(consumer);
 589             }
 590 
 591             @Override
 592             public void forEachRemaining(LongConsumer consumer) {
 593                 get().forEachRemaining(consumer);
 594             }
 595         }
 596 
 597         static final class OfDouble extends DelegatingSpliterator<Double> implements Spliterator.OfDouble {
 598             private Spliterator.OfDouble s;
 599 
 600             OfDouble(Supplier<Spliterator.OfDouble> supplier) {
 601                 super(supplier);
 602             }
 603 
 604             @Override
 605             Spliterator.OfDouble get() {
 606                 if (s == null) {
 607                     s = (Spliterator.OfDouble) super.get();
 608                 }
 609                 return s;
 610             }
 611 
 612             @Override
 613             public Spliterator.OfDouble trySplit() {
 614                 return get().trySplit();
 615             }
 616 
 617             @Override
 618             public boolean tryAdvance(DoubleConsumer consumer) {
 619                 return get().tryAdvance(consumer);
 620             }
 621 
 622             @Override
 623             public void forEachRemaining(DoubleConsumer consumer) {
 624                 get().forEachRemaining(consumer);
 625             }
 626         }
 627     }
 628 }