--- /dev/null 2013-03-09 17:25:01.184291984 -0500 +++ new/src/share/classes/java/util/stream/ForEachOps.java 2013-03-11 17:44:00.000000000 -0400 @@ -0,0 +1,469 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.Spliterator; +import java.util.concurrent.CountedCompleter; +import java.util.function.BooleanSupplier; +import java.util.function.Consumer; +import java.util.function.DoubleConsumer; +import java.util.function.IntConsumer; +import java.util.function.LongConsumer; +import java.util.Objects; + +/** + * A factory for creating instances of {@code TerminalOp) that implement + * {@code forEach} or {@code forEachUntil} traversal over elements of a stream. + * + *

{@code forEach} traverses all elements of a stream and sends those + * elements to a {@code Consumer}. + * + *

{@code forEachUntil} traverses elements of a stream and sends those + * elements to to a {@code Consumer} until a {@code BooleanProvider} indicates + * that a termination criteria has occurred and no more elements should be + * traversed and sent. + * + *

For either type of traversal elements will be sent to the {@code Consumer} + * in whatever thread and whatever order they become available, independent of + * the stream's encounter order. + * + *

Exceptions occurring as a result of sending an element to the + * {@code Consumer} will be relayed to the caller and traversal will be + * prematurely terminated. + * + * @apiNote + * The termination condition is an externally-imposed criteria, and is useful + * for problems like "find the best answer that can be found in ten seconds", + * "search until you find an answer at least as good as X" , etc. It is not + * designed to provide content-based cancellation, such as "process elements + * until you find one which matches a given criteria." + * + *

There is no guarantee that additional elements will not be traversed and + * sent after the termination criteria has transpired. For example, a + * termination criteria of {@code resultSet.size() > TARGET} does not guarantee + * that the result set will receive no more than {@code TARGET} elements, but + * instead that {@code forEachUntil} traversal will attempt to stop after + * {@code TARGET} elements have been placed in the {@code resultSet}. + * + * @since 1.8 + */ +final class ForEachOps { + + private ForEachOps() { } + + /** + * Constructs a {@code TerminalOp} that implements {@code forEach} + * traversal, which traverses all elements of a {@code Stream} and sends + * those elements the provided {@code Consumer}. + * + * @param consumer The {@code Consumer} that receives all elements of a + * stream + * @param The type of the stream elements + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeRef(Consumer consumer) { + Objects.requireNonNull(consumer); + return new ForEachOp.OfRef<>(consumer); + } + + /** + * Constructs a {@code TerminalOp} that implements {@code forEachUntil} + * traversal, which traverses all elements of a {@code Stream} and sends + * those elements to the provided {@code Consumer} until the specified + * {@code BooleanProvider} indicates that a termination criteria has + * occurred and no more elements should be traversed and sent. + * + * @param consumer The {@code Consumer} that receives elements of a stream + * @param until A {@code BooleanSupplier} that indicates whether the + * termination criteria has occurred. Once it returns {@code true} + * the first time, it must continue to return {@code true} for all + * future invocations + * @param The type of the stream elements + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeRef(Consumer consumer, BooleanSupplier until) { + Objects.requireNonNull(consumer); + Objects.requireNonNull(until); + return new ForEachOp.OfRef.Until<>(consumer, until); + } + + /** + * Constructs a {@code TerminalOp} that implements {@code forEach} + * traversal, which traverses all {@code int} elements of a + * {@code IntStream} and sends those elements the provided + * {@code IntConsumer}. + * + * @param consumer The {@code IntConsumer} that receives all elements of a + * stream + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeInt(IntConsumer consumer) { + Objects.requireNonNull(consumer); + return new ForEachOp.OfInt(consumer); + } + + /** + * Constructs a {@code TerminalOp} that implements {@code forEachUntil} + * traversal, which traverses all {@code int} elements of a + * {@code IntStream} and sends those elements to the provided + * {@code IntConsumer} until the specified {@code BooleanProvider} indicates + * that a termination criteria has occurred and no more elements should be + * traversed and sent. + * + * @param consumer The {@code IntConsumer} that receives elements of a + * stream + * @param until A {@code BooleanSupplier} that indicates whether the + * termination criteria has occurred. Once it returns {@code true} + * the first time, it must continue to return {@code true} for all + * future invocations + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeInt(IntConsumer consumer, BooleanSupplier until) { + Objects.requireNonNull(consumer); + Objects.requireNonNull(until); + return new ForEachOp.OfInt.Until(consumer, until); + } + + /** + * Constructs a {@code TerminalOp} that implements {@code forEach} + * traversal, which traverses all {@code long} elements of a + * {@code LongStream} and sends those elements the provided + * {@code LongConsumer}. + * + * @param consumer The {@code LongConsumer} that receives all elements of a + * stream + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeLong(LongConsumer consumer) { + Objects.requireNonNull(consumer); + return new ForEachOp.OfLong(consumer); + } + + /** + * Constructs a {@code TerminalOp} that implements {@code forEachUntil} + * traversal, which traverses all {@code long} elements of a + * {@code LongStream} and sends those elements to the provided + * {@code LongConsumer} until the specified {@code BooleanProvider} + * indicates that a termination criteria has occurred and no more elements + * should be traversed and sent. + * + * @param consumer The {@code LongConsumer} that receives elements of a + * stream + * @param until A {@code BooleanSupplier} that indicates whether the + * termination criteria has occurred. Once it returns {@code true} + * the first time, it must continue to return {@code true} for all + * future invocations + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeLong(LongConsumer consumer, BooleanSupplier until) { + Objects.requireNonNull(consumer); + Objects.requireNonNull(until); + return new ForEachOp.OfLong.Until(consumer, until); + } + + /** + * Constructs a {@code TerminalOp} that implements {@code forEach} + * traversal, which traverses all {@code double} elements of a + * {@code DoubleStream} and sends those elements the provided + * {@code DoubleConsumer}. + * + * @param consumer The {@code DoubleConsumer} that receives all elements of + * a stream + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeDouble(DoubleConsumer consumer) { + Objects.requireNonNull(consumer); + return new ForEachOp.OfDouble(consumer); + } + + /** + * Constructs a {@code TerminalOp} that implements {@code forEachUntil} + * traversal, which traverses all {@code double} elements of a + * {@code DoubleStream} and sends those elements to the provided + * {@code DoubleConsumer} until the specified {@code BooleanProvider} + * indicates that a termination criteria has occurred and no more elements + * should be traversed and sent. + * + * @param consumer The {@code DoubleConsumer} that receives elements of a + * stream + * @param until A {@code BooleanSupplier} that indicates whether the + * termination criteria has occurred. Once it returns {@code true} + * the first time, it must continue to return {@code true} for all + * future invocations + * @return the {@code TerminalOp} instance + */ + public static TerminalOp makeDouble(DoubleConsumer consumer, BooleanSupplier until) { + Objects.requireNonNull(consumer); + Objects.requireNonNull(until); + return new ForEachOp.OfDouble.Until(consumer, until); + } + + /** + * A {@code TerminalOp} that evaluates a stream pipeline and sends the + * output to itself as a {@code TerminalSink}. Elements will be sent in + * whatever thread and whatever order they become available, independent of + * the stream's encounter order. + * + *

This terminal operation is stateless. For parallel evaluation each + * leaf instance of a {@code ForEachTask} will send elements to the same + * {@code TerminalSink} reference that is an instance of this class. State + * management, if any, is deferred to the consumer, held by the concrete + * sub-classes, that is the final receiver elements. + * + * @param The output type of the stream pipeline + */ + private static abstract class ForEachOp implements TerminalOp, TerminalSink { + + // TerminalOp + + @Override + public int getOpFlags() { + return StreamOpFlag.NOT_ORDERED; + } + + @Override + public Void evaluateSequential(PipelineHelper helper) { + return helper.into(this, helper.sourceSpliterator()).get(); + } + + @Override + public Void evaluateParallel(PipelineHelper helper) { + new ForEachTask<>(helper, helper.wrapSink(this)).invoke(); + return null; + } + + // TerminalSink + + @Override + public Void get() { + return null; + } + + // Implementations + + /** {@code forEach} with {@code Stream} */ + private static class OfRef extends ForEachOp { + final Consumer consumer; + + OfRef(Consumer consumer) { + this.consumer = consumer; + } + + @Override + public void accept(T t) { + consumer.accept(t); + } + + /** {@code forEachUntil} with {@code Stream} */ + static final class Until extends ForEachOp.OfRef { + final BooleanSupplier until; + + Until(Consumer consumer, BooleanSupplier until) { + super(consumer); + this.until = until; + } + + @Override + public int getOpFlags() { + return StreamOpFlag.NOT_ORDERED | StreamOpFlag.IS_SHORT_CIRCUIT; + } + + @Override + public boolean cancellationRequested() { + return until.getAsBoolean(); + } + } + } + + /** {@code forEach} with {@code IntStream} */ + private static class OfInt extends ForEachOp implements Sink.OfInt { + final IntConsumer consumer; + + OfInt(IntConsumer consumer) { + this.consumer = consumer; + } + + @Override + public StreamShape inputShape() { + return StreamShape.INT_VALUE; + } + + @Override + public void accept(int t) { + consumer.accept(t); + } + + /** {@code forEachUntil} with {@code IntStream} */ + static final class Until extends ForEachOp.OfInt { + final BooleanSupplier until; + + Until(IntConsumer consumer, BooleanSupplier until) { + super(consumer); + this.until = until; + } + + @Override + public int getOpFlags() { + return StreamOpFlag.NOT_ORDERED | StreamOpFlag.IS_SHORT_CIRCUIT; + } + + @Override + public boolean cancellationRequested() { + return until.getAsBoolean(); + } + } + } + + /** {@code forEach} with {@code LongStream} */ + private static class OfLong extends ForEachOp implements Sink.OfLong { + final LongConsumer consumer; + + OfLong(LongConsumer consumer) { + this.consumer = consumer; + } + + @Override + public StreamShape inputShape() { + return StreamShape.LONG_VALUE; + } + + @Override + public void accept(long t) { + consumer.accept(t); + } + + /** {@code forEachUntil} with {@code LongStream} */ + private static final class Until extends ForEachOp.OfLong { + final BooleanSupplier until; + + Until(LongConsumer consumer, BooleanSupplier until) { + super(consumer); + this.until = until; + } + + @Override + public int getOpFlags() { + return StreamOpFlag.NOT_ORDERED | StreamOpFlag.IS_SHORT_CIRCUIT; + } + + @Override + public boolean cancellationRequested() { + return until.getAsBoolean(); + } + } + } + + /** {@code forEach} with {@code DoubleStream} */ + private static class OfDouble extends ForEachOp implements Sink.OfDouble { + final DoubleConsumer consumer; + + OfDouble(DoubleConsumer consumer) { + this.consumer = consumer; + } + + @Override + public StreamShape inputShape() { + return StreamShape.DOUBLE_VALUE; + } + + @Override + public void accept(double t) { + consumer.accept(t); + } + + /** {@code forEachUntil} with {@code DoubleStream} */ + private static final class Until extends ForEachOp.OfDouble { + final BooleanSupplier until; + + Until(DoubleConsumer consumer, BooleanSupplier until) { + super(consumer); + this.until = until; + } + + @Override + public int getOpFlags() { + return StreamOpFlag.NOT_ORDERED | StreamOpFlag.IS_SHORT_CIRCUIT; + } + + @Override + public boolean cancellationRequested() { + return until.getAsBoolean(); + } + } + } + } + + /** A {@code ForkJoinTask} for performing a parallel for-each operation */ + private static class ForEachTask extends CountedCompleter { + private Spliterator spliterator; + private final Sink sink; + private final PipelineHelper helper; + private final long targetSize; + + ForEachTask(PipelineHelper helper, Sink sink) { + super(null); + this.spliterator = helper.sourceSpliterator(); + this.sink = sink; + this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize()); + this.helper = helper; + } + + ForEachTask(ForEachTask parent, Spliterator spliterator) { + super(parent); + this.spliterator = spliterator; + this.sink = parent.sink; + this.targetSize = parent.targetSize; + this.helper = parent.helper; + } + + public void compute() { + doCompute(this); + } + + private static void doCompute(ForEachTask task) { + boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(task.helper.getStreamAndOpFlags()); + while (true) { + if (isShortCircuit && task.sink.cancellationRequested()) { + task.propagateCompletion(); + task.spliterator = null; + return; + } + + Spliterator split = null; + if (!AbstractTask.suggestSplit(task.helper, task.spliterator, task.targetSize) + || (split = task.spliterator.trySplit()) == null) { + task.helper.intoWrapped(task.sink, task.spliterator); + task.propagateCompletion(); + task.spliterator = null; + return; + } + else { + task.addToPendingCount(1); + new ForEachTask<>(task, split).fork(); + } + } + } + } +}