/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
 * Factory for short-circuiting {@code TerminalOp} instances that look for a
 * single element in a stream pipeline and stop as soon as one is found.
 * Two variants are offered: find-first (the first element in the encounter
 * order) and find-any (any element, not necessarily the first in the
 * encounter order).
 *
 * @since 1.8
 */
final class FindOps {

    /** Not instantiable; this class only hosts static factories. */
    private FindOps() { }

    /**
     * Constructs a {@code TerminalOp} for streams of objects.
     *
     * @param <T> the type of elements of the stream
     * @param mustFindFirst whether the {@code TerminalOp} must produce the
     *        first element in the encounter order
     * @return a {@code TerminalOp} implementing the find operation
     */
    public static <T> TerminalOp<T, Optional<T>> makeRef(boolean mustFindFirst) {
        Supplier<TerminalSink<T, Optional<T>>> sinkFactory = FindSink.OfRef::new;
        return new FindOp<>(mustFindFirst, StreamShape.REFERENCE, Optional.empty(),
                            Optional::isPresent, sinkFactory);
    }

    /**
     * Constructs a {@code TerminalOp} for streams of ints.
     *
     * @param mustFindFirst whether the {@code TerminalOp} must produce the
     *        first element in the encounter order
     * @return a {@code TerminalOp} implementing the find operation
     */
    public static TerminalOp<Integer, OptionalInt> makeInt(boolean mustFindFirst) {
        Supplier<TerminalSink<Integer, OptionalInt>> sinkFactory = FindSink.OfInt::new;
        return new FindOp<>(mustFindFirst, StreamShape.INT_VALUE, OptionalInt.empty(),
                            OptionalInt::isPresent, sinkFactory);
    }

    /**
     * Constructs a {@code TerminalOp} for streams of longs.
     *
     * @param mustFindFirst whether the {@code TerminalOp} must produce the
     *        first element in the encounter order
     * @return a {@code TerminalOp} implementing the find operation
     */
    public static TerminalOp<Long, OptionalLong> makeLong(boolean mustFindFirst) {
        Supplier<TerminalSink<Long, OptionalLong>> sinkFactory = FindSink.OfLong::new;
        return new FindOp<>(mustFindFirst, StreamShape.LONG_VALUE, OptionalLong.empty(),
                            OptionalLong::isPresent, sinkFactory);
    }

    /**
     * Constructs a {@code TerminalOp} for streams of doubles.
     *
     * @param mustFindFirst whether the {@code TerminalOp} must produce the
     *        first element in the encounter order
     * @return a {@code TerminalOp} implementing the find operation
     */
    public static TerminalOp<Double, OptionalDouble> makeDouble(boolean mustFindFirst) {
        Supplier<TerminalSink<Double, OptionalDouble>> sinkFactory = FindSink.OfDouble::new;
        return new FindOp<>(mustFindFirst, StreamShape.DOUBLE_VALUE, OptionalDouble.empty(),
                            OptionalDouble::isPresent, sinkFactory);
    }

    /**
     * The short-circuiting {@code TerminalOp} behind every factory method of
     * {@code FindOps}. One implementation serves both find-first and
     * find-any; the difference is carried by the operation flags computed in
     * the constructor.
     *
     * @param <T> the output type of the stream pipeline
     * @param <O> the result type of the find operation, typically an optional
     *        type
     */
    private static final class FindOp<T, O> implements TerminalOp<T, O> {
        private final StreamShape shape;
        final int opFlags;
        final O emptyValue;
        final Predicate<O> presentPredicate;
        final Supplier<TerminalSink<T, O>> sinkSupplier;

        /**
         * Constructs a {@code FindOp}.
         *
         * @param mustFindFirst if true, must find the first element in
         *        encounter order, otherwise can find any element
         * @param shape stream shape of elements to search
         * @param emptyValue result value corresponding to "found nothing"
         * @param presentPredicate {@code Predicate} on result value
         *        corresponding to "found something"
         * @param sinkSupplier supplier for a {@code TerminalSink} implementing
         *        the matching functionality
         */
        FindOp(boolean mustFindFirst,
               StreamShape shape,
               O emptyValue,
               Predicate<O> presentPredicate,
               Supplier<TerminalSink<T, O>> sinkSupplier) {
            // Always short-circuiting; find-any additionally declares that
            // it does not depend on the encounter order.
            int flags = StreamOpFlag.IS_SHORT_CIRCUIT;
            if (!mustFindFirst) {
                flags |= StreamOpFlag.NOT_ORDERED;
            }
            this.opFlags = flags;
            this.shape = shape;
            this.emptyValue = emptyValue;
            this.presentPredicate = presentPredicate;
            this.sinkSupplier = sinkSupplier;
        }

        @Override
        public int getOpFlags() {
            return opFlags;
        }

        @Override
        public StreamShape inputShape() {
            return shape;
        }

        /**
         * Pushes the pipeline into a fresh sink and returns what the sink
         * captured, or {@code emptyValue} when the sink reports {@code null}.
         */
        @Override
        public <S> O evaluateSequential(PipelineHelper<T> helper,
                                        Spliterator<S> spliterator) {
            TerminalSink<T, O> sink = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator);
            O found = sink.get();
            if (found == null) {
                return emptyValue;
            }
            return found;
        }

        /**
         * Runs the search as a {@code FindTask} and returns the task's result.
         */
        @Override
        public <P_IN> O evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<P_IN> spliterator) {
            // The combined flags cover both the upstream ops and this
            // terminal op, so they already reflect findFirst versus findAny.
            int combinedFlags = helper.getStreamAndOpFlags();
            boolean mustFindFirst = StreamOpFlag.ORDERED.isKnown(combinedFlags);
            FindTask<P_IN, T, O> rootTask = new FindTask<>(this, mustFindFirst, helper, spliterator);
            return rootTask.invoke();
        }
    }

    /**
     * Base {@code TerminalSink} for the find operations. It remembers the
     * first element it is handed and requests cancellation from then on.
     *
     * @param <T> The type of input element
     * @param <O> The result type, typically an optional type
     */
    private abstract static class FindSink<T, O> implements TerminalSink<T, O> {
        boolean hasValue;
        T value;

        FindSink() {} // Avoid creation of special accessor

        @Override
        public void accept(T value) {
            // Only the first element is kept; later ones are ignored.
            if (hasValue) {
                return;
            }
            hasValue = true;
            this.value = value;
        }

        @Override
        public boolean cancellationRequested() {
            return hasValue;
        }

        /** Specialization of {@code FindSink} for reference streams */
        static final class OfRef<T> extends FindSink<T, Optional<T>> {
            /** Returns the captured element, or {@code null} if none was seen. */
            @Override
            public Optional<T> get() {
                if (hasValue) {
                    return Optional.of(value);
                }
                return null;
            }
        }

        /** Specialization of {@code FindSink} for int streams */
        static final class OfInt extends FindSink<Integer, OptionalInt>
                implements Sink.OfInt {
            @Override
            public void accept(int value) {
                // Boxing is OK here, since few values will actually flow into the sink
                accept(Integer.valueOf(value));
            }

            /** Returns the captured element, or {@code null} if none was seen. */
            @Override
            public OptionalInt get() {
                if (hasValue) {
                    return OptionalInt.of(value);
                }
                return null;
            }
        }

        /** Specialization of {@code FindSink} for long streams */
        static final class OfLong extends FindSink<Long, OptionalLong>
                implements Sink.OfLong {
            @Override
            public void accept(long value) {
                // Boxing is OK here, since few values will actually flow into the sink
                accept(Long.valueOf(value));
            }

            /** Returns the captured element, or {@code null} if none was seen. */
            @Override
            public OptionalLong get() {
                if (hasValue) {
                    return OptionalLong.of(value);
                }
                return null;
            }
        }

        /** Specialization of {@code FindSink} for double streams */
        static final class OfDouble extends FindSink<Double, OptionalDouble>
                implements Sink.OfDouble {
            @Override
            public void accept(double value) {
                // Boxing is OK here, since few values will actually flow into the sink
                accept(Double.valueOf(value));
            }

            /** Returns the captured element, or {@code null} if none was seen. */
            @Override
            public OptionalDouble get() {
                if (hasValue) {
                    return OptionalDouble.of(value);
                }
                return null;
            }
        }
    }

    /**
     * {@code ForkJoinTask} implementing parallel short-circuiting search
     *
     * @param <P_IN> Input element type to the stream pipeline
     * @param <P_OUT> Output element type from the stream pipeline
     * @param <O> Result type from the find operation
     */
    @SuppressWarnings("serial")
    private static final class FindTask<P_IN, P_OUT, O>
            extends AbstractShortCircuitTask<P_IN, P_OUT, O, FindTask<P_IN, P_OUT, O>> {
        private final FindOp<P_OUT, O> op;
        private final boolean mustFindFirst;

        /** Creates the root task of a parallel find. */
        FindTask(FindOp<P_OUT, O> op,
                 boolean mustFindFirst,
                 PipelineHelper<P_OUT> helper,
                 Spliterator<P_IN> spliterator) {
            super(helper, spliterator);
            this.mustFindFirst = mustFindFirst;
            this.op = op;
        }

        /** Creates a child task that inherits the parent's op and ordering mode. */
        FindTask(FindTask<P_IN, P_OUT, O> parent, Spliterator<P_IN> spliterator) {
            super(parent, spliterator);
            this.mustFindFirst = parent.mustFindFirst;
            this.op = parent.op;
        }

        @Override
        protected FindTask<P_IN, P_OUT, O> makeChild(Spliterator<P_IN> spliterator) {
            return new FindTask<>(this, spliterator);
        }

        @Override
        protected O getEmptyResult() {
            return op.emptyValue;
        }

        /**
         * Reacts to a result found in find-first mode: the leftmost node
         * short-circuits with the answer, any other node cancels later nodes.
         */
        private void foundResult(O answer) {
            if (!isLeftmostNode()) {
                cancelLaterNodes();
            } else {
                shortCircuit(answer);
            }
        }

        /**
         * Searches this leaf's portion of the input.
         *
         * @return the found result in find-first mode; {@code null} when
         *         nothing was found, and always {@code null} in find-any mode
         *         (where a hit is reported through {@code shortCircuit} instead)
         */
        @Override
        protected O doLeaf() {
            TerminalSink<P_OUT, O> sink = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator);
            O found = sink.get();
            if (found == null) {
                return null;
            }
            if (mustFindFirst) {
                foundResult(found);
                return found;
            }
            shortCircuit(found);
            return null;
        }

        /**
         * In find-first mode, adopts the result of the left child if it holds
         * one, otherwise that of the right child, before delegating to the
         * superclass.
         */
        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            if (mustFindFirst) {
                // Visits leftChild, then rightChild; stops once the same
                // child would be visited twice (or at once if both are null).
                FindTask<P_IN, P_OUT, O> previous = null;
                FindTask<P_IN, P_OUT, O> child = leftChild;
                while (child != previous) {
                    O candidate = child.getLocalResult();
                    if (candidate != null && op.presentPredicate.test(candidate)) {
                        setLocalResult(candidate);
                        foundResult(candidate);
                        break;
                    }
                    previous = child;
                    child = rightChild;
                }
            }
            super.onCompletion(caller);
        }
    }
}