1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Optional;
  28 import java.util.OptionalDouble;
  29 import java.util.OptionalInt;
  30 import java.util.OptionalLong;
  31 import java.util.Spliterator;
  32 import java.util.concurrent.CountedCompleter;
  33 import java.util.function.Predicate;
  34 import java.util.function.Supplier;
  35 
  36 /**
  37  * Factory for instances of a short-circuiting {@code TerminalOp} that searches
  38  * for an element in a stream pipeline, and terminates when it finds one.
  39  * Supported variants include find-first (find the first element in the
  40  * encounter order) and find-any (find any element, may not be the first in
  41  * encounter order.)
  42  *
  43  * @since 1.8
  44  */
  45 final class FindOps {
  46 
  47     private FindOps() { }
  48 
  49     /**
  50      * Constructs a {@code TerminalOp} for streams of objects.
  51      *
  52      * @param <T> the type of elements of the stream
  53      * @param mustFindFirst whether the {@code TerminalOp} must produce the
  54      *        first element in the encounter order
  55      * @return a {@code TerminalOp} implementing the find operation
  56      */
  57     public static <T> TerminalOp<T, Optional<T>> makeRef(boolean mustFindFirst) {
  58         return new FindOp<>(mustFindFirst, StreamShape.REFERENCE, Optional.empty(),
  59                             Optional::isPresent, FindSink.OfRef::new);
  60     }
  61 
  62     /**
  63      * Constructs a {@code TerminalOp} for streams of ints.
  64      *
  65      * @param mustFindFirst whether the {@code TerminalOp} must produce the
  66      *        first element in the encounter order
  67      * @return a {@code TerminalOp} implementing the find operation
  68      */
  69     public static TerminalOp<Integer, OptionalInt> makeInt(boolean mustFindFirst) {
  70         return new FindOp<>(mustFindFirst, StreamShape.INT_VALUE, OptionalInt.empty(),
  71                             OptionalInt::isPresent, FindSink.OfInt::new);
  72     }
  73 
  74     /**
  75      * Constructs a {@code TerminalOp} for streams of longs.
  76      *
  77      * @param mustFindFirst whether the {@code TerminalOp} must produce the
  78      *        first element in the encounter order
  79      * @return a {@code TerminalOp} implementing the find operation
  80      */
  81     public static TerminalOp<Long, OptionalLong> makeLong(boolean mustFindFirst) {
  82         return new FindOp<>(mustFindFirst, StreamShape.LONG_VALUE, OptionalLong.empty(),
  83                             OptionalLong::isPresent, FindSink.OfLong::new);
  84     }
  85 
  86     /**
  87      * Constructs a {@code FindOp} for streams of doubles.
  88      *
  89      * @param mustFindFirst whether the {@code TerminalOp} must produce the
  90      *        first element in the encounter order
  91      * @return a {@code TerminalOp} implementing the find operation
  92      */
  93     public static TerminalOp<Double, OptionalDouble> makeDouble(boolean mustFindFirst) {
  94         return new FindOp<>(mustFindFirst, StreamShape.DOUBLE_VALUE, OptionalDouble.empty(),
  95                             OptionalDouble::isPresent, FindSink.OfDouble::new);
  96     }
  97 
  98     /**
  99      * A short-circuiting {@code TerminalOp} that searches for an element in a
 100      * stream pipeline, and terminates when it finds one.  Implements both
 101      * find-first (find the first element in the encounter order) and find-any
 102      * (find any element, may not be the first in encounter order.)
 103      *
 104      * @param <T> the output type of the stream pipeline
 105      * @param <O> the result type of the find operation, typically an optional
 106      *        type
 107      */
 108     private static final class FindOp<T, O> implements TerminalOp<T, O> {
 109         private final StreamShape shape;
 110         final boolean mustFindFirst;
 111         final O emptyValue;
 112         final Predicate<O> presentPredicate;
 113         final Supplier<TerminalSink<T, O>> sinkSupplier;
 114 
 115         /**
 116          * Constructs a {@code FindOp}.
 117          *
 118          * @param mustFindFirst if true, must find the first element in
 119          *        encounter order, otherwise can find any element
 120          * @param shape stream shape of elements to search
 121          * @param emptyValue result value corresponding to "found nothing"
 122          * @param presentPredicate {@code Predicate} on result value
 123          *        corresponding to "found something"
 124          * @param sinkSupplier supplier for a {@code TerminalSink} implementing
 125          *        the matching functionality
 126          */
 127         FindOp(boolean mustFindFirst,
 128                        StreamShape shape,
 129                        O emptyValue,
 130                        Predicate<O> presentPredicate,
 131                        Supplier<TerminalSink<T, O>> sinkSupplier) {
 132             this.mustFindFirst = mustFindFirst;
 133             this.shape = shape;
 134             this.emptyValue = emptyValue;
 135             this.presentPredicate = presentPredicate;
 136             this.sinkSupplier = sinkSupplier;
 137         }
 138 
 139         @Override
 140         public int getOpFlags() {
 141             return StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED);
 142         }
 143 
 144         @Override
 145         public StreamShape inputShape() {
 146             return shape;
 147         }
 148 
 149         @Override
 150         public <S> O evaluateSequential(PipelineHelper<T> helper,
 151                                         Spliterator<S> spliterator) {
 152             O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();
 153             return result != null ? result : emptyValue;
 154         }
 155 
 156         @Override
 157         public <P_IN> O evaluateParallel(PipelineHelper<T> helper,
 158                                          Spliterator<P_IN> spliterator) {
 159             return new FindTask<>(this, helper, spliterator).invoke();
 160         }
 161     }
 162 
 163     /**
 164      * Implementation of @{code TerminalSink} that implements the find
 165      * functionality, requesting cancellation when something has been found
 166      *
 167      * @param <T> The type of input element
 168      * @param <O> The result type, typically an optional type
 169      */
 170     private abstract static class FindSink<T, O> implements TerminalSink<T, O> {
 171         boolean hasValue;
 172         T value;
 173 
 174         FindSink() {} // Avoid creation of special accessor
 175 
 176         @Override
 177         public void accept(T value) {
 178             if (!hasValue) {
 179                 hasValue = true;
 180                 this.value = value;
 181             }
 182         }
 183 
 184         @Override
 185         public boolean cancellationRequested() {
 186             return hasValue;
 187         }
 188 
 189         /** Specialization of {@code FindSink} for reference streams */
 190         static final class OfRef<T> extends FindSink<T, Optional<T>> {
 191             @Override
 192             public Optional<T> get() {
 193                 return hasValue ? Optional.of(value) : null;
 194             }
 195         }
 196 
 197         /** Specialization of {@code FindSink} for int streams */
 198         static final class OfInt extends FindSink<Integer, OptionalInt>
 199                 implements Sink.OfInt {
 200             @Override
 201             public void accept(int value) {
 202                 // Boxing is OK here, since few values will actually flow into the sink
 203                 accept((Integer) value);
 204             }
 205 
 206             @Override
 207             public OptionalInt get() {
 208                 return hasValue ? OptionalInt.of(value) : null;
 209             }
 210         }
 211 
 212         /** Specialization of {@code FindSink} for long streams */
 213         static final class OfLong extends FindSink<Long, OptionalLong>
 214                 implements Sink.OfLong {
 215             @Override
 216             public void accept(long value) {
 217                 // Boxing is OK here, since few values will actually flow into the sink
 218                 accept((Long) value);
 219             }
 220 
 221             @Override
 222             public OptionalLong get() {
 223                 return hasValue ? OptionalLong.of(value) : null;
 224             }
 225         }
 226 
 227         /** Specialization of {@code FindSink} for double streams */
 228         static final class OfDouble extends FindSink<Double, OptionalDouble>
 229                 implements Sink.OfDouble {
 230             @Override
 231             public void accept(double value) {
 232                 // Boxing is OK here, since few values will actually flow into the sink
 233                 accept((Double) value);
 234             }
 235 
 236             @Override
 237             public OptionalDouble get() {
 238                 return hasValue ? OptionalDouble.of(value) : null;
 239             }
 240         }
 241     }
 242 
 243     /**
 244      * {@code ForkJoinTask} implementing parallel short-circuiting search
 245      * @param <P_IN> Input element type to the stream pipeline
 246      * @param <P_OUT> Output element type from the stream pipeline
 247      * @param <O> Result type from the find operation
 248      */
 249     @SuppressWarnings("serial")
 250     private static final class FindTask<P_IN, P_OUT, O>
 251             extends AbstractShortCircuitTask<P_IN, P_OUT, O, FindTask<P_IN, P_OUT, O>> {
 252         private final FindOp<P_OUT, O> op;
 253 
 254         FindTask(FindOp<P_OUT, O> op,
 255                  PipelineHelper<P_OUT> helper,
 256                  Spliterator<P_IN> spliterator) {
 257             super(helper, spliterator);
 258             this.op = op;
 259         }
 260 
 261         FindTask(FindTask<P_IN, P_OUT, O> parent, Spliterator<P_IN> spliterator) {
 262             super(parent, spliterator);
 263             this.op = parent.op;
 264         }
 265 
 266         @Override
 267         protected FindTask<P_IN, P_OUT, O> makeChild(Spliterator<P_IN> spliterator) {
 268             return new FindTask<>(this, spliterator);
 269         }
 270 
 271         @Override
 272         protected O getEmptyResult() {
 273             return op.emptyValue;
 274         }
 275 
 276         private void foundResult(O answer) {
 277             if (isLeftmostNode())
 278                 shortCircuit(answer);
 279             else
 280                 cancelLaterNodes();
 281         }
 282 
 283         @Override
 284         protected O doLeaf() {
 285             O result = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).get();
 286             if (!op.mustFindFirst) {
 287                 if (result != null)
 288                     shortCircuit(result);
 289                 return null;
 290             }
 291             else {
 292                 if (result != null) {
 293                     foundResult(result);
 294                     return result;
 295                 }
 296                 else
 297                     return null;
 298             }
 299         }
 300 
 301         @Override
 302         public void onCompletion(CountedCompleter<?> caller) {
 303             if (op.mustFindFirst) {
 304                     for (FindTask<P_IN, P_OUT, O> child = leftChild, p = null; child != p;
 305                          p = child, child = rightChild) {
 306                     O result = child.getLocalResult();
 307                     if (result != null && op.presentPredicate.test(result)) {
 308                         setLocalResult(result);
 309                         foundResult(result);
 310                         break;
 311                     }
 312                 }
 313             }
 314             super.onCompletion(caller);
 315         }
 316     }
 317 }
 318