< prev index next >

src/java.base/share/classes/java/util/stream/StreamSpliterators.java

Print this page

        

*** 26,35 **** --- 26,36 ---- import java.util.Comparator; import java.util.Objects; import java.util.Spliterator; import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.DoubleConsumer; import java.util.function.DoubleSupplier;
*** 903,928 **** --- 904,933 ---- static final int CHUNK_SIZE = 1 << 7; // The spliterator to slice protected final T_SPLITR s; protected final boolean unlimited; + protected final int chunkSize; private final long skipThreshold; private final AtomicLong permits; UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) { this.s = s; this.unlimited = limit < 0; this.skipThreshold = limit >= 0 ? limit : 0; + this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE, + ((skip + limit) / ForkJoinPool.getCommonPoolParallelism()) + 1) : CHUNK_SIZE; this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip); } UnorderedSliceSpliterator(T_SPLITR s, UnorderedSliceSpliterator<T, T_SPLITR> parent) { this.s = s; this.unlimited = parent.unlimited; this.permits = parent.permits; this.skipThreshold = parent.skipThreshold; + this.chunkSize = parent.chunkSize; } /** * Acquire permission to skip or process elements. The caller must * first acquire the elements, then consult this method for guidance
*** 1027,1043 **** ArrayBuffer.OfRef<T> sb = null; PermitStatus permitStatus; while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) { if (permitStatus == PermitStatus.MAYBE_MORE) { ! // Optimistically traverse elements up to a threshold of CHUNK_SIZE if (sb == null) ! sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE); else sb.reset(); long permitsRequested = 0; ! do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE); if (permitsRequested == 0) return; sb.forEach(action, acquirePermits(permitsRequested)); } else { --- 1032,1048 ---- ArrayBuffer.OfRef<T> sb = null; PermitStatus permitStatus; while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) { if (permitStatus == PermitStatus.MAYBE_MORE) { ! // Optimistically traverse elements up to a threshold of chunkSize if (sb == null) ! sb = new ArrayBuffer.OfRef<>(chunkSize); else sb.reset(); long permitsRequested = 0; ! do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize); if (permitsRequested == 0) return; sb.forEach(action, acquirePermits(permitsRequested)); } else {
*** 1100,1118 **** T_BUFF sb = null; PermitStatus permitStatus; while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) { if (permitStatus == PermitStatus.MAYBE_MORE) { ! // Optimistically traverse elements up to a threshold of CHUNK_SIZE if (sb == null) ! sb = bufferCreate(CHUNK_SIZE); else sb.reset(); @SuppressWarnings("unchecked") T_CONS sbc = (T_CONS) sb; long permitsRequested = 0; ! do { } while (s.tryAdvance(sbc) && ++permitsRequested < CHUNK_SIZE); if (permitsRequested == 0) return; sb.forEach(action, acquirePermits(permitsRequested)); } else { --- 1105,1123 ---- T_BUFF sb = null; PermitStatus permitStatus; while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) { if (permitStatus == PermitStatus.MAYBE_MORE) { ! // Optimistically traverse elements up to a threshold of chunkSize if (sb == null) ! sb = bufferCreate(chunkSize); else sb.reset(); @SuppressWarnings("unchecked") T_CONS sbc = (T_CONS) sb; long permitsRequested = 0; ! do { } while (s.tryAdvance(sbc) && ++permitsRequested < chunkSize); if (permitsRequested == 0) return; sb.forEach(action, acquirePermits(permitsRequested)); } else {
< prev index next >