1 /*
   2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.util.stream;
  26 
  27 import java.util.Optional;
  28 import java.util.OptionalDouble;
  29 import java.util.OptionalInt;
  30 import java.util.OptionalLong;
  31 import java.util.Spliterator;
  32 import java.util.concurrent.CountedCompleter;
  33 import java.util.function.Predicate;
  34 import java.util.function.Supplier;
  35 
  36 /**
  37  * Factory for instances of a short-circuiting {@code TerminalOp} that searches
  38  * for an element in a stream pipeline, and terminates when it finds one.
  39  * Supported variants include find-first (find the first element in the
  40  * encounter order) and find-any (find any element, may not be the first in
  41  * encounter order.)
  42  *
  43  * @since 1.8
  44  */
  45 final class FindOps {
  46 
  47     private FindOps() { }
  48 
  49     /**
  50      * Constructs a {@code TerminalOp} for streams of objects.
  51      *
  52      * @param <T> the type of elements of the stream
  53      * @param mustFindFirst whether the {@code TerminalOp} must produce the
  54      *        first element in the encounter order
  55      * @return a {@code TerminalOp} implementing the find operation
  56      */
  57     public static <T> TerminalOp<T, Optional<T>> makeRef(boolean mustFindFirst) {
  58         return new FindOp<>(mustFindFirst, StreamShape.REFERENCE, Optional.empty(),
  59                             Optional::isPresent, FindSink.OfRef::new);
  60     }
  61 
  62     /**
  63      * Constructs a {@code TerminalOp} for streams of ints.
  64      *
  65      * @param mustFindFirst whether the {@code TerminalOp} must produce the
  66      *        first element in the encounter order
  67      * @return a {@code TerminalOp} implementing the find operation
  68      */
  69     public static TerminalOp<Integer, OptionalInt> makeInt(boolean mustFindFirst) {
  70         return new FindOp<>(mustFindFirst, StreamShape.INT_VALUE, OptionalInt.empty(),
  71                             OptionalInt::isPresent, FindSink.OfInt::new);
  72     }
  73 
  74     /**
  75      * Constructs a {@code TerminalOp} for streams of longs.
  76      *
  77      * @param mustFindFirst whether the {@code TerminalOp} must produce the
  78      *        first element in the encounter order
  79      * @return a {@code TerminalOp} implementing the find operation
  80      */
  81     public static TerminalOp<Long, OptionalLong> makeLong(boolean mustFindFirst) {
  82         return new FindOp<>(mustFindFirst, StreamShape.LONG_VALUE, OptionalLong.empty(),
  83                             OptionalLong::isPresent, FindSink.OfLong::new);
  84     }
  85 
  86     /**
  87      * Constructs a {@code FindOp} for streams of doubles.
  88      *
  89      * @param mustFindFirst whether the {@code TerminalOp} must produce the
  90      *        first element in the encounter order
  91      * @return a {@code TerminalOp} implementing the find operation
  92      */
  93     public static TerminalOp<Double, OptionalDouble> makeDouble(boolean mustFindFirst) {
  94         return new FindOp<>(mustFindFirst, StreamShape.DOUBLE_VALUE, OptionalDouble.empty(),
  95                             OptionalDouble::isPresent, FindSink.OfDouble::new);
  96     }
  97 
  98     /**
  99      * A short-circuiting {@code TerminalOp} that searches for an element in a
 100      * stream pipeline, and terminates when it finds one.  Implements both
 101      * find-first (find the first element in the encounter order) and find-any
 102      * (find any element, may not be the first in encounter order.)
 103      *
 104      * @param <T> the output type of the stream pipeline
 105      * @param <O> the result type of the find operation, typically an optional
 106      *        type
 107      */
 108     private static final class FindOp<T, O> implements TerminalOp<T, O> {
 109         private final StreamShape shape;
 110         final int opFlags;
 111         final O emptyValue;
 112         final Predicate<O> presentPredicate;
 113         final Supplier<TerminalSink<T, O>> sinkSupplier;
 114 
 115         /**
 116          * Constructs a {@code FindOp}.
 117          *
 118          * @param mustFindFirst if true, must find the first element in
 119          *        encounter order, otherwise can find any element
 120          * @param shape stream shape of elements to search
 121          * @param emptyValue result value corresponding to "found nothing"
 122          * @param presentPredicate {@code Predicate} on result value
 123          *        corresponding to "found something"
 124          * @param sinkSupplier supplier for a {@code TerminalSink} implementing
 125          *        the matching functionality
 126          */
 127         FindOp(boolean mustFindFirst,
 128                        StreamShape shape,
 129                        O emptyValue,
 130                        Predicate<O> presentPredicate,
 131                        Supplier<TerminalSink<T, O>> sinkSupplier) {
 132             this.opFlags = StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED);
 133             this.shape = shape;
 134             this.emptyValue = emptyValue;
 135             this.presentPredicate = presentPredicate;
 136             this.sinkSupplier = sinkSupplier;
 137         }
 138 
 139         @Override
 140         public int getOpFlags() {
 141             return opFlags;
 142         }
 143 
 144         @Override
 145         public StreamShape inputShape() {
 146             return shape;
 147         }
 148 
 149         @Override
 150         public <S> O evaluateSequential(PipelineHelper<T> helper,
 151                                         Spliterator<S> spliterator) {
 152             O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();
 153             return result != null ? result : emptyValue;
 154         }
 155 
 156         @Override
 157         public <P_IN> O evaluateParallel(PipelineHelper<T> helper,
 158                                          Spliterator<P_IN> spliterator) {
 159             // This takes into account the upstream ops flags and the terminal
 160             // op flags and therefore takes into account findFirst or findAny
 161             boolean mustFindFirst = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags());
 162             return new FindTask<>(this, mustFindFirst, helper, spliterator).invoke();
 163         }
 164     }
 165 
 166     /**
 167      * Implementation of @{code TerminalSink} that implements the find
 168      * functionality, requesting cancellation when something has been found
 169      *
 170      * @param <T> The type of input element
 171      * @param <O> The result type, typically an optional type
 172      */
 173     private abstract static class FindSink<T, O> implements TerminalSink<T, O> {
 174         boolean hasValue;
 175         T value;
 176 
 177         FindSink() {} // Avoid creation of special accessor
 178 
 179         @Override
 180         public void accept(T value) {
 181             if (!hasValue) {
 182                 hasValue = true;
 183                 this.value = value;
 184             }
 185         }
 186 
 187         @Override
 188         public boolean cancellationRequested() {
 189             return hasValue;
 190         }
 191 
 192         /** Specialization of {@code FindSink} for reference streams */
 193         static final class OfRef<T> extends FindSink<T, Optional<T>> {
 194             @Override
 195             public Optional<T> get() {
 196                 return hasValue ? Optional.of(value) : null;
 197             }
 198         }
 199 
 200         /** Specialization of {@code FindSink} for int streams */
 201         static final class OfInt extends FindSink<Integer, OptionalInt>
 202                 implements Sink.OfInt {
 203             @Override
 204             public void accept(int value) {
 205                 // Boxing is OK here, since few values will actually flow into the sink
 206                 accept((Integer) value);
 207             }
 208 
 209             @Override
 210             public OptionalInt get() {
 211                 return hasValue ? OptionalInt.of(value) : null;
 212             }
 213         }
 214 
 215         /** Specialization of {@code FindSink} for long streams */
 216         static final class OfLong extends FindSink<Long, OptionalLong>
 217                 implements Sink.OfLong {
 218             @Override
 219             public void accept(long value) {
 220                 // Boxing is OK here, since few values will actually flow into the sink
 221                 accept((Long) value);
 222             }
 223 
 224             @Override
 225             public OptionalLong get() {
 226                 return hasValue ? OptionalLong.of(value) : null;
 227             }
 228         }
 229 
 230         /** Specialization of {@code FindSink} for double streams */
 231         static final class OfDouble extends FindSink<Double, OptionalDouble>
 232                 implements Sink.OfDouble {
 233             @Override
 234             public void accept(double value) {
 235                 // Boxing is OK here, since few values will actually flow into the sink
 236                 accept((Double) value);
 237             }
 238 
 239             @Override
 240             public OptionalDouble get() {
 241                 return hasValue ? OptionalDouble.of(value) : null;
 242             }
 243         }
 244     }
 245 
 246     /**
 247      * {@code ForkJoinTask} implementing parallel short-circuiting search
 248      * @param <P_IN> Input element type to the stream pipeline
 249      * @param <P_OUT> Output element type from the stream pipeline
 250      * @param <O> Result type from the find operation
 251      */
 252     @SuppressWarnings("serial")
 253     private static final class FindTask<P_IN, P_OUT, O>
 254             extends AbstractShortCircuitTask<P_IN, P_OUT, O, FindTask<P_IN, P_OUT, O>> {
 255         private final FindOp<P_OUT, O> op;
 256         private final boolean mustFindFirst;
 257 
 258         FindTask(FindOp<P_OUT, O> op,
 259                  boolean mustFindFirst,
 260                  PipelineHelper<P_OUT> helper,
 261                  Spliterator<P_IN> spliterator) {
 262             super(helper, spliterator);
 263             this.mustFindFirst = mustFindFirst;
 264             this.op = op;
 265         }
 266 
 267         FindTask(FindTask<P_IN, P_OUT, O> parent, Spliterator<P_IN> spliterator) {
 268             super(parent, spliterator);
 269             this.mustFindFirst = parent.mustFindFirst;
 270             this.op = parent.op;
 271         }
 272 
 273         @Override
 274         protected FindTask<P_IN, P_OUT, O> makeChild(Spliterator<P_IN> spliterator) {
 275             return new FindTask<>(this, spliterator);
 276         }
 277 
 278         @Override
 279         protected O getEmptyResult() {
 280             return op.emptyValue;
 281         }
 282 
 283         private void foundResult(O answer) {
 284             if (isLeftmostNode())
 285                 shortCircuit(answer);
 286             else
 287                 cancelLaterNodes();
 288         }
 289 
 290         @Override
 291         protected O doLeaf() {
 292             O result = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).get();
 293             if (!this.mustFindFirst) {
 294                 if (result != null)
 295                     shortCircuit(result);
 296                 return null;
 297             }
 298             else {
 299                 if (result != null) {
 300                     foundResult(result);
 301                     return result;
 302                 }
 303                 else
 304                     return null;
 305             }
 306         }
 307 
 308         @Override
 309         public void onCompletion(CountedCompleter<?> caller) {
 310             if (this.mustFindFirst) {
 311                     for (FindTask<P_IN, P_OUT, O> child = leftChild, p = null; child != p;
 312                          p = child, child = rightChild) {
 313                     O result = child.getLocalResult();
 314                     if (result != null && op.presentPredicate.test(result)) {
 315                         setLocalResult(result);
 316                         foundResult(result);
 317                         break;
 318                     }
 319                 }
 320             }
 321             super.onCompletion(caller);
 322         }
 323     }
 324 }
 325