/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

/**
 * Spliterator implementations for wrapping and delegating spliterators, used
 * in the implementation of the {@link Stream#spliterator()} method.
 *
 * @since 1.8
 */
class StreamSpliterators {

    // NOTE(review): the generic type arguments in this file were reconstructed
    // from usage. In particular, confirm the type-parameter list of
    // PipelineHelper (written here as <P_IN, P_OUT>) and the bound of T_BUFFER
    // (written here as AbstractSpinedBuffer) against their declarations.

    /**
     * Abstract wrapping spliterator that binds to the spliterator of a
     * pipeline helper on first operation.
     *
     * <p>This spliterator is not late-binding and will bind to the source
     * spliterator when first operated on.
     *
     * <p>A wrapping spliterator produced from a sequential stream
     * cannot be split if there are stateful operations present.
     *
     * @param <P_IN> type of elements of the source spliterator
     * @param <P_OUT> type of elements reported by this spliterator
     * @param <T_BUFFER> type of the buffer used during partial traversal
     */
    private static abstract class AbstractWrappingSpliterator<P_IN, P_OUT,
                                                              T_BUFFER extends AbstractSpinedBuffer>
            implements Spliterator<P_OUT> {

        // @@@ Detect if stateful operations are present or not
        //     If not then can split otherwise cannot

        /** True if this spliterator supports splitting */
        final boolean isParallel;

        final PipelineHelper<P_IN, P_OUT> ph;

        /**
         * Supplier for the source spliterator.  Client provides either a
         * spliterator or a supplier.  Cleared by {@link #init()} once the
         * source spliterator has been obtained.
         */
        private Supplier<Spliterator<P_IN>> spliteratorSupplier;

        /**
         * Source spliterator.  Either provided from client or obtained from
         * supplier.
         */
        Spliterator<P_IN> spliterator;

        /**
         * Sink chain for the downstream stages of the pipeline, ultimately
         * leading to the buffer. Used during partial traversal.
         */
        Sink<P_IN> bufferSink;

        /**
         * A function that advances one element of the spliterator, pushing
         * it to bufferSink.  Returns whether any elements were processed.
         * Used during partial traversal.
         */
        BooleanSupplier pusher;

        /** Next element to consume from the buffer, used during partial traversal */
        long nextToConsume;

        /** Buffer into which elements are pushed.  Used during partial traversal. */
        T_BUFFER buffer;

        /**
         * True if full traversal has occurred (with possible cancelation).
         * If doing a partial traversal, there may be still elements in buffer.
         */
        boolean finished;

        /**
         * Construct an AbstractWrappingSpliterator from a
         * {@code Supplier} of the source spliterator.
         *
         * @param ph the pipeline helper
         * @param spliteratorSupplier supplies the source spliterator on first use
         * @param parallel whether this spliterator supports splitting
         */
        AbstractWrappingSpliterator(PipelineHelper<P_IN, P_OUT> ph,
                                    Supplier<Spliterator<P_IN>> spliteratorSupplier,
                                    boolean parallel) {
            this.ph = ph;
            this.spliteratorSupplier = spliteratorSupplier;
            this.spliterator = null;
            this.isParallel = parallel;
        }

        /**
         * Construct an AbstractWrappingSpliterator from a
         * {@code Spliterator}.
         *
         * @param ph the pipeline helper
         * @param spliterator the source spliterator
         * @param parallel whether this spliterator supports splitting
         */
        AbstractWrappingSpliterator(PipelineHelper<P_IN, P_OUT> ph,
                                    Spliterator<P_IN> spliterator,
                                    boolean parallel) {
            this.ph = ph;
            this.spliteratorSupplier = null;
            this.spliterator = spliterator;
            this.isParallel = parallel;
        }

        /** Called before advancing to set up spliterator, if needed */
        final void init() {
            if (spliterator == null) {
                spliterator = spliteratorSupplier.get();
                spliteratorSupplier = null;
            }
        }

        /**
         * Get an element from the source, pushing it into the sink chain,
         * setting up the buffer if needed
         *
         * @return whether there are elements to consume from the buffer
         */
        final boolean doAdvance() {
            if (buffer == null) {
                if (finished)
                    return false;

                // First partial-traversal call: set up buffer, sink chain and pusher
                init();
                initPartialTraversalState();
                nextToConsume = 0;
                bufferSink.begin(spliterator.getExactSizeIfKnown());
                return fillBuffer();
            }
            else {
                ++nextToConsume;
                boolean hasNext = nextToConsume < buffer.count();
                if (!hasNext) {
                    // Buffer exhausted: reset it and try to pull more from the source
                    nextToConsume = 0;
                    buffer.clear();
                    hasNext = fillBuffer();
                }
                return hasNext;
            }
        }

        /**
         * Invokes the shape-specific constructor with the provided arguments
         * and returns the result
         */
        abstract AbstractWrappingSpliterator<P_IN, P_OUT, ?> wrap(Spliterator<P_IN> s);

        /**
         * Initializes buffer, sink chain, and pusher for a shape-specific
         * implementation
         */
        abstract void initPartialTraversalState();

        /**
         * Splits the source spliterator and wraps the result, if splitting is
         * supported and traversal has not started.
         *
         * @return a wrapping spliterator over the split-off portion of the
         *         source, or {@code null} if this spliterator cannot be split
         */
        @Override
        public Spliterator<P_OUT> trySplit() {
            // Do not split once partial traversal has created the buffer:
            // elements already buffered here precede, in source order, the
            // prefix that the source spliterator would hand out.
            if (isParallel && buffer == null && !finished) {
                init();

                Spliterator<P_IN> split = spliterator.trySplit();
                return (split == null) ? null : wrap(split);
            }
            else
                return null;
        }

        /**
         * If the buffer is empty, push elements into the sink chain until
         * the source is empty or cancellation is requested.
         *
         * @return whether there are elements to consume from the buffer
         */
        private boolean fillBuffer() {
            while (buffer.count() == 0) {
                if (bufferSink.cancellationRequested() || !pusher.getAsBoolean()) {
                    if (finished)
                        return false;
                    else {
                        bufferSink.end(); // might trigger more elements
                        finished = true;
                    }
                }
            }
            return true;
        }

        /**
         * @return the source spliterator's size estimate if the pipeline
         *         flags report SIZED as known, otherwise {@code Long.MAX_VALUE}
         */
        @Override
        public final long estimateSize() {
            init();
            return StreamOpFlag.SIZED.isKnown(ph.getStreamAndOpFlags())
                   ? spliterator.estimateSize()
                   : Long.MAX_VALUE;
        }

        /**
         * @return the source spliterator's exact size if the pipeline flags
         *         report SIZED as known, otherwise {@code -1}
         */
        @Override
        public final long getExactSizeIfKnown() {
            init();
            return StreamOpFlag.SIZED.isKnown(ph.getStreamAndOpFlags())
                   ? spliterator.getExactSizeIfKnown()
                   : -1;
        }

        /**
         * @return the characteristics derived from the pipeline flags, with
         *         the SIZED and SUBSIZED bits taken from the source spliterator
         *         when the pipeline reports SIZED
         */
        @Override
        public final int characteristics() {
            init();

            // Get the characteristics from the pipeline
            int c = StreamOpFlag.toCharacteristics(StreamOpFlag.toStreamFlags(ph.getStreamAndOpFlags()));

            // Mask off the size and uniform characteristics and replace with
            // those of the spliterator
            // Note that a non-uniform spliterator can change from something
            // with an exact size to an estimate for a sub-split, for example
            // with HashSet where the size is known at the top level spliterator
            // but for sub-splits only an estimate is known
            if ((c & Spliterator.SIZED) != 0) {
                c &= ~(Spliterator.SIZED | Spliterator.SUBSIZED);
                // The mask must be the union of the two flags: SIZED and
                // SUBSIZED are distinct bits, so and-ing them together gives 0
                // and would copy nothing back from the source spliterator.
                c |= (spliterator.characteristics() & (Spliterator.SIZED | Spliterator.SUBSIZED));
            }

            return c;
        }

        /**
         * @return {@code null}
         * @throws IllegalStateException if this spliterator does not report
         *         the SORTED characteristic
         */
        @Override
        public Comparator<? super P_OUT> getComparator() {
            if (!hasCharacteristics(SORTED))
                throw new IllegalStateException();
            return null;
        }

        @Override
        public final String toString() {
            return String.format("%s[%s]", getClass().getName(), spliterator);
        }
    }

    /**
     * Wrapping spliterator for reference elements, buffering into a
     * {@code SpinedBuffer}.
     */
    static final class WrappingSpliterator<P_IN, P_OUT>
            extends AbstractWrappingSpliterator<P_IN, P_OUT, SpinedBuffer<P_OUT>> {

        WrappingSpliterator(PipelineHelper<P_IN, P_OUT> ph,
                            Supplier<Spliterator<P_IN>> supplier,
                            boolean parallel) {
            super(ph, supplier, parallel);
        }

        WrappingSpliterator(PipelineHelper<P_IN, P_OUT> ph,
                            Spliterator<P_IN> spliterator,
                            boolean parallel) {
            super(ph, spliterator, parallel);
        }

        @Override
        WrappingSpliterator<P_IN, P_OUT> wrap(Spliterator<P_IN> s) {
            return new WrappingSpliterator<>(ph, s, isParallel);
        }

        @Override
        void initPartialTraversalState() {
            SpinedBuffer<P_OUT> b = new SpinedBuffer<>();
            buffer = b;
            bufferSink = ph.wrapSink(b::accept);
            pusher = () -> spliterator.tryAdvance(bufferSink);
        }

        @Override
        public boolean tryAdvance(Consumer<? super P_OUT> consumer) {
            boolean hasNext = doAdvance();
            if (hasNext)
                consumer.accept(buffer.get(nextToConsume));
            return hasNext;
        }

        @Override
        public void forEachRemaining(Consumer<? super P_OUT> consumer) {
            if (buffer == null && !finished) {
                // No partial traversal has started: push the whole source
                // through the pipeline straight into the consumer
                init();

                ph.wrapAndCopyInto((Sink<P_OUT>) consumer::accept, spliterator);
                finished = true;
            }
            else {
                // Drain via the buffer, one element at a time
                while (tryAdvance(consumer)) { }
            }
        }
    }

    /**
     * Wrapping spliterator for {@code int} elements, buffering into a
     * {@code SpinedBuffer.OfInt}.
     */
    static final class IntWrappingSpliterator<P_IN>
            extends AbstractWrappingSpliterator<P_IN, Integer, SpinedBuffer.OfInt>
            implements Spliterator.OfInt {

        IntWrappingSpliterator(PipelineHelper<P_IN, Integer> ph,
                               Supplier<Spliterator<P_IN>> supplier,
                               boolean parallel) {
            super(ph, supplier, parallel);
        }

        IntWrappingSpliterator(PipelineHelper<P_IN, Integer> ph,
                               Spliterator<P_IN> spliterator,
                               boolean parallel) {
            super(ph, spliterator, parallel);
        }

        @Override
        AbstractWrappingSpliterator<P_IN, Integer, ?> wrap(Spliterator<P_IN> s) {
            return new IntWrappingSpliterator<>(ph, s, isParallel);
        }

        @Override
        void initPartialTraversalState() {
            SpinedBuffer.OfInt b = new SpinedBuffer.OfInt();
            buffer = b;
            bufferSink = ph.wrapSink((Sink.OfInt) b::accept);
            pusher = () -> spliterator.tryAdvance(bufferSink);
        }

        @Override
        public Spliterator.OfInt trySplit() {
            return (Spliterator.OfInt) super.trySplit();
        }

        @Override
        public boolean tryAdvance(IntConsumer consumer) {
            boolean hasNext = doAdvance();
            if (hasNext)
                consumer.accept(buffer.get(nextToConsume));
            return hasNext;
        }

        @Override
        public void forEachRemaining(IntConsumer consumer) {
            if (buffer == null && !finished) {
                // No partial traversal has started: bypass the buffer
                init();

                ph.wrapAndCopyInto((Sink.OfInt) consumer::accept, spliterator);
                finished = true;
            }
            else {
                while (tryAdvance(consumer)) { }
            }
        }
    }

    /**
     * Wrapping spliterator for {@code long} elements, buffering into a
     * {@code SpinedBuffer.OfLong}.
     */
    static final class LongWrappingSpliterator<P_IN>
            extends AbstractWrappingSpliterator<P_IN, Long, SpinedBuffer.OfLong>
            implements Spliterator.OfLong {

        LongWrappingSpliterator(PipelineHelper<P_IN, Long> ph,
                                Supplier<Spliterator<P_IN>> supplier,
                                boolean parallel) {
            super(ph, supplier, parallel);
        }

        LongWrappingSpliterator(PipelineHelper<P_IN, Long> ph,
                                Spliterator<P_IN> spliterator,
                                boolean parallel) {
            super(ph, spliterator, parallel);
        }

        @Override
        AbstractWrappingSpliterator<P_IN, Long, ?> wrap(Spliterator<P_IN> s) {
            return new LongWrappingSpliterator<>(ph, s, isParallel);
        }

        @Override
        void initPartialTraversalState() {
            SpinedBuffer.OfLong b = new SpinedBuffer.OfLong();
            buffer = b;
            bufferSink = ph.wrapSink((Sink.OfLong) b::accept);
            pusher = () -> spliterator.tryAdvance(bufferSink);
        }

        @Override
        public Spliterator.OfLong trySplit() {
            return (Spliterator.OfLong) super.trySplit();
        }

        @Override
        public boolean tryAdvance(LongConsumer consumer) {
            boolean hasNext = doAdvance();
            if (hasNext)
                consumer.accept(buffer.get(nextToConsume));
            return hasNext;
        }

        @Override
        public void forEachRemaining(LongConsumer consumer) {
            if (buffer == null && !finished) {
                // No partial traversal has started: bypass the buffer
                init();

                ph.wrapAndCopyInto((Sink.OfLong) consumer::accept, spliterator);
                finished = true;
            }
            else {
                while (tryAdvance(consumer)) { }
            }
        }
    }

    /**
     * Wrapping spliterator for {@code double} elements, buffering into a
     * {@code SpinedBuffer.OfDouble}.
     */
    static final class DoubleWrappingSpliterator<P_IN>
            extends AbstractWrappingSpliterator<P_IN, Double, SpinedBuffer.OfDouble>
            implements Spliterator.OfDouble {

        DoubleWrappingSpliterator(PipelineHelper<P_IN, Double> ph,
                                  Supplier<Spliterator<P_IN>> supplier,
                                  boolean parallel) {
            super(ph, supplier, parallel);
        }

        DoubleWrappingSpliterator(PipelineHelper<P_IN, Double> ph,
                                  Spliterator<P_IN> spliterator,
                                  boolean parallel) {
            super(ph, spliterator, parallel);
        }

        @Override
        AbstractWrappingSpliterator<P_IN, Double, ?> wrap(Spliterator<P_IN> s) {
            return new DoubleWrappingSpliterator<>(ph, s, isParallel);
        }

        @Override
        void initPartialTraversalState() {
            SpinedBuffer.OfDouble b = new SpinedBuffer.OfDouble();
            buffer = b;
            bufferSink = ph.wrapSink((Sink.OfDouble) b::accept);
            pusher = () -> spliterator.tryAdvance(bufferSink);
        }

        @Override
        public Spliterator.OfDouble trySplit() {
            return (Spliterator.OfDouble) super.trySplit();
        }

        @Override
        public boolean tryAdvance(DoubleConsumer consumer) {
            boolean hasNext = doAdvance();
            if (hasNext)
                consumer.accept(buffer.get(nextToConsume));
            return hasNext;
        }

        @Override
        public void forEachRemaining(DoubleConsumer consumer) {
            if (buffer == null && !finished) {
                // No partial traversal has started: bypass the buffer
                init();

                ph.wrapAndCopyInto((Sink.OfDouble) consumer::accept, spliterator);
                finished = true;
            }
            else {
                while (tryAdvance(consumer)) { }
            }
        }
    }

    /**
     * Spliterator implementation that delegates to an underlying spliterator,
     * acquiring the spliterator from a {@code Supplier} on the
     * first call to any spliterator method.
     *
     * @param <T> the type of elements returned by the delegate spliterator
     */
    static class DelegatingSpliterator<T> implements Spliterator<T> {
        private final Supplier<Spliterator<T>> supplier;

        /** The delegate; {@code null} until first obtained by {@link #get()}. */
        private Spliterator<T> s;

        @SuppressWarnings("unchecked")
        DelegatingSpliterator(Supplier<? extends Spliterator<T>> supplier) {
            this.supplier = (Supplier<Spliterator<T>>) supplier;
        }

        /**
         * Returns the delegate spliterator, obtaining it from the supplier
         * if it has not been obtained yet.
         */
        Spliterator<T> get() {
            if (s == null) {
                s = supplier.get();
            }
            return s;
        }

        @Override
        public Spliterator<T> trySplit() {
            return get().trySplit();
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> consumer) {
            return get().tryAdvance(consumer);
        }

        @Override
        public void forEachRemaining(Consumer<? super T> consumer) {
            get().forEachRemaining(consumer);
        }

        @Override
        public long estimateSize() {
            return get().estimateSize();
        }

        @Override
        public int characteristics() {
            return get().characteristics();
        }

        @Override
        public Comparator<? super T> getComparator() {
            return get().getComparator();
        }

        @Override
        public long getExactSizeIfKnown() {
            return get().getExactSizeIfKnown();
        }

        @Override
        public String toString() {
            return getClass().getName() + "[" + get() + "]";
        }

        /** Delegating spliterator specialized for {@code int} elements. */
        static final class OfInt extends DelegatingSpliterator<Integer> implements Spliterator.OfInt {
            /** The delegate, cached with its primitive-specialized type. */
            private Spliterator.OfInt s;

            OfInt(Supplier<Spliterator.OfInt> supplier) {
                super(supplier);
            }

            @Override
            Spliterator.OfInt get() {
                if (s == null) {
                    s = (Spliterator.OfInt) super.get();
                }
                return s;
            }

            @Override
            public Spliterator.OfInt trySplit() {
                return get().trySplit();
            }

            @Override
            public boolean tryAdvance(IntConsumer consumer) {
                return get().tryAdvance(consumer);
            }

            @Override
            public void forEachRemaining(IntConsumer consumer) {
                get().forEachRemaining(consumer);
            }
        }

        /** Delegating spliterator specialized for {@code long} elements. */
        static final class OfLong extends DelegatingSpliterator<Long> implements Spliterator.OfLong {
            /** The delegate, cached with its primitive-specialized type. */
            private Spliterator.OfLong s;

            OfLong(Supplier<Spliterator.OfLong> supplier) {
                super(supplier);
            }

            @Override
            Spliterator.OfLong get() {
                if (s == null) {
                    s = (Spliterator.OfLong) super.get();
                }
                return s;
            }

            @Override
            public Spliterator.OfLong trySplit() {
                return get().trySplit();
            }

            @Override
            public boolean tryAdvance(LongConsumer consumer) {
                return get().tryAdvance(consumer);
            }

            @Override
            public void forEachRemaining(LongConsumer consumer) {
                get().forEachRemaining(consumer);
            }
        }

        /** Delegating spliterator specialized for {@code double} elements. */
        static final class OfDouble extends DelegatingSpliterator<Double> implements Spliterator.OfDouble {
            /** The delegate, cached with its primitive-specialized type. */
            private Spliterator.OfDouble s;

            OfDouble(Supplier<Spliterator.OfDouble> supplier) {
                super(supplier);
            }

            @Override
            Spliterator.OfDouble get() {
                if (s == null) {
                    s = (Spliterator.OfDouble) super.get();
                }
                return s;
            }

            @Override
            public Spliterator.OfDouble trySplit() {
                return get().trySplit();
            }

            @Override
            public boolean tryAdvance(DoubleConsumer consumer) {
                return get().tryAdvance(consumer);
            }

            @Override
            public void forEachRemaining(DoubleConsumer consumer) {
                get().forEachRemaining(consumer);
            }
        }
    }
}