1 /* 2 * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 package java.util.stream; 24 25 import java.util.Collections; 26 import java.util.EnumSet; 27 import java.util.Iterator; 28 import java.util.Set; 29 import java.util.Spliterator; 30 import java.util.function.Consumer; 31 import java.util.function.Function; 32 33 /** 34 * Test scenarios for reference streams. 35 * 36 * Each scenario is provided with a data source, a function that maps a fresh 37 * stream (as provided by the data source) to a new stream, and a sink to 38 * receive results. Each scenario describes a different way of computing the 39 * stream contents. The test driver will ensure that all scenarios produce 40 * the same output (modulo allowable differences in ordering). 41 */ 42 @SuppressWarnings({"rawtypes", "unchecked"}) 43 public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario { 44 45 STREAM_FOR_EACH(false) { 46 <T, U, S_IN extends BaseStream<T, S_IN>> 47 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 48 Stream<U> s = m.apply(source); 49 if (s.isParallel()) { 50 s = s.sequential(); 51 } 52 s.forEach(b); 53 } 54 }, 55 56 // Collec to list 57 STREAM_COLLECT(false) { 58 <T, U, S_IN extends BaseStream<T, S_IN>> 59 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 60 for (U t : m.apply(source).collect(Collectors.toList())) { 61 b.accept(t); 62 } 63 } 64 }, 65 66 // To array 67 STREAM_TO_ARRAY(false) { 68 <T, U, S_IN extends BaseStream<T, S_IN>> 69 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 70 for (Object t : m.apply(source).toArray()) { 71 b.accept((U) t); 72 } 73 } 74 }, 75 76 // Wrap as stream, and iterate in pull mode 77 STREAM_ITERATOR(false) { 78 <T, U, S_IN extends BaseStream<T, S_IN>> 79 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 80 for (Iterator<U> seqIter = m.apply(source).iterator(); seqIter.hasNext(); ) 81 b.accept(seqIter.next()); 82 } 83 }, 84 85 // Wrap as stream, and spliterate then iterate in pull mode 86 STREAM_SPLITERATOR(false) { 87 <T, U, S_IN extends BaseStream<T, S_IN>> 88 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 89 for (Spliterator<U> spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) { 90 } 91 } 92 }, 93 94 // Wrap as stream, spliterate, then split a few times mixing advances with forEach 95 STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) { 96 <T, U, S_IN extends BaseStream<T, S_IN>> 97 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 98 SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(source).spliterator()); 99 } 100 }, 101 102 // Wrap as stream, and spliterate then iterate in pull mode 103 STREAM_SPLITERATOR_FOREACH(false) { 104 <T, U, S_IN extends BaseStream<T, S_IN>> 105 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 106 m.apply(source).spliterator().forEachRemaining(b); 107 } 108 }, 109 110 // Wrap as parallel stream + sequential 111 PAR_STREAM_SEQUENTIAL_FOR_EACH(true) { 112 <T, U, S_IN extends BaseStream<T, S_IN>> 113 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 114 m.apply(source).sequential().forEach(b); 115 } 116 }, 117 118 // Wrap as parallel stream + forEachOrdered 119 PAR_STREAM_FOR_EACH_ORDERED(true) { 120 <T, U, S_IN extends BaseStream<T, S_IN>> 121 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 122 // @@@ Want to explicitly select ordered equalator 123 m.apply(source).forEachOrdered(b); 124 } 125 }, 126 127 // Wrap as stream, and spliterate then iterate sequentially 128 PAR_STREAM_SPLITERATOR(true) { 129 <T, U, S_IN extends BaseStream<T, S_IN>> 130 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 131 for (Spliterator<U> spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) { 132 } 133 } 134 }, 135 136 // Wrap as stream, and spliterate then iterate sequentially 137 PAR_STREAM_SPLITERATOR_FOREACH(true) { 138 <T, U, S_IN extends BaseStream<T, S_IN>> 139 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 140 m.apply(source).spliterator().forEachRemaining(b); 141 } 142 }, 143 144 // Wrap as parallel stream + toArray 145 PAR_STREAM_TO_ARRAY(true) { 146 <T, U, S_IN extends BaseStream<T, S_IN>> 147 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 148 for (Object t : m.apply(source).toArray()) 149 b.accept((U) t); 150 } 151 }, 152 153 // Wrap as parallel stream, get the spliterator, wrap as a stream + toArray 154 PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) { 155 <T, U, S_IN extends BaseStream<T, S_IN>> 156 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 157 Stream<U> s = m.apply(source); 158 Spliterator<U> sp = s.spliterator(); 159 Stream<U> ss = StreamSupport.stream(() -> sp, 160 StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s)) 161 | (sp.getExactSizeIfKnown() < 0 ? 0 : Spliterator.SIZED), true); 162 for (Object t : ss.toArray()) 163 b.accept((U) t); 164 } 165 }, 166 167 // Wrap as parallel stream + toArray and clear SIZED flag 168 PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) { 169 <T, U, S_IN extends BaseStream<T, S_IN>> 170 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 171 S_IN pipe1 = (S_IN) OpTestCase.chain(source, 172 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape())); 173 Stream<U> pipe2 = m.apply(pipe1); 174 175 for (Object t : pipe2.toArray()) 176 b.accept((U) t); 177 } 178 }, 179 180 // Wrap as parallel + collect to list 181 PAR_STREAM_COLLECT_TO_LIST(true) { 182 <T, U, S_IN extends BaseStream<T, S_IN>> 183 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 184 for (U u : m.apply(source).collect(Collectors.toList())) 185 b.accept(u); 186 } 187 }, 188 189 // Wrap sequential as parallel, + collect to list 190 STREAM_TO_PAR_STREAM_COLLECT_TO_LIST(true) { 191 public <T, S_IN extends BaseStream<T, S_IN>> 192 S_IN getStream(TestData<T, S_IN> data) { 193 return data.stream().parallel(); 194 } 195 196 <T, U, S_IN extends BaseStream<T, S_IN>> 197 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 198 for (U u : m.apply(source).collect(Collectors.toList())) 199 b.accept(u); 200 } 201 }, 202 203 // Wrap parallel as sequential,, + collect 204 PAR_STREAM_TO_STREAM_COLLECT_TO_LIST(true) { 205 <T, U, S_IN extends BaseStream<T, S_IN>> 206 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 207 for (U u : m.apply(source).collect(Collectors.toList())) 208 b.accept(u); 209 } 210 }, 211 212 // Wrap as parallel stream + forEach synchronizing 213 PAR_STREAM_FOR_EACH(true, false) { 214 <T, U, S_IN extends BaseStream<T, S_IN>> 215 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 216 m.apply(source).forEach(e -> { 217 synchronized (data) { 218 b.accept(e); 219 } 220 }); 221 } 222 }, 223 224 // Wrap as parallel stream + forEach synchronizing and clear SIZED flag 225 PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) { 226 <T, U, S_IN extends BaseStream<T, S_IN>> 227 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) { 228 S_IN pipe1 = (S_IN) OpTestCase.chain(source, 229 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape())); 230 m.apply(pipe1).forEach(e -> { 231 synchronized (data) { 232 b.accept(e); 233 } 234 }); 235 } 236 }, 237 ; 238 239 // The set of scenarios that clean the SIZED flag 240 public static final Set<StreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet( 241 EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED)); 242 243 private final boolean isParallel; 244 245 private final boolean isOrdered; 246 247 StreamTestScenario(boolean isParallel) { 248 this(isParallel, true); 249 } 250 251 StreamTestScenario(boolean isParallel, boolean isOrdered) { 252 this.isParallel = isParallel; 253 this.isOrdered = isOrdered; 254 } 255 256 public StreamShape getShape() { 257 return StreamShape.REFERENCE; 258 } 259 260 public boolean isParallel() { 261 return isParallel; 262 } 263 264 public boolean isOrdered() { 265 return isOrdered; 266 } 267 268 public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> 269 void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) { 270 try (S_IN source = getStream(data)) { 271 run(data, source, b, (Function<S_IN, Stream<U>>) m); 272 } 273 } 274 275 abstract <T, U, S_IN extends BaseStream<T, S_IN>> 276 void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m); 277 278 }