/*
 * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.*;
import java.util.concurrent.CountedCompleter;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
 * A short-circuiting {@code TerminalOp} that searches for an element in a stream pipeline, and terminates when
 * it finds one. Implements both find-first (find the first element in the encounter order) and find-any (find
 * any element, may not be the first in encounter order).
 *
 * @param <T> The output type of the stream pipeline
 * @param <O> The result type of the find operation, typically an optional type
 * @since 1.8
 */
class FindOp<T, O> implements TerminalOp<T, O> {

    /**
     * Construct a {@code FindOp} for streams of objects
     * @param mustFindFirst Whether the {@code FindOp} must produce the first element in the encounter order
     * @param <T> The type of elements of the stream
     * @return A {@code FindOp} implementing the find operation
     */
    public static<T> FindOp<T, Optional<T>> makeRef(boolean mustFindFirst) {
        return new FindOp<>(mustFindFirst, StreamShape.REFERENCE, Optional.empty(),
                            Optional::isPresent, FindSink.OfRef::new);
    }

    /**
     * Construct a {@code FindOp} for streams of ints
     * @param mustFindFirst Whether the {@code FindOp} must produce the first element in the encounter order
     * @return A {@code FindOp} implementing the find operation
     */
    public static FindOp<Integer, OptionalInt> makeInt(boolean mustFindFirst) {
        return new FindOp<>(mustFindFirst, StreamShape.INT_VALUE, OptionalInt.empty(),
                            OptionalInt::isPresent, FindSink.OfInt::new);
    }

    /**
     * Construct a {@code FindOp} for streams of longs
     * @param mustFindFirst Whether the {@code FindOp} must produce the first element in the encounter order
     * @return A {@code FindOp} implementing the find operation
     */
    public static FindOp<Long, OptionalLong> makeLong(boolean mustFindFirst) {
        return new FindOp<>(mustFindFirst, StreamShape.LONG_VALUE, OptionalLong.empty(),
                            OptionalLong::isPresent, FindSink.OfLong::new);
    }

    /**
     * Construct a {@code FindOp} for streams of doubles
     * @param mustFindFirst Whether the {@code FindOp} must produce the first element in the encounter order
     * @return A {@code FindOp} implementing the find operation
     */
    public static FindOp<Double, OptionalDouble> makeDouble(boolean mustFindFirst) {
        return new FindOp<>(mustFindFirst, StreamShape.DOUBLE_VALUE, OptionalDouble.empty(),
                            OptionalDouble::isPresent, FindSink.OfDouble::new);
    }

    /**
     * Implementation of {@code TerminalSink} that implements the find functionality, requesting cancellation
     * when something has been found.
     *
     * <p>NOTE(review): despite the name, the {@code getAndClearState()} implementations below do not reset
     * {@code hasValue} or {@code value}.  Within this file every sink is obtained fresh from the sink supplier
     * immediately before use and is not touched again after the result is read; confirm against the
     * {@code TerminalSink} contract before reusing a sink.
     *
     * @param <T> The type of input element
     * @param <O> The result type, typically an optional type
     */
    abstract static class FindSink<T, O> implements TerminalSink<T, O> {
        /** Whether an element has been captured. */
        boolean hasValue;
        /** The captured element; only meaningful when {@code hasValue} is true. */
        T value;

        /** Captures the first element offered; any subsequent elements are ignored. */
        @Override
        public void accept(T value) {
            if (!hasValue) {
                hasValue = true;
                this.value = value;
            }
        }

        /** Requests cancellation as soon as an element has been captured. */
        @Override
        public boolean cancellationRequested() {
            return hasValue;
        }

        /** Specialization of {@code FindSink} for reference streams */
        static class OfRef<T> extends FindSink<T, Optional<T>> {
            /**
             * @return an {@code Optional} describing the captured element, or {@code null} if nothing was
             *         captured (callers in this file substitute the op's empty value for {@code null})
             */
            @Override
            public Optional<T> getAndClearState() {
                // Optional.of rejects a null element with a NullPointerException.
                return hasValue ? Optional.of(value) : null;
            }
        }

        /** Specialization of {@code FindSink} for int streams */
        static class OfInt extends FindSink<Integer, OptionalInt> implements Sink.OfInt {
            @Override
            public void accept(int value) {
                // Boxing is OK here, since few values will actually flow into the sink
                accept((Integer) value);
            }

            /** @return an {@code OptionalInt} holding the captured value, or {@code null} if none was captured */
            @Override
            public OptionalInt getAndClearState() {
                return hasValue ? OptionalInt.of(value) : null;
            }
        }

        /** Specialization of {@code FindSink} for long streams */
        static class OfLong extends FindSink<Long, OptionalLong> implements Sink.OfLong {
            @Override
            public void accept(long value) {
                // Boxing is OK here, since few values will actually flow into the sink
                accept((Long) value);
            }

            /** @return an {@code OptionalLong} holding the captured value, or {@code null} if none was captured */
            @Override
            public OptionalLong getAndClearState() {
                return hasValue ? OptionalLong.of(value) : null;
            }
        }

        /** Specialization of {@code FindSink} for double streams */
        static class OfDouble extends FindSink<Double, OptionalDouble> implements Sink.OfDouble {
            @Override
            public void accept(double value) {
                // Boxing is OK here, since few values will actually flow into the sink
                accept((Double) value);
            }

            /** @return an {@code OptionalDouble} holding the captured value, or {@code null} if none was captured */
            @Override
            public OptionalDouble getAndClearState() {
                return hasValue ? OptionalDouble.of(value) : null;
            }
        }
    }

    private final boolean mustFindFirst;
    private final StreamShape shape;
    private final O emptyValue;
    private final Predicate<O> presentPredicate;
    private final Supplier<TerminalSink<T, O>> sinkSupplier;

    /**
     * Construct a {@code FindOp}
     * @param mustFindFirst If true, must find the first element in encounter order, otherwise can find any element
     * @param shape Stream shape of elements to search
     * @param emptyValue Result value corresponding to "found nothing"
     * @param presentPredicate {@code Predicate} on result value corresponding to "found something"
     * @param sinkSupplier Factory for a {@code TerminalSink} implementing the matching functionality
     */
    private FindOp(boolean mustFindFirst,
                   StreamShape shape,
                   O emptyValue,
                   Predicate<O> presentPredicate,
                   Supplier<TerminalSink<T, O>> sinkSupplier) {
        this.mustFindFirst = mustFindFirst;
        this.shape = shape;
        this.emptyValue = emptyValue;
        this.presentPredicate = presentPredicate;
        this.sinkSupplier = sinkSupplier;
    }

    /**
     * @return {@code IS_SHORT_CIRCUIT}, combined with {@code NOT_ORDERED} when this is a find-any operation
     */
    @Override
    public int getOpFlags() {
        return StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED);
    }

    @Override
    public StreamShape inputShape() {
        return shape;
    }

    /**
     * Pushes the source through a fresh sink and returns what it captured, substituting the configured
     * empty value when the sink reports nothing (a {@code null} result).
     */
    @Override
    public <S> O evaluateSequential(PipelineHelper<S, T> helper) {
        O result = helper.into(sinkSupplier.get(), helper.sourceSpliterator()).getAndClearState();
        return result != null ? result : emptyValue;
    }

    /** Runs the search as a {@code FindTask} and returns the task's result. */
    @Override
    public <P_IN> O evaluateParallel(PipelineHelper<P_IN, T> helper) {
        return new FindTask<>(helper, this).invoke();
    }

    /**
     * {@code ForkJoinTask} implementing parallel short-circuiting search
     * @param <S> Input element type to the stream pipeline
     * @param <T> Output element type from the stream pipeline
     * @param <O> Result type from the find operation
     */
    private static class FindTask<S, T, O> extends AbstractShortCircuitTask<S, T, O, FindTask<S, T, O>> {
        private final FindOp<T, O> op;

        private FindTask(PipelineHelper<S, T> helper, FindOp<T, O> op) {
            super(helper);
            this.op = op;
        }

        private FindTask(FindTask<S, T, O> parent, Spliterator<S> spliterator) {
            super(parent, spliterator);
            this.op = parent.op;
        }

        @Override
        protected FindTask<S, T, O> makeChild(Spliterator<S> spliterator) {
            return new FindTask<>(this, spliterator);
        }

        @Override
        protected O getEmptyResult() {
            return op.emptyValue;
        }

        /**
         * Reacts to a find-first hit: a leftmost node short-circuits the whole computation with the
         * answer; any other node only cancels the nodes later than itself.
         */
        private void foundResult(O answer) {
            if (isLeftmostNode()) {
                shortCircuit(answer);
            }
            else {
                cancelLaterNodes();
            }
        }

        /**
         * Searches this leaf's spliterator using a fresh sink.
         *
         * @return the hit when in find-first mode and something was found; otherwise {@code null}
         *         (in find-any mode a hit is reported solely through {@code shortCircuit})
         */
        @Override
        protected O doLeaf() {
            O result = helper.into(op.sinkSupplier.get(), spliterator).getAndClearState();
            if (result == null) {
                // Nothing found in this leaf, regardless of mode
                return null;
            }
            if (op.mustFindFirst) {
                foundResult(result);
                return result;
            }
            // find-any: any hit is an acceptable final answer
            shortCircuit(result);
            return null;
        }

        /**
         * In find-first mode, walks the children in sibling order and adopts the first local result
         * that is non-null and satisfies the op's present-predicate.
         */
        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            if (op.mustFindFirst) {
                for (FindTask<S, T, O> child = children; child != null; child = child.nextSibling) {
                    O result = child.getLocalResult();
                    if (result != null && op.presentPredicate.test(result)) {
                        setLocalResult(result);
                        foundResult(result);
                        break;
                    }
                }
            }
        }
    }
}