1 /* 2 * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.*; 28 import java.util.concurrent.CountedCompleter; 29 import java.util.function.Predicate; 30 import java.util.function.Supplier; 31 32 /** 33 * A factory for creating instances of a short-circuiting {@code TerminalOp} 34 * that searches for an element in a stream pipeline, and terminates when it 35 * finds one. The search supports find-first (find the first element in the 36 * encounter order) and find-any (find any element, may not be the first in 37 * encounter order.) 38 * 39 * @since 1.8 40 */ 41 final class FindOps { 42 43 private FindOps() { } 44 45 /** 46 * Constructs a {@code TerminalOp} for streams of objects 47 * 48 * @param mustFindFirst Whether the {@code TerminalOp} must produce the 49 * first element in the encounter order 50 * @param <T> The type of elements of the stream 51 * @return A {@code TerminalOp} implementing the find operation 52 */ 53 public static<T> FindOp<T, Optional<T>> makeRef(boolean mustFindFirst) { 54 return new FindOp<>(mustFindFirst, StreamShape.REFERENCE, Optional.empty(), 55 Optional::isPresent, FindSink.OfRef::new); 56 } 57 58 /** 59 * Constructs a {@code TerminalOp} for streams of ints 60 * 61 * @param mustFindFirst Whether the {@code TerminalOp} must produce the 62 * first element in the encounter order 63 * @return A {@code TerminalOp} implementing the find operation 64 */ 65 public static FindOp<Integer, OptionalInt> makeInt(boolean mustFindFirst) { 66 return new FindOp<>(mustFindFirst, StreamShape.INT_VALUE, OptionalInt.empty(), 67 OptionalInt::isPresent, FindSink.OfInt::new); 68 } 69 70 /** 71 * Constructs a {@code TerminalOp} for streams of longs 72 * 73 * @param mustFindFirst Whether the {@code TerminalOp} must produce the 74 * first element in the encounter order 75 * @return A {@code TerminalOp} implementing the find operation 76 */ 77 public static FindOp<Long, OptionalLong> makeLong(boolean mustFindFirst) { 78 return new FindOp<>(mustFindFirst, StreamShape.LONG_VALUE, OptionalLong.empty(), 79 OptionalLong::isPresent, FindSink.OfLong::new); 80 } 81 82 /** 83 * Constructs a {@code TerminalOp} for streams of doubles 84 * 85 * @param mustFindFirst Whether the {@code TerminalOp} must produce the 86 * first element in the encounter order 87 * @return A {@code TerminalOp} implementing the find operation 88 */ 89 public static FindOp<Double, OptionalDouble> makeDouble(boolean mustFindFirst) { 90 return new FindOp<>(mustFindFirst, StreamShape.DOUBLE_VALUE, OptionalDouble.empty(), 91 OptionalDouble::isPresent, FindSink.OfDouble::new); 92 } 93 94 /** 95 * A short-circuiting {@code TerminalOp} that searches for an element in a 96 * stream pipeline, and terminates when it finds one. Implements both 97 * find-first (find the first element in the encounter order) and find-any 98 * (find any element, may not be the first in encounter order.) 99 * 100 * @param <T> The output type of the stream pipeline 101 * @param <O> The result type of the find operation, typically an optional 102 * type 103 */ 104 private static final class FindOp<T, O> implements TerminalOp<T, O> { 105 private final StreamShape shape; 106 final boolean mustFindFirst; 107 final O emptyValue; 108 final Predicate<O> presentPredicate; 109 final Supplier<TerminalSink<T, O>> sinkSupplier; 110 111 /** 112 * Constructs a {@code FindOp} 113 * 114 * @param mustFindFirst If true, must find the first element in 115 * encounter order, otherwise can find any element 116 * @param shape Stream shape of elements to search 117 * @param emptyValue Result value corresponding to "found nothing" 118 * @param presentPredicate {@code Predicate} on result value 119 * corresponding to "found something" 120 * @param sinkSupplier Factory for a {@code TerminalSink} implementing 121 * the matching functionality 122 */ 123 FindOp(boolean mustFindFirst, 124 StreamShape shape, 125 O emptyValue, 126 Predicate<O> presentPredicate, 127 Supplier<TerminalSink<T, O>> sinkSupplier) { 128 this.mustFindFirst = mustFindFirst; 129 this.shape = shape; 130 this.emptyValue = emptyValue; 131 this.presentPredicate = presentPredicate; 132 this.sinkSupplier = sinkSupplier; 133 } 134 135 @Override 136 public int getOpFlags() { 137 return StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED); 138 } 139 140 @Override 141 public StreamShape inputShape() { 142 return shape; 143 } 144 145 @Override 146 public <S> O evaluateSequential(PipelineHelper<S, T> helper) { 147 O result = helper.into(sinkSupplier.get(), helper.sourceSpliterator()).get(); 148 return result != null ? result : emptyValue; 149 } 150 151 @Override 152 public <P_IN> O evaluateParallel(PipelineHelper<P_IN, T> helper) { 153 return new FindTask<>(helper, this).invoke(); 154 } 155 } 156 157 /** 158 * Implementation of @{code TerminalSink} that implements the find 159 * functionality, requesting cancellation when something has been found 160 * 161 * @param <T> The type of input element 162 * @param <O> The result type, typically an optional type 163 */ 164 private static abstract class FindSink<T, O> implements TerminalSink<T, O> { 165 boolean hasValue; 166 T value; 167 168 FindSink() {} // Avoid creation of special accessor 169 170 @Override 171 public void accept(T value) { 172 if (!hasValue) { 173 hasValue = true; 174 this.value = value; 175 } 176 } 177 178 @Override 179 public boolean cancellationRequested() { 180 return hasValue; 181 } 182 183 /** Specialization of {@code FindSink} for reference streams */ 184 static final class OfRef<T> extends FindSink<T, Optional<T>> { 185 @Override 186 public Optional<T> get() { 187 return hasValue ? Optional.of(value) : null; 188 } 189 } 190 191 /** Specialization of {@code FindSink} for int streams */ 192 static final class OfInt extends FindSink<Integer, OptionalInt> implements Sink.OfInt { 193 @Override 194 public void accept(int value) { 195 // Boxing is OK here, since few values will actually flow into the sink 196 accept((Integer) value); 197 } 198 199 @Override 200 public OptionalInt get() { 201 return hasValue ? OptionalInt.of(value) : null; 202 } 203 } 204 205 /** Specialization of {@code FindSink} for long streams */ 206 static final class OfLong extends FindSink<Long, OptionalLong> implements Sink.OfLong { 207 @Override 208 public void accept(long value) { 209 // Boxing is OK here, since few values will actually flow into the sink 210 accept((Long) value); 211 } 212 213 @Override 214 public OptionalLong get() { 215 return hasValue ? OptionalLong.of(value) : null; 216 } 217 } 218 219 /** Specialization of {@code FindSink} for double streams */ 220 static final class OfDouble extends FindSink<Double, OptionalDouble> implements Sink.OfDouble { 221 @Override 222 public void accept(double value) { 223 // Boxing is OK here, since few values will actually flow into the sink 224 accept((Double) value); 225 } 226 227 @Override 228 public OptionalDouble get() { 229 return hasValue ? OptionalDouble.of(value) : null; 230 } 231 } 232 } 233 234 /** 235 * {@code ForkJoinTask} implementing parallel short-circuiting search 236 * @param <S> Input element type to the stream pipeline 237 * @param <T> Output element type from the stream pipeline 238 * @param <O> Result type from the find operation 239 */ 240 private static final class FindTask<S, T, O> extends AbstractShortCircuitTask<S, T, O, FindTask<S, T, O>> { 241 private final FindOp<T, O> op; 242 243 FindTask(PipelineHelper<S, T> helper, FindOp<T, O> op) { 244 super(helper); 245 this.op = op; 246 } 247 248 FindTask(FindTask<S, T, O> parent, Spliterator<S> spliterator) { 249 super(parent, spliterator); 250 this.op = parent.op; 251 } 252 253 @Override 254 protected FindTask<S, T, O> makeChild(Spliterator<S> spliterator) { 255 return new FindTask<>(this, spliterator); 256 } 257 258 @Override 259 protected O getEmptyResult() { 260 return op.emptyValue; 261 } 262 263 private void foundResult(O answer) { 264 if (isLeftmostNode()) 265 shortCircuit(answer); 266 else 267 cancelLaterNodes(); 268 } 269 270 @Override 271 protected O doLeaf() { 272 O result = helper.into(op.sinkSupplier.get(), spliterator).get(); 273 if (!op.mustFindFirst) { 274 if (result != null) 275 shortCircuit(result); 276 return null; 277 } 278 else { 279 if (result != null) { 280 foundResult(result); 281 return result; 282 } 283 else 284 return null; 285 } 286 } 287 288 @Override 289 public void onCompletion(CountedCompleter<?> caller) { 290 if (op.mustFindFirst) { 291 for (FindTask<S, T, O> child = children; child != null; child = child.nextSibling) { 292 O result = child.getLocalResult(); 293 if (result != null && op.presentPredicate.test(result)) { 294 setLocalResult(result); 295 foundResult(result); 296 break; 297 } 298 } 299 } 300 super.onCompletion(caller); 301 } 302 } 303 } 304