/* * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package java.util.stream; import java.util.Spliterator; import java.util.concurrent.CountedCompleter; import java.util.concurrent.ForkJoinPool; /** * Abstract base class for most fork-join tasks used to implement stream ops. * Manages splitting logic, tracking of child tasks, and intermediate results. * Each task is associated with a {@link Spliterator} that describes a portion of the * input. Tasks may be leaf nodes (which will traverse the elements of the {@code Spliterator}) * or internal nodes (which split the {@code Spliterator} into multiple child tasks). * *

 * <p>This class is based on {@link CountedCompleter}, a form of fork-join task where each
 * task has a semaphore-like count of uncompleted children, and the task is implicitly
 * completed and notified when its last child completes. Internal node tasks will likely
 * override the {@code onCompletion} method from {@code CountedCompleter} to merge the
 * results from child tasks into the current task's result.
 *

 * <p>Splitting and setting up the child task links is done by {@code compute()}
 * for internal nodes. By the time {@code compute()} runs for a leaf node, it is
 * guaranteed that the parent's child-related fields (including the sibling links
 * between the parent's children) have been set up for all of its children.
 *

 * <p>For example, a task that performs a reduce would override {@code doLeaf()} to perform
 * a reduction on that leaf node's chunk using the {@code Spliterator}, and override
 * {@code onCompletion()} to merge the results of the child tasks for internal nodes:
 *
 * <pre>{@code
 *     protected S doLeaf() {
 *         spliterator.forEach(...);
 *         return localReductionResult;
 *     }
 *
 *     public void onCompletion(CountedCompleter<?> caller) {
 *         if (!isLeaf()) {
 *             ReduceTask child = children;
 *             R result = child.getLocalResult();
 *             child = child.nextSibling;
 *             for (; child != null; child = child.nextSibling)
 *                 result = combine(result, child.getLocalResult());
 *             setLocalResult(result);
 *         }
 *     }
 * }</pre>
*
 * @param <P_IN> Type of elements input to the pipeline
 * @param <P_OUT> Type of elements output from the pipeline
 * @param <R> Type of intermediate result, which may be different from operation result type
 * @param <T> Type of child and sibling tasks
 * @since 1.8
 */
abstract class AbstractTask<P_IN, P_OUT, R, T extends AbstractTask<P_IN, P_OUT, R, T>>
        extends CountedCompleter<R> {

    /** The pipeline helper, common to all tasks in a computation */
    protected final PipelineHelper<P_IN, P_OUT> helper;

    /**
     * The spliterator for the portion of the input associated with the subtree
     * rooted at this task
     */
    protected final Spliterator<P_IN> spliterator;

    /**
     * Target leaf size. Computed once by the root task and copied unchanged
     * into every descendant task.
     */
    protected final long targetSize;

    /** How many children does this task have? */
    protected int numChildren;

    /**
     * This task's first child. Children are stored in a linked list, using the
     * {@code nextSibling} field as the link to the next child.
     */
    protected T children;

    /** Next sibling of this task */
    protected T nextSibling;

    /** The result of this node, if completed */
    private R localResult;

    /**
     * Constructor for root nodes.
     *
     * @param helper the pipeline helper; its source spliterator becomes this
     *        task's spliterator and seeds the target leaf size
     */
    protected AbstractTask(PipelineHelper<P_IN, P_OUT> helper) {
        super(null);
        this.helper = helper;
        this.spliterator = helper.sourceSpliterator();
        this.targetSize = suggestTargetSize(spliterator.estimateSize());
    }

    /**
     * Constructor for non-root nodes.
     *
     * @param parent This node's parent task
     * @param spliterator Spliterator describing the subtree rooted at this node,
     *        obtained by splitting the parent spliterator
     */
    protected AbstractTask(T parent, Spliterator<P_IN> spliterator) {
        super(parent);
        this.spliterator = spliterator;
        this.helper = parent.helper;
        this.targetSize = parent.targetSize;
    }

    /**
     * Constructs a new node of type T whose parent is the receiver; must call
     * the {@code AbstractTask(T, Spliterator)} constructor with the receiver and
     * the provided Spliterator.
     *
     * @param spliterator the spliterator describing the child's portion of the input
     * @return the newly created child task
     */
    protected abstract T makeChild(Spliterator<P_IN> spliterator);

    /**
     * Computes the result associated with a leaf node. Will be called by
     * {@code compute()} and the result passed to {@code setLocalResult()}.
     *
     * @return the leaf's computed result
     */
    protected abstract R doLeaf();

    /**
     * Suggests a target leaf size based on the initial size estimate.
     *
     * @param sizeEstimate estimated number of input elements
     * @return the suggested target leaf size; always at least 1
     */
    public static long suggestTargetSize(long sizeEstimate) {
        // Aim for about 8 (1 << 3) leaves per unit of common-pool parallelism.
        // Clamp parallelism to at least 1 so the divisor can never be zero.
        int parallelism = Math.max(1, ForkJoinPool.getCommonPoolParallelism());
        long est = sizeEstimate / (parallelism << 3);
        return est > 0L ? est : 1L;
    }

    /**
     * Suggests whether it is advisable to split the provided spliterator based on
     * target size and other considerations, such as pool state.
     *
     * @param helper the pipeline helper (currently unused by the heuristic)
     * @param spliterator the spliterator being considered for splitting
     * @param targetSize the target leaf size
     * @return true if the spliterator's estimated size exceeds {@code targetSize}
     */
    public static boolean suggestSplit(PipelineHelper<?, ?> helper,
                                       Spliterator<?> spliterator,
                                       long targetSize) {
        long remaining = spliterator.estimateSize();
        return (remaining > targetSize);
        // @@@ May want to fold in pool characteristics such as surplus task count
    }

    /**
     * Suggests whether it is advisable to split this task based on target size
     * and other considerations.
     *
     * @return true if this task's spliterator should be split further
     */
    public boolean suggestSplit() {
        return suggestSplit(helper, spliterator, targetSize);
    }

    /** Returns the local result, if any */
    @Override
    public R getRawResult() {
        return localResult;
    }

    /**
     * Does nothing; the argument must be null, or an exception is thrown.
     * Results are stored with {@link #setLocalResult} instead.
     *
     * @throws IllegalStateException if {@code result} is non-null
     */
    @Override
    protected void setRawResult(R result) {
        if (result != null) {
            throw new IllegalStateException(
                    "setRawResult supports only null; use setLocalResult to store a task result");
        }
    }

    /**
     * Retrieves a result previously stored with {@link #setLocalResult}.
     *
     * @return the stored result, or null if none has been set
     */
    protected R getLocalResult() {
        return localResult;
    }

    /**
     * Associates the result with the task; it can be retrieved with
     * {@link #getLocalResult}.
     *
     * @param localResult the result to store
     */
    protected void setLocalResult(R localResult) {
        this.localResult = localResult;
    }

    /**
     * Is this task a leaf node? (Only valid after {@link #compute} has been called
     * on this node.) If the node is not a leaf node, then children will be non-null
     * and numChildren will be positive.
     */
    protected boolean isLeaf() {
        return children == null;
    }

    /** Is this task the root node? */
    protected boolean isRoot() {
        return getParent() == null;
    }

    /**
     * Returns the parent of this task, or null if this task is the root.
     */
    @SuppressWarnings("unchecked") // the completer is always the T passed to the non-root constructor
    protected T getParent() {
        return (T) getCompleter();
    }

    /**
     * Decides whether or not to split this task further or compute it directly.
     * If computing directly, calls {@code doLeaf} and passes the result to
     * {@code setLocalResult}. If splitting, sets up the child-related fields,
     * creates the child tasks, forks the leftmost (prefix) child task,
     * and computes the rightmost (remaining) child task.
     *
     * <p>Computing will continue for rightmost tasks while a task
     * can be computed as determined by {@link #canCompute()} and that
     * task should and can be split into left and right tasks.
     */
    @Override
    @SuppressWarnings("unchecked") // T is, by the class's bound, this task's own concrete type
    public final void compute() {
        doCompute((T) this);
    }

    /**
     * Decides whether or not to split a task further or compute it directly.
     * If computing directly, calls {@code doLeaf} and passes the result to
     * {@code setLocalResult}. If splitting, sets up the child-related fields,
     * creates the child tasks, forks the leftmost (prefix) child tasks,
     * and computes the rightmost (remaining) child tasks.
     *
     * <p>Computing will continue for rightmost tasks while a task
     * can be computed as determined by {@link #canCompute()} and that
     * task should and can be split into left and right tasks.
     *
     * <p>The rightmost tasks are computed in a loop rather than recursively to
     * avoid potential stack overflows when computing with a right-balanced tree,
     * such as that produced when splitting with a {@link Spliterator} created
     * from an {@link java.util.Iterator}.
     *
     * @param task the task to be computed.
     */
    private static <P_IN, P_OUT, R, T extends AbstractTask<P_IN, P_OUT, R, T>> void doCompute(T task) {
        while (task.canCompute()) {
            Spliterator<P_IN> split = null;
            if (!task.suggestSplit() || (split = task.spliterator.trySplit()) == null) {
                // Leaf: compute directly, then signal completion toward the root.
                task.setLocalResult(task.doLeaf());
                task.tryComplete();
                return;
            }

            // Common case -- binary splits. The left child takes the split-off
            // prefix; the right child keeps the remainder of this task's spliterator.
            T leftChild = task.makeChild(split);
            T rightChild = task.makeChild(task.spliterator);
            // Pending count is numChildren - 1: the first child to finish decrements
            // it to zero, and the last one triggers this task's onCompletion.
            task.setPendingCount(1);
            task.numChildren = 2;
            task.children = leftChild;
            leftChild.nextSibling = rightChild;
            leftChild.fork();
            // Continue iteratively with the right child instead of recursing.
            task = rightChild;
        }
        // NOTE(review): when canCompute() returns false, the loop exits without
        // setting a local result or calling tryComplete(). Presumably subclasses
        // that override canCompute() complete the computation by other means.
        // Confirm against those subclasses.
    }

    /**
     * Determines if the task can be computed.
     *
     * @return true if this task can be computed to either calculate the leaf
     * via {@link #doLeaf()} or split, otherwise false if this task cannot be
     * computed, for example if this task has been cancelled and/or a result
     * for the computation has been found by another task.
     */
    protected boolean canCompute() {
        return true;
    }
}