State of the Lambda: Libraries Edition

THIS DOCUMENT HAS BEEN SUPERSEDED AND IS PROVIDED FOR HISTORICAL CONTEXT ONLY

November 2012

This is an informal overview of the major proposed library enhancements to take advantage of new language features, primarily lambda expressions and extension methods, specified by JSR 335 and implemented in the OpenJDK Lambda Project. This version describes "Iteration 3" of the API design; for comparison, see Iteration 1. We are in the process of implementing this in the lambda repository.

Background

Had lambda expressions (closures) been part of the Java language from the beginning, our Collections APIs would certainly look different than they do today. As the Java language acquires lambda expressions as part of JSR 335, this has the unfortunate side effect of making our Collections interfaces look even more out of date! While it might be tempting to start from scratch and build a replacement Collections framework ("Collections II"), replacing the Collections framework would be a major task, as the Collections interfaces permeate the JDK libraries. Instead, we will pursue an evolutionary strategy of adding extension methods to existing interfaces (such as Collection, List, or Iterable), and adding new interfaces (such as Stream) that are retrofitted onto existing classes, enabling many of the desired idioms without making people trade in their trusty ArrayLists and HashMaps. (This is not to say that Java will never have a new Collections framework; clearly there are limitations with the existing Collections framework beyond simply not being designed for lambdas. Creating a new-and-improved collections framework is a fine candidate for consideration in a future version of the JDK.)

Parallelism is an important driver for this work. Therefore, it is important to encourage idioms that are both sequential- and parallel-friendly. We achieve this by focusing less on in-place mutation and more on computations that produce new values. It is also important to strike the balance between making parallelism easier but not going so far as to make it invisible; our goal is explicit but unobstrusive parallelism for both old and new collections.

For a description of the language features being specified by [JSR 335][], see the State of the Lambda.

Internal vs external iteration

The Collections framework relies on the concept of external iteration, where a Collection provides a way for its client to enumerate its elements (Collection extends Iterable), and clients use this to step sequentially through the elements of a collection. For example, if we wanted to set the color of every shape in a collection of shapes to red, we would write:

for (Shape s : shapes) {
    s.setColor(RED);
}

This example illustrates external iteration; the for-each loop calls the iterator() method of shapes, and steps through the collection one by one. External iteration is straightforward enough, but it has several problems:

Sometimes the tight specification of the for-each loop (sequential, in-order) is desirable, but sometimes this is an impediment to performance.

The alternative to external iteration is internal iteration, where instead of controlling the iteration, the client delegates that to the library and passes in snippets of code to execute at various points in the computation.

The internal-iteration equivalent of the previous example is:

shapes.forEach(s -> { s.setColor(RED); });

This approach moves the control flow management from the client code to the library code, allowing the libraries not only to abstract over common control flow operations, but also enabling them to potentially use laziness, parallelism, and out-of-order execution to improve performance. (Whether the implementation of forEach actually does any of these things is a matter for the library implementor forEach to decide, but with internal iteration they are at least possible, whereas with external iteration, they are not.)

Internal iteration lends itself to a programming style where operations can be "pipelined" together. For example, if we wanted to color only the blue shapes red, we could say:

shapes.stream()
      .filter(s -> s.getColor() == BLUE)
      .forEach(s -> { s.setColor(RED); });

The filter operation produces a stream of values matching the provided condition, and the result of the filter operation is piped into forEach.

If we wanted to collect the blue shapes into a new List, we could say:

List<Shape> blue = shapes.stream()
                         .filter(s -> s.getColor() == BLUE)
                         .into(new ArrayList<>());

If each shape were contained in a Box, and we wanted to know which boxes contained at least one blue shape, we could say:

Set<Box> hasBlueShape = shapes.stream()
                              .filter(s -> s.getColor() == BLUE)
                              .map(s -> s.getContainingBox())
                              .into(new HashSet<>());

If we wanted to add up the total weight of the blue shapes, we could express that as:

int sum = shapes.stream()
                .filter(s -> s.getColor() == BLUE)
                .map(s -> s.getWeight())
                .sum();

So far, we haven't yet written down the signatures of these operations or how they relate to existing Collections classes. These examples simply illustrate the types of problems that are easily solved with internal iteration, and illustrate functionality we want to expose on collections.

The role of laziness

Operations like filtering or mapping, can be performed "eagerly" (where the filtering is complete on the return from the filter method), or "lazily" (where the filtering is only done when you start iterating the elements of the result of the filter method.) Stream operations that produce new streams, such as filtering or mapping, lend themselves to lazy implementation, which often can result in significant performance improvements. We can think of these operations as "naturally lazy", whether or not they are implemented as such. On the other hand, operations like accumulation, or those that produce side effects such as dumping the results into a collection or doing something for each element (such as printing them out), are "naturally eager."

Based on examination of many existing loops, a significant proportion can be restated as bulk operations drawing from a data source (array or collection), doing a series of lazy operations (filtering, mapping, etc), and then doing a single eager operation -- such as filter-map-accumulate, filter-map-sort-foreach, etc. Accordingly, most of the naturally lazy operations tend to be used to compute temporary intermediate results, and we can exploit this property to produce more efficient libraries. (For example, a library that does filtering or mapping lazily can fuse pipelines like filter-map-accumulate into a single pass on the data, rather than three distinct passes; a library that does them eagerly cannot. Similarly, if we are looking for the first element that matches a certain characteristic, a lazy approach lets us get to the answer having examined fewer elements.)

This observation informs a critical design choice: what should the return type of filter and map be? One candidate would be that List.filter returns a new List, which would push us towards an all-eager approach. This is straightforward, but may well end up doing way more work than we really need. Another approach would be to create a whole new set of abstractions for explicit laziness -- LazyList, LazySet, etc. (But note that lazy collections would probably still have operations that trigger eager computation -- such as size.) And, this approach has the risk to devolve into a combinatorial explosion of types like MutableSynchronizedLazySortedSet, etc.

Our preferred approach is to treat the naturally-lazy operations as returning a stream rather than a new collection (which might just get thrown away by the next pipeline stage anyway). Applying this to the examples above, filter draws from a source and produces a stream of values matching the provided Predicate. In most cases where potentially-lazy operations are being applied to aggregates, this turns out to be exactly what we want -- a stream of values that can be passed to the next stage in the pipeline.

The stream approach has the advantage that, when used in a source-lazy-lazy-eager pipeline, the laziness is mostly invisible, as the pipeline is "sealed" at both ends with instantiated data structures, but yields both good usability and performance without dramatically increasing the conceptual surface area of the library.

Streams

A basic set of stream operations is shown below. The methods that return a new Stream are termed intermediate operations; those that do not are termed terminal operations.

Streams differ from Collections in several ways:

The following shows a basic set of stream operations.

public interface Stream<T> {

    Stream<T> filter(Predicate<? super T> predicate);

    <R> Stream<R> map(Mapper<? extends R, ? super T> mapper);

    <R> Stream<R> flatMap(FlatMapper<? extends R, ? super T> mapper);

    Stream<T> uniqueElements();

    Stream<T> sorted(Comparator<? super T> comparator);

    Stream<T> cumulate(BinaryOperator<T> operator);

    void forEach(Block<? super T> block);

    Stream<T> tee(Block<? super T> block);

    Stream<T> limit(int n);
    Stream<T> skip(int n);

    <A extends Destination<? super T>> A into(A target);

    Object[] toArray();

    <U> Map<U, Collection<T>> groupBy(Mapper<? extends U, ? super T> classifier);

    <U, W> Map<U, W> reduceBy(Mapper<? extends U, ? super T> classifier,
                              Factory<W> baseFactory,
                              Combiner<W, W, T> reducer);

    T reduce(T base, BinaryOperator<T> op);

    Optional<T> reduce(BinaryOperator<T> op);

    <U> U fold(Factory<U> baseFactory,
               Combiner<U, U, T> reducer,
               BinaryOperator<U> combiner);

    boolean anyMatch(Predicate<? super T> predicate);
    boolean allMatch(Predicate<? super T> predicate);
    boolean noneMatch(Predicate<? super T> predicate);

    Optional<T> findFirst();
    Optional<T> findAny();

    Stream<T> sequential();
    Stream<T> unordered();
}

Rather than retrofitting all the Stream methods onto Collection, as was done in an earlier iteration of the API, instead we add a single stream() method into Collection which yields a stream backed by the collection.

Stream operations can operate in either serial or parallel; whether the stream is serial or parallel is a property of the stream source. We have also added a parallel() method to Collection that returns a parallel Stream.

Stream functionality is only tangentially tied to Collections; aggregates other than Collection can be used as sources for streams as well. All of the stream operations can be implemented in terms of iteration, so the minimum needed to create a Stream is to create an Iterator for the elements. If additional information is available (such as size or metadata about stream contents, such as sortedness), the library can provide optimized implementations. (To execute in parallel, a stream source needs to provide a Spliterator which manages decomposition in addition to iteration.)

Laziness and short-circuiting

Methods like anyMatch, while eager, can use short-circuiting to stop processing once they can determine the final result -- it need only evaluate the predicate on enough elements to find a single element for which the predicate is true.

In a pipeline like:

int sum = shapes.stream()
                .filter(s -> s.getColor() == BLUE)
                .map(s -> s.getWeight())
                .sum();

the filter and map operations are lazy. This means that we don't start drawing elements from the source until we start the sum step, minimizing the bookkeeping costs required to manage intermediate elements. Additionally, given a pipeline like:

Optional<Shape> firstBlue = shapes.stream()
                                  .filter(s -> s.getColor() == BLUE)
                                  .findFirst();

Because the filter step is lazy, the findFirst step will only draw from upstream until it gets an element, which means we need only evaluate the predicate on elements until we find one for which the predicate is true, rather than all of them. The findFirst() method returns an Optional, since it might be the case that there were no elements matching the desired criteria. Optional provides a means to describe a value that might or might not be present.

Note that the user didn't have to ask for laziness, or even think about it very much; the right thing happened, with the library arranging for as little computation as it could.

Common functional interfaces

Lambda expressions in Java are converted into instances of one-method interfaces (functional interfaces). The package java.util.function contains a "starter set" of functional interfaces:

Additionally, we plan to provide specialized primitive versions of these core interfaces. Rather than provide the full complement of primitive specializations, we intend to provide versions for Integer, Long, and Double, and the other primitive types can be accomodated through conversions.) Similarly, we plan to provide specialized versions for different arities; for example, BiFunction<T,U,V> represents a function from (T,U) to V.

Non-interference assumptions

Because the stream source might be a mutable collection, there is the possibility for interference if the source is modified while it is being traversed. The stream operations are intended to be used while the underlying source is held constant for the duration of the operation. (This condition is generally easy to maintain; if the collection is confined to the current thread, simply ensure that the lambda expressions passed to filter, map, etc., do not mutate the underlying collection. This condition is not substantially different from the restrictions on iterating Collections today; if a Collection is modified while being iterated, most implementations throw ConcurrentModificationException.)

Examples

Below is an example from the JDK class Class (the getEnclosingMethod method), which loops over all declared methods, matching method name, return type, and number and type of parameters. Here is the original code:

 for (Method m : enclosingInfo.getEnclosingClass().getDeclaredMethods()) {
     if (m.getName().equals(enclosingInfo.getName()) ) {
         Class<?>[] candidateParamClasses = m.getParameterTypes();
         if (candidateParamClasses.length == parameterClasses.length) {
             boolean matches = true;
             for(int i = 0; i < candidateParamClasses.length; i++) {
                 if (!candidateParamClasses[i].equals(parameterClasses[i])) {
                     matches = false;
                     break;
                 }
             }

             if (matches) { // finally, check return type
                 if (m.getReturnType().equals(returnType) )
                     return m;
             }
         }
     }
 }

 throw new InternalError("Enclosing method not found");

Using filter and getFirst, we can eliminate all the temporary variables and move the control logic into the library. We fetch the list of methods via reflection, turn it into a Stream with Arrays.stream, and then use a series of filters to reject the ones that don't match name, parameter types, or return type. The result of findFirst is an Optional<Method>, and we then either fetch and return the resulting method or throw an exception.

return Arrays.stream(enclosingInfo.getEnclosingClass().getDeclaredMethods())
             .filter(m -> Objects.equals(m.getName(), enclosingInfo.getName())
             .filter(m ->  Arrays.equals(m.getParameterTypes(), parameterClasses))
             .filter(m -> Objects.equals(m.getReturnType(), returnType))
             .findFirst()
             .getOrThrow(() -> new InternalError("Enclosing method not found");

This version of the code is more compact, more readable, and less error-prone.

Stream operations are very effective for ad-hoc queries over collections. Consider a hypothetical "music library" application, where a library has a list of albums, an album has a title and a list of tracks, and a track has a name, artist, and rating.

Consider the query "find me the names of albums that have at least one track rated four or higher, sorted by name." To construct this set, we might write:

List<Album> favs = new ArrayList<>();
for (Album a : albums) {
    boolean hasFavorite = false;
    for (Track t : a.tracks) {
        if (t.rating >= 4) {
            hasFavorite = true;
            break;
        }
    }
    if (hasFavorite)
        favs.add(a);
}
Collections.sort(favs, new Comparator<Album>() {
                           public int compare(Album a1, Album a2) {
                               return a1.name.compareTo(a2.name);
                           }});

We can use the stream operations to simplify each of the three major steps -- identification of whether any track in an album has a rating of at least for (anyMatch), the sorting, and the collection of albums matching our criteria into a List:

List<Album> sortedFavs =
  albums.stream()
        .filter(a -> a.tracks.anyMatch(t -> (t.rating >= 4)))
        .sorted(comparing(a -> a.name))
        .into(new ArrayList<>());

Infinite streams

Unlike a Collection, there is nothing about a stream that requires it to be of finite size. While certain operations on an infinite stream (such as forEach) would never terminate under normal conditions, there are many operations that can deal perfectly well with infinite streams (e.g., limit will truncate a stream; findFirst or findAny will terminate as soon as they find a match, etc.) Similarly, an infinite stream can be turned into an Iterator and iterated directly.

Nulls

Some collections permit nulls; some do not. There were three choices we could have made about the role of nulls in streams:

We have chosen the "null agnostic" strategy for the Streams classes. The "error" option would have outlawed some useful pipelines that could perfectly well handle nulls; the "ignore" option would have undermined some useful invariants and optimizations (such as, for size-preserving operations like map, the stream input and output are the same size).

The "null agnostic" strategy leaves maximal flexibility (at the cost of more complicated reasoning) to the user. If all the operations (and the lambdas they are passed) can handle nulls, everything is fine. If you prefer the "error" strategy for a given pipeline, you can add a filter(e -> { (if e == null) throw new NPE(); } stage; if you prefer the "ignore" strategy, you can add a filter(e -> e != null) stage.

Mutative collection operations

Many use cases for bulk operations on collections produce a new value, collection, or side-effect. However, sometimes we do want to mutate the collection in-place. The primary in-place mutations on Collection and friends that we intend to add are:

These will be added as default methods on the appropriate interface.

Parallelism

While the use of internal iteration makes it possible that operations be done in parallel, we do not wish to inflict any sort of "transparent parallelism" on the user. Instead, users should be able to select parallelism in an explicit but unobtrusive manner. We accomplish this by allowing clients to explicitly ask for a "parallel view" of the collection, whose operations execute in parallel; this is exposed on Collection via the parallel() method. If we wanted to calculate our "sum of the weights of blue blocks" query in parallel, we need only add a call to parallel():

int sum = shapes.parallel()
                .filter(s -> s.getColor() == BLUE)
                .map(s -> s.getWeight())
                .sum();

This looks very similar to the serial version, but is clearly identified as parallel without the parallel machinery overwhelming the code.

Because of the non-interference requirements describe above, you can execute parallel operations even on non-thread-safe sources such as ArrayList.

Parallelism under the hood

With the Fork/Join framework added in Java SE 7, we have efficient machinery for implementing parallel operations. However, one of the goals of this effort is to reduce the gap between the serial and parallel versions of the same computation, and currently parallelizing a computation with Fork/Join looks very different from (and much bigger than) the serial code -- a barrier to parallelization. By exposing separate parallel and serial stream views of collections, users can explicitly but unobtrusively choose between serial and parallel execution, and we can close this gap substantially.

The steps involved in implementing parallel computations with Fork/Join are: dividing a problem into subproblems, solving the subproblems sequentially, and combining the results of subproblems. The Fork/Join machinery is designed to automate this process.

We model the structural requirements of Fork/Join with an abstraction for decomposition. In various drafts, this was called Splittable, Spliterator, or StreamAccessor, but the basic concept is that it is an extension of iteration that add the ability to ask the data structure to subdivide itself into recursively decomposible chunks. Just as an Iterator lets you carve off a single element and leave the rest in the Iterator, a Spliterator lets you carve off a chunk of the data, described by a new Spliterator (which can then be further decomposed) and leave the rest in the original Spliterator. Once a data structure provides a means of decomposition, the library can provide all the parallel stream operations. Decomposition for for common data structures like array-based lists, binary trees, and maps is straightforward.

The basic form of Spliterator is shown below:

public interface Spliterator<T> {
    /** Carve off a portion of the data into a separate Spliterator */
    Spliterator<T> split();

    /** Iterate the data described by this Spliterator */
    Iterator<T> iterator();

    /** The size of the data described by this Spliterator, if known */
    int getSizeIfKnown();
}

This approach separates the structural properties of recursive decomposition from the algorithms that can be executed in parallel on decomposible data structures. The author of a data structure need only provide the decomposition logic, and then immediately gets the benefit of the parallel implementations of filter, map, and friends. Similarly, new stream operations can be parallelized in terms of decomposition and immediately available on any data structure that knows how to decompose itself. (Most users won't ever have to implement Spliterator.)

Primitive streams

We would like to have an efficient way of expressing idioms like:

List<String> strings = ...
int sumOfLengths = strings.stream()
                          .map(String::length)
                          .reduce(0, Integer::plus);

Without any special consideration for primitives, this will cause trouble as the result of the map operation will be a Stream<Integer> and therefore each string length will be boxed and unboxed (since it starts out as an int) at the boundary between the map and reduce operations.

Ideally we would not have to distort the API to work around this performance challenge, but VM-based techniques for rendering this problem irrelevant (e.g., better box elimination, tagged fixnums, etc) are unlikely to arrive with Java SE 8. That leaves two viable library-based strategies for dealing with this problem:

While the fused-operation approach is less intrusive, it is not as complete a solution as the IntStream approach is. With the IntStream approach, we can have specialized operations tailored to what we know about primitives, such as sum(), sorted(), and specialized operations like sum can use numerical techniques to improve the quality of the result when adding floating point numbers (whereas simply reducing on Double::plus cannot.)

List<String> strings = ...
int sumOfLengths = strings.stream()
                          .map(String::length)
                          .sum()

This approach entails overloading map(Function<T,U>) with map(IntMapper<T>) (and similar for other primitive types), and specialized Stream types (IntStream, DoubleStream, etc):

interface Stream<T> {
    ...
    <U> Stream<U> map(Mapper<T,U>);
    IntStream map(IntMapper<T>);
    LongStream map(LongMapper<T>);
    DoubleStream map(DoubleMapper<T>);
}

We will likely limit specialization to int, long, and double; the other primitive types can be handled by these. We do not intend to provide specialized implementations of List and other collections; instead we will provide stream generators such as range(0, 100).

From an implementation perspective, this approach is straightforward; there is lots of almost-duplicated code, but this is par for the course (see implementation of Arrays for a dramatic illustration.)

Streams in the JDK

Having exposed Stream as a top-level abstraction, we want to ensure that the features of Stream are available as widely throughout the JDK as possible. Much of this can be accomplished simply be providing a means to convert a Collection, Iterable, Iterator, Enumeration, or array into a Stream. Adding a stream() methods to the above classes, as well as Arrays.stream(array), will accomplish much of the needed "streamification" of the JDK, since many APIs already return aggregates in one of these forms.

Additionally, we can add a set of new Stream-bearing methods, such as String.chars() or BufferedReader.lines(), where the above techniques do not suffice.

Finally, we provide a set of APIs for constructing streams, to be used by library writers who wish to expose stream functionality on non-standard aggregates. The minimial information needed to create a Stream is an Iterator, but if the creator has additional metadata (such as knowing the size), the library can provide a more efficient implementation, and if the creator can provide a Spliterator then parallel streams can be created as well.