< prev index next >

src/java.base/share/classes/java/util/stream/StreamSpliterators.java

Print this page

        

@@ -26,10 +26,11 @@
 
 import java.util.Comparator;
 import java.util.Objects;
 import java.util.Spliterator;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.DoubleConsumer;
 import java.util.function.DoubleSupplier;

@@ -903,26 +904,30 @@
         static final int CHUNK_SIZE = 1 << 7;
 
         // The spliterator to slice
         protected final T_SPLITR s;
         protected final boolean unlimited;
+        protected final int chunkSize; // bound on elements buffered per optimistic traversal pass
         private final long skipThreshold;
         private final AtomicLong permits;
 
         UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
             this.s = s;
             this.unlimited = limit < 0;
             this.skipThreshold = limit >= 0 ? limit : 0;
+            this.chunkSize = (limit < 0 || skip + limit < 0) ? CHUNK_SIZE // unlimited, or skip + limit overflowed
+                : (int)Math.min(CHUNK_SIZE, ((skip + limit) / ForkJoinPool.getCommonPoolParallelism()) + 1);
             this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
         }
 
         UnorderedSliceSpliterator(T_SPLITR s,
                                   UnorderedSliceSpliterator<T, T_SPLITR> parent) {
             this.s = s;
             this.unlimited = parent.unlimited;
             this.permits = parent.permits;
             this.skipThreshold = parent.skipThreshold;
+            this.chunkSize = parent.chunkSize; // same chunk size as the parent, not recomputed
         }
 
         /**
          * Acquire permission to skip or process elements.  The caller must
          * first acquire the elements, then consult this method for guidance

@@ -1027,17 +1032,17 @@
 
                 ArrayBuffer.OfRef<T> sb = null;
                 PermitStatus permitStatus;
                 while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                     if (permitStatus == PermitStatus.MAYBE_MORE) {
-                        // Optimistically traverse elements up to a threshold of CHUNK_SIZE
+                        // Optimistically traverse elements up to a threshold of chunkSize
                         if (sb == null)
-                            sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE);
+                            sb = new ArrayBuffer.OfRef<>(chunkSize);
                         else
                             sb.reset();
                         long permitsRequested = 0;
-                        do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE);
+                        do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize);
                         if (permitsRequested == 0)
                             return;
                         sb.forEach(action, acquirePermits(permitsRequested));
                     }
                     else {

@@ -1100,19 +1105,19 @@
 
                 T_BUFF sb = null;
                 PermitStatus permitStatus;
                 while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                     if (permitStatus == PermitStatus.MAYBE_MORE) {
-                        // Optimistically traverse elements up to a threshold of CHUNK_SIZE
+                        // Optimistically traverse elements up to a threshold of chunkSize
                         if (sb == null)
-                            sb = bufferCreate(CHUNK_SIZE);
+                            sb = bufferCreate(chunkSize);
                         else
                             sb.reset();
                         @SuppressWarnings("unchecked")
                         T_CONS sbc = (T_CONS) sb;
                         long permitsRequested = 0;
-                        do { } while (s.tryAdvance(sbc) && ++permitsRequested < CHUNK_SIZE);
+                        do { } while (s.tryAdvance(sbc) && ++permitsRequested < chunkSize);
                         if (permitsRequested == 0)
                             return;
                         sb.forEach(action, acquirePermits(permitsRequested));
                     }
                     else {
< prev index next >