< prev index next >
src/java.base/share/classes/java/util/stream/StreamSpliterators.java
Print this page
*** 26,35 ****
--- 26,36 ----
import java.util.Comparator;
import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.DoubleSupplier;
*** 903,928 ****
--- 904,933 ----
static final int CHUNK_SIZE = 1 << 7;
// The spliterator to slice
protected final T_SPLITR s;
protected final boolean unlimited;
+ protected final int chunkSize;
private final long skipThreshold;
private final AtomicLong permits;
UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
this.s = s;
this.unlimited = limit < 0;
this.skipThreshold = limit >= 0 ? limit : 0;
+ this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
+ ((skip + limit) / ForkJoinPool.getCommonPoolParallelism()) + 1) : CHUNK_SIZE;
this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
}
UnorderedSliceSpliterator(T_SPLITR s,
UnorderedSliceSpliterator<T, T_SPLITR> parent) {
this.s = s;
this.unlimited = parent.unlimited;
this.permits = parent.permits;
this.skipThreshold = parent.skipThreshold;
+ this.chunkSize = parent.chunkSize;
}
/**
* Acquire permission to skip or process elements. The caller must
* first acquire the elements, then consult this method for guidance
*** 1027,1043 ****
ArrayBuffer.OfRef<T> sb = null;
PermitStatus permitStatus;
while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
if (permitStatus == PermitStatus.MAYBE_MORE) {
! // Optimistically traverse elements up to a threshold of CHUNK_SIZE
if (sb == null)
! sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE);
else
sb.reset();
long permitsRequested = 0;
! do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE);
if (permitsRequested == 0)
return;
sb.forEach(action, acquirePermits(permitsRequested));
}
else {
--- 1032,1048 ----
ArrayBuffer.OfRef<T> sb = null;
PermitStatus permitStatus;
while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
if (permitStatus == PermitStatus.MAYBE_MORE) {
! // Optimistically traverse elements up to a threshold of chunkSize
if (sb == null)
! sb = new ArrayBuffer.OfRef<>(chunkSize);
else
sb.reset();
long permitsRequested = 0;
! do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize);
if (permitsRequested == 0)
return;
sb.forEach(action, acquirePermits(permitsRequested));
}
else {
*** 1100,1118 ****
T_BUFF sb = null;
PermitStatus permitStatus;
while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
if (permitStatus == PermitStatus.MAYBE_MORE) {
! // Optimistically traverse elements up to a threshold of CHUNK_SIZE
if (sb == null)
! sb = bufferCreate(CHUNK_SIZE);
else
sb.reset();
@SuppressWarnings("unchecked")
T_CONS sbc = (T_CONS) sb;
long permitsRequested = 0;
! do { } while (s.tryAdvance(sbc) && ++permitsRequested < CHUNK_SIZE);
if (permitsRequested == 0)
return;
sb.forEach(action, acquirePermits(permitsRequested));
}
else {
--- 1105,1123 ----
T_BUFF sb = null;
PermitStatus permitStatus;
while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
if (permitStatus == PermitStatus.MAYBE_MORE) {
! // Optimistically traverse elements up to a threshold of chunkSize
if (sb == null)
! sb = bufferCreate(chunkSize);
else
sb.reset();
@SuppressWarnings("unchecked")
T_CONS sbc = (T_CONS) sb;
long permitsRequested = 0;
! do { } while (s.tryAdvance(sbc) && ++permitsRequested < chunkSize);
if (permitsRequested == 0)
return;
sb.forEach(action, acquirePermits(permitsRequested));
}
else {
< prev index next >