--- old/src/share/classes/java/util/Map.java 2013-03-11 17:43:59.000000000 -0400 +++ new/src/share/classes/java/util/Map.java 2013-03-11 17:43:58.000000000 -0400 @@ -25,6 +25,10 @@ package java.util; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Function; + /** * An object that maps keys to values. A map cannot contain duplicate keys; * each key can map to at most one value. @@ -115,6 +119,7 @@ * @since 1.2 */ public interface Map { + // Query Operations /** @@ -475,4 +480,454 @@ */ int hashCode(); + // Defaultable methods + + /** + * Execute the specified {@code BiConsumer} with the key and value of + * each entry in this map. + * + * @param block the {@code BiConsumer} to which entries will be applied + */ + default void forEach(BiConsumer block) { + Objects.requireNonNull(block); + for (Map.Entry entry : entrySet()) { + block.accept(entry.getKey(), entry.getValue()); + } + } + + /** + * Apply the specified function to each entry in this map, replacing + * each entry's value with the result of calling the function's + * {@link BiFunction#apply(Object, Object) BiFunction.apply(K key, V, value)} + * method with the current entry's key and value. + * + * @param function the function to apply to each entry + */ + default void replaceAll(BiFunction function) { + Objects.requireNonNull(function); + for (Map.Entry entry : entrySet()) { + entry.setValue(function.apply(entry.getKey(), entry.getValue())); + } + } + + /** + * If the specified key is not already associated with a value, + * associates it with the given value and returns {@code null}, + * else returns the current value. + * + *

The default implementation is equivalent to, for this {@code + * map}: + * + *

 {@code
+     * if (!map.containsKey(key))
+     *   return map.put(key, value);
+     * else
+     *   return map.get(key);}
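Illustrative usage, not part of the patch: a minimal sketch of the first-writer-wins idiom that putIfAbsent enables. The class, map contents, and key names below are assumptions for the example.

import java.util.HashMap;
import java.util.Map;

// Sketch only: the first registration for a key wins; later calls observe
// the existing value rather than overwriting it.
class PutIfAbsentExample {
    public static void main(String[] args) {
        Map<String, String> owners = new HashMap<>();
        owners.putIfAbsent("lock-A", "thread-1");                // absent: stored, returns null
        String prev = owners.putIfAbsent("lock-A", "thread-2");  // present: existing value kept
        System.out.println(prev + " still owns lock-A");         // thread-1 still owns lock-A
    }
}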
+ * + *

The default implementation makes no guarantees about + * synchronization or atomicity properties of this method. Any + * class overriding this method must specify its concurrency + * properties. In particular, all implementations of + * subinterface {@link java.util.concurrent.ConcurrentMap} + * must ensure that this operation is performed atomically. + * + * @param key key with which the specified value is to be associated + * @param value value to be associated with the specified key + * @return the previous value associated with the specified key, or + * null if there was no mapping for the key. + * (A null return can also indicate that the map + * previously associated null with the key, + * if the implementation supports null values.) + * @throws UnsupportedOperationException if the put operation + * is not supported by this map + * @throws ClassCastException if the class of the specified key or value + * prevents it from being stored in this map + * @throws NullPointerException if the specified key or value is null, + * and this map does not permit null keys or values + * @throws IllegalArgumentException if some property of the specified key + * or value prevents it from being stored in this map + */ + default V putIfAbsent(K key, V value) { + return containsKey(key) ? get(key) : put(key, value); + } + + /** + * Removes the entry for the specified key only if it is currently + * mapped to the specified value. + * + *

The default implementation is equivalent to, for this {@code map}: + * + *

 {@code
+     * if (map.containsKey(key) && map.get(key).equals(value)) {
+     *   map.remove(key);
+     *   return true;
+     * } else
+     *   return false;}
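Illustrative usage, not part of the patch: a minimal sketch of value-conditional removal with the two-argument remove. Names and values are assumptions.

import java.util.HashMap;
import java.util.Map;

// Sketch only: remove an entry only if it still holds the value we observed,
// so a refresh that already replaced it is not discarded by mistake.
class ConditionalRemoveExample {
    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        cache.put("config", "v1");
        cache.put("config", "v2");                       // refreshed elsewhere
        boolean evicted = cache.remove("config", "v1");  // stale value: no removal
        System.out.println(evicted + " / " + cache);     // false / {config=v2}
    }
}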
+ * + *

The default implementation makes no guarantees about + * synchronization or atomicity properties of this method. Any + * class overriding this method must specify its concurrency + * properties. In particular, all implementations of + * subinterface {@link java.util.concurrent.ConcurrentMap} + * must ensure that this operation is performed atomically. + * + * @param key key with which the specified value is associated + * @param value value expected to be associated with the specified key + * @return true if the value was removed + * @throws UnsupportedOperationException if the remove operation + * is not supported by this map + * @throws ClassCastException if the key or value is of an inappropriate + * type for this map + * (optional) + * @throws NullPointerException if the specified key or value is null, + * and this map does not permit null keys or values + * (optional) + */ + default boolean remove(Object key, Object value) { + if (!containsKey(key) || !get(key).equals(value)) + return false; + remove(key); + return true; + } + + /** + * Replaces the entry for the specified key only if currently + * mapped to the specified value. + * + *

The default implementation is equivalent to, for this {@code map}: + * + *

 {@code
+     * if (map.containsKey(key) && map.get(key).equals(oldValue)) {
+     *   map.put(key, newValue);
+     *   return true;
+     * } else
+     *   return false;}
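Illustrative usage, not part of the patch: a sketch of the optimistic read-modify-replace loop that the three-argument replace supports. On a ConcurrentMap the replace check is atomic; with the plain Map default it is not, as the paragraph below notes. The counter map and key are assumptions.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: the three-argument replace succeeds only if the value is still
// the one we read, so a lost update simply retries.
class ReplaceLoopExample {
    static void increment(Map<String, Integer> counters, String key) {
        counters.putIfAbsent(key, 0);
        Integer current;
        do {
            current = counters.get(key);
        } while (!counters.replace(key, current, current + 1));
    }

    public static void main(String[] args) {
        Map<String, Integer> counters = new ConcurrentHashMap<>();
        increment(counters, "requests");
        increment(counters, "requests");
        System.out.println(counters.get("requests")); // 2
    }
}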
+ * + *

The default implementation makes no guarantees about + * synchronization or atomicity properties of this method. Any + * class overriding this method must specify its concurrency + * properties. In particular, all implementations of + * subinterface {@link java.util.concurrent.ConcurrentMap} + * must ensure that this operation is performed atomically. + * + * @param key key with which the specified value is associated + * @param oldValue value expected to be associated with the specified key + * @param newValue value to be associated with the specified key + * @return true if the value was replaced + * @throws UnsupportedOperationException if the put operation + * is not supported by this map + * @throws ClassCastException if the class of a specified key or value + * prevents it from being stored in this map + * @throws NullPointerException if a specified key or value is null, + * and this map does not permit null keys or values + * @throws IllegalArgumentException if some property of a specified key + * or value prevents it from being stored in this map + */ + default boolean replace(K key, V oldValue, V newValue) { + if (!containsKey(key) || !get(key).equals(oldValue)) + return false; + put(key, newValue); + return true; + } + + /** + * Replaces the entry for the specified key only if it is + * currently mapped to some value. + * + *

The default implementation is equivalent to, for this {@code map}: + * + *

 {@code
+     * if (map.containsKey(key)) {
+     *   return map.put(key, value);
+     * } else
+     *   return null;}
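Illustrative usage, not part of the patch: a sketch showing that the two-argument replace updates only existing mappings and never creates one. Names are assumptions.

import java.util.HashMap;
import java.util.Map;

// Sketch only: refresh a session timestamp only for users who already have one.
class ReplaceExample {
    public static void main(String[] args) {
        Map<String, Long> lastSeen = new HashMap<>();
        lastSeen.put("alice", 0L);
        lastSeen.replace("alice", System.currentTimeMillis());   // updated
        lastSeen.replace("mallory", System.currentTimeMillis()); // no-op: never logged in
        System.out.println(lastSeen.containsKey("mallory"));     // false
    }
}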
+ * + *

The default implementation makes no guarantees about + * synchronization or atomicity properties of this method. Any + * class overriding this method must specify its concurrency + * properties. In particular, all implementations of + * subinterface {@link java.util.concurrent.ConcurrentMap} + * must ensure that this operation is performed atomically. + * + * @param key key with which the specified value is associated + * @param value value to be associated with the specified key + * @return the previous value associated with the specified key, or + * null if there was no mapping for the key. + * (A null return can also indicate that the map + * previously associated null with the key, + * if the implementation supports null values.) + * @throws UnsupportedOperationException if the put operation + * is not supported by this map + * @throws ClassCastException if the class of the specified key or value + * prevents it from being stored in this map + * @throws NullPointerException if the specified key or value is null, + * and this map does not permit null keys or values + * @throws IllegalArgumentException if some property of the specified key + * or value prevents it from being stored in this map + */ + default V replace(K key, V value) { + return containsKey(key) ? put(key, value) : null; + } + + /** + * If the specified key is not already associated with a value (or + * is mapped to {@code null}), attempts to compute its value using + * the given mapping function and enters it into this map unless + * {@code null}. + * + *

The default implementation is equivalent to the following + * steps for this {@code map}, then returning the current value or + * {@code null} if now absent: + * + *

 {@code
+     * if (map.get(key) == null) {
+     *   V newValue = mappingFunction.apply(key);
+     *   if (newValue != null)
+     *      map.putIfAbsent(key, newValue);
+     * }}
+ * + * If the function returns {@code null} no mapping is recorded. If + * the function itself throws an (unchecked) exception, the + * exception is rethrown, and no mapping is recorded. The most + * common usage is to construct a new object serving as an initial + * mapped value or memoized result, as in: + * + *
 {@code
+     * map.computeIfAbsent(key, k -> new Value(f(k)));}
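Expanding on the memoization idiom above, an illustrative sketch (not part of the patch) of the common multimap-style use of computeIfAbsent. The map, words, and bucket type are assumptions.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: create the bucket on first use, then append; computeIfAbsent
// returns the existing or newly created list.
class ComputeIfAbsentExample {
    public static void main(String[] args) {
        Map<String, List<String>> byInitial = new HashMap<>();
        for (String word : Arrays.asList("ant", "apple", "bee")) {
            byInitial.computeIfAbsent(word.substring(0, 1), k -> new ArrayList<>())
                     .add(word);
        }
        System.out.println(byInitial); // e.g. {a=[ant, apple], b=[bee]}
    }
}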
+ * + *

The default implementation makes no guarantees about + * synchronization or atomicity properties of this method or the + * application of the mapping function. Any class overriding this + * method must specify its concurrency properties. In particular, + * all implementations of subinterface {@link + * java.util.concurrent.ConcurrentMap} must document whether the + * function is applied once atomically only if the value is not + * present. Any class that permits null values must document + * whether and how this method distinguishes absence from null + * mappings. + * + * @param key key with which the specified value is to be associated + * @param mappingFunction the function to compute a value + * @return the current (existing or computed) value associated with + * the specified key, or null if the computed value is null + * @throws NullPointerException if the specified key is null and + * this map does not support null keys, or the + * mappingFunction is null + * @throws UnsupportedOperationException if the put operation + * is not supported by this map + * @throws ClassCastException if the class of the specified key or value + * prevents it from being stored in this map + * @throws RuntimeException or Error if the mappingFunction does so, + * in which case the mapping is left unestablished + */ + default V computeIfAbsent(K key, Function mappingFunction) { + V v, newValue; + return ((v = get(key)) == null && + (newValue = mappingFunction.apply(key)) != null && + (v = putIfAbsent(key, newValue)) == null) ? newValue : v; + } + + /** + * If the value for the specified key is present and non-null, + * attempts to compute a new mapping given the key and its current + * mapped value. + * + *

The default implementation is equivalent to performing the + * following steps for this {@code map}, then returning the + * current value or {@code null} if now absent: + * + *

 {@code
+     * if (map.get(key) != null) {
+     *   V oldValue = map.get(key);
+     *   V newValue = remappingFunction.apply(key, oldValue);
+     *   if (newValue != null)
+     *     map.replace(key, oldValue, newValue);
+     *   else
+     *     map.remove(key, oldValue);
+     * }}
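Illustrative usage, not part of the patch: a sketch using computeIfPresent for reference counting, where returning null drops the mapping. Names and counts are assumptions.

import java.util.HashMap;
import java.util.Map;

// Sketch only: decrement a count only if the key is present, and remove the
// mapping entirely when the count reaches zero.
class ComputeIfPresentExample {
    static void release(Map<String, Integer> refCounts, String key) {
        refCounts.computeIfPresent(key, (k, count) -> count == 1 ? null : count - 1);
    }

    public static void main(String[] args) {
        Map<String, Integer> refCounts = new HashMap<>();
        refCounts.put("buffer", 2);
        release(refCounts, "buffer");   // buffer=1
        release(refCounts, "buffer");   // mapping removed
        release(refCounts, "unknown");  // no-op
        System.out.println(refCounts);  // {}
    }
}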
+ * + * In concurrent contexts, the default implementation may retry + * these steps when multiple threads attempt updates. If the + * function returns {@code null}, the mapping is removed (or + * remains absent if initially absent). If the function itself + * throws an (unchecked) exception, the exception is rethrown, and + * the current mapping is left unchanged. + * + *

The default implementation makes no guarantees about + * synchronization or atomicity properties of this method or the + * application of the remapping function. Any class overriding + * this method must specify its concurrency properties. In + * particular, all implementations of subinterface {@link + * java.util.concurrent.ConcurrentMap} must document whether the + * function is applied once atomically only if the value is + * present. Any class that permits null values must document + * whether and how this method distinguishes absence from null + * mappings. + * + * @param key key with which the specified value is to be associated + * @param remappingFunction the function to compute a value + * @return the new value associated with the specified key, or null if none + * @throws NullPointerException if the specified key is null and + * this map does not support null keys, or the + * remappingFunction is null + * @throws UnsupportedOperationException if the put operation + * is not supported by this map + * @throws ClassCastException if the class of the specified key or value + * prevents it from being stored in this map + * @throws RuntimeException or Error if the remappingFunction does so, + * in which case the mapping is unchanged + */ + default V computeIfPresent(K key, + BiFunction remappingFunction) { + V v; + while ((v = get(key)) != null) { + V newValue = remappingFunction.apply(key, v); + if (newValue != null) { + if (replace(key, v, newValue)) + return newValue; + } + else if (remove(key, v)) + return null; + } + return v; + } + + /** + * Attempts to compute a mapping for the specified key and its + * current mapped value (or {@code null} if there is no current + * mapping). For example, to either create or append a {@code + * String msg} to a value mapping: + * + *

 {@code
+     * map.compute(key, (k, v) -> (v == null) ? msg : v.concat(msg))}
+ * (Method {@link #merge} is often simpler to use for such purposes.) + * + *

The default implementation is equivalent to + * performing the following steps for this {@code map}, then + * returning the current value or {@code null} if absent: + * + *

 {@code
+     * V oldValue = map.get(key);
+     * V newValue = remappingFunction.apply(key, oldValue);
+     * if (newValue != null)
+     *   map.replace(key, oldValue, newValue);
+     * else
+     *   map.remove(key, oldValue);
+     * }
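Illustrative usage, not part of the patch: a sketch of compute for tallying, where the remapping function sees null for absent keys and the current count otherwise. Names are assumptions.

import java.util.HashMap;
import java.util.Map;

// Sketch only: tally occurrences with compute().
class ComputeExample {
    public static void main(String[] args) {
        Map<String, Integer> tally = new HashMap<>();
        for (String word : new String[] {"a", "b", "a"}) {
            tally.compute(word, (k, count) -> (count == null) ? 1 : count + 1);
        }
        System.out.println(tally); // {a=2, b=1}
    }
}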
+ * + * In concurrent contexts, the default implementation may retry + * these steps when multiple threads attempt updates. If the + * function returns {@code null}, the mapping is removed (or + * remains absent if initially absent). If the function itself + * throws an (unchecked) exception, the exception is rethrown, and + * the current mapping is left unchanged. + * + *

The default implementation makes no guarantees about + * synchronization or atomicity properties of this method or the + * application of the remapping function. Any class overriding + * this method must specify its concurrency properties. In + * particular, all implementations of subinterface {@link + * java.util.concurrent.ConcurrentMap} must document whether the + * function is applied exactly once atomically. Any class that + * permits null values must document whether and how this method + * distinguishes absence from null mappings. + * + * @param key key with which the specified value is to be associated + * @param remappingFunction the function to compute a value + * @return the new value associated with the specified key, or null if none + * @throws NullPointerException if the specified key is null and + * this map does not support null keys, or the + * remappingFunction is null + * @throws UnsupportedOperationException if the put operation + * is not supported by this map + * @throws ClassCastException if the class of the specified key or value + * prevents it from being stored in this map + * @throws RuntimeException or Error if the remappingFunction does so, + * in which case the mapping is unchanged + */ + default V compute(K key, + BiFunction remappingFunction) { + for (;;) { + V oldValue = get(key); + V newValue = remappingFunction.apply(key, oldValue); + if (newValue != null) { + if (replace(key, oldValue, newValue)) + return newValue; + } + else if (remove(key, oldValue)) + return null; + } + } + + /** + * If the specified key is not already associated with a + * (non-null) value, associates it with the given value. + * Otherwise, replaces the value with the results of the given + * remapping function, or removes if {@code null}. This method may + * be of use when combining multiple mapped values for a key. For + * example. to either create or append a {@code String msg} to a + * value mapping: + * + *

 {@code
+     * map.merge(key, msg, String::concat)}
+ * + *

The default implementation is equivalent to performing the + * following steps for this {@code map}, then returning the + * current value or {@code null} if absent: + * + *

 {@code
+     * V oldValue = map.get(key);
+     * V newValue = (oldValue == null) ? value :
+     *              remappingFunction.apply(oldValue, value);
+     * if (newValue == null)
+     *   map.remove(key, oldValue);
+     * else if (oldValue == null)
+     *   map.putIfAbsent(key, newValue);
+     * else
+     *   map.replace(key, oldValue, newValue);
+     * }
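Illustrative usage, not part of the patch: a sketch of merge for accumulating per-key totals; the remapping function runs only when the key already has a non-null value. Names and values are assumptions.

import java.util.HashMap;
import java.util.Map;

// Sketch only: sum values per key with merge().
class MergeExample {
    public static void main(String[] args) {
        Map<String, Integer> sales = new HashMap<>();
        sales.merge("widgets", 3, Integer::sum); // absent -> 3
        sales.merge("widgets", 4, Integer::sum); // present -> 3 + 4
        System.out.println(sales);               // {widgets=7}
    }
}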
+ * + * In concurrent contexts, the default implementation may retry + * these steps when multiple threads attempt updates. If the + * function returns {@code null}, the mapping is removed (or + * remains absent if initially absent). If the function itself + * throws an (unchecked) exception, the exception is rethrown, and + * the current mapping is left unchanged. + * + *

The default implementation makes no guarantees about + * synchronization or atomicity properties of this method or the + * application of the remapping function. Any class overriding + * this method must specify its concurrency properties. In + * particular, all implementations of subinterface {@link + * java.util.concurrent.ConcurrentMap} must document whether the + * function is applied exactly once atomically. Any class that + * permits null values must document whether and how this method + * distinguishes absence from null mappings. + * + * @param key key with which the specified value is to be associated + * @param value the value to use if absent + * @param remappingFunction the function to recompute a value if present + * @return the new value associated with the specified key, or null if none + * @throws NullPointerException if the specified key is null and + * this map does not support null keys, or the + * remappingFunction is null + * @throws RuntimeException or Error if the remappingFunction does so, + * in which case the mapping is unchanged + */ + default V merge(K key, V value, + BiFunction remappingFunction) { + for (;;) { + V oldValue, newValue; + if ((oldValue = get(key)) == null) { + if (value == null || putIfAbsent(key, value) == null) + return value; + } + else if ((newValue = remappingFunction.apply(oldValue, value)) != null) { + if (replace(key, oldValue, newValue)) + return newValue; + } + else if (remove(key, oldValue)) + return null; + } + } } --- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/AbstractShortCircuitTask.java 2013-03-11 17:43:59.000000000 -0400 @@ -0,0 +1,186 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.Spliterator; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Abstract class for fork-join tasks used to implement short-circuiting + * stream ops, which can produce a result without processing all elements of the + * stream. 
+ * + * @param Type of elements input to the pipeline + * @param Type of elements output from the pipeline + * @param Type of intermediate result, may be different from operation + * result type + * @param Type of child and sibling tasks + * @since 1.8 + */ +abstract class AbstractShortCircuitTask> + extends AbstractTask { + /** + * The result for this computation; this is shared among all tasks and set + * exactly once + */ + protected final AtomicReference sharedResult; + + /** + * Indicates whether this task has been canceled. Tasks may cancel other + * tasks in the computation under various conditions, such as in a + * find-first operation, a task that finds a value will cancel all tasks + * that are later in the encounter order. + */ + protected volatile boolean canceled; + + /** Constructor for root nodes */ + protected AbstractShortCircuitTask(PipelineHelper helper) { + super(helper); + sharedResult = new AtomicReference<>(null); + } + + /** Constructor for non-root nodes */ + protected AbstractShortCircuitTask(T parent, Spliterator spliterator) { + super(parent, spliterator); + sharedResult = parent.sharedResult; + } + + /** + * Returns the value indicating the computation completed with no task + * finding a short-circuitable result. For example, for a "find" operation, + * this might be null or an empty {@code Optional}. + */ + protected abstract R getEmptyResult(); + + @Override + protected boolean canCompute() { + // Have we already found an answer? + if (sharedResult.get() != null) { + tryComplete(); + return false; + } else if (taskCanceled()) { + setLocalResult(getEmptyResult()); + tryComplete(); + return false; + } + else { + return true; + } + } + + /** + * Declares that a globally valid result has been found. If another task has + * not already found the answer, the result is installed in + * {@code sharedResult}. The {@code compute()} method will check + * {@code sharedResult} before proceeding with computation, so this causes + * the computation to terminate early. + */ + protected void shortCircuit(R result) { + if (result != null) + sharedResult.compareAndSet(null, result); + } + + /** + * Sets a local result for this task. If this task is the root, set the + * shared result instead (if not already set). + */ + @Override + protected void setLocalResult(R localResult) { + if (isRoot()) { + if (localResult != null) + sharedResult.compareAndSet(null, localResult); + } + else + super.setLocalResult(localResult); + } + + /** Retrieves the local result for this task */ + @Override + public R getRawResult() { + return getLocalResult(); + } + + /** + * Retrieves the local result for this task. If this task is the root, + * retrieves the shared result instead. + */ + @Override + public R getLocalResult() { + if (isRoot()) { + R answer = sharedResult.get(); + return (answer == null) ? getEmptyResult() : answer; + } + else + return super.getLocalResult(); + } + + /** Set this node as canceled */ + protected void cancel() { + canceled = true; + } + + /** + * Queries whether this task is canceled. A task is considered canceled if it + * or any of its parents have been canceled. + */ + protected boolean taskCanceled() { + boolean cancel = canceled; + if (!cancel) + for (T parent = getParent(); !cancel && parent != null; parent = parent.getParent()) + cancel = parent.canceled; + return cancel; + } + + /** + * Cancels all tasks which succeed this one in the encounter order. 
This + * includes canceling all the current task's later siblings, as well as the + * later siblings of all its parents. + */ + protected void cancelLaterNodes() { + T parent = getParent(); + for (T sibling = this.nextSibling; sibling != null; sibling = sibling.nextSibling) + if (!sibling.canceled) + sibling.canceled = true; + // Go up the tree, cancel later siblings of all parents + if (parent != null) + parent.cancelLaterNodes(); + } + + /** + * Returns whether this node is a "leftmost" node -- whether the path from + * the root to this node involves only traversing leftmost child links. For + * a leaf node, this means it is the first leaf node in the encounter order. + */ + protected boolean isLeftmostNode() { + T node = (T) this; + while (node != null) { + T parent = node.getParent(); + if (parent != null && parent.children != node) + return false; + node = parent; + } + return true; + } +} --- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/AbstractTask.java 2013-03-11 17:43:59.000000000 -0400 @@ -0,0 +1,309 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.Spliterator; +import java.util.concurrent.CountedCompleter; +import java.util.concurrent.ForkJoinPool; + +/** + * Abstract base class for most fork-join tasks used to implement stream ops. + * Manages splitting logic, tracking of child tasks, and intermediate results. + * Each task is associated with a {@link Spliterator} that describes a portion + * of the input. Tasks may be leaf nodes (which will traverse the elements of + * the {@code Spliterator}) or internal nodes (which split the + * {@code Spliterator} into multiple child tasks). + * + *

This class is based on {@link CountedCompleter}, a form of fork-join task + * where each task has a semaphore-like count of uncompleted children, and the + * task is implicitly completed and notified when its last child completes. + * Internal node tasks will likely override the {@code onCompletion} method from + * {@code CountedCompleter} to merge the results from child tasks into the + * current task's result. + * + *

Splitting and setting up the child task links is done by {@code compute()} + * for internal nodes. At {@code compute()} time for leaf nodes, it is + * guaranteed that the parent's child-related fields (including sibling links + * for the parent's children) will be set up for all children. + * + *

For example, a task that performs a reduce would override {@code doLeaf()} + * to perform a reduction on that leaf node's chunk using the + * {@code Spliterator}, and override {@code onCompletion()} to merge the results + * of the child tasks for internal nodes: + * + *

+ *     protected S doLeaf() {
+ *         spliterator.forEach(...);
+ *         return localReductionResult;
+ *     }
+ *
+ *     public void onCompletion(CountedCompleter caller) {
+ *         if (!isLeaf()) {
+ *             ReduceTask child = children;
+ *             R result = child.getLocalResult();
+ *             child = child.nextSibling;
+ *             for (; child != null; child = child.nextSibling)
+ *                 result = combine(result, child.getLocalResult());
+ *             setLocalResult(result);
+ *         }
+ *     }
+ * 
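To make the splitting and completion protocol sketched above concrete outside the stream internals, here is a hedged, self-contained example that applies the same CountedCompleter shape to summing an array. The SumTask class, its threshold, and its field layout are illustrative assumptions, not part of this patch.

import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;

// Sketch only: each internal node forks its left half and keeps computing the
// right half; onCompletion merges child results once both halves are done.
class SumTask extends CountedCompleter<Long> {
    private final long[] data;
    private final int lo, hi;
    private SumTask left, right;   // children linked explicitly, like the stream tasks
    private long result;

    SumTask(CountedCompleter<?> parent, long[] data, int lo, int hi) {
        super(parent);
        this.data = data; this.lo = lo; this.hi = hi;
    }

    @Override
    public void compute() {
        if (hi - lo <= 1_000) {                 // leaf: sum the chunk directly
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += data[i];
            result = sum;
            tryComplete();
        } else {                                // internal node: binary split
            int mid = (lo + hi) >>> 1;
            left = new SumTask(this, data, lo, mid);
            right = new SumTask(this, data, mid, hi);
            setPendingCount(1);                 // wait for the one forked child
            left.fork();
            right.compute();                    // continue with the right half
        }
    }

    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        if (left != null) result = left.result + right.result;  // merge children
    }

    @Override
    public Long getRawResult() { return result; }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        java.util.Arrays.fill(data, 1L);
        long total = ForkJoinPool.commonPool().invoke(new SumTask(null, data, 0, data.length));
        System.out.println(total); // 10000
    }
}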
+ * + * @param Type of elements input to the pipeline + * @param Type of elements output from the pipeline + * @param Type of intermediate result, which may be different from operation + * result type + * @param Type of child and sibling tasks + * @since 1.8 + */ +abstract class AbstractTask> + extends CountedCompleter { + + /** The pipeline helper, common to all tasks in a computation */ + protected final PipelineHelper helper; + + /** + * The spliterator for the portion of the input associated with the subtree + * rooted at this task + */ + protected Spliterator spliterator; + + /** Target leaf size */ + protected final long targetSize; + + /** How many children does this task have? */ + protected int numChildren; + + /** + * This task's first child. Children are stored in a linked list, using + * the {@code nextSibling} field as the link to the next child. + */ + protected T children; + + /** Next sibling of this task */ + protected T nextSibling; + + /** The result of this node, if completed */ + private R localResult; + + /** + * Constructor for root nodes. + */ + protected AbstractTask(PipelineHelper helper) { + super(null); + this.helper = helper; + this.spliterator = helper.sourceSpliterator(); + this.targetSize = suggestTargetSize(spliterator.estimateSize()); + } + + /** + * Constructor for non-root nodes + * + * @param parent This node's parent task + * @param spliterator Spliterator describing the subtree rooted at this + * node, obtained by splitting the parent spliterator + */ + protected AbstractTask(T parent, Spliterator spliterator) { + super(parent); + this.spliterator = spliterator; + this.helper = parent.helper; + this.targetSize = parent.targetSize; + } + + /** + * Constructs a new node of type T whose parent is the receiver; must call + * the AbstractTask(T, Spliterator) constructor with the receiver and the + * provided Spliterator. + */ + protected abstract T makeChild(Spliterator spliterator); + + /** + * Computes the result associated with a leaf node. Will be called by + * {@code compute()} and the result passed to @{code setLocalResult()} + */ + protected abstract R doLeaf(); + + /** Suggests a target leaf size based on the initial size estimate */ + public static long suggestTargetSize(long sizeEstimate) { + long est = sizeEstimate / (ForkJoinPool.getCommonPoolParallelism() << 3); + return est > 0L ? 
est : 1L; // slack of 3; at least one + } + + /** + * Suggests whether it is advisable to split the provided spliterator based + * on target size and other considerations, such as pool state + */ + public static boolean suggestSplit(PipelineHelper helper, + Spliterator spliterator, + long targetSize) { + long remaining = spliterator.estimateSize(); + return (remaining > targetSize); + // @@@ May want to fold in pool characteristics such as surplus task count + } + + /** + * Suggests whether it is advisable to split this task based on target size + * and other considerations + */ + public boolean suggestSplit() { + return suggestSplit(helper, spliterator, targetSize); + } + + /** Returns the local result, if any */ + @Override + public R getRawResult() { + return localResult; + } + + /** Does nothing; argument must be null, or an exception is thrown */ + @Override + protected void setRawResult(R result) { + if (result != null) + throw new IllegalStateException(); + } + + /** + * Retrieves a result previously stored with {@link #setLocalResult} + */ + protected R getLocalResult() { + return localResult; + } + + /** + * Associates the result with the task; it can be retrieved with + * {@link #getLocalResult} + */ + protected void setLocalResult(R localResult) { + this.localResult = localResult; + } + + /** + * Determines whether this task is a leaf node. (Only valid after {@link #compute} has been + * called on this node.) If the node is not a leaf node, then children will + * be non-null and numChildren will be positive. + */ + protected boolean isLeaf() { + return children == null; + } + + /** + * Determines if this task is a root node + */ + protected boolean isRoot() { + return getParent() == null; + } + + /** + * Returns the parent of this task, or null if this task is the root + */ + @SuppressWarnings("unchecked") + protected T getParent() { + return (T) getCompleter(); + } + + /** + * Decides whether to split this task further or to compute it directly. + * If computing directly, call {@code doLeaf} and pass the result to + * {@code setLocalResult}. If splitting, set up the child-related fields, + * create the child tasks, fork the leftmost (prefix) child task, + * and compute the rightmost (remaining) child task. + * + *

Computing will continue for rightmost tasks while a task + * can be computed as determined by {@link #canCompute()} and that + * task should and can be split into left and right tasks. + */ + @Override + @SuppressWarnings("unchecked") + public final void compute() { + doCompute((T) this); + } + + /** + * Decides whether to split a task further or to compute it directly. If + * computing directly, call {@code doLeaf} and pass the result to + * {@code setLocalResult}. If splitting, set up the child-related fields, + * create the child tasks, fork the leftmost (prefix) child tasks, and + * compute the rightmost (remaining) child tasks. + * + *

+ * Computing will continue for rightmost tasks while a task can be computed + * as determined by {@link #canCompute()} and that task should and can be + * split into left and right tasks. + * + *

+ * The rightmost tasks are computed in a loop rather than recursively to + * avoid potential stack overflows when computing with a right-balanced + * tree, such as that produced when splitting with a {@link Spliterator} + * created from an {@link java.util.Iterator}. + * + * @param task the task to be computed. + */ + private static > void doCompute(T task) { + while (task.canCompute()) { + Spliterator split = null; + if (!task.suggestSplit() || (split = task.spliterator.trySplit()) == null) { + task.setLocalResult(task.doLeaf()); + task.tryComplete(); + return; + } + else { + // Common case -- binary splits + T leftChild = task.makeChild(split); + T rightChild = task.makeChild(task.spliterator); + task.setPendingCount(1); + task.numChildren = 2; + task.children = leftChild; + leftChild.nextSibling = rightChild; + leftChild.fork(); + task = rightChild; + } + } + } + + /** + * {@inheritDoc} + * Clears spliterator and children fields. Overriders MUST call + * {@code super.onCompletion} as the last thing they do if they want these + * cleared + */ + @Override + public void onCompletion(CountedCompleter caller) { + spliterator = null; + children = null; + } + + /** + * Determines if the task can be computed. + * + * @return true if this task can be computed to either calculate the leaf + * via {@link #doLeaf()} or split, otherwise false if this task + * cannot be computed, for example if this task has been cancelled + * and/or a result for the computation has been found by another + * task. + */ + protected boolean canCompute() { + return true; + } +} --- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/BaseStream.java 2013-03-11 17:43:59.000000000 -0400 @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.Iterator; +import java.util.Spliterator; + +/** + * Base interface for stream types such as {@link Stream}, {@link IntStream}, + * etc. Contains methods common to all stream types. + * + * @param Type of stream elements. + * @since 1.8 + */ +interface BaseStream> { + /** + * Returns an iterator for the elements of this stream. This is + * a terminal operation. + * + * @return the element iterator for this stream + */ + Iterator iterator(); + + /** + * Returns a spliterator for the elements of this stream. 
This is + * a terminal operation. + * + * @return the element spliterator for this stream + */ + Spliterator spliterator(); + + /** + * Returns whether this stream, when executed, will execute in parallel + * + * @return whether this stream will execute in parallel + */ + boolean isParallel(); + + /** + * Returns the composition of stream flags of the stream source and all + * intermediate operations. + * + * @return the composition of stream flags of the stream source and all + * intermediate operations + * @see StreamOpFlag + */ + int getStreamFlags(); + + /** + * Produces a stream which has the same contents as this stream, but is a + * sequential stream. If this stream is already sequential, may return + * itself. This is a + * stateful intermediate operation. + * + * @return a sequential stream + */ + S sequential(); + + /** + * Produces a stream which has the same contents as this stream, but is a + * parallel stream. If this stream is already parallel, may return itself. + * This is a + * stateful intermediate operation. + * + * @return a parallel stream + */ + S parallel(); +} --- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/FindOps.java 2013-03-11 17:44:00.000000000 -0400 @@ -0,0 +1,304 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.*; +import java.util.concurrent.CountedCompleter; +import java.util.function.Predicate; +import java.util.function.Supplier; + +/** + * A factory for creating instances of a short-circuiting {@code TerminalOp} + * that searches for an element in a stream pipeline, and terminates when it + * finds one. The search supports find-first (find the first element in the + * encounter order) and find-any (find any element, may not be the first in + * encounter order.) 
+ * + * @since 1.8 + */ +final class FindOps { + + private FindOps() { } + + /** + * Constructs a {@code TerminalOp} for streams of objects + * + * @param mustFindFirst Whether the {@code TerminalOp} must produce the + * first element in the encounter order + * @param The type of elements of the stream + * @return A {@code TerminalOp} implementing the find operation + */ + public static FindOp> makeRef(boolean mustFindFirst) { + return new FindOp<>(mustFindFirst, StreamShape.REFERENCE, Optional.empty(), + Optional::isPresent, FindSink.OfRef::new); + } + + /** + * Constructs a {@code TerminalOp} for streams of ints + * + * @param mustFindFirst Whether the {@code TerminalOp} must produce the + * first element in the encounter order + * @return A {@code TerminalOp} implementing the find operation + */ + public static FindOp makeInt(boolean mustFindFirst) { + return new FindOp<>(mustFindFirst, StreamShape.INT_VALUE, OptionalInt.empty(), + OptionalInt::isPresent, FindSink.OfInt::new); + } + + /** + * Constructs a {@code TerminalOp} for streams of longs + * + * @param mustFindFirst Whether the {@code TerminalOp} must produce the + * first element in the encounter order + * @return A {@code TerminalOp} implementing the find operation + */ + public static FindOp makeLong(boolean mustFindFirst) { + return new FindOp<>(mustFindFirst, StreamShape.LONG_VALUE, OptionalLong.empty(), + OptionalLong::isPresent, FindSink.OfLong::new); + } + + /** + * Constructs a {@code TerminalOp} for streams of doubles + * + * @param mustFindFirst Whether the {@code TerminalOp} must produce the + * first element in the encounter order + * @return A {@code TerminalOp} implementing the find operation + */ + public static FindOp makeDouble(boolean mustFindFirst) { + return new FindOp<>(mustFindFirst, StreamShape.DOUBLE_VALUE, OptionalDouble.empty(), + OptionalDouble::isPresent, FindSink.OfDouble::new); + } + + /** + * A short-circuiting {@code TerminalOp} that searches for an element in a + * stream pipeline, and terminates when it finds one. Implements both + * find-first (find the first element in the encounter order) and find-any + * (find any element, may not be the first in encounter order.) + * + * @param The output type of the stream pipeline + * @param The result type of the find operation, typically an optional + * type + */ + private static final class FindOp implements TerminalOp { + private final StreamShape shape; + final boolean mustFindFirst; + final O emptyValue; + final Predicate presentPredicate; + final Supplier> sinkSupplier; + + /** + * Constructs a {@code FindOp} + * + * @param mustFindFirst If true, must find the first element in + * encounter order, otherwise can find any element + * @param shape Stream shape of elements to search + * @param emptyValue Result value corresponding to "found nothing" + * @param presentPredicate {@code Predicate} on result value + * corresponding to "found something" + * @param sinkSupplier Factory for a {@code TerminalSink} implementing + * the matching functionality + */ + FindOp(boolean mustFindFirst, + StreamShape shape, + O emptyValue, + Predicate presentPredicate, + Supplier> sinkSupplier) { + this.mustFindFirst = mustFindFirst; + this.shape = shape; + this.emptyValue = emptyValue; + this.presentPredicate = presentPredicate; + this.sinkSupplier = sinkSupplier; + } + + @Override + public int getOpFlags() { + return StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 
0 : StreamOpFlag.NOT_ORDERED); + } + + @Override + public StreamShape inputShape() { + return shape; + } + + @Override + public O evaluateSequential(PipelineHelper helper) { + O result = helper.into(sinkSupplier.get(), helper.sourceSpliterator()).get(); + return result != null ? result : emptyValue; + } + + @Override + public O evaluateParallel(PipelineHelper helper) { + return new FindTask<>(helper, this).invoke(); + } + } + + /** + * Implementation of @{code TerminalSink} that implements the find + * functionality, requesting cancellation when something has been found + * + * @param The type of input element + * @param The result type, typically an optional type + */ + private static abstract class FindSink implements TerminalSink { + boolean hasValue; + T value; + + FindSink() {} // Avoid creation of special accessor + + @Override + public void accept(T value) { + if (!hasValue) { + hasValue = true; + this.value = value; + } + } + + @Override + public boolean cancellationRequested() { + return hasValue; + } + + /** Specialization of {@code FindSink} for reference streams */ + static final class OfRef extends FindSink> { + @Override + public Optional get() { + return hasValue ? Optional.of(value) : null; + } + } + + /** Specialization of {@code FindSink} for int streams */ + static final class OfInt extends FindSink implements Sink.OfInt { + @Override + public void accept(int value) { + // Boxing is OK here, since few values will actually flow into the sink + accept((Integer) value); + } + + @Override + public OptionalInt get() { + return hasValue ? OptionalInt.of(value) : null; + } + } + + /** Specialization of {@code FindSink} for long streams */ + static final class OfLong extends FindSink implements Sink.OfLong { + @Override + public void accept(long value) { + // Boxing is OK here, since few values will actually flow into the sink + accept((Long) value); + } + + @Override + public OptionalLong get() { + return hasValue ? OptionalLong.of(value) : null; + } + } + + /** Specialization of {@code FindSink} for double streams */ + static final class OfDouble extends FindSink implements Sink.OfDouble { + @Override + public void accept(double value) { + // Boxing is OK here, since few values will actually flow into the sink + accept((Double) value); + } + + @Override + public OptionalDouble get() { + return hasValue ? 
OptionalDouble.of(value) : null; + } + } + } + + /** + * {@code ForkJoinTask} implementing parallel short-circuiting search + * @param Input element type to the stream pipeline + * @param Output element type from the stream pipeline + * @param Result type from the find operation + */ + private static final class FindTask extends AbstractShortCircuitTask> { + private final FindOp op; + + FindTask(PipelineHelper helper, FindOp op) { + super(helper); + this.op = op; + } + + FindTask(FindTask parent, Spliterator spliterator) { + super(parent, spliterator); + this.op = parent.op; + } + + @Override + protected FindTask makeChild(Spliterator spliterator) { + return new FindTask<>(this, spliterator); + } + + @Override + protected O getEmptyResult() { + return op.emptyValue; + } + + private void foundResult(O answer) { + if (isLeftmostNode()) + shortCircuit(answer); + else + cancelLaterNodes(); + } + + @Override + protected O doLeaf() { + O result = helper.into(op.sinkSupplier.get(), spliterator).get(); + if (!op.mustFindFirst) { + if (result != null) + shortCircuit(result); + return null; + } + else { + if (result != null) { + foundResult(result); + return result; + } + else + return null; + } + } + + @Override + public void onCompletion(CountedCompleter caller) { + if (op.mustFindFirst) { + for (FindTask child = children; child != null; child = child.nextSibling) { + O result = child.getLocalResult(); + if (result != null && op.presentPredicate.test(result)) { + setLocalResult(result); + foundResult(result); + break; + } + } + } + super.onCompletion(caller); + } + } +} + --- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/ForEachOps.java 2013-03-11 17:44:00.000000000 -0400 @@ -0,0 +1,469 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.Spliterator; +import java.util.concurrent.CountedCompleter; +import java.util.function.BooleanSupplier; +import java.util.function.Consumer; +import java.util.function.DoubleConsumer; +import java.util.function.IntConsumer; +import java.util.function.LongConsumer; +import java.util.Objects; + +/** + * A factory for creating instances of {@code TerminalOp) that implement + * {@code forEach} or {@code forEachUntil} traversal over elements of a stream. + * + *

{@code forEach} traverses all elements of a stream and sends those + * elements to a {@code Consumer}. + * + *

{@code forEachUntil} traverses elements of a stream and sends those + * elements to a {@code Consumer} until a {@code BooleanSupplier} indicates + * that the termination criterion has been met and no more elements should be + * traversed and sent. + * + *

For either type of traversal, elements will be sent to the {@code Consumer} + * in whatever thread and whatever order they become available, independent of + * the stream's encounter order. + * + *

Exceptions occurring as a result of sending an element to the + * {@code Consumer} will be relayed to the caller, and traversal will be + * prematurely terminated. + * + * @apiNote + * The termination condition is an externally imposed criterion, and is useful + * for problems like "find the best answer that can be found in ten seconds", + * "search until you find an answer at least as good as X", etc. It is not + * designed to provide content-based cancellation, such as "process elements + * until you find one which matches a given criterion." + * + *

There is no guarantee that additional elements will not be traversed and + * sent after the termination criteria has transpired. For example, a + * termination criteria of {@code resultSet.size() > TARGET} does not guarantee + * that the result set will receive no more than {@code TARGET} elements, but + * instead that {@code forEachUntil} traversal will attempt to stop after + * {@code TARGET} elements have been placed in the {@code resultSet}. + * + * @since 1.8 + */ +final class ForEachOps { + + private ForEachOps() { } + + /** + * Constructs a {@code TerminalOp} that implements {@code forEach} + * traversal, which traverses all elements of a {@code Stream} and sends + * those elements the provided {@code Consumer}. + * + * @param consumer The {@code Consumer} that receives all elements of a + * stream + * @param The type of the stream elements + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeRef(Consumer consumer) { + Objects.requireNonNull(consumer); + return new ForEachOp.OfRef<>(consumer); + } + + /** + * Constructs a {@code TerminalOp} that implements {@code forEachUntil} + * traversal, which traverses all elements of a {@code Stream} and sends + * those elements to the provided {@code Consumer} until the specified + * {@code BooleanProvider} indicates that a termination criteria has + * occurred and no more elements should be traversed and sent. + * + * @param consumer The {@code Consumer} that receives elements of a stream + * @param until A {@code BooleanSupplier} that indicates whether the + * termination criteria has occurred. Once it returns {@code true} + * the first time, it must continue to return {@code true} for all + * future invocations + * @param The type of the stream elements + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeRef(Consumer consumer, BooleanSupplier until) { + Objects.requireNonNull(consumer); + Objects.requireNonNull(until); + return new ForEachOp.OfRef.Until<>(consumer, until); + } + + /** + * Constructs a {@code TerminalOp} that implements {@code forEach} + * traversal, which traverses all {@code int} elements of a + * {@code IntStream} and sends those elements the provided + * {@code IntConsumer}. + * + * @param consumer The {@code IntConsumer} that receives all elements of a + * stream + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeInt(IntConsumer consumer) { + Objects.requireNonNull(consumer); + return new ForEachOp.OfInt(consumer); + } + + /** + * Constructs a {@code TerminalOp} that implements {@code forEachUntil} + * traversal, which traverses all {@code int} elements of a + * {@code IntStream} and sends those elements to the provided + * {@code IntConsumer} until the specified {@code BooleanProvider} indicates + * that a termination criteria has occurred and no more elements should be + * traversed and sent. + * + * @param consumer The {@code IntConsumer} that receives elements of a + * stream + * @param until A {@code BooleanSupplier} that indicates whether the + * termination criteria has occurred. 
Once it returns {@code true} + * the first time, it must continue to return {@code true} for all + * future invocations + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeInt(IntConsumer consumer, BooleanSupplier until) { + Objects.requireNonNull(consumer); + Objects.requireNonNull(until); + return new ForEachOp.OfInt.Until(consumer, until); + } + + /** + * Constructs a {@code TerminalOp} that implements {@code forEach} + * traversal, which traverses all {@code long} elements of a + * {@code LongStream} and sends those elements the provided + * {@code LongConsumer}. + * + * @param consumer The {@code LongConsumer} that receives all elements of a + * stream + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeLong(LongConsumer consumer) { + Objects.requireNonNull(consumer); + return new ForEachOp.OfLong(consumer); + } + + /** + * Constructs a {@code TerminalOp} that implements {@code forEachUntil} + * traversal, which traverses all {@code long} elements of a + * {@code LongStream} and sends those elements to the provided + * {@code LongConsumer} until the specified {@code BooleanProvider} + * indicates that a termination criteria has occurred and no more elements + * should be traversed and sent. + * + * @param consumer The {@code LongConsumer} that receives elements of a + * stream + * @param until A {@code BooleanSupplier} that indicates whether the + * termination criteria has occurred. Once it returns {@code true} + * the first time, it must continue to return {@code true} for all + * future invocations + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeLong(LongConsumer consumer, BooleanSupplier until) { + Objects.requireNonNull(consumer); + Objects.requireNonNull(until); + return new ForEachOp.OfLong.Until(consumer, until); + } + + /** + * Constructs a {@code TerminalOp} that implements {@code forEach} + * traversal, which traverses all {@code double} elements of a + * {@code DoubleStream} and sends those elements the provided + * {@code DoubleConsumer}. + * + * @param consumer The {@code DoubleConsumer} that receives all elements of + * a stream + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeDouble(DoubleConsumer consumer) { + Objects.requireNonNull(consumer); + return new ForEachOp.OfDouble(consumer); + } + + /** + * Constructs a {@code TerminalOp} that implements {@code forEachUntil} + * traversal, which traverses all {@code double} elements of a + * {@code DoubleStream} and sends those elements to the provided + * {@code DoubleConsumer} until the specified {@code BooleanProvider} + * indicates that a termination criteria has occurred and no more elements + * should be traversed and sent. + * + * @param consumer The {@code DoubleConsumer} that receives elements of a + * stream + * @param until A {@code BooleanSupplier} that indicates whether the + * termination criteria has occurred. Once it returns {@code true} + * the first time, it must continue to return {@code true} for all + * future invocations + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeDouble(DoubleConsumer consumer, BooleanSupplier until) { + Objects.requireNonNull(consumer); + Objects.requireNonNull(until); + return new ForEachOp.OfDouble.Until(consumer, until); + } + + /** + * A {@code TerminalOp} that evaluates a stream pipeline and sends the + * output to itself as a {@code TerminalSink}. 
Elements will be sent in + * whatever thread, and in whatever order, they become available, independent of + * the stream's encounter order. + * + *

This terminal operation is stateless. For parallel evaluation each + * leaf instance of a {@code ForEachTask} will send elements to the same + * {@code TerminalSink} reference that is an instance of this class. State + * management, if any, is deferred to the consumer, held by the concrete + * sub-classes, that is the final receiver elements. + * + * @param The output type of the stream pipeline + */ + private static abstract class ForEachOp implements TerminalOp, TerminalSink { + + // TerminalOp + + @Override + public int getOpFlags() { + return StreamOpFlag.NOT_ORDERED; + } + + @Override + public Void evaluateSequential(PipelineHelper helper) { + return helper.into(this, helper.sourceSpliterator()).get(); + } + + @Override + public Void evaluateParallel(PipelineHelper helper) { + new ForEachTask<>(helper, helper.wrapSink(this)).invoke(); + return null; + } + + // TerminalSink + + @Override + public Void get() { + return null; + } + + // Implementations + + /** {@code forEach} with {@code Stream} */ + private static class OfRef extends ForEachOp { + final Consumer consumer; + + OfRef(Consumer consumer) { + this.consumer = consumer; + } + + @Override + public void accept(T t) { + consumer.accept(t); + } + + /** {@code forEachUntil} with {@code Stream} */ + static final class Until extends ForEachOp.OfRef { + final BooleanSupplier until; + + Until(Consumer consumer, BooleanSupplier until) { + super(consumer); + this.until = until; + } + + @Override + public int getOpFlags() { + return StreamOpFlag.NOT_ORDERED | StreamOpFlag.IS_SHORT_CIRCUIT; + } + + @Override + public boolean cancellationRequested() { + return until.getAsBoolean(); + } + } + } + + /** {@code forEach} with {@code IntStream} */ + private static class OfInt extends ForEachOp implements Sink.OfInt { + final IntConsumer consumer; + + OfInt(IntConsumer consumer) { + this.consumer = consumer; + } + + @Override + public StreamShape inputShape() { + return StreamShape.INT_VALUE; + } + + @Override + public void accept(int t) { + consumer.accept(t); + } + + /** {@code forEachUntil} with {@code IntStream} */ + static final class Until extends ForEachOp.OfInt { + final BooleanSupplier until; + + Until(IntConsumer consumer, BooleanSupplier until) { + super(consumer); + this.until = until; + } + + @Override + public int getOpFlags() { + return StreamOpFlag.NOT_ORDERED | StreamOpFlag.IS_SHORT_CIRCUIT; + } + + @Override + public boolean cancellationRequested() { + return until.getAsBoolean(); + } + } + } + + /** {@code forEach} with {@code LongStream} */ + private static class OfLong extends ForEachOp implements Sink.OfLong { + final LongConsumer consumer; + + OfLong(LongConsumer consumer) { + this.consumer = consumer; + } + + @Override + public StreamShape inputShape() { + return StreamShape.LONG_VALUE; + } + + @Override + public void accept(long t) { + consumer.accept(t); + } + + /** {@code forEachUntil} with {@code LongStream} */ + private static final class Until extends ForEachOp.OfLong { + final BooleanSupplier until; + + Until(LongConsumer consumer, BooleanSupplier until) { + super(consumer); + this.until = until; + } + + @Override + public int getOpFlags() { + return StreamOpFlag.NOT_ORDERED | StreamOpFlag.IS_SHORT_CIRCUIT; + } + + @Override + public boolean cancellationRequested() { + return until.getAsBoolean(); + } + } + } + + /** {@code forEach} with {@code DoubleStream} */ + private static class OfDouble extends ForEachOp implements Sink.OfDouble { + final DoubleConsumer consumer; + + OfDouble(DoubleConsumer consumer) { + 
this.consumer = consumer; + } + + @Override + public StreamShape inputShape() { + return StreamShape.DOUBLE_VALUE; + } + + @Override + public void accept(double t) { + consumer.accept(t); + } + + /** {@code forEachUntil} with {@code DoubleStream} */ + private static final class Until extends ForEachOp.OfDouble { + final BooleanSupplier until; + + Until(DoubleConsumer consumer, BooleanSupplier until) { + super(consumer); + this.until = until; + } + + @Override + public int getOpFlags() { + return StreamOpFlag.NOT_ORDERED | StreamOpFlag.IS_SHORT_CIRCUIT; + } + + @Override + public boolean cancellationRequested() { + return until.getAsBoolean(); + } + } + } + } + + /** A {@code ForkJoinTask} for performing a parallel for-each operation */ + private static class ForEachTask extends CountedCompleter { + private Spliterator spliterator; + private final Sink sink; + private final PipelineHelper helper; + private final long targetSize; + + ForEachTask(PipelineHelper helper, Sink sink) { + super(null); + this.spliterator = helper.sourceSpliterator(); + this.sink = sink; + this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize()); + this.helper = helper; + } + + ForEachTask(ForEachTask parent, Spliterator spliterator) { + super(parent); + this.spliterator = spliterator; + this.sink = parent.sink; + this.targetSize = parent.targetSize; + this.helper = parent.helper; + } + + public void compute() { + doCompute(this); + } + + private static void doCompute(ForEachTask task) { + boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(task.helper.getStreamAndOpFlags()); + while (true) { + if (isShortCircuit && task.sink.cancellationRequested()) { + task.propagateCompletion(); + task.spliterator = null; + return; + } + + Spliterator split = null; + if (!AbstractTask.suggestSplit(task.helper, task.spliterator, task.targetSize) + || (split = task.spliterator.trySplit()) == null) { + task.helper.intoWrapped(task.sink, task.spliterator); + task.propagateCompletion(); + task.spliterator = null; + return; + } + else { + task.addToPendingCount(1); + new ForEachTask<>(task, split).fork(); + } + } + } + } +} --- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/IntermediateOp.java 2013-03-11 17:44:00.000000000 -0400 @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. 
+ */ +package java.util.stream; + +/** + * An operation in a stream pipeline that takes a stream as input and produces + * a stream, possibly of a different type, as output. An intermediate operation + * has an input type and an output type (and an associated input shape and + * output shape). An intermediate operation also has a set of operation + * flags that describes how it transforms characteristics of the stream + * (such as sortedness or size; see {@link StreamOpFlag}). + * + *

Intermediate operations are implemented in terms of sink transforms; + * given a {@code Sink} for the output type of the operation, produce a + * {@code Sink} for the input type of the operation, which, when fed with + * values, has the effect of implementing the desired operation on the input + * values and feeding them to the output sink. + * + *
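As a rough illustration of this wrapping idea, the sketch below uses plain java.util.function types as stand-ins for the internal Sink hierarchy; the helper names (mapSink, filterSink) are invented for the example and are not part of this patch.

    import java.util.function.Consumer;
    import java.util.function.Function;
    import java.util.function.Predicate;

    // Stand-ins for sink transforms: given a consumer of the operation's
    // output type, produce a consumer of its input type.
    class SinkTransformSketch {
        // Mapping stage: accepts T, feeds the mapped R downstream.
        static <T, R> Consumer<T> mapSink(Function<T, R> mapper, Consumer<R> downstream) {
            return t -> downstream.accept(mapper.apply(t));
        }

        // Filtering stage: accepts T, forwards only matching elements downstream.
        static <T> Consumer<T> filterSink(Predicate<T> predicate, Consumer<T> downstream) {
            return t -> { if (predicate.test(t)) downstream.accept(t); };
        }

        public static void main(String[] args) {
            Consumer<Integer> terminal = i -> System.out.println("got " + i);
            // Wrap the terminal consumer: map Strings to their length,
            // then only forward Strings starting with "A" into that stage.
            Consumer<String> mapStage = mapSink(String::length, terminal);
            Consumer<String> chain = filterSink(s -> s.startsWith("A"), mapStage);
            chain.accept("Alpha");    // prints "got 5"
            chain.accept("Beta");     // filtered out, nothing printed
        }
    }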

Some intermediate operations are stateful. This means that the + * sinks they produce as a result of the above wrapping may maintain state from + * processing earlier elements. Stateful intermediate operations must implement + * the {@link StatefulOp} interface. Statefulness has an effect on how the + * operation can be parallelized. Stateless operations parallelize trivially + * because they are homomorphisms under concatenation: + * + *

+ *     statelessOp(a || b) = statelessOp(a) || statelessOp(b)
+ * 
+ * + * where {@code ||} denotes concatenation. Stateful operations may still be + * parallelizable, but are not amenable to the automatic parallelization of + * stateless operations. Accordingly, a stateful operation must provide its own + * parallel execution implementation + * ({@link StatefulOp#evaluateParallel(PipelineHelper)}). + * + * @apiNote + * As an example, consider the stream pipeline: + *
+ *     int oldestBob = people.stream().
+ *                            filter(p -> p.getFirstName().equals("Bob")).
+ *                            map(p -> p.getAge()).max();
+ * 
+ * + *
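For comparison, a compilable rendering of this pipeline against the java.util.stream API as it eventually shipped might look as follows; the Person type is hypothetical, and mapToInt/OptionalInt reflect the final public API rather than the draft shown here.

    import java.util.Arrays;
    import java.util.List;
    import java.util.OptionalInt;

    class OldestBobExample {
        // Hypothetical Person type, invented for this example.
        static final class Person {
            private final String firstName;
            private final int age;
            Person(String firstName, int age) { this.firstName = firstName; this.age = age; }
            String getFirstName() { return firstName; }
            int getAge() { return age; }
        }

        public static void main(String[] args) {
            List<Person> people = Arrays.asList(
                    new Person("Bob", 42), new Person("Alice", 35), new Person("Bob", 61));

            // The shipped API spells the int-producing map as mapToInt and
            // returns an OptionalInt from max().
            OptionalInt oldestBob = people.stream()
                                          .filter(p -> p.getFirstName().equals("Bob"))
                                          .mapToInt(Person::getAge)
                                          .max();

            System.out.println(oldestBob.getAsInt());   // 61
        }
    }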

This pipeline has two intermediate operations, filter and map. The + * filtering operation has input and output types of {@code Person} (with input + * and output shape of {@code REFERENCE}), and the mapping operation has an + * input type of {@code Person} and an output type of {@code Integer} (with + * shape {@code INT_VALUE}). When we construct a sink chain, the mapping + * operation will be asked to transform a {@code Sink.OfInt} which computes the + * maximum value into a {@code Sink} which accepts {@code Person} objects, and + * whose behavior is to take the supplied {@code Person}, call {@code getAge()} + * on it, and pass the resulting value to the downstream sink. This sink + * transform might be implemented as: + * + *

+ *     new Sink.ChainedReference<U>(sink) {
+ *         public void accept(U u) {
+ *             downstream.accept(mappingFunction.applyAsInt(u));
+ *         }
+ *     }
+ * 
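A self-contained analogue of this transform, using public types as stand-ins for the internal Sink hierarchy (the names MaxSink and wrap are invented for the example), might look like:

    import java.util.function.Consumer;
    import java.util.function.ToIntFunction;

    class SinkWrappingSketch {
        // Plays the role of the downstream Sink.OfInt that computes a maximum.
        static final class MaxSink {
            int max = Integer.MIN_VALUE;
            void accept(int value) { max = Math.max(max, value); }
        }

        // Plays the role of the mapping operation's sink transform: given the
        // downstream int sink, produce a sink for the upstream element type U.
        static <U> Consumer<U> wrap(ToIntFunction<U> mappingFunction, MaxSink downstream) {
            return u -> downstream.accept(mappingFunction.applyAsInt(u));
        }

        public static void main(String[] args) {
            MaxSink downstream = new MaxSink();
            Consumer<String> upstream = wrap(String::length, downstream);
            upstream.accept("Bob");
            upstream.accept("Roberta");
            System.out.println(downstream.max);   // 7
        }
    }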
+ * + * @param Type of input elements to the operation + * @param Type of output elements to the operation + * @see TerminalOp + * @see StatefulOp + * @since 1.8 + */ +interface IntermediateOp { + + /** + * Gets the shape of the input type of this operation + * + * @implSpec The default returns {@code StreamShape.REFERENCE} + * @return Shape of the input type of this operation + */ + default StreamShape inputShape() { return StreamShape.REFERENCE; } + + /** + * Gets the shape of the output type of this operation + * + * @implSpec The default returns {@code StreamShape.REFERENCE} + * @return Shape of the output type of this operation + */ + default StreamShape outputShape() { return StreamShape.REFERENCE; } + + /** + * Gets the operation flags of this operation. + * + * @implSpec The default returns {@code 0} + * @return a bitmap describing the operation flags of this operation + * @see StreamOpFlag + */ + default int getOpFlags() { return 0; } + + /** + * Returns whether this operation is stateful or not. If it is stateful, + * then the method {@link #evaluateParallel(PipelineHelper)} must be + * overridden. + * + * @implSpec The default implementation returns {@code false}. + * @return {@code true} if this operation is stateful + */ + default boolean isStateful() { return false; } + + /** + * Accepts a {@code Sink} which will receive the results of this operation, + * and return a {@code Sink} which accepts elements of the input type of + * this operation and which performs the operation, passing the results to + * the provided {@code Sink}. + * + *

The implementation may use the {@code flags} parameter to optimize the + * sink wrapping. For example, if the input is already {@code DISTINCT}, + * the implementation for the {@code Stream#distinct()} method could just + * return the sink it was passed. + * + * @param flags The combined stream and operation flags up to, but not + * including, this operation. + * @param sink elements will be sent to this sink after the processing. + * @return a sink which will accept elements and perform the operation upon + * each element, passing the results (if any) to the provided + * {@code Sink}. + */ + Sink wrapSink(int flags, Sink sink); + + /** + * Performs a parallel evaluation of the operation using the specified + * {@code PipelineHelper}, which describes the stream source and upstream + * intermediate operations. Only called on stateful operations. If + * {@link #isStateful()} returns true then implementations must override the + * default implementation. + * + * @implSpec The default implementation throws an + * {@link UnsupportedOperationException} + * + * @param helper the pipeline helper + * @param the type of elements in the pipeline source + * @return a {@code Node} describing the result of the evaluation + */ + default Node evaluateParallel(PipelineHelper helper) { + throw new UnsupportedOperationException("Parallel evaluation is not supported"); + } +} --- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/MatchOps.java 2013-03-11 17:44:00.000000000 -0400 @@ -0,0 +1,323 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.Objects; +import java.util.Spliterator; +import java.util.function.DoublePredicate; +import java.util.function.IntPredicate; +import java.util.function.LongPredicate; +import java.util.function.Predicate; +import java.util.function.Supplier; + +/** + * A factory for creating instances of a short-circuiting {@code TerminalOp} + * that evaluates a predicate on the elements of a stream and determines whether + * all, any or none of those elements match the predicate. + * + * @since 1.8 + */ +final class MatchOps { + + private MatchOps() { } + + /** + * Enum describing quantified match options -- all match, any match, none + * match + */ + enum MatchKind { + /** Do all elements match the predicate? 
*/ + ANY(true, true), + + /** Do any elements match the predicate? */ + ALL(false, false), + + /** Do no elements match the predicate? */ + NONE(true, false); + + private final boolean stopOnPredicateMatches; + private final boolean shortCircuitResult; + + private MatchKind(boolean stopOnPredicateMatches, boolean shortCircuitResult) { + this.stopOnPredicateMatches = stopOnPredicateMatches; + this.shortCircuitResult = shortCircuitResult; + } + } + + /** + * Constructs a {@code TerminalOp} for the given predicate and quantified + * match criteria + * + * @param predicate The {@code Predicate} to apply to stream elements + * @param matchKind The kind of quantified match (all, any, none) + * @param The type of stream elements + * @return A {@code TerminalOp} implementing the desired quantified match + * criteria + */ + public static TerminalOp makeRef(Predicate predicate, MatchKind matchKind) { + Objects.requireNonNull(predicate); + Objects.requireNonNull(matchKind); + class MatchSink extends BooleanTerminalSink { + MatchSink() { + super(matchKind); + } + + @Override + public void accept(T t) { + // @@@ assert !stop when SortedOp supports short-circuit on Sink.end + // for sequential operations + if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) { + stop = true; + value = matchKind.shortCircuitResult; + } + } + } + + // @@@ Change to return MatchSink::new when compiler and runtime bugs are fixed + Supplier> s = new Supplier>() { + @Override + public BooleanTerminalSink get() {return new MatchSink();} + }; + return new MatchOp<>(StreamShape.REFERENCE, matchKind, s); + } + + /** + * Constructs a {@code TerminalOp} for the given predicate and quantified + * match criteria for an {@code IntStream} + * + * @param predicate The {@code Predicate} to apply to stream elements + * @param matchKind The kind of quantified match (all, any, none) + * @return A {@code TerminalOp} implementing the desired quantified match + * criteria + */ + public static TerminalOp makeInt(IntPredicate predicate, MatchKind matchKind) { + Objects.requireNonNull(predicate); + Objects.requireNonNull(matchKind); + class MatchSink extends BooleanTerminalSink implements Sink.OfInt { + MatchSink() { + super(matchKind); + } + + @Override + public void accept(int t) { + if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) { + stop = true; + value = matchKind.shortCircuitResult; + } + } + } + + Supplier> s = new Supplier>() { + @Override + public BooleanTerminalSink get() {return new MatchSink();} + }; + return new MatchOp<>(StreamShape.INT_VALUE, matchKind, s); + } + + /** + * Constructs a {@code TerminalOp} for the given predicate and quantified + * match criteria for a {@code LongStream} + * + * @param predicate The {@code Predicate} to apply to stream elements + * @param matchKind The kind of quantified match (all, any, none) + * @return A {@code TerminalOp} implementing the desired quantified match + * criteria + */ + public static TerminalOp makeLong(LongPredicate predicate, MatchKind matchKind) { + Objects.requireNonNull(predicate); + Objects.requireNonNull(matchKind); + class MatchSink extends BooleanTerminalSink implements Sink.OfLong { + + MatchSink() { + super(matchKind); + } + + @Override + public void accept(long t) { + if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) { + stop = true; + value = matchKind.shortCircuitResult; + } + } + } + + Supplier> s = new Supplier>() { + @Override + public BooleanTerminalSink get() {return new MatchSink();} + }; + return new 
MatchOp<>(StreamShape.LONG_VALUE, matchKind, s); + } + + /** + * Constructs a {@code TerminalOp} for the given predicate and quantified + * match criteria for a {@code DoubleStream} + * + * @param predicate The {@code Predicate} to apply to stream elements + * @param matchKind The kind of quantified match (all, any, none) + * @return A {@code TerminalOp} implementing the desired quantified match + * criteria + */ + public static TerminalOp makeDouble(DoublePredicate predicate, MatchKind matchKind) { + Objects.requireNonNull(predicate); + Objects.requireNonNull(matchKind); + class MatchSink extends BooleanTerminalSink implements Sink.OfDouble { + + MatchSink() { + super(matchKind); + } + + @Override + public void accept(double t) { + if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) { + stop = true; + value = matchKind.shortCircuitResult; + } + } + } + + Supplier> s = new Supplier>() { + @Override + public BooleanTerminalSink get() {return new MatchSink();} + }; + return new MatchOp<>(StreamShape.DOUBLE_VALUE, matchKind, s); + } + + /** + * A short-circuiting {@code TerminalOp} that evaluates a predicate on the + * elements of a stream and determines whether all, any or none of those + * elements match the predicate. + * + * @param The output type of the stream pipeline + */ + private static final class MatchOp implements TerminalOp { + private final StreamShape inputShape; + final MatchKind matchKind; + final Supplier> sinkSupplier; + + /** + * Constructs a {@code MatchOp} + * + * @param shape The output shape of the stream pipeline + * @param matchKind The kind of quantified match (all, any, none) + * @param sinkSupplier {@code Supplier} for a {@code Sink} of the + * appropriate shape which implements the matching operation + */ + MatchOp(StreamShape shape, MatchKind matchKind, Supplier> sinkSupplier) { + this.inputShape = shape; + this.matchKind = matchKind; + this.sinkSupplier = sinkSupplier; + } + + @Override + public int getOpFlags() { + return StreamOpFlag.IS_SHORT_CIRCUIT | StreamOpFlag.NOT_ORDERED; + } + + @Override + public StreamShape inputShape() { + return inputShape; + } + + @Override + public Boolean evaluateSequential(PipelineHelper helper) { + return helper.into(sinkSupplier.get(), helper.sourceSpliterator()).getAndClearState(); + } + + @Override + public Boolean evaluateParallel(PipelineHelper helper) { + // Approach for parallel implementation: + // - Decompose as per usual + // - run match on leaf chunks, call result "b" + // - if b == matchKind.shortCircuitOn, complete early and return b + // - else if we complete normally, return !shortCircuitOn + + return new MatchTask<>(this, helper).invoke(); + } + } + + /** + * Boolean specific terminal sink to avoid the boxing costs when returning + * results. Subclasses implement the shape-specific functionality. 
+ * + * @param The output type of the stream pipeline + */ + private static abstract class BooleanTerminalSink implements Sink { + boolean stop; + boolean value; + + BooleanTerminalSink(MatchKind matchKind) { + value = !matchKind.shortCircuitResult; + } + + public boolean getAndClearState() { + return value; + } + + @Override + public boolean cancellationRequested() { + return stop; + } + } + + /** + * ForkJoinTask implementation to implement a parallel short-circuiting + * quantified match + * + * @param The type of source elements for the pipeline + * @param The type of output elements for the pipeline + */ + private static final class MatchTask extends AbstractShortCircuitTask> { + private final MatchOp op; + + MatchTask(MatchOp op, PipelineHelper helper) { + super(helper); + this.op = op; + } + + MatchTask(MatchTask parent, Spliterator spliterator) { + super(parent, spliterator); + this.op = parent.op; + } + + @Override + protected MatchTask makeChild(Spliterator spliterator) { + return new MatchTask<>(this, spliterator); + } + + @Override + protected Boolean doLeaf() { + boolean b = helper.into(op.sinkSupplier.get(), spliterator).getAndClearState(); + if (b == op.matchKind.shortCircuitResult) + shortCircuit(b); + return null; + } + + @Override + protected Boolean getEmptyResult() { + return !op.matchKind.shortCircuitResult; + } + } +} + --- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/Node.java 2013-03-11 17:44:01.000000000 -0400 @@ -0,0 +1,523 @@ +/* + * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.Spliterator; +import java.util.function.Consumer; +import java.util.function.DoubleConsumer; +import java.util.function.IntConsumer; +import java.util.function.IntFunction; +import java.util.function.LongConsumer; + +/** + * An immutable container for describing an ordered sequence of elements of some + * type {@code T}. + * + *

A {@code Node} contains a fixed number of elements, which can be accessed + * via the {@link #count}, {@link #spliterator}, {@link #forEach}, + * {@link #asArray}, or {@link #copyInto} methods. A {@code Node} may have zero + * or more child {@code Node}s; if it has no children (accessed via + * {@link #getChildCount} and {@link #getChild(int)}), it is considered flat + * or a leaf; if it has children, it is considered an + * internal node. The size of an internal node is the sum of sizes of + * its children. + * + * @apiNote + *

A {@code Node} typically does not store the elements directly, but instead + * mediates access to one or more existing (effectively immutable) data + * structures such as a {@code Collection}, array, or a set of other + * {@code Node}s. {@code Node}s directly representing existing data structures + * are considered flat (have no children); commonly {@code Node}s are + * formed into a tree whose shape corresponds to the computation tree that + * produced the elements that are contained in the leaf nodes. The use of + * {@code Node} within the stream framework is largely to avoid copying data + * unnecessarily during parallel operations. + * + * @param the type of elements. + * @since 1.8 + */ +interface Node { + + /** + * Returns a {@link Spliterator} describing the elements contained in this + * {@code Node}. + * + * @return a {@code Spliterator describing the elements contained in this + * {@code Node}. + */ + Spliterator spliterator(); + + /** + * Traverses the elements of this node, and invoke the provided + * {@code Consumer} with each element. + * + * @param consumer A {@code Consumer} that is to be invoked with each + * element in this {@code Node} + */ + void forEach(Consumer consumer); + + /** + * Returns the number of child nodes of this node. + * + * @implSpec The default implementation returns zero + * @return the number of child nodes + */ + default int getChildCount() { + return 0; + } + + /** + * Retrieves the child {@code Node} at a given index. + * + * @implSpec The default implementation throws + * {@code IndexOutOfBoundsException} + * @param i the index to the child node + * @return the child node + * @throws IndexOutOfBoundsException if the index is less than 0 or greater + * than or equal to the + * number of child nodes. + */ + default Node getChild(int i) { + throw new IndexOutOfBoundsException(); + } + + /** + * Views this node as an array. + * + *

Depending on the underlying implementation, this may return a reference + * to an internal array rather than a copy. It is the caller's + * responsibility to decide whether this node or the array is used as + * the primary reference for the data.

+ * + * @return an array containing the contents of this {@code Node} + */ + T[] asArray(IntFunction generator); + + /** + * Copies the content of this {@code Node} into an array, starting at a given + * offset into the array. It is the caller's responsibility to ensure there + * is sufficient room in the array. + * + * @param array the array into which to copy the contents of this + * {@code Node} + * @param offset the starting offset within the array + * @throws IndexOutOfBoundsException if copying would cause access of data + * outside array bounds + * @throws NullPointerException if {@code array} is {@code null} + */ + void copyInto(T[] array, int offset); + + /** + * Gets the {@code StreamShape} associated with this {@code Node}. + * + * @implSpec The default in {@code Node} returns + * {@code StreamShape.REFERENCE} + * @return the stream shape associated with this node + */ + default StreamShape getShape() { + return StreamShape.REFERENCE; + } + + /** + * Returns the number of elements contained in this node + * + * @return the number of elements contained in this node + */ + long count(); + + /** + * A mutable builder for a {@code Node} that implements {@link Sink}, which + * builds a flat node containing the elements that have been pushed to it. + * + */ + interface Builder extends Sink { + + /** + * Builds the node. Should be called after all elements have been pushed + * and signalled with an invocation of {@link Sink#end()}. + * + * @return the resulting {@code Node} + */ + Node build(); + + /** Specialized @{code Node.Builder} for int elements */ + interface OfInt extends Node.Builder, Sink.OfInt { + @Override + Node.OfInt build(); + } + + /** Specialized @{code Node.Builder} for long elements */ + interface OfLong extends Node.Builder, Sink.OfLong { + @Override + Node.OfLong build(); + } + + /** Specialized @{code Node.Builder} for double elements */ + interface OfDouble extends Node.Builder, Sink.OfDouble { + @Override + Node.OfDouble build(); + } + } + + /** Specialized {@code Node} for int elements */ + interface OfInt extends Node { + + /** + * {@inheritDoc} + * @return A {@link Spliterator.OfInt} describing the elements of this + * node + */ + @Override + Spliterator.OfInt spliterator(); + + /** + * {@inheritDoc} + * @param consumer A {@code Consumer} that is to be invoked with each + * element in this {@code Node}. If this is an + * {@code IntConsumer}, it is cast to {@code IntConsumer} so the + * elements may be processed without boxing. + */ + @Override + default void forEach(Consumer consumer) { + if (consumer instanceof IntConsumer) { + forEach((IntConsumer) consumer); + } + else { + if (Tripwire.ENABLED) + Tripwire.trip(getClass(), "{0} calling Node.OfInt.forEach(Consumer)"); + spliterator().forEach(consumer); + } + } + + /** + * Traverses the elements of this node, and invoke the provided + * {@code IntConsumer} with each element. + * + * @param consumer A {@code IntConsumer} that is to be invoked with each + * element in this {@code Node} + */ + void forEach(IntConsumer consumer); + + /** + * {@inheritDoc} + * @implSpec the default implementation invokes the generator to create + * an instance of an Integer[] array with a length of {@link #count()} + * and then invokes {@link #copyInto(Integer[], int)} with that + * Integer[] array at an offset of 0. This is not efficient and it is + * recommended to invoke {@link #asIntArray()}. 
+ */ + @Override + default Integer[] asArray(IntFunction generator) { + Integer[] boxed = generator.apply((int) count()); + copyInto(boxed, 0); + return boxed; + } + + /** + * {@inheritDoc} + * @implSpec the default implementation invokes {@link #asIntArray()} to + * obtain an int[] array then and copies the elements from that int[] + * array into the boxed Integer[] array. This is not efficient and it + * is recommended to invoke {@link #copyInto(int[], int)}. + */ + @Override + default void copyInto(Integer[] boxed, int offset) { + if (Tripwire.ENABLED) + Tripwire.trip(getClass(), "{0} calling Node.OfInt.copyInto(Integer[], int)"); + + int[] array = asIntArray(); + for (int i = 0; i < array.length; i++) { + boxed[offset + i] = array[i]; + } + } + + @Override + default Node.OfInt getChild(int i) { + throw new IndexOutOfBoundsException(); + } + + /** + * Views this node as an int[] array. + * + *

Depending on the underlying implementation, this may return a + * reference to an internal array rather than a copy. It is the caller's + * responsibility to decide whether this node or the array is used + * as the primary reference for the data.

+ * + * @return an array containing the contents of this {@code Node} + */ + int[] asIntArray(); + + /** + * Copies the content of this {@code Node} into an int[] array, starting + * at a given offset into the array. It is the caller's responsibility + * to ensure there is sufficient room in the array. + * + * @param array the array into which to copy the contents of this {@code Node} + * @param offset the starting offset within the array + * @throws IndexOutOfBoundsException if copying would cause access of + * data outside array bounds + * @throws NullPointerException if {@code array} is {@code null} + */ + void copyInto(int[] array, int offset); + + /** + * {@inheritDoc} + * @implSpec The default in {@code Node.OfInt} returns + * {@code StreamShape.INT_VALUE} + */ + default StreamShape getShape() { + return StreamShape.INT_VALUE; + } + } + + /** Specialized {@code Node} for long elements */ + interface OfLong extends Node { + + /** + * {@inheritDoc} + * @return A {@link Spliterator.OfLong} describing the elements of this + * node + */ + @Override + Spliterator.OfLong spliterator(); + + /** + * {@inheritDoc} + * @param consumer A {@code Consumer} that is to be invoked with each + * element in this {@code Node}. If this is an + * {@code LongConsumer}, it is cast to {@code LongConsumer} so + * the elements may be processed without boxing. + */ + @Override + default void forEach(Consumer consumer) { + if (consumer instanceof LongConsumer) { + forEach((LongConsumer) consumer); + } + else { + if (Tripwire.ENABLED) + Tripwire.trip(getClass(), "{0} calling Node.OfLong.forEach(Consumer)"); + spliterator().forEach(consumer); + } + } + + /** + * Traverses the elements of this node, and invoke the provided + * {@code LongConsumer} with each element. + * + * @param consumer A {@code LongConsumer} that is to be invoked with + * each element in this {@code Node} + */ + void forEach(LongConsumer consumer); + + /** + * {@inheritDoc} + * @implSpec the default implementation invokes the generator to create + * an instance of a Long[] array with a length of {@link #count()} and + * then invokes {@link #copyInto(Long[], int)} with that Long[] array at + * an offset of 0. This is not efficient and it is recommended to + * invoke {@link #asLongArray()}. + */ + @Override + default Long[] asArray(IntFunction generator) { + Long[] boxed = generator.apply((int) count()); + copyInto(boxed, 0); + return boxed; + } + + /** + * {@inheritDoc} + * @implSpec the default implementation invokes {@link #asLongArray()} + * to obtain a long[] array then and copies the elements from that + * long[] array into the boxed Long[] array. This is not efficient and + * it is recommended to invoke {@link #copyInto(long[], int)}. + */ + @Override + default void copyInto(Long[] boxed, int offset) { + if (Tripwire.ENABLED) + Tripwire.trip(getClass(), "{0} calling Node.OfInt.copyInto(Long[], int)"); + + long[] array = asLongArray(); + for (int i = 0; i < array.length; i++) { + boxed[offset + i] = array[i]; + } + } + + @Override + default Node.OfLong getChild(int i) { + throw new IndexOutOfBoundsException(); + } + + /** + * Views this node as a long[] array. + * + *

Depending on the underlying implementation, this may return a + * reference to an internal array rather than a copy. It is the caller's + * responsibility to decide whether this node or the array is used + * as the primary reference for the data.

+ * + * @return an array containing the contents of this {@code Node} + */ + long[] asLongArray(); + + /** + * Copies the content of this {@code Node} into a long[] array, starting + * at a given offset into the array. It is the caller's responsibility + * to ensure there is sufficient room in the array. + * + * @param array the array into which to copy the contents of this + * {@code Node} + * @param offset the starting offset within the array + * @throws IndexOutOfBoundsException if copying would cause access of + * data outside array bounds + * @throws NullPointerException if {@code array} is {@code null} + */ + void copyInto(long[] array, int offset); + + /** + * {@inheritDoc} + * @implSpec The default in {@code Node.OfLong} returns + * {@code StreamShape.LONG_VALUE} + */ + default StreamShape getShape() { + return StreamShape.LONG_VALUE; + } + } + + /** Specialized {@code Node} for double elements */ + interface OfDouble extends Node { + + /** + * {@inheritDoc} + * @return A {@link Spliterator.OfDouble} describing the elements of + * this node + */ + @Override + Spliterator.OfDouble spliterator(); + + /** + * {@inheritDoc} + * @param consumer A {@code Consumer} that is to be invoked with each + * element in this {@code Node}. If this is an + * {@code DoubleConsumer}, it is cast to {@code DoubleConsumer} + * so the elements may be processed without boxing. + */ + @Override + default void forEach(Consumer consumer) { + if (consumer instanceof DoubleConsumer) { + forEach((DoubleConsumer) consumer); + } + else { + if (Tripwire.ENABLED) + Tripwire.trip(getClass(), "{0} calling Node.OfLong.forEach(Consumer)"); + spliterator().forEach(consumer); + } + } + + /** + * Traverses the elements of this node, and invoke the provided + * {@code DoubleConsumer} with each element. + * + * @param consumer A {@code DoubleConsumer} that is to be invoked with + * each element in this {@code Node} + */ + void forEach(DoubleConsumer consumer); + + // + + /** + * {@inheritDoc} + * @implSpec the default implementation invokes the generator to create + * an instance of a Double[] array with a length of {@link #count()} and + * then invokes {@link #copyInto(Double[], int)} with that Double[] + * array at an offset of 0. This is not efficient and it is recommended + * to invoke {@link #asDoubleArray()}. + */ + @Override + default Double[] asArray(IntFunction generator) { + Double[] boxed = generator.apply((int) count()); + copyInto(boxed, 0); + return boxed; + } + + /** + * {@inheritDoc} + * @implSpec the default implementation invokes {@link #asDoubleArray()} + * to obtain a double[] array then and copies the elements from that + * double[] array into the boxed Double[] array. This is not efficient + * and it is recommended to invoke {@link #copyInto(double[], int)}. + */ + @Override + default void copyInto(Double[] boxed, int offset) { + if (Tripwire.ENABLED) + Tripwire.trip(getClass(), "{0} calling Node.OfDouble.copyInto(Double[], int)"); + + double[] array = asDoubleArray(); + for (int i = 0; i < array.length; i++) { + boxed[offset + i] = array[i]; + } + } + + @Override + default Node.OfDouble getChild(int i) { + throw new IndexOutOfBoundsException(); + } + + /** + * Views this node as a double[] array. + * + *

Depending on the underlying implementation, this may return a + * reference to an internal array rather than a copy. It is the caller's + * responsibility to decide whether this node or the array is used + * as the primary reference for the data.

+ * + * @return an array containing the contents of this {@code Node} + */ + double[] asDoubleArray(); + + /** + * Copies the content of this {@code Node} into a double[] array, starting + * at a given offset into the array. It is the caller's responsibility + * to ensure there is sufficient room in the array. + * + * @param array the array into which to copy the contents of this + * {@code Node} + * @param offset the starting offset within the array + * @throws IndexOutOfBoundsException if copying would cause access of + * data outside array bounds + * @throws NullPointerException if {@code array} is {@code null} + */ + void copyInto(double[] array, int offset); + + /** + * {@inheritDoc} + * @implSpec The default in {@code Node.OfDouble} returns + * {@code StreamShape.DOUBLE_VALUE} + */ + default StreamShape getShape() { + return StreamShape.DOUBLE_VALUE; + } + } +} --- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/PipelineHelper.java 2013-03-11 17:44:01.000000000 -0400 @@ -0,0 +1,258 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.Spliterator; +import java.util.function.IntFunction; + +/** + * Helper class for executing + * stream pipelines, + * capturing all of the information about a stream pipeline (source, output + * shape, stream flags, parallelism, etc) in one place. + * + * @apiNote + * A stream pipeline consists of a source, zero or more intermediate operations, + * and a terminal operation. Execution of the stream pipeline begins when the + * terminal operation is executed. A {@code PipelineHelper} describes the + * portion of a stream pipeline including its source, some or all of its + * intermediate operations, and certain information about the terminal (or + * stateful) operation which follows the last intermediate operation described + * by this {@code PipelineHelper}. 
The {@code PipelineHelper} is passed to the + * {@link TerminalOp#evaluateParallel(PipelineHelper)}, + * {@link TerminalOp#evaluateSequential(PipelineHelper)}, and + * {@link StatefulOp#evaluateParallel(PipelineHelper)}, methods, which can use + * the {@code PipelineHelper} to access the source {@code Spliterator} for the + * pipeline, information about the pipeline such as input shape, output shape, + * stream flags, and size, and use the helper methods such as + * {@link #into(Sink, Spliterator)}, {@link #intoWrapped(Sink, Spliterator)}, + * and {@link #wrapSink(Sink)} to execute pipeline operations. + * + * @param Type of input elements to the pipeline + * @param Type of output elements from the pipeline + * @since 1.8 + */ +interface PipelineHelper { + + /** + * Gets the {@code StreamShape} describing the input shape of the pipeline + * @return The input shape of the pipeline + */ + StreamShape getInputShape(); + + /** + * Gets the {@code StreamShape} describing the output shape of the pipeline + * @return The output shape of the pipeline + */ + StreamShape getOutputShape(); + + /** + * Gets the combined stream and operation flags for the output of the + * pipeline. This will incorporate stream flags from the stream source, all + * the intermediate operations and the terminal operation. + * + * @return the combined stream and operation flags for the output of the + * pipeline + * @see StreamOpFlag + */ + int getStreamAndOpFlags(); + + /** + * Gets the operation flags for the terminal operation. + * + * @return the operation flags for the terminal operation. + * @see StreamOpFlag + */ + // @@@ Specifying this concisely is somewhat complicated since since the actual terminal operation flags + // are masked by StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK as the flags propagate upstream through parallel + // pipeline chunks + int getTerminalOpFlags(); + + /** + * Returns whether this pipeline is parallel or sequential + * + * @return true if the pipeline is a parallel pipeline, otherwise false + */ + boolean isParallel(); + + /** + * Gets the {@code Spliterator} for the source of the pipeline. This + * {@code Spliterator} reflects only the source elements, not the actions of + * any of the intermediate stages. + * + * @return the source spliterator + */ + Spliterator sourceSpliterator(); + + /** + * Returns the exact output size of the portion of the output resulting from + * applying the pipeline stages described by this {@code PipelineHelper} to + * the the portion of the input described by the provided + * {@code Spliterator}, if known. If not known or known infinite, will + * return {@code -1}. + * + * @apiNote + * The exact output size is known if the {@code Spliterator} has the + * {@code SIZED} characteristic, and the operation flags + * {@link StreamOpFlag#SIZED} is known on the combined stream and operation + * flags. + * + * @param spliterator the spliterator describing the relevant portion of the + * source data + * @return the exact size if known, or -1 if infinite or unknown + */ + long exactOutputSizeIfKnown(Spliterator spliterator); + + /** + * Applies the pipeline stages described by this {@code PipelineHelper} to + * the provided {@code Spliterator} and send the results to the provided + * {@code Sink}. + * + * @implSpec + * The implementation behaves as if: + *
+     *     intoWrapped(wrapSink(sink), spliterator);
+     * 
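The push sequence behind this one-liner, and behind the intoWrapped variants documented below, can be sketched with public types; MiniSink is an invented stand-in for the internal Sink interface, not the type defined in this patch.

    import java.util.Arrays;
    import java.util.Spliterator;

    class PushProtocolSketch {
        // Simplified stand-in for Sink: begin() before elements, accept() per
        // element, end() afterwards, with an optional cancellation signal.
        interface MiniSink<T> {
            default void begin(long size) { }
            void accept(T t);
            default void end() { }
            default boolean cancellationRequested() { return false; }
        }

        // Push all elements of the spliterator into the sink, stopping early
        // if the sink requests cancellation (the short-circuit case).
        static <T> void copyInto(MiniSink<T> sink, Spliterator<T> spliterator) {
            // getExactSizeIfKnown() returns -1 when unknown; the real protocol
            // passes a size hint to begin().
            sink.begin(spliterator.getExactSizeIfKnown());
            boolean cancelled = false;
            while (!cancelled && spliterator.tryAdvance(sink::accept)) {
                cancelled = sink.cancellationRequested();
            }
            sink.end();
        }

        public static void main(String[] args) {
            MiniSink<String> printer = s -> System.out.println(s);
            copyInto(printer, Arrays.asList("a", "b", "c").spliterator());
        }
    }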
+ * + * @param sink the {@code Sink} to receive the results + * @param spliterator the spliterator describing the portion of the source + * input to process + */ + > S into(S sink, Spliterator spliterator); + + /** + * Pushes elements obtained from the {@code Spliterator} into the provided + * {@code Sink}. If the stream pipeline is known to have short-circuiting + * stages in it (see {@link StreamOpFlag#SHORT_CIRCUIT}), then the elements + * are delivered as per {@link #intoWrappedWithCancel(Sink, Spliterator)}. + * + * @implSpec + * This method conforms to the {@code Sink} protocol of calling + * {@code Sink.begin} before pushing elements, via {@code Sink.accept}, and + * calling {@code Sink.end} after all elements have been pushed. + * + * @param wrappedSink the destination {@code Sink} + * @param spliterator the source {@code Spliterator} + */ + void intoWrapped(Sink wrappedSink, Spliterator spliterator); + + /** + * Pushes elements obtained from the {@code Spliterator} into the provided + * {@code Sink}, checking {@link Sink#cancellationRequested()} after each + * element, and stopping if cancellation is requested. + * + * @implSpec + * This method conforms to the {@code Sink} protocol of calling + * {@code Sink.begin} before pushing elements, via {@code Sink.accept}, and + * calling {@code Sink.end} after all elements have been pushed or if + * cancellation is requested. + * + * @param wrappedSink the destination {@code Sink} + * @param spliterator the source {@code Spliterator} + */ + void intoWrappedWithCancel(Sink wrappedSink, Spliterator spliterator); + + /** + * Takes a {@code Sink} that accepts elements of the output type of the + * {@code PipelineHelper}, and wrap it with a {@code Sink} that accepts + * elements of the input type and implements all the intermediate operations + * described by this {@code PipelineHelper}, delivering the result into the + * provided {@code Sink}. + * + * @param sink the {@code Sink} to receive the results + * @return a {@code Sink} that implements the pipeline stages and sends + * results to the provided {@code Sink} + */ + Sink wrapSink(Sink sink); + + /** + * Constructs a @{link Node.Builder} compatible with the output shape of + * this {@code PipelineHelper} + * + * @param exactSizeIfKnown if >=0 then a builder will be created that has a + * fixed capacity of exactly sizeIfKnown elements; if < 0 then the + * builder has variable capacity. A fixed capacity builder will fail + * if an element is added and the builder has reached capacity. + * @return A {@code Node.Builder} compatible with the output shape of this + * {@code PipelineHelper} + */ + Node.Builder makeNodeBuilder(long exactSizeIfKnown); + + /** + * Collects all output elements resulting from applying the pipeline stages + * to the source {@code Spliterator} into a {@code Node}. + * + * @implSpec + * If the pipeline has no intermediate operations and the source is backed + * by a {@code Node} then that {@code Node} will be returned or flattened + * and then returned. This reduces copying for a pipeline consisting of a + * stateful operation followed by a terminal operation that returns an + * array, such as: + *
{@code
+     *     stream.sorted().toArray();
+     * }
+ * + * @param flatten if true and the pipeline is a parallel pipeline then the + * {@code Node} returned will contain no children, otherwise the + * {@code Node} may represent the root in a tree that reflects the + * shape of the computation tree. + * @return the {@code Node} containing all output elements + */ + Node collectOutput(boolean flatten); + + /** + * Gets an array factory associated with the output type of this pipeline. + * + * @return a factory for arrays of the output type of this pipeline. + */ + IntFunction arrayGenerator(); + + /** + * Collects all output elements resulting from the applying the pipeline + * stages, plus an additional final stage that is an intermediate operation, + * to the source {@code Spliterator} into a {code Node}. The order of + * output elements will respect the encounter order of the source stream, + * and all computation will happen in the invoking thread. + *

+ * Implementations of {@link StatefulOp#evaluateParallel(PipelineHelper)} + * can defer to this method if a sequential implementation is acceptable. + * + * @implSpec + * If the intermediate operation injects {@link StreamOpFlag#SHORT_CIRCUIT} + * then this implementation must stop collecting output elements when the + * sink returned from {@link IntermediateOp#wrapSink(int, Sink)} reports it + * is cancelled. + *

+ * If the intermediate operation preserves or injects + * {@link StreamOpFlag#SIZED} and the output size of the pipeline is known + * then this implementation may apply size optimizations since the output + * size is known. + * + * @param op An {@code IntermediateOp} representing the final stage in the + * pipeline, typically a {@code StatefulOp} + * @return A {@code Node} containing the output of the stream pipeline + */ + Node evaluateSequential(IntermediateOp op); +} --- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/Sink.java 2013-03-11 17:44:01.000000000 -0400 @@ -0,0 +1,349 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.DoubleConsumer; +import java.util.function.IntConsumer; +import java.util.function.LongConsumer; + +/** + * An extension of {@link Consumer} used to conduct values through the stages of + * a stream pipeline, with additional methods to manage size information, + * control flow, etc. Before calling the {@code accept()} method on a + * {@code Sink} for the first time, you must first call the {@code begin()} + * method to inform it that data is coming (optionally informing the sink how + * much data is coming), and after all data has been sent, you must call the + * {@code end()} method. After calling {@code end()}, you should not call + * {@code accept()} without again calling {@code begin()}. {@code Sink} also + * offers a mechanism by which the sink can cooperatively signal that it does + * not wish to receive any more data (the {@code cancellationRequested()} + * method), which a source can poll before sending more data to the + * {@code Sink}. + * + * @apiNote + * + * A stream pipeline consists of a source, zero or more intermediate stages + * (such as filtering or mapping), and a terminal stage, such as reduction or + * for-each. For concreteness, consider the pipeline: + * + *

+ *     int longestStringLengthStartingWithA
+ *         = strings.stream()
+ *                  .filter(s -> s.startsWith("A"))
+ *                  .map(String::length)
+ *                  .max();
+ * 
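A compilable rendering of this pipeline against the API as it finally shipped (where the int-producing map is spelled mapToInt and max() returns an OptionalInt rather than an int) might be:

    import java.util.Arrays;
    import java.util.List;

    class LongestStringLengthExample {
        public static void main(String[] args) {
            List<String> strings = Arrays.asList("Ant", "Bee", "Aardvark");

            int longestStringLengthStartingWithA =
                    strings.stream()
                           .filter(s -> s.startsWith("A"))
                           .mapToInt(String::length)
                           .max()
                           .orElse(0);

            System.out.println(longestStringLengthStartingWithA);   // 8
        }
    }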
+ * + * Here, we have three stages, filtering, mapping, and reducing. The filtering + * stage consumes strings and emits a subset of those strings; the mapping stage + * consumes strings and emits ints; the reduction stage consumes those ints and + * computes the maximal value. + * + * A {@code Sink} instance is used to represent each stage of this pipeline, + * whether the stage accepts objects, ints, longs, or doubles. Sink has entry + * points for {@code accept(Object)}, {@code accept(int)}, etc, so that we do + * not need a specialized interface for each primitive specialization. (It + * might be called a "kitchen sink" for this omnivorous tendency.) The entry + * point to the pipeline is the {@code Sink} for the filtering stage, which + * sends some elements "downstream" -- into the {@code Sink} for the mapping + * stage, which in turn sends integral values downstream into the {@code Sink} + * for the reduction stage. The {@code Sink} implementations associated with a + * given stage is expected to know the data type for the next stage, and call + * the correct {@code accept} method on its downstream {@code Sink}. Similarly, + * each stage must implement the correct {@code accept} method corresponding to + * the data type it accepts. + * + * The specialized subtypes such as {@link Sink.OfInt} bridge + * {@code accept(Object)} to call the appropriate primitive specialization of + * {@code accept}, implement the appropriate primitive specialization of + * {@code Consumer}, and re-abstract the appropriate primitive specialization of + * {@code accept}. + * + * The chaining subtypes such as {@link ChainedInt} not only implement + * {@code Sink.OfInt}, but also maintain a {@code downstream} field which + * represents the downstream {@code Sink}, and implement the methods + * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to + * delegate to the downstream {@code Sink}. Most implementations of + * intermediate operations will use these chaining wrappers. For example, the + * mapping stage in the above example would look like: + * + *
+ *     IntSink is = new Sink.ChainedReference<U>(sink) {
+ *         public void accept(U u) {
+ *             downstream.accept(mapper.applyAsInt(u));
+ *         }
+ *     };
+ * 
+ * + * Here, we implement {@code Sink.ChanedReference}, meaning that we expect to + * receive elements of type {@code U} as input, and pass the downstream sink to + * the constructor. Because the next stage expects to receive integers, we must + * call the {@code accept(int)} method when emitting values to the downstream. + * The {@code accept()} method applies the mapping function from {@code U} to + * {@code int} and passes the resulting value to the downstream {@code Sink}. + * + * @param Type of elements for value streams + * @since 1.8 + */ +@FunctionalInterface +interface Sink extends Consumer { + /** + * Resets the sink state to receive a fresh data set. This is used when a + * {@code Sink} is being reused by multiple calculations. + * @param size The exact size of the data to be pushed downstream, if + * known or {@code Long.MAX_VALUE} if unknown or infinite. + */ + default void begin(long size) {} + + /** + * Indicates that all elements have been pushed. If the {@code Sink} buffers + * any results from previous values, they should dump their contents + * downstream and clear any stored state. + */ + default void end() {} + + /** + * Communicates to upstream sources that this {@code Sink} does not + * wish to receive any more data + * + * @return true if cancellation is requested + */ + default boolean cancellationRequested() { + return false; + } + + /** + * Accepts an int value + * @implSpec The default implementation throws IllegalStateException + * + * @throws IllegalStateException If this sink does not accept int values + */ + default void accept(int value) { + throw new IllegalStateException("called wrong accept method"); + } + + /** + * Accepts a long value + * @implSpec The default implementation throws IllegalStateException + * + * @throws IllegalStateException If this sink does not accept long values + */ + default void accept(long value) { + throw new IllegalStateException("called wrong accept method"); + } + + /** + * Accepts a double value + * @implSpec The default implementation throws IllegalStateException + * + * @throws IllegalStateException If this sink does not accept double values + */ + default void accept(double value) { + throw new IllegalStateException("called wrong accept method"); + } + + /** + * {@code Sink} that implements {@code Sink}, re-abstracts + * {@code accept(int)}, and wires {@code accept(Integer)} to bridge to + * {@code accept(int)}. + */ + @FunctionalInterface + interface OfInt extends Sink, IntConsumer { + @Override + void accept(int value); + + @Override + default void accept(Integer i) { + if (Tripwire.ENABLED) + Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)"); + accept(i.intValue()); + } + } + + /** + * {@code Sink} that implements {@code Sink}, re-abstracts + * {@code accept(long)}, and wires {@code accept(Long)} to bridge to + * {@code accept(long)}. + */ + @FunctionalInterface + interface OfLong extends Sink, LongConsumer { + @Override + void accept(long value); + + @Override + default void accept(Long i) { + if (Tripwire.ENABLED) + Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)"); + accept(i.longValue()); + } + } + + /** + * {@code Sink} that implements {@code Sink}, re-abstracts + * {@code accept(double)}, and wires {@code accept(Double)} to bridge to + * {@code accept(double)}. 
+ */ + @FunctionalInterface + interface OfDouble extends Sink, DoubleConsumer { + @Override + void accept(double value); + + @Override + default void accept(Double i) { + if (Tripwire.ENABLED) + Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)"); + accept(i.doubleValue()); + } + } + + /** + * {@code Sink} implementation designed for creating chains of sinks. The + * {@code begin} and {@code end}, and {@code cancellationRequested} methods + * are wired to chain to the downstream {@code Sink}. This implementation + * takes a downstream {@code Sink} of unknown input shape and produces a + * {@code Sink}. The implementation of the {@code accept()} method must + * call the correct {@code accept()} method on the downstream {@code Sink}. + */ + static abstract class ChainedReference implements Sink { + protected final Sink downstream; + + public ChainedReference(Sink downstream) { + this.downstream = Objects.requireNonNull(downstream); + } + + @Override + public void begin(long size) { + downstream.begin(size); + } + + @Override + public void end() { + downstream.end(); + } + + @Override + public boolean cancellationRequested() { + return downstream.cancellationRequested(); + } + } + + /** + * {@code Sink} implementation designed for creating chains of sinks. The + * {@code begin} and {@code end}, and {@code cancellationRequested} methods + * are wired to chain to the downstream {@code Sink}. This implementation + * takes a downstream {@code Sink} of unknown input shape and produces a + * {@code Sink.OfInt}. The implementation of the {@code accept()} method + * must call the correct {@code accept()} method on the downstream + * {@code Sink}. + */ + static abstract class ChainedInt implements Sink.OfInt { + protected final Sink downstream; + + public ChainedInt(Sink downstream) { + this.downstream = Objects.requireNonNull(downstream); + } + + @Override + public void begin(long size) { + downstream.begin(size); + } + + @Override + public void end() { + downstream.end(); + } + + @Override + public boolean cancellationRequested() { + return downstream.cancellationRequested(); + } + } + + /** + * {@code Sink} implementation designed for creating chains of sinks. The + * {@code begin} and {@code end}, and {@code cancellationRequested} methods + * are wired to chain to the downstream {@code Sink}. This implementation + * takes a downstream {@code Sink} of unknown input shape and produces a + * {@code Sink.OfLong}. The implementation of the {@code accept()} method + * must call the correct {@code accept()} method on the downstream + * {@code Sink}. + */ + static abstract class ChainedLong implements Sink.OfLong { + protected final Sink downstream; + + public ChainedLong(Sink downstream) { + this.downstream = Objects.requireNonNull(downstream); + } + + @Override + public void begin(long size) { + downstream.begin(size); + } + + @Override + public void end() { + downstream.end(); + } + + @Override + public boolean cancellationRequested() { + return downstream.cancellationRequested(); + } + } + + /** + * {@code Sink} implementation designed for creating chains of sinks. The + * {@code begin} and {@code end}, and {@code cancellationRequested} methods + * are wired to chain to the downstream {@code Sink}. This implementation + * takes a downstream {@code Sink} of unknown input shape and produces a + * {@code Sink.OfDouble}. The implementation of the {@code accept()} method + * must call the correct {@code accept()} method on the downstream + * {@code Sink}. 
+ */ + static abstract class ChainedDouble implements Sink.OfDouble { + protected final Sink downstream; + + public ChainedDouble(Sink downstream) { + this.downstream = Objects.requireNonNull(downstream); + } + + @Override + public void begin(long size) { + downstream.begin(size); + } + + @Override + public void end() { + downstream.end(); + } + + @Override + public boolean cancellationRequested() { + return downstream.cancellationRequested(); + } + } +} --- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/StatefulOp.java 2013-03-11 17:44:02.000000000 -0400 @@ -0,0 +1,49 @@ +package java.util.stream; + +/** + * A stateful intermediate stream operation ({@link IntermediateOp}). + * Stateful means that state is accumulated as elements are processed. + * Examples of stateful operations are sorting, extracting a subsequence of the + * input, or removing duplicates. Statefulness has an effect on how the + * operation can be parallelized. Stateless operations parallelize trivially + * because they are homomorphisms under concatenation: + * + *
+ *     statelessOp(a || b) = statelessOp(a) || statelessOp(b)
+ * 
+ * + * where {@code ||} denotes concatenation. Stateful operations may still be + * parallelizable, but are not amenable to the automatic parallelization of + * stateless operations. Accordingly, a stateful operation must provide its + * own parallel execution implementation + * ({@link #evaluateParallel(PipelineHelper)}). + * + * @param Type of input and output elements. + * + * @see IntermediateOp + * @see TerminalOp + * @since 1.8 + */ +interface StatefulOp extends IntermediateOp { + + /** + * Returns {@code true}. Any overriding implementations must also return + * {@code true} + * @implSpec The default implementation returns {@code true} + * @return {@code true} + */ + @Override + default boolean isStateful() { + return true; + } + + /** + * {@inheritDoc} + * + * An implementation of this method must be provided, but it is acceptable + * if the implementation is sequential. A generic sequential implementation + * is available as + * {@link PipelineHelper#evaluateSequential(IntermediateOp)}. + */ + Node evaluateParallel(PipelineHelper helper); +} --- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/StreamOpFlag.java 2013-03-11 17:44:02.000000000 -0400 @@ -0,0 +1,701 @@ +/* + * Copyright (c) 2012, 2013 Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.EnumMap; +import java.util.Map; +import java.util.Spliterator; + +/** + * Flags corresponding to characteristics of streams and operations. Flags are + * utilized by the stream framework to control, specialize or optimize + * computation. + * + *

+ * Stream flags may be used to describe characteristics of several different + * entities associated with streams: stream sources, intermediate operations, + * and terminal operations. Not all stream flags are meaningful for all + * entities; the following table summarizes which flags are meaningful in what + * contexts: + *

+ *                        DISTINCT  SORTED  ORDERED  SIZED  SHORT_CIRCUIT  PARALLEL
+ * Stream source             Y        Y        Y       Y         N            Y
+ * Intermediate operation    PCI      PCI      PCI     PC        PI           PC
+ * Terminal operation        N        N        PC      N         PI           N
+ * 
+ * + *

In the above table, "PCI" means "may preserve, clear, or inject"; "PC"
+ * means "may preserve or clear"; "PI" means "may preserve or inject"; and "N"
+ * means "not valid".
+ *
+ *
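As a concrete sketch of what the table permits (using the IS_/NOT_ constants
defined later in this file; the operations named are hypothetical stand-ins),
a terminal operation may only affect the ORDERED and SHORT_CIRCUIT columns:

    // a forEach-like terminal operation may clear ORDERED (the "PC" column)
    int forEachLikeOpFlags = StreamOpFlag.NOT_ORDERED;
    // a findFirst-like terminal operation may inject SHORT_CIRCUIT (the "PI" column)
    int findFirstLikeOpFlags = StreamOpFlag.IS_SHORT_CIRCUIT;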

Stream flags are represented by unioned bit sets, so that a single word
+ * may describe all the characteristics of a given stream entity, and so that,
+ * for example, the flags for a stream source can be efficiently combined with
+ * the flags for later operations on that stream.
+ *
+ *

The bit masks {@link #STREAM_MASK}, {@link #OP_MASK}, and + * {@link #TERMINAL_OP_MASK} can be ANDed with a bit set of stream flags to + * produce a mask containing only the valid flags for that entity type. + * + *
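For example, a sketch of applying these masks ({@code flags} here stands for an
arbitrary flag bit set; it is not a name used elsewhere in this patch):

    // keep only the bits that are valid for a stream source
    int streamBits = flags & StreamOpFlag.STREAM_MASK;
    // keep only the bits that are valid for an intermediate operation
    int opBits = flags & StreamOpFlag.OP_MASK;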

When describing a stream source, one need only describe what
+ * characteristics that stream has; when describing a stream operation, one
+ * needs to describe whether the operation preserves, injects, or clears that
+ * characteristic. Accordingly, two bits are used for each flag, so as to allow
+ * representing not only the presence of a characteristic, but also how an
+ * operation modifies that characteristic. There are two common forms in which
+ * flag bits are combined into an {@code int} bit set. Stream flags
+ * are a unioned bit set constructed by ORing the enum characteristic values of
+ * {@link #set()} (or, more commonly, ORing the corresponding static named
+ * constants prefixed with {@code IS_}). Operation flags are a unioned
+ * bit set constructed by ORing the enum characteristic values of {@link #set()}
+ * or {@link #clear()} (to inject or clear, respectively, the corresponding
+ * flag), or more commonly ORing the corresponding named constants prefixed with
+ * {@code IS_} or {@code NOT_}. Flags that are not marked with {@code IS_} or
+ * {@code NOT_} are implicitly treated as preserved. Care must be taken when
+ * combining bit sets that the correct combining operations are applied in the
+ * correct order.
+ *
+ *
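As an illustrative sketch (the source and the operation described here are
hypothetical), a sized, ordered source and a filter-like intermediate operation
might describe themselves as:

    // stream flags for a sized, ordered source
    int sourceFlags = StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED;
    // operation flags for a filter-like op: clears SIZED and, by contributing
    // no other bits, implicitly preserves DISTINCT, SORTED and ORDERED
    int filterOpFlags = StreamOpFlag.NOT_SIZED;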

+ * With the exception of {@link #PARALLEL}, stream characteristics can be
+ * derived from the equivalent {@link java.util.Spliterator} characteristics:
+ * {@link java.util.Spliterator#DISTINCT}, {@link java.util.Spliterator#SORTED},
+ * {@link java.util.Spliterator#ORDERED}, and
+ * {@link java.util.Spliterator#SIZED}. A spliterator characteristics bit set
+ * can be converted to stream flags using the method
+ * {@link #fromCharacteristics(java.util.Spliterator)} and converted back using
+ * {@link #toCharacteristics(int)}. (The bit set
+ * {@link #SPLITERATOR_CHARACTERISTICS_MASK} can be ANDed with a bit set to
+ * produce a valid spliterator characteristics bit set that can be converted to
+ * stream flags.)
+ *
+ *
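A sketch of these conversions, assuming some {@code spliterator} variable; note
that PARALLEL has no spliterator equivalent and would have to be ORed in
separately if the stream is to be evaluated in parallel:

    int streamFlags = StreamOpFlag.fromCharacteristics(spliterator);
    int characteristics = StreamOpFlag.toCharacteristics(streamFlags);
    // only the spliterator-derived bits survive the round trip
    assert (characteristics & ~StreamOpFlag.SPLITERATOR_CHARACTERISTICS_MASK) == 0;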

+ * The source of a stream encapsulates a spliterator. The characteristics of + * that source spliterator when transformed to stream flags will be a proper + * subset of stream flags of that stream. + * For example: + *

 {@code
+ *     Spliterator s = ...;
+ *     Stream stream = Streams.stream(s);
+ *     int flagsFromSplitr = fromCharacteristics(s.characteristics());
+ *     assert((flagsFromSplitr & stream.getStreamFlags()) == flagsFromSplitr);
+ * }
+ * + *

An intermediate operation, performed on an input stream to create a new
+ * output stream, may preserve, clear or inject stream or operation
+ * characteristics. Similarly, a terminal operation, performed on an input
+ * stream to produce an output result, may preserve, clear or inject stream or
+ * operation characteristics. Preservation means that if that characteristic
+ * is present on the input, then it is also present on the output. Clearing
+ * means that the characteristic is not present on the output regardless of the
+ * input. Injection means that the characteristic is present on the output
+ * regardless of the input. If a characteristic is not cleared or injected,
+ * then it is implicitly preserved.
+ *
+ *

A pipeline consists of a stream source encapsulating a spliterator, one or
+ * more intermediate operations, and finally a terminal operation that produces
+ * a result. At each stage of the pipeline, combined stream and operation
+ * flags can be calculated, using {@link #combineOpFlags(int, int)}. Such flags
+ * ensure that preservation, clearing and injecting information is retained at
+ * each stage.
+ *
+ * The combined stream and operation flags for the source stage of the pipeline
+ * are calculated as follows:
+ *

 {@code
+ *     int flagsForSourceStage = combineOpFlags(sourceFlags, INITIAL_OPS_VALUE);
+ * }
+ *
+ * The combined stream and operation flags of each subsequent intermediate
+ * operation stage in the pipeline are calculated as follows:
 {@code
+ *     int flagsForThisStage = combineOpFlags(thisOpFlags, flagsForPreviousStage);
+ * }
+ * + * Finally the flags output from the last intermediate operation of the pipeline + * are combined with the operation flags of the terminal operation to produce + * the flags output from the pipeline. + * + *
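Putting these pieces together, a hypothetical sketch of flag propagation through
a small pipeline (a sized, ordered source, a filter-like intermediate stage that
clears SIZED, and a short-circuiting terminal operation):

    int sourceFlags   = StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED;
    int combinedFlags = StreamOpFlag.combineOpFlags(sourceFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    // the filter-like stage clears SIZED
    combinedFlags = StreamOpFlag.combineOpFlags(StreamOpFlag.NOT_SIZED, combinedFlags);
    // the terminal operation injects SHORT_CIRCUIT
    combinedFlags = StreamOpFlag.combineOpFlags(StreamOpFlag.IS_SHORT_CIRCUIT, combinedFlags);
    assert !StreamOpFlag.SIZED.isKnown(combinedFlags);   // the size was lost at the filter stage
    assert StreamOpFlag.ORDERED.isKnown(combinedFlags);  // encounter order is still known

With these inputs, {@code toStreamFlags(combinedFlags)} would report only ORDERED,
since SIZED was cleared and SHORT_CIRCUIT is not a stream flag.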

Those flags can then be used to apply optimizations. For example, if + * {@code SIZED.isKnown(flags)} returns true then the stream size remains + * constant throughout the pipeline, this information can be utilized to + * pre-allocate data structures and combined with + * {@link java.util.Spliterator#SUBSIZED} that information can be utilized to + * perform concurrent in-place updates into a shared array. + * + * For specific details see the {@link AbstractPipeline} constructors. + * + * @since 1.8 + */ +// @@@ When a new flag is added what should happen for existing operations? +// Need to move to a builder approach used by ops where the masks for the new flag are +// taken into account for default behaviour. +enum StreamOpFlag { + + /* + * Each characteristic takes up 2 bits in a bit set to accommodate + * preserving, clearing and setting/injecting information. + * + * This applies to stream flags, intermediate/terminal operation flags, and + * combined stream and operation flags. Even though the former only requires + * 1 bit of information per characteristic, is it more efficient when + * combining flags to align set and inject bits. + * + * Characteristics belong to certain types, see the Type enum. Bit masks for + * the types are constructed as per the following table: + * + * DISTINCT SORTED ORDERED SIZED SHORT_CIRCUIT PARALLEL + * SPLITERATOR 01 01 01 01 00 00 + * STREAM 01 01 01 01 00 01 + * OP 11 11 11 10 01 10 + * TERMINAL_OP 00 00 10 00 01 00 + * UPSTREAM_TERMINAL_OP 00 00 10 00 00 00 + * + * 01 = set/inject + * 10 = clear + * 11 = preserve + * + * Construction of the columns is performed using a simple builder for + * non-zero values. + */ + + + // The following flags correspond to characteristics on Spliterator + // and the values MUST be equal. + // + + /** + * Characteristic value signifying that, for each pair of + * encountered elements in a stream {@code x, y}, {@code !x.equals(y)}. + *

+ * A stream may have this value or an intermediate operation can preserve, + * clear or inject this value. + */ + // 0, 0x00000001 + // Matches Spliterator.DISTINCT + DISTINCT(0, + set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)), + + /** + * Characteristic value signifying that encounter order follows a natural + * sort order of comparable elements. + *

+ * A stream can have this value or an intermediate operation can preserve, + * clear or inject this value. + *

+ * Note: The {@link java.util.Spliterator#SORTED} characteristic can define + * a sort order with an associated non-null comparator. Augmenting flag + * state with addition properties such that those properties can be passed + * to operations requires some disruptive changes for a singular use-case. + * Furthermore, comparing comparators for equality beyond that of identity + * is likely to be unreliable. Therefore the {@code SORTED} characteristic + * for a defined non-natural sort order is not mapped internally to the + * {@code SORTED} flag. + */ + // 1, 0x00000004 + // Matches Spliterator.SORTED + SORTED(1, + set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)), + + /** + * Characteristic value signifying that an encounter order is + * defined for stream elements. + *

+ * A stream can have this value, an intermediate operation can preserve, + * clear or inject this value, or a terminal operation can preserve or clear + * this value. + */ + // 2, 0x00000010 + // Matches Spliterator.ORDERED + ORDERED(2, + set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP).clear(Type.TERMINAL_OP) + .clear(Type.UPSTREAM_TERMINAL_OP)), + + /** + * Characteristic value signifying that size of the stream + * is of a known finite size that is equal to the known finite + * size of the source spliterator input to the first stream + * in the pipeline. + *

+ * A stream can have this value or an intermediate operation can preserve or + * clear this value. + */ + // 3, 0x00000040 + // Matches Spliterator.SIZED + SIZED(3, + set(Type.SPLITERATOR).set(Type.STREAM).clear(Type.OP)), + + // The following Spliterator characteristics are not currently used but a + // gap in the bit set is deliberately retained to enable corresponding + // stream flags if//when required without modification to other flag values. + // + // 4, 0x00000100 INFINITE(4, ... + // 5, 0x00000400 NONNULL(5, ... + // 6, 0x00001000 IMMUTABLE(6, ... + // 7, 0x00004000 CONCURRENT(7, ... + // 8, 0x00010000 SUBSIZED(8, ...) + + // The following 3 flags are currently undefined and a free for any further + // spliterator characteristics. + // + // 9, 0x00040000 + // 10, 0x00100000 + // 11, 0x00400000 + + // The following flags are specific to streams and operations + // + + /** + * Characteristic value signifying that an operation may short-circuit the + * stream. + *

+ * An intermediate operation can preserve or inject this value, + * or a terminal operation can preserve or inject this value. + */ + // 12, 0x01000000 + SHORT_CIRCUIT(12, + set(Type.OP).set(Type.TERMINAL_OP)), + + + /** + * Characteristic value signifying that the stream is to be evaluated in + * parallel rather than sequentially. + *

+ * A stream can have this value or an intermediate operation can preserve or + * clear this value. + */ + // 13, 0x04000000 + PARALLEL(13, + set(Type.STREAM).clear(Type.OP)); + + // The following 2 flags are currently undefined and a free for any further + // stream flags if/when required + // + // 14, 0x10000000 + // 15, 0x40000000 + + /** + * Type of a flag + */ + enum Type { + /** The flag is associated with spliterator characteristics. */ + SPLITERATOR, + + /** The flag is associated with stream flags. */ + STREAM, + + /** The flag is associated with intermediate operation flags. */ + OP, + + /** The flag is associated with terminal operation flags. */ + TERMINAL_OP, + + /** + * The flag is associated with terminal operation flags that are + * propagated upstream across the last stateful operation boundary + */ + UPSTREAM_TERMINAL_OP + } + + /** + * The bit pattern for setting/injecting a flag. + */ + private static final int SET_BITS = 0b01; + + /** + * The bit pattern for clearing a flag. + */ + private static final int CLEAR_BITS = 0b10; + + /** + * The bit pattern for preserving a flag. + */ + private static final int PRESERVE_BITS = 0b11; + + private static MaskBuilder set(Type t) { + return new MaskBuilder(new EnumMap<>(Type.class)).set(t); + } + + private static class MaskBuilder { + final Map map; + + MaskBuilder(Map map) { + this.map = map; + } + + MaskBuilder mask(Type t, Integer i) { + map.put(t, i); + return this; + } + + MaskBuilder set(Type t) { + return mask(t, SET_BITS); + } + + MaskBuilder clear(Type t) { + return mask(t, CLEAR_BITS); + } + + MaskBuilder setAndClear(Type t) { + return mask(t, PRESERVE_BITS); + } + + Map build() { + for (Type t : Type.values()) { + map.putIfAbsent(t, 0b00); + } + return map; + } + } + + // The mask table for a flag, this is used to determine + // if a flag corresponds to a certain flag type and for creating + // mask constants. + private final Map maskTable; + + // The bit position in the bit mask + private final int bitPosition; + + // The set 2 bit set offset at the bit position + private final int set; + + // The clear 2 bit set offset at the bit position + private final int clear; + + // The preserve 2 bit set offset at the bit position + private final int preserve; + + private StreamOpFlag(int position, MaskBuilder maskBuilder) { + this.maskTable = maskBuilder.build(); + // Two bits per flag + position *= 2; + this.bitPosition = position; + this.set = SET_BITS << position; + this.clear = CLEAR_BITS << position; + this.preserve = PRESERVE_BITS << position; + } + + /** + * Gets the bitmap associated with setting this characteristic + * @return the bitmap for setting this characteristic + */ + int set() { + return set; + } + + /** + * Gets the bitmap associated with clearing this characteristic + * @return the bitmap for clearing this characteristic + */ + int clear() { + return clear; + } + + /** + * Determines if this flag is a stream-based flag. + * + * @return true if a stream-based flag, otherwise false. + */ + boolean isStreamFlag() { + return maskTable.get(Type.STREAM) > 0; + } + + /** + * Checks if this flag is set on stream flags, injected on operation flags, + * and injected on combined stream and operation flags. + * + * @param flags the stream flags, operation flags, or combined stream and + * operation flags + * @return true if this flag is known, otherwise false. 
+ */ + boolean isKnown(int flags) { + return (flags & preserve) == set; + } + + /** + * Checks if this flag is cleared on operation flags or combined stream and + * operation flags. + * + * @param flags the operation flags or combined stream and operations flags. + * @return true if this flag is preserved, otherwise false. + */ + boolean isCleared(int flags) { + return (flags & preserve) == clear; + } + + /** + * Checks if this flag is preserved on combined stream and operation flags. + * + * @param flags the combined stream and operations flags. + * @return true if this flag is preserved, otherwise false. + */ + boolean isPreserved(int flags) { + return (flags & preserve) == preserve; + } + + /** + * Determines if this flag can be set for a flag type. + * + * @param t the flag type. + * @return true if this flag can be set for the flag type, otherwise false. + */ + boolean canSet(Type t) { + return (maskTable.get(t) & SET_BITS) > 0; + } + + /** + * The bit mask for spliterator characteristics + */ + static final int SPLITERATOR_CHARACTERISTICS_MASK = createMask(Type.SPLITERATOR); + + /** + * The bit mask for source stream flags. + */ + static final int STREAM_MASK = createMask(Type.STREAM); + + /** + * The bit mask for intermediate operation flags. + */ + static final int OP_MASK = createMask(Type.OP); + + /** + * The bit mask for terminal operation flags. + */ + static final int TERMINAL_OP_MASK = createMask(Type.TERMINAL_OP); + + /** + * The bit mask for upstream terminal operation flags. + */ + static final int UPSTREAM_TERMINAL_OP_MASK = createMask(Type.UPSTREAM_TERMINAL_OP); + + private static int createMask(Type t) { + int mask = 0; + for (StreamOpFlag flag : StreamOpFlag.values()) { + mask |= flag.maskTable.get(t) << flag.bitPosition; + } + return mask; + } + + // Complete flag mask + private static final int FLAG_MASK = createFlagMask(); + + private static int createFlagMask() { + int mask = 0; + for (StreamOpFlag flag : StreamOpFlag.values()) { + mask |= flag.preserve; + } + return mask; + } + + // Flag mask for stream flags that are set + private static final int FLAG_MASK_IS = STREAM_MASK; + + // Flag mask for stream flags that are cleared + private static final int FLAG_MASK_NOT = STREAM_MASK << 1; + + /** + * The initial value to be combined with the stream flags of the first + * stream in the pipeline. 
+ */ + static final int INITIAL_OPS_VALUE = FLAG_MASK_IS | FLAG_MASK_NOT; + + /** + * The bit value to set or inject {@link #DISTINCT} + */ + static final int IS_DISTINCT = DISTINCT.set; + + /** + * The bit value to clear {@link #DISTINCT} + */ + static final int NOT_DISTINCT = DISTINCT.clear; + + /** + * The bit value to set or inject {@link #SORTED} + */ + static final int IS_SORTED = SORTED.set; + + /** + * The bit value to clear {@link #SORTED} + */ + static final int NOT_SORTED = SORTED.clear; + + /** + * The bit value to set or inject {@link #ORDERED} + */ + static final int IS_ORDERED = ORDERED.set; + + /** + * The bit value to clear {@link #ORDERED} + */ + static final int NOT_ORDERED = ORDERED.clear; + + /** + * The bit value to set {@link #SIZED} + */ + static final int IS_SIZED = SIZED.set; + + /** + * The bit value to clear {@link #SIZED} + */ + static final int NOT_SIZED = SIZED.clear; + + /** + * The bit value to inject {@link #SHORT_CIRCUIT} + */ + static final int IS_SHORT_CIRCUIT = SHORT_CIRCUIT.set; + + /** + * The bit value to set {@link #PARALLEL} + */ + static final int IS_PARALLEL = PARALLEL.set; + + /** + * The bit value to clear {@link #PARALLEL} + */ + static final int NOT_PARALLEL = PARALLEL.clear; + + private static int getMask(int flags) { + return (flags == 0) + ? FLAG_MASK + : ~(flags | ((FLAG_MASK_IS & flags) << 1) | ((FLAG_MASK_NOT & flags) >> 1)); + } + + /** + * Combines stream or operation flags with previously combined stream and + * operation flags to produce updated combined stream and operation flags. + *

+ * A flag set on stream flags or injected on operation flags, + * and injected combined stream and operation flags, + * will be injected on the updated combined stream and operation flags. + *

+ *

+ * A flag set on stream flags or injected on operation flags, + * and cleared on the combined stream and operation flags, + * will be cleared on the updated combined stream and operation flags. + *

+ *

+ * A flag set on the stream flags or injected on operation flags, + * and preserved on the combined stream and operation flags, + * will be injected on the updated combined stream and operation flags. + *

+ *

+ * A flag not set on the stream flags or cleared/preserved on operation + * flags, and injected on the combined stream and operation flags, + * will be injected on the updated combined stream and operation flags. + *

+ *

+ * A flag not set on the stream flags or cleared/preserved on operation + * flags, and cleared on the combined stream and operation flags, + * will be cleared on the updated combined stream and operation flags. + *

+ *

+ * A flag not set on the stream flags, + * and preserved on the combined stream and operation flags + * will be preserved on the updated combined stream and operation flags. + *

+ *

+ * A flag cleared on operation flags, + * and preserved on the combined stream and operation flags + * will be cleared on the updated combined stream and operation flags. + *

+ *

+ * A flag preserved on operation flags, + * and preserved on the combined stream and operation flags + * will be preserved on the updated combined stream and operation flags. + *

+ * + * @param newStreamOrOpFlags the stream or operation flags. + * @param prevCombOpFlags previously combined stream and operation flags. + * The value {#link INITIAL_OPS_VALUE} must be used as the seed value. + * @return the updated combined stream and operation flags. + */ + static int combineOpFlags(int newStreamOrOpFlags, int prevCombOpFlags) { + // 0x01 or 0x10 nibbles are transformed to 0x11 + // 0x00 nibbles remain unchanged + // Then all the bits are flipped + // Then the result is logically or'ed with the operation flags. + return (prevCombOpFlags & StreamOpFlag.getMask(newStreamOrOpFlags)) | newStreamOrOpFlags; + } + + /** + * Converts combined stream and operation flags to stream flags. + * + *

Each flag injected on the combined stream and operation flags will be + * set on the stream flags. + * + * @param combOpFlags the combined stream and operation flags. + * @return the stream flags. + */ + static int toStreamFlags(int combOpFlags) { + // By flipping the nibbles 0x11 become 0x00 and 0x01 become 0x10 + // Shift left 1 to restore set flags and mask off anything other than the set flags + return ((~combOpFlags) >> 1) & FLAG_MASK_IS & combOpFlags; + } + + /** + * Converts stream flags to a spliterator characteristic bit set. + * + * @param streamFlags the stream flags. + * @return the spliterator characteristic bit set. + */ + static int toCharacteristics(int streamFlags) { + return streamFlags & SPLITERATOR_CHARACTERISTICS_MASK; + } + + /** + * Converts a spliterator characteristic bit set to stream flags. + * + * @implSpec + * If the spliterator is naturally {@code SORTED} (the associated + * {@code Comparator} is {@code null}) then the characteristic is converted + * to the {@link #SORTED} flag, otherwise the characteristic is not + * converted. + * + * @param spliterator the spliterator from which to obtain characteristic + * bit set. + * @return the stream flags. + */ + static int fromCharacteristics(Spliterator spliterator) { + int characteristics = spliterator.characteristics(); + if ((characteristics & Spliterator.SORTED) != 0 && spliterator.getComparator() != null) { + // Do not propagate the SORTED characteristic if it does not correspond + // to a natural sort order + return characteristics & SPLITERATOR_CHARACTERISTICS_MASK & ~Spliterator.SORTED; + } + else { + return characteristics & SPLITERATOR_CHARACTERISTICS_MASK; + } + } + + /** + * Converts a spliterator characteristic bit set to stream flags. + * + * @param characteristics the spliterator characteristic bit set. + * @return the stream flags. + */ + static int fromCharacteristics(int characteristics) { + return characteristics & SPLITERATOR_CHARACTERISTICS_MASK; + } +} --- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/StreamShape.java 2013-03-11 17:44:02.000000000 -0400 @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.function.ToIntFunction; + +/** + * An enum describing the known shape specializations for stream abstractions. 
Each will correspond to + * a specific subinterface of {@link BaseStream} (e.g., {@code REFERENCE} corresponds to {@code Stream}, + * {@code INT_VALUE} corresponds to {@code IntStream}). Each may also correspond to specializations of + * value-handling abstractions such as {@code Spliterator}, {@code Consumer}, etc. + * + * @apiNote + * This enum is used by implementations to determine compatibility between streams and operations (i.e., if the + * output shape of a stream is compatible with the input shape of the next operation). It is also used to reduce + * the code bloat associated with having multiple specialized stream types for primitives by allowing some code + * to be largely shape-independent. + * + *

Some APIs require you to specify both a generic type and a stream shape for input or output elements, such as + * {@link IntermediateOp} which has both generic type parameters for its input and output types, and getters for + * the input and output shape. When representing primitive streams in this way, the generic type parameter should + * correspond to the wrapper type for that primitive type. Accordingly, the {@code IntermediateOp} implementating + * {@link Stream#map(ToIntFunction)} would have an output type parameter of {@code Integer} and an output + * shape of @{code INT_VALUE}. + * @since 1.8 + */ +enum StreamShape { + REFERENCE, + INT_VALUE, + LONG_VALUE, + DOUBLE_VALUE +} --- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/TerminalOp.java 2013-03-11 17:44:02.000000000 -0400 @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +/** + * An operation in a stream pipeline that takes a stream as input and produces + * a result or side-effect. A {@code TerminalOp} has an input type and stream + * shape, and a result type. A {@code TerminalOp} also has a set of + * operation flags that describes how the operation processes elements + * of the stream (such as short-circuiting or respecting encounter order; see + * {@link StreamOpFlag}). + * + *

A {@code TerminalOp} must provide a sequential and parallel implementation + * of the operation relative to a given stream source and set of intermediate + * operations. + * + * @param The type of input elements + * @param The type of the result + * @see StatefulOp + * @see IntermediateOp + * @since 1.8 + */ +interface TerminalOp { + /** + * Gets the shape of the input type of this operation + * + * @implSpec The default returns {@code StreamShape.REFERENCE} + * @return Shape of the input type of this operation + */ + default StreamShape inputShape() { return StreamShape.REFERENCE; } + + /** + * Gets the properties of the operation. Terminal operations may set a + * limited subset of the stream flags defined in {@link StreamOpFlag}, and + * these flags are combined with the previously combined stream and + * intermediate operation flags for the pipeline. + * + * @implSpec The default implementation returns zero + * @return the properties of the operation + * @see {@link StreamOpFlag} + */ + default int getOpFlags() { return 0; } + + /** + * Performs a parallel evaluation of the operation using the specified + * {@code PipelineHelper}, which describes the stream source and upstream + * intermediate operations. + * + * @implSpec The default performs a sequential evaluation of the operation + * using the specified {@code PipelineHelper} + * + * @param helper the pipeline helper + * @param the type of elements in the pipeline source + * @return the result of the evaluation + */ + default R evaluateParallel(PipelineHelper helper) { + if (Tripwire.ENABLED) + Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default"); + return evaluateSequential(helper); + } + + /** + * Performs a sequential evaluation of the operation using the specified + * {@code PipelineHelper}, which describes the stream source and upstream + * intermediate operations. + * + * @param helper the pipeline helper + * @param the type of elements in the pipeline source + * @return the result of the evaluation + */ + R evaluateSequential(PipelineHelper helper); +} --- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/TerminalSink.java 2013-03-11 17:44:03.000000000 -0400 @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. 
+ */ +package java.util.stream; + +import java.util.function.Supplier; + +/** + * A Sink which accumulates state as elements are accepted, and allows a result + * to be retrieved after the computation is finished. + * + * @param The type of elements to be accepted + * @param The type of the result + * + * @since 1.8 + */ +interface TerminalSink extends Sink, Supplier { } --- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/Tripwire.java 2013-03-11 17:44:03.000000000 -0400 @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Utility class for detecting inadvertent uses of boxing in + * {@code java.util.stream} classes. The detection is turned on or off based on + * whether the system property {@code org.openjdk.java.util.stream.tripwire} is + * considered {@code true} according to {@link Boolean#getBoolean(String)}. + * Turned off for production. + * + * @apiNote + * Typical usage would be for boxing code to do: + *

+ *     if (Tripwire.ENABLED)
+ *         Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
+ * 
+ * + * @since 1.8 + */ +final class Tripwire { + private static final String TRIPWIRE_PROPERTY = "org.openjdk.java.util.stream.tripwire"; + + /** Should debugging checks be enabled? */ + static final boolean ENABLED = true; +// = Boolean.getBoolean(TRIPWIRE_PROPERTY); + + private Tripwire() { } + + /** + * Produces a log warning, using {@code Logger.getLogger(className)}, using + * the supplied message. The class name of {@code trippingClass} will be + * used as the first parameter to the message. + * + * @param trippingClass Name of the class generating the message + * @param msg A message format string of the type expected by {@link Logger} + */ + static void trip(Class trippingClass, String msg) { + Logger.getLogger(trippingClass.getName()).log(Level.WARNING, msg, trippingClass.getName()); + } +}