/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Comparator;
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
/**
* Spliterator implementations for wrapping and delegating spliterators, used
* in the implementation of the {@link Stream#spliterator()} method.
*
* @since 1.8
*/
class StreamSpliterators {
/**
* Abstract wrapping spliterator that binds to the spliterator of a
* pipeline helper on first operation.
*
*
This spliterator is not late-binding and will bind to the source
* spliterator when first operated on.
*
*
A wrapping spliterator produced from a sequential stream
* cannot be split if there are stateful operations present.
*/
private static abstract class AbstractWrappingSpliterator
implements Spliterator {
// @@@ Detect if stateful operations are present or not
// If not then can split otherwise cannot
/** True if this spliterator supports splitting */
final boolean isParallel;
final PipelineHelper ph;
/**
* Supplier for the source spliterator. Client provides either a
* spliterator or a supplier.
*/
private Supplier> spliteratorSupplier;
/**
* Source spliterator. Either provided from client or obtained from
* supplier.
*/
Spliterator spliterator;
/**
* Sink chain for the downstream stages of the pipeline, ultimately
* leading to the buffer. Used during partial traversal.
*/
Sink bufferSink;
/**
* A function that advances one element of the spliterator, pushing
* it to bufferSink. Returns whether any elements were processed.
* Used during partial traversal.
*/
BooleanSupplier pusher;
/** Next element to consume from the buffer, used during partial traversal */
long nextToConsume;
/** Buffer into which elements are pushed. Used during partial traversal. */
T_BUFFER buffer;
/**
* True if full traversal has occurred (with possible cancelation).
* If doing a partial traversal, there may be still elements in buffer.
*/
boolean finished;
/** Construct an AbstractWrappingSpliterator from a
* {@code Supplier}. */
AbstractWrappingSpliterator(PipelineHelper ph,
Supplier> spliteratorSupplier,
boolean parallel) {
this.ph = ph;
this.spliteratorSupplier = spliteratorSupplier;
this.spliterator = null;
this.isParallel = parallel;
}
/** Construct an AbstractWrappingSpliterator from a
* {@code Spliterator}. */
AbstractWrappingSpliterator(PipelineHelper ph,
Spliterator spliterator,
boolean parallel) {
this.ph = ph;
this.spliteratorSupplier = null;
this.spliterator = spliterator;
this.isParallel = parallel;
}
/** Called before advancing to set up spliterator, if needed */
final void init() {
if (spliterator == null) {
spliterator = spliteratorSupplier.get();
spliteratorSupplier = null;
}
}
/**
* Get an element from the source, pushing it into the sink chain,
* setting up the buffer if needed
* @return whether there are elements to consume from the buffer
*/
final boolean doAdvance() {
if (buffer == null) {
if (finished)
return false;
init();
initPartialTraversalState();
nextToConsume = 0;
bufferSink.begin(spliterator.getExactSizeIfKnown());
return fillBuffer();
}
else {
++nextToConsume;
boolean hasNext = nextToConsume < buffer.count();
if (!hasNext) {
nextToConsume = 0;
buffer.clear();
hasNext = fillBuffer();
}
return hasNext;
}
}
/**
* Invokes the shape-specific constructor with the provided arguments
* and returns the result
*/
abstract AbstractWrappingSpliterator wrap(Spliterator s);
/**
* Initializes buffer, sink chain, and pusher for a shape-specific
* implementation
*/
abstract void initPartialTraversalState();
@Override
public Spliterator trySplit() {
if (isParallel && !finished) {
init();
Spliterator split = spliterator.trySplit();
return (split == null) ? null : wrap(split);
}
else
return null;
}
/**
* If the buffer is empty, push elements into the sink chain until
* the source is empty or cancellation is requested.
* @return whether there are elements to consume from the buffer
*/
private boolean fillBuffer() {
while (buffer.count() == 0) {
if (bufferSink.cancellationRequested() || !pusher.getAsBoolean()) {
if (finished)
return false;
else {
bufferSink.end(); // might trigger more elements
finished = true;
}
}
}
return true;
}
@Override
public final long estimateSize() {
init();
return StreamOpFlag.SIZED.isKnown(ph.getStreamAndOpFlags())
? spliterator.estimateSize()
: Long.MAX_VALUE;
}
@Override
public final long getExactSizeIfKnown() {
init();
return StreamOpFlag.SIZED.isKnown(ph.getStreamAndOpFlags())
? spliterator.getExactSizeIfKnown()
: -1;
}
@Override
public final int characteristics() {
init();
// Get the characteristics from the pipeline
int c = StreamOpFlag.toCharacteristics(StreamOpFlag.toStreamFlags(ph.getStreamAndOpFlags()));
// Mask off the size and uniform characteristics and replace with
// those of the spliterator
// Note that a non-uniform spliterator can change from something
// with an exact size to an estimate for a sub-split, for example
// with HashSet where the size is known at the top level spliterator
// but for sub-splits only an estimate is known
if ((c & Spliterator.SIZED) != 0) {
c &= ~(Spliterator.SIZED | Spliterator.SUBSIZED);
c |= (spliterator.characteristics() & Spliterator.SIZED & Spliterator.SUBSIZED);
}
return c;
}
@Override
public Comparator super P_OUT> getComparator() {
if (!hasCharacteristics(SORTED))
throw new IllegalStateException();
return null;
}
@Override
public final String toString() {
return String.format("%s[%s]", getClass().getName(), spliterator);
}
}
static final class WrappingSpliterator
extends AbstractWrappingSpliterator> {
WrappingSpliterator(PipelineHelper ph,
Supplier> supplier,
boolean parallel) {
super(ph, supplier, parallel);
}
WrappingSpliterator(PipelineHelper ph,
Spliterator spliterator,
boolean parallel) {
super(ph, spliterator, parallel);
}
@Override
WrappingSpliterator wrap(Spliterator s) {
return new WrappingSpliterator<>(ph, s, isParallel);
}
@Override
void initPartialTraversalState() {
SpinedBuffer b = new SpinedBuffer<>();
buffer = b;
bufferSink = ph.wrapSink(b::accept);
pusher = () -> spliterator.tryAdvance(bufferSink);
}
@Override
public boolean tryAdvance(Consumer super P_OUT> consumer) {
boolean hasNext = doAdvance();
if (hasNext)
consumer.accept(buffer.get(nextToConsume));
return hasNext;
}
@Override
public void forEachRemaining(Consumer super P_OUT> consumer) {
if (buffer == null && !finished) {
init();
ph.wrapAndCopyInto((Sink) consumer::accept, spliterator);
finished = true;
}
else {
while (tryAdvance(consumer)) { }
}
}
}
static final class IntWrappingSpliterator
extends AbstractWrappingSpliterator
implements Spliterator.OfInt {
IntWrappingSpliterator(PipelineHelper ph,
Supplier> supplier,
boolean parallel) {
super(ph, supplier, parallel);
}
IntWrappingSpliterator(PipelineHelper ph,
Spliterator spliterator,
boolean parallel) {
super(ph, spliterator, parallel);
}
@Override
AbstractWrappingSpliterator wrap(Spliterator s) {
return new IntWrappingSpliterator<>(ph, s, isParallel);
}
@Override
void initPartialTraversalState() {
SpinedBuffer.OfInt b = new SpinedBuffer.OfInt();
buffer = b;
bufferSink = ph.wrapSink((Sink.OfInt) b::accept);
pusher = () -> spliterator.tryAdvance(bufferSink);
}
@Override
public Spliterator.OfInt trySplit() {
return (Spliterator.OfInt) super.trySplit();
}
@Override
public boolean tryAdvance(IntConsumer consumer) {
boolean hasNext = doAdvance();
if (hasNext)
consumer.accept(buffer.get(nextToConsume));
return hasNext;
}
@Override
public void forEachRemaining(IntConsumer consumer) {
if (buffer == null && !finished) {
init();
ph.wrapAndCopyInto((Sink.OfInt) consumer::accept, spliterator);
finished = true;
}
else {
while (tryAdvance(consumer)) { }
}
}
}
static final class LongWrappingSpliterator
extends AbstractWrappingSpliterator
implements Spliterator.OfLong {
LongWrappingSpliterator(PipelineHelper ph,
Supplier> supplier,
boolean parallel) {
super(ph, supplier, parallel);
}
LongWrappingSpliterator(PipelineHelper ph,
Spliterator spliterator,
boolean parallel) {
super(ph, spliterator, parallel);
}
@Override
AbstractWrappingSpliterator wrap(Spliterator s) {
return new LongWrappingSpliterator<>(ph, s, isParallel);
}
@Override
void initPartialTraversalState() {
SpinedBuffer.OfLong b = new SpinedBuffer.OfLong();
buffer = b;
bufferSink = ph.wrapSink((Sink.OfLong) b::accept);
pusher = () -> spliterator.tryAdvance(bufferSink);
}
@Override
public Spliterator.OfLong trySplit() {
return (Spliterator.OfLong) super.trySplit();
}
@Override
public boolean tryAdvance(LongConsumer consumer) {
boolean hasNext = doAdvance();
if (hasNext)
consumer.accept(buffer.get(nextToConsume));
return hasNext;
}
@Override
public void forEachRemaining(LongConsumer consumer) {
if (buffer == null && !finished) {
init();
ph.wrapAndCopyInto((Sink.OfLong) consumer::accept, spliterator);
finished = true;
}
else {
while (tryAdvance(consumer)) { }
}
}
}
static final class DoubleWrappingSpliterator
extends AbstractWrappingSpliterator
implements Spliterator.OfDouble {
DoubleWrappingSpliterator(PipelineHelper ph,
Supplier> supplier,
boolean parallel) {
super(ph, supplier, parallel);
}
DoubleWrappingSpliterator(PipelineHelper ph,
Spliterator spliterator,
boolean parallel) {
super(ph, spliterator, parallel);
}
@Override
AbstractWrappingSpliterator wrap(Spliterator s) {
return new DoubleWrappingSpliterator<>(ph, s, isParallel);
}
@Override
void initPartialTraversalState() {
SpinedBuffer.OfDouble b = new SpinedBuffer.OfDouble();
buffer = b;
bufferSink = ph.wrapSink((Sink.OfDouble) b::accept);
pusher = () -> spliterator.tryAdvance(bufferSink);
}
@Override
public Spliterator.OfDouble trySplit() {
return (Spliterator.OfDouble) super.trySplit();
}
@Override
public boolean tryAdvance(DoubleConsumer consumer) {
boolean hasNext = doAdvance();
if (hasNext)
consumer.accept(buffer.get(nextToConsume));
return hasNext;
}
@Override
public void forEachRemaining(DoubleConsumer consumer) {
if (buffer == null && !finished) {
init();
ph.wrapAndCopyInto((Sink.OfDouble) consumer::accept, spliterator);
finished = true;
}
else {
while (tryAdvance(consumer)) { }
}
}
}
/**
* Spliterator implementation that delegates to an underlying spliterator,
* acquiring the spliterator from a {@code Supplier} on the
* first call to any spliterator method.
* @param
*/
static class DelegatingSpliterator implements Spliterator {
private final Supplier> supplier;
private Spliterator s;
@SuppressWarnings("unchecked")
DelegatingSpliterator(Supplier extends Spliterator> supplier) {
this.supplier = (Supplier>) supplier;
}
Spliterator get() {
if (s == null) {
s = supplier.get();
}
return s;
}
@Override
public Spliterator trySplit() {
return get().trySplit();
}
@Override
public boolean tryAdvance(Consumer super T> consumer) {
return get().tryAdvance(consumer);
}
@Override
public void forEachRemaining(Consumer super T> consumer) {
get().forEachRemaining(consumer);
}
@Override
public long estimateSize() {
return get().estimateSize();
}
@Override
public int characteristics() {
return get().characteristics();
}
@Override
public Comparator super T> getComparator() {
return get().getComparator();
}
@Override
public long getExactSizeIfKnown() {
return get().getExactSizeIfKnown();
}
@Override
public String toString() {
return getClass().getName() + "[" + get() + "]";
}
static final class OfInt extends DelegatingSpliterator implements Spliterator.OfInt {
private Spliterator.OfInt s;
OfInt(Supplier supplier) {
super(supplier);
}
@Override
Spliterator.OfInt get() {
if (s == null) {
s = (Spliterator.OfInt) super.get();
}
return s;
}
@Override
public Spliterator.OfInt trySplit() {
return get().trySplit();
}
@Override
public boolean tryAdvance(IntConsumer consumer) {
return get().tryAdvance(consumer);
}
@Override
public void forEachRemaining(IntConsumer consumer) {
get().forEachRemaining(consumer);
}
}
static final class OfLong extends DelegatingSpliterator implements Spliterator.OfLong {
private Spliterator.OfLong s;
OfLong(Supplier supplier) {
super(supplier);
}
@Override
Spliterator.OfLong get() {
if (s == null) {
s = (Spliterator.OfLong) super.get();
}
return s;
}
@Override
public Spliterator.OfLong trySplit() {
return get().trySplit();
}
@Override
public boolean tryAdvance(LongConsumer consumer) {
return get().tryAdvance(consumer);
}
@Override
public void forEachRemaining(LongConsumer consumer) {
get().forEachRemaining(consumer);
}
}
static final class OfDouble extends DelegatingSpliterator implements Spliterator.OfDouble {
private Spliterator.OfDouble s;
OfDouble(Supplier supplier) {
super(supplier);
}
@Override
Spliterator.OfDouble get() {
if (s == null) {
s = (Spliterator.OfDouble) super.get();
}
return s;
}
@Override
public Spliterator.OfDouble trySplit() {
return get().trySplit();
}
@Override
public boolean tryAdvance(DoubleConsumer consumer) {
return get().tryAdvance(consumer);
}
@Override
public void forEachRemaining(DoubleConsumer consumer) {
get().forEachRemaining(consumer);
}
}
}
}