1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 package java.util.stream; 24 25 import java.util.Iterator; 26 import java.util.Spliterator; 27 import java.util.function.Consumer; 28 import java.util.function.Function; 29 30 /** 31 * Test scenarios for reference streams. 32 * 33 * Each scenario is provided with a data source, a function that maps a fresh 34 * stream (as provided by the data source) to a new stream, and a sink to 35 * receive results. Each scenario describes a different way of computing the 36 * stream contents. The test driver will ensure that all scenarios produce 37 * the same output (modulo allowable differences in ordering). 38 */ 39 @SuppressWarnings({"rawtypes", "unchecked"}) 40 public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario { 41 42 STREAM_FOR_EACH(false) { 43 <T, U, S_IN extends BaseStream<T, S_IN>> 44 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 45 Stream<U> s = m.apply(data.stream()); 46 if (s.isParallel()) { 47 s = s.sequential(); 48 } 49 s.forEach(b); 50 } 51 }, 52 53 // Collec to list 54 STREAM_COLLECT(false) { 55 <T, U, S_IN extends BaseStream<T, S_IN>> 56 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 57 for (U t : m.apply(data.stream()).collect(Collectors.toList())) { 58 b.accept(t); 59 } 60 } 61 }, 62 63 // To array 64 STREAM_TO_ARRAY(false) { 65 <T, U, S_IN extends BaseStream<T, S_IN>> 66 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 67 for (Object t : m.apply(data.stream()).toArray()) { 68 b.accept((U) t); 69 } 70 } 71 }, 72 73 // Wrap as stream, and iterate in pull mode 74 STREAM_ITERATOR(false) { 75 <T, U, S_IN extends BaseStream<T, S_IN>> 76 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 77 for (Iterator<U> seqIter = m.apply(data.stream()).iterator(); seqIter.hasNext(); ) 78 b.accept(seqIter.next()); 79 } 80 }, 81 82 // Wrap as stream, and spliterate then iterate in pull mode 83 STREAM_SPLITERATOR(false) { 84 <T, U, S_IN extends BaseStream<T, S_IN>> 85 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 86 for (Spliterator<U> spl = m.apply(data.stream()).spliterator(); spl.tryAdvance(b); ) { } 87 } 88 }, 89 90 // Wrap as stream, spliterate, then split a few times mixing advances with forEach 91 STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) { 92 <T, U, S_IN extends BaseStream<T, S_IN>> 93 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 94 SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(data.stream()).spliterator()); 95 } 96 }, 97 98 // Wrap as stream, and spliterate then iterate in pull mode 99 STREAM_SPLITERATOR_FOREACH(false) { 100 <T, U, S_IN extends BaseStream<T, S_IN>> 101 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 102 m.apply(data.stream()).spliterator().forEachRemaining(b); 103 } 104 }, 105 106 // Wrap as parallel stream + sequential 107 PAR_STREAM_SEQUENTIAL_FOR_EACH(true) { 108 <T, U, S_IN extends BaseStream<T, S_IN>> 109 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 110 m.apply(data.parallelStream()).sequential().forEach(b); 111 } 112 }, 113 114 // Wrap as parallel stream + forEachOrdered 115 PAR_STREAM_FOR_EACH_ORDERED(true) { 116 <T, U, S_IN extends BaseStream<T, S_IN>> 117 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 118 // @@@ Want to explicitly select ordered equalator 119 m.apply(data.parallelStream()).forEachOrdered(b); 120 } 121 }, 122 123 // Wrap as stream, and spliterate then iterate sequentially 124 PAR_STREAM_SPLITERATOR(true) { 125 <T, U, S_IN extends BaseStream<T, S_IN>> 126 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 127 for (Spliterator<U> spl = m.apply(data.parallelStream()).spliterator(); spl.tryAdvance(b); ) { } 128 } 129 }, 130 131 // Wrap as stream, and spliterate then iterate sequentially 132 PAR_STREAM_SPLITERATOR_FOREACH(true) { 133 <T, U, S_IN extends BaseStream<T, S_IN>> 134 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 135 m.apply(data.parallelStream()).spliterator().forEachRemaining(b); 136 } 137 }, 138 139 // Wrap as parallel stream + toArray 140 PAR_STREAM_TO_ARRAY(true) { 141 <T, U, S_IN extends BaseStream<T, S_IN>> 142 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 143 for (Object t : m.apply(data.parallelStream()).toArray()) 144 b.accept((U) t); 145 } 146 }, 147 148 // Wrap as parallel stream, get the spliterator, wrap as a stream + toArray 149 PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) { 150 <T, U, S_IN extends BaseStream<T, S_IN>> 151 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 152 Stream<U> s = m.apply(data.parallelStream()); 153 Spliterator<U> sp = s.spliterator(); 154 Stream<U> ss = StreamSupport.stream(() -> sp, 155 StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s)) 156 | (sp.getExactSizeIfKnown() < 0 ? 0 : Spliterator.SIZED), true); 157 for (Object t : ss.toArray()) 158 b.accept((U) t); 159 } 160 }, 161 162 // Wrap as parallel stream + toArray and clear SIZED flag 163 PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) { 164 <T, U, S_IN extends BaseStream<T, S_IN>> 165 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 166 S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(), 167 new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape())); 168 Stream<U> pipe2 = m.apply(pipe1); 169 170 for (Object t : pipe2.toArray()) 171 b.accept((U) t); 172 } 173 }, 174 175 // Wrap as parallel + collect 176 PAR_STREAM_COLLECT(true) { 177 <T, U, S_IN extends BaseStream<T, S_IN>> 178 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 179 for (U u : m.apply(data.parallelStream()).collect(Collectors.toList())) 180 b.accept(u); 181 } 182 }, 183 184 // Wrap sequential as parallel, + collect 185 STREAM_TO_PAR_STREAM_COLLECT(true) { 186 <T, U, S_IN extends BaseStream<T, S_IN>> 187 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 188 for (U u : m.apply(data.stream().parallel()).collect(Collectors.toList())) 189 b.accept(u); 190 } 191 }, 192 193 // Wrap parallel as sequential,, + collect 194 PAR_STREAM_TO_STREAM_COLLECT(true) { 195 <T, U, S_IN extends BaseStream<T, S_IN>> 196 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) { 197 for (U u : m.apply(data.parallelStream().sequential()).collect(Collectors.toList())) 198 b.accept(u); 199 } 200 }, 201 ; 202 203 private boolean isParallel; 204 205 StreamTestScenario(boolean isParallel) { 206 this.isParallel = isParallel; 207 } 208 209 public StreamShape getShape() { 210 return StreamShape.REFERENCE; 211 } 212 213 public boolean isParallel() { 214 return isParallel; 215 } 216 217 public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> 218 void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) { 219 _run(data, b, (Function<S_IN, Stream<U>>) m); 220 } 221 222 abstract <T, U, S_IN extends BaseStream<T, S_IN>> 223 void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m); 224 225 }