1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Objects;
  28 import java.util.Spliterator;
  29 import java.util.concurrent.ConcurrentHashMap;
  30 import java.util.concurrent.CountedCompleter;
  31 import java.util.concurrent.ForkJoinTask;
  32 import java.util.function.Consumer;
  33 import java.util.function.DoubleConsumer;
  34 import java.util.function.IntConsumer;
  35 import java.util.function.IntFunction;
  36 import java.util.function.LongConsumer;
  37 
  38 /**
  39  * Factory for creating instances of {@code TerminalOp} that perform an
  40  * action for every element of a stream.  Supported variants include unordered
  41  * traversal (elements are provided to the {@code Consumer} as soon as they are
  42  * available), and ordered traversal (elements are provided to the
  43  * {@code Consumer} in encounter order.)
  44  *
  45  * <p>Elements are provided to the {@code Consumer} on whatever thread and
  46  * whatever order they become available.  For ordered traversals, it is
  47  * guaranteed that processing an element <em>happens-before</em> processing
  48  * subsequent elements in the encounter order.
  49  *
  50  * <p>Exceptions occurring as a result of sending an element to the
  51  * {@code Consumer} will be relayed to the caller and traversal will be
  52  * prematurely terminated.
  53  *
  54  * @since 1.8
  55  */
  56 final class ForEachOps {
  57 
  58     private ForEachOps() { }
  59 
  60     /**
  61      * Constructs a {@code TerminalOp} that perform an action for every element
  62      * of a stream.
  63      *
  64      * @param action the {@code Consumer} that receives all elements of a
  65      *        stream
  66      * @param ordered whether an ordered traversal is requested
  67      * @param <T> the type of the stream elements
  68      * @return the {@code TerminalOp} instance
  69      */
  70     public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
  71                                                   boolean ordered) {
  72         Objects.requireNonNull(action);
  73         return new ForEachOp.OfRef<>(action, ordered);
  74     }
  75 
  76     /**
  77      * Constructs a {@code TerminalOp} that perform an action for every element
  78      * of an {@code IntStream}.
  79      *
  80      * @param action the {@code IntConsumer} that receives all elements of a
  81      *        stream
  82      * @param ordered whether an ordered traversal is requested
  83      * @return the {@code TerminalOp} instance
  84      */
  85     public static TerminalOp<Integer, Void> makeInt(IntConsumer action,
  86                                                     boolean ordered) {
  87         Objects.requireNonNull(action);
  88         return new ForEachOp.OfInt(action, ordered);
  89     }
  90 
  91     /**
  92      * Constructs a {@code TerminalOp} that perform an action for every element
  93      * of a {@code LongStream}.
  94      *
  95      * @param action the {@code LongConsumer} that receives all elements of a
  96      *        stream
  97      * @param ordered whether an ordered traversal is requested
  98      * @return the {@code TerminalOp} instance
  99      */
 100     public static TerminalOp<Long, Void> makeLong(LongConsumer action,
 101                                                   boolean ordered) {
 102         Objects.requireNonNull(action);
 103         return new ForEachOp.OfLong(action, ordered);
 104     }
 105 
 106     /**
 107      * Constructs a {@code TerminalOp} that perform an action for every element
 108      * of a {@code DoubleStream}.
 109      *
 110      * @param action the {@code DoubleConsumer} that receives all elements of
 111      *        a stream
 112      * @param ordered whether an ordered traversal is requested
 113      * @return the {@code TerminalOp} instance
 114      */
 115     public static TerminalOp<Double, Void> makeDouble(DoubleConsumer action,
 116                                                       boolean ordered) {
 117         Objects.requireNonNull(action);
 118         return new ForEachOp.OfDouble(action, ordered);
 119     }
 120 
 121     /**
 122      * A {@code TerminalOp} that evaluates a stream pipeline and sends the
 123      * output to itself as a {@code TerminalSink}.  Elements will be sent in
 124      * whatever thread they become available.  If the traversal is unordered,
 125      * they will be sent independent of the stream's encounter order.
 126      *
 127      * <p>This terminal operation is stateless.  For parallel evaluation, each
 128      * leaf instance of a {@code ForEachTask} will send elements to the same
 129      * {@code TerminalSink} reference that is an instance of this class.
 130      *
 131      * @param <T> the output type of the stream pipeline
 132      */
 133     abstract static class ForEachOp<T>
 134             implements TerminalOp<T, Void>, TerminalSink<T, Void> {
 135         private final boolean ordered;
 136 
 137         protected ForEachOp(boolean ordered) {
 138             this.ordered = ordered;
 139         }
 140 
 141         // TerminalOp
 142 
 143         @Override
 144         public int getOpFlags() {
 145             return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
 146         }
 147 
 148         @Override
 149         public <S> Void evaluateSequential(PipelineHelper<T> helper,
 150                                            Spliterator<S> spliterator) {
 151             return helper.wrapAndCopyInto(this, spliterator).get();
 152         }
 153 
 154         @Override
 155         public <S> Void evaluateParallel(PipelineHelper<T> helper,
 156                                          Spliterator<S> spliterator) {
 157             if (ordered)
 158                 new ForEachOrderedTask<>(helper, spliterator, this).invoke();
 159             else
 160                 new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
 161             return null;
 162         }
 163 
 164         // TerminalSink
 165 
 166         @Override
 167         public Void get() {
 168             return null;
 169         }
 170 
 171         // Implementations
 172 
 173         /** Implementation class for reference streams */
 174         static final class OfRef<T> extends ForEachOp<T> {
 175             final Consumer<? super T> consumer;
 176 
 177             OfRef(Consumer<? super T> consumer, boolean ordered) {
 178                 super(ordered);
 179                 this.consumer = consumer;
 180             }
 181 
 182             @Override
 183             public void accept(T t) {
 184                 consumer.accept(t);
 185             }
 186         }
 187 
 188         /** Implementation class for {@code IntStream} */
 189         static final class OfInt extends ForEachOp<Integer>
 190                 implements Sink.OfInt {
 191             final IntConsumer consumer;
 192 
 193             OfInt(IntConsumer consumer, boolean ordered) {
 194                 super(ordered);
 195                 this.consumer = consumer;
 196             }
 197 
 198             @Override
 199             public StreamShape inputShape() {
 200                 return StreamShape.INT_VALUE;
 201             }
 202 
 203             @Override
 204             public void accept(int t) {
 205                 consumer.accept(t);
 206             }
 207         }
 208 
 209         /** Implementation class for {@code LongStream} */
 210         static final class OfLong extends ForEachOp<Long>
 211                 implements Sink.OfLong {
 212             final LongConsumer consumer;
 213 
 214             OfLong(LongConsumer consumer, boolean ordered) {
 215                 super(ordered);
 216                 this.consumer = consumer;
 217             }
 218 
 219             @Override
 220             public StreamShape inputShape() {
 221                 return StreamShape.LONG_VALUE;
 222             }
 223 
 224             @Override
 225             public void accept(long t) {
 226                 consumer.accept(t);
 227             }
 228         }
 229 
 230         /** Implementation class for {@code DoubleStream} */
 231         static final class OfDouble extends ForEachOp<Double>
 232                 implements Sink.OfDouble {
 233             final DoubleConsumer consumer;
 234 
 235             OfDouble(DoubleConsumer consumer, boolean ordered) {
 236                 super(ordered);
 237                 this.consumer = consumer;
 238             }
 239 
 240             @Override
 241             public StreamShape inputShape() {
 242                 return StreamShape.DOUBLE_VALUE;
 243             }
 244 
 245             @Override
 246             public void accept(double t) {
 247                 consumer.accept(t);
 248             }
 249         }
 250     }
 251 
 252     /** A {@code ForkJoinTask} for performing a parallel for-each operation */
 253     @SuppressWarnings("serial")
 254     static final class ForEachTask<S, T> extends CountedCompleter<Void> {
 255         private Spliterator<S> spliterator;
 256         private final Sink<S> sink;
 257         private final PipelineHelper<T> helper;
 258         private long targetSize;
 259 
 260         ForEachTask(PipelineHelper<T> helper,
 261                     Spliterator<S> spliterator,
 262                     Sink<S> sink) {
 263             super(null);
 264             this.sink = sink;
 265             this.helper = helper;
 266             this.spliterator = spliterator;
 267             this.targetSize = 0L;
 268         }
 269 
 270         ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
 271             super(parent);
 272             this.spliterator = spliterator;
 273             this.sink = parent.sink;
 274             this.targetSize = parent.targetSize;
 275             this.helper = parent.helper;
 276         }
 277 
 278         // Similar to AbstractTask but doesn't need to track child tasks
 279         public void compute() {
 280             Spliterator<S> rightSplit = spliterator, leftSplit;
 281             long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
 282             if ((sizeThreshold = targetSize) == 0L)
 283                 targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
 284             boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
 285             boolean forkRight = false;
 286             Sink<S> taskSink = sink;
 287             ForEachTask<S, T> task = this;
 288             while (!isShortCircuit || !taskSink.cancellationRequested()) {
 289                 if (sizeEstimate <= sizeThreshold ||
 290                     (leftSplit = rightSplit.trySplit()) == null) {
 291                     task.helper.copyInto(taskSink, rightSplit);
 292                     break;
 293                 }
 294                 ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
 295                 task.addToPendingCount(1);
 296                 ForEachTask<S, T> taskToFork;
 297                 if (forkRight) {
 298                     forkRight = false;
 299                     rightSplit = leftSplit;
 300                     taskToFork = task;
 301                     task = leftTask;
 302                 }
 303                 else {
 304                     forkRight = true;
 305                     taskToFork = leftTask;
 306                 }
 307                 taskToFork.fork();
 308                 sizeEstimate = rightSplit.estimateSize();
 309             }
 310             task.spliterator = null;
 311             task.propagateCompletion();
 312         }
 313     }
 314 
 315     /**
 316      * A {@code ForkJoinTask} for performing a parallel for-each operation
 317      * which visits the elements in encounter order
 318      */
 319     @SuppressWarnings("serial")
 320     static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
 321         /*
 322          * Our goal is to ensure that the elements associated with a task are
 323          * processed according to an in-order traversal of the computation tree.
 324          * We use completion counts for representing these dependencies, so that
 325          * a task does not complete until all the tasks preceding it in this
 326          * order complete.  We use the "completion map" to associate the next
 327          * task in this order for any left child.  We increase the pending count
 328          * of any node on the right side of such a mapping by one to indicate
 329          * its dependency, and when a node on the left side of such a mapping
 330          * completes, it decrements the pending count of its corresponding right
 331          * side.  As the computation tree is expanded by splitting, we must
 332          * atomically update the mappings to maintain the invariant that the
 333          * completion map maps left children to the next node in the in-order
 334          * traversal.
 335          *
 336          * Take, for example, the following computation tree of tasks:
 337          *
 338          *       a
 339          *      / \
 340          *     b   c
 341          *    / \ / \
 342          *   d  e f  g
 343          *
 344          * The complete map will contain (not necessarily all at the same time)
 345          * the following associations:
 346          *
 347          *   d -> e
 348          *   b -> f
 349          *   f -> g
 350          *
 351          * Tasks e, f, g will have their pending counts increased by 1.
 352          *
 353          * The following relationships hold:
 354          *
 355          *   - completion of d "happens-before" e;
 356          *   - completion of d and e "happens-before b;
 357          *   - completion of b "happens-before" f; and
 358          *   - completion of f "happens-before" g
 359          *
 360          * Thus overall the "happens-before" relationship holds for the
 361          * reporting of elements, covered by tasks d, e, f and g, as specified
 362          * by the forEachOrdered operation.
 363          */
 364 
 365         private final PipelineHelper<T> helper;
 366         private Spliterator<S> spliterator;
 367         private final long targetSize;
 368         private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap;
 369         private final Sink<T> action;
 370         private final ForEachOrderedTask<S, T> leftPredecessor;
 371         private Node<T> node;
 372 
 373         protected ForEachOrderedTask(PipelineHelper<T> helper,
 374                                      Spliterator<S> spliterator,
 375                                      Sink<T> action) {
 376             super(null);
 377             this.helper = helper;
 378             this.spliterator = spliterator;
 379             this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
 380             // Size map to avoid concurrent re-sizes
 381             this.completionMap = new ConcurrentHashMap<>(Math.max(16, AbstractTask.LEAF_TARGET << 1));
 382             this.action = action;
 383             this.leftPredecessor = null;
 384         }
 385 
 386         ForEachOrderedTask(ForEachOrderedTask<S, T> parent,
 387                            Spliterator<S> spliterator,
 388                            ForEachOrderedTask<S, T> leftPredecessor) {
 389             super(parent);
 390             this.helper = parent.helper;
 391             this.spliterator = spliterator;
 392             this.targetSize = parent.targetSize;
 393             this.completionMap = parent.completionMap;
 394             this.action = parent.action;
 395             this.leftPredecessor = leftPredecessor;
 396         }
 397 
 398         @Override
 399         public final void compute() {
 400             doCompute(this);
 401         }
 402 
 403         private static <S, T> void doCompute(ForEachOrderedTask<S, T> task) {
 404             Spliterator<S> rightSplit = task.spliterator, leftSplit;
 405             long sizeThreshold = task.targetSize;
 406             boolean forkRight = false;
 407             while (rightSplit.estimateSize() > sizeThreshold &&
 408                    (leftSplit = rightSplit.trySplit()) != null) {
 409                 ForEachOrderedTask<S, T> leftChild =
 410                     new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
 411                 ForEachOrderedTask<S, T> rightChild =
 412                     new ForEachOrderedTask<>(task, rightSplit, leftChild);
 413 
 414                 // Fork the parent task
 415                 // Completion of the left and right children "happens-before"
 416                 // completion of the parent
 417                 task.addToPendingCount(1);
 418                 // Completion of the left child "happens-before" completion of
 419                 // the right child
 420                 rightChild.addToPendingCount(1);
 421                 task.completionMap.put(leftChild, rightChild);
 422 
 423                 // If task is not on the left spine
 424                 if (task.leftPredecessor != null) {
 425                     /*
 426                      * Completion of left-predecessor, or left subtree,
 427                      * "happens-before" completion of left-most leaf node of
 428                      * right subtree.
 429                      * The left child's pending count needs to be updated before
 430                      * it is associated in the completion map, otherwise the
 431                      * left child can complete prematurely and violate the
 432                      * "happens-before" constraint.
 433                      */
 434                     leftChild.addToPendingCount(1);
 435                     // Update association of left-predecessor to left-most
 436                     // leaf node of right subtree
 437                     if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) {
 438                         // If replaced, adjust the pending count of the parent
 439                         // to complete when its children complete
 440                         task.addToPendingCount(-1);
 441                     } else {
 442                         // Left-predecessor has already completed, parent's
 443                         // pending count is adjusted by left-predecessor;
 444                         // left child is ready to complete
 445                         leftChild.addToPendingCount(-1);
 446                     }
 447                 }
 448 
 449                 ForEachOrderedTask<S, T> taskToFork;
 450                 if (forkRight) {
 451                     forkRight = false;
 452                     rightSplit = leftSplit;
 453                     task = leftChild;
 454                     taskToFork = rightChild;
 455                 }
 456                 else {
 457                     forkRight = true;
 458                     task = rightChild;
 459                     taskToFork = leftChild;
 460                 }
 461                 taskToFork.fork();
 462             }
 463 
 464             /*
 465              * Task's pending count is either 0 or 1.  If 1 then the completion
 466              * map will contain a value that is task, and two calls to
 467              * tryComplete are required for completion, one below and one
 468              * triggered by the completion of task's left-predecessor in
 469              * onCompletion.  Therefore there is no data race within the if
 470              * block.
 471              */
 472             if (task.getPendingCount() > 0) {
 473                 // Cannot complete just yet so buffer elements into a Node
 474                 // for use when completion occurs
 475                 @SuppressWarnings("unchecked")
 476                 IntFunction<T[]> generator = size -> (T[]) new Object[size];
 477                 Node.Builder<T> nb = task.helper.makeNodeBuilder(
 478                         task.helper.exactOutputSizeIfKnown(rightSplit),
 479                         generator);
 480                 task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();
 481                 task.spliterator = null;
 482             }
 483             task.tryComplete();
 484         }
 485 
 486         @Override
 487         public void onCompletion(CountedCompleter<?> caller) {
 488             if (node != null) {
 489                 // Dump buffered elements from this leaf into the sink
 490                 node.forEach(action);
 491                 node = null;
 492             }
 493             else if (spliterator != null) {
 494                 // Dump elements output from this leaf's pipeline into the sink
 495                 helper.wrapAndCopyInto(action, spliterator);
 496                 spliterator = null;
 497             }
 498 
 499             // The completion of this task *and* the dumping of elements
 500             // "happens-before" completion of the associated left-most leaf task
 501             // of right subtree (if any, which can be this task's right sibling)
 502             //
 503             ForEachOrderedTask<S, T> leftDescendant = completionMap.remove(this);
 504             if (leftDescendant != null)
 505                 leftDescendant.tryComplete();
 506         }
 507     }
 508 }