rev 47749 : 8190974: Parallel stream execution within a custom ForkJoinPool should obey the parallelism
Reviewed-by: martin, tvaleev

   1 /*
   2  * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Objects;
  28 import java.util.Spliterator;
  29 import java.util.concurrent.ConcurrentHashMap;
  30 import java.util.concurrent.CountedCompleter;

  31 import java.util.function.Consumer;
  32 import java.util.function.DoubleConsumer;
  33 import java.util.function.IntConsumer;
  34 import java.util.function.IntFunction;
  35 import java.util.function.LongConsumer;
  36 
  37 /**
  38  * Factory for creating instances of {@code TerminalOp} that perform an
  39  * action for every element of a stream.  Supported variants include unordered
  40  * traversal (elements are provided to the {@code Consumer} as soon as they are
  41  * available), and ordered traversal (elements are provided to the
  42  * {@code Consumer} in encounter order.)
  43  *
  44  * <p>Elements are provided to the {@code Consumer} on whatever thread and
  45  * in whatever order they become available.  For ordered traversals, it is
  46  * guaranteed that processing an element <em>happens-before</em> processing
  47  * subsequent elements in the encounter order.
  48  *
  49  * <p>Exceptions occurring as a result of sending an element to the
  50  * {@code Consumer} will be relayed to the caller and traversal will be
  51  * prematurely terminated.
  52  *
  53  * @since 1.8
  54  */
  55 final class ForEachOps {
  56 
  57     private ForEachOps() { }
  58 
  59     /**
  60      * Constructs a {@code TerminalOp} that perform an action for every element
  61      * of a stream.
  62      *
  63      * @param action the {@code Consumer} that receives all elements of a
  64      *        stream
  65      * @param ordered whether an ordered traversal is requested
  66      * @param <T> the type of the stream elements
  67      * @return the {@code TerminalOp} instance
  68      */
  69     public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
  70                                                   boolean ordered) {
  71         Objects.requireNonNull(action);
  72         return new ForEachOp.OfRef<>(action, ordered);
  73     }
  74 
  75     /**
  76      * Constructs a {@code TerminalOp} that perform an action for every element
  77      * of an {@code IntStream}.
  78      *
  79      * @param action the {@code IntConsumer} that receives all elements of a
  80      *        stream
  81      * @param ordered whether an ordered traversal is requested
  82      * @return the {@code TerminalOp} instance
  83      */
  84     public static TerminalOp<Integer, Void> makeInt(IntConsumer action,
  85                                                     boolean ordered) {
  86         Objects.requireNonNull(action);
  87         return new ForEachOp.OfInt(action, ordered);
  88     }
  89 
  90     /**
  91      * Constructs a {@code TerminalOp} that perform an action for every element
  92      * of a {@code LongStream}.
  93      *
  94      * @param action the {@code LongConsumer} that receives all elements of a
  95      *        stream
  96      * @param ordered whether an ordered traversal is requested
  97      * @return the {@code TerminalOp} instance
  98      */
  99     public static TerminalOp<Long, Void> makeLong(LongConsumer action,
 100                                                   boolean ordered) {
 101         Objects.requireNonNull(action);
 102         return new ForEachOp.OfLong(action, ordered);
 103     }
 104 
 105     /**
 106      * Constructs a {@code TerminalOp} that perform an action for every element
 107      * of a {@code DoubleStream}.
 108      *
 109      * @param action the {@code DoubleConsumer} that receives all elements of
 110      *        a stream
 111      * @param ordered whether an ordered traversal is requested
 112      * @return the {@code TerminalOp} instance
 113      */
 114     public static TerminalOp<Double, Void> makeDouble(DoubleConsumer action,
 115                                                       boolean ordered) {
 116         Objects.requireNonNull(action);
 117         return new ForEachOp.OfDouble(action, ordered);
 118     }
 119 
 120     /**
 121      * A {@code TerminalOp} that evaluates a stream pipeline and sends the
 122      * output to itself as a {@code TerminalSink}.  Elements will be sent in
 123      * whatever thread they become available.  If the traversal is unordered,
 124      * they will be sent independent of the stream's encounter order.
 125      *
 126      * <p>This terminal operation is stateless.  For parallel evaluation, each
 127      * leaf instance of a {@code ForEachTask} will send elements to the same
 128      * {@code TerminalSink} reference that is an instance of this class.
 129      *
 130      * @param <T> the output type of the stream pipeline
 131      */
 132     abstract static class ForEachOp<T>
 133             implements TerminalOp<T, Void>, TerminalSink<T, Void> {
 134         private final boolean ordered;
 135 
 136         protected ForEachOp(boolean ordered) {
 137             this.ordered = ordered;
 138         }
 139 
 140         // TerminalOp
 141 
 142         @Override
 143         public int getOpFlags() {
 144             return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
 145         }
 146 
 147         @Override
 148         public <S> Void evaluateSequential(PipelineHelper<T> helper,
 149                                            Spliterator<S> spliterator) {
 150             return helper.wrapAndCopyInto(this, spliterator).get();
 151         }
 152 
 153         @Override
 154         public <S> Void evaluateParallel(PipelineHelper<T> helper,
 155                                          Spliterator<S> spliterator) {
 156             if (ordered)
 157                 new ForEachOrderedTask<>(helper, spliterator, this).invoke();
 158             else
 159                 new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
 160             return null;
 161         }
 162 
 163         // TerminalSink
 164 
 165         @Override
 166         public Void get() {
 167             return null;
 168         }
 169 
 170         // Implementations
 171 
 172         /** Implementation class for reference streams */
 173         static final class OfRef<T> extends ForEachOp<T> {
 174             final Consumer<? super T> consumer;
 175 
 176             OfRef(Consumer<? super T> consumer, boolean ordered) {
 177                 super(ordered);
 178                 this.consumer = consumer;
 179             }
 180 
 181             @Override
 182             public void accept(T t) {
 183                 consumer.accept(t);
 184             }
 185         }
 186 
 187         /** Implementation class for {@code IntStream} */
 188         static final class OfInt extends ForEachOp<Integer>
 189                 implements Sink.OfInt {
 190             final IntConsumer consumer;
 191 
 192             OfInt(IntConsumer consumer, boolean ordered) {
 193                 super(ordered);
 194                 this.consumer = consumer;
 195             }
 196 
 197             @Override
 198             public StreamShape inputShape() {
 199                 return StreamShape.INT_VALUE;
 200             }
 201 
 202             @Override
 203             public void accept(int t) {
 204                 consumer.accept(t);
 205             }
 206         }
 207 
 208         /** Implementation class for {@code LongStream} */
 209         static final class OfLong extends ForEachOp<Long>
 210                 implements Sink.OfLong {
 211             final LongConsumer consumer;
 212 
 213             OfLong(LongConsumer consumer, boolean ordered) {
 214                 super(ordered);
 215                 this.consumer = consumer;
 216             }
 217 
 218             @Override
 219             public StreamShape inputShape() {
 220                 return StreamShape.LONG_VALUE;
 221             }
 222 
 223             @Override
 224             public void accept(long t) {
 225                 consumer.accept(t);
 226             }
 227         }
 228 
 229         /** Implementation class for {@code DoubleStream} */
 230         static final class OfDouble extends ForEachOp<Double>
 231                 implements Sink.OfDouble {
 232             final DoubleConsumer consumer;
 233 
 234             OfDouble(DoubleConsumer consumer, boolean ordered) {
 235                 super(ordered);
 236                 this.consumer = consumer;
 237             }
 238 
 239             @Override
 240             public StreamShape inputShape() {
 241                 return StreamShape.DOUBLE_VALUE;
 242             }
 243 
 244             @Override
 245             public void accept(double t) {
 246                 consumer.accept(t);
 247             }
 248         }
 249     }
 250 
 251     /** A {@code ForkJoinTask} for performing a parallel for-each operation */
 252     @SuppressWarnings("serial")
 253     static final class ForEachTask<S, T> extends CountedCompleter<Void> {
 254         private Spliterator<S> spliterator;
 255         private final Sink<S> sink;
 256         private final PipelineHelper<T> helper;
 257         private long targetSize;
 258 
 259         ForEachTask(PipelineHelper<T> helper,
 260                     Spliterator<S> spliterator,
 261                     Sink<S> sink) {
 262             super(null);
 263             this.sink = sink;
 264             this.helper = helper;
 265             this.spliterator = spliterator;
 266             this.targetSize = 0L;
 267         }
 268 
 269         ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
 270             super(parent);
 271             this.spliterator = spliterator;
 272             this.sink = parent.sink;
 273             this.targetSize = parent.targetSize;
 274             this.helper = parent.helper;
 275         }
 276 
 277         // Similar to AbstractTask but doesn't need to track child tasks
 278         public void compute() {
 279             Spliterator<S> rightSplit = spliterator, leftSplit;
 280             long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
 281             if ((sizeThreshold = targetSize) == 0L)
 282                 targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
 283             boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
 284             boolean forkRight = false;
 285             Sink<S> taskSink = sink;
 286             ForEachTask<S, T> task = this;
 287             while (!isShortCircuit || !taskSink.cancellationRequested()) {
 288                 if (sizeEstimate <= sizeThreshold ||
 289                     (leftSplit = rightSplit.trySplit()) == null) {
 290                     task.helper.copyInto(taskSink, rightSplit);
 291                     break;
 292                 }
 293                 ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
 294                 task.addToPendingCount(1);
 295                 ForEachTask<S, T> taskToFork;
 296                 if (forkRight) {
 297                     forkRight = false;
 298                     rightSplit = leftSplit;
 299                     taskToFork = task;
 300                     task = leftTask;
 301                 }
 302                 else {
 303                     forkRight = true;
 304                     taskToFork = leftTask;
 305                 }
 306                 taskToFork.fork();
 307                 sizeEstimate = rightSplit.estimateSize();
 308             }
 309             task.spliterator = null;
 310             task.propagateCompletion();
 311         }
 312     }
 313 
 314     /**
 315      * A {@code ForkJoinTask} for performing a parallel for-each operation
 316      * which visits the elements in encounter order
 317      */
 318     @SuppressWarnings("serial")
 319     static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
 320         /*
 321          * Our goal is to ensure that the elements associated with a task are
 322          * processed according to an in-order traversal of the computation tree.
 323          * We use completion counts for representing these dependencies, so that
 324          * a task does not complete until all the tasks preceding it in this
 325          * order complete.  We use the "completion map" to associate the next
 326          * task in this order for any left child.  We increase the pending count
 327          * of any node on the right side of such a mapping by one to indicate
 328          * its dependency, and when a node on the left side of such a mapping
 329          * completes, it decrements the pending count of its corresponding right
 330          * side.  As the computation tree is expanded by splitting, we must
 331          * atomically update the mappings to maintain the invariant that the
 332          * completion map maps left children to the next node in the in-order
 333          * traversal.
 334          *
 335          * Take, for example, the following computation tree of tasks:
 336          *
 337          *       a
 338          *      / \
 339          *     b   c
 340          *    / \ / \
 341          *   d  e f  g
 342          *
 343          * The complete map will contain (not necessarily all at the same time)
 344          * the following associations:
 345          *
 346          *   d -> e
 347          *   b -> f
 348          *   f -> g
 349          *
 350          * Tasks e, f, g will have their pending counts increased by 1.
 351          *
 352          * The following relationships hold:
 353          *
 354          *   - completion of d "happens-before" e;
 355          *   - completion of d and e "happens-before b;
 356          *   - completion of b "happens-before" f; and
 357          *   - completion of f "happens-before" g
 358          *
 359          * Thus overall the "happens-before" relationship holds for the
 360          * reporting of elements, covered by tasks d, e, f and g, as specified
 361          * by the forEachOrdered operation.
 362          */
 363 
 364         private final PipelineHelper<T> helper;
 365         private Spliterator<S> spliterator;
 366         private final long targetSize;
 367         private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap;
 368         private final Sink<T> action;
 369         private final ForEachOrderedTask<S, T> leftPredecessor;
 370         private Node<T> node;
 371 
 372         protected ForEachOrderedTask(PipelineHelper<T> helper,
 373                                      Spliterator<S> spliterator,
 374                                      Sink<T> action) {
 375             super(null);
 376             this.helper = helper;
 377             this.spliterator = spliterator;
 378             this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
 379             // Size map to avoid concurrent re-sizes
 380             this.completionMap = new ConcurrentHashMap<>(Math.max(16, AbstractTask.getLeafTarget() << 1));
 381             this.action = action;
 382             this.leftPredecessor = null;
 383         }
 384 
 385         ForEachOrderedTask(ForEachOrderedTask<S, T> parent,
 386                            Spliterator<S> spliterator,
 387                            ForEachOrderedTask<S, T> leftPredecessor) {
 388             super(parent);
 389             this.helper = parent.helper;
 390             this.spliterator = spliterator;
 391             this.targetSize = parent.targetSize;
 392             this.completionMap = parent.completionMap;
 393             this.action = parent.action;
 394             this.leftPredecessor = leftPredecessor;
 395         }
 396 
 397         @Override
 398         public final void compute() {
 399             doCompute(this);
 400         }
 401 
 402         private static <S, T> void doCompute(ForEachOrderedTask<S, T> task) {
 403             Spliterator<S> rightSplit = task.spliterator, leftSplit;
 404             long sizeThreshold = task.targetSize;
 405             boolean forkRight = false;
 406             while (rightSplit.estimateSize() > sizeThreshold &&
 407                    (leftSplit = rightSplit.trySplit()) != null) {
 408                 ForEachOrderedTask<S, T> leftChild =
 409                     new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
 410                 ForEachOrderedTask<S, T> rightChild =
 411                     new ForEachOrderedTask<>(task, rightSplit, leftChild);
 412 
 413                 // Fork the parent task
 414                 // Completion of the left and right children "happens-before"
 415                 // completion of the parent
 416                 task.addToPendingCount(1);
 417                 // Completion of the left child "happens-before" completion of
 418                 // the right child
 419                 rightChild.addToPendingCount(1);
 420                 task.completionMap.put(leftChild, rightChild);
 421 
 422                 // If task is not on the left spine
 423                 if (task.leftPredecessor != null) {
 424                     /*
 425                      * Completion of left-predecessor, or left subtree,
 426                      * "happens-before" completion of left-most leaf node of
 427                      * right subtree.
 428                      * The left child's pending count needs to be updated before
 429                      * it is associated in the completion map, otherwise the
 430                      * left child can complete prematurely and violate the
 431                      * "happens-before" constraint.
 432                      */
 433                     leftChild.addToPendingCount(1);
 434                     // Update association of left-predecessor to left-most
 435                     // leaf node of right subtree
 436                     if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) {
 437                         // If replaced, adjust the pending count of the parent
 438                         // to complete when its children complete
 439                         task.addToPendingCount(-1);
 440                     } else {
 441                         // Left-predecessor has already completed, parent's
 442                         // pending count is adjusted by left-predecessor;
 443                         // left child is ready to complete
 444                         leftChild.addToPendingCount(-1);
 445                     }
 446                 }
 447 
 448                 ForEachOrderedTask<S, T> taskToFork;
 449                 if (forkRight) {
 450                     forkRight = false;
 451                     rightSplit = leftSplit;
 452                     task = leftChild;
 453                     taskToFork = rightChild;
 454                 }
 455                 else {
 456                     forkRight = true;
 457                     task = rightChild;
 458                     taskToFork = leftChild;
 459                 }
 460                 taskToFork.fork();
 461             }
 462 
 463             /*
 464              * Task's pending count is either 0 or 1.  If 1 then the completion
 465              * map will contain a value that is task, and two calls to
 466              * tryComplete are required for completion, one below and one
 467              * triggered by the completion of task's left-predecessor in
 468              * onCompletion.  Therefore there is no data race within the if
 469              * block.
 470              */
 471             if (task.getPendingCount() > 0) {
 472                 // Cannot complete just yet so buffer elements into a Node
 473                 // for use when completion occurs
 474                 @SuppressWarnings("unchecked")
 475                 IntFunction<T[]> generator = size -> (T[]) new Object[size];
 476                 Node.Builder<T> nb = task.helper.makeNodeBuilder(
 477                         task.helper.exactOutputSizeIfKnown(rightSplit),
 478                         generator);
 479                 task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();
 480                 task.spliterator = null;
 481             }
 482             task.tryComplete();
 483         }
 484 
 485         @Override
 486         public void onCompletion(CountedCompleter<?> caller) {
 487             if (node != null) {
 488                 // Dump buffered elements from this leaf into the sink
 489                 node.forEach(action);
 490                 node = null;
 491             }
 492             else if (spliterator != null) {
 493                 // Dump elements output from this leaf's pipeline into the sink
 494                 helper.wrapAndCopyInto(action, spliterator);
 495                 spliterator = null;
 496             }
 497 
 498             // The completion of this task *and* the dumping of elements
 499             // "happens-before" completion of the associated left-most leaf task
 500             // of right subtree (if any, which can be this task's right sibling)
 501             //
 502             ForEachOrderedTask<S, T> leftDescendant = completionMap.remove(this);
 503             if (leftDescendant != null)
 504                 leftDescendant.tryComplete();
 505         }
 506     }
 507 }
--- EOF ---