< prev index next >

src/java.base/share/classes/java/util/stream/AbstractTask.java

Print this page
rev 47749 : 8190974: Parallel stream execution within a custom ForkJoinPool should obey the parallelism
Reviewed-by: martin, tvaleev

*** 1,7 **** /* ! * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this --- 1,7 ---- /* ! * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this
*** 25,34 **** --- 25,35 ---- package java.util.stream; import java.util.Spliterator; import java.util.concurrent.CountedCompleter; import java.util.concurrent.ForkJoinPool; + import java.util.concurrent.ForkJoinWorkerThread; /** * Abstract base class for most fork-join tasks used to implement stream ops. * Manages splitting logic, tracking of child tasks, and intermediate results. * Each task is associated with a {@link Spliterator} that describes the portion
*** 86,102 **** @SuppressWarnings("serial") abstract class AbstractTask<P_IN, P_OUT, R, K extends AbstractTask<P_IN, P_OUT, R, K>> extends CountedCompleter<R> { ! /** ! * Default target factor of leaf tasks for parallel decomposition. ! * To allow load balancing, we over-partition, currently to approximately ! * four tasks per processor, which enables others to help out ! * if leaf tasks are uneven or some processors are otherwise busy. ! */ ! static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2; /** The pipeline helper, common to all tasks in a computation */ protected final PipelineHelper<P_OUT> helper; /** --- 87,97 ---- @SuppressWarnings("serial") abstract class AbstractTask<P_IN, P_OUT, R, K extends AbstractTask<P_IN, P_OUT, R, K>> extends CountedCompleter<R> { ! private static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2; /** The pipeline helper, common to all tasks in a computation */ protected final PipelineHelper<P_OUT> helper; /**
*** 155,164 **** --- 150,175 ---- this.helper = parent.helper; this.targetSize = parent.targetSize; } /** + * Default target of leaf tasks for parallel decomposition. + * To allow load balancing, we over-partition, currently to approximately + * four tasks per processor, which enables others to help out + * if leaf tasks are uneven or some processors are otherwise busy. + */ + public static int getLeafTarget() { + Thread t = Thread.currentThread(); + if (t instanceof ForkJoinWorkerThread) { + return ((ForkJoinWorkerThread) t).getPool().getParallelism() << 2; + } + else { + return LEAF_TARGET; + } + } + + /** * Constructs a new node of type T whose parent is the receiver; must call * the AbstractTask(T, Spliterator) constructor with the receiver and the * provided Spliterator. * * @param spliterator {@code Spliterator} describing the subtree rooted at
*** 179,189 **** * Returns a suggested target leaf size based on the initial size estimate. * * @return suggested target leaf size */ public static long suggestTargetSize(long sizeEstimate) { ! long est = sizeEstimate / LEAF_TARGET; return est > 0L ? est : 1L; } /** * Returns the targetSize, initializing it via the supplied --- 190,200 ---- * Returns a suggested target leaf size based on the initial size estimate. * * @return suggested target leaf size */ public static long suggestTargetSize(long sizeEstimate) { ! long est = sizeEstimate / getLeafTarget(); return est > 0L ? est : 1L; } /** * Returns the targetSize, initializing it via the supplied
< prev index next >