1 /*
   2  * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Spliterator;
  28 import java.util.concurrent.CountedCompleter;
  29 import java.util.concurrent.ForkJoinPool;
  30 
  31 /**
  32  * Abstract base class for most fork-join tasks used to implement stream ops.
  33  * Manages splitting logic, tracking of child tasks, and intermediate results.
  34  * Each task is associated with a {@link Spliterator} that describes a portion
  35  * of the input.  Tasks may be leaf nodes (which will traverse the elements of
  36  * the {@code Spliterator}) or internal nodes (which split the
  37  * {@code Spliterator} into multiple child tasks).
  38  *
  39  * <p>This class is based on {@link CountedCompleter}, a form of fork-join task
  40  * where each task has a semaphore-like count of uncompleted children, and the
  41  * task is implicitly completed and notified when its last child completes.
  42  * Internal node tasks will likely override the {@code onCompletion} method from
  43  * {@code CountedCompleter} to merge the results from child tasks into the
  44  * current task's result.
  45  *
  46  * <p>Splitting and setting up the child task links is done by {@code compute()}
  47  * for internal nodes.  At {@code compute()} time for leaf nodes, it is
  48  * guaranteed that the parent's child-related fields (including sibling links
  49  * for the parent's children) will be set up for all children.
  50  *
  51  * <p>For example, a task that performs a reduce would override {@code doLeaf()}
  52  * to perform a reduction on that leaf node's chunk using the
  53  * {@code Spliterator}, and override {@code onCompletion()} to merge the results
  54  * of the child tasks for internal nodes:
  55  *
  56  * <pre>
  57  *     protected S doLeaf() {
  58  *         spliterator.forEach(...);
  59  *         return localReductionResult;
  60  *     }
  61  *
  62  *     public void onCompletion(CountedCompleter caller) {
  63  *         if (!isLeaf()) {
  64  *             ReduceTask<P_IN, P_OUT, T, R> child = children;
  65  *             R result = child.getLocalResult();
  66  *             child = child.nextSibling;
  67  *             for (; child != null; child = child.nextSibling)
  68  *                 result = combine(result, child.getLocalResult());
  69  *             setLocalResult(result);
  70  *         }
  71  *     }
  72  * </pre>
  73  *
  74  * @param <P_IN> Type of elements input to the pipeline
  75  * @param <P_OUT> Type of elements output from the pipeline
  76  * @param <R> Type of intermediate result, which may be different from operation
  77  *        result type
  78  * @param <T> Type of child and sibling tasks
  79  * @since 1.8
  80  */
  81 abstract class AbstractTask<P_IN, P_OUT, R, T extends AbstractTask<P_IN, P_OUT, R, T>>
  82         extends CountedCompleter<R> {
  83 
  84     /** The pipeline helper, common to all tasks in a computation */
  85     protected final PipelineHelper<P_IN, P_OUT> helper;
  86 
  87     /**
  88      * The spliterator for the portion of the input associated with the subtree
  89      * rooted at this task
  90      */
  91     protected Spliterator<P_IN> spliterator;
  92 
  93     /** Target leaf size */
  94     protected final long targetSize;
  95 
  96     /** How many children does this task have? */
  97     protected int numChildren;
  98 
  99     /**
 100      * This task's first child.  Children are stored in a linked list, using
 101      * the {@code nextSibling} field as the link to the next child.
 102      */
 103     protected T children;
 104 
 105     /** Next sibling of this task */
 106     protected T nextSibling;
 107 
 108     /** The result of this node, if completed */
 109     private R localResult;
 110 
 111     /**
 112      * Constructor for root nodes.
 113      */
 114     protected AbstractTask(PipelineHelper<P_IN, P_OUT> helper) {
 115         super(null);
 116         this.helper = helper;
 117         this.spliterator = helper.sourceSpliterator();
 118         this.targetSize = suggestTargetSize(spliterator.estimateSize());
 119     }
 120 
 121     /**
 122      * Constructor for non-root nodes
 123      *
 124      * @param parent This node's parent task
 125      * @param spliterator Spliterator describing the subtree rooted at this
 126      *        node, obtained by splitting the parent spliterator
 127      */
 128     protected AbstractTask(T parent, Spliterator<P_IN> spliterator) {
 129         super(parent);
 130         this.spliterator = spliterator;
 131         this.helper = parent.helper;
 132         this.targetSize = parent.targetSize;
 133     }
 134 
 135     /**
 136      * Constructs a new node of type T whose parent is the receiver; must call
 137      * the AbstractTask(T, Spliterator) constructor with the receiver and the
 138      * provided Spliterator.
 139      */
 140     protected abstract T makeChild(Spliterator<P_IN> spliterator);
 141 
 142     /**
 143      * Computes the result associated with a leaf node.  Will be called by
 144      * {@code compute()} and the result passed to @{code setLocalResult()}
 145      */
 146     protected abstract R doLeaf();
 147 
 148     /** Suggests a target leaf size based on the initial size estimate */
 149     public static long suggestTargetSize(long sizeEstimate) {
 150         long est = sizeEstimate / (ForkJoinPool.getCommonPoolParallelism() << 3);
 151         return est > 0L ? est : 1L; // slack of 3; at least one
 152     }
 153 
 154     /**
 155      * Suggests whether it is adviseable to split the provided spliterator based
 156      * on target size and other considerations, such as pool state
 157      */
 158     public static<P_IN, P_OUT> boolean suggestSplit(PipelineHelper<P_IN, P_OUT> helper,
 159                                                     Spliterator spliterator,
 160                                                     long targetSize) {
 161         long remaining = spliterator.estimateSize();
 162         return (remaining > targetSize);
 163         // @@@ May want to fold in pool characteristics such as surplus task count
 164     }
 165 
 166     /**
 167      * Suggests whether it is adviseable to split this task based on target size
 168      * and other considerations
 169      */
 170     public boolean suggestSplit() {
 171         return suggestSplit(helper, spliterator, targetSize);
 172     }
 173 
 174     /** Returns the local result, if any */
 175     @Override
 176     public R getRawResult() {
 177         return localResult;
 178     }
 179 
 180     /** Does nothing; argument must be null, or an exception is thrown */
 181     @Override
 182     protected void setRawResult(R result) {
 183         if (result != null)
 184             throw new IllegalStateException();
 185     }
 186 
 187     /**
 188      * Retrieves a result previously stored with {@link #setLocalResult}
 189      */
 190     protected R getLocalResult() {
 191         return localResult;
 192     }
 193 
 194     /**
 195      * Associates the result with the task, can be retrieved with
 196      * {@link #getLocalResult}
 197      */
 198     protected void setLocalResult(R localResult) {
 199         this.localResult = localResult;
 200     }
 201 
 202     /**
 203      * Determines if this this task a leaf node.  (Only valid after {@link #compute} has been
 204      * called on this node).  If the node is not a leaf node, then children will
 205      * be non-null and numChildren will be positive.
 206      */
 207     protected boolean isLeaf() {
 208         return children == null;
 209     }
 210 
 211     /**
 212      * Determines if this task is a root node
 213      */
 214     protected boolean isRoot() {
 215         return getParent() == null;
 216     }
 217 
 218     /**
 219      * Returns the parent of this task, or null if this task is the root
 220      */
 221     @SuppressWarnings("unchecked")
 222     protected T getParent() {
 223         return (T) getCompleter();
 224     }
 225 
 226     /**
 227      * Decides whether or not to split this task further or compute it directly.
 228      * If computing directly, call {@code doLeaf} and pass the result to
 229      * {@code setRawResult}.  If splitting, set up the child-related fields,
 230      * create the child tasks, fork the leftmost (prefix) child task,
 231      * and compute the rightmost (remaining) child task.
 232      *
 233      * <p>Computing will continue for rightmost tasks while a task
 234      * can be computed as determined by {@link #canCompute()} and that
 235      * task should and can be split into left and right tasks.
 236      */
 237     @Override
 238     @SuppressWarnings("unchecked")
 239     public final void compute() {
 240         doCompute((T) this);
 241     }
 242 
 243     /**
 244      * Decides whether or not to split a task further or compute it directly. If
 245      * computing directly, call {@code doLeaf} and pass the result to
 246      * {@code setRawResult}.  If splitting, set up the child-related fields,
 247      * create the child tasks, fork the leftmost (prefix) child tasks, and
 248      * compute the rightmost (remaining) child tasks.
 249      *
 250      * <p>
 251      * Computing will continue for rightmost tasks while a task can be computed
 252      * as determined by {@link #canCompute()} and that task should and can be
 253      * split into left and right tasks.
 254      *
 255      * <p>
 256      * The rightmost tasks are computed in a loop rather than recursively to
 257      * avoid potential stack overflows when computing with a right-balanced
 258      * tree, such as that produced when splitting with a {@link Spliterator}
 259      * created from an {@link java.util.Iterator}.
 260      *
 261      * @param task the task to be computed.
 262      */
 263     private static <P_IN, P_OUT, R, T extends AbstractTask<P_IN, P_OUT, R, T>> void doCompute(T task) {
 264         while (task.canCompute()) {
 265             Spliterator<P_IN> split = null;
 266             if (!task.suggestSplit() || (split = task.spliterator.trySplit()) == null) {
 267                 task.setLocalResult(task.doLeaf());
 268                 task.tryComplete();
 269                 return;
 270             }
 271             else {
 272                 // Common case -- binary splits
 273                 T leftChild = task.makeChild(split);
 274                 T rightChild = task.makeChild(task.spliterator);
 275                 task.setPendingCount(1);
 276                 task.numChildren = 2;
 277                 task.children = leftChild;
 278                 leftChild.nextSibling = rightChild;
 279                 leftChild.fork();
 280                 task = rightChild;
 281             }
 282         }
 283     }
 284 
 285     /**
 286      * {@inheritDoc}
 287      * Clears spliterator and children fields.  Overriders MUST call
 288      * {@code super.onCompletion} as the last thing they do if they want these
 289      * cleared
 290      */
 291     @Override
 292     public void onCompletion(CountedCompleter<?> caller) {
 293         spliterator = null;
 294         children = null;
 295     }
 296 
 297     /**
 298      * Determines if the task can be computed.
 299      *
 300      * @return true if this task can be computed to either calculate the leaf
 301      *         via {@link #doLeaf()} or split, otherwise false if this task
 302      *         cannot be computed, for example if this task has been cancelled
 303      *         and/or a result for the computation has been found by another
 304      *         task.
 305      */
 306     protected boolean canCompute() {
 307         return true;
 308     }
 309 }