Package java.util.stream

java.util.stream

See: Description

Package java.util.stream Description

java.util.stream

Classes to support functional-style operations on streams of values, as in the following:
int sumOfWeights = blocks.stream().filter(b -> b.getColor() == RED)
                                       .mapToInt(b -> b.getWeight())
                                       .sum();
 

Here we use blocks, which might be a Collection, as a source for a stream, and then perform a filter-map-reduce (sum() is an example of a reduction operation) on the stream to obtain the sum of the weights of the red blocks.

The key abstraction used in this approach is Stream, as well as its primitive specializations IntStream, LongStream, and DoubleStream. Streams differ from Collections in several ways:

Stream pipelines

Streams are used to create pipelines of operations. A complete stream pipeline has several components: a source (which may be a Collection, an array, a generator function, or an IO channel); zero or more intermediate operations such as Stream#filter or Stream#map; and a terminal operation such as Stream#forEach or Stream#reduce. Stream operations may take as parameters function values (which are often lambda expressions, but could be method references or objects) which parameterize the behavior of the operation, such as a Predicate passed to the Stream#filter method.

Intermediate operations return a new Stream. They are lazy; executing an intermediate operation such as Stream#filter does not actually perform any filtering, instead creating a new Stream that, when traversed, contains the elements of the initial Stream that match the given Predicate. Consuming elements from the stream source does not begin until the terminal operation is executed.

Terminal operations consume the Stream and produce a result or a side-effect. After a terminal operation is performed, the stream can no longer be used and you must return to the data source, or select a new data source, to get a new stream. For example, obtaining the sum of weights of all red blocks, and then of all blue blocks, requires a filter-map-reduce on two different streams:

int sumOfRedWeights  = blocks.stream().filter(b -> b.getColor() == RED)
                                           .mapToInt(b -> b.getWeight())
                                           .sum();
     int sumOfBlueWeights = blocks.stream().filter(b -> b.getColor() == BLUE)
                                           .mapToInt(b -> b.getWeight())
                                           .sum();
 

However, there are other techniques that allow you to obtain both results in a single pass if multiple traversal is impractical or inefficient. TODO provide link

Stream operations

Intermediate stream operation (such as filter or sorted) always produce a new Stream, and are alwayslazy. Executing a lazy operations does not trigger processing of the stream contents; all processing is deferred until the terminal operation commences. Processing streams lazily allows for significant efficiencies; in a pipeline such as the filter-map-sum example above, filtering, mapping, and addition can be fused into a single pass, with minimal intermediate state. Laziness also enables us to avoid examining all the data when it is not necessary; for operations such as "find the first string longer than 1000 characters", one need not examine all the input strings, just enough to find one that has the desired characteristics. (This behavior becomes even more important when the input stream is infinite and not merely large.)

Intermediate operations are further divided into stateless and stateful operations. Stateless operations retain no state from previously seen values when processing a new value; examples of stateless intermediate operations include filter and map. Stateful operations may incorporate state from previously seen elements in processing new values; examples of stateful intermediate operations include distict and sorted. Stateful operations may need to process the entire input before producing a result; for example, one cannot produce any results from sorting a stream until one has seen all elements of the stream. As a result, under parallel computation, some pipelines containing stateful intermediate operations have to be executed in multiple passes. Pipelines containing exclusively stateless intermediate operations can be processed in a single pass, whether sequential or parallel.

Further, some operations are deemed short-circuiting operations. An intermediate operation is short-circuiting if, when presented with infinite input, it may produce a finite stream as a result. A terminal operation is short-circuiting if, when presented with infinite input, it may terminate in finite time. (Having a short-circuiting operation is a necessary, but not sufficient, condition for the processing of an infinite stream to terminate normally in finite time.) Terminal operations (such as forEach or findFirst) are always eager (they execute completely before returning), and produce a non-Stream result, such as a primitive value or a Collection, or have side-effects.

Parallelism

By recasting aggregate operations as a pipeline of operations on a stream of values, many aggregate operations can be more easily parallelized. A Stream can execute either in serial or in parallel. When streams are created, they are either created as sequential or parallel streams; the parallel-ness of streams can also be switched by the Stream#sequential() and BaseStream.parallel() operations. The Stream implementations in the JDK create serial streams unless parallelism is explicitly requested. For example, Collection has methods Collection.stream() and Collection.parallelStream(), which produce sequential and parallel streams respectively; other stream-bearing methods such as Streams.intRange(int, int) produce sequential streams but these can be efficiently parallelized by calling parallel() on the result. The set of operations on serial and parallel streams is identical. To execute the "sum of weights of blocks" query in parallel, we would do:

int sumOfWeights = blocks.parallelStream().filter(b -> b.getColor() == RED)
                                               .mapToInt(b -> b.getWeight())
                                               .sum();
 

The only difference between the serial and parallel versions of this example code is the creation of the initial Stream. Whether a Stream will execute in serial or parallel can be determined by the Stream#isParallel method. When the terminal operation is initiated, the entire stream pipeline is either executed sequentially or in parallel, determined by the last operation that affected the stream's serial-parallel orientation (which could be the stream source, or the sequential() or parallel() methods.)

In order for the results of parallel operations to be deterministic and consistent with their serial equivalent, the function values passed into the various stream operations should be stateless.

Ordering

Streams may or may not have an encounter order. Whether or not there is an encounter order depends on the source, the intermediate operations, and the terminal operation. Certain stream sources (such as List or arrays) are intrinsically ordered, whereas others (such as HashSet) are not. Some intermediate operations may impose an encounter order on an otherwise unordered stream, such as Stream.sorted(), and others may render an ordered stream unordered (such as BaseStream.unordered()). Some terminal operations may ignore encounter order, such as Stream.forEach(java.util.function.Consumer<? super T>).

If a Stream is ordered, most operations are constrained to operate on the elements in their encounter order; if the source of a stream is a List containing [1, 2, 3], then the result of executing map(x -> x*2) must be [2, 4, 6]. However, if the source has no defined encounter order, than any permutation of the values [2, 4, 6] would be a valid result. Many operations can still be efficiently parallelized even under ordering constraints.

For sequential streams, ordering is only relevant to the determinism of operations performed repeatedly on the same source. (An ArrayList is constrained to iterate elements in order; a HashSet is not, and repeated iteration might produce a different order.)

For parallel streams, relaxing the ordering constraint can enable optimized implementation for some operations. For example, duplicate filtration on an ordered stream must completely process the first partition before it can return any elements from a subsequent partition, even if those elements are available earlier. On the other hand, without the constraint of ordering, duplicate filtration can be done more efficiently by using a shared ConcurrentHashSet. There will be cases where the stream is structurally ordered (the source is ordered and the intermediate operations are order-preserving), but the user does not particularly care about the encounter order. In some cases, explicitly de-ordering the stream with the BaseStream.unordered() method may result in improved parallel performance for some stateful or terminal operations.

Non-interference

The java.util.stream package enables you to execute possibly-parallel bulk-data operations over a variety of data sources, including even non-thread-safe collections such as ArrayList. This is possible only if we can prevent interference with the data source during the execution of a stream pipeline. (Execution begins when the terminal operation is invoked, and ends when the terminal operation completes.) For most data sources, preventing interference means ensuring that the data source is not modified at all during the execution of the stream pipeline. (Some data sources, such as concurrent collections, are specifically designed to handle concurrent modification.)

Accordingly, lambda expressions (or other objects implementing the appropriate functional interface) passed to stream methods should never modify the stream's data source. An implementation is said to interfere with the data source if it modifies, or causes to be modified, the stream's data source. The need for non-interference applies to all pipelines, not just parallel ones. Unless the stream source is concurrent, modifying a stream's data source during execution of a stream pipeline can cause exceptions, incorrect answers, or nonconformant results.

Further, results may be nondeterministic or incorrect if the lambda expressions passed to stream operations are stateful. A stateful lambda (or other object implementing the appropriate functional interface) is one whose result depends on any state which might change during the execution of the stream pipeline. An example of a stateful lambda is:

Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
     stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
 
Here, if the mapping operation us performed in parallel, the results for the same input could vary from run to run, due to thread scheduling differences, whereas, with a stateless lambda expression the results would always be the same.

Side-effects

Reduction operations

A reduction operation takes a stream of elements and processes them in a way that reduces to a single value or summary description, such as finding the sum or maximum of a set of numbers. (In more complex scenarios, the reduction operation might need to extract data from the elements before reducing that data to a single value, such as finding the sum of weights of a set of blocks. This would require extracting the weight from each block before summing up the weights.)

Of course, such operations can be readily implemented as simple sequential loops, as in:

int sum = 0;
    for (int x : numbers) {
       sum += x;
    }
 
However, there may be a significant advantage to preferring a reduce operation over a mutative accumulation such as the above -- a properly constructed reduce operation is inherently parallelizable, so long as the BinaryOperator has the right characteristics, specifically that it is associative. For example, given a stream of numbers for which we want to find the sum, we can write:
int sum = numbers.reduce(0, (x,y) -> x+y);
 
or more succinctly:
int sum = numbers.reduce(0, Integer::sum);
 

(The primitive specializations of Stream, such as IntStream, even have convenience methods for common reductions, such as IntStream.sum() or IntStream.max(), which are implemented as simple wrappers around reduce.)

Reduction parallellizes well since the implementation of reduce can operate on subsets of the stream in parallel, and then combine the intermediate results to get the final correct answer. Even if you were to use a parallelizable form of the forEach() method in place of the original for-each loop above, you would still have to provide thread-safe updates to the shared accumulating variable sum, and the required synchronization would likely eliminate any performance gain from parallelism. Using a reduce method instead removes all of the burden of parallelizing the reduction operation, and the library can provide an efficient parallel implementation with no additional synchronization needed.

The "blocks" examples shown earlier shows how reduction combines with other operations to replace for loops with bulk operations. If blocks is a collection of Block objects, which have a getWeight method, we can find the heaviest block with:

OptionalInt heaviest = blocks.stream()
                                  .mapToInt(Block::getWeight)
                                  .reduce(Integer::max);
 

In its more general form, a reduce operation on elements of type T yielding a result of type U requires three parameters:

<U> U reduce(U identity,
              BiFunction<U, ? super T, U> accumlator,
              BinaryOperator<U> combiner);
 
Here, the identity element is both an initial seed for the reduction, and a default result if there are no elements. The accumulator function takes a partial result and the next element, and produce a new partial result. The combiner function combines the partial results of two accumulators to produce a new partial result, and eventually the final result.

This form is a generalization of the two-argument form, and is also a generalization of the map-reduce construct illustrated above. If we wanted to re-cast the simple sum example using the more general form, 0 would be the identity element, while Integer::sum would be both the accumulator and combiner. For the sum-of-weights example, this could be re-cast as:

int sumOfWeights = blocks.stream().reduce(0,
                                               (sum, b) -> sum + b.getWeight())
                                               Integer::sum);
 
though the map-reduce form is more readable and generally preferable. The generalized form is provided for cases where significant work can be optimized away by combining mapping and reducing into a single function.

More formally, the identity value must be an identity for the combiner function. This means that for all u, combiner.apply(identity, u) is equal to u. Additionally, the combiner function must be associative and must be compatible with the accumulator function; for all u and t, the following must hold:

combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
 

Mutable Reduction

A mutable reduction operation is similar to an ordinary reduction, in that it reduces a stream of values to a single value, but instead of producing a distinct single-valued result, it mutates a general result container, such as a Collection or StringBuilder, as it processes the elements in the stream.

For example, if we wanted to take a stream of strings and concatenate them into a single long string, we could achieve this with ordinary reduction:

String concatenated = strings.reduce("", String::concat)
 
We would get the desired result, and it would even work in parallel. However, we might not be happy about the performance! Such an implementation would do a great deal of string copying, and the run time would be O(n^2) in the number of elements. A more performant approach would be to accumulate the results into a StringBuilder, which is a mutable container for accumulating strings. We can use the same technique to parallelize mutable reduction as we do with ordinary reduction.

The mutable reduction operation is called collect(), as it collects together the desired results into a result container such as StringBuilder. A collect operation requires three things: a factory function which will construct new instances of the result container, an accumulating function that will update a result container by incorporating a new element, and a combining function that can take two result containers and merge their contents. The form of this is very similar to the general form of ordinary reduction:

<R> R collect(Supplier<R> resultFactory,
               BiConsumer<R, ? super T> accumulator,
               BiConsumer<R, R> combiner);
 
As with reduce(), the benefit of expressing in this abstract way is that it is directly amenable to parallelization: we can accumulate partial results in parallel and then combine them. For example, to collect the string representations of the elements in a stream into an ArrayList, we could write the obvious sequential for-each form:
ArrayList<String> strings = new ArrayList<>();
     for (T element : stream) {
         strings.add(element.toString());
     }
 
Or we could use a parallelizable collect form:
ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
                                                (c, e) -> c.add(e.toString()),
                                                (c1, c2) -> c1.addAll(c2));
 
or, noting that we have buried a mapping operation inside the accumlator function, more succinctly as:
ArrayList<String> strings = stream.map(Object::toString)
                                       .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
 
Here, our supplier is just the ArrayList constructor, the accumulator adds the stringified element to an ArrayList, and the combiner simply uses addAll to copy the strings from one container into the other.

As with the regular reduction operation, the ability to parallelize only comes if an associativity condition is met. The combiner is associative if for result containers r1, r2, and r3:

combiner.accept(r1, r2);
    combiner.accept(r1, r3);
 
is equivalent to
combiner.accept(r2, r3);
    combiner.accept(r1, r2);
 
where equivalence means that r1 is left in the same state (according to the meaning of equals for the element types). Similarly, the resultFactory must act as an identity with respect to the combiner so that for any result container r:
combiner.accept(r, resultFactory.get());
 
does not modify the state of r (again according to the meaning of equals). Finally, the accumulator and combiner must be compatible such that for a result container r and element t:
r2 = resultFactory.get();
    accumulator.accept(r2, t);
    combiner.accept(r, r2);
 
is equivalent to:
accumulator.accept(r,t);
 
where equivalence means that r is left in the same state (again according to the meaning of equals).

The three aspects of collect: supplier, accumulator, and combiner, are often very tightly coupled, and it is convenient to introduce the notion of a Collector as being an object that embodies all three aspects. There is a collect method that simply takes a Collector and returns the resulting container. The above example for collecting strings into a List can be rewritten using a standard Collector as:

ArrayList<String> strings = stream.map(Object::toString)
                                       .collect(Collectors.toList());
 

Reduction, Concurrency, and Ordering

With some complex reduction operations, for example a collect that produces a Map, such as:
Map<Buyer, List<Transaction>> salesByBuyer
         = txns.parallelStream()
               .collect(Collectors.groupingBy(Transaction::getBuyer));
 
(where Collectors.groupingBy(java.util.function.Function<? super T, ? extends K>) is a utility function that returns a Collector for grouping sets of elements based on some key) it may actually be counterproductive to perform the operation in parallel. This is because the combining step (merging one Map into another by key) can be expensive for some Map implementations.

Suppose, however, that the result container used in this reduction was a concurrently modifiable collection -- such as a ConcurrentHashMap. In that case, the parallel invocations of the accumulator could actually deposit their results concurrently into the same shared result container, elminating the need for the combiner to merge distinct result containers. This potentially provides a boost to the parallel execution performance. We call this a concurrent reduction.

A Collector that supports concurrent reduction is marked with the Collector.Characteristics.CONCURRENT characteristic. Having a concurrent collector is a necessary condition for performing a concurrent reduction, but that alone is not sufficient. If you imagine multiple accumulators depositing results into a shared container, the order in which results are deposited is non-deterministic. Consequently, a concurrent reduction is only possible if ordering is not important for the stream being processed. The Stream.collect(Collector) implementation will only perform a concurrent reduction if

For example:
Map<Buyer, List<Transaction>> salesByBuyer
         = txns.parallelStream()
               .unordered()
               .collect(groupingByConcurrent(Transaction::getBuyer));
 
(where Collectors.groupingByConcurrent(java.util.function.Function<? super T, ? extends K>) is the concurrent companion to groupingBy).

Note that if it is important that the elements for a given key appear in the order they appear in the source, then we cannot use a concurrent reduction, as ordering is one of the casualties of concurrent insertion. We would then be constrained to implement either a sequential reduction or a merge-based parallel reduction.

Associativity

An operator or function op is associative if the following holds:
(a op b) op c == a op (b op c)
 
The importance of this to parallel evaluation can be seen if we expand this to four terms:
a op b op c op d == (a op b) op (c op d)
 
So we can evaluate (a op b) in parallel with (c op d) and then invoke op on the results. TODO what does associative mean for mutative combining functions? FIXME: we described mutative associativity above.

Stream sources

TODO where does this section go? XXX - change to section to stream construction gradually introducing more complex ways to construct - construction from Collection - construction from Iterator - construction from array - construction from generators - construction from spliterator XXX - the following is quite low-level but important aspect of stream constriction

A pipeline is initially constructed from a spliterator (see Spliterator) supplied by a stream source. The spliterator covers elements of the source and provides element traversal operations for a possibly-parallel computation. See methods on Streams for construction of pipelines using spliterators.

A source may directly supply a spliterator. If so, the spliterator is traversed, split, or queried for estimated size after, and never before, the terminal operation commences. It is strongly recommended that the spliterator report a characteristic of IMMUTABLE or CONCURRENT, or be late-binding and not bind to the elements it covers until traversed, split or queried for estimated size

If a source cannot directly supply a recommended spliterator then it may indirectly supply a spliterator using a Supplier. The spliterator is obtained from the supplier after, and never before, the terminal operation of the stream pipeline commences.

Such requirements significantly reduce the scope of potential interference to the interval starting with the commencing of the terminal operation and ending with the producing a result or side-effect. See Non-Interference for more details. XXX - move the following to the non-interference section

A source can be modified before the terminal operation commences and those modifications will be reflected in the covered elements. Afterwards, and depending on the properties of the source, further modifications might not be reflected and the throwing of a ConcurrentModificationException may occur.

For example, consider the following code:

List<String> l = new ArrayList(Arrays.asList("one", "two"));
     Stream<String> sl = l.stream();
     l.add("three");
     String s = sl.collect(toStringJoiner(" ")).toString();
 
First a list is created consisting of two strings: "one"; and "two". Then a stream is created from that list. Next the list is modified by adding a third string: "three". Finally the elements of the stream are collected and joined together. Since the list was modified before the terminal collect operation commenced the result will be a string of "one two three". However, if the list is modified after the terminal operation commences, as in:
List<String> l = new ArrayList(Arrays.asList("one", "two"));
     Stream<String> sl = l.stream();
     String s = sl.peek(s -> l.add("BAD LAMBDA")).collect(toStringJoiner(" ")).toString();
 
then a ConcurrentModificationException will be thrown since the peek operation will attempt to add the string "BAD LAMBDA" to the list after the terminal operation has commenced.