/*
 * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;

/**
 * Abstract base class for most fork-join tasks used to implement stream ops.
 * Manages splitting logic, tracking of child tasks, and intermediate results.
 * Each task is associated with a {@link Spliterator} that describes a portion
 * of the input.  Tasks may be leaf nodes (which will traverse the elements of
 * the {@code Spliterator}) or internal nodes (which split the
 * {@code Spliterator} into multiple child tasks).
 *
 * <p>This class is based on {@link CountedCompleter}, a form of fork-join task
 * where each task has a semaphore-like count of uncompleted children, and the
 * task is implicitly completed and notified when its last child completes.
 * Internal node tasks will likely override the {@code onCompletion} method from
 * {@code CountedCompleter} to merge the results from child tasks into the
 * current task's result.
 *
 * <p>Splitting and setting up the child task links is done by {@code compute()}
 * for internal nodes.  At {@code compute()} time for leaf nodes, it is
 * guaranteed that the parent's child-related fields (including sibling links
 * for the parent's children) will be set up for all children.
 *
 * <p>For example, a task that performs a reduce would override {@code doLeaf()}
 * to perform a reduction on that leaf node's chunk using the
 * {@code Spliterator}, and override {@code onCompletion()} to merge the results
 * of the child tasks for internal nodes:
 *
 * <pre>{@code
 *     protected S doLeaf() {
 *         spliterator.forEach(...);
 *         return localReductionResult;
 *     }
 *
 *     public void onCompletion(CountedCompleter caller) {
 *         if (!isLeaf()) {
 *             ReduceTask<P_IN, P_OUT, T, R> child = children;
 *             R result = child.getLocalResult();
 *             child = child.nextSibling;
 *             for (; child != null; child = child.nextSibling)
 *                 result = combine(result, child.getLocalResult());
 *             setLocalResult(result);
 *         }
 *     }
 * }</pre>
 *
 * @param <P_IN> Type of elements input to the pipeline
 * @param <P_OUT> Type of elements output from the pipeline
 * @param <R> Type of intermediate result, which may be different from operation
 *        result type
 * @param <T> Type of child and sibling tasks
 * @since 1.8
 */
abstract class AbstractTask<P_IN, P_OUT, R, T extends AbstractTask<P_IN, P_OUT, R, T>>
        extends CountedCompleter<R> {

    /** The pipeline helper, common to all tasks in a computation */
    protected final PipelineHelper<P_IN, P_OUT> helper;

    /**
     * The spliterator for the portion of the input associated with the subtree
     * rooted at this task.  Cleared (set to {@code null}) by
     * {@link #onCompletion}.
     */
    protected Spliterator<P_IN> spliterator;

    /** Target leaf size; children inherit the value from their parent */
    protected final long targetSize;

    /** How many children does this task have? */
    protected int numChildren;

    /**
     * This task's first child.  Children are stored in a linked list, using
     * the {@code nextSibling} field as the link to the next child.
     */
    protected T children;

    /** Next sibling of this task */
    protected T nextSibling;

    /** The result of this node, if completed */
    private R localResult;

    /**
     * Constructor for root nodes.  The spliterator is obtained from the
     * helper, and the target leaf size is derived from that spliterator's
     * size estimate via {@link #suggestTargetSize(long)}.
     *
     * @param helper the pipeline helper shared by every task in this
     *        computation
     */
    protected AbstractTask(PipelineHelper<P_IN, P_OUT> helper) {
        super(null);
        this.helper = helper;
        this.spliterator = helper.sourceSpliterator();
        this.targetSize = suggestTargetSize(spliterator.estimateSize());
    }

    /**
     * Constructor for non-root nodes.  The helper and target size are copied
     * from the parent.
     *
     * @param parent This node's parent task
     * @param spliterator Spliterator describing the subtree rooted at this
     *        node, obtained by splitting the parent spliterator
     */
    protected AbstractTask(T parent, Spliterator<P_IN> spliterator) {
        super(parent);
        this.spliterator = spliterator;
        this.helper = parent.helper;
        this.targetSize = parent.targetSize;
    }

    /**
     * Constructs a new node of type T whose parent is the receiver; must call
     * the AbstractTask(T, Spliterator) constructor with the receiver and the
     * provided Spliterator.
     *
     * @param spliterator the spliterator describing the child's portion of
     *        the input
     * @return the newly constructed child task
     */
    protected abstract T makeChild(Spliterator<P_IN> spliterator);

    /**
     * Computes the result associated with a leaf node.  Will be called by
     * {@code compute()} and the result passed to {@code setLocalResult()}
     *
     * @return the result computed for this leaf
     */
    protected abstract R doLeaf();

    /**
     * Suggests a target leaf size based on the initial size estimate.
     *
     * <p>The estimate is divided by eight times the common pool parallelism
     * (the {@code << 3} below); the result is never less than one.
     *
     * @param sizeEstimate the initial size estimate of the whole input
     * @return the suggested target leaf size, at least 1
     */
    public static long suggestTargetSize(long sizeEstimate) {
        long est = sizeEstimate / (ForkJoinPool.getCommonPoolParallelism() << 3);
        return est > 0L ? est : 1L; // slack of 3; at least one
    }

    /**
     * Suggests whether it is advisable to split the provided spliterator based
     * on target size and other considerations, such as pool state.
     *
     * <p>The current implementation only compares the spliterator's size
     * estimate with the target size; the {@code helper} argument is not
     * consulted.
     *
     * @param helper the pipeline helper (currently unused)
     * @param spliterator the spliterator that would be split
     * @param targetSize the target leaf size
     * @return {@code true} if the estimated remaining size exceeds
     *         {@code targetSize}
     */
    public static <P_IN, P_OUT> boolean suggestSplit(PipelineHelper<P_IN, P_OUT> helper,
                                                     Spliterator<?> spliterator,
                                                     long targetSize) {
        long remaining = spliterator.estimateSize();
        return (remaining > targetSize);
        // @@@ May want to fold in pool characteristics such as surplus task count
    }

    /**
     * Suggests whether it is advisable to split this task based on target size
     * and other considerations.
     *
     * @return {@code true} if splitting this task's spliterator is suggested
     */
    public boolean suggestSplit() {
        return suggestSplit(helper, spliterator, targetSize);
    }

    /** Returns the local result, if any */
    @Override
    public R getRawResult() {
        return localResult;
    }

    /**
     * Does nothing; argument must be null, or an exception is thrown.
     * Results are stored with {@link #setLocalResult} instead.
     *
     * @throws IllegalStateException if {@code result} is non-null
     */
    @Override
    protected void setRawResult(R result) {
        if (result != null) {
            throw new IllegalStateException();
        }
    }

    /**
     * Retrieves a result previously stored with {@link #setLocalResult}
     */
    protected R getLocalResult() {
        return localResult;
    }

    /**
     * Associates the result with the task, can be retrieved with
     * {@link #getLocalResult}
     */
    protected void setLocalResult(R localResult) {
        this.localResult = localResult;
    }

    /**
     * Determines whether this task is a leaf node.  (Only valid after
     * {@link #compute} has been called on this node).  If the node is not a
     * leaf node, then children will be non-null and numChildren will be
     * positive.
     */
    protected boolean isLeaf() {
        return children == null;
    }

    /**
     * Determines if this task is a root node
     */
    protected boolean isRoot() {
        return getParent() == null;
    }

    /**
     * Returns the parent of this task, or null if this task is the root
     */
    @SuppressWarnings("unchecked")
    protected T getParent() {
        // The completer is whatever was handed to super(...) by one of the two
        // constructors above: null for the root, otherwise a T parent.
        return (T) getCompleter();
    }

    /**
     * Decides whether or not to split this task further or compute it directly.
     * If computing directly, call {@code doLeaf} and pass the result to
     * {@code setLocalResult}.  If splitting, set up the child-related fields,
     * create the child tasks, fork the leftmost (prefix) child task,
     * and compute the rightmost (remaining) child task.
     *
     * <p>Computing will continue for rightmost tasks while a task
     * can be computed as determined by {@link #canCompute()} and that
     * task should and can be split into left and right tasks.
     */
    @Override
    @SuppressWarnings("unchecked")
    public final void compute() {
        doCompute((T) this);
    }

    /**
     * Decides whether or not to split a task further or compute it directly. If
     * computing directly, call {@code doLeaf} and pass the result to
     * {@code setLocalResult}. If splitting, set up the child-related fields,
     * create the child tasks, fork the leftmost (prefix) child tasks, and
     * compute the rightmost (remaining) child tasks.
     *
     * <p>
     * Computing will continue for rightmost tasks while a task can be computed
     * as determined by {@link #canCompute()} and that task should and can be
     * split into left and right tasks.  If {@code canCompute()} returns false
     * the loop exits without computing a leaf and without calling
     * {@code tryComplete()} on that task.
     *
     * <p>
     * The rightmost tasks are computed in a loop rather than recursively to
     * avoid potential stack overflows when computing with a right-balanced
     * tree, such as that produced when splitting with a {@link Spliterator}
     * created from an {@link java.util.Iterator}.
     *
     * @param task the task to be computed.
     */
    private static <P_IN, P_OUT, R, T extends AbstractTask<P_IN, P_OUT, R, T>> void doCompute(T task) {
        while (task.canCompute()) {
            // Only attempt a split when one is suggested; trySplit() may still
            // decline by returning null.
            Spliterator<P_IN> split = task.suggestSplit() ? task.spliterator.trySplit() : null;
            if (split == null) {
                // Leaf: compute directly and signal completion up the tree.
                task.setLocalResult(task.doLeaf());
                task.tryComplete();
                return;
            }

            // Common case -- binary splits
            T leftChild = task.makeChild(split);
            T rightChild = task.makeChild(task.spliterator);
            // Two children, pending count of one: the first child's
            // tryComplete() decrements it, the second triggers onCompletion.
            task.setPendingCount(1);
            task.numChildren = 2;
            task.children = leftChild;
            leftChild.nextSibling = rightChild;
            leftChild.fork();
            // Continue with the right child in this thread instead of recursing.
            task = rightChild;
        }
    }

    /**
     * {@inheritDoc}
     * Clears spliterator and children fields.  Overriders MUST call
     * {@code super.onCompletion} as the last thing they do if they want these
     * cleared
     */
    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        spliterator = null;
        children = null;
    }

    /**
     * Determines if the task can be computed.  This base implementation
     * always returns {@code true}.
     *
     * @return true if this task can be computed to either calculate the leaf
     *         via {@link #doLeaf()} or split, otherwise false if this task
     *         cannot be computed, for example if this task has been cancelled
     *         and/or a result for the computation has been found by another
     *         task.
     */
    protected boolean canCompute() {
        return true;
    }
}