1 /*
   2  * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Objects;
  28 import java.util.function.BooleanSupplier;
  29 import java.util.function.Consumer;
  30 import java.util.function.DoubleConsumer;
  31 import java.util.function.IntConsumer;
  32 import java.util.function.LongConsumer;
  33 
  34 /**
  35  * A {@code TerminalOp} that evaluates a stream pipeline and sends the output into a {@code TerminalSink}, attempting
  36  * to stop early if some external termination condition is reached.
  37  *
  38  * @apiNote
  39  * The termination condition is an externally-imposed criteria, and is useful for problems like "find the best
  40  * answer that can be found in ten seconds", "search until you find an answer at least as good as X", etc.  It is not
  41  * designed to provide content-based cancellation, such as "process elements until you find one which matches
  42  * a given criteria."
  43  *
  44  * <p>There is no guarantee that additional elements will not be processed after the termination criteria has
  45  * transpired.  For example, a termination criteria of {@code resultSet.size() > TARGET} does not guarantee that the
  46  * result set will receive no more than {@code TARGET} elements, but instead that @{code ForEachUntilOp} will attempt
  47  * to stop after {@code TARGET} elements have been placed in the result set.
  48  *
  49  * @param <T> The output type of the stream pipeline
  50  * @since 1.8
  51  */
  52 class ForEachUntilOp<T> extends ForEachOp<T> implements TerminalOp<T, Void> {
  53     public ForEachUntilOp(TerminalSink<T, Void> sink, StreamShape shape) {
  54         super(sink, shape);
  55     }
  56 
  57     /**
  58      * Construct a {@code ForEachUntilOp} that reads from a {@code Stream} and sends the stream output to
  59      * the provided {@code Consumer}, until the specified {@code BooleanProvider} indicates that no more input
  60      * should be sent
  61      * @param consumer The {@code Consumer} to send stream output to
  62      * @param until A {@code BooleanSupplier} that indicates whether the termination criteria has occurred.  Once
  63      *              it returns @{code true} the first time, it must continue to return {@code true} for all future
  64      *              invocations
  65      * @param <T> The type of the stream elements
  66      */
  67     public static <T> ForEachUntilOp<T> make(final Consumer<? super T> consumer, BooleanSupplier until) {
  68         Objects.requireNonNull(consumer);
  69         return new ForEachUntilOp<>(new VoidTerminalSink<T>() {
  70             @Override
  71             public void accept(T t) {
  72                 consumer.accept(t);
  73             }
  74 
  75             @Override
  76             public boolean cancellationRequested() {
  77                 return until.getAsBoolean();
  78             }
  79         }, StreamShape.REFERENCE);
  80     }
  81 
  82     /**
  83      * Construct a {@code ForEachUntilOp} that reads from an {@code IntStream} and sends the stream output to
  84      * the provided {@code Consumer}, until the specified {@code BooleanProvider} indicates that no more input
  85      * should be sent
  86      * @param consumer The {@code Consumer} to send stream output to
  87      * @param until A {@code BooleanSupplier} that indicates whether the termination criteria has occurred.  Once
  88      *              it returns @{code true} the first time, it must continue to return {@code true} for all future
  89      *              invocations
  90      */
  91     public static ForEachUntilOp<Integer> make(final IntConsumer consumer, BooleanSupplier until) {
  92         Objects.requireNonNull(consumer);
  93         return new ForEachUntilOp<>(new VoidTerminalSink.OfInt() {
  94             @Override
  95             public void accept(int i) {
  96                 consumer.accept(i);
  97             }
  98 
  99             @Override
 100             public boolean cancellationRequested() {
 101                 return until.getAsBoolean();
 102             }
 103         }, StreamShape.INT_VALUE);
 104     }
 105 
 106     /**
 107      * Construct a {@code ForEachUntilOp} that reads from a {@code LongStream} and sends the stream output to
 108      * the provided {@code Consumer}, until the specified {@code BooleanProvider} indicates that no more input
 109      * should be sent
 110      * @param consumer The {@code Consumer} to send stream output to
 111      * @param until A {@code BooleanSupplier} that indicates whether the termination criteria has occurred.  Once
 112      *              it returns @{code true} the first time, it must continue to return {@code true} for all future
 113      *              invocations
 114      */
 115     public static ForEachUntilOp<Long> make(final LongConsumer consumer, BooleanSupplier until) {
 116         Objects.requireNonNull(consumer);
 117         return new ForEachUntilOp<>(new VoidTerminalSink.OfLong() {
 118             @Override
 119             public void accept(long i) {
 120                 consumer.accept(i);
 121             }
 122 
 123             @Override
 124             public boolean cancellationRequested() {
 125                 return until.getAsBoolean();
 126             }
 127         }, StreamShape.LONG_VALUE);
 128     }
 129 
 130     /**
 131      * Construct a {@code ForEachUntilOp} that reads from a {@code DoubleStream} and sends the stream output to
 132      * the provided {@code Consumer}, until the specified {@code BooleanProvider} indicates that no more input
 133      * should be sent
 134      * @param consumer The {@code Consumer} to send stream output to
 135      * @param until A {@code BooleanSupplier} that indicates whether the termination criteria has occurred.  Once
 136      *              it returns @{code true} the first time, it must continue to return {@code true} for all future
 137      *              invocations
 138      */
 139     public static ForEachUntilOp<Double> make(final DoubleConsumer consumer, BooleanSupplier until) {
 140         Objects.requireNonNull(consumer);
 141         return new ForEachUntilOp<>(new VoidTerminalSink.OfDouble() {
 142             @Override
 143             public void accept(double i) {
 144                 consumer.accept(i);
 145             }
 146 
 147             @Override
 148             public boolean cancellationRequested() {
 149                 return until.getAsBoolean();
 150             }
 151         }, StreamShape.DOUBLE_VALUE);
 152     }
 153 
 154     @Override
 155     public int getOpFlags() {
 156         return StreamOpFlag.IS_SHORT_CIRCUIT | StreamOpFlag.NOT_ORDERED;
 157     }
 158 }