/*
* Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
/**
* Abstract base class for most fork-join tasks used to implement stream ops.
* Manages splitting logic, tracking of child tasks, and intermediate results.
* Each task is associated with a {@link Spliterator} that describes a portion of the
* input. Tasks may be leaf nodes (which will traverse the elements of the {@code Spliterator})
* or internal nodes (which split the {@code Spliterator} into multiple child tasks).
*
*
This class is based on {@link CountedCompleter}, a form of fork-join task where each
* task has a semaphore-like count of uncompleted children, and the task is implicitly
* completed and notified when its last child completes Internal node tasks will likely
* override the {@code onCompletion} method from {@code CountedCompleter} to merge the
* results from child tasks into the current task's result.
*
*
Splitting and setting up the child task links is done by {@code compute()}
* for internal nodes. At {@code compute()} time for leaf nodes, it is guaranteed
* that the parent's child-related fields (including sibling links for the parent's children)
* will be set up for all children.
*
*
For example, a task that performs a reduce would override {@code doLeaf()} to perform a reduction
* on that leaf node's chunk using the {@code Spliterator}, and override {@code onCompletion()}
* to merge the results of the child tasks for internal nodes:
*
*
* protected S doLeaf() {
* spliterator.forEach(...);
* return localReductionResult;
* }
*
* public void onCompletion(CountedCompleter caller) {
* if (!isLeaf()) {
* ReduceTask child = children;
* R result = child.getLocalResult();
* child = child.nextSibling;
* for (; child != null; child = child.nextSibling)
* result = combine(result, child.getLocalResult());
* setLocalResult(result);
* }
* }
*
*
* @param Type of elements input to the pipeline
* @param Type of elements output from the pipeline
* @param Type of intermediate result, which may be different from operation result type
* @param Type of child and sibling tasks
* @since 1.8
*/
abstract class AbstractTask>
extends CountedCompleter {
/** The pipeline helper, common to all tasks in a computation */
protected final PipelineHelper helper;
/** The spliterator for the portion of the input associated with the subtree rooted at this task */
protected final Spliterator spliterator;
/** Target leaf size */
protected final long targetSize;
/** How many children does this task have? */
protected int numChildren;
/** This task's first child. Children are stored in a linked list, using the {@code nextSibling} field
* as the link to the next child. */
protected T children;
/** Next sibling of this task */
protected T nextSibling;
/** The result of this node, if completed */
private R localResult;
/**
* Constructor for root nodes.
*/
protected AbstractTask(PipelineHelper helper) {
super(null);
this.helper = helper;
this.spliterator = helper.sourceSpliterator();
this.targetSize = suggestTargetSize(spliterator.estimateSize());
}
/**
* Constructor for non-root nodes
* @param parent This node's parent task
* @param spliterator Spliterator describing the subtree rooted at this node,
* obtained by splitting the parent spliterator
*/
protected AbstractTask(T parent, Spliterator spliterator) {
super(parent);
this.spliterator = spliterator;
this.helper = parent.helper;
this.targetSize = parent.targetSize;
}
/** Construct a new node of type T whose parent is the receiver; must call
* the AbstractTask(T, Spliterator) constructor with the receiver and the provided Spliterator. */
protected abstract T makeChild(Spliterator spliterator);
/**
* Compute the result associated with a leaf node. Will be called by {@code compute()} and
* the result passed to @{code setLocalResult()}
*/
protected abstract R doLeaf();
/** Suggest a target leaf size based on the initial size estimate */
public static long suggestTargetSize(long sizeEstimate) {
long est = sizeEstimate / (ForkJoinPool.getCommonPoolParallelism() << 3);
return est > 0L ? est : 1L; // slack of 3; at least one
}
/**
* Suggest whether it is adviseable to split the provided spliterator based on
* target size and other considerations, such as pool state
*/
public static boolean suggestSplit(PipelineHelper helper,
Spliterator spliterator,
long targetSize) {
long remaining = spliterator.estimateSize();
return (remaining > targetSize);
// @@@ May want to fold in pool characteristics such as surplus task count
}
/**
* Suggest whether it is adviseable to split this task based on target size and other considerations
*/
public boolean suggestSplit() {
return suggestSplit(helper, spliterator, targetSize);
}
/** Returns the local result, if any */
@Override
public R getRawResult() {
return localResult;
}
/** Does nothing; argument must be null, or an exception is thrown */
@Override
protected void setRawResult(R result) {
if (result != null)
throw new IllegalStateException();
}
/**
* Retrieve a result previously stored with {@link #setLocalResult}
*/
protected R getLocalResult() {
return localResult;
}
/**
* Associate the result with the task, can be retrieved with {@link #getLocalResult}
*/
protected void setLocalResult(R localResult) {
this.localResult = localResult;
}
/** Is this task a leaf node? (Only valid after {@link #compute} has been called on this node).
* If the node is not a leaf node, then children will be non-null and numChildren will be positive. */
protected boolean isLeaf() {
return children == null;
}
/** Is this task the root node? */
protected boolean isRoot() {
return getParent() == null;
}
/**
* Return the parent of this task, or null if this task is the root
*/
protected T getParent() {
return (T) getCompleter();
}
/**
* Decide whether or not to split this task further or compute it directly.
* If computing directly, call {@code doLeaf} and pass the result to
* {@code setRawResult}. If splitting, set up the child-related fields,
* create the child tasks, fork the leftmost (prefix) child task,
* and compute the rightmost (remaining) child task.
*
* Computing will continue for rightmost tasks while a task
* can be computed as determined by {@link #canCompute()} and that
* task should and can be split into left and right tasks.
*/
@Override
public final void compute() {
doCompute((T) this);
}
/**
* Decide whether or not to split a task further or compute it directly.
* If computing directly, call {@code doLeaf} and pass the result to
* {@code setRawResult}. If splitting, set up the child-related fields,
* create the child tasks, fork the leftmost (prefix) child tasks,
* and compute the rightmost (remaining) child tasks.
*
*
Computing will continue for rightmost tasks while a task
* can be computed as determined by {@link #canCompute()} and that
* task should and can be split into left and right tasks.
*
*
The rightmost tasks are computed in a loop rather than recursively to
* avoid potential stack overflows when computing with a right-balanced tree,
* such as that produced when splitting with a {@link Spliterator} created
* from an {@link java.util.Iterator}.
*
* @param task the task to be computed.
*/
private static > void doCompute(T task) {
while (task.canCompute()) {
Spliterator split = null;
if (!task.suggestSplit() || (split = task.spliterator.trySplit()) == null) {
task.setLocalResult(task.doLeaf());
task.tryComplete();
return;
}
else {
// Common case -- binary splits
T leftChild = task.makeChild(split);
T rightChild = task.makeChild(task.spliterator);
task.setPendingCount(1);
task.numChildren = 2;
task.children = leftChild;
leftChild.nextSibling = rightChild;
leftChild.fork();
task = rightChild;
}
}
}
/**
* Determine if the task can be computed.
*
* @return true if this task can be computed to either calculate the leaf
* via {@link #doLeaf()} or split, otherwise false if this task cannot be
* computed, for example if this task has been cancelled and/or a result
* for the computation has been found by another task.
*/
protected boolean canCompute() {
return true;
}
}