/*
 * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

  32 /**
  33  * Abstract base class for most fork-join tasks used to implement stream ops.
  34  * Manages splitting logic, tracking of child tasks, and intermediate results.
  35  * Each task is associated with a {@link Spliterator} that describes the portion
  36  * of the input associated with the subtree rooted at this task.
  37  * Tasks may be leaf nodes (which will traverse the elements of
  38  * the {@code Spliterator}) or internal nodes (which split the
  39  * {@code Spliterator} into multiple child tasks).
  40  *
  41  * @implNote
  42  * <p>This class is based on {@link CountedCompleter}, a form of fork-join task
  43  * where each task has a semaphore-like count of uncompleted children, and the
  44  * task is implicitly completed and notified when its last child completes.
  45  * Internal node tasks will likely override the {@code onCompletion} method from
  46  * {@code CountedCompleter} to merge the results from child tasks into the
  47  * current task's result.
  48  *
  49  * <p>Splitting and setting up the child task links is done by {@code compute()}
  50  * for internal nodes.  At {@code compute()} time for leaf nodes, it is
  51  * guaranteed that the parent's child-related fields (including sibling links
  52  * for the parent's children) will be set up for all children.
  53  *
  54  * <p>For example, a task that performs a reduce would override {@code doLeaf()}
  55  * to perform a reduction on that leaf node's chunk using the
  56  * {@code Spliterator}, and override {@code onCompletion()} to merge the results
  57  * of the child tasks for internal nodes:
  58  *
  59  * <pre>{@code
  60  *     protected S doLeaf() {
  61  *         spliterator.forEach(...);
  62  *         return localReductionResult;
  63  *     }
  64  *
  65  *     public void onCompletion(CountedCompleter caller) {
  66  *         if (!isLeaf()) {
  67  *             ReduceTask<P_IN, P_OUT, T, R> child = children;
  68  *             R result = child.getLocalResult();
  69  *             child = child.nextSibling;
  70  *             for (; child != null; child = child.nextSibling)
  71  *                 result = combine(result, child.getLocalResult());
  72  *             setLocalResult(result);
  73  *         }
  74  *     }
  75  * }</pre>
  76  *
  77  * <p>Serialization is not supported as there is no intention to serialize
  78  * tasks managed by stream ops.
  79  *
  80  * @param <P_IN> Type of elements input to the pipeline
  81  * @param <P_OUT> Type of elements output from the pipeline
  82  * @param <R> Type of intermediate result, which may be different from operation
  83  *        result type
  84  * @param <K> Type of parent, child and sibling tasks
  85  * @since 1.8
  86  */
  87 @SuppressWarnings("serial")
  88 abstract class AbstractTask<P_IN, P_OUT, R,
  89                             K extends AbstractTask<P_IN, P_OUT, R, K>>
  90         extends CountedCompleter<R> {
  91 
  92     private static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
  93 
  94     /** The pipeline helper, common to all tasks in a computation */
  95     protected final PipelineHelper<P_OUT> helper;
  96 
  97     /**
  98      * The spliterator for the portion of the input associated with the subtree
  99      * rooted at this task
 100      */
 101     protected Spliterator<P_IN> spliterator;
 102 
 103     /** Target leaf size, common to all tasks in a computation */
 104     protected long targetSize; // may be lazily initialized
 105 
 106     /**
 107      * The left child.
 108      * null if no children
 109      * if non-null rightChild is non-null
 110      */
 111     protected K leftChild;
 112 
 113     /**
 114      * The right child.
 115      * null if no children
 116      * if non-null leftChild is non-null
 117      */
 118     protected K rightChild;
 119 
 120     /** The result of this node, if completed */
 121     private R localResult;
 122 
 123     /**
 124      * Constructor for root nodes.
 125      *
 126      * @param helper The {@code PipelineHelper} describing the stream pipeline
 127      *               up to this operation
 128      * @param spliterator The {@code Spliterator} describing the source for this
 129      *                    pipeline
 130      */
 131     protected AbstractTask(PipelineHelper<P_OUT> helper,
 132                            Spliterator<P_IN> spliterator) {
 133         super(null);
 134         this.helper = helper;
 135         this.spliterator = spliterator;
 136         this.targetSize = 0L;
 137     }
 138 
 139     /**
 140      * Constructor for non-root nodes.
 141      *
 142      * @param parent this node's parent task
 143      * @param spliterator {@code Spliterator} describing the subtree rooted at
 144      *        this node, obtained by splitting the parent {@code Spliterator}
 145      */
 146     protected AbstractTask(K parent,
 147                            Spliterator<P_IN> spliterator) {
 148         super(parent);
 149         this.spliterator = spliterator;
 150         this.helper = parent.helper;
 151         this.targetSize = parent.targetSize;
 152     }
 153 
 154     /**
 155      * Default target of leaf tasks for parallel decomposition.
 156      * To allow load balancing, we over-partition, currently to approximately
 157      * four tasks per processor, which enables others to help out
 158      * if leaf tasks are uneven or some processors are otherwise busy.
 159      */
 160     public static int getLeafTarget() {
 161         Thread t = Thread.currentThread();
 162         if (t instanceof ForkJoinWorkerThread) {
 163             return ((ForkJoinWorkerThread) t).getPool().getParallelism() << 2;
 164         }
 165         else {
 166             return LEAF_TARGET;
 167         }
 168     }
 169 
 170     /**
 171      * Constructs a new node of type T whose parent is the receiver; must call
 172      * the AbstractTask(T, Spliterator) constructor with the receiver and the
 173      * provided Spliterator.
 174      *
 175      * @param spliterator {@code Spliterator} describing the subtree rooted at
 176      *        this node, obtained by splitting the parent {@code Spliterator}
 177      * @return newly constructed child node
 178      */
 179     protected abstract K makeChild(Spliterator<P_IN> spliterator);
 180 
 181     /**
 182      * Computes the result associated with a leaf node.  Will be called by
 183      * {@code compute()} and the result passed to @{code setLocalResult()}
 184      *
 185      * @return the computed result of a leaf node
 186      */
 187     protected abstract R doLeaf();
 188 
 189     /**
 190      * Returns a suggested target leaf size based on the initial size estimate.
 191      *
 192      * @return suggested target leaf size
 193      */
 194     public static long suggestTargetSize(long sizeEstimate) {
 195         long est = sizeEstimate / getLeafTarget();
 196         return est > 0L ? est : 1L;
 197     }
 198 
 199     /**
 200      * Returns the targetSize, initializing it via the supplied
 201      * size estimate if not already initialized.
 202      */
 203     protected final long getTargetSize(long sizeEstimate) {
 204         long s;
 205         return ((s = targetSize) != 0 ? s :
 206                 (targetSize = suggestTargetSize(sizeEstimate)));
 207     }
 208 
 209     /**
 210      * Returns the local result, if any. Subclasses should use
 211      * {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage
 212      * results.  This returns the local result so that calls from within the
 213      * fork-join framework will return the correct result.
 214      *
 215      * @return local result for this node previously stored with
 216      * {@link #setLocalResult}
 217      */
 218     @Override
 219     public R getRawResult() {
 220         return localResult;
 221     }
 222 
 223     /**
 224      * Does nothing; instead, subclasses should use
 225      * {@link #setLocalResult(Object)}} to manage results.
 226      *
 227      * @param result must be null, or an exception is thrown (this is a safety
 228      *        tripwire to detect when {@code setRawResult()} is being used
 229      *        instead of {@code setLocalResult()}
 230      */
 231     @Override
 232     protected void setRawResult(R result) {
 233         if (result != null)
 234             throw new IllegalStateException();
 235     }
 236 
 237     /**
 238      * Retrieves a result previously stored with {@link #setLocalResult}
 239      *
 240      * @return local result for this node previously stored with
 241      * {@link #setLocalResult}
 242      */
 243     protected R getLocalResult() {
 244         return localResult;
 245     }
 246 
 247     /**
 248      * Associates the result with the task, can be retrieved with
 249      * {@link #getLocalResult}
 250      *
 251      * @param localResult local result for this node
 252      */
 253     protected void setLocalResult(R localResult) {
 254         this.localResult = localResult;
 255     }
 256 
 257     /**
 258      * Indicates whether this task is a leaf node.  (Only valid after
 259      * {@link #compute} has been called on this node).  If the node is not a
 260      * leaf node, then children will be non-null and numChildren will be
 261      * positive.
 262      *
 263      * @return {@code true} if this task is a leaf node
 264      */
 265     protected boolean isLeaf() {
 266         return leftChild == null;
 267     }
 268 
 269     /**
 270      * Indicates whether this task is the root node
 271      *
 272      * @return {@code true} if this task is the root node.
 273      */
 274     protected boolean isRoot() {
 275         return getParent() == null;
 276     }
 277 
 278     /**
 279      * Returns the parent of this task, or null if this task is the root
 280      *
 281      * @return the parent of this task, or null if this task is the root
 282      */
 283     @SuppressWarnings("unchecked")
 284     protected K getParent() {
 285         return (K) getCompleter();
 286     }
 287 
 288     /**
 289      * Decides whether or not to split a task further or compute it
 290      * directly. If computing directly, calls {@code doLeaf} and pass
 291      * the result to {@code setRawResult}. Otherwise splits off
 292      * subtasks, forking one and continuing as the other.
 293      *
 294      * <p> The method is structured to conserve resources across a
 295      * range of uses.  The loop continues with one of the child tasks
 296      * when split, to avoid deep recursion. To cope with spliterators
 297      * that may be systematically biased toward left-heavy or
 298      * right-heavy splits, we alternate which child is forked versus
 299      * continued in the loop.
 300      */
 301     @Override
 302     public void compute() {
 303         Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
 304         long sizeEstimate = rs.estimateSize();
 305         long sizeThreshold = getTargetSize(sizeEstimate);
 306         boolean forkRight = false;
 307         @SuppressWarnings("unchecked") K task = (K) this;
 308         while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
 309             K leftChild, rightChild, taskToFork;
 310             task.leftChild  = leftChild = task.makeChild(ls);
 311             task.rightChild = rightChild = task.makeChild(rs);
 312             task.setPendingCount(1);
 313             if (forkRight) {
 314                 forkRight = false;
 315                 rs = ls;
 316                 task = leftChild;
 317                 taskToFork = rightChild;
 318             }
 319             else {
 320                 forkRight = true;
 321                 task = rightChild;
 322                 taskToFork = leftChild;
 323             }
 324             taskToFork.fork();
 325             sizeEstimate = rs.estimateSize();
 326         }
 327         task.setLocalResult(task.doLeaf());
 328         task.tryComplete();
 329     }
 330 
 331     /**
 332      * {@inheritDoc}
 333      *
 334      * @implNote
 335      * Clears spliterator and children fields.  Overriders MUST call
 336      * {@code super.onCompletion} as the last thing they do if they want these
 337      * cleared.
 338      */
 339     @Override
 340     public void onCompletion(CountedCompleter<?> caller) {
 341         spliterator = null;
 342         leftChild = rightChild = null;
 343     }
 344 
 345     /**
 346      * Returns whether this node is a "leftmost" node -- whether the path from
 347      * the root to this node involves only traversing leftmost child links.  For
 348      * a leaf node, this means it is the first leaf node in the encounter order.
 349      *
 350      * @return {@code true} if this node is a "leftmost" node
 351      */
 352     protected boolean isLeftmostNode() {
 353         @SuppressWarnings("unchecked")
 354         K node = (K) this;
 355         while (node != null) {
 356             K parent = node.getParent();
 357             if (parent != null && parent.leftChild != node)
 358                 return false;
 359             node = parent;
 360         }
 361         return true;
 362     }
 363 }