State of the Lambda: Libraries Edition

THIS DOCUMENT HAS BEEN SUPERSEDED BY A LATER VERSION AND IS PROVIDED FOR HISTORICAL CONTEXT ONLY

April 2012

This is an informal overview of the major proposed library enhancements to take advantage of new language features, primarily lambda expressions and extension methods, specified by JSR 335 and implemented in the OpenJDK Lambda Project.

This document describes the design approach taken in the rough prototype that has been implemented in the Lambda Project repository. It is intended as a working straw-man proposal; the final version may look different, but there is a working design and implementation that may now serve as a jumping-off point for discussions.

Background

Had lambda expressions (closures) been part of the Java language from the beginning, our Collections APIs would certainly look different than they do today. The acquisition of lambda expressions as part of JSR 335 has the unfortunate side effect of making our Collections interfaces look even more out of date! While it might be tempting to start from scratch and build a replacement Collections framework ("Collections II"), replacing the Collections framework would be a major task, as the Collections interfaces permeate the JDK libraries. Instead, we will pursue an evolutionary strategy of adding extension methods to existing interfaces (such as Collection, List, or Iterable), or perhaps to new interfaces (such as 'Stream') that are retrofitted onto existing classes, enabling many of the desired idioms without making people trade in their trusty ArrayLists and HashMaps. (This is not to say that Java will never have a new Collections framework; clearly there are limitations with the existing Collections framework beyond simply not being designed for lambdas. Creating a new-and-improved Collections framework is a fine candidate for consideration in a future version of the JDK.)

Parallelism is an important driver for this work. Therefore, it is important to encourage idioms that are both sequential- and parallel-friendly. We achieve this primarily by focusing less on in-place mutation and more on computations that produce new values. It is also important to strike a balance: we want to make parallelism easier without going so far as to make it invisible. Our goal is explicit but unobtrusive parallelism for both old and new collections.

For a description of the language features being specified by JSR 335, see the State of the Lambda.

Internal vs external iteration

The Collections framework relies on the concept of external iteration, where a Collection provides a way for its client to enumerate its elements (Collection extends Iterable), and clients use this to step sequentially through the elements of a collection. For example, if we wanted to set the color of every block in a collection of blocks to red, we would write:

for (Block b : blocks) {
    b.setColor(RED);
}

This example illustrates external iteration; the for-each loop calls the iterator() method of blocks, and steps through the collection one by one. External iteration is straightforward enough, but it has a significant drawback: its tight specification (sequential, in-order) is sometimes exactly what is wanted, but is often an impediment to performance, since it forecloses opportunities to reorder or parallelize the work.

The alternative to external iteration is internal iteration, where instead of controlling the iteration, the client delegates that to the library and passes in snippets of code to execute at various points in the computation.

The internal-iteration equivalent of the previous example is:

blocks.forEach(b -> { b.setColor(RED); });

This approach moves the control flow management from the client code to the library code, allowing the libraries not only to abstract over common control flow operations, but also enabling them to potentially use laziness, parallelism, and out-of-order execution to improve performance. (Whether the implementation of forEach actually does any of these things is a matter for the implementor of forEach to decide, but with internal iteration they are at least possible, whereas with external iteration, they are not.)
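To make this concrete, here is a minimal sketch (ours, not the prototype's code) of how forEach can be supplied as an extension method defined purely in terms of iterator(). Block is the strawman functional interface used throughout this document; its method name, and the interface name EnhancedIterable, are assumptions for illustration. We use the leading default placement for readability, where the prototype writes a trailing default clause:

interface Block<T> { void apply(T t); }   // strawman shape; method name assumed

interface EnhancedIterable<T> extends Iterable<T> {
    default void forEach(Block<? super T> block) {
        for (T t : this)       // the loop now lives in the library, not the client
            block.apply(t);
    }
}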

Internal iteration lends itself to a programming style where operations can be "pipelined" together. For example, if we wanted to color only the blue blocks red, we could say:

blocks.filter(b -> b.getColor() == BLUE)
      .forEach(b -> { b.setColor(RED); });

The filter operation produces a stream of values matching the provided condition, and the result of the filter operation is piped into forEach.

If we wanted to collect the blue blocks into a new List, we could say:

List<Block> blue = blocks.filter(b -> b.getColor() == BLUE)
                         .into(new ArrayList<>());

If each block were contained in a Box, and we wanted to know which boxes contained at least one blue block, we could say:

Set<Box> hasBlueBlock = blocks.filter(b -> b.getColor() == BLUE)
                              .map(b -> b.getContainingBox())
                              .into(new HashSet<>());

If we wanted to add up the total weight of the blue blocks, we could express that as:

int sum = blocks.filter(b -> b.getColor() == BLUE)
                .map(b -> b.getWeight())
                .sum();

We haven't yet written down the signatures of these operations -- these are shown later. The examples here simply illustrate the types of problems that are easily solved with internal iteration, and the functionality we want to expose on collections.

The role of laziness

Operations like filtering or mapping can be performed "eagerly" (where the filtering is complete on return from the filter method) or "lazily" (where the filtering is only done when you start iterating the elements of the result of the filter method). Stream operations that produce new streams, such as filtering or mapping, lend themselves to lazy implementation, which can often result in significant performance improvements. We can think of these operations as "naturally lazy", whether or not they are implemented as such. On the other hand, operations like accumulation, or those that produce side effects such as dumping the results into a collection or doing something for each element (such as printing them out), are "naturally eager."

Examination of many existing loops suggests that a significant proportion can be restated (often getting significantly smaller in the process) as bulk operations drawing from a data source (array or collection), performing a series of lazy operations (filtering, mapping, etc.), and then performing a single eager operation -- such as filter-map-accumulate, filter-map-sort-foreach, etc. Accordingly, most of the naturally lazy operations tend to be used to compute temporary intermediate results, and we can exploit this property to produce more efficient libraries. (For example, a library that does filtering or mapping lazily can fuse pipelines like filter-map-accumulate into a single pass on the data, rather than three distinct passes; a library that does them eagerly cannot. Similarly, if we are looking for the first element that matches a certain criterion, a lazy approach lets us get to the answer having examined fewer elements.)
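To see what fusion buys us, here is the single pass that a lazy library can effectively execute for the weight-summing pipeline shown above. This is a sketch of the effective execution, not of any library's internals:

// Effective execution of blocks.filter(...).map(...).sum() after fusion:
// one traversal of the source, no intermediate collections.
int sum = 0;
for (Block b : blocks) {
    if (b.getColor() == BLUE)      // the filter stage
        sum += b.getWeight();      // the map stage, feeding straight into sum
}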

This observation about pipeline shapes informs a critical design choice: what should the return value of filter and map be? One candidate would be for List.filter to return a new List, which would push us towards an all-eager approach. This is straightforward, but may well end up doing far more work than we really need. Another approach would be to create a whole new set of abstractions for explicit laziness -- LazyList, LazySet, etc. (But note that lazy collections still have operations that trigger eager computation -- such as size.) And this approach risks devolving into a combinatorial explosion of types like MutableSynchronizedLazySortedSet, etc.

Our preferred approach is to treat the naturally-lazy operations as returning a stream (such as Iterable) rather than a new collection (which might just get thrown away by the next pipeline stage anyway). Applying this to the examples above, filter draws from a source (which might be another stream) and produces a stream of values matching the provided Predicate. In most cases where potentially-lazy operations are being applied to aggregates, this turns out to be exactly what we want -- a stream of values that can be passed to the next stage in the pipeline. For the time being, Iterable will be our abstraction for streams, but this is an explicitly temporary choice that we will revisit soon, perhaps creating a Stream abstraction that doesn't have the issues that Iterable does (inherent check-then-act behavior; assumption of mutability of underlying source; lives in java.lang.)

The stream approach has the advantage that, when used in a source-lazy-lazy-eager pipeline, the laziness is mostly invisible, as the pipeline is "sealed" at both ends; it yields both good usability and good performance without dramatically increasing the conceptual surface area of the library.

Streams

A strawman set of stream operations is shown below. These methods are inherently sequential, processing elements in the order returned by upstream iterators (encounter order). In the current implementation, we've used Iterable as our host for these methods. The methods that return a new Iterable are lazy; those that do not are eager. All of these operations can be implemented by default methods solely in terms of iterator(), so no additional work is required for existing Collection implementations to acquire the new functionality. Note also that the Stream functionality is only tangentially tied to Collections; if an alternate collection framework wanted to acquire these methods, all they would have to do is implement Iterable.

Streams differ from Collections in several ways: a stream has no storage of its own, instead drawing its elements from an upstream source; its operations are potentially lazy, doing work only when results are demanded; and traversing it again re-traverses the upstream source rather than replaying cached results.

The following shows a basic set of stream operations, expressed as extension methods on Iterable.

public interface Iterable<T> {
    // Abstract methods
    Iterator<T> iterator();

    // Lazy operations
    Iterable<T> filter(Predicate<? super T> predicate) default ...

    <U> Iterable<U> map(Mapper<? super T, ? extends U> mapper) default ...

    <U> Iterable<U> flatMap(Mapper<? super T, ? extends Iterable<U>> mapper) default ...

    Iterable<T> cumulate(BinaryOperator<T> op) default ...

    Iterable<T> sorted(Comparator<? super T> comparator) default ...

    <U extends Comparable<? super U>> Iterable<T> sortedBy(Mapper<? super T, U> extractor) default ...

    Iterable<T> uniqueElements() default ...

    <U> Iterable<U> pipeline(Mapper<Iterable<T>, ? extends Iterable<U>> mapper) default ...

    <U> BiStream<T, U> mapped(Mapper<? super T, ? extends U> mapper) default ...
    <U> BiStream<U, Iterable<T>> groupBy(Mapper<? super T, ? extends U> mapper) default ...
    <U> BiStream<U, Iterable<T>> groupByMulti(Mapper<? super T, ? extends Iterable<U>> mapper) default ...

    // Eager operations

    boolean isEmpty() default ...
    long count() default ...

    T getFirst() default ...
    T getOnly() default ...
    T getAny() default ...

    void forEach(Block<? super T> block) default ...

    T reduce(T base, BinaryOperator<T> reducer) default ...

    <A extends Fillable<? super T>> A into(A target) default ...

    boolean anyMatch(Predicate<? super T> filter) default ...
    boolean noneMatch(Predicate<? super T> filter) default ...
    boolean allMatch(Predicate<? super T> filter) default ...

    <U extends Comparable<? super U>> T maxBy(Mapper<? super T, U> extractor) default ...
    <U extends Comparable<? super U>> T minBy(Mapper<? super T, U> extractor) default ...
}

Laziness and short-circuiting

Methods like anyMatch, while eager, can use short-circuiting to stop processing once they can determine the final result -- anyMatch need only evaluate the predicate on enough elements to find a single one for which the predicate is true.
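A sketch of what such a short-circuiting implementation might look like (ours, for illustration; Predicate here is a stand-in with a boolean-valued test method, whose actual name in the strawman may differ):

static <T> boolean anyMatch(Iterable<T> source, Predicate<? super T> predicate) {
    for (T t : source)
        if (predicate.test(t))     // stop immediately at the first match
            return true;
    return false;                  // source exhausted without a match
}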

In a pipeline like:

int sum = blocks.filter(b -> b.getColor() == BLUE)
                .map(b -> b.getWeight())
                .sum();

the filter and map operations are lazy. This means that we don't start drawing elements from the source until we start the sum step, minimizing the bookkeeping costs required to manage intermediate elements. Additionally, consider a pipeline like:

Block firstBlue = blocks.filter(b -> b.getColor() == BLUE)
                        .getFirst();

Because the filter step is lazy, the getFirst step draws on the upstream Iterator only until it finds a matching element. This means we need only evaluate the predicate on elements until we find one for which the predicate is true, rather than on all of them.

Note that the user didn't have to ask for laziness, or even think about it very much; the right thing happened, with the library arranging for as little computation as it could.

Alternatively, the user could capture the intermediate stage of this pipeline explicitly:

Iterable<Block> it = blocks.filter(b -> b.getColor() == BLUE);

and obtain an Iterator from that, though we have tried to design the feature set so as not to require this usage. In this case, the filter operation would merely create an Iterable, doing no work other than retaining a reference to the upstream Iterable (blocks) and the Predicate on which we are filtering. All the work is done later, when an Iterator is obtained from this Iterable.
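To illustrate, here is a minimal sketch (again ours, not the prototype's code) of a lazy filter built solely on iterator(). Constructing the result does no work; elements are drawn from upstream and tested only as the result is traversed:

import java.util.Iterator;
import java.util.NoSuchElementException;

static <T> Iterable<T> filter(Iterable<T> source, Predicate<? super T> predicate) {
    return () -> new Iterator<T>() {
        private final Iterator<T> upstream = source.iterator();
        private T pending;
        private boolean ready;     // true when 'pending' holds an unconsumed match

        public boolean hasNext() {
            while (!ready && upstream.hasNext()) {
                T t = upstream.next();
                if (predicate.test(t)) { pending = t; ready = true; }
            }
            return ready;
        }

        public T next() {
            if (!hasNext()) throw new NoSuchElementException();
            ready = false;
            return pending;
        }
    };
}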

Common functional interfaces

Lambda expressions in Java are converted into instances of one-method interfaces (functional interfaces). The package java.util.functions contains a "starter set" of functional interfaces, including the ones used throughout this document:

Predicate<T> -- determines whether a value matches a criterion (used by filter, anyMatch, and friends)
Block<T> -- performs an action on a value (used by forEach)
Mapper<T, U> -- transforms a value of type T into one of type U (used by map, sortedBy, groupBy)
BinaryOperator<T> -- combines two values of type T into one (used by reduce, cumulate)

It may be desirable, for performance reasons, to provide specialized primitive versions of these core interfaces. (In this case, the full complement of primitive specializations is probably not needed; if we provide versions for int, long, and double, the other primitive types can be accommodated through conversions.) The details of primitive specialization are not determined at this time.
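For example, here is what an int specialization of Mapper might look like (the name IntMapper and its shape are purely illustrative); a map step using it could produce primitive ints without allocating an Integer per element:

interface IntMapper<T> {
    int map(T t);     // returns a primitive int; no boxing per element
}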

Non-interference assumptions

Because Iterable may describe a mutable collection, there is the possibility for interference if the collection is modified while it is being traversed. The new operations on Iterable are intended to be used while the underlying source is held constant for the duration of the operation. (This condition is generally easy to maintain; if the collection is confined to the current thread, simply ensure that the lambda expressions passed to filter, map, etc., do not mutate the underlying collection. This condition is not substantially different from the restrictions on iterating Collections today; if a Collection is modified while being iterated, most implementations throw ConcurrentModificationException.) In the example above where we create an Iterable by filtering a collection, the elements encountered when traversing the filtered Iterable are based on those returned by the Iterator of the underlying collection. Accordingly, repeated calls to iterator() will result in repeated traversals of the upstream Iterables; there is no caching of lazily-computed results here. (Because most pipelines will look like source-lazy-lazy-eager, most of the time the underlying collection will be traversed only once anyway.)
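For example, the following violates the non-interference assumption: the predicate mutates the very collection being filtered, and because filter is lazy, the mutation happens mid-traversal (a sketch using the strawman filter; initialBlocks stands in for some existing collection, and most Collection iterators would throw ConcurrentModificationException here):

List<Block> blocks = new ArrayList<>(initialBlocks);
blocks.filter(b -> { blocks.remove(b); return true; })   // DON'T: mutates the source during traversal
      .forEach(b -> b.setColor(RED));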

Examples

Below is an example from the JDK class Class (the getEnclosingMethod method), which loops over all declared methods, matching method name, return type, and number and type of parameters. Here is the original code:

 for (Method m : enclosingInfo.getEnclosingClass().getDeclaredMethods()) {
     if (m.getName().equals(enclosingInfo.getName()) ) {
         Class<?>[] candidateParamClasses = m.getParameterTypes();
         if (candidateParamClasses.length == parameterClasses.length) {
             boolean matches = true;
             for(int i = 0; i < candidateParamClasses.length; i++) {
                 if (!candidateParamClasses[i].equals(parameterClasses[i])) {
                     matches = false;
                     break;
                 }
             }

             if (matches) { // finally, check return type
                 if (m.getReturnType().equals(returnType) )
                     return m;
             }
         }
     }
 }

 throw new InternalError("Enclosing method not found");

Using filter and getFirst, we can eliminate all the temporary variables and move the control logic into the library. We fetch the list of methods from reflection, turn it into an Iterable with Arrays.asList (we could, alternatively, inject a stream-like interface into array types), and then use a series of filters to reject the ones that don't match name, parameter types, or return type:

Method matching =
     Arrays.asList(enclosingInfo.getEnclosingClass().getDeclaredMethods())
        .filter(m -> Objects.equals(m.getName(), enclosingInfo.getName()))
        .filter(m -> Arrays.equals(m.getParameterTypes(), parameterClasses))
        .filter(m -> Objects.equals(m.getReturnType(), returnType))
        .getFirst();
if (matching == null)
    throw new InternalError("Enclosing method not found");
return matching;

This version of the code is both more compact and less error-prone.

Stream operations are very effective for ad-hoc queries over collections. Consider a hypothetical "music library" application, where a library has a list of albums, an album has a title and a list of tracks, and a track has a name, artist, and rating.

Consider the query "find me the names of albums that have at least one track rated four or higher, sorted by name." To construct this set, we might write:

List<Album> favs = new ArrayList<>();
for (Album a : albums) {
    boolean hasFavorite = false;
    for (Track t : a.tracks) {
        if (t.rating >= 4) {
            hasFavorite = true;
            break;
        }
    }
    if (hasFavorite)
        favs.add(a);
}
Collections.sort(favs, new Comparator<Album>() {
                           public int compare(Album a1, Album a2) {
                               return a1.name.compareTo(a2.name);
                           }});

We can use the stream operations to simplify each of the three major steps -- identifying whether any track in an album has a rating of at least four (anyMatch), the sorting (sortedBy), and the collection of albums matching our criteria into a List (into):

List<Album> sortedFavs =
  albums.filter(a -> a.tracks.anyMatch(t -> (t.rating >= 4)))
        .sortedBy(a -> a.name)
        .into(new ArrayList<>());

Nonlinear streams

The "obvious" stream shape, described above, is a simple linear stream of values, such as might be managed by an array or Collection. There are other common shapes we might want to represent, such as a stream of (key, value) pairs (possibly with the restriction that the keys be unique.)

It might be convenient to represent a bi-valued stream as a stream of Pair<X,Y> values. This would be easy and would allow us to reuse the existing stream machinery, but it creates a new problem: if there are new operations we might want to perform on a key-value stream (such as splitting it into a stream of keys or a stream of values), erasure gets in the way -- we have no way to express methods that exist only if the type variables of the class satisfy some constraint, such as being a Pair. (It is worth noting that this is an advantage of the static extension methods of C#, which are injected into instantiated generic types rather than classes.) Additionally, modeling a bi-valued stream as a stream of Pair objects might have significant "boxing" overhead. In general, each distinct "shape" of stream will likely require its own stream abstraction, but this is not unreasonable, as each distinct shape will have its own set of operations that are sensible on that shape.

For this reason, we model bi-valued streams using a separate abstraction, which we have tentatively called BiStream. So our stream library has two basic stream shapes: linear (Iterable) and map-shaped (BiStream), just as the Collections framework has two basic shapes (Collection and Map.)

A bi-valued stream can model the result of a "zip" operation, the contents of a map, or the results of a group-by operation (where the result is a BiStream<U, Stream<V>>.) For example, consider the problem of constructing a histogram of the lengths of words in a document. If we model the document as a stream of words, we can do a "group by" operation on the stream, grouping by length, and then do a "reduce" (sum) operation on the values associated with a given key to obtain a map from word lengths to counts of words with that length:

Map<Integer, Long>
    counts = document.words()                             // stream of strings
                     .groupBy(s -> s.length())            // bi-stream length -> stream of words with that length
                     .mapValues(stream -> stream.count()) // bi-stream length -> count of words
                     .into(new HashMap<>());              // Map length -> count
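For comparison, here is the same word-length histogram written with external iteration (a sketch; words stands in for the document's stream of words):

Map<Integer, Long> counts = new HashMap<>();
for (String word : words) {
    Long count = counts.get(word.length());
    counts.put(word.length(), count == null ? 1L : count + 1);
}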

Parallelism

While the use of internal iteration makes it possible that operations be done in parallel, we do not wish to inflict any sort of "transparent parallelism" on the user. Instead, users should be able to select parallelism in an explicit but unobtrusive manner. We accomplish this by allowing clients to explicitly ask for a "parallel view" of the collection, whose operations execute in parallel; this is exposed on Collection via the parallel() method. If we wanted to calculate our "sum of the weights of blue blocks" query in parallel, we need only add a call to parallel():

int sum = blocks.parallel()
                .filter(b -> b.getColor() == BLUE)
                .map(b -> b.getWeight())
                .sum();

This looks very similar to the serial version, but is clearly identified as parallel without the parallel machinery overwhelming the code.

With the Fork/Join framework added in Java SE 7, we have efficient machinery for implementing parallel operations. However, one of the goals of this effort is to reduce the gap between the serial and parallel versions of the same computation, and currently parallelizing a computation with Fork/Join looks very different from (and much bigger than) the serial code -- a barrier to parallelization. By exposing parallel versions of the stream operations and enabling users to explicitly choose between serial and parallel execution, we can close this gap substantially.
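For contrast, here is a sketch of the "sum of the weights of the blue blocks" computation written directly against Fork/Join (Block, BLUE, getColor(), and getWeight() are the same illustrative domain objects used earlier; the threshold is arbitrary). The structural boilerplate dwarfs the actual logic:

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class BlueWeightSum extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 1000;   // arbitrary sequential cutoff
    private final List<Block> blocks;
    private final int lo, hi;                    // half-open range [lo, hi)

    BlueWeightSum(List<Block> blocks, int lo, int hi) {
        this.blocks = blocks;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Integer compute() {
        if (hi - lo <= THRESHOLD) {              // small problem: solve sequentially
            int sum = 0;
            for (int i = lo; i < hi; i++) {
                Block b = blocks.get(i);
                if (b.getColor() == BLUE)
                    sum += b.getWeight();
            }
            return sum;
        }
        int mid = (lo + hi) >>> 1;               // otherwise: split, fork, join, combine
        BlueWeightSum left = new BlueWeightSum(blocks, lo, mid);
        BlueWeightSum right = new BlueWeightSum(blocks, mid, hi);
        left.fork();
        return right.compute() + left.join();
    }
}

// usage: int sum = new ForkJoinPool().invoke(new BlueWeightSum(blocks, 0, blocks.size()));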

The steps involved in implementing parallel computations with Fork/Join are: dividing a problem into subproblems, solving the subproblems sequentially, and combining the results of subproblems. The Fork/Join machinery is designed to automate this process.

We model the structural requirements of Fork/Join with an abstraction for splitting, called Splittable, which describes a sub-aggregate that can be further split into smaller pieces, or whose elements can be iterated sequentially.

public interface Splittable<T, S extends Splittable<T, S>> {
    /** Return an {@link Iterator}  for the elements of this split.   In general, this method is only called
     * at the leaves of a decomposition tree, though it can be called at any level.  */
    Iterator<T> iterator();

    /** Decompose this split into two splits, and return the left split.  If further splitting is impossible,
     * {@code left} may return a {@code Splittable} representing the entire split, or an empty split.
     */
    S left();

    /** Decompose this split into two splits, and return the right split.  If further splitting is impossible,
     * {@code right} may return a {@code Splittable} representing the entire split, or an empty split.
     */
    S right();

    /**
     * Produce an {@link Iterable} representing the contents of this {@code Splittable}.  In general, this method is
     * only called at the top of a decomposition tree, indicating that operations that produced the {@code Splittable}
     * can happen in parallel, but the results are assembled for sequential traversal.  This is designed to support
     * patterns like:
     *     collection.filter(t -> t.matches(k))
     *               .map(t -> t.getLabel())
     *               .sorted()
     *               .sequential()
     *               .forEach(e -> println(e));
     * where the filter / map / sort operations can occur in parallel, and then the results can be traversed
     * sequentially in a predictable order.
     */
    Iterable<T> sequential();
}

Implementing Splittable for common data structures like array-based lists, binary trees, and maps is straightforward.
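For example, here is a sketch of a Splittable over a slice of an array (the class name and details are illustrative). Splitting halves the slice; iteration walks it sequentially:

import java.util.Iterator;
import java.util.NoSuchElementException;

class ArraySplittable<T> implements Splittable<T, ArraySplittable<T>> {
    private final T[] array;
    private final int lo, hi;     // half-open range [lo, hi)

    ArraySplittable(T[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private int i = lo;
            public boolean hasNext() { return i < hi; }
            public T next() {
                if (i >= hi) throw new NoSuchElementException();
                return array[i++];
            }
        };
    }

    // When the slice cannot be split further, left() returns the entire
    // slice and right() an empty one, as the contract permits.
    private int mid() { return hi - lo <= 1 ? hi : (lo + hi) >>> 1; }

    public ArraySplittable<T> left()  { return new ArraySplittable<>(array, lo, mid()); }
    public ArraySplittable<T> right() { return new ArraySplittable<>(array, mid(), hi); }

    public Iterable<T> sequential() {
        return this::iterator;    // elements in index order
    }
}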

We describe sequential collections with Iterable, which means a collection knows how to dispense its members sequentially. The parallel analogue of Iterable embodies the Splittable behavior, as well as aggregate operations analogous to those on Iterable. We are currently calling this ParallelIterable.

public interface ParallelIterable<T> extends Splittable<T, ParallelIterable<T>> {
    // Lazy operations
    ParallelIterable<T> filter(Predicate<? super T> predicate) default ...

    <U> ParallelIterable<U> map(Mapper<? super T, ? extends U> mapper) default ...

    <U> ParallelIterable<U> flatMap(Mapper<? super T, ? extends Iterable<U>> mapper) default ...

    ParallelIterable<T> cumulate(BinaryOperator<T> op) default ...

    ParallelIterable<T> sorted(Comparator<? super T> comparator) default ...

    <U extends Comparable<? super U>> ParallelIterable<T> sortedBy(Mapper<? super T, U> extractor) default ...

    ParallelIterable<T> uniqueElements() default ...

    // Eager operations

    boolean isEmpty() default ...
    long count() default ...

    T getFirst() default ...
    T getOnly() default ...
    T getAny() default ...

    void forEach(Block<? super T> block) default ...

    T reduce(T base, BinaryOperator<T> reducer) default ...

    <A extends ParallelFillable<? super T>> A into(A target) default ...
    <A extends Fillable<? super T>> A into(A target) default ...

    boolean anyMatch(Predicate<? super T> filter) default ...
    boolean noneMatch(Predicate<? super T> filter) default ...
    boolean allMatch(Predicate<? super T> filter) default ...

    <U extends Comparable<? super U>> T maxBy(Mapper<? super T, U> extractor) default ...
    <U extends Comparable<? super U>> T minBy(Mapper<? super T, U> extractor) default ...
}

You will notice that the set of operations on ParallelIterable is very similar to that on Iterable, except that the lazy operations return a ParallelIterable instead of an Iterable. This means that pipelines of operations on sequential collections will also work the same way (just in parallel) on parallel collections.

The last step needed is a way to get a ParallelIterable from a (sequential) collection; this is what is returned by the new parallel() method on Collection.

interface Collection<T> {
    ....
    ParallelIterable<T> parallel();
}

What we have achieved here is a separation of the structural properties of recursive decomposition from the algorithms that can be executed in parallel on decomposable data structures. The author of a data structure need only implement the Splittable methods and then has immediate access to the parallel implementations of filter, map, and friends. Similarly, adding a new method to ParallelIterable makes it immediately available on any data structure that knows how to split itself.

Mutative operations

Many use cases for bulk operations on collections produce a new value, collection, or side-effect. However, sometimes we do want to mutate the collection in-place. The primary in-place mutations we intend to add are: removing all elements that satisfy a Predicate; replacing each element of a List with the result of applying a function to it; and sorting a List in place according to a Comparator.

These will be added as extension methods on the appropriate interface.

Open Issues

The design space for this problem is huge, and we've deliberately narrowed our focus thus far to identifying the primary abstractions and operations. As such, there remain a significant number of open issues in the design, several of which have been flagged above: the eventual shape of the Stream abstraction, the treatment of primitive specializations, and the exact set of stream operations.

Status

There is currently a strawman implementation of most of the features shown so far in the OpenJDK lambda repository, including serial and parallel implementations of most of the aggregate operations.