--- /dev/null 2013-03-09 17:25:01.184291984 -0500
+++ new/src/share/classes/java/util/stream/ForEachOps.java 2013-03-11 17:44:00.000000000 -0400
@@ -0,0 +1,469 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Spliterator;
+import java.util.concurrent.CountedCompleter;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.DoubleConsumer;
+import java.util.function.IntConsumer;
+import java.util.function.LongConsumer;
+import java.util.Objects;
+
+/**
+ * A factory for creating instances of {@code TerminalOp) that implement
+ * {@code forEach} or {@code forEachUntil} traversal over elements of a stream.
+ *
+ *
{@code forEach} traverses all elements of a stream and sends those
+ * elements to a {@code Consumer}.
+ *
+ *
{@code forEachUntil} traverses elements of a stream and sends those
+ * elements to to a {@code Consumer} until a {@code BooleanProvider} indicates
+ * that a termination criteria has occurred and no more elements should be
+ * traversed and sent.
+ *
+ *
For either type of traversal elements will be sent to the {@code Consumer}
+ * in whatever thread and whatever order they become available, independent of
+ * the stream's encounter order.
+ *
+ *
Exceptions occurring as a result of sending an element to the
+ * {@code Consumer} will be relayed to the caller and traversal will be
+ * prematurely terminated.
+ *
+ * @apiNote
+ * The termination condition is an externally-imposed criteria, and is useful
+ * for problems like "find the best answer that can be found in ten seconds",
+ * "search until you find an answer at least as good as X" , etc. It is not
+ * designed to provide content-based cancellation, such as "process elements
+ * until you find one which matches a given criteria."
+ *
+ *
There is no guarantee that additional elements will not be traversed and
+ * sent after the termination criteria has transpired. For example, a
+ * termination criteria of {@code resultSet.size() > TARGET} does not guarantee
+ * that the result set will receive no more than {@code TARGET} elements, but
+ * instead that {@code forEachUntil} traversal will attempt to stop after
+ * {@code TARGET} elements have been placed in the {@code resultSet}.
+ *
+ * @since 1.8
+ */
+final class ForEachOps {
+
+ private ForEachOps() { }
+
+ /**
+ * Constructs a {@code TerminalOp} that implements {@code forEach}
+ * traversal, which traverses all elements of a {@code Stream} and sends
+ * those elements the provided {@code Consumer}.
+ *
+ * @param consumer The {@code Consumer} that receives all elements of a
+ * stream
+ * @param The type of the stream elements
+ * @return the {@code TerminalOp} instance
+ */
+ public static TerminalOp makeRef(Consumer super T> consumer) {
+ Objects.requireNonNull(consumer);
+ return new ForEachOp.OfRef<>(consumer);
+ }
+
+ /**
+ * Constructs a {@code TerminalOp} that implements {@code forEachUntil}
+ * traversal, which traverses all elements of a {@code Stream} and sends
+ * those elements to the provided {@code Consumer} until the specified
+ * {@code BooleanProvider} indicates that a termination criteria has
+ * occurred and no more elements should be traversed and sent.
+ *
+ * @param consumer The {@code Consumer} that receives elements of a stream
+ * @param until A {@code BooleanSupplier} that indicates whether the
+ * termination criteria has occurred. Once it returns {@code true}
+ * the first time, it must continue to return {@code true} for all
+ * future invocations
+ * @param The type of the stream elements
+ * @return the {@code TerminalOp} instance
+ */
+ public static TerminalOp makeRef(Consumer super T> consumer, BooleanSupplier until) {
+ Objects.requireNonNull(consumer);
+ Objects.requireNonNull(until);
+ return new ForEachOp.OfRef.Until<>(consumer, until);
+ }
+
+ /**
+ * Constructs a {@code TerminalOp} that implements {@code forEach}
+ * traversal, which traverses all {@code int} elements of a
+ * {@code IntStream} and sends those elements the provided
+ * {@code IntConsumer}.
+ *
+ * @param consumer The {@code IntConsumer} that receives all elements of a
+ * stream
+ * @return the {@code TerminalOp} instance
+ */
+ public static TerminalOp makeInt(IntConsumer consumer) {
+ Objects.requireNonNull(consumer);
+ return new ForEachOp.OfInt(consumer);
+ }
+
+ /**
+ * Constructs a {@code TerminalOp} that implements {@code forEachUntil}
+ * traversal, which traverses all {@code int} elements of a
+ * {@code IntStream} and sends those elements to the provided
+ * {@code IntConsumer} until the specified {@code BooleanProvider} indicates
+ * that a termination criteria has occurred and no more elements should be
+ * traversed and sent.
+ *
+ * @param consumer The {@code IntConsumer} that receives elements of a
+ * stream
+ * @param until A {@code BooleanSupplier} that indicates whether the
+ * termination criteria has occurred. Once it returns {@code true}
+ * the first time, it must continue to return {@code true} for all
+ * future invocations
+ * @return the {@code TerminalOp} instance
+ */
+ public static TerminalOp makeInt(IntConsumer consumer, BooleanSupplier until) {
+ Objects.requireNonNull(consumer);
+ Objects.requireNonNull(until);
+ return new ForEachOp.OfInt.Until(consumer, until);
+ }
+
+ /**
+ * Constructs a {@code TerminalOp} that implements {@code forEach}
+ * traversal, which traverses all {@code long} elements of a
+ * {@code LongStream} and sends those elements the provided
+ * {@code LongConsumer}.
+ *
+ * @param consumer The {@code LongConsumer} that receives all elements of a
+ * stream
+ * @return the {@code TerminalOp} instance
+ */
+ public static TerminalOp makeLong(LongConsumer consumer) {
+ Objects.requireNonNull(consumer);
+ return new ForEachOp.OfLong(consumer);
+ }
+
+ /**
+ * Constructs a {@code TerminalOp} that implements {@code forEachUntil}
+ * traversal, which traverses all {@code long} elements of a
+ * {@code LongStream} and sends those elements to the provided
+ * {@code LongConsumer} until the specified {@code BooleanProvider}
+ * indicates that a termination criteria has occurred and no more elements
+ * should be traversed and sent.
+ *
+ * @param consumer The {@code LongConsumer} that receives elements of a
+ * stream
+ * @param until A {@code BooleanSupplier} that indicates whether the
+ * termination criteria has occurred. Once it returns {@code true}
+ * the first time, it must continue to return {@code true} for all
+ * future invocations
+ * @return the {@code TerminalOp} instance
+ */
+ public static TerminalOp makeLong(LongConsumer consumer, BooleanSupplier until) {
+ Objects.requireNonNull(consumer);
+ Objects.requireNonNull(until);
+ return new ForEachOp.OfLong.Until(consumer, until);
+ }
+
+ /**
+ * Constructs a {@code TerminalOp} that implements {@code forEach}
+ * traversal, which traverses all {@code double} elements of a
+ * {@code DoubleStream} and sends those elements the provided
+ * {@code DoubleConsumer}.
+ *
+ * @param consumer The {@code DoubleConsumer} that receives all elements of
+ * a stream
+ * @return the {@code TerminalOp} instance
+ */
+ public static TerminalOp makeDouble(DoubleConsumer consumer) {
+ Objects.requireNonNull(consumer);
+ return new ForEachOp.OfDouble(consumer);
+ }
+
+ /**
+ * Constructs a {@code TerminalOp} that implements {@code forEachUntil}
+ * traversal, which traverses all {@code double} elements of a
+ * {@code DoubleStream} and sends those elements to the provided
+ * {@code DoubleConsumer} until the specified {@code BooleanProvider}
+ * indicates that a termination criteria has occurred and no more elements
+ * should be traversed and sent.
+ *
+ * @param consumer The {@code DoubleConsumer} that receives elements of a
+ * stream
+ * @param until A {@code BooleanSupplier} that indicates whether the
+ * termination criteria has occurred. Once it returns {@code true}
+ * the first time, it must continue to return {@code true} for all
+ * future invocations
+ * @return the {@code TerminalOp} instance
+ */
+ public static TerminalOp makeDouble(DoubleConsumer consumer, BooleanSupplier until) {
+ Objects.requireNonNull(consumer);
+ Objects.requireNonNull(until);
+ return new ForEachOp.OfDouble.Until(consumer, until);
+ }
+
+ /**
+ * A {@code TerminalOp} that evaluates a stream pipeline and sends the
+ * output to itself as a {@code TerminalSink}. Elements will be sent in
+ * whatever thread and whatever order they become available, independent of
+ * the stream's encounter order.
+ *
+ * This terminal operation is stateless. For parallel evaluation each
+ * leaf instance of a {@code ForEachTask} will send elements to the same
+ * {@code TerminalSink} reference that is an instance of this class. State
+ * management, if any, is deferred to the consumer, held by the concrete
+ * sub-classes, that is the final receiver elements.
+ *
+ * @param The output type of the stream pipeline
+ */
+ private static abstract class ForEachOp implements TerminalOp, TerminalSink {
+
+ // TerminalOp
+
+ @Override
+ public int getOpFlags() {
+ return StreamOpFlag.NOT_ORDERED;
+ }
+
+ @Override
+ public Void evaluateSequential(PipelineHelper helper) {
+ return helper.into(this, helper.sourceSpliterator()).get();
+ }
+
+ @Override
+ public Void evaluateParallel(PipelineHelper helper) {
+ new ForEachTask<>(helper, helper.wrapSink(this)).invoke();
+ return null;
+ }
+
+ // TerminalSink
+
+ @Override
+ public Void get() {
+ return null;
+ }
+
+ // Implementations
+
+ /** {@code forEach} with {@code Stream} */
+ private static class OfRef extends ForEachOp {
+ final Consumer super T> consumer;
+
+ OfRef(Consumer super T> consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void accept(T t) {
+ consumer.accept(t);
+ }
+
+ /** {@code forEachUntil} with {@code Stream} */
+ static final class Until extends ForEachOp.OfRef {
+ final BooleanSupplier until;
+
+ Until(Consumer super T> consumer, BooleanSupplier until) {
+ super(consumer);
+ this.until = until;
+ }
+
+ @Override
+ public int getOpFlags() {
+ return StreamOpFlag.NOT_ORDERED | StreamOpFlag.IS_SHORT_CIRCUIT;
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return until.getAsBoolean();
+ }
+ }
+ }
+
+ /** {@code forEach} with {@code IntStream} */
+ private static class OfInt extends ForEachOp implements Sink.OfInt {
+ final IntConsumer consumer;
+
+ OfInt(IntConsumer consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public StreamShape inputShape() {
+ return StreamShape.INT_VALUE;
+ }
+
+ @Override
+ public void accept(int t) {
+ consumer.accept(t);
+ }
+
+ /** {@code forEachUntil} with {@code IntStream} */
+ static final class Until extends ForEachOp.OfInt {
+ final BooleanSupplier until;
+
+ Until(IntConsumer consumer, BooleanSupplier until) {
+ super(consumer);
+ this.until = until;
+ }
+
+ @Override
+ public int getOpFlags() {
+ return StreamOpFlag.NOT_ORDERED | StreamOpFlag.IS_SHORT_CIRCUIT;
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return until.getAsBoolean();
+ }
+ }
+ }
+
+ /** {@code forEach} with {@code LongStream} */
+ private static class OfLong extends ForEachOp implements Sink.OfLong {
+ final LongConsumer consumer;
+
+ OfLong(LongConsumer consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public StreamShape inputShape() {
+ return StreamShape.LONG_VALUE;
+ }
+
+ @Override
+ public void accept(long t) {
+ consumer.accept(t);
+ }
+
+ /** {@code forEachUntil} with {@code LongStream} */
+ private static final class Until extends ForEachOp.OfLong {
+ final BooleanSupplier until;
+
+ Until(LongConsumer consumer, BooleanSupplier until) {
+ super(consumer);
+ this.until = until;
+ }
+
+ @Override
+ public int getOpFlags() {
+ return StreamOpFlag.NOT_ORDERED | StreamOpFlag.IS_SHORT_CIRCUIT;
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return until.getAsBoolean();
+ }
+ }
+ }
+
+ /** {@code forEach} with {@code DoubleStream} */
+ private static class OfDouble extends ForEachOp implements Sink.OfDouble {
+ final DoubleConsumer consumer;
+
+ OfDouble(DoubleConsumer consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public StreamShape inputShape() {
+ return StreamShape.DOUBLE_VALUE;
+ }
+
+ @Override
+ public void accept(double t) {
+ consumer.accept(t);
+ }
+
+ /** {@code forEachUntil} with {@code DoubleStream} */
+ private static final class Until extends ForEachOp.OfDouble {
+ final BooleanSupplier until;
+
+ Until(DoubleConsumer consumer, BooleanSupplier until) {
+ super(consumer);
+ this.until = until;
+ }
+
+ @Override
+ public int getOpFlags() {
+ return StreamOpFlag.NOT_ORDERED | StreamOpFlag.IS_SHORT_CIRCUIT;
+ }
+
+ @Override
+ public boolean cancellationRequested() {
+ return until.getAsBoolean();
+ }
+ }
+ }
+ }
+
+ /** A {@code ForkJoinTask} for performing a parallel for-each operation */
+ private static class ForEachTask extends CountedCompleter {
+ private Spliterator spliterator;
+ private final Sink sink;
+ private final PipelineHelper helper;
+ private final long targetSize;
+
+ ForEachTask(PipelineHelper helper, Sink sink) {
+ super(null);
+ this.spliterator = helper.sourceSpliterator();
+ this.sink = sink;
+ this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
+ this.helper = helper;
+ }
+
+ ForEachTask(ForEachTask parent, Spliterator spliterator) {
+ super(parent);
+ this.spliterator = spliterator;
+ this.sink = parent.sink;
+ this.targetSize = parent.targetSize;
+ this.helper = parent.helper;
+ }
+
+ public void compute() {
+ doCompute(this);
+ }
+
+ private static void doCompute(ForEachTask task) {
+ boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(task.helper.getStreamAndOpFlags());
+ while (true) {
+ if (isShortCircuit && task.sink.cancellationRequested()) {
+ task.propagateCompletion();
+ task.spliterator = null;
+ return;
+ }
+
+ Spliterator split = null;
+ if (!AbstractTask.suggestSplit(task.helper, task.spliterator, task.targetSize)
+ || (split = task.spliterator.trySplit()) == null) {
+ task.helper.intoWrapped(task.sink, task.spliterator);
+ task.propagateCompletion();
+ task.spliterator = null;
+ return;
+ }
+ else {
+ task.addToPendingCount(1);
+ new ForEachTask<>(task, split).fork();
+ }
+ }
+ }
+ }
+}