< prev index next >

src/java.base/share/classes/java/util/stream/ForEachOps.java

Print this page
rev 47749 : 8190974: Parallel stream execution within a custom ForkJoinPool should obey the parallelism
Reviewed-by: martin, tvaleev
   1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Objects;
  28 import java.util.Spliterator;
  29 import java.util.concurrent.ConcurrentHashMap;
  30 import java.util.concurrent.CountedCompleter;
  31 import java.util.concurrent.ForkJoinTask;
  32 import java.util.function.Consumer;
  33 import java.util.function.DoubleConsumer;
  34 import java.util.function.IntConsumer;
  35 import java.util.function.IntFunction;
  36 import java.util.function.LongConsumer;
  37 
  38 /**
  39  * Factory for creating instances of {@code TerminalOp} that perform an
  40  * action for every element of a stream.  Supported variants include unordered
  41  * traversal (elements are provided to the {@code Consumer} as soon as they are
  42  * available), and ordered traversal (elements are provided to the
  43  * {@code Consumer} in encounter order.)
  44  *
  45  * <p>Elements are provided to the {@code Consumer} on whatever thread and
  46  * whatever order they become available.  For ordered traversals, it is
  47  * guaranteed that processing an element <em>happens-before</em> processing
  48  * subsequent elements in the encounter order.
  49  *
  50  * <p>Exceptions occurring as a result of sending an element to the
  51  * {@code Consumer} will be relayed to the caller and traversal will be


 361          * reporting of elements, covered by tasks d, e, f and g, as specified
 362          * by the forEachOrdered operation.
 363          */
 364 
  365         private final PipelineHelper<T> helper; // given to the root constructor; child tasks copy it from their parent
  366         private Spliterator<S> spliterator; // spliterator handed to this task's constructor (the field is not final)
  367         private final long targetSize; // computed once by the root constructor, then copied parent -> child
              // One map per task tree: allocated by the root constructor and passed to children by reference.
  368         private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap;
  369         private final Sink<T> action; // given to the root constructor; child tasks copy it from their parent
  370         private final ForEachOrderedTask<S, T> leftPredecessor; // null for the root task
  371         private Node<T> node; // left unset by both constructors
 372 
  373         protected ForEachOrderedTask(PipelineHelper<T> helper,
  374                                      Spliterator<S> spliterator,
  375                                      Sink<T> action) {
                  // Root-task constructor: builds the per-tree state (target size and completion
                  // map) that the child-task constructor later copies from its parent.
                  // spliterator must be non-null: estimateSize() is called on it below.
                  // null is passed where the child-task constructor passes its parent task.
  376             super(null);
  377             this.helper = helper;
  378             this.spliterator = spliterator;
                  // Target size is derived once, from the size estimate of the spliterator given here.
  379             this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
  380             // Size map to avoid concurrent re-sizes
                  // (initial capacity: twice AbstractTask.LEAF_TARGET, but never less than 16)
  381             this.completionMap = new ConcurrentHashMap<>(Math.max(16, AbstractTask.LEAF_TARGET << 1));
  382             this.action = action;
                  // The root task is created without a left predecessor.
  383             this.leftPredecessor = null;
  384         }
 385 
  386         ForEachOrderedTask(ForEachOrderedTask<S, T> parent,
  387                            Spliterator<S> spliterator,
  388                            ForEachOrderedTask<S, T> leftPredecessor) {
                  // Child-task constructor: helper, targetSize, completionMap and action are copied
                  // from the parent, so a child shares the parent's instances of them; only the
                  // spliterator and the left predecessor are specific to the new task.
                  // parent must be non-null: its fields are read below.
  389             super(parent);
  390             this.helper = parent.helper;
  391             this.spliterator = spliterator;
  392             this.targetSize = parent.targetSize;
                  // Same map object as the parent's, not a copy.
  393             this.completionMap = parent.completionMap;
  394             this.action = parent.action;
  395             this.leftPredecessor = leftPredecessor;
  396         }
 397 
  398         @Override
  399         public final void compute() {
                  // Overrides the superclass compute() (presumably CountedCompleter's, given the
                  // import; the class header is not visible here) and hands this task to doCompute.
  400             doCompute(this);
  401         }


   1 /*
   2  * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Objects;
  28 import java.util.Spliterator;
  29 import java.util.concurrent.ConcurrentHashMap;
  30 import java.util.concurrent.CountedCompleter;

  31 import java.util.function.Consumer;
  32 import java.util.function.DoubleConsumer;
  33 import java.util.function.IntConsumer;
  34 import java.util.function.IntFunction;
  35 import java.util.function.LongConsumer;
  36 
  37 /**
  38  * Factory for creating instances of {@code TerminalOp} that perform an
  39  * action for every element of a stream.  Supported variants include unordered
  40  * traversal (elements are provided to the {@code Consumer} as soon as they are
  41  * available), and ordered traversal (elements are provided to the
  42  * {@code Consumer} in encounter order.)
  43  *
  44  * <p>Elements are provided to the {@code Consumer} on whatever thread and
  45  * whatever order they become available.  For ordered traversals, it is
  46  * guaranteed that processing an element <em>happens-before</em> processing
  47  * subsequent elements in the encounter order.
  48  *
  49  * <p>Exceptions occurring as a result of sending an element to the
  50  * {@code Consumer} will be relayed to the caller and traversal will be


 360          * reporting of elements, covered by tasks d, e, f and g, as specified
 361          * by the forEachOrdered operation.
 362          */
 363 
  364         private final PipelineHelper<T> helper; // given to the root constructor; child tasks copy it from their parent
  365         private Spliterator<S> spliterator; // spliterator handed to this task's constructor (the field is not final)
  366         private final long targetSize; // computed once by the root constructor, then copied parent -> child
              // One map per task tree: allocated by the root constructor and passed to children by reference.
  367         private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap;
  368         private final Sink<T> action; // given to the root constructor; child tasks copy it from their parent
  369         private final ForEachOrderedTask<S, T> leftPredecessor; // null for the root task
  370         private Node<T> node; // left unset by both constructors
 371 
  372         protected ForEachOrderedTask(PipelineHelper<T> helper,
  373                                      Spliterator<S> spliterator,
  374                                      Sink<T> action) {
                  // Root-task constructor: builds the per-tree state (target size and completion
                  // map) that the child-task constructor later copies from its parent.
                  // spliterator must be non-null: estimateSize() is called on it below.
                  // null is passed where the child-task constructor passes its parent task.
  375             super(null);
  376             this.helper = helper;
  377             this.spliterator = spliterator;
                  // Target size is derived once, from the size estimate of the spliterator given here.
  378             this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
  379             // Size map to avoid concurrent re-sizes
                  // (initial capacity: twice AbstractTask.getLeafTarget(), but never less than 16)
  380             this.completionMap = new ConcurrentHashMap<>(Math.max(16, AbstractTask.getLeafTarget() << 1));
  381             this.action = action;
                  // The root task is created without a left predecessor.
  382             this.leftPredecessor = null;
  383         }
 384 
  385         ForEachOrderedTask(ForEachOrderedTask<S, T> parent,
  386                            Spliterator<S> spliterator,
  387                            ForEachOrderedTask<S, T> leftPredecessor) {
                  // Child-task constructor: helper, targetSize, completionMap and action are copied
                  // from the parent, so a child shares the parent's instances of them; only the
                  // spliterator and the left predecessor are specific to the new task.
                  // parent must be non-null: its fields are read below.
  388             super(parent);
  389             this.helper = parent.helper;
  390             this.spliterator = spliterator;
  391             this.targetSize = parent.targetSize;
                  // Same map object as the parent's, not a copy.
  392             this.completionMap = parent.completionMap;
  393             this.action = parent.action;
  394             this.leftPredecessor = leftPredecessor;
  395         }
 396 
  397         @Override
  398         public final void compute() {
                  // Overrides the superclass compute() (presumably CountedCompleter's, given the
                  // import; the class header is not visible here) and hands this task to doCompute.
  399             doCompute(this);
  400         }


< prev index next >